Abstract : Nearly 40 years ago Carl Hewitt, Peter Bishop and Richard Steiger introduced the actor model. Since then it has been built in some languages (such as Scala, Erlang, etc.) and has been implemented by several frameworks. One (of not so many) Java actor frameworks is the JActor framework - "a high-throughput Java Actor framework" (as described by its author Bill la Forge). In this post we are going to take a (brief) look at JActor framework by building a simple backbone for chatting (j)actors.

Goal : Build a simple chat application using JActor framework

Acknowledgement : My gratitude goes to the open source community and especially to:

Bill la Forge creator of JActor framework

* Code *: Project code can be found @ GitHub under Apache License, Version 2.0

JActor is based on the idea that mailboxes should be handled by separate threads, but not the actors. This means that actors sharing common mailbox will be communicating quite fast and without the need of memory synchronisation, since everything is happening locally to the thread. This on the other hand brings us to two different basic cases:
1. Actors sharing a mailbox
2. Actors with different mailboxes

For our chat scenario we will build two types of actors in order to capture the two basic cases. The first one (ChatActor) would represent the actual, physical users of our system while the second type (StorageActor) will represent a storage (such as a DB, File system, etc.). ChatActors will exchange messages using the same mailbox (and thus the same thread) while a ChatActor and StorageActor will exchange messages using two different mailboxes (possibly two different threads).

NOTE: For more information about JActor, take a look at the short (20 slides) presentation written by Bill la Forge available on-line. For more code samples, check:
* JActor in action
* IBM's developerworks blog of Bill la Forge

* Maven dependencies: *

This project is build using Maven and several libraries, namely JActor v4.1.0, Logback, Google's implementation of JSR305, Google's Guava, cglib (needed by Spring), Spring framework, FEST-Assert, FEST-Reflect, JUnit, and Mockito. The maven pom.xml looks as follows:

{% codeblock pom.xml lang:xml %}

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0

<groupId>org.ingini.jactor</groupId>
<artifactId>jactor-chat</artifactId>
<version>1.0</version>

<properties>
    <cglib.version>2.2</cglib.version>
    <spring.version>3.1.1.RELEASE</spring.version>
    <logback.version>1.0.6</logback.version>
    <fest-reflect.version>1.4</fest-reflect.version>
    <fest-assert.version>1.4</fest-assert.version>
    <jactor.version>4.1.0</jactor.version>
    <jsr305.version>2.0.1</jsr305.version>
    <guava.version>13.0.1</guava.version>
    <junit.version>4.10</junit.version>
    <mockito-all.version>1.9.0</mockito-all.version>
</properties>

