Metadata-Version: 2.1
Name: api-pipe
Version: 2.0.22
Summary: API Pipe
Author: Rafael Roman Otero
Author-email: rromanotero@romanlabs.com
Requires-Python: >=3.11,<4.0
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Dist: httpx (>=0.26.0,<1.0.0)
Requires-Dist: rich (>=13.7.0,<14.0.0)
Description-Content-Type: text/markdown


# API Pipe

API Pipe is a simple Python library for fetching data from a URL endpoint (GET) and applying transformations from a set of **predefined steps** (filter, map, select, etc.) to the fetched data. In other words, it helps you build a data pipeline from data fetched from an API endpoint.

It logs the full data at every step for inspection. Though simple, it can be handy because it:

1) Simplifies applying transformations to data
2) Simplifies development and debugging, which are often cumbersome when there is little visibility into API data and transformations
3) Simplifies working with multiple API endpoints by keeping the logs tidy and organized under a logs directory
4) Has built-in retries with exponential backoff
5) Takes in [httpx](https://github.com/encode/httpx) clients (both `sync` and `async`) to make GET API requests
6) Uses [rich](https://github.com/Textualize/rich) to pretty-print logs

# Installation

```bash
pip3 install api-pipe
```

# Steps

Available data transformation steps:

- fetch         (GET)
- fetch_async   (GET Async)
- filter
- map
- select
- key
- to_python
- to_json

# Examples

First define a URL and common parameters for all the API calls that will be made:

```python
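import logging
import os
from pathlib import Path

# Url and ApiParams are provided by the api-pipe package.
gitlab_url = Url("https://gitlab.com/api/v4")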

common_params = ApiParams(
    headers={
        "PRIVATE-TOKEN": os.environ["TOKEN"]
    },
    timeout=(5.0, 5.0),
    retries={
        "initial_delay": 0.5,
        "backoff_factor": 3,
        "max_retries": 7,
    },
    logs={
        "unique_name": "__TO_BE_REPLACED__",
        "log_dir": Path("../logs"),
        "level": logging.DEBUG,
        "words_to_highlight": [config.logger_words_to_highlight]
    }
)
```

- headers: passed directly to the httpx client
- timeout: passed directly to the httpx client
- retries: parameters for the retry mechanism with exponential backoff
- logs: log configuration 
    - log_dir: parent log dir, can be shared by multiple Api objects to keep logs tidy
    - unique_name:
        - Each Api object has its own logger. The logger uses `unique_name` to identify itself, so the same logger is not created multiple times, which would end up printing the same log message 20 times (once per created logger object).
        - Also used to organize logged output by directory. A directory named exactly as passed in `unique_name` will be created, and any files logged for this object will be placed there.
    - level: log level
    - words_to_highlight: list of words that `rich` will highlight in the logs
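
To see why `unique_name` matters, here is a minimal sketch using only the standard `logging` module. It assumes (this is not confirmed by API Pipe's source) that the logger is obtained by name and that the per-object directory lives directly under `log_dir`. `make_logger` and `log_directory` are hypothetical helpers, not part of the library.

```python
import logging
from pathlib import Path


def make_logger(unique_name: str) -> logging.Logger:
    # logging.getLogger returns the same object for the same name,
    # so handlers must only be attached once per name.
    logger = logging.getLogger(unique_name)
    if not logger.handlers:
        logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.DEBUG)
    return logger


def log_directory(log_dir: Path, unique_name: str) -> Path:
    # Assumed layout: one sub-directory per unique_name under the shared parent.
    return log_dir / unique_name


first = make_logger("test_gitlab_read_var")
second = make_logger("test_gitlab_read_var")   # same name, same logger
other = make_logger("test_run_fetch_all_1")    # different name, separate logger

first.debug("printed once, because only one handler is attached")
print(log_directory(Path("../logs"), "test_gitlab_read_var"))
```

Without the `if not logger.handlers` guard, every call with the same name would attach another handler and each message would be printed once per handler.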


## Example 1

This example makes a single API call to read all variables from a Gitlab API endpoint, selects only the fields of interest, and filters by variable key. Notice how `unique_name` is set to something meaningful, and how the params are copied from the common params so that they can be shared by multiple API objects.

```python

from copy import deepcopy

import httpx

params = deepcopy(common_params)
params.logs["unique_name"] = "test_gitlab_read_var"

with httpx.Client() as client:
    api = ApiPipe(
        gitlab_url / "groups" / GROUP_ID / "variables",
        client,
        params
    )

    api                         \
        .fetch()                \
        .to_python()            \
        .select([
            "key",
            "value",
            "masked",
        ])                      \
        .filter(
            lambda item: item["key"] == "Var2"
        )                       \
        .to_json(indent=2)

    print(api.data)
```

When run, all data transformation steps are logged to `Path("../logs")`:

![Demo](images/example1_demo_logs.gif)

Log level can be changed to reduce logging, say for prod:

![Demo](images/example1_demo_log_level_info.gif)

Sometimes calls fail with transient 500 errors or get rate-limited, and depending on the circumstances (say you're using threads to call the API) you may need to wait a while before trying again. This is where the built-in exponential retrying comes in handy:

![Demo](images/example1_demo_retrying.gif)
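
To give a feel for what the `retries` settings imply, the sketch below shows one common way to implement exponential backoff. It is not API Pipe's code: the delay formula (`initial_delay * backoff_factor ** n`), the `TransientError` class, and the `call_with_retries` helper are all assumptions made for illustration.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical stand-in for a retryable failure (e.g. HTTP 500 or 429)."""


def backoff_delays(initial_delay: float, backoff_factor: float, max_retries: int) -> list[float]:
    # Assumed schedule: the wait before retry n is initial_delay * backoff_factor ** n
    return [initial_delay * backoff_factor ** n for n in range(max_retries)]


def call_with_retries(
    func: Callable[[], T],
    *,
    initial_delay: float,
    backoff_factor: float,
    max_retries: int,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    for delay in backoff_delays(initial_delay, backoff_factor, max_retries):
        try:
            return func()
        except TransientError:
            sleep(delay)  # wait longer after every failed attempt
    return func()  # last attempt: any error now propagates to the caller


# With the values from common_params above:
print(backoff_delays(initial_delay=0.5, backoff_factor=3, max_retries=7))
# → [0.5, 1.5, 4.5, 13.5, 40.5, 121.5, 364.5]
```

Under this assumed formula, seven retries add up to roughly nine minutes of waiting in the worst case, which is worth keeping in mind when choosing `backoff_factor` and `max_retries`.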

## Example 2

This example demonstrates paginated calls to the Gitlab API to extract all the CI/CD variables from a Gitlab group. To force multiple API calls, the page size is set to 1 (`per_page=1`). Because there are 2 CI/CD variables in `DUMMY_TEST_GROUP_ID`, this results in 3 API calls (2 that return data, 1 that returns an empty page).

```python
import json
from copy import deepcopy

import httpx

params = deepcopy(common_params)
page_number = 1
results = []

with httpx.Client() as client:
    while True:

        url = gitlab_url                    \
            / "groups"                      \
            / DUMMY_TEST_GROUP_ID           \
            / "variables"                   \
            / f"?page={page_number}&per_page=1"

        # Give every page its own logger and log directory
        params.logs["unique_name"] = f"test_run_fetch_all_{page_number}"

        api = ApiPipe(
            url,
            client,
            params
        )
        api                         \
            .fetch()                \
            .to_python()            \
            .select([
                "key",
                "value",
                "masked",
            ])

        if api.data:
            results += api.data
        else:
            # An empty page means there is nothing left to fetch
            break

        page_number += 1

# page_number now equals the number of calls made, including the empty one
print(
    "Number of calls made: ", page_number, " (last one was empty)"
)

print(
    json.dumps(results, indent=2)
)
```

Each call is logged for inspection:

![Demo](images/example2_demo_fetch_all.gif)

Paginated calls can also be mixed with calls to multiple API endpoints:

![Demo](images/example2_demo_multiple_calls.gif)

# Use case (sample pipeline)

1. Collect all Gitlab users from 150 pages (150 API calls).
    - This is similar to what is shown in Example 2
2. Keep only the fields `id`, `username`, `last_activity_on`:

```python
.select([
    "id",
    "username",
    "last_activity_on"
])
```
3. Based on `last_activity_on` calculate number of days inactive:

```python
.map(
    lambda user: {
        **user,
        "days_inactive" : calculate_days_inactive(user['last_activity_on'])
    }
)
```
4. Keep only users that have been inactive longer than a threshold:

```python
.filter(
    lambda user:
        user['days_inactive'] > threshold
)
```
5. For each user, get all their group and project memberships, keeping only those where they have owner access and only the fields `source_id` and `access_level`:

```python
.map(
    lambda user: {
        **user,
        "memberships" : read_memberships(
            user['id'], ...
        ).select([
            "source_id",
            "access_level",
        ]).filter(
            lambda member: member['access_level'] >= OWNER
        )
    }
)
```
6. Clean the data by removing users with no memberships:

```python
.filter(
    lambda user: len(user['memberships']) > 0
)
```
7. Remove duplicate memberships (a Gitlab bug... really):

```python
.map(
    lambda user: {
        **user,
        "memberships" : remove_duplicated(user['memberships'])
    }
)
)
```
Finally, verify that the resulting user list is correct and downgrade those users from owner to developer.

*In fact, this is similar to the scenario that led to the creation of this library: you want to "see" ALL the data exactly as it was fetched and after every transformation step, for debugging and for verification. That, and automatic exponential retries on random 500 errors and not-so-random rate limit errors.*



