Skip to content

z archived 2.0 documentation

RuedigerMoeller edited this page Jun 22, 2015 · 1 revision

Getting started

setup

add kontraktor 2.xto your maven dependencies.

e.g. sample pom (ofc set your name in there)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>de.ruedigermoeller</groupId>
    <artifactId>kontraktor-basics-tutorial</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
          <groupId>de.ruedigermoeller</groupId>
          <artifactId>kontraktor</artifactId>
          <version>LATEST</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Hello Kontraktor

Now let's create a first actor.

package org.nustaq.konktraktor.tutorial.basics;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;

/**
 * Created by ruedi on 09.10.14.
 */
public class HelloKontraktor extends Actor<HelloKontraktor> {

    public void $sayMalHello(String toWhom) {
        System.out.println("Hello "+toWhom);
    }

    public static void main(String arg[]) {
        HelloKontraktor actor = Actors.AsActor(HelloKontraktor.class);
        actor.$sayMalHello("me");
        System.out.println("I am probably faster");
    }

}

Looks like a normal class. However if you run it, you'll see

I am probably faster
Hello me

This is because: Actors.AsActor(HelloKontraktor.class); trnasforms the given class into an Actor (by generating a wrapper and returning it). All public methods of an Actor get called asynchronous. So when you call $sayMalHello() the following happens:

  • the method is called on the generated wrapper object of the actor instance.
  • the wrapper creates an event object from the arguments of the call
  • the event object is put onto the actor's queue ("mailbox").
  • the execution or DispatcherThread associated with the actor reads the event from the mailbox and calls it on the "real" actor class instance.

This way, all execution inside an actor is kept single threaded.

As the main thread executing the main method reaches the System.out.println("I am probably faster"); faster than the enqueing + dequeueing of the event from the actor's mailbox is happening, you see "I am probably faster" before the "Hello" message once you run the program.

Because of the asynchronous nature of an actors public methods, these messages are not allowed to return values (will result in Runtime error when tried). The only return value allowed are Futures (see below).

####Using a Callback to receive a result async

In order to show the mechanics of using a callback to return a result asynchronous to the sender, I'll add a Main actor class. So we now have 2 actors, each with a dedicated execution thread.

import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Callback;
import static org.nustaq.kontraktor.Actors.*;

public class HelloKontraktor extends Actor<HelloKontraktor> {

    public void $init() {
        Thread.currentThread().setName("HelloContraktor");
    }

    public void $sayMalHello(String toWhom) {
        System.out.println("Hello "+toWhom);
    }

    public void $callMeBack( String whom, Callback<String> cb ) {
        System.out.println("processing method in thread '"+Thread.currentThread().getName()+"'");
        cb.receive("Hello "+whom+", from callback", null );
    }

    public static class Main extends Actor<Main> {

        public void $main() {
            Thread.currentThread().setName("Main");

            HelloKontraktor actor = AsActor(HelloKontraktor.class);
            actor.$init();

            actor.$sayMalHello("me");
            actor.$callMeBack( "me again", (result, error) ->
                System.out.println("thread '"+Thread.currentThread().getName()+"': "+result)
            );
            System.out.println("I might be faster");

        }
    }

    public static void main(String arg[]) {
        ((Main) AsActor(Main.class)).$main();
    }
}

Running this yields:

Hello me
I might be faster
processing method in thread 'HelloContraktor'
thread 'Main': Hello me again, from callback

Note that the receive() method of the HelloKontraktor actor is called from the thread associated with it, while the calling actor (Main) receives the callback in its own thread. So processing inside an actor stays single threaded even when receiving callbacks.

This is because Kontraktor scans the arguments of a public method for Callback types. Instead of directly invoking the callback method, it puts an event onto 'Main' mailbox which then is consumed + executed by the thread associated with 'Main'.

Kontraktor provides a way to do actor-based concurrency without requiring boilerplate code, however there are limits (performance reasons, instrumentation limits). So stick to built in Callback interface as described to avoid trouble.

Its possible to use self defined interfaces for actor callbacks using inThread() method or @InThread annotations, however this requires deeper understanding of underlying mechanics.

Futures

Futures are a somewhat similar concept to Callbacks, however they allow for more elegant and concise code. Basically a Future is a Callback created by the the callee. While a callback can receive a stream of results, a Future can receive a single result only.

Access actor state asynchronously

import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Future;
import org.nustaq.kontraktor.Promise;

public class WelcomeToTheFuture extends Actor<WelcomeToTheFuture> {

    String internalState = "internal State";

    public Future<String> $getInternalState() {
        return new Promise(internalState); // fulfilled promise
    }

    public static class FutureMain extends Actor<FutureMain> {

        public void $main() {

            WelcomeToTheFuture welcome = Actors.AsActor(WelcomeToTheFuture.class);

            // get internal state asynchronously
            welcome.$getInternalState().then( (result, err) -> System.out.println(result) );
        }
    }

    public static void main(String arg[]) {((FutureMain) Actors.AsActor(FutureMain.class)).$main();}
}

