diff --git a/primitive/message.go b/primitive/message.go index fd7e9c63..b330dc1d 100644 --- a/primitive/message.go +++ b/primitive/message.go @@ -59,6 +59,7 @@ const ( PropertyTranscationCheckTimes = "TRANSACTION_CHECK_TIMES" PropertyCheckImmunityTimeInSeconds = "CHECK_IMMUNITY_TIME_IN_SECONDS" PropertyShardingKey = "SHARDING_KEY" + PropertyTransactionID = "__transactionId__" ) type Message struct { diff --git a/producer/producer.go b/producer/producer.go index 65e39c2b..910bb23b 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -462,13 +462,20 @@ func (tp *transactionProducer) checkTransactionState() { if uniqueKey == "" { uniqueKey = callback.Msg.MsgId } + transactionId := callback.Msg.GetProperty(primitive.PropertyTransactionID) + if transactionId == "" { + transactionId = callback.Header.TransactionId + } + if transactionId == "" { + transactionId = callback.Msg.TransactionId + } header := &internal.EndTransactionRequestHeader{ CommitLogOffset: callback.Header.CommitLogOffset, ProducerGroup: tp.producer.group, TranStateTableOffset: callback.Header.TranStateTableOffset, FromTransactionCheck: true, MsgID: uniqueKey, - TransactionId: callback.Header.TransactionId, + TransactionId: transactionId, CommitOrRollback: tp.transactionState(localTransactionState), }