Skip to content

Commit

Permalink
fix(filters): fix buffered records not being flushed (streamthoughts#667
Browse files Browse the repository at this point in the history
)

Resolves: streamthoughts#667
  • Loading branch information
a.dekin committed Oct 21, 2024
1 parent b53fabc commit 59b4936
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectContext;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecordOffset;
import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord;
Expand Down Expand Up @@ -89,6 +90,18 @@ public RecordsIterable<FileRecord<TypedStruct>> apply(final RecordsIterable<File
// Apply the filter-chain on current record.
results.addAll(apply(context, record.value(), doHasNext));
}

// Flush all records buffered in the filter chain applying subsequent filters to each buffered record
if (!hasNext && records.isEmpty()) {
FilterNode node = rootNode;
while (node != null) {
List<FileRecord<TypedStruct>> flushed = node
.flush(newContextFor(FileObjectOffset::empty, fileObjectObject.metadata()));
results.addAll(flushed);
node = node.onSuccess;
}
}

return new RecordsIterable<>(results);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;


public class DefaultRecordFilterPipelineTest {
Expand Down Expand Up @@ -190,6 +191,101 @@ public void shouldNotFlushBufferedRecordsGivenNoAcceptFilterAndThereIsNoRemainin
assertEquals(record2, records.collect().get(0));
}

@Test
public void shouldFlushBufferedRecordsGivenAcceptFilterEmptyRecordsIterableAndNoRemainingRecords() {

final FileRecord<TypedStruct> record1 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value1");
final FileRecord<TypedStruct> record2 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value2");

List<FileRecord<TypedStruct>> bufferedRecords = List.of(record1, record2);
TestFilter filter1 = new TestFilter()
.setBuffer(bufferedRecords);

DefaultRecordFilterPipeline pipeline = new DefaultRecordFilterPipeline(Collections.singletonList(filter1));
pipeline.init(context);

RecordsIterable<FileRecord<TypedStruct>> records = pipeline.apply(new RecordsIterable<>(), false);

assertNotNull(records);
List<FileRecord<TypedStruct>> filteredRecords = records.collect();
Assertions.assertIterableEquals(bufferedRecords, filteredRecords);
}

@Test
public void shouldFlushBufferedRecordsFromFirstFilterGivenAcceptFiltersEmptyRecordsIterableAndNoRemainingRecods() {

final FileRecord<TypedStruct> record1 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value1");
final FileRecord<TypedStruct> record2 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value2");

List<FileRecord<TypedStruct>> bufferedRecords = List.of(record1, record2);
TestFilter filter1 = new TestFilter()
.setBuffer(bufferedRecords);
TestFilter filter2 = new TestFilter()
.setFunction(((context1, record, hasNext) -> RecordsIterable.of(record)));

DefaultRecordFilterPipeline pipeline = new DefaultRecordFilterPipeline(List.of(filter1, filter2));
pipeline.init(context);

RecordsIterable<FileRecord<TypedStruct>> records = pipeline.apply(new RecordsIterable<>(), false);

assertNotNull(records);
List<FileRecord<TypedStruct>> filteredRecords = records.collect();
Assertions.assertIterableEquals(bufferedRecords, filteredRecords);
}

@Test
public void shouldFlushBufferedRecordsFromLastFilterGivenAcceptFiltersEmptyRecordsIterableAndNoRemainingRecods() {

final FileRecord<TypedStruct> record1 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value1");
final FileRecord<TypedStruct> record2 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value2");

List<FileRecord<TypedStruct>> bufferedRecords = List.of(record1, record2);
TestFilter filter1 = new TestFilter();
TestFilter filter2 = new TestFilter()
.setBuffer(bufferedRecords);
DefaultRecordFilterPipeline pipeline = new DefaultRecordFilterPipeline(List.of(filter1, filter2));
pipeline.init(context);

RecordsIterable<FileRecord<TypedStruct>> records = pipeline.apply(new RecordsIterable<>(), false);

assertNotNull(records);
List<FileRecord<TypedStruct>> filteredRecords = records.collect();
Assertions.assertIterableEquals(bufferedRecords, filteredRecords);
}

@Test
public void shouldFlushBufferedRecordsFromAllFiltersGivenAcceptFiltersEmptyRecordsIterableAndNoRemainingRecods() {

final FileRecord<TypedStruct> record1 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value1");
final FileRecord<TypedStruct> record2 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value2");
final FileRecord<TypedStruct> record3 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value3");
final FileRecord<TypedStruct> record4 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value4");
final FileRecord<TypedStruct> record5 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value5");

List<FileRecord<TypedStruct>> allBuffered = List.of(record1, record2, record3, record4, record5);

List<FileRecord<TypedStruct>> bufferedRecords1 = List.of(record1);
List<FileRecord<TypedStruct>> bufferedRecords2 = List.of(record2, record3);
List<FileRecord<TypedStruct>> bufferedRecords3 = List.of(record4, record5);
TestFilter filter1 = new TestFilter()
.setFunction(((context1, record, hasNext) -> RecordsIterable.of(record)))
.setBuffer(bufferedRecords1);
TestFilter filter2 = new TestFilter()
.setFunction(((context1, record, hasNext) -> RecordsIterable.of(record)))
.setBuffer(bufferedRecords2);
TestFilter filter3 = new TestFilter()
.setFunction(((context1, record, hasNext) -> RecordsIterable.of(record)))
.setBuffer(bufferedRecords3);
DefaultRecordFilterPipeline pipeline = new DefaultRecordFilterPipeline(List.of(filter1, filter2, filter3));
pipeline.init(context);

RecordsIterable<FileRecord<TypedStruct>> records = pipeline.apply(new RecordsIterable<>(), false);

assertNotNull(records);
List<FileRecord<TypedStruct>> filteredRecords = records.collect();
Assertions.assertIterableEquals(allBuffered, filteredRecords);
}

@Test
public void shouldReturnRecordUnchangedGivenNoFilter() {

Expand Down

0 comments on commit 59b4936

Please sign in to comment.