Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add approx. transactional api #98

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ let package = Package(
.package(url: "https://github.com/apple/swift-nio.git", from: "2.55.0"),
.package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.0.0-alpha.1"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
.package(url: "https://github.com/ordo-one/package-concurrency-helpers", .upToNextMajor(from: "1.0.0")),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't add this dependency.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can copy helpers from there.

// The zstd Swift package produces warnings that we cannot resolve:
// https://github.com/facebook/zstd/issues/3328
.package(url: "https://github.com/facebook/zstd.git", from: "1.5.0"),
Expand Down Expand Up @@ -73,6 +74,7 @@ let package = Package(
name: "SwiftKafka",
dependencies: [
"Crdkafka",
.product(name: "ConcurrencyHelpers", package: "package-concurrency-helpers", moduleAliases: ["ConcurrencyHelpers" : "BlockingCallWrapper"]),
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
.product(name: "Logging", package: "swift-log"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public struct KafkaProducerConfiguration {
/// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantiation will fail if user-supplied configuration is incompatible.
/// Default: `false`
public var enableIdempotence: Bool = false

public var transactionalId: String?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need some comments on this and what it does. If it only makes sense for the transaction producer we should not expose it all in this configuration and maybe have to introduce a new KafkaTransactionalProducerConfiguration

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think of some wrapper around producer configuration like the following?

struct KafkaTransactionalProducerConfiguration {
    var producerConfiguration: KafkaProducerConfiguration
    public var transactionalId: String
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure that works. Presumably there are some configurations that don't apply to a transactional producer. We probably should introduce a new configuration and just duplicate stuff like we did for the consumer and producer.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you suggest which properties you have in mind, please?


/// Producer queue options.
public var queue: KafkaConfiguration.QueueOptions = .init()
Expand Down Expand Up @@ -108,6 +110,9 @@ extension KafkaProducerConfiguration {
var resultDict: [String: String] = [:]

resultDict["enable.idempotence"] = String(self.enableIdempotence)
if let transactionalId {
resultDict["transactional.id"] = transactionalId
}
resultDict["queue.buffering.max.messages"] = String(self.queue.bufferingMaxMessages)
resultDict["queue.buffering.max.kbytes"] = String(self.queue.bufferingMaxKBytes)
resultDict["queue.buffering.max.ms"] = String(self.queue.bufferingMaxMilliseconds)
Expand Down
21 changes: 21 additions & 0 deletions Sources/SwiftKafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ public final class KafkaConsumer: Sendable, Service {
}
}
}

/// Exposes the underlying `RDKafkaClient` for internal use
/// (e.g. committing offsets as part of a producer transaction).
/// - Throws: A ``KafkaError`` when the consumer has already been shut down.
func client() throws -> RDKafkaClient {
    try self.stateMachine.withLockedValue { state in
        try state.client()
    }
}
}

// MARK: - KafkaConsumer + StateMachine
Expand Down Expand Up @@ -548,5 +552,22 @@ extension KafkaConsumer {
return nil
}
}

/// Returns the `RDKafkaClient` held by the current state.
///
/// - Returns: The client while the consumer is initializing, consuming,
///   stopped, or finishing (the underlying handle is still alive in all of these).
/// - Throws: A ``KafkaError`` once the consumer has fully finished.
func client() throws -> RDKafkaClient {
    switch self.state {
    case .uninitialized:
        fatalError("\(#function) invoked while still in state \(self.state)")
    case .initializing(let client, _),
         .consuming(let client, _):
        return client
    case .consumptionStopped(let client),
         .finishing(let client):
        return client
    case .finished:
        throw KafkaError.client(reason: "Client is stopped")
    }
}
}
}
48 changes: 46 additions & 2 deletions Sources/SwiftKafka/KafkaError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public struct KafkaError: Error, CustomStringConvertible {
}

static func rdKafkaError(
wrapping error: rd_kafka_resp_err_t, file: String = #fileID, line: UInt = #line
wrapping error: rd_kafka_resp_err_t, isFatal: Bool = false, file: String = #fileID, line: UInt = #line
) -> KafkaError {
let errorMessage = String(cString: rd_kafka_err2str(error))
return KafkaError(
Expand Down Expand Up @@ -143,6 +143,36 @@ public struct KafkaError: Error, CustomStringConvertible {
)
)
}

/// Creates a ``KafkaError`` indicating that the transaction was aborted
/// (the whole transaction can be retried from scratch).
/// - Parameter reason: Human-readable description of why the transaction was aborted.
static func transactionAborted(
    reason: String, file: String = #fileID, line: UInt = #line
) -> KafkaError {
    KafkaError(
        backing: .init(code: .transactionAborted, reason: reason, file: file, line: line)
    )
}

/// Creates a ``KafkaError`` indicating that a transaction could not be completed.
/// - Parameter reason: Human-readable description of the failure.
static func transactionIncomplete(
    reason: String, file: String = #fileID, line: UInt = #line
) -> KafkaError {
    KafkaError(
        backing: .init(code: .transactionIncomplete, reason: reason, file: file, line: line)
    )
}

/// Creates a ``KafkaError`` indicating that a transactional operation was retried
/// `numOfAttempts` times without succeeding.
/// - Parameter numOfAttempts: The number of attempts that were exhausted.
static func transactionOutOfAttempts(
    numOfAttempts: UInt64, file: String = #fileID, line: UInt = #line
) -> KafkaError {
    return KafkaError(
        backing: .init(
            // Reworded from "Out of \(n) attempts" so the surfaced error text is self-explanatory.
            code: .transactionOutOfAttempts,
            reason: "Could not complete transaction within \(numOfAttempts) attempts",
            file: file,
            line: line
        )
    )
}
}

