Messaging in 5 minutes
The players
Simply put, there are three basic elements in messaging:
- Producer: Sends messages to a given topic or queue.
- Consumer: Receives messages from a given topic or queue.
- Broker: Manages the topics and queues and implements the different policies.
For more details, and different models, see
MsgArchitecture#Architectural_models
What's the difference between topics and queues?
http://activemq.apache.org/how-does-a-queue-compare-to-a-topic.html
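As a rough illustration of the difference described at the link above, here is a toy in-memory broker. It is only a sketch: ToyBroker and its methods are hypothetical names, the round-robin dispatch for queues is a simplifying assumption, and nothing here reflects how ActiveMQ is actually implemented.

```python
from collections import defaultdict


class ToyBroker(object):
    """In-memory stand-in for a broker (hypothetical, for illustration only)."""

    def __init__(self):
        self.consumers = defaultdict(list)  # destination -> consumer callbacks
        self.turn = defaultdict(int)        # per-queue round-robin counter

    def subscribe(self, destination, callback):
        self.consumers[destination].append(callback)

    def send(self, destination, body):
        callbacks = self.consumers[destination]
        if not callbacks:
            return  # a real broker would store or drop the message by policy
        if destination.startswith("/topic/"):
            # Topic: every current subscriber gets a copy.
            for callback in callbacks:
                callback(body)
        else:
            # Queue: exactly one consumer gets each message.
            index = self.turn[destination] % len(callbacks)
            self.turn[destination] += 1
            callbacks[index](body)


broker = ToyBroker()
log = []
for name in ("a", "b"):
    # Bind name as a default argument so each consumer keeps its own label.
    broker.subscribe("/topic/news", lambda body, name=name: log.append((name, "topic", body)))
    broker.subscribe("/queue/jobs", lambda body, name=name: log.append((name, "queue", body)))

broker.send("/topic/news", "hello")  # delivered to both a and b
broker.send("/queue/jobs", "job1")   # delivered to a only
broker.send("/queue/jobs", "job2")   # delivered to b only
print(log)
```

Both consumers see the topic message, while the two queue messages are shared out between them.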
What's a durable subscription?
http://activemq.apache.org/how-do-durable-queues-and-topics-work.html
What's the meaning of "last image caching" or "subscription recovery policy"?
http://activemq.apache.org/subscription-recovery-policy.html
Note that you must include the following field in your header (when subscribing) in order to receive these messages:
- "activemq.retroactive" : "True"
How do I communicate with the broker?
There are several protocols available:
http://activemq.apache.org/protocols.html
Using Stomp is one of the easiest ways to communicate with the broker. You can find fully functional implementations of the protocol for C, C++, Python, Ruby, Perl, PHP, etc.
We highly recommend a Python implementation called stomp.py (http://code.google.com/p/stomppy).
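Part of what makes Stomp easy is that its frames are plain text. The sketch below shows what a Stomp 1.0 frame looks like on the wire: a command line, "key:value" header lines, a blank line, the body and a terminating NUL byte. The helper name build_frame is hypothetical; in practice stomp.py does this for you.

```python
def build_frame(command, headers=None, body=""):
    """Serialize a Stomp 1.0 frame as text (hypothetical helper)."""
    lines = [command]
    for key, value in (headers or {}).items():
        lines.append("%s:%s" % (key, value))
    # A blank line separates headers from body; NUL ends the frame.
    return "\n".join(lines) + "\n\n" + body + "\x00"


frame = build_frame("SEND", {"destination": "/topic/foo"}, "hello")
print(repr(frame))
```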
Do you have testing brokers?
Yes, we have one testing broker: dev.msg.cern.ch.
You can use it (for free).
What does MSG stand for?
MSG used to stand for "Messaging System for Grids" when we had dreams of a vendor-neutral API layer and lots of enterprise-y stuff like that. Once we landed in reality and started to implement more tightly on top of ActiveMQ and STOMP, it became just a shorthand for 'messaging'.
You can find more information about the implementation of messaging in EGEE in:
stomp.py
stomp.py is one of the supported libraries in MSG and its use is highly recommended. An RPM of it (stomppy) is available in the SA1 Package Repository.
That's why we're going to show you some practical examples of how to use it.
The very basics
TBW
The Listener
To establish a connection (as a consumer), you usually do something like this:
import stomp

# HOST, PORT, TOPIC and HEADERS are placeholders for your own settings
conn = stomp.Connection([(HOST, PORT)])
conn.set_listener('MyPersonalListener', MyListener())
conn.start()
conn.connect()
conn.subscribe(destination=TOPIC, ack='auto', headers=HEADERS)
The connection management has to be done using the associated listener.
A basic listener looks like this:
import sys

class MyListener(object):
    def on_connecting(self, host_and_port):
        # host_and_port is a (host, port) tuple
        print('connecting : %s:%s' % host_and_port)

    def on_disconnected(self):
        print('lost connection')

    def on_message(self, headers, body):
        self.__print_async('MESSAGE', headers, body)

    def on_error(self, headers, body):
        self.__print_async('ERROR', headers, body)

    def on_receipt(self, headers, body):
        self.__print_async('RECEIPT', headers, body)

    def on_connected(self, headers, body):
        pass

    def __print_async(self, frame_type, headers, body):
        # Clear the current prompt, print the frame, then redraw the prompt
        sys.stdout.write('\r \r')
        print(frame_type)
        for header_key, header_value in headers.items():
            print('%s: %s' % (header_key, header_value))
        print('')
        print(body)
        sys.stdout.write('> ')
        sys.stdout.flush()
Suppose we want to count the number of received messages. Because messages are received asynchronously, the counting has to happen in the listener. Instead of just printing the message, on_message should do something like:
def on_message(self, headers, body):
    global N  # module-level counter, set to 0 before connecting
    self.__print_async("MESSAGE", headers, body)
    N += 1
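If you would rather avoid a global, a minimal alternative sketch keeps the counter on the listener itself and guards it with a lock, on the assumption that the library calls the listener from a background receiver thread. CountingListener is a hypothetical name, and the calls at the bottom simulate deliveries so that the example runs without a broker.

```python
import threading


class CountingListener(object):
    """Counts received messages (hypothetical listener for illustration)."""

    def __init__(self):
        self._lock = threading.Lock()
        self.count = 0

    def on_message(self, headers, body):
        # Assumption: this may be called from another thread, so guard the update.
        with self._lock:
            self.count += 1


# Simulate three deliveries without a broker.
listener = CountingListener()
for i in range(3):
    listener.on_message({"destination": "/topic/foo"}, "message %d" % i)
print(listener.count)
```

The main program can then read listener.count at any time instead of a global N.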
Creating a durable consumer
In order to create a durable consumer, you need to do two things:
- send a client-id header during the connect()
- send an activemq.subscriptionName header during the subscribe()
conn = stomp.Connection([(HOST,PORT)])
conn.set_listener('MyDurableListener', MyListener())
conn.start()
conn.connect(headers = {'client-id' : 'my-client-id-here'})
conn.subscribe(destination=TOPIC, ack='auto',
               headers={'activemq.subscriptionName': 'my-subscription-name-here'})
If you subscribe to many topics in a single client, each subscription needs a different subscription name, but they all share the same client ID.
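One way to keep the subscription names distinct is to derive them from the client ID and the topic. The sketch below only builds the headers; the helper name durable_subscriptions and the naming scheme are assumptions for illustration, not something the broker requires.

```python
CLIENT_ID = "my-client-id-here"


def durable_subscriptions(client_id, topics):
    """Return (topic, headers) pairs with one unique subscription name per topic."""
    result = []
    for topic in topics:
        # "/topic/foo" -> "my-client-id-here-topic.foo" (hypothetical scheme)
        name = "%s-%s" % (client_id, topic.strip("/").replace("/", "."))
        result.append((topic, {"activemq.subscriptionName": name}))
    return result


subscriptions = durable_subscriptions(CLIENT_ID, ["/topic/foo", "/topic/bar"])
for topic, headers in subscriptions:
    # With a live connection: conn.subscribe(destination=topic, ack='auto', headers=headers)
    print(topic, headers)
```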
Regarding the producer, you have to add the field 'persistent':'true' to the header each time you send a message:
http://activemq.apache.org/stomp.html#Stomp-PersistenceofStompmessages
Setting message expiration
Even if we're using a durable consumer, we are not necessarily interested in every message. After a consumer has been shut down, it doesn't make sense to flood it with stale messages when it comes back.
The following situation is quite common:
- The durable consumer disconnects.
- Messages keep arriving at the topic (and they're moved to the "pending messages queue", flooding the server).
- The durable consumer reconnects after a few weeks and has to deal with thousands of messages!
Depending on the application, the consumer may not be interested in, let's say, messages older than 24 hours. We can set up an expiration time in our publisher using the field expires.
import time

# Expires in 5 seconds
EXPIRATION_TIME = 5000
CURRENT_TIME = int(time.time() * 1000)
conn.send('testing!', destination=TOPIC, ack='auto',
          headers={'persistent': 'true', 'expires': CURRENT_TIME + EXPIRATION_TIME})
Both CURRENT_TIME and EXPIRATION_TIME have to be in milliseconds.
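For the 24-hour case mentioned above, the arithmetic can be wrapped in a small helper so the unit conversion happens in one place. This is a minimal sketch: expiry_timestamp and send_headers are hypothetical names, and the fixed clock value is arbitrary and only there to make the result reproducible.

```python
import time


def expiry_timestamp(ttl_seconds, now=None):
    """Absolute expiry time in milliseconds since the epoch."""
    if now is None:
        now = time.time()
    return int((now + ttl_seconds) * 1000)


def send_headers(ttl_seconds, now=None):
    """Headers for a persistent message that expires after ttl_seconds."""
    return {"persistent": "true", "expires": expiry_timestamp(ttl_seconds, now)}


ONE_DAY = 24 * 60 * 60  # seconds
headers = send_headers(ONE_DAY, now=1240444800)  # fixed clock for the example
print(headers["expires"])
# With a live connection: conn.send('testing!', destination=TOPIC, headers=send_headers(ONE_DAY))
```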
Using a selector to filter messages
Two important things to note:
- Stomp (version 1.0) does not support filtering based on the body of the message.
- Stomp (version 1.0) supports filtering based on headers.
To filter messages using a selector, proceed as follows.
In the producer code
Add the appropriate header(s) with values during the send().
conn = stomp.Connection([(HOST, PORT)])
conn.add_listener(MyListener())
conn.start()
conn.connect()
client_id = 0
conn.send('my-msg-here', destination=TOPIC, ack='auto',
          headers={'clientid': client_id, 'groupid': 'my-value-here'})
In the consumer code
Provide the filtering values for the headers using the selector header during the subscribe().
conn = stomp.Connection([(HOST, PORT)])
conn.add_listener(MyListener())
conn.start()
conn.connect()
client_id = 0  # must match the 'clientid' header set by the producer
conn.subscribe(destination=TOPIC, ack='auto',
               headers={'selector': "clientid = '%d' AND groupid = 'my-value-here'" % client_id})
Note that the value of the 'selector' header must be a valid SQL92 expression.
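Because a stray quote or colon is enough to make the selector invalid, it can help to build the string in one place. The sketch below handles only the simple case of equality tests joined by AND; build_selector and quote are hypothetical helper names, and values are compared as strings.

```python
def quote(value):
    """Quote a value as an SQL92 string literal, doubling embedded single quotes."""
    return "'%s'" % str(value).replace("'", "''")


def build_selector(**conditions):
    """AND together header = 'value' tests, in sorted header order."""
    return " AND ".join(
        "%s = %s" % (key, quote(value)) for key, value in sorted(conditions.items())
    )


selector = build_selector(clientid=0, groupid="my-value-here")
print(selector)
# With a live connection:
# conn.subscribe(destination=TOPIC, ack='auto', headers={'selector': selector})
```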
How to redirect the reply to a given topic or queue (reply-to)
Just add the field 'reply-to' to the header of your message:
conn.send('BODY',headers={'destination':'/topic/foo','reply-to':'/queue/bar'})
In the example above, the message is sent to '/topic/foo' and we expect a reply from the consumer receiving it. To let the consumer know where to reply, we add the field 'reply-to' to the header. The reply, in this case, will be sent to '/queue/bar'.
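On the consumer side, replying is up to your own code: the listener has to read 'reply-to' from the received headers and send the answer there. A minimal sketch follows, assuming the header arrives unchanged in the headers dictionary. ReplyingListener and FakeConnection are hypothetical names, and FakeConnection only records what would be sent so that the example runs without a broker.

```python
class FakeConnection(object):
    """Stand-in for a real connection; it just records the sends."""

    def __init__(self):
        self.sent = []

    def send(self, body, headers):
        self.sent.append((body, headers))


class ReplyingListener(object):
    def __init__(self, conn):
        self.conn = conn

    def on_message(self, headers, body):
        reply_to = headers.get("reply-to")
        if reply_to is None:
            return  # the sender did not ask for a reply
        self.conn.send("reply to: " + body, headers={"destination": reply_to})


conn = FakeConnection()
listener = ReplyingListener(conn)
listener.on_message({"destination": "/topic/foo", "reply-to": "/queue/bar"}, "BODY")
print(conn.sent)
```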
My client doesn't reconnect automatically after a broker shutdown
In order to be able to reconnect after a broker shutdown just modify your listener with:
def on_disconnected(self):
    global conn
    print('lost connection')
    print('trying to reconnect')
    conn.start()
    conn.connect(wait=True)
    conn.subscribe(destination=TOPIC, ack='auto', headers=HEADERS)
Apart from that, check your main loop or whatever your software was doing when the disconnection took place.
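If the broker stays down for a while, a single reconnection attempt may not be enough. The sketch below retries with an increasing delay. It is only an illustration: reconnect is a hypothetical helper, and connect stands for any function of yours that wraps the conn.start(), conn.connect(wait=True) and conn.subscribe(...) calls shown above. The flaky_connect function simulates a broker that comes back on the third attempt.

```python
import time


def reconnect(connect, max_attempts=5, initial_delay=1.0, sleep=time.sleep):
    """Call connect() until it succeeds, doubling the delay after each failure."""
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            connect()
            return attempt  # number of attempts that were needed
        except Exception as exc:
            print("attempt %d failed: %s" % (attempt, exc))
            if attempt == max_attempts:
                raise  # give up and let the caller decide
            sleep(delay)
            delay *= 2


# Simulated broker that is unavailable for the first two attempts.
state = {"calls": 0}


def flaky_connect():
    state["calls"] += 1
    if state["calls"] < 3:
        raise IOError("broker not available")


delays = []  # record the delays instead of really sleeping
used = reconnect(flaky_connect, sleep=delays.append)
print(used)
print(delays)
```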
--
JoseDana - 23 Apr 2009