Metadata-Version: 2.4
Name: agentdiff-coordination
Version: 0.1.0
Summary: Lightweight coordination primitives for AI agents - prevent race conditions and concurrency bugs
Project-URL: Homepage, https://github.com/AgentDiff/agentdiff-coordination
Project-URL: Repository, https://github.com/AgentDiff/agentdiff-coordination
Project-URL: Issues, https://github.com/AgentDiff/agentdiff-coordination/issues
Project-URL: Documentation, https://github.com/AgentDiff/agentdiff-coordination#readme
Author-email: AgentDiff Team <hello@agentdiff.com>
License-Expression: MIT
License-File: LICENSE
Keywords: agents,ai,concurrency,coordination,crewai,langchain,langgraph,race-conditions
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Distributed Computing
Requires-Python: >=3.9
Requires-Dist: wrapt>=1.16.0
Provides-Extra: dev
Requires-Dist: anthropic>=0.64.0; extra == 'dev'
Requires-Dist: crewai>=0.5.0; extra == 'dev'
Requires-Dist: openai>=1.99.9; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.21.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Requires-Dist: python-dotenv>=1.1.1; extra == 'dev'
Description-Content-Type: text/markdown

# AgentDiff Coordination

AgentDiff is a lightweight coordination library that prevents the concurrency bugs you've probably hit before: Agent2 starting before Agent1 has finished its task, two agents writing to the same database key, or all your agents hitting OpenAI/Anthropic at once and getting rate limited.

Instead of debugging framework internals for days, just add `@coordinate` decorators where you need resource locks and `@when` for event-driven chaining. It works with whatever you're already using - LangChain, CrewAI, or pure Python.

Think of it as a traffic light system for your agents, not a whole new framework to learn.

[![PyPI version](https://badge.fury.io/py/agentdiff-coordination.svg)](https://badge.fury.io/py/agentdiff-coordination)
[![Python Support](https://img.shields.io/pypi/pyversions/agentdiff-coordination.svg)](https://pypi.org/project/agentdiff-coordination/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

## What AgentDiff Coordination Solves

- **Race conditions** between agents accessing shared resources.
- **Corrupted state** when multiple agents write to the same keys.
- **API rate limit chaos** from concurrent LLM calls.
- **"Two nodes writing to same key"** bugs that waste days of debugging.
- **Framework complexity** that gets in the way of actually building agents.

## Core Features

- **`@coordinate`** - Resource locks + automatic lifecycle events
- **`@when`** - Event-driven agent chaining (no manual orchestration)
- **`emit()`** - Custom events for complex workflows
- **Zero Configuration** - Works immediately, configure only what you need
- **Framework Agnostic** - Works with any agent framework or pure Python
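
To make the first three items concrete, here is a minimal stdlib-only sketch of how a lock-plus-event decorator and an event registry could fit together. `sketch_coordinate`, `sketch_when`, and `sketch_emit` are hypothetical stand-ins written for illustration, not AgentDiff's implementation. The `<agent_name>_complete` event name and the `{"result": ...}` payload follow the examples in this README; everything else is an assumption.

```python
import threading
from collections import defaultdict
from functools import wraps

# Hypothetical stand-ins, NOT the agentdiff-coordination implementation.
_handlers = defaultdict(list)          # event name -> registered callbacks
_locks = defaultdict(threading.Lock)   # lock name -> shared lock
_registry_guard = threading.Lock()     # protects lookups in _locks


def sketch_emit(event_name, data):
    """Call every handler registered for event_name, in registration order."""
    for handler in list(_handlers[event_name]):
        handler(data)


def sketch_when(event_name):
    """Register the decorated function as a handler for event_name."""
    def register(func):
        _handlers[event_name].append(func)
        return func
    return register


def sketch_coordinate(agent_name, lock_name=None):
    """Run the function under a named lock, then emit '<agent_name>_complete'."""
    def decorate(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with _registry_guard:
                lock = _locks[lock_name or agent_name]
            with lock:
                result = func(*args, **kwargs)
            # Emit after releasing the lock so handlers may take the same lock.
            sketch_emit(f"{agent_name}_complete", {"result": result})
            return result
        return wrapper
    return decorate


received = []


@sketch_when("researcher_complete")
def start_analysis(event_data):
    received.append(event_data["result"])


@sketch_coordinate("researcher")
def research_agent():
    return "findings"


research_agent()
```

In this sketch the completion event fires only after the lock is released. That is a design choice of the sketch, made so a handler that needs the same lock does not deadlock.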

## Use Cases: Concurrency Issues and Race Conditions

### **Concurrent State Updates**

```python
# Import path assumed from the package name
from agentdiff_coordination import coordinate

# Before: Race conditions in shared state
def process_customer_data():
    customer_state["status"] = "processing"  # Race condition
    result = process_data()
    customer_state["result"] = result       # Overwrites other agent

def update_customer_profile():
    customer_state["status"] = "updating"   # Conflicts with processor
    customer_state["profile"] = new_profile # State corruption

# After: Resource locks prevent conflicts
@coordinate("data_processor", lock_name="customer_123")
def process_customer_data():
    customer_state["status"] = "processing"  # Exclusive access
    result = process_data()
    customer_state["result"] = result        # Safe update

@coordinate("profile_updater", lock_name="customer_123")
def update_customer_profile():
    customer_state["status"] = "updating"    # Waits for processor
    customer_state["profile"] = new_profile  # No conflicts
```
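
The idea behind a shared `lock_name` can be sketched with the standard library alone: every caller that asks for the same name gets the same lock, so a read-modify-write on shared state cannot interleave. The `named_lock` helper and the `visits` counter below are hypothetical and exist only for this illustration; this is not how AgentDiff itself is implemented.

```python
import threading
import time

# Hypothetical registry of locks keyed by name (illustration only).
_named_locks = {}
_guard = threading.Lock()


def named_lock(name):
    """Return the single lock shared by every caller that uses this name."""
    with _guard:
        return _named_locks.setdefault(name, threading.Lock())


customer_state = {"visits": 0}


def record_visit():
    # Read-modify-write under the lock; without it, updates could be lost.
    with named_lock("customer_123"):
        current = customer_state["visits"]
        time.sleep(0.001)  # widen the window in which a race would occur
        customer_state["visits"] = current + 1


threads = [threading.Thread(target=record_visit) for _ in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Removing the `with named_lock(...)` line makes lost updates likely, because several threads can read the same `current` value before any of them writes back.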

### **API Rate Limit Chaos**

```python
import threading

import openai
# Import path assumed from the package name
from agentdiff_coordination import coordinate

# Before: Multiple agents hitting APIs simultaneously
def research_agent():
    response = openai.chat.completions.create(...)  # Rate limited

def analysis_agent():
    response = openai.chat.completions.create(...)  # Rate limited

def summary_agent():
    response = openai.chat.completions.create(...)  # Rate limited

# Run in parallel = debugging nightmare
threading.Thread(target=research_agent).start()
threading.Thread(target=analysis_agent).start()
threading.Thread(target=summary_agent).start()

# After: Resource locks queue API calls safely
@coordinate("researcher", lock_name="openai_api")
def research_agent():
    response = openai.chat.completions.create(...)  # Queued safely

@coordinate("analyzer", lock_name="openai_api")
def analysis_agent():
    response = openai.chat.completions.create(...)  # Waits for researcher

@coordinate("summarizer", lock_name="openai_api")
def summary_agent():
    response = openai.chat.completions.create(...)  # Waits for analyzer
```
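
A shared lock serializes calls: at most one request is in flight at a time. That reduces bursts, but it is not the same as a requests-per-minute throttle. The stdlib sketch below measures peak concurrency with a plain `threading.Lock` standing in for `lock_name="openai_api"`. `fake_llm_call` is a hypothetical placeholder that sleeps instead of calling any real API.

```python
import threading
import time

api_lock = threading.Lock()       # stands in for lock_name="openai_api"
stats_guard = threading.Lock()    # protects the counters below
stats = {"in_flight": 0, "peak": 0}


def fake_llm_call(prompt):
    """Hypothetical placeholder for an API request; records concurrency."""
    with stats_guard:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
    time.sleep(0.01)  # simulate network latency
    with stats_guard:
        stats["in_flight"] -= 1
    return f"response to {prompt}"


def agent(prompt, results):
    # Every agent takes the same lock before calling the API.
    with api_lock:
        results.append(fake_llm_call(prompt))


results = []
threads = [
    threading.Thread(target=agent, args=(prompt, results))
    for prompt in ("research", "analysis", "summary")
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With the lock in place, `stats["peak"]` stays at 1 even though three threads start together.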

### **Manual Orchestration Nightmare**

```python
# Import path assumed from the package name
from agentdiff_coordination import coordinate, when

# Before: Complex manual coordination
def run_workflow():
    research_result = research_agent()
    if research_result:
        analysis_result = analysis_agent(research_result)
        if analysis_result:
            summary_result = summary_agent(analysis_result)
            if summary_result:
                final_report = editor_agent(summary_result)
    # Error handling, retries, parallel flows = complexity explosion

# After: Event-driven coordination
@coordinate("researcher")
def research_agent():
    return research_data

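@when("researcher_complete")  # Emitted when the "researcher" agent finishes
def start_analysis(event_data):
    analysis_agent(event_data['result'])  # Auto-triggered

@when("analyzer_complete")  # Assumes analysis_agent is decorated with @coordinate("analyzer")
def start_summary(event_data):
    summary_agent(event_data['result'])   # Auto-chained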

# Just start the workflow - coordination happens automatically
research_agent()  # Everything else flows automatically
```

## Installation & Requirements

**Python Support**: 3.9+ (tested on 3.9, 3.10, 3.11, 3.12)

```bash
pip install agentdiff-coordination
```
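
To confirm the install from Python, one option is the standard library's `importlib.metadata`, which reads the version recorded for an installed distribution. The `installed_version` helper is a hypothetical convenience wrapper for this example, not part of AgentDiff.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(dist_name):
    """Return the installed version string, or None if it is not installed."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


# Prints the version string if the package is installed, otherwise None.
print(installed_version("agentdiff-coordination"))
```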

## Documentation

- **[Quick Start Guide](docs/quickstart.md)** - Get up and running in 5 minutes
- **[API Reference](docs/api-reference.md)** - Complete function documentation
- **[Use cases](docs/use-cases.md)** - Example use cases
- **[Configuration Guide](docs/configuration.md)** - Environment variables and settings
- **[Examples](examples/)** - Example agent coordination patterns

## Contributing

We welcome contributions! See [CONTRIBUTING.md](CONTRIBUTING.md) for guidelines.

## License

MIT License - see [LICENSE](LICENSE) file for details.

## About AgentDiff

AgentDiff builds practical tools for AI developers who are tired of framework complexity. This coordination library was built after seeing too many developers waste days debugging concurrency instead of building agents.

- **GitHub**: [https://github.com/AgentDiff](https://github.com/AgentDiff)
- **Issues**: [Report bugs and request features](https://github.com/AgentDiff/agentdiff-coordination/issues)
- **Community**: Share your coordination wins and debugging horror stories
