Skip to content

Commit

Permalink
KAFKA-13763: Refactor IncrementalCooperativeAssignor for improved uni…
Browse files Browse the repository at this point in the history
…t testing (#11983)

The goals here include:

1. Create an overloaded variant of the IncrementalCooperativeAssignor::performTaskAssignment method that is more testing friendly
2. Simplify the parameter list for the IncrementalCooperativeAssignor::handleLostAssignments method, which in turn simplifies the logic for testing this class
3. Capture repeated Java 8 streams logic in simple, reusable, easily-verifiable utility methods added to the ConnectUtils class

Reviewers: Luke Chen <showuon@gmail.com>
  • Loading branch information
a0x8o committed May 9, 2022
1 parent db2339d commit 10b0141
Show file tree
Hide file tree
Showing 35 changed files with 2,086 additions and 813 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
import org.apache.kafka.common.Cluster;
Expand Down Expand Up @@ -117,10 +116,24 @@ public MockProducer(final Cluster cluster,
*
* Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
*/
@SuppressWarnings("deprecation")
public MockProducer(final boolean autoComplete,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer) {
this(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer);
this(Cluster.empty(), autoComplete, new org.apache.kafka.clients.producer.internals.DefaultPartitioner(), keySerializer, valueSerializer);
}

/**
* Create a new mock producer with invented metadata the given autoComplete setting and key\value serializers.
*
* Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)} new MockProducer(cluster, autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
*/
@SuppressWarnings("deprecation")
public MockProducer(final Cluster cluster,
final boolean autoComplete,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer) {
this(cluster, autoComplete, new org.apache.kafka.clients.producer.internals.DefaultPartitioner(), keySerializer, valueSerializer);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,16 @@ public interface Partitioner extends Configurable, Closeable {
void close();

/**
* Note this method is only implemented in DefatultPartitioner and UniformStickyPartitioner which
* are now deprecated. See KIP-794 for more info.
*
* Notifies the partitioner a new batch is about to be created. When using the sticky partitioner,
* this method can change the chosen sticky partition for the new batch.
* this method can change the chosen sticky partition for the new batch.
* @param topic The topic name
* @param cluster The current cluster metadata
* @param prevPartition The partition previously selected for the record that triggered a new batch
*/
@Deprecated
default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
Expand Down Expand Up @@ -93,6 +92,26 @@ public class ProducerConfig extends AbstractConfig {
+ "This <code>linger.ms</code> setting defaults to 0, which means we'll immediately send out a record even the accumulated "
+ "batch size is under this <code>batch.size</code> setting.";

/** <code>partitioner.adaptive.partitioning.enable</code> */
public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG = "partitioner.adaptive.partitioning.enable";
private static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC =
"When set to 'true', the producer will try to adapt to broker performance and produce more messages to partitions hosted on faster brokers. "
+ "If 'false', producer will try to distribute messages uniformly. Note: this setting has no effect if a custom partitioner is used";

/** <code>partitioner.availability.timeout.ms</code> */
public static final String PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG = "partitioner.availability.timeout.ms";
private static final String PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC =
"If a broker cannot process produce requests from a partition for <code>" + PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG + "</code> time, "
+ "the partitioner treats that partition as not available. If the value is 0, this logic is disabled. "
+ "Note: this setting has no effect if a custom partitioner is used or <code>" + PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG
+ "<code/> is set to 'false'";

/** <code>partitioner.ignore.keys</code> */
public static final String PARTITIONER_IGNORE_KEYS_CONFIG = "partitioner.ignore.keys";
private static final String PARTITIONER_IGNORE_KEYS_DOC = "When set to 'true' the producer won't use record keys to choose a partition. "
+ "If 'false', producer would choose a partition based on a hash of the key when a key is present. "
+ "Note: this setting has no effect if a custom partitioner is used.";

/** <code>acks</code> */
public static final String ACKS_CONFIG = "acks";
private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
Expand Down Expand Up @@ -259,21 +278,18 @@ public class ProducerConfig extends AbstractConfig {
public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
private static final String PARTITIONER_CLASS_DOC = "A class to use to determine which partition to be send to when produce the records. Available options are:" +
"<ul>" +
"<li><code>org.apache.kafka.clients.producer.internals.DefaultPartitioner</code>: The default partitioner. " +
"This strategy will try sticking to a partition until the batch is full, or <code>linger.ms</code> is up. It works with the strategy:" +
"<li>If not set, the default partitioning logic is used. " +
"This strategy will try sticking to a partition until " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" +
"<ul>" +
"<li>If no partition is specified but a key is present, choose a partition based on a hash of the key</li>" +
"<li>If no partition or key is present, choose the sticky partition that changes when the batch is full, or <code>linger.ms</code> is up.</li>" +
"<li>If no partition or key is present, choose the sticky partition that changes when " + BATCH_SIZE_CONFIG + " bytes are produced to the partition.</li>" +
"</ul>" +
"</li>" +
"<li><code>org.apache.kafka.clients.producer.RoundRobinPartitioner</code>: This partitioning strategy is that " +
"each record in a series of consecutive records will be sent to a different partition(no matter if the 'key' is provided or not), " +
"until we run out of partitions and start over again. Note: There's a known issue that will cause uneven distribution when new batch is created. " +
"Please check KAFKA-9965 for more detail." +
"</li>" +
"<li><code>org.apache.kafka.clients.producer.UniformStickyPartitioner</code>: This partitioning strategy will " +
"try sticking to a partition(no matter if the 'key' is provided or not) until the batch is full, or <code>linger.ms</code> is up." +
"</li>" +
"</ul>" +
"<p>Implementing the <code>org.apache.kafka.clients.producer.Partitioner</code> interface allows you to plug in a custom partitioner.";

Expand Down Expand Up @@ -334,6 +350,9 @@ public class ProducerConfig extends AbstractConfig {
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, CompressionType.NONE.name, in(Utils.enumOptions(CompressionType.class)), Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
.define(PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC)
.define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC)
.define(PARTITIONER_IGNORE_KEYS_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, PARTITIONER_IGNORE_KEYS_DOC)
.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
.define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
Expand Down Expand Up @@ -418,7 +437,7 @@ public class ProducerConfig extends AbstractConfig {
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(PARTITIONER_CLASS_CONFIG,
Type.CLASS,
DefaultPartitioner.class,
null,
Importance.MEDIUM, PARTITIONER_CLASS_DOC)
.define(INTERCEPTOR_CLASSES_CONFIG,
Type.LIST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@


/**
* NOTE this partitioner is deprecated and shouldn't be used. To use default partitioning logic
* remove partitioner.class configuration setting and set partitioner.ignore.keys=true.
* See KIP-794 for more info.
*
* The partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
Expand All @@ -33,6 +37,7 @@
*
* See KIP-480 for details about sticky partitioning.
*/
@Deprecated
public class UniformStickyPartitioner implements Partitioner {

private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
Expand All @@ -59,6 +64,7 @@ public void close() {}
* If a batch completed for the current sticky partition, change the sticky partition.
* Alternately, if no sticky partition has been determined, set one.
*/
@SuppressWarnings("deprecation")
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ public void deallocate(ByteBuffer buffer, int size) {
}

public void deallocate(ByteBuffer buffer) {
deallocate(buffer, buffer.capacity());
if (buffer != null)
deallocate(buffer, buffer.capacity());
}

/**
Expand Down
Loading

0 comments on commit 10b0141

Please sign in to comment.