This document describes the current stable version of Kombu (4.2). For development docs, go here.

Common Utilities - kombu.common

Common Utilities.

class kombu.common.Broadcast(name=None, queue=None, auto_delete=True, exchange=None, alias=None, **kwargs)[source]

Broadcast queue.

Convenience class used to define broadcast queues.

Every queue instance will have a unique name, and both the queue and exchange is configured with auto deletion.

  • name (str) – This is used as the name of the exchange.
  • queue (str) – By default a unique id is used for the queue name for every consumer. You can specify a custom queue name here.
  • **kwargs (Any) – See Queue for a list of additional keyword arguments supported.
attrs = ((u'name', None), (u'exchange', None), (u'routing_key', None), (u'queue_arguments', None), (u'binding_arguments', None), (u'consumer_arguments', None), (u'durable', <type 'bool'>), (u'exclusive', <type 'bool'>), (u'auto_delete', <type 'bool'>), (u'no_ack', None), (u'alias', None), (u'bindings', <type 'list'>), (u'no_declare', <type 'bool'>), (u'expires', <type 'float'>), (u'message_ttl', <type 'float'>), (u'max_length', <type 'int'>), (u'max_length_bytes', <type 'int'>), (u'max_priority', <type 'int'>), (u'queue', None))
kombu.common.maybe_declare(entity, channel=None, retry=False, **retry_policy)[source]

Declare entity (cached).

kombu.common.uuid(_uuid=<function uuid4>)[source]

Generate unique id in UUID4 format.

See also

For now this is provided by uuid.uuid4().

kombu.common.itermessages(conn, channel, queue, limit=1, timeout=None, callbacks=None, **kwargs)[source]

Iterator over messages.

kombu.common.send_reply(exchange, req, msg, producer=None, retry=False, retry_policy=None, **props)[source]

Send reply for request.

  • exchange (kombu.Exchange, str) – Reply exchange
  • req (Message) – Original request, a message with a reply_to property.
  • producer (kombu.Producer) – Producer instance
  • retry (bool) – If true must retry according to the reply_policy argument.
  • retry_policy (Dict) – Retry settings.
  • **props (Any) – Extra properties.
kombu.common.collect_replies(conn, channel, queue, *args, **kwargs)[source]

Generator collecting replies from queue.

kombu.common.insured(pool, fun, args, kwargs, errback=None, on_revive=None, **opts)[source]

Function wrapper to handle connection errors.

Ensures function performing broker commands completes despite intermittent connection failures.

kombu.common.drain_consumer(consumer, limit=1, timeout=None, callbacks=None)[source]

Drain messages from consumer instance.

kombu.common.eventloop(conn, limit=None, timeout=None, ignore_timeouts=False)[source]

Best practice generator wrapper around Connection.drain_events.

Able to drain events forever, with a limit, and optionally ignoring timeout errors (a timeout of 1 is often used in environments where the socket can get “stuck”, and is a best practice for Kombu consumers).

eventloop is a generator.


>>> from kombu.common import eventloop
>>> def run(conn):
...     it = eventloop(conn, timeout=1, ignore_timeouts=True)
...     next(it)   # one event consumed, or timed out.
...     for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
...         pass  # loop forever.

It also takes an optional limit parameter, and timeout errors are propagated by default:

for _ in eventloop(connection, limit=1, timeout=1):

See also

itermessages(), which is an event loop bound to one or more consumers, that yields any messages received.