amqp package

Submodules

amqp.dependencies module

class micro_framework.amqp.dependencies.Producer(confirm_publish=True, service_name=None)

Bases: micro_framework.dependencies.Dependency

property amqp_uri
property exchange
async get_dependency(worker)

Injects the dependency that will be passed to the Target Function.

property transport_options
class micro_framework.amqp.dependencies.RPCProxyProvider(target_service)

Bases: micro_framework.rpc.dependencies.RPCDependency

Provides a RPCProxy with AMQPRPCConnector.

connector_class

alias of micro_framework.amqp.connectors.AMQPRPCConnector

get_connector_kwargs() → Dict

Key arguments to be passed to the connector init method :return dict:

reply_listener = <micro_framework.amqp.rpc.RPCReplyListener object>
micro_framework.amqp.dependencies.dispatch(amqp_uri, exchange, routing_key, payload, **kwargs)

Helper to dispatch a single payload that will load the client and call the send method.

Parameters
  • amqp_uri (str) –

  • exchange (Exchange) –

  • routing_key (str) –

  • payload (Any) –

  • headers (dict) –

amqp.entrypoints module

class micro_framework.amqp.entrypoints.BaseEventListener(exchange_name=None, routing_key=None, exchange_type='topic', payload_filter=None)

Bases: micro_framework.entrypoints.Entrypoint

Base class to handle Queue consuming of a single routing key in an exchange. It declares a Queue, binded to the exchange with exchange_name and with a routing_key from routing_key attribute.

async call_route(entry_id, *args, _meta=None, **kwargs)

Called by the Manager when a new message is received. :param Message message: Message object :param dict|str payload: Event Payload

async get_exchange()

Return the Exchange that will have a queue binded to. :return Exchange:

async get_queue()

Returns the Queue to be created or loaded using the exchange and routing key declared. :return:

async get_queue_name()

Return the name of the queue to be binded. It usually must be unique by consuming entrypoint to avoid the same queue being consumed by multiple different routes. :return str: The name of the Queue

internal = False
manager = <micro_framework.amqp.manager.ConsumerManager object>
async setup()

Called After the binding and before the system start

class micro_framework.amqp.entrypoints.EventListener(source_service, event_name, **kwargs)

Bases: micro_framework.amqp.entrypoints.QueueListener

async get_queue_name()

Return the name of the queue to be binded. It usually must be unique by consuming entrypoint to avoid the same queue being consumed by multiple different routes. :return str: The name of the Queue

class micro_framework.amqp.entrypoints.QueueListener(*args, queue_name=None, **kwargs)

Bases: micro_framework.amqp.entrypoints.BaseEventListener

async get_queue_name()

Return the name of the queue to be binded. It usually must be unique by consuming entrypoint to avoid the same queue being consumed by multiple different routes. :return str: The name of the Queue

class micro_framework.amqp.entrypoints.RPCListener

Bases: micro_framework.entrypoints.Entrypoint

manager = <micro_framework.amqp.manager.RPCManager object>
async setup()

Called After the binding and before the system start

amqp.manager module

class micro_framework.amqp.manager.ConsumerManager

Bases: micro_framework.extensions.Extension, kombu.mixins.ConsumerMixin

async ack_message(message)
async add_entrypoint(entrypoint)
property amqp_uri
context_singleton = True
async get_connection()
get_consumers(Consumer, channel)
async handle_new_message(entrypoint, body, message)
on_connection_error(exc, interval)
on_consume_ready(connection, channel, consumers, **kwargs)
on_message(entrypoint, body, message)
async requeue_message(message)
async setup()

Called After the binding and before the system start

async start()

Commands the extension to start running.

async stop()

Commands the extension to do a graceful stop

class micro_framework.amqp.manager.RPCManager

Bases: micro_framework.rpc.manager.RPCManagerMixin, kombu.mixins.ConsumerMixin

async ack_message(message)
property amqp_uri
context_singleton = True
async get_connection()
get_consumers(Consumer, channel)
async handle_new_call(target_ids, body, message)

Async Method to handle all procedures when receiving a RPC call.

on_broadcast_message(body, message)
on_message(entrypoint, body, message)
async requeue_message(message)
async send_reply(message, payload)

Send to the RPC Response to the Reply Queue :param message: :param future: :return:

async setup()

Called After the binding and before the system start

async start()

Commands the extension to start running.

async stop()

Commands the extension to do a graceful stop

Module contents