From review - adding configs for prefix
muralibasani committed Jan 9, 2025
1 parent 87ccede commit e23306c
Showing 4 changed files with 60 additions and 31 deletions.
@@ -43,9 +43,12 @@ public final class FileNameFragment extends ConfigFragment {
     static final String FILE_MAX_RECORDS = "file.max.records";
     static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone";
     static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source";
-    static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
+    public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
     static final String DEFAULT_FILENAME_TEMPLATE = "{{topic}}-{{partition}}-{{start_offset}}";

+    public static final String FILE_PREFIX_TEMPLATE_CONFIG = "file.prefix.template";
+    static final String DEFAULT_FILE_PREFIX_TEMPLATE = "topics/{{topic}}/partition={{partition}}/";
+
     public FileNameFragment(final AbstractConfig cfg) {
         super(cfg);
     }
@@ -112,6 +115,16 @@ public void ensureValid(final String name, final Object value) {
                 // UnusedAssignment
                 ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_SOURCE);

+        configDef.define(FILE_PREFIX_TEMPLATE_CONFIG, ConfigDef.Type.STRING, DEFAULT_FILE_PREFIX_TEMPLATE,
+                new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
+                "The template for the file prefix on S3. "
+                        + "Supports `{{ variable }}` placeholders for substituting variables. "
+                        + "Currently supported variables are `topic` and `partition`, "
+                        + "and both must appear in the directory structure. "
+                        + "Example prefix: topics/{{topic}}/partition/{{partition}}/",
+                GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment
+                ConfigDef.Width.LONG, FILE_PREFIX_TEMPLATE_CONFIG);
+
         return configDef;
     }

@@ -185,4 +198,8 @@ public int getMaxRecordsPerFile() {
         return cfg.getInt(FILE_MAX_RECORDS);
     }

+    public String getFilePrefixTemplateConfig() {
+        return cfg.getString(FILE_PREFIX_TEMPLATE_CONFIG);
+    }
+
 }
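The new setting rides on Kafka's standard ConfigDef/AbstractConfig machinery. Below is a minimal, self-contained sketch (illustrative only, not part of the commit; the single-key ConfigDef stands in for the full one FileNameFragment builds) of defining the property and reading it back the way getFilePrefixTemplateConfig() does:

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class FilePrefixTemplateSketch {
    public static void main(final String[] args) {
        // Stand-in for the definition added in FileNameFragment above.
        final ConfigDef configDef = new ConfigDef().define("file.prefix.template", ConfigDef.Type.STRING,
                "topics/{{topic}}/partition={{partition}}/", new ConfigDef.NonEmptyString(),
                ConfigDef.Importance.MEDIUM, "The template for the file prefix on S3.");

        // A connector configuration overriding the default prefix layout.
        final AbstractConfig cfg = new AbstractConfig(configDef,
                Map.of("file.prefix.template", "topics/{{topic}}/partition/{{partition}}/"));

        // Mirrors FileNameFragment#getFilePrefixTemplateConfig() from the diff above.
        System.out.println(cfg.getString("file.prefix.template"));
    }
}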
@@ -28,7 +28,5 @@ public class FileExtractionPatterns {
     public static final String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_PARTITION_KEY + ">\\d+)";
     public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)";
     public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";
-    public static final String DEFAULT_PREFIX_FILE_PATH_PATTERN = "topics/{{" + PATTERN_TOPIC_KEY + "}}/partition={{"
-            + PATTERN_PARTITION_KEY + "}}/";
     public static final String ANY_FILENAME_PATTERN = ".*$";
 }
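These named-group fragments are what the {{topic}} and {{partition}} placeholders resolve to when a prefix template is compiled into a matching regex. A hypothetical, runnable illustration follows; it assumes PATTERN_TOPIC_KEY is "topic" and PATTERN_PARTITION_KEY is "partition" (the diff does not show their values) and appends ANY_FILENAME_PATTERN the way initializeObjectDistributionStrategy does further down:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PrefixPatternSketch {
    public static void main(final String[] args) {
        // Assumed expansions of TOPIC_NAMED_GROUP_REGEX_PATTERN and
        // PARTITION_NAMED_GROUP_REGEX_PATTERN with keys "topic" and "partition".
        final String topicGroup = "(?<topic>[a-zA-Z0-9\\-_.]+)";
        final String partitionGroup = "(?<partition>\\d+)";

        // "topics/{{topic}}/partition={{partition}}/" with placeholders substituted,
        // plus ANY_FILENAME_PATTERN (".*$") appended for the file name part.
        final Pattern pattern = Pattern
                .compile("topics/" + topicGroup + "/partition=" + partitionGroup + "/.*$");

        final Matcher matcher = pattern.matcher("topics/my-topic/partition=4/my-topic-4-1736380800000.txt");
        if (matcher.matches()) {
            System.out.println(matcher.group("topic"));     // my-topic
            System.out.println(matcher.group("partition")); // 4
        }
    }
}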
@@ -16,6 +16,8 @@

 package io.aiven.kafka.connect.s3.source;

+import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_NAME_TEMPLATE_CONFIG;
+import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_PREFIX_TEMPLATE_CONFIG;
 import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER;
 import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY;
 import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL;
@@ -164,6 +166,7 @@ void bytesTest(final boolean addPrefix) {
         final var topicName = IntegrationBase.topicName(testInfo);
         final ObjectDistributionStrategy objectDistributionStrategy;
         final int partitionId = 0;
+        final String prefixPattern = "topics/{{topic}}/partition={{partition}}/";
         String s3Prefix = "";
         if (addPrefix) {
             objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILEPATH;
@@ -172,8 +175,10 @@
             objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILENAME;
         }

+        final String fileNamePatternSeparator = "_";
+
         final Map<String, String> connectorConfig = getConfig(CONNECTOR_NAME, topicName, 2, objectDistributionStrategy,
-                addPrefix, s3Prefix);
+                addPrefix, s3Prefix, prefixPattern, fileNamePatternSeparator);

         connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());
         connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig);
@@ -182,13 +187,16 @@
         final String testData2 = "Hello, Kafka Connect S3 Source! object 2";

         final List<String> offsetKeys = new ArrayList<>();
-
         // write 2 objects to s3
-        offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "0", addPrefix, s3Prefix));
-        offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "0", addPrefix, s3Prefix));
-        offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "1", addPrefix, s3Prefix));
-        offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "1", addPrefix, s3Prefix));
-        offsetKeys.add(writeToS3(topicName, new byte[0], "3", addPrefix, s3Prefix));
+        offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "0", addPrefix, s3Prefix,
+                fileNamePatternSeparator));
+        offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "0", addPrefix, s3Prefix,
+                fileNamePatternSeparator));
+        offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "1", addPrefix, s3Prefix,
+                fileNamePatternSeparator));
+        offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "1", addPrefix, s3Prefix,
+                fileNamePatternSeparator));
+        offsetKeys.add(writeToS3(topicName, new byte[0], "3", addPrefix, s3Prefix, "-"));

         assertThat(testBucketAccessor.listObjects()).hasSize(5);

@@ -210,7 +218,7 @@
     void avroTest(final TestInfo testInfo) throws IOException {
         final var topicName = IntegrationBase.topicName(testInfo);
         final boolean addPrefix = false;
-        final Map<String, String> connectorConfig = getAvroConfig(topicName, InputFormat.AVRO, addPrefix, "",
+        final Map<String, String> connectorConfig = getAvroConfig(topicName, InputFormat.AVRO, addPrefix, "", "",
                 ObjectDistributionStrategy.OBJECT_HASH);

         connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig);
@@ -237,12 +245,12 @@ void avroTest(final TestInfo testInfo) throws IOException {
         final Set<String> offsetKeys = new HashSet<>();
         final String s3Prefix = "";

-        offsetKeys.add(writeToS3(topicName, outputStream1, "1", false, s3Prefix));
-        offsetKeys.add(writeToS3(topicName, outputStream2, "1", false, s3Prefix));
+        offsetKeys.add(writeToS3(topicName, outputStream1, "1", false, s3Prefix, "-"));
+        offsetKeys.add(writeToS3(topicName, outputStream2, "1", false, s3Prefix, "-"));

-        offsetKeys.add(writeToS3(topicName, outputStream3, "2", false, s3Prefix));
-        offsetKeys.add(writeToS3(topicName, outputStream4, "2", false, s3Prefix));
-        offsetKeys.add(writeToS3(topicName, outputStream5, "2", false, s3Prefix));
+        offsetKeys.add(writeToS3(topicName, outputStream3, "2", false, s3Prefix, "-"));
+        offsetKeys.add(writeToS3(topicName, outputStream4, "2", false, s3Prefix, "-"));
+        offsetKeys.add(writeToS3(topicName, outputStream5, "2", false, s3Prefix, "-"));

         assertThat(testBucketAccessor.listObjects()).hasSize(5);

@@ -271,10 +279,11 @@ void parquetTest(final boolean addPrefix) throws IOException {
         final var topicName = IntegrationBase.topicName(testInfo);
         final String partition = "0";
         final ObjectDistributionStrategy objectDistributionStrategy;
+        final String prefixPattern = "bucket/topics/{{topic}}/partition/{{partition}}/";
         String s3Prefix = "";
         if (addPrefix) {
             objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILEPATH;
-            s3Prefix = "topics/" + topicName + "/partition=" + partition + "/";
+            s3Prefix = "bucket/topics/" + topicName + "/partition/" + partition + "/";
         } else {
             objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILENAME;
         }
@@ -284,7 +293,7 @@
         final String name = "testuser";

         final Map<String, String> connectorConfig = getAvroConfig(topicName, InputFormat.PARQUET, addPrefix, s3Prefix,
-                objectDistributionStrategy);
+                prefixPattern, objectDistributionStrategy);
         connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig);
         final Path path = ContentUtils.getTmpFilePath(name);
@@ -306,10 +315,10 @@
     }

     private Map<String, String> getAvroConfig(final String topicName, final InputFormat inputFormat,
-            final boolean addPrefix, final String s3Prefix,
+            final boolean addPrefix, final String s3Prefix, final String prefixPattern,
             final ObjectDistributionStrategy objectDistributionStrategy) {
         final Map<String, String> connectorConfig = getConfig(CONNECTOR_NAME, topicName, 4, objectDistributionStrategy,
-                addPrefix, s3Prefix);
+                addPrefix, s3Prefix, prefixPattern, "-");
         connectorConfig.put(INPUT_FORMAT_KEY, inputFormat.getValue());
         connectorConfig.put(SCHEMA_REGISTRY_URL, schemaRegistry.getSchemaRegistryUrl());
         connectorConfig.put(VALUE_CONVERTER_KEY, "io.confluent.connect.avro.AvroConverter");
@@ -322,7 +331,7 @@ private Map<String, String> getAvroConfig(final String topicName, final InputFor
     void jsonTest(final TestInfo testInfo) {
         final var topicName = IntegrationBase.topicName(testInfo);
         final Map<String, String> connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1,
-                ObjectDistributionStrategy.PARTITION_IN_FILENAME, false, "");
+                ObjectDistributionStrategy.PARTITION_IN_FILENAME, false, "", "", "-");
         connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.JSONL.getValue());
         connectorConfig.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.json.JsonConverter");

@@ -335,7 +344,7 @@ void jsonTest(final TestInfo testInfo) {
         }
         final byte[] jsonBytes = jsonBuilder.toString().getBytes(StandardCharsets.UTF_8);

-        final String offsetKey = writeToS3(topicName, jsonBytes, "1", false, "");
+        final String offsetKey = writeToS3(topicName, jsonBytes, "1", false, "", "-");

         // Poll Json messages from the Kafka topic and deserialize them
         final List<JsonNode> records = IntegrationBase.consumeJsonMessages(topicName, 500,
@@ -369,9 +378,9 @@ private static byte[] generateNextAvroMessagesStartingFromId(final int messageId
     }

     private static String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId,
-            final boolean addPrefix, final String s3Prefix) {
-        final String objectKey = (addPrefix ? addPrefixOrDefault(s3Prefix) : "") + topicName + "-" + partitionId + "-"
-                + System.currentTimeMillis() + ".txt";
+            final boolean addPrefix, final String s3Prefix, final String separator) {
+        final String objectKey = (addPrefix ? addPrefixOrDefault(s3Prefix) : "") + topicName + separator + partitionId
+                + separator + System.currentTimeMillis() + ".txt";
         final PutObjectRequest request = PutObjectRequest.builder().bucket(TEST_BUCKET_NAME).key(objectKey).build();
         s3Client.putObject(request, RequestBody.fromBytes(testDataBytes));
         return OBJECT_KEY + SEPARATOR + objectKey;
@@ -382,14 +391,20 @@ private static String addPrefixOrDefault(final String s3Prefix) {
     }

     private Map<String, String> getConfig(final String connectorName, final String topics, final int maxTasks,
-            final ObjectDistributionStrategy taskDistributionConfig, final boolean addPrefix, final String s3Prefix) {
+            final ObjectDistributionStrategy taskDistributionConfig, final boolean addPrefix, final String s3Prefix,
+            final String prefixPattern, final String fileNameSeparator) {
         final Map<String, String> config = new HashMap<>(basicS3ConnectorConfig(addPrefix, s3Prefix));
         config.put("name", connectorName);
         config.put(TARGET_TOPICS, topics);
         config.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
         config.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.converters.ByteArrayConverter");
         config.put("tasks.max", String.valueOf(maxTasks));
         config.put(OBJECT_DISTRIBUTION_STRATEGY, taskDistributionConfig.value());
+        config.put(FILE_NAME_TEMPLATE_CONFIG,
+                "{{topic}}" + fileNameSeparator + "{{partition}}" + fileNameSeparator + "{{start_offset}}");
+        if (addPrefix) {
+            config.put(FILE_PREFIX_TEMPLATE_CONFIG, prefixPattern);
+        }
         return config;
     }
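For the addPrefix case, the helpers above end up writing object keys shaped as <prefix><topic><separator><partition><separator><timestamp>.txt. A hypothetical sketch of that key construction (the s3Prefix value is assumed to match the prefixPattern used in bytesTest, whose assignment sits in a collapsed region of the diff; the class name is illustrative):

public class ExpectedKeySketch {
    public static void main(final String[] args) {
        final String topicName = "test-topic";
        final String partitionId = "0";
        final String separator = "_"; // fileNamePatternSeparator in bytesTest
        // Assumed to mirror prefixPattern = "topics/{{topic}}/partition={{partition}}/".
        final String s3Prefix = "topics/" + topicName + "/partition=" + partitionId + "/";

        // Mirrors the objectKey construction in writeToS3 above.
        final String objectKey = s3Prefix + topicName + separator + partitionId + separator
                + System.currentTimeMillis() + ".txt";

        // e.g. topics/test-topic/partition=0/test-topic_0_1736380800000.txt
        System.out.println(objectKey);
    }
}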
@@ -18,7 +18,6 @@

 import static io.aiven.kafka.connect.common.config.SourceConfigFragment.MAX_POLL_RECORDS;
 import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.ANY_FILENAME_PATTERN;
-import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.DEFAULT_PREFIX_FILE_PATH_PATTERN;

 import java.util.ArrayList;
 import java.util.HashSet;
@@ -205,13 +204,13 @@ private DistributionStrategy initializeObjectDistributionStrategy() {

         switch (objectDistributionStrategy) {
             case PARTITION_IN_FILENAME :
-                this.filePattern = FilePatternUtils.configurePattern(
-                        s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().originalTemplate());
+                this.filePattern = FilePatternUtils
+                        .configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString());
                 distributionStrategy = new PartitionInFilenameDistributionStrategy(maxTasks);
                 break;
             case PARTITION_IN_FILEPATH :
-                this.filePattern = FilePatternUtils
-                        .configurePattern(DEFAULT_PREFIX_FILE_PATH_PATTERN + ANY_FILENAME_PATTERN);
+                this.filePattern = FilePatternUtils.configurePattern(
+                        s3SourceConfig.getS3FileNameFragment().getFilePrefixTemplateConfig() + ANY_FILENAME_PATTERN);
                 distributionStrategy = new PartitionInPathDistributionStrategy(maxTasks);
                 break;
             default :
