Abstract : Nearly 40 years ago Carl Hewitt, Peter Bishop and Richard Steiger introduced the actor model. Since then it has been built in some languages (such as Scala, Erlang, etc.) and has been implemented by several frameworks. One (of not so many) Java actor frameworks is the JActor framework - “a high-throughput Java Actor framework” (as described by its author Bill la Forge). In this post we are going to take a (brief) look at JActor framework by building a simple backbone for chatting (j)actors.
Goal : Build a simple chat application using JActor framework
Acknowledgement : My gratitude goes to the open source community and especially to:
JActor is based on the idea that mailboxes should be handled by separate threads, but not the actors. This means that actors sharing common mailbox will be communicating quite fast and without the need of memory synchronisation, since everything is happening locally to the thread. This on the other hand brings us to two different basic cases:
1. Actors sharing a mailbox
2. Actors with different mailboxes
For our chat scenario we will build two types of actors in order to capture the two basic cases. The first one (ChatActor) would represent the actual, physical users of our system while the second type (StorageActor) will represent a storage (such as a DB, File system, etc.). ChatActors will exchange messages using the same mailbox (and thus the same thread) while a ChatActor and StorageActor will exchange messages using two different mailboxes (possibly two different threads).
You can think of these identifiers as a type-safe, sequence based artifacts for implementing the identification aspect of our system. Once we have them, we can define a ChatActor type, which (as mentioned earlier) will represent an actual, physical user of our system:
packageorg.ingini.jactor.chat.actor;importcom.google.common.collect.Maps;importorg.agilewiki.jactor.Actor;importorg.agilewiki.jactor.RP;importorg.agilewiki.jactor.lpc.JLPCActor;importorg.ingini.jactor.chat.domain.*;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.util.Map;importjava.util.concurrent.TimeUnit;publicclassChatActorextendsJLPCActor{privatefinalLoggerlogger=LoggerFactory.getLogger(this.getClass());privatefinalUserIdid;privatefinalMap<MessageId,ChatMsg>messages=Maps.newHashMap();privatefinalActorstorageActor;publicChatActor(UserIdid,ActorstorageActor){this.id=id;this.storageActor=storageActor;}publicvoidprocess(ChatMsgreq)throwsException{TimeUnit.SECONDS.sleep(2);logger.info("Actor {} received new msg ({}) from "+req.getSender(),id,req.getContent());messages.put(req.getId(),req);}publicConfirmationget(MsgConfirmationreq)throwsException{logger.info("Confirming message {}",req.getMessageId());logger.info("Start sleeping");TimeUnit.SECONDS.sleep(2);logger.info("Stop sleeping");returnmessages.containsKey(req.getMessageId())?Confirmation.YES:Confirmation.NO;}publicvoidpersist(MessageIdmessageId)throwsException{PersistChatMsgpersistChatMsg=newPersistChatMsg(messages.get(messageId));persistChatMsg.send(this,storageActor,newRP<MessageId>(){@OverridepublicvoidprocessResponse(MessageIdmessageId)throwsException{logger.info("Removing message {}",messageId);messages.remove(messageId);}});}}
In the actor we have three methods which will expose three different ways of communication:
1. The void process(ChatMsg req) (line 29) is of the type fire-and-forget. This non-blocking behavior is tested in the TestChat.testProcess() method.
2. The Confirmation get(MsgConfirmation req) (line 36) and the TestChat.testGet() shows how you can use a JAFuture instance to wait for the result of a message processing. In this example we wait for a Confirmation result when processing a MsgConfirmation message. Note that there is a second, type-safe way of getting the result by invoking MsgConfirmation.send(future, actor), this can be seen in the TestChat.testPersist().
3. The void persist(MessageId messageId) (line 45) shows a blocking communication between two actors, each one with it’s own mailbox. Take a look at the TestChat.testPersist() for a use-case scenario.
Quick side-note: In the course of this post you will see several places with TimeUnit.SECONDS.sleep(*);. The purposes of this is to make the blocking or non-blocking behavior more evident.
Step 2: Fixing the messages
Our model representation of the chat data (that is going to be exchanged between the chat actors) is encapsulated in a ChatMsg type:
For our example we also need a MsgConfirmation type which we use to confirm the presents of a message in a ChatActor’s messages map (see ChatActor.java, line 20):
Now that we have all pieces of the puzzle, we can start putting them in the right places. For this purpose, we need a Spring Java-based configuration class:
packageorg.ingini.jactor.chat.service;importcom.google.common.base.Objects;importorg.agilewiki.jactor.Actor;importorg.agilewiki.jactor.Mailbox;importorg.ingini.jactor.chat.actor.ChatActor;importorg.ingini.jactor.chat.domain.UserId;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importjavax.annotation.concurrent.ThreadSafe;importjava.util.concurrent.ConcurrentHashMap;importjava.util.concurrent.ConcurrentMap;@Service@ThreadSafepublicclassJActorService{privatefinalLoggerlogger=LoggerFactory.getLogger(this.getClass());privatefinalConcurrentMap<UserId,ChatActor>actorMap=newConcurrentHashMap<UserId,ChatActor>();@AutowiredprivateMailboxchatMailbox;@AutowiredprivateActorstorageActor;publicChatActorfindOrInit(UserIdid){ChatActorclientActor;if(!actorMap.containsKey(id)){clientActor=newChatActor(id,storageActor);try{clientActor.initialize(chatMailbox);}catch(Exceptione){logger.error("Problem while initializing actor {}",id,e);thrownewIllegalStateException(e);}returnObjects.firstNonNull(actorMap.putIfAbsent(id,clientActor),clientActor);}returnactorMap.get(id);}}
packageorg.ingini.jactor.chat;importorg.agilewiki.jactor.Actor;importorg.agilewiki.jactor.JAFuture;importorg.fest.reflect.core.Reflection;importorg.ingini.jactor.chat.actor.ChatActor;importorg.ingini.jactor.chat.conf.JActorConfig;importorg.ingini.jactor.chat.domain.*;importorg.ingini.jactor.chat.service.JActorService;importorg.junit.Test;importorg.junit.runner.RunWith;importorg.slf4j.Logger;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.test.context.ContextConfiguration;importorg.springframework.test.context.junit4.SpringJUnit4ClassRunner;importstaticorg.fest.assertions.Assertions.assertThat;importstaticorg.mockito.Matchers.eq;importstaticorg.mockito.Mockito.*;@ContextConfiguration(classes={JActorConfig.class})@RunWith(SpringJUnit4ClassRunner.class)publicclassTestChat{@AutowiredprivateJActorServicejActorService;@TestpublicvoidtestProcess()throwsException{//GIVENUserIdjactor101Id=newUserId(101);ChatActorjactor101=jActorService.findOrInit(jactor101Id);UserIdjactor201Id=newUserId(201);ChatActorjactor201=jActorService.findOrInit(jactor201Id);Loggerlogger101Mock=mock(Logger.class);Reflection.field("logger").ofType(Logger.class).in(jactor101).set(logger101Mock);Loggerlogger201Mock=mock(Logger.class);Reflection.field("logger").ofType(Logger.class).in(jactor201).set(logger201Mock);//WHENChatMsgchatMsg201=newChatMsg(newMessageId(1),jactor201Id,"Hi, I'm 201!");chatMsg201.sendEvent(jactor101);ChatMsgchatMsg101=newChatMsg(newMessageId(2),jactor101Id,"Hi, I'm 101!");chatMsg101.sendEvent(jactor201);//THENverify(logger101Mock,never()).info(eq("Actor {} received new msg ({}) from 201"),eq(jactor101Id),eq(chatMsg201.getContent()));verify(logger201Mock,never()).info(eq("Actor {} received new msg ({}) from 101"),eq(jactor201Id),eq(chatMsg101.getContent()));verify(logger101Mock,timeout(5000).times(1)).info(eq("Actor {} received new msg ({}) from 201"),eq(jactor101Id),eq(chatMsg201.getContent()));verify(logger201Mock,timeout(5000).times(1)).info(eq("Actor {} received new msg ({}) from 101"),eq(jactor201Id),eq(chatMsg101.getContent()));}@TestpublicvoidtestGet()throwsException{//GIVENMsgConfirmationconfirmationChatMsg=newMsgConfirmation(newMessageId(1));UserIdjactorId=newUserId(401);Actorjactor=jActorService.findOrInit(jactorId);Loggerlogger101Mock=mock(Logger.class);Reflection.field("logger").ofType(Logger.class).in(jactor).set(logger101Mock);//WHENJAFuturefuture=newJAFuture();Objectresult=future.send(jactor,confirmationChatMsg);//THENassertThat(result).isEqualTo(Confirmation.NO);verify(logger101Mock,times(1)).info(eq("Start sleeping"));verify(logger101Mock,times(1)).info(eq("Stop sleeping"));}@TestpublicvoidtestPersist()throwsException{//GIVENUserIdjactorId=newUserId(601);ChatActorjactor=jActorService.findOrInit(jactorId);LoggerloggerMock=mock(Logger.class);Reflection.field("logger").ofType(Logger.class).in(jactor).set(loggerMock);MessageIdmessageId=newMessageId(1);UserIdsender=newUserId(701);ChatMsgchatMsg=newChatMsg(messageId,sender,"Test msg");//WHENjactor.process(chatMsg);//THENverify(loggerMock,timeout(5000).times(1)).info(eq("Actor {} received new msg ({}) from "+sender.getCode()),eq(jactorId),eq(chatMsg.getContent()));//AND GIVENMsgConfirmationconfirmationChatMsg=newMsgConfirmation(messageId);//WHENJAFuturefuture=newJAFuture();Confirmationconfirmation=confirmationChatMsg.send(future,jactor);//THENassertThat(confirmation).isEqualTo(Confirmation.YES);//AND WHENjactor.persist(messageId);confirmation=confirmationChatMsg.send(future,jactor);//THENassertThat(confirmation).isEqualTo(Confirmation.NO);}}