MINOR: Fix RecordContext Javadoc (#12130)
A prior commit accidentally changed the Javadoc for RecordContext. In reality, it is not reachable from api.Processor, only from Processor.

Reviewers: Guozhang Wang <guozhang@apache.org>
a0x8o committed May 6, 2022
1 parent c730e45 commit db2339d
Showing 50 changed files with 3,310 additions and 252 deletions.
@@ -216,10 +216,11 @@ public class ProducerConfig extends AbstractConfig {
/** <code>max.in.flight.requests.per.connection</code> */
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
+ " Note that if this config is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"
+ " message re-ordering after a failed send due to retries (i.e., if retries are enabled)."
+ " Additionally, enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";
+ " Note that if this configuration is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"
+ " message reordering after a failed send due to retries (i.e., if retries are enabled); "
+ " if retries are disabled or if <code>enable.idempotence</code> is set to true, ordering will be preserved."
+ " Additionally, enabling idempotence requires the value of this configuration to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. ";

/** <code>retries</code> */
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
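The revised documentation above describes how max.in.flight.requests.per.connection interacts with enable.idempotence and retries. As a hedged illustration (the bootstrap address and serializers below are placeholders, not part of this commit), a producer configured to keep ordering guarantees under the documented constraint might look roughly like this:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Enabling idempotence requires max.in.flight.requests.per.connection to stay at or below
        // the idempotence limit (5); with these settings, per-partition ordering survives retries.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Records sent from this producer retain their order within each partition across retries.
        }
    }
}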
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.source;

/**
* An enum to represent the level of support for connector-defined transaction boundaries.
*/
public enum ConnectorTransactionBoundaries {
/**
* Signals that a connector can define its own transaction boundaries.
*/
SUPPORTED,
/**
* Signals that a connector cannot define its own transaction boundaries.
*/
UNSUPPORTED
}
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.source;

/**
* An enum to represent the level of support for exactly-once delivery from a source connector.
*/
public enum ExactlyOnceSupport {
/**
* Signals that a connector supports exactly-once delivery.
*/
SUPPORTED,
/**
* Signals that a connector does not support exactly-once delivery.
*/
UNSUPPORTED;
}
@@ -18,6 +18,8 @@

import org.apache.kafka.connect.connector.Connector;

import java.util.Map;

/**
* SourceConnectors implement the connector interface to pull data from another system and send
* it to Kafka.
@@ -28,4 +30,46 @@ public abstract class SourceConnector extends Connector {
protected SourceConnectorContext context() {
return (SourceConnectorContext) context;
}

/**
* Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
* Connector authors can assume that worker-level exactly-once support is enabled when this method is invoked.
*
* <p>For backwards compatibility, the default implementation will return {@code null}, but connector authors are
* strongly encouraged to override this method to return a non-null value such as
* {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
*
* <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
* {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.
*
* @param connectorConfig the configuration that will be used for the connector.
* @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support with the given
* configuration, and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If this method is overridden by a
* connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot provide
* exactly-once guarantees.
* @since 3.3
*/
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
return null;
}

/**
* Signals whether the connector implementation is capable of defining the transaction boundaries for a
* connector with the given configuration. This method is called before {@link #start(Map)}, only when the
* runtime supports exactly-once and the connector configuration includes {@code transaction.boundary=connector}.
*
* <p>This method need not be implemented if the connector implementation does not support defining
* transaction boundaries.
*
* @param connectorConfig the configuration that will be used for the connector
* @return {@link ConnectorTransactionBoundaries#SUPPORTED} if the connector will define its own transaction boundaries,
* or {@link ConnectorTransactionBoundaries#UNSUPPORTED} otherwise. If this method is overridden by a
* connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot define its own
* transaction boundaries.
* @since 3.3
* @see TransactionContext
*/
public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
return ConnectorTransactionBoundaries.UNSUPPORTED;
}
}
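The two hooks added above are intended to be overridden by connector implementations when worker-level exactly-once support is enabled. As a non-authoritative sketch (ExampleSourceConnector and its stubbed methods are invented for illustration, not part of this commit), a connector that always supports exactly-once delivery and defines its own transaction boundaries could report:

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;

/** Hypothetical connector used only to illustrate the new exactly-once hooks. */
public class ExampleSourceConnector extends SourceConnector {

    @Override
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        // Declare exactly-once support regardless of configuration; a real connector would inspect the config.
        return ExactlyOnceSupport.SUPPORTED;
    }

    @Override
    public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
        // Consulted only when transaction.boundary=connector; report that this connector defines its own boundaries.
        return ConnectorTransactionBoundaries.SUPPORTED;
    }

    // Remaining Connector methods are stubbed out for brevity.
    @Override public void start(Map<String, String> props) { }
    @Override public Class<? extends Task> taskClass() { return null; }
    @Override public List<Map<String, String>> taskConfigs(int maxTasks) { return Collections.emptyList(); }
    @Override public void stop() { }
    @Override public ConfigDef config() { return new ConfigDef(); }
    @Override public String version() { return "0.0.1"; }
}

Returning a non-null value from both methods spares the runtime from falling back to the pessimistic defaults described in the Javadoc above.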
@@ -16,17 +16,63 @@
*/
package org.apache.kafka.connect.source;

import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.connector.Task;

import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
* SourceTask is a Task that pulls records from another system for storage in Kafka.
*/
public abstract class SourceTask implements Task {

/**
* The configuration key that determines how source tasks will define transaction boundaries
* when exactly-once support is enabled.
*/
public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary";

/**
* Represents the permitted values for the {@link #TRANSACTION_BOUNDARY_CONFIG} property.
*/
public enum TransactionBoundary {
/**
* A new transaction will be started and committed for every batch of records returned by {@link #poll()}.
*/
POLL,
/**
* Transactions will be started and committed on a user-defined time interval.
*/
INTERVAL,
/**
* Transactions will be defined by the connector itself, via a {@link TransactionContext}.
*/
CONNECTOR;

/**
* The default transaction boundary style that will be used for source connectors when no style is explicitly
* configured.
*/
public static final TransactionBoundary DEFAULT = POLL;

/**
* Parse a {@link TransactionBoundary} from the given string.
* @param property the string to parse; should not be null
* @return the {@link TransactionBoundary} whose name matches the given string
* @throws IllegalArgumentException if there is no transaction boundary type with the given name
*/
public static TransactionBoundary fromProperty(String property) {
return TransactionBoundary.valueOf(property.toUpperCase(Locale.ROOT).trim());
}

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}

protected SourceTaskContext context;

/**
@@ -44,16 +90,13 @@ public void initialize(SourceTaskContext context) {
public abstract void start(Map<String, String> props);

/**
* <p>
* Poll this source task for new records. If no data is currently available, this method
* should block but return control to the caller regularly (by returning {@code null}) in
* order for the task to transition to the {@code PAUSED} state if requested to do so.
* </p>
* <p>
* The task will be {@link #stop() stopped} on a separate thread, and when that happens
* this method is expected to unblock, quickly finish up any remaining processing, and
* return.
* </p>
*
* @return a list of source records
*/
@@ -63,12 +106,10 @@ public void initialize(SourceTaskContext context) {
* <p>
* Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
* method should block until the commit is complete.
* </p>
* <p>
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
* automatically. This hook is provided for systems that also need to store offsets internally
* in their own system.
* </p>
*/
public void commit() throws InterruptedException {
// This space intentionally left blank.
@@ -91,17 +132,14 @@ public void commit() throws InterruptedException {
* <p>
* Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
* also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker.
* </p>
* <p>
* This is an alias for {@link #commitRecord(SourceRecord, RecordMetadata)} for backwards compatibility. The default
* implementation of {@link #commitRecord(SourceRecord, RecordMetadata)} just calls this method. It is not necessary
* to override both methods.
* </p>
* <p>
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
* automatically. This hook is provided for systems that also need to store offsets internally
* in their own system.
* </p>
*
* @param record {@link SourceRecord} that was successfully sent via the producer or filtered by a transformation
* @throws InterruptedException
@@ -115,19 +153,16 @@ public void commitRecord(SourceRecord record) throws InterruptedException {
/**
* <p>
* Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
* also called when a record is filtered by a transformation or when {@link ConnectorConfig} "errors.tolerance" is set to "all"
* also called when a record is filtered by a transformation or when "errors.tolerance" is set to "all"
* and thus will never be ACK'd by a broker.
* In both cases {@code metadata} will be null.
* </p>
* <p>
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
* automatically. This hook is provided for systems that also need to store offsets internally
* in their own system.
* </p>
* <p>
* The default implementation just calls {@link #commitRecord(SourceRecord)}, which is a nop by default. It is
* not necessary to implement both methods.
* </p>
*
* @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception
* @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored
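Since TransactionBoundary.fromProperty normalizes case and surrounding whitespace, the transaction.boundary value can be parsed directly from a connector configuration. A minimal sketch, assuming a hypothetical configuration map:

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTask.TransactionBoundary;

public class TransactionBoundaryParsingExample {
    public static void main(String[] args) {
        // Hypothetical connector configuration; only the relevant key is shown.
        Map<String, String> config =
                Collections.singletonMap(SourceTask.TRANSACTION_BOUNDARY_CONFIG, " Connector ");

        String raw = config.get(SourceTask.TRANSACTION_BOUNDARY_CONFIG);
        // fromProperty() upper-cases and trims the input, so mixed case and stray whitespace are tolerated.
        TransactionBoundary boundary =
                raw == null ? TransactionBoundary.DEFAULT : TransactionBoundary.fromProperty(raw);

        System.out.println("Effective transaction boundary: " + boundary); // prints "connector"
    }
}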
@@ -38,4 +38,29 @@ public interface SourceTaskContext {
* Get the OffsetStorageReader for this SourceTask.
*/
OffsetStorageReader offsetStorageReader();

/**
* Get a {@link TransactionContext} that can be used to define producer transaction boundaries
* when exactly-once support is enabled for the connector.
*
* <p>This method was added in Apache Kafka 3.2. Source tasks that use this method but want to
* maintain backward compatibility so they can also be deployed to older Connect runtimes
* should guard the call to this method with a try-catch block, since calling this method will result in a
* {@link NoSuchMethodError} or {@link NoClassDefFoundError} when the source connector is deployed to
* Connect runtimes older than Kafka 3.2. For example:
* <pre>
* TransactionContext transactionContext;
* try {
* transactionContext = context.transactionContext();
* } catch (NoSuchMethodError | NoClassDefFoundError e) {
* transactionContext = null;
* }
* </pre>
*
* @return the transaction context, or null if the connector was not configured to specify transaction boundaries
* @since 3.3
*/
default TransactionContext transactionContext() {
return null;
}
}
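Building on the compatibility guard shown in the Javadoc above, a task can resolve the transaction context once at start-up and simply remember whether connector-defined boundaries are in effect. A minimal sketch (ExampleSourceTask and connectorDefinesBoundaries are invented names, not part of this commit):

import java.util.Map;

import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.TransactionContext;

/** Hypothetical task showing a defensive lookup of the transaction context. */
public abstract class ExampleSourceTask extends SourceTask {

    private TransactionContext transactionContext;

    @Override
    public void start(Map<String, String> props) {
        try {
            // Null unless the connector was configured with transaction.boundary=connector.
            transactionContext = context.transactionContext();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            // Running on a Connect runtime that predates this API; fall back to runtime-defined boundaries.
            transactionContext = null;
        }
    }

    protected boolean connectorDefinesBoundaries() {
        return transactionContext != null;
    }
}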
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.source;

/**
* Provided to source tasks to allow them to define their own producer transaction boundaries when
* exactly-once support is enabled.
*/
public interface TransactionContext {

/**
* Request a transaction commit after the next batch of records from {@link SourceTask#poll()}
* is processed.
*/
void commitTransaction();

/**
* Request a transaction commit after a source record is processed. The source record will be the
* last record in the committed transaction.
* @param record the record to commit the transaction after; may not be null.
*/
void commitTransaction(SourceRecord record);

/**
* Requests a transaction abort after the next batch of records from {@link SourceTask#poll()}. All of
* the records in that transaction will be discarded and will not appear in a committed transaction.
* However, offsets for that transaction will still be committed so that the records in that transaction
* are not reprocessed. If the data should instead be reprocessed, the task should not invoke this method
* and should instead throw an exception.
*/
void abortTransaction();

/**
* Requests a transaction abort after a source record is processed. The source record will be the
* last record in the aborted transaction. All of the records in that transaction will be discarded
* and will not appear in a committed transaction. However, offsets for that transaction will still
* be committed so that the records in that transaction are not reprocessed. If the data should be
* reprocessed, the task should not invoke this method and should instead throw an exception.
* @param record the record to abort the transaction after; may not be null.
*/
void abortTransaction(SourceRecord record);
}
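Putting the new pieces together, a task whose connector reported ConnectorTransactionBoundaries.SUPPORTED could close a transaction at the end of every polled batch using the per-record variant. A hedged sketch (BoundaryDefiningTask and fetchFromSourceSystem are hypothetical, and the compatibility guard from the SourceTaskContext Javadoc is omitted for brevity):

import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.TransactionContext;

/** Hypothetical task illustrating connector-defined transaction boundaries. */
public abstract class BoundaryDefiningTask extends SourceTask {

    private TransactionContext transactionContext;

    @Override
    public void start(Map<String, String> props) {
        // Null unless transaction.boundary=connector is set for the connector.
        transactionContext = context.transactionContext();
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = fetchFromSourceSystem();
        if (transactionContext != null && !records.isEmpty()) {
            // Commit the open transaction once the last record of this batch has been produced.
            transactionContext.commitTransaction(records.get(records.size() - 1));
        }
        return records;
    }

    /** Hypothetical helper: reads the next batch of records from the external system. */
    protected abstract List<SourceRecord> fetchFromSourceSystem();
}

A task could just as well call abortTransaction(SourceRecord) on a record it wants to discard; as documented above, the offsets are still committed so the aborted data is not re-polled.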
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/Kafka.scala
@@ -108,9 +108,9 @@ object Kafka extends Logging {

try server.startup()
catch {
case _: Throwable =>
case e: Throwable =>
// KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
fatal("Exiting Kafka.")
fatal("Exiting Kafka due to fatal exception during startup.", e)
Exit.exit(1)
}

6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -1009,7 +1009,7 @@ class LogManager(logDirs: Seq[File],
if (destLog == null)
throw new KafkaStorageException(s"The future replica for $topicPartition is offline")

destLog.renameDir(UnifiedLog.logDirName(topicPartition))
destLog.renameDir(UnifiedLog.logDirName(topicPartition), true)
destLog.updateHighWatermark(sourceLog.highWatermark)

// Now that future replica has been successfully renamed to be the current replica
@@ -1022,7 +1022,7 @@
}

try {
sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition))
sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), true)
// Now that replica in source log directory has been successfully renamed for deletion.
// Close the log, update checkpoint files, and enqueue this log to be deleted.
sourceLog.close()
@@ -1069,7 +1069,7 @@ class LogManager(logDirs: Seq[File],
cleaner.updateCheckpoints(removedLog.parentDirFile, partitionToRemove = Option(topicPartition))
}
}
removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition))
removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), false)
if (checkpoint) {
val logDir = removedLog.parentDirFile
val logsToCheckpoint = logsInDir(logDir)