SpringSource Blog

Understanding AMQP, the protocol used by RabbitMQ

Peter Ledbrook

Update: I changed the first paragraph to clarify the relationship between RabbitMQ and JMS.

RabbitMQ is a lightweight, reliable, scalable and portable message broker. But unlike many message brokers familiar to Java developers, it's not based on JMS. Instead, your applications communicate with it via a platform-neutral, wire-level protocol: the Advanced Message Queuing Protocol (AMQP). Fortunately there's already a Java client library, and SpringSource is working on first-class Spring and Grails integration, so don't worry about having to do low-level stuff to use RabbitMQ. You can even find AMQP client libraries that expose a JMS interface. But AMQP is sufficiently different in operation from JMS that it might cause headaches for Java developers who are used to the JMS model.

To ease the transition, I'll look in this post at the basic concepts that underpin AMQP, along with three common usage scenarios. By the end, you will hopefully have a good enough understanding to configure RabbitMQ and use it via the APIs provided by Spring and Grails.

Exchanges, queues, and bindings

Like any messaging technology, AMQP deals with publishers and consumers. The publishers produce the messages; the consumers pick them up and process them. It's the job of the message broker (such as RabbitMQ) to ensure that the messages from a publisher go to the right consumers. In order to do that, the broker uses two key components: exchanges and queues. The following diagram shows how they connect a publisher to a consumer:

[Diagram: rabbit-basics]

As you can see, the setup is pretty straightforward. A publisher sends messages to a named exchange and a consumer pulls messages from a queue (or the queue pushes them to the consumer depending on the configuration). Of course, the connections have to be made in the first place, so how do publishers and consumers discover each other? Via the name of the exchange. Usually, either the publisher or consumer creates the exchange with a given name and then makes that name public. How that publication happens depends on the circumstances, but one might put it in public API documentation or send it to known clients.

How are the messages routed from the exchange to the queue? Good question. First, the queue has to be attached to the given exchange. Typically, a consumer creates a queue and attaches it to an exchange at the same time. Second, messages received by the exchange have to be matched to the queue. The attachment between a queue and an exchange, together with the rule that decides which messages the queue receives, is called a "binding".

To understand binding, it's useful to understand the structure of an AMQP message:

[Diagram: rabbit-message]

The headers and properties of the message are basically key/value pairs. In this post, "headers" are the fields defined by the AMQP specification, whereas "properties" can contain arbitrary, application-specific information. Be aware that the AMQP 0-9-1 specification and the RabbitMQ Java client use these two names the other way round: there, the predefined fields such as content-type and reply-to are called properties, and "headers" is the table of application-defined values. The actual message content is just a sequence of bytes, so if you want to pass text around in your messages, then you should standardise on an encoding. UTF-8 is a good bet. You can specify a content type and encoding in the message headers if you want, but that's apparently not particularly common.
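To make the point about encodings concrete, here is a minimal sketch of turning text into a message body and back. It uses only the JDK; the class name `TextPayload` and its two helper methods are my own invention for illustration and are not part of any AMQP client library.

```java
import java.nio.charset.StandardCharsets;

final class TextPayload {

    // Encode text into the raw bytes that travel as the message body.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode a received body, assuming the publisher also used UTF-8.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "Gr\u00fc\u00dfe" contains two non-ASCII characters, so the byte count
        // differs from the character count.
        byte[] body = encode("Gr\u00fc\u00dfe from NYSE");
        System.out.println(body.length + " bytes -> " + decode(body));
    }
}
```

The important part is that publisher and consumer name the charset explicitly rather than relying on the platform default, which can differ between machines.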

What does this have to do with binding? One of the standard headers is called routing-key and it is this that the broker uses to match messages to queues. Each queue specifies a "binding key" and if that key matches the value of the routing-key header, the queue receives the message.

Things are slightly complicated by the concept of exchange types. The AMQP specification defines the following four types:

Exchange type   Behaviour
Direct          The binding key must match the routing key exactly; there is no wildcard support.
Topic           Same as Direct, but wildcards are allowed in the binding key. '#' matches zero or more dot-delimited words and '*' matches exactly one such word.
Fanout          The routing and binding keys are ignored; all published messages go to all bound queues.
Headers         The routing key is ignored; messages are matched to queues on the values of their headers instead.

Update: I corrected the information on wildcards, which work on the basis of dot-delimited words or terms.

For example, say a publisher sends a message with a routing key of "NYSE" to a topic exchange called "Stocks". If a consumer creates a queue attached to "Stocks" with a binding key of "#", "*", or "NYSE", then that consumer will get the message because all three binding keys match "NYSE". However, if the message is published to a direct exchange, then the consumer will not get the message if the binding key is "#" or "*" since those characters are treated as literals, not wildcards. Interestingly, "#.#" will also match "NYSE" despite the routing key not having a dot.

Now consider a message with a routing key of "NYSE.TECH.MSFT". What binding keys will match it given that the message is going to a topic exchange?

Binding key      Match?
NYSE.TECH.MSFT   Yes
#                Yes
NYSE.#           Yes
*.*              No
NYSE.*           No
NYSE.TECH.*      Yes
NYSE.*.MSFT      Yes
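
If you want to check a binding key by hand, the matching rules for topic exchanges can be sketched in a few lines of plain Java. This is only my own illustration of the rules described above ('#' for zero or more words, '*' for exactly one); it is not how RabbitMQ implements routing, and the class name `TopicMatcher` is made up.

```java
final class TopicMatcher {

    // True if the binding key (which may contain '*' and '#') matches the routing key.
    static boolean matches(String bindingKey, String routingKey) {
        return matches(words(bindingKey), 0, words(routingKey), 0);
    }

    // Split a key into its dot-delimited words; an empty key has no words.
    private static String[] words(String key) {
        return key.isEmpty() ? new String[0] : key.split("\\.", -1);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length;   // pattern used up: match only if no words remain
        }
        if (pattern[p].equals("#")) {
            // '#' matches zero words (move past it) or absorbs one word and stays in play
            return matches(pattern, p + 1, words, w)
                    || (w < words.length && matches(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false;               // a literal word or '*' needs a word to consume
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return wordMatches && matches(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        String routingKey = "NYSE.TECH.MSFT";
        String[] bindingKeys = {
            "NYSE.TECH.MSFT", "#", "NYSE.#", "*.*", "NYSE.*", "NYSE.TECH.*", "NYSE.*.MSFT"
        };
        for (String bindingKey : bindingKeys) {
            System.out.println(bindingKey + " -> " + (matches(bindingKey, routingKey) ? "Yes" : "No"));
        }
    }
}
```

Running `main` walks through the binding keys from the table, and `matches("#.#", "NYSE")` shows the slightly surprising case mentioned earlier.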

That's really all there is to it. Flexibility is provided by support for multiple consumers per queue and multiple queues per exchange. In fact, a single queue can even be bound to multiple exchanges. Now let's look at some of those scenarios.

RPC

An AMQP broker can act as an RPC mechanism between a client and a service. The general setup is like this, using a direct exchange:

[Diagram: rabbit-rpc]

The general sequence goes:

  1. Client sends a message to the "queue" (strictly speaking to the exchange, which routes it on to the service's queue), specifying: (a) a routing key that matches the service; and (b) the name of a queue to pick the response up from.
  2. Exchange passes the message to the service's queue ("ops_q" in this case).
  3. The queue pushes the message to the service, which then does some work and sends a response message back to the exchange, specifying a routing_key that matches the reply queue.
  4. The client picks the response message off the reply queue.

From the perspective of the client, the call could either be blocking or non-blocking. How easy it is to do one or the other, though, depends on the client library in use.

The key to the RPC scenario is making sure that the client and service are using the same exchange for the initial request and that the client knows what to specify for the routing key.

As for the reply queue, it's typically created by the client, which then populates the reply_to header appropriately. Also, although you can use a different exchange for the replies compared to the requests, it's much more common to use the same exchange for both requests and replies.
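
The sequence above can be sketched with a toy, in-memory direct exchange. To be clear, this is a model of the message flow only, with no broker or client library involved: the classes, the binding keys "ops" and "client-1.replies", and the "work" the service does (upper-casing the request) are all assumptions of mine for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Queue;

final class RpcSketch {

    // A message body plus the reply_to value that the client fills in.
    static final class Message {
        final String body;
        final String replyTo;   // null when no reply is expected

        Message(String body, String replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    // Toy direct exchange: a message goes to the queue whose binding key
    // equals the routing key exactly.
    static final class DirectExchange {
        private final Map<String, Queue<Message>> queuesByBindingKey = new HashMap<>();

        Queue<Message> declareAndBind(String bindingKey) {
            return queuesByBindingKey.computeIfAbsent(bindingKey, key -> new ArrayDeque<>());
        }

        void publish(String routingKey, Message message) {
            Queue<Message> queue = queuesByBindingKey.get(routingKey);
            if (queue != null) {
                queue.add(message);   // in this toy model, unroutable messages are dropped
            }
        }
    }

    static String call(String request) {
        DirectExchange exchange = new DirectExchange();
        Queue<Message> serviceQueue = exchange.declareAndBind("ops");              // the service's queue
        Queue<Message> replyQueue = exchange.declareAndBind("client-1.replies");   // created by the client

        // 1. The client publishes the request, naming its reply queue.
        exchange.publish("ops", new Message(request, "client-1.replies"));

        // 2 and 3. The service receives the request, does some work and publishes
        // the response to the same exchange, using reply_to as the routing key.
        Message received = serviceQueue.remove();
        String result = received.body.toUpperCase(Locale.ROOT);
        exchange.publish(received.replyTo, new Message(result, null));

        // 4. The client picks the response off its reply queue.
        return replyQueue.remove().body;
    }

    public static void main(String[] args) {
        System.out.println(call("ping"));
    }
}
```

Note that requests and replies share one exchange here, which mirrors the common setup described above. In a real system the client and service would of course be separate processes talking to the broker.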

Pub(lish)/Sub(scribe)

JMS has the concept of topics, which ensure that messages from a publisher go to all subscribers. You can easily achieve the same behaviour in AMQP by binding multiple queues to an exchange like so:

[Diagram: rabbit-pub-sub]

Even better, the queues can filter which messages they receive via the binding key. If a consumer wants to receive all messages, then it can specify a binding key of "#", the "match any number of words" wildcard. Rather confusingly for your average developer, "*" is not the "match anything" wildcard: as mentioned earlier, it matches exactly one (dot-delimited) word.
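
Here is a rough in-memory sketch of that arrangement: one topic exchange, several subscriber queues, and each queue receiving its own copy of every message whose routing key matches its binding key. It's a toy model under my own assumptions (one binding key per queue, made-up queue names, plain lists standing in for queues), not RabbitMQ's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PubSubSketch {

    // One entry per subscriber queue: its binding key and the messages delivered so far.
    private final Map<String, String> bindingKeys = new LinkedHashMap<>();
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Create a queue, bind it to this exchange and return it so the caller can inspect it.
    List<String> bind(String queueName, String bindingKey) {
        bindingKeys.put(queueName, bindingKey);
        queues.put(queueName, new ArrayList<>());
        return queues.get(queueName);
    }

    // Copy the message into every queue whose binding key matches the routing key.
    void publish(String routingKey, String body) {
        for (Map.Entry<String, String> binding : bindingKeys.entrySet()) {
            if (matches(split(binding.getValue()), 0, split(routingKey), 0)) {
                queues.get(binding.getKey()).add(body);
            }
        }
    }

    private static String[] split(String key) {
        return key.isEmpty() ? new String[0] : key.split("\\.", -1);
    }

    // Topic matching: '#' is zero or more words, '*' is exactly one word.
    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            return matches(pattern, p + 1, words, w)
                    || (w < words.length && matches(pattern, p, words, w + 1));
        }
        return w < words.length
                && (pattern[p].equals("*") || pattern[p].equals(words[w]))
                && matches(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        PubSubSketch exchange = new PubSubSketch();
        List<String> everything = exchange.bind("everything", "#");
        List<String> tech = exchange.bind("tech", "NYSE.TECH.*");
        List<String> oneWord = exchange.bind("one-word", "*");

        exchange.publish("NYSE.TECH.MSFT", "MSFT up");
        exchange.publish("NYSE", "market open");

        System.out.println("everything: " + everything);
        System.out.println("tech:       " + tech);
        System.out.println("one-word:   " + oneWord);
    }
}
```

The queue bound with "#" sees both messages, while the one bound with "*" only sees the message whose routing key is a single word.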

Work distribution

Imagine you have an application that has a bunch of jobs that need executing. With AMQP, you can hook up multiple consumers such that each job goes to one, and only one, of those consumers. The publisher doesn't care which consumer does the work, just that the work is done. This is work distribution.

Configuring it is pretty straightforward, as shown in this diagram:

[Diagram: rabbit-work]

So you have one queue bound to the exchange with multiple consumers sharing that queue. This setup guarantees that only one consumer processes a given message, no matter how many consumers there are.
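
To illustrate the "one and only one consumer" property, here is a small sketch that uses a JDK `BlockingQueue` as a stand-in for the shared queue and a few threads as competing consumers. It is an analogy rather than AMQP code: the class name, the job names and the way the result is reported are my own assumptions, and there is no broker or acknowledgement involved.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class WorkDistributionSketch {

    // Feeds the jobs through one shared queue to `consumerCount` competing consumers
    // and returns how many times each job was processed.
    static Map<String, Integer> distribute(List<String> jobs, int consumerCount) {
        BlockingQueue<String> sharedQueue = new LinkedBlockingQueue<>(jobs);
        Map<String, Integer> timesProcessed = new ConcurrentHashMap<>();
        ExecutorService consumers = Executors.newFixedThreadPool(consumerCount);

        for (int i = 0; i < consumerCount; i++) {
            consumers.execute(() -> {
                String job;
                // poll() removes the head of the queue atomically, so two consumers
                // can never be handed the same job.
                while ((job = sharedQueue.poll()) != null) {
                    timesProcessed.merge(job, 1, Integer::sum);
                }
            });
        }

        consumers.shutdown();
        try {
            if (!consumers.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumers", e);
        }
        return timesProcessed;
    }

    public static void main(String[] args) {
        List<String> jobs = Arrays.asList("job-1", "job-2", "job-3", "job-4", "job-5");
        System.out.println(distribute(jobs, 3));
    }
}
```

Which consumer ends up with which job varies from run to run, but every job shows up with a count of exactly one, which is the behaviour the publisher cares about.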

Those are the three main usage patterns for AMQP brokers. Although I have described each individually, it's fairly common to combine them. For example, you could have multiple services sharing the same queue (work distribution) in the RPC pattern. It's really up to you to decide how to configure the exchanges and queues, and now you should have a good enough understanding to work out the appropriate setup for your situation.

If you want to go further into AMQP, then check out the specification itself, particularly the section on General Architecture. And to get started with RabbitMQ, just go to its website.

25 responses


  1. So the pub/sub example uses temporary queues created by the clients, while the work distribution example uses a durable queue created by the exchange?


  2. Is there JTA/XA support?
    I.e. it's fairly common to have a distributed transaction in an event driven system that receives a message from one queue, does some DB manipulations and pushes another message to a second queue. Would this kind of use-case be possible with RabbitMQ in a XA manner?

    Erwin


  4. @Corby I'll try to get someone a bit more informed to answer your question, but I think that's a valid way of looking at it. I'm not sure about the distinction between temporary and durable though. Certainly in the pub/sub model the client owns the queue, whereas in work distribution the clients don't.

    @Erwin I believe there will be transaction support with the Spring integration. I'll try to get confirmation of that.


  5. @Erwin The AMQP 0.8 spec defines a "dtx" class for distributed TXs, but as we understand it, RabbitMQ (which currently implements 0.8) does not support them. It does support local transactions for coordinating publish/ack commands from the client to the broker.

    Generally, SpringSource recommend against using XA even in JMS-based environments. Here's a JavaWorld article that describes the alternatives:

    http://www.javaworld.com/javaworld/jw-01-2009/jw-01-spring-transactions.html


  6. @Corby – It should not matter whether it is a durable or a temporary queue. In fact, in AMQP you "assert" that a queue is there, and if it is not, it gets created. With durable queues, though, you can also use persistent messages, so the messages are stored even if the consumer is down. Either way, whether you use AMQP for pub/sub or for load balancing messages, it does not matter whether the queue is durable or not.

    @Erwin – I would also encourage scenarios where XA is *not* used. The article mentioned by Peter does a very good job of explaining different scenarios and approaches to "distributed" transactions. Often, designing services so that they are idempotent, and retrying messages, scales far better than XA.

    @Peter – I would consider another example as well: load balancing (and failover) of topic subscribers in the pub/sub scenario. With JMS there is no easy way to achieve this. ActiveMQ implements it via Virtual Destinations, a combination of topic and queue (http://activemq.apache.org/virtual-destinations.html). With AMQP it is easy: just have multiple consumers on the same queue in the pub/sub scenario. The AMQP model of always consuming from queues, with a separate concept (exchanges) that is only responsible for routing messages to queues, has a clear advantage over JMS topics and queues.


  8. Hi Peter,

    I am trying to run a simple 'fanout' app, but I can't work out how a listener configures the binding of its queue to an existing 'fanout' exchange. As I understand it, there are two ways to listen for a message:

    =============
    1. Implement MessageListener and override the onMessage method along with the configuration of SimpleMessageListenerContainer (where there is no mention of exchange & only the queueName can be set).

    2. Use the RabbitTemplate's receiveAndConvert() method along with the configuration of RabbitTemplate (where there is no mention of queue & only the exchange can be set).

    ============

    So the question is where and how do we bind the exchange (on which a published message has been sent) to the consumer/listener's queue?

    Thanks,
    Kshitiz


  10. @Kshitiz I've unapproved one of your posts since they seemed like duplicates to me. Let me know if you'd like it back for any reason.

    On to your question: this can be done via the RabbitAdmin class, which has a declareBinding() method. Simply create an instance of Binding (as described in section 1.1.4 of the user guide) and pass it to that method. Are you using Spring AMQP or the Grails plugin? If the latter, then you can inject the bean named "adm" into your bootstrap and declare the binding from there.

    Hope that helps.


  11. Thanks Peter,

    So we have to bind the exchange and queues at the broker level. Does that mean we need to know in advance how many consumers are out there and which queue(s) they are listening on?

    Kshitiz


  13. @Kshitiz You don't need to know that information in advance. You could either have multiple consumers using the same queue or have every consumer create its own queue (and do the binding). Note that these two approaches have very different behaviour.


  14. @Peter, the very statement you made (have every consumer create its own queue and do the binding) is what I wanted to ask about.

    My first post in this context was:

    I can't work out how a listener configures the binding of its queue to an existing 'fanout' exchange. As I understand it, there are two ways to listen for a message:

    =============
    1. Implement MessageListener and override the onMessage method along with the configuration of SimpleMessageListenerContainer (where there is no mention of exchange & only the queueName can be set).

    2. Use the RabbitTemplate's receiveAndConvert() method along with the configuration of RabbitTemplate (where there is no mention of queue & only the exchange can be set).

    ============

    So the question is where and how do we bind the exchange (on which a published message has been sent) to the consumer/listener's queue?

    Thanks,
    Kshitiz


  15. Peter,

    Please forgive me if I am not understanding this. I have to publish a message that should be delivered to each subscriber for that message. What steps should I take after creating an exchange and sending a message to that exchange?

    1. Should I keep some queues (bound to that exchange) handy, so that a consumer attaches to one of those queues and gets the message?
    In that case, I would need to know how many consumers are going to get the message and prepare that many queues at the broker level.
    That does not seem right, as I cannot know in advance how many consumers will be consuming the message.

    OR

    2. Create a queue and bind it to the exchange in the consumer's code,
    but I don't know how to do that. As I understand it, there are two ways to listen for a message:
    either implement MessageListener and override the onMessage method along with the configuration of SimpleMessageListenerContainer (where there is no mention of the exchange and only the queueName can be set), or use the RabbitTemplate's receiveAndConvert() method along with the configuration of RabbitTemplate (where there is no mention of the queue and only the exchange can be set).

    Please let me know about the other ways I should do it..

    Thanks,
    Kshitiz


  16. You want to go with option 2. In fact, you probably want a non-durable, auto-delete queue for each consumer. This would give you the semantics of a consumer subscribing to a publisher/exchange. In condensed form, the code in the consumer would look something like:

    // 'admin' is a RabbitAdmin instance, e.g. new RabbitAdmin(connectionFactory)
    // Create a dedicated queue for this consumer (the broker picks the name)
    Queue q = admin.declareQueue();

    // Make sure that our fanout exchange exists
    FanoutExchange ex = new FanoutExchange("my.fanout");
    admin.declareExchange(ex);

    // Bind this queue to our fanout exchange
    Binding b = BindingBuilder.bind(q).to(ex);
    admin.declareBinding(b);
    

  17. Thanks Peter,

    Now I understand it, but I am stuck at another point. Messages are published correctly to the queue that exists exclusively for a particular consumer, and they are shown when I run rabbitmqctl.bat list_queues, but they are not being consumed at the consumer end.

    My application is a war file deployed in Tomcat and there is no exception as such.

    I have implemented MessageListener and overridden the onMessage method along with the configuration of SimpleMessageListenerContainer.

    Do I need to acknowledge the message or something else?


  18. Hi Peter,

    Thanks, it's working now. The next milestone I have to achieve is a faster consumption of each message. The scenario is like this:

    I have to send emails/SMSes to different people after consuming the messages internally.
    It is very similar to the work distribution pattern advised by you. I want to just post a message and consume it faster by having multiple consumers ready. The message should be consumed only by one consumer…

    Should I create a number of consumers sharing one RabbitMQ queue, where each consumer takes a message posted on the exchange, removes it from the queue and then passes it to a new thread for processing?

    But having a single queue might hamper performance.

    Is there any other way to meet my requirement? What do you suggest?


  19. @Kshitiz At this point, you should consider using the RabbitMQ mailing list: discuss@rabbitmq.com.

    Anyway, I think you should go with a single queue with multiple consumers sharing it. That is the appropriate pattern for your use case. I would be surprised if you notice the RabbitMQ queue being the bottleneck. Note that all your consumers have to do is process the messages they pick up from the queue. RabbitMQ will ensure that each message is processed by one and only one consumer.


  20. Thanks for the directions Peter,

    Can you tell me how we can ensure that a message gets deleted from RabbitMQ only after a consumer has processed it properly? Is there any way to let RabbitMQ know about it? Any flag, or any property of queues/exchanges, or something like that?

    Regards,
    Kshitiz


  21. Thanks, for that very good intro.
    I have a question : "Client sends message to the "queue", specifying: (a) a routing key that matches the service; and (b) the name of a queue to pick the response up from."

    Did you really mean queue or exchange ?!


  22. @Kshitiz I'm not sure. If the consumer ACKs the message, then RabbitMQ will treat that message as done. If you need more control, then transactions may do what you need: http://www.rabbitmq.com/faq.html#atomic-operations

    @MonTarO For RPC operations, the client effectively specifies a queue to send the message to. The reason "queue" is in quotes is because as with all message sending, the client communicates with the exchange. So technically the message goes to the exchange which routes it to the queue. From a conceptual standpoint, you can think of the message going straight to the queue.

    Hope that helps,

    Peter


  23. Clarifying the statement '# matches any number of dot-delimited words': to be exact, # matches zero or more words.


  24. Hi Peter,
    I am using Spring-AMQP for my Consumer application.

    The use case details are given below.
    Different clients/producers will send messages to a RabbitMQ Topic Exchange with different routing keys for each queue.

    My consumer application (using Spring-AMQP) has to Listen for messages from these different queues and process those messages. Each queue can be identified using a Binding Key pattern.

    The Consumer application only knows the name of the Topic Exchange before running the application. The list of queue names and their binding keys is dynamic (i.e., only fetched at runtime from a database table).

    I tried using 'SimpleMessageListenerContainer' as @Bean in my @Configuration class.
    But as you see from below code snippet, we have to ALWAYS set the QueueName in the configuration file. When we remove 'setQueueName()' from @Bean (and use it in application code), I get the error "java.lang.IllegalArgumentException: Queue name must not be null."

    @Bean
    public SimpleMessageListenerContainer listenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueName("SOME QUEUE NAME");

        return container;
    }

    My question is, how to pass different queue names to 'SimpleMessageListenerContainer' from the application code(rather than the Spring @Configuration class), so that the consumer application can listen on different queues for incoming messages.

    Or is there any other way for this usecase requirement?


  25. Sam: I'm afraid I don't know, but I think you can create the SimpleMessageListenerContainer outside of the Spring configuration and start/stop it manually. The best idea though would be to ask on the Spring forums: http://forum.springsource.org/forumdisplay.php?f=74
