MultisubscriberQueue
Bases: Generic[T]
Allow a single producer to provide the same payload to multiple consumers simultaneously.
An asyncio.Queue
can be obtained directly by calling MultisubscriberQueue.queue() or
MultisubscriberQueue.subscribe()
can be used to yield data as it is available.
add()
Get a new Queue which is part of the subscriber set.
Returns:
Type | Description |
---|---|
Queue[T]
|
A subscriber Queue. |
remove(queue)
Remove given queue from the subscriber set.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
queue |
Queue[T]
|
Queue object to be removed from the subscriber set. |
required |
put(data)
async
Put data on all the Queues in the subscriber set.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
T
|
Data for the Queues. Please note that |
required |
close()
async
Put StopAsyncIteration
on the subscriber Queues.
This is used to signal the end of the MultisubscriberQueue session.
queue()
Context helper which manages the lifecycle of the subscriber Queue.
Example:
from asyncio import Queue
from asyncio_multisubscriber_queue import MultisubscriberQueue
# create a MultisubscriberQueue for distributing int instances
mqueue: MultisubscriberQueue[int] = MultisubscriberQueue()
with mqueue.queue() as q:
first_item: int = await q.get()
Returns:
Type | Description |
---|---|
Generator[Queue[T], None, None]
|
Generator containing a subscriber Queue. |
subscribe()
async
Subscribe to MultisubscriberQueue
using an async generator.
Instead of working with the Queue directly, the client can subscribe to data and have it yielded as it is available.
Example:
from asyncio_multisubscriber_queue import MultisubscriberQueue
# create a MultisubscriberQueue for distributing str instances
mqueue: MultisubscriberQueue[str] = MultisubscriberQueue()
async with mqueue.subscribe() as data:
first_item: str = await q.get()
Returns:
Type | Description |
---|---|
AsyncGenerator[T, None]
|
AsyncGenerator containing subscriber data. |