Metadata-Version: 2.4
Name: FlowerPower
Version: 1.0.0b1
Summary: A simple workflow framework. Hamilton + APScheduler = FlowerPower
Author-email: "Volker L." <ligno.blades@gmail.com>
Project-URL: Homepage, https://github.com/legout/flowerpower
Project-URL: Bug Tracker, https://github.com/legout/flowerpower/issues
Keywords: hamilton,workflow,pipeline,scheduler,apscheduler,dask,ray
Requires-Python: >=3.11
Description-Content-Type: text/markdown
Requires-Dist: aiobotocore<2.18.0
Requires-Dist: aiosqlite>=0.21.0
Requires-Dist: dill>=0.3.8
Requires-Dist: duration-parser>=1.0.1
Requires-Dist: fsspec>=2024.10.0
Requires-Dist: humanize>=4.12.2
Requires-Dist: msgspec>=0.19.0
Requires-Dist: munch>=4.0.0
Requires-Dist: orjson>=3.10.15
Requires-Dist: pyarrow<19.0.0
Requires-Dist: pydantic>=2.10.2
Requires-Dist: python-dotenv>=1.0.1
Requires-Dist: pyyaml>=6.0.1
Requires-Dist: rich>=13.9.3
Requires-Dist: s3fs>=2024.10.0
Requires-Dist: sf-hamilton-sdk>=0.5.2
Requires-Dist: sf-hamilton[rich,tqdm,visualization]>=1.69.0
Requires-Dist: typer>=0.12.3
Provides-Extra: apscheduler
Requires-Dist: aiosqlite>=0.21.0; extra == "apscheduler"
Requires-Dist: apscheduler==4.0.0a5; extra == "apscheduler"
Requires-Dist: asyncpg>=0.29.0; extra == "apscheduler"
Requires-Dist: greenlet>=3.0.3; extra == "apscheduler"
Requires-Dist: sqlalchemy>=2.0.30; extra == "apscheduler"
Requires-Dist: cron-descriptor>=1.4.5; extra == "apscheduler"
Provides-Extra: io
Requires-Dist: adbc-driver-manager>=1.4.0; extra == "io"
Requires-Dist: datafusion>=43.1.0; extra == "io"
Requires-Dist: deltalake>=0.24.0; extra == "io"
Requires-Dist: duckdb>=1.1.3; extra == "io"
Requires-Dist: orjson>=3.10.12; extra == "io"
Requires-Dist: pandas>=2.2.3; extra == "io"
Requires-Dist: polars>=1.15.0; extra == "io"
Requires-Dist: pyarrow>=18.1.0; extra == "io"
Requires-Dist: pydala2>=0.9.4.5; extra == "io"
Requires-Dist: redis>=5.2.1; extra == "io"
Requires-Dist: sherlock>=0.4.1; extra == "io"
Provides-Extra: io-legacy
Requires-Dist: adbc-driver-manager>=1.4.0; extra == "io-legacy"
Requires-Dist: datafusion>=43.1.0; extra == "io-legacy"
Requires-Dist: deltalake>=0.24.0; extra == "io-legacy"
Requires-Dist: duckdb>=1.1.3; extra == "io-legacy"
Requires-Dist: orjson>=3.10.12; extra == "io-legacy"
Requires-Dist: pandas>=2.2.3; extra == "io-legacy"
Requires-Dist: polars-lts-cpu>=1.15.0; extra == "io-legacy"
Requires-Dist: pyarrow>=18.1.0; extra == "io-legacy"
Requires-Dist: pydala2>=0.9.4.5; extra == "io-legacy"
Requires-Dist: redis>=5.2.1; extra == "io-legacy"
Requires-Dist: sherlock>=0.4.1; extra == "io-legacy"
Provides-Extra: mongodb
Requires-Dist: pymongo>=4.7.2; extra == "mongodb"
Provides-Extra: mqtt
Requires-Dist: paho-mqtt>=2.1.0; extra == "mqtt"
Requires-Dist: orjson>=3.10.11; extra == "mqtt"
Requires-Dist: mmh3>=5.1.0; extra == "mqtt"
Provides-Extra: opentelemetry
Requires-Dist: opentelemetry-api>=1.5.0; extra == "opentelemetry"
Requires-Dist: opentelemetry-sdk>=1.5.0; extra == "opentelemetry"
Requires-Dist: opentelemetry-exporter-jaeger>=1.21.0; extra == "opentelemetry"
Provides-Extra: ray
Requires-Dist: ray>=2.34.0; extra == "ray"
Provides-Extra: redis
Requires-Dist: redis>=5.0.4; extra == "redis"
Provides-Extra: rq
Requires-Dist: rq>=2.3.1; extra == "rq"
Requires-Dist: rq-scheduler>=0.14.0; extra == "rq"
Requires-Dist: cron-descriptor>=1.4.5; extra == "rq"
Provides-Extra: tui
Requires-Dist: textual>=0.85.2; extra == "tui"
Provides-Extra: ui
Requires-Dist: sf-hamilton-ui>=0.0.11; extra == "ui"
Provides-Extra: webserver
Requires-Dist: sanic>=24.6.0; extra == "webserver"
Requires-Dist: sanic-ext>=23.12.0; extra == "webserver"
Requires-Dist: orjson>=3.10.11; extra == "webserver"
Provides-Extra: openlineage
Requires-Dist: openlineage-python>=1.32.0; extra == "openlineage"

<div align="center">
  <h1>FlowerPower 🌸</h1>
  <h3>Simple Workflow Framework - Hamilton + APScheduler = FlowerPower</h3>
  <img src="./image.png" alt="FlowerPower Logo" width="400" height="300">
</div>

FlowerPower is a data pipeline framework for data processing workflows, job scheduling, and event-driven processing. It builds pipelines with Hamilton, runs them through a job queue (RQ or APScheduler), and can trigger them from MQTT messages.

## ✨ Features

### Core Features
- 📊 **Pipeline Management**: Build and run data processing pipelines with support for multiple data formats and computation engines
- 🔄 **Job Queue Integration**: Support for multiple job queue backends (RQ, APScheduler)
- 📡 **MQTT Integration**: Built-in support for MQTT-based event processing
- 🎯 **Resilient Execution**: Automatic retries with configurable backoff and jitter
- 📊 **Data Format Support**: Work with CSV, JSON, Parquet files and more
- 🗄️ **Database Connectivity**: Connect to PostgreSQL, MySQL, SQLite, DuckDB, Oracle, and MSSQL

### Additional Features
- 🛠️ **CLI Tools**: Comprehensive command-line interface for all operations
- 📈 **Pipeline Visualization**: Render a pipeline's DAG to inspect its structure
- 🔍 **Monitoring**: Integration with OpenTelemetry for observability
- 🐳 **Docker Support**: Ready-to-use Docker configurations

## 🚀 Quick Start

### Installation

```bash
# Using pip
pip install flowerpower

# For development installation
git clone https://github.com/legout/flowerpower.git
cd flowerpower
pip install -e ".[dev]"
```

### Create Your First Pipeline

1. Initialize a new project:
```bash
flowerpower init --name my-first-project
```

2. Create a simple pipeline in `pipelines/hello_world.py`:
```python
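import pandas as pd

# Hamilton wires functions together by name: each parameter name must
# match the name of the function (node) that produces its value.

def load_data() -> pd.DataFrame:
    """Load sample data."""
    return pd.DataFrame({
        'name': ['Alice', 'Bob', 'Charlie'],
        'age': [25, 30, 35]
    })

def process_data(load_data: pd.DataFrame) -> pd.DataFrame:
    """Add a greeting column without mutating the input frame."""
    return load_data.assign(greeting='Hello, ' + load_data['name'])

def save_output(process_data: pd.DataFrame) -> None:
    """Print the processed data."""
    print(process_data)
```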

3. Run the pipeline:
```bash
flowerpower pipeline run hello_world
```
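
Hamilton assembles a pipeline from plain functions: each function is a node, and each parameter name refers to the node (or input) that supplies its value. The following is a minimal sketch of that name-based wiring, not Hamilton's implementation; `resolve`, `names`, and `greeting` are hypothetical names used only for illustration.

```python
import inspect
from typing import Any, Callable, Dict, Optional


def resolve(target: str, nodes: Dict[str, Callable[..., Any]],
            cache: Optional[Dict[str, Any]] = None) -> Any:
    """Compute `target` by first computing the nodes its parameters name.

    Illustration only: no cycle detection, no external inputs.
    """
    cache = {} if cache is None else cache
    if target not in cache:
        func = nodes[target]
        # Each parameter name is looked up as another node in the graph.
        kwargs = {name: resolve(name, nodes, cache)
                  for name in inspect.signature(func).parameters}
        cache[target] = func(**kwargs)
    return cache[target]


# Hypothetical two-node pipeline: `greeting` depends on `names` by parameter name.
def names() -> list:
    return ["Alice", "Bob"]


def greeting(names: list) -> list:
    return ["Hello, " + n for n in names]


result = resolve("greeting", {"names": names, "greeting": greeting})
print(result)  # ['Hello, Alice', 'Hello, Bob']
```

Because dependencies are declared through parameter names, renaming a function changes what downstream functions must ask for.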

## 💡 Key Concepts

### Pipeline Management

Pipelines are the core building blocks of FlowerPower. They can be:
- Run directly
- Scheduled
- Triggered by MQTT messages
- Executed as background jobs

```bash
# Run a pipeline
flowerpower pipeline run my_pipeline --inputs '{"source": "data.csv"}'

# Schedule a pipeline
flowerpower pipeline schedule my_pipeline --cron "0 * * * *"

# Show pipeline structure
flowerpower pipeline show-dag my_pipeline
```
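
The `--cron` value above is a standard five-field cron expression. As a small sketch, the helper below labels each field; `describe_cron` is a hypothetical name and is not part of the FlowerPower CLI or API.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict:
    """Label the five whitespace-separated fields of a cron expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


fields = describe_cron("0 * * * *")
print(fields)
# {'minute': '0', 'hour': '*', 'day_of_month': '*', 'month': '*', 'day_of_week': '*'}
```

Read this way, `"0 * * * *"` fires at minute 0 of every hour, every day.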

### Job Queue Integration

FlowerPower supports multiple job queue backends:

```bash
# Start a worker with RQ backend
flowerpower job-queue start-worker --type rq

# Start APScheduler worker
flowerpower job-queue start-worker --type apscheduler

# Add a job with retry configuration
flowerpower job-queue add-job my_pipeline \
  --max-retries 3 \
  --retry-delay 2.0 \
  --jitter-factor 0.1
```
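
The retry flags above map naturally onto a retry loop. The sketch below assumes exponential backoff with a symmetric random jitter; FlowerPower's actual delay formula may differ, and `run_with_retries` is a hypothetical helper, not part of its API.

```python
import random
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def run_with_retries(func: Callable[[], T], max_retries: int = 3,
                     retry_delay: float = 2.0, jitter_factor: float = 0.1,
                     sleep: Callable[[float], object] = time.sleep) -> T:
    """Call `func`, retrying up to `max_retries` times after a failure."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            # Assumed policy: the delay doubles per attempt and is then
            # randomized by +/- jitter_factor of its value.
            base = retry_delay * (2 ** attempt)
            jitter = base * jitter_factor * random.uniform(-1.0, 1.0)
            sleep(base + jitter)
    raise ValueError("max_retries must be >= 0")


# Usage: a hypothetical flaky task that succeeds on its third call.
calls = {"count": 0}


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


delays: List[float] = []
# Record the delays instead of actually sleeping.
outcome = run_with_retries(flaky, sleep=delays.append)
```

Jitter spreads retries from many workers over time, so they do not all hit a recovering service at the same instant.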

### MQTT Integration

Connect your pipelines to MQTT message brokers:

```bash
# Run a pipeline when messages arrive
flowerpower mqtt run-pipeline-on-message my_pipeline \
  --topic "sensors/data" \
  --max-retries 3 \
  --retry-delay 1.0

# Start a custom message listener
flowerpower mqtt start-listener \
  --on-message process_message \
  --topic "events/#"
```
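
The `events/#` topic above uses an MQTT wildcard: `+` matches exactly one topic level, and `#` matches the parent level plus any number of levels below it and must be the last level of the filter. The sketch below shows that matching rule in plain Python; `topic_matches` is a hypothetical helper (it skips the special handling of topics starting with `$`), and in practice the broker performs this matching.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT `topic_filter`."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last filter level.
            # It matches the parent level and everything beneath it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs
    # Without '#', filter and topic must have the same depth.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("events/#", "events/user/login"))  # True
print(topic_matches("events/#", "sensors/data"))       # False
print(topic_matches("sensors/+", "sensors/data"))      # True
```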

## 📁 Project Structure

```
my-project/
├── conf/
│   ├── project.yml         # Project configuration
│   └── pipelines/          # Pipeline configurations
│       └── my_pipeline.yml
├── pipelines/              # Pipeline implementations
│   └── my_pipeline.py
└── data/                   # Data files (optional)
```

## 🔌 Data Connectors

### Supported File Formats
- CSV
- JSON
- Parquet
- Pydala Datasets

### Supported Databases
- PostgreSQL
- MySQL
- SQLite
- Oracle
- Microsoft SQL Server
- DuckDB

## 🐳 Docker Support

Run FlowerPower in containers:

```bash
cd docker
docker-compose up
```

The Docker setup includes:
- Python worker environment
- MQTT broker (Mosquitto)
- Built-in configuration

## 🛠️ Configuration

### Pipeline Configuration
```yaml
# conf/pipelines/my_pipeline.yml
name: my_pipeline
description: Example pipeline configuration
inputs:
  source_data:
    type: csv
    path: data/input.csv
outputs:
  processed_data:
    type: parquet
    path: data/output.parquet
```

### Job Queue Configuration
```yaml
# conf/project.yml
job_queue:
  type: rq  # or apscheduler
  redis_url: redis://localhost:6379
  max_retries: 3
  retry_delay: 1.0
```
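
Configuration mistakes are easier to catch when the `job_queue` block is validated before a worker starts. The following is a sketch under stated assumptions: `JobQueueSettings` is a hypothetical class, not FlowerPower's own configuration model, and the `raw` mapping stands in for what a YAML loader would return for the block above.

```python
from dataclasses import dataclass
from typing import Tuple
from urllib.parse import urlsplit


@dataclass(frozen=True)
class JobQueueSettings:
    """Hypothetical typed view of the `job_queue` block."""
    type: str
    redis_url: str
    max_retries: int = 3
    retry_delay: float = 1.0

    def __post_init__(self) -> None:
        # The two backends named in the configuration comment above.
        if self.type not in ("rq", "apscheduler"):
            raise ValueError(f"unsupported job queue type: {self.type!r}")
        if self.max_retries < 0 or self.retry_delay < 0:
            raise ValueError("max_retries and retry_delay must be non-negative")

    @property
    def redis_host_port(self) -> Tuple[str, int]:
        """Split the Redis URL into host and port, with common defaults."""
        parts = urlsplit(self.redis_url)
        return parts.hostname or "localhost", parts.port or 6379


raw = {"type": "rq", "redis_url": "redis://localhost:6379",
       "max_retries": 3, "retry_delay": 1.0}
settings = JobQueueSettings(**raw)
print(settings.redis_host_port)  # ('localhost', 6379)
```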

## 📚 API Documentation

Visit our [API Documentation](docs/api.md) for detailed information about:
- Pipeline API
- Job Queue API
- MQTT Integration
- Data Connectors
- Configuration Options

## 🧪 Running Tests

```bash
# Run all tests
pytest tests/

# Run specific test category
pytest tests/test_pipeline/
```

## 🤝 Contributing

We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.

1. Fork the repository
2. Create your feature branch
3. Commit your changes
4. Push to the branch
5. Create a Pull Request

## 📄 License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## 🙏 Acknowledgments

- Built with [Hamilton](https://github.com/DAGWorks-Inc/hamilton) for pipeline execution
- Uses [RQ](https://python-rq.org/) and [APScheduler](https://apscheduler.readthedocs.io/) for job queues
- MQTT support via [Paho MQTT](https://www.eclipse.org/paho/)
- Database connectivity through SQLAlchemy and native connectors
