Skip to content

Commit

Permalink
MINOR: Improve docs about how to provide multiple log.dir (#12119)
Browse files Browse the repository at this point in the history
Reviewer: Luke Chen <showuon@gmail.com>
  • Loading branch information
a0x8o committed May 4, 2022
1 parent 0072ecd commit c730e45
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 121 deletions.
2 changes: 0 additions & 2 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,6 @@
files="(MetadataImage).java"/>
<suppress checks="ImportControl"
files="ApiVersionsResponse.java"/>
<suppress checks="AvoidStarImport"
files="MetadataVersionTest.java"/>

<!-- Storage -->
<suppress checks="(CyclomaticComplexity|ParameterNumber)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThro
null,
ListenerType.ZK_BROKER
);
assertEquals(new HashSet<ApiKeys>(ApiKeys.zkBrokerApis()), apiKeysInResponse(response));
assertEquals(new HashSet<>(ApiKeys.zkBrokerApis()), apiKeysInResponse(response));
assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
assertTrue(response.data().supportedFeatures().isEmpty());
assertTrue(response.data().finalizedFeatures().isEmpty());
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/BrokerFeatures.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
}

/**
* Returns the default finalized features that a new Kafka cluster with IBP config >= KAFKA_2_7_IV0
* Returns the default finalized features that a new Kafka cluster with IBP config >= IBP_2_7_IV0
* needs to be bootstrapped with.
*/
def defaultFinalizedFeatures: Features[FinalizedVersionRange] = {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ object KafkaConfig {
/** ********* Log Configuration ***********/
val NumPartitionsDoc = "The default number of log partitions per topic"
val LogDirDoc = "The directory in which the log data is kept (supplemental for " + LogDirsProp + " property)"
val LogDirsDoc = "The directories in which the log data is kept. If not set, the value in " + LogDirProp + " is used"
val LogDirsDoc = "A comma-separated list of the directories where the log data is stored. If not set, the value in " + LogDirProp + " is used."
val LogSegmentBytesDoc = "The maximum size of a single log file"
val LogRollTimeMillisDoc = "The maximum time before a new log segment is rolled out (in milliseconds). If not set, the value in " + LogRollTimeHoursProp + " is used"
val LogRollTimeHoursDoc = "The maximum time before a new log segment is rolled out (in hours), secondary to " + LogRollTimeMillisProp + " property"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,52 @@

package org.apache.kafka.server.common;

import java.util.Arrays;
import java.util.HashSet;
import org.apache.kafka.common.feature.Features;
import org.apache.kafka.common.feature.FinalizedVersionRange;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.message.ApiMessageType.ListenerType;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.ApiVersionsResponseData.FinalizedFeatureKey;
import org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureKey;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.RecordVersion;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;

import java.util.Arrays;

import static org.apache.kafka.server.common.MetadataVersion.*;
import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2;
import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_2_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV2;
import static org.apache.kafka.server.common.MetadataVersion.IBP_0_8_0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_0_8_1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_0_8_2;
import static org.apache.kafka.server.common.MetadataVersion.IBP_0_9_0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_1_0_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_1_1_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_0_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_0_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_1_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_1_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_1_IV2;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_2_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_2_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_3_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_3_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_4_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_4_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_5_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV2;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_1_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Test;

class MetadataVersionTest {

Expand Down Expand Up @@ -241,103 +264,4 @@ public void testVersion() {
assertEquals("3.2-IV0", IBP_3_2_IV0.version());
}

@Test
public void shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue() {
    // Build a response for V1 records and verify that only APIs usable at
    // magic value V1 are advertised, with no feature metadata attached.
    final ApiVersionsResponse versionsResponse = ApiVersionsResponse.createApiVersionsResponse(
        10,
        RecordVersion.V1,
        Features.emptySupportedFeatures(),
        Features.emptyFinalizedFeatures(),
        ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
        null,
        ListenerType.ZK_BROKER
    );

    verifyApiKeysForMagic(versionsResponse, RecordBatch.MAGIC_VALUE_V1);

    assertEquals(10, versionsResponse.throttleTimeMs());
    assertTrue(versionsResponse.data().supportedFeatures().isEmpty());
    assertTrue(versionsResponse.data().finalizedFeatures().isEmpty());
    assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
        versionsResponse.data().finalizedFeaturesEpoch());
}

@Test
public void shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() {
    // Build a V1-record response carrying one supported and one finalized
    // feature, then check that both ranges round-trip through the response.
    final ApiVersionsResponse featureResponse = ApiVersionsResponse.createApiVersionsResponse(
        10,
        RecordVersion.V1,
        Features.supportedFeatures(
            Utils.mkMap(Utils.mkEntry("feature", new SupportedVersionRange((short) 1, (short) 4)))),
        Features.finalizedFeatures(
            Utils.mkMap(Utils.mkEntry("feature", new FinalizedVersionRange((short) 2, (short) 3)))),
        10L,
        null,
        ListenerType.ZK_BROKER
    );

    verifyApiKeysForMagic(featureResponse, RecordBatch.MAGIC_VALUE_V1);
    assertEquals(10, featureResponse.throttleTimeMs());

    // The supported-feature range [1, 4] must be echoed back verbatim.
    assertEquals(1, featureResponse.data().supportedFeatures().size());
    final SupportedFeatureKey supportedKey = featureResponse.data().supportedFeatures().find("feature");
    assertNotNull(supportedKey);
    assertEquals(1, supportedKey.minVersion());
    assertEquals(4, supportedKey.maxVersion());

    // Likewise the finalized-feature range [2, 3] and its epoch.
    assertEquals(1, featureResponse.data().finalizedFeatures().size());
    final FinalizedFeatureKey finalizedKey = featureResponse.data().finalizedFeatures().find("feature");
    assertNotNull(finalizedKey);
    assertEquals(2, finalizedKey.minVersionLevel());
    assertEquals(3, finalizedKey.maxVersionLevel());
    assertEquals(10, featureResponse.data().finalizedFeaturesEpoch());
}

/**
 * Asserts that every API key advertised in {@code response} is usable by a
 * broker whose record-batch magic value is at most {@code maxMagic}.
 *
 * @param response the ApiVersions response whose advertised keys are checked
 * @param maxMagic the highest record-batch magic value the broker supports
 */
private void verifyApiKeysForMagic(ApiVersionsResponse response, byte maxMagic) {
    // Use the primitive byte rather than the boxed Byte: null is never a valid
    // magic value here, and the primitive avoids unboxing on every comparison.
    for (ApiVersion version : response.data().apiKeys()) {
        assertTrue(ApiKeys.forId(version.apiKey()).minRequiredInterBrokerMagic <= maxMagic);
    }
}

/**
 * With the current record version and the default throttle time, the response
 * must advertise the full set of ZK-broker APIs and carry no feature metadata.
 */
@Test
public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() {
    ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse(
        AbstractResponse.DEFAULT_THROTTLE_TIME,
        RecordVersion.current(),
        Features.emptySupportedFeatures(),
        Features.emptyFinalizedFeatures(),
        ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
        null,
        ListenerType.ZK_BROKER
    );
    // Diamond operator: the element type is inferred from the target type,
    // matching the style used elsewhere in this test class.
    assertEquals(new HashSet<>(ApiKeys.zkBrokerApis()), apiKeysInResponse(response));
    assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
    assertTrue(response.data().supportedFeatures().isEmpty());
    assertTrue(response.data().finalizedFeatures().isEmpty());
    assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, response.data().finalizedFeaturesEpoch());
}

@Test
public void testMetadataQuorumApisAreDisabled() {
    final ApiVersionsResponse response = ApiVersionsResponse.createApiVersionsResponse(
        AbstractResponse.DEFAULT_THROTTLE_TIME,
        RecordVersion.current(),
        Features.emptySupportedFeatures(),
        Features.emptyFinalizedFeatures(),
        ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
        null,
        ListenerType.ZK_BROKER
    );

    // APIs needed only for KRaft mode must stay hidden from ApiVersions
    // until we are ready to expose them.
    final HashSet<ApiKeys> advertisedApis = apiKeysInResponse(response);
    for (ApiKeys quorumApi : Arrays.asList(
            ApiKeys.ENVELOPE,
            ApiKeys.VOTE,
            ApiKeys.BEGIN_QUORUM_EPOCH,
            ApiKeys.END_QUORUM_EPOCH,
            ApiKeys.DESCRIBE_QUORUM)) {
        assertFalse(advertisedApis.contains(quorumApi));
    }
}

/**
 * Collects the set of {@link ApiKeys} advertised by the given ApiVersions response.
 *
 * @param apiVersions the response whose advertised API keys are extracted
 * @return the distinct API keys present in the response
 */
private HashSet<ApiKeys> apiKeysInResponse(ApiVersionsResponse apiVersions) {
    final HashSet<ApiKeys> advertisedKeys = new HashSet<>();
    for (ApiVersion apiVersion : apiVersions.data().apiKeys()) {
        advertisedKeys.add(ApiKeys.forId(apiVersion.apiKey()));
    }
    return advertisedKeys;
}
}

0 comments on commit c730e45

Please sign in to comment.