Hello World using ‘kombu’ library and python

Reading time: 62 – 103 minutes

Some times schemas and snippets don’t need large descriptions. If you think this is not enough in this case tell me and I’m going to add explanations.

Using a python library called kombu as an abstraction to talk with AMQP broker we are going to develop different message routes setting each type of Exchange. As a backend I used RabbitMQ with default configuration.

AMQP schema using an exchange of type direct


Queue definition:

from kombu import Exchange, Queue

task_exchange = Exchange("msgs", type="direct")
queue_msg_1 = Queue("messages_1", task_exchange, routing_key = 'message_1')
queue_msg_2 = Queue("messages_2", task_exchange, routing_key = 'message_2')

The producer:

from __future__ import with_statement
from queues import task_exchange

from kombu.common import maybe_declare
from kombu.pools import producers

if __name__ == "__main__":
    from kombu import BrokerConnection

    connection = BrokerConnection("amqp://guest:guest@localhost:5672//")

    with producers[connection].acquire(block=True) as producer:
        maybe_declare(task_exchange, producer.channel)
        payload = {"type": "handshake", "content": "hello #1"}
        producer.publish(payload, exchange = 'msgs', serializer="pickle", routing_key = 'message_1')
        payload = {"type": "handshake", "content": "hello #2"}
        producer.publish(payload, exchange = 'msgs', serializer="pickle", routing_key = 'message_2')

One consumer:

from queues import queue_msg_1
from kombu.mixins import ConsumerMixin

class C(ConsumerMixin):
    def __init__(self, connection):
        self.connection = connection
    def get_consumers(self, Consumer, channel):
        return [Consumer( queue_msg_1, callbacks = [ self.on_message ])]
    def on_message(self, body, message):
        print ("RECEIVED MSG - body: %r" % (body,))
        print ("RECEIVED MSG - message: %r" % (message,))

if __name__ == "__main__":
    from kombu import BrokerConnection
    from kombu.utils.debug import setup_logging

    with BrokerConnection("amqp://guest:guest@localhost:5672//") as connection:
        except KeyboardInterrupt:
            print("bye bye")

AMQP schema using an exchange of type fanout


Queue definition:

from kombu import Exchange, Queue

task_exchange = Exchange("ce", type="fanout")
queue_events_db = Queue("events.db", task_exchange)
queue_events_notify = Queue("events.notify", task_exchange)

The producer:

from __future__ import with_statement
from queues import task_exchange

from kombu.common import maybe_declare
from kombu.pools import producers

if __name__ == "__main__":
    from kombu import BrokerConnection

    connection = BrokerConnection("amqp://guest:guest@localhost:5672//")

    with producers[connection].acquire(block=True) as producer:
        maybe_declare(task_exchange, producer.channel)
        payload = {"operation": "create", "content": "the object"}
        producer.publish(payload, exchange = 'ce', serializer="pickle", routing_key = 'user.write')

        payload = {"operation": "update", "content": "updated fields", "id": "id of the object"}
        producer.publish(payload, exchange = 'ce', serializer="pickle", routing_key = 'user.write')

One consumer:

from queues import queue_events_db
from kombu.mixins import ConsumerMixin

class C(ConsumerMixin):
    def __init__(self, connection):
        self.connection = connection
    def get_consumers(self, Consumer, channel):
        return [Consumer( queue_events_db, callbacks = [self.on_message])]
    def on_message(self, body, message):
        print ("save_db: RECEIVED MSG - body: %r" % (body,))
        print ("save_db: RECEIVED MSG - message: %r" % (message,))

if __name__ == "__main__":
    from kombu import BrokerConnection
    from kombu.utils.debug import setup_logging

    with BrokerConnection("amqp://guest:guest@localhost:5672//") as connection:
        except KeyboardInterrupt:
            print("bye bye")

AMQP schema using an exchange of type topic


Queue definition:

from kombu import Exchange, Queue

task_exchange = Exchange("user", type="topic")
queue_user_write = Queue("user.write", task_exchange, routing_key = 'user.write')
queue_user_read = Queue("user.read", task_exchange, routing_key = 'user.read')
queue_notify = Queue("notify", task_exchange, routing_key = 'user.#')

The producer:

from __future__ import with_statement
from queues import task_exchange

from kombu.common import maybe_declare
from kombu.pools import producers

if __name__ == "__main__":
    from kombu import BrokerConnection

    connection = BrokerConnection("amqp://guest:guest@localhost:5672//")

    with producers[connection].acquire(block=True) as producer:
        maybe_declare(task_exchange, producer.channel)
        payload = {"operation": "create", "content": "the object"}
        producer.publish(payload, exchange = 'user', serializer="pickle", routing_key = 'user.write')

        payload = {"operation": "update", "content": "updated fields", "id": "id of the object"}
        producer.publish(payload, exchange = 'user', serializer="pickle", routing_key = 'user.write')

        payload = {"operation": "delete", "id": "id of the object"}
        producer.publish(payload, exchange = 'user', serializer="pickle", routing_key = 'user.write')

        payload = {"operation": "read", "id": "id of the object"}
        producer.publish(payload, exchange = 'user', serializer="pickle", routing_key = 'user.read')

One consumer:

from queues import queue_events_db
from kombu.mixins import ConsumerMixin

class C(ConsumerMixin):
    def __init__(self, connection):
        self.connection = connection
    def get_consumers(self, Consumer, channel):
        return [Consumer( queue_events_db, callbacks = [self.on_message])]
    def on_message(self, body, message):
        print ("save_db: RECEIVED MSG - body: %r" % (body,))
        print ("save_db: RECEIVED MSG - message: %r" % (message,))

if __name__ == "__main__":
    from kombu import BrokerConnection
    from kombu.utils.debug import setup_logging

    with BrokerConnection("amqp://guest:guest@localhost:5672//") as connection:
        except KeyboardInterrupt:
            print("bye bye")

Simple queues

Kombu implements SimpleQueue and SimpleBuffer as simple solution for queues with exchange of type ‘direct’, with the same exchange name, routing key and queue name.


from kombu import BrokerConnection

connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
queue = connection.SimpleQueue("logs")

payload = { "severity":"info", "message":"this is just a log", "ts":"2013/09/30T15:10:23" }
queue.put(payload, serializer='pickle')



from kombu import BrokerConnection
from Queue import  Empty

connection = BrokerConnection("amqp://guest:guest@localhost:5672//")

queue = connection.SimpleQueue("logs")

while 1:
        message = queue.get(block=True, timeout=1)
        print message.payload
    except Empty:
    except KeyboardInterrupt:
    print message


The files

Download all example files: kombu-tests.tar.gz

5 thoughts on “Hello World using ‘kombu’ library and python”

  1. Richard Loo

    thanks for sharing, great post.

    I have one question, what is “maybe_declare” used for?

  2. It’s used to create, if it’s necessary, the entities of RabbitMQ: exchange, binding and queues. If the entities was created before it does nothing.

  3. I know this question does not make sense..But can I extend this to run on Django Web Server and create an IOS app that use API to send and receive message, more like chat application ?

  4. Rather look to use Django REST API and a tool such SwampDragon.

Comments are closed.