Skip to content

Commit

Permalink
[Improve] [connector-file-base] Solves the problem of file duplicatio…
Browse files Browse the repository at this point in the history
…n under ParallelSource
  • Loading branch information
leibaoxin committed Jan 8, 2025
1 parent 8bd6047 commit c485e61
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

Expand All @@ -37,7 +39,8 @@ public class FileSourceSplitEnumerator
private static final Logger LOGGER = LoggerFactory.getLogger(FileSourceSplitEnumerator.class);

private final Context<FileSourceSplit> context;
private final Set<FileSourceSplit> allSplit = new HashSet<>();
private final Set<FileSourceSplit> allSplit =
new TreeSet<>(Comparator.comparing(FileSourceSplit::splitId));
private Set<FileSourceSplit> assignedSplit;
private final List<String> filePaths;
private final AtomicInteger assignCount = new AtomicInteger(0);
Expand Down
6 changes: 6 additions & 0 deletions seatunnel-translation/seatunnel-translation-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,11 @@
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-file-base</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package org.apache.seatunnel.translation.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSourceReader;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@Slf4j
public class ParallelSourceTest {

@Test
void testParallelSourceForPollingFileAllocation() throws Exception {
int fileSize = 15;
int parallelism = 4;

// create file source
BaseFileSource baseFileSource =
new BaseFileSource() {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
filePaths = new ArrayList<>();
for (int i = 0; i < fileSize; i++) {
filePaths.add("file" + i + ".txt");
}
}

@Override
public String getPluginName() {
return FileSystemType.HDFS.getFileSystemPluginName();
}
};

// prepare files
baseFileSource.prepare(null);

ParallelSource parallelSource =
new ParallelSource(baseFileSource, null, parallelism, "parallel-source-test", 0);
ParallelSource parallelSource2 =
new ParallelSource(baseFileSource, null, parallelism, "parallel-source-test2", 1);
ParallelSource parallelSource3 =
new ParallelSource(baseFileSource, null, parallelism, "parallel-source-test3", 2);
ParallelSource parallelSource4 =
new ParallelSource(baseFileSource, null, parallelism, "parallel-source-test4", 3);

parallelSource.open();
parallelSource2.open();
parallelSource3.open();
parallelSource4.open();

// execute file allocation process
parallelSource.splitEnumerator.run();
parallelSource2.splitEnumerator.run();
parallelSource3.splitEnumerator.run();
parallelSource4.splitEnumerator.run();

// Gets the splits assigned for each reader
List<FileSourceSplit> sourceSplits =
((BaseFileSourceReader) parallelSource.reader).snapshotState(0);
List<FileSourceSplit> sourceSplits2 =
((BaseFileSourceReader) parallelSource2.reader).snapshotState(0);
List<FileSourceSplit> sourceSplits3 =
((BaseFileSourceReader) parallelSource3.reader).snapshotState(0);
List<FileSourceSplit> sourceSplits4 =
((BaseFileSourceReader) parallelSource4.reader).snapshotState(0);

log.info(
"parallel source1 splits => {}",
sourceSplits.stream().map(FileSourceSplit::splitId).collect(Collectors.toList()));

log.info(
"parallel source2 splits => {}",
sourceSplits2.stream().map(FileSourceSplit::splitId).collect(Collectors.toList()));

log.info(
"parallel source3 splits => {}",
sourceSplits3.stream().map(FileSourceSplit::splitId).collect(Collectors.toList()));

log.info(
"parallel source4 splits => {}",
sourceSplits4.stream().map(FileSourceSplit::splitId).collect(Collectors.toList()));

// check that there are no duplicate file assignments
Set<FileSourceSplit> splitSet = new HashSet<>();
splitSet.addAll(sourceSplits);
splitSet.addAll(sourceSplits2);
splitSet.addAll(sourceSplits3);
splitSet.addAll(sourceSplits4);

Assertions.assertEquals(splitSet.size(), fileSize);
}
}

0 comments on commit c485e61

Please sign in to comment.