Metadata-Version: 2.4
Name: quantava
Version: 0.2.0
Summary: First-party Python SDK for the Quantava REST API: robust workflow execution, polling, and parallel runs.
Project-URL: Homepage, https://quantava.io
Project-URL: Source, https://github.com/Quantava/quantava-python-sdk
Author: Quantava
License: MIT
Keywords: automation,quantava,sdk,workflow
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Typing :: Typed
Requires-Python: <3.13,>=3.11
Requires-Dist: httpx>=0.27
Requires-Dist: pydantic>=2.7
Provides-Extra: dev
Requires-Dist: mypy>=1.10; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest>=8; extra == 'dev'
Requires-Dist: respx>=0.21; extra == 'dev'
Requires-Dist: ruff>=0.5; extra == 'dev'
Provides-Extra: sse
Requires-Dist: httpx-sse>=0.4; extra == 'sse'
Description-Content-Type: text/markdown

# quantava

First-party Python SDK for the [Quantava](https://quantava.io) workflow platform.
Trigger a deployed workflow, poll to a terminal state with backoff, return a
typed result, and fan out many runs in parallel with bounded concurrency and
partial-failure isolation.

- Async-first core with a synchronous facade — use whichever fits your code.
- Typed inputs inferred from each workflow's trigger schema.
- Tiny dependency surface: `httpx` + `pydantic` v2.

## Install

```bash
pip install quantava
# Server-Sent Events streaming support:
pip install "quantava[sse]"
```

Requires Python 3.11 or 3.12 (`>=3.11,<3.13`).

## Quickstart

No `with` block, no client to manage — fan out one workflow over many inputs and
print a summary:

```python
import quantava as qv

batch = qv.run_many("wf_id", [{"x": 1}, {"x": 2}], max_concurrency=5, progress=True)
batch.print_summary()
```

A single run, waited to completion, returning its output:

```python
import quantava as qv

output = qv.run_and_wait("wf_id", {"x": 1})
print(output)
```

These read your key from the environment (`QUANTAVA_API_KEY`) or accept it
explicitly: `qv.run_and_wait("wf_id", {...}, api_key="qva_...")`.

> **Prerequisite:** the API key must have the **`workflows:execute`** scope, or
> the trigger fails with a `PermissionDeniedError`. (`ensure_deployed=True` also
> needs `workflows:deploy`.)

## Authenticate

The SDK authenticates with an API key sent in the `X-API-Key` request header
(`X-API-Key: qva_...`). The base URL defaults to `https://quantava.io/api/v1`;
override it with the `base_url` argument or the `QUANTAVA_BASE_URL` environment
variable for self-hosted or staging deployments.

```bash
export QUANTAVA_API_KEY=qva_your_key
# optional:
export QUANTAVA_BASE_URL=https://quantava.io/api/v1
```
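
The lookup described above can be sketched with the standard library alone. `resolve_settings` is a hypothetical helper for illustration, not part of the SDK, and letting an explicit argument win over the environment variable is an assumption:

```python
import os

DEFAULT_BASE_URL = "https://quantava.io/api/v1"  # default stated in this README


def resolve_settings(api_key=None, base_url=None, environ=None):
    """Hypothetical helper: explicit argument, then environment, then default."""
    env = os.environ if environ is None else environ
    key = api_key or env.get("QUANTAVA_API_KEY")
    if not key:
        raise ValueError("no API key: pass api_key= or set QUANTAVA_API_KEY")
    url = base_url or env.get("QUANTAVA_BASE_URL") or DEFAULT_BASE_URL
    return key, url.rstrip("/")  # drop a trailing slash so paths join cleanly
```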

## Explicit client (streaming, reuse)

For repeated calls, streaming, or fine-grained control, construct a client once
and reuse it. The synchronous client uses a `with` block; the async client uses
`async with`.

```python
from quantava import Quantava, WorkflowExecutionError

with Quantava.from_env() as q:                 # reads QUANTAVA_API_KEY / QUANTAVA_BASE_URL
    execution = q.workflows.run("wf_id", pdf_url="https://example.com/a.pdf")
    try:
        output = execution.result(timeout=300)  # waits to terminal, returns output_data
    except WorkflowExecutionError as exc:
        print(f"failed: {exc} (execution_id={exc.execution_id})")
    else:
        print(output)
```

```python
import asyncio
from quantava import AsyncQuantava

async def main():
    async with AsyncQuantava.from_env() as q:
        execution = await q.workflows.run("wf_id", pdf_url="https://example.com/a.pdf")
        output = await execution.result(timeout=300)
        print(output)

asyncio.run(main())
```

You can also pass the key directly (`Quantava(api_key="qva_...")`) and cheaply
reconfigure a client with `q.with_options(max_concurrency=20, timeout=60)`.
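
For intuition about what `execution.result(timeout=...)` does while it waits, polling to a terminal state with backoff can be sketched with the standard library. This is an illustrative sketch, not the SDK's implementation: `wait_for_terminal`, the delay values, and the terminal status names are all assumptions.

```python
import time


def wait_for_terminal(fetch_status, timeout=300.0, initial_delay=0.5,
                      max_delay=10.0, factor=2.0,
                      sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until it returns a terminal state or time runs out.

    fetch_status is any zero-argument callable returning a status string.
    The terminal state names below are assumptions for this sketch.
    """
    terminal = {"completed", "failed", "cancelled"}
    deadline = clock() + timeout
    delay = initial_delay
    while True:
        status = fetch_status()
        if status in terminal:
            return status
        remaining = deadline - clock()
        if remaining <= 0:
            raise TimeoutError(f"still {status!r} after {timeout}s")
        sleep(min(delay, remaining))            # never sleep past the deadline
        delay = min(delay * factor, max_delay)  # exponential backoff, capped
```

Injecting `sleep` and `clock` keeps the loop testable without real waiting.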

## Typed inputs

Each workflow's trigger node declares an input schema (name, type, required,
default, description). The SDK surfaces those as typed, validated parameters and
catches mistakes before a wasted round-trip.

```python
from quantava import Quantava

with Quantava.from_env() as q:
    wf = q.workflows.get("wf_id")
    for field in wf.inputs.fields:
        print(field.name, field.json_type, "required" if field.required else "optional")
    print(wf.inputs.is_free_form)  # True when no usable schema is declared

    execution = wf.run(pdf_url="https://example.com/a.pdf", page={"n": 1})
```

Escape hatches: `q.workflows.run(id, ..., validate=False)` or
`q.workflows.run_raw(id, input_data={...})` send arbitrary JSON untouched.

> **Note:** Typed-input checks are **client-side** only. The Quantava server
> does not enforce the trigger schema — it accepts `input_data` verbatim. The
> SDK validation gives parity with the web run-form and catches obvious
> mistakes, but a server that accepts your input does not imply the schema
> matched.
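
A client-side check of this kind can be sketched as follows. The `Field` class and `validate_inputs` function are hypothetical stand-ins rather than the SDK's own types, and both the JSON-type mapping and the rejection of unknown keys are assumptions:

```python
from dataclasses import dataclass
from typing import Any

# JSON type names mapped to Python types (an assumption for this sketch).
JSON_TYPES = {"string": str, "number": (int, float), "boolean": bool,
              "object": dict, "array": list}


@dataclass
class Field:  # hypothetical stand-in for one trigger-schema field
    name: str
    json_type: str
    required: bool = False
    default: Any = None


def validate_inputs(fields, values):
    """Return values with defaults applied, or raise ValueError listing problems."""
    known = {f.name for f in fields}
    errors = [f"unknown input: {k}" for k in values if k not in known]
    result = dict(values)
    for f in fields:
        if f.name not in values:
            if f.required:
                errors.append(f"missing required input: {f.name}")
            elif f.default is not None:
                result[f.name] = f.default
            continue
        value = values[f.name]
        expected = JSON_TYPES.get(f.json_type, object)  # unknown types pass
        # bool is a subclass of int, so reject it explicitly for "number"
        bool_as_number = isinstance(value, bool) and f.json_type == "number"
        if not isinstance(value, expected) or bool_as_number:
            errors.append(f"{f.name}: expected {f.json_type}")
    if errors:
        raise ValueError("; ".join(errors))
    return result
```

Collecting every problem before raising lets a caller fix all inputs in one pass instead of one error per attempt.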

## Parallel execution

Fan out many runs with bounded concurrency, automatic retries, idempotency-safe
triggers, and partial-failure isolation. By default the batch runs every job and
collects all results (`fail_fast=False`); a single failure does not cancel the
others.

```python
import asyncio
from quantava import AsyncQuantava, RunSpec

async def main():
    async with AsyncQuantava.from_env() as q:
        batch = await q.run_parallel(
            [
                ("wf_a", {"x": 1}),
                ("wf_b", {"x": 2}),
                RunSpec(workflow_id="wf_c", input={"x": 3}, per_run_timeout=120),
            ],
            max_concurrency=8,       # None falls back to config.max_concurrency (default 8)
            fail_fast=False,         # collect-all default; True cancels remaining on first failure
            timeout=600,             # overall batch budget (None = no cap)
            per_run_timeout=300,     # per-run budget (None = poll to terminal)
            progress=True,
        )
        batch.print_summary()
        print(batch.ok, batch.failed, batch.outputs)
        batch.raise_for_errors()     # raises BatchError(failed) if any run failed

asyncio.run(main())
```

`batch` is a sequence of `RunResult` in input order — iterate it (`for r in
batch: r.ok / r.output / r.input / r.error`) to map results back to jobs.
`batch.outputs` holds the **successful** outputs only, so it is *not*
positionally aligned with the input jobs.
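
The general pattern behind bounded fan-out with per-run failure isolation can be sketched with `asyncio` alone. This is not the SDK's engine: `Result` is a hypothetical stand-in for `RunResult`, and `fan_out` is an illustrative name.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Result:  # hypothetical stand-in for the SDK's RunResult
    input: Any
    output: Any = None
    error: Optional[BaseException] = None

    @property
    def ok(self) -> bool:
        return self.error is None


async def fan_out(job, inputs, max_concurrency=8):
    """Run job(item) for every input, at most max_concurrency at a time."""
    gate = asyncio.Semaphore(max_concurrency)

    async def one(item):
        async with gate:
            try:
                return Result(input=item, output=await job(item))
            except Exception as exc:  # isolate the failure to this run
                return Result(input=item, error=exc)

    # gather() returns results in input order, regardless of finish order
    return await asyncio.gather(*(one(item) for item in inputs))
```

Because each result carries its own `input`, a filtered list such as `[r.output for r in results if r.ok]` can be shorter than the job list without losing the mapping back to jobs.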

The synchronous `Quantava.run_parallel(...)` / `Quantava.run_many(...)` drive the
same async engine on a background event-loop thread, so they work in plain
scripts and notebooks.
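
One common way to build such a facade is to run an event loop on a daemon thread and submit coroutines to it with `asyncio.run_coroutine_threadsafe`. The `LoopThread` class below is a hypothetical sketch of that technique, not the SDK's actual implementation:

```python
import asyncio
import threading


class LoopThread:
    """Hypothetical helper: own an event loop running on a daemon thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro, timeout=None):
        """Submit a coroutine from synchronous code and block for its result."""
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout)

    def close(self):
        """Stop the loop, wait for the thread to exit, then release the loop."""
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()
```

This approach sidesteps the restriction that `asyncio.run()` cannot be called from a thread that already has a running event loop, which is typically the case in a notebook.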

## Errors

All exceptions are importable from `quantava` and derive from `QuantavaError`.

```python
from quantava import (
    QuantavaError,           # base
    APIError,                # server error: status_code, code, message, details, request_id
    AuthenticationError,     # 401
    PermissionDeniedError,   # 403 (e.g. missing workflows:execute scope)
    NotFoundError,           # 404
    ConflictError,           # 409
    ValidationError,         # 400/422 -> .field_errors
    RateLimitError,          # 429 -> .retry_after, .limits
    ServerError,             # 5xx
    ConnectionError,         # network/transport (shadows the Python built-in once imported)
    TimeoutError,            # request-level timeout (shadows the Python built-in once imported)
    WorkflowNotDeployedError,
    WorkflowExecutionError,  # .execution_id, .error_message, .request_id
    ExecutionTimeout,        # wait() exceeded timeout (no server-side cancel by default)
    WorkflowInputError,      # client-side input validation
    BatchError,              # aggregates failed RunResults
)
```

Catch what you care about and let the rest propagate:

```python
import quantava as qv
from quantava import (
    RateLimitError,
    WorkflowExecutionError,
    WorkflowInputError,
    QuantavaError,
)

try:
    output = qv.run_and_wait("wf_id", {"pdf_url": "https://example.com/a.pdf"})
except WorkflowInputError as exc:        # client-side: bad/missing inputs
    print(f"fix your inputs: {exc}")
except RateLimitError as exc:            # 429 (auto-retried first; surfaced if exhausted)
    print(f"rate limited, retry after {exc.retry_after}s")
except WorkflowExecutionError as exc:    # the run reached a terminal failure
    print(f"run {exc.execution_id} failed: {exc.error_message}")
except QuantavaError as exc:             # anything else from the SDK
    print(f"unexpected: {exc}")
```

The retry engine automatically retries `RateLimitError` (honoring `Retry-After`,
including HTTP-date values), `ServerError` 502/503/504, and connect/read timeouts
on idempotent reads. Everything else fails fast.
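
A `Retry-After` header carries either a number of seconds or an HTTP-date. Handling both forms can be sketched with the standard library; `retry_after_seconds` is a hypothetical helper for illustration, not the SDK's internal function:

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def retry_after_seconds(value, now=None):
    """Convert a Retry-After header value to a non-negative delay in seconds.

    Accepts delta-seconds ("120") or an HTTP-date
    ("Wed, 21 Oct 2015 07:28:00 GMT"); returns None if unparseable.
    """
    value = value.strip()
    if value.isdigit():
        return float(value)
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None
    if when.tzinfo is None:  # treat a zone-less date as UTC
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())  # a past date means retry now
```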

## Examples

Runnable scripts live in [`examples/`](./examples/): `quickstart_sync.py`,
`quickstart_async.py`, and `parallel.py`.

## License

MIT
