kombu.transport.virtual

Virtual transport implementation.

Emulates the AMQ API for non-AMQ transports.

Transports

class kombu.transport.virtual.Transport(client, **kwargs)[source]

Virtual transport.

Parameters:client – Connection instance.
Channel = <class 'kombu.transport.virtual.Channel'>
Cycle = <class 'kombu.transport.virtual.scheduling.FairCycle'>
polling_interval = 1.0

Time to sleep between unsuccessful polls.

default_port = None

Port number used when no port is specified.

state = <kombu.transport.virtual.BrokerState object>

BrokerState containing declared exchanges and bindings (set by constructor).

cycle = None

FairCycle instance used to fairly drain events from channels (set by constructor).
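
To illustrate what draining "fairly" can mean, here is a minimal round-robin sketch. It is not kombu's FairCycle implementation: the names `RoundRobinDrain`, `Empty` and `get`, and the use of deques as stand-in channels, are assumptions made for this example only.

```python
from collections import deque


class Empty(Exception):
    """Raised when no resource has anything to deliver."""


class RoundRobinDrain:
    """Toy round-robin poller (hypothetical; not kombu's FairCycle).

    After a resource yields an item, polling resumes with the *next*
    resource, so a busy resource cannot starve the others.
    """

    def __init__(self, resources):
        # Each resource is a deque of pending items.
        self.resources = list(resources)
        self.pos = 0

    def get(self):
        n = len(self.resources)
        for offset in range(n):
            index = (self.pos + offset) % n
            resource = self.resources[index]
            if resource:
                # The next call starts with the resource after this one.
                self.pos = (index + 1) % n
                return index, resource.popleft()
        raise Empty()


busy = deque(["a1", "a2", "a3"])
quiet = deque(["b1"])
cycle = RoundRobinDrain([busy, quiet])
order = [cycle.get()[1] for _ in range(4)]
```

In this sketch the quiet resource is served after the first item of the busy one, rather than waiting for the busy resource to empty.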

establish_connection()[source]
close_connection(connection)[source]
create_channel(connection)[source]
close_channel(channel)[source]
drain_events(connection, timeout=None)[source]

Channel

class kombu.transport.virtual.AbstractChannel[source]

This is an abstract class defining the channel methods you’d usually want to implement in a virtual channel.

Do not subclass directly, but rather inherit from Channel instead.

class kombu.transport.virtual.Channel(connection, **kwargs)[source]

Virtual channel.

Parameters:connection – The transport instance this channel is part of.
Message = <class 'kombu.transport.virtual.Message'>

Message class used.

state

Broker state containing exchanges and bindings.

qos

QoS manager for this channel.

do_restore = True

Flag to restore unacked messages when the channel goes out of scope.

exchange_types = {'topic': <class 'kombu.transport.virtual.exchange.TopicExchange'>, 'fanout': <class 'kombu.transport.virtual.exchange.FanoutExchange'>, 'direct': <class 'kombu.transport.virtual.exchange.DirectExchange'>}

Mapping of exchange types to their corresponding classes.

exchange_declare(exchange=None, type=u'direct', durable=False, auto_delete=False, arguments=None, nowait=False, passive=False)[source]

Declare exchange.

exchange_delete(exchange, if_unused=False, nowait=False)[source]

Delete exchange and all its bindings.

queue_declare(queue=None, passive=False, **kwargs)[source]

Declare queue.

queue_delete(queue, if_unused=False, if_empty=False, **kwargs)[source]

Delete queue.

queue_bind(queue, exchange=None, routing_key=u'', arguments=None, **kwargs)[source]

Bind queue to exchange with routing key.
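
How a binding affects delivery depends on the exchange type. The sketch below shows the general AMQP conventions for direct, fanout and topic exchanges. It is a simplified model, not the code in kombu.transport.virtual.exchange: the functions `topic_matches` and `route` and the list-of-pairs binding table are hypothetical.

```python
def topic_matches(pattern, routing_key):
    """AMQP-style topic match: '*' is one word, '#' is zero or more words."""

    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may swallow any number of leading words, including none.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


def route(exchange_type, bindings, routing_key):
    """Return the queues a message would be delivered to.

    `bindings` is a list of (binding_key, queue) pairs for one exchange.
    """
    if exchange_type == "fanout":
        matched = [queue for _, queue in bindings]
    elif exchange_type == "direct":
        matched = [queue for key, queue in bindings if key == routing_key]
    elif exchange_type == "topic":
        matched = [queue for key, queue in bindings
                   if topic_matches(key, routing_key)]
    else:
        raise ValueError("unknown exchange type: %r" % (exchange_type,))
    # Deliver at most once per queue, preserving binding order.
    return list(dict.fromkeys(matched))


bindings = [("logs.error", "errors"), ("logs.*", "all_logs"), ("#", "audit")]
direct_hits = route("direct", bindings, "logs.error")
topic_hits = route("topic", bindings, "logs.error")
deep_hits = route("topic", bindings, "logs.app.error")
fanout_hits = route("fanout", bindings, "ignored")
```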

queue_purge(queue, **kwargs)[source]

Remove all ready messages from queue.

basic_publish(message, exchange, routing_key, **kwargs)[source]

Publish message.

basic_consume(queue, no_ack, callback, consumer_tag, **kwargs)[source]

Consume from queue.

basic_cancel(consumer_tag)[source]

Cancel consumer by consumer tag.

basic_get(queue, no_ack=False, **kwargs)[source]

Get message by direct access (synchronous).

basic_ack(delivery_tag)[source]

Acknowledge message.

basic_recover(requeue=False)[source]

Recover unacked messages.

basic_reject(delivery_tag, requeue=False)[source]

Reject message.

basic_qos(prefetch_size=0, prefetch_count=0, apply_global=False)[source]

Change QoS settings for this channel.

Only prefetch_count is supported.

get_table(exchange)[source]

Get table of bindings for exchange.

typeof(exchange, default=u'direct')[source]

Get the exchange type instance for exchange.

drain_events(timeout=None)[source]
prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)[source]

Prepare message data.

message_to_python(raw_message)[source]

Convert raw message to Message instance.

flow(active=True)[source]

Enable/disable message flow.

Raises:NotImplementedError – as flow is not implemented by the base virtual implementation.
close()[source]

Close channel, cancel all consumers, and requeue unacked messages.

Message

class kombu.transport.virtual.Message(channel, payload, **kwargs)[source]
exception MessageStateError

The message has already been acknowledged.

args
message
Message.accept
Message.ack()[source]

Acknowledge this message as being processed. This will remove the message from the queue.

Raises:MessageStateError – If the message has already been acknowledged/requeued/rejected.
Message.ack_log_error(logger, errors)[source]
Message.acknowledged

Set to true if the message has been acknowledged.

Message.body
Message.channel
Message.content_encoding
Message.content_type
Message.decode()[source]

Deserialize the message body, returning the original python structure sent by the publisher.

Message.delivery_info
Message.delivery_tag
Message.errors = None
Message.headers
Message.payload

The decoded message body.

Message.properties
Message.reject(requeue=False)[source]

Reject this message.

The message will be discarded by the server.

Raises:MessageStateError – If the message has already been acknowledged/requeued/rejected.
Message.reject_log_error(logger, errors, requeue=False)[source]
Message.requeue()[source]

Reject this message and put it back on the queue.

You must not use this method as a means of selecting messages to process.

Raises:MessageStateError – If the message has already been acknowledged/requeued/rejected.
Message.serializable()[source]
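
The ack(), reject() and requeue() methods above each raise MessageStateError once the message has already been acknowledged, requeued or rejected. The following sketch models that rule as a one-way state change. `ToyMessage`, its `settled` property and the state names are hypothetical, and the exception class here is a local stand-in rather than kombu's own.

```python
class MessageStateError(Exception):
    """Local stand-in for the exception documented above."""


class ToyMessage:
    """A message that can be settled exactly once (hypothetical model)."""

    def __init__(self, body):
        self.body = body
        self.state = "RECEIVED"

    @property
    def settled(self):
        # True once the message was acked, rejected or requeued.
        return self.state != "RECEIVED"

    def _settle(self, new_state):
        if self.settled:
            raise MessageStateError(
                "message already settled as %s" % self.state)
        self.state = new_state

    def ack(self):
        self._settle("ACK")

    def reject(self):
        self._settle("REJECTED")

    def requeue(self):
        self._settle("REQUEUED")


msg = ToyMessage("hello")
msg.ack()
try:
    msg.requeue()  # second settlement attempt is refused
except MessageStateError as exc:
    error = str(exc)
```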

Quality Of Service

class kombu.transport.virtual.QoS(channel, prefetch_count=0)[source]

Quality of Service guarantees.

Only supports prefetch_count at this point.

Parameters:
  • channel – AMQ Channel.
  • prefetch_count – Initial prefetch count (defaults to 0).
ack(delivery_tag)[source]

Acknowledge message and remove from transactional state.

append(message, delivery_tag)[source]

Append message to transactional state.

can_consume()[source]

Return true if the channel can be consumed from.

Used to ensure the client adheres to currently active prefetch limits.
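
As a rough illustration of a prefetch limit, the sketch below tracks delivered but unacknowledged messages and refuses further consumption once the limit is reached. `ToyQoS` is a hypothetical class that borrows the method names documented here, not kombu's QoS; it assumes the AMQP convention that a prefetch count of 0 means "no limit".

```python
class ToyQoS:
    """Minimal prefetch tracker (hypothetical; not kombu's QoS class)."""

    def __init__(self, prefetch_count=0):
        self.prefetch_count = prefetch_count
        self._delivered = {}  # delivery_tag -> message awaiting ack

    def append(self, message, delivery_tag):
        # Record a message that was handed to the consumer.
        self._delivered[delivery_tag] = message

    def ack(self, delivery_tag):
        # Forget the message once the consumer acknowledged it.
        self._delivered.pop(delivery_tag, None)

    def can_consume(self):
        # Assumption: 0 means unlimited; otherwise compare in-flight count.
        if not self.prefetch_count:
            return True
        return len(self._delivered) < self.prefetch_count


qos = ToyQoS(prefetch_count=2)
qos.append("m1", delivery_tag=1)
after_one = qos.can_consume()
qos.append("m2", delivery_tag=2)
after_two = qos.can_consume()
qos.ack(1)
after_ack = qos.can_consume()
```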

can_consume_max_estimate()[source]

Returns the maximum number of messages allowed to be returned.

Returns an estimated number of messages that a consumer may be allowed to consume at once from the broker. This is used for services such as SQS, where bulk ‘get message’ calls are preferred to many individual ‘get message’ calls.

Returns:An integer > 0
get(delivery_tag)[source]
prefetch_count = 0

Current prefetch count value.

reject(delivery_tag, requeue=False)[source]

Remove from transactional state and, if requeue is true, requeue the message.

restore_at_shutdown = True

If disabled, unacked messages won’t be restored at shutdown.

restore_unacked()[source]

Restore all unacknowledged messages.

restore_unacked_once()[source]

Restore all unacknowledged messages at shutdown or garbage collection.

Will only be done once for each instance.
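
The "only once" behaviour can be pictured with a simple guard flag. This is a sketch under stated assumptions, not kombu's code: `ToyUnacked` and `deliver` are hypothetical, and putting restored messages back at the front of the queue in their original order is a choice made for this example.

```python
from collections import deque


class ToyUnacked:
    """Tracks delivered messages and restores them at most once."""

    def __init__(self, queue):
        self.queue = queue      # deque of ready messages
        self.unacked = []       # delivered but not acked, oldest first
        self._restored = False

    def deliver(self):
        message = self.queue.popleft()
        self.unacked.append(message)
        return message

    def restore_unacked(self):
        # Push back newest first so the original order is preserved.
        while self.unacked:
            self.queue.appendleft(self.unacked.pop())

    def restore_unacked_once(self):
        # Guard flag: later calls on the same instance do nothing.
        if self._restored:
            return False
        self._restored = True
        self.restore_unacked()
        return True


state = ToyUnacked(deque([1, 2, 3]))
state.deliver()
state.deliver()
first = state.restore_unacked_once()
second = state.restore_unacked_once()
```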

restore_visible(*args, **kwargs)[source]

Restore any pending unacknowledged messages for visibility_timeout-style implementations.

Optional: Currently only used by the Redis transport.

In-memory State

class kombu.transport.virtual.BrokerState(exchanges=None, bindings=None)[source]
bindings = None

Active bindings.

clear()[source]
exchanges = None

Exchange declarations.