Metadata-Version: 2.4
Name: batch-openai
Version: 0.3.0
Summary: A developer-friendly helper for OpenAI's Batch API
License-File: LICENSE
Requires-Python: >=3.11
Requires-Dist: openai>=2.0.1
Requires-Dist: pydantic>=2.0
Requires-Dist: python-dotenv>=1.2.2
Description-Content-Type: text/markdown

# batch-openai

<div>
    <img src="https://img.shields.io/badge/python-%3E%3D3.11-blue"/>
    <img src="https://img.shields.io/badge/license-MIT-green"/>
    <!-- <img src="https://img.shields.io/badge/pypi-v0.3.0-orange"/> -->
</div>

A developer-friendly Python helper for OpenAI's [Batch API](https://platform.openai.com/docs/guides/batch).

OpenAI's Batch API processes requests asynchronously at 50% lower cost, but using it raw means writing ~70 lines of boilerplate per script — manual JSONL construction, file upload, batch creation, state files to survive restarts, a polling loop, and nested dict parsing. This library reduces that lifecycle to ~16 lines. See the [before/after comparison](docs/before-after.md) for real-world examples with measured reductions of 68–77%.

It provides a `BatchRequestBuilder` for constructing batch requests, a `BatchJob` for managing the lifecycle of a batch, and a `BatchResult` for handling outputs with ease. It also includes a local `BatchRegistry` for tracking jobs across sessions and a CLI for managing batches without code.

## Installation

```bash
pip install batch-openai
```

Or with [uv](https://github.com/astral-sh/uv):

```bash
uv add batch-openai
```

**Note:** Requires Python 3.11+ and an `OPENAI_API_KEY` environment variable.

## Quick Start

```python
from openai import OpenAI
from batch_openai import BatchRequestBuilder, BatchJobError

client = OpenAI()

builder = BatchRequestBuilder()

for i, text in enumerate(my_documents):
    builder.add(
        messages=[{"role": "user", "content": f"Summarise this: {text}"}],
        model="gpt-5.4-mini",
        custom_id=f"doc-{i}",  # optional — auto-generated if omitted
    )

# Upload + create batch in one call
job = builder.submit(client, description="doc summarisation")
print(f"Batch submitted: {job.batch_id}")

# Poll until done (prints progress each tick)
result = job.wait(
    poll_interval=60,
    on_progress=lambda b: print(b.status, b.request_counts.completed),
)

print(f"Succeeded: {len(result.succeeded)}  Failed: {len(result.failed)}")

for item in result.succeeded:
    print(item.custom_id, item.content)

for item in result.failed:
    print(item.custom_id, item.error)
```

## Model Compatibility

OpenAI model generations differ in which parameters they accept. Passing an unsupported parameter causes the batch request to fail with a `400 Bad Request` — which is **not retried** since it is a client error, not a transient one.

| Parameter | `gpt-4o`, `gpt-4o-mini` | `gpt-5`, `gpt-5.1`, `gpt-5.2`, `gpt-5.4` (incl. `-mini` variants) |
|---|---|---|
| Token limit | `max_tokens` | `max_completion_tokens` |
| Sampling | `temperature` | Not supported — omit entirely |
| Log probabilities | `logprobs`, `top_logprobs` | Not supported |

```python
# gpt-4o family
builder.add(messages=[...], model="gpt-4o-mini", max_tokens=200, temperature=0.7)

# gpt-5.x family — use max_completion_tokens and omit temperature
builder.add(messages=[...], model="gpt-5.4-mini", max_completion_tokens=200)
```

When mixing models in one batch, each `.add()` call can specify its own `model` and parameters independently.


## Structured Output (Pydantic)

Use `result.parse(Model)` to validate responses directly into a Pydantic model:

```python
from copy import deepcopy

from pydantic import BaseModel, ConfigDict
from batch_openai import BatchRequestBuilder

class Summary(BaseModel):
    # This is required to enable strict mode in the JSON schema,
    # which ensures the model's output matches the schema exactly.
    model_config = ConfigDict(extra="forbid")

    title: str
    key_points: list[str]

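def _strip_titles(obj: object, *, in_properties: bool = False) -> object:
    """Remove 'title' annotations from a JSON schema so OpenAI strict mode accepts it.

    Keys directly inside a "properties" mapping are field names, so a field
    that is itself called "title" (as in Summary) is kept.
    """
    if isinstance(obj, dict):
        return {
            k: _strip_titles(v, in_properties=(k == "properties" and not in_properties))
            for k, v in obj.items()
            if in_properties or k != "title"
        }
    if isinstance(obj, list):
        return [_strip_titles(v) for v in obj]
    return obj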

schema = _strip_titles(deepcopy(Summary.model_json_schema()))

SCHEMA = {
    "type": "json_schema",
    "json_schema": {
        "name": "summary",
        "schema": schema,
        "strict": True,
    },
}

builder = BatchRequestBuilder()
for doc in documents:
    builder.add(
        messages=[{"role": "user", "content": f"Summarise: {doc.text}"}],
        model="gpt-5.4-mini",
        response_format=SCHEMA,
        custom_id=f"doc-{doc.id}",
    )

result = builder.submit(client).wait(poll_interval=60)

summaries: list[Summary] = result.parse(Summary)
for s in summaries:
    print(s.title, s.key_points)
```

## Correlating Results with Inputs

Use `.join()` to map results back to the original input objects by `custom_id`:

```python
joined = result.join(documents, key=lambda doc: f"doc-{doc.id}")

for item in joined:
    print(item.input.title)         # original document
    if item.output is not None:
        print(item.output.content)  # BatchResultItem for this document
```

`item.output` is `None` if no result was found for that input.



## Tracking Jobs Across Sessions

Use `BatchRegistry` to persist batch IDs locally (stored in `~/.batch_openai/registry.db`):

```python
from batch_openai import BatchRegistry

registry = BatchRegistry()

# After submitting:
job = builder.submit(client, description="nightly eval")
registry.track(job)

# In a later session — reconnect by batch ID:
job = registry.get("batch_abc123", client)
result = job.wait()

# Or reconnect directly without the registry:
from batch_openai import BatchJob
job = BatchJob.from_id(client, "batch_abc123")

# List all tracked batches:
for record in registry.list():
    print(record.batch_id, record.status, record.description)

# Update status after completion:
registry.update_status(job.batch_id, job.status)

# Remove when no longer needed:
registry.delete("batch_abc123")
```
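
For intuition about what a local registry needs to store, the sketch below tracks batches in SQLite using only the standard library. The table layout and function names are assumptions made for illustration; the actual schema of `registry.db` may differ.

```python
import sqlite3

# In-memory database for the demo; a real registry would live in a file.
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE batches (
           batch_id TEXT PRIMARY KEY,
           status TEXT NOT NULL,
           description TEXT,
           created_at REAL NOT NULL
       )"""
)


def track(batch_id: str, status: str, description: str, created_at: float) -> None:
    # Insert a new record, or update status/description if the batch is already tracked.
    conn.execute(
        """INSERT INTO batches VALUES (?, ?, ?, ?)
           ON CONFLICT(batch_id) DO UPDATE SET
               status = excluded.status,
               description = excluded.description""",
        (batch_id, status, description, created_at),
    )


def list_batches() -> list[tuple]:
    # Newest first, mirroring the ordering described for registry.list().
    return conn.execute(
        "SELECT batch_id, status, description FROM batches ORDER BY created_at DESC"
    ).fetchall()


track("batch_a", "in_progress", "nightly eval", created_at=100.0)
track("batch_b", "validating", "summaries", created_at=200.0)
track("batch_a", "completed", "nightly eval", created_at=300.0)  # update, keeps created_at
print(list_batches())
```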

## CLI

After installation, the `batch-openai` command is available for managing jobs from the terminal without writing code.

```bash
# List all batches tracked in the local registry (no API call)
batch-openai list

# Show live status of a specific batch
batch-openai status batch_abc123

# Cancel a running batch
batch-openai cancel batch_abc123

# Download results as JSONL to stdout
batch-openai download batch_abc123

# Download to a file
batch-openai download batch_abc123 --output results.jsonl

# Use a custom registry location
batch-openai --registry ./my_project/.batch list
```

Commands that call the OpenAI API read `OPENAI_API_KEY` from the environment or a `.env` file. `list` reads only the local SQLite registry and requires no API key.
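
A downloaded file can be post-processed with the standard library alone. The sketch below assumes the file follows OpenAI's Batch output format, where each line is a JSON object with `custom_id`, `response` (holding `status_code` and `body`), and `error`; `read_results` is a hypothetical helper, not part of the CLI.

```python
import json
from typing import Iterable


def read_results(lines: Iterable[str]) -> tuple[dict, dict]:
    """Split batch output lines into {custom_id: content} and {custom_id: error}."""
    succeeded: dict = {}
    failed: dict = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        record = json.loads(line)
        response = record.get("response") or {}
        if record.get("error") is None and response.get("status_code") == 200:
            body = response["body"]
            succeeded[record["custom_id"]] = body["choices"][0]["message"]["content"]
        else:
            failed[record["custom_id"]] = record.get("error") or response.get("body")
    return succeeded, failed


# Two hand-written sample lines standing in for a downloaded results.jsonl.
sample = [
    json.dumps({
        "custom_id": "doc-0",
        "response": {
            "status_code": 200,
            "body": {"choices": [{"message": {"content": "A summary."}}]},
        },
        "error": None,
    }),
    json.dumps({
        "custom_id": "doc-1",
        "response": {"status_code": 400, "body": {"error": {"message": "bad request"}}},
        "error": None,
    }),
]

ok, bad = read_results(sample)
print(ok)
print(bad)
```

With a real file, pass the open file handle instead of the sample list, for example `read_results(open("results.jsonl"))`.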

## Accessing Raw Responses

For use cases that need logprobs, token usage, or finish reason, use `item.raw_response`:

```python
builder.add(
    messages=[{"role": "user", "content": "Is this review positive? Reply Yes or No."}],
    model="gpt-4o-mini",
    max_tokens=1,
    logprobs=True,
    top_logprobs=5,
)

result = builder.submit(client).wait()

item = result.succeeded[0]
top_logprobs = item.raw_response["choices"][0]["logprobs"]["content"][0]["top_logprobs"]
# This is a log-probability, not a probability; apply math.exp() to convert it.
logprob_yes = next((lp["logprob"] for lp in top_logprobs if lp["token"] == "Yes"), None)
```

> **Note:** GPT-5 models currently do not support `logprobs` or `top_logprobs`, so these fields will be `None` in the raw response when using those models.

## Error Handling

All library exceptions extend `OpenAIBatchError`. Transient API errors — rate limits (429), server errors (5xx), connection drops, and timeouts — are **automatically retried** with exponential backoff. No configuration is needed.

```python
from batch_openai import (
    OpenAIBatchError,      # base class for all library exceptions
    BatchJobError,         # batch ended in failed / expired / cancelled state
    BatchTransientError,   # retries exhausted (429 / 5xx / connection)
    BatchAuthError,        # 401 after single retry
    BatchValidationError,  # 400 / 422 — bad request, never retried
)

try:
    result = job.wait()
except BatchJobError as e:
    print(f"Batch {e.batch_id} ended with status {e.status!r}")
except BatchTransientError as e:
    print(f"API unavailable after retries: {e.status_code} {e.request_id}")
except OpenAIBatchError as e:
    print(f"Unexpected error: {e}")
```
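
For readers unfamiliar with the retry pattern described above, here is a generic exponential backoff sketch. It is not the library's internal code: the function name, the retry count, the base delay, and the cap are all assumptions chosen for illustration.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    *,
    is_transient: Callable[[Exception], bool],
    max_retries: int = 3,
    base: float = 1.0,
    cap: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying transient failures with delays of base, 2*base, 4*base, ..."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception as exc:
            # Non-transient errors and the final failed attempt propagate unchanged.
            if not is_transient(exc) or attempt == max_retries:
                raise
            sleep(min(cap, base * 2**attempt))
    raise AssertionError("unreachable")


# Demo: fail twice with a connection error, then succeed. Delays are recorded
# instead of slept so the example runs instantly.
calls = {"n": 0}
delays: list[float] = []


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] <= 2:
        raise ConnectionError("connection dropped")
    return "ok"


result = call_with_retries(
    flaky,
    is_transient=lambda e: isinstance(e, ConnectionError),
    sleep=delays.append,
)
print(result, delays)
```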

---

## API Reference

### `BatchRequestBuilder`

| Method | Description |
|---|---|
| `.add(messages, *, custom_id, model, **params)` | Add one request. `custom_id` is auto-generated if omitted. Returns `self`. |
| `.to_jsonl(path)` | Write all requests to a JSONL file. Returns the `Path`. |
| `.submit(client, *, description)` | Upload + create batch. Returns `BatchJob`. |
| `len(builder)` | Number of requests accumulated so far. |
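
For reference, OpenAI's Batch API expects one JSON object per line with `custom_id`, `method`, `url`, and `body`. The sketch below builds such a line by hand; `to_batch_line` is a hypothetical helper, and whether `.to_jsonl()` writes exactly this shape is an assumption.

```python
import json


def to_batch_line(custom_id: str, model: str, messages: list[dict], **params) -> str:
    """Serialise one chat completion request as a Batch API input line."""
    request = {
        "custom_id": custom_id,
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {"model": model, "messages": messages, **params},
    }
    return json.dumps(request)


line = to_batch_line(
    "doc-0",
    "gpt-4o-mini",
    [{"role": "user", "content": "Summarise this: ..."}],
    max_tokens=200,
)
print(line)
```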

### `BatchJob`

| Method / Property | Description |
|---|---|
| `.wait(*, poll_interval, on_progress)` | Block until terminal state. Returns `BatchResult`. Raises `BatchJobError` if not completed. |
| `.status` | Last known status (no network call). |
| `.batch_id` | OpenAI batch ID. |
| `.input_file_id` | Uploaded input file ID. |
| `.description` | Description from batch metadata. |
| `BatchJob.from_id(client, batch_id)` | Reconstruct from an existing batch ID. |

### `BatchResult`

| Method / Property | Description |
|---|---|
| `.succeeded` | `list[BatchResultItem]` — items with a successful response. |
| `.failed` | `list[BatchResultItem]` — items with an error. |
| `.parse(Model)` | Parse `.content` of each succeeded item into a Pydantic model. |
| `.join(inputs, key)` | Correlate results to inputs. Returns `list[JoinedItem]`. |
| `iter(result)` / `len(result)` | Iterate or count all items. |

### `BatchResultItem`

| Field | Description |
|---|---|
| `.custom_id` | The request identifier. |
| `.content` | `choices[0].message.content` shortcut. `None` on error. |
| `.error` | Error dict if the request failed. `None` on success. |
| `.raw_response` | Full `response.body` dict (logprobs, usage, finish_reason, etc.). |

### `BatchRegistry`

| Method | Description |
|---|---|
| `BatchRegistry(path=None)` | Open registry. Defaults to `~/.batch_openai/`. |
| `.track(job)` | Save or update a job. |
| `.get(batch_id, client)` | Reconstruct a `BatchJob`. Raises `KeyError` if not found. |
| `.update_status(batch_id, status)` | Update stored status. |
| `.list()` | All tracked batches as `list[BatchRecord]`, newest first. |
| `.delete(batch_id)` | Remove from registry. |

### Exceptions

| Exception | When raised |
|---|---|
| `OpenAIBatchError` | Base class — catch this to handle any library error |
| `BatchJobError` | `.wait()` — batch ended in `failed`, `expired`, or `cancelled` state |
| `BatchTransientError` | All retries exhausted for a 429 / 5xx / connection error |
| `BatchAuthError` | 401 persisted after a single retry |
| `BatchValidationError` | 400 / 422 bad request — not retried |
| `BatchParseError` | `result.parse(Model)` — a response failed Pydantic validation |

```python
try:
    result = job.wait()
except BatchJobError as e:
    print(e.status)    # "failed" | "expired" | "cancelled"
    print(e.batch_id)
```

## Examples

Working examples for common use cases are in [`src/examples/`](src/examples/):

| Example | Use Case |
|---|---|
| [`synthetic_data.py`](src/examples/synthetic_data.py) | Synthetic Data Generation — structured JSON output parsed into Pydantic models |
| [`llm_judge.py`](src/examples/llm_judge.py) | LLM as a Judge — score answers and correlate inputs/outputs with `.join()` |
| [`document_summarization.py`](src/examples/document_summarization.py) | Bulk Document Summarisation — plain text batch processing with `.join()` |

Each example is self-contained and runs with just `OPENAI_API_KEY` set.


## Use Cases

- **[LLM as a Judge](src/examples/llm_judge.py)** — evaluate model outputs across large datasets at 50% lower cost
- **[Synthetic Data Generation](src/examples/synthetic_data.py)** — generate structured datasets in bulk
- **[Bulk Document Processing](src/examples/document_summarization.py)** — labelling, extraction, summarisation at scale
- **Bulk Content Generation** — marketing copy, translations, paraphrasing
