Messaging in 5 minutes

The players

Simply put, there are three basic elements in messaging:

  • Producer: Sends messages to a given topic or queue.
  • Consumer: Receives messages from a given topic or queue.
  • Broker: Manages the topics and queues and implements the different policies.

For more details, and different models, see MsgArchitecture#Architectural_models

What's the difference between topics and queues?

http://activemq.apache.org/how-does-a-queue-compare-to-a-topic.html

What's a durable subscription?

http://activemq.apache.org/how-do-durable-queues-and-topics-work.html

What's the meaning of "last image caching" or "subscription recovery policy"?

http://activemq.apache.org/subscription-recovery-policy.html

Take into account that you'll have to include the next field in your header (when subscribing) in order to receive the messages:

  • "activemq.retroactive" : "True"

How do I communicate with the broker?

There are several protocols available:

http://activemq.apache.org/protocols.html

Using Stomp is one of the easiest ways to communicate with the broker. You can find fully functional implementations of the protocol for C, C++, Python, Ruby, Perl , PHP, etc.

We highly recommend a Python implementation called stomp.py (http://code.google.com/p/stomppy).

Do you have testing brokers?

Yes, we have one testing brokers: dev.msg.cern.ch.

You can use it (for free).

What do MSG stands for?

MSG used to stand for "Messaging System for Grids" when we had dreams of a vendor neutral API layer and lots of enterprise-y stuff like that. Once we landed in reality and started to implement more tightly on top of ActiveMQ and STOMP, it's just a shorthand for 'messaging'....

You can find more information about implementation of messagin in EGEE in:

stomp.py

stomp.py is one of the supported libraries in MSG and its use is highly recommended. We have an RPM available of this (stomppy) in the SA1 Package Repository

That's why we're going to show you some practical examples on how to use it.

The very basics

TBW

The Listener

In order to establish connection (as a consumer) you usually do something like this:

conn = stomp.Connection([(HOST,PORT)])
conn.set_listener('MyPersonalListener', MyListener())
conn.start()
conn.connect()       
conn.subscribe(destination=TOPIC, ack='auto', headers=HEADERS)

The connection management has to be done using the associated listener.

A basic listener looks like this:

class MyListener(object):
    def on_connecting(self, host_and_port):
        print 'connecting : %s:%s'%host_and_port

    def on_disconnected(self):
        print "lost connection"

    def on_message(self, headers, body):
        self.__print_async("MESSAGE", headers, body)

    def on_error(self, headers, body):
        self.__print_async("ERROR", headers, body)

    def on_receipt(self, headers, body):
        self.__print_async("RECEIPT", headers, body)

    def on_connected(self, headers, body):
        pass
    
    def __print_async(self, frame_type, headers, body):
        print "\r  \r",
        print frame_type
        
        for header_key in headers.keys():
            print '%s: %s' % (header_key, headers[header_key])
            
        print
        print body
        print '> ',
        sys.stdout.flush()

Let's imagine that we wanna count the number of received messages. Because the reception of the messages is asynchronous (among other things) we'll have to modify our listener. Instead of just printing the message we should add something like:

def on_message(self, headers, body):
    global N
    self.__print_async("MESSAGE", headers, body)
    N+=1

Creating a durable consumer

In order to create a durable consumer, you need to do two things:

  • send a client-id header during the connect()
  • send a activemq.subscriptionName header during the subscribe()

conn = stomp.Connection([(HOST,PORT)])
conn.set_listener('MyDurableListener', MyListener())
conn.start()
conn.connect(headers = {'client-id' : 'my-client-id-here'})       
conn.subscribe(destination=TOPIC, ack='auto', 
               headers = {'activemq.subscriptionName' : 'my-subscription-name-here'})

If you subscribe to many topics in a single client, each subscription needs a different subscription name, but would share the same client ID.

Regarding the producer, you have to add the field 'persistent':'true' to the header each time you send a message:

http://activemq.apache.org/stomp.html#Stomp-PersistenceofStompmessages

Setting message expiration

Even if we're using a durable consumer, we don't have to be interested in every message. After a consumer shutdown, doesn't make sense to flood it with useless messages.

The next situation is pretty usual:

  • The durable consumer disconnects.
  • Messages keep arriving to the topic (and they're moved to the "pending messages queue", flooding the server).
  • The durable consumer reconnects after a few weeks and has to deal with thousands of messages!

Depending on the application the consumer may not be interested in, let's say, messages older than 24 hours. We can setup an expiration time in our publisher using the field expires.

# Expires in 5 seconds
EXPIRATION_TIME=5000
CURRENT_TIME=int(time.time()*1000)
conn.send('testing!',destination=TOPIC,headers={'persistent':'true','expires':CURRENT_TIME+EXPIRATION_TIME}, ack='auto')

Both CURRENT_TIME and EXPIRATION_TIME have to be in milliseconds.

Using selector to filter messages

Two important things to note:
  • Stomp(version1.0) does not support filtering based on the body of the message.
  • Stomp(version1.0) supports filtering based on headers.

In order to filter messages using selector

In the producer code

Add the appropriate header(s) with values during the send().

conn = stomp.Connection([(HOST,PORT)])
conn.add_listener( MyListener())
conn.start()
conn.connect()
id = 0
conn.send('my-msg-here', destination=TOPIC,  ack='auto', 
                   headers = {'clientid'  :  id, 'groupid' : 'my-value-here'})

In the consumer code

Provide the proper filtering values for the headers using selector header during the subscribe()

conn = stomp.Connection([(HOST,PORT)])
conn.add_listener( MyListener())
conn.start()
conn.connect()       
conn.subscribe( destination=TOPIC, ack='auto', 
               headers = { 'selector' : "clientid  = '%d'  AND 'groupid' : 'my-value-here' " %id} )

Note that anything that follows ' selector ' : must be a valid SQL92 statement.

How to redirect the reply to a given topic or queue (reply-to)

Just add the field 'reply-to' to the header of your message:

conn.send('BODY',headers={'destination':'/topic/foo','reply-to':'/queue/bar'})

In the example above, the message is sent to '/topic/foo' and we expect a reply from the consumer receiving it. In order to let him know where to reply we add the field 'reply-to' to the header. The reply, in this case, will be sent to '/queue/bar'.

My client doesn't reconnect automatically after a broker shutdown

In order to be able to reconnect after a broker shutdown just modify your listener with:

def on_disconnected(self):
    global conn

    print 'lost connection'       
    print 'trying to reconnect'
 
    conn.start()
    conn.connect(wait=True)    
    conn.subscribe(destination=TOPIC, ack='auto', headers=HEADERS)          

Apart from that, check your main loop or whatever your software was doing when the disconnection took place.

-- JoseDana - 23 Apr 2009

Edit | Attach | Watch | Print version | History: r19 < r18 < r17 < r16 < r15 | Backlinks | Raw View | WYSIWYG | More topic actions
Topic revision: r19 - 2011-03-24 - LionelCons
 
    • Cern Search Icon Cern Search
    • TWiki Search Icon TWiki Search
    • Google Search Icon Google Search

    EGEE All webs login

This site is powered by the TWiki collaboration platform Powered by Perl This site is powered by the TWiki collaboration platformCopyright &© by the contributing authors. All material on this collaboration platform is the property of the contributing authors.
Ideas, requests, problems regarding TWiki? Ask a support question or Send feedback