<dependencies>

    <!-- JActor framework -->
    <dependency>
        <groupId>org.agilewiki.jactor</groupId>
        <artifactId>jactor</artifactId>
        <version>${jactor.version}</version>
    </dependency>

    <!-- Logging -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback.version}</version>
    </dependency>

    <!-- Tools -->
    <dependency>
        <groupId>com.google.code.findbugs</groupId>
        <artifactId>jsr305</artifactId>
        <version>${jsr305.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>${guava.version}</version>
    </dependency>

    <dependency>
        <groupId>cglib</groupId>
        <artifactId>cglib-nodep</artifactId>
        <version>${cglib.version}</version>
    </dependency>

    <!-- Spring dependencies -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>

    <!-- Test dependencies -->
    <dependency>
        <groupId>org.easytesting</groupId>
        <artifactId>fest-reflect</artifactId>
        <version>${fest-reflect.version}</version>
    </dependency>

    <dependency>
        <groupId>org.easytesting</groupId>
        <artifactId>fest-assert</artifactId>
        <version>${fest-assert.version}</version>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>${spring.version}</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
    </dependency>

    <dependency>
        <groupId>org.mockito</groupId>
        <artifactId>mockito-all</artifactId>
        <version>${mockito-all.version}</version>
    </dependency>

</dependencies>

{% endcodeblock %}

*Step 1: The ChatActor *

Before we define our ChatActor we need some identifier which will allow us to create and find an actor for a user. So we define a simple UserId type:

{% codeblock UserId.java lang:java %} package org.ingini.jactor.chat.domain;

import com.google.common.base.Objects;

import javax.annotation.concurrent.Immutable;

@Immutable public class UserId {

private final long code;

public UserId(long code) {
    this.code = code;
}

public long getCode() {
    return code;
}

@Override
public int hashCode() {
    return Objects.hashCode(code);
}

@Override
public boolean equals(Object obj) {
    if(!(obj instanceof UserId)) {
        return false;
    }
    return Objects.equal(this.code, ((UserId) obj).code);
}

@Override
public String toString() {
    return String.valueOf(code);
}

} {% endcodeblock %}

Likewise, we can define a MessageId:

{% codeblock MessageId.java lang:java %} package org.ingini.jactor.chat.domain;

import com.google.common.base.Objects;

import javax.annotation.concurrent.Immutable;

@Immutable public class MessageId {

private final long code;

public MessageId(long code) {
    this.code = code;
}

public long getCode() {
    return code;
}

@Override
public int hashCode() {
    return Objects.hashCode(code);
}

@Override
public boolean equals(Object obj) {
    if(!(obj instanceof MessageId)) {
        return false;
    }
    return Objects.equal(this.code, ((MessageId) obj).code);
}

@Override
public String toString() {
    return String.valueOf(code);
}

} {% endcodeblock %}

You can think of these identifiers as a type-safe, sequence based artifacts for implementing the identification aspect of our system. Once we have them, we can define a ChatActor type, which (as mentioned earlier) will represent an actual, physical user of our system:

{% codeblock ChatActor.java lang:java mark:29,36,45 %} package org.ingini.jactor.chat.actor;

import com.google.common.collect.Maps;
import org.agilewiki.jactor.Actor;
import org.agilewiki.jactor.RP;
import org.agilewiki.jactor.lpc.JLPCActor;
import org.ingini.jactor.chat.domain.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.TimeUnit;

public class ChatActor extends JLPCActor {

private final Logger logger = LoggerFactory.getLogger(this.getClass());

private final UserId id;

private final Map<MessageId, ChatMsg> messages = Maps.newHashMap();

private final Actor storageActor;

public ChatActor(UserId id, Actor storageActor) {
    this.id = id;
    this.storageActor = storageActor;
}

public void process(ChatMsg req) throws Exception {
    TimeUnit.SECONDS.sleep(2);
    logger.info("Actor {} received new msg ({}) from " + req.getSender(), id, req.getContent());

    messages.put(req.getId(), req);
}

public Confirmation get(MsgConfirmation req) throws Exception {
    logger.info("Confirming message {}", req.getMessageId());

    logger.info("Start sleeping");
    TimeUnit.SECONDS.sleep(2);
    logger.info("Stop sleeping");
    return messages.containsKey(req.getMessageId()) ? Confirmation.YES : Confirmation.NO;
}

public void persist(MessageId messageId) throws Exception {
    PersistChatMsg persistChatMsg = new PersistChatMsg(messages.get(messageId));
    persistChatMsg.send(this, storageActor, new RP<MessageId>() {
        @Override
        public void processResponse(MessageId messageId) throws Exception {
            logger.info("Removing message {}", messageId);
            messages.remove(messageId);
        }
    });
}

} {% endcodeblock %}

In the actor we have three methods which will expose three different ways of communication:
1. The void process(ChatMsg req) (line 29) is of the type fire-and-forget. This non-blocking behavior is tested in the TestChat.testProcess() method.
2. The Confirmation get(MsgConfirmation req) (line 36) and the TestChat.testGet() shows how you can use a JAFuture instance to wait for the result of a message processing. In this example we wait for a Confirmation result when processing a MsgConfirmation message. Note that there is a second, type-safe way of getting the result by invoking MsgConfirmation.send(future, actor), this can be seen in the TestChat.testPersist().
3. The void persist(MessageId messageId) (line 45) shows a blocking communication between two actors, each one with it's own mailbox. Take a look at the TestChat.testPersist() for a use-case scenario.

Quick side-note: In the course of this post you will see several places with TimeUnit.SECONDS.sleep(*);. The purposes of this is to make the blocking or non-blocking behavior more evident.

*Step 2: Fixing the messages *

Our model representation of the chat data (that is going to be exchanged between the chat actors) is encapsulated in a ChatMsg type:

{% codeblock ChatMsg.java lang:java %} package org.ingini.jactor.chat.domain;

import org.agilewiki.jactor.Actor;
import org.agilewiki.jactor.RP;
import org.agilewiki.jactor.lpc.JLPCActor;
import org.agilewiki.jactor.lpc.Request;
import org.ingini.jactor.chat.actor.ChatActor;

import javax.annotation.concurrent.Immutable;

@Immutable public class ChatMsg extends Request {

private final MessageId id;
private final UserId sender;
private final String content;

public ChatMsg(MessageId id, UserId sender, String content) {
    this.id = id;
    this.sender = sender;
    this.content = content;
}

@Override
public boolean isTargetType(Actor targetActor) {
    return targetActor instanceof ChatActor;
}

@Override
public void processRequest(JLPCActor targetActor, RP rp) throws Exception {
   ((ChatActor) targetActor).process(this);

    //Finished processing msg
    rp.processResponse(null);
}

public MessageId getId() {
    return id;
}

public String getContent() {
    return content;
}

public UserId getSender() {
    return sender;
}

@Override
public String toString() {
    return "ChatMsg{" +
            "sender=" + sender +
            ", content='" + content + '\'' +
            '}';
}

} {% endcodeblock %}

For our example we also need a MsgConfirmation type which we use to confirm the presents of a message in a ChatActor's messages map (see ChatActor.java, line 20):

{% codeblock MsgConfirmation.java lang:java %} package org.ingini.jactor.chat.domain;

import org.agilewiki.jactor.Actor;
import org.agilewiki.jactor.RP;
import org.agilewiki.jactor.lpc.JLPCActor;
import org.agilewiki.jactor.lpc.Request;
import org.ingini.jactor.chat.actor.ChatActor;

import javax.annotation.concurrent.Immutable;

@Immutable public class MsgConfirmation extends Request {

private final MessageId messageId;

public MsgConfirmation(MessageId messageId) {
    this.messageId = messageId;
}

public MessageId getMessageId() {
    return messageId;
}

@Override
public boolean isTargetType(Actor targetActor) {
    return targetActor instanceof ChatActor;
}

@Override
public void processRequest(JLPCActor targetActor, RP rp) throws Exception {
    ChatActor chatActor = (ChatActor) targetActor;
    rp.processResponse(chatActor.get(this));
}

} {% endcodeblock %}

The Confirmation is a simple enum:
{% codeblock MsgConfirmation.java lang:java %} package org.ingini.jactor.chat.domain;

public enum Confirmation {
YES, NO } {% endcodeblock %}

We need one more message type in order to support the communication between a ChatActor and a StorageActor. This is the PersistChatMsg message:
{% codeblock PersistChatMsg.java lang:java %} package org.ingini.jactor.chat.domain;

import org.agilewiki.jactor.Actor;
import org.agilewiki.jactor.RP;
import org.agilewiki.jactor.lpc.JLPCActor;
import org.agilewiki.jactor.lpc.Request;
import org.ingini.jactor.chat.actor.StorageActor;

import javax.annotation.concurrent.Immutable;

@Immutable public class PersistChatMsg extends Request {

private final ChatMsg message;
public PersistChatMsg(ChatMsg message) {
    this.message = message;
}

@Override
public boolean isTargetType(Actor targetActor) {
    return targetActor instanceof StorageActor;
}

@Override
public void processRequest(JLPCActor targetActor, RP rp) throws Exception {
   ((StorageActor) targetActor).process(this, rp);
}

public ChatMsg getMessage() {
    return message;
}

} {% endcodeblock %}

Step 3: StorageActor

We can't do without our StorageActor, defined as follows:

{% codeblock StorageActor.java lang:java %} package org.ingini.jactor.chat.actor;

import org.agilewiki.jactor.RP;
import org.agilewiki.jactor.lpc.JLPCActor;
import org.ingini.jactor.chat.domain.PersistChatMsg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class StorageActor extends JLPCActor {

private final Logger logger = LoggerFactory.getLogger(this.getClass());

public void process(PersistChatMsg persistChatMsg, RP rp) throws Exception {
    TimeUnit.SECONDS.sleep(3);
    logger.info("Message persisted!");
    rp.processResponse(persistChatMsg.getMessage().getId());
}

} {% endcodeblock %}

Step 4: Gluing everything together

Now that we have all pieces of the puzzle, we can start putting them in the right places. For this purpose, we need a Spring Java-based configuration class:

{% codeblock JActorConfig.java lang:java %} package org.ingini.jactor.chat.conf;

import org.agilewiki.jactor.Actor;
import org.agilewiki.jactor.JAMailboxFactory;
import org.agilewiki.jactor.Mailbox;
import org.agilewiki.jactor.MailboxFactory;
import org.ingini.jactor.chat.actor.StorageActor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration @ComponentScan({"org.ingini.jactor.chat.conf", "org.ingini.jactor.chat.service", "org.ingini.jactor.chat.actor", "org.ingini.jactor.chat.domain" }) public class JActorConfig {

@Autowired
private MailboxFactory messageMailboxFactory;

@Bean
public MailboxFactory getMessagesMailboxFactory() {
    double blockingCoefficient = 0.2D; // Almost non-blocking Chat processing logic
    return JAMailboxFactory.newMailboxFactory(
            Math.round((float) (Runtime.getRuntime().availableProcessors() / (1 - blockingCoefficient))));
}

@Bean
public Mailbox chatMailbox() {
    return messageMailboxFactory.createMailbox();
}

@Bean
public Actor storageActor() throws Exception {
    StorageActor storageActor = new StorageActor();
    storageActor.initialize(messageMailboxFactory.createMailbox());
    return storageActor;
}

} {% endcodeblock %}

... and a service for creating and finding ChatActors:

{% codeblock JActorService.java lang:java %} package org.ingini.jactor.chat.service;

import com.google.common.base.Objects;
import org.agilewiki.jactor.Actor;
import org.agilewiki.jactor.Mailbox;
import org.ingini.jactor.chat.actor.ChatActor;
import org.ingini.jactor.chat.domain.UserId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.concurrent.ThreadSafe;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Service @ThreadSafe public class JActorService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());

