Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache-kafka/3.1' into 3.1-sync-ak-to-c…
Browse files Browse the repository at this point in the history
…cs-11-may-2022

* apache-kafka/3.1: (51 commits)
  MINOR: reload4j build dependency fixes (apache#12144)
  KAFKA-13255: Use config.properties.exclude when mirroring topics (apache#11401)
  KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionsManager (round 3) (apache#12096)
  KAFKA-13794: Follow up to fix producer batch comparator (apache#12006)
  fix: make sliding window works without grace period (#kafka-13739) (apache#11980)
  3.1.1 release notes (apache#12001)
  KAFKA-13794; Fix comparator of `inflightBatchesBySequence` in `TransactionManager` (apache#11991)
  KAFKA-13782; Ensure correct partition added to txn after abort on full batch (apache#11995)
  KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default (apache#11908)
  KAFKA-13775: CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1 (apache#11962)
  ...
  • Loading branch information
jeffkbkim committed May 12, 2022
2 parents 8c11f49 + bafa69e commit e249b1b
Show file tree
Hide file tree
Showing 102 changed files with 3,486 additions and 943 deletions.
67 changes: 34 additions & 33 deletions LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -208,44 +208,46 @@ License Version 2.0:
audience-annotations-0.5.0
commons-cli-1.4
commons-lang3-3.8.1
jackson-annotations-2.12.3
jackson-core-2.12.3
jackson-databind-2.12.3
jackson-dataformat-csv-2.12.3
jackson-datatype-jdk8-2.12.3
jackson-jaxrs-base-2.12.3
jackson-jaxrs-json-provider-2.12.3
jackson-module-jaxb-annotations-2.12.3
jackson-module-paranamer-2.10.5
jackson-module-scala_2.13-2.12.3
jackson-annotations-2.12.6
jackson-core-2.12.6
jackson-databind-2.12.6.1
jackson-dataformat-csv-2.12.6
jackson-datatype-jdk8-2.12.6
jackson-jaxrs-base-2.12.6
jackson-jaxrs-json-provider-2.12.6
jackson-module-jaxb-annotations-2.12.6
jackson-module-scala_2.13-2.12.6
jakarta.validation-api-2.0.2
javassist-3.27.0-GA
jetty-client-9.4.43.v20210629
jetty-continuation-9.4.43.v20210629
jetty-http-9.4.43.v20210629
jetty-io-9.4.43.v20210629
jetty-security-9.4.43.v20210629
jetty-server-9.4.43.v20210629
jetty-servlet-9.4.43.v20210629
jetty-servlets-9.4.43.v20210629
jetty-util-9.4.43.v20210629
jetty-util-ajax-9.4.43.v20210629
jetty-client-9.4.44.v20210927
jetty-continuation-9.4.44.v20210927
jetty-http-9.4.44.v20210927
jetty-io-9.4.44.v20210927
jetty-security-9.4.44.v20210927
jetty-server-9.4.44.v20210927
jetty-servlet-9.4.44.v20210927
jetty-servlets-9.4.44.v20210927
jetty-util-9.4.44.v20210927
jetty-util-ajax-9.4.44.v20210927
jersey-common-2.34
jersey-server-2.34
jose4j-0.7.8
log4j-1.2.17
lz4-java-1.8.0
maven-artifact-3.8.1
metrics-core-2.2.0
metrics-core-4.1.12.1
netty-buffer-4.1.68.Final
netty-codec-4.1.68.Final
netty-common-4.1.68.Final
netty-handler-4.1.68.Final
netty-resolver-4.1.68.Final
netty-transport-4.1.68.Final
netty-transport-native-epoll-4.1.68.Final
netty-transport-native-unix-common-4.1.68.Final
netty-buffer-4.1.73.Final
netty-codec-4.1.73.Final
netty-common-4.1.73.Final
netty-handler-4.1.73.Final
netty-resolver-4.1.73.Final
netty-tcnative-classes-2.0.46.Final
netty-transport-4.1.73.Final
netty-transport-native-epoll-4.1.73.Final
netty-transport-native-unix-common-4.1.73.Final
netty-transport-classes-epoll-4.1.73.Final
plexus-utils-3.2.1
reload4j-1.2.19
rocksdbjni-6.22.1.1
scala-collection-compat_2.13-2.4.4
scala-library-2.13.6
Expand Down Expand Up @@ -285,7 +287,6 @@ jersey-container-servlet-2.34
jersey-container-servlet-core-2.34
jersey-client-2.34
jersey-hk2-2.34
jersey-media-jaxb-2.31

---------------------------------------
CDDL 1.1 + GPLv2 with classpath exception
Expand All @@ -300,13 +301,13 @@ MIT License

argparse4j-0.7.0, see: licenses/argparse-MIT
jopt-simple-5.0.4, see: licenses/jopt-simple-MIT
slf4j-api-1.7.30, see: licenses/slf4j-MIT
slf4j-log4j12-1.7.30, see: licenses/slf4j-MIT
slf4j-api-1.7.36, see: licenses/slf4j-MIT
slf4j-reload4j-1.7.36, see: licenses/slf4j-MIT

---------------------------------------
BSD 2-Clause

zstd-jni-1.5.0-4 see: licenses/zstd-jni-BSD-2-clause
zstd-jni-1.5.0-4, see: licenses/zstd-jni-BSD-2-clause

---------------------------------------
BSD 3-Clause
Expand Down
4 changes: 2 additions & 2 deletions bin/kafka-run-class.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ if [ -z "$INCLUDE_TEST_JARS" ]; then
fi

# Exclude jars not necessary for running commands.
regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc)$"
regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc|connect-file.*\.jar)$"
should_include_file() {
if [ "$INCLUDE_TEST_JARS" = true ]; then
return 0
Expand Down Expand Up @@ -171,7 +171,7 @@ do
CLASSPATH="$CLASSPATH:$dir/*"
done

for cc_pkg in "api" "transforms" "runtime" "file" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
for cc_pkg in "api" "transforms" "runtime" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
do
for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
do
Expand Down
35 changes: 22 additions & 13 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ allprojects {
}
}
}

task printAllDependencies(type: DependencyReportTask) {}
}

ext {
Expand Down Expand Up @@ -926,7 +928,7 @@ project(':core') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -1329,6 +1331,7 @@ project(':clients') {
include "**/org/apache/kafka/common/security/scram/*"
include "**/org/apache/kafka/common/security/token/delegation/*"
include "**/org/apache/kafka/common/security/oauthbearer/*"
include "**/org/apache/kafka/common/security/oauthbearer/secured/*"
include "**/org/apache/kafka/server/authorizer/*"
include "**/org/apache/kafka/server/policy/*"
include "**/org/apache/kafka/server/quota/*"
Expand Down Expand Up @@ -1650,7 +1653,7 @@ project(':tools') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -1700,7 +1703,7 @@ project(':trogdor') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -2007,7 +2010,10 @@ project(':streams:upgrade-system-tests-0100') {
archivesBaseName = "kafka-streams-upgrade-system-tests-0100"

dependencies {
testImplementation libs.kafkaStreams_0100
testImplementation(libs.kafkaStreams_0100) {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'log4j', module: 'log4j'
}
testRuntimeOnly libs.junitJupiter
}

Expand All @@ -2020,7 +2026,10 @@ project(':streams:upgrade-system-tests-0101') {
archivesBaseName = "kafka-streams-upgrade-system-tests-0101"

dependencies {
testImplementation libs.kafkaStreams_0101
testImplementation(libs.kafkaStreams_0101) {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'log4j', module: 'log4j'
}
testRuntimeOnly libs.junitJupiter
}

Expand Down Expand Up @@ -2300,7 +2309,7 @@ project(':connect:api') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -2337,7 +2346,7 @@ project(':connect:transforms') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -2377,7 +2386,7 @@ project(':connect:json') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -2443,8 +2452,8 @@ project(':connect:runtime') {

tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
// No need to copy log4j since the module has an explicit dependency on that
include('slf4j-log4j12*')
include('log4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -2524,7 +2533,7 @@ project(':connect:file') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -2563,7 +2572,7 @@ project(':connect:basic-auth-extension') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -2610,7 +2619,7 @@ project(':connect:mirror') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -2645,7 +2654,7 @@ project(':connect:mirror-client') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down
3 changes: 3 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@
<subpackage name="server">
<allow pkg="org.apache.kafka.common" />

<!-- This is required to make AlterConfigPolicyTest work. -->
<allow pkg="org.apache.kafka.server.policy" />

<subpackage name="common">
<allow pkg="org.apache.kafka.server.common" />
</subpackage>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ public class CommonClientConfigs {
public static Map<String, Object> postProcessReconnectBackoffConfigs(AbstractConfig config,
Map<String, Object> parsedValues) {
HashMap<String, Object> rval = new HashMap<>();
if ((!config.originals().containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
config.originals().containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
Map<String, Object> originalConfig = config.originals();
if ((!originalConfig.containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
originalConfig.containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
log.debug("Disabling exponential reconnect backoff because {} is set, but {} is not.",
RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MAX_MS_CONFIG);
rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,6 @@ private void handleConnections() {
// Therefore, it is still necessary to check isChannelReady before attempting to send on this
// connection.
if (discoverBrokerVersions) {
this.connectionStates.checkingApiVersions(node);
nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
log.debug("Completed connection to node {}. Fetching API versions.", node);
} else {
Expand All @@ -964,6 +963,11 @@ private void handleInitiateApiVersionRequests(long now) {
String node = entry.getKey();
if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
log.debug("Initiating API versions fetch from node {}.", node);
// We transition the connection to the CHECKING_API_VERSIONS state only when
// the ApiVersionsRequest is queued up to be sent out. Without this, the client
// could remain in the CHECKING_API_VERSIONS state forever if the channel does
not become ready.
this.connectionStates.checkingApiVersions(node);
ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
doSend(clientRequest, true, now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public Collection<String> topics() {

public static void handleMetadataErrors(MetadataResponse response) {
for (TopicMetadata tm : response.topicMetadata()) {
if (shouldRefreshMetadata(tm.error())) throw tm.error().exception();
for (PartitionMetadata pm : tm.partitionMetadata()) {
if (shouldRefreshMetadata(pm.error)) {
throw pm.error.exception();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public class ConsumerConfig extends AbstractConfig {
" <code>read_committed</code> mode, consumer.poll() will only return messages up to the last stable offset (LSO), which is the one less than the offset of the first open transaction." +
" In particular any messages appearing after messages belonging to ongoing transactions will be withheld until the relevant transaction has been completed. As a result, <code>read_committed</code>" +
" consumers will not be able to read up to the high watermark when there are in flight transactions.</p><p> Further, when in <code>read_committed</code> the seekToEnd method will" +
" return the LSO";
" return the LSO</p>";

public static final String DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2186,11 +2186,11 @@ public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> par
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
* the amount of time allocated by {@code request.timeout.ms} expires
* the amount of time allocated by {@code default.api.timeout.ms} expires
*/
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
return endOffsets(partitions, Duration.ofMillis(requestTimeoutMs));
return endOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,10 @@ void maybeUpdateSubscriptionMetadata() {
}
}

private boolean coordinatorUnknownAndUnready(Timer timer) {
return coordinatorUnknown() && !ensureCoordinatorReady(timer);
}

/**
* Poll for coordinator events. This ensures that the coordinator is known and that the consumer
* has joined the group (if it is using group management). This also handles periodic offset commits
Expand All @@ -480,7 +484,7 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
// Always update the heartbeat last poll time so that the heartbeat thread does not leave the
// group proactively due to application inactivity even if (say) the coordinator cannot be found.
pollHeartbeat(timer.currentTimeMs());
if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
if (coordinatorUnknownAndUnready(timer)) {
return false;
}

Expand Down Expand Up @@ -517,15 +521,13 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
}
}
} else {
// For manually assigned partitions, if there are no ready nodes, await metadata.
// For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata.
// If connections to all nodes fail, wakeups triggered while attempting to send fetch
// requests result in polls returning immediately, causing a tight loop of polls. Without
// the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
// awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
// When group management is used, metadata wait is already performed for this scenario as
// coordinator is unknown, hence this check is not required.
if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
client.awaitMetadataUpdate(timer);
// awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
if (coordinatorUnknownAndUnready(timer)) {
return false;
}
}

Expand Down Expand Up @@ -1021,7 +1023,7 @@ public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets,
return true;

do {
if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
if (coordinatorUnknownAndUnready(timer)) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ public interface Callback {
/**
* A callback method the user can implement to provide asynchronous handling of request completion. This method will
* be called when the record sent to the server has been acknowledged. When exception is not null in the callback,
* metadata will contain the special -1 value for all fields except for topicPartition, which will be valid.
* metadata will contain the special -1 value for all fields. If topicPartition cannot be
chosen, a -1 value will be assigned.
*
* @param metadata The metadata for the record that was sent (i.e. the partition and offset). An empty metadata
* with -1 value for all fields except for topicPartition will be returned if an error occurred.
* with -1 value for all fields will be returned if an error occurred.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
* Possible thrown exceptions include:
*
Expand Down
Loading

0 comments on commit e249b1b

Please sign in to comment.