Instead of passing a Callback as an argument, one passes the 'callback' closure to the 'then' or 'map' method of the future.

This can also be chained, e.g. one can chain then(.).then(.).map(.) to achieve ordered processing in an asynchronous fashion without blocking execution threads. The closure is invoked once the Future is fulfilled.

From the view of the callee, a Promise (implements Future) is returned. ǹew Promise(result) is an immediately fulfilled promise, however its also possible to return an unfilfilled Promise p = new Promise();[..]; return p;, which is later fulfilled by calling .receive(result, error).

Example:

// get internal state asynchronously
welcome.$getInternalState().then( (result, err) -> System.out.println(result) );

welcome.$getInternalState()
   .map((result, err) -> {
       System.out.println("Map 0:"+result);
       return new Promise<>("Call returned:" + result);
   })
   .map( (result, err ) -> {
       System.out.println("Map 1:"+result);
       return new Promise<>(result.length());
   })
   .then( (result, error) -> System.out.println("then 0 received:"+result+".some other processing") )
   .then( (result, error) -> System.out.println("then 1 received:"+result+" finished") );

yields:

internal State
Map 0:internal State
Map 1:Call returned:internal State
then 0 received:28.some other processing
then 1 received:28 finished

Note that map() transforms the result while then() simply return the result originally received. As always: All callback closures get executed in the calling actors thread automatically.

Futures allow for higher order functionality to organize paralellism and concurrency.

e.g. adding the following to the sample above:

public Future $delayMe() {
    Promise unfulfilled = new Promise();
    delayed(1000, () -> unfulfilled.receive("done",null)); // built in utility
    return unfulfilled;
}

public static class FutureMain extends Actor<FutureMain> {

