AKKA Ask Pattern: For Those Times When You Have to Block

Abstract : Since you are here, chances are you are in one of those situations where you have to come up with a blocking solution using AKKA. I’m therefore going to skip the introduction to the actor model, even though you need it to understand how AKKA’s implementation works (you can read more about the actor model on the AKKA docs website). In this post you will find out how to take advantage of AKKA’s ask pattern to block when necessary.

Goal : Develop a “fire-and-await-confirmation” system based on the AKKA ask pattern

Acknowledgement : My gratitude goes to the open source community and especially to:

Jonas Bonér (@jboner) for the creation of AKKA

Viktor Klang (@viktorklang) for his attention to detail

Maxime Nowak (@maximenowak) for his eye-opening discussions and “religious” clean-coding

Code : You can download the full source of the project from GitHub

Maven Dependencies : There is nothing fancy in the pom.xml. Thus I’m not going to list it here.

Step 1 : The business model

We are going to develop a system for processing business messages. What we want from our system is confirmation that all messages are processed within a given timeout. Let me emphasize once more: within a given timeout. If you think that you need to wait forever, I would advise you to rethink very, very carefully; chances are there is a bad smell in your architecture. Now, back to our business model. We need two domain objects:

1 ) a BusinessMessage to be processed by our BusinessService:

package org.ingini.akka.blocking.message;

import javax.annotation.concurrent.Immutable;

@Immutable
public class BusinessMessage {

    private final String msgBody;

    public BusinessMessage(String msgBody) {
        this.msgBody = msgBody;
    }

    public String getMsgBody() {
        return msgBody;
    }
}

2 ) an AcknowledgementMessage :

package org.ingini.akka.blocking.message;

import javax.annotation.concurrent.Immutable;

@Immutable
public class AcknowledgementMessage {

    private static final AcknowledgementMessage INSTANCE = new AcknowledgementMessage();

    private AcknowledgementMessage() {
    }

    public static AcknowledgementMessage getInstance() {
        return INSTANCE;
    }
}

The AcknowledgementMessage will be sent by our BusinessConfirmationActor, once a BusinessMessage is processed.

Next we are going to “spoil” our business domain heaven by introducing a third domain object called BusinessMessages. The purposes of this wrapper object are purely technical: to ensure immutability, to increase readability, and to decrease maintenance costs.

package org.ingini.akka.blocking.message;

import javax.annotation.concurrent.Immutable;
import java.util.Collections;
import java.util.List;

@Immutable
public class BusinessMessages {
    private final List<BusinessMessage> businessMessages;

    public BusinessMessages(List<BusinessMessage> businessMessages) {
        this.businessMessages = Collections.unmodifiableList(businessMessages);
    }

    public List<BusinessMessage> getBusinessMessages() {
        return businessMessages;
    }
}

As you can see, BusinessMessages is a simple wrapper over an unmodifiable list of messages. As you will see a bit later, this object makes the code easier to read when implementing the actor infrastructure.
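One caveat worth keeping in mind: Collections.unmodifiableList returns a read-only view of the list it is given, so the wrapper stays immutable only as long as nobody keeps mutating the original list. A minimal sketch of a defensive-copy variant follows. The class name SnapshotMessages is hypothetical and is not part of the project; it holds plain String elements only to stay self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical variant of the wrapper: copies the caller's list before wrapping it.
final class SnapshotMessages {

    private final List<String> messages;

    SnapshotMessages(List<String> messages) {
        // Copy first, then wrap: later changes to the caller's list are not visible here.
        this.messages = Collections.unmodifiableList(new ArrayList<String>(messages));
    }

    List<String> getMessages() {
        return messages;
    }

    public static void main(String[] args) {
        List<String> source = new ArrayList<String>();
        source.add("first");
        SnapshotMessages snapshot = new SnapshotMessages(source);
        source.add("second"); // mutate the original list after wrapping
        System.out.println(snapshot.getMessages().size());
    }
}
```

Whether the extra copy is worth it depends on who else holds a reference to the list; in the tests of this post the list is never touched after wrapping, so the original wrapper behaves the same way there.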

Step 2 : The business service

For the sake of simplicity and clarity I’m going to keep our business service as simple as possible. So, here it is:

package org.ingini.akka.blocking.service;

import org.ingini.akka.blocking.message.BusinessMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class BusinessService {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    public void doBusiness(BusinessMessage businessMessage) {
        logger.info("Processing business message {}", businessMessage.getMsgBody());
    }
}

It does nothing more than printing the message body of our business message. Now, let’s concentrate on the more important part, the infrastructural backbone provided by AKKA:

Step 3 : AKKA - The infrastructural backbone

Our most basic piece of machinery will be the BusinessConfirmationActor. As its name says, this actor does some business (the businessService.doBusiness(...) call) and, once finished, sends a confirmation back to the sender (the getSender().tell(...) call):

package org.ingini.akka.blocking.actor;

import akka.actor.UntypedActor;
import org.ingini.akka.blocking.message.AcknowledgementMessage;
import org.ingini.akka.blocking.message.BusinessMessage;
import org.ingini.akka.blocking.service.BusinessService;
import org.springframework.beans.factory.annotation.Autowired;

public class BusinessConfirmationActor extends UntypedActor {

    @Autowired
    private BusinessService businessService;

    @Override
    public void onReceive(Object message) throws Exception {
        businessService.doBusiness((BusinessMessage) message);
        getSender().tell(AcknowledgementMessage.getInstance());
    }
}

Due to the simplicity of our actor, the only bit that might appear interesting is the autowiring of the BusinessService. The autowiring is backed by Spring and by a custom Spring-to-AKKA solution. You can find out more in the post I’ve dedicated to that topic, AKKA actor dependency injection using Spring, and of course you can examine the code directly on GitHub.

The second piece of our AKKA infrastructure is the FireAndAwaitMediatorActor, an implementation of the Gang of Four (GoF) Mediator pattern. It mediates the communication between a client service (which we haven’t seen yet) and the instances of BusinessConfirmationActor.

package org.ingini.akka.blocking.actor;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.routing.RoundRobinRouter;
import org.ingini.akka.blocking.message.AcknowledgementMessage;
import org.ingini.akka.blocking.message.BusinessMessages;
import org.ingini.akka.di.DependencyInjectionProps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;

import java.util.List;

public class FireAndAwaitMediatorActor extends UntypedActor {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private static final int MAX_NUMBER_OF_ACTORS = 5;
    private ActorRef router;

    @Autowired
    private ApplicationContext applicationContext;

    private long messageCounter;
    private ActorRef acknowledgeRequester;

    @Override
    public void preStart() {
        router = getContext().actorOf(new DependencyInjectionProps(//
                applicationContext, BusinessConfirmationActor.class)//
                .withRouter(new RoundRobinRouter(MAX_NUMBER_OF_ACTORS)), "router");
    }

    @Override
    public void onReceive(Object message) {
        if (message instanceof BusinessMessages) {
            acknowledgeRequester = getSender();
            List messages = ((BusinessMessages) message).getBusinessMessages();
            messageCounter = messages.size();
            processBusinessMessages(messages);
            return;
        } else if (message instanceof AcknowledgementMessage) {
            processAcknowledgementMessage(message);
            return;
        }

        logger.error("Cannot process message: {}", message);

        throw new IllegalStateException("Cannot process message!");
    }

    private void processBusinessMessages(List messages) {
        logger.info("Total message for processing: {}", messageCounter);
        for (Object msg : messages) {
            router.tell(msg, getSelf());
        }
    }

    private void processAcknowledgementMessage(Object message) {
        messageCounter--;
        if (messageCounter > 0) {
            // Still waiting for the remaining acknowledgements.
            return;
        }
        if (acknowledgeRequester != null) {
            logger.info("Finally all messages are processed! Notifying requester ... ");
            acknowledgeRequester.tell(message);
        } else {
            logger.info("Messages are processed but no acknowledge requester is set. " + //
                    "Awaiting for acknowledgement request.");
        }
    }
}

FireAndAwaitMediatorActor encapsulates the routing logic provided by AKKA’s RoundRobinRouter. The router will create at most 5 BusinessConfirmationActors and hand each BusinessMessage to one of them for processing (the router.tell(msg, getSelf()) call in processBusinessMessages). Once all BusinessMessage instances are sent, we wait for the messageCounter to drop to 0; the FireAndAwaitMediatorActor then sends a confirmation message back to the original sender of the BusinessMessages (the acknowledgeRequester.tell(message) call). That sender is our client service, so continue reading.
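The bookkeeping the mediator performs can be sketched without any actor machinery. The class below is a hypothetical illustration and not part of the project: it tracks the outstanding acknowledgements of a single batch and reports when the last one has arrived.

```java
// Hypothetical, actor-free sketch of the mediator's bookkeeping for one batch.
final class AckCountdown {

    private long pending;

    AckCountdown(long batchSize) {
        this.pending = batchSize;
    }

    // Records one acknowledgement; returns true only for the one that completes the batch.
    boolean acknowledge() {
        if (pending <= 0) {
            return false; // nothing outstanding, ignore stray acknowledgements
        }
        pending--;
        return pending == 0;
    }

    public static void main(String[] args) {
        AckCountdown countdown = new AckCountdown(3);
        System.out.println(countdown.acknowledge());
        System.out.println(countdown.acknowledge());
        System.out.println(countdown.acknowledge());
    }
}
```

Inside an actor this state needs no synchronization, because an actor handles its messages one at a time. The sketch also makes it visible that the state describes exactly one batch: a second BusinessMessages arriving before the first one is confirmed would overwrite both the counter and the requester.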

Step 4 : The client (service)

Having looked at the backbone infrastructure and the business logic, what’s left is to see how we can use our system. For that purpose, I have developed two test cases. Pay attention to the Patterns.ask(...) and Await.result(...) calls in each test; these are the lines you need when you develop your own client service. Patterns.ask() sends a message (the 2nd argument) to the actor given as the 1st argument and returns an AKKA Future, which is completed with the reply or fails if no reply arrives within the specified timeout (the 3rd argument). Once we have the AKKA Future, we block on it with Await.result() until the result is available or the given duration elapses.
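If you have no AKKA project at hand, the shape of "ask, then await with a timeout" can be tried with the JDK alone. The sketch below is only an analogue built on CompletableFuture; it does not use AKKA's API, and the names AskAwaitSketch and awaitReply are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// JDK-only analogue of "ask, then await": not AKKA's API, just the same shape.
final class AskAwaitSketch {

    // Blocks for at most timeoutMillis; returns the reply, or "TIMEOUT" if none arrived.
    static String awaitReply(CompletableFuture<String> reply, long timeoutMillis) {
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMEOUT";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // A worker that replies asynchronously, like an actor answering an ask.
        CompletableFuture<String> answered = CompletableFuture.supplyAsync(() -> "ACK");
        System.out.println(awaitReply(answered, 1000));

        // A reply that never arrives: the caller is released once the timeout elapses.
        CompletableFuture<String> unanswered = new CompletableFuture<>();
        System.out.println(awaitReply(unanswered, 10));
    }
}
```

The point carried over from the ask pattern is that the caller never waits longer than the timeout it chose, whether or not the other side answers.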

package org.ingini.akka.blocking;

import akka.actor.ActorRef;
import akka.dispatch.Await;
import akka.dispatch.Future;
import akka.pattern.Patterns;
import akka.util.Duration;
import akka.util.Timeout;
import org.ingini.akka.blocking.message.AcknowledgementMessage;
import org.ingini.akka.blocking.message.BusinessMessage;
import org.ingini.akka.blocking.message.BusinessMessages;
import org.ingini.akka.blocking.service.BusinessService;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

import static org.fest.assertions.Assertions.assertThat;

@ContextConfiguration(classes = {Bootstrap.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class BusinessTest {

    @Autowired
    private ActorRef fireAndAwaitRouter;

    @Rule
    public ExpectedException expectedExceptionRule = ExpectedException.none();

    @Mock
    public BusinessService mockedBusinessService;

    @Before
    public void beforeTest() {
        MockitoAnnotations.initMocks(this);
    }

    @Test
    public void allActorsShouldAcknowledgeWithinTime() throws Exception {
        //GIVEN
        int numberOfMessages = 100;
        List<BusinessMessage> messages = new ArrayList<BusinessMessage>(numberOfMessages);
        for (int i = 0; i < numberOfMessages; i++) {
            messages.add(new BusinessMessage("Msg number: " + i));
        }

        Duration duration = Duration.apply("10 sec");

        //WHEN
        Future<Object> answer = Patterns.ask(fireAndAwaitRouter, new BusinessMessages(messages), //
                Timeout.durationToTimeout(duration));

        //THEN
        assertThat(Await.result(answer, duration))//
                .isEqualTo(AcknowledgementMessage.getInstance());

    }

    @Test
    public void failureToAcknowledgeWithinTimeout() throws Exception {
        //GIVEN
        expectedExceptionRule.expect(TimeoutException.class);
        expectedExceptionRule.expectMessage("Futures timed out after [1] milliseconds");

        int numberOfMessages = 100000;
        List<BusinessMessage> messages = new ArrayList<BusinessMessage>(numberOfMessages);
        for (int i = 0; i < numberOfMessages; i++) {
            messages.add(new BusinessMessage("Msg number: " + i));
        }

        Duration duration = Duration.apply("1 millisecond");

        //WHEN
        Future<Object> answer = Patterns.ask(fireAndAwaitRouter, //
                new BusinessMessages(messages), Timeout.durationToTimeout(duration));

        //THEN
        Await.result(answer, duration);

    }
}

Within allActorsShouldAcknowledgeWithinTime(), we test that all actors (BusinessConfirmationActors) acknowledge their business messages within 10 seconds; the test passes when Await.result() returns the acknowledgement instead of throwing a timeout exception. The second test case, failureToAcknowledgeWithinTimeout(), checks the opposite: the actors cannot acknowledge the processing of 100,000 messages within 1 millisecond, so awaiting the result throws a timeout exception. Having said that, I have to point out that the second test case may actually fail on a fast machine. To make it deterministic, we could mock the BusinessService within a BusinessConfirmationActor and make it block for at least 1.1 milliseconds. I haven’t done that, to keep the post shorter.

Conclusion : Before I bid you farewell, my dear reader, I would like to draw your attention to three things:
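The idea of a service that blocks long enough to guarantee the timeout can be sketched with the JDK alone. Everything below is hypothetical and stands in for the mocked BusinessService: instead of sleeping for a fixed time, the "business" waits on a latch that is opened only after the caller has given up, so the outcome does not depend on machine speed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a mocked service: it blocks until the test releases it.
final class BlockingServiceSketch {

    // Returns true if waiting on the blocked work timed out, which is the expected outcome.
    static boolean timesOutWhileServiceBlocks(long timeoutMillis) {
        final CountDownLatch release = new CountDownLatch(1);
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<String> answer = worker.submit(() -> {
                release.await(); // the "business" cannot finish before the latch opens
                return "ACK";
            });
            try {
                answer.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException expected) {
                return true;
            } catch (Exception unexpected) {
                throw new IllegalStateException(unexpected);
            }
        } finally {
            release.countDown(); // let the worker finish so the executor can shut down
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOutWhileServiceBlocks(1));
    }
}
```

In the real test the same latch could be awaited inside a stubbed doBusiness() and released in an @After method; that wiring is left out here because it depends on how the mock is injected into the actor.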

Blocking is almost never a good idea, so try to avoid it.

Using AKKA is not so difficult and you should not be afraid of it. Once you understand the actor model and how it’s implemented in AKKA, it’s really fun to work with.

Do NOT sacrifice the quality of your code. In our example, I’ve introduced the BusinessMessages wrapper object which has no direct business value. Or does it? How would you like to see:

if( message instanceof List ) {
...
}

rather than

if( message instanceof BusinessMessages ) {
...
}

or even better: apply the GoF Strategy pattern and remove all ifs! Farewell for now and don’t forget to leave a comment!
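For readers who want to try the Strategy idea, here is one possible shape: a map from message type to handler that replaces the instanceof chain. It is a sketch under assumptions rather than code from the project; MessageDispatcher and its methods are hypothetical names, and String and Integer stand in for the real message classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical dispatcher: one handler per message type instead of an if/else chain.
final class MessageDispatcher {

    private final Map<Class<?>, Function<Object, String>> handlers =
            new LinkedHashMap<Class<?>, Function<Object, String>>();

    // Registers the strategy to use for messages of exactly the given type.
    <T> MessageDispatcher on(Class<T> type, Function<T, String> handler) {
        handlers.put(type, message -> handler.apply(type.cast(message)));
        return this;
    }

    // Looks up the strategy by the message's runtime class and applies it.
    String dispatch(Object message) {
        Function<Object, String> handler = handlers.get(message.getClass());
        if (handler == null) {
            throw new IllegalStateException("Cannot process message: " + message);
        }
        return handler.apply(message);
    }

    public static void main(String[] args) {
        MessageDispatcher dispatcher = new MessageDispatcher()
                .on(String.class, text -> "text:" + text)
                .on(Integer.class, number -> "number:" + (number + 1));
        System.out.println(dispatcher.dispatch("hello"));
        System.out.println(dispatcher.dispatch(41));
    }
}
```

The lookup matches the exact runtime class, so subclasses of a registered type are not picked up; that is fine for final message classes such as the ones in this post, but it is a deliberate simplification.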

Ivan Hristov

I am a lead software engineer working on software topics related to cloud, machine learning, and site reliability engineering.
