diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java index 23f4fc455bb..c9d810e1fee 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java @@ -53,7 +53,7 @@ public class MultiTableSink MultiTableAggregatedCommitInfo>, SupportSchemaEvolutionSink { - @Getter private final Map sinks; + @Getter private final Map sinks; private final int replicaNum; public MultiTableSink(MultiTableFactoryContext context) { @@ -72,9 +72,10 @@ public SinkWriter createWri Map> writers = new HashMap<>(); Map sinkWritersContext = new HashMap<>(); for (int i = 0; i < replicaNum; i++) { - for (String tableIdentifier : sinks.keySet()) { - SeaTunnelSink sink = sinks.get(tableIdentifier); + for (TablePath tablePath : sinks.keySet()) { + SeaTunnelSink sink = sinks.get(tablePath); int index = context.getIndexOfSubtask() * replicaNum + i; + String tableIdentifier = tablePath.toString(); writers.put( SinkIdentifier.of(tableIdentifier, index), sink.createWriter(new SinkContextProxy(index, replicaNum, context))); @@ -91,10 +92,10 @@ public SinkWriter restoreWr Map sinkWritersContext = new HashMap<>(); for (int i = 0; i < replicaNum; i++) { - for (String tableIdentifier : sinks.keySet()) { - SeaTunnelSink sink = sinks.get(tableIdentifier); + for (TablePath tablePath : sinks.keySet()) { + SeaTunnelSink sink = sinks.get(tablePath); int index = context.getIndexOfSubtask() * replicaNum + i; - SinkIdentifier sinkIdentifier = SinkIdentifier.of(tableIdentifier, index); + SinkIdentifier sinkIdentifier = SinkIdentifier.of(tablePath.toString(), index); List state = states.stream() .map( @@ -113,7 +114,7 @@ public SinkWriter restoreWr sink.restoreWriter( new SinkContextProxy(index, replicaNum, context), state)); } - sinkWritersContext.put(SinkIdentifier.of(tableIdentifier, index), context); + sinkWritersContext.put(sinkIdentifier, context); } } return new MultiTableSinkWriter(writers, replicaNum, sinkWritersContext); @@ -127,12 +128,13 @@ public Optional> getWriterStateSerializer() { @Override public Optional> createCommitter() throws IOException { Map> committers = new HashMap<>(); - for (String tableIdentifier : sinks.keySet()) { - SeaTunnelSink sink = sinks.get(tableIdentifier); + for (TablePath tablePath : sinks.keySet()) { + SeaTunnelSink sink = sinks.get(tablePath); sink.createCommitter() .ifPresent( committer -> - committers.put(tableIdentifier, (SinkCommitter) committer)); + committers.put( + tablePath.toString(), (SinkCommitter) committer)); } if (committers.isEmpty()) { return Optional.empty(); @@ -149,12 +151,12 @@ public Optional> getCommitInfoSerializer() { public Optional> createAggregatedCommitter() throws IOException { Map> aggCommitters = new HashMap<>(); - for (String tableIdentifier : sinks.keySet()) { - SeaTunnelSink sink = sinks.get(tableIdentifier); + for (TablePath tablePath : sinks.keySet()) { + SeaTunnelSink sink = sinks.get(tablePath); Optional> sinkOptional = sink.createAggregatedCommitter(); sinkOptional.ifPresent( sinkAggregatedCommitter -> - aggCommitters.put(tableIdentifier, sinkAggregatedCommitter)); + aggCommitters.put(tablePath.toString(), sinkAggregatedCommitter)); } if (aggCommitters.isEmpty()) { return Optional.empty(); @@ -171,7 +173,7 @@ public List getSinkTables() { tablePaths.add( ((CatalogTable) values.get(i).getWriteCatalogTable().get()).getTablePath()); } else { - tablePaths.add(TablePath.of(sinks.keySet().toArray(new String[0])[i])); + tablePaths.add(sinks.keySet().toArray(new TablePath[0])[i]); } } return tablePaths; diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java index c94b88be7cc..e11afd1d19e 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java @@ -32,6 +32,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.transform.SeaTunnelTransform; @@ -178,7 +179,7 @@ SeaTunnelSink createAndPrepareSi public static SeaTunnelSink createMultiTableSink( - Map sinks, + Map sinks, ReadonlyConfig options, ClassLoader classLoader) { try { diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/MultiTableFactoryContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/MultiTableFactoryContext.java index 809b747eba2..6caeeae8edf 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/MultiTableFactoryContext.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/MultiTableFactoryContext.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.table.catalog.TablePath; import lombok.Getter; @@ -27,10 +28,10 @@ @Getter public class MultiTableFactoryContext extends TableSinkFactoryContext { - private final Map sinks; + private final Map sinks; public MultiTableFactoryContext( - ReadonlyConfig options, ClassLoader classLoader, Map sinks) { + ReadonlyConfig options, ClassLoader classLoader, Map sinks) { super(null, options, classLoader); this.sinks = sinks; } diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java index 0d5bee0d156..a8a245dd9bb 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java @@ -28,6 +28,7 @@ import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.FactoryException; @@ -184,7 +185,7 @@ public static SeaTunnelSink createSink( return sink; } else { if (catalogTables.size() > 1) { - Map sinks = new HashMap<>(); + Map sinks = new HashMap<>(); ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig); catalogTables.forEach( catalogTable -> { @@ -202,7 +203,7 @@ public static SeaTunnelSink createSink( .createSink(context) .createSink(); action.setJobContext(jobContext); - sinks.put(catalogTable.getTablePath().toString(), action); + sinks.put(catalogTable.getTablePath(), action); }); return FactoryUtil.createMultiTableSink(sinks, readonlyConfig, classLoader); } diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index b6de6d2cad2..6f24e1c3fef 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -30,6 +30,7 @@ import org.apache.seatunnel.api.sink.SupportSaveMode; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSinkFactory; @@ -106,7 +107,7 @@ public List execute(List upstreamDataS fromSourceTable(sinkConfig, upstreamDataStreams).orElse(input); Optional factory = plugins.get(i); boolean fallBack = !factory.isPresent() || isFallback(factory.get()); - Map sinks = new HashMap<>(); + Map sinks = new HashMap<>(); if (fallBack) { for (CatalogTable catalogTable : stream.getCatalogTables()) { SeaTunnelSink fallBackSink = @@ -122,8 +123,7 @@ public List execute(List upstreamDataS fallBackSink.setTypeInfo(sourceType); handleSaveMode(fallBackSink); TableIdentifier tableId = catalogTable.getTableId(); - String tableIdName = tableId.toTablePath().toString(); - sinks.put(tableIdName, fallBackSink); + sinks.put(tableId.toTablePath(), fallBackSink); } } else { for (CatalogTable catalogTable : stream.getCatalogTables()) { @@ -141,8 +141,7 @@ public List execute(List upstreamDataS seaTunnelSink.setJobContext(jobContext); handleSaveMode(seaTunnelSink); TableIdentifier tableId = catalogTable.getTableId(); - String tableIdName = tableId.toTablePath().toString(); - sinks.put(tableIdName, seaTunnelSink); + sinks.put(tableId.toTablePath(), seaTunnelSink); } } SeaTunnelSink sink = @@ -168,7 +167,9 @@ public List execute(List upstreamDataS // if not support multi table, rollback public SeaTunnelSink tryGenerateMultiTableSink( - Map sinks, ReadonlyConfig sinkConfig, ClassLoader classLoader) { + Map sinks, + ReadonlyConfig sinkConfig, + ClassLoader classLoader) { if (sinks.values().stream().anyMatch(sink -> !(sink instanceof SupportMultiTableSink))) { log.info("Unsupported multi table sink api, rollback to sink template"); // choose the first sink diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index c7d4e1f8800..d41bfe34ce1 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -30,6 +30,7 @@ import org.apache.seatunnel.api.sink.SupportSaveMode; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSinkFactory; @@ -107,7 +108,7 @@ public List execute(List upstreamDataS fromSourceTable(sinkConfig, upstreamDataStreams).orElse(input); Optional factory = plugins.get(i); boolean fallBack = !factory.isPresent() || isFallback(factory.get()); - Map sinks = new HashMap<>(); + Map sinks = new HashMap<>(); if (fallBack) { for (CatalogTable catalogTable : stream.getCatalogTables()) { SeaTunnelSink fallBackSink = @@ -123,8 +124,7 @@ public List execute(List upstreamDataS fallBackSink.setTypeInfo(sourceType); handleSaveMode(fallBackSink); TableIdentifier tableId = catalogTable.getTableId(); - String tableIdName = tableId.toTablePath().toString(); - sinks.put(tableIdName, fallBackSink); + sinks.put(tableId.toTablePath(), fallBackSink); } } else { for (CatalogTable catalogTable : stream.getCatalogTables()) { @@ -142,8 +142,7 @@ public List execute(List upstreamDataS seaTunnelSink.setJobContext(jobContext); handleSaveMode(seaTunnelSink); TableIdentifier tableId = catalogTable.getTableId(); - String tableIdName = tableId.toTablePath().toString(); - sinks.put(tableIdName, seaTunnelSink); + sinks.put(tableId.toTablePath(), seaTunnelSink); } } SeaTunnelSink sink = @@ -174,7 +173,9 @@ public List execute(List upstreamDataS // if not support multi table, rollback public SeaTunnelSink tryGenerateMultiTableSink( - Map sinks, ReadonlyConfig sinkConfig, ClassLoader classLoader) { + Map sinks, + ReadonlyConfig sinkConfig, + ClassLoader classLoader) { if (sinks.values().stream().anyMatch(sink -> !(sink instanceof SupportMultiTableSink))) { log.info("Unsupported multi table sink api, rollback to sink template"); // choose the first sink diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index 6c3aabe691d..d4f99d65f54 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -27,6 +27,7 @@ import org.apache.seatunnel.api.sink.SupportSaveMode; import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; @@ -168,7 +169,7 @@ public void handleSaveMode(SeaTunnelSink sink) { } } } else if (sink instanceof MultiTableSink) { - Map sinks = ((MultiTableSink) sink).getSinks(); + Map sinks = ((MultiTableSink) sink).getSinks(); for (SeaTunnelSink seaTunnelSink : sinks.values()) { handleSaveMode(seaTunnelSink); } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java index 17d92545252..3d5ac886d1d 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.engine.core.dag.actions; +import org.apache.seatunnel.api.table.catalog.TablePath; + import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; @@ -25,5 +27,6 @@ @NoArgsConstructor @AllArgsConstructor public class SinkConfig implements Config { - private String multipleRowTableId; + // private String multipleRowTableId; + private TablePath tablePath; } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index 22abdfbffdf..4dc2ff392cc 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -33,6 +33,7 @@ import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.factory.ChangeStreamTableSourceCheckpoint; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.FactoryUtil; @@ -657,7 +658,7 @@ private static T findLast(LinkedHashMap map) { log.info("Unsupported multi table sink api, rollback to sink template"); return Optional.empty(); } - Map sinks = new HashMap<>(); + Map sinks = new HashMap<>(); Set jars = sinkActions.stream() .flatMap(a -> a.getJarUrls().stream()) @@ -665,8 +666,10 @@ private static T findLast(LinkedHashMap map) { sinkActions.forEach( action -> { SeaTunnelSink sink = action.getSink(); - String tableId = action.getConfig().getMultipleRowTableId(); - sinks.put(tableId, sink); + // String tableId = + // action.getConfig().getMultipleRowTableId(); + TablePath tablePath = action.getConfig().getTablePath(); + sinks.put(tablePath, sink); }); SeaTunnelSink sink = FactoryUtil.createMultiTableSink(sinks, options, classLoader); @@ -698,12 +701,11 @@ private static T findLast(LinkedHashMap map) { FactoryUtil.createAndPrepareSink( catalogTable, readonlyConfig, classLoader, factoryId); sink.setJobContext(jobConfig.getJobContext()); - SinkConfig actionConfig = - new SinkConfig(catalogTable.getTableId().toTablePath().toString()); + SinkConfig actionConfig = new SinkConfig(catalogTable.getTableId().toTablePath()); long id = idGenerator.getNextId(); String actionName = JobConfigParser.createSinkActionName( - configIndex, factoryId, actionConfig.getMultipleRowTableId()); + configIndex, factoryId, actionConfig.getTablePath().toString()); SinkAction sinkAction = new SinkAction<>( id, diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java index bcc49daae2a..cbc8db96322 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java @@ -330,7 +330,7 @@ private List getShuffleTask( (PhysicalExecutionFlow) nextFlow; SinkAction sinkAction = (SinkAction) sinkFlow.getAction(); String sinkTableId = - sinkAction.getConfig().getMultipleRowTableId(); + sinkAction.getConfig().getTablePath().toString(); long taskIDPrefix = idGenerator.getNextId(); long taskGroupIDPrefix = idGenerator.getNextId(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 46883fdfcfd..a8fa0de57a0 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -27,6 +27,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SupportSaveMode; import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.common.utils.RetryUtils; @@ -527,7 +528,7 @@ public static void handleSaveMode(SeaTunnelSink sink) { } } } else if (sink instanceof MultiTableSink) { - Map sinks = ((MultiTableSink) sink).getSinks(); + Map sinks = ((MultiTableSink) sink).getSinks(); for (SeaTunnelSink seaTunnelSink : sinks.values()) { handleSaveMode(seaTunnelSink); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 3f47ee84c3b..95996e3b8bb 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -123,13 +123,13 @@ public SinkFlowLifeCycle( boolean isMulti = sinkAction.getSink() instanceof MultiTableSink; if (isMulti) { sinkTables = ((MultiTableSink) sinkAction.getSink()).getSinkTables(); - String[] upstreamTablePaths = + TablePath[] upstreamTablePaths = ((MultiTableSink) sinkAction.getSink()) .getSinks() .keySet() - .toArray(new String[0]); + .toArray(new TablePath[0]); for (int i = 0; i < ((MultiTableSink) sinkAction.getSink()).getSinks().size(); i++) { - tablesMaps.put(TablePath.of(upstreamTablePaths[i]), sinkTables.get(i)); + tablesMaps.put(upstreamTablePaths[i], sinkTables.get(i)); } } else { Optional catalogTable = sinkAction.getSink().getWriteCatalogTable();