This document describes the current stable version of Kombu (4.5). For development docs, go here.

Redis Transport - kombu.transport.redis

Redis transport.

Transport

class kombu.transport.redis.Transport(*args, **kwargs)[source]

Redis Transport.

class Channel(*args, **kwargs)

Redis Channel.

class QoS(*args, **kwargs)

Redis Ack Emulation.

ack(delivery_tag)

Acknowledge message and remove from transactional state.

append(message, delivery_tag)

Append message to transactional state.

pipe_or_acquire(pipe=None, client=None)
reject(delivery_tag, requeue=False)

Remove from transactional state and requeue message.

restore_at_shutdown = True
restore_by_tag(tag, client=None, leftmost=False)
restore_unacked(client=None)

Restore all unacknowledged messages.

restore_visible(start=0, num=10, interval=10)

Restore any pending unackwnowledged messages.

To be filled in for visibility_timeout style implementations.

Note

This is implementation optional, and currently only used by the Redis transport.

unacked_index_key
unacked_key
unacked_mutex_expire
unacked_mutex_key
visibility_timeout
ack_emulation = True
active_queues

Set of queues being consumed from (excluding fanout queues).

async_pool
basic_cancel(consumer_tag)

Cancel consumer by consumer tag.

basic_consume(queue, *args, **kwargs)

Consume from queue.

client

Client used to publish messages, BRPOP etc.

close()

Close channel.

Cancel all consumers, and requeue unacked messages.

conn_or_acquire(client=None)
connection_class = None
fanout_patterns = True
fanout_prefix = True
from_transport_options = ('body_encoding', 'deadletter_queue', 'sep', 'ack_emulation', 'unacked_key', 'unacked_index_key', 'unacked_mutex_key', 'unacked_mutex_expire', 'visibility_timeout', 'unacked_restore_limit', 'fanout_prefix', 'fanout_patterns', 'socket_timeout', 'socket_connect_timeout', 'socket_keepalive', 'socket_keepalive_options', 'queue_order_strategy', 'max_connections', 'priority_steps')
get_table(exchange)

Get table of bindings for exchange.

keyprefix_fanout = '/{db}.'
keyprefix_queue = '_kombu.binding.%s'
max_connections = 10
pool
priority(n)
priority_steps = [0, 3, 6, 9]
queue_order_strategy = 'round_robin'
sep = '\x06\x16'
socket_connect_timeout = None
socket_keepalive = None
socket_keepalive_options = None
socket_timeout = None
subclient

Pub/Sub connection used to consume fanout queues.

supports_fanout = True
unacked_index_key = 'unacked_index'
unacked_key = 'unacked'
unacked_mutex_expire = 300
unacked_mutex_key = 'unacked_mutex'
unacked_restore_limit = None
visibility_timeout = 3600
default_port = 6379
driver_name = 'redis'
driver_type = 'redis'
driver_version()[source]
implements = {'asynchronous': True, 'exchange_type': frozenset({'topic', 'direct', 'fanout'}), 'heartbeats': False}
on_readable(fileno)[source]

Handle AIO event for one of our file descriptors.

polling_interval = None
register_with_event_loop(connection, loop)[source]

Channel

class kombu.transport.redis.Channel(*args, **kwargs)[source]

Redis Channel.

class QoS(*args, **kwargs)

Redis Ack Emulation.

ack(delivery_tag)

Acknowledge message and remove from transactional state.

append(message, delivery_tag)

Append message to transactional state.

pipe_or_acquire(pipe=None, client=None)
reject(delivery_tag, requeue=False)

Remove from transactional state and requeue message.

restore_at_shutdown = True
restore_by_tag(tag, client=None, leftmost=False)
restore_unacked(client=None)

Restore all unacknowledged messages.

restore_visible(start=0, num=10, interval=10)

Restore any pending unackwnowledged messages.

To be filled in for visibility_timeout style implementations.

Note

This is implementation optional, and currently only used by the Redis transport.

unacked_index_key
unacked_key
unacked_mutex_expire
unacked_mutex_key
visibility_timeout
ack_emulation = True
active_queues

Set of queues being consumed from (excluding fanout queues).

async_pool
basic_cancel(consumer_tag)[source]

Cancel consumer by consumer tag.

basic_consume(queue, *args, **kwargs)[source]

Consume from queue.

client[source]

Client used to publish messages, BRPOP etc.

close()[source]

Close channel.

Cancel all consumers, and requeue unacked messages.

conn_or_acquire(client=None)[source]
connection_class = None
fanout_patterns = True

If enabled the fanout exchange will support patterns in routing and binding keys (like a topic exchange but using PUB/SUB).

Enabled by default since Kombu 4.x. Disable for backwards compatibility with Kombu 3.x.

fanout_prefix = True

Transport option to disable fanout keyprefix. Can also be string, in which case it changes the default prefix (‘/{db}.’) into to something else. The prefix must include a leading slash and a trailing dot.

Enabled by default since Kombu 4.x. Disable for backwards compatibility with Kombu 3.x.

from_transport_options = ('body_encoding', 'deadletter_queue', 'sep', 'ack_emulation', 'unacked_key', 'unacked_index_key', 'unacked_mutex_key', 'unacked_mutex_expire', 'visibility_timeout', 'unacked_restore_limit', 'fanout_prefix', 'fanout_patterns', 'socket_timeout', 'socket_connect_timeout', 'socket_keepalive', 'socket_keepalive_options', 'queue_order_strategy', 'max_connections', 'priority_steps')
get_table(exchange)[source]

Get table of bindings for exchange.

keyprefix_fanout = '/{db}.'
keyprefix_queue = '_kombu.binding.%s'
max_connections = 10
pool
priority(n)[source]
priority_steps = [0, 3, 6, 9]
queue_order_strategy = 'round_robin'

Order in which we consume from queues.

Can be either string alias, or a cycle strategy class

  • round_robin (round_robin_cycle).

    Make sure each queue has an equal opportunity to be consumed from.

  • sorted (sorted_cycle).

    Consume from queues in alphabetical order. If the first queue in the sorted list always contains messages, then the rest of the queues will never be consumed from.

  • priority (priority_cycle).

    Consume from queues in original order, so that if the first queue always contains messages, the rest of the queues in the list will never be consumed from.

The default is to consume from queues in round robin.

sep = '\x06\x16'
socket_connect_timeout = None
socket_keepalive = None
socket_keepalive_options = None
socket_timeout = None
subclient[source]

Pub/Sub connection used to consume fanout queues.

supports_fanout = True
unacked_index_key = 'unacked_index'
unacked_key = 'unacked'
unacked_mutex_expire = 300
unacked_mutex_key = 'unacked_mutex'
unacked_restore_limit = None
visibility_timeout = 3600