This document describes the current stable version of Kombu (5.3).

Pidbox - kombu.pidbox

Generic process mailbox.

Introduction

Creating the application's Mailbox

>>> from kombu import pidbox
>>> mailbox = pidbox.Mailbox('celerybeat', type='direct')

>>> # Handlers are plain functions; they are registered on the Node below.
>>> def reload_schedule(state, **kwargs):
...     state['beat'].reload_schedule()

>>> def connection_info(state, **kwargs):
...     return {'connection': state['connection'].info()}

Example Node

>>> import kombu
>>> connection = kombu.Connection()
>>> state = {'beat': beat,
...          'connection': connection}
>>> node = mailbox(connection).Node(
...     hostname, state=state, channel=connection.default_channel,
...     handlers={'reload_schedule': reload_schedule,
...               'connection_info': connection_info})
>>> consumer = node.listen()
>>> try:
...     while True:
...         connection.drain_events(timeout=1)
... finally:
...     consumer.cancel()

Example Client

>>> bound = mailbox(connection)   # bind the mailbox to a connection
>>> bound.cast([hostname], 'reload_schedule')   # cast is async.
>>> info = bound.call([hostname], 'connection_info', timeout=1)
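
The difference between cast (fire and forget) and call (wait for a reply, with a timeout) can be illustrated without a broker. The following is a toy, in-process sketch that uses only the standard library. It is not how kombu implements pidbox: ToyNode and the module-level cast and call functions are hypothetical stand-ins, and in-memory queues take the place of the mailbox and reply exchanges.

```python
import queue
import threading


class ToyNode:
    """Hypothetical stand-in for a mailbox node (not kombu's Node)."""

    def __init__(self, state):
        self.state = state
        self.handlers = {}
        self.inbox = queue.Queue()

    def handler(self, fun):
        # Register a handler under its function name.
        self.handlers[fun.__name__] = fun
        return fun

    def serve_one(self):
        # Take one message from the inbox and run the matching handler.
        command, kwargs, reply_to = self.inbox.get()
        result = self.handlers[command](self.state, **kwargs)
        if reply_to is not None:
            # Only "call" messages carry a reply queue.
            reply_to.put(result)


def cast(node, command, **kwargs):
    # Fire and forget: no reply queue is attached.
    node.inbox.put((command, kwargs, None))


def call(node, command, timeout=1.0, **kwargs):
    # Attach a reply queue and block until a reply arrives.
    # Raises queue.Empty if nothing arrives within `timeout` seconds.
    reply_to = queue.Queue()
    node.inbox.put((command, kwargs, reply_to))
    return reply_to.get(timeout=timeout)


node = ToyNode(state={'reloads': 0})


@node.handler
def reload_schedule(state, **kwargs):
    state['reloads'] += 1


@node.handler
def status(state, **kwargs):
    return {'reloads': state['reloads']}


def serve(count):
    for _ in range(count):
        node.serve_one()


# The "node" runs in a background thread and handles two messages.
worker = threading.Thread(target=serve, args=(2,), daemon=True)
worker.start()

cast(node, 'reload_schedule')   # returns immediately, no result
info = call(node, 'status')     # blocks until the node replies
worker.join()
print(info)                     # {'reloads': 1}
```

Because the inbox is first-in, first-out, the cast is handled before the call, so the reply reflects the reload.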

Mailbox

class kombu.pidbox.Mailbox(namespace, type='direct', connection=None, clock=None, accept=None, serializer=None, producer_pool=None, queue_ttl=None, queue_expires=None, reply_queue_ttl=None, reply_queue_expires=10.0)

Process Mailbox.

namespace = None

Name of application.

connection = None

Connection (if bound).

type = 'direct'

Exchange type (usually direct, or fanout for broadcast).

exchange = None

Mailbox exchange (initialized by the constructor).

reply_exchange = None

Exchange to send replies to.

Node(hostname=None, state=None, channel=None, handlers=None)
call(destination, command, kwargs=None, timeout=None, callback=None, channel=None)
cast(destination, command, kwargs=None)
abcast(command, kwargs=None)
multi_call(command, kwargs=None, timeout=1, limit=None, callback=None, channel=None)
get_reply_queue()
get_queue(hostname)

Node

class kombu.pidbox.Node(hostname, state=None, channel=None, handlers=None, mailbox=None)

Mailbox node.

hostname = None

Hostname of the node.

mailbox = None

The Mailbox this is a node for.

handlers = None

Map of method names to handlers.

state = None

Current context (passed on to handlers).

channel = None

Current channel.

Consumer(channel=None, no_ack=True, accept=None, **options)
handler(fun)
listen(channel=None, callback=None)
dispatch(method, arguments=None, reply_to=None, ticket=None, **kwargs)
dispatch_from_message(body, message=None)
handle_call(method, arguments)
handle_cast(method, arguments)
handle(method, arguments=None)
handle_message(body, message=None)
reply(data, exchange, routing_key, ticket, **kwargs)