Happy Messaging with ActiveMQ and SI (Part 1)

In one of my current projects we needed to set up a communication channel between two distinct Grails applications. One of them is the master application, which runs an embedded ActiveMQ message broker [0]. The second one, the slave application, provides service APIs to the master application. Since Grails relies heavily on Spring, we decided to use Spring Integration [1], a messaging framework which supports various Enterprise Application Integration patterns [2] without being bound to any specific messaging protocol. Since our project team chose ActiveMQ, we use JMS as the underlying messaging protocol.

Setting up an embedded ActiveMQ message broker

ActiveMQ is a fully JMS 1.1 compliant messaging provider which is available under the Apache Software License. It has quite a bag of features, the most important one for us being support for persistent messages. Besides running ActiveMQ as a distinct server, one can choose to run it as an embedded server inside the application. Configuring an embedded ActiveMQ broker using Grails' Beans DSL is pretty straightforward (once you get used to the Beans DSL, of course):
 
xmlns amq:'http://activemq.apache.org/schema/core'

def brokerName = 'myEmbeddedBroker'

amq.'broker'(brokerName: brokerName, useJmx: true, persistent: false) {
  amq.'managementContext'  {
    amq.'managementContext'(connectorPort: 2011, jmxDomainName: 'embeddedBroker')
  }

  amq.'plugins'  {
    amq.'loggingBrokerPlugin'()
  }

  amq.'transportConnectors'  {
    amq.'transportConnector'(name: 'openwire', uri: 'tcp://localhost:61616')
  }
}
 
The code above configures an embedded broker called myEmbeddedBroker which keeps messages in memory only (persistent: false), exposes itself as a JMX bean (useJmx: true) and configures a transport connector using OpenWire over TCP. In order to let the master application (which holds the configuration above) connect to its embedded message broker, we need to set up a connection factory:
 
connectionFactoryLocal(ActiveMQConnectionFactory)  {
  brokerURL = "vm://${brokerName}"
}
 
Finally, we define two message queues: one for outgoing API requests to the slave application and one for incoming responses:
 
"requestQueue"(org.apache.activemq.command.ActiveMQQueue, "QUEUE.REQUEST")
"responseQueue"(org.apache.activemq.command.ActiveMQQueue, "QUEUE.RESPONSE")
 

Spring Integration comes into play

So far we have set up an embedded message broker which could be used for message exchange via the plain JMS API. In our project we decided to go with Spring Integration because it already implements several EAI patterns (e.g. router, gateway) and abstracts from the underlying messaging protocol. A reference manual on Spring Integration can be found at [3], but let me give you a short introduction. Spring Integration (SI) is a messaging framework which implements patterns found in the book Enterprise Integration Patterns [4]. That is, SI is all about messages and message exchange. To get a message from point A to point B there needs to be a channel between A and B. Besides messages, channels are the second most important domain entity in SI. Channels are injected into your application components just like any other Spring bean. The basic MessageChannel interface is pretty rudimentary:
 
public interface MessageChannel {

	boolean send(Message<?> message);
	boolean send(Message<?> message, long timeout);
}
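 
To make that contract a bit more tangible, here is a minimal sketch of a queue-backed channel in plain Java. It does not use Spring Integration at all: SimpleMessage, SimpleChannel and QueueBackedChannel are hypothetical stand-ins invented for this illustration, and they only mirror the two send variants shown above (blocking send, and send with a timeout that reports failure via its return value).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for SI's Message: read-only headers plus a payload.
final class SimpleMessage<T> {
    private final Map<String, Object> headers;
    private final T payload;

    SimpleMessage(Map<String, ?> headers, T payload) {
        this.headers = Collections.unmodifiableMap(new HashMap<String, Object>(headers));
        this.payload = payload;
    }

    Map<String, Object> getHeaders() { return headers; }
    T getPayload() { return payload; }
}

// Hypothetical stand-in mirroring the two send variants of MessageChannel.
interface SimpleChannel {
    boolean send(SimpleMessage<?> message);
    boolean send(SimpleMessage<?> message, long timeoutMillis);
}

// A bounded in-memory channel (an assumption of this sketch, not SI's implementation).
final class QueueBackedChannel implements SimpleChannel {
    private final BlockingQueue<SimpleMessage<?>> queue;

    QueueBackedChannel(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    @Override
    public boolean send(SimpleMessage<?> message) {
        try {
            queue.put(message); // blocks until the channel has room
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Override
    public boolean send(SimpleMessage<?> message, long timeoutMillis) {
        try {
            // false means the message was not accepted within the timeout
            return queue.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Not part of the interface above; it lets the demo read what was sent.
    SimpleMessage<?> receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}

final class ChannelDemo {
    public static void main(String[] args) {
        QueueBackedChannel channel = new QueueBackedChannel(1);
        SimpleMessage<String> message = new SimpleMessage<>(
                Collections.<String, Object>singletonMap("API_METHOD_NAME", "executeRemotely"),
                "second param");
        System.out.println(channel.send(message, 100)); // accepted: the channel has room
        System.out.println(channel.send(message, 100)); // not accepted: the channel is full
        System.out.println(channel.receive(100).getPayload());
    }
}
```

The boolean return value is the interesting part: a sender learns whether the channel accepted the message, but nothing about who consumes it.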
 
The use case in our project was to automatically create a message and send it to some preconfigured channel whenever a programmer calls a service API method:
 

def someApi

def doSomething()  {
   someApi.executeRemotely('first param', 'second param') // this should trigger message creation and sending/receiving

}

 
A call to executeRemotely should automatically create a message object from the input parameters and send it to some sort of API request channel. Luckily, SI provides the concept of gateways which solve that particular problem. At runtime, a gateway is a proxy object for a particular interface which, on a method call, creates a message object and sends it via some preconfigured channel. Like channels, gateways are Spring beans and can therefore be configured via the Beans DSL:
 
xmlns integration:'http://www.springframework.org/schema/integration'
xmlns jms:'http://www.springframework.org/schema/integration/jms'

integration.'channel'(id: 'apiChannelRequest')
integration.'channel'(id: 'apiChannelResponse')

integration.'gateway'(id: 'someApi', 'service-interface': org.ast.SomeApi.class.getName(), 'default-request-channel': 'apiChannelRequest', 'default-reply-channel': 'apiChannelResponse')  {

  integration.'method'(name: 'executeRemotely')  {
    integration.'header'(name: 'API_METHOD_NAME', value: 'executeRemotely')
  }
}
 
As you can see from the configuration snippet above, the gateway has both a request and a reply channel configured, since gateways are bidirectional and synchronous (SI 2.0 adds asynchronous gateway support). The SomeApi interface uses SI annotations for further message configuration:
 
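// @Header lives in this package in SI 1.x/2.x
import org.springframework.integration.annotation.Header

interface SomeApi {
    // param1 becomes the message header HEADER_NAME, param2 becomes the payload
    Boolean executeRemotely(final @Header("HEADER_NAME") String param1, final String param2)
}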
 
From the gateway's view the interface above means: whenever executeRemotely is called, put param1 into a message header named HEADER_NAME and put the second parameter into the message's payload. You may have noticed the API_METHOD_NAME header in the gateway configuration above; it ends up in the message headers too. We needed to manually inject a unique method identification token (in our case the method name alone was enough) in order to call the correct method on the slave application side.
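
The mapping from method call to message can be sketched with a plain JDK dynamic proxy. This is only an illustration of the idea, not how SI implements its gateways: HeaderParam, RemoteApi, GatewayMessage and GatewaySketch are hypothetical names made up for this example, and the requestHandler function stands in for the request channel and everything behind it.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for SI's @Header parameter annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface HeaderParam {
    String value();
}

// Hypothetical service interface mirroring SomeApi.
interface RemoteApi {
    Boolean executeRemotely(@HeaderParam("HEADER_NAME") String param1, String param2);
}

// Hypothetical message type: mutable headers plus a payload.
final class GatewayMessage {
    final Map<String, Object> headers = new HashMap<>();
    Object payload;
}

final class GatewaySketch {

    // Builds a proxy that turns every method call into a message and passes it
    // to requestHandler, whose return value becomes the method's return value.
    @SuppressWarnings("unchecked")
    static <T> T createGateway(Class<T> serviceInterface,
                               Function<GatewayMessage, Object> requestHandler) {
        return (T) Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[] { serviceInterface },
                (proxy, method, args) -> {
                    GatewayMessage message = new GatewayMessage();
                    // Static header identifying the called method.
                    message.headers.put("API_METHOD_NAME", method.getName());
                    Annotation[][] annotations = method.getParameterAnnotations();
                    for (int i = 0; args != null && i < args.length; i++) {
                        HeaderParam header = findHeader(annotations[i]);
                        if (header != null) {
                            message.headers.put(header.value(), args[i]); // annotated: header
                        } else {
                            message.payload = args[i];                    // otherwise: payload
                        }
                    }
                    return requestHandler.apply(message);
                });
    }

    private static HeaderParam findHeader(Annotation[] annotations) {
        for (Annotation annotation : annotations) {
            if (annotation instanceof HeaderParam) {
                return (HeaderParam) annotation;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        RemoteApi api = createGateway(RemoteApi.class, message -> {
            System.out.println("headers=" + message.headers + ", payload=" + message.payload);
            return Boolean.TRUE; // pretend the remote side answered
        });
        System.out.println(api.executeRemotely("first param", "second param"));
    }
}
```

The caller only ever sees the RemoteApi interface; message creation stays hidden behind the proxy, which is the point of the gateway pattern.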

Configuring JMS messaging

So far we've set up an environment with an embedded ActiveMQ message broker and two ActiveMQ message queues. Now we need to configure the link between the SI channels configured in the last section and those JMS queues. Since gateways are bidirectional, SI needs to store some reply channel information whenever an API request is created. The gateway implementation does this automatically. If we ran inside a single SI environment we wouldn't need to care about this. In our case, however, we chose to use gateways to communicate between a master and a slave application, which in production are deployed on separate server instances. In SI, a JmsOutboundGateway can be used for such JMS request/reply scenarios. It is the glue between the SI channels and our ActiveMQ JMS queues:
 
jms.'outbound-gateway'(id: "jmsGateway", 'connection-factory': 'pooledJmsConnectionFactoryLocal', 'request-destination': "requestQueue", 'request-channel': "apiChannelRequest", 'reply-destination': "responseQueue", 'reply-channel': 'apiChannelResponse')
 
In the slave application, there needs to be an inverse configuration using a JMS inbound gateway:
 
jms.'inbound-gateway'(id: 'jmsInbound', 'connection-factory': 'pooledJmsConnectionFactoryRemote', 'request-destination-name': 'QUEUE.REQUEST', 'request-channel': 'incomingRequest', 'reply-destination-name': 'QUEUE.RESPONSE')
 
The configuration snippet inside the slave application simply routes incoming messages to the incomingRequest channel. Notice that no reply channel has been specified, so that the reply channel information added by the master application is kept in the message. In the next part of this article series we'll have a closer look at the slave application and how it is configured to invoke methods on Grails service beans.
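
To illustrate the request/reply round trip described in this section, here is a small in-memory sketch in plain Java. It makes several assumptions: two BlockingQueues stand in for QUEUE.REQUEST and QUEUE.RESPONSE, replies are matched to requests by a correlation id (the article does not cover how the JMS gateways correlate messages), and Envelope and RequestReplySketch are hypothetical names. It handles a single caller at a time and is not meant to reflect the actual gateway implementations.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical envelope travelling over the queues.
final class Envelope {
    final String correlationId;
    final Map<String, Object> headers;
    final Object payload;

    Envelope(String correlationId, Map<String, Object> headers, Object payload) {
        this.correlationId = correlationId;
        this.headers = headers;
        this.payload = payload;
    }
}

final class RequestReplySketch {
    // In-memory stand-ins for QUEUE.REQUEST and QUEUE.RESPONSE.
    final BlockingQueue<Envelope> requestQueue = new LinkedBlockingQueue<>();
    final BlockingQueue<Envelope> responseQueue = new LinkedBlockingQueue<>();

    // Master side: send a request, then wait for the reply with the same correlation id.
    Object sendAndReceive(String methodName, Object payload, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        Map<String, Object> headers = new HashMap<>();
        headers.put("API_METHOD_NAME", methodName);
        try {
            requestQueue.put(new Envelope(correlationId, headers, payload));
            Envelope reply = responseQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            // Single-caller sketch: a missing or mismatched reply counts as "no reply".
            if (reply == null || !correlationId.equals(reply.correlationId)) {
                return null;
            }
            return reply.payload;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Slave side: take one request, dispatch on the method header, reply with the same id.
    void serveOneRequest(long timeoutMillis) {
        try {
            Envelope request = requestQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (request == null) {
                return; // nothing arrived in time
            }
            boolean known = "executeRemotely".equals(request.headers.get("API_METHOD_NAME"));
            Object result = Boolean.valueOf(known); // placeholder for the real service call
            responseQueue.put(new Envelope(request.correlationId, new HashMap<>(), result));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        RequestReplySketch sketch = new RequestReplySketch();
        Thread slave = new Thread(() -> sketch.serveOneRequest(1000));
        slave.start();
        System.out.println(sketch.sendAndReceive("executeRemotely", "second param", 1000));
    }
}
```

In this sketch the reply finds its way back only because the slave copies the correlation id from the request into the reply.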

[0] ActiveMQ Message Broker
[1] Spring Integration
[2] Enterprise Application Integration
[3] Spring Integration - Reference Manual
[4] Amazon: Enterprise Integration Patterns