This document describes the current stable version of Kombu (4.5). For development docs, go here.

Virtual Transport Base Class - kombu.transport.virtual

Transports

class kombu.transport.virtual.Transport(client, **kwargs)[source]

Virtual transport.

Parameters:client (kombu.Connection) – The client this is a transport for.
Channel = <class 'kombu.transport.virtual.base.Channel'>
Cycle = <class 'kombu.utils.scheduling.FairCycle'>
polling_interval = 1.0

Time to sleep between unsuccessful polls.

default_port = None

port number used when no port is specified.

state = <kombu.transport.virtual.base.BrokerState object>

Global BrokerState containing declared exchanges and bindings.

cycle = None

FairCycle instance used to fairly drain events from channels (set by constructor).

establish_connection()[source]
close_connection(connection)[source]
create_channel(connection)[source]
close_channel(channel)[source]
drain_events(connection, timeout=None)[source]

Channel

class kombu.transport.virtual.AbstractChannel[source]

Abstract channel interface.

This is an abstract class defining the channel methods you’d usually want to implement in a virtual channel.

Note

Do not subclass directly, but rather inherit from Channel.

class kombu.transport.virtual.Channel(connection, **kwargs)[source]

Virtual channel.

Parameters:connection (ConnectionT) – The transport instance this channel is part of.
Message = <class 'kombu.transport.virtual.base.Message'>

message class used.

state

Broker state containing exchanges and bindings.

qos

QoS manager for this channel.

do_restore = True

flag to restore unacked messages when channel goes out of scope.

exchange_types = {'direct': <class 'kombu.transport.virtual.exchange.DirectExchange'>, 'fanout': <class 'kombu.transport.virtual.exchange.FanoutExchange'>, 'topic': <class 'kombu.transport.virtual.exchange.TopicExchange'>}

mapping of exchange types and corresponding classes.

exchange_declare(exchange=None, type='direct', durable=False, auto_delete=False, arguments=None, nowait=False, passive=False)[source]

Declare exchange.

exchange_delete(exchange, if_unused=False, nowait=False)[source]

Delete exchange and all its bindings.

queue_declare(queue=None, passive=False, **kwargs)[source]

Declare queue.

queue_delete(queue, if_unused=False, if_empty=False, **kwargs)[source]

Delete queue.

queue_bind(queue, exchange=None, routing_key='', arguments=None, **kwargs)[source]

Bind queue to exchange with routing key.

queue_purge(queue, **kwargs)[source]

Remove all ready messages from queue.

basic_publish(message, exchange, routing_key, **kwargs)[source]

Publish message.

basic_consume(queue, no_ack, callback, consumer_tag, **kwargs)[source]

Consume from queue.

basic_cancel(consumer_tag)[source]

Cancel consumer by consumer tag.

basic_get(queue, no_ack=False, **kwargs)[source]

Get message by direct access (synchronous).

basic_ack(delivery_tag, multiple=False)[source]

Acknowledge message.

basic_recover(requeue=False)[source]

Recover unacked messages.

basic_reject(delivery_tag, requeue=False)[source]

Reject message.

basic_qos(prefetch_size=0, prefetch_count=0, apply_global=False)[source]

Change QoS settings for this channel.

Note

Only prefetch_count is supported.

get_table(exchange)[source]

Get table of bindings for exchange.

typeof(exchange, default='direct')[source]

Get the exchange type instance for exchange.

drain_events(timeout=None, callback=None)[source]
prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)[source]

Prepare message data.

message_to_python(raw_message)[source]

Convert raw message to Message instance.

flow(active=True)[source]

Enable/disable message flow.

Raises:NotImplementedError – as flow is not implemented by the base virtual implementation.
close()[source]

Close channel.

Cancel all consumers, and requeue unacked messages.

Message

class kombu.transport.virtual.Message(payload, channel=None, **kwargs)[source]

Message object.

exception MessageStateError

The message has already been acknowledged.

args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

accept
ack(multiple=False)[source]

Acknowledge this message as being processed.

This will remove the message from the queue.

Raises:MessageStateError – If the message has already been acknowledged/requeued/rejected.
ack_log_error(logger, errors, multiple=False)[source]
acknowledged

Set to true if the message has been acknowledged.

body
channel
content_encoding
content_type
decode()[source]

Deserialize the message body.

Returning the original python structure sent by the publisher.

Note

The return value is memoized, use _decode to force re-evaluation.

delivery_info
delivery_tag
errors = None
headers
payload

The decoded message body.

properties
reject(requeue=False)[source]

Reject this message.

The message will be discarded by the server.

Raises:MessageStateError – If the message has already been acknowledged/requeued/rejected.
reject_log_error(logger, errors, requeue=False)[source]
requeue()[source]

Reject this message and put it back on the queue.

Warning

You must not use this method as a means of selecting messages to process.

Raises:MessageStateError – If the message has already been acknowledged/requeued/rejected.
serializable()[source]

Quality Of Service

class kombu.transport.virtual.QoS(channel, prefetch_count=0)[source]

Quality of Service guarantees.

Only supports prefetch_count at this point.

Parameters:
  • channel (ChannelT) – Connection channel.
  • prefetch_count (int) – Initial prefetch count (defaults to 0).
ack(delivery_tag)[source]

Acknowledge message and remove from transactional state.

append(message, delivery_tag)[source]

Append message to transactional state.

can_consume()[source]

Return true if the channel can be consumed from.

Used to ensure the client adhers to currently active prefetch limits.

can_consume_max_estimate()[source]

Return the maximum number of messages allowed to be returned.

Returns an estimated number of messages that a consumer may be allowed to consume at once from the broker. This is used for services where bulk ‘get message’ calls are preferred to many individual ‘get message’ calls - like SQS.

Returns:greater than zero.
Return type:int
get(delivery_tag)[source]
prefetch_count = 0

current prefetch count value

reject(delivery_tag, requeue=False)[source]

Remove from transactional state and requeue message.

restore_at_shutdown = True

If disabled, unacked messages won’t be restored at shutdown.

restore_unacked()[source]

Restore all unacknowledged messages.

restore_unacked_once(stderr=None)[source]

Restore all unacknowledged messages at shutdown/gc collect.

Note

Can only be called once for each instance, subsequent calls will be ignored.

restore_visible(*args, **kwargs)[source]

Restore any pending unackwnowledged messages.

To be filled in for visibility_timeout style implementations.

Note

This is implementation optional, and currently only used by the Redis transport.

In-memory State

class kombu.transport.virtual.BrokerState(exchanges=None)[source]

Broker state holds exchanges, queues and bindings.

binding_declare(queue, exchange, routing_key, arguments)[source]
binding_delete(queue, exchange, routing_key)[source]
bindings = None

This is the actual bindings registry, used to store bindings and to test ‘in’ relationships in constant time. It has the following structure:

{
    (queue, exchange, routing_key): arguments,
    # ...,
}
clear()[source]
exchanges = None

Mapping of exchange name to kombu.transport.virtual.exchange.ExchangeType

has_binding(queue, exchange, routing_key)[source]
queue_bindings(queue)[source]
queue_bindings_delete(queue)[source]
queue_index = None

The queue index is used to access directly (constant time) all the bindings of a certain queue. It has the following structure:

{
    queue: {
        (queue, exchange, routing_key),
        # ...,
    },
    # ...,
}