Reading time: 4 – 7 minutes
Some times schemas and snippets don’t need large descriptions. If you think this is not enough in this case tell me and I’m going to add explanations.
Using a python library called kombu as an abstraction to talk with AMQP broker we are going to develop different message routes setting each type of Exchange. As a backend I used RabbitMQ with default configuration.
AMQP schema using an exchange of type direct

Queue definition:
from kombu import Exchange, Queue
task_exchange = Exchange("msgs", type="direct")
queue_msg_1 = Queue("messages_1", task_exchange, routing_key = 'message_1')
queue_msg_2 = Queue("messages_2", task_exchange, routing_key = 'message_2')
The producer:
from __future__ import with_statement
from queues import task_exchange
from kombu.common import maybe_declare
from kombu.pools import producers
if __name__ == "__main__":
from kombu import BrokerConnection
connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
with producers[connection].acquire(block=True) as producer:
maybe_declare(task_exchange, producer.channel)
payload = {"type": "handshake", "content": "hello #1"}
producer.publish(payload, exchange = 'msgs', serializer="pickle", routing_key = 'message_1')
payload = {"type": "handshake", "content": "hello #2"}
producer.publish(payload, exchange = 'msgs', serializer="pickle", routing_key = 'message_2')
One consumer:
from queues import queue_msg_1
from kombu.mixins import ConsumerMixin
class C(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
return
def get_consumers(self, Consumer, channel):
return [Consumer( queue_msg_1, callbacks = [ self.on_message ])]
def on_message(self, body, message):
print ("RECEIVED MSG - body: %r" % (body,))
print ("RECEIVED MSG - message: %r" % (message,))
message.ack()
return
if __name__ == "__main__":
from kombu import BrokerConnection
from kombu.utils.debug import setup_logging
setup_logging(loglevel="DEBUG")
with BrokerConnection("amqp://guest:guest@localhost:5672//") as connection:
try:
C(connection).run()
except KeyboardInterrupt:
print("bye bye")
AMQP schema using an exchange of type fanout

Queue definition:
from kombu import Exchange, Queue
task_exchange = Exchange("ce", type="fanout")
queue_events_db = Queue("events.db", task_exchange)
queue_events_notify = Queue("events.notify", task_exchange)
The producer:
from __future__ import with_statement
from queues import task_exchange
from kombu.common import maybe_declare
from kombu.pools import producers
if __name__ == "__main__":
from kombu import BrokerConnection
connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
with producers[connection].acquire(block=True) as producer:
maybe_declare(task_exchange, producer.channel)
payload = {"operation": "create", "content": "the object"}
producer.publish(payload, exchange = 'ce', serializer="pickle", routing_key = 'user.write')
payload = {"operation": "update", "content": "updated fields", "id": "id of the object"}
producer.publish(payload, exchange = 'ce', serializer="pickle", routing_key = 'user.write')
One consumer:
from queues import queue_events_db
from kombu.mixins import ConsumerMixin
class C(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
return
def get_consumers(self, Consumer, channel):
return [Consumer( queue_events_db, callbacks = [self.on_message])]
def on_message(self, body, message):
print ("save_db: RECEIVED MSG - body: %r" % (body,))
print ("save_db: RECEIVED MSG - message: %r" % (message,))
message.ack()
return
if __name__ == "__main__":
from kombu import BrokerConnection
from kombu.utils.debug import setup_logging
setup_logging(loglevel="DEBUG")
with BrokerConnection("amqp://guest:guest@localhost:5672//") as connection:
try:
C(connection).run()
except KeyboardInterrupt:
print("bye bye")
AMQP schema using an exchange of type topic

Queue definition:
from kombu import Exchange, Queue
task_exchange = Exchange("user", type="topic")
queue_user_write = Queue("user.write", task_exchange, routing_key = 'user.write')
queue_user_read = Queue("user.read", task_exchange, routing_key = 'user.read')
queue_notify = Queue("notify", task_exchange, routing_key = 'user.#')
The producer:
from __future__ import with_statement
from queues import task_exchange
from kombu.common import maybe_declare
from kombu.pools import producers
if __name__ == "__main__":
from kombu import BrokerConnection
connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
with producers[connection].acquire(block=True) as producer:
maybe_declare(task_exchange, producer.channel)
payload = {"operation": "create", "content": "the object"}
producer.publish(payload, exchange = 'user', serializer="pickle", routing_key = 'user.write')
payload = {"operation": "update", "content": "updated fields", "id": "id of the object"}
producer.publish(payload, exchange = 'user', serializer="pickle", routing_key = 'user.write')
payload = {"operation": "delete", "id": "id of the object"}
producer.publish(payload, exchange = 'user', serializer="pickle", routing_key = 'user.write')
payload = {"operation": "read", "id": "id of the object"}
producer.publish(payload, exchange = 'user', serializer="pickle", routing_key = 'user.read')
One consumer:
from queues import queue_events_db
from kombu.mixins import ConsumerMixin
class C(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
return
def get_consumers(self, Consumer, channel):
return [Consumer( queue_events_db, callbacks = [self.on_message])]
def on_message(self, body, message):
print ("save_db: RECEIVED MSG - body: %r" % (body,))
print ("save_db: RECEIVED MSG - message: %r" % (message,))
message.ack()
return
if __name__ == "__main__":
from kombu import BrokerConnection
from kombu.utils.debug import setup_logging
setup_logging(loglevel="DEBUG")
with BrokerConnection("amqp://guest:guest@localhost:5672//") as connection:
try:
C(connection).run()
except KeyboardInterrupt:
print("bye bye")
Simple queues
Kombu implements SimpleQueue and SimpleBuffer as simple solution for queues with exchange of type ‘direct’, with the same exchange name, routing key and queue name.
Pusher:
from kombu import BrokerConnection
connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
queue = connection.SimpleQueue("logs")
payload = { "severity":"info", "message":"this is just a log", "ts":"2013/09/30T15:10:23" }
queue.put(payload, serializer='pickle')
queue.close()
Getter:
from kombu import BrokerConnection
from Queue import Empty
connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
queue = connection.SimpleQueue("logs")
while 1:
try:
message = queue.get(block=True, timeout=1)
print message.payload
message.ack()
except Empty:
pass
except KeyboardInterrupt:
break
print message
queue.close()
The files
Download all example files: kombu-tests.tar.gz