From 99127740583b310007d7f70d371e73977fbd12c3 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Fri, 24 Jan 2025 10:30:30 +0800 Subject: [PATCH] [FLINK-37208][Runtime] Properly notify a new key is selected for async state operators (#26063) --- .../operators/BaseKeyedProcessOperator.java | 7 +--- ...edTwoInputNonBroadcastProcessOperator.java | 14 +------ .../BaseKeyedTwoOutputProcessOperator.java | 7 +--- ...KeyedTwoInputBroadcastProcessOperator.java | 8 +--- .../impl/context/DefaultStateManagerTest.java | 30 +++++++-------- .../operators/KeyedProcessOperatorTest.java | 14 +++---- ...dTwoInputBroadcastProcessOperatorTest.java | 16 ++++---- ...oInputNonBroadcastProcessOperatorTest.java | 22 +++++++---- .../KeyedTwoOutputProcessOperatorTest.java | 21 ++++++---- .../api/input/KeyedStateInputFormatTest.java | 27 ++++++++----- .../AbstractAsyncStateStreamOperator.java | 36 +++++++++++++----- .../AbstractAsyncStateStreamOperatorV2.java | 38 ++++++++++++++++--- .../asyncprocessing/AsyncStateProcessing.java | 5 +++ .../AsyncIntervalJoinOperatorTest.java | 12 +++--- ...eyedTwoInputStreamOperatorTestHarness.java | 10 +++++ .../stream/StreamingJoinOperatorTestBase.java | 13 ++++--- 16 files changed, 172 insertions(+), 108 deletions(-) diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedProcessOperator.java index d47c3a0d23184..2028b5ee96965 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedProcessOperator.java @@ -26,7 +26,6 @@ import org.apache.flink.datastream.impl.common.OutputCollector; import org.apache.flink.datastream.impl.common.TimestampCollector; import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import javax.annotation.Nullable; @@ -83,10 +82,8 @@ protected NonPartitionedContext getNonPartitionedContext() { } @Override - @SuppressWarnings({"rawtypes"}) - public void setKeyContextElement1(StreamRecord record) throws Exception { - super.setKeyContextElement1(record); - keySet.add(getCurrentKey()); + public void newKeySelected(Object newKey) { + keySet.add(newKey); } @Override diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedTwoInputNonBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedTwoInputNonBroadcastProcessOperator.java index 0861189a2abee..8d1f821beb9d0 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedTwoInputNonBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedTwoInputNonBroadcastProcessOperator.java @@ -26,7 +26,6 @@ import org.apache.flink.datastream.impl.common.OutputCollector; import org.apache.flink.datastream.impl.common.TimestampCollector; import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import javax.annotation.Nullable; @@ -89,17 +88,8 @@ protected NonPartitionedContext getNonPartitionedContext() { } @Override - @SuppressWarnings({"rawtypes"}) - public void setKeyContextElement1(StreamRecord record) throws Exception { - super.setKeyContextElement1(record); - keySet.add(getCurrentKey()); - } - - @Override - @SuppressWarnings({"rawtypes"}) - public void setKeyContextElement2(StreamRecord record) throws Exception { - super.setKeyContextElement2(record); - keySet.add(getCurrentKey()); + public void newKeySelected(Object newKey) { + keySet.add(newKey); } @Override diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedTwoOutputProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedTwoOutputProcessOperator.java index cc3ba7b636379..6648388cb9bb0 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedTwoOutputProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/BaseKeyedTwoOutputProcessOperator.java @@ -26,7 +26,6 @@ import org.apache.flink.datastream.impl.common.OutputCollector; import org.apache.flink.datastream.impl.common.TimestampCollector; import org.apache.flink.datastream.impl.context.DefaultTwoOutputNonPartitionedContext; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; @@ -105,10 +104,8 @@ protected TwoOutputNonPartitionedContext getNonPartitionedCo } @Override - @SuppressWarnings({"rawtypes"}) - public void setKeyContextElement1(StreamRecord record) throws Exception { - super.setKeyContextElement1(record); - keySet.add(getCurrentKey()); + public void newKeySelected(Object newKey) { + keySet.add(newKey); } @Override diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java index bb985643e76df..86b334c1f1e19 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java @@ -34,7 +34,6 @@ import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.Triggerable; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import javax.annotation.Nullable; @@ -117,11 +116,8 @@ protected NonPartitionedContext getNonPartitionedContext() { } @Override - @SuppressWarnings({"rawtypes"}) - // Only element from input1 should be considered as the other side is broadcast input. - public void setKeyContextElement1(StreamRecord record) throws Exception { - super.setKeyContextElement1(record); - keySet.add(getCurrentKey()); + public void newKeySelected(Object newKey) { + keySet.add(newKey); } @Override diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultStateManagerTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultStateManagerTest.java index 5366fca796281..2d60e31dd9d7d 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultStateManagerTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultStateManagerTest.java @@ -30,8 +30,8 @@ import org.apache.flink.datastream.impl.operators.MockSumAggregateProcessFunction; import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.MockStreamingRuntimeContext; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness; import org.junit.jupiter.api.Test; @@ -98,8 +98,8 @@ void testListState() throws Exception { new KeyedProcessOperator<>( function, (KeySelector) value -> value); - try (KeyedOneInputStreamOperatorTestHarness testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( + try (AsyncKeyedOneInputStreamOperatorTestHarness testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( processOperator, (KeySelector) value -> value, Types.INT)) { @@ -120,8 +120,8 @@ void testAggState() throws Exception { KeyedProcessOperator processOperator = new KeyedProcessOperator<>(function); - try (KeyedOneInputStreamOperatorTestHarness testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( + try (AsyncKeyedOneInputStreamOperatorTestHarness testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( processOperator, (KeySelector) value -> value, Types.INT)) { @@ -148,8 +148,8 @@ void testValueState() throws Exception { KeyedProcessOperator processOperator = new KeyedProcessOperator<>(function); - try (KeyedOneInputStreamOperatorTestHarness testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( + try (AsyncKeyedOneInputStreamOperatorTestHarness testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( processOperator, (KeySelector) value -> value, Types.INT)) { @@ -175,8 +175,8 @@ void testMapState() throws Exception { KeyedProcessOperator processOperator = new KeyedProcessOperator<>(function); - try (KeyedOneInputStreamOperatorTestHarness testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( + try (AsyncKeyedOneInputStreamOperatorTestHarness testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( processOperator, (KeySelector) value -> value, Types.INT)) { @@ -203,8 +203,8 @@ void testReducingState() throws Exception { KeyedProcessOperator processOperator = new KeyedProcessOperator<>(function); - try (KeyedOneInputStreamOperatorTestHarness testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( + try (AsyncKeyedOneInputStreamOperatorTestHarness testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( processOperator, (KeySelector) value -> value, Types.INT)) { @@ -231,8 +231,8 @@ void testBroadcastMapState() throws Exception { KeyedProcessOperator processOperator = new KeyedProcessOperator<>(function); - try (KeyedOneInputStreamOperatorTestHarness testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( + try (AsyncKeyedOneInputStreamOperatorTestHarness testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( processOperator, (KeySelector) value -> value, Types.INT)) { @@ -260,8 +260,8 @@ void testBroadcastListState() throws Exception { new KeyedProcessOperator<>( function, (KeySelector) value -> value); - try (KeyedOneInputStreamOperatorTestHarness testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( + try (AsyncKeyedOneInputStreamOperatorTestHarness testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( processOperator, (KeySelector) value -> value, Types.INT)) { diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperatorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperatorTest.java index f3cd604c445d3..49c4d5f1b7e2c 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperatorTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperatorTest.java @@ -25,7 +25,7 @@ import org.apache.flink.datastream.api.context.PartitionedContext; import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness; import org.junit.jupiter.api.Test; @@ -51,8 +51,8 @@ public void processRecord( } }); - try (KeyedOneInputStreamOperatorTestHarness testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( + try (AsyncKeyedOneInputStreamOperatorTestHarness testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( processOperator, (KeySelector) value -> value, Types.INT)) { @@ -98,8 +98,8 @@ public void endInput(NonPartitionedContext ctx) { } }); - try (KeyedOneInputStreamOperatorTestHarness testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( + try (AsyncKeyedOneInputStreamOperatorTestHarness testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( processOperator, (KeySelector) value -> value, Types.INT)) { @@ -133,8 +133,8 @@ public void processRecord( // -1 is an invalid key in this suite. (ignore) -> -1); - try (KeyedOneInputStreamOperatorTestHarness testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( + try (AsyncKeyedOneInputStreamOperatorTestHarness testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( processOperator, (KeySelector) value -> value, Types.INT)) { diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperatorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperatorTest.java index c9e4b65276c5a..9684d2decaaaa 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperatorTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperatorTest.java @@ -25,7 +25,7 @@ import org.apache.flink.datastream.api.context.PartitionedContext; import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness; import org.junit.jupiter.api.Test; @@ -61,8 +61,8 @@ public void processRecordFromBroadcastInput( } }); - try (KeyedTwoInputStreamOperatorTestHarness testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( + try (AsyncKeyedTwoInputStreamOperatorTestHarness testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( processOperator, (KeySelector) (data) -> (long) (data + 1), (KeySelector) value -> value + 1, @@ -130,11 +130,11 @@ public void endBroadcastInput(NonPartitionedContext ctx) { } }); - try (KeyedTwoInputStreamOperatorTestHarness testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( + try (AsyncKeyedTwoInputStreamOperatorTestHarness testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( processOperator, (KeySelector) Long::valueOf, - (KeySelector) value -> value, + null, Types.LONG)) { testHarness.open(); testHarness.processElement1(new StreamRecord<>(1)); // key is 1L @@ -175,8 +175,8 @@ public void processRecordFromBroadcastInput( // -1 is an invalid key in this suite. (out) -> -1L); - try (KeyedTwoInputStreamOperatorTestHarness testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( + try (AsyncKeyedTwoInputStreamOperatorTestHarness testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( processOperator, (KeySelector) Long::valueOf, (KeySelector) value -> value, diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperatorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperatorTest.java index 4ff5501768f7b..96107c7b6f12d 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperatorTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperatorTest.java @@ -25,7 +25,7 @@ import org.apache.flink.datastream.api.context.PartitionedContext; import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness; import org.junit.jupiter.api.Test; @@ -59,8 +59,8 @@ public void processRecordFromSecondInput( } }); - try (KeyedTwoInputStreamOperatorTestHarness testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( + try (AsyncKeyedTwoInputStreamOperatorTestHarness testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( processOperator, (KeySelector) (data) -> (long) (data + 1), (KeySelector) value -> value + 1, @@ -134,8 +134,8 @@ public void endSecondInput(NonPartitionedContext ctx) { } }); - try (KeyedTwoInputStreamOperatorTestHarness testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( + try (AsyncKeyedTwoInputStreamOperatorTestHarness testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( processOperator, (KeySelector) Long::valueOf, (KeySelector) value -> value, @@ -183,8 +183,8 @@ public void processRecordFromSecondInput( // -1 is an invalid key in this suite. (out) -> -1L); - try (KeyedTwoInputStreamOperatorTestHarness testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( + try (AsyncKeyedTwoInputStreamOperatorTestHarness testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( processOperator, (KeySelector) Long::valueOf, (KeySelector) value -> value, @@ -192,6 +192,14 @@ public void processRecordFromSecondInput( testHarness.open(); assertThatThrownBy(() -> testHarness.processElement1(new StreamRecord<>(1))) .isInstanceOf(IllegalStateException.class); + } + try (AsyncKeyedTwoInputStreamOperatorTestHarness testHarness = + AsyncKeyedTwoInputStreamOperatorTestHarness.create( + processOperator, + (KeySelector) Long::valueOf, + (KeySelector) value -> value, + Types.LONG)) { + testHarness.open(); assertThatThrownBy(() -> testHarness.processElement2(new StreamRecord<>(1L))) .isInstanceOf(IllegalStateException.class); } diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperatorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperatorTest.java index 7d0680325b921..f494f7ac633dc 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperatorTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperatorTest.java @@ -25,7 +25,7 @@ import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext; import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness; import org.apache.flink.util.OutputTag; import org.junit.jupiter.api.Test; @@ -59,8 +59,8 @@ public void processRecord( }, sideOutputTag); - try (KeyedOneInputStreamOperatorTestHarness testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( + try (AsyncKeyedOneInputStreamOperatorTestHarness testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( processOperator, (KeySelector) value -> value, Types.INT)) { @@ -116,8 +116,8 @@ public void endInput( }, sideOutputTag); - try (KeyedOneInputStreamOperatorTestHarness testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( + try (AsyncKeyedOneInputStreamOperatorTestHarness testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( processOperator, (KeySelector) value -> value, Types.INT)) { @@ -161,14 +161,21 @@ public void processRecord( // -1 is an invalid key in this suite. (KeySelector) value -> -1); - try (KeyedOneInputStreamOperatorTestHarness testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( + try (AsyncKeyedOneInputStreamOperatorTestHarness testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( processOperator, (KeySelector) value -> value, Types.INT)) { testHarness.open(); assertThatThrownBy(() -> testHarness.processElement(new StreamRecord<>(1))) .isInstanceOf(IllegalStateException.class); + } + try (AsyncKeyedOneInputStreamOperatorTestHarness testHarness = + AsyncKeyedOneInputStreamOperatorTestHarness.create( + processOperator, + (KeySelector) value -> value, + Types.INT)) { + testHarness.open(); emitToFirstOutput.set(false); assertThatThrownBy(() -> testHarness.processElement(new StreamRecord<>(1))) .isInstanceOf(IllegalStateException.class); diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java index c2cb6aa597e8f..fa653e71f5f14 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java @@ -43,6 +43,7 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.MockStreamingRuntimeContext; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; import org.junit.Assert; @@ -74,7 +75,8 @@ class KeyedStateInputFormatTest { void testCreatePartitionedInputSplits(boolean asyncState) throws Exception { OperatorID operatorID = OperatorIDGenerator.fromUid("uid"); - OperatorSubtaskState state = createOperatorSubtaskState(createFlatMap(asyncState)); + OperatorSubtaskState state = + createOperatorSubtaskState(createFlatMap(asyncState), asyncState); OperatorState operatorState = new OperatorState(null, null, operatorID, 1, 128); operatorState.putState(0, state); @@ -95,7 +97,8 @@ void testCreatePartitionedInputSplits(boolean asyncState) throws Exception { void testMaxParallelismRespected(boolean asyncState) throws Exception { OperatorID operatorID = OperatorIDGenerator.fromUid("uid"); - OperatorSubtaskState state = createOperatorSubtaskState(createFlatMap(asyncState)); + OperatorSubtaskState state = + createOperatorSubtaskState(createFlatMap(asyncState), asyncState); OperatorState operatorState = new OperatorState(null, null, operatorID, 1, 128); operatorState.putState(0, state); @@ -118,7 +121,8 @@ void testMaxParallelismRespected(boolean asyncState) throws Exception { void testReadState(boolean asyncState) throws Exception { OperatorID operatorID = OperatorIDGenerator.fromUid("uid"); - OperatorSubtaskState state = createOperatorSubtaskState(createFlatMap(asyncState)); + OperatorSubtaskState state = + createOperatorSubtaskState(createFlatMap(asyncState), asyncState); OperatorState operatorState = new OperatorState(null, null, operatorID, 1, 128); operatorState.putState(0, state); @@ -143,7 +147,8 @@ void testReadState(boolean asyncState) throws Exception { void testReadMultipleOutputPerKey(boolean asyncState) throws Exception { OperatorID operatorID = OperatorIDGenerator.fromUid("uid"); - OperatorSubtaskState state = createOperatorSubtaskState(createFlatMap(asyncState)); + OperatorSubtaskState state = + createOperatorSubtaskState(createFlatMap(asyncState), asyncState); OperatorState operatorState = new OperatorState(null, null, operatorID, 1, 128); operatorState.putState(0, state); @@ -169,7 +174,8 @@ void testReadMultipleOutputPerKey(boolean asyncState) throws Exception { void testInvalidProcessReaderFunctionFails(boolean asyncState) throws Exception { OperatorID operatorID = OperatorIDGenerator.fromUid("uid"); - OperatorSubtaskState state = createOperatorSubtaskState(createFlatMap(asyncState)); + OperatorSubtaskState state = + createOperatorSubtaskState(createFlatMap(asyncState), asyncState); OperatorState operatorState = new OperatorState(null, null, operatorID, 1, 128); operatorState.putState(0, state); @@ -194,7 +200,7 @@ void testReadTime() throws Exception { OperatorSubtaskState state = createOperatorSubtaskState( - new KeyedProcessOperator<>(new StatefulFunctionWithTime())); + new KeyedProcessOperator<>(new StatefulFunctionWithTime()), false); OperatorState operatorState = new OperatorState(null, null, operatorID, 1, 128); operatorState.putState(0, state); @@ -252,10 +258,13 @@ private OneInputStreamOperator createFlatMap(boolean asyncState) } private OperatorSubtaskState createOperatorSubtaskState( - OneInputStreamOperator operator) throws Exception { + OneInputStreamOperator operator, boolean async) throws Exception { try (KeyedOneInputStreamOperatorTestHarness testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( - operator, id -> id, Types.INT, 128, 1, 0)) { + async + ? AsyncKeyedOneInputStreamOperatorTestHarness.create( + operator, id -> id, Types.INT, 128, 1, 0) + : new KeyedOneInputStreamOperatorTestHarness<>( + operator, id -> id, Types.INT, 128, 1, 0)) { testHarness.setup(VoidSerializer.INSTANCE); testHarness.open(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index 32f88ff775979..15191551fb3f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -166,6 +166,18 @@ public final void setAsyncKeyedContextElement( // in Fig.5, each state request itself is also protected by a paired reference count. currentProcessingContext.retain(); asyncExecutionController.setCurrentContext(currentProcessingContext); + newKeySelected(currentProcessingContext.getKey()); + } + + /** + * A hook that will be invoked after a new key is selected. It is not recommended to perform + * async state here. Only some synchronous logic is suggested. + * + * @param newKey the new key selected. + */ + public void newKeySelected(Object newKey) { + // By default, do nothing. + // Subclass could override this method to do some operations when a new key is selected. } @Override @@ -300,32 +312,36 @@ public InternalTimerService getInternalTimerService( } @Override - @SuppressWarnings("unchecked") public void setKeyContextElement1(StreamRecord record) throws Exception { + // This method is invoked only when isAsyncStateProcessingEnabled() is false. super.setKeyContextElement1(record); if (stateKeySelector1 != null) { - setAsyncKeyedContextElement(record, stateKeySelector1); + newKeySelected(getCurrentKey()); } } @Override - @SuppressWarnings("unchecked") public void setKeyContextElement2(StreamRecord record) throws Exception { + // This method is invoked only when isAsyncStateProcessingEnabled() is false. super.setKeyContextElement2(record); if (stateKeySelector2 != null) { - setAsyncKeyedContextElement(record, stateKeySelector2); + newKeySelected(getCurrentKey()); } } @Override public Object getCurrentKey() { - RecordContext currentContext = asyncExecutionController.getCurrentContext(); - if (currentContext == null) { - throw new UnsupportedOperationException( - "Have not set the current key yet, this may because the operator has not " - + "started to run, or you are invoking this under a non-keyed context."); + if (isAsyncStateProcessingEnabled()) { + RecordContext currentContext = asyncExecutionController.getCurrentContext(); + if (currentContext == null) { + throw new UnsupportedOperationException( + "Have not set the current key yet, this may because the operator has not " + + "started to run, or you are invoking this under a non-keyed context."); + } + return currentContext.getKey(); + } else { + return super.getCurrentKey(); } - return currentContext.getKey(); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 955faa770b381..407dd1778177e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -163,17 +163,43 @@ public final void setAsyncKeyedContextElement( // in Fig.5, each state request itself is also protected by a paired reference count. currentProcessingContext.retain(); asyncExecutionController.setCurrentContext(currentProcessingContext); + newKeySelected(currentProcessingContext.getKey()); + } + + /** + * A hook that will be invoked after a new key is selected. It is not recommended to perform + * async state here. Only some synchronous logic is suggested. + * + * @param newKey the new key selected. + */ + public void newKeySelected(Object newKey) { + // By default, do nothing. + // Subclass could override this method to do some operations when a new key is selected. + } + + @Override + protected void internalSetKeyContextElement( + StreamRecord record, KeySelector selector) throws Exception { + // This method is invoked only when isAsyncStateProcessingEnabled() is false. + super.internalSetKeyContextElement(record, selector); + if (selector != null) { + newKeySelected(getCurrentKey()); + } } @Override public Object getCurrentKey() { - RecordContext currentContext = asyncExecutionController.getCurrentContext(); - if (currentContext == null) { - throw new UnsupportedOperationException( - "Have not set the current key yet, this may because the operator has not " - + "started to run, or you are invoking this under a non-keyed context."); + if (isAsyncStateProcessingEnabled()) { + RecordContext currentContext = asyncExecutionController.getCurrentContext(); + if (currentContext == null) { + throw new UnsupportedOperationException( + "Have not set the current key yet, this may because the operator has not " + + "started to run, or you are invoking this under a non-keyed context."); + } + return currentContext.getKey(); + } else { + return super.getCurrentKey(); } - return currentContext.getKey(); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessing.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessing.java index 563f3fd1ad83d..6e151b52ebcf5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessing.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessing.java @@ -56,6 +56,11 @@ static ThrowingConsumer, Exception> makeRecordProcessor( AsyncStateProcessingOperator asyncOperator, KeySelector keySelector, ThrowingConsumer, Exception> processor) { + if (keySelector == null) { + // A non-keyed input does not need to set the key context and perform async context + // switches. + return processor; + } switch (asyncOperator.getElementOrder()) { case RECORD_ORDER: return (record) -> { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java index ab1999e24843f..80bb3450cc17a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java @@ -412,7 +412,7 @@ public void processElement( }); try (TestHarness testHarness = - TestHarness.create( + TestHarness.createOne( op, (elem) -> elem.key, (elem) -> elem.key, @@ -454,7 +454,7 @@ public void processElement( }); try (TestHarness testHarness = - TestHarness.create( + TestHarness.createOne( op, (elem) -> elem.key, (elem) -> elem.key, @@ -493,7 +493,7 @@ public void processElement( }); try (TestHarness testHarness = - TestHarness.create( + TestHarness.createOne( op, (elem) -> elem.key, (elem) -> elem.key, @@ -661,7 +661,7 @@ private TestHarness createTestHarness( TestElem.serializer(), new PassthroughFunction()); - return TestHarness.create( + return TestHarness.createOne( operator, (elem) -> elem.key, // key (elem) -> elem.key, // key @@ -690,7 +690,7 @@ private JoinTestBuilder setupHarness( new PassthroughFunction()); TestHarness t = - TestHarness.create( + TestHarness.createOne( operator, (elem) -> elem.key, // key (elem) -> elem.key, // key @@ -966,7 +966,7 @@ public TestHarness( super(executor, operator, keySelector1, keySelector2, keyType, 1, 1, 0); } - public static TestHarness create( + public static TestHarness createOne( TwoInputStreamOperator> operator, KeySelector keySelector1, KeySelector keySelector2, diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java index 17eacf983ddda..ae666e3f541d6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java @@ -88,6 +88,16 @@ OP create(FunctionWithException constructor) return future.get(); } + public static + AsyncKeyedTwoInputStreamOperatorTestHarness create( + TwoInputStreamOperator operator, + KeySelector keySelector1, + KeySelector keySelector2, + TypeInformation keyType) + throws Exception { + return create(operator, keySelector1, keySelector2, keyType, 1, 1, 0); + } + public static AsyncKeyedTwoInputStreamOperatorTestHarness create( TwoInputStreamOperator operator, diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java index 4ba36ab0d08f2..851d35939e439 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java @@ -19,7 +19,9 @@ package org.apache.flink.table.runtime.operators.join.stream; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator; import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; @@ -102,12 +104,13 @@ public abstract class StreamingJoinOperatorTestBase { @BeforeEach void beforeEach(TestInfo testInfo) throws Exception { + TwoInputStreamOperator operator = createJoinOperator(testInfo); testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( - createJoinOperator(testInfo), - leftKeySelector, - rightKeySelector, - joinKeyTypeInfo); + operator instanceof AsyncStateProcessingOperator + ? AsyncKeyedTwoInputStreamOperatorTestHarness.create( + operator, leftKeySelector, rightKeySelector, joinKeyTypeInfo) + : new KeyedTwoInputStreamOperatorTestHarness<>( + operator, leftKeySelector, rightKeySelector, joinKeyTypeInfo); testHarness.open(); // extend for mini-batch join test assertor =