Pivotal has a new connector available that lets you send and consume messages using Spring’s battle tested JmsTemplate and the JMS API with RabbitMQ as your broker.
In case you’re not familiar, RabbitMQ is an AMQP broker. You can download it on just about any platform. Some even have installers (like brew install rabbitmq
on the Mac, and sudo apt-get install rabbitmq-server
on Ubuntu Linux).
If you like the JMS API, this connector lets you shift to using RabbitMQ’s rock solid message broker without having to alter the rest of your application.
But this connector also affords you the option to seamlessly migrate towards AMQP messaging with Rabbit by first switching to RabbitMQ as a broker, and later updating your code using Spring AMQP.
If you’re new to Spring JMS and JmsTemplate
, please read the rest of this blog to see how simple it makes JMS messaging.
Note: The RabbitMQ JMS connector is focused on supporting the JMS 1.1. spec, and is compliant with the most important elements of that specification. Click to read the complete details regarding the RabbitMQ JMS connector implementation.
This example of RabbitMQ JMS starts with building a stock trade listener, as shown below:
package com.rabbitmq.jms.sample;
import javax.jms.ConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
@Configuration
public class StockConsumer {
private static final Log log = LogFactory.getLog(StockConsumer.class);
@Bean
public DefaultMessageListenerContainer jmsListener(ConnectionFactory connectionFactory) {
DefaultMessageListenerContainer jmsListener = new DefaultMessageListenerContainer();
jmsListener.setConnectionFactory(connectionFactory);
jmsListener.setDestinationName("rabbit-trader-channel");
jmsListener.setPubSubDomain(true);
MessageListenerAdapter adapter = new MessageListenerAdapter(new Receiver());
adapter.setDefaultListenerMethod("receive");
jmsListener.setMessageListener(adapter);
return jmsListener;
}
protected static class Receiver {
public void receive(String message) {
log.info("Received " + message);
}
}
}
The StockConsumer
class contains Spring bean definitions that will be added to the application context at startup. The jmsListener()
method creates an object that subscribes to JMS messages and responds accordingly.
It includes:
DefaultMessageListenerContainer
– this container will respond asynchronously to messages. Observe how the method signature defines it’s expectation to be injected with a javax.jms.ConnectionFactory. RabbitMQ’s JMS connector implements this spec interface.- The destination name is rabbit-trader-channel.
- pubSubDomain has been switched to true. That means it will consume messages in a topic-oriented fashion, allowing other consumers to receive copies as well.
- The
Receiver
class doesn’t implement any JMS spec interfaces. But that’s okay! It is wrapped with Spring’sMessageListenerAdapter
. This lets you create message driven POJOs.
We just looked at the consumption of message. Now let’s proceed to writing a stock quote producer. You can find a simple chunk of code here:
package com.rabbitmq.jms.sample;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import com.rabbitmq.jms.admin.RMQConnectionFactory;
@EnableAutoConfiguration
@EnableScheduling
@Configuration
@ComponentScan
public class StockQuoter {
private static final Log log = LogFactory.getLog(StockQuoter.class);
private List<String> stocks = new ArrayList<String>();
private Map<String, Double> lastPrice = new HashMap<String, Double>();
{
stocks.add("AAPL");
stocks.add("GD");
stocks.add("BRK.B");
lastPrice.put("AAPL", 494.64);
lastPrice.put("GD", 86.74);
lastPrice.put("BRK.B", 113.59);
}
@Autowired
JmsTemplate jmsTemplate;
@Bean
ConnectionFactory connectionFactory() {
return new RMQConnectionFactory();
}
@Scheduled(fixedRate = 5000L) // every 5 seconds
public void publishQuote() {
// Pick a random stock symbol
Collections.shuffle(stocks);
final String symbol = stocks.get(0);
// Toss a coin and decide if the price goes...
if (RandomUtils.nextBoolean()) {
// ...up by a random 0-10%
lastPrice.put(symbol, new Double(Math.round(lastPrice.get(symbol) * (1 + RandomUtils.nextInt(10)/100.0) * 100) / 100));
} else {
// ...or down by a similar random amount
lastPrice.put(symbol, new Double(Math.round(lastPrice.get(symbol) * (1 - RandomUtils.nextInt(10)/100.0) * 100) / 100));
}
// Log new price locally
log.info("Quote..." + symbol + " is now " + lastPrice.get(symbol));
MessageCreator messageCreator = new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage("Quote..." + symbol + " is now " + lastPrice.get(symbol));
}
};
jmsTemplate.send("rabbit-trader-channel", messageCreator);
}
public static void main(String[] args) {
ApplicationContext ctx = SpringApplication.run(StockQuoter.class, args);
log.info("connectionFactory => " + ctx.getBean("connectionFactory"));
}
}
This stock quoter has the following key parts.
JmsTemplate
is a key component used to publish JMS messages. It removes the need for you to deal with JMS’s busy API and instead focus on sending messages.- By using the
@EnableAutoConfiguration
annotation, Spring Boot is signaled to automatically create aJmsTemplate
when it detects spring-jms on your classpath. - The central piece of this demo is the
RMQConnectionFactory
. This connection factory empowersJmsTemplate
to connect to RabbitMQ and use it as a JMS broker. Spring Boot automatically plugs it into theJmsTemplate
so you don’t have to lift a finger.
@Bean
ConnectionFactory connectionFactory() {
return new RMQConnectionFactory();
}
- This stock quoting demonstration app uses Spring Scheduling to generate a new quote every five seconds. It starts with a list of stocks and an initial price, then randomly raises or lowers the price by anywhere from 0 – 10% (at random). The key method which publishes the quote can be found in this line:
jmsTemplate.send("rabbit-trader-channel", messageCreator);
Note: For demonstration purposes, the produce and the consumer run inside the same JVM as one application. This is not a requirement. You can easily have each part running in a separate application.
You launch the application by typing:
$ mvn clean package && java -jar target/trader-1.0.4-SNAPSHOT.jar
You should then expect output similar to this:
. ____ _ __ _ _
/ / ___'_ __ _ _(_)_ __ __ _
( ( )___ | '_ | '_| | '_ / _` |
/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |___, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v0.5.0.BUILD-SNAPSHOT)
2013-09-16 15:19:59.158 INFO 68774 --- [ckQuoter.main()] com.rabbitmq.jms.sample.StockQuoter : Starting StockQuoter on retina with PID 68774 (/Users/gturnquist/src/trader/target/classes started by gturnquist)
2013-09-16 15:19:59.193 INFO 68774 --- [ckQuoter.main()] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@22df3d0f: startup date [Mon Sep 16 15:19:59 CDT 2013]; root of context hierarchy
2013-09-16 15:19:59.449 INFO 68774 --- [ckQuoter.main()] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.scheduling.annotation.SchedulingConfiguration' of type [class org.springframework.scheduling.annotation.SchedulingConfiguration$$EnhancerByCGLIB$$62f4010e] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2013-09-16 15:19:59.602 INFO 68774 --- [ckQuoter.main()] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2013-09-16 15:19:59.679 INFO 68774 --- [ckQuoter.main()] com.rabbitmq.jms.sample.StockQuoter : Started StockQuoter in 0.727 seconds
2013-09-16 15:19:59.680 INFO 68774 --- [pool-3-thread-1] com.rabbitmq.jms.sample.StockQuoter : Quote...AAPL is now 464.0
2013-09-16 15:19:59.680 INFO 68774 --- [ckQuoter.main()] com.rabbitmq.jms.sample.StockQuoter : connectionFactory => com.rabbitmq.jms.admin.RMQConnectionFactory@68d69cac
2013-09-16 15:19:59.801 INFO 68774 --- [ jmsListener-1] com.rabbitmq.jms.sample.StockConsumer : Received Quote...AAPL is now 464.0
2013-09-16 15:20:04.678 INFO 68774 --- [pool-3-thread-1] com.rabbitmq.jms.sample.StockQuoter : Quote...BRK.B is now 113.0
2013-09-16 15:20:04.779 INFO 68774 --- [ jmsListener-1] com.rabbitmq.jms.sample.StockConsumer : Received Quote...BRK.B is now 113.0
2013-09-16 15:20:09.678 INFO 68774 --- [pool-3-thread-1] com.rabbitmq.jms.sample.StockQuoter : Quote...GD is now 91.0
2013-09-16 15:20:09.753 INFO 68774 --- [ jmsListener-1] com.rabbitmq.jms.sample.StockConsumer : Received Quote...GD is now 91.0
You can see that it prints out the details of connectionFactory
, revealing it to be an RMQConnectionFactory
. Then as it produces stock quotes in one part, they are transmitted through RabbitMQ and read from the other end.