Skip to content

MultisubscriberQueue

Bases: Generic[T]

Allow a single producer to provide the same payload to multiple consumers simultaneously.

An asyncio.Queue can be obtained directly by calling MultisubscriberQueue.queue() or MultisubscriberQueue.subscribe() can be used to yield data as it is available.

add()

Get a new Queue which is part of the subscriber set.

Returns:

Type Description
Queue[T]

A subscriber Queue.

remove(queue)

Remove given queue from the subscriber set.

Parameters:

Name Type Description Default
queue Queue[T]

Queue object to be removed from the subscriber set.

required

put(data) async

Put data on all the Queues in the subscriber set.

Parameters:

Name Type Description Default
data T

Data for the Queues. Please note that StopAsyncIteration is used to close all subscribing queues.

required

close() async

Put StopAsyncIteration on the subscriber Queues.

This is used to signal the end of the MultisubscriberQueue session.

queue()

Context helper which manages the lifecycle of the subscriber Queue.

Example:

from asyncio import Queue
from asyncio_multisubscriber_queue import MultisubscriberQueue

# create a MultisubscriberQueue for distributing int instances
mqueue: MultisubscriberQueue[int] = MultisubscriberQueue()
with mqueue.queue() as q:
    first_item: int = await q.get()

Returns:

Type Description
Generator[Queue[T], None, None]

Generator containing a subscriber Queue.

subscribe() async

Subscribe to MultisubscriberQueue using an async generator.

Instead of working with the Queue directly, the client can subscribe to data and have it yielded as it is available.

Example:

from asyncio_multisubscriber_queue import MultisubscriberQueue

# create a MultisubscriberQueue for distributing str instances
mqueue: MultisubscriberQueue[str] = MultisubscriberQueue()
async with mqueue.subscribe() as data:
        first_item: str = await q.get()

Returns:

Type Description
AsyncGenerator[T, None]

AsyncGenerator containing subscriber data.