Initiate S3 Multi-part upload on receiving first event #318

Open · wants to merge 4 commits into main
[File 1 of 8: record grouper implementation]
@@ -101,6 +101,12 @@ public void clear() {
fileBuffers.clear();
}

@Override
public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
// One entry per file, so the entire file can be removed to reduce memory overhead.
fileBuffers.remove(identifier);
}

@Override
public Map<String, List<SinkRecord>> records() {
return Collections.unmodifiableMap(fileBuffers);
[File 2 of 8: record grouper implementation]
@@ -90,6 +90,12 @@ public void clear() {
fileBuffers.clear();
}

@Override
public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
// One record per file, so remove the entry to reduce memory
fileBuffers.remove(identifier);
}

@Override
public Map<String, List<SinkRecord>> records() {
return Collections.unmodifiableMap(fileBuffers);
[File 3 of 8: new file PartitionOffset.java]
@@ -0,0 +1,44 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.grouper;

public class PartitionOffset {

private Long offset;
private int partition;

public PartitionOffset(final int partition, final Long offset) {
this.offset = offset;
this.partition = partition;
}

public int getPartition() {
return partition;
}

public void setPartition(final int partition) {
this.partition = partition;
}

public Long getOffset() {
return offset;
}

public void setOffset(final Long offset) {
this.offset = offset;
}
}
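As the grouper diffs below show, PartitionOffset keeps only the partition and offset of a head record instead of retaining the whole SinkRecord. A minimal illustrative sketch of how such a holder is built and consumed (the variable record stands for any SinkRecord; this snippet is not part of the diff):

// Illustrative sketch only: capture just the coordinates of the head record, not the record itself.
final PartitionOffset head = new PartitionOffset(record.kafkaPartition(), record.kafkaOffset());
// The groupers format these coordinates into the object key, optionally zero-padded.
final String paddedOffset = String.format("%020d", head.getOffset());
final String paddedPartition = String.format("%010d", head.getPartition());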
[File 4 of 8: RecordGrouper.java]
@@ -38,6 +38,14 @@ public interface RecordGrouper {
*/
void clear();

/**
* Clear processed records from memory.
*
* @param identifier
* the identifier (file name key) of the group whose records have been processed
* @param records
* all records already processed to the sink
*/
void clearProcessedRecords(String identifier, List<SinkRecord> records);

/**
* Get all records associated with files, grouped by the file name.
*
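To make the contract of clearProcessedRecords concrete, a hypothetical caller might look like the sketch below; the uploadPart method and the surrounding task are placeholders for the sink's write step (for example, one part of an S3 multi-part upload) and are not part of this change:

// Hypothetical caller sketch, not part of this diff.
void flushProcessedRecords(final RecordGrouper recordGrouper) {
    for (final Map.Entry<String, List<SinkRecord>> entry : recordGrouper.records().entrySet()) {
        final String filename = entry.getKey();
        final List<SinkRecord> processed = entry.getValue();
        // Placeholder for writing the records, e.g. uploading one multi-part upload part.
        uploadPart(filename, processed);
        // Release the written records so the grouper no longer holds them in memory.
        recordGrouper.clearProcessedRecords(filename, processed);
    }
}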
[File 5 of 8: new file SinkRecordsBatch.java]
@@ -0,0 +1,77 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.grouper;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.connect.sink.SinkRecord;

public class SinkRecordsBatch {

// Running total of records added to this batch; not decremented when processed records are removed.
private int numberOfRecords;
private final List<SinkRecord> sinkRecords;
private final String filename;
private final long recordCreationDate = System.currentTimeMillis();

public SinkRecordsBatch(final String filename) {
this.filename = filename;
sinkRecords = new ArrayList<>();
numberOfRecords = 0;
}

public SinkRecordsBatch(final String filename, final List<SinkRecord> sinkRecords) {
this.filename = filename;
this.sinkRecords = new ArrayList<>(sinkRecords);
numberOfRecords = sinkRecords.size();
}

public SinkRecordsBatch(final String filename, final SinkRecord sinkRecord) {
this.filename = filename;
this.sinkRecords = new ArrayList<>();
this.sinkRecords.add(sinkRecord);
numberOfRecords = 1;
}

public void addSinkRecord(final SinkRecord sinkRecord) {
this.sinkRecords.add(sinkRecord);
this.numberOfRecords++;
}

public List<SinkRecord> getSinkRecords() {
// Ensure access to the Sink Records can only be changed through the apis and not accidentally by another
// process.
return Collections.unmodifiableList(sinkRecords);
}

public void removeSinkRecords(final List<SinkRecord> sinkRecords) {
this.sinkRecords.removeAll(sinkRecords);
}

public int getNumberOfRecords() {
return numberOfRecords;
}

public String getFilename() {
return filename;
}

public long getRecordCreationDate() {
return recordCreationDate;
}

}
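A short illustrative sequence for the new batch holder, mirroring how the groupers below use it (the file name and the record variable are examples only, not taken from the diff):

// Illustrative only; mirrors the grouper changes below.
final SinkRecordsBatch batch = new SinkRecordsBatch("my-topic-0-00000000000000000000");
batch.addSinkRecord(record);                             // buffer an incoming record under its file key
final int written = batch.getNumberOfRecords();          // running total consulted by the rotator
final List<SinkRecord> pending = batch.getSinkRecords(); // unmodifiable view of the buffered records
batch.removeSinkRecords(pending);                        // drop records once they have been written out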
[File 6 of 8: TopicPartitionKeyRecordGrouper.java]
@@ -16,13 +16,13 @@

package io.aiven.kafka.connect.common.grouper;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
@@ -38,13 +38,13 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper {

private final Template filenameTemplate;

private final Map<TopicPartitionKey, SinkRecord> currentHeadRecords = new HashMap<>();
private final Map<TopicPartitionKey, PartitionOffset> currentHeadRecords = new HashMap<>();

private final Map<String, List<SinkRecord>> fileBuffers = new HashMap<>();
private final Map<String, SinkRecordsBatch> fileBuffers = new HashMap<>();

private final StableTimeFormatter timeFormatter;

private final Rotator<List<SinkRecord>> rotator;
private final Rotator<SinkRecordsBatch> rotator;

TopicPartitionKeyRecordGrouper(final Template filenameTemplate, final Integer maxRecordsPerFile,
final TimestampSource tsSource) {
@@ -59,7 +59,7 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper {
if (unlimited) {
return false;
} else {
return buffer == null || buffer.size() >= maxRecordsPerFile;
return buffer == null || buffer.getNumberOfRecords() >= maxRecordsPerFile;
}
};
}
@@ -68,15 +68,16 @@ public class TopicPartitionKeyRecordGrouper implements RecordGrouper {
public void put(final SinkRecord record) {
Objects.requireNonNull(record, "record cannot be null");
final String recordKey = resolveRecordKeyFor(record);
fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record);
fileBuffers.computeIfAbsent(recordKey, ignored -> new SinkRecordsBatch(recordKey)).addSinkRecord(record);
}

protected String resolveRecordKeyFor(final SinkRecord record) {
final var key = recordKey(record);

final TopicPartitionKey tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()),
key);
final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk, ignored -> record);
final PartitionOffset currentHeadRecord = currentHeadRecords.computeIfAbsent(tpk,
ignored -> new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()));
String objectKey = generateObjectKey(tpk, currentHeadRecord, record);
if (rotator.rotate(fileBuffers.get(objectKey))) {
// Create new file using this record as the head record.
@@ -97,14 +98,14 @@ private String recordKey(final SinkRecord record) {
return key;
}

public String generateObjectKey(final TopicPartitionKey tpk, final SinkRecord headRecord,
public String generateObjectKey(final TopicPartitionKey tpk, final PartitionOffset headRecord,
final SinkRecord currentRecord) {
final Function<Parameter, String> setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean()
? String.format("%020d", headRecord.kafkaOffset())
: Long.toString(headRecord.kafkaOffset());
? String.format("%020d", headRecord.getOffset())
: Long.toString(headRecord.getOffset());
final Function<Parameter, String> setKafkaPartition = usePaddingParameter -> usePaddingParameter.asBoolean()
? String.format("%010d", headRecord.kafkaPartition())
: Long.toString(headRecord.kafkaPartition());
? String.format("%010d", headRecord.getPartition())
: Long.toString(headRecord.getPartition());

return filenameTemplate.instance()
.bindVariable(FilenameTemplateVariable.TOPIC.name, tpk.topicPartition::topic)
@@ -118,8 +119,8 @@ public String generateObjectKey(final TopicPartitionKey tpk, final SinkRecord he
protected String generateNewRecordKey(final SinkRecord record) {
final var key = recordKey(record);
final var tpk = new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition()), key);
currentHeadRecords.put(tpk, record);
return generateObjectKey(tpk, record, record);
currentHeadRecords.put(tpk, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()));
return generateObjectKey(tpk, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()), record);
}

@Override
@@ -128,9 +129,20 @@ public void clear() {
fileBuffers.clear();
}

@Override
public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
final SinkRecordsBatch grouperRecord = fileBuffers.getOrDefault(identifier, null);
if (Objects.isNull(grouperRecord)) {
return;
}
grouperRecord.removeSinkRecords(records);
}

@Override
public Map<String, List<SinkRecord>> records() {
return Collections.unmodifiableMap(fileBuffers);
return Collections.unmodifiableMap(fileBuffers.values()
.stream()
.collect(Collectors.toMap(SinkRecordsBatch::getFilename, SinkRecordsBatch::getSinkRecords)));
}

public static class TopicPartitionKey {
[File 7 of 8: TopicPartitionRecordGrouper.java]
@@ -16,13 +16,13 @@

package io.aiven.kafka.connect.common.grouper;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -46,14 +46,14 @@
class TopicPartitionRecordGrouper implements RecordGrouper {

private final Template filenameTemplate;
// Offsets are a Long and Partitions are an Integer
private final Map<TopicPartition, PartitionOffset> currentHeadRecords = new HashMap<>();

private final Map<TopicPartition, SinkRecord> currentHeadRecords = new HashMap<>();

private final Map<String, List<SinkRecord>> fileBuffers = new HashMap<>();
private final Map<String, SinkRecordsBatch> fileBuffers = new HashMap<>();

private final StableTimeFormatter timeFormatter;

private final Rotator<List<SinkRecord>> rotator;
private final Rotator<SinkRecordsBatch> rotator;

/**
* A constructor.
@@ -78,7 +78,7 @@ class TopicPartitionRecordGrouper implements RecordGrouper {
if (unlimited) {
return false;
} else {
return buffer == null || buffer.size() >= maxRecordsPerFile;
return buffer == null || buffer.getNumberOfRecords() >= maxRecordsPerFile;
}
};
}
@@ -87,28 +87,30 @@ class TopicPartitionRecordGrouper implements RecordGrouper {
public void put(final SinkRecord record) {
Objects.requireNonNull(record, "record cannot be null");
final String recordKey = resolveRecordKeyFor(record);
fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record);
fileBuffers.computeIfAbsent(recordKey, ignored -> new SinkRecordsBatch(recordKey)).addSinkRecord(record);
}

protected String resolveRecordKeyFor(final SinkRecord record) {
final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
final SinkRecord currentHeadRecord = currentHeadRecords.computeIfAbsent(topicPartition, ignored -> record);
final PartitionOffset currentHeadRecord = currentHeadRecords.computeIfAbsent(topicPartition,
ignored -> new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()));
String recordKey = generateRecordKey(topicPartition, currentHeadRecord, record);
if (rotator.rotate(fileBuffers.get(recordKey))) {
// Create new file using this record as the head record.
recordKey = generateNewRecordKey(record);
}

return recordKey;
}

private String generateRecordKey(final TopicPartition topicPartition, final SinkRecord headRecord,
private String generateRecordKey(final TopicPartition topicPartition, final PartitionOffset headRecord,
final SinkRecord currentRecord) {
final Function<Parameter, String> setKafkaOffset = usePaddingParameter -> usePaddingParameter.asBoolean()
? String.format("%020d", headRecord.kafkaOffset())
: Long.toString(headRecord.kafkaOffset());
? String.format("%020d", headRecord.getOffset())
: Long.toString(headRecord.getOffset());
final Function<Parameter, String> setKafkaPartition = usePaddingParameter -> usePaddingParameter.asBoolean()
? String.format("%010d", headRecord.kafkaPartition())
: Long.toString(headRecord.kafkaPartition());
? String.format("%010d", headRecord.getPartition())
: Long.toString(headRecord.getPartition());

return filenameTemplate.instance()
.bindVariable(FilenameTemplateVariable.TOPIC.name, topicPartition::topic)
@@ -120,8 +122,9 @@ private String generateRecordKey(final TopicPartition topicPartition, final Sink

protected String generateNewRecordKey(final SinkRecord record) {
final TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
currentHeadRecords.put(topicPartition, record);
return generateRecordKey(topicPartition, record, record);
currentHeadRecords.put(topicPartition, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()));
return generateRecordKey(topicPartition, new PartitionOffset(record.kafkaPartition(), record.kafkaOffset()),
record);
}

@Override
@@ -130,9 +133,20 @@ public void clear() {
fileBuffers.clear();
}

@Override
public void clearProcessedRecords(final String identifier, final List<SinkRecord> records) {
final SinkRecordsBatch grouperRecord = fileBuffers.getOrDefault(identifier, null);
if (Objects.isNull(grouperRecord)) {
return;
}
grouperRecord.removeSinkRecords(records);
}

@Override
public Map<String, List<SinkRecord>> records() {
return Collections.unmodifiableMap(fileBuffers);
return Collections.unmodifiableMap(fileBuffers.values()
.stream()
.collect(Collectors.toMap(SinkRecordsBatch::getFilename, SinkRecordsBatch::getSinkRecords)));
}

}
[File 8 of 8: integration test]
@@ -281,6 +281,7 @@ private KafkaProducer<String, GenericRecord> newProducer() {
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProps.put("schema.registry.url", SCHEMA_REGISTRY.getSchemaRegistryUrl());
producerProps.put("linger.ms", 1000);
Contributor Author commented:
linger.ms was added to send all the test events in one batch, so that the flush method is not called between small batches of Kafka events, which was causing the integration tests to fail.
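For context, linger.ms tells the producer to wait up to that many milliseconds so that records accumulate into a single batch before being sent. A minimal sketch of such a configuration (standard Kafka producer configs; the values and topic are illustrative and not taken from this test):

// Illustrative sketch; values are examples, not this test's actual configuration.
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1000); // wait up to 1 s so the records go out as one batch
try (KafkaProducer<String, String> producer =
        new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer())) {
    producer.send(new ProducerRecord<>("example-topic", "key", "value"));
}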

return new KafkaProducer<>(producerProps);
}
