Metadata-Version: 2.4
Name: dbbasic-pipe
Version: 0.1.0
Summary: SQL-style query optimization for Unix pipes with bidirectional flow control
Author-email: Dan Quellhorst <dan@quellhorst.com>
License: MIT
Project-URL: Homepage, https://github.com/askrobots/dbbasic-pipe
Project-URL: Repository, https://github.com/askrobots/dbbasic-pipe
Project-URL: Issues, https://github.com/askrobots/dbbasic-pipe/issues
Keywords: pipe,unix,pipeline,backpressure,streaming,sql,query
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: System :: Shells
Classifier: Topic :: Utilities
Classifier: Operating System :: POSIX
Classifier: Operating System :: Unix
Classifier: Operating System :: MacOS
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Dynamic: license-file

# dbbasic-pipe

**SQL-style query optimization for Unix pipes**

`dbbasic-pipe` brings bidirectional flow control to Unix pipelines: downstream commands can signal upstream commands so they avoid producing data nobody will read, much as SQL query optimizers push predicates down and apply limits early.

## The Problem

Traditional Unix pipes are one-way: data flows left-to-right with no way for downstream commands to tell upstream "I only need 100 items" or "stop after the first match."

```bash
# Inefficient: cat has no idea only 10 matches are needed; it keeps reading until SIGPIPE works its way back after head exits
cat huge.log | grep ERROR | head -10
```

## The Solution

`dbbasic-pipe` adds an out-of-band coordination channel via environment variables and Unix sockets, letting commands communicate control messages while data still flows through normal pipes.

```bash
# Efficient: pcat stops reading once plimit signals that it has enough
dbbasic-pipe pcat huge.log | pfilter ERROR | plimit 10
```

## Installation

```bash
pip install dbbasic-pipe
```

## Quick Start

```bash
# Create test data
cat > data.json << EOF
{"name": "Alice", "age": 25}
{"name": "Bob", "age": 17}
{"name": "Charlie", "age": 30}
{"name": "David", "age": 15}
{"name": "Eve", "age": 22}
EOF

# Run with coordination (optimized)
dbbasic-pipe pcat data.json | pfilter 'age > 18' | plimit 2
```

**Output:**
```json
{"name": "Alice", "age": 25}
{"name": "Charlie", "age": 30}
```

**What happened:**
1. `plimit` sent a backpressure signal: "I only need 2 items"
2. `pfilter` received it and forwarded it upstream
3. `pcat` stopped reading once enough records had been produced for 2 matches, instead of reading all 5
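Inside a single Python process, lazy generators give the same stop-early behavior for free, because each stage pulls from the one before it. The sketch below is only an analogy for the three steps above, not how `dbbasic-pipe` is implemented; the function names mirror the commands but are hypothetical. `itertools.islice` plays the role of `plimit` (much like Rust's `take()`), and the counter shows how many input lines are actually read.

```python
import json
from itertools import islice

# The five records from data.json above
DATA = [
    '{"name": "Alice", "age": 25}',
    '{"name": "Bob", "age": 17}',
    '{"name": "Charlie", "age": 30}',
    '{"name": "David", "age": 15}',
    '{"name": "Eve", "age": 22}',
]

def pcat(lines, stats):
    # Producer: count every line that is actually read
    for line in lines:
        stats["read"] += 1
        yield line

def pfilter(lines, predicate):
    # Keep lines whose parsed JSON satisfies the predicate
    for line in lines:
        if predicate(json.loads(line)):
            yield line

def run(limit):
    stats = {"read": 0}
    matches = pfilter(pcat(DATA, stats), lambda rec: rec["age"] > 18)
    # islice stops pulling from upstream once `limit` items have been taken
    output = list(islice(matches, limit))
    return output, stats["read"]

output, lines_read = run(2)
print(output)      # the Alice and Charlie lines
print(lines_read)  # 3
```

Only three of the five lines are read to find two matches. Separate processes joined by pipes cannot pull from each other this way, which is the gap the control channel is meant to fill.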

## Commands

### `dbbasic-pipe`

Wrapper that enables coordination for a pipeline.

```bash
dbbasic-pipe <command>
```

**Example:**
```bash
dbbasic-pipe bash -c 'pcat data.json | pfilter "age > 18" | plimit 10'
```
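A minimal sketch of the wrapper's first step, assuming the environment-variable mechanism described under **Protocol** below. An environment variable reaches only the process it is set for and that process's descendants, so wrapping the whole pipeline in `bash -c` (as in the example above) places every stage under the wrapper. `run_coordinated` is a hypothetical name, the socket path is a placeholder, and no coordinator is started here.

```python
import os
import subprocess

def run_coordinated(argv, socket_path):
    """Run argv with PIPE_CONTROL_SOCKET set in its environment.

    Every process argv starts (for example, each stage of a shell
    pipeline) inherits the variable. Starting a coordinator that
    listens on socket_path is left out of this sketch.
    """
    env = dict(os.environ, PIPE_CONTROL_SOCKET=socket_path)
    return subprocess.run(argv, env=env, capture_output=True, text=True)

# printenv runs as a child of sh, inside a pipeline, and still sees the variable
result = run_coordinated(
    ["sh", "-c", "printenv PIPE_CONTROL_SOCKET | cat"], "/tmp/demo-control.sock"
)
print(result.stdout.strip())
```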

### `pcat <file>`

Smart `cat`: reads files and stops early when it receives a backpressure signal.

```bash
pcat data.json
pcat large.log | plimit 100
```

### `pfilter '<expression>'`

Filter JSON lines with a Python expression. As the examples show, each record's fields are referenced by key name (e.g. `age`, `status`).

```bash
cat data.json | pfilter 'age > 18'
cat data.json | pfilter 'name == "Alice"'
cat data.json | pfilter 'status == "active" and score > 50'
```
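One plausible way to evaluate such expressions, assuming each JSON object's keys are exposed as variable names as the examples suggest. This is a hypothetical sketch that handles JSON input only, not `pfilter`'s actual code. Removing builtins limits casual misuse but is not a security sandbox, so expressions should come from trusted users.

```python
import json

def make_predicate(expression):
    """Compile a pfilter-style expression (hypothetical helper).

    Each record's keys become variable names and builtins are removed.
    Any NameError (a missing key, or a removed builtin such as len)
    counts as "no match".
    """
    code = compile(expression, "<pfilter>", "eval")

    def predicate(record):
        try:
            return bool(eval(code, {"__builtins__": {}}, record))
        except NameError:
            return False

    return predicate

def pfilter_lines(lines, expression):
    predicate = make_predicate(expression)
    for line in lines:
        if predicate(json.loads(line)):
            yield line

rows = [
    '{"name": "Alice", "status": "active", "score": 72}',
    '{"name": "Bob", "status": "inactive", "score": 90}',
    '{"name": "Eve", "status": "active", "score": 40}',
]
print(list(pfilter_lines(rows, 'status == "active" and score > 50')))  # only Alice's line
```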

### `plimit <count>`

Output the first N lines and signal backpressure upstream.

```bash
cat data.json | plimit 10
```
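A minimal sketch of that behavior, assuming `plimit` announces its limit once at startup using the `backpressure` message shown under **Message Format** below. The signature is hypothetical, and a `socket.socketpair()` stands in for the real connection to the coordinator so the demo runs on its own.

```python
import io
import json
import socket
from itertools import islice

def plimit(lines, out, n, control=None):
    """Copy the first n lines to out, announcing the limit first if coordinated.

    `control` is an already-connected control socket, or None when running
    standalone (hypothetical signature).
    """
    if control is not None:
        msg = {"type": "backpressure", "count": n}
        control.sendall(json.dumps(msg).encode() + b"\n")
    # islice stops after n lines without pulling an extra one from `lines`
    for line in islice(lines, n):
        out.write(line)

# Demo: a socketpair stands in for the connection to the coordinator
ours, coordinator_end = socket.socketpair()
buf = io.StringIO()
plimit(["a\n", "b\n", "c\n"], buf, 2, control=ours)
received = coordinator_end.recv(4096)
print(buf.getvalue(), end="")
print(received)
```

Sending the count before copying any data gives upstream stages the earliest possible chance to act on it.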

## How It Works

### Architecture

```
Data flow:     pcat → pipe → pfilter → pipe → plimit
Control flow:  pcat ← socket ← pfilter ← socket ← plimit
                     ↓                       ↓
                     └──→ coordinator ←──────┘
```
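The coordinator's relay role in the diagram can be sketched as a small event loop. This is a hypothetical toy, not `pipe-coordinator`'s code: it waits on client connections with `selectors`, splits newline-delimited JSON, records `register` messages, and broadcasts each `backpressure` message to every other client. Broadcasting is an assumption made because the `register` message shown below carries no pipeline position. `socket.socketpair()` stands in for real Unix-socket connections.

```python
import json
import selectors
import socket

class Coordinator:
    """Toy coordinator: relays each backpressure message to every other client.

    Broadcasting is a simplifying assumption; a real coordinator would need
    to know which clients sit upstream of the sender.
    """

    def __init__(self):
        self.sel = selectors.DefaultSelector()
        self.buffers = {}     # connection -> bytes not yet split into lines
        self.registered = {}  # connection -> its "register" message

    def add(self, conn):
        self.buffers[conn] = b""
        self.sel.register(conn, selectors.EVENT_READ)

    def pump(self, timeout=0.1):
        """Handle whatever control messages are ready right now."""
        for key, _ in self.sel.select(timeout):
            conn = key.fileobj
            chunk = conn.recv(4096)
            if not chunk:  # client disconnected
                self.sel.unregister(conn)
                self.buffers.pop(conn)
                self.registered.pop(conn, None)
                conn.close()
                continue
            self.buffers[conn] += chunk
            while b"\n" in self.buffers[conn]:
                raw, self.buffers[conn] = self.buffers[conn].split(b"\n", 1)
                msg = json.loads(raw)
                if msg["type"] == "register":
                    self.registered[conn] = msg
                elif msg["type"] == "backpressure":
                    for other in self.buffers:
                        if other is not conn:
                            other.sendall(raw + b"\n")

# Demo: each socketpair is (coordinator's end, command's end)
coordinator = Coordinator()
server_a, pcat_end = socket.socketpair()
server_b, plimit_end = socket.socketpair()
coordinator.add(server_a)
coordinator.add(server_b)
pcat_end.sendall(b'{"type": "register", "pid": 1, "command": "pcat"}\n')
plimit_end.sendall(b'{"type": "backpressure", "count": 2}\n')
coordinator.pump()
relayed = pcat_end.recv(4096)
print(relayed)
```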

### Protocol

1. `dbbasic-pipe` sets the `PIPE_CONTROL_SOCKET` environment variable
2. Starts the `pipe-coordinator` service in the background
3. Commands detect the variable and connect to the coordinator socket
4. Commands exchange JSON control messages:
   - `register`: Announce presence
   - `backpressure`: Signal "I only need N more items"
   - `complete`: Notify when done
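The wire framing is not spelled out here; this sketch assumes one JSON object per line, matching how the message examples below are laid out and how the **Extending** example sends its `register` message. A stream socket has no message boundaries, so a single read may return half a message or several at once, and the reader must buffer. `MessageReader` and `encode` are hypothetical helpers. Newline framing is safe because `json.dumps` without `indent` never emits a literal newline.

```python
import json

class MessageReader:
    """Reassemble newline-delimited JSON control messages from raw socket reads."""

    def __init__(self):
        self._buf = b""

    def feed(self, chunk):
        """Add received bytes; return every complete message decoded so far."""
        self._buf += chunk
        messages = []
        while b"\n" in self._buf:
            line, self._buf = self._buf.split(b"\n", 1)
            if line.strip():  # ignore blank lines
                messages.append(json.loads(line))
        return messages

def encode(message):
    """Serialize one control message as a single JSON line."""
    return json.dumps(message).encode() + b"\n"

reader = MessageReader()
wire = encode({"type": "register", "pid": 12345, "command": "pfilter"}) + encode(
    {"type": "backpressure", "count": 100}
)
first = reader.feed(wire[:10])  # incomplete: nothing decoded yet
rest = reader.feed(wire[10:])   # both messages are complete now
print(first, rest)
```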

### Message Format

```json
{"type": "register", "pid": 12345, "command": "pfilter", "predicate": "age > 18"}
{"type": "backpressure", "count": 100}
{"type": "complete", "pid": 12345, "processed": 150, "output": 100}
```
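A receiving side might sanity-check messages before acting on them. This sketch derives the required fields from the three examples above; the field sets and `parse_message` are assumptions, not a published schema. `predicate` on `register` is treated as optional because the **Extending** example omits it.

```python
import json

# Fields each message type carries in the examples above (assumed schema)
REQUIRED_FIELDS = {
    "register": {"pid", "command"},
    "backpressure": {"count"},
    "complete": {"pid", "processed", "output"},
}

def parse_message(line):
    """Decode one control line and check that it has the expected fields."""
    msg = json.loads(line)
    if not isinstance(msg, dict):
        raise ValueError("control message must be a JSON object")
    kind = msg.get("type")
    if kind not in REQUIRED_FIELDS:
        raise ValueError(f"unknown message type: {kind!r}")
    missing = REQUIRED_FIELDS[kind] - set(msg)
    if missing:
        raise ValueError(f"{kind} message missing {sorted(missing)}")
    return msg

examples = [
    '{"type": "register", "pid": 12345, "command": "pfilter", "predicate": "age > 18"}',
    '{"type": "backpressure", "count": 100}',
    '{"type": "complete", "pid": 12345, "processed": 150, "output": 100}',
]
parsed = [parse_message(line) for line in examples]
print([m["type"] for m in parsed])
```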

## Standalone Usage

All commands work **without** coordination too:

```bash
# Works fine, just no optimization
pcat data.json | pfilter 'age > 18' | plimit 10
```

## Use Cases

### Large Log Files

```bash
# Stop after finding 10 errors (doesn't read entire file)
dbbasic-pipe pcat /var/log/huge.log | pfilter 'ERROR' | plimit 10
```

### API Data Processing

With a future `pcurl` command:
```bash
# pcurl could add ?limit=100 to API request based on backpressure
dbbasic-pipe pcurl api.com/users | pfilter 'age > 18' | plimit 100
```
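`pcurl` does not exist yet, so this only sketches the idea: rewriting a URL's query string once a backpressure count arrives. The `limit` parameter name is an assumption; parameter names and maximum values vary between APIs. `with_limit` is hypothetical and uses only the standard `urllib.parse` module.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

def with_limit(url, count, param="limit"):
    """Return url with `param` set to count, replacing any existing value."""
    parts = urlsplit(url)
    query = [(k, v) for k, v in parse_qsl(parts.query, keep_blank_values=True)
             if k != param]
    query.append((param, str(count)))
    return urlunsplit(parts._replace(query=urlencode(query)))

print(with_limit("https://api.example.com/users", 100))
# https://api.example.com/users?limit=100
print(with_limit("https://api.example.com/users?page=2&limit=5", 100))
# https://api.example.com/users?page=2&limit=100
```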

### Database Queries

With a future `psql-smart` wrapper:
```bash
# psql-smart could build optimized query: WHERE age>18 LIMIT 100
dbbasic-pipe psql-smart -c 'SELECT * FROM users' | pfilter 'age>18' | plimit 100
```
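A hypothetical sketch of the query-building step, assuming the downstream predicate is already valid SQL. Wrapping the original query as a derived table keeps it correct even if the base query has its own `WHERE`. `LIMIT` is the syntax PostgreSQL and SQLite accept; an in-memory SQLite table (standard `sqlite3` module) stands in for the real database in the demo. Because the predicate is pasted in as text, it must come from a trusted source.

```python
import sqlite3

def build_query(base_query, predicate, limit):
    """Push a filter and limit into SQL by wrapping base_query (hypothetical helper).

    The predicate is pasted in verbatim, so it must already be valid SQL
    (`age > 18` is; in PostgreSQL, Python's `==` is not) and must come
    from a trusted source: this is string building, not a parameterized query.
    """
    base = base_query.strip().rstrip(";")
    return f"SELECT * FROM ({base}) AS sub WHERE {predicate} LIMIT {int(limit)}"

sql = build_query("SELECT * FROM users", "age > 18", 2)
print(sql)  # SELECT * FROM (SELECT * FROM users) AS sub WHERE age > 18 LIMIT 2

# Demo against an in-memory SQLite table holding the Quick Start records
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [("Alice", 25), ("Bob", 17), ("Charlie", 30), ("David", 15), ("Eve", 22)],
)
rows = conn.execute(sql).fetchall()
print(rows)
```

Without an `ORDER BY`, which two matching rows come back is up to the database.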

## Comparison to Traditional Pipes

| Scenario | Traditional | With dbbasic-pipe |
|----------|-------------|-------------------|
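| `cat 10GB.log \| grep ERROR \| head -10` | No limit info upstream; stops only when SIGPIPE arrives after `head` exits | Stops after ~10 matches |
| `cat users.json \| filter age>18 \| head -100` | No limit info upstream; stops only when SIGPIPE arrives after `head` exits | Stops at ~100 matches |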
| API calls | Downloads all data | Could request `?limit=100` |

## Real-World Inspiration

This approach is inspired by:

- **SQL query optimizers** - Push predicates down, apply limits early
- **Reactive Streams** - Explicit backpressure protocol
- **Apache Beam/Flink** - Distributed query planning
- **Rust iterators** - Lazy evaluation with `take()`
- **PRQL/Kusto** - Pipe-style query languages

## Development

```bash
# Clone repo
git clone https://github.com/askrobots/dbbasic-pipe.git
cd dbbasic-pipe

# Install in development mode
pip install -e .

# Run tests
python -m pytest
```

## Extending

Create your own coordination-aware commands:

```python
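#!/usr/bin/env python3
import json
import os
import select
import socket
import sys

sock = None
ctrl_socket = os.getenv('PIPE_CONTROL_SOCKET')
if ctrl_socket:
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.connect(ctrl_socket)
    # Register (control messages are newline-delimited JSON)
    sock.sendall(json.dumps({
        'type': 'register',
        'pid': os.getpid(),
        'command': 'my-tool'
    }).encode() + b'\n')

remaining = None  # items still wanted downstream; None means unlimited
buf = b''
for line in sys.stdin:
    # Poll for backpressure with a zero timeout so data never waits on control
    if sock is not None and select.select([sock], [], [], 0)[0]:
        chunk = sock.recv(4096)
        if not chunk:  # coordinator went away: carry on uncoordinated
            sock.close()
            sock = None
        buf += chunk
        while b'\n' in buf:
            raw, buf = buf.split(b'\n', 1)
            msg = json.loads(raw)
            if msg.get('type') == 'backpressure':
                remaining = msg['count']  # "I only need N more items"
    if remaining is not None and remaining <= 0:
        break
    sys.stdout.write(line)  # process data via stdin/stdout as normal
    if remaining is not None:
        remaining -= 1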
```

## Future Ideas

- **pcurl** - HTTP client that adds query params based on backpressure
- **psql-smart** - Builds optimal SQL queries from downstream filters
- **pgrep** - Stops searching after limit reached
- **ptail** - Stops tailing after condition met
- **Type system** - Commands advertise schema
- **Distributed mode** - Coordinator as network service
- **Query optimization** - Merge adjacent filters, reorder operations
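For the "merge adjacent filters" idea: two consecutive `pfilter` stages with side-effect-free expressions keep the same records as a single stage whose expression is their conjunction, so a planner could collapse them into one process. A hypothetical sketch, assuming the Python-expression syntax shown for `pfilter`; parenthesizing each expression preserves operator precedence.

```python
def merge_filters(expressions):
    """Combine adjacent pfilter expressions into one (hypothetical optimization).

    Parentheses keep precedence intact: `a or b` followed by `c` must
    become `(a or b) and (c)`, not `a or b and c`.
    """
    return " and ".join(f"({expr})" for expr in expressions)

merged = merge_filters(['status == "active"', "score > 50 or vip"])
print(merged)  # (status == "active") and (score > 50 or vip)

# Evaluating the merged expression matches applying the two filters in sequence
record = {"status": "active", "score": 10, "vip": True}
print(eval(merged, {}, record))
```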

## Contributing

Contributions welcome! Please open issues or PRs at:
https://github.com/askrobots/dbbasic-pipe

## License

MIT License - see [LICENSE](LICENSE) file

## Author

Dan Quellhorst
- GitHub: [@askrobots](https://github.com/askrobots)
- Website: [quellhorst.com](https://quellhorst.com)

## Related Projects

- [dbbasic-tsv](https://github.com/askrobots/dbbasic-tsv) - TSV data processing
- [dbbasic-content](https://github.com/askrobots/dbbasic-content) - Content management tools
- [dbbasic-video](https://github.com/askrobots/dbbasic-video) - Video processing utilities
