Metadata-Version: 2.1
Name: asyncgraphs
Version: 0.0.2
Project-URL: Source, https://github.com/SamVermeulen42/asyncgraphs
Author-email: Sam Vermeulen <sam.verm@yahoo.com>
License-Expression: MIT
License-File: LICENSE
Requires-Python: >=3.10
Requires-Dist: sentinel<1.1.0,>=1.0.0
Provides-Extra: dev
Requires-Dist: black==23.1.0; extra == 'dev'
Requires-Dist: coverage<8.0,>=7.1.0; extra == 'dev'
Requires-Dist: isort==5.12.0; extra == 'dev'
Requires-Dist: mypy==1.0.1; extra == 'dev'
Requires-Dist: pytest-asyncio<0.21.0,>=0.20.0; extra == 'dev'
Requires-Dist: pytest<8.0.0,>=7.2.1; extra == 'dev'
Requires-Dist: ruff==0.0.247; extra == 'dev'
Description-Content-Type: text/markdown

# AsyncGraphs

AsyncGraphs is a tiny ETL framework that leverages asyncio to make the execution more efficient.


## Installation

```commandline
pip install asyncgraphs
```

## Basic usage

```python
import asyncio
import datetime
from random import random

import pytz
from asyncgraphs import Graph, run


async def my_extract():
    while True:
        await asyncio.sleep(1)
        yield {"timestamp": datetime.datetime.now(tz=pytz.UTC), "value": random()}


def my_transform(in_doc):
    if in_doc["value"] < 0.5:
        in_doc["value"] = None
    return in_doc


class MyForwardFill:
    def __init__(self):
        self.last_value = None

    def __call__(self, in_doc):
        if in_doc["value"] is None:
            in_doc["value"] = self.last_value
        else:
            self.last_value = in_doc["value"]
        return in_doc


async def main():
    g = Graph()
    g | my_extract() | my_transform | MyForwardFill() | print
    await run(g)

if __name__ == '__main__':
    asyncio.run(main())
```

The example above shows some dummy extract/transform/load steps.
In the example most are synchronous, but regular applications should use async libraries as often as possible.
