Message Driven POJOs!

Mark Fisher

Of all the new Spring 2.0 features and improvements, I must admit that Message-Driven POJOs are one of my personal favorites. I have a feeling that a lot of other Spring users will feel the same way.

Here I am providing a quick introduction. There is a lot more to show, and I will follow this up with other posts. For now though - this should provide you with enough information to get up and running with some truly POJO-based asynchronous JMS! I hope you are as excited about that as I am ;)

Prerequisites:

You will need the following JAR files on your classpath. I've also listed the versions that I am using (any spring-2.x version should be fine. I just dropped RC3 in there about 2 minutes ago in fact):

  • activemq-core-3.2.2.jar
  • concurrent-1.3.4.jar
  • geronimo-spec-j2ee-managment-1.0-rc4.jar
  • commmons-logging-1.0.4.jar
  • log4j-1.2.9.jar
  • jms-1.1.jar
  • spring-2.0-rc3.jar

Setup the Environment

First, we need to setup the environment. I am going to be using ActiveMQ, but the impact of changing a provider will be limited to modifications within this one file. I'm calling this file "shared-context.xml" since as you will see shortly, I am going to be importing these bean definitions for both sides of the JMS communication. Here are the "shared" bean definitions: the connection factory and two queues (one for the requests and one for replies):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd"
>

       
    <bean id="requestQueue" class="org.activemq.message.ActiveMQQueue">
        <constructor-arg value="requestQueue"/>
    </bean>
 
    <bean id="replyQueue" class="org.activemq.message.ActiveMQQueue">
        <constructor-arg value="replyQueue"/>
    </bean>
 
    <bean id="connectionFactory" class="org.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>
 
</beans>

As you see, I will be running ActiveMQ on tcp (I'm just running 'activemq' from the bin directory of the distribution). It is also possible to run embedded (with "vm://localhost" instead) - or you can run the main method of the org.activemq.broker.impl.Main class. If you want to grab the distribution, visit: http://www.activemq.org.

The Example Domain

I'm keeping things intentionally simple here - the main goal is to demonstrate how the pieces fit together. One of the most important things I want to point out though is that these classes in my "domain" are POJOs. You will see no Spring or JMS dependencies at all.

Ultimately, we will accept input from the user (a "name" via stdin) and this will be transformed into a "registration request" for some unspecified event. The message will be sent asynchronously, but we will have another queue to handle replies. The ReplyNotifier will then write the confirmation (or a "not confirmed" message) to stdout.

I'm creating all of these classes in a "blog.mdp" package by the way.
The first class is the RegistrationRequest:

package blog.mdp;

import java.io.Serializable;

public class RegistrationRequest implements Serializable {

    private static final long serialVersionUID = -6097635701783502292L;

    private String name;
       
    public RegistrationRequest(String name) {
        this.name = name;
    }
       
    public String getName() {
        return name;
    }
}

Next is the RegistrationReply:

package blog.mdp;

import java.io.Serializable;

public class RegistrationReply implements Serializable {

    private static final long serialVersionUID = -2119692510721245260L;

    private String name;
    private int confirmationId;
       
    public RegistrationReply(String name, int confirmationId) {
        this.name = name;
        this.confirmationId = confirmationId;
    }
       
    public String toString() {
        return (confirmationId >= 0)
                ? name + ": Confirmed #" + confirmationId
                : name + ": Not Confirmed";
    }
}

And the RegistrationService:

package blog.mdp;

import java.util.HashMap;
import java.util.Map;

public class RegistrationService {
       
    private Map registrations = new HashMap();
    private int counter = 100;

    public RegistrationReply processRequest(RegistrationRequest request) {
        int id = counter++;
        if (id % 5 == 0) {
            id = -1;
        }
        else {
            registrations.put(new Integer(id), request);
        }
        return new RegistrationReply(request.getName(), id);
    }
}

As you can see, this is merely providing an example. In reality, something would probably be done with the registrations map. Also, you see that 20% of registration attempts will be denied (given a -1 confirmationId) - not a very practical way to process registration requests, but it will provide some variety to the reply messages. Again, the important thing is that this service class has NO ties to Spring or JMS. Nevertheless, as you will see in just a moment, it is going to be handling the payload of messages sent via JMS. In other words, this RegistrationService IS the Message-Driven POJO.

Finally, create a simple class to log the reply messages:

package blog.mdp;

public class ReplyNotifier {

    public void notify(RegistrationReply reply) {
        System.out.println(reply);
    }
}

Configure the Message-Driven POJO

Now for the most important part. How do we use Spring to configure the POJO service so that it will receive JMS Messages? The answer comes in the form of 2 bean definitions (well, 3 if you count the service itself). In this next bean definition file, notice the "container" which actually receives the message and enables the use of an asynchronous listener. The container needs to be aware of the connectionFactory and the destination from which it receives messages. There are multiple types of containers available, but that is beyond the scope of this blog. Read the reference document for more information: Message Listener Containers.

The "listener" in this case is an instance of Spring's MessageListenerAdapter. It has a reference to the delegate (the POJO service) and the name of the handler method. In this case, we've also provided a defaultResponseDestination. For a void-returning method, you would obviously not need to do this. Also (and probably more likely in a production application), you can leave this out in favor of setting the "reply-to" property of the incoming JMS Message instead.

Now that we've discussed the various players, here are the bean definitions (I've named this file "server-context.xml"):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd"
>

       
    <import resource="shared-context.xml"/>
       
    <bean id="registrationService" class="blog.mdp.RegistrationService"/>
       
    <bean id="listener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="registrationService"/>
        <property name="defaultListenerMethod" value="processRequest"/>
        <property name="defaultResponseDestination" ref="replyQueue"/>
    </bean>
       
    <bean id="container" class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="messageListener" ref="listener"/>
        <property name="destination" ref="requestQueue"/>
    </bean>
       