extension KafkaError {
Expand All @@ -162,6 +192,10 @@ extension KafkaError {
case messageConsumption
case topicCreation
case topicDeletion
case transactionAborted
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering whether we should add subcodes or expose the code from Kafka as is.

case transactionIncomplete
case notInTransaction // FIXME: maybe add subcode ?
case transactionOutOfAttempts
}

fileprivate var backingCode: BackingCode
Expand All @@ -188,6 +222,12 @@ extension KafkaError {
public static let topicCreation = ErrorCode(.topicCreation)
/// Deleting a topic failed.
public static let topicDeletion = ErrorCode(.topicDeletion)
/// Transaction was aborted (can be re-tried from scratch).
public static let transactionAborted = ErrorCode(.transactionAborted)
/// Transaction could not be completed
public static let transactionIncomplete = ErrorCode(.transactionIncomplete)
/// Out of provided number of attempts
public static let transactionOutOfAttempts = ErrorCode(.transactionOutOfAttempts)

public var description: String {
return String(describing: self.backingCode)
Expand All @@ -206,17 +246,21 @@ extension KafkaError {
let file: String

let line: UInt

let isFatal: Bool

fileprivate init(
code: KafkaError.ErrorCode,
reason: String,
file: String,
line: UInt
line: UInt,
isFatal: Bool = false
) {
self.code = code
self.reason = reason
self.file = file
self.line = line
self.isFatal = isFatal
}

// Only the error code matters for equality.
Expand Down
88 changes: 85 additions & 3 deletions Sources/SwiftKafka/KafkaProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,42 @@ public final class KafkaProducer: Service, Sendable {
return KafkaProducerMessageID(rawValue: newMessageID)
}
}

func initTransactions(timeout: Duration = .seconds(5)) async throws {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we definitely want to create a new KafkaTransactionalProducer type so that the APIs stay clear and separated between the normal producer and the transaction one.

Additionally, I think the APIs should look a bit different. I can imagine something like this

public func withTransaction<Result>(_ body: @Sendable (KafkaTransaction) async throws -> Void) async throws -> Result {
    // init transaction
    // begin transaction
    // call out to user code and pass a new `Transaction` struct to them
    // at the end call abort or finish transaction for them
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes perfect sense to have a separate producer for transactions.
We have it at a higher level but that might be done here

guard config.transactionalId != nil else {
throw KafkaError.config(
reason: "Could not initialize transactions because transactionalId is not set in config")
}
let client = try self.stateMachine.withLockedValue { try $0.initTransactions() }
try await client.initTransactions(timeout: timeout)
}

/// Starts a new transaction on this producer.
/// - Throws: A ``KafkaError`` when transactions were not initialized
///   or the producer is shutting down.
func beginTransaction() throws {
    let transactionalClient = try self.stateMachine.withLockedValue { state in
        try state.transactionsClient()
    }
    try transactionalClient.beginTransaction()
}

/// Sends the given consumer `offsets` to the group coordinator as part of the
/// current transaction, tying the consumed offsets to the produced messages.
///
/// - Parameters:
///   - offsets: The partition/offset list to commit within the transaction.
///   - consumer: The consumer whose group the offsets belong to.
///   - timeout: Maximum time to wait for the operation to complete.
///   - attempts: Upper bound on internal retries before giving up.
/// - Throws: A ``KafkaError`` when transactions were not initialized,
///   the consumer client is unavailable, or the underlying call fails.
func send(
    offsets: RDKafkaTopicPartitionList,
    forConsumer consumer: KafkaConsumer,
    timeout: Duration = .kafkaUntilEndOfTransactionTimeout,
    attempts: UInt64 = .max
) async throws {
    let producerClient = try self.stateMachine.withLockedValue { try $0.transactionsClient() }
    let groupClient = try consumer.client()
    try await groupClient.withKafkaHandlePointer { consumerHandle in
        try await producerClient.send(
            attempts: attempts,
            offsets: offsets,
            forConsumerKafkaHandle: consumerHandle,
            timeout: timeout
        )
    }
}

/// Aborts the in-flight transaction, discarding any messages produced within it.
///
/// - Parameters:
///   - timeout: Maximum time to wait for the abort to complete.
///   - attempts: Upper bound on internal retries before giving up.
/// - Throws: A ``KafkaError`` when transactions were not initialized
///   or the abort itself fails.
func abortTransaction(
    timeout: Duration = .kafkaUntilEndOfTransactionTimeout,
    attempts: UInt64
) async throws {
    let transactionalClient = try self.stateMachine.withLockedValue { state in
        try state.transactionsClient()
    }
    try await transactionalClient.abortTransaction(attempts: attempts, timeout: timeout)
}


}

// MARK: - KafkaProducer + StateMachine
Expand Down Expand Up @@ -288,6 +324,18 @@ extension KafkaProducer {
source: Producer.Source?,
topicHandles: RDKafkaTopicHandles
)
/// The ``KafkaProducer`` has started and is ready to use, transactions were initialized.
///
/// - Parameter messageIDCounter:Used to incrementally assign unique IDs to messages.
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
/// - Parameter topicHandles: Class containing all topic names with their respective `rd_kafka_topic_t` pointer.
case startedWithTransactions(
client: RDKafkaClient,
messageIDCounter: UInt,
source: Producer.Source?,
topicHandles: RDKafkaTopicHandles
)
/// Producer is still running but the event asynchronous sequence was terminated.
/// All incoming events will be dropped.
///
Expand Down Expand Up @@ -355,7 +403,7 @@ extension KafkaProducer {
switch self.state {
case .uninitialized:
fatalError("\(#function) invoked while still in state \(self.state)")
case .started(let client, _, let source, _):
case .started(let client, _, let source, _), .startedWithTransactions(let client, _, let source, _):
return .pollAndYield(client: client, source: source)
case .consumptionStopped(let client):
return .pollWithoutYield(client: client)
Expand Down Expand Up @@ -398,6 +446,19 @@ extension KafkaProducer {
newMessageID: newMessageID,
topicHandles: topicHandles
)
case .startedWithTransactions(let client, let messageIDCounter, let source, let topicHandles):
let newMessageID = messageIDCounter + 1
self.state = .startedWithTransactions(
client: client,
messageIDCounter: newMessageID,
source: source,
topicHandles: topicHandles
)
return .send(
client: client,
newMessageID: newMessageID,
topicHandles: topicHandles
)
case .consumptionStopped:
throw KafkaError.connectionClosed(reason: "Sequence consuming events was abruptly terminated, producer closed")
case .finishing:
Expand All @@ -423,7 +484,7 @@ extension KafkaProducer {
fatalError("\(#function) invoked while still in state \(self.state)")
case .consumptionStopped:
fatalError("messageSequenceTerminated() must not be invoked more than once")
case .started(let client, _, let source, _):
case .started(let client, _, let source, _), .startedWithTransactions(let client, _, let source, _):
self.state = .consumptionStopped(client: client)
return .finishSource(source: source)
case .finishing(let client, let source):
Expand All @@ -443,13 +504,34 @@ extension KafkaProducer {
switch self.state {
case .uninitialized:
fatalError("\(#function) invoked while still in state \(self.state)")
case .started(let client, _, let source, _):
case .started(let client, _, let source, _), .startedWithTransactions(let client, _, let source, _):
self.state = .finishing(client: client, source: source)
case .consumptionStopped(let client):
self.state = .finishing(client: client, source: nil)
case .finishing, .finished:
break
}
}

/// Transitions the producer from `.started` into `.startedWithTransactions`,
/// preserving the message-ID counter, event source and topic handles.
///
/// - Returns: The client so the caller can perform the librdkafka transaction init.
/// - Throws: A ``KafkaError`` when transactions were already initialized,
///   or the producer is stopping/stopped.
mutating func initTransactions() throws -> RDKafkaClient {
    switch self.state {
    case .uninitialized:
        fatalError("\(#function) invoked while still in state \(self.state)")
    case .started(let client, let messageIDCounter, let source, let topicHandles):
        self.state = .startedWithTransactions(
            client: client,
            messageIDCounter: messageIDCounter,
            source: source,
            topicHandles: topicHandles
        )
        return client
    case .startedWithTransactions:
        throw KafkaError.config(reason: "Transactions were already initialized")
    case .consumptionStopped, .finishing, .finished:
        throw KafkaError.connectionClosed(reason: "Producer is stopping or finished")
    }
}

/// Returns the client for transactional operations.
///
/// NOTE(review): throwing `transactionAborted` for "not initialized" looks
/// semantically off — an uninitialized transactional state is arguably a
/// configuration/state error (cf. the `notInTransaction` code) rather than an
/// abort; confirm the intended error semantics.
/// - Throws: A ``KafkaError`` unless the producer is in `.startedWithTransactions`.
func transactionsClient() throws -> RDKafkaClient {
    if case .startedWithTransactions(let client, _, _, _) = self.state {
        return client
    }
    throw KafkaError.transactionAborted(reason: "Transactions were not initialized or producer is being stopped")
}
}
}
Loading