Metadata-Version: 2.4
Name: agentpulse-py
Version: 0.2.0
Summary: Lightweight observability SDK for AI agents — non-blocking run tracking with batching and retries
Project-URL: Homepage, https://github.com/your-org/agenttrace-py
Project-URL: Documentation, https://docs.agenttrace.dev
Project-URL: Bug Tracker, https://github.com/your-org/agenttrace-py/issues
License: MIT
Keywords: agents,ai,llm,monitoring,observability,tracing
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: System :: Monitoring
Classifier: Typing :: Typed
Requires-Python: >=3.8
Provides-Extra: dev
Requires-Dist: build; extra == 'dev'
Requires-Dist: pytest; extra == 'dev'
Requires-Dist: twine; extra == 'dev'
Description-Content-Type: text/markdown

# agenttrace

Lightweight observability SDK for AI agents. Track runs, steps, tokens, cost, and latency — with **zero blocking overhead**.

```python
with agenttrace.track_run("my-agent", model="llama-3.3-70b") as run:
    result = llm.call(prompt)
    run.add_step("llm_response", input=prompt, output=result, tokens=150, latency=320)
```

Completed runs are flushed to your [AgentTrace](https://agenttrace.dev) dashboard in the background — your agent never waits on a network call.

---

## Features

- **Async-safe** — per-run objects on the call stack, no global mutable state
- **Non-blocking** — background worker thread + `queue.Queue`; agent execution is never delayed
- **Reliable** — exponential backoff retries with jitter (4 attempts by default)
- **Batching** — configurable batch size and flush interval
- **Zero dependencies** — stdlib only (`urllib`, `queue`, `threading`, `contextlib`)
- **Sync + async** — context managers and decorator for both sync and async code

---

## Install

```bash
pip install agenttrace
```

---

## Quick start

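Set your credentials in `.env` (or export them as environment variables). The Python snippet below loads `.env` with `python-dotenv`, a separate package (`pip install python-dotenv`) that the SDK itself does not require: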

```env
AGENTTRACE_API_KEY="at_xxxxxxxxxxxxxxxxxxxx"
AGENTTRACE_URL="https://your-dashboard.com"   # default: http://localhost:3001
AGENTTRACE_SERVICE="my-agent"                 # default: agent
```

```python
from dotenv import load_dotenv
load_dotenv()

import agenttrace

with agenttrace.track_run("my-agent") as run:
    # ... your agent logic ...
    run.add_step("llm_response", input="Hello", output="Hi there", tokens=20, latency=310)

# For scripts: flush before process exit
agenttrace.flush()
```

---

## Usage

### Sync — context manager

```python
import time
import agenttrace

with agenttrace.track_run(
    "hotel-search-agent",
    model="llama-3.3-70b",
    user_id="user_123",
    tags=["prod", "search"],
) as run:

    # Track an LLM call (groq_client and messages are assumed to be defined elsewhere)
    t0 = time.perf_counter_ns()   # monotonic clock, unaffected by system clock changes
    response = groq_client.chat.completions.create(model=..., messages=...)
    run.add_step(
        "llm_response",
        input=messages[-1]["content"],
        output=response.choices[0].message.content,
        tokens=response.usage.total_tokens,
        latency=(time.perf_counter_ns() - t0) // 1_000_000,   # ns → ms
    )

    # Track a tool call (web_search and query are likewise assumed)
    t1 = time.perf_counter_ns()
    results = web_search(query)
    run.add_step(
        "tool_call",
        input=query,
        output=results[:2000],
        latency=(time.perf_counter_ns() - t1) // 1_000_000,
    )
```

The run is enqueued the moment the `with` block exits — whether it succeeded or raised an exception.
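
The exit behaviour described above can be sketched with `contextlib.contextmanager`. This is a minimal illustration under stated assumptions, not the SDK's implementation: `sketch_track_run`, the `pending` queue, and the payload fields are hypothetical names chosen for the example.

```python
import queue
from contextlib import contextmanager

pending = queue.Queue()  # hypothetical stand-in for the worker's queue


@contextmanager
def sketch_track_run(name):
    """Yield a run record and enqueue it on exit, on success or failure."""
    run = {"name": name, "status": "success", "error": None, "steps": []}
    try:
        yield run
    except Exception as exc:
        # Record the failure, then let the exception propagate to the caller.
        run["status"] = "failed"
        run["error"] = str(exc)
        raise
    finally:
        # Runs on both paths, so the run is always handed to the queue.
        pending.put_nowait(run)


try:
    with sketch_track_run("demo") as run:
        run["steps"].append("llm_response")
        raise ValueError("boom")
except ValueError:
    pass

failed_run = pending.get_nowait()
print(failed_run["status"], failed_run["error"])
```

The `finally` clause is what makes the enqueue unconditional, and the bare `raise` keeps the caller's own error handling intact.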

### Async — context manager

The API is identical and works inside `async def` functions:

```python
import agenttrace

async def handle_request(query: str) -> str:
    async with agenttrace.async_track_run("my-agent", model="gpt-4o") as run:
        result = await llm.acall(query)
        run.add_step("llm_response", input=query, output=result, tokens=80, latency=500)
        return result
```

On exit, the run is handed to the worker through the SDK's internal `enqueue()`, which is synchronous and non-blocking (`queue.put_nowait`), so it is safe in async code without `await`.
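
A small sketch of why no `await` is needed on exit, and why concurrent tasks do not interfere with each other. The names `sketch_async_track_run`, `pending`, and `handle` are hypothetical; the real SDK may be structured differently.

```python
import asyncio
import queue
from contextlib import asynccontextmanager

pending = queue.Queue()  # hypothetical stand-in for the worker's queue


@asynccontextmanager
async def sketch_async_track_run(name):
    run = {"name": name, "steps": []}  # one object per invocation
    try:
        yield run
    finally:
        # put_nowait returns immediately, so the event loop is not blocked.
        pending.put_nowait(run)


async def handle(i):
    async with sketch_async_track_run(f"agent-{i}") as run:
        await asyncio.sleep(0)  # let other tasks interleave mid-run
        run["steps"].append(i)
    return run


async def main():
    return await asyncio.gather(*(handle(i) for i in range(3)))


runs = asyncio.run(main())
print([r["steps"] for r in runs])
```

Each task holds its own `run` object, so interleaving at the `await` does not mix steps between runs.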

### Decorator

Wrap a function so every call is tracked automatically:

```python
@agenttrace.traced_run("search-agent")          # explicit name
def search_agent(query: str) -> str:
    ...

@agenttrace.traced_run                          # uses function name
def code_agent(question: str) -> str:
    ...
```

The decorator uses `track_run` internally, so exceptions are caught, the run is marked failed, and the error is recorded before re-raising.
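
One common way to support both the bare and the called decorator form is to check whether the first argument is callable. The sketch below assumes that approach and covers sync functions only; `sketch_traced_run`, `sketch_track_run`, and `recorded` are hypothetical stand-ins, not the SDK's code.

```python
import functools
from contextlib import contextmanager

recorded = []  # hypothetical sink standing in for the worker queue


@contextmanager
def sketch_track_run(name):
    run = {"name": name, "status": "success"}
    try:
        yield run
    except Exception:
        run["status"] = "failed"
        raise  # re-raise after marking the run failed
    finally:
        recorded.append(run)


def sketch_traced_run(arg=None):
    """Usable as @sketch_traced_run or @sketch_traced_run("name")."""

    def decorate(func, name=None):
        run_name = name or func.__name__

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with sketch_track_run(run_name):
                return func(*args, **kwargs)

        return wrapper

    if callable(arg):
        # Bare form: the decorated function itself was passed in.
        return decorate(arg)
    # Called form: arg is the explicit run name (or None).
    return lambda func: decorate(func, arg)


@sketch_traced_run("search-agent")
def search_agent(query):
    return query.upper()


@sketch_traced_run
def code_agent(question):
    raise RuntimeError("no answer")
```

Calling `search_agent("hi")` records a successful run named `search-agent`; calling `code_agent(...)` records a failed run named `code_agent` and still raises `RuntimeError`.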

### Marking a run failed

The context manager records unhandled exceptions automatically. For explicit failure paths, call `run.fail()`:

```python
def fetch_data():
    with agenttrace.track_run("my-agent") as run:
        result = call_external_api()
        if result is None:
            run.fail("External API returned no data")
            return None
        run.add_step(...)
        return result
```

---

## Configuration

### Environment variables

| Variable | Default | Description |
|---|---|---|
| `AGENTTRACE_API_KEY` | — | **Required.** Your API key (`at_...`) |
| `AGENTTRACE_URL` | `http://localhost:3001` | Backend base URL |
| `AGENTTRACE_SERVICE` | `agent` | Default service/agent name |
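
As a rough illustration of how these variables and defaults could be resolved, the sketch below reads them from a mapping. `SketchConfig` and `config_from_env` are hypothetical names; only the variable names and default values come from the table above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class SketchConfig:
    api_key: Optional[str]
    base_url: str
    service_name: str


def config_from_env(env: Mapping[str, str] = os.environ) -> SketchConfig:
    """Resolve settings from the environment using the documented defaults."""
    return SketchConfig(
        api_key=env.get("AGENTTRACE_API_KEY"),  # required, no default
        base_url=env.get("AGENTTRACE_URL", "http://localhost:3001"),
        service_name=env.get("AGENTTRACE_SERVICE", "agent"),
    )


cfg = config_from_env({"AGENTTRACE_API_KEY": "at_example"})
print(cfg.base_url, cfg.service_name)
```

Passing a plain dict instead of `os.environ` makes the defaults easy to check in tests.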

### `agenttrace.init()` — programmatic config

Calling `init()` is optional if you use environment variables. Use it to override defaults or tune worker behaviour:

```python
agenttrace.init(
    api_key="at_xxxxxxxxxxxxxxxxxxxx",
    base_url="https://your-dashboard.com",
    service_name="my-agent",

    # Worker tuning (optional)
    batch_size=30,          # flush after this many runs accumulate (default: 20)
    flush_interval=3.0,     # flush every N seconds regardless (default: 2.0)
    max_queue_size=2000,    # drop runs with a warning if queue exceeds this (default: 1000)
    max_retries=5,          # retry attempts per run on transient errors (default: 4)
    retry_base_delay=1.0,   # base backoff in seconds, doubles each retry (default: 0.5)
)
```

`init()` can be called multiple times (e.g. in tests) — it replaces the worker and config.
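
To get a feel for what `retry_base_delay` and `max_retries` mean in practice, the sketch below computes a doubling delay schedule. The exact jitter formula used by the SDK is not documented here, so the additive jitter below is an assumption, as is the helper name `backoff_delays`.

```python
import random
from typing import List


def backoff_delays(base: float = 0.5, retries: int = 4, jitter: float = 0.0) -> List[float]:
    """Delay before each retry: the base doubles every attempt, plus optional jitter."""
    delays = []
    for attempt in range(retries):
        delay = base * (2 ** attempt)
        # Assumed jitter form: add up to `jitter * delay` seconds of random extra wait.
        delay += random.uniform(0, jitter * delay)
        delays.append(delay)
    return delays


default_schedule = backoff_delays()        # documented defaults, jitter disabled
tuned_schedule = backoff_delays(1.0, 5)    # values from the init() example
print(default_schedule, tuned_schedule)
```

With jitter disabled, the defaults give waits of 0.5, 1, 2, and 4 seconds, and the tuned example gives 1, 2, 4, 8, and 16 seconds, so raising both settings lengthens the worst-case retry window considerably.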

---

## `run.add_step()` reference

```python
run.add_step(
    step_type,       # str  — see Step types below
    *,
    input="",        # str  — prompt / query / tool input
    output="",       # str  — completion / result / tool output
    tokens=0,        # int  — total tokens for this step
    latency=0,       # int  — wall-clock time in milliseconds
    cost=0.0,        # float — USD cost for this step
    status="success" # "success" | "failed"
)
```

Tokens and cost are **summed automatically** across all steps — you don't need to track totals yourself.
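
The aggregation can be pictured as a simple sum over the recorded steps. The classes below are hypothetical (`SketchRun`, `SketchStep`) and exist only to show the arithmetic; the real `AgentRun` may store and expose totals differently.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SketchStep:
    step_type: str
    tokens: int = 0
    cost: float = 0.0


@dataclass
class SketchRun:
    steps: List[SketchStep] = field(default_factory=list)

    def add_step(self, step_type: str, *, tokens: int = 0, cost: float = 0.0) -> None:
        self.steps.append(SketchStep(step_type, tokens, cost))

    @property
    def total_tokens(self) -> int:
        return sum(s.tokens for s in self.steps)

    @property
    def total_cost(self) -> float:
        return sum(s.cost for s in self.steps)


run = SketchRun()
run.add_step("llm_response", tokens=150, cost=0.002)
run.add_step("tool_call")  # no tokens or cost: contributes zero
run.add_step("llm_response", tokens=80, cost=0.001)
print(run.total_tokens, round(run.total_cost, 6))
```

Steps that omit `tokens` or `cost` fall back to the defaults and leave the totals unchanged.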

### Step types

| `step_type` | When to use |
|---|---|
| `"llm_response"` | Any LLM completion call |
| `"llm_prompt"` | Prompt-only span (before response arrives) |
| `"tool_call"` | External tool / function call |
| `"tool_response"` | Response from a tool |
| `"user_prompt"` | Initial user message |
| `"decision"` | Routing / branching logic in the agent |

---

## Flushing before exit

In long-running processes (servers, workers), the background worker sends queued runs continuously. For **short-lived scripts**, call `flush()` before the process exits:

```python
if __name__ == "__main__":
    run_agent(query)
    agenttrace.flush()          # waits up to 10s for the queue to drain
```

An `atexit` handler provides a best-effort flush as a safety net, but an explicit `flush()` is more reliable.

---

## How it works

```
Agent thread                     Background worker thread
─────────────────                ────────────────────────────────
track_run().__enter__()
  → AgentRun created             sleeping (flush_interval elapsed?)
run.add_step(...)
  → appended to run._steps
track_run().__exit__()
  → run.to_payload()             wakes up: batch_size reached or
  → queue.put_nowait(payload)      flush_interval elapsed
                                 drains up to batch_size items
                                 POST /agent-metrics  (with retry)
                                 POST /agent-metrics
                                 ...
```

The agent thread never waits on the network. If the queue fills up (e.g. backend is down for a long time), new runs are dropped with a warning rather than blocking.
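
The flow in the diagram can be approximated with a daemon thread that drains a bounded `queue.Queue`. This is a simplified sketch, not the SDK's `IngestionWorker`: `MiniWorker`, its `send` callback (which receives a whole batch instead of issuing one POST per run), and `close()` are assumptions made for the example.

```python
import queue
import threading
import time
from typing import Callable, List


class MiniWorker:
    """Toy batching worker: non-blocking enqueue, background drain."""

    def __init__(self, send: Callable[[List[dict]], None], batch_size: int = 20,
                 flush_interval: float = 2.0, max_queue_size: int = 1000) -> None:
        self._send = send
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._queue: "queue.Queue[dict]" = queue.Queue(maxsize=max_queue_size)
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def enqueue(self, payload: dict) -> bool:
        """Never blocks: returns False (payload dropped) when the queue is full."""
        try:
            self._queue.put_nowait(payload)
            return True
        except queue.Full:
            return False

    def _drain(self) -> List[dict]:
        """Collect up to batch_size items, waiting at most flush_interval."""
        batch: List[dict] = []
        deadline = time.monotonic() + self._flush_interval
        while len(batch) < self._batch_size:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                batch.append(self._queue.get(timeout=remaining))
            except queue.Empty:
                break
        return batch

    def _loop(self) -> None:
        # Keep going until asked to stop and everything queued has been sent.
        while not self._stop.is_set() or not self._queue.empty():
            batch = self._drain()
            if batch:
                self._send(batch)

    def close(self) -> None:
        self._stop.set()
        self._thread.join()


sent_batches: List[List[dict]] = []
worker = MiniWorker(sent_batches.append, batch_size=2, flush_interval=0.05)
for i in range(5):
    worker.enqueue({"run": i})
worker.close()
print([len(b) for b in sent_batches])
```

The producer only ever calls `put_nowait`, so a slow or failing `send` delays the background thread but never the code that enqueues.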

---

## Architecture

```
agenttrace/
  __init__.py   Public API: track_run, async_track_run, traced_run, init, flush
  _config.py    Config dataclass — written once at init, read-only thereafter
  _run.py       AgentRun — per-invocation context object, lives on the call stack
  _worker.py    IngestionWorker — daemon thread, Queue consumer, batching + atexit
  _client.py    HTTP POST with exponential backoff retry, stdlib urllib only
  py.typed      PEP 561 marker — enables type checking in downstream projects
```

---

## Full example

```python
import json, os, time
from dotenv import load_dotenv
load_dotenv()

import agenttrace
from groq import Groq

GROQ_MODEL = os.getenv("GROQ_MODEL", "llama-3.3-70b-versatile")

def run_agent(query: str) -> str:
    client = Groq()
    messages = [{"role": "user", "content": query}]

    with agenttrace.track_run("my-agent", model=GROQ_MODEL) as run:
        for _ in range(10):
            t0 = time.perf_counter_ns()   # monotonic clock for elapsed time
            resp = client.chat.completions.create(
                model=GROQ_MODEL, messages=messages, tools=[...], tool_choice="auto"
            )
            run.add_step(
                "llm_response",
                input=messages[-1]["content"],
                output=resp.choices[0].message.content or "",
                tokens=(resp.usage.prompt_tokens + resp.usage.completion_tokens),
                latency=(time.perf_counter_ns() - t0) // 1_000_000,
            )

            msg = resp.choices[0].message
            if not msg.tool_calls:
                return msg.content or ""

            # The assistant turn must precede the tool results it requested.
            messages.append(msg)
            for tc in msg.tool_calls:
                args = json.loads(tc.function.arguments)
                t1 = time.perf_counter_ns()
                result = my_tool(**args)   # my_tool: your own tool function
                run.add_step(
                    "tool_call",
                    input=json.dumps(args),
                    output=str(result)[:2000],
                    latency=(time.perf_counter_ns() - t1) // 1_000_000,
                )
                messages.append({"role": "tool", "tool_call_id": tc.id, "content": str(result)})

        run.fail("Max iterations reached")
        return ""

if __name__ == "__main__":
    print(run_agent("best hotels in Bangalore"))
    agenttrace.flush()
```

---

## License

MIT
