Skip to content

AsyncioPoolNG

A pool of coroutine functions.

AsyncioPool utilizes an asyncio.TaskGroup to manage the lifecycle of each async function but adds the ability to limit how many run concurrently.

For each async function, a Future is returned. This Future will eventually hold the result of the function that was executed. Any exceptions raised inside the function are hidden from the TaskGroup and held within the Future.

AsyncioPool is fully typed and passes checks with mypy's strict mode.

is_empty: bool property

Return True if the pool has no tasks.

Returns:

Type Description
bool

True/False

is_full: bool property

Return True if the pool has reached the maximum number of concurrent tasks.

Returns:

Type Description
bool

True/False

__init__(size=100)

Initialize AsyncioPool.

Parameters:

Name Type Description Default
size int

Max number of coroutines which can run simultaneously.

100

itermap(func, iterable, name=None, context=None, batch_duration=1.0) async

Generate a future for func for every item of iterable.

Futures are yielded as they completed.

Parameters:

Name Type Description Default
func AsyncioPoolMapWorkerType[T, R]

Function created with async def

required
iterable Iterable[T] | AsyncIterable[T]

Instance of Iterable or AsyncIterabl which produces values for func

required
name str | None

Optional name for the asyncio.Task

None
context Context | None

Optional context argument allows specifying a custom contextvars.Context for the coro to run in. The current context copy is created when no context is provided.

None
batch_duration int | float

Duration to wait before yielding batches of completed futures.

1.0

Returns:

Type Description
AsyncGenerator[Future[R], None]

Async generator of futures.

join() async

Wait for all active tasks to complete.

map(func, iterable, name=None, context=None)

Apply func to every item of iterable.

Parameters:

Name Type Description Default
func AsyncioPoolMapWorkerType[T, R]

Function created with async def

required
iterable Iterable[T]

Iterable instance which produces values for func

required
name str | None

Optional name for the asyncio.Task

None
context Context | None

Optional context argument allows specifying a custom contextvars.Context for the coro to run in. The current context copy is created when no context is provided.

None

Returns:

Type Description
set[Future[R]]

Set of futures which will eventually contain the results of each func.

spawn(func, *args, name=None, context=None, **kwargs)

Schedules the callable, func, to be executed as func(*args, **kwargs).

This method immediately returns a future which represents the execution of the callable.

Parameters:

Name Type Description Default
func AsyncioPoolWorkerType[R]

Function created with async def

required
*args Any

Positional arguments for func

()
name str | None

Optional name for the asyncio.Task

None
context Context | None

Optional context argument allows specifying a custom contextvars.Context for the coro to run in. The current context copy is created when no context is provided.

None
**kwargs Any

Keyword aarguments for func

{}

Returns:

Type Description
Future[R]

Future which will eventually contain the results of func.