diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java index b2cbe02ebf226..ee6733033d28d 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java @@ -349,14 +349,15 @@ void computeAndCreateTopicPartitions() throws ExecutionException, InterruptedExc } } - private void createNewTopics(Set newSourceTopics, Map sourceTopicToPartitionCounts) + // visible for testing + void createNewTopics(Set newSourceTopics, Map sourceTopicToPartitionCounts) throws ExecutionException, InterruptedException { Map sourceTopicToConfig = describeTopicConfigs(newSourceTopics); Map newTopics = newSourceTopics.stream() .map(sourceTopic -> { String remoteTopic = formatRemoteTopic(sourceTopic); int partitionCount = sourceTopicToPartitionCounts.get(sourceTopic).intValue(); - Map configs = configToMap(sourceTopicToConfig.get(sourceTopic)); + Map configs = configToMap(targetConfig(sourceTopicToConfig.get(sourceTopic))); return new NewTopic(remoteTopic, partitionCount, (short) replicationFactor) .configs(configs); }) diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java index b4c8ca651d29a..7daa96b330fc8 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java @@ -36,7 +36,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.eq; @@ -152,6 +155,49 @@ public void testConfigPropertyFiltering() { .anyMatch(x -> x.name().equals("min.insync.replicas")), "should not replicate excluded properties"); } + @Test + public void testNewTopicConfigs() throws Exception { + Map filterConfig = new HashMap<>(); + filterConfig.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "follower\\.replication\\.throttled\\.replicas, " + + "leader\\.replication\\.throttled\\.replicas, " + + "message\\.timestamp\\.difference\\.max\\.ms, " + + "message\\.timestamp\\.type, " + + "unclean\\.leader\\.election\\.enable, " + + "min\\.insync\\.replicas," + + "exclude_param.*"); + DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter(); + filter.configure(filterConfig); + + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), x -> true, filter)); + + final String topic = "testtopic"; + List entries = new ArrayList<>(); + entries.add(new ConfigEntry("name-1", "value-1")); + entries.add(new ConfigEntry("exclude_param.param1", "value-param1")); + entries.add(new ConfigEntry("min.insync.replicas", "2")); + Config config = new Config(entries); + doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); + doAnswer(invocation -> { + Map newTopics = invocation.getArgument(0); + assertNotNull(newTopics.get("source." + topic)); + Map targetConfig = newTopics.get("source." + topic).configs(); + + // property 'name-1' isn't defined in the exclude filter -> should be replicated + assertNotNull(targetConfig.get("name-1"), "should replicate properties"); + + // this property is in default list, just double check it: + String prop1 = "min.insync.replicas"; + assertNull(targetConfig.get(prop1), "should not replicate excluded properties " + prop1); + // this property is only in exclude filter custom parameter, also tests regex on the way: + String prop2 = "exclude_param.param1"; + assertNull(targetConfig.get(prop2), "should not replicate excluded properties " + prop2); + return null; + }).when(connector).createNewTopics(any()); + connector.createNewTopics(Collections.singleton(topic), Collections.singletonMap(topic, 1L)); + verify(connector).createNewTopics(any(), any()); + } + @Test public void testMirrorSourceConnectorTaskConfig() { List knownSourceTopicPartitions = new ArrayList<>(); diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java index 43b1fcbf6d1d3..9e60e4880dc5e 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java @@ -86,6 +86,7 @@ public void testReplication() throws Exception { waitForTopicCreated(backup, "test-topic-1"); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG), "topic config was not synced"); + createAndTestNewTopicWithConfigFilter(); assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(), "Records were not produced to primary cluster."); @@ -260,4 +261,12 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio assertEquals(0, records.count(), "consumer record size is not zero"); backupConsumer.close(); } + + /* + * Returns expected topic name on target cluster. + */ + @Override + String backupClusterTopicName(String topic) { + return topic; + } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java index 6fb7a81676bc2..8f692ca911612 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter; import org.apache.kafka.connect.mirror.MirrorClient; import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; import org.apache.kafka.connect.mirror.MirrorMakerConfig; @@ -60,6 +61,7 @@ import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -155,6 +157,9 @@ public void startClusters(Map additionalMM2Config) throws Except mm2Props.putAll(basicMM2Config()); mm2Props.putAll(additionalMM2Config); + // exclude topic config: + mm2Props.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "delete\\.retention\\..*"); + mm2Config = new MirrorMakerConfig(mm2Props); primaryWorkerProps = mm2Config.workerConfig(new SourceAndTarget(BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS)); backupWorkerProps.putAll(mm2Config.workerConfig(new SourceAndTarget(PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS))); @@ -259,7 +264,8 @@ public void testReplication() throws Exception { waitForTopicCreated(primary, "mm2-offset-syncs.backup.internal"); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "primary.test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG), "topic config was not synced"); - + createAndTestNewTopicWithConfigFilter(); + assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(), "Records were not produced to primary cluster."); assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1").count(), @@ -429,6 +435,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio // one way replication from primary to backup mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false"); + mm2Config = new MirrorMakerConfig(mm2Props); waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); @@ -519,6 +526,44 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception { assertFalse(primaryTopics.contains("mm2-offset-syncs." + BACKUP_CLUSTER_ALIAS + ".internal")); } + /* + * Run tests for Exclude Filter for copying topic configurations + */ + void createAndTestNewTopicWithConfigFilter() throws Exception { + // create topic with configuration to test: + final Map topicConfig = new HashMap<>(); + topicConfig.put("delete.retention.ms", "1000"); // should be excluded (default value is 86400000) + topicConfig.put("retention.bytes", "1000"); // should be included, default value is -1 + + final String topic = "test-topic-with-config"; + final String backupTopic = backupClusterTopicName(topic); + + primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig); + waitForTopicCreated(backup, backupTopic); + + String primaryConfig, backupConfig; + + primaryConfig = getTopicConfig(primary.kafka(), topic, "delete.retention.ms"); + backupConfig = getTopicConfig(backup.kafka(), backupTopic, "delete.retention.ms"); + assertNotEquals(primaryConfig, backupConfig, + "`delete.retention.ms` should be different, because it's in exclude filter! "); + + // regression test for the config that are still supposed to be replicated + primaryConfig = getTopicConfig(primary.kafka(), topic, "retention.bytes"); + backupConfig = getTopicConfig(backup.kafka(), backupTopic, "retention.bytes"); + assertEquals(primaryConfig, backupConfig, + "`retention.bytes` should be the same, because it isn't in exclude filter! "); + assertEquals("1000", backupConfig, + "`retention.bytes` should be the same, because it's explicitly defined! "); + } + + /* + * Returns expected topic name on target cluster. + */ + String backupClusterTopicName(String topic) { + return PRIMARY_CLUSTER_ALIAS + "." + topic; + } + /* * launch the connectors on kafka connect cluster and check if they are running */