Skip to content

Commit

Permalink
MINOR: reload4j build dependency fixes (#12144)
Browse files Browse the repository at this point in the history
* Replace `log4j` with `reload4j` in `copyDependantLibs`. Since we have
  some projects that have an explicit `reload4j` dependency, it
  was included in the final release release tar - i.e. it was effectively
  a workaround for this bug.
* Exclude `log4j` and `slf4j-log4j12` transitive dependencies for
  `streams:upgrade-system-tests`. Versions 0100 and 0101
  had a transitive dependency to `log4j` and `slf4j-log4j12` via
  `zkclient` and `zookeeper`. This avoids classpath conflicts that lead
  to [NoSuchFieldError](qos-ch/reload4j#41) in
  system tests.

Reviewers: Jason Gustafson <jason@confluent.io>
  • Loading branch information
a0x8o committed May 11, 2022
1 parent 641143a commit d378d52
Show file tree
Hide file tree
Showing 36 changed files with 858 additions and 545 deletions.
32 changes: 19 additions & 13 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ project(':core') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -1701,7 +1701,7 @@ project(':tools') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -1751,7 +1751,7 @@ project(':trogdor') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -2071,7 +2071,10 @@ project(':streams:upgrade-system-tests-0100') {
archivesBaseName = "kafka-streams-upgrade-system-tests-0100"

dependencies {
testImplementation libs.kafkaStreams_0100
testImplementation(libs.kafkaStreams_0100) {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'log4j', module: 'log4j'
}
testRuntimeOnly libs.junitJupiter
}

Expand All @@ -2084,7 +2087,10 @@ project(':streams:upgrade-system-tests-0101') {
archivesBaseName = "kafka-streams-upgrade-system-tests-0101"

dependencies {
testImplementation libs.kafkaStreams_0101
testImplementation(libs.kafkaStreams_0101) {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'log4j', module: 'log4j'
}
testRuntimeOnly libs.junitJupiter
}

Expand Down Expand Up @@ -2391,7 +2397,7 @@ project(':connect:api') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -2428,7 +2434,7 @@ project(':connect:transforms') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -2468,7 +2474,7 @@ project(':connect:json') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -2534,8 +2540,8 @@ project(':connect:runtime') {

tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
// No need to copy log4j since the module has an explicit dependency on that
include('slf4j-log4j12*')
include('log4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -2614,7 +2620,7 @@ project(':connect:file') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -2653,7 +2659,7 @@ project(':connect:basic-auth-extension') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -2700,7 +2706,7 @@ project(':connect:mirror') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down Expand Up @@ -2735,7 +2741,7 @@ project(':connect:mirror-client') {
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntimeClasspath) {
include('slf4j-log4j12*')
include('log4j*jar')
include('reload4j*jar')
}
from (configurations.runtimeClasspath) {
exclude('kafka-clients*')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ public long pollDelayMs(String id, long now) {
public void checkingApiVersions(String id) {
NodeConnectionState nodeState = nodeState(id);
nodeState.state = ConnectionState.CHECKING_API_VERSIONS;
resetReconnectBackoff(nodeState);
resetConnectionSetupTimeout(nodeState);
connectingNodes.remove(id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,20 +231,8 @@ public void testMaxReconnectBackoff() {

@Test
public void testExponentialReconnectBackoff() {
double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1))
/ Math.log(reconnectBackoffExpBase);

// Run through 10 disconnects and check that reconnect backoff value is within expected range for every attempt
for (int i = 0; i < 10; i++) {
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost");
connectionStates.disconnected(nodeId1, time.milliseconds());
// Calculate expected backoff value without jitter
long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp))
* reconnectBackoffMs);
long currentBackoff = connectionStates.connectionDelay(nodeId1, time.milliseconds());
assertEquals(expectedBackoff, currentBackoff, reconnectBackoffJitter * expectedBackoff);
time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1);
}
verifyReconnectExponentialBackoff(false);
verifyReconnectExponentialBackoff(true);
}

@Test
Expand Down Expand Up @@ -426,4 +414,26 @@ private void setupMultipleIPs() {
this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax,
connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, new LogContext(), this.multipleIPHostResolver);
}

private void verifyReconnectExponentialBackoff(boolean enterCheckingApiVersionState) {
double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1))
/ Math.log(reconnectBackoffExpBase);

connectionStates.remove(nodeId1);
// Run through 10 disconnects and check that reconnect backoff value is within expected range for every attempt
for (int i = 0; i < 10; i++) {
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost");
if (enterCheckingApiVersionState) {
connectionStates.checkingApiVersions(nodeId1);
}

connectionStates.disconnected(nodeId1, time.milliseconds());
// Calculate expected backoff value without jitter
long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp))
* reconnectBackoffMs);
long currentBackoff = connectionStates.connectionDelay(nodeId1, time.milliseconds());
assertEquals(expectedBackoff, currentBackoff, reconnectBackoffJitter * expectedBackoff);
time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public class NetworkClientTest {
protected final long reconnectBackoffMaxMsTest = 10 * 10000;
protected final long connectionSetupTimeoutMsTest = 5 * 1000;
protected final long connectionSetupTimeoutMaxMsTest = 127 * 1000;
private final int reconnectBackoffExpBase = ClusterConnectionStates.RECONNECT_BACKOFF_EXP_BASE;
private final double reconnectBackoffJitter = ClusterConnectionStates.RECONNECT_BACKOFF_JITTER;
private final TestMetadataUpdater metadataUpdater = new TestMetadataUpdater(Collections.singletonList(node));
private final NetworkClient client = createNetworkClient(reconnectBackoffMaxMsTest);
private final NetworkClient clientWithNoExponentialBackoff = createNetworkClient(reconnectBackoffMsTest);
Expand Down Expand Up @@ -831,13 +833,28 @@ public void testDisconnectDuringUserMetadataRequest() {

@Test
public void testServerDisconnectAfterInternalApiVersionRequest() throws Exception {
awaitInFlightApiVersionRequest();
selector.serverDisconnect(node.idString());

// The failed ApiVersion request should not be forwarded to upper layers
List<ClientResponse> responses = client.poll(0, time.milliseconds());
assertFalse(client.hasInFlightRequests(node.idString()));
assertTrue(responses.isEmpty());
final long numIterations = 5;
double reconnectBackoffMaxExp = Math.log(reconnectBackoffMaxMsTest / (double) Math.max(reconnectBackoffMsTest, 1))
/ Math.log(reconnectBackoffExpBase);
for (int i = 0; i < numIterations; i++) {
selector.clear();
awaitInFlightApiVersionRequest();
selector.serverDisconnect(node.idString());

// The failed ApiVersion request should not be forwarded to upper layers
List<ClientResponse> responses = client.poll(0, time.milliseconds());
assertFalse(client.hasInFlightRequests(node.idString()));
assertTrue(responses.isEmpty());

long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp))
* reconnectBackoffMsTest);
long delay = client.connectionDelay(node, time.milliseconds());
assertEquals(expectedBackoff, delay, reconnectBackoffJitter * expectedBackoff);
if (i == numIterations - 1) {
break;
}
time.sleep(delay + 1);
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void start(Map<String, String> props) {
}

@Override
public void commit() throws InterruptedException {
public void commit() {
// nop
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public class OffsetSync {
new Field(TOPIC_KEY, Type.STRING),
new Field(PARTITION_KEY, Type.INT32));

private TopicPartition topicPartition;
private long upstreamOffset;
private long downstreamOffset;
private final TopicPartition topicPartition;
private final long upstreamOffset;
private final long downstreamOffset;

public OffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
this.topicPartition = topicPartition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@

/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
class OffsetSyncStore implements AutoCloseable {
private KafkaConsumer<byte[], byte[]> consumer;
private Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
private TopicPartition offsetSyncTopicPartition;
private final KafkaConsumer<byte[], byte[]> consumer;
private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
private final TopicPartition offsetSyncTopicPartition;

OffsetSyncStore(MirrorConnectorConfig config) {
consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.slf4j.LoggerFactory;

class Scheduler implements AutoCloseable {
private static Logger log = LoggerFactory.getLogger(Scheduler.class);
private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);

private final String name;
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
Expand Down Expand Up @@ -62,11 +62,11 @@ void execute(Task task, String description) {
try {
executor.submit(() -> executeThread(task, description)).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warn("{} was interrupted running task: {}", name, description);
LOG.warn("{} was interrupted running task: {}", name, description);
} catch (TimeoutException e) {
log.error("{} timed out running task: {}", name, description);
LOG.error("{} timed out running task: {}", name, description);
} catch (Throwable e) {
log.error("{} caught exception in task: {}", name, description, e);
LOG.error("{} caught exception in task: {}", name, description, e);
}
}

Expand All @@ -76,10 +76,10 @@ public void close() {
try {
boolean terminated = executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
if (!terminated) {
log.error("{} timed out during shutdown of internal scheduler.", name);
LOG.error("{} timed out during shutdown of internal scheduler.", name);
}
} catch (InterruptedException e) {
log.warn("{} was interrupted during shutdown of internal scheduler.", name);
LOG.warn("{} was interrupted during shutdown of internal scheduler.", name);
}
}

Expand All @@ -92,21 +92,21 @@ private void run(Task task, String description) {
long start = System.currentTimeMillis();
task.run();
long elapsed = System.currentTimeMillis() - start;
log.info("{} took {} ms", description, elapsed);
LOG.info("{} took {} ms", description, elapsed);
if (elapsed > timeout.toMillis()) {
log.warn("{} took too long ({} ms) running task: {}", name, elapsed, description);
LOG.warn("{} took too long ({} ms) running task: {}", name, elapsed, description);
}
} catch (InterruptedException e) {
log.warn("{} was interrupted running task: {}", name, description);
LOG.warn("{} was interrupted running task: {}", name, description);
} catch (Throwable e) {
log.error("{} caught exception in scheduled task: {}", name, description, e);
LOG.error("{} caught exception in scheduled task: {}", name, description, e);
}
}

private void executeThread(Task task, String description) {
Thread.currentThread().setName(name + "-" + description);
if (closed) {
log.info("{} skipping task due to shutdown: {}", name, description);
LOG.info("{} skipping task due to shutdown: {}", name, description);
return;
}
run(task, description);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ private void createTopics() {
/*
* Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
*/
protected void warmUpConsumer(Map<String, Object> consumerProps) throws InterruptedException {
protected void warmUpConsumer(Map<String, Object> consumerProps) {
Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
dummyConsumer.commitSync();
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/api/Request.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ object Request {
// Broker ids are non-negative int.
def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0

def isConsumer(replicaId: Int): Boolean = {
replicaId < 0 && replicaId != FutureLocalReplicaId
}

def describeReplicaId(replicaId: Int): String = {
replicaId match {
case OrdinaryConsumerId => "consumer"
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/ConfigAdminManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,8 @@ object ConfigAdminManager {
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST)))
.getOrElse("")
.split(",").toList
val newValueList = oldValueList ::: alterConfigOp.configEntry.value.split(",").toList
val appendingValueList = alterConfigOp.configEntry.value.split(",").toList.filter(value => !oldValueList.contains(value))
val newValueList = oldValueList ::: appendingValueList
configProps.setProperty(alterConfigOp.configEntry.name, newValueList.mkString(","))
}
case OpType.SUBTRACT => {
Expand Down
Loading

0 comments on commit d378d52

Please sign in to comment.