    public void $main() {

        WelcomeToTheFuture welcome = Actors.AsActor(WelcomeToTheFuture.class);
        // get internal state asynchronously
        welcome.$getInternalState().then( (result, err) -> System.out.println(result) );

        welcome.$delayMe().then( (r,e) -> System.out.println("delay finished") );

yields:

internal State
Map 0:internal State
Map 1:Call returned:internal State
then 0 received:28.some other processing
then 1 received:28 finished
delay finished

The $delay method returns an unfulfilled future which is later on triggered. Typical application of this occurs if a long running request to an external system is done (e.g. get some URL content).

Parallelism by custom scheduling

In an actor application, parallelism/concurrency is not solved using blocking mutexes and locks, instead kind of application defined "scheduling" of closures is used.

Example: A class which repeats an action X times with an interval of Y millis.

public class Repeater extends Actor<Repeater> {

    public Future $repeat(String tag, long millis, int count ) {
        Promise p = new Promise(); // unfulfilled
        // dummy implementation of some code:
        System.out.println("doing stuff "+tag+" "+count);
        if ( count > 0 ) {
            delayed( millis, () -> $repeat(tag, millis, count-1).then( (r,e) -> p.signal()) );
        } else {
            p.signal();
        }
        return p;
    }

    public static class RepeaterMain extends Actor<RepeaterMain> {

        public void $main() {
            Repeater rep = Actors.AsActor(Repeater.class);

            rep.$repeat("Task A", 1000, 5 ).then( (r,e) -> System.out.println("Task A finished"));
            rep.$repeat("Task B", 1500, 3 ).then( (r,e) -> System.out.println("Task B finished"));
            rep.$repeat("Task C", 500, 7 ).then( (r,e) -> System.out.println("Task C finished"));
        }

    }

    public static void main(String arg[]) {((RepeaterMain) Actors.AsActor(RepeaterMain.class)).$main();}
}

yields:

doing stuff Task A 5
doing stuff Task B 3
doing stuff Task C 7
doing stuff Task C 6
doing stuff Task A 4
doing stuff Task C 5
doing stuff Task B 2
doing stuff Task C 4
doing stuff Task A 3
doing stuff Task C 3
doing stuff Task C 2
doing stuff Task B 1
doing stuff Task A 2
doing stuff Task C 1
doing stuff Task C 0
Task C finished
doing stuff Task A 1
doing stuff Task B 0
Task B finished
doing stuff Task A 0
Task A finished

(Note: Still only one thread is used)

Example: A class which schedules a flow of events according to a given priority.

public class PriorityScheduler extends Actor<PriorityScheduler> {

    public static final int MAXPRIO = 100;
    LinkedList queues[] = new LinkedList[MAXPRIO+1];

    public Future schedule( int priority ) {
        Promise promise = new Promise();
        if ( queues[priority] == null ) {
            queues[priority] = new LinkedList();
        }
        queues[priority].add(promise);
        self().$run();
        return promise;
    }

    public void $run() {
        for (int i = 0; i < queues.length; i++) {
            LinkedList<Promise> queue = queues[i];
            queue.forEach( (promise) -> promise.signal() );
            queue.clear();
        }
    }

}

usage is then prioScheduler.$schedule( prio ).then( (r,e) -> doStuff() ). This means the scheduler does not actually execute the action, it just triggers futures in the correct order. Ofc this will only have effect if events come in at a high rate.

####using POSMs (Plain Old Synchronous Methods)

Typically the public methods of an actor make up the asynchronous interface of your actor. Inside your actor you can define non-public methods which work like normal synchronous java methods and are not restricted in any way. Ofc you might use ordinary classes to implement your application logic. Use actors to separate logical single threaded units or service interfaces.

The most common beginners error is to replace classes by actors. Use actors to replace Threads.

If an actor calls methods on this they are executed synchronously. An actor might use self() to execute its own method asynchronously.

    public static class Main extends Actor<Main> {
        public void $main() {
            init(); // sync

            HelloKontraktor actor = setupHelloActor(); // sync

            actor.$sayMalHello("me"); // async (foreign actor)
            actor.$callMeBack( "me again", (result, error) ->
                System.out.println("thread '"+Thread.currentThread().getName()+"': "+result)
            );
            System.out.println("I might be faster");

            $asyncMethod();      // synchronous call to own async actor method
            this.$asyncMethod(); // synchronous call to own async actor method
            self().$asyncMethod(); // ASYNC call on my own actor method
        }

        public void $asyncMethod() {
        }

        private HelloKontraktor setupHelloActor() {
            HelloKontraktor actor = AsActor(HelloKontraktor.class);
            actor.$init();
            return actor;
        }

        protected void init() {
            Thread.currentThread().setName("Main");
        }
    }

example to illustrate difference

public class MyActor ... {
    private void asyncTest() {
        self().$asyncMethod(1);
        self().$asyncMethod(2);
        $asyncMethod(3);
    }

    public void $asyncMethod(int num) {
        System.out.println("NUM:"+num);
    }
}

yields:

NUM:3
NUM:1
NUM:2

'Waiting' for several tasks to complete using yield()

This also shows how the Hoarde class can be used to speed up computation by using concurrent execution. Each YieldEcample Actor is executed in a separate thread.

public class YieldExample extends Actor<YieldExample> {

    public Future<Long> heavyComputing( int sumUp ) {
        long sum = 0;
        for ( int i = 0; i < sumUp; i++) {
            sum += i;
        }
        return new Promise<>(sum);
    }

    public static class YieldMain extends Actor<YieldMain> {

        public void $main() {
            Hoarde<YieldExample> hoarde = new Hoarde<YieldExample>(4, YieldExample.class);

            Future[] futures = hoarde.map((yieldExample, index) -> yieldExample.heavyComputing(10000*(index+1)));

            yield(futures).then( (fulfilledFutures , error ) -> {
                for (int i = 0; i < fulfilledFutures.length; i++) {
                    Future fulfilledFuture = fulfilledFutures[i];
                    System.out.println("result:" + fulfilledFuture.getResult() );
                }
                self().$stop(); // stop main only after haveing receiced the result
            });

            hoarde.each( (example)-> example.$stop() );
        }
    }

    public static void main(String arg[]) { ((YieldMain) Actors.AsActor(YieldMain.class)).$main();}
}

see also solution for [Dining Philosophers] (https://github.com/RuedigerMoeller/kontraktor/blob/trunk/src/examples/java/org/nustaq/kontraktor/examples/Dining.java) and Computing Pi

$sync()

Using the $sync().then( .. ) utility method one can ensure, that a receiving actor has processed all prior messages before executing the closure given in the then() method.

Ordering

asynchronous calls sent to the mailbox of an actor are executed FIFO. Callbacks received by an actor are put to a separate 'mailbox' ("Callback Queue") which is processed with higher priority than the mailbox. Reason is reduction of queue sizes when using more complex actor programs. Additionally this reduces the latency observed from a sending actor.

Internal representation

When `Actors.AsActor``is called, the following happens

  • a subclass .._ActorProxy of the actor class is generated
  • the subclass overrides all public methods with code to put a message onto the mailbox, arguments of public methods are scanned for callback's and futures in order to inject thread juggling code (remember if one actor calls back another, the initiator calls inside his thread, the receiver receives the callback in receiver's thread)
  • if a method violates Kontraktor's restrictions, a Runtime Exception is thrown along some explanation about the cause of violation.
  • non public methods of the actor proxy subclass are overridden with code throwing an exception "only public methods allowed .. "
  • An instance of the Actor class is created (using .newInstance(), require empty constructor), an instance of the generated subclass (_ActorProxy) is created and the 'real instance' is written to the proxy's __target field.

The reason for the trickery is to be able to refer to an Actor proxy like the "real" actor class. This keeps type safety, possibility to inherit actors, refactorability, searchability, IDE code completion without the need for boilerplating and handcrafted dispatch.

Using Java's Proxy Class would require to define an interface along each actor class, as message objects are created implicitely from the arguments of a method, no "Message Classes" have to be defined (as e.g. Akka requires). There is no need to do dispatching of messages, as each message (=queued method call) is directly is routed to its target method.

Proxy state vs Actor state

The ActorProxy also inherits all fields of the Actor class, these fields are unused ofc which can lead to some suprises when looking into the debugger or trying to use actor references (=the generated actor proxy) as a key in a hashmap (as equals will access the unitialized state).

See a debugger screenshot. Note that the value of field 'sampleState' is available only in the 'real' Actor instance pointed by __target.

There are tweaks to allow synchronous access to inner actor state (e.g. to enable equals, hashing, toString). However this leads to multithreaded access to inner actor state and should be used on immutable fields or at least only read volatile declared fields (see @CallersideMethod). Note transparent distributability (TCP, Http, WebSockets) gets lost once those tweaks are used.

####Actors and threads. Scheduling

Kontraktor actors use fixed size queues. By default each Actor gets a separate thread and Scheduler with fixed size. Using Actors.asActor(.., Dispatcher ) methods for actor creation its possible to

  • define a different queue size than default
  • force several actors to share the same thread
  • dispatch N actors to M threads automatially
cbt = Actors.AsActor(CBTActor.class, new ElasticScheduler( 1, MSG_QUEUE_SIZE));

or

ElasticScheduler sched = new ElasticScheduler( 4, 8000 ); // use max 4 threads (if needed)
a = Actors.AsActor(MyActorA.class, sched );
b = Actors.AsActor(MyActorB.class, sched );
c = Actors.AsActor(MyActorC.class, sched );

Elastic scheduler will automatically scale up depending on observed runtime behaviour.

####Deadlocks and Queue Sizes

Actors are not a silver bullet to solve concurrency issues, so its still possible to create deadlocks. If an actors mailbox is full, the calling actor is blocked, so if there is a complex mesh of communicating actors, things might go wrong.

The most common reason for this to happen is

  • you block execution inside one actor (Never ever put blocking code in async methods)
  • queues are too short

A rule of thumb is to have a course grained actor design, use a dedicated thread per actor (=default) if not performance relevant. Use multiple actors on a single thread only if this is performance critical. E.g. when doing a middleware handling 100's of NIO connections, it pays off to run 'connection handling' actors on the same Scheduler, it will not make sense to also run unrelated actors of this application it (increases risk of deadlocks).

Kontraktor's model of remote messaging

There are several adaptors to publish an Actor via a network interface (TCP, Http, WebSockets) as of version 2.0 only TCP remoting is mature enough to be used. Publishing an Actor requires all messages (actor methods) to have Serializable arguments and results, the actor itself does not have to implement Serializable, as Actor references are automatically transformed when passed to a remote location. Kontraktor uses fast-serialization internally, so you don't need to care for manual optimization (e.g. Externalizable), just make parameter classes 'Serializable' and take care not to transmit a whole application by accidental non-transient references (attention: anonymous classes!).

Of course some types such as Futures, Spores, Callbacks, refs to Actors are handled in a special way behind the scenes such that the application code does not have to care wether an Actor runs inside the same VM or remotely.

As a general advice: Its favourable to figure out which Actors/Services of your application might get distributed and favour a conservative design for those. Its not a problem to run an Actor locally even if has been planned to be run remotely. It can be vice versa.

The following can be problematic:

  • passing very large Objects
  • passing Actor References, Futures, Callbacks back and forth. Try to have them do a single hop only.
  • prefer a service-alike "Facade" pattern for the Actor published to the network.

Differences amongst local and remote Actors

  • Callbacks: A Callback can have multiple results. Therefore the network-marshalling layer cannot know wether a callback will receive further results. To signal there might be more results, Callback.CONT e.g. cback.receive(xy,Callback.CONT); must be passed as the error object. As soon the error object of a receive call has a null object or an Object different to Callback.CONT, the network marshalling layer will close the channel, so further callbacks made from remote won't be transmitted. Its best practice to always keep this convention, to have any Actor be remote-able
  • connection close: Once the connection is closed all remote references (actors, callbacks, futures) are invalid.

Plain TCP Remoting

Publishing an Actor via TCP:

KVServer servingActor = Actors.AsActor(KVServer.class);
servingActor.$init().onResult(result -> {
    try {
        TCPActorServer.Publish(servingActor, 7777);
    } catch (IOException e) {
        e.printStackTrace();
    }
}).onError( error -> ((Exception) error).printStackTrace() );

exposes the Actor class "KVServer" via TCP

Exposing an Actor as a JSon Webservice (ALPHA)

The Webservice interface is rather simple and require simple structured Objects (Pojo's, limited set of collections), if one is willing to dive into the details of fast-serializations KSON/JSON encoding, its possible to transmit reasonable complex Pojos as well without handcrafted en/decoding. There is also the possibility to use "HTTP POST"-method to invoke messages with more complex data to transmit.

  • Callbacks and Futures are translated to Request/(Streaming)Response.
  • Spore's don't work via Http
  • Request pipelining is done on top of Http internally to speed things up.

By default an internal WebServer is used. The 4K webserver provides a way using Netty to expose Actors via Http/WebSockets. (work in progress)

KVServer servingActor = Actors.AsActor(KVServer.class);
// http://localhost:8088/kvservice/method/arg0/arg1
RestActorServer.Publish("kvservice", 8088, servingActor);

see also blog post and example project http://java-is-the-new-c.blogspot.de/2014/12/a-persistent-keyvalue-server-in-40.html .

Calling Actors from JavaScript via WebSockets

requires 4K webserver (work in progress)

Calling Actors written in JavaScript from Java

requires 4K webserver (work in progress)

Short introduction to Actors

An actor (in Kontraktor) is similar to a class, except

  • a method call (== message) is executed asynchronous. To avoid confusion, as a convention all asynchronous methods are flagged with a '$' as first character of method name. Kontraktor automatically transforms a method call into a message which is put onto a queue (mailbox). The queue is processed asynchronous and single threaded.
  • all methods (=messages) of an actor are executed single threaded (no parallelism). There is no shared state (kontraktor allows to explicitely break this using annotations to allow synchronous access to e.g. immutable state of an actor)

Why Actor's

  • Actors provide an easier way to deal with concurrency compared to conventional multithreading/locking. Especially once a system grows, lock-based multithreaded software gets out of control quickly (race conditions, deadlocks, starvation due to lock contention)
  • actor based applications can be distributed with ease. Design for distributability. Decide at deployment time.
  • Threads are a limited resource. As network latency is bound to physics, there will be no improvement regarding latency regardless of hardware improvements.

A busy distributed system tends to have a lot of requests/messages in flight. If a thread is consumed for each (blocking) remote request (as with synchronous RPC/Messaging) the number of requests in flight is limited to the amount of threads your hardware is able to support. As the number of messages in flight easily can be in the hundreds of thousands in a busy distributed application, synchronous request/response always perform poorly (cpu is shuffeling threads, everybody is waiting ..). There is no magical solution to this, so its required to use an asynchronous style of programming (e.g. with Actors) when crossing machine (or even thread) boundaries in order to make use of your hardware's capabilities.

Even worse, cloud deployments and WAN-distributed applications will increase network hop latency even more so blocking, synchronous request/response throughput lowers to ridiculous levels (like < 50 per second per client).

messages vs methods

Current actor frameworks remind on first attempts of object orientation in C: a method was called handcrafted like 'receive( objectPtr, "methodName")' and the receiving class had kind of manual dispatch for incoming methods. In Kontraktor, all public methods of an actor class are automatically converted into asynchronous messages. The arguments of an asynchronous method make up a message object put on to the actors mailbox. This keeps code boilerplatefree. Refactoring, code completion and other common tools still work. Its also possible to subclass actors and override message handling (async methods).

// method call
int result = obj.getXY();

// actor message
actor.$getXY().then( (result, error) -> { .. } );

what is happening behind the scenes ?

Technically actors are implemented by associating a message queue with each instance of an actor class. The actor's excecution thread then will read messages (method calls) from this queue and process them in order. An actor instance divides into 2 parts.

  • the real implementation
  • an instrumented facade, the "Actor Ref". The actor refs gets generated behind the scenes and is a subclass of the Actor implementation class, but any public method call is translated into putting a message (made of the method's arguments) to the mailbox of the actor.

The actor ref only is accessible to outer code. This way its guaranteed no foreign threads accidentally access an actor's state concurrently.

this and self()

Inside an actor, 'this' denotes the actor implementation class, so calls on "this" work the same as in usual classes. However if an actor needs to pass a reference to itself to the outside or wants to enqueue a message to itself, self() can be used.

example for self() vs this

    public void $someActorMethod(int x) {
        $computeSomething(x);          // normal direct method call
        self().$computeSomething(x);   // enqueue message to self
    }

Type issues with self() when subclassing

Kontraktor uses some generics trickery in order to have the self() method return the type of the current class as one does not want to cast like ((ActorSubclass)self()).actorSubclassMethod(). Therefore its required to define an Actor class like

public class MyActor extends Actor<MyActor> { .. }

This way the return type of self() is correctly defined. This gets even more complicated once a class deriving Actor needs be subclassed. This can be done like this:

// abstract class extending actor
public abstract class AbstractService<T extends AbstractService> extends Actor<T> {
...
}

an Actor class subclassing this then is defined like:

public class ConcreteService extends AbstractService<ConcreteService> {
...
}

If this gets too complicated (deeper hierarchy of inheritance), its always possible to overload self() returning the appropriate return type like

public MyConcreteClass self() { return (MyConcreteClass)super.self(); }

Constraints

As Kontraktor generates classes at runtime, one should stick to the Callback interface defined by Kontrakor as this is recognized by the Runtime and treated special. Note that 'special' Classes like Callback are recognized only if part of the arguments of a method, not when nested inside an object graph (performance consideration, cannot deep scan object graph at runtime for each method call).

Overloading is not supported for public message-methods (so each method must have a distinct name). Overriding is supported.

Callbacks, Futures and Promises, Spore's

Callbacks are a common pattern of asynchronous communication/message passing. The built in Callback interface of Kontraktor looks like this

public interface Callback<T> extends Serializable
{
    public void receive(T result, Object error );
}

example of a message requiring a callback

public void $whatTimeIsIt( Callback<Long> result ) {
    result.receive( System.currentTimeMillis(), null );
}

example of a call to such a message:

anActor.$whatTimeIsIt( (res,err) -> System.out.println(res) );

The difference to "normal" non-actor callback usage is, that Kontraktor will automatically run the callback method inside the callers actor thread. This is important as a naive callback approach introduces multithreaded access to your actor state, as the callback method will run in a foreign thread.

Note: The built-in Callback class is recognized and transformed by Kontraktor (args of a message are scanned). In case of remoting, automatic Callback ID registration/deregistration is done behind the scenes. So defining your own Callback interfaces will not work or will work with limitations only (see @InThread).

A Future is like a callback created on receiver side. Futures enable some elegant programming patterns, though it requires some time to get used to them.

public Future<Long> $whatTimeIsIt() {
    return new Promise(System.currentTimeMillis());
}

the sender then does:

anActor.$whatTimeIsIt()
    .then( (res,err) -> System.out.println("Time is:"+res) );
    // .then( .. ) .map( .. )

or (more convenient but slower)

anActor.$whatTimeIsIt()
    .timeoutIn(1000)
    .onResult( res -> System.out.println("Time is:"+res) )
    .onError(  err -> System.out.println("Error is:"+err) )
    .onTimeout( to -> System.out.println("Timeout !") );

What's happening:

  • when the sender invokes the $whatTimeIsIt() message, it synchronously obtains a future object (generated by Kontraktor behind the scenes).
  • Once the message is taken+processed from the mailbox of the receiving actor, the result of the "Promise" is automatically forwarded to the Future object held by the sender. The "then" method of the future gets called int the processing thread of the sender (guarantee of single-threadedness).

A future having obtained a result is called "fulfilled". The "Promise" object can be passed to further processing, "fulfillment" can be arbitrary delayed.

example: returning an unfulfilled promise

public Future<Long> $whatTimeIsItIn(long millis) {
    Promise<Long> p = new Promise<>(); // unfulfilled
    // fulfill the promise with a delay
    delayed( 
      millis, 
      () -> p.receive( System.currentTimeMills, null /*no error*/) );
    return p; // unfulfilled
}

or like this

public Future<Long> $whatTimeIsItInInternet() {
    Promise<Long> p = new Promise<>(); // unfulfilled
    // fulfill the promise with a delay
    $crawlInternetForCurrentTime().then( 
      (r,e) -> p.receive( r, e ); // fulfil promise
    );
    return p; // unfulfilled
}

public Future<Long> $crawlInternetForCurrentTime(long millis) {
    Promise<Long> p = new Promise<>(); // unfulfilled
    // crawl internet, when time found p.receive( time, null );
    [..]
    return p; // unfulfilled
}

the unfulfilled promise in this example is fulfilled once the internet crawling message has been processed successful.

yield

yield( future, future, ..).then(.) allows to execute the given then-closure once all futures have completed and receives an array of completed futures. Their result can be acessed using future[i].getResult().

streaming

In contradiction to a future, a callback may receive several result objects. When planning to do this with remoted actors, the sender of a callback has to send "Actor.CONT" as an error object for each non-final response to signal more results might be received. If an error is signaled or the error object is null, internal housekeeping will release the data structures required to route a callback result coming in from a remote actor. E.G. callback.receive( someObject, CONT ); callback.receive( someObject, CONT ); callback.receive( someObject, null );

####Spore's

Note: The idea of sending code to data instead vice versa is not new (e.g. query languages like SQL, map reduce), however I took the name "Spore" from a Scala lib to avoid introducing another terminology.

The original actor model defines a pure asynchronous message passing system. It misses one important pattern.

You can

  • move data to code (send/receive messages from an actor)
  • move code to data (e.g. map-reduce, sql)

The idea is not exactly new, however with statically compiled languages this is hard to implement. Using Kontraktor "Spores" we can pass a snipped of code to an (e.g. remote) actor. This snipped of code then works on the receiving actors data (using the receiving actors thread) and talks back to the sender via a callback "channel".

In distributed systems its cheaper to send a snipped of code and let it work on the data in a remote actor instead of streaming the a data set over the wire to the receiving actor to do some computation.

An examples is query-like functionality. A remote actor owns a big collection and you want to compute some aggregation or just filter the collection and send some results back.

store.$stream(
    new Spore() {
        // LOCAL init is executed at sender side
        int hits; // state used in remote
        {
            hits = 0; // intialize / capture state on caller side
        }
        // REMOTE method executed remotely/in foreign actor, receiver side
        // you can access any state captured (held by the spore object directly)
        public void remote(Object input) {
            if ( input instanceof SampleRecord &&
                ((SampleRecord) input).getHits() > 10 && 
                ((SampleRecord) input).getHits() < 100 ) {
                    receive(input, Callback.CONT); // send back
                    hits++;
                }
            } else {
                if ( Callback.FIN.equals(input) ) {
                    // send back and signal finish
                    receive(hits,Callback.FIN);
                }
            }
        }
    }.then( (result, error) -> {
            if ( Callback.FIN.equals(error) ) {
                System.out.println("Hits:"+result);
            }
            System.out.println("local received match "+result);
    });
);

message declaration on receiver side:

public void $stream( Spore spore ) {
    // stream data to the spore
    store.values().forEachRemaining( (v) -> spore.remote(v) );
    // signal finish to spore 
    spore.remote(Callback.FIN);
}

Constraint A spore must not access fields of its enclosing class as the hidden this$0 reference is automatically cleared during serialization. This is not a real constraint, you can capture any state from the outer class as shown above in the initializer block of the Spore which is executed on caller side. Note you can introduce parallelism accidentally by accessing the parent class from an anonymous spore.

Update since kontraktor 2.0-beta3 its recommended to listen to remote results in a different way like:

store.$stream(
    new Spore() {
        // REMOTE method executed remotely/in foreign actor, receiver side
        // you can access any state captured (held by the spore object directly)
        public void remote(Object input) {
            [...]
        }
    }.then( (result, error) -> {
        // LOCAL this method is invoked on sender side and receives
        // the results evaluated remotely by the spore
    });
);

This avoids nasty issues caused by accidental serialization of local state/lambdas.

Actor Lifecycle

Actors need explicit lifecycle management. $stop() terminates an actor. $close terminates an actor and in addition closes the associated network connection (in case of remoting). As one can handout multiple remote actor references via a single connection, calling $stop on a remote actor will just stop the actor, $close will close the connection and therefore invalidate all actor references obtained/routed via the associated connection.

@Callerside, @InThread

For pragmatice reasons, its possible to declare methods @Callerside. This means they are executed in the calling actors thread. They are executing directly on the actor-proxy instance, so they cannot access fields of the actor (they will be empty/unitialized). However they may invoke messages on the target actor.

There are two major applications for this

  • utility methods, which finally delegate to 'non-@callerside' methods.
  • breaking the encapsulation and access local actor state. This can be useful to access final fields of an actor e.g. to put an actor to a hashmap. Ofc an application may also decide to allow synchronous access to synchronized/threadsafe fields of an actor.

As @Callerside methods execute on the actor-proxy instance, its required to delegate to the real actor instance. e.g.:

@Callerside public int getMyActorId() {
    return getActor().actorId;
}

Warning 1 this does not work when using a remoted actor.

Warning 2 this way concurrent (multiple threads) access to local actor state is possible. You need to apply traditional thread-safe programming patterns then (volatile, atomic's, ConcurrentHashMap).

@InThread allows to wrap interfaces into a thread-changing proxy. Its used by Kontraktor internally to implement core interfaces like Callback Check the sourcecode of kontraktor for examples.

Misc (blocking code, timers)

Never block an actors execution thread as this will easily starve execution of other actors or create deadlocks because the mailbox of the blocked actor does not accept messages anymore.

In order to call blocking code (e.g. a synchronous webservice, synchronous database interface, access to remote file system) use Actor.exec:

public void $myActorMethod(..) {
   exec( () -> return dataBase.doBlockingOperation() )
     .onResult( r -> ... ) 
     .onError( e -> ... );
   // or just '.then( (res, err) -> ... )'
}

Its possible to execute a closure with delay using delayed. Example (from 4K source):

    public void $runPing(long millis) {
        if ( System.currentTimeMillis() - lastPong >= NUM_MISSING_PONGS_FOR_TIMEOUT * millis ) {
            server.removeSession(context);
            self().$onClose();
            self().$stop();
        }
        if ( ! isStopped() ) {
            server.sendWSPingMessage(context);
            // trigger next ping
            delayed(millis, () -> $runPing(millis)); 
        }
    }

Scheduling Actors onto Threads

Use course grained Actor designs. Actors are meant to replace threads, not to replace classes.

ElasticScheduler

By default each Actor gets a dedicated ElasticScheduler having a default queue size of 32768 (see ElasticScheduler.DEFAULTQSIZE) and a single thread assigned. Its possible to assign several Actors to the same ElasticScheduler instance. A M to N mapping of actor to threads can achieved by assigning a number of threads (default is 1) to a shared ElasticScheduler.

ElasticScheduler then laod balances N actors to M thread using a unique runtime profiling based dispatching algorithm which minimizes context switches and cache misses. Compared to plain java.*.Executor driven dispatching actor messages on to random threads (=Cache miss+Context switches), kontraktor's ElasticScheduler keeps a dynamically recomputed Actor-to-Thread affinity in order to achieve maximum performance (=reduce context swicthes and cache misses).

If an ElasticScheduler has assigned e.g. 4 threads and 50 actors, it will initially try to run all actors on a single thread, adding new threads (up to 4 in this example) only if current threads cannot handle the load. As (backoff'ed) spin looping is used to poll actor queues, this also reduces CPU usage in low load scenarios.

details see following blog-posts:

http://java-is-the-new-c.blogspot.de/2014/10/alternatives-to-executors-when.html

http://java-is-the-new-c.blogspot.de/2014/10/experiment-cache-effects-when.html

http://java-is-the-new-c.blogspot.de/2014/10/follow-up-executors-and-cache-locality.html

QueueSizes

Deadlocks

Dealing with stopped/blocked actors

Logging

By default, logging is done to sysout. (Note there are also some global flags to increase verbosiness in case you run into trouble e.g. ElasticScheduler.DEBUG_SCHEDULING in case of deadlock trouble or DispatcherThread.DUMP_CATCHED to let kontraktor print stacktraces on exceptions put into a future.

Kontraktor logs asynchonously. In order to redirect log to your favourite logger its possible to plug in an adaptor (example of redirecting log to a remote node):

Log.Lg.$setLogWrapper(
     (Thread t, int severity, Object source, Throwable ex, String msg) -> {
         String exString = null;
         if (ex != null) {
             StringWriter sw = new StringWriter();
             PrintWriter pw = new PrintWriter(sw);
             ex.printStackTrace(pw);
             exString = sw.toString();
         }
         if (actor.isStopped()) {
             if (Log.Lg.getSeverity() <= severity)
                 Log.Lg.defaultLogger.msg(t, severity, source, ex, msg);
         } else {
             actor.$remoteLog(severity, nodeId + "[" + source + "]", exString == null ? msg : msg + "\n" + exString);
         }
     }
);
Log.Lg.warn(this, " start logging from " + nodeId);

Note this only replaces the internal (synchronous) logwriter, the asynchronous handling of logging is kept.

Remoting

Overview + Remoterefs model

Kontraktor has a different model for actor distribution than the original actor model proposes. The original actor model assumes, actors can reach each other via a uniform network infrastructure.

In Kontraktor, a point to point chain is built implicitely. An actor reference (proxy) to a remote actor routes all calls using the connection it was obtained from. This means if an actorref is passed from A=>B=C, a message send onto the actor ref from process C will take the route C=>B=>A (and reverse in case of a callback or future returned).

Advantages are:

  • network can be heterogenous (e.g. some actors are reachable via LAN, some are reachable via websockets or plain http, still they can interact).
  • there is no need for a central registry tracking the location of actors

Disadvantages are, that one has to avoid passing remote references along several hops, as this will create overhead and latency depending of the number of hops. As I believe its convenient to have a high level of abstractions, its still true that fully "transparent" remoting isn't really achievable in practice. Philosophy of Kontraktor is to design an application distributable upfront (by adding "remoteable" Actor-Interfaces even when running single process). Depending on real world deployment and performance requirements one then may choose level of distribution at deployment time. Remoteable Actors should avoid handing out/requiring actor refs in their public service interface in order to avoid strong coupling and message routing overhead. Service-Style interface design is preferable here (not mandatory ofc).

In case e.g. a Gateway component acting as a mediator facade to assign (sharded) backend services to client applications, handing out remote refs to backend actors does not inflict overhead as there will be kind of routing / message forwarding anyway.

So Kontraktors distribution model matches real infrastructire topology better compared to the vision of a homogenous network topology.

Access Control/Security

Frequently not all messages available on an actor interface should made available to remote clients. Kontraktor provides the @Local annotation to flag 'unremotable' methods. Additionally, this can be configured (depends on transport stack, e.g. 4K server allows to forbid/allow methods available for web clients by configuration files). One probably would like clients to disallow calling $stop() on a remote actor ;).

TCP

Exposing actors via TCP is straight forward. Note all arguments must be serializable.

See example code here:

https://github.com/RuedigerMoeller/advcalendar2014/blob/master/src/main/java/keyvalue/KVServer.java

related blog post: http://java-is-the-new-c.blogspot.de/2014/12/a-persistent-keyvalue-server-in-40.html

see 'Getting started with Kontraktor remoting' for details

Http/WebServices

its possible to generically expose an actor via http. JSon / KSon encoding is used then for communication. As string encodings and HTTP protocol semantics add additional constraints, this can be used with simple structured actors only (no passing of references etc.). This feature is not officially supported in 2.0 as there is neither enough documentation nor enough test coverage. I actually have a working application allowing to expose Java actors to JavaScript clients seamlessly over websockets (even implementation of actors in JS + calling them from Java is possible).

####netty adaptor, Websockets alpha, see 4k webserver project (alpha) I am currently also investigating ways to plug into new jee 7 servlet 3.1 async features.

Message Bus, Many to Many

fast-cast could be used for basic 1:N reliable UDP messaging. A convenience wrapper built upon kontraktor might be released later (2015).