kombu.pidbox

Generic process mailbox.

Introduction

Creating the application's Mailbox

>>> from kombu import pidbox
>>> mailbox = pidbox.Mailbox("celerybeat", type="direct")

>>> @mailbox.handler
... def reload_schedule(state, **kwargs):
...     state["beat"].reload_schedule()

>>> @mailbox.handler
... def connection_info(state, **kwargs):
...     return {"connection": state["connection"].info()}
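To illustrate how a handler decorator can fill a name-to-function map such as the handlers attribute listed under Node, here is a minimal sketch in plain Python. HandlerRegistry and the sample state are hypothetical stand-ins for illustration only and do not reproduce kombu's implementation.

```python
class HandlerRegistry:
    """Hypothetical stand-in (not a kombu class): maps method names to handlers."""

    def __init__(self):
        self.handlers = {}

    def handler(self, fun):
        # Register the function under its own name and return it unchanged,
        # so it can be used as a decorator.
        self.handlers[fun.__name__] = fun
        return fun


registry = HandlerRegistry()


@registry.handler
def connection_info(state, **kwargs):
    # Handlers receive the shared state as their first argument.
    return {"connection": state["connection"]}


# Assumed sample state; a real application would store live objects here.
state = {"connection": "amqp://localhost"}
result = registry.handlers["connection_info"](state)
```

Looking a handler up by name and calling it with the shared state is the essential step; everything else (transport, replies) is layered around it.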

Example Node

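>>> import socket
>>> import kombu
>>> connection = kombu.Connection()
>>> # `beat` and `hostname` are supplied by the application.
>>> state = {"beat": beat,
...          "connection": connection}
>>> node = mailbox(connection).Node(hostname, state=state)
>>> consumer = node.listen()
>>> try:
...     while True:
...         try:
...             connection.drain_events(timeout=1)
...         except socket.timeout:
...             pass  # no message arrived within the timeout; keep polling
... finally:
...     consumer.cancel()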

Example Client

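>>> bound = mailbox(connection)   # bind the mailbox to a connection
>>> bound.cast([hostname], "reload_schedule")   # cast is asynchronous; no reply is awaited.
>>> info = bound.call([hostname], "connection_info", timeout=1)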
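The difference between cast and call can be sketched with in-process queues: cast enqueues a command and returns immediately, while call also names a reply queue and waits for a result up to a timeout. This is a toy model under stated assumptions. The queues, HANDLERS, STATE, and node_drain are hypothetical and stand in for the broker and a remote node; none of them are kombu objects.

```python
import queue

requests = queue.Queue()  # stand-in for the node's mailbox queue
replies = queue.Queue()   # stand-in for the client's reply queue

# Hypothetical handlers keyed by command name.
HANDLERS = {
    "reload_schedule": lambda state: state.update(reloaded=True),
    "connection_info": lambda state: {"connection": state["connection"]},
}

STATE = {"connection": "memory://", "reloaded": False}


def node_drain(state):
    """Toy node: handle every pending request, replying only when asked to."""
    while not requests.empty():
        msg = requests.get()
        result = HANDLERS[msg["method"]](state)
        if msg["reply_to"] is not None:
            msg["reply_to"].put(result)


def cast(command):
    """Fire and forget: enqueue the command and return without waiting."""
    requests.put({"method": command, "reply_to": None})


def call(command, timeout=1):
    """Enqueue the command, then block until a reply arrives or timeout expires."""
    requests.put({"method": command, "reply_to": replies})
    node_drain(STATE)  # a real node would run in another process
    return replies.get(timeout=timeout)


cast_result = cast("reload_schedule")
info = call("connection_info")
```

In this model the cast is only processed once the node drains its queue, which is why a caller cannot learn the outcome of a cast without a separate call.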

Mailbox

class kombu.pidbox.Mailbox(namespace, type='direct', connection=None, clock=None, accept=None, serializer=None)
namespace = None

Name of application.

connection = None

Connection (if bound).

type = 'direct'

Exchange type (usually direct, or fanout for broadcast).

exchange = None

Mailbox exchange (initialized by the constructor).

reply_exchange = None

Exchange to send replies to.

Node(hostname=None, state=None, channel=None, handlers=None)
call(destination, command, kwargs={}, timeout=None, callback=None, channel=None)
cast(destination, command, kwargs={})
abcast(command, kwargs={})
multi_call(command, kwargs={}, timeout=1, limit=None, callback=None, channel=None)
get_reply_queue()
get_queue(hostname)

Node

class kombu.pidbox.Node(hostname, state=None, channel=None, handlers=None, mailbox=None)
hostname = None

Hostname of the node.

mailbox = None

The Mailbox this node belongs to.

handlers = None

Map of method names to handlers.

state = None

Current context (passed on to handlers).

channel = None

Current channel.

Consumer(channel=None, no_ack=True, accept=None, **options)
handler(fun)
listen(channel=None, callback=None)
dispatch(method, arguments=None, reply_to=None, ticket=None, **kwargs)
dispatch_from_message(body, message=None)
handle_call(method, arguments)
handle_cast(method, arguments)
handle(method, arguments={})
handle_message(body, message=None)
reply(data, exchange, routing_key, ticket, **kwargs)
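The relationship between dispatch, handle, and reply suggested by the signatures above can be sketched as follows. ToyNode is a hypothetical, simplified model, not kombu's Node. In particular, the shape of reply_to (a dict with "exchange" and "routing_key" keys), keying the reply by hostname, and turning exceptions into an error reply are assumptions made for illustration.

```python
class ToyNode:
    """Hypothetical simplified node; kombu's Node may differ in detail."""

    def __init__(self, hostname, state=None, handlers=None):
        self.hostname = hostname
        self.state = state if state is not None else {}
        self.handlers = handlers if handlers is not None else {}
        self.sent = []  # records replies instead of publishing them

    def handle(self, method, arguments=None):
        # Look the handler up by name and pass it the shared state.
        return self.handlers[method](self.state, **(arguments or {}))

    def reply(self, data, exchange, routing_key, ticket):
        self.sent.append({"data": data, "exchange": exchange,
                          "routing_key": routing_key, "ticket": ticket})

    def dispatch(self, method, arguments=None, reply_to=None, ticket=None):
        try:
            result = self.handle(method, arguments)
        except Exception as exc:
            # Assumed behaviour: report the failure instead of crashing the node.
            result = {"error": repr(exc)}
        if reply_to:
            # Only requests that name a reply destination get an answer.
            self.reply({self.hostname: result},
                       exchange=reply_to["exchange"],
                       routing_key=reply_to["routing_key"],
                       ticket=ticket)
        return result


node = ToyNode("worker1", state={"n": 41},
               handlers={"incr": lambda state, by=1: state["n"] + by})
ok = node.dispatch("incr", {"by": 1},
                   reply_to={"exchange": "reply.demo", "routing_key": "abc"},
                   ticket="t1")
failed = node.dispatch("missing")  # unknown method, no reply requested
```

Keying each reply by hostname lets a client that broadcasts to several nodes tell the answers apart, and the ticket lets it match replies to the request that caused them.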