This document describes the current stable version of Kombu (5.3).

Simple Interface

kombu.simple is a simple interface to AMQP queueing. Its API differs only slightly from the Queue class in the Python Standard Library, which makes it a good fit for users with basic messaging needs.

Instead of defining exchanges and queues, the simple classes require only two arguments: a connection channel and a name. The name is used as the queue name, the exchange name and the routing key. If the need arises, you can pass a Queue instance as the name argument instead.

In addition, the Connection comes with shortcuts to create simple queues using the current connection:

>>> queue = connection.SimpleQueue('myqueue')
>>> # ... do something with queue
>>> queue.close()

This is equivalent to:

>>> from kombu.simple import SimpleQueue

>>> channel = connection.channel()
>>> queue = SimpleQueue(channel, 'myqueue')
>>> # ... do something with queue
>>> queue.close()
>>> channel.close()

Sending and receiving messages

The simple interface defines two classes: SimpleQueue and SimpleBuffer. The former is used for persistent messages, and the latter for transient, buffer-like queues. Both expose the same interface, so you can use them interchangeably.

Here is an example using the SimpleQueue class to produce and consume logging messages:

import socket
import datetime
from time import time
from kombu import Connection


class Logger:

    def __init__(self, connection, queue_name='log_queue',
            serializer='json', compression=None):
        self.queue = connection.SimpleQueue(queue_name)
        self.serializer = serializer
        self.compression = compression

    def log(self, message, level='INFO', context=None):
        # Use None as the default to avoid sharing one mutable dict
        # between calls.
        self.queue.put({'message': message,
                        'level': level,
                        'context': context or {},
                        'hostname': socket.gethostname(),
                        'timestamp': time()},
                       serializer=self.serializer,
                       compression=self.compression)

    def process(self, callback, n=1, timeout=1):
        for _ in range(n):
            # Raises self.queue.Empty if no message arrives within
            # ``timeout`` seconds.
            log_message = self.queue.get(block=True, timeout=timeout)
            entry = log_message.payload  # deserialized data.
            callback(entry)
            log_message.ack()  # remove message from queue

    def close(self):
        self.queue.close()


if __name__ == '__main__':
    from contextlib import closing

    with Connection('amqp://guest:guest@localhost:5672//') as conn:
        with closing(Logger(conn)) as logger:

            # Send message
            logger.log('Error happened while encoding video',
                        level='ERROR',
                        context={'filename': 'cutekitten.mpg'})

            # Consume and process message

            # This is the callback called when a log message is
            # received.
            def dump_entry(entry):
                date = datetime.datetime.fromtimestamp(entry['timestamp'])
                print('[%s %s %s] %s %r' % (date,
                                            entry['hostname'],
                                            entry['level'],
                                            entry['message'],
                                            entry['context']))

            # Process a single message using the callback above.
            logger.process(dump_entry, n=1)