This simple library allows you to create your own distributed transaction coordinator :
case class Message(...)
class CoordinatorImpl extends Coordinator[Message, TransactorImpl]
class TransactorImpl extends Transactor[Message, ProcessorImpl] { //2nd phase
override def complete = ...
override def rollback = ...
//def tid - transaction id (and also name of the actor)
}
class ProcessorImpl extends Processor[Task] { //1st phase
def process(r: Req[Message]) = Future{...} //should return Commit or Rollback vote
def complete(r: Req[Message]) = Future{...}
def rollback(r: Req[Message]) = Future{...}
}
where
- Message - is a message participating in transaction
- Coordinator - is a supervisor
- Transactor - implements 2PC protocol
- Processor - is an actor which actually does the job
Just implement complete/rollback to adopt this to your underlying system like JDBC, JMS or some other messaging protocol.
To send set of messages, participating in one distributed transaction, use:
val coordinator = system.actorOf(Props(classOf[CoordinatorImpl]), "coordinator")
val request = ReqSeq(tid, Seq(msg1, ..., msgN))
coordinator ! request
where
- tid - is unique id which correlates messages in XA transaction
If you want to reduce data from processors to some actor - just point them to one tid-scoped actor
class Receiver() extends Actor { ... }
//in your supervisor
context.child(tid) getOrElse context.actorOf(Props[Receiver], tid)
//in your message
case class Message(a: ActorRef, e: Option[Throwable], ...)
//in processor (data with same tid will arrive sequentially)
def process(r: Req[Message]) = r.req.a ? res
def complete(m: Req[Message]) = m.body.a ! Done
def rollback(m: Req[Message]) = m.body.a ! m.e //or m.a ! PoisonPill
Some protocols may send data chunk by chunk. You can use chunked transaction to have every chunk processed immediately:
import Helper._
import Helper.implic.defaultAskTimeout
implicit val expectations = 5.expected[Message]
val chunk1 = ReqSeq("100500", Seq(msg1, msg2, msg3))
coordinator ! chunk1
coordinator ! <something_else>
val chunk2 = ReqSeq("100500", Seq(msg4, msg5))
val result = coordinator ? chunk2
Expectation is the count of messages which expected to be received in current transaction. Note that you have to specify real expectation (full count of messages in transaction) only for last chunk - just use 0 for all previous.
You may want implement Merge trait to customize voting process or have end-of-transaction predicate more customized:
class MyMerge(acc: Seq[Msg], votes: Seq[Vote[Msg]]) extends Merge[Msg] {
def isFull: Boolean = ... //should be true if all parts of chunked transaction received
def mergeVotes = ... //returns Commit[T] _ or Rollback[T] _ based on votes
}
implicit val merge = MyMerge
val request = new ReqSeq(...)