KAFKA-13790; ReplicaManager should be robust to all partition updates from KRaft metadata log (#12085)

This patch refactors `Partition.makeLeader` and `Partition.makeFollower` to be robust to all partition updates from the KRaft metadata log. In particular, it ensures the following invariants:

- A partition update is accepted if its partition epoch is equal to or newer than the current one. The partition epoch is also bumped by the AlterPartition path, so an update from the metadata log with the same partition epoch is accepted in order to fully update the partition state.
- The leader epoch start offset is only updated when the leader epoch is bumped.
- The follower states are only updated when the leader epoch is bumped.
- Fetchers are only restarted when the leader epoch is bumped. This was already the case, but this patch adds unit tests to prove and maintain it.

At the same time, the patch unifies the state change logs so that they are similar in both the ZK and KRaft worlds.
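
To make these invariants concrete, here is a minimal sketch of the acceptance logic. The names (`PartitionState`, `applyUpdate`) are hypothetical, and the patch's actual implementation lives in the broker's `Partition` class; this is an illustration, not the committed code:

```java
// Hypothetical sketch of the invariants above; not the patch's actual code.
final class PartitionState {
    int leaderEpoch = -1;
    int partitionEpoch = -1;
    long leaderEpochStartOffset = -1L;

    /** Applies an update from the metadata log; returns false if it is stale. */
    boolean applyUpdate(int newLeaderEpoch, int newPartitionEpoch, long logEndOffset) {
        // Accept equal partition epochs: AlterPartition may already have
        // bumped the epoch while the metadata log record still carries
        // state that must be applied.
        if (newPartitionEpoch < partitionEpoch)
            return false;

        if (newLeaderEpoch > leaderEpoch) {
            // The leader epoch start offset, the follower states, and the
            // fetchers are only touched on a leader epoch bump.
            leaderEpochStartOffset = logEndOffset;
            // resetFollowerStates(); restartFetchers(); // elided here
        }
        leaderEpoch = newLeaderEpoch;
        partitionEpoch = newPartitionEpoch;
        return true;
    }
}
```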

Reviewers: Jason Gustafson <jason@confluent.io>
a0x8o committed May 9, 2022
1 parent 10b0141 commit 641143a
Showing 26 changed files with 756 additions and 88 deletions.
CommonClientConfigs.java
@@ -17,6 +17,8 @@
package org.apache.kafka.clients;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -203,4 +205,15 @@ public static Map<String, Object> postProcessReconnectBackoffConfigs(AbstractCon
}
return rval;
}

public static void postValidateSaslMechanismConfig(AbstractConfig config) {
SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
String clientSaslMechanism = config.getString(SaslConfigs.SASL_MECHANISM);
if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
if (clientSaslMechanism == null || clientSaslMechanism.isEmpty()) {
throw new ConfigException(SaslConfigs.SASL_MECHANISM, null, "When the " + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG +
" configuration enables SASL, mechanism must be non-null and non-empty string.");
}
}
}
}
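
As a usage sketch (relying on the `AdminClientConfig` wiring shown further down), the new check turns a misconfigured SASL client into a construction-time failure instead of a later authentication error; the class name here is illustrative:

```java
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.ConfigException;

import java.util.HashMap;
import java.util.Map;

public class SaslMechanismCheckExample {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put("sasl.mechanism", ""); // explicitly emptied; the default is GSSAPI
        try {
            new AdminClientConfig(props); // postProcessParsedConfig now throws
        } catch (ConfigException e) {
            System.out.println("Rejected at construction: " + e.getMessage());
        }
    }
}
```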
AdminClientConfig.java
@@ -25,6 +25,8 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;
import java.util.Set;
@@ -212,6 +214,7 @@ public class AdminClientConfig extends AbstractConfig {
.define(SECURITY_PROTOCOL_CONFIG,
Type.STRING,
DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
@@ -220,6 +223,7 @@

@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
CommonClientConfigs.postValidateSaslMechanismConfig(this);
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
}

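The other change in this file, `in(Utils.enumOptions(SecurityProtocol.class))`, attaches a `ConfigDef.ValidString` validator built from the enum constant names, so an unknown `security.protocol` value is now rejected at parse time. A small sketch of what the validator is given:

```java
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;

import java.util.Arrays;

public class EnumOptionsExample {
    public static void main(String[] args) {
        // The allowed values backing the ValidString.in(...) validator.
        System.out.println(Arrays.toString(Utils.enumOptions(SecurityProtocol.class)));
        // Expected: [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL]
    }
}
```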
ConsumerConfig.java
@@ -23,10 +23,12 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;

@@ -351,6 +353,7 @@ public class ConsumerConfig extends AbstractConfig {
.define(GROUP_INSTANCE_ID_CONFIG,
Type.STRING,
null,
new ConfigDef.NonEmptyString(),
Importance.MEDIUM,
GROUP_INSTANCE_ID_DOC)
.define(SESSION_TIMEOUT_MS_CONFIG,
@@ -572,6 +575,7 @@ public class ConsumerConfig extends AbstractConfig {
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
@@ -580,6 +584,7 @@

@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
CommonClientConfigs.postValidateSaslMechanismConfig(this);
Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
maybeOverrideClientId(refinedConfigs);
return refinedConfigs;
@@ -602,11 +607,16 @@ private void maybeOverrideClientId(Map<String, Object> configs) {
protected static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
Deserializer<?> keyDeserializer,
Deserializer<?> valueDeserializer) {
// Validate the deserializer configuration: if the passed deserializer instance is null, the user must have explicitly set a valid deserializer class in the config.
Map<String, Object> newConfigs = new HashMap<>(configs);
if (keyDeserializer != null)
newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
else if (newConfigs.get(KEY_DESERIALIZER_CLASS_CONFIG) == null)
throw new ConfigException(KEY_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
if (valueDeserializer != null)
newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
throw new ConfigException(VALUE_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
return newConfigs;
}

ProducerConfig.java
@@ -26,6 +26,7 @@
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -448,6 +449,7 @@ public class ProducerConfig extends AbstractConfig {
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(SECURITY_PROVIDERS_CONFIG,
@@ -477,6 +479,7 @@

@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
CommonClientConfigs.postValidateSaslMechanismConfig(this);
Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
postProcessAndValidateIdempotenceConfigs(refinedConfigs);
maybeOverrideClientId(refinedConfigs);
@@ -559,11 +562,16 @@ private static String parseAcks(String acksString) {
static Map<String, Object> appendSerializerToConfig(Map<String, Object> configs,
Serializer<?> keySerializer,
Serializer<?> valueSerializer) {
// Validate the serializer configuration: if the passed serializer instance is null, the user must have explicitly set a valid serializer class in the config.
Map<String, Object> newConfigs = new HashMap<>(configs);
if (keySerializer != null)
newConfigs.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass());
else if (newConfigs.get(KEY_SERIALIZER_CLASS_CONFIG) == null)
throw new ConfigException(KEY_SERIALIZER_CLASS_CONFIG, null, "must be non-null.");
if (valueSerializer != null)
newConfigs.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass());
else if (newConfigs.get(VALUE_SERIALIZER_CLASS_CONFIG) == null)
throw new ConfigException(VALUE_SERIALIZER_CLASS_CONFIG, null, "must be non-null.");
return newConfigs;
}

SslClientAuth.java
@@ -31,7 +31,7 @@ public enum SslClientAuth {
NONE;

public static final List<SslClientAuth> VALUES =
        Collections.unmodifiableList(Arrays.asList(SslClientAuth.values()));

public static SslClientAuth forConfig(String key) {
if (key == null) {
@@ -45,4 +45,9 @@ public static SslClientAuth forConfig(String key) {
}
return null;
}

@Override
public String toString() {
return super.toString().toLowerCase(Locale.ROOT);
}
}
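
The `toString` override makes the enum render in the lowercase form used for `ssl.client.auth` config values, matching what `forConfig` parses; a quick sketch:

```java
import org.apache.kafka.common.config.SslClientAuth;

public class SslClientAuthExample {
    public static void main(String[] args) {
        System.out.println(SslClientAuth.REQUIRED);              // prints "required"
        System.out.println(SslClientAuth.forConfig("required")); // REQUIRED, now also printed as "required"
    }
}
```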
CommonClientConfigsTest.java
@@ -19,14 +19,21 @@

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class CommonClientConfigsTest {
private static class TestConfig extends AbstractConfig {
@@ -44,11 +51,23 @@ private static class TestConfig extends AbstractConfig {
1000L,
atLeast(0L),
ConfigDef.Importance.LOW,
"");
"")
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
ConfigDef.Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(SaslConfigs.SASL_MECHANISM,
ConfigDef.Type.STRING,
SaslConfigs.DEFAULT_SASL_MECHANISM,
ConfigDef.Importance.MEDIUM,
SaslConfigs.SASL_MECHANISM_DOC);
}

@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
CommonClientConfigs.postValidateSaslMechanismConfig(this);
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
}

@@ -82,4 +101,17 @@ public void testExponentialBackoffDefaults() {
assertEquals(Long.valueOf(123L),
reconnectBackoffSetConf.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG));
}

@Test
public void testInvalidSaslMechanism() {
Map<String, Object> configs = new HashMap<>();
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
configs.put(SaslConfigs.SASL_MECHANISM, null);
ConfigException ce = assertThrows(ConfigException.class, () -> new TestConfig(configs));
assertTrue(ce.getMessage().contains(SaslConfigs.SASL_MECHANISM));

configs.put(SaslConfigs.SASL_MECHANISM, "");
ce = assertThrows(ConfigException.class, () -> new TestConfig(configs));
assertTrue(ce.getMessage().contains(SaslConfigs.SASL_MECHANISM));
}
}
ConsumerConfigTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.clients.consumer;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
@@ -30,6 +32,8 @@

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class ConsumerConfigTest {
@@ -98,6 +102,19 @@ public void testAppendDeserializerToConfig() {
assertEquals(newConfigs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), valueDeserializerClass);
}

@Test
public void testAppendDeserializerToConfigWithException() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, null);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
assertThrows(ConfigException.class, () -> ConsumerConfig.appendDeserializerToConfig(configs, null, valueDeserializer));

configs.clear();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, null);
assertThrows(ConfigException.class, () -> ConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, null));
}

@Test
public void ensureDefaultThrowOnUnsupportedStableFlagToFalse() {
assertFalse(new ConsumerConfig(properties).getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
@@ -108,4 +125,24 @@ public void testDefaultPartitionAssignor() {
assertEquals(Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class),
new ConsumerConfig(properties).getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
}

@Test
public void testInvalidGroupInstanceId() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
configs.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "");
ConfigException ce = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
assertTrue(ce.getMessage().contains(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
}

@Test
public void testInvalidSecurityProtocol() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc");
ConfigException ce = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
}
ProducerConfigTest.java
@@ -16,17 +16,19 @@
*/
package org.apache.kafka.clients.producer;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ProducerConfigTest {

@@ -62,6 +64,19 @@ public void testAppendSerializerToConfig() {
assertEquals(newConfigs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClass);
}

@Test
public void testAppendSerializerToConfigWithException() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, null);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
assertThrows(ConfigException.class, () -> ProducerConfig.appendSerializerToConfig(configs, null, valueSerializer));

configs.clear();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, null);
assertThrows(ConfigException.class, () -> ProducerConfig.appendSerializerToConfig(configs, keySerializer, null));
}

@Test
public void testInvalidCompressionType() {
Map<String, Object> configs = new HashMap<>();
@@ -70,4 +85,14 @@ public void testInvalidCompressionType() {
configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "abc");
assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
}

@Test
public void testInvalidSecurityProtocol() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc");
ConfigException ce = assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
}
MirrorClientConfig.java
@@ -21,10 +21,14 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;
import java.util.HashMap;

import static org.apache.kafka.common.config.ConfigDef.ValidString.in;

/** Configuration required for MirrorClient to talk to a given target cluster.
* <p>
* Generally, these properties come from an mm2.properties configuration file
@@ -99,6 +103,7 @@ private Map<String, Object> clientConfig(String prefix) {
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
@@ -125,6 +130,7 @@ private Map<String, Object> clientConfig(String prefix) {
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()