Commit 0072ecd

KAFKA-13785: [6/N][Emit final] Copy: Emit final for TimeWindowedKStreamImpl (#12100)

This is a copy of PR #11896, authored by @lihaosky (Hao Li): an initial implementation of emit-final for TimeWindowedKStreamImpl. This PR is stacked on top of #12030.

Author: Hao Li
Reviewers: John Roesler <vvcephei@apache.org>
a0x8o committed May 3, 2022
1 parent ec3ed96 commit 0072ecd
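
For context, "emit final" (KIP-825) lets a windowed aggregation hold back per-record updates and emit a single, final result per window once the window closes. A sketch of how this surfaces in the Streams DSL, assuming the KIP-825 EmitStrategy API (emitStrategy(), EmitStrategy.onWindowClose()); topic names and serdes are illustrative, not taken from this commit:

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class EmitFinalSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            // 5-minute windows that accept records up to 1 minute late.
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
            // Emit one final count per window when it closes, instead of an update per record.
            .emitStrategy(EmitStrategy.onWindowClose())
            .count()
            .toStream((windowedKey, count) -> windowedKey.toString())
            .to("word-counts-final", Produced.with(Serdes.String(), Serdes.Long()));
        // builder.build() would then be passed to new KafkaStreams(...).
    }
}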
Showing 86 changed files with 4,572 additions and 769 deletions.
1 change: 1 addition & 0 deletions build.gradle
@@ -2301,6 +2301,7 @@ project(':jmh-benchmarks') {
       // jmh requires jopt 4.x while `core` depends on 5.0, they are not binary compatible
       exclude group: 'net.sf.jopt-simple', module: 'jopt-simple'
     }
+    implementation project(':server-common')
     implementation project(':clients')
     implementation project(':metadata')
     implementation project(':streams')
4 changes: 4 additions & 0 deletions checkstyle/suppressions.xml
@@ -301,6 +301,10 @@
               files="metadata[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
     <suppress checks="BooleanExpressionComplexity"
               files="(MetadataImage).java"/>
+    <suppress checks="ImportControl"
+              files="ApiVersionsResponse.java"/>
+    <suppress checks="AvoidStarImport"
+              files="MetadataVersionTest.java"/>

     <!-- Storage -->
     <suppress checks="(CyclomaticComplexity|ParameterNumber)"
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -16,10 +16,12 @@
  */
 package org.apache.kafka.common.requests;

+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.common.feature.Features;
 import org.apache.kafka.common.feature.FinalizedVersionRange;
 import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.ApiMessageType.ListenerType;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
@@ -121,6 +123,32 @@ public static ApiVersionsResponse createApiVersionsResponse(
         );
     }

+    public static ApiVersionsResponse createApiVersionsResponse(
+        int throttleTimeMs,
+        RecordVersion minRecordVersion,
+        Features<SupportedVersionRange> latestSupportedFeatures,
+        Features<FinalizedVersionRange> finalizedFeatures,
+        long finalizedFeaturesEpoch,
+        NodeApiVersions controllerApiVersions,
+        ListenerType listenerType
+    ) {
+        ApiVersionCollection apiKeys;
+        if (controllerApiVersions != null) {
+            apiKeys = intersectForwardableApis(
+                listenerType, minRecordVersion, controllerApiVersions.allSupportedApiVersions());
+        } else {
+            apiKeys = filterApis(minRecordVersion, listenerType);
+        }
+
+        return createApiVersionsResponse(
+            throttleTimeMs,
+            apiKeys,
+            latestSupportedFeatures,
+            finalizedFeatures,
+            finalizedFeaturesEpoch
+        );
+    }
+
     public static ApiVersionsResponse createApiVersionsResponse(
         int throttleTimeMs,
         ApiVersionCollection apiVersions,
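A minimal usage sketch of the new overload (values mirror the test code later in this diff): passing null for controllerApiVersions takes the filterApis() branch, while a non-null NodeApiVersions intersects the broker's APIs with the controller's forwardable APIs.

// Sketch only; argument values are illustrative.
ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse(
    AbstractResponse.DEFAULT_THROTTLE_TIME,                // throttleTimeMs
    RecordVersion.current(),                               // minRecordVersion
    Features.emptySupportedFeatures(),
    Features.emptyFinalizedFeatures(),
    ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,  // finalizedFeaturesEpoch
    null,                                                  // controllerApiVersions: none, so filterApis() is used
    ListenerType.ZK_BROKER);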
clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -17,10 +17,18 @@

 package org.apache.kafka.common.requests;

+import java.util.HashSet;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.ApiMessageType.ListenerType;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
+import org.apache.kafka.common.message.ApiVersionsResponseData.FinalizedFeatureKey;
+import org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureKey;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.RecordVersion;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
@@ -102,6 +110,92 @@ public void shouldHaveCommonlyAgreedApiVersionResponseWithControllerOnForwardabl
             ApiKeys.JOIN_GROUP.latestVersion(), commonResponse);
     }

+    @Test
+    public void shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue() {
+        ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse(
+            10,
+            RecordVersion.V1,
+            Features.emptySupportedFeatures(),
+            Features.emptyFinalizedFeatures(),
+            ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
+            null,
+            ListenerType.ZK_BROKER
+        );
+        verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
+        assertEquals(10, response.throttleTimeMs());
+        assertTrue(response.data().supportedFeatures().isEmpty());
+        assertTrue(response.data().finalizedFeatures().isEmpty());
+        assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, response.data().finalizedFeaturesEpoch());
+    }
+
+    @Test
+    public void shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() {
+        ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse(
+            10,
+            RecordVersion.V1,
+            Features.supportedFeatures(
+                Utils.mkMap(Utils.mkEntry("feature", new SupportedVersionRange((short) 1, (short) 4)))),
+            Features.finalizedFeatures(
+                Utils.mkMap(Utils.mkEntry("feature", new FinalizedVersionRange((short) 2, (short) 3)))),
+            10L,
+            null,
+            ListenerType.ZK_BROKER
+        );
+
+        verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
+        assertEquals(10, response.throttleTimeMs());
+        assertEquals(1, response.data().supportedFeatures().size());
+        SupportedFeatureKey sKey = response.data().supportedFeatures().find("feature");
+        assertNotNull(sKey);
+        assertEquals(1, sKey.minVersion());
+        assertEquals(4, sKey.maxVersion());
+        assertEquals(1, response.data().finalizedFeatures().size());
+        FinalizedFeatureKey fKey = response.data().finalizedFeatures().find("feature");
+        assertNotNull(fKey);
+        assertEquals(2, fKey.minVersionLevel());
+        assertEquals(3, fKey.maxVersionLevel());
+        assertEquals(10, response.data().finalizedFeaturesEpoch());
+    }
+
+    @Test
+    public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() {
+        ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse(
+            AbstractResponse.DEFAULT_THROTTLE_TIME,
+            RecordVersion.current(),
+            Features.emptySupportedFeatures(),
+            Features.emptyFinalizedFeatures(),
+            ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
+            null,
+            ListenerType.ZK_BROKER
+        );
+        assertEquals(new HashSet<ApiKeys>(ApiKeys.zkBrokerApis()), apiKeysInResponse(response));
+        assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
+        assertTrue(response.data().supportedFeatures().isEmpty());
+        assertTrue(response.data().finalizedFeatures().isEmpty());
+        assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, response.data().finalizedFeaturesEpoch());
+    }
+
+    @Test
+    public void testMetadataQuorumApisAreDisabled() {
+        ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse(
+            AbstractResponse.DEFAULT_THROTTLE_TIME,
+            RecordVersion.current(),
+            Features.emptySupportedFeatures(),
+            Features.emptyFinalizedFeatures(),
+            ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
+            null,
+            ListenerType.ZK_BROKER
+        );
+
+        // Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them
+        HashSet<ApiKeys> exposedApis = apiKeysInResponse(response);
+        assertFalse(exposedApis.contains(ApiKeys.ENVELOPE));
+        assertFalse(exposedApis.contains(ApiKeys.VOTE));
+        assertFalse(exposedApis.contains(ApiKeys.BEGIN_QUORUM_EPOCH));
+        assertFalse(exposedApis.contains(ApiKeys.END_QUORUM_EPOCH));
+        assertFalse(exposedApis.contains(ApiKeys.DESCRIBE_QUORUM));
+    }
+
     @Test
     public void testIntersect() {
         assertFalse(ApiVersionsResponse.intersect(null, null).isPresent());
@@ -145,4 +239,18 @@ private void verifyVersions(short forwardableAPIKey,
         assertEquals(expectedVersionsForForwardableAPI, commonResponse.find(forwardableAPIKey));
     }

+    private void verifyApiKeysForMagic(ApiVersionsResponse response, Byte maxMagic) {
+        for (ApiVersion version : response.data().apiKeys()) {
+            assertTrue(ApiKeys.forId(version.apiKey()).minRequiredInterBrokerMagic <= maxMagic);
+        }
+    }
+
+    private HashSet<ApiKeys> apiKeysInResponse(ApiVersionsResponse apiVersions) {
+        HashSet<ApiKeys> apiKeys = new HashSet<>();
+        for (ApiVersion version : apiVersions.data().apiKeys()) {
+            apiKeys.add(ApiKeys.forId(version.apiKey()));
+        }
+        return apiKeys;
+    }
+
 }
core/src/main/java/kafka/server/builders/LogManagerBuilder.java
@@ -17,7 +17,6 @@

 package kafka.server.builders;

-import kafka.api.ApiVersion;
 import kafka.log.CleanerConfig;
 import kafka.log.LogConfig;
 import kafka.log.LogManager;
@@ -26,6 +25,7 @@
 import kafka.server.metadata.ConfigRepository;
 import kafka.utils.Scheduler;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.common.MetadataVersion;
 import scala.collection.JavaConverters;

 import java.io.File;
@@ -46,7 +46,7 @@ public class LogManagerBuilder {
     private long retentionCheckMs = 1000L;
     private int maxTransactionTimeoutMs = 15 * 60 * 1000;
     private int maxPidExpirationMs = 60000;
-    private ApiVersion interBrokerProtocolVersion = ApiVersion.latestVersion();
+    private MetadataVersion interBrokerProtocolVersion = MetadataVersion.latest();
     private Scheduler scheduler = null;
     private BrokerTopicStats brokerTopicStats = null;
     private LogDirFailureChannel logDirFailureChannel = null;
@@ -113,7 +113,7 @@ public LogManagerBuilder setMaxPidExpirationMs(int maxPidExpirationMs) {
         return this;
     }

-    public LogManagerBuilder setInterBrokerProtocolVersion(ApiVersion interBrokerProtocolVersion) {
+    public LogManagerBuilder setInterBrokerProtocolVersion(MetadataVersion interBrokerProtocolVersion) {
         this.interBrokerProtocolVersion = interBrokerProtocolVersion;
         return this;
     }
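Callers of the builder keep the same fluent shape; only the parameter type changes. A sketch, assuming the other required builder fields are set elsewhere:

// Sketch only: a real LogManager also needs log dirs, configs, scheduler, etc.
LogManager logManager = new LogManagerBuilder()
    .setInterBrokerProtocolVersion(MetadataVersion.latest())
    // ... remaining required setters omitted ...
    .build();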
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import java.util.Optional
 import java.util.concurrent.CompletableFuture

-import kafka.api.{ApiVersion, LeaderAndIsr}
+import kafka.api.LeaderAndIsr
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.{KafkaController, StateChangeLogger}
 import kafka.log._
@@ -42,6 +42,7 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.kafka.server.common.MetadataVersion

 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
@@ -232,7 +233,7 @@ case class CommittedPartitionState(
  */
 class Partition(val topicPartition: TopicPartition,
                 val replicaLagTimeMaxMs: Long,
-                interBrokerProtocolVersion: ApiVersion,
+                interBrokerProtocolVersion: MetadataVersion,
                 localBrokerId: Int,
                 time: Time,
                 alterPartitionListener: AlterPartitionListener,
35 changes: 18 additions & 17 deletions core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{KafkaException, Node, Reconfigurable, TopicPartition, Uuid}
+import org.apache.kafka.server.common.MetadataVersion._

 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.HashMap
@@ -390,7 +391,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
           .setRemovingReplicas(replicaAssignment.removingReplicas.map(Integer.valueOf).asJava)
           .setIsNew(isNew || alreadyNew)

-        if (config.interBrokerProtocolVersion >= KAFKA_3_2_IV0) {
+        if (config.interBrokerProtocolVersion.isAtLeast(IBP_3_2_IV0)) {
           partitionState.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
         }
@@ -460,12 +461,12 @@

   private def sendLeaderAndIsrRequest(controllerEpoch: Int, stateChangeLog: StateChangeLogger): Unit = {
     val leaderAndIsrRequestVersion: Short =
-      if (config.interBrokerProtocolVersion >= KAFKA_3_2_IV0) 6
-      else if (config.interBrokerProtocolVersion >= KAFKA_2_8_IV1) 5
-      else if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 4
-      else if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV0) 3
-      else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 2
-      else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
+      if (config.interBrokerProtocolVersion.isAtLeast(IBP_3_2_IV0)) 6
+      else if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV1)) 5
+      else if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_4_IV1)) 4
+      else if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_4_IV0)) 3
+      else if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_2_IV0)) 2
+      else if (config.interBrokerProtocolVersion.isAtLeast(IBP_1_0_IV0)) 1
       else 0

     leaderAndIsrRequestMap.forKeyValue { (broker, leaderAndIsrPartitionStates) =>
@@ -511,13 +512,13 @@

     val partitionStates = updateMetadataRequestPartitionInfoMap.values.toBuffer
     val updateMetadataRequestVersion: Short =
-      if (config.interBrokerProtocolVersion >= KAFKA_2_8_IV1) 7
-      else if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 6
-      else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 5
-      else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 4
-      else if (config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3
-      else if (config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2
-      else if (config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
+      if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV1)) 7
+      else if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_4_IV1)) 6
+      else if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_2_IV0)) 5
+      else if (config.interBrokerProtocolVersion.isAtLeast(IBP_1_0_IV0)) 4
+      else if (config.interBrokerProtocolVersion.isAtLeast(IBP_0_10_2_IV0)) 3
+      else if (config.interBrokerProtocolVersion.isAtLeast(IBP_0_10_0_IV1)) 2
+      else if (config.interBrokerProtocolVersion.isAtLeast(IBP_0_9_0)) 1
       else 0

     val liveBrokers = controllerContext.liveOrShuttingDownBrokers.iterator.map { broker =>
@@ -567,9 +568,9 @@
   private def sendStopReplicaRequests(controllerEpoch: Int, stateChangeLog: StateChangeLogger): Unit = {
     val traceEnabled = stateChangeLog.isTraceEnabled
     val stopReplicaRequestVersion: Short =
-      if (config.interBrokerProtocolVersion >= KAFKA_2_6_IV0) 3
-      else if (config.interBrokerProtocolVersion >= KAFKA_2_4_IV1) 2
-      else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 1
+      if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_6_IV0)) 3
+      else if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_4_IV1)) 2
+      else if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_2_IV0)) 1
       else 0

     def responseCallback(brokerId: Int, isPartitionDeleted: TopicPartition => Boolean)
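The change in this file is mechanical: Scala `>=` comparisons on kafka.api.ApiVersion become isAtLeast(...) calls on the new org.apache.kafka.server.common.MetadataVersion enum; the version-selection logic itself is unchanged. A standalone Java sketch of the same pattern (RequestVersionSketch is a hypothetical helper, not part of the patch):

import org.apache.kafka.server.common.MetadataVersion;

final class RequestVersionSketch {
    // Picks the highest LeaderAndIsr request version the configured IBP supports,
    // mirroring sendLeaderAndIsrRequest above.
    static short leaderAndIsrRequestVersion(MetadataVersion ibp) {
        if (ibp.isAtLeast(MetadataVersion.IBP_3_2_IV0)) return 6;
        else if (ibp.isAtLeast(MetadataVersion.IBP_2_8_IV1)) return 5;
        else if (ibp.isAtLeast(MetadataVersion.IBP_2_4_IV1)) return 4;
        else if (ibp.isAtLeast(MetadataVersion.IBP_2_4_IV0)) return 3;
        else if (ibp.isAtLeast(MetadataVersion.IBP_2_2_IV0)) return 2;
        else if (ibp.isAtLeast(MetadataVersion.IBP_1_0_IV0)) return 1;
        else return 0;
    }
}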
[Diff truncated: the remaining 79 of 86 changed files are not shown.]