private final ConcurrentMap<UserId, ChatActor> actorMap = new ConcurrentHashMap<UserId, ChatActor>();

@Autowired
private Mailbox chatMailbox;

@Autowired
private Actor storageActor;

public ChatActor findOrInit(UserId id) {

    ChatActor clientActor;
    if (!actorMap.containsKey(id)) {
        clientActor = new ChatActor(id, storageActor);

        try {
            clientActor.initialize(chatMailbox);
        } catch (Exception e) {
            logger.error("Problem while initializing actor {}", id, e);
            throw new IllegalStateException(e);
        }
        return Objects.firstNonNull(actorMap.putIfAbsent(id, clientActor), clientActor);
    }

    return actorMap.get(id);
}

} {% endcodeblock %}

Step 5: Testing

Finally, here is our test case:

{% codeblock TestChat.java lang:java %} package org.ingini.jactor.chat;

import org.agilewiki.jactor.Actor;
import org.agilewiki.jactor.JAFuture;
import org.fest.reflect.core.Reflection;
import org.ingini.jactor.chat.actor.ChatActor;
import org.ingini.jactor.chat.conf.JActorConfig;
import org.ingini.jactor.chat.domain.*;
import org.ingini.jactor.chat.service.JActorService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import static org.fest.assertions.Assertions.assertThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;

@ContextConfiguration(classes = {JActorConfig.class}) @RunWith(SpringJUnit4ClassRunner.class) public class TestChat {

@Autowired
private JActorService jActorService;

@Test
public void testProcess() throws Exception {
    //GIVEN
    UserId jactor101Id = new UserId(101);
    ChatActor jactor101 = jActorService.findOrInit(jactor101Id);
    UserId jactor201Id = new UserId(201);
    ChatActor jactor201 = jActorService.findOrInit(jactor201Id);

    Logger logger101Mock = mock(Logger.class);
    Reflection.field("logger").ofType(Logger.class).in(jactor101).set(logger101Mock);
    Logger logger201Mock = mock(Logger.class);
    Reflection.field("logger").ofType(Logger.class).in(jactor201).set(logger201Mock);

    //WHEN
    ChatMsg chatMsg201 = new ChatMsg(new MessageId(1), jactor201Id, "Hi, I'm 201!");
    chatMsg201.sendEvent(jactor101);

    ChatMsg chatMsg101 = new ChatMsg(new MessageId(2), jactor101Id, "Hi, I'm 101!");
    chatMsg101.sendEvent(jactor201);

    //THEN
    verify(logger101Mock, never()).info(eq("Actor {} received new msg ({}) from 201"), eq(jactor101Id), eq(chatMsg201.getContent()));
    verify(logger201Mock, never()).info(eq("Actor {} received new msg ({}) from 101"), eq(jactor201Id), eq(chatMsg101.getContent()));
    verify(logger101Mock, timeout(5000).times(1)).info(eq("Actor {} received new msg ({}) from 201"), eq(jactor101Id), eq(chatMsg201.getContent()));
    verify(logger201Mock, timeout(5000).times(1)).info(eq("Actor {} received new msg ({}) from 101"), eq(jactor201Id), eq(chatMsg101.getContent()));
}

@Test
public void testGet() throws Exception {
    //GIVEN
    MsgConfirmation confirmationChatMsg = new MsgConfirmation(new MessageId(1));
    UserId jactorId = new UserId(401);
    Actor jactor = jActorService.findOrInit(jactorId);

    Logger logger101Mock = mock(Logger.class);
    Reflection.field("logger").ofType(Logger.class).in(jactor).set(logger101Mock);

    //WHEN
    JAFuture future = new JAFuture();
    Object result = future.send(jactor, confirmationChatMsg);

    //THEN
    assertThat(result).isEqualTo(Confirmation.NO);

    verify(logger101Mock, times(1)).info(eq("Start sleeping"));
    verify(logger101Mock, times(1)).info(eq("Stop sleeping"));

}

@Test
public void testPersist() throws Exception {
    //GIVEN
    UserId jactorId = new UserId(601);
    ChatActor jactor = jActorService.findOrInit(jactorId);

    Logger loggerMock = mock(Logger.class);
    Reflection.field("logger").ofType(Logger.class).in(jactor).set(loggerMock);

    MessageId messageId = new MessageId(1);
    UserId sender = new UserId(701);
    ChatMsg chatMsg = new ChatMsg(messageId, sender, "Test msg");

    //WHEN
    jactor.process(chatMsg);

    //THEN
    verify(loggerMock, timeout(5000).times(1)).info(eq("Actor {} received new msg ({}) from " + sender.getCode()), eq(jactorId), eq(chatMsg.getContent()));

    //AND GIVEN
    MsgConfirmation confirmationChatMsg = new MsgConfirmation(messageId);

    //WHEN
    JAFuture future = new JAFuture();
    Confirmation confirmation = confirmationChatMsg.send(future, jactor);

    //THEN
    assertThat(confirmation).isEqualTo(Confirmation.YES);

    //AND WHEN
    jactor.persist(messageId);

    confirmation = confirmationChatMsg.send(future, jactor);

    //THEN
    assertThat(confirmation).isEqualTo(Confirmation.NO);

}

} {% endcodeblock %}