Skip to content

Commit

Permalink
[Fix][Flink] Fix NPE when get null row from upstream transform (#8469)
Browse files Browse the repository at this point in the history
  • Loading branch information
litiliu authored Jan 8, 2025
1 parent 776ac94 commit 2827876
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -148,15 +149,18 @@ protected DataStream<SeaTunnelRow> flinkTransform(
}

return stream.transform(
String.format("%s-Transform", transform.getPluginName()),
TypeInformation.of(SeaTunnelRow.class),
new StreamMap<>(
flinkRuntimeEnvironment
.getStreamExecutionEnvironment()
.clean(
row ->
((SeaTunnelMapTransform<SeaTunnelRow>) transform)
.map(row))));
String.format("%s-Transform", transform.getPluginName()),
TypeInformation.of(SeaTunnelRow.class),
new StreamMap<>(
flinkRuntimeEnvironment
.getStreamExecutionEnvironment()
.clean(
row ->
((SeaTunnelMapTransform<SeaTunnelRow>)
transform)
.map(row))))
// null value shouldn't be passed to downstream
.filter(Objects::nonNull);
}

public static class ArrayFlatMap implements FlatMapFunction<SeaTunnelRow, SeaTunnelRow> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public void testFilterRowKind(TestContainer container)
Container.ExecResult execResult3 =
container.executeJob("/filter_row_kind_include_insert.conf");
Assertions.assertEquals(0, execResult3.getExitCode());

Container.ExecResult execResult4 =
container.executeJob("/filter_row_to_next_transform.json");
Assertions.assertEquals(0, execResult4.getExitCode());
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{
"env": {
"jobMode": "batch",
"parallelism": 1
},
"source": [
{
"plugin_name": "FakeSource",
"plugin_output": "fake",
"row.num": 5,
"schema": {
"fields": {
"name": "string",
"age": "int",
"card": "int"
}
}
}
],
"transform": [
{
"plugin_name": "FilterRowKind",
"plugin_input": "fake",
"plugin_output": "fake1",
"exclude_kinds": ["INSERT"]
},
{
"plugin_name": "Copy",
"plugin_input": "fake1",
"plugin_output": "fake2",
"fields": {
"name1": "name",
"age1": "age",
"card1": "card"
}
}
],
"sink": [
{
"plugin_name": "Console",
"plugin_input": "fake2"
}
]
}

0 comments on commit 2827876

Please sign in to comment.