From d378d522460d8d68d4971565614ea1c67d53bc0b Mon Sep 17 00:00:00 2001
From: Alex
Date: Wed, 11 May 2022 07:46:41 -0500
Subject: [PATCH] MINOR: reload4j build dependency fixes (#12144)

* Replace `log4j` with `reload4j` in `copyDependantLibs`. Since we have some projects that have an explicit `reload4j` dependency, it was included in the final release tar - i.e. it was effectively a workaround for this bug.

* Exclude `log4j` and `slf4j-log4j12` transitive dependencies for `streams:upgrade-system-tests`. Versions 0100 and 0101 had a transitive dependency on `log4j` and `slf4j-log4j12` via `zkclient` and `zookeeper`. This avoids classpath conflicts that lead to [NoSuchFieldError](https://github.com/qos-ch/reload4j/issues/41) in system tests.

Reviewers: Jason Gustafson
---
 build.gradle                                  |  32 +-
 .../clients/ClusterConnectionStates.java      |   1 -
 .../clients/ClusterConnectionStatesTest.java  |  38 +-
 .../kafka/clients/NetworkClientTest.java      |  31 +-
 .../connect/mirror/MirrorCheckpointTask.java  |   2 +-
 .../kafka/connect/mirror/OffsetSync.java      |   6 +-
 .../kafka/connect/mirror/OffsetSyncStore.java |   6 +-
 .../kafka/connect/mirror/Scheduler.java       |  22 +-
 .../MirrorConnectorsIntegrationBaseTest.java  |   2 +-
 core/src/main/scala/kafka/api/Request.scala   |   4 +
 .../kafka/server/ConfigAdminManager.scala     |   3 +-
 .../scala/kafka/server/DelayedFetch.scala     |  86 ++--
 .../scala/kafka/server/FetchDataInfo.scala    |  60 ++-
 .../main/scala/kafka/server/KafkaApis.scala   |  74 +--
 .../server/ReplicaAlterLogDirsThread.scala    |  25 +-
 .../scala/kafka/server/ReplicaManager.scala   |  56 +-
 .../metadata/BrokerMetadataPublisher.scala    |   9 +
 .../server/metadata/MetadataPublisher.scala   |   5 +
 .../api/PlaintextAdminIntegrationTest.scala   |  71 ++-
 .../kafka/server/DelayedFetchTest.scala       |  55 +-
 .../unit/kafka/server/KafkaApisTest.scala     |  28 +-
 .../ReplicaAlterLogDirsThreadTest.scala       |  54 +-
 .../ReplicaManagerConcurrencyTest.scala       |  20 +-
 .../server/ReplicaManagerQuotasTest.scala     |  50 +-
 .../kafka/server/ReplicaManagerTest.scala     | 487 +++++++++---------
 .../metadata/BrokerMetadataListenerTest.scala |   4 +
 .../BrokerMetadataPublisherTest.scala         |  23 +-
 .../scala/unit/kafka/utils/TestUtils.scala    |  19 +
 gradle/dependencies.gradle                    |   1 +
 .../kafka/controller/AclControlManager.java   |   9 +-
 .../ConfigurationControlManager.java          |  16 +-
 .../controller/AclControlManagerTest.java     |  30 ++
 .../ConfigurationControlManagerTest.java      |  67 ++-
 .../org/apache/kafka/raft/RaftConfig.java     |   2 +-
 tests/docker/Dockerfile                       |   2 +
 vagrant/base.sh                               |   3 +
 36 files changed, 858 insertions(+), 545 deletions(-)

diff --git a/build.gradle b/build.gradle
index 68dbbf513d..b80de61385 100644
--- a/build.gradle
+++ b/build.gradle
@@ -922,7 +922,7 @@ project(':core') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
     from (configurations.testRuntimeClasspath) {
       include('slf4j-log4j12*')
-      include('log4j*jar')
+      include('reload4j*jar')
     }
     from (configurations.runtimeClasspath) {
       exclude('kafka-clients*')
@@ -1701,7 +1701,7 @@ project(':tools') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
     from (configurations.testRuntimeClasspath) {
       include('slf4j-log4j12*')
-      include('log4j*jar')
+      include('reload4j*jar')
     }
     from (configurations.runtimeClasspath) {
       exclude('kafka-clients*')
@@ -1751,7 +1751,7 @@ project(':trogdor') {
   tasks.create(name: "copyDependantLibs", type: Copy) {
     from (configurations.testRuntimeClasspath) {
       include('slf4j-log4j12*')
-      include('log4j*jar')
+      include('reload4j*jar')
    }
     from (configurations.runtimeClasspath) {
       exclude('kafka-clients*')
@@ -2071,7 +2071,10
@@ project(':streams:upgrade-system-tests-0100') { archivesBaseName = "kafka-streams-upgrade-system-tests-0100" dependencies { - testImplementation libs.kafkaStreams_0100 + testImplementation(libs.kafkaStreams_0100) { + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'log4j', module: 'log4j' + } testRuntimeOnly libs.junitJupiter } @@ -2084,7 +2087,10 @@ project(':streams:upgrade-system-tests-0101') { archivesBaseName = "kafka-streams-upgrade-system-tests-0101" dependencies { - testImplementation libs.kafkaStreams_0101 + testImplementation(libs.kafkaStreams_0101) { + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'log4j', module: 'log4j' + } testRuntimeOnly libs.junitJupiter } @@ -2391,7 +2397,7 @@ project(':connect:api') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2428,7 +2434,7 @@ project(':connect:transforms') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2468,7 +2474,7 @@ project(':connect:json') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2534,8 +2540,8 @@ project(':connect:runtime') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { + // No need to copy log4j since the module has an explicit dependency on that include('slf4j-log4j12*') - include('log4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2614,7 +2620,7 @@ project(':connect:file') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2653,7 +2659,7 @@ project(':connect:basic-auth-extension') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2700,7 +2706,7 @@ project(':connect:mirror') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') @@ -2735,7 +2741,7 @@ project(':connect:mirror-client') { tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.testRuntimeClasspath) { include('slf4j-log4j12*') - include('log4j*jar') + include('reload4j*jar') } from (configurations.runtimeClasspath) { exclude('kafka-clients*') diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index 95efdbeae4..f4d9092258 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -246,7 +246,6 @@ public long 
pollDelayMs(String id, long now) { public void checkingApiVersions(String id) { NodeConnectionState nodeState = nodeState(id); nodeState.state = ConnectionState.CHECKING_API_VERSIONS; - resetReconnectBackoff(nodeState); resetConnectionSetupTimeout(nodeState); connectingNodes.remove(id); } diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java index 72cc123921..96fe89ca11 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java @@ -231,20 +231,8 @@ public void testMaxReconnectBackoff() { @Test public void testExponentialReconnectBackoff() { - double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1)) - / Math.log(reconnectBackoffExpBase); - - // Run through 10 disconnects and check that reconnect backoff value is within expected range for every attempt - for (int i = 0; i < 10; i++) { - connectionStates.connecting(nodeId1, time.milliseconds(), "localhost"); - connectionStates.disconnected(nodeId1, time.milliseconds()); - // Calculate expected backoff value without jitter - long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp)) - * reconnectBackoffMs); - long currentBackoff = connectionStates.connectionDelay(nodeId1, time.milliseconds()); - assertEquals(expectedBackoff, currentBackoff, reconnectBackoffJitter * expectedBackoff); - time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1); - } + verifyReconnectExponentialBackoff(false); + verifyReconnectExponentialBackoff(true); } @Test @@ -426,4 +414,26 @@ private void setupMultipleIPs() { this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax, connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, new LogContext(), this.multipleIPHostResolver); } + + private void verifyReconnectExponentialBackoff(boolean enterCheckingApiVersionState) { + double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1)) + / Math.log(reconnectBackoffExpBase); + + connectionStates.remove(nodeId1); + // Run through 10 disconnects and check that reconnect backoff value is within expected range for every attempt + for (int i = 0; i < 10; i++) { + connectionStates.connecting(nodeId1, time.milliseconds(), "localhost"); + if (enterCheckingApiVersionState) { + connectionStates.checkingApiVersions(nodeId1); + } + + connectionStates.disconnected(nodeId1, time.milliseconds()); + // Calculate expected backoff value without jitter + long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp)) + * reconnectBackoffMs); + long currentBackoff = connectionStates.connectionDelay(nodeId1, time.milliseconds()); + assertEquals(expectedBackoff, currentBackoff, reconnectBackoffJitter * expectedBackoff); + time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1); + } + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index fe1e9d1920..63b44835f6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -82,6 +82,8 @@ public class NetworkClientTest { protected final long 
reconnectBackoffMaxMsTest = 10 * 10000; protected final long connectionSetupTimeoutMsTest = 5 * 1000; protected final long connectionSetupTimeoutMaxMsTest = 127 * 1000; + private final int reconnectBackoffExpBase = ClusterConnectionStates.RECONNECT_BACKOFF_EXP_BASE; + private final double reconnectBackoffJitter = ClusterConnectionStates.RECONNECT_BACKOFF_JITTER; private final TestMetadataUpdater metadataUpdater = new TestMetadataUpdater(Collections.singletonList(node)); private final NetworkClient client = createNetworkClient(reconnectBackoffMaxMsTest); private final NetworkClient clientWithNoExponentialBackoff = createNetworkClient(reconnectBackoffMsTest); @@ -831,13 +833,28 @@ public void testDisconnectDuringUserMetadataRequest() { @Test public void testServerDisconnectAfterInternalApiVersionRequest() throws Exception { - awaitInFlightApiVersionRequest(); - selector.serverDisconnect(node.idString()); - - // The failed ApiVersion request should not be forwarded to upper layers - List responses = client.poll(0, time.milliseconds()); - assertFalse(client.hasInFlightRequests(node.idString())); - assertTrue(responses.isEmpty()); + final long numIterations = 5; + double reconnectBackoffMaxExp = Math.log(reconnectBackoffMaxMsTest / (double) Math.max(reconnectBackoffMsTest, 1)) + / Math.log(reconnectBackoffExpBase); + for (int i = 0; i < numIterations; i++) { + selector.clear(); + awaitInFlightApiVersionRequest(); + selector.serverDisconnect(node.idString()); + + // The failed ApiVersion request should not be forwarded to upper layers + List responses = client.poll(0, time.milliseconds()); + assertFalse(client.hasInFlightRequests(node.idString())); + assertTrue(responses.isEmpty()); + + long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp)) + * reconnectBackoffMsTest); + long delay = client.connectionDelay(node, time.milliseconds()); + assertEquals(expectedBackoff, delay, reconnectBackoffJitter * expectedBackoff); + if (i == numIterations - 1) { + break; + } + time.sleep(delay + 1); + } } @Test diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java index 47631998fb..30fb695d92 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java @@ -105,7 +105,7 @@ public void start(Map props) { } @Override - public void commit() throws InterruptedException { + public void commit() { // nop } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java index 68e6441f18..e1ecb1e1db 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java @@ -39,9 +39,9 @@ public class OffsetSync { new Field(TOPIC_KEY, Type.STRING), new Field(PARTITION_KEY, Type.INT32)); - private TopicPartition topicPartition; - private long upstreamOffset; - private long downstreamOffset; + private final TopicPartition topicPartition; + private final long upstreamOffset; + private final long downstreamOffset; public OffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) { this.topicPartition = topicPartition; diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java index 600dda46f3..9152cd5aa0 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java @@ -30,9 +30,9 @@ /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */ class OffsetSyncStore implements AutoCloseable { - private KafkaConsumer consumer; - private Map offsetSyncs = new HashMap<>(); - private TopicPartition offsetSyncTopicPartition; + private final KafkaConsumer consumer; + private final Map offsetSyncs = new HashMap<>(); + private final TopicPartition offsetSyncTopicPartition; OffsetSyncStore(MirrorConnectorConfig config) { consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(), diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java index 20f2ca7e2c..0644d6a6c6 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java @@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory; class Scheduler implements AutoCloseable { - private static Logger log = LoggerFactory.getLogger(Scheduler.class); + private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class); private final String name; private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); @@ -62,11 +62,11 @@ void execute(Task task, String description) { try { executor.submit(() -> executeThread(task, description)).get(timeout.toMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - log.warn("{} was interrupted running task: {}", name, description); + LOG.warn("{} was interrupted running task: {}", name, description); } catch (TimeoutException e) { - log.error("{} timed out running task: {}", name, description); + LOG.error("{} timed out running task: {}", name, description); } catch (Throwable e) { - log.error("{} caught exception in task: {}", name, description, e); + LOG.error("{} caught exception in task: {}", name, description, e); } } @@ -76,10 +76,10 @@ public void close() { try { boolean terminated = executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); if (!terminated) { - log.error("{} timed out during shutdown of internal scheduler.", name); + LOG.error("{} timed out during shutdown of internal scheduler.", name); } } catch (InterruptedException e) { - log.warn("{} was interrupted during shutdown of internal scheduler.", name); + LOG.warn("{} was interrupted during shutdown of internal scheduler.", name); } } @@ -92,21 +92,21 @@ private void run(Task task, String description) { long start = System.currentTimeMillis(); task.run(); long elapsed = System.currentTimeMillis() - start; - log.info("{} took {} ms", description, elapsed); + LOG.info("{} took {} ms", description, elapsed); if (elapsed > timeout.toMillis()) { - log.warn("{} took too long ({} ms) running task: {}", name, elapsed, description); + LOG.warn("{} took too long ({} ms) running task: {}", name, elapsed, description); } } catch (InterruptedException e) { - log.warn("{} was interrupted running task: {}", name, description); + LOG.warn("{} was interrupted running task: {}", name, description); } catch (Throwable e) { - log.error("{} 
caught exception in scheduled task: {}", name, description, e); + LOG.error("{} caught exception in scheduled task: {}", name, description, e); } } private void executeThread(Task task, String description) { Thread.currentThread().setName(name + "-" + description); if (closed) { - log.info("{} skipping task due to shutdown: {}", name, description); + LOG.info("{} skipping task due to shutdown: {}", name, description); return; } run(task, description); diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java index 8f692ca911..f325e15695 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java @@ -729,7 +729,7 @@ private void createTopics() { /* * Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly */ - protected void warmUpConsumer(Map consumerProps) throws InterruptedException { + protected void warmUpConsumer(Map consumerProps) { Consumer dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1"); dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); dummyConsumer.commitSync(); diff --git a/core/src/main/scala/kafka/api/Request.scala b/core/src/main/scala/kafka/api/Request.scala index 653b5f653a..6c405a45b0 100644 --- a/core/src/main/scala/kafka/api/Request.scala +++ b/core/src/main/scala/kafka/api/Request.scala @@ -25,6 +25,10 @@ object Request { // Broker ids are non-negative int. def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0 + def isConsumer(replicaId: Int): Boolean = { + replicaId < 0 && replicaId != FutureLocalReplicaId + } + def describeReplicaId(replicaId: Int): String = { replicaId match { case OrdinaryConsumerId => "consumer" diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala b/core/src/main/scala/kafka/server/ConfigAdminManager.scala index a7f5c6bdef..e7d6c33ab2 100644 --- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala +++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala @@ -499,7 +499,8 @@ object ConfigAdminManager { .orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST))) .getOrElse("") .split(",").toList - val newValueList = oldValueList ::: alterConfigOp.configEntry.value.split(",").toList + val appendingValueList = alterConfigOp.configEntry.value.split(",").toList.filter(value => !oldValueList.contains(value)) + val newValueList = oldValueList ::: appendingValueList configProps.setProperty(alterConfigOp.configEntry.name, newValueList.mkString(",")) } case OpType.SUBTRACT => { diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 8d38ef8b6d..3eb8eedf4c 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -23,7 +23,6 @@ import kafka.metrics.KafkaMetricsGroup import org.apache.kafka.common.TopicIdPartition import org.apache.kafka.common.errors._ import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.replica.ClientMetadata import org.apache.kafka.common.requests.FetchRequest.PartitionData import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} @@ -38,36 +37,23 @@ case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInf } } -/** - * The fetch metadata maintained by the delayed fetch operation - */ -case class FetchMetadata(fetchMinBytes: Int, - fetchMaxBytes: Int, - hardMaxBytesLimit: Boolean, - fetchOnlyLeader: Boolean, - fetchIsolation: FetchIsolation, - isFromFollower: Boolean, - replicaId: Int, - fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]) { - - override def toString = "FetchMetadata(minBytes=" + fetchMinBytes + ", " + - "maxBytes=" + fetchMaxBytes + ", " + - "onlyLeader=" + fetchOnlyLeader + ", " + - "fetchIsolation=" + fetchIsolation + ", " + - "replicaId=" + replicaId + ", " + - "partitionStatus=" + fetchPartitionStatus + ")" -} /** * A delayed fetch operation that can be created by the replica manager and watched * in the fetch operation purgatory */ -class DelayedFetch(delayMs: Long, - fetchMetadata: FetchMetadata, - replicaManager: ReplicaManager, - quota: ReplicaQuota, - clientMetadata: Option[ClientMetadata], - responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit) - extends DelayedOperation(delayMs) { +class DelayedFetch( + params: FetchParams, + fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)], + replicaManager: ReplicaManager, + quota: ReplicaQuota, + responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit +) extends DelayedOperation(params.maxWaitMs) { + + override def toString: String = { + s"DelayedFetch(params=$params" + + s", numPartitions=${fetchPartitionStatus.size}" + + ")" + } /** * The operation can be completed if: @@ -84,16 +70,16 @@ class DelayedFetch(delayMs: Long, */ override def tryComplete(): Boolean = { var accumulatedSize = 0 - fetchMetadata.fetchPartitionStatus.foreach { + fetchPartitionStatus.foreach { case (topicIdPartition, fetchStatus) => val fetchOffset = fetchStatus.startOffsetMetadata val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch try { if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) { val partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition) - val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader) + val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, params.fetchOnlyLeader) - val endOffset = fetchMetadata.fetchIsolation match { + val endOffset = params.isolation match { case FetchLogEnd => offsetSnapshot.logEndOffset case FetchHighWatermark => offsetSnapshot.highWatermark case FetchTxnCommitted => offsetSnapshot.lastStableOffset @@ -105,19 +91,19 @@ class DelayedFetch(delayMs: Long, if (endOffset.messageOffset != fetchOffset.messageOffset) { if (endOffset.onOlderSegment(fetchOffset)) { // Case F, this can happen when the new fetch operation is on a truncated leader - debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicIdPartition.") + debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") return forceComplete() } else if (fetchOffset.onOlderSegment(endOffset)) { // Case F, this can happen when the fetch operation is falling behind the current segment // or the partition has just rolled a new segment - debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.") + debug(s"Satisfying fetch $this immediately since it is fetching older segments.") // We 
will not force complete the fetch request if a replica should be throttled. - if (!fetchMetadata.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, fetchMetadata.replicaId)) + if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId)) return forceComplete() } else if (fetchOffset.messageOffset < endOffset.messageOffset) { // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition) val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes) - if (!fetchMetadata.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, fetchMetadata.replicaId)) + if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId)) accumulatedSize += bytesAvailable } } @@ -131,7 +117,7 @@ class DelayedFetch(delayMs: Long, debug(s"Could not obtain last offset for leader epoch for partition $topicIdPartition, epochEndOffset=$epochEndOffset.") return forceComplete() } else if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) { - debug(s"Satisfying fetch $fetchMetadata since it has diverging epoch requiring truncation for partition " + + debug(s"Satisfying fetch $this since it has diverging epoch requiring truncation for partition " + s"$topicIdPartition epochEndOffset=$epochEndOffset fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.") return forceComplete() } @@ -139,30 +125,30 @@ class DelayedFetch(delayMs: Long, } } catch { case _: NotLeaderOrFollowerException => // Case A or Case B - debug(s"Broker is no longer the leader or follower of $topicIdPartition, satisfy $fetchMetadata immediately") + debug(s"Broker is no longer the leader or follower of $topicIdPartition, satisfy $this immediately") return forceComplete() case _: UnknownTopicOrPartitionException => // Case C - debug(s"Broker no longer knows of partition $topicIdPartition, satisfy $fetchMetadata immediately") + debug(s"Broker no longer knows of partition $topicIdPartition, satisfy $this immediately") return forceComplete() case _: KafkaStorageException => // Case D - debug(s"Partition $topicIdPartition is in an offline log directory, satisfy $fetchMetadata immediately") + debug(s"Partition $topicIdPartition is in an offline log directory, satisfy $this immediately") return forceComplete() case _: FencedLeaderEpochException => // Case E debug(s"Broker is the leader of partition $topicIdPartition, but the requested epoch " + - s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $fetchMetadata immediately") + s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $this immediately") return forceComplete() } } // Case G - if (accumulatedSize >= fetchMetadata.fetchMinBytes) + if (accumulatedSize >= params.minBytes) forceComplete() else false } override def onExpiration(): Unit = { - if (fetchMetadata.isFromFollower) + if (params.isFromFollower) DelayedFetchMetrics.followerExpiredRequestMeter.mark() else DelayedFetchMetrics.consumerExpiredRequestMeter.mark() @@ -173,18 +159,18 @@ class DelayedFetch(delayMs: Long, */ override def onComplete(): Unit = { val logReadResults = replicaManager.readFromLocalLog( - replicaId = fetchMetadata.replicaId, - fetchOnlyFromLeader = fetchMetadata.fetchOnlyLeader, - fetchIsolation = fetchMetadata.fetchIsolation, - fetchMaxBytes = fetchMetadata.fetchMaxBytes, - hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit, - 
readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo }, - clientMetadata = clientMetadata, + replicaId = params.replicaId, + fetchOnlyFromLeader = params.fetchOnlyLeader, + fetchIsolation = params.isolation, + fetchMaxBytes = params.maxBytes, + hardMaxBytesLimit = params.hardMaxBytesLimit, + readPartitionInfo = fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo }, + clientMetadata = params.clientMetadata, quota = quota) val fetchPartitionData = logReadResults.map { case (tp, result) => - val isReassignmentFetch = fetchMetadata.isFromFollower && - replicaManager.isAddingReplica(tp.topicPartition, fetchMetadata.replicaId) + val isReassignmentFetch = params.isFromFollower && + replicaManager.isAddingReplica(tp.topicPartition, params.replicaId) tp -> result.toFetchPartitionData(isReassignmentFetch) } diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala index f6cf725843..82e8092c10 100644 --- a/core/src/main/scala/kafka/server/FetchDataInfo.scala +++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala @@ -17,15 +17,67 @@ package kafka.server +import kafka.api.Request +import org.apache.kafka.common.IsolationLevel import org.apache.kafka.common.message.FetchResponseData import org.apache.kafka.common.record.Records +import org.apache.kafka.common.replica.ClientMetadata +import org.apache.kafka.common.requests.FetchRequest sealed trait FetchIsolation case object FetchLogEnd extends FetchIsolation case object FetchHighWatermark extends FetchIsolation case object FetchTxnCommitted extends FetchIsolation -case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, - records: Records, - firstEntryIncomplete: Boolean = false, - abortedTransactions: Option[List[FetchResponseData.AbortedTransaction]] = None) +object FetchIsolation { + def apply( + request: FetchRequest + ): FetchIsolation = { + apply(request.replicaId, request.isolationLevel) + } + + def apply( + replicaId: Int, + isolationLevel: IsolationLevel + ): FetchIsolation = { + if (!Request.isConsumer(replicaId)) + FetchLogEnd + else if (isolationLevel == IsolationLevel.READ_COMMITTED) + FetchTxnCommitted + else + FetchHighWatermark + } +} + +case class FetchParams( + requestVersion: Short, + replicaId: Int, + maxWaitMs: Long, + minBytes: Int, + maxBytes: Int, + isolation: FetchIsolation, + clientMetadata: Option[ClientMetadata] +) { + def isFromFollower: Boolean = Request.isValidBrokerId(replicaId) + def isFromConsumer: Boolean = Request.isConsumer(replicaId) + def fetchOnlyLeader: Boolean = isFromFollower || (isFromConsumer && clientMetadata.isEmpty) + def hardMaxBytesLimit: Boolean = requestVersion <= 2 + + override def toString: String = { + s"FetchParams(requestVersion=$requestVersion" + + s", replicaId=$replicaId" + + s", maxWaitMs=$maxWaitMs" + + s", minBytes=$minBytes" + + s", maxBytes=$maxBytes" + + s", isolation=$isolation" + + s", clientMetadata= $clientMetadata" + + ")" + } +} + +case class FetchDataInfo( + fetchOffsetMetadata: LogOffsetMetadata, + records: Records, + firstEntryIncomplete: Boolean = false, + abortedTransactions: Option[List[FetchResponseData.AbortedTransaction]] = None +) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 84c14e069c..dd3fb2dfea 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -695,18 +695,6 @@ class KafkaApis(val 
requestChannel: RequestChannel, forgottenTopics, topicNames) - val clientMetadata: Option[ClientMetadata] = if (versionId >= 11) { - // Fetch API version 11 added preferred replica logic - Some(new DefaultClientMetadata( - fetchRequest.rackId, - clientId, - request.context.clientAddress, - request.context.principal, - request.context.listenerName.value)) - } else { - None - } - val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]() val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]() if (fetchRequest.isFromFollower) { @@ -943,31 +931,49 @@ class KafkaApis(val requestChannel: RequestChannel, } } - // for fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being throttled given - // no bytes were recorded in the recent quota window - // trying to fetch more bytes would result in a guaranteed throttling potentially blocking consumer progress - val maxQuotaWindowBytes = if (fetchRequest.isFromFollower) - Int.MaxValue - else - quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt - - val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes) - val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes) - if (interesting.isEmpty) + if (interesting.isEmpty) { processResponseCallback(Seq.empty) - else { + } else { + // for fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being throttled given + // no bytes were recorded in the recent quota window + // trying to fetch more bytes would result in a guaranteed throttling potentially blocking consumer progress + val maxQuotaWindowBytes = if (fetchRequest.isFromFollower) + Int.MaxValue + else + quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt + + val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes) + val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes) + + val clientMetadata: Option[ClientMetadata] = if (versionId >= 11) { + // Fetch API version 11 added preferred replica logic + Some(new DefaultClientMetadata( + fetchRequest.rackId, + clientId, + request.context.clientAddress, + request.context.principal, + request.context.listenerName.value)) + } else { + None + } + + val params = FetchParams( + requestVersion = versionId, + replicaId = fetchRequest.replicaId, + maxWaitMs = fetchRequest.maxWait, + minBytes = fetchMinBytes, + maxBytes = fetchMaxBytes, + isolation = FetchIsolation(fetchRequest), + clientMetadata = clientMetadata + ) + // call the replica manager to fetch messages from the local replica replicaManager.fetchMessages( - fetchRequest.maxWait.toLong, - fetchRequest.replicaId, - fetchMinBytes, - fetchMaxBytes, - versionId <= 2, - interesting, - replicationQuota(fetchRequest), - processResponseCallback, - fetchRequest.isolationLevel, - clientMetadata) + params = params, + fetchInfos = interesting, + quota = replicationQuota(fetchRequest), + responseCallback = processResponseCallback, + ) } } diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 2ce33c838a..4a6a6e070c 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -100,17 +100,22 @@ class ReplicaAlterLogDirsThread(name: String, val fetchData = request.fetchData(topicNames.asJava) + val 
fetchParams = FetchParams( + requestVersion = request.version, + maxWaitMs = 0L, // timeout is 0 so that the callback will be executed immediately + replicaId = Request.FutureLocalReplicaId, + minBytes = request.minBytes, + maxBytes = request.maxBytes, + isolation = FetchLogEnd, + clientMetadata = None + ) + replicaMgr.fetchMessages( - 0L, // timeout is 0 so that the callback will be executed immediately - Request.FutureLocalReplicaId, - request.minBytes, - request.maxBytes, - false, - fetchData.asScala.toSeq, - UnboundedQuota, - processResponseCallback, - request.isolationLevel, - None) + params = fetchParams, + fetchInfos = fetchData.asScala.toSeq, + quota = UnboundedQuota, + responseCallback = processResponseCallback + ) if (partitionData == null) throw new IllegalStateException(s"Failed to fetch data for partitions ${fetchData.keySet().toArray.mkString(",")}") diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 1c97671c26..e84abbe5f4 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -29,7 +29,6 @@ import kafka.common.RecordValidationException import kafka.controller.{KafkaController, StateChangeLogger} import kafka.log._ import kafka.metrics.KafkaMetricsGroup -import kafka.server.{FetchMetadata => SFetchMetadata} import kafka.server.HostedPartition.Online import kafka.server.QuotaFactory.QuotaManagers import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints} @@ -989,38 +988,24 @@ class ReplicaManager(val config: KafkaConfig, * the callback function will be triggered either when timeout or required fetch info is satisfied. * Consumers may fetch from any replica, but followers can only fetch from the leader. 
*/ - def fetchMessages(timeout: Long, - replicaId: Int, - fetchMinBytes: Int, - fetchMaxBytes: Int, - hardMaxBytesLimit: Boolean, - fetchInfos: Seq[(TopicIdPartition, PartitionData)], - quota: ReplicaQuota, - responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit, - isolationLevel: IsolationLevel, - clientMetadata: Option[ClientMetadata]): Unit = { - val isFromFollower = Request.isValidBrokerId(replicaId) - val isFromConsumer = !(isFromFollower || replicaId == Request.FutureLocalReplicaId) - val fetchIsolation = if (!isFromConsumer) - FetchLogEnd - else if (isolationLevel == IsolationLevel.READ_COMMITTED) - FetchTxnCommitted - else - FetchHighWatermark - + def fetchMessages( + params: FetchParams, + fetchInfos: Seq[(TopicIdPartition, PartitionData)], + quota: ReplicaQuota, + responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit + ): Unit = { // Restrict fetching to leader if request is from follower or from a client with older version (no ClientMetadata) - val fetchOnlyFromLeader = isFromFollower || (isFromConsumer && clientMetadata.isEmpty) def readFromLog(): Seq[(TopicIdPartition, LogReadResult)] = { val result = readFromLocalLog( - replicaId = replicaId, - fetchOnlyFromLeader = fetchOnlyFromLeader, - fetchIsolation = fetchIsolation, - fetchMaxBytes = fetchMaxBytes, - hardMaxBytesLimit = hardMaxBytesLimit, + replicaId = params.replicaId, + fetchOnlyFromLeader = params.fetchOnlyLeader, + fetchIsolation = params.isolation, + fetchMaxBytes = params.maxBytes, + hardMaxBytesLimit = params.hardMaxBytesLimit, readPartitionInfo = fetchInfos, quota = quota, - clientMetadata = clientMetadata) - if (isFromFollower) updateFollowerFetchState(replicaId, result) + clientMetadata = params.clientMetadata) + if (params.isFromFollower) updateFollowerFetchState(params.replicaId, result) else result } @@ -1051,10 +1036,10 @@ class ReplicaManager(val config: KafkaConfig, // 4) some error happens while reading data // 5) we found a diverging epoch // 6) has a preferred read replica - if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || + if (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData || hasDivergingEpoch || hasPreferredReadReplica) { val fetchPartitionData = logReadResults.map { case (tp, result) => - val isReassignmentFetch = isFromFollower && isAddingReplica(tp.topicPartition, replicaId) + val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId) tp -> result.toFetchPartitionData(isReassignmentFetch) } responseCallback(fetchPartitionData) @@ -1067,10 +1052,13 @@ class ReplicaManager(val config: KafkaConfig, fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) }) } - val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, - fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus) - val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata, - responseCallback) + val delayedFetch = new DelayedFetch( + params = params, + fetchPartitionStatus = fetchPartitionStatus, + replicaManager = this, + quota = quota, + responseCallback = responseCallback + ) // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) } diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index e653e6e5b2..fb6bb61544 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -18,6 +18,7 @@ package kafka.server.metadata import java.util.Properties +import java.util.concurrent.atomic.AtomicLong import kafka.coordinator.group.GroupCoordinator import kafka.coordinator.transaction.TransactionCoordinator @@ -118,6 +119,11 @@ class BrokerMetadataPublisher(conf: KafkaConfig, */ var _firstPublish = true + /** + * This is updated after all components (e.g. LogManager) has finished publishing the new metadata delta + */ + val publishedOffsetAtomic = new AtomicLong(-1) + override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = { val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch() @@ -249,6 +255,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig, if (_firstPublish) { finishInitializingReplicaManager(newImage) } + publishedOffsetAtomic.set(newImage.highestOffsetAndEpoch().offset) } catch { case t: Throwable => error(s"Error publishing broker metadata at $highestOffsetAndEpoch", t) throw t @@ -257,6 +264,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig, } } + override def publishedOffset: Long = publishedOffsetAtomic.get() + def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = { conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props) } diff --git a/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala index 104d164d9c..b63a2c056c 100644 --- a/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala @@ -30,4 +30,9 @@ trait MetadataPublisher { * delta to the previous image. 
*/ def publish(delta: MetadataDelta, newImage: MetadataImage): Unit + + /** + * The highest offset of metadata topic which has been published + */ + def publishedOffset: Long } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 81e9f692a8..d6aa7a7e9a 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -1755,8 +1755,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(0, allReassignmentsMap.size()) } - @Test - def testValidIncrementalAlterConfigs(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testValidIncrementalAlterConfigs(quorum: String): Unit = { client = Admin.create(createConfig) // Create topics @@ -1793,6 +1794,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet) alterResult.all.get + if (isKRaftTest()) { + TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer, "Timeout waiting for topic configs propagating to brokers") + } + // Verify that topics were updated correctly var describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava) var configs = describeResult.all.get @@ -1807,7 +1812,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals("lz4", configs.get(topic2Resource).get(LogConfig.CompressionTypeProp).value) assertEquals("delete,compact", configs.get(topic2Resource).get(LogConfig.CleanupPolicyProp).value) - //verify subtract operation, including from an empty property + // verify subtract operation, including from an empty property topic1AlterConfigs = Seq( new AlterConfigOp(new ConfigEntry(LogConfig.CleanupPolicyProp, LogConfig.Compact), AlterConfigOp.OpType.SUBTRACT), new AlterConfigOp(new ConfigEntry(LogConfig.LeaderReplicationThrottledReplicasProp, "0"), AlterConfigOp.OpType.SUBTRACT) @@ -1825,6 +1830,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet) alterResult.all.get + if (isKRaftTest()) { + TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer, "Timeout waiting for topic configs propagating to brokers") + } + // Verify that topics were updated correctly describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava) configs = describeResult.all.get @@ -1852,7 +1861,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals("delete", configs.get(topic1Resource).get(LogConfig.CleanupPolicyProp).value) - //Alter topics with validateOnly=true with invalid configs + // Alter topics with validateOnly=true with invalid configs topic1AlterConfigs = Seq( new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "zip"), AlterConfigOp.OpType.SET) ).asJava @@ -1861,8 +1870,56 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { topic1Resource -> topic1AlterConfigs ).asJava, new AlterConfigsOptions().validateOnly(true)) - assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException], - Some("Invalid config value for resource")) + if (isKRaftTest()) { + 
assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException], + Some("Invalid value zip for configuration compression.type")) + } else { + assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException], + Some("Invalid config value for resource")) + } + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAppendAlreadyExistsConfigsAndSubtractNotExistsConfigs(quorum: String): Unit = { + client = Admin.create(createConfig) + + // Create topics + val topic = "incremental-alter-configs-topic" + val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic) + + val appendValues = s"0:${brokers.head.config.brokerId}" + val subtractValues = brokers.tail.map(broker => s"0:${broker.config.brokerId}").mkString(",") + assertNotEquals("", subtractValues) + + val topicCreateConfigs = new Properties + topicCreateConfigs.setProperty(LogConfig.LeaderReplicationThrottledReplicasProp, appendValues) + createTopic(topic, numPartitions = 1, replicationFactor = 1, topicCreateConfigs) + + // Append value that is already present + val topicAppendConfigs = Seq( + new AlterConfigOp(new ConfigEntry(LogConfig.LeaderReplicationThrottledReplicasProp, appendValues), AlterConfigOp.OpType.APPEND), + ).asJavaCollection + + val appendResult = client.incrementalAlterConfigs(Map(topicResource -> topicAppendConfigs).asJava) + appendResult.all.get + + // Subtract values that are not present + val topicSubtractConfigs = Seq( + new AlterConfigOp(new ConfigEntry(LogConfig.LeaderReplicationThrottledReplicasProp, subtractValues), AlterConfigOp.OpType.SUBTRACT) + ).asJavaCollection + val subtractResult = client.incrementalAlterConfigs(Map(topicResource -> topicSubtractConfigs).asJava) + subtractResult.all.get + + if (isKRaftTest()) { + TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer) + } + + // Verify that topics were updated correctly + val describeResult = client.describeConfigs(Seq(topicResource).asJava) + val configs = describeResult.all.get + + assertEquals(appendValues, configs.get(topicResource).get(LogConfig.LeaderReplicationThrottledReplicasProp).value) } @Test @@ -2352,7 +2409,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { * Note: this test requires some custom static broker and controller configurations, which are set up in * BaseAdminIntegrationTest.modifyConfigs and BaseAdminIntegrationTest.kraftControllerConfigs. 
*/ - @ParameterizedTest + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) def testCreateTopicsReturnsConfigs(quorum: String): Unit = { client = Admin.create(super.createConfig) diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index 581af29bec..940968f411 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -17,13 +17,14 @@ package kafka.server import java.util.Optional + import scala.collection.Seq import kafka.cluster.Partition import kafka.log.LogOffsetSnapshot import org.apache.kafka.common.{TopicIdPartition, Uuid} import org.apache.kafka.common.errors.{FencedLeaderEpochException, NotLeaderOrFollowerException} import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset -import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.FetchRequest import org.junit.jupiter.api.Test @@ -47,7 +48,7 @@ class DelayedFetchTest { val fetchStatus = FetchPartitionStatus( startOffsetMetadata = LogOffsetMetadata(fetchOffset), fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) - val fetchMetadata = buildFetchMetadata(replicaId, topicIdPartition, fetchStatus) + val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500) var fetchResultOpt: Option[FetchPartitionData] = None def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { @@ -55,12 +56,12 @@ class DelayedFetchTest { } val delayedFetch = new DelayedFetch( - delayMs = 500, - fetchMetadata = fetchMetadata, + params = fetchParams, + fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), replicaManager = replicaManager, quota = replicaQuota, - clientMetadata = None, - responseCallback = callback) + responseCallback = callback + ) val partition: Partition = mock(classOf[Partition]) @@ -93,7 +94,7 @@ class DelayedFetchTest { val fetchStatus = FetchPartitionStatus( startOffsetMetadata = LogOffsetMetadata(fetchOffset), fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) - val fetchMetadata = buildFetchMetadata(replicaId, topicIdPartition, fetchStatus) + val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500) var fetchResultOpt: Option[FetchPartitionData] = None def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { @@ -101,12 +102,12 @@ class DelayedFetchTest { } val delayedFetch = new DelayedFetch( - delayMs = 500, - fetchMetadata = fetchMetadata, + params = fetchParams, + fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), replicaManager = replicaManager, quota = replicaQuota, - clientMetadata = None, - responseCallback = callback) + responseCallback = callback + ) when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)) .thenThrow(new NotLeaderOrFollowerException(s"Replica for $topicIdPartition not available")) @@ -130,7 +131,7 @@ class DelayedFetchTest { val fetchStatus = FetchPartitionStatus( startOffsetMetadata = LogOffsetMetadata(fetchOffset), fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, 
lastFetchedEpoch)) - val fetchMetadata = buildFetchMetadata(replicaId, topicIdPartition, fetchStatus) + val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500) var fetchResultOpt: Option[FetchPartitionData] = None def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { @@ -138,12 +139,12 @@ class DelayedFetchTest { } val delayedFetch = new DelayedFetch( - delayMs = 500, - fetchMetadata = fetchMetadata, + params = fetchParams, + fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), replicaManager = replicaManager, quota = replicaQuota, - clientMetadata = None, - responseCallback = callback) + responseCallback = callback + ) val partition: Partition = mock(classOf[Partition]) when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition) @@ -166,17 +167,19 @@ class DelayedFetchTest { assertTrue(fetchResultOpt.isDefined) } - private def buildFetchMetadata(replicaId: Int, - topicIdPartition: TopicIdPartition, - fetchStatus: FetchPartitionStatus): FetchMetadata = { - FetchMetadata(fetchMinBytes = 1, - fetchMaxBytes = maxBytes, - hardMaxBytesLimit = false, - fetchOnlyLeader = true, - fetchIsolation = FetchLogEnd, - isFromFollower = true, + private def buildFollowerFetchParams( + replicaId: Int, + maxWaitMs: Int + ): FetchParams = { + FetchParams( + requestVersion = ApiKeys.FETCH.latestVersion, replicaId = replicaId, - fetchPartitionStatus = Seq((topicIdPartition, fetchStatus))) + maxWaitMs = maxWaitMs, + minBytes = 1, + maxBytes = maxBytes, + isolation = FetchLogEnd, + clientMetadata = None + ) } private def expectReadFromReplica(replicaId: Int, diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 972fc21795..de8bfabc25 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -71,7 +71,6 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity} import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record._ -import org.apache.kafka.common.replica.ClientMetadata import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse @@ -2358,12 +2357,13 @@ class KafkaApisTest { when(replicaManager.getLogConfig(ArgumentMatchers.eq(tp))).thenReturn(None) - when(replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean, - any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], any[ReplicaQuota], - any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit](), any[IsolationLevel], - any[Option[ClientMetadata]]) - ).thenAnswer(invocation => { - val callback = invocation.getArgument(7).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit] + when(replicaManager.fetchMessages( + any[FetchParams], + any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], + any[ReplicaQuota], + any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]() + )).thenAnswer(invocation => { + val callback = invocation.getArgument(3).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit] val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(timestamp, "foo".getBytes(StandardCharsets.UTF_8))) callback(Seq(tidp -> FetchPartitionData(Errors.NONE, hw, 0, 
records, @@ -2946,12 +2946,13 @@ class KafkaApisTest { val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8))) - when(replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean, - any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], any[ReplicaQuota], - any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit](), any[IsolationLevel], - any[Option[ClientMetadata]]) - ).thenAnswer(invocation => { - val callback = invocation.getArgument(7).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit] + when(replicaManager.fetchMessages( + any[FetchParams], + any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], + any[ReplicaQuota], + any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]() + )).thenAnswer(invocation => { + val callback = invocation.getArgument(3).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit] callback(Seq(tidp0 -> FetchPartitionData(Errors.NONE, hw, 0, records, None, None, None, Option.empty, isReassignmentFetch = isReassigning))) }) @@ -2978,7 +2979,6 @@ class KafkaApisTest { else assertEquals(0, brokerTopicStats.allTopicsStats.reassignmentBytesOutPerSec.get.count()) assertEquals(records.sizeInBytes(), brokerTopicStats.allTopicsStats.replicationBytesOutRate.get.count()) - } @Test diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index bf2671a16e..2c93a2c2af 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -31,10 +31,10 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.{FetchRequest, UpdateMetadataRequest} -import org.apache.kafka.common.{IsolationLevel, TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test -import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong} +import org.mockito.ArgumentMatchers.{any, anyBoolean} import org.mockito.Mockito.{doNothing, mock, never, times, verify, when} import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito} @@ -271,18 +271,26 @@ class ReplicaAlterLogDirsThreadTest { responseData: FetchPartitionData): Unit = { val callbackCaptor: ArgumentCaptor[Seq[(TopicIdPartition, FetchPartitionData)] => Unit] = ArgumentCaptor.forClass(classOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]) + + val expectedFetchParams = FetchParams( + requestVersion = ApiKeys.FETCH.latestVersion, + replicaId = Request.FutureLocalReplicaId, + maxWaitMs = 0L, + minBytes = 0, + maxBytes = config.replicaFetchResponseMaxBytes, + isolation = FetchLogEnd, + clientMetadata = None + ) + + println(expectedFetchParams) + when(replicaManager.fetchMessages( - timeout = ArgumentMatchers.eq(0L), - replicaId = ArgumentMatchers.eq(Request.FutureLocalReplicaId), - fetchMinBytes = ArgumentMatchers.eq(0), - fetchMaxBytes = ArgumentMatchers.eq(config.replicaFetchResponseMaxBytes), - hardMaxBytesLimit = ArgumentMatchers.eq(false), + params = ArgumentMatchers.eq(expectedFetchParams), fetchInfos = ArgumentMatchers.eq(Seq(topicIdPartition -> requestData)), quota = ArgumentMatchers.eq(UnboundedQuota), responseCallback = 
callbackCaptor.capture(), - isolationLevel = ArgumentMatchers.eq(IsolationLevel.READ_UNCOMMITTED), - clientMetadata = ArgumentMatchers.eq(None) )).thenAnswer(_ => { callbackCaptor.getValue.apply(Seq((topicIdPartition, responseData))) }) } @@ -701,16 +709,10 @@ class ReplicaAlterLogDirsThreadTest { when(replicaManager.logManager).thenReturn(logManager) when(replicaManager.fetchMessages( - anyLong(), - anyInt(), - anyInt(), - anyInt(), - any(), - any(), - any(), + any[FetchParams], + any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], + any[ReplicaQuota], responseCallback.capture(), - any(), - any(), )).thenAnswer(_ => responseCallback.getValue.apply(Seq.empty[(TopicIdPartition, FetchPartitionData)])) //Create the thread @@ -939,16 +941,10 @@ class ReplicaAlterLogDirsThreadTest { responseCallback: ArgumentCaptor[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]): Unit = { stub(logT1p0, logT1p1, futureLog, partition, replicaManager) when(replicaManager.fetchMessages( - anyLong(), - anyInt(), - anyInt(), - anyInt(), - any(), - any(), - any(), - responseCallback.capture(), - any(), - any()) - ).thenAnswer(_ => responseCallback.getValue.apply(Seq.empty[(TopicIdPartition, FetchPartitionData)])) + any[FetchParams], + any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], + any[ReplicaQuota], + responseCallback.capture() + )).thenAnswer(_ => responseCallback.getValue.apply(Seq.empty[(TopicIdPartition, FetchPartitionData)])) } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala index d281788ab8..df95f701c5 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala @@ -28,7 +28,7 @@ import kafka.utils.TestUtils.waitUntilTrue import kafka.utils.{MockTime, ShutdownableThread, TestUtils} import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, TopicRecord} import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.SimpleRecord import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata import org.apache.kafka.common.requests.{FetchRequest, ProduceResponse} @@ -224,17 +224,21 @@ class ReplicaManagerConcurrencyTest { } } - replicaManager.fetchMessages( - timeout = random.nextInt(100), + val fetchParams = FetchParams( + requestVersion = ApiKeys.FETCH.latestVersion, replicaId = replicaId, - fetchMinBytes = 1, - fetchMaxBytes = 1024 * 1024, - hardMaxBytesLimit = false, + maxWaitMs = random.nextInt(100), + minBytes = 1, + maxBytes = 1024 * 1024, + isolation = FetchIsolation(replicaId, IsolationLevel.READ_UNCOMMITTED), + clientMetadata = Some(clientMetadata) + ) + + replicaManager.fetchMessages( + params = fetchParams, fetchInfos = Seq(topicIdPartition -> partitionData), quota = QuotaFactory.UnboundedQuota, responseCallback = fetchCallback, - isolationLevel = IsolationLevel.READ_UNCOMMITTED, - clientMetadata = Some(clientMetadata) ) val fetchResult = future.get() diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index d7b69bc61d..ea03b8d2fe 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++
b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -18,11 +18,13 @@ package kafka.server import java.io.File import java.util.{Collections, Optional, Properties} + import kafka.cluster.Partition import kafka.log.{LogManager, LogOffsetSnapshot, UnifiedLog} import kafka.server.QuotaFactory.QuotaManagers import kafka.utils._ import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.requests.FetchRequest.PartitionData @@ -205,17 +207,23 @@ class ReplicaManagerQuotasTest { val tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("t1", 0)) val fetchPartitionStatus = FetchPartitionStatus(LogOffsetMetadata(messageOffset = 50L, segmentBaseOffset = 0L, relativePositionInSegment = 250), new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty())) - val fetchMetadata = FetchMetadata(fetchMinBytes = 1, - fetchMaxBytes = 1000, - hardMaxBytesLimit = true, - fetchOnlyLeader = true, - fetchIsolation = FetchLogEnd, - isFromFollower = true, + val fetchParams = FetchParams( + requestVersion = ApiKeys.FETCH.latestVersion, replicaId = 1, - fetchPartitionStatus = List((tp, fetchPartitionStatus)) + maxWaitMs = 600, + minBytes = 1, + maxBytes = 1000, + isolation = FetchLogEnd, + clientMetadata = None ) - new DelayedFetch(delayMs = 600, fetchMetadata = fetchMetadata, replicaManager = replicaManager, - quota = null, clientMetadata = None, responseCallback = null) { + + new DelayedFetch( + params = fetchParams, + fetchPartitionStatus = Seq(tp -> fetchPartitionStatus), + replicaManager = replicaManager, + quota = null, + responseCallback = null + ) { override def forceComplete(): Boolean = true } } @@ -249,17 +257,23 @@ class ReplicaManagerQuotasTest { val tidp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("t1", 0)) val fetchPartitionStatus = FetchPartitionStatus(LogOffsetMetadata(messageOffset = 50L, segmentBaseOffset = 0L, relativePositionInSegment = 250), new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty())) - val fetchMetadata = FetchMetadata(fetchMinBytes = 1, - fetchMaxBytes = 1000, - hardMaxBytesLimit = true, - fetchOnlyLeader = true, - fetchIsolation = FetchLogEnd, - isFromFollower = false, + val fetchParams = FetchParams( + requestVersion = ApiKeys.FETCH.latestVersion, replicaId = FetchRequest.CONSUMER_REPLICA_ID, - fetchPartitionStatus = List((tidp, fetchPartitionStatus)) + maxWaitMs = 600, + minBytes = 1, + maxBytes = 1000, + isolation = FetchHighWatermark, + clientMetadata = None ) - new DelayedFetch(delayMs = 600, fetchMetadata = fetchMetadata, replicaManager = replicaManager, - quota = null, clientMetadata = None, responseCallback = null) { + + new DelayedFetch( + params = fetchParams, + fetchPartitionStatus = Seq(tidp -> fetchPartitionStatus), + replicaManager = replicaManager, + quota = null, + responseCallback = null + ) { override def forceComplete(): Boolean = true } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index ac2fc9926d..aa28ce7269 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -269,7 +269,7 @@ class ReplicaManagerTest { Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() rm.becomeLeaderOrFollower(1, 
leaderAndIsrRequest2, (_, _) => ()) - assertTrue(appendResult.isFired) + assertTrue(appendResult.hasFired) } finally { rm.shutdown(checkpointHW = false) } @@ -508,12 +508,15 @@ class ReplicaManagerTest { } // fetch as follower to advance the high watermark - fetchAsFollower(replicaManager, new TopicIdPartition(topicId, new TopicPartition(topic, 0)), + fetchPartitionAsFollower( + replicaManager, + new TopicIdPartition(topicId, new TopicPartition(topic, 0)), new PartitionData(Uuid.ZERO_UUID, numRecords, 0, 100000, Optional.empty()), - isolationLevel = IsolationLevel.READ_UNCOMMITTED) + replicaId = 1 + ) // fetch should return empty since LSO should be stuck at 0 - var consumerFetchResult = fetchAsConsumer(replicaManager, new TopicIdPartition(topicId, new TopicPartition(topic, 0)), + var consumerFetchResult = fetchPartitionAsConsumer(replicaManager, new TopicIdPartition(topicId, new TopicPartition(topic, 0)), new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), isolationLevel = IsolationLevel.READ_COMMITTED) var fetchData = consumerFetchResult.assertFired @@ -523,10 +526,15 @@ class ReplicaManagerTest { assertEquals(Some(List.empty[FetchResponseData.AbortedTransaction]), fetchData.abortedTransactions) // delayed fetch should timeout and return nothing - consumerFetchResult = fetchAsConsumer(replicaManager, new TopicIdPartition(topicId, new TopicPartition(topic, 0)), + consumerFetchResult = fetchPartitionAsConsumer( + replicaManager, + new TopicIdPartition(topicId, new TopicPartition(topic, 0)), new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), - isolationLevel = IsolationLevel.READ_COMMITTED, minBytes = 1000) - assertFalse(consumerFetchResult.isFired) + isolationLevel = IsolationLevel.READ_COMMITTED, + minBytes = 1000, + maxWaitMs = 1000 + ) + assertFalse(consumerFetchResult.hasFired) timer.advanceClock(1001) fetchData = consumerFetchResult.assertFired @@ -544,21 +552,27 @@ class ReplicaManagerTest { // the LSO has advanced, but the appended commit marker has not been replicated, so // none of the data from the transaction should be visible yet - consumerFetchResult = fetchAsConsumer(replicaManager, new TopicIdPartition(topicId, new TopicPartition(topic, 0)), + consumerFetchResult = fetchPartitionAsConsumer( + replicaManager, + new TopicIdPartition(topicId, new TopicPartition(topic, 0)), new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), - isolationLevel = IsolationLevel.READ_COMMITTED) + isolationLevel = IsolationLevel.READ_COMMITTED + ) fetchData = consumerFetchResult.assertFired assertEquals(Errors.NONE, fetchData.error) assertTrue(fetchData.records.batches.asScala.isEmpty) // fetch as follower to advance the high watermark - fetchAsFollower(replicaManager, new TopicIdPartition(topicId, new TopicPartition(topic, 0)), + fetchPartitionAsFollower( + replicaManager, + new TopicIdPartition(topicId, new TopicPartition(topic, 0)), new PartitionData(Uuid.ZERO_UUID, numRecords + 1, 0, 100000, Optional.empty()), - isolationLevel = IsolationLevel.READ_UNCOMMITTED) + replicaId = 1 + ) // now all of the records should be fetchable - consumerFetchResult = fetchAsConsumer(replicaManager, new TopicIdPartition(topicId, new TopicPartition(topic, 0)), + consumerFetchResult = fetchPartitionAsConsumer(replicaManager, new TopicIdPartition(topicId, new TopicPartition(topic, 0)), new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), isolationLevel = IsolationLevel.READ_COMMITTED) @@ -622,16 +636,24 @@ class ReplicaManagerTest { .onFire { response => 
assertEquals(Errors.NONE, response.error) } // fetch as follower to advance the high watermark - fetchAsFollower(replicaManager, new TopicIdPartition(topicId, new TopicPartition(topic, 0)), + fetchPartitionAsFollower( + replicaManager, + new TopicIdPartition(topicId, new TopicPartition(topic, 0)), new PartitionData(Uuid.ZERO_UUID, numRecords + 1, 0, 100000, Optional.empty()), - isolationLevel = IsolationLevel.READ_UNCOMMITTED) + replicaId = 1 + ) // Set the minBytes in order force this request to enter purgatory. When it returns, we should still // see the newly aborted transaction. - val fetchResult = fetchAsConsumer(replicaManager, new TopicIdPartition(topicId, new TopicPartition(topic, 0)), + val fetchResult = fetchPartitionAsConsumer( + replicaManager, + new TopicIdPartition(topicId, new TopicPartition(topic, 0)), new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), - isolationLevel = IsolationLevel.READ_COMMITTED, minBytes = 10000) - assertFalse(fetchResult.isFired) + isolationLevel = IsolationLevel.READ_COMMITTED, + minBytes = 10000, + maxWaitMs = 1000 + ) + assertFalse(fetchResult.hasFired) timer.advanceClock(1001) val fetchData = fetchResult.assertFired @@ -687,8 +709,12 @@ class ReplicaManagerTest { } // Followers are always allowed to fetch above the high watermark - val followerFetchResult = fetchAsFollower(rm, new TopicIdPartition(topicId, new TopicPartition(topic, 0)), - new PartitionData(Uuid.ZERO_UUID, 1, 0, 100000, Optional.empty())) + val followerFetchResult = fetchPartitionAsFollower( + rm, + new TopicIdPartition(topicId, new TopicPartition(topic, 0)), + new PartitionData(Uuid.ZERO_UUID, 1, 0, 100000, Optional.empty()), + replicaId = 1 + ) val followerFetchData = followerFetchResult.assertFired assertEquals(Errors.NONE, followerFetchData.error, "Should not give an exception") assertTrue(followerFetchData.records.batches.iterator.hasNext, "Should return some data") @@ -696,7 +722,7 @@ class ReplicaManagerTest { // Consumers are not allowed to consume above the high watermark. However, since the // high watermark could be stale at the time of the request, we do not return an out of // range error and instead return an empty record set. 
- val consumerFetchResult = fetchAsConsumer(rm, new TopicIdPartition(topicId, new TopicPartition(topic, 0)), + val consumerFetchResult = fetchPartitionAsConsumer(rm, new TopicIdPartition(topicId, new TopicPartition(topic, 0)), new PartitionData(Uuid.ZERO_UUID, 1, 0, 100000, Optional.empty())) val consumerFetchData = consumerFetchResult.assertFired assertEquals(Errors.NONE, consumerFetchData.error, "Should not give an exception") @@ -753,51 +779,34 @@ class ReplicaManagerTest { } // We receive one valid request from the follower and replica state is updated - var successfulFetch: Option[FetchPartitionData] = None - def callback(response: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { - successfulFetch = response.headOption.filter(_._1 == tidp).map(_._2) - } - val validFetchPartitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, maxFetchBytes, Optional.of(leaderEpoch)) - replicaManager.fetchMessages( - timeout = 0L, - replicaId = 1, - fetchMinBytes = 1, - fetchMaxBytes = maxFetchBytes, - hardMaxBytesLimit = false, - fetchInfos = Seq(tidp -> validFetchPartitionData), - quota = UnboundedQuota, - isolationLevel = IsolationLevel.READ_UNCOMMITTED, - responseCallback = callback, - clientMetadata = None + val validFetchResult = fetchPartitionAsFollower( + replicaManager, + tidp, + validFetchPartitionData, + replicaId = 1 ) - assertTrue(successfulFetch.isDefined) + assertEquals(Errors.NONE, validFetchResult.assertFired.error) assertEquals(0L, followerReplica.stateSnapshot.logStartOffset) assertEquals(0L, followerReplica.stateSnapshot.logEndOffset) - // Next we receive an invalid request with a higher fetch offset, but an old epoch. // We expect that the replica state does not get updated. val invalidFetchPartitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 3L, 0L, maxFetchBytes, Optional.of(leaderEpoch - 1)) - replicaManager.fetchMessages( - timeout = 0L, - replicaId = 1, - fetchMinBytes = 1, - fetchMaxBytes = maxFetchBytes, - hardMaxBytesLimit = false, - fetchInfos = Seq(tidp -> invalidFetchPartitionData), - quota = UnboundedQuota, - isolationLevel = IsolationLevel.READ_UNCOMMITTED, - responseCallback = callback, - clientMetadata = None + + val invalidFetchResult = fetchPartitionAsFollower( + replicaManager, + tidp, + invalidFetchPartitionData, + replicaId = 1 ) - assertTrue(successfulFetch.isDefined) + assertEquals(Errors.FENCED_LEADER_EPOCH, invalidFetchResult.assertFired.error) assertEquals(0L, followerReplica.stateSnapshot.logStartOffset) assertEquals(0L, followerReplica.stateSnapshot.logEndOffset) @@ -806,23 +815,17 @@ class ReplicaManagerTest { val divergingFetchPartitionData = new FetchRequest.PartitionData(tidp.topicId, 3L, 0L, maxFetchBytes, Optional.of(leaderEpoch), Optional.of(leaderEpoch - 1)) - replicaManager.fetchMessages( - timeout = 0L, - replicaId = 1, - fetchMinBytes = 1, - fetchMaxBytes = maxFetchBytes, - hardMaxBytesLimit = false, - fetchInfos = Seq(tidp -> divergingFetchPartitionData), - quota = UnboundedQuota, - isolationLevel = IsolationLevel.READ_UNCOMMITTED, - responseCallback = callback, - clientMetadata = None + val divergingEpochResult = fetchPartitionAsFollower( + replicaManager, + tidp, + divergingFetchPartitionData, + replicaId = 1 ) - assertTrue(successfulFetch.isDefined) + assertEquals(Errors.NONE, divergingEpochResult.assertFired.error) + assertTrue(divergingEpochResult.assertFired.divergingEpoch.isDefined) assertEquals(0L, followerReplica.stateSnapshot.logStartOffset) assertEquals(0L, followerReplica.stateSnapshot.logEndOffset) - } 
finally { replicaManager.shutdown(checkpointHW = false) } @@ -871,18 +874,14 @@ class ReplicaManagerTest { def callback(response: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { successfulFetch = response } - replicaManager.fetchMessages( - timeout = 0L, + + fetchPartitions( + replicaManager, replicaId = 1, - fetchMinBytes = 1, - fetchMaxBytes = maxFetchBytes, - hardMaxBytesLimit = false, fetchInfos = Seq(inconsistentTidp -> validFetchPartitionData), - quota = UnboundedQuota, - isolationLevel = IsolationLevel.READ_UNCOMMITTED, - responseCallback = callback, - clientMetadata = None + responseCallback = callback ) + val fetch1 = successfulFetch.headOption.filter(_._1 == inconsistentTidp).map(_._2) assertTrue(fetch1.isDefined) assertEquals(Errors.INCONSISTENT_TOPIC_ID, fetch1.get.error) @@ -891,17 +890,11 @@ class ReplicaManagerTest { // Fetch messages simulating an ID in the log. // We should not see topic ID errors. val zeroTidp = new TopicIdPartition(Uuid.ZERO_UUID, tidp.topicPartition) - replicaManager.fetchMessages( - timeout = 0L, + fetchPartitions( + replicaManager, replicaId = 1, - fetchMinBytes = 1, - fetchMaxBytes = maxFetchBytes, - hardMaxBytesLimit = false, fetchInfos = Seq(zeroTidp -> validFetchPartitionData), - quota = UnboundedQuota, - isolationLevel = IsolationLevel.READ_UNCOMMITTED, - responseCallback = callback, - clientMetadata = None + responseCallback = callback ) val fetch2 = successfulFetch.headOption.filter(_._1 == zeroTidp).map(_._2) assertTrue(fetch2.isDefined) @@ -932,17 +925,11 @@ class ReplicaManagerTest { assertEquals(None, replicaManager.getPartitionOrException(tp2).topicId) // Fetch messages simulating the request containing a topic ID. We should not have an error. - replicaManager.fetchMessages( - timeout = 0L, + fetchPartitions( + replicaManager, replicaId = 1, - fetchMinBytes = 1, - fetchMaxBytes = maxFetchBytes, - hardMaxBytesLimit = false, fetchInfos = Seq(tidp2 -> validFetchPartitionData), - quota = UnboundedQuota, - isolationLevel = IsolationLevel.READ_UNCOMMITTED, - responseCallback = callback, - clientMetadata = None + responseCallback = callback ) val fetch3 = successfulFetch.headOption.filter(_._1 == tidp2).map(_._2) assertTrue(fetch3.isDefined) @@ -950,17 +937,11 @@ class ReplicaManagerTest { // Fetch messages simulating the request not containing a topic ID. We should not have an error. 
val zeroTidp2 = new TopicIdPartition(Uuid.ZERO_UUID, tidp2.topicPartition) - replicaManager.fetchMessages( - timeout = 0L, + fetchPartitions( + replicaManager, replicaId = 1, - fetchMinBytes = 1, - fetchMaxBytes = maxFetchBytes, - hardMaxBytesLimit = false, fetchInfos = Seq(zeroTidp2 -> validFetchPartitionData), - quota = UnboundedQuota, - isolationLevel = IsolationLevel.READ_UNCOMMITTED, - responseCallback = callback, - clientMetadata = None + responseCallback = callback ) val fetch4 = successfulFetch.headOption.filter(_._1 == zeroTidp2).map(_._2) assertTrue(fetch4.isDefined) @@ -1051,20 +1032,19 @@ class ReplicaManagerTest { assertFalse(tp1Status.get.records.batches.iterator.hasNext) } - replicaManager.fetchMessages( - timeout = 1000, + fetchPartitions( + replicaManager, replicaId = 1, - fetchMinBytes = 0, - fetchMaxBytes = Int.MaxValue, - hardMaxBytesLimit = false, fetchInfos = Seq( tidp0 -> new PartitionData(Uuid.ZERO_UUID, 1, 0, 100000, Optional.empty()), - tidp1 -> new PartitionData(Uuid.ZERO_UUID, 1, 0, 100000, Optional.empty())), - quota = UnboundedQuota, + tidp1 -> new PartitionData(Uuid.ZERO_UUID, 1, 0, 100000, Optional.empty()) + ), responseCallback = fetchCallback, - isolationLevel = IsolationLevel.READ_UNCOMMITTED, - clientMetadata = None + maxWaitMs = 1000, + minBytes = 0, + maxBytes = Int.MaxValue ) + val tp0Log = replicaManager.localLog(tp0) assertTrue(tp0Log.isDefined) assertEquals(1, tp0Log.get.highWatermark, "hw should be incremented") @@ -1228,12 +1208,12 @@ class ReplicaManagerTest { val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id", InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") - val consumerResult = fetchAsConsumer(replicaManager, tidp0, + val consumerResult = fetchPartitionAsConsumer(replicaManager, tidp0, new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), clientMetadata = Some(metadata)) // Fetch from follower succeeds - assertTrue(consumerResult.isFired) + assertTrue(consumerResult.hasFired) // But only leader will compute preferred replica assertTrue(consumerResult.assertFired.preferredReadReplica.isEmpty) @@ -1286,12 +1266,12 @@ class ReplicaManagerTest { val metadata = new DefaultClientMetadata("rack-a", "client-id", InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") - val consumerResult = fetchAsConsumer(replicaManager, tidp0, + val consumerResult = fetchPartitionAsConsumer(replicaManager, tidp0, new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), clientMetadata = Some(metadata)) // Fetch from leader succeeds - assertTrue(consumerResult.isFired) + assertTrue(consumerResult.hasFired) // Returns a preferred replica (should just be the leader, which is None) assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined) @@ -1334,12 +1314,12 @@ class ReplicaManagerTest { val metadata = new DefaultClientMetadata("rack-a", "client-id", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, "default") - val consumerResult = fetchAsConsumer(replicaManager, tidp0, + val consumerResult = fetchPartitionAsConsumer(replicaManager, tidp0, new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), clientMetadata = Some(metadata)) // Fetch from follower succeeds - assertTrue(consumerResult.isFired) + assertTrue(consumerResult.hasFired) // Expect not run the preferred read replica selection assertEquals(0, replicaManager.replicaSelectorOpt.get.asInstanceOf[MockReplicaSelector].getSelectionCount) @@ -1395,12 +1375,12 @@ class ReplicaManagerTest { 
InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, "default") // If a preferred read replica is selected, the fetch response returns immediately, even if min bytes and timeout conditions are not met. - val consumerResult = fetchAsConsumer(replicaManager, tidp0, + val consumerResult = fetchPartitionAsConsumer(replicaManager, tidp0, new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), - minBytes = 1, clientMetadata = Some(metadata), timeout = 5000) + minBytes = 1, clientMetadata = Some(metadata), maxWaitMs = 5000) // Fetch from leader succeeds - assertTrue(consumerResult.isFired) + assertTrue(consumerResult.hasFired) // No delayed fetch was inserted assertEquals(0, replicaManager.delayedFetchPurgatory.watched) @@ -1454,24 +1434,33 @@ class ReplicaManagerTest { // Increment the hw in the leader by fetching from the last offset val fetchOffset = simpleRecords.size - var followerResult = fetchAsFollower(replicaManager, tidp0, + var followerResult = fetchPartitionAsFollower( + replicaManager, + tidp0, new PartitionData(Uuid.ZERO_UUID, fetchOffset, 0, 100000, Optional.empty()), - clientMetadata = None) - assertTrue(followerResult.isFired) + replicaId = 1, + minBytes = 0 + ) + assertTrue(followerResult.hasFired) assertEquals(0, followerResult.assertFired.highWatermark) - assertTrue(appendResult.isFired, "Expected producer request to be acked") + assertTrue(appendResult.hasFired, "Expected producer request to be acked") // Fetch from the same offset, no new data is expected and hence the fetch request should // go to the purgatory - followerResult = fetchAsFollower(replicaManager, tidp0, + followerResult = fetchPartitionAsFollower( + replicaManager, + tidp0, new PartitionData(Uuid.ZERO_UUID, fetchOffset, 0, 100000, Optional.empty()), - clientMetadata = None, minBytes = 1000) - assertFalse(followerResult.isFired, "Request completed immediately unexpectedly") + replicaId = 1, + minBytes = 1000, + maxWaitMs = 1000 + ) + assertFalse(followerResult.hasFired, "Request completed immediately unexpectedly") // Complete the request in the purgatory by advancing the clock timer.advanceClock(1001) - assertTrue(followerResult.isFired) + assertTrue(followerResult.hasFired) assertEquals(fetchOffset, followerResult.assertFired.highWatermark) } @@ -1545,16 +1534,15 @@ class ReplicaManagerTest { val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "") var partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, Optional.of(0)) - var fetchResult = sendConsumerFetch(replicaManager, tidp0, partitionData, Some(clientMetadata)) - assertNotNull(fetchResult.get) - assertEquals(Errors.NONE, fetchResult.get.error) + var fetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData, + clientMetadata = Some(clientMetadata)) + assertEquals(Errors.NONE, fetchResult.assertFired.error) // Fetch from follower, with empty ClientMetadata (which implies an older version) partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, Optional.of(0)) - fetchResult = sendConsumerFetch(replicaManager, tidp0, partitionData, None) - assertNotNull(fetchResult.get) - assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error) + fetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData) + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.assertFired.error) } finally { replicaManager.shutdown() } @@ -1596,16 +1584,14 @@ class ReplicaManagerTest { val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 
100, Optional.empty()) - val nonPurgatoryFetchResult = sendConsumerFetch(replicaManager, tidp0, partitionData, None) - assertNotNull(nonPurgatoryFetchResult.get) - assertEquals(Errors.NONE, nonPurgatoryFetchResult.get.error) + val nonPurgatoryFetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData) + assertEquals(Errors.NONE, nonPurgatoryFetchResult.assertFired.error) assertMetricCount(1) - val purgatoryFetchResult = sendConsumerFetch(replicaManager, tidp0, partitionData, None, timeout = 10) - assertNull(purgatoryFetchResult.get) + val purgatoryFetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData, maxWaitMs = 10) + assertFalse(purgatoryFetchResult.hasFired) mockTimer.advanceClock(11) - assertNotNull(purgatoryFetchResult.get) - assertEquals(Errors.NONE, purgatoryFetchResult.get.error) + assertEquals(Errors.NONE, purgatoryFetchResult.assertFired.error) assertMetricCount(2) } @@ -1638,8 +1624,8 @@ class ReplicaManagerTest { val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, Optional.empty()) - val fetchResult = sendConsumerFetch(replicaManager, tidp0, partitionData, None, timeout = 10) - assertNull(fetchResult.get) + val fetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData, maxWaitMs = 10) + assertFalse(fetchResult.hasFired) // Become a follower and ensure that the delayed fetch returns immediately val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, @@ -1656,9 +1642,7 @@ class ReplicaManagerTest { topicIds.asJava, Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) - - assertNotNull(fetchResult.get) - assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error) + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.assertFired.error) } finally { replicaManager.shutdown() } @@ -1696,8 +1680,14 @@ class ReplicaManagerTest { val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "") val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, Optional.of(1)) - val fetchResult = sendConsumerFetch(replicaManager, tidp0, partitionData, Some(clientMetadata), timeout = 10) - assertNull(fetchResult.get) + val fetchResult = fetchPartitionAsConsumer( + replicaManager, + tidp0, + partitionData, + clientMetadata = Some(clientMetadata), + maxWaitMs = 10 + ) + assertFalse(fetchResult.hasFired) // Become a follower and ensure that the delayed fetch returns immediately val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, @@ -1714,9 +1704,7 @@ class ReplicaManagerTest { topicIds.asJava, Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) - - assertNotNull(fetchResult.get) - assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.get.error) + assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.assertFired.error) } finally { replicaManager.shutdown() } @@ -1752,15 +1740,13 @@ class ReplicaManagerTest { val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "") var partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, Optional.of(1)) - var fetchResult = sendConsumerFetch(replicaManager, tidp0, partitionData, Some(clientMetadata)) - assertNotNull(fetchResult.get) - assertEquals(Errors.NONE, 
fetchResult.get.error) + var fetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData, clientMetadata = Some(clientMetadata)) + assertEquals(Errors.NONE, fetchResult.assertFired.error) partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, Optional.empty()) - fetchResult = sendConsumerFetch(replicaManager, tidp0, partitionData, Some(clientMetadata)) - assertNotNull(fetchResult.get) - assertEquals(Errors.NONE, fetchResult.get.error) + fetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData, clientMetadata = Some(clientMetadata)) + assertEquals(Errors.NONE, fetchResult.assertFired.error) } @Test @@ -1795,8 +1781,8 @@ class ReplicaManagerTest { val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, Optional.of(1)) - val fetchResult = sendConsumerFetch(replicaManager, tidp0, partitionData, None, timeout = 10) - assertNull(fetchResult.get) + val fetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData, maxWaitMs = 10) + assertFalse(fetchResult.hasFired) when(replicaManager.metadataCache.contains(tp0)).thenReturn(true) // We have a fetch in purgatory, now receive a stop replica request and @@ -1807,8 +1793,7 @@ class ReplicaManagerTest { .setDeletePartition(true) .setLeaderEpoch(LeaderAndIsr.EpochDuringDelete))) - assertNotNull(fetchResult.get) - assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error) + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.assertFired.error) } @Test @@ -1880,30 +1865,6 @@ class ReplicaManagerTest { produceResult } - private def sendConsumerFetch(replicaManager: ReplicaManager, - topicIdPartition: TopicIdPartition, - partitionData: FetchRequest.PartitionData, - clientMetadataOpt: Option[ClientMetadata], - timeout: Long = 0L): AtomicReference[FetchPartitionData] = { - val fetchResult = new AtomicReference[FetchPartitionData]() - def callback(response: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { - fetchResult.set(response.toMap.apply(topicIdPartition)) - } - replicaManager.fetchMessages( - timeout = timeout, - replicaId = Request.OrdinaryConsumerId, - fetchMinBytes = 1, - fetchMaxBytes = 100, - hardMaxBytesLimit = false, - fetchInfos = Seq(topicIdPartition -> partitionData), - quota = UnboundedQuota, - isolationLevel = IsolationLevel.READ_UNCOMMITTED, - responseCallback = callback, - clientMetadata = clientMetadataOpt - ) - fetchResult - } - /** * This method assumes that the test using created ReplicaManager calls * ReplicaManager.becomeLeaderOrFollower() once with LeaderAndIsrRequest containing @@ -2085,11 +2046,11 @@ class ReplicaManagerTest { private var fun: Option[T => Unit] = None def assertFired: T = { - assertTrue(isFired, "Callback has not been fired") + assertTrue(hasFired, "Callback has not been fired") value.get } - def isFired: Boolean = { + def hasFired: Boolean = { value.isDefined } @@ -2100,7 +2061,7 @@ class ReplicaManagerTest { def onFire(fun: T => Unit): CallbackResult[T] = { this.fun = Some(fun) - if (this.isFired) fire(value.get) + if (this.hasFired) fire(value.get) this } } @@ -2128,33 +2089,67 @@ class ReplicaManagerTest { result } - private def fetchAsConsumer(replicaManager: ReplicaManager, - partition: TopicIdPartition, - partitionData: PartitionData, - minBytes: Int = 0, - isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED, - clientMetadata: Option[ClientMetadata] = None, - timeout: Long = 1000): CallbackResult[FetchPartitionData] = { - fetchMessages(replicaManager, replicaId = -1, partition, 
partitionData, minBytes, isolationLevel, clientMetadata, timeout) - } - - private def fetchAsFollower(replicaManager: ReplicaManager, - partition: TopicIdPartition, - partitionData: PartitionData, - minBytes: Int = 0, - isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED, - clientMetadata: Option[ClientMetadata] = None): CallbackResult[FetchPartitionData] = { - fetchMessages(replicaManager, replicaId = 1, partition, partitionData, minBytes, isolationLevel, clientMetadata) - } - - private def fetchMessages(replicaManager: ReplicaManager, - replicaId: Int, - partition: TopicIdPartition, - partitionData: PartitionData, - minBytes: Int, - isolationLevel: IsolationLevel, - clientMetadata: Option[ClientMetadata], - timeout: Long = 1000): CallbackResult[FetchPartitionData] = { + private def fetchPartitionAsConsumer( + replicaManager: ReplicaManager, + partition: TopicIdPartition, + partitionData: PartitionData, + maxWaitMs: Long = 0, + minBytes: Int = 1, + maxBytes: Int = 1024 * 1024, + isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED, + clientMetadata: Option[ClientMetadata] = None, + ): CallbackResult[FetchPartitionData] = { + val isolation = isolationLevel match { + case IsolationLevel.READ_COMMITTED => FetchTxnCommitted + case IsolationLevel.READ_UNCOMMITTED => FetchHighWatermark + } + + fetchPartition( + replicaManager, + replicaId = Request.OrdinaryConsumerId, + partition, + partitionData, + minBytes, + maxBytes, + isolation, + clientMetadata, + maxWaitMs + ) + } + + private def fetchPartitionAsFollower( + replicaManager: ReplicaManager, + partition: TopicIdPartition, + partitionData: PartitionData, + replicaId: Int, + maxWaitMs: Long = 0, + minBytes: Int = 1, + maxBytes: Int = 1024 * 1024, + ): CallbackResult[FetchPartitionData] = { + fetchPartition( + replicaManager, + replicaId = replicaId, + partition, + partitionData, + minBytes = minBytes, + maxBytes = maxBytes, + isolation = FetchLogEnd, + clientMetadata = None, + maxWaitMs = maxWaitMs + ) + } + + private def fetchPartition( + replicaManager: ReplicaManager, + replicaId: Int, + partition: TopicIdPartition, + partitionData: PartitionData, + minBytes: Int, + maxBytes: Int, + isolation: FetchIsolation, + clientMetadata: Option[ClientMetadata], + maxWaitMs: Long + ): CallbackResult[FetchPartitionData] = { val result = new CallbackResult[FetchPartitionData]() def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { assertEquals(1, responseStatus.size) @@ -2163,22 +2158,52 @@ class ReplicaManagerTest { result.fire(fetchData) } - replicaManager.fetchMessages( - timeout = timeout, + fetchPartitions( + replicaManager, replicaId = replicaId, - fetchMinBytes = minBytes, - fetchMaxBytes = Int.MaxValue, - hardMaxBytesLimit = false, fetchInfos = Seq(partition -> partitionData), - quota = UnboundedQuota, responseCallback = fetchCallback, - isolationLevel = isolationLevel, + maxWaitMs = maxWaitMs, + minBytes = minBytes, + maxBytes = maxBytes, + isolation = isolation, clientMetadata = clientMetadata ) result } + private def fetchPartitions( + replicaManager: ReplicaManager, + replicaId: Int, + fetchInfos: Seq[(TopicIdPartition, PartitionData)], + responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit, + requestVersion: Short = ApiKeys.FETCH.latestVersion, + maxWaitMs: Long = 0, + minBytes: Int = 1, + maxBytes: Int = 1024 * 1024, + quota: ReplicaQuota = UnboundedQuota, + isolation: FetchIsolation = FetchLogEnd, + clientMetadata: Option[ClientMetadata] = None + ): Unit = { + 
val params = FetchParams( + requestVersion = requestVersion, + replicaId = replicaId, + maxWaitMs = maxWaitMs, + minBytes = minBytes, + maxBytes = maxBytes, + isolation = isolation, + clientMetadata = clientMetadata + ) + + replicaManager.fetchMessages( + params, + fetchInfos, + quota, + responseCallback + ) + } + private def setupReplicaManagerWithMockedPurgatories( timer: MockTimer, brokerId: Int = 0, @@ -3142,14 +3167,11 @@ class ReplicaManagerTest { // Send a produce request and advance the highwatermark val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords) - fetchMessages( + fetchPartitionAsFollower( replicaManager, - otherId, topicIdPartition, new PartitionData(Uuid.ZERO_UUID, numOfRecords, 0, Int.MaxValue, Optional.empty()), - Int.MaxValue, - IsolationLevel.READ_UNCOMMITTED, - None + replicaId = otherId ) assertEquals(Errors.NONE, leaderResponse.get.error) @@ -3211,14 +3233,11 @@ class ReplicaManagerTest { // Send a produce request and advance the highwatermark val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords) - fetchMessages( + fetchPartitionAsFollower( replicaManager, - otherId, topicIdPartition, new PartitionData(Uuid.ZERO_UUID, numOfRecords, 0, Int.MaxValue, Optional.empty()), - Int.MaxValue, - IsolationLevel.READ_UNCOMMITTED, - None + replicaId = otherId ) assertEquals(Errors.NONE, leaderResponse.get.error) @@ -3484,15 +3503,15 @@ class ReplicaManagerTest { assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) // Send a fetch request - val fetchCallback = fetchMessages( + val fetchCallback = fetchPartitionAsFollower( replicaManager, - otherId, topicIdPartition, new PartitionData(Uuid.ZERO_UUID, 0, 0, Int.MaxValue, Optional.empty()), - Int.MaxValue, - IsolationLevel.READ_UNCOMMITTED, - None + replicaId = otherId, + minBytes = Int.MaxValue, + maxWaitMs = 1000 ) + assertFalse(fetchCallback.hasFired) // Change the local replica to follower val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false) diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala index d04377a21c..93dc8bdd39 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala @@ -84,6 +84,8 @@ class BrokerMetadataListenerTest { Collections.emptyMap[String, VersionRange](), Optional.empty[String](), true), delta.clusterDelta().broker(1)) } + + override def publishedOffset: Long = -1 }).get() } finally { listener.close() @@ -125,6 +127,8 @@ class BrokerMetadataListenerTest { override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = { image = newImage } + + override def publishedOffset: Long = -1 } private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw") diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index 329c9d1e1e..46932d6c88 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -19,7 +19,7 @@ package unit.kafka.server.metadata import java.util.Collections.{singleton, singletonMap} import java.util.Properties -import java.util.concurrent.atomic.AtomicInteger +import 
java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import kafka.log.UnifiedLog import kafka.server.KafkaConfig @@ -30,12 +30,13 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry} import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.ConfigResource.Type.BROKER +import org.apache.kafka.common.utils.Exit import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage} import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.metadata.PartitionRegistration import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.mockito.ArgumentMatchers.any import org.mockito.Mockito import org.mockito.invocation.InvocationOnMock @@ -44,6 +45,24 @@ import org.mockito.stubbing.Answer import scala.jdk.CollectionConverters._ class BrokerMetadataPublisherTest { + val exitException = new AtomicReference[Throwable](null) + + @BeforeEach + def setUp(): Unit = { + Exit.setExitProcedure((code, _) => exitException.set(new RuntimeException(s"Exit ${code}"))) + Exit.setHaltProcedure((code, _) => exitException.set(new RuntimeException(s"Halt ${code}"))) + } + + @AfterEach + def tearDown(): Unit = { + Exit.resetExitProcedure(); + Exit.resetHaltProcedure(); + val exception = exitException.get() + if (exception != null) { + throw exception + } + } + @Test def testGetTopicDelta(): Unit = { assert(BrokerMetadataPublisher.getTopicDelta( diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index bc51644cb5..31ba10f79c 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1127,6 +1127,25 @@ object TestUtils extends Logging { throw new IllegalStateException(s"Cannot get topic: $topic, partition: $partition in server metadata cache")) } + /** + * Wait until the kraft broker metadata have caught up to the controller, before calling this, we should make sure + * the related metadata message has already been committed to the controller metadata log. 
+ */ + def ensureConsistentKRaftMetadata( + brokers: Seq[KafkaBroker], + controllerServer: ControllerServer, + msg: String = "Timeout waiting for controller metadata propagating to brokers" + ): Unit = { + val controllerOffset = controllerServer.raftManager.replicatedLog.endOffset().offset - 1 + TestUtils.waitUntilTrue( + () => { + brokers.forall { broker => + val metadataOffset = broker.asInstanceOf[BrokerServer].metadataPublisher.publishedOffset + metadataOffset >= controllerOffset + } + }, msg) + } + def waitUntilControllerElected(zkClient: KafkaZkClient, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = { val (controllerId, _) = computeUntilTrue(zkClient.getControllerId, waitTime = timeout)(_.isDefined) controllerId.getOrElse(throw new AssertionError(s"Controller not elected after $timeout ms")) diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 066d62bc85..6ce71917b7 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -100,6 +100,7 @@ versions += [ kafka_28: "2.8.1", kafka_30: "3.0.1", kafka_31: "3.1.0", + kafka_32: "3.2.0", lz4: "1.8.0", mavenArtifact: "3.8.4", metrics: "2.2.0", diff --git a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java index b0fbfb58a3..d3fc0fe76e 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java @@ -41,11 +41,14 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; /** @@ -142,7 +145,7 @@ static void validateNewAcl(AclBinding binding) { ControllerResult> deleteAcls(List filters) { List results = new ArrayList<>(); - List records = new ArrayList<>(); + Set records = new HashSet<>(); for (AclBindingFilter filter : filters) { try { validateFilter(filter); @@ -152,11 +155,11 @@ ControllerResult> deleteAcls(List filter results.add(new AclDeleteResult(ApiError.fromThrowable(e).exception())); } } - return ControllerResult.atomicOf(records, results); + return ControllerResult.atomicOf(records.stream().collect(Collectors.toList()), results); } AclDeleteResult deleteAclsForFilter(AclBindingFilter filter, - List records) { + Set records) { List deleted = new ArrayList<>(); for (Entry entry : idToAcl.entrySet()) { Uuid id = entry.getKey(); diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 558e55b902..cde9d39569 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -213,15 +213,19 @@ private void incrementalAlterConfigResource(ConfigResource configResource, "key " + key + " because its type is not LIST.")); return; } - List newValueParts = getParts(newValue, key, configResource); + List oldValueList = getParts(newValue, key, configResource); if (opType == APPEND) { - if (!newValueParts.contains(opValue)) { - newValueParts.add(opValue); + for (String value : opValue.split(",")) { + if (!oldValueList.contains(value)) { + oldValueList.add(value); + } + } + } else { + for (String value : opValue.split(",")) { + 
oldValueList.remove(value); } - newValue = String.join(",", newValueParts); - } else if (newValueParts.remove(opValue)) { - newValue = String.join(",", newValueParts); } + newValue = String.join(",", oldValueList); break; } if (!Objects.equals(currentValue, newValue)) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java index 368eaa1443..08b362a727 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java @@ -306,4 +306,34 @@ public void testCreateAclDeleteAcl() { (AccessControlEntryRecord) list.get(0).message()).toBinding()); assertFalse(iterator.hasNext()); } + + @Test + public void testDeleteDedupe() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + AclControlManager manager = new AclControlManager(snapshotRegistry, Optional.empty()); + MockClusterMetadataAuthorizer authorizer = new MockClusterMetadataAuthorizer(); + authorizer.loadSnapshot(manager.idToAcl()); + + AclBinding aclBinding = new AclBinding(new ResourcePattern(TOPIC, "topic-1", LITERAL), + new AccessControlEntry("User:user", "10.0.0.1", AclOperation.ALL, ALLOW)); + + ControllerResult> createResult = manager.createAcls(Arrays.asList(aclBinding)); + Uuid id = ((AccessControlEntryRecord) createResult.records().get(0).message()).id(); + assertEquals(1, createResult.records().size()); + + ControllerResult> deleteAclResultsAnyFilter = manager.deleteAcls(Arrays.asList(AclBindingFilter.ANY)); + assertEquals(1, deleteAclResultsAnyFilter.records().size()); + assertEquals(id, ((RemoveAccessControlEntryRecord) deleteAclResultsAnyFilter.records().get(0).message()).id()); + assertEquals(1, deleteAclResultsAnyFilter.response().size()); + + ControllerResult> deleteAclResultsSpecificFilter = manager.deleteAcls(Arrays.asList(aclBinding.toFilter())); + assertEquals(1, deleteAclResultsSpecificFilter.records().size()); + assertEquals(id, ((RemoveAccessControlEntryRecord) deleteAclResultsSpecificFilter.records().get(0).message()).id()); + assertEquals(1, deleteAclResultsSpecificFilter.response().size()); + + ControllerResult> deleteAclResultsBothFilters = manager.deleteAcls(Arrays.asList(AclBindingFilter.ANY, aclBinding.toFilter())); + assertEquals(1, deleteAclResultsBothFilters.records().size()); + assertEquals(id, ((RemoveAccessControlEntryRecord) deleteAclResultsBothFilters.records().get(0).message()).id()); + assertEquals(2, deleteAclResultsBothFilters.response().size()); + } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index 1ba9591e6a..007e84ffc0 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -53,6 +53,7 @@ import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SUBTRACT; import static org.apache.kafka.common.config.ConfigResource.Type.BROKER; import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; +import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD; import static org.apache.kafka.metadata.ConfigSynonym.HOURS_TO_MILLISECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -137,10 +138,10 @@ public void 
testReplay() throws Exception { RecordTestUtils.assertBatchIteratorContains(asList( asList(new ApiMessageAndVersion(new ConfigRecord(). setResourceType(TOPIC.id()).setResourceName("mytopic"). - setName("abc").setValue("x,y,z"), (short) 0), + setName("abc").setValue("x,y,z"), CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion(new ConfigRecord(). setResourceType(TOPIC.id()).setResourceName("mytopic"). - setName("def").setValue("blah"), (short) 0))), + setName("def").setValue("blah"), CONFIG_RECORD.highestSupportedVersion()))), manager.iterator(Long.MAX_VALUE)); } @@ -159,7 +160,7 @@ public void testIncrementalAlterConfigs() { assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion( new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic"). - setName("abc").setValue("123"), (short) 0)), + setName("abc").setValue("123"), CONFIG_RECORD.highestSupportedVersion())), toMap(entry(BROKER0, new ApiError(Errors.INVALID_CONFIG, "Can't SUBTRACT to key baz because its type is not LIST.")), entry(MYTOPIC, ApiError.NONE))), result); @@ -168,13 +169,59 @@ public void testIncrementalAlterConfigs() { assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion( new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic"). - setName("abc").setValue(null), (short) 0)), + setName("abc").setValue(null), CONFIG_RECORD.highestSupportedVersion())), toMap(entry(MYTOPIC, ApiError.NONE))), manager.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap( entry("abc", entry(DELETE, "xyz"))))), true)); } + @Test + public void testIncrementalAlterMultipleConfigValues() { + ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). + setKafkaConfigSchema(SCHEMA). + build(); + + ControllerResult> result = manager. + incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123,456,789"))))), true); + + assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion( + new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic"). + setName("abc").setValue("123,456,789"), CONFIG_RECORD.highestSupportedVersion())), + toMap(entry(MYTOPIC, ApiError.NONE))), result); + + RecordTestUtils.replayAll(manager, result.records()); + + // It's ok for the appended value to be already present + result = manager + .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123,456"))))), true); + assertEquals( + ControllerResult.atomicOf(Collections.emptyList(), toMap(entry(MYTOPIC, ApiError.NONE))), + result + ); + RecordTestUtils.replayAll(manager, result.records()); + + result = manager + .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(SUBTRACT, "123,456"))))), true); + assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion( + new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic"). 
+ setName("abc").setValue("789"), CONFIG_RECORD.highestSupportedVersion())), + toMap(entry(MYTOPIC, ApiError.NONE))), + result); + RecordTestUtils.replayAll(manager, result.records()); + + // It's ok for the deleted value not to be present + result = manager + .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", entry(SUBTRACT, "123456"))))), true); + assertEquals( + ControllerResult.atomicOf(Collections.emptyList(), toMap(entry(MYTOPIC, ApiError.NONE))), + result + ); + RecordTestUtils.replayAll(manager, result.records()); + + assertEquals("789", manager.getConfigs(MYTOPIC).get("abc")); + } + @Test public void testIncrementalAlterConfigsWithoutExistence() { ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). @@ -191,7 +238,7 @@ public void testIncrementalAlterConfigsWithoutExistence() { assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion( new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("ExistingTopic"). - setName("def").setValue("newVal"), (short) 0)), + setName("def").setValue("newVal"), CONFIG_RECORD.highestSupportedVersion())), toMap(entry(BROKER0, new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "Unknown resource.")), entry(existingTopic, ApiError.NONE))), result); @@ -242,9 +289,9 @@ public void testIncrementalAlterConfigsWithPolicy() { build(); assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion( new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0"). - setName("foo.bar").setValue("123"), (short) 0), new ApiMessageAndVersion( + setName("foo.bar").setValue("123"), CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion( new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0"). - setName("quux").setValue("456"), (short) 0)), + setName("quux").setValue("456"), CONFIG_RECORD.highestSupportedVersion())), toMap(entry(MYTOPIC, new ApiError(Errors.POLICY_VIOLATION, "Expected: AlterConfigPolicy.RequestMetadata(resource=ConfigResource(" + "type=TOPIC, name='mytopic'), configs={}). Got: " + @@ -267,10 +314,10 @@ public void testLegacyAlterConfigs() { List expectedRecords1 = asList( new ApiMessageAndVersion(new ConfigRecord(). setResourceType(TOPIC.id()).setResourceName("mytopic"). - setName("abc").setValue("456"), (short) 0), + setName("abc").setValue("456"), CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion(new ConfigRecord(). setResourceType(TOPIC.id()).setResourceName("mytopic"). 
- setName("def").setValue("901"), (short) 0)); + setName("def").setValue("901"), CONFIG_RECORD.highestSupportedVersion())); assertEquals(ControllerResult.atomicOf( expectedRecords1, toMap(entry(MYTOPIC, ApiError.NONE))), manager.legacyAlterConfigs( @@ -286,7 +333,7 @@ expectedRecords1, toMap(entry(MYTOPIC, ApiError.NONE))), .setResourceName("mytopic") .setName("abc") .setValue(null), - (short) 0)), + CONFIG_RECORD.highestSupportedVersion())), toMap(entry(MYTOPIC, ApiError.NONE))), manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", "901")))), true)); diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java b/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java index 0833df0bb2..3ce72a591f 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java @@ -68,7 +68,7 @@ public class RaftConfig { public static final String QUORUM_FETCH_TIMEOUT_MS_CONFIG = QUORUM_PREFIX + "fetch.timeout.ms"; public static final String QUORUM_FETCH_TIMEOUT_MS_DOC = "Maximum time without a successful fetch from " + - "the current leader before becoming a candidate and triggering a election for voters; Maximum time without " + + "the current leader before becoming a candidate and triggering an election for voters; Maximum time without " + "receiving fetch from a majority of the quorum before asking around to see if there's a new epoch for leader"; public static final int DEFAULT_QUORUM_FETCH_TIMEOUT_MS = 2_000; diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 3f312d254c..25d39b8b29 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -65,6 +65,7 @@ RUN mkdir -p "/opt/kafka-2.7.1" && chmod a+rw /opt/kafka-2.7.1 && curl -s "$KAFK RUN mkdir -p "/opt/kafka-2.8.1" && chmod a+rw /opt/kafka-2.8.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.8.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.8.1" RUN mkdir -p "/opt/kafka-3.0.1" && chmod a+rw /opt/kafka-3.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.0.1" RUN mkdir -p "/opt/kafka-3.1.0" && chmod a+rw /opt/kafka-3.1.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.1.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.1.0" +RUN mkdir -p "/opt/kafka-3.2.0" && chmod a+rw /opt/kafka-3.2.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.2.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.2.0" # Streams test dependencies RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar @@ -84,6 +85,7 @@ RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.7.1-test.jar" -o /opt/kafka-2.7.1/lib RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.8.1-test.jar" -o /opt/kafka-2.8.1/libs/kafka-streams-2.8.1-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.0.1-test.jar" -o /opt/kafka-3.0.1/libs/kafka-streams-3.0.1-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.1.0-test.jar" -o /opt/kafka-3.1.0/libs/kafka-streams-3.1.0-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.2.0-test.jar" -o /opt/kafka-3.2.0/libs/kafka-streams-3.2.0-test.jar # The version of Kibosh to use for testing. 
# If you update this, also update vagrant/base.sh diff --git a/vagrant/base.sh b/vagrant/base.sh index 13cc9ff0b7..3cf19e4d79 100755 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -152,6 +152,9 @@ get_kafka 3.0.1 2.12 chmod a+rw /opt/kafka-3.0.1 get_kafka 3.1.0 2.12 chmod a+rw /opt/kafka-3.1.0 +get_kafka 3.2.0 2.12 +chmod a+rw /opt/kafka-3.2.0 + # For EC2 nodes, we want to use /mnt, which should have the local disk. On local # VMs, we can just create it if it doesn't exist and use it like we'd use