Skip to content

Commit

Permalink
update table identifier to path
Browse files Browse the repository at this point in the history
  • Loading branch information
Cheun99 committed Jan 9, 2025
1 parent 2827876 commit 8754e65
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class MultiTableSink
MultiTableAggregatedCommitInfo>,
SupportSchemaEvolutionSink {

@Getter private final Map<String, SeaTunnelSink> sinks;
@Getter private final Map<TablePath, SeaTunnelSink> sinks;
private final int replicaNum;

public MultiTableSink(MultiTableFactoryContext context) {
Expand All @@ -72,9 +72,10 @@ public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> createWri
Map<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> writers = new HashMap<>();
Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new HashMap<>();
for (int i = 0; i < replicaNum; i++) {
for (String tableIdentifier : sinks.keySet()) {
SeaTunnelSink sink = sinks.get(tableIdentifier);
for (TablePath tablePath : sinks.keySet()) {
SeaTunnelSink sink = sinks.get(tablePath);
int index = context.getIndexOfSubtask() * replicaNum + i;
String tableIdentifier = tablePath.toString();
writers.put(
SinkIdentifier.of(tableIdentifier, index),
sink.createWriter(new SinkContextProxy(index, replicaNum, context)));
Expand All @@ -91,10 +92,10 @@ public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> restoreWr
Map<SinkIdentifier, SinkWriter.Context> sinkWritersContext = new HashMap<>();

for (int i = 0; i < replicaNum; i++) {
for (String tableIdentifier : sinks.keySet()) {
SeaTunnelSink sink = sinks.get(tableIdentifier);
for (TablePath tablePath : sinks.keySet()) {
SeaTunnelSink sink = sinks.get(tablePath);
int index = context.getIndexOfSubtask() * replicaNum + i;
SinkIdentifier sinkIdentifier = SinkIdentifier.of(tableIdentifier, index);
SinkIdentifier sinkIdentifier = SinkIdentifier.of(tablePath.toString(), index);
List<?> state =
states.stream()
.map(
Expand All @@ -113,7 +114,7 @@ public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> restoreWr
sink.restoreWriter(
new SinkContextProxy(index, replicaNum, context), state));
}
sinkWritersContext.put(SinkIdentifier.of(tableIdentifier, index), context);
sinkWritersContext.put(sinkIdentifier, context);
}
}
return new MultiTableSinkWriter(writers, replicaNum, sinkWritersContext);
Expand All @@ -127,12 +128,13 @@ public Optional<Serializer<MultiTableState>> getWriterStateSerializer() {
@Override
public Optional<SinkCommitter<MultiTableCommitInfo>> createCommitter() throws IOException {
Map<String, SinkCommitter<?>> committers = new HashMap<>();
for (String tableIdentifier : sinks.keySet()) {
SeaTunnelSink sink = sinks.get(tableIdentifier);
for (TablePath tablePath : sinks.keySet()) {
SeaTunnelSink sink = sinks.get(tablePath);
sink.createCommitter()
.ifPresent(
committer ->
committers.put(tableIdentifier, (SinkCommitter<?>) committer));
committers.put(
tablePath.toString(), (SinkCommitter<?>) committer));
}
if (committers.isEmpty()) {
return Optional.empty();
Expand All @@ -149,12 +151,12 @@ public Optional<Serializer<MultiTableCommitInfo>> getCommitInfoSerializer() {
public Optional<SinkAggregatedCommitter<MultiTableCommitInfo, MultiTableAggregatedCommitInfo>>
createAggregatedCommitter() throws IOException {
Map<String, SinkAggregatedCommitter<?, ?>> aggCommitters = new HashMap<>();
for (String tableIdentifier : sinks.keySet()) {
SeaTunnelSink sink = sinks.get(tableIdentifier);
for (TablePath tablePath : sinks.keySet()) {
SeaTunnelSink sink = sinks.get(tablePath);
Optional<SinkAggregatedCommitter<?, ?>> sinkOptional = sink.createAggregatedCommitter();
sinkOptional.ifPresent(
sinkAggregatedCommitter ->
aggCommitters.put(tableIdentifier, sinkAggregatedCommitter));
aggCommitters.put(tablePath.toString(), sinkAggregatedCommitter));
}
if (aggCommitters.isEmpty()) {
return Optional.empty();
Expand All @@ -171,7 +173,7 @@ public List<TablePath> getSinkTables() {
tablePaths.add(
((CatalogTable) values.get(i).getWriteCatalogTable().get()).getTablePath());
} else {
tablePaths.add(TablePath.of(sinks.keySet().toArray(new String[0])[i]));
tablePaths.add(sinks.keySet().toArray(new TablePath[0])[i]);
}
}
return tablePaths;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
Expand Down Expand Up @@ -178,7 +179,7 @@ SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createAndPrepareSi

public static <IN, StateT, CommitInfoT, AggregatedCommitInfoT>
SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createMultiTableSink(
Map<String, SeaTunnelSink> sinks,
Map<TablePath, SeaTunnelSink> sinks,
ReadonlyConfig options,
ClassLoader classLoader) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.catalog.TablePath;

import lombok.Getter;

Expand All @@ -27,10 +28,10 @@
@Getter
public class MultiTableFactoryContext extends TableSinkFactoryContext {

private final Map<String, SeaTunnelSink> sinks;
private final Map<TablePath, SeaTunnelSink> sinks;

public MultiTableFactoryContext(
ReadonlyConfig options, ClassLoader classLoader, Map<String, SeaTunnelSink> sinks) {
ReadonlyConfig options, ClassLoader classLoader, Map<TablePath, SeaTunnelSink> sinks) {
super(null, options, classLoader);
this.sinks = sinks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryException;
Expand Down Expand Up @@ -184,7 +185,7 @@ public static SeaTunnelSink createSink(
return sink;
} else {
if (catalogTables.size() > 1) {
Map<String, SeaTunnelSink> sinks = new HashMap<>();
Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
catalogTables.forEach(
catalogTable -> {
Expand All @@ -202,7 +203,7 @@ public static SeaTunnelSink createSink(
.createSink(context)
.createSink();
action.setJobContext(jobContext);
sinks.put(catalogTable.getTablePath().toString(), action);
sinks.put(catalogTable.getTablePath(), action);
});
return FactoryUtil.createMultiTableSink(sinks, readonlyConfig, classLoader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
Expand Down Expand Up @@ -106,7 +107,7 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
fromSourceTable(sinkConfig, upstreamDataStreams).orElse(input);
Optional<? extends Factory> factory = plugins.get(i);
boolean fallBack = !factory.isPresent() || isFallback(factory.get());
Map<String, SeaTunnelSink> sinks = new HashMap<>();
Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
if (fallBack) {
for (CatalogTable catalogTable : stream.getCatalogTables()) {
SeaTunnelSink fallBackSink =
Expand All @@ -122,8 +123,7 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
fallBackSink.setTypeInfo(sourceType);
handleSaveMode(fallBackSink);
TableIdentifier tableId = catalogTable.getTableId();
String tableIdName = tableId.toTablePath().toString();
sinks.put(tableIdName, fallBackSink);
sinks.put(tableId.toTablePath(), fallBackSink);
}
} else {
for (CatalogTable catalogTable : stream.getCatalogTables()) {
Expand All @@ -141,8 +141,7 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
seaTunnelSink.setJobContext(jobContext);
handleSaveMode(seaTunnelSink);
TableIdentifier tableId = catalogTable.getTableId();
String tableIdName = tableId.toTablePath().toString();
sinks.put(tableIdName, seaTunnelSink);
sinks.put(tableId.toTablePath(), seaTunnelSink);
}
}
SeaTunnelSink sink =
Expand All @@ -168,7 +167,9 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS

// if not support multi table, rollback
public SeaTunnelSink tryGenerateMultiTableSink(
Map<String, SeaTunnelSink> sinks, ReadonlyConfig sinkConfig, ClassLoader classLoader) {
Map<TablePath, SeaTunnelSink> sinks,
ReadonlyConfig sinkConfig,
ClassLoader classLoader) {
if (sinks.values().stream().anyMatch(sink -> !(sink instanceof SupportMultiTableSink))) {
log.info("Unsupported multi table sink api, rollback to sink template");
// choose the first sink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
Expand Down Expand Up @@ -107,7 +108,7 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
fromSourceTable(sinkConfig, upstreamDataStreams).orElse(input);
Optional<? extends Factory> factory = plugins.get(i);
boolean fallBack = !factory.isPresent() || isFallback(factory.get());
Map<String, SeaTunnelSink> sinks = new HashMap<>();
Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
if (fallBack) {
for (CatalogTable catalogTable : stream.getCatalogTables()) {
SeaTunnelSink fallBackSink =
Expand All @@ -123,8 +124,7 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
fallBackSink.setTypeInfo(sourceType);
handleSaveMode(fallBackSink);
TableIdentifier tableId = catalogTable.getTableId();
String tableIdName = tableId.toTablePath().toString();
sinks.put(tableIdName, fallBackSink);
sinks.put(tableId.toTablePath(), fallBackSink);
}
} else {
for (CatalogTable catalogTable : stream.getCatalogTables()) {
Expand All @@ -142,8 +142,7 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
seaTunnelSink.setJobContext(jobContext);
handleSaveMode(seaTunnelSink);
TableIdentifier tableId = catalogTable.getTableId();
String tableIdName = tableId.toTablePath().toString();
sinks.put(tableIdName, seaTunnelSink);
sinks.put(tableId.toTablePath(), seaTunnelSink);
}
}
SeaTunnelSink sink =
Expand Down Expand Up @@ -174,7 +173,9 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS

// if not support multi table, rollback
public SeaTunnelSink tryGenerateMultiTableSink(
Map<String, SeaTunnelSink> sinks, ReadonlyConfig sinkConfig, ClassLoader classLoader) {
Map<TablePath, SeaTunnelSink> sinks,
ReadonlyConfig sinkConfig,
ClassLoader classLoader) {
if (sinks.values().stream().anyMatch(sink -> !(sink instanceof SupportMultiTableSink))) {
log.info("Unsupported multi table sink api, rollback to sink template");
// choose the first sink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
Expand Down Expand Up @@ -168,7 +169,7 @@ public void handleSaveMode(SeaTunnelSink sink) {
}
}
} else if (sink instanceof MultiTableSink) {
Map<String, SeaTunnelSink> sinks = ((MultiTableSink) sink).getSinks();
Map<TablePath, SeaTunnelSink> sinks = ((MultiTableSink) sink).getSinks();
for (SeaTunnelSink seaTunnelSink : sinks.values()) {
handleSaveMode(seaTunnelSink);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.engine.core.dag.actions;

import org.apache.seatunnel.api.table.catalog.TablePath;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand All @@ -25,5 +27,6 @@
@NoArgsConstructor
@AllArgsConstructor
public class SinkConfig implements Config {
private String multipleRowTableId;
// private String multipleRowTableId;
private TablePath tablePath;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.ChangeStreamTableSourceCheckpoint;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
Expand Down Expand Up @@ -657,16 +658,18 @@ private static <T> T findLast(LinkedHashMap<?, T> map) {
log.info("Unsupported multi table sink api, rollback to sink template");
return Optional.empty();
}
Map<String, SeaTunnelSink> sinks = new HashMap<>();
Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
Set<URL> jars =
sinkActions.stream()
.flatMap(a -> a.getJarUrls().stream())
.collect(Collectors.toSet());
sinkActions.forEach(
action -> {
SeaTunnelSink sink = action.getSink();
String tableId = action.getConfig().getMultipleRowTableId();
sinks.put(tableId, sink);
// String tableId =
// action.getConfig().getMultipleRowTableId();
TablePath tablePath = action.getConfig().getTablePath();
sinks.put(tablePath, sink);
});
SeaTunnelSink<?, ?, ?, ?> sink =
FactoryUtil.createMultiTableSink(sinks, options, classLoader);
Expand Down Expand Up @@ -698,12 +701,11 @@ private static <T> T findLast(LinkedHashMap<?, T> map) {
FactoryUtil.createAndPrepareSink(
catalogTable, readonlyConfig, classLoader, factoryId);
sink.setJobContext(jobConfig.getJobContext());
SinkConfig actionConfig =
new SinkConfig(catalogTable.getTableId().toTablePath().toString());
SinkConfig actionConfig = new SinkConfig(catalogTable.getTableId().toTablePath());
long id = idGenerator.getNextId();
String actionName =
JobConfigParser.createSinkActionName(
configIndex, factoryId, actionConfig.getMultipleRowTableId());
configIndex, factoryId, actionConfig.getTablePath().toString());
SinkAction<?, ?, ?, ?> sinkAction =
new SinkAction<>(
id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ private List<PhysicalVertex> getShuffleTask(
(PhysicalExecutionFlow) nextFlow;
SinkAction sinkAction = (SinkAction) sinkFlow.getAction();
String sinkTableId =
sinkAction.getConfig().getMultipleRowTableId();
sinkAction.getConfig().getTablePath().toString();

long taskIDPrefix = idGenerator.getNextId();
long taskGroupIDPrefix = idGenerator.getNextId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
Expand Down Expand Up @@ -527,7 +528,7 @@ public static void handleSaveMode(SeaTunnelSink sink) {
}
}
} else if (sink instanceof MultiTableSink) {
Map<String, SeaTunnelSink> sinks = ((MultiTableSink) sink).getSinks();
Map<TablePath, SeaTunnelSink> sinks = ((MultiTableSink) sink).getSinks();
for (SeaTunnelSink seaTunnelSink : sinks.values()) {
handleSaveMode(seaTunnelSink);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ public SinkFlowLifeCycle(
boolean isMulti = sinkAction.getSink() instanceof MultiTableSink;
if (isMulti) {
sinkTables = ((MultiTableSink) sinkAction.getSink()).getSinkTables();
String[] upstreamTablePaths =
TablePath[] upstreamTablePaths =
((MultiTableSink) sinkAction.getSink())
.getSinks()
.keySet()
.toArray(new String[0]);
.toArray(new TablePath[0]);
for (int i = 0; i < ((MultiTableSink) sinkAction.getSink()).getSinks().size(); i++) {
tablesMaps.put(TablePath.of(upstreamTablePaths[i]), sinkTables.get(i));
tablesMaps.put(upstreamTablePaths[i], sinkTables.get(i));
}
} else {
Optional<CatalogTable> catalogTable = sinkAction.getSink().getWriteCatalogTable();
Expand Down

0 comments on commit 8754e65

Please sign in to comment.