</beans>

The last step here is to provide a bootstrap mechanism for running the service since this is a simple standalone example. I've just created a trivial main method to startup an ApplicationContext with the relevant bean definitions and then block:

package blog.mdp;

import java.io.IOException;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class RegistrationServiceRunner {
       
    public static void main(String[] args) throws IOException {
        new ClassPathXmlApplicationContext("/blog/mdp/server-context.xml");
        System.in.read();
    }
}

Configure the Client

On the "client" side, we will send the registration requests and log the replies. First, I will list the bean definitions. After the previous section, you should understand the role of the "container" and "listener". In this case, the delegate is the ReplyNotifier and since it has a void return type, it does not itself send replies (therefore, no 'defaultResponseDestination' property is present). I've named this file "client-context.xml":

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd"
>

       
    <import resource="shared-context.xml"/>
       
    <bean id="replyNotifier" class="blog.mdp.ReplyNotifier"/>
       
    <bean id="listener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="replyNotifier"/>
        <property name="defaultListenerMethod" value="notify"/>
    </bean>
       
    <bean id="container" class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="messageListener" ref="listener"/>
        <property name="destination" ref="replyQueue"/>
    </bean>
       
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="requestQueue"/>
    </bean>
       
</beans>

There is another bean defined there - an instance of Spring's "jmsTemplate". We will use that to send the registration request messages to its defaultDestination. With the simple convertAndSend(..) methods that Spring provides, the sending of JMS messages is trivial. I've created a class that takes user input and then sends the message by using this "jmsTemplate":

package blog.mdp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

public class RegistrationConsole {
       
    public static void main(String[] args) throws IOException {
        ApplicationContext context = new ClassPathXmlApplicationContext("/blog/mdp/client-context.xml");
        JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
               
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));      
               
        for (;;) {
            System.out.print("To Register, Enter Name: ");
            String name = reader.readLine();
            RegistrationRequest request = new RegistrationRequest(name);
            jmsTemplate.convertAndSend(request);
        }
    }
}

Running the Example

Now for the fun part. Startup the ActiveMQ broker (as briefly discussed in the "Setup the Environment" section). Run the main(..) method of the RegistrationServiceRunner. Run the main(..) method of the RegistrationConsole. Enter a name, and you should see a reply in that same console.

Further Resources

Hopefully, that's enough to give you an idea of what Spring's new Message-Driven POJO support is about. However, as I mentioned, there is quite a bit more involved - different container types, transaction support, configuration of consumer threading, pluggable message conversion strategies, etc. Stay tuned to the Interface21 Team Blog for more examples and information about those features. In the meantime, you can check out the Spring Reference Documentation on JMS. Also, be sure to visit the "Remoting and JMS" section of the Spring Support Forums as you begin to explore this exciting new functionality.

 

