tasks – Manage herds of running coroutines

class aioxmpp.tasks.TaskPool(*, max_tasks=None, default_limit=None, logger=None)

Coroutine worker pool with limits on the maximum number of coroutines.

Parameters
  • max_tasks (positive int or None) – Maximum number of total coroutines running in the pool.

  • logger – Logger to use for diagnostics, defaults to a module-wide logger

Each coroutine run in the task pool belongs to zero or more explicitly named groups. Groups are identified by their hashable group key; the structure of the key is not relevant. Groups are created on demand. In addition, each coroutine is implicitly part of the group () (the empty tuple).

max_tasks is the limit on the group () (the empty tuple). As every coroutine is running in that group, it is the limit on the total number of coroutines running in the pool.

When a coroutine exits (normally, by an exception, or by cancellation), it is removed from the pool and the counters for running coroutines are adjusted accordingly.
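The counting model described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the TaskPool implementation: SketchPool, its limits and counts attributes, and the "downloads" group key are hypothetical names chosen for the example.

```python
import asyncio
from collections import Counter


class SketchPool:
    """Illustrative stand-in; NOT aioxmpp's TaskPool implementation."""

    def __init__(self, max_tasks=None):
        # The empty tuple is the group that carries the total limit.
        self.limits = {(): max_tasks}
        self.counts = Counter()

    def spawn(self, groups, coro_fun, *args, **kwargs):
        # Every coroutine is implicitly part of the group ().
        groups = set(groups) | {()}
        # Check all groups first; the coroutine is only created afterwards.
        for group in groups:
            limit = self.limits.get(group)
            if limit is not None and self.counts[group] >= limit:
                raise RuntimeError("limit exhausted for group {!r}".format(group))
        task = asyncio.ensure_future(coro_fun(*args, **kwargs))
        for group in groups:
            self.counts[group] += 1
        # Release the slots on normal exit, exception, or cancellation.
        task.add_done_callback(lambda _task: self._release(groups))
        return task

    def _release(self, groups):
        for group in groups:
            self.counts[group] -= 1


async def demo():
    pool = SketchPool(max_tasks=2)
    task = pool.spawn({"downloads"}, asyncio.sleep, 0)
    during = (pool.counts[()], pool.counts["downloads"])
    await task
    await asyncio.sleep(0)  # give the done callback a chance to run
    after = (pool.counts[()], pool.counts["downloads"])
    return during, after


during, after = asyncio.run(demo())
print(during, after)
```

While the task runs, both the implicit group () and the named group count it; once it finishes, both counters drop back.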

Controlling limits on groups:

set_limit(group, new_limit)

Set a new limit on the number of tasks in the group.

Parameters
  • group (hashable) – Group key of the group to modify.

  • new_limit (non-negative int or None) – New limit for the number of tasks running in group.

Raises

ValueError – if new_limit is negative

The limit of tasks for the group is set to new_limit. If there are currently more than new_limit tasks running in group, those tasks will continue to run. However, the creation of new tasks is inhibited until the group is below its limit.

If the limit is set to zero, no new tasks can be spawned in the group at all.

If new_limit is negative, ValueError is raised instead.

If new_limit is None, the method behaves as if clear_limit() was called for group.
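The rules above (negative values rejected, zero blocking all new tasks, None clearing the limit) can be sketched with a small stand-in. LimitTable and its may_spawn() helper are hypothetical names used only for illustration; they are not part of aioxmpp.

```python
class LimitTable:
    """Hypothetical helper mirroring the documented set_limit() rules."""

    def __init__(self):
        self._limits = {}

    def set_limit(self, group, new_limit):
        if new_limit is None:
            # None behaves like clear_limit(group).
            self.clear_limit(group)
            return
        if new_limit < 0:
            raise ValueError("limit must be non-negative")
        self._limits[group] = new_limit

    def clear_limit(self, group):
        # Clearing a group without a limit has no effect.
        self._limits.pop(group, None)

    def get_limit(self, group):
        return self._limits.get(group)

    def may_spawn(self, group, running):
        # New tasks are allowed only while the group is below its limit.
        limit = self.get_limit(group)
        return limit is None or running < limit


table = LimitTable()
table.set_limit("uploads", 2)
over_limit = table.may_spawn("uploads", running=3)   # already above the limit
table.set_limit("uploads", 0)
zero_limit = table.may_spawn("uploads", running=0)   # zero blocks every new task
table.set_limit("uploads", None)
unlimited = table.may_spawn("uploads", running=100)  # limit was cleared
```

Note that lowering a limit below the current task count does not cancel anything in this sketch; it only makes may_spawn() return False until enough tasks have finished.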

get_limit(group)

Return the limit on the number of tasks in the group.

Parameters

group (hashable) – Group key of the group to query.

Returns

The current limit

Return type

int or None

If the group currently has no limit set, None is returned. Otherwise, the maximum number of tasks which are allowed to run in the group is returned.

get_task_count(group)

Return the number of tasks currently running in group.

Parameters

group (hashable) – Group key of the group to query.

Returns

Number of currently running tasks

Return type

int

clear_limit(group)

Clear the limit on the number of tasks in the group.

Parameters

group (hashable) – Group key of the group to modify.

The limit on the number of tasks in group is removed. If the group currently has no limit, this method has no effect.

Starting and adding coroutines:

spawn(groups, coro_fun, *args, **kwargs)

Start a new coroutine and add it to the pool atomically.

Parameters
  • groups (set of group keys) – The groups the coroutine belongs to.

  • coro_fun – Coroutine function to run

  • args – Positional arguments to pass to coro_fun

  • kwargs – Keyword arguments to pass to coro_fun

Raises

RuntimeError – if the limit on any of the groups or the total limit is exhausted

Return type

asyncio.Task

Returns

The task in which the coroutine runs.

Every group must have at least one free slot available for the coroutine to be spawned; if any group's capacity (or the total limit) is exhausted, the coroutine is not accepted into the pool and RuntimeError is raised.

If the coroutine cannot be added due to limiting, it is not started at all.

The coroutine is started by calling coro_fun with args and kwargs.

Note

The first two arguments can only be passed positionally, not as keywords. This is to prevent conflicts with keyword arguments to coro_fun.
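The reason for the positional-only restriction can be illustrated with a standalone function. This is a sketch of one way to get the effect in Python 3.8 or later (the "/" marker), not a statement about how the library declares its parameters; the free function spawn() and the coroutine report() below are hypothetical.

```python
import asyncio


def spawn(groups, coro_fun, /, *args, **kwargs):
    # Hypothetical free function: "/" makes groups and coro_fun
    # positional-only, so the same names stay available in **kwargs.
    return asyncio.ensure_future(coro_fun(*args, **kwargs))


async def report(*, groups, coro_fun):
    # A coroutine function whose own keyword arguments happen to be
    # called "groups" and "coro_fun".
    return groups, coro_fun


async def demo():
    # The keyword arguments are forwarded to report() without clashing
    # with the first two positional parameters of spawn().
    return await spawn({"reports"}, report, groups="weekly", coro_fun="n/a")


result = asyncio.run(demo())
print(result)
```

Without the positional-only marker, the call in demo() would fail with a TypeError because groups and coro_fun would each receive two values.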

add(groups, coro)

Add a running coroutine to the given pool groups.

Parameters
  • groups (set of group keys) – The groups the coroutine belongs to.

  • coro – Coroutine to add

Raises

RuntimeError – if the limit on any of the groups or the total limit is exhausted

Return type

asyncio.Task

Returns

The task in which the coroutine runs.

Every group must have at least one free slot available for coro to be added; if any group's capacity (or the total limit) is exhausted, the coroutine is not accepted into the pool and RuntimeError is raised.
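Because add() receives an already created coroutine object, a caller may want to decide what happens to that object when the pool rejects it. The following is a minimal sketch of one possible caller-side pattern; whether the pool closes a rejected coroutine itself is not stated here, and FullPool and add_or_close() are hypothetical names used only for illustration.

```python
import asyncio
import inspect


class FullPool:
    """Hypothetical stand-in whose add() always reports an exhausted limit."""

    def add(self, groups, coro):
        raise RuntimeError("limit exhausted")


def add_or_close(pool, groups, coro):
    """Try to add coro; on rejection, close it so it is not left pending."""
    try:
        return pool.add(groups, coro)
    except RuntimeError:
        # Closing an unstarted coroutine object avoids Python's
        # "coroutine ... was never awaited" warning.
        coro.close()
        return None


async def work():
    await asyncio.sleep(0)


coro = work()
result = add_or_close(FullPool(), {"jobs"}, coro)
state = inspect.getcoroutinestate(coro)
```

With a real pool, the same wrapper would return the asyncio.Task on success and None on rejection.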