For the sake of imagination

Yearn, dare, do

Chatting Through JActor

Permalink

Abstract : Nearly 40 years ago Carl Hewitt, Peter Bishop and Richard Steiger introduced the actor model. Since then it has been built in some languages (such as Scala, Erlang, etc.) and has been implemented by several frameworks. One (of not so many) Java actor frameworks is the JActor framework - “a high-throughput Java Actor framework” (as described by its author Bill la Forge). In this post we are going to take a (brief) look at JActor framework by building a simple backbone for chatting (j)actors.

Goal : Build a simple chat application using JActor framework

Acknowledgement : My gratitude goes to the open source community and especially to:

Bill la Forge creator of JActor framework

Code : Project code can be found @ GitHub under Apache License, Version 2.0

JActor is based on the idea that mailboxes should be handled by separate threads, but not the actors. This means that actors sharing common mailbox will be communicating quite fast and without the need of memory synchronisation, since everything is happening locally to the thread. This on the other hand brings us to two different basic cases:
1. Actors sharing a mailbox
2. Actors with different mailboxes

For our chat scenario we will build two types of actors in order to capture the two basic cases. The first one (ChatActor) would represent the actual, physical users of our system while the second type (StorageActor) will represent a storage (such as a DB, File system, etc.). ChatActors will exchange messages using the same mailbox (and thus the same thread) while a ChatActor and StorageActor will exchange messages using two different mailboxes (possibly two different threads).

NOTE: For more information about JActor, take a look at the short (20 slides) presentation written by Bill la Forge available on-line. For more code samples, check:
* JActor in action
* IBM’s developerworks blog of Bill la Forge

Maven dependencies:

This project is build using Maven and several libraries, namely JActor v4.1.0, Logback, Google’s implementation of JSR305, Google’s Guava, cglib (needed by Spring), Spring framework, FEST-Assert, FEST-Reflect, JUnit, and Mockito. The maven pom.xml looks as follows:

pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.ingini.jactor</groupId>
    <artifactId>jactor-chat</artifactId>
    <version>1.0</version>

    <properties>
        <cglib.version>2.2</cglib.version>
        <spring.version>3.1.1.RELEASE</spring.version>
        <logback.version>1.0.6</logback.version>
        <fest-reflect.version>1.4</fest-reflect.version>
        <fest-assert.version>1.4</fest-assert.version>
        <jactor.version>4.1.0</jactor.version>
        <jsr305.version>2.0.1</jsr305.version>
        <guava.version>13.0.1</guava.version>
        <junit.version>4.10</junit.version>
        <mockito-all.version>1.9.0</mockito-all.version>
    </properties>

    <dependencies>

        <!-- JActor framework -->
        <dependency>
            <groupId>org.agilewiki.jactor</groupId>
            <artifactId>jactor</artifactId>
            <version>${jactor.version}</version>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>

        <!-- Tools -->
        <dependency>
            <groupId>com.google.code.findbugs</groupId>
            <artifactId>jsr305</artifactId>
            <version>${jsr305.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>

        <dependency>
            <groupId>cglib</groupId>
            <artifactId>cglib-nodep</artifactId>
            <version>${cglib.version}</version>
        </dependency>

        <!-- Spring dependencies -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <!-- Test dependencies -->
        <dependency>
            <groupId>org.easytesting</groupId>
            <artifactId>fest-reflect</artifactId>
            <version>${fest-reflect.version}</version>
        </dependency>

        <dependency>
            <groupId>org.easytesting</groupId>
            <artifactId>fest-assert</artifactId>
            <version>${fest-assert.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>

        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>${mockito-all.version}</version>
        </dependency>

    </dependencies>

</project>

Step 1: The ChatActor

Before we define our ChatActor we need some identifier which will allow us to create and find an actor for a user. So we define a simple UserId type:

UserId.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package org.ingini.jactor.chat.domain;

import com.google.common.base.Objects;

import javax.annotation.concurrent.Immutable;

@Immutable
public class UserId {

    private final long code;

    public UserId(long code) {
        this.code = code;
    }

    public long getCode() {
        return code;
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(code);
    }

    @Override
    public boolean equals(Object obj) {
        if(!(obj instanceof UserId)) {
            return false;
        }
        return Objects.equal(this.code, ((UserId) obj).code);
    }

    @Override
    public String toString() {
        return String.valueOf(code);
    }
}

Likewise, we can define a MessageId:

MessageId.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package org.ingini.jactor.chat.domain;

import com.google.common.base.Objects;

import javax.annotation.concurrent.Immutable;

@Immutable
public class MessageId {

    private final long code;

    public MessageId(long code) {
        this.code = code;
    }

    public long getCode() {
        return code;
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(code);
    }

    @Override
    public boolean equals(Object obj) {
        if(!(obj instanceof MessageId)) {
            return false;
        }
        return Objects.equal(this.code, ((MessageId) obj).code);
    }

    @Override
    public String toString() {
        return String.valueOf(code);
    }
}

You can think of these identifiers as a type-safe, sequence based artifacts for implementing the identification aspect of our system. Once we have them, we can define a ChatActor type, which (as mentioned earlier) will represent an actual, physical user of our system:

ChatActor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package org.ingini.jactor.chat.actor;

import com.google.common.collect.Maps;
import org.agilewiki.jactor.Actor;
import org.agilewiki.jactor.RP;
import org.agilewiki.jactor.lpc.JLPCActor;
import org.ingini.jactor.chat.domain.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.TimeUnit;

public class ChatActor extends JLPCActor {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private final UserId id;

    private final Map<MessageId, ChatMsg> messages = Maps.newHashMap();

    private final Actor storageActor;

    public ChatActor(UserId id, Actor storageActor) {
        this.id = id;
        this.storageActor = storageActor;
    }

    public void process(ChatMsg req) throws Exception {
        TimeUnit.SECONDS.sleep(2);
        logger.info("Actor {} received new msg ({}) from " + req.getSender(), id, req.getContent());

        messages.put(req.getId(), req);
    }

    public Confirmation get(MsgConfirmation req) throws Exception {
        logger.info("Confirming message {}", req.getMessageId());

        logger.info("Start sleeping");
        TimeUnit.SECONDS.sleep(2);
        logger.info("Stop sleeping");
        return messages.containsKey(req.getMessageId()) ? Confirmation.YES : Confirmation.NO;
    }

    public void persist(MessageId messageId) throws Exception {
        PersistChatMsg persistChatMsg = new PersistChatMsg(messages.get(messageId));
        persistChatMsg.send(this, storageActor, new RP<MessageId>() {
            @Override
            public void processResponse(MessageId messageId) throws Exception {
                logger.info("Removing message {}", messageId);
                messages.remove(messageId);
            }
        });
    }
}

In the actor we have three methods which will expose three different ways of communication:
1. The void process(ChatMsg req) (line 29) is of the type fire-and-forget. This non-blocking behavior is tested in the TestChat.testProcess() method.
2. The Confirmation get(MsgConfirmation req) (line 36) and the TestChat.testGet() shows how you can use a JAFuture instance to wait for the result of a message processing. In this example we wait for a Confirmation result when processing a MsgConfirmation message. Note that there is a second, type-safe way of getting the result by invoking MsgConfirmation.send(future, actor), this can be seen in the TestChat.testPersist().
3. The void persist(MessageId messageId) (line 45) shows a blocking communication between two actors, each one with it’s own mailbox. Take a look at the TestChat.testPersist() for a use-case scenario.

Quick side-note: In the course of this post you will see several places with TimeUnit.SECONDS.sleep(*);. The purposes of this is to make the blocking or non-blocking behavior more evident.

Step 2: Fixing the messages

Our model representation of the chat data (that is going to be exchanged between the chat actors) is encapsulated in a ChatMsg type:

ChatMsg.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package org.ingini.jactor.chat.domain;

import org.agilewiki.jactor.Actor;
import org.agilewiki.jactor.RP;
import org.agilewiki.jactor.lpc.JLPCActor;
import org.agilewiki.jactor.lpc.Request;
import org.ingini.jactor.chat.actor.ChatActor;

import javax.annotation.concurrent.Immutable;

@Immutable
public class ChatMsg extends Request<Void, ChatActor> {

    private final MessageId id;
    private final UserId sender;
    private final String content;

    public ChatMsg(MessageId id, UserId sender, String content) {
        this.id = id;
        this.sender = sender;
        this.content = content;
    }

    @Override
    public boolean isTargetType(Actor targetActor) {
        return targetActor instanceof ChatActor;
    }

    @Override
    public void processRequest(JLPCActor targetActor, RP rp) throws Exception {
       ((ChatActor) targetActor).process(this);

        //Finished processing msg
        rp.processResponse(null);
    }

    public MessageId getId() {
        return id;
    }

    public String getContent() {
        return content;
    }

    public UserId getSender() {
        return sender;
    }

    @Override
    public String toString() {
        return "ChatMsg{" +
                "sender=" + sender +
                ", content='" + content + '\'' +
                '}';
    }
}

For our example we also need a MsgConfirmation type which we use to confirm the presents of a message in a ChatActor’s messages map (see ChatActor.java, line 20):

MsgConfirmation.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package org.ingini.jactor.chat.domain;

import org.agilewiki.jactor.Actor;
import org.agilewiki.jactor.RP;
import org.agilewiki.jactor.lpc.JLPCActor;
import org.agilewiki.jactor.lpc.Request;
import org.ingini.jactor.chat.actor.ChatActor;

import javax.annotation.concurrent.Immutable;

@Immutable
public class MsgConfirmation extends Request<Confirmation, ChatActor> {

    private final MessageId messageId;

    public MsgConfirmation(MessageId messageId) {
        this.messageId = messageId;
    }

    public MessageId getMessageId() {
        return messageId;
    }

    @Override
    public boolean isTargetType(Actor targetActor) {
        return targetActor instanceof ChatActor;
    }

    @Override
    public void processRequest(JLPCActor targetActor, RP rp) throws Exception {
        ChatActor chatActor = (ChatActor) targetActor;
        rp.processResponse(chatActor.get(this));
    }
}

The Confirmation is a simple enum:

MsgConfirmation.java
1
2
3
4
5
package org.ingini.jactor.chat.domain;

public enum Confirmation {
    YES, NO
}

We need one more message type in order to support the communication between a ChatActor and a StorageActor. This is the PersistChatMsg message:

PersistChatMsg.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package org.ingini.jactor.chat.domain;

import org.agilewiki.jactor.Actor;
import org.agilewiki.jactor.RP;
import org.agilewiki.jactor.lpc.JLPCActor;
import org.agilewiki.jactor.lpc.Request;
import org.ingini.jactor.chat.actor.StorageActor;

import javax.annotation.concurrent.Immutable;

@Immutable
public class PersistChatMsg extends Request<MessageId, StorageActor> {

    private final ChatMsg message;
    public PersistChatMsg(ChatMsg message) {
        this.message = message;
    }

    @Override
    public boolean isTargetType(Actor targetActor) {
        return targetActor instanceof StorageActor;
    }

    @Override
    public void processRequest(JLPCActor targetActor, RP rp) throws Exception {
       ((StorageActor) targetActor).process(this, rp);
    }

    public ChatMsg getMessage() {
        return message;
    }
}

Step 3: StorageActor

We can’t do without our StorageActor, defined as follows:

StorageActor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package org.ingini.jactor.chat.actor;

import org.agilewiki.jactor.RP;
import org.agilewiki.jactor.lpc.JLPCActor;
import org.ingini.jactor.chat.domain.PersistChatMsg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class StorageActor extends JLPCActor {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    public void process(PersistChatMsg persistChatMsg, RP rp) throws Exception {
        TimeUnit.SECONDS.sleep(3);
        logger.info("Message persisted!");
        rp.processResponse(persistChatMsg.getMessage().getId());
    }
}

Step 4: Gluing everything together

Now that we have all pieces of the puzzle, we can start putting them in the right places. For this purpose, we need a Spring Java-based configuration class:

JActorConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package org.ingini.jactor.chat.conf;

import org.agilewiki.jactor.Actor;
import org.agilewiki.jactor.JAMailboxFactory;
import org.agilewiki.jactor.Mailbox;
import org.agilewiki.jactor.MailboxFactory;
import org.ingini.jactor.chat.actor.StorageActor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan({"org.ingini.jactor.chat.conf", "org.ingini.jactor.chat.service", "org.ingini.jactor.chat.actor", "org.ingini.jactor.chat.domain" })
public class JActorConfig {

    @Autowired
    private MailboxFactory messageMailboxFactory;

    @Bean
    public MailboxFactory getMessagesMailboxFactory() {
        double blockingCoefficient = 0.2D; // Almost non-blocking Chat processing logic
        return JAMailboxFactory.newMailboxFactory(
                Math.round((float) (Runtime.getRuntime().availableProcessors() / (1 - blockingCoefficient))));
    }

    @Bean
    public Mailbox chatMailbox() {
        return messageMailboxFactory.createMailbox();
    }

    @Bean
    public Actor storageActor() throws Exception {
        StorageActor storageActor = new StorageActor();
        storageActor.initialize(messageMailboxFactory.createMailbox());
        return storageActor;
    }
}

… and a service for creating and finding ChatActors:

JActorService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package org.ingini.jactor.chat.service;

import com.google.common.base.Objects;
import org.agilewiki.jactor.Actor;
import org.agilewiki.jactor.Mailbox;
import org.ingini.jactor.chat.actor.ChatActor;
import org.ingini.jactor.chat.domain.UserId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.concurrent.ThreadSafe;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Service
@ThreadSafe
public class JActorService {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private final ConcurrentMap<UserId, ChatActor> actorMap = new ConcurrentHashMap<UserId, ChatActor>();

    @Autowired
    private Mailbox chatMailbox;

    @Autowired
    private Actor storageActor;

    public ChatActor findOrInit(UserId id) {

        ChatActor clientActor;
        if (!actorMap.containsKey(id)) {
            clientActor = new ChatActor(id, storageActor);

            try {
                clientActor.initialize(chatMailbox);
            } catch (Exception e) {
                logger.error("Problem while initializing actor {}", id, e);
                throw new IllegalStateException(e);
            }
            return Objects.firstNonNull(actorMap.putIfAbsent(id, clientActor), clientActor);
        }

        return actorMap.get(id);
    }

}

Step 5: Testing

Finally, here is our test case:

TestChat.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package org.ingini.jactor.chat;

import org.agilewiki.jactor.Actor;
import org.agilewiki.jactor.JAFuture;
import org.fest.reflect.core.Reflection;
import org.ingini.jactor.chat.actor.ChatActor;
import org.ingini.jactor.chat.conf.JActorConfig;
import org.ingini.jactor.chat.domain.*;
import org.ingini.jactor.chat.service.JActorService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import static org.fest.assertions.Assertions.assertThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;

@ContextConfiguration(classes = {JActorConfig.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestChat {

    @Autowired
    private JActorService jActorService;

    @Test
    public void testProcess() throws Exception {
        //GIVEN
        UserId jactor101Id = new UserId(101);
        ChatActor jactor101 = jActorService.findOrInit(jactor101Id);
        UserId jactor201Id = new UserId(201);
        ChatActor jactor201 = jActorService.findOrInit(jactor201Id);

        Logger logger101Mock = mock(Logger.class);
        Reflection.field("logger").ofType(Logger.class).in(jactor101).set(logger101Mock);
        Logger logger201Mock = mock(Logger.class);
        Reflection.field("logger").ofType(Logger.class).in(jactor201).set(logger201Mock);

        //WHEN
        ChatMsg chatMsg201 = new ChatMsg(new MessageId(1), jactor201Id, "Hi, I'm 201!");
        chatMsg201.sendEvent(jactor101);

        ChatMsg chatMsg101 = new ChatMsg(new MessageId(2), jactor101Id, "Hi, I'm 101!");
        chatMsg101.sendEvent(jactor201);

        //THEN
        verify(logger101Mock, never()).info(eq("Actor {} received new msg ({}) from 201"), eq(jactor101Id), eq(chatMsg201.getContent()));
        verify(logger201Mock, never()).info(eq("Actor {} received new msg ({}) from 101"), eq(jactor201Id), eq(chatMsg101.getContent()));
        verify(logger101Mock, timeout(5000).times(1)).info(eq("Actor {} received new msg ({}) from 201"), eq(jactor101Id), eq(chatMsg201.getContent()));
        verify(logger201Mock, timeout(5000).times(1)).info(eq("Actor {} received new msg ({}) from 101"), eq(jactor201Id), eq(chatMsg101.getContent()));
    }

    @Test
    public void testGet() throws Exception {
        //GIVEN
        MsgConfirmation confirmationChatMsg = new MsgConfirmation(new MessageId(1));
        UserId jactorId = new UserId(401);
        Actor jactor = jActorService.findOrInit(jactorId);

        Logger logger101Mock = mock(Logger.class);
        Reflection.field("logger").ofType(Logger.class).in(jactor).set(logger101Mock);

        //WHEN
        JAFuture future = new JAFuture();
        Object result = future.send(jactor, confirmationChatMsg);

        //THEN
        assertThat(result).isEqualTo(Confirmation.NO);

        verify(logger101Mock, times(1)).info(eq("Start sleeping"));
        verify(logger101Mock, times(1)).info(eq("Stop sleeping"));

    }

    @Test
    public void testPersist() throws Exception {
        //GIVEN
        UserId jactorId = new UserId(601);
        ChatActor jactor = jActorService.findOrInit(jactorId);

        Logger loggerMock = mock(Logger.class);
        Reflection.field("logger").ofType(Logger.class).in(jactor).set(loggerMock);

        MessageId messageId = new MessageId(1);
        UserId sender = new UserId(701);
        ChatMsg chatMsg = new ChatMsg(messageId, sender, "Test msg");

        //WHEN
        jactor.process(chatMsg);

        //THEN
        verify(loggerMock, timeout(5000).times(1)).info(eq("Actor {} received new msg ({}) from " + sender.getCode()), eq(jactorId), eq(chatMsg.getContent()));

        //AND GIVEN
        MsgConfirmation confirmationChatMsg = new MsgConfirmation(messageId);

        //WHEN
        JAFuture future = new JAFuture();
        Confirmation confirmation = confirmationChatMsg.send(future, jactor);

        //THEN
        assertThat(confirmation).isEqualTo(Confirmation.YES);

        //AND WHEN
        jactor.persist(messageId);

        confirmation = confirmationChatMsg.send(future, jactor);

        //THEN
        assertThat(confirmation).isEqualTo(Confirmation.NO);

    }
}

Comments