Metadata-Version: 2.4
Name: asynchronously
Version: 1.0.0
Summary: Tools to control async code execution
Project-URL: Homepage, https://github.com/miriada-io/asynchronously
Project-URL: Repository, https://github.com/miriada-io/asynchronously
Project-URL: Issues, https://github.com/miriada-io/asynchronously/issues
Project-URL: Changelog, https://github.com/miriada-io/asynchronously/blob/master/CHANGELOG.md
Author-email: Miriada <info@miriada.io>
License-Expression: MIT
License-File: LICENSE
Keywords: async,asyncio,deinitialization,initialization,lifecycle,periodic,retry
Classifier: Development Status :: 5 - Production/Stable
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.11
Provides-Extra: dev
Requires-Dist: coverage>=7.0; extra == 'dev'
Requires-Dist: pre-commit>=3.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: ruff>=0.6; extra == 'dev'
Description-Content-Type: text/markdown

# asynchronously

[![PyPI](https://img.shields.io/pypi/v/asynchronously.svg?label=PyPI)](https://pypi.org/project/asynchronously/)
[![Python](https://img.shields.io/pypi/pyversions/asynchronously.svg?label=Python)](https://pypi.org/project/asynchronously/)
[![Tests](https://github.com/miriada-io/asynchronously/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/miriada-io/asynchronously/actions/workflows/tests.yml)
[![License](https://img.shields.io/pypi/l/asynchronously.svg?label=License)](https://github.com/miriada-io/asynchronously/blob/master/LICENSE)

Asyncio building blocks for service lifecycle, retries, periodic jobs, and structured concurrent awaiting.

## Installation

```bash
pip install asynchronously
```

Requires Python 3.11+.

## Why asynchronously?

A typical Python service has several long-lived components (a database connection, an HTTP server, Kafka consumers, background workers) that must come up in dependency order and shut down in reverse. The standard library's `asyncio` does not ship primitives for declaring that order.

`asynchronously` provides those primitives, plus a few other utilities for common async patterns.

## Quick Start

A `Context` dataclass holds all components. Each component is an `AsyncInitable` and/or an `AsyncDeinitable`, and each dependency is declared with a single call.

```python
import asyncio
from dataclasses import dataclass

from asynchronously import AsyncDeinitable, AsyncInitable


class Database(AsyncInitable, AsyncDeinitable):
    def __init__(self, url: str) -> None:
        AsyncInitable.__init__(self)
        AsyncDeinitable.__init__(self)
        self.url = url

    async def _async_init(self) -> None:
        ...  # open connection pool, run migrations

    async def _async_deinit(self) -> None:
        ...  # close pool


class HttpServer(AsyncInitable, AsyncDeinitable):
    def __init__(self, port: int) -> None:
        AsyncInitable.__init__(self)
        AsyncDeinitable.__init__(self)
        self.port = port

    async def _async_init(self) -> None:
        ...

    async def _async_deinit(self) -> None:
        ...


@dataclass
class Context(AsyncInitable, AsyncDeinitable):
    database: Database = None
    http_server: HttpServer = None

    def __post_init__(self) -> None:
        AsyncInitable.__init__(self)
        AsyncDeinitable.__init__(self)


async def amain() -> None:
    context = Context(
        database=Database("postgres://localhost/app"),
        http_server=HttpServer(port=8000),
    )

    # HTTP server starts only after DB init is done
    context.http_server.wait_before_async_init_for(
        asyncio.create_task(context.database.async_init_finished__event.wait())
    )

    await context.async_init()
    try:
        await asyncio.Event().wait()
    finally:
        await context.async_deinit()


asyncio.run(amain())
```

## Overview

**Lifecycle:** [`AsyncInitable`](#asyncinitable) | [`AsyncDeinitable`](#asyncdeinitable)

**Background tasks:** [`Periodic`](#periodic)

**Retries:** [`retry`](#retry)

**Helpers:** [`acall`](#acall) | [`DummyAsyncContextManager`](#dummyasynccontextmanager)

**Concurrent awaiting:** [`strict_gather`](#strict_gather) | [`await_dict_values`](#await_dict_values) | [`wait_for_all_other_tasks`](#wait_for_all_other_tasks)

---

## Lifecycle

### `AsyncInitable`

Mixin that adds a structured asynchronous initialization phase. Override `_async_init` to do your component's own work. By default the base implementation walks `vars(self)` and concurrently initializes any nested `AsyncInitable` attributes (also those inside `list`, `set`, or `dict` containers), so a single `await context.async_init()` on the top-level container initializes the entire tree.

```python
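from asynchronously import AsyncInitable


class FileManager(AsyncInitable):
    def __init__(self, context: "Context") -> None:  # Context as defined in Quick Start
        AsyncInitable.__init__(self)
        self.context = context

    async def _async_init(self) -> None:
        # Component-specific startup work; `_reset_stuck_uploads` is application code.
        await self._reset_stuck_uploads()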
```

Each instance exposes three `asyncio.Event` attributes that mark phases of its lifecycle. Other components can wait on them with `await event.wait()`:

| Attribute                        | Set when                                                |
|----------------------------------|---------------------------------------------------------|
| `async_init_scheduled__event`    | `async_init()` was called (does not mean work started)  |
| `async_init_started__event`      | All `wait_before_async_init_for` waits have completed   |
| `async_init_finished__event`     | `_async_init` returned                                  |

#### Declaring init-time dependencies

`wait_before_async_init_for(awaitable)` registers an awaitable that must complete before this component's `_async_init` runs. Typically you await another component's `async_init_finished__event`:

```python
http_server.wait_before_async_init_for(
    asyncio.create_task(database.async_init_finished__event.wait())
)
```

Raises `RuntimeError` if called after `async_init()` was already scheduled.
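
To make the ordering concrete, here is a stdlib-only sketch of the gating idea: a component waits on every registered event before it runs its own init. The `Component` class and its method names are hypothetical stand-ins written for this illustration; they are not the library's implementation.

```python
import asyncio


class Component:
    """Hypothetical stand-in for a component with gated initialization."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log
        self.init_finished = asyncio.Event()
        self._wait_for: list[asyncio.Event] = []

    def wait_before_init_for(self, event: asyncio.Event) -> None:
        # Register an event that must be set before our init runs.
        self._wait_for.append(event)

    async def init(self) -> None:
        for event in self._wait_for:
            await event.wait()
        self.log.append(f"{self.name} init")
        self.init_finished.set()


async def run_demo() -> list[str]:
    log: list[str] = []
    database = Component("database", log)
    http_server = Component("http_server", log)
    http_server.wait_before_init_for(database.init_finished)
    # Both start concurrently; http_server is listed first on purpose.
    await asyncio.gather(http_server.init(), database.init())
    return log


print(asyncio.run(run_demo()))  # ['database init', 'http_server init']
```

Even though `http_server.init()` is scheduled first, it only proceeds once the database has signalled completion.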

#### Inheriting alongside other base classes

When mixing with another base (e.g. `FastAPI`), call `AsyncInitable.__init__(self)` explicitly after the other parent's `__init__`. With `@dataclass`, do the same from `__post_init__`:

```python
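from fastapi import FastAPI

from asynchronously import AsyncDeinitable, AsyncInitable


class Application(FastAPI, AsyncInitable, AsyncDeinitable):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)  # the other parent's __init__ runs first
        AsyncInitable.__init__(self)
        AsyncDeinitable.__init__(self)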
```

### `AsyncDeinitable`

The teardown counterpart of `AsyncInitable`. It mirrors the same API with `deinit` in place of `init`: override `_async_deinit`, observe `async_deinit_{scheduled,started,finished}__event`, and declare dependencies with `wait_before_async_deinit_for`.

The two mixins are independent — a class can be only `AsyncInitable`, only `AsyncDeinitable`, or both:

```python
class Database(AsyncInitable, AsyncDeinitable):
    def __init__(self, url: str) -> None:
        AsyncInitable.__init__(self)
        AsyncDeinitable.__init__(self)
        self.url = url

    async def _async_init(self) -> None:
        self.pool = await create_pool(self.url)

    async def _async_deinit(self) -> None:
        await self.pool.close()
```

Deinit dependencies typically run in the *opposite* direction to init dependencies — e.g. the HTTP server stops accepting traffic before the DB pool closes:

```python
database.wait_before_async_deinit_for(
    asyncio.create_task(http_server.async_deinit_finished__event.wait())
)
```

## Background tasks

### `Periodic`

Runs an async or sync callable on a fixed period. Construction with `is_active=True` (the default) schedules the loop immediately, so a `Periodic` must be created from within a running event loop.

```python
import asyncio
from datetime import timedelta

from asynchronously import Periodic


async def amain():
    periodic_print = Periodic(lambda: print("i'm ok"), timedelta(minutes=1))
    # do other stuff
    await asyncio.Event().wait()


asyncio.run(amain())
```

Full signature:

```python
class Periodic:
    def __init__(
        self,
        job: Callable,
        period: datetime.timedelta,
        is_active: bool = True,
        first_at: datetime.datetime | None = None,
        decrease_sleep_time_by_evaluation_time: bool = False,
    ) -> None
```

Scheduling rules:

- `first_at=None` (default): the first invocation happens `period` after construction.
- `first_at` set: the first invocation happens at that moment. `first_at` may be naive or timezone-aware; the current time is read with the same tzinfo.
- `first_at` in the past: a warning is logged and the job runs immediately.
- By default, the next invocation is scheduled `period` after the previous job finishes. With `decrease_sleep_time_by_evaluation_time=True`, the next sleep is shortened by the job's wall-clock duration so the cadence stays aligned with `period` between successive job *starts*.

Exceptions raised by the job are caught and logged — the loop keeps running.
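
A rough stdlib-only sketch of that "log and keep going" behavior follows. The `periodic_loop` function is hypothetical, handles only sync jobs, and stops after a fixed number of runs so the demo terminates; the real class is not bounded this way.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def periodic_loop(job, period_sec: float, runs: int) -> None:
    """Hypothetical loop: run `job` every `period_sec`, surviving failures."""
    for _ in range(runs):
        await asyncio.sleep(period_sec)
        try:
            job()
        except Exception:
            # Log and carry on so one bad run does not stop the schedule.
            logger.exception("periodic job failed")


async def run_demo() -> list[int]:
    attempts: list[int] = []

    def flaky_job() -> None:
        attempts.append(len(attempts) + 1)
        if len(attempts) == 2:
            raise RuntimeError("second run fails")

    await periodic_loop(flaky_job, period_sec=0.01, runs=3)
    return attempts


print(asyncio.run(run_demo()))  # [1, 2, 3]
```

The second run raises, the error is logged, and the third run still happens.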

Public API:

- `start()`: called automatically when `is_active=True`; a second call is a no-op.
- `async stop()`: sets `is_active=False` and awaits the loop.
- `future`: the underlying `asyncio.Future`.

## Retries

### `retry`

Decorator that retries an `async` function on exceptions. `attempts_limit` is the maximum number of **retries after the first failure**, so `attempts_limit=N` means up to `N + 1` total calls. With `attempts_limit=None` (the default) the function is retried indefinitely.

```python
import logging

from asynchronously import retry

logger = logging.getLogger(__name__)


class VimeoConnector:
    @retry(attempts_limit=3, sleep_time_sec=2, retrying_exceptions=ConnectionError)
    async def get_video_duration(self, vimeo_id: str) -> int:
        return await self._http.post("/video/duration/get", payload={"vimeo_id": vimeo_id})


async def amain() -> None:
    connector = VimeoConnector()
    try:
        duration = await connector.get_video_duration("12345")
    except ConnectionError:
        # all 4 attempts exhausted — the original exception is re-raised
        logger.error("vimeo unavailable, falling back to default duration")
        duration = 0
```

Full signature:

```python
def retry(
    func: Callable[..., Any] | None = None,
    attempts_limit: int | None = None,
    sleep_time_sec: float = 5.0,
    retrying_exceptions: type[Exception] | tuple[type[Exception], ...] = Exception,
) -> Callable[..., Any]
```

Usable both with and without parentheses (`@retry` retries forever, `@retry(attempts_limit=3)` retries up to 3 times).

`asyncio.CancelledError` and other `BaseException` subclasses are not caught — task cancellation works as expected. Each retry is logged at `ERROR` level as `Retry: K/N`, with `K` starting from 1. Decorating a sync function raises `TypeError`.

## Helpers

### `acall`

Call `target` if it's callable, then `await` it if the result is awaitable. Lets a caller accept "anything that produces a value" — sync function, async function, coroutine, or a plain value — without branching on its kind:

```python
from asynchronously import acall

await acall(async_function)      # calls, awaits, returns result
await acall(sync_function)       # calls, returns result
await acall(coroutine_object)    # awaits, returns result
await acall(42)                  # returns 42
```

Used internally by [`Periodic`](#periodic) so jobs can be sync or async.

Any callable is invoked — including classes and instances with `__call__`. To pass such a value through without invoking it, wrap it: `acall(lambda: my_value)`.
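
The call-then-await rule can be approximated in a few lines of stdlib code. `acall_sketch` below is a hypothetical re-creation for illustration only; the library's own function may differ in details.

```python
import asyncio
import inspect
from typing import Any


async def acall_sketch(target: Any) -> Any:
    """Hypothetical re-creation of the call-then-await rule."""
    result = target() if callable(target) else target
    if inspect.isawaitable(result):
        result = await result
    return result


async def async_function() -> str:
    return "async"


def sync_function() -> str:
    return "sync"


async def run_demo() -> list[Any]:
    return [
        await acall_sketch(async_function),    # called, then awaited
        await acall_sketch(sync_function),     # called
        await acall_sketch(async_function()),  # coroutine object: awaited
        await acall_sketch(42),                # plain value: returned
        await acall_sketch(lambda: sync_function),  # wrapped: not invoked
    ]


results = asyncio.run(run_demo())
print(results[:4])                  # ['async', 'sync', 'async', 42]
print(results[4] is sync_function)  # True
```

The last case shows the wrapping trick: the lambda is called, but the function it returns is passed through untouched.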

### `DummyAsyncContextManager`

A no-op async context manager. Useful where the structure demands `async with` but there is nothing to do, for example to accept an optional lock without branching at the call site:

```python
import asyncio

from asynchronously import DummyAsyncContextManager


async def update(lock: asyncio.Lock | None = None) -> None:
    async with lock or DummyAsyncContextManager():
        ...
```

Exceptions inside the block propagate normally — `__aexit__` does not suppress them.

## Concurrent awaiting

### `strict_gather`

Like `asyncio.gather`, but on the first error it cancels the remaining siblings and awaits them before re-raising. Bare `asyncio.gather` leaves the other children running, which can later surface as `"Task was destroyed but it is pending!"` warnings. The original exception and its traceback are preserved, unlike with `TaskGroup`, which wraps errors in an `ExceptionGroup`.

```python
from asynchronously import strict_gather

results = await strict_gather(
    fetch_user(user_id),
    fetch_profile(user_id),
    fetch_settings(user_id),
)
```
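
The cancel-then-await behavior can be sketched on top of plain `asyncio.gather`. The helper name `gather_cancelling_siblings` is hypothetical, and this is one possible approach for illustration rather than the library's actual code.

```python
import asyncio


async def gather_cancelling_siblings(*awaitables):
    """Hypothetical sketch: gather, but cancel and await siblings on error."""
    tasks = [asyncio.ensure_future(awaitable) for awaitable in awaitables]
    try:
        return await asyncio.gather(*tasks)
    except BaseException:
        for task in tasks:
            task.cancel()  # no effect on tasks that already finished
        # Wait until every cancellation has been processed.
        await asyncio.gather(*tasks, return_exceptions=True)
        raise


async def run_demo() -> tuple[str, bool]:
    slow_was_cancelled = False

    async def fails_fast() -> None:
        await asyncio.sleep(0)
        raise ValueError("boom")

    async def slow() -> None:
        nonlocal slow_was_cancelled
        try:
            await asyncio.sleep(3600)
        except asyncio.CancelledError:
            slow_was_cancelled = True
            raise

    try:
        await gather_cancelling_siblings(fails_fast(), slow())
    except ValueError as error:
        return str(error), slow_was_cancelled
    return "no error", slow_was_cancelled


print(asyncio.run(run_demo()))  # ('boom', True)
```

The caller sees the original `ValueError`, and by the time it does, the slow sibling has already been cancelled and awaited.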

### `await_dict_values`

Awaits every value in a mapping concurrently and returns a plain dict of results, keeping the original keys. On the first failure, siblings are cancelled (via `strict_gather`) and the exception is re-raised.

```python
from asynchronously import await_dict_values

balances = await await_dict_values({
    "stripe": stripe_client.get_balance(),
    "paypal": paypal_client.get_balance(),
    "internal": internal_ledger.get_balance(),
})
```
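
The key-preserving part of this pattern can be sketched with the standard library alone. `await_mapping_values` and `get_balance` are hypothetical names for illustration. For brevity the sketch uses plain `asyncio.gather`, so it does not cancel siblings on failure the way the library function is described to.

```python
import asyncio
from collections.abc import Awaitable, Mapping
from typing import Any


async def await_mapping_values(mapping: Mapping[str, Awaitable[Any]]) -> dict[str, Any]:
    """Hypothetical sketch: await all values concurrently, keep the keys."""
    keys = list(mapping)
    # gather returns results in argument order, so zip restores the pairing.
    results = await asyncio.gather(*(mapping[key] for key in keys))
    return dict(zip(keys, results))


async def get_balance(amount: int, delay: float) -> int:
    await asyncio.sleep(delay)
    return amount


async def run_demo() -> dict[str, int]:
    return await await_mapping_values({
        "stripe": get_balance(100, 0.02),
        "paypal": get_balance(250, 0.01),
        "internal": get_balance(75, 0.0),
    })


print(asyncio.run(run_demo()))  # {'stripe': 100, 'paypal': 250, 'internal': 75}
```

Results follow the order of the keys, not the order in which the awaitables complete.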

### `wait_for_all_other_tasks`

Awaits every task in the running event loop except the caller's own. Mainly for test teardown and end-of-process drains:

```python
import pytest
from asynchronously import wait_for_all_other_tasks


@pytest.fixture(autouse=True)
async def drain():
    yield
    await wait_for_all_other_tasks()
```

Because it waits for *every* task — including ones the caller did not start — it can easily produce surprising deadlocks if used inside a long-running service. For ordinary coordination, prefer awaiting specific tasks.
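
Conceptually, the drain can be built from `asyncio.all_tasks()` and `asyncio.current_task()`. The `wait_for_others` helper below is a hypothetical sketch; in particular, collecting exceptions with `return_exceptions=True` is an assumption, since error handling is not described here.

```python
import asyncio


async def wait_for_others() -> None:
    """Hypothetical sketch: await every task except the current one."""
    others = asyncio.all_tasks() - {asyncio.current_task()}
    if others:
        # Assumption: failures in other tasks are collected, not re-raised.
        await asyncio.gather(*others, return_exceptions=True)


async def run_demo() -> list[str]:
    finished: list[str] = []

    async def background(name: str, delay: float) -> None:
        await asyncio.sleep(delay)
        finished.append(name)

    # Keep references so the tasks are not garbage collected mid-flight.
    tasks = [
        asyncio.create_task(background("fast", 0.01)),
        asyncio.create_task(background("slow", 0.03)),
    ]
    await wait_for_others()
    assert all(task.done() for task in tasks)
    return finished


print(asyncio.run(run_demo()))  # ['fast', 'slow']
```

The sketch also shows where the hang risk comes from: a single task that never finishes keeps `wait_for_others()` waiting forever.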

The old name `complete_all_tasks` is kept as an alias for backward compatibility.

## License

[MIT](LICENSE)
