Metadata-Version: 2.4
Name: agentlattice
Version: 0.4.0
Summary: Governance SDK for AI agents — audit trails, policy engine, approval gates.
Project-URL: Homepage, https://agentlattice.com
Project-URL: Repository, https://github.com/inder/agentlattice
Project-URL: Documentation, https://agentlattice.com/docs
License: MIT
License-File: LICENSE
Keywords: agents,ai,approval,audit,governance
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.9
Requires-Dist: httpx>=0.25.0
Provides-Extra: dev
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest>=7.0; extra == 'dev'
Requires-Dist: respx>=0.21; extra == 'dev'
Requires-Dist: tomli>=2.0; (python_version < '3.11') and extra == 'dev'
Requires-Dist: websockets>=12.0; extra == 'dev'
Provides-Extra: realtime
Requires-Dist: websockets>=12.0; extra == 'realtime'
Description-Content-Type: text/markdown

# agentlattice — Python SDK

Governance-aware SDK for AI agents. Wraps agent actions with audit trails,
approval gates, and circuit breaker integration.

## Install

```bash
pip install agentlattice
```

## Quickstart

```python
import asyncio
import os
from agentlattice import AgentLattice

al = AgentLattice(api_key=os.environ["AL_API_KEY"])

async def main():
    # gate() blocks until approved — your agent code only runs if it passes
    await al.gate("pr.merge")
    await merge_pr(42)

asyncio.run(main())
```

Handle denials and timeouts:

```python
import asyncio
import os

from agentlattice import AgentLattice, AgentLatticeDeniedError, AgentLatticeTimeoutError

al = AgentLattice(api_key=os.environ["AL_API_KEY"])

async def main():
    try:
        await al.gate("code.commit")
    except AgentLatticeDeniedError as e:
        print(f"Denied: {e.reason}, policy: {e.policy}")
        print(f"Failed conditions: {e.conditions}")
    except AgentLatticeTimeoutError as e:
        print(f"Approval timed out: {e.approval_id}")

asyncio.run(main())
```

## Sync usage

If you're not in an async context:

```python
import os
from agentlattice import AgentLattice

al = AgentLattice(api_key=os.environ["AL_API_KEY"])
al.gate_sync("code.commit")
```

## API

### `AgentLattice(api_key, *, base_url?, gate_poll_timeout_ms?, gate_poll_interval_ms?)`

| Parameter | Default | Description |
|-----------|---------|-------------|
| `api_key` | required | Bearer token — pass via `os.environ["AL_API_KEY"]`, never hardcoded |
| `base_url` | `https://agentlattice.com` | Override for self-hosted or staging |
| `gate_poll_timeout_ms` | `8 * 60 * 60 * 1000` (8 hours) | How long `gate()` polls for approval |
| `gate_poll_interval_ms` | `5000` | Poll interval in ms |
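
Both polling parameters are plain millisecond integers. As a minimal sketch, a small conversion helper can make the units explicit when overriding the defaults. `to_ms` is a hypothetical helper written for this example, not part of the SDK:

```python
from datetime import timedelta


def to_ms(delta: timedelta) -> int:
    """Convert a timedelta to whole milliseconds (hypothetical helper, not an SDK function)."""
    return delta // timedelta(milliseconds=1)


# Values that could be passed as gate_poll_timeout_ms / gate_poll_interval_ms.
default_timeout_ms = to_ms(timedelta(hours=8))    # same value as the documented default
short_timeout_ms = to_ms(timedelta(minutes=15))   # e.g. a job that should not wait long
poll_interval_ms = to_ms(timedelta(seconds=5))    # same value as the documented default
```

The resulting integers would then be passed as keyword arguments, for example `AgentLattice(api_key=..., gate_poll_timeout_ms=short_timeout_ms)`.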

### `await al.execute(action_type, options?)`

Submit an action and return immediately. Returns an `ActionResult`. Does not block.

```python
import os

from agentlattice import AgentLattice, ActionOptions

# Read the key from the environment rather than hardcoding it
al = AgentLattice(api_key=os.environ["AL_API_KEY"])
result = await al.execute("pr.open", ActionOptions(
    data_accessed=[{"type": "source_code", "count": 5, "sensitivity": "low"}],
    metadata={"pr_number": 42},
    event_id="my-idempotency-key",   # optional: dedup by your own key
))

if result.status == "executed":
    print(f"Auto-executed under policy: {result.policy_name}")
elif result.status == "requested":
    print(f"Awaiting approval: {result.approval_id}")
elif result.status == "denied":
    print(f"Denied ({result.denial_reason}): {result.conditions_evaluated}")
```
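
If the caller has no natural idempotency key, one option is to derive a stable one from the action itself. The sketch below is an assumption about how such a key could be built; `idempotency_key` is a hypothetical helper, and any format constraints the service places on `event_id` are not covered here:

```python
import hashlib
import json


def idempotency_key(action_type: str, metadata: dict) -> str:
    """Derive a stable key from an action and its metadata (hypothetical helper)."""
    # sort_keys makes the JSON, and therefore the hash, independent of dict ordering.
    payload = json.dumps({"action": action_type, "metadata": metadata}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:32]


key_a = idempotency_key("pr.open", {"pr_number": 42, "repo": "acme/backend"})
key_b = idempotency_key("pr.open", {"repo": "acme/backend", "pr_number": 42})
key_c = idempotency_key("pr.open", {"pr_number": 43, "repo": "acme/backend"})
```

Here `key_a` and `key_b` are identical because only the key order differs, while `key_c` differs because the metadata does. The value could then be passed as `event_id` in `ActionOptions`.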

### `await al.gate(action_type, options?)`

Submit an action and block until it is approved or denied.

```python
try:
    await al.gate("pr.merge")
except AgentLatticeDeniedError as e:
    # e.reason:     "CONDITIONS_DENIED" | "POLICY_TAMPERED" | "DENIED_BY_REVIEWER"
    # e.policy:     name of the governing policy (if conditions-based)
    # e.conditions: list of ConditionResult (which rule failed and why)
    # e.approval_id: set if a human reviewer denied it
    ...
except AgentLatticeTimeoutError as e:
    # e.approval_id: the approval that timed out
    ...
```

### `al.gate_sync(action_type, options?)` / `al.execute_sync(action_type, options?)`

Synchronous wrappers — identical semantics, no `asyncio.run()` needed.

## ActionResult

```python
@dataclass
class ActionResult:
    status: str                          # "executed" | "requested" | "denied" | ...
    audit_event_id: str
    approval_id: str | None              # set when status="requested"
    message: str | None
    timeout_at: str | None
    denial_reason: str | None            # "CONDITIONS_DENIED" | "POLICY_TAMPERED" | ...
    policy_name: str | None              # governing policy name
    conditions_evaluated: list[ConditionResult]  # which rules passed/failed
```

## ConditionResult

```python
@dataclass
class ConditionResult:
    field: str       # e.g. "pr_size"
    operator: str    # e.g. "lt"
    expected: Any    # e.g. 100
    result: bool     # True = passed, False = failed
```

Agents can read `conditions_evaluated` to understand exactly which governance
rule blocked them and adapt their behavior accordingly.
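
As an illustration, failed conditions can be turned into short messages for logs or for an agent's next prompt. This sketch uses a local stand-in dataclass that mirrors the fields documented above rather than importing the SDK type; `describe_failures` is a hypothetical helper:

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ConditionResult:  # local stand-in mirroring the documented fields
    field: str
    operator: str
    expected: Any
    result: bool


def describe_failures(conditions: list[ConditionResult]) -> list[str]:
    """Return one readable line per failed condition (hypothetical helper)."""
    return [
        f"{c.field} must be {c.operator} {c.expected!r}"
        for c in conditions
        if not c.result
    ]


messages = describe_failures([
    ConditionResult(field="pr_size", operator="lt", expected=100, result=False),
    ConditionResult(field="repo", operator="eq", expected="acme/backend", result=True),
])
```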

## Self-correcting agent pattern

`conditions_evaluated` (surfaced as `e.conditions` on `AgentLatticeDeniedError`)
is what makes this more than a compliance layer: it closes the feedback loop so
agents can adapt, not just fail.

```python
import os

from agentlattice import AgentLattice, AgentLatticeDeniedError, ActionOptions

al = AgentLattice(api_key=os.environ["AL_API_KEY"])

async def commit_with_governance(files: list[str]) -> None:
    try:
        await al.gate("pr.open", ActionOptions(
            metadata={"pr_size": len(files), "repo": "acme/backend"},
        ))
        await open_pr(files)

    except AgentLatticeDeniedError as e:
        if e.reason == "CONDITIONS_DENIED":
            # Find which rules failed and why
            failed = [c for c in e.conditions if not c.result]
            # e.g. [ConditionResult(field="pr_size", operator="lt", expected=50, result=False)]

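            for rule in failed:
                # Only split when the batch can still be divided; otherwise fall
                # through to `raise` instead of recursing on the same input forever.
                if rule.field == "pr_size" and len(files) > 1:
                    # Policy says PRs must be < 50 files, so split and retry
                    mid = len(files) // 2
                    await commit_with_governance(files[:mid])
                    await commit_with_governance(files[mid:])
                    return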

        # Reviewer denial or policy tamper — no retry
        raise
```
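
Halving works, but when the failed condition reports the limit, the agent can also size its batches directly. The following is a sketch under the assumption that the `lt` operator means "strictly less than" and that `expected` carries the numeric limit; `split_under_limit` is a hypothetical helper:

```python
def split_under_limit(files: list[str], limit: int) -> list[list[str]]:
    """Split files into batches strictly smaller than limit (hypothetical helper)."""
    if limit < 2:
        raise ValueError("limit must allow at least one file per batch")
    size = limit - 1  # "lt" is assumed to be strict, so the largest legal batch is limit - 1
    return [files[i:i + size] for i in range(0, len(files), size)]


files = [f"src/module_{i}.py" for i in range(120)]
batches = split_under_limit(files, limit=50)
batch_sizes = [len(batch) for batch in batches]
```

Each batch could then be gated and opened as its own PR, which avoids the repeated denials that a halving strategy incurs on the way down.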

## Delegation — Scoped Sub-Agents

Create short-lived child agents with narrowed capabilities. Cleanup is automatic.

```python
async with al.delegate("data-processor",
    capabilities=["read_data", "write_results"],
    ttl=300
) as sub:
    await sub.execute("read_data")
    result = await sub.execute("write_results")
```

Sync version:

```python
with al.delegate_sync("data-processor",
    capabilities=["read_data"], ttl=300
) as sub:
    sub.execute_sync("read_data")
```

### Parallel fan-out

```python
async with AgentLattice.parallel(
    al.delegate("reader-1", capabilities=["read_data"], ttl=60),
    al.delegate("reader-2", capabilities=["read_data"], ttl=60),
) as [r1, r2]:
    result_1 = await r1.execute("read_data")
    result_2 = await r2.execute("read_data")
```

### `al.delegate(name, *, capabilities, ttl)`

| Parameter | Description |
|-----------|-------------|
| `name` | Display name for the ephemeral child agent |
| `capabilities` | List of action types the child can perform (must be subset of parent's) |
| `ttl` | Time-to-live in seconds (max 86400 = 24 hours) |

Returns an async context manager yielding a `DelegatedAgent`. Cleanup runs
in `__aexit__` regardless of exceptions.
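
The two constraints in the table can also be checked locally before a delegation is requested. This is a minimal sketch, not SDK behavior: `check_delegation` is a hypothetical helper, and the lower bound of one second on `ttl` is an assumption, since only the maximum is documented:

```python
MAX_TTL_SECONDS = 86400  # 24 hours, the documented maximum


def check_delegation(parent_caps: list[str], child_caps: list[str], ttl: int) -> list[str]:
    """Return a list of problems with a planned delegation (hypothetical helper)."""
    problems = []
    extra = sorted(set(child_caps) - set(parent_caps))
    if extra:
        problems.append(f"capabilities not held by parent: {extra}")
    if not 0 < ttl <= MAX_TTL_SECONDS:  # lower bound of 1 second is an assumption
        problems.append(f"ttl must be between 1 and {MAX_TTL_SECONDS} seconds, got {ttl}")
    return problems


ok = check_delegation(["read_data", "write_results"], ["read_data"], ttl=300)
bad = check_delegation(["read_data"], ["read_data", "delete_data"], ttl=100000)
```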

### `DelegatedAgent`

| Method | Description |
|--------|-------------|
| `sub.execute(action_type, options?)` | Execute an action as the child |
| `sub.gate(action_type, options?)` | Submit an action as the child and block until it is approved or denied |
| `sub.delegate(name, *, capabilities, ttl)` | Chain further with narrower scope |

All methods have `_sync` variants.

See [docs/ephemeral-agents.md](../../docs/ephemeral-agents.md) for architecture details,
security model, and cleanup guarantees.

## Introspection & Governance (Tier 2)

These methods give agents visibility into their own identity, policies, audit trails, and governance posture.

### `await al.whoami()`

Returns this agent's identity, active policies, and delegation relationships.

```python
info = await al.whoami()
print(f"Agent: {info.name} ({info.config_id})")
print(f"Circuit breaker: {info.cb_state}")
print(f"Policies: {[p.name for p in info.policies]}")
```

### `await al.policies(scope?)`

List policies. `scope="own"` (default) returns policies for this agent. `scope="org"` returns all org policies.

```python
result = await al.policies(scope="org")
for p in result.policies:
    print(f"{p.name}: {p.action_type} (approval={p.approval_required})")
```

### `await al.can_i(action_type)`

Dry-run policy check. No audit event created, no approval requested.

```python
check = await al.can_i("pr.merge")
if check.allowed:
    print("Good to go")
elif check.needs_approval:
    print(f"Will need approval under policy: {check.policy_name}")
```

### `await al.audit(cursor?, limit?, action_type?)`

Query the audit trail with pagination.

```python
page = await al.audit(limit=50, action_type="pr.merge")
for event in page.events:
    print(f"{event.timestamp}: {event.action_type} -> {event.status}")
if page.next_cursor:
    next_page = await al.audit(cursor=page.next_cursor)
```
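
To walk the whole trail, the cursor loop can be wrapped in an async generator. The sketch below is written against a generic `fetch_page` callable so it runs without the SDK; `iter_events` and the fake pages are hypothetical, and it assumes a missing `next_cursor` marks the last page:

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator, Awaitable, Callable, Optional


async def iter_events(
    fetch_page: Callable[[Optional[str]], Awaitable[Any]],
) -> AsyncIterator[Any]:
    """Yield every event across pages by following next_cursor (hypothetical helper)."""
    cursor: Optional[str] = None
    while True:
        page = await fetch_page(cursor)
        for event in page.events:
            yield event
        cursor = page.next_cursor
        if not cursor:  # assumed: an empty or missing cursor means no more pages
            break


# Fake two-page backend standing in for the audit endpoint.
_PAGES = {
    None: SimpleNamespace(events=["e1", "e2"], next_cursor="c1"),
    "c1": SimpleNamespace(events=["e3"], next_cursor=None),
}


async def fake_fetch(cursor: Optional[str]) -> Any:
    return _PAGES[cursor]


async def collect() -> list:
    return [event async for event in iter_events(fake_fetch)]


all_events = asyncio.run(collect())
```

With the SDK, `fetch_page` might wrap `al.audit(...)`, passing `cursor` only when it is set; whether `cursor=None` is accepted directly is not stated above.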

### `await al.verify()`

Verify the org's audit chain integrity (tamper detection).

```python
result = await al.verify()
if not result.chain_valid:
    print(f"Chain broken at row: {result.first_broken_at_row_id}")
```

### `await al.delegations(role?, active_only?)`

List delegations. Filter by `role="parent"` or `role="child"`, and pass `active_only=False` to include expired delegations.

```python
result = await al.delegations(role="parent", active_only=False)
for d in result.delegations:
    print(f"{d.id}: active={d.active}, expires={d.expires_at}")
```

### `await al.revoke(delegation_id)`

Revoke a delegation by ID.

```python
result = await al.revoke("del-abc123")
assert result.revoked
```

### `await al.report(audit_event_id, outcome)`

Report the outcome of a previously executed action. Closes the audit loop.

```python
from agentlattice import ReportOutcome

result = await al.report(
    "evt-abc123",
    ReportOutcome(status="success", message="Deployed to prod"),
)
print(f"Reported at: {result.reported_at}")
```

Outcome statuses: `"success"`, `"failure"`, `"partial"`.

### `await al.posture()`

Get the org's governance posture score (0-100).

```python
result = await al.posture()
print(f"Score: {result.score}/100")
for name, comp in result.components.items():
    print(f"  {name}: {comp.score}/{comp.max}")
```
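
One way to act on the breakdown is to rank components by how far they fall short of their maximum. This sketch uses stand-in objects with the `score` and `max` attributes shown above; the component names and numbers are invented sample data, and `weakest_components` is a hypothetical helper:

```python
from types import SimpleNamespace


def weakest_components(components: dict, n: int = 2) -> list:
    """Return the n component names with the lowest score/max ratio (hypothetical helper)."""
    ratios = {
        name: comp.score / comp.max
        for name, comp in components.items()
        if comp.max  # skip components with a zero maximum to avoid division by zero
    }
    return [name for name, _ in sorted(ratios.items(), key=lambda item: item[1])[:n]]


# Invented sample data; real component names come from the posture() response.
sample = {
    "component_a": SimpleNamespace(score=20, max=25),
    "component_b": SimpleNamespace(score=5, max=25),
    "component_c": SimpleNamespace(score=15, max=50),
}
weakest = weakest_components(sample)
```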

### Sync wrappers

Every Tier 2 method has a `*_sync()` variant:

```python
info = al.whoami_sync()
check = al.can_i_sync("deploy")
score = al.posture_sync()
```

## Realtime Events

Subscribe to governance events via WebSocket. Requires `pip install agentlattice[realtime]`.

```python
import os

from agentlattice import AgentLattice, GovernanceEvent

al = AgentLattice(api_key=os.environ["AL_API_KEY"])

def on_event(event: GovernanceEvent):
    print(f"[{event.event_type}] agent={event.agent_id} data={event.data}")

await al.subscribe("org-id", on_event)

# Filter to specific event types
await al.subscribe("org-id", on_event, events=["action.denied", "anomaly.detected"])

# Filter to a specific agent
await al.subscribe("org-id", on_event, agent_filter="agent-123")

# Cleanup
await al.unsubscribe()
```

Event types: `action.denied`, `action.executed`, `delegation.expired`, `delegation.revoked`, `policy.changed`, `anomaly.detected`, `enforcement.triggered`.
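
When a single subscription carries several event types, the callback can route on `event_type`. The following is a minimal sketch with stand-in event objects carrying the attributes used in the example above; `make_dispatcher` is a hypothetical helper, not part of the SDK:

```python
from types import SimpleNamespace
from typing import Any, Callable


def make_dispatcher(handlers: dict[str, Callable[[Any], None]]) -> Callable[[Any], None]:
    """Build a callback that routes each event to the handler for its type."""
    def on_event(event: Any) -> None:
        handler = handlers.get(event.event_type)
        if handler is not None:  # event types without a handler are ignored
            handler(event)
    return on_event


denied: list = []
on_event = make_dispatcher({"action.denied": denied.append})

# Stand-in events; in real use these would be GovernanceEvent instances.
on_event(SimpleNamespace(event_type="action.denied", agent_id="agent-123", data={}))
on_event(SimpleNamespace(event_type="policy.changed", agent_id="agent-123", data={}))
```

The returned `on_event` has the same one-argument shape as the callback passed to `al.subscribe(...)` above.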

## Error hierarchy

```
AgentLatticeError          base — code, message, details
  AgentLatticeDeniedError  approval_id?, reason?, policy?, conditions?
  AgentLatticeTimeoutError approval_id
```

## Reliability

`execute()` and `gate()` retry automatically on 5xx and network errors — up to 3 attempts with 1s/2s/4s exponential backoff. Each request has a 30-second timeout. Pass `event_id` to make retries idempotent.
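
For intuition, the backoff schedule can be sketched in a few lines. This is not the SDK's implementation: `backoff_delays` and `call_with_retries` are hypothetical, `ConnectionError` stands in for "5xx and network errors", and the sketch treats each delay as the wait before one further attempt, which is only one reading of the schedule described above:

```python
import time
from typing import Callable, Sequence, TypeVar

T = TypeVar("T")


def backoff_delays(base: float = 1.0, count: int = 3) -> list:
    """Exponential schedule: base, 2*base, 4*base, ..."""
    return [base * 2 ** i for i in range(count)]


def call_with_retries(
    fn: Callable[[], T],
    delays: Sequence[float],
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, sleeping for each delay in turn after a retryable failure."""
    for delay in delays:
        try:
            return fn()
        except ConnectionError:
            sleep(delay)
    return fn()  # final attempt: any exception now propagates to the caller


# Demo: a call that fails twice, with sleeps recorded instead of actually waiting.
attempts = {"n": 0}
slept: list = []


def flaky() -> str:
    attempts["n"] += 1
    if attempts["n"] <= 2:
        raise ConnectionError("transient")
    return "ok"


result = call_with_retries(flaky, backoff_delays(), sleep=slept.append)
```

Because a retried request may reach the server twice, a stable `event_id` is what lets the second delivery be recognized as a duplicate.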

`execute_sync()` and `gate_sync()` are safe in async runtimes (LangGraph, FastAPI, CrewAI) — they detect a running event loop and dispatch to a thread pool automatically.
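
The general technique behind this kind of wrapper can be sketched with the standard library alone. The code below illustrates the pattern and is not the SDK's source; `run_sync` and `fake_gate` are hypothetical names:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Coroutine


def run_sync(make_coro: Callable[[], Coroutine[Any, Any, Any]]) -> Any:
    """Run a coroutine to completion from sync code, even inside a running loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: asyncio.run() can be called directly.
        return asyncio.run(make_coro())
    # A loop is already running here, so asyncio.run() would raise.
    # Run the coroutine on a fresh loop in a worker thread and wait for it.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(lambda: asyncio.run(make_coro())).result()


async def fake_gate() -> str:  # stand-in for an async SDK call
    await asyncio.sleep(0)
    return "approved"


async def handler() -> str:
    # Called from inside a running loop, as in an async web route.
    return run_sync(fake_gate)


from_plain_code = run_sync(fake_gate)
from_async_code = asyncio.run(handler())
```

Note that in this sketch the calling thread still blocks while the worker runs, so inside an event loop the loop is paused for the duration of the call.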

## License

MIT