24 responses


  1. This is a great article. Just one question though. Is there a reason why you are using ActiveMQ 3.2.2 instead of 4.0.1? Is that because the 3.2 branch is more reliable or stable?


  2. Due to some API changes in Spring 2.0-M2 and beyond, ActiveMQ 4.0 doesn't work with Spring 2.0. There are fixes in ActiveMQ's 4.1 dev branch in conjunction with xbean-spring 2.5 that will make things work correctly. Rather than do an example with a pre-release version of ActiveMQ, I'm sure that Mark just wanted to use what you'd be using in production.


  3. ActiveMQ 4.0 and 4.0.1 work with Spring 2.0-RC2 just fine. The xbean stuff does not work, but you only need that if you're running a full blown broker. If you're doing what's being done in this blog entry (connecting to a broker running in a seperate process), then there are no problems.


  4. MDP's rule!! But can you explain why we need concurrent-1.3.4.jar and geronimo-spec-j2ee-management-1.0-rc4.jar on the classpath?


  5. Great article. Quick question. How do the SimpleMessageListenerContainers get started? Doesn't something have to call the start() method?


  6. Hi Chris

    How do the SimpleMessageListenerContainers get started? Doesn't something have to call the start() method?

    The SimpleMessageListenerContainer class extends the AbstractMessageListenerContainer base class that exposes a public boolean property called 'autoStart' which defaults to 'true'. This means that the infrastructure to receive messages will startup automatically (when running in a Spring IoC container). You can set the value of this property to 'false', in which case you will have to call the 'start()' method explicitly to get the ball rolling.

    Cheers
    Rick


  7. Hello.
    I was testing the application of the article above in Windows XP, with the dependences(.jar files, which downloaded from the FTP Maven repository) with the classpath placed in the Ant script file even so is running correctly however my Java sends an error message:

    Buildfile: build.xml

    compile:

    consumer:
    [echo] Running the consumer class
    [java] java.util.zip.ZipException: error in opening zip file
    [java] at java.util.zip.ZipFile.open(Native Method)
    [java] at java.util.zip.ZipFile.(ZipFile.java:203)
    [java] at java.util.zip.ZipFile.(ZipFile.java:234)
    [java] at org.apache.tools.ant.AntClassLoader.getResourceURL(AntClassLoader.java:919)
    [java] at org.apache.tools.ant.AntClassLoader.getResource(AntClassLoader.java:832)
    [java] at org.apache.log4j.helpers.Loader.getResource(Loader.java:78)
    [java] at org.apache.log4j.LogManager.(LogManager.java:94)
    [java] at org.apache.log4j.Logger.getLogger(Logger.java:94)
    [java] at org.apache.commons.logging.impl.Log4JLogger.getLogger(Log4JLogger.java:229)
    [java] at org.apache.commons.logging.impl.Log4JLogger.(Log4JLogger.java:65)
    [java] at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    [java] at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
    [java] at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
    [java] at java.lang.reflect.Constructor.newInstance(Constructor.java:494)
    [java] at org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:529)
    [java] at org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:235)
    [java] at org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:209)
    [java] at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:351)
    [java] at org.springframework.util.ClassUtils.(ClassUtils.java:58)
    [java] at org.springframework.core.io.DefaultResourceLoader.(DefaultResourceLoader.java:53)
    [java] at org.springframework.context.support.AbstractApplicationContext.(AbstractApplicationContext.java:178)
    [java] at org.springframework.context.support.AbstractRefreshableApplicationContext.(AbstractRefreshableApplicationContext.java:78)
    [java] at org.springframework.context.support.AbstractXmlApplicationContext.(AbstractXmlApplicationContext.java:58)
    [java] at org.springframework.context.support.ClassPathXmlApplicationContext.(ClassPathXmlApplicationContext.java:90)
    [java] at org.springframework.context.support.ClassPathXmlApplicationContext.(ClassPathXmlApplicationContext.java:77)
    [java] at org.springframework.context.support.ClassPathXmlApplicationContext.(ClassPathXmlApplicationContext.java:68)
    [java] at RegistrationServiceRunner.main(Unknown Source)
    ………………………

    The build.xml:

    -

    -
    This script requires Ant 1.6 or higher usage: ant -help display ant help screen ant help display this message ant clean delete the built directory ant consumer run consumer class which waits until a messages have been received ant producer run a producer class publishing a message ant jar creates a jar file containing all application

    -

    -

    -

    -

    -

    -
    -

    -

    -
    Running the consumer class

    -
    Running the producer class

    Thanks.


  8. Thanks for taking time to do this Mark. It was a much easier introduction than anything I found on the Spring site.


  9. Excellent article. I always had this query, and it is more relevant now with the Spring JMS.
    Where can we find the details about what can be set for different Spring beans? Unless someone goes and checks the JavaDoc APIs of each and every Spring JMS class, you won't even know what all options are possible. So, it would be very helpful if there is a detailed documetnation about the different properties that are possible for Spring JMS beans and the significance of different values for them. Something similar to what we have for Struts tags.
    Please reply.


  10. Excellent article.
    I always had this query, and it is more relevant now with the Spring JMS.
    Where can we find the details about what can be set for different Spring beans? Unless someone goes and checks the JavaDoc APIs of each and every Spring JMS class, you won't even know what all options are possible. So, it would be very helpful if there is a detailed documetnation about the different properties that are possible for Spring JMS beans and the significance of different values for them. Something similar to what we have for Struts tags.
    Please reply.


  11. I just got this example working with ActiveMQ-4.1.
    In order to make this happen, I had to give the correct package name (it has changed in v4.1) for the ActiveMQQueue and ActiveMQConnectionFactory in
    server-context.xml. Both of these classes are in the package org.apache.activemq. The Apache project has been busy putting their stamp on ActiveMQ. I figured this out after poring through the javadocs for v4.1. Truly, the brilliance of Spring shines through in this example. A big thank
    you to Rod, Juergen and the Spring team too!


  12. Hey gr8 explaination!! Can I get the source code for it?


  13. hey Viraf, I couldnt find ActiveMQQueue in org.apache.activemq. I am using v4.1.1 could u plz guide me to change share-context.xml


  14. For anyone wanting this to work with ActiveMQ 4.1.1 here are the updates…


  15. Sorry quoting doesn't work…

    Just replace org.activemq.message.ActiveMQQueue with org.apache.activemq.command.ActiveMQQueue
    and org.activemq.ActiveMQConnectionFactory with org.apache.activemq.spring.ActiveMQConnectionFactory


  16. I am using websphere mq and webspehere appserver to run my spring app. I have connection factory (type com.ibm.ejs.jms.JMSQueueConnectionFactoryHandle) configured in websphere appserver and a destination Q (type com.ibm.mq.jms.MQQueue) configured in websphere appserver.
    These are exposed as jndi's to my app.

    My config looks like this (doesn't work - am not sure what bits I am missing)

    How do I change the above configuration to expect the JMSQueueConnectionFactoryHandle and com.ibm.mq.jms.MQQueue types? Can you please guide?


  17. I have this example working now - by starting activeMQ from it's install folder. However, I now want to embed the start-up of it by setting . From my understanding, if I do this, all I'd have to do is start the server and then the client. And activeMQ should be started up automatically. This isn't working for me though. Maybe someone could explain what's going on in the background and what I'm doing wrong here .
    Thanks


  18. Great article !!

    Is there anyway to use MDPs to communicate with components of a application through firewalls?

    Thanks.


  19. MDP's rule!! But can you explain why we need concurrent-1.3.4.jar and geronimo-spec-j2ee-management-1.0-rc4.jar on the classpath

    Seems like it is required by ActiveMQ. I got this application running with ActiveMQ 4.1 without any problem, and I didn't included those two jar files. All the classes of those two jar's were inside the ActiveMQ 4.1 distribution jar file.


  20. how do I configure tomcat with activemq connection factory so that I can use jndi based factory to use the queues/topics?


  21. how do I configure tomcat with activemq connection factory so that I can use jndi based factory to use the queues/topics?

    The configuration is documented on the ActiveMQ site: http://activemq.apache.org/tomcat.html


  22. On November 5, 2007 at 7:43 pm, Mark Fisher said:

    The configuration is documented on the ActiveMQ site: http://activemq.apache.org/tomcat.html

    I am using activemq-4.2-SNAPSHOT… and it does not seem to work….. Is there a problem with this release.


  23. I modified your application so that it has a client and a server webapp. It works great.
    Then I modified it so that it works with JNDI, the server application does not seem to issue a reply whereas when I check the activemq mysql database, I see those messages sent from the client side. It does not seem to work… Thanks for suggestions.
    Here are my config files:
    web.xml:

    ActiveMQ JMS Connection Factory
    jms/ConnectionFactory
    org.apache.activemq.ActiveMQConnectionFactory
    Container

    ActiveMQ request Queue
    jms/requestQueue
    org.apache.activemq.command.ActiveMQQueue

    ActiveMQ reply Queue
    jms/replyQueue
    org.apache.activemq.command.ActiveMQQueue

    context.xml:

    shared-context-jndi.xml:

    org.apache.activemq.jndi.ActiveMQInitialContextFactory

    requestQueue
    replyQueue

    java:comp/env/jms/ConnectionFactory

    requestQueue

    replyQueue


  24. simple and great! kudos!!!

2 trackbacks

Leave a Reply

Quote selected text