Skip to content

Commit

Permalink
KAFKA-13255: Use config.properties.exclude when mirroring topics (#11401
Browse files Browse the repository at this point in the history
)


Reviewers: Mickael Maison <mickael.maison@gmail.com>
  • Loading branch information
bdesert authored and tombentley committed Apr 29, 2022
1 parent b068124 commit 742de94
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -349,14 +349,15 @@ void computeAndCreateTopicPartitions() throws ExecutionException, InterruptedExc
}
}

private void createNewTopics(Set<String> newSourceTopics, Map<String, Long> sourceTopicToPartitionCounts)
// visible for testing
void createNewTopics(Set<String> newSourceTopics, Map<String, Long> sourceTopicToPartitionCounts)
throws ExecutionException, InterruptedException {
Map<String, Config> sourceTopicToConfig = describeTopicConfigs(newSourceTopics);
Map<String, NewTopic> newTopics = newSourceTopics.stream()
.map(sourceTopic -> {
String remoteTopic = formatRemoteTopic(sourceTopic);
int partitionCount = sourceTopicToPartitionCounts.get(sourceTopic).intValue();
Map<String, String> configs = configToMap(sourceTopicToConfig.get(sourceTopic));
Map<String, String> configs = configToMap(targetConfig(sourceTopicToConfig.get(sourceTopic)));
return new NewTopic(remoteTopic, partitionCount, (short) replicationFactor)
.configs(configs);
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
Expand Down Expand Up @@ -152,6 +155,49 @@ public void testConfigPropertyFiltering() {
.anyMatch(x -> x.name().equals("min.insync.replicas")), "should not replicate excluded properties");
}

@Test
public void testNewTopicConfigs() throws Exception {
Map<String, Object> filterConfig = new HashMap<>();
filterConfig.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "follower\\.replication\\.throttled\\.replicas, "
+ "leader\\.replication\\.throttled\\.replicas, "
+ "message\\.timestamp\\.difference\\.max\\.ms, "
+ "message\\.timestamp\\.type, "
+ "unclean\\.leader\\.election\\.enable, "
+ "min\\.insync\\.replicas,"
+ "exclude_param.*");
DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter();
filter.configure(filterConfig);

MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
new DefaultReplicationPolicy(), x -> true, filter));

final String topic = "testtopic";
List<ConfigEntry> entries = new ArrayList<>();
entries.add(new ConfigEntry("name-1", "value-1"));
entries.add(new ConfigEntry("exclude_param.param1", "value-param1"));
entries.add(new ConfigEntry("min.insync.replicas", "2"));
Config config = new Config(entries);
doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
doAnswer(invocation -> {
Map<String, NewTopic> newTopics = invocation.getArgument(0);
assertNotNull(newTopics.get("source." + topic));
Map<String, String> targetConfig = newTopics.get("source." + topic).configs();

// property 'name-1' isn't defined in the exclude filter -> should be replicated
assertNotNull(targetConfig.get("name-1"), "should replicate properties");

// this property is in default list, just double check it:
String prop1 = "min.insync.replicas";
assertNull(targetConfig.get(prop1), "should not replicate excluded properties " + prop1);
// this property is only in exclude filter custom parameter, also tests regex on the way:
String prop2 = "exclude_param.param1";
assertNull(targetConfig.get(prop2), "should not replicate excluded properties " + prop2);
return null;
}).when(connector).createNewTopics(any());
connector.createNewTopics(Collections.singleton(topic), Collections.singletonMap(topic, 1L));
verify(connector).createNewTopics(any(), any());
}

@Test
public void testMirrorSourceConnectorTaskConfig() {
List<TopicPartition> knownSourceTopicPartitions = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public void testReplication() throws Exception {
waitForTopicCreated(backup, "test-topic-1");
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG),
"topic config was not synced");
createAndTestNewTopicWithConfigFilter();

assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
"Records were not produced to primary cluster.");
Expand Down Expand Up @@ -260,4 +261,12 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
assertEquals(0, records.count(), "consumer record size is not zero");
backupConsumer.close();
}

/*
* Returns expected topic name on target cluster.
*/
@Override
String backupClusterTopicName(String topic) {
return topic;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter;
import org.apache.kafka.connect.mirror.MirrorClient;
import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
import org.apache.kafka.connect.mirror.MirrorMakerConfig;
Expand Down Expand Up @@ -60,6 +61,7 @@
import org.slf4j.LoggerFactory;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -155,6 +157,9 @@ public void startClusters(Map<String, String> additionalMM2Config) throws Except
mm2Props.putAll(basicMM2Config());
mm2Props.putAll(additionalMM2Config);

// exclude topic config:
mm2Props.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "delete\\.retention\\..*");

mm2Config = new MirrorMakerConfig(mm2Props);
primaryWorkerProps = mm2Config.workerConfig(new SourceAndTarget(BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS));
backupWorkerProps.putAll(mm2Config.workerConfig(new SourceAndTarget(PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS)));
Expand Down Expand Up @@ -259,7 +264,8 @@ public void testReplication() throws Exception {
waitForTopicCreated(primary, "mm2-offset-syncs.backup.internal");
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "primary.test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG),
"topic config was not synced");

createAndTestNewTopicWithConfigFilter();

assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
"Records were not produced to primary cluster.");
assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1").count(),
Expand Down Expand Up @@ -429,6 +435,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
// one way replication from primary to backup
mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");


mm2Config = new MirrorMakerConfig(mm2Props);

waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
Expand Down Expand Up @@ -519,6 +526,44 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception {
assertFalse(primaryTopics.contains("mm2-offset-syncs." + BACKUP_CLUSTER_ALIAS + ".internal"));
}

/*
* Run tests for Exclude Filter for copying topic configurations
*/
void createAndTestNewTopicWithConfigFilter() throws Exception {
// create topic with configuration to test:
final Map<String, String> topicConfig = new HashMap<>();
topicConfig.put("delete.retention.ms", "1000"); // should be excluded (default value is 86400000)
topicConfig.put("retention.bytes", "1000"); // should be included, default value is -1

final String topic = "test-topic-with-config";
final String backupTopic = backupClusterTopicName(topic);

primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig);
waitForTopicCreated(backup, backupTopic);

String primaryConfig, backupConfig;

primaryConfig = getTopicConfig(primary.kafka(), topic, "delete.retention.ms");
backupConfig = getTopicConfig(backup.kafka(), backupTopic, "delete.retention.ms");
assertNotEquals(primaryConfig, backupConfig,
"`delete.retention.ms` should be different, because it's in exclude filter! ");

// regression test for the config that are still supposed to be replicated
primaryConfig = getTopicConfig(primary.kafka(), topic, "retention.bytes");
backupConfig = getTopicConfig(backup.kafka(), backupTopic, "retention.bytes");
assertEquals(primaryConfig, backupConfig,
"`retention.bytes` should be the same, because it isn't in exclude filter! ");
assertEquals("1000", backupConfig,
"`retention.bytes` should be the same, because it's explicitly defined! ");
}

/*
* Returns expected topic name on target cluster.
*/
String backupClusterTopicName(String topic) {
return PRIMARY_CLUSTER_ALIAS + "." + topic;
}

/*
* launch the connectors on kafka connect cluster and check if they are running
*/
Expand Down

0 comments on commit 742de94

Please sign in to comment.