diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java
index 8d3156e2..507687b9 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java
@@ -43,9 +43,12 @@ public final class FileNameFragment extends ConfigFragment {
     static final String FILE_MAX_RECORDS = "file.max.records";
     static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone";
     static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source";
-    static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
+    public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
     static final String DEFAULT_FILENAME_TEMPLATE = "{{topic}}-{{partition}}-{{start_offset}}";
 
+    public static final String FILE_PREFIX_TEMPLATE_CONFIG = "file.prefix.template";
+    static final String DEFAULT_FILE_PREFIX_TEMPLATE = "topics/{{topic}}/partition={{partition}}/";
+
     public FileNameFragment(final AbstractConfig cfg) {
         super(cfg);
     }
@@ -112,6 +115,16 @@ public void ensureValid(final String name, final Object value) { // UnusedAssignment
                 ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_SOURCE);
 
+        configDef.define(FILE_PREFIX_TEMPLATE_CONFIG, ConfigDef.Type.STRING, DEFAULT_FILE_PREFIX_TEMPLATE,
+                new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
+                "The template for the file prefix on S3. "
+                        + "Supports `{{ variable }}` placeholders for substituting variables. "
+                        + "Currently supported variables are `topic` and `partition`, "
+                        + "both of which are mandatory in the directory structure. "
+                        + "Example prefix: topics/{{topic}}/partition/{{partition}}/",
+                GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment
+                ConfigDef.Width.LONG, FILE_PREFIX_TEMPLATE_CONFIG);
+
         return configDef;
     }
@@ -185,4 +198,8 @@ public int getMaxRecordsPerFile() {
         return cfg.getInt(FILE_MAX_RECORDS);
     }
 
+    public String getFilePrefixTemplateConfig() {
+        return cfg.getString(FILE_PREFIX_TEMPLATE_CONFIG);
+    }
+
 }
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FileExtractionPatterns.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FileExtractionPatterns.java
index 301ac01a..499da563 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FileExtractionPatterns.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FileExtractionPatterns.java
@@ -28,7 +28,5 @@ public class FileExtractionPatterns {
     public static final String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_PARTITION_KEY + ">\\d+)";
     public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)";
     public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";
-    public static final String DEFAULT_PREFIX_FILE_PATH_PATTERN = "topics/{{" + PATTERN_TOPIC_KEY + "}}/partition={{"
-            + PATTERN_PARTITION_KEY + "}}/";
     public static final String ANY_FILENAME_PATTERN = ".*$";
 }
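For context, here is a minimal sketch (not part of the patch) of how the new file.prefix.template property combines with file.name.template. The property keys and the default templates come from FileNameFragment above; the topic, partition, and offset values are invented for illustration.

import java.util.HashMap;
import java.util.Map;

public class PrefixTemplateSketch {
    public static void main(final String[] args) {
        final Map<String, String> config = new HashMap<>();
        // Keys defined in FileNameFragment; the values are the defaults shown above.
        config.put("file.name.template", "{{topic}}-{{partition}}-{{start_offset}}");
        config.put("file.prefix.template", "topics/{{topic}}/partition={{partition}}/");
        // With both templates applied, an object for topic "logs", partition 0,
        // start offset 42 would be expected under the key:
        //   topics/logs/partition=0/logs-0-42
        config.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}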
diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
index dfff23c1..5ae53bb0 100644
--- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
+++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
@@ -16,6 +16,8 @@
 
 package io.aiven.kafka.connect.s3.source;
 
+import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_NAME_TEMPLATE_CONFIG;
+import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_PREFIX_TEMPLATE_CONFIG;
 import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER;
 import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY;
 import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL;
@@ -164,6 +166,7 @@ void bytesTest(final boolean addPrefix) {
         final var topicName = IntegrationBase.topicName(testInfo);
         final ObjectDistributionStrategy objectDistributionStrategy;
         final int partitionId = 0;
+        final String prefixPattern = "topics/{{topic}}/partition={{partition}}/";
         String s3Prefix = "";
         if (addPrefix) {
             objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILEPATH;
@@ -172,8 +175,10 @@ void bytesTest(final boolean addPrefix) {
             objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILENAME;
         }
 
+        final String fileNamePatternSeparator = "_";
+
         final Map<String, String> connectorConfig = getConfig(CONNECTOR_NAME, topicName, 2, objectDistributionStrategy,
-                addPrefix, s3Prefix);
+                addPrefix, s3Prefix, prefixPattern, fileNamePatternSeparator);
 
         connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());
         connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig);
@@ -182,13 +187,16 @@ void bytesTest(final boolean addPrefix) {
         final String testData2 = "Hello, Kafka Connect S3 Source! object 2";
 
         final List<String> offsetKeys = new ArrayList<>();
-        // write 2 objects to s3
-        offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "0", addPrefix, s3Prefix));
-        offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "0", addPrefix, s3Prefix));
-        offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "1", addPrefix, s3Prefix));
-        offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "1", addPrefix, s3Prefix));
-        offsetKeys.add(writeToS3(topicName, new byte[0], "3", addPrefix, s3Prefix));
+        offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "0", addPrefix, s3Prefix,
+                fileNamePatternSeparator));
+        offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "0", addPrefix, s3Prefix,
+                fileNamePatternSeparator));
+        offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "1", addPrefix, s3Prefix,
+                fileNamePatternSeparator));
+        offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "1", addPrefix, s3Prefix,
+                fileNamePatternSeparator));
+        offsetKeys.add(writeToS3(topicName, new byte[0], "3", addPrefix, s3Prefix, "-"));
 
         assertThat(testBucketAccessor.listObjects()).hasSize(5);
@@ -210,7 +218,7 @@ void avroTest(final TestInfo testInfo) throws IOException {
         final var topicName = IntegrationBase.topicName(testInfo);
         final boolean addPrefix = false;
-        final Map<String, String> connectorConfig = getAvroConfig(topicName, InputFormat.AVRO, addPrefix, "",
+        final Map<String, String> connectorConfig = getAvroConfig(topicName, InputFormat.AVRO, addPrefix, "", "",
                 ObjectDistributionStrategy.OBJECT_HASH);
 
         connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig);
@@ -237,12 +245,12 @@ void avroTest(final TestInfo testInfo) throws IOException {
         final Set<String> offsetKeys = new HashSet<>();
         final String s3Prefix = "";
-        offsetKeys.add(writeToS3(topicName, outputStream1, "1", false, s3Prefix));
-        offsetKeys.add(writeToS3(topicName, outputStream2, "1", false, s3Prefix));
+        offsetKeys.add(writeToS3(topicName, outputStream1, "1", false, s3Prefix, "-"));
+        offsetKeys.add(writeToS3(topicName, outputStream2, "1", false, s3Prefix, "-"));
 
-        offsetKeys.add(writeToS3(topicName, outputStream3, "2", false, s3Prefix));
-        offsetKeys.add(writeToS3(topicName, outputStream4, "2", false, s3Prefix));
-        offsetKeys.add(writeToS3(topicName, outputStream5, "2", false, s3Prefix));
+        offsetKeys.add(writeToS3(topicName, outputStream3, "2", false, s3Prefix, "-"));
+        offsetKeys.add(writeToS3(topicName, outputStream4, "2", false, s3Prefix, "-"));
+        offsetKeys.add(writeToS3(topicName, outputStream5, "2", false, s3Prefix, "-"));
 
         assertThat(testBucketAccessor.listObjects()).hasSize(5);
@@ -271,10 +279,11 @@ void parquetTest(final boolean addPrefix) throws IOException {
         final var topicName = IntegrationBase.topicName(testInfo);
         final String partition = "0";
         final ObjectDistributionStrategy objectDistributionStrategy;
+        final String prefixPattern = "bucket/topics/{{topic}}/partition/{{partition}}/";
         String s3Prefix = "";
         if (addPrefix) {
             objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILEPATH;
-            s3Prefix = "topics/" + topicName + "/partition=" + partition + "/";
+            s3Prefix = "bucket/topics/" + topicName + "/partition/" + partition + "/";
         } else {
             objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILENAME;
         }
@@ -284,7 +293,7 @@ void parquetTest(final boolean addPrefix) throws IOException {
         final String name = "testuser";
 
         final Map<String, String> connectorConfig = getAvroConfig(topicName, InputFormat.PARQUET, addPrefix, s3Prefix,
-                objectDistributionStrategy);
+                prefixPattern, objectDistributionStrategy);
         connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig);
 
         final Path path = ContentUtils.getTmpFilePath(name);
@@ -306,10 +315,10 @@ void parquetTest(final boolean addPrefix) throws IOException {
     }
 
     private Map<String, String> getAvroConfig(final String topicName, final InputFormat inputFormat,
-            final boolean addPrefix, final String s3Prefix,
+            final boolean addPrefix, final String s3Prefix, final String prefixPattern,
             final ObjectDistributionStrategy objectDistributionStrategy) {
         final Map<String, String> connectorConfig = getConfig(CONNECTOR_NAME, topicName, 4, objectDistributionStrategy,
-                addPrefix, s3Prefix);
+                addPrefix, s3Prefix, prefixPattern, "-");
         connectorConfig.put(INPUT_FORMAT_KEY, inputFormat.getValue());
         connectorConfig.put(SCHEMA_REGISTRY_URL, schemaRegistry.getSchemaRegistryUrl());
         connectorConfig.put(VALUE_CONVERTER_KEY, "io.confluent.connect.avro.AvroConverter");
@@ -322,7 +331,7 @@ private Map getAvroConfig(final String topicName, final InputFor
     void jsonTest(final TestInfo testInfo) {
         final var topicName = IntegrationBase.topicName(testInfo);
         final Map<String, String> connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1,
-                ObjectDistributionStrategy.PARTITION_IN_FILENAME, false, "");
+                ObjectDistributionStrategy.PARTITION_IN_FILENAME, false, "", "", "-");
         connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.JSONL.getValue());
         connectorConfig.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.json.JsonConverter");
@@ -335,7 +344,7 @@ void jsonTest(final TestInfo testInfo) {
         }
         final byte[] jsonBytes = jsonBuilder.toString().getBytes(StandardCharsets.UTF_8);
 
-        final String offsetKey = writeToS3(topicName, jsonBytes, "1", false, "");
+        final String offsetKey = writeToS3(topicName, jsonBytes, "1", false, "", "-");
 
         // Poll Json messages from the Kafka topic and deserialize them
         final List records = IntegrationBase.consumeJsonMessages(topicName, 500,
@@ -369,9 +378,9 @@ private static byte[] generateNextAvroMessagesStartingFromId(final int messageId
     }
 
     private static String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId,
-            final boolean addPrefix, final String s3Prefix) {
-        final String objectKey = (addPrefix ? addPrefixOrDefault(s3Prefix) : "") + topicName + "-" + partitionId + "-"
-                + System.currentTimeMillis() + ".txt";
+            final boolean addPrefix, final String s3Prefix, final String separator) {
+        final String objectKey = (addPrefix ? addPrefixOrDefault(s3Prefix) : "") + topicName + separator + partitionId
+                + separator + System.currentTimeMillis() + ".txt";
         final PutObjectRequest request = PutObjectRequest.builder().bucket(TEST_BUCKET_NAME).key(objectKey).build();
         s3Client.putObject(request, RequestBody.fromBytes(testDataBytes));
         return OBJECT_KEY + SEPARATOR + objectKey;
@@ -382,7 +391,8 @@ private static String addPrefixOrDefault(final String s3Prefix) {
     }
 
     private Map<String, String> getConfig(final String connectorName, final String topics, final int maxTasks,
-            final ObjectDistributionStrategy taskDistributionConfig, final boolean addPrefix, final String s3Prefix) {
+            final ObjectDistributionStrategy taskDistributionConfig, final boolean addPrefix, final String s3Prefix,
+            final String prefixPattern, final String fileNameSeparator) {
         final Map<String, String> config = new HashMap<>(basicS3ConnectorConfig(addPrefix, s3Prefix));
         config.put("name", connectorName);
         config.put(TARGET_TOPICS, topics);
@@ -390,6 +400,11 @@ private Map getConfig(final String connectorName, final String t
         config.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.converters.ByteArrayConverter");
         config.put("tasks.max", String.valueOf(maxTasks));
         config.put(OBJECT_DISTRIBUTION_STRATEGY, taskDistributionConfig.value());
+        config.put(FILE_NAME_TEMPLATE_CONFIG,
+                "{{topic}}" + fileNameSeparator + "{{partition}}" + fileNameSeparator + "{{start_offset}}");
+        if (addPrefix) {
+            config.put(FILE_PREFIX_TEMPLATE_CONFIG, prefixPattern);
+        }
         return config;
     }
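As a side note, a small self-contained sketch of the object keys the reworked writeToS3 helper produces. The values are invented, and the prefix is assumed to be passed through unchanged by addPrefixOrDefault when one is supplied.

public class ObjectKeySketch {
    public static void main(final String[] args) {
        // Mirrors the key-building expression in writeToS3 above.
        final String topicName = "logs";
        final String partitionId = "0";
        final String separator = "_";
        final String s3Prefix = "topics/logs/partition=0/";
        final boolean addPrefix = true;
        final long timestamp = 1_700_000_000_000L; // stand-in for System.currentTimeMillis()
        final String objectKey = (addPrefix ? s3Prefix : "") + topicName + separator + partitionId
                + separator + timestamp + ".txt";
        System.out.println(objectKey); // topics/logs/partition=0/logs_0_1700000000000.txt
    }
}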
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
index 786f239d..f9df0fa9 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
@@ -18,7 +18,6 @@
 
 import static io.aiven.kafka.connect.common.config.SourceConfigFragment.MAX_POLL_RECORDS;
 import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.ANY_FILENAME_PATTERN;
-import static io.aiven.kafka.connect.common.source.input.utils.FileExtractionPatterns.DEFAULT_PREFIX_FILE_PATH_PATTERN;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -205,13 +204,13 @@ private DistributionStrategy initializeObjectDistributionStrategy() {
 
         switch (objectDistributionStrategy) {
             case PARTITION_IN_FILENAME :
-                this.filePattern = FilePatternUtils.configurePattern(
-                        s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().originalTemplate());
+                this.filePattern = FilePatternUtils
+                        .configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString());
                 distributionStrategy = new PartitionInFilenameDistributionStrategy(maxTasks);
                 break;
             case PARTITION_IN_FILEPATH :
-                this.filePattern = FilePatternUtils
-                        .configurePattern(DEFAULT_PREFIX_FILE_PATH_PATTERN + ANY_FILENAME_PATTERN);
+                this.filePattern = FilePatternUtils.configurePattern(
+                        s3SourceConfig.getS3FileNameFragment().getFilePrefixTemplateConfig() + ANY_FILENAME_PATTERN);
                 distributionStrategy = new PartitionInPathDistributionStrategy(maxTasks);
                 break;
             default :
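Finally, a self-contained sketch of the kind of pattern the PARTITION_IN_FILEPATH branch now builds from the configured prefix template. It assumes FilePatternUtils.configurePattern substitutes the {{topic}} and {{partition}} placeholders with the named-group regexes from FileExtractionPatterns and appends ANY_FILENAME_PATTERN; the group names topicName and partitionId stand in for PATTERN_TOPIC_KEY and PATTERN_PARTITION_KEY, whose actual values are not shown in this diff.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PrefixPatternSketch {
    public static void main(final String[] args) {
        final String prefixTemplate = "topics/{{topic}}/partition={{partition}}/";
        final String regex = prefixTemplate
                .replace("{{topic}}", "(?<topicName>[a-zA-Z0-9\\-_.]+)") // TOPIC_NAMED_GROUP_REGEX_PATTERN
                .replace("{{partition}}", "(?<partitionId>\\d+)") // PARTITION_NAMED_GROUP_REGEX_PATTERN
                + ".*$"; // ANY_FILENAME_PATTERN
        final Matcher matcher = Pattern.compile(regex).matcher("topics/logs/partition=3/logs-3-120.txt");
        if (matcher.matches()) {
            // Extracts "logs" and "3" from the object key.
            System.out.println(matcher.group("topicName") + " / " + matcher.group("partitionId"));
        }
    }
}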