diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java index 7582543e2f4..615876c4173 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java @@ -45,6 +45,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; @@ -148,15 +149,18 @@ protected DataStream flinkTransform( } return stream.transform( - String.format("%s-Transform", transform.getPluginName()), - TypeInformation.of(SeaTunnelRow.class), - new StreamMap<>( - flinkRuntimeEnvironment - .getStreamExecutionEnvironment() - .clean( - row -> - ((SeaTunnelMapTransform) transform) - .map(row)))); + String.format("%s-Transform", transform.getPluginName()), + TypeInformation.of(SeaTunnelRow.class), + new StreamMap<>( + flinkRuntimeEnvironment + .getStreamExecutionEnvironment() + .clean( + row -> + ((SeaTunnelMapTransform) + transform) + .map(row)))) + // null value shouldn't be passed to downstream + .filter(Objects::nonNull); } public static class ArrayFlatMap implements FlatMapFunction { diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java index c4104f734c9..d923d7137be 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java @@ -39,6 +39,10 @@ public void testFilterRowKind(TestContainer container) Container.ExecResult execResult3 = container.executeJob("/filter_row_kind_include_insert.conf"); Assertions.assertEquals(0, execResult3.getExitCode()); + + Container.ExecResult execResult4 = + container.executeJob("/filter_row_to_next_transform.json"); + Assertions.assertEquals(0, execResult4.getExitCode()); } @TestTemplate diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_to_next_transform.json b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_to_next_transform.json new file mode 100644 index 00000000000..72819644a90 --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/filter_row_to_next_transform.json @@ -0,0 +1,44 @@ +{ + "env": { + "jobMode": "batch", + "parallelism": 1 + }, + "source": [ + { + "plugin_name": "FakeSource", + "plugin_output": "fake", + "row.num": 5, + "schema": { + "fields": { + "name": "string", + "age": "int", + "card": "int" + } + } + } + ], + "transform": [ + { + "plugin_name": "FilterRowKind", + "plugin_input": "fake", + "plugin_output": "fake1", + "exclude_kinds": ["INSERT"] + }, + { + "plugin_name": "Copy", + "plugin_input": "fake1", + "plugin_output": "fake2", + "fields": { + "name1": "name", + "age1": "age", + "card1": "card" + } + } + ], + "sink": [ + { + "plugin_name": "Console", + "plugin_input": "fake2" + } + ] +}