+ This only affects modules that specify a logging backend (connect-runtime and kafka-tools are two such examples).
+ A number of modules, including kafka-clients, leave it to the application to specify the logging backend.
+ More information can be found at reload4j.
+ Projects that depend on the affected modules from the Kafka project should use
+ slf4j-log4j12 version 1.7.35 or above, or
+ slf4j-reload4j, to avoid
+ possible compatibility issues originating from the logging framework.
The example connectors, FileStreamSourceConnector and FileStreamSinkConnector, have been
removed from the default classpath. To use them in Kafka Connect standalone or distributed mode, they need to be
explicitly added, for example CLASSPATH=./lib/connect-file-3.2.0.jar ./bin/connect-distributed.sh.
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 1dec000f3..7571a069f 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -17,8 +17,6 @@
package org.apache.kafka.jmh.fetcher;
-import kafka.api.ApiVersion;
-import kafka.api.ApiVersion$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.DelayedOperations;
import kafka.cluster.AlterPartitionListener;
@@ -69,6 +67,7 @@
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.MetadataVersion;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -145,7 +144,7 @@ public void setup() throws IOException {
setFlushStartOffsetCheckpointMs(10000L).
setRetentionCheckMs(1000L).
setMaxPidExpirationMs(60000).
- setInterBrokerProtocolVersion(ApiVersion.latestVersion()).
+ setInterBrokerProtocolVersion(MetadataVersion.latest()).
setScheduler(scheduler).
setBrokerTopicStats(brokerTopicStats).
setLogDirFailureChannel(logDirFailureChannel).
@@ -174,7 +173,7 @@ public void setup() throws IOException {
OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
AlterPartitionManager isrChannelManager = Mockito.mock(AlterPartitionManager.class);
- Partition partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(),
+ Partition partition = new Partition(tp, 100, MetadataVersion.latest(),
0, Time.SYSTEM, alterPartitionListener, new DelayedOperationsMock(tp),
Mockito.mock(MetadataCache.class), logManager, isrChannelManager);
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 1bc695ecb..4daddd29b 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -17,8 +17,6 @@
package org.apache.kafka.jmh.partition;
-import kafka.api.ApiVersion;
-import kafka.api.ApiVersion$;
import kafka.cluster.DelayedOperations;
import kafka.cluster.AlterPartitionListener;
import kafka.cluster.Partition;
@@ -42,6 +40,7 @@
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.MetadataVersion;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -111,7 +110,7 @@ public void setup() throws IOException {
setFlushStartOffsetCheckpointMs(10000L).
setRetentionCheckMs(1000L).
setMaxPidExpirationMs(60000).
- setInterBrokerProtocolVersion(ApiVersion.latestVersion()).
+ setInterBrokerProtocolVersion(MetadataVersion.latest()).
setScheduler(scheduler).
setBrokerTopicStats(brokerTopicStats).
setLogDirFailureChannel(logDirFailureChannel).
@@ -125,7 +124,7 @@ public void setup() throws IOException {
AlterPartitionListener alterPartitionListener = Mockito.mock(AlterPartitionListener.class);
AlterPartitionManager alterPartitionManager = Mockito.mock(AlterPartitionManager.class);
partition = new Partition(tp, 100,
- ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
+ MetadataVersion.latest(), 0, Time.SYSTEM,
alterPartitionListener, delayedOperations,
Mockito.mock(MetadataCache.class), logManager, alterPartitionManager);
partition.createLogIfNotExists(true, false, offsetCheckpoints, topicId);
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index cf7201d4c..f1f3d76ba 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -17,8 +17,6 @@
package org.apache.kafka.jmh.partition;
-import kafka.api.ApiVersion;
-import kafka.api.ApiVersion$;
import kafka.cluster.DelayedOperations;
import kafka.cluster.AlterPartitionListener;
import kafka.cluster.Partition;
@@ -39,6 +37,7 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.common.MetadataVersion;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -97,7 +96,7 @@ public void setUp() {
setFlushStartOffsetCheckpointMs(10000L).
setRetentionCheckMs(1000L).
setMaxPidExpirationMs(60000).
- setInterBrokerProtocolVersion(ApiVersion.latestVersion()).
+ setInterBrokerProtocolVersion(MetadataVersion.latest()).
setScheduler(scheduler).
setBrokerTopicStats(brokerTopicStats).
setLogDirFailureChannel(logDirFailureChannel).
@@ -124,7 +123,7 @@ public void setUp() {
AlterPartitionListener alterPartitionListener = Mockito.mock(AlterPartitionListener.class);
AlterPartitionManager alterPartitionManager = Mockito.mock(AlterPartitionManager.class);
partition = new Partition(topicPartition, 100,
- ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
+ MetadataVersion.latest(), 0, Time.SYSTEM,
alterPartitionListener, delayedOperations,
Mockito.mock(MetadataCache.class), logManager, alterPartitionManager);
partition.makeLeader(partitionState, offsetCheckpoints, topicId);
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java
index 24ac53e78..cfbc66b66 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.jmh.record;
-import kafka.api.ApiVersion;
import kafka.common.LongRef;
import kafka.log.AppendOrigin;
import kafka.log.LogValidator;
@@ -26,6 +25,7 @@
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.common.MetadataVersion;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
@@ -58,7 +58,7 @@ public void measureValidateMessagesAndAssignOffsetsCompressed(Blackhole bh) {
CompressionCodec.getCompressionCodec(compressionType.id),
false, messageVersion, TimestampType.CREATE_TIME, Long.MAX_VALUE, 0,
new AppendOrigin.Client$(),
- ApiVersion.latestVersion(),
+ MetadataVersion.latest(),
brokerTopicStats,
requestLocal);
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 919179ac3..dcbacac7e 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.jmh.server;
-import kafka.api.ApiVersion;
import kafka.cluster.Partition;
import kafka.log.CleanerConfig;
import kafka.log.LogConfig;
@@ -39,6 +38,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.MetadataVersion;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
@@ -107,7 +107,7 @@ public void setup() {
this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files),
LogConfig.apply(), new MockConfigRepository(), CleanerConfig.apply(1, 4 * 1024 * 1024L, 0.9d,
1024 * 1024, 32 * 1024 * 1024,
- Double.MAX_VALUE, 15 * 1000, true, "MD5"), time, ApiVersion.latestVersion());
+ Double.MAX_VALUE, 15 * 1000, true, "MD5"), time, MetadataVersion.latest());
scheduler.startup();
final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
final MetadataCache metadataCache =
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index ac9a7f4c5..e1649db22 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.jmh.server;
-import kafka.api.ApiVersion;
import kafka.cluster.Partition;
import kafka.log.CleanerConfig;
import kafka.log.Defaults;
@@ -44,6 +43,7 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.MetadataVersion;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -134,7 +134,7 @@ public void setup() {
setFlushStartOffsetCheckpointMs(10000L).
setRetentionCheckMs(1000L).
setMaxPidExpirationMs(60000).
- setInterBrokerProtocolVersion(ApiVersion.latestVersion()).
+ setInterBrokerProtocolVersion(MetadataVersion.latest()).
setScheduler(scheduler).
setBrokerTopicStats(brokerTopicStats).
setLogDirFailureChannel(failureChannel).
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
new file mode 100644
index 000000000..0292dab1d
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+/**
+ * This class contains the different Kafka versions.
+ * Right now, we use them for upgrades - users can configure the version of the API brokers will use to communicate between themselves.
+ * This is only for inter-broker communications - when communicating with clients, the client decides on the API version.
+ *
+ * Note that the ID we initialize for each version is important.
+ * We consider a version newer than another if it is lower in the enum list (to avoid depending on lexicographic order)
+ *
+ * Since the api protocol may change more than once within the same release and to facilitate people deploying code from
+ * trunk, we have the concept of internal versions (first introduced during the 0.10.0 development cycle). For example,
+ * the first time we introduce a version change in a release, say 0.10.0, we will add a config value "0.10.0-IV0" and a
+ * corresponding enum constant IBP_0_10_0-IV0. We will also add a config value "0.10.0" that will be mapped to the
+ * latest internal version object, which is IBP_0_10_0-IV0. When we change the protocol a second time while developing
+ * 0.10.0, we will add a new config value "0.10.0-IV1" and a corresponding enum constant IBP_0_10_0-IV1. We will change
+ * the config value "0.10.0" to map to the latest internal version IBP_0_10_0-IV1. The config value of
+ * "0.10.0-IV0" is still mapped to IBP_0_10_0-IV0. This way, if people are deploying from trunk, they can use
+ * "0.10.0-IV0" and "0.10.0-IV1" to upgrade one internal version at a time. For most people who just want to use
+ * released version, they can use "0.10.0" when upgrading to the 0.10.0 release.
+ */
+public enum MetadataVersion {
+ IBP_0_8_0(-1, "0.8.0", ""),
+ IBP_0_8_1(-1, "0.8.1", ""),
+ IBP_0_8_2(-1, "0.8.2", ""),
+ IBP_0_9_0(-1, "0.9.0", ""),
+
+ // 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
+ IBP_0_10_0_IV0(-1, "0.10.0", "IV0"),
+
+ // 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL handshake).
+ IBP_0_10_0_IV1(-1, "0.10.0", "IV1"),
+
+ // introduced for JoinGroup protocol change in KIP-62
+ IBP_0_10_1_IV0(-1, "0.10.1", "IV0"),
+
+ // 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+ IBP_0_10_1_IV1(-1, "0.10.1", "IV1"),
+
+ // introduced ListOffsetRequest v1 in KIP-79
+ IBP_0_10_1_IV2(-1, "0.10.1", "IV2"),
+
+ // introduced UpdateMetadataRequest v3 in KIP-103
+ IBP_0_10_2_IV0(-1, "0.10.2", "IV0"),
+
+ // KIP-98 (idempotent and transactional producer support)
+ IBP_0_11_0_IV0(-1, "0.11.0", "IV0"),
+
+ // introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+ IBP_0_11_0_IV1(-1, "0.11.0", "IV1"),
+
+ // Introduced leader epoch fetches to the replica fetcher via KIP-101
+ IBP_0_11_0_IV2(-1, "0.11.0", "IV2"),
+
+ // Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112
+ IBP_1_0_IV0(-1, "1.0", "IV0"),
+
+ // Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental fetch requests,
+ // and KafkaStorageException for fetch requests.
+ IBP_1_1_IV0(-1, "1.1", "IV0"),
+
+ // Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log divergence between leader and follower after fast leader fail over)
+ IBP_2_0_IV0(-1, "2.0", "IV0"),
+
+ // Several request versions were bumped due to KIP-219 (Improve quota communication)
+ IBP_2_0_IV1(-1, "2.0", "IV1"),
+
+ // Introduced new schemas for group offset (v2) and group metadata (v2) (KIP-211)
+ IBP_2_1_IV0(-1, "2.1", "IV0"),
+
+ // New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
+ IBP_2_1_IV1(-1, "2.1", "IV1"),
+
+ // Support ZStandard Compression Codec (KIP-110)
+ IBP_2_1_IV2(-1, "2.1", "IV2"),
+
+ // Introduced broker generation (KIP-380), and
+ // LeaderAndIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
+ IBP_2_2_IV0(-1, "2.2", "IV0"),
+
+ // New error code for ListOffsets when a new leader is lagging behind former HW (KIP-207)
+ IBP_2_2_IV1(-1, "2.2", "IV1"),
+
+ // Introduced static membership.
+ IBP_2_3_IV0(-1, "2.3", "IV0"),
+
+ // Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, and replica_id to OffsetsForLeaderRequest
+ IBP_2_3_IV1(-1, "2.3", "IV1"),
+
+ // Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest
+ IBP_2_4_IV0(-1, "2.4", "IV0"),
+
+ // Flexible version support in inter-broker APIs
+ IBP_2_4_IV1(-1, "2.4", "IV1"),
+
+ // No new APIs, equivalent to 2.4-IV1
+ IBP_2_5_IV0(-1, "2.5", "IV0"),
+
+ // Introduced StopReplicaRequest V3 containing the leader epoch for each partition (KIP-570)
+ IBP_2_6_IV0(-1, "2.6", "IV0"),
+
+ // Introduced feature versioning support (KIP-584)
+ IBP_2_7_IV0(-1, "2.7", "IV0"),
+
+ // Bump Fetch protocol for Raft protocol (KIP-595)
+ IBP_2_7_IV1(-1, "2.7", "IV1"),
+
+ // Introduced AlterPartition (KIP-497)
+ IBP_2_7_IV2(-1, "2.7", "IV2"),
+
+ // Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
+ IBP_2_8_IV0(-1, "2.8", "IV0"),
+
+ // Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516)
+ IBP_2_8_IV1(-1, "2.8", "IV1"),
+
+ // Introduce AllocateProducerIds (KIP-730)
+ IBP_3_0_IV0(1, "3.0", "IV0"),
+
+ // Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734)
+ // Assume message format version is 3.0 (KIP-724)
+ IBP_3_0_IV1(2, "3.0", "IV1"),
+
+ // Adds topic IDs to Fetch requests/responses (KIP-516)
+ IBP_3_1_IV0(3, "3.1", "IV0"),
+
+ // Support for leader recovery for unclean leader election (KIP-704)
+ IBP_3_2_IV0(4, "3.2", "IV0");
+
+ public static final MetadataVersion[] VALUES = MetadataVersion.values();
+ private final Optional<Short> featureLevel;
+ private final String release;
+ private final String ibpVersion;
+
+ MetadataVersion(int featureLevel, String release, String subVersion) {
+ if (featureLevel > 0) {
+ this.featureLevel = Optional.of((short) featureLevel);
+ } else {
+ this.featureLevel = Optional.empty();
+ }
+ this.release = release;
+ if (subVersion.isEmpty()) {
+ this.ibpVersion = release;
+ } else {
+ this.ibpVersion = String.format("%s-%s", release, subVersion);
+ }
+ }
+
+ public Optional<Short> featureLevel() {
+ return featureLevel;
+ }
+
+ public boolean isSaslInterBrokerHandshakeRequestEnabled() {
+ return this.isAtLeast(IBP_0_10_0_IV1);
+ }
+
+ public boolean isOffsetForLeaderEpochSupported() {
+ return this.isAtLeast(IBP_0_11_0_IV2);
+ }
+
+ public boolean isFeatureVersioningSupported() {
+ return this.isAtLeast(IBP_2_7_IV0);
+ }
+
+ public boolean isTruncationOnFetchSupported() {
+ return this.isAtLeast(IBP_2_7_IV1);
+ }
+
+ public boolean isAlterIsrSupported() {
+ return this.isAtLeast(IBP_2_7_IV2);
+ }
+
+ public boolean isTopicIdsSupported() {
+ return this.isAtLeast(IBP_2_8_IV0);
+ }
+
+ public boolean isAllocateProducerIdsSupported() {
+ return this.isAtLeast(IBP_3_0_IV0);
+ }
+
+
+ public RecordVersion highestSupportedRecordVersion() {
+ if (this.isLessThan(IBP_0_10_0_IV0)) {
+ return RecordVersion.V0;
+ } else if (this.isLessThan(IBP_0_11_0_IV0)) {
+ return RecordVersion.V1;
+ } else {
+ return RecordVersion.V2;
+ }
+ }
+
+ private static final Map<String, MetadataVersion> IBP_VERSIONS;
+ static {
+ {
+ IBP_VERSIONS = new HashMap<>();
+ Map<String, MetadataVersion> maxInterVersion = new HashMap<>();
+ for (MetadataVersion metadataVersion : VALUES) {
+ maxInterVersion.put(metadataVersion.release, metadataVersion);
+ IBP_VERSIONS.put(metadataVersion.ibpVersion, metadataVersion);
+ }
+ IBP_VERSIONS.putAll(maxInterVersion);
+ }
+ }
+
+ public String shortVersion() {
+ return release;
+ }
+
+ public String version() {
+ return ibpVersion;
+ }
+
+ /**
+ * Return a `MetadataVersion` instance for `versionString`, which can be in a variety of formats (e.g. "0.8.0", "0.8.0.x",
+ * "0.10.0", "0.10.0-IV1"). `IllegalArgumentException` is thrown if `versionString` cannot be mapped to a `MetadataVersion`.
+ * Note that 'misconfigured' values such as "1.0.1" will be parsed to `IBP_1_0_IV0` as we ignore anything after the first
+ * two digits for versions that don't start with "0."
+ */
+ public static MetadataVersion fromVersionString(String versionString) {
+ String[] versionSegments = versionString.split(Pattern.quote("."));
+ int numSegments = (versionString.startsWith("0.")) ? 3 : 2;
+ String key;
+ if (numSegments >= versionSegments.length) {
+ key = versionString;
+ } else {
+ key = String.join(".", Arrays.copyOfRange(versionSegments, 0, numSegments));
+ }
+ return Optional.ofNullable(IBP_VERSIONS.get(key)).orElseThrow(() ->
+ new IllegalArgumentException("Version " + versionString + " is not a valid version")
+ );
+ }
+
+ /**
+ * Return the minimum `MetadataVersion` that supports `RecordVersion`.
+ */
+ public static MetadataVersion minSupportedFor(RecordVersion recordVersion) {
+ switch (recordVersion) {
+ case V0:
+ return IBP_0_8_0;
+ case V1:
+ return IBP_0_10_0_IV0;
+ case V2:
+ return IBP_0_11_0_IV0;
+ default:
+ throw new IllegalArgumentException("Invalid message format version " + recordVersion);
+ }
+ }
+
+ public static MetadataVersion latest() {
+ return VALUES[VALUES.length - 1];
+ }
+
+ public boolean isAtLeast(MetadataVersion otherVersion) {
+ return this.compareTo(otherVersion) >= 0;
+ }
+
+ public boolean isLessThan(MetadataVersion otherVersion) {
+ return this.compareTo(otherVersion) < 0;
+ }
+
+ @Override
+ public String toString() {
+ return ibpVersion;
+ }
+}
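To make the intended call patterns concrete for reviewers, here is a minimal usage sketch of the new enum; the demo class is hypothetical, but every call below is defined in MetadataVersion.java above, and the expected values follow from the enum constants and the tests later in this patch.

import org.apache.kafka.server.common.MetadataVersion;

public class MetadataVersionDemo {
    public static void main(String[] args) {
        // A plain release string maps to the newest internal version for that release...
        MetadataVersion ibp = MetadataVersion.fromVersionString("3.0");      // IBP_3_0_IV1
        // ...while an explicit "-IVn" suffix selects an exact internal version.
        MetadataVersion iv0 = MetadataVersion.fromVersionString("3.0-IV0");  // IBP_3_0_IV0

        // Comparisons use enum declaration order, not lexicographic order.
        System.out.println(ibp.isAtLeast(iv0));                          // true
        System.out.println(iv0.isLessThan(MetadataVersion.latest()));    // true

        // Only IBP_3_0_IV0 and newer carry a KIP-584 feature level.
        System.out.println(MetadataVersion.latest().featureLevel());     // Optional[4] for IBP_3_2_IV0
        System.out.println(ibp.highestSupportedRecordVersion());         // V2
    }
}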
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersionValidator.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersionValidator.java
new file mode 100644
index 000000000..d685dd018
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersionValidator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.ConfigDef.Validator;
+import org.apache.kafka.common.config.ConfigException;
+
+public class MetadataVersionValidator implements Validator {
+
+ @Override
+ public void ensureValid(String name, Object value) {
+ try {
+ MetadataVersion.fromVersionString(value.toString());
+ } catch (IllegalArgumentException e) {
+ throw new ConfigException(name, value.toString(), e.getMessage());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "[" + Arrays.stream(MetadataVersion.VALUES).map(MetadataVersion::version).collect(
+ Collectors.joining(", ")) + "]";
+ }
+}
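A hedged sketch of how this validator is meant to plug into a config definition; the config key and the ConfigDef wiring below are illustrative (the broker defines the real inter.broker.protocol.version config elsewhere), while ensureValid and this ConfigDef.define overload are as shown in the patch and the public ConfigDef API.

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.MetadataVersionValidator;

public class IbpConfigSketch {
    // Illustrative definition: reject any value fromVersionString cannot parse.
    static final ConfigDef CONFIG = new ConfigDef().define(
        "inter.broker.protocol.version",
        ConfigDef.Type.STRING,
        MetadataVersion.latest().version(),
        new MetadataVersionValidator(),
        ConfigDef.Importance.MEDIUM,
        "The inter-broker protocol version.");

    public static void main(String[] args) {
        try {
            // "0.1.0" is not a known version, so the IllegalArgumentException from
            // MetadataVersion.fromVersionString is rethrown as a ConfigException.
            new MetadataVersionValidator().ensureValid("inter.broker.protocol.version", "0.1.0");
        } catch (ConfigException e) {
            System.out.println(e.getMessage());
        }
    }
}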
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
new file mode 100644
index 000000000..7ad8754b7
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.ApiMessageType.ListenerType;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import org.apache.kafka.common.message.ApiVersionsResponseData.FinalizedFeatureKey;
+import org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureKey;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.utils.Utils;
+
+import static org.apache.kafka.server.common.MetadataVersion.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.junit.jupiter.api.Test;
+
+class MetadataVersionTest {
+
+ @Test
+ public void testFeatureLevel() {
+ int firstFeatureLevelIndex = Arrays.asList(MetadataVersion.VALUES).indexOf(IBP_3_0_IV0);
+ for (int i = 0; i < firstFeatureLevelIndex; i++) {
+ assertFalse(MetadataVersion.VALUES[i].featureLevel().isPresent());
+ }
+ short expectedFeatureLevel = 1;
+ for (int i = firstFeatureLevelIndex; i < MetadataVersion.VALUES.length; i++) {
+ MetadataVersion metadataVersion = MetadataVersion.VALUES[i];
+ short featureLevel = metadataVersion.featureLevel().orElseThrow(() ->
+ new IllegalArgumentException(
+ String.format("Metadata version %s must have a non-null feature level", metadataVersion.version())));
+ assertEquals(expectedFeatureLevel, featureLevel,
+ String.format("Metadata version %s should have feature level %s", metadataVersion.version(), expectedFeatureLevel));
+ expectedFeatureLevel += 1;
+ }
+ }
+
+ @Test
+ public void testFromVersionString() {
+ assertEquals(IBP_0_8_0, MetadataVersion.fromVersionString("0.8.0"));
+ assertEquals(IBP_0_8_0, MetadataVersion.fromVersionString("0.8.0.0"));
+ assertEquals(IBP_0_8_0, MetadataVersion.fromVersionString("0.8.0.1"));
+ // should throw an exception as long as IBP_8_0_IV0 is not defined
+ assertThrows(IllegalArgumentException.class, () -> MetadataVersion.fromVersionString("8.0"));
+
+ assertEquals(IBP_0_8_1, MetadataVersion.fromVersionString("0.8.1"));
+ assertEquals(IBP_0_8_1, MetadataVersion.fromVersionString("0.8.1.0"));
+ assertEquals(IBP_0_8_1, MetadataVersion.fromVersionString("0.8.1.1"));
+
+ assertEquals(IBP_0_8_2, MetadataVersion.fromVersionString("0.8.2"));
+ assertEquals(IBP_0_8_2, MetadataVersion.fromVersionString("0.8.2.0"));
+ assertEquals(IBP_0_8_2, MetadataVersion.fromVersionString("0.8.2.1"));
+
+ assertEquals(IBP_0_9_0, MetadataVersion.fromVersionString("0.9.0"));
+ assertEquals(IBP_0_9_0, MetadataVersion.fromVersionString("0.9.0.0"));
+ assertEquals(IBP_0_9_0, MetadataVersion.fromVersionString("0.9.0.1"));
+
+ assertEquals(IBP_0_10_0_IV0, MetadataVersion.fromVersionString("0.10.0-IV0"));
+
+ assertEquals(IBP_0_10_0_IV1, MetadataVersion.fromVersionString("0.10.0"));
+ assertEquals(IBP_0_10_0_IV1, MetadataVersion.fromVersionString("0.10.0.0"));
+ assertEquals(IBP_0_10_0_IV1, MetadataVersion.fromVersionString("0.10.0.0-IV0"));
+ assertEquals(IBP_0_10_0_IV1, MetadataVersion.fromVersionString("0.10.0.1"));
+
+ assertEquals(IBP_0_10_1_IV0, MetadataVersion.fromVersionString("0.10.1-IV0"));
+ assertEquals(IBP_0_10_1_IV1, MetadataVersion.fromVersionString("0.10.1-IV1"));
+
+ assertEquals(IBP_0_10_1_IV2, MetadataVersion.fromVersionString("0.10.1"));
+ assertEquals(IBP_0_10_1_IV2, MetadataVersion.fromVersionString("0.10.1.0"));
+ assertEquals(IBP_0_10_1_IV2, MetadataVersion.fromVersionString("0.10.1-IV2"));
+ assertEquals(IBP_0_10_1_IV2, MetadataVersion.fromVersionString("0.10.1.1"));
+
+ assertEquals(IBP_0_10_2_IV0, MetadataVersion.fromVersionString("0.10.2"));
+ assertEquals(IBP_0_10_2_IV0, MetadataVersion.fromVersionString("0.10.2.0"));
+ assertEquals(IBP_0_10_2_IV0, MetadataVersion.fromVersionString("0.10.2-IV0"));
+ assertEquals(IBP_0_10_2_IV0, MetadataVersion.fromVersionString("0.10.2.1"));
+
+ assertEquals(IBP_0_11_0_IV0, MetadataVersion.fromVersionString("0.11.0-IV0"));
+ assertEquals(IBP_0_11_0_IV1, MetadataVersion.fromVersionString("0.11.0-IV1"));
+
+ assertEquals(IBP_0_11_0_IV2, MetadataVersion.fromVersionString("0.11.0"));
+ assertEquals(IBP_0_11_0_IV2, MetadataVersion.fromVersionString("0.11.0.0"));
+ assertEquals(IBP_0_11_0_IV2, MetadataVersion.fromVersionString("0.11.0-IV2"));
+ assertEquals(IBP_0_11_0_IV2, MetadataVersion.fromVersionString("0.11.0.1"));
+
+ assertEquals(IBP_1_0_IV0, MetadataVersion.fromVersionString("1.0"));
+ assertEquals(IBP_1_0_IV0, MetadataVersion.fromVersionString("1.0.0"));
+ assertEquals(IBP_1_0_IV0, MetadataVersion.fromVersionString("1.0.0-IV0"));
+ assertEquals(IBP_1_0_IV0, MetadataVersion.fromVersionString("1.0.1"));
+ assertThrows(IllegalArgumentException.class, () -> MetadataVersion.fromVersionString("0.1.0"));
+ assertThrows(IllegalArgumentException.class, () -> MetadataVersion.fromVersionString("0.1.0.0"));
+ assertThrows(IllegalArgumentException.class, () -> MetadataVersion.fromVersionString("0.1.0-IV0"));
+ assertThrows(IllegalArgumentException.class, () -> MetadataVersion.fromVersionString("0.1.0.0-IV0"));
+
+ assertEquals(IBP_1_1_IV0, MetadataVersion.fromVersionString("1.1-IV0"));
+
+ assertEquals(IBP_2_0_IV1, MetadataVersion.fromVersionString("2.0"));
+ assertEquals(IBP_2_0_IV0, MetadataVersion.fromVersionString("2.0-IV0"));
+ assertEquals(IBP_2_0_IV1, MetadataVersion.fromVersionString("2.0-IV1"));
+
+ assertEquals(IBP_2_1_IV2, MetadataVersion.fromVersionString("2.1"));
+ assertEquals(IBP_2_1_IV0, MetadataVersion.fromVersionString("2.1-IV0"));
+ assertEquals(IBP_2_1_IV1, MetadataVersion.fromVersionString("2.1-IV1"));
+ assertEquals(IBP_2_1_IV2, MetadataVersion.fromVersionString("2.1-IV2"));
+
+ assertEquals(IBP_2_2_IV1, MetadataVersion.fromVersionString("2.2"));
+ assertEquals(IBP_2_2_IV0, MetadataVersion.fromVersionString("2.2-IV0"));
+ assertEquals(IBP_2_2_IV1, MetadataVersion.fromVersionString("2.2-IV1"));
+
+ assertEquals(IBP_2_3_IV1, MetadataVersion.fromVersionString("2.3"));
+ assertEquals(IBP_2_3_IV0, MetadataVersion.fromVersionString("2.3-IV0"));
+ assertEquals(IBP_2_3_IV1, MetadataVersion.fromVersionString("2.3-IV1"));
+
+ assertEquals(IBP_2_4_IV1, MetadataVersion.fromVersionString("2.4"));
+ assertEquals(IBP_2_4_IV0, MetadataVersion.fromVersionString("2.4-IV0"));
+ assertEquals(IBP_2_4_IV1, MetadataVersion.fromVersionString("2.4-IV1"));
+
+ assertEquals(IBP_2_5_IV0, MetadataVersion.fromVersionString("2.5"));
+ assertEquals(IBP_2_5_IV0, MetadataVersion.fromVersionString("2.5-IV0"));
+
+ assertEquals(IBP_2_6_IV0, MetadataVersion.fromVersionString("2.6"));
+ assertEquals(IBP_2_6_IV0, MetadataVersion.fromVersionString("2.6-IV0"));
+
+ assertEquals(IBP_2_7_IV0, MetadataVersion.fromVersionString("2.7-IV0"));
+ assertEquals(IBP_2_7_IV1, MetadataVersion.fromVersionString("2.7-IV1"));
+ assertEquals(IBP_2_7_IV2, MetadataVersion.fromVersionString("2.7-IV2"));
+
+ assertEquals(IBP_2_8_IV1, MetadataVersion.fromVersionString("2.8"));
+ assertEquals(IBP_2_8_IV0, MetadataVersion.fromVersionString("2.8-IV0"));
+ assertEquals(IBP_2_8_IV1, MetadataVersion.fromVersionString("2.8-IV1"));
+
+ assertEquals(IBP_3_0_IV1, MetadataVersion.fromVersionString("3.0"));
+ assertEquals(IBP_3_0_IV0, MetadataVersion.fromVersionString("3.0-IV0"));
+ assertEquals(IBP_3_0_IV1, MetadataVersion.fromVersionString("3.0-IV1"));
+
+ assertEquals(IBP_3_1_IV0, MetadataVersion.fromVersionString("3.1"));
+ assertEquals(IBP_3_1_IV0, MetadataVersion.fromVersionString("3.1-IV0"));
+
+ assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2"));
+ assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2-IV0"));
+ }
+
+ @Test
+ public void testMinSupportedVersionFor() {
+ assertEquals(IBP_0_8_0, MetadataVersion.minSupportedFor(RecordVersion.V0));
+ assertEquals(IBP_0_10_0_IV0, MetadataVersion.minSupportedFor(RecordVersion.V1));
+ assertEquals(IBP_0_11_0_IV0, MetadataVersion.minSupportedFor(RecordVersion.V2));
+
+ // Ensure that all record versions have a defined min version so that we remember to update the method
+ for (RecordVersion recordVersion : RecordVersion.values()) {
+ assertNotNull(MetadataVersion.minSupportedFor(recordVersion));
+ }
+ }
+
+ @Test
+ public void testShortVersion() {
+ assertEquals("0.8.0", IBP_0_8_0.shortVersion());
+ assertEquals("0.10.0", IBP_0_10_0_IV0.shortVersion());
+ assertEquals("0.10.0", IBP_0_10_0_IV1.shortVersion());
+ assertEquals("0.11.0", IBP_0_11_0_IV0.shortVersion());
+ assertEquals("0.11.0", IBP_0_11_0_IV1.shortVersion());
+ assertEquals("0.11.0", IBP_0_11_0_IV2.shortVersion());
+ assertEquals("1.0", IBP_1_0_IV0.shortVersion());
+ assertEquals("1.1", IBP_1_1_IV0.shortVersion());
+ assertEquals("2.0", IBP_2_0_IV0.shortVersion());
+ assertEquals("2.0", IBP_2_0_IV1.shortVersion());
+ assertEquals("2.1", IBP_2_1_IV0.shortVersion());
+ assertEquals("2.1", IBP_2_1_IV1.shortVersion());
+ assertEquals("2.1", IBP_2_1_IV2.shortVersion());
+ assertEquals("2.2", IBP_2_2_IV0.shortVersion());
+ assertEquals("2.2", IBP_2_2_IV1.shortVersion());
+ assertEquals("2.3", IBP_2_3_IV0.shortVersion());
+ assertEquals("2.3", IBP_2_3_IV1.shortVersion());
+ assertEquals("2.4", IBP_2_4_IV0.shortVersion());
+ assertEquals("2.5", IBP_2_5_IV0.shortVersion());
+ assertEquals("2.6", IBP_2_6_IV0.shortVersion());
+ assertEquals("2.7", IBP_2_7_IV2.shortVersion());
+ assertEquals("2.8", IBP_2_8_IV0.shortVersion());
+ assertEquals("2.8", IBP_2_8_IV1.shortVersion());
+ assertEquals("3.0", IBP_3_0_IV0.shortVersion());
+ assertEquals("3.0", IBP_3_0_IV1.shortVersion());
+ assertEquals("3.1", IBP_3_1_IV0.shortVersion());
+ assertEquals("3.2", IBP_3_2_IV0.shortVersion());
+ }
+
+ @Test
+ public void testVersion() {
+ assertEquals("0.8.0", IBP_0_8_0.version());
+ assertEquals("0.8.2", IBP_0_8_2.version());
+ assertEquals("0.10.0-IV0", IBP_0_10_0_IV0.version());
+ assertEquals("0.10.0-IV1", IBP_0_10_0_IV1.version());
+ assertEquals("0.11.0-IV0", IBP_0_11_0_IV0.version());
+ assertEquals("0.11.0-IV1", IBP_0_11_0_IV1.version());
+ assertEquals("0.11.0-IV2", IBP_0_11_0_IV2.version());
+ assertEquals("1.0-IV0", IBP_1_0_IV0.version());
+ assertEquals("1.1-IV0", IBP_1_1_IV0.version());
+ assertEquals("2.0-IV0", IBP_2_0_IV0.version());
+ assertEquals("2.0-IV1", IBP_2_0_IV1.version());
+ assertEquals("2.1-IV0", IBP_2_1_IV0.version());
+ assertEquals("2.1-IV1", IBP_2_1_IV1.version());
+ assertEquals("2.1-IV2", IBP_2_1_IV2.version());
+ assertEquals("2.2-IV0", IBP_2_2_IV0.version());
+ assertEquals("2.2-IV1", IBP_2_2_IV1.version());
+ assertEquals("2.3-IV0", IBP_2_3_IV0.version());
+ assertEquals("2.3-IV1", IBP_2_3_IV1.version());
+ assertEquals("2.4-IV0", IBP_2_4_IV0.version());
+ assertEquals("2.5-IV0", IBP_2_5_IV0.version());
+ assertEquals("2.6-IV0", IBP_2_6_IV0.version());
+ assertEquals("2.7-IV2", IBP_2_7_IV2.version());
+ assertEquals("2.8-IV0", IBP_2_8_IV0.version());
+ assertEquals("2.8-IV1", IBP_2_8_IV1.version());
+ assertEquals("3.0-IV0", IBP_3_0_IV0.version());
+ assertEquals("3.0-IV1", IBP_3_0_IV1.version());
+ assertEquals("3.1-IV0", IBP_3_1_IV0.version());
+ assertEquals("3.2-IV0", IBP_3_2_IV0.version());
+ }
+
+ @Test
+ public void shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue() {
+ ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse(
+ 10,
+ RecordVersion.V1,
+ Features.emptySupportedFeatures(),
+ Features.emptyFinalizedFeatures(),
+ ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
+ null,
+ ListenerType.ZK_BROKER
+ );
+ verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
+ assertEquals(10, response.throttleTimeMs());
+ assertTrue(response.data().supportedFeatures().isEmpty());
+ assertTrue(response.data().finalizedFeatures().isEmpty());
+ assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, response.data().finalizedFeaturesEpoch());
+ }
+
+ @Test
+ public void shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() {
+ ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse(
+ 10,
+ RecordVersion.V1,
+ Features.supportedFeatures(
+ Utils.mkMap(Utils.mkEntry("feature", new SupportedVersionRange((short) 1, (short) 4)))),
+ Features.finalizedFeatures(
+ Utils.mkMap(Utils.mkEntry("feature", new FinalizedVersionRange((short) 2, (short) 3)))),
+ 10L,
+ null,
+ ListenerType.ZK_BROKER
+ );
+
+ verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
+ assertEquals(10, response.throttleTimeMs());
+ assertEquals(1, response.data().supportedFeatures().size());
+ SupportedFeatureKey sKey = response.data().supportedFeatures().find("feature");
+ assertNotNull(sKey);
+ assertEquals(1, sKey.minVersion());
+ assertEquals(4, sKey.maxVersion());
+ assertEquals(1, response.data().finalizedFeatures().size());
+ FinalizedFeatureKey fKey = response.data().finalizedFeatures().find("feature");
+ assertNotNull(fKey);
+ assertEquals(2, fKey.minVersionLevel());
+ assertEquals(3, fKey.maxVersionLevel());
+ assertEquals(10, response.data().finalizedFeaturesEpoch());
+ }
+
+ private void verifyApiKeysForMagic(ApiVersionsResponse response, Byte maxMagic) {
+ for (ApiVersion version : response.data().apiKeys()) {
+ assertTrue(ApiKeys.forId(version.apiKey()).minRequiredInterBrokerMagic <= maxMagic);
+ }
+ }
+
+ @Test
+ public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() {
+ ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse(
+ AbstractResponse.DEFAULT_THROTTLE_TIME,
+ RecordVersion.current(),
+ Features.emptySupportedFeatures(),
+ Features.emptyFinalizedFeatures(),
+ ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
+ null,
+ ListenerType.ZK_BROKER
+ );
+ assertEquals(new HashSet<>(ApiKeys.zkBrokerApis()), apiKeysInResponse(response));
+ assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
+ assertTrue(response.data().supportedFeatures().isEmpty());
+ assertTrue(response.data().finalizedFeatures().isEmpty());
+ assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, response.data().finalizedFeaturesEpoch());
+ }
+
+ @Test
+ public void testMetadataQuorumApisAreDisabled() {
+ ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse(
+ AbstractResponse.DEFAULT_THROTTLE_TIME,
+ RecordVersion.current(),
+ Features.emptySupportedFeatures(),
+ Features.emptyFinalizedFeatures(),
+ ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
+ null,
+ ListenerType.ZK_BROKER
+ );
+
+ // Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them
+ HashSet<ApiKeys> exposedApis = apiKeysInResponse(response);
+ assertFalse(exposedApis.contains(ApiKeys.ENVELOPE));
+ assertFalse(exposedApis.contains(ApiKeys.VOTE));
+ assertFalse(exposedApis.contains(ApiKeys.BEGIN_QUORUM_EPOCH));
+ assertFalse(exposedApis.contains(ApiKeys.END_QUORUM_EPOCH));
+ assertFalse(exposedApis.contains(ApiKeys.DESCRIBE_QUORUM));
+ }
+
+ private HashSet<ApiKeys> apiKeysInResponse(ApiVersionsResponse apiVersions) {
+ HashSet<ApiKeys> apiKeys = new HashSet<>();
+ for (ApiVersion version : apiVersions.data().apiKeys()) {
+ apiKeys.add(ApiKeys.forId(version.apiKey()));
+ }
+ return apiKeys;
+ }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionValidatorTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionValidatorTest.java
new file mode 100644
index 000000000..c4255946b
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionValidatorTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+
+public class MetadataVersionValidatorTest {
+
+ @Test
+ public void testMetadataVersionValidator() {
+ String str = new MetadataVersionValidator().toString();
+ String[] apiVersions = str.substring(1).split(",");
+ assertEquals(MetadataVersion.VALUES.length, apiVersions.length);
+ }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 2f1134b43..23c021c63 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1090,6 +1090,9 @@ public static class InternalConfig {
// Private API used to control the emit latency for left/outer join results (https://issues.apache.org/jira/browse/KAFKA-10847)
public static final String EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";
+ // Private API used to control the emit latency for windowed aggregation results for ON_WINDOW_CLOSE emit strategy
+ public static final String EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION = "__emit.interval.ms.kstreams.windowed.aggregation__";
+
// Private API used to control the usage of consistency offset vectors
public static final String IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED = "__iq.consistency.offset"
+ ".vector.enabled__";
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java b/streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java
new file mode 100644
index 000000000..a10b95061
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
+import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowCloseStrategy;
+import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowUpdateStrategy;
+
+/**
+ * This interface defines the strategy used to control how we emit results in a processor.
+ */
+public interface EmitStrategy {
+
+ enum StrategyType {
+ ON_WINDOW_CLOSE,
+ ON_WINDOW_UPDATE
+ }
+
+ /**
+ * Returns the strategy type
+ * @return Emit strategy type
+ */
+ StrategyType type();
+
+ /**
+ * This strategy indicates that the aggregated result for a window will only be emitted when the
+ * window closes instead of when there's an update to the window. Window close means that current
+ * event time is larger than (window end time + grace period).
+ *
+ * <p>This strategy should only be used for windows which can close. An exception will be thrown
+ * if it's used with {@link UnlimitedWindow}.
+ *
+ * @see TimeWindows
+ * @see SlidingWindows
+ * @see SessionWindows
+ * @see UnlimitedWindows
+ * @see WindowUpdateStrategy
+ *
+ * @return WindowCloseStrategy instance
+ */
+ static EmitStrategy onWindowClose() {
+ return new WindowCloseStrategy();
+ }
+
+ /**
+ * This strategy indicates that the aggregated result for a window will be emitted every time
+ * when there's an update to the window instead of when the window closes.
+ *
+ * @see TimeWindows
+ * @see SlidingWindows
+ * @see SessionWindows
+ * @see UnlimitedWindows
+ * @see WindowCloseStrategy
+ *
+ * @return WindowUpdateStrategy instance
+ */
+ static EmitStrategy onWindowUpdate() {
+ return new WindowUpdateStrategy();
+ }
+}
\ No newline at end of file
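A short sketch of the surface this interface exposes; the two factory methods and type() are all that exist so far, and the DSL hook that lets users pick a strategy is not part of this diff:

import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;

public class EmitStrategySketch {
    public static void main(String[] args) {
        final EmitStrategy onClose = EmitStrategy.onWindowClose();
        final EmitStrategy onUpdate = EmitStrategy.onWindowUpdate();
        // type() is the only query on the interface; processors branch on it.
        System.out.println(onClose.type() == StrategyType.ON_WINDOW_CLOSE);   // true
        System.out.println(onUpdate.type() == StrategyType.ON_WINDOW_UPDATE); // true
    }
}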
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 8e88b5de7..7750e3b65 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -18,11 +18,18 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -30,6 +37,7 @@
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
@@ -37,6 +45,9 @@
import java.util.Map;
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION;
+import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor;
+import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@@ -48,6 +59,7 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends Window> implements
private final Windows<W> windows;
private final Initializer<VAgg> initializer;
private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
+ private final EmitStrategy emitStrategy;
private boolean sendOldValues = false;
@@ -55,10 +67,26 @@ public KStreamWindowAggregate(final Windows<W> windows,
final String storeName,
final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
+ this(windows, storeName, EmitStrategy.onWindowUpdate(), initializer, aggregator);
+ }
+
+ public KStreamWindowAggregate(final Windows<W> windows,
+ final String storeName,
+ final EmitStrategy emitStrategy,
+ final Initializer<VAgg> initializer,
+ final Aggregator<? super KIn, ? super VIn, VAgg> aggregator) {
this.windows = windows;
this.storeName = storeName;
+ this.emitStrategy = emitStrategy;
this.initializer = initializer;
this.aggregator = aggregator;
+
+ if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
+ if (!(windows instanceof TimeWindows)) {
+ throw new IllegalArgumentException("ON_WINDOW_CLOSE strategy is only supported for "
+ + "TimeWindows and SlidingWindows for TimeWindowedKStream");
+ }
+ }
}
@Override
@@ -80,22 +108,54 @@ private class KStreamWindowAggregateProcessor extends ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
private TimestampedWindowStore<KIn, VAgg> windowStore;
private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
private Sensor droppedRecordsSensor;
+ private Sensor emittedRecordsSensor;
+ private Sensor emitFinalLatencySensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+ private long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP;
+ private InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext;
+ private final TimeTracker timeTracker = new TimeTracker();
+ private final Time time = Time.SYSTEM;
@Override
public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) {
super.init(context);
- final InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext =
- (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
+ internalProcessorContext = (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
final StreamsMetricsImpl metrics = internalProcessorContext.metrics();
final String threadId = Thread.currentThread().getName();
droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics);
+ emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(),
+ internalProcessorContext.currentNode().name(), metrics);
+ emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(),
+ internalProcessorContext.currentNode().name(), metrics);
windowStore = context.getStateStore(storeName);
- tupleForwarder = new TimestampedTupleForwarder<>(
- windowStore,
- context,
- new TimestampedCacheFlushListener<>(context),
- sendOldValues);
+
+ if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
+ // Don't set a flush listener, which would emit cached results
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ windowStore,
+ context,
+ sendOldValues);
+ } else {
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ windowStore,
+ context,
+ new TimestampedCacheFlushListener<>(context),
+ sendOldValues);
+ }
+
+ // Restore last emit close time for ON_WINDOW_CLOSE strategy
+ if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
+ final Long lastEmitTime = internalProcessorContext.processorMetadataForKey(storeName);
+ if (lastEmitTime != null) {
+ lastEmitWindowCloseTime = lastEmitTime;
+ }
+ final long emitInterval = StreamsConfig.InternalConfig.getLong(
+ context.appConfigs(),
+ EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION,
+ 1000L
+ );
+ timeTracker.setEmitInterval(emitInterval);
+ }
}
@Override
@@ -120,15 +180,16 @@ public void process(final Record<KIn, VIn> record) {
// first get the matching windows
final long timestamp = record.timestamp();
observedStreamTime = Math.max(observedStreamTime, timestamp);
- final long closeTime = observedStreamTime - windows.gracePeriodMs();
+ final long windowCloseTime = observedStreamTime - windows.gracePeriodMs();
final Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
- // try update the window, and create the new window for the rest of unmatched window that do not exist yet
+ // try to update the windows whose end time is still larger than the window close time,
+ // and create new windows for the rest of the unmatched windows that do not exist yet;
for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
final Long windowStart = entry.getKey();
final long windowEnd = entry.getValue().end();
- if (windowEnd > closeTime) {
+ if (windowEnd > windowCloseTime) {
final ValueAndTimestamp<VAgg> oldAggAndTimestamp = windowStore.fetch(record.key(), windowStart);
VAgg oldAgg = getValueOrNull(oldAggAndTimestamp);
@@ -146,10 +207,12 @@ public void process(final Record record) {
// update the store with the new value
windowStore.put(record.key(), ValueAndTimestamp.make(newAgg, newTimestamp), windowStart);
- tupleForwarder.maybeForward(
- record.withKey(new Windowed<>(record.key(), entry.getValue()))
- .withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
- .withTimestamp(newTimestamp));
+ if (emitStrategy.type() == StrategyType.ON_WINDOW_UPDATE) {
+ tupleForwarder.maybeForward(
+ record.withKey(new Windowed<>(record.key(), entry.getValue()))
+ .withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
+ .withTimestamp(newTimestamp));
+ }
} else {
if (context().recordMetadata().isPresent()) {
final RecordMetadata recordMetadata = context().recordMetadata().get();
@@ -165,7 +228,7 @@ public void process(final Record record) {
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(),
record.timestamp(),
windowStart, windowEnd,
- closeTime,
+ windowCloseTime,
observedStreamTime
);
} else {
@@ -177,13 +240,88 @@ public void process(final Record record) {
"streamTime=[{}]",
record.timestamp(),
windowStart, windowEnd,
- closeTime,
+ windowCloseTime,
observedStreamTime
);
}
droppedRecordsSensor.record();
}
}
+
+ tryEmitFinalResult(record, windowCloseTime);
+ }
+
+ private void tryEmitFinalResult(final Record<KIn, VIn> record, final long windowCloseTime) {
+ if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+ return;
+ }
+
+ final long now = internalProcessorContext.currentSystemTimeMs();
+ // Throttle emit frequency as an optimization; the tradeoff is that we need to remember the
+ // window close time from the last emit so that we can restart from there in the next emit
+ if (now < timeTracker.nextTimeToEmit) {
+ return;
+ }
+
+ // Schedule the next emit time based on now, so that a large jump in system time
+ // does not cause this to be triggered on every record
+ timeTracker.nextTimeToEmit = now;
+ timeTracker.advanceNextTimeToEmit();
+
+ // Window close time has not progressed, so there are no windows to close and hence no records to emit
+ if (lastEmitWindowCloseTime != ConsumerRecord.NO_TIMESTAMP && lastEmitWindowCloseTime >= windowCloseTime) {
+ return;
+ }
+
+ final long emitRangeUpperBoundInclusive = windowCloseTime - windows.size();
+ // No window has ever closed and hence no need to emit any records
+ if (emitRangeUpperBoundInclusive < 0) {
+ return;
+ }
+
+
+ // Set emitRangeLowerBoundInclusive to -1L if lastEmitWindowCloseTime was not set so that
+ // we would fetch from 0L for the first time; otherwise set it to lastEmitWindowCloseTime - windows.size().
+ //
+ // Note that if we get here, emitRangeUpperBoundInclusive is >= 0, which means windowCloseTime >= windows.size().
+ // Because we always set lastEmitWindowCloseTime to a windowCloseTime that passed this check, it means
+ // lastEmitWindowCloseTime - windows.size() should always be >= 0 as well.
+ // As a result, emitRangeLowerBoundInclusive is always >= 0
+ final long emitRangeLowerBoundInclusive = lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP ?
+ -1L : lastEmitWindowCloseTime - windows.size();
+
+ if (lastEmitWindowCloseTime != ConsumerRecord.NO_TIMESTAMP) {
+ final Map<Long, W> matchedCloseWindows = windows.windowsFor(emitRangeUpperBoundInclusive);
+ final Map<Long, W> matchedEmitWindows = windows.windowsFor(emitRangeLowerBoundInclusive);
+
+ // Don't fetch from the store if no new windows have closed since the last time we emitted
+ if (matchedCloseWindows.equals(matchedEmitWindows)) {
+ log.trace("no new windows to emit. LastEmitCloseTime={}, newCloseTime={}",
+ lastEmitWindowCloseTime, windowCloseTime);
+ return;
+ }
+ }
+
+ final long startMs = time.milliseconds();
+
+ final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowToEmit = windowStore
+ .fetchAll(emitRangeLowerBoundInclusive + 1, emitRangeUpperBoundInclusive);
+
+ int emittedCount = 0;
+ while (windowToEmit.hasNext()) {
+ emittedCount++;
+            final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> kv = windowToEmit.next();
+ tupleForwarder.maybeForward(
+ record.withKey(kv.key)
+ .withValue(new Change<>(kv.value.value(), null))
+ .withTimestamp(kv.value.timestamp())
+ .withHeaders(record.headers()));
+ }
+ emittedRecordsSensor.record(emittedCount);
+ emitFinalLatencySensor.record(time.milliseconds() - startMs);
+
+ lastEmitWindowCloseTime = windowCloseTime;
+ internalProcessorContext.addProcessorMetadataKeyValue(storeName, windowCloseTime);
}
}
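The throttling and range bookkeeping in tryEmitFinalResult is the heart of the ON_WINDOW_CLOSE path: each emit pass fetches only the windows that have closed since the previous pass. The following standalone sketch reproduces just that range computation for illustration; EmitRangeSketch and emitRange are hypothetical names, and NO_TIMESTAMP stands in for ConsumerRecord.NO_TIMESTAMP.

import java.util.Optional;

public final class EmitRangeSketch {
    private static final long NO_TIMESTAMP = -1L;

    // Returns the inclusive [lower, upper] range of window start times to fetch,
    // or empty if no window has closed since the last emit.
    static Optional<long[]> emitRange(final long windowCloseTime,
                                      final long lastEmitWindowCloseTime,
                                      final long windowSize) {
        // The close time has not advanced since the last emit: nothing new to emit.
        if (lastEmitWindowCloseTime != NO_TIMESTAMP && lastEmitWindowCloseTime >= windowCloseTime) {
            return Optional.empty();
        }
        // Windows starting after this bound are still open.
        final long upperInclusive = windowCloseTime - windowSize;
        if (upperInclusive < 0) {
            return Optional.empty(); // no window has ever closed
        }
        // The first emit starts from 0; later emits resume just past the previous range.
        final long lowerInclusive = lastEmitWindowCloseTime == NO_TIMESTAMP
            ? 0L
            : lastEmitWindowCloseTime - windowSize + 1;
        return Optional.of(new long[] {lowerInclusive, upperInclusive});
    }

    public static void main(final String[] args) {
        // With 10ms windows and a close time of 25, windows starting in [0, 15] have closed.
        emitRange(25L, NO_TIMESTAMP, 10L)
            .ifPresent(r -> System.out.println(r[0] + ".." + r[1])); // prints 0..15
    }
}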
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 7a82d0834..16d689099 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -20,12 +20,15 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
@@ -39,6 +42,7 @@
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
+import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
@@ -47,6 +51,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
    private final Windows<W> windows;
private final GroupedStreamAggregateBuilder aggregateBuilder;
+ private EmitStrategy emitStrategy = EmitStrategy.onWindowUpdate();
    TimeWindowedKStreamImpl(final Windows<W> windows,
final InternalStreamsBuilder builder,
@@ -107,7 +112,7 @@ private KTable<Windowed<K>, Long> doCount(final Named named,
return aggregateBuilder.build(
new NamedInternal(aggregateName),
materialize(materializedInternal),
- new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+ new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
materializedInternal.queryableStoreName(),
materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
materializedInternal.valueSerde());
@@ -155,7 +160,7 @@ public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
return aggregateBuilder.build(
new NamedInternal(aggregateName),
materialize(materializedInternal),
- new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
+ new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, initializer, aggregator),
materializedInternal.queryableStoreName(),
materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
materializedInternal.valueSerde());
@@ -202,12 +207,22 @@ public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
return aggregateBuilder.build(
new NamedInternal(reduceName),
materialize(materializedInternal),
- new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
+ new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
materializedInternal.queryableStoreName(),
materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
materializedInternal.valueSerde());
}
+ //@Override
+    public TimeWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy) {
+ if (this.windows instanceof UnlimitedWindows
+ && emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
+ throw new IllegalArgumentException("ON_WINDOW_CLOSE emit strategy cannot be used for UnlimitedWindows");
+ }
+ this.emitStrategy = emitStrategy;
+ return this;
+ }
+
    private <VR> StoreBuilder<TimestampedWindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
@@ -232,11 +247,19 @@ private <VR> StoreBuilder<TimestampedWindowStore<K, VR>> materialize(final Mater
);
break;
case ROCKS_DB:
- supplier = Stores.persistentTimestampedWindowStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod),
- Duration.ofMillis(windows.size()),
- false
+ supplier = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE ?
+ RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
+ materialized.storeName(),
+ Duration.ofMillis(retentionPeriod),
+ Duration.ofMillis(windows.size()),
+ false,
+ false
+ ) :
+ Stores.persistentTimestampedWindowStore(
+ materialized.storeName(),
+ Duration.ofMillis(retentionPeriod),
+ Duration.ofMillis(windows.size()),
+ false
);
break;
default:
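For context, here is how the new DSL hook is meant to be used end to end. This is a hedged sketch: it assumes EmitStrategy.onWindowClose() exists as the counterpart of the onWindowUpdate() factory used above, that emitStrategy() is exposed on the TimeWindowedKStream interface (the commented-out @Override above suggests the interface method was still pending in this patch), and the topic names are placeholders.

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public final class EmitFinalUsageSketch {
    public static StreamsBuilder build() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Windowed<String>, Long> counts = builder
            .<String, String>stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            // 5-minute windows that accept records arriving up to 1 minute late.
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
            // Emit a single final result per window instead of one update per record.
            .emitStrategy(EmitStrategy.onWindowClose())
            .count();
        counts.toStream().to("click-counts");
        return builder;
    }
}

Note the guard in emitStrategy() above: ON_WINDOW_CLOSE is rejected for UnlimitedWindows, since an unlimited window never closes and a final result could never be emitted.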
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
index 49f2ab157..ed6cfefdc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
@@ -45,6 +45,14 @@ class TimestampedTupleForwarder<K, V> {
cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues);
}
+    TimestampedTupleForwarder(final StateStore store,
+                              final ProcessorContext<K, Change<V>> context,
+                              final boolean sendOldValues) {
+        this.context = (InternalProcessorContext<K, Change<V>>) context;
+        this.sendOldValues = sendOldValues;
+        cachingEnabled = false;
+    }
+
    public void maybeForward(final Record<K, Change<V>> record) {
if (!cachingEnabled) {
if (sendOldValues) {
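The new three-argument constructor skips the setFlushListener() registration and hard-codes cachingEnabled to false, so every maybeForward() call forwards immediately; this fits the emit-final path, which forwards results itself rather than through cache flushes. The following fragment illustrates the resulting behavior; it is a sketch consistent with the context lines above, not a verbatim excerpt of the rest of the method.

    // Illustration only: with cachingEnabled == false, records are forwarded eagerly.
    public void maybeForward(final Record<K, Change<V>> record) {
        if (!cachingEnabled) {
            if (sendOldValues) {
                context.forward(record);
            } else {
                // Strip the old value when the downstream did not request it.
                context.forward(record.withValue(new Change<>(record.value().newValue, null)));
            }
        }
        // When caching is enabled, forwarding happens from the cache flush listener instead.
    }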
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/emitstrategy/WindowCloseStrategy.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/emitstrategy/WindowCloseStrategy.java
new file mode 100644
index 000000000..ddbf1090a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/emitstrategy/WindowCloseStrategy.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.emitstrategy;
+
+import org.apache.kafka.streams.kstream.EmitStrategy;
+
+/**
+ * An emit strategy which indicates that output should be emitted only when a window closes.
+ */
+public class WindowCloseStrategy implements EmitStrategy {
+
+ @Override
+ public StrategyType type() {
+ return StrategyType.ON_WINDOW_CLOSE;
+ }
+
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/emitstrategy/WindowUpdateStrategy.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/emitstrategy/WindowUpdateStrategy.java
new file mode 100644
index 000000000..0f87ab22f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/emitstrategy/WindowUpdateStrategy.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.emitstrategy;
+
+import org.apache.kafka.streams.kstream.EmitStrategy;
+
+/**
+ * An emit strategy which indicates that output should be emitted every time a window gets an update.
+ */
+public class WindowUpdateStrategy implements EmitStrategy {
+
+ @Override
+ public StrategyType type() {
+ return StrategyType.ON_WINDOW_UPDATE;
+ }
+}
\ No newline at end of file
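Both strategy classes are stateless markers distinguished only by type(). Below is a small sketch of how they plausibly surface through factory methods; onWindowUpdate() is called in TimeWindowedKStreamImpl above, while onWindowClose() and the placement of these factories on EmitStrategy itself are assumptions.

import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowCloseStrategy;
import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowUpdateStrategy;

public final class EmitStrategyFactorySketch {
    // Hypothetical factories mirroring EmitStrategy.onWindowUpdate()/onWindowClose().
    static EmitStrategy onWindowUpdate() {
        return new WindowUpdateStrategy(); // the default: emit on every update
    }

    static EmitStrategy onWindowClose() {
        return new WindowCloseStrategy();  // emit once per window, when it closes
    }

    public static void main(final String[] args) {
        System.out.println(onWindowUpdate().type() == StrategyType.ON_WINDOW_UPDATE); // true
        System.out.println(onWindowClose().type() == StrategyType.ON_WINDOW_CLOSE);   // true
    }
}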
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
index 231d9a627..8dcd265a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
@@ -21,6 +21,9 @@
import java.util.Map;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.AVG_LATENCY_DESCRIPTION;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.MAX_LATENCY_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RECORD_E2E_LATENCY;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RECORD_E2E_LATENCY_AVG_DESCRIPTION;
@@ -31,6 +34,8 @@
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMinAndMaxToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
public class ProcessorNodeMetrics {
private ProcessorNodeMetrics() {}
@@ -62,6 +67,17 @@ private ProcessorNodeMetrics() {}
private static final String FORWARD_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + FORWARD_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
+ private static final String EMITTED_RECORDS = "window-aggregate-final-emit";
+ private static final String EMITTED_RECORDS_DESCRIPTION = "emit final records";
+ private static final String EMITTED_RECORDS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + EMITTED_RECORDS_DESCRIPTION;
+ private static final String EMITTED_RECORDS_RATE_DESCRIPTION =
+ RATE_DESCRIPTION_PREFIX + EMITTED_RECORDS_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
+
+ private static final String EMIT_FINAL_LATENCY = EMITTED_RECORDS + LATENCY_SUFFIX;
+ private static final String EMIT_FINAL_DESCRIPTION = "calls to emit final";
+ private static final String EMIT_FINAL_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION + EMIT_FINAL_DESCRIPTION;
+ private static final String EMIT_FINAL_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION + EMIT_FINAL_DESCRIPTION;
+
public static Sensor suppressionEmitSensor(final String threadId,
final String taskId,
final String processorNodeId,
@@ -165,6 +181,42 @@ public static Sensor e2ELatencySensor(final String threadId,
return sensor;
}
+ public static Sensor emitFinalLatencySensor(final String threadId,
+ final String taskId,
+ final String processorNodeId,
+ final StreamsMetricsImpl streamsMetrics) {
+ final String sensorName = processorNodeId + "-" + EMIT_FINAL_LATENCY;
+ final Sensor sensor = streamsMetrics.nodeLevelSensor(threadId, taskId, processorNodeId, sensorName, RecordingLevel.DEBUG);
+        final Map<String, String> tagMap = streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId);
+ addAvgAndMaxToSensor(
+ sensor,
+ PROCESSOR_NODE_LEVEL_GROUP,
+ tagMap,
+ EMIT_FINAL_LATENCY,
+ EMIT_FINAL_AVG_LATENCY_DESCRIPTION,
+ EMIT_FINAL_MAX_LATENCY_DESCRIPTION
+ );
+ return sensor;
+ }
+
+ public static Sensor emittedRecordsSensor(final String threadId,
+ final String taskId,
+ final String processorNodeId,
+ final StreamsMetricsImpl streamsMetrics) {
+ final String sensorName = processorNodeId + "-" + EMITTED_RECORDS;
+ final Sensor sensor = streamsMetrics.nodeLevelSensor(threadId, taskId, processorNodeId, sensorName, RecordingLevel.DEBUG);
+        final Map<String, String> tagMap = streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId);
+ addRateOfSumAndSumMetricsToSensor(
+ sensor,
+ PROCESSOR_NODE_LEVEL_GROUP,
+ tagMap,
+ EMITTED_RECORDS,
+ EMITTED_RECORDS_RATE_DESCRIPTION,
+ EMITTED_RECORDS_TOTAL_DESCRIPTION
+ );
+ return sensor;
+ }
+
private static Sensor throughputParentSensor(final String threadId,
final String taskId,
final String metricNamePrefix,
@@ -207,4 +259,6 @@ private static Sensor throughputSensor(final String threadId,
);
return sensor;
}
+
+
}
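Both sensors are registered at the processor-node level with RecordingLevel.DEBUG, so they record only when metrics.recording.level is set to DEBUG. Combining the constants above with the shared helper suffixes, the resulting metric names should be window-aggregate-final-emit-rate, -total, -latency-avg, and -latency-max. A hedged sketch of inspecting them at runtime follows; the name prefix is derived from the constants in this diff, and EmitFinalMetricsSketch is a placeholder class.

import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public final class EmitFinalMetricsSketch {
    // Prints the emit-final metrics exposed by a running topology.
    static void printEmitFinalMetrics(final KafkaStreams streams) {
        for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            final MetricName name = entry.getKey();
            if (name.name().startsWith("window-aggregate-final-emit")) {
                // e.g. ...-rate, ...-total, ...-latency-avg, ...-latency-max
                System.out.println(name.group() + "/" + name.name()
                    + " = " + entry.getValue().metricValue());
            }
        }
    }
}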
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
index ceabd15e5..4f2587d1e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
@@ -21,6 +21,10 @@
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.WindowStore;
@@ -34,6 +38,8 @@ public class RocksDBTimeOrderedWindowStore
private final boolean retainDuplicates;
private final long windowSize;
+
+ private StateStoreContext stateStoreContext;
private int seqnum = 0;
RocksDBTimeOrderedWindowStore(
@@ -49,6 +55,7 @@ public class RocksDBTimeOrderedWindowStore
@Override
public void init(final StateStoreContext context, final StateStore root) {
+ stateStoreContext = context;
wrapped().init(context, root);
}
@@ -168,6 +175,21 @@ public boolean hasIndex() {
return wrapped().hasIndex();
}
+ @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+
+ return StoreQueryUtils.handleBasicQueries(
+ query,
+ positionBound,
+ config,
+ this,
+ getPosition(),
+ stateStoreContext
+ );
+ }
+
private void maybeUpdateSeqnumForDups() {
if (retainDuplicates) {
seqnum = (seqnum + 1) & 0x7FFFFFFF;
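With query() now delegating to StoreQueryUtils.handleBasicQueries, the time-ordered window store can participate in interactive queries v2 (KIP-796). Below is a hedged sketch of querying such a store from the application, assuming the aggregation was materialized under the hypothetical store name "agg-store" and that the surrounding store layers route WindowRangeQuery through the handler shown above.

import java.time.Instant;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.state.KeyValueIterator;

public final class WindowStoreIqv2Sketch {
    // Fetches all windows whose start time falls in [epoch, now] and prints them.
    static void printWindows(final KafkaStreams streams) {
        final WindowRangeQuery<String, Long> query =
            WindowRangeQuery.withWindowStartRange(Instant.EPOCH, Instant.now());
        final StateQueryResult<KeyValueIterator<Windowed<String>, Long>> result =
            streams.query(StateQueryRequest.inStore("agg-store").withQuery(query));
        for (final QueryResult<KeyValueIterator<Windowed<String>, Long>> partitionResult
                : result.getPartitionResults().values()) {
            if (partitionResult.isSuccess()) {
                try (final KeyValueIterator<Windowed<String>, Long> it = partitionResult.getResult()) {
                    it.forEachRemaining(kv -> System.out.println(kv.key + " -> " + kv.value));
                }
            }
        }
    }
}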
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
new file mode 100644
index 000000000..9abefce6a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.util.Collection;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.UnlimitedWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThrows;
+
+@SuppressWarnings({"unchecked"})
+@Category({IntegrationTest.class})
+@RunWith(Parameterized.class)
+public class TimeWindowedKStreamIntegrationTest {
+ private static final int NUM_BROKERS = 1;
+
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS,
+ mkProperties(
+ mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp
+ )
+ );
+
+ @BeforeClass
+ public static void startCluster() throws IOException {
+ CLUSTER.start();
+ }
+
+ @AfterClass
+ public static void closeCluster() {
+ CLUSTER.stop();
+ }
+
+
+ private StreamsBuilder builder;
+ private Properties streamsConfiguration;
+ private KafkaStreams kafkaStreams;
+ private String streamOneInput;
+ private String streamTwoInput;
+ private String outputTopic;
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Parameter
+ public StrategyType type;
+
+ @Parameter(1)
+ public boolean withCache;
+
+ @Parameter(2)
+ public EmitStrategy emitStrategy;
+
+ private boolean emitFinal;
+
+ @Parameterized.Parameters(name = "{0}_{1}")
+ public static Collection