Metadata-Version: 2.4
Name: FlowerPower
Version: 0.10.3
Summary: A simple workflow framework. Hamilton + APScheduler = FlowerPower
Author-email: "Volker L." <ligno.blades@gmail.com>
Project-URL: Homepage, https://github.com/legout/flowerpower
Project-URL: Bug Tracker, https://github.com/legout/flowerpower/issues
Keywords: hamilton,workflow,pipeline,scheduler,apscheduler,dask,ray
Requires-Python: >=3.11
Description-Content-Type: text/markdown
Requires-Dist: aiobotocore<2.18.0
Requires-Dist: aiosqlite>=0.21.0
Requires-Dist: dill>=0.3.8
Requires-Dist: duration-parser>=1.0.1
Requires-Dist: fsspec>=2024.10.0
Requires-Dist: humanize>=4.12.2
Requires-Dist: msgspec>=0.19.0
Requires-Dist: munch>=4.0.0
Requires-Dist: orjson>=3.10.15
Requires-Dist: pyarrow<19.0.0
Requires-Dist: pydantic>=2.10.2
Requires-Dist: python-dotenv>=1.0.1
Requires-Dist: pyyaml>=6.0.1
Requires-Dist: rich>=13.9.3
Requires-Dist: s3fs>=2024.10.0
Requires-Dist: sf-hamilton-sdk>=0.5.2
Requires-Dist: sf-hamilton[rich,tqdm,visualization]>=1.69.0
Requires-Dist: typer>=0.12.3
Provides-Extra: apscheduler
Requires-Dist: aiosqlite>=0.21.0; extra == "apscheduler"
Requires-Dist: apscheduler==4.0.0a5; extra == "apscheduler"
Requires-Dist: asyncpg>=0.29.0; extra == "apscheduler"
Requires-Dist: greenlet>=3.0.3; extra == "apscheduler"
Requires-Dist: sqlalchemy>=2.0.30; extra == "apscheduler"
Requires-Dist: cron-descriptor>=1.4.5; extra == "apscheduler"
Provides-Extra: io
Requires-Dist: adbc-driver-manager>=1.4.0; extra == "io"
Requires-Dist: datafusion>=43.1.0; extra == "io"
Requires-Dist: deltalake>=0.24.0; extra == "io"
Requires-Dist: duckdb>=1.1.3; extra == "io"
Requires-Dist: orjson>=3.10.12; extra == "io"
Requires-Dist: pandas>=2.2.3; extra == "io"
Requires-Dist: polars>=1.15.0; extra == "io"
Requires-Dist: pyarrow>=18.1.0; extra == "io"
Requires-Dist: pydala2>=0.9.4.5; extra == "io"
Requires-Dist: redis>=5.2.1; extra == "io"
Requires-Dist: sherlock>=0.4.1; extra == "io"
Provides-Extra: io-legacy
Requires-Dist: adbc-driver-manager>=1.4.0; extra == "io-legacy"
Requires-Dist: datafusion>=43.1.0; extra == "io-legacy"
Requires-Dist: deltalake>=0.24.0; extra == "io-legacy"
Requires-Dist: duckdb>=1.1.3; extra == "io-legacy"
Requires-Dist: orjson>=3.10.12; extra == "io-legacy"
Requires-Dist: pandas>=2.2.3; extra == "io-legacy"
Requires-Dist: polars-lts-cpu>=1.15.0; extra == "io-legacy"
Requires-Dist: pyarrow>=18.1.0; extra == "io-legacy"
Requires-Dist: pydala2>=0.9.4.5; extra == "io-legacy"
Requires-Dist: redis>=5.2.1; extra == "io-legacy"
Requires-Dist: sherlock>=0.4.1; extra == "io-legacy"
Provides-Extra: mongodb
Requires-Dist: pymongo>=4.7.2; extra == "mongodb"
Provides-Extra: mqtt
Requires-Dist: paho-mqtt>=2.1.0; extra == "mqtt"
Requires-Dist: orjson>=3.10.11; extra == "mqtt"
Requires-Dist: mmh3>=5.1.0; extra == "mqtt"
Provides-Extra: opentelemetry
Requires-Dist: opentelemetry-api>=1.5.0; extra == "opentelemetry"
Requires-Dist: opentelemetry-sdk>=1.5.0; extra == "opentelemetry"
Requires-Dist: opentelemetry-exporter-jaeger>=1.21.0; extra == "opentelemetry"
Provides-Extra: ray
Requires-Dist: ray>=2.34.0; extra == "ray"
Provides-Extra: redis
Requires-Dist: redis>=5.0.4; extra == "redis"
Provides-Extra: rq
Requires-Dist: rq>=2.3.1; extra == "rq"
Requires-Dist: rq-scheduler>=0.14.0; extra == "rq"
Requires-Dist: cron-descriptor>=1.4.5; extra == "rq"
Provides-Extra: tui
Requires-Dist: textual>=0.85.2; extra == "tui"
Provides-Extra: ui
Requires-Dist: sf-hamilton-ui>=0.0.11; extra == "ui"
Provides-Extra: webserver
Requires-Dist: sanic>=24.6.0; extra == "webserver"
Requires-Dist: sanic-ext>=23.12.0; extra == "webserver"
Requires-Dist: orjson>=3.10.11; extra == "webserver"
Provides-Extra: openlineage
Requires-Dist: openlineage-python>=1.32.0; extra == "openlineage"

<div align="center">
  <h1>FlowerPower 🌸 - Build & Orchestrate Data Pipelines</h1>
  <h3>Simple Workflow Framework - Hamilton + APScheduler or RQ = FlowerPower</h3>
  <img src="./image.png" alt="FlowerPower Logo" width="400" height="300">
</div>



[![PyPI version](https://img.shields.io/pypi/v/flowerpower.svg?style=flat-square)](https://pypi.org/project/flowerpower/) <!-- Placeholder -->
[![License](https://img.shields.io/pypi/l/flowerpower.svg?style=flat-square)](https://github.com/legout/flowerpower/blob/main/LICENSE) <!-- Placeholder -->
[![Build Status](https://img.shields.io/github/actions/workflow/status/legout/flowerpower/ci.yml?branch=main&style=flat-square)](https://github.com/legout/flowerpower/actions) <!-- Placeholder -->
[![Python Version](https://img.shields.io/pypi/pyversions/flowerpower.svg?style=flat-square)](https://pypi.org/project/flowerpower/) <!-- Placeholder -->

**FlowerPower** is a Python framework designed for building, configuring, scheduling, and executing data processing pipelines with ease and flexibility. It promotes a modular, configuration-driven approach, allowing you to focus on your pipeline logic while FlowerPower handles the orchestration.

## ✨ Key Features

*   **Modular Design:** Easily swap components like job queue backends (APScheduler, RQ) or add custom I/O plugins.
*   **Configuration-Driven:** Define pipeline parameters, execution logic, and scheduling declaratively using simple YAML files.
*   **Job Queue Integration:** Built-in support for different asynchronous execution models:
    *   **APScheduler:** For time-based scheduling (cron, interval, date).
    *   **RQ (Redis Queue):** For distributed task queues.
*   **Extensible I/O Plugins:** Connect to various data sources and destinations (CSV, JSON, Parquet, DeltaTable, DuckDB, PostgreSQL, MySQL, MSSQL, Oracle, MQTT, SQLite, and more).
*   **Hamilton Integration:** Leverages the [Hamilton](https://github.com/DAGWorks-Inc/hamilton) library for defining dataflows in a clean, functional way within your Python pipeline scripts.
*   **Multiple Interfaces:** Interact with your pipelines via:
    *   **Command Line Interface (CLI):** For running, managing, and inspecting pipelines.
    *   **Web UI:** A graphical interface for monitoring and managing pipelines and schedules.
*   **Filesystem Abstraction:** Simplifies interactions with different storage backends.

## 🚀 Installation

We recommend using [uv](https://github.com/astral-sh/uv) for installing FlowerPower and managing your project environments. `uv` is an extremely fast Python package installer and resolver.

```bash
# Create and activate a virtual environment (recommended)
uv venv
source .venv/bin/activate # Or .\.venv\Scripts\activate on Windows

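# Install FlowerPower
uv pip install flowerpower

# Optional: add extras for the features you need, e.g. the RQ job queue backend
uv pip install "flowerpower[rq]"  # other extras include apscheduler, io, ui, webserver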
```

*(Note: FlowerPower requires Python 3.11 or newer.)*
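The package metadata declares `Requires-Python: >=3.11`, and features such as RQ, APScheduler or the I/O plugins come from optional extras. The minimal standard-library sketch below checks both the interpreter version and whether the modules behind a few extras can be imported. The extra-to-module mapping is an assumption read off the metadata (e.g. the `io` extra pulls in `polars`, the `webserver` extra pulls in `sanic`); adjust it to the extras you actually install.

```python
import importlib.util
import sys

# Minimum interpreter version, from the package metadata (Requires-Python: >=3.11)
MIN_PYTHON = (3, 11)

# Assumed mapping: extra name -> a top-level module that extra installs
OPTIONAL_MODULES = {
    "rq": "rq",
    "apscheduler": "apscheduler",
    "io": "polars",
    "webserver": "sanic",
}


def check_environment() -> dict[str, bool]:
    """Report whether the interpreter is new enough and which optional modules are present."""
    report = {"python_ok": sys.version_info[:2] >= MIN_PYTHON}
    for extra, module in OPTIONAL_MODULES.items():
        # find_spec returns None when the top-level module is not installed
        report[f"extra:{extra}"] = importlib.util.find_spec(module) is not None
    return report


if __name__ == "__main__":
    for key, ok in check_environment().items():
        print(f"{key:<20} {'yes' if ok else 'no'}")
```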

## 🌱 Getting Started

Let's build a simple "Hello World" pipeline.

**1. Initialize Your Project:**

You can quickly set up the standard FlowerPower project structure using the CLI or Python.

**Using the CLI:**

Navigate to your desired parent directory and run:
```bash
flowerpower init --name hello-flowerpower-project
```
This will create a `hello-flowerpower-project` directory with the necessary `conf/` and `pipelines/` subdirectories and default configuration files.

**Using Python:**

Alternatively, you can initialize programmatically:
```python
from flowerpower import init_project

# Creates the structure in the current directory
init_project(name='hello-flowerpower-project', job_queue_type='rq') # Or 'apscheduler'
```

This sets up the basic layout:
```
hello-flowerpower-project/
├── conf/
│   ├── project.yml
│   └── pipelines/
└── pipelines/
```
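The layout is only a few directories plus a project file. The hypothetical `scaffold_layout` helper below (not FlowerPower's `init_project`) recreates that tree with `pathlib` to show how little is involved; it writes only a one-line placeholder `project.yml`, whereas FlowerPower's own initializer may write different or additional default files.

```python
from pathlib import Path


def scaffold_layout(root: Path) -> list[Path]:
    """Create conf/, conf/pipelines/, pipelines/ and conf/project.yml under `root` (hypothetical helper)."""
    for directory in (root / "conf" / "pipelines", root / "pipelines"):
        # parents=True also creates root/ and conf/; exist_ok=True makes reruns harmless
        directory.mkdir(parents=True, exist_ok=True)

    project_file = root / "conf" / "project.yml"
    if not project_file.exists():
        # Placeholder content only: the project name taken from the directory name
        project_file.write_text(f"name: {root.name}\n")

    # Return every created entry relative to the project root, in a stable order
    return sorted(p.relative_to(root) for p in root.rglob("*"))
```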

Now, navigate into your new project directory:
```bash
cd hello-flowerpower-project
```

**2. Configure Project (`conf/project.yml`):**

Define your project name and choose your job queue backend. Here's an example using RQ:

```yaml
name: my_awesome_project
job_queue:
  type: rq
  backend:
    type: redis
    # host: localhost # Default or specify connection details
    # port: 6379
    # ... other redis options
    queues:
      - default
      - high
      - low
# adapter: ... # Optional adapter configurations (e.g., Hamilton Tracker, MLflow)
```
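Before starting workers it can help to sanity-check this file. The sketch below writes the YAML above as the dict a YAML loader would return and applies a few checks. `validate_project` and its rules are hypothetical, derived only from this example (RQ runs on Redis and the backend lists its queues); they are not FlowerPower's own validation.

```python
# The project.yml above, as the nested dict a YAML loader would produce
PROJECT_CFG = {
    "name": "my_awesome_project",
    "job_queue": {
        "type": "rq",
        "backend": {"type": "redis", "queues": ["default", "high", "low"]},
    },
}

# The two job queue types this README describes
SUPPORTED_QUEUES = {"rq", "apscheduler"}


def validate_project(cfg: dict) -> list[str]:
    """Return human-readable problems in a project config; an empty list means it looks OK."""
    problems = []
    if not cfg.get("name"):
        problems.append("missing 'name'")
    job_queue = cfg.get("job_queue", {})
    if job_queue.get("type") not in SUPPORTED_QUEUES:
        problems.append(f"job_queue.type must be one of {sorted(SUPPORTED_QUEUES)}")
    if job_queue.get("type") == "rq":
        backend = job_queue.get("backend", {})
        if backend.get("type") != "redis":
            problems.append("RQ needs a redis backend")
        if not backend.get("queues"):
            problems.append("RQ backend lists no queues")
    return problems


print(validate_project(PROJECT_CFG))
```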

**3. Define Pipeline (`conf/pipelines/hello_world.yml`):**

Specify parameters, run configurations, and scheduling for your pipeline.

```yaml
# adapter: ... # Pipeline-specific adapter overrides

params: # Parameters accessible in your Python code
  greeting:
    message: "Hello"
  target:
    name: "World"

run: # How to execute the pipeline
  final_vars: # Specify the desired output(s) from your Hamilton DAG
    - full_greeting
  # config: ... # Runtime configuration overrides for Hamilton
  # executor: ... # Execution backend (e.g., threadpool, multiprocessing)

schedule: # Optional: How often to run the pipeline
  cron: "0 * * * *" # Run hourly
  # interval: # e.g., { "minutes": 15 }
  # date: # e.g., "2025-12-31 23:59:59"
```
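To make the three `schedule` forms concrete, here is a simplified next-run calculation. It is an illustration only, under stated assumptions: `interval` is treated as keyword arguments for `datetime.timedelta` (such as `minutes`), `date` uses the `YYYY-MM-DD HH:MM:SS` form shown above, and cron support is limited to the hourly `<minute> * * * *` pattern. The job queue backends' own cron triggers parse the full syntax.

```python
from datetime import datetime, timedelta


def next_run(schedule: dict, now: datetime) -> datetime:
    """Compute the next run time for a simplified schedule block (illustrative sketch)."""
    if "interval" in schedule:
        # e.g. {"minutes": 15} -> timedelta(minutes=15) after now
        return now + timedelta(**schedule["interval"])
    if "date" in schedule:
        # A one-off run at a fixed timestamp
        return datetime.strptime(schedule["date"], "%Y-%m-%d %H:%M:%S")
    if "cron" in schedule:
        minute, *rest = schedule["cron"].split()
        # Only "<minute> * * * *" is handled: once per hour at a fixed minute
        if rest != ["*", "*", "*", "*"] or not minute.isdigit():
            raise ValueError("this sketch only supports '<minute> * * * *'")
        candidate = now.replace(minute=int(minute), second=0, microsecond=0)
        if candidate <= now:
            # This hour's slot has passed, so the next one is an hour later
            candidate += timedelta(hours=1)
        return candidate
    raise ValueError("schedule needs one of: cron, interval, date")


print(next_run({"cron": "0 * * * *"}, datetime(2025, 1, 1, 10, 30)))
```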

**4. Implement Pipeline (`pipelines/hello_world.py`):**

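Write your pipeline logic as plain Python functions; Hamilton builds the dataflow by matching each function's parameter names to the names of other nodes. FlowerPower's `Config` exposes the `params` from the pipeline's YAML file so they can be injected with Hamilton's `@parameterize` decorator.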

```python
from pathlib import Path

from hamilton.function_modifiers import parameterize

from flowerpower.cfg import Config

# Load this pipeline's configuration. parents[1] is the project root (the directory
# containing conf/), assuming this file lives at pipelines/hello_world.py.
PARAMS = Config.load(Path(__file__).parents[1], pipeline_name="hello_world").pipeline.h_params

# h_params nests each top-level key of `params` under its own name for @parameterize,
# so (assumption) the generated Hamilton nodes are named 'greeting' and 'target',
# not after the decorated functions.


@parameterize(**PARAMS.greeting)  # Injects 'message' from params
def greeting_message(message: str) -> str:
    """Provides the greeting part."""
    return f"{message},"


@parameterize(**PARAMS.target)  # Injects 'name' from params
def target_name(name: str) -> str:
    """Provides the target name."""
    return f"{name}!"


def full_greeting(greeting: str, target: str) -> str:
    """Combines the greeting and target nodes produced above."""
    print(f"Generating greeting: {greeting} {target}")
    return f"{greeting} {target}"


# You can add more complex Hamilton functions here...
```
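Hamilton wires functions together by matching parameter names to node names, and computes only what the requested `final_vars` need. The toy runner below is not Hamilton's implementation; it shows that idea with simplified, undecorated greeting functions, passing `message` and `name` as plain inputs instead of injecting them through `@parameterize`.

```python
import inspect
from typing import Any, Callable


def execute(funcs: list[Callable[..., Any]], final_vars: list[str], inputs: dict[str, Any]) -> dict[str, Any]:
    """Toy DAG runner: a function's parameter names identify the nodes it depends on."""
    nodes = {f.__name__: f for f in funcs}
    cache: dict[str, Any] = dict(inputs)  # external inputs count as already-computed nodes

    def compute(name: str, path: tuple[str, ...] = ()) -> Any:
        if name in cache:
            return cache[name]
        if name in path:
            raise ValueError(f"cycle detected at {name!r}")
        if name not in nodes:
            raise KeyError(f"no function or input named {name!r}")
        func = nodes[name]
        # Compute every upstream dependency first, then call this node exactly once
        kwargs = {p: compute(p, path + (name,)) for p in inspect.signature(func).parameters}
        cache[name] = func(**kwargs)
        return cache[name]

    # Only the requested outputs (and whatever they depend on) are computed
    return {var: compute(var) for var in final_vars}


def greeting_message(message: str) -> str:
    return f"{message},"


def target_name(name: str) -> str:
    return f"{name}!"


def full_greeting(greeting_message: str, target_name: str) -> str:
    return f"{greeting_message} {target_name}"


result = execute(
    [greeting_message, target_name, full_greeting],
    final_vars=["full_greeting"],
    inputs={"message": "Hello", "name": "World"},
)
print(result)  # {'full_greeting': 'Hello, World!'}
```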


## 🏃‍♀️ Running Pipelines: Sync vs. Async

FlowerPower offers flexibility in how you execute your pipelines:

**1. Synchronous Execution:**

For simple pipelines or testing, you can run them directly in the current session without involving a job queue.

*   **Via CLI:**
    ```bash
    # Assumes your project structure is standard and you are in the project root
    flowerpower pipeline run hello_world --base_dir .
    ```
*   **Via Python:**
    ```python
    from flowerpower.pipeline import PipelineManager

    # Specify the base directory containing your 'conf/' folder
    pm = PipelineManager(base_dir='.')
    results = pm.run('hello_world') # Execute the pipeline named 'hello_world'
    print(results)
    ```

**2. Asynchronous Execution (Job Queues):**

For scheduling, background execution, or distributed processing, leverage FlowerPower's job queue integration. This is configured in your `conf/project.yml`.

*   **RQ (Redis Queue):**
    *   **Requires:** Access to a running Redis server.
    *   Ideal for distributed task queues where workers can pick up jobs.
    *   Configure in `project.yml`: `job_queue: { type: rq, backend: { type: redis, ... } }`
    *   **Learn More:** [RQ Documentation](https://python-rq.org/)

*   **APScheduler:**
    *   **Requires:**
        *   A **Data Store:** To persist job information (Options: PostgreSQL, MySQL, SQLite, MongoDB).
        *   An **Event Broker:** To notify workers of scheduled jobs (Options: Redis, MQTT, PostgreSQL).
    *   Ideal for time-based scheduling (cron, intervals, specific dates).
    *   Configure in `project.yml`: `job_queue: { type: apscheduler, datastore: { ... }, eventbroker: { ... } }`
    *   **Learn More:** [APScheduler Documentation](https://apscheduler.readthedocs.io/)
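The queue model behind RQ is simple: producers put jobs on named queues, and a worker checks the queues it listens on in the order given, so earlier queues take priority. The in-memory `ToyQueueSystem` below is a sketch of that model only, not RQ's API; in RQ the jobs live in Redis and workers run as separate processes.

```python
from collections import deque
from typing import Any, Callable


class ToyQueueSystem:
    """In-memory stand-in for a job queue: producers enqueue, a worker dequeues."""

    def __init__(self, queue_names: list[str]):
        self.queues: dict[str, deque] = {name: deque() for name in queue_names}

    def enqueue(self, queue_name: str, func: Callable[..., Any], *args: Any) -> None:
        self.queues[queue_name].append((func, args))

    def work(self, listen: list[str]) -> list[Any]:
        """Run jobs until the listened-to queues are empty, always checking them in `listen` order."""
        results = []
        while True:
            for name in listen:
                if self.queues[name]:
                    func, args = self.queues[name].popleft()
                    results.append(func(*args))
                    break  # restart from the highest-priority queue
            else:
                return results  # no queue had a job left


qs = ToyQueueSystem(["default", "high", "low"])
qs.enqueue("low", str.upper, "low job")
qs.enqueue("default", str.upper, "default job")
qs.enqueue("high", str.upper, "high job")
print(qs.work(listen=["high", "default", "low"]))  # ['HIGH JOB', 'DEFAULT JOB', 'LOW JOB']
```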

**Local Development Setup (Docker):**

To easily set up required services like Redis, PostgreSQL, or MQTT locally for testing job queues, a basic `docker-compose.yml` file is provided in the `docker/` directory. This file includes configurations for various services useful during development.

```bash
# Navigate to the docker directory and start services
cd docker
docker compose up -d redis postgres # Example: start Redis and PostgreSQL (use docker-compose on older Compose v1 installs)
```
*(Note: Review and adapt `docker/docker-compose.yml` for your specific needs. It's intended for development, not production.)*
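Once the containers are up, a quick TCP check confirms the services are listening before you point `project.yml` at them. The ports below are the standard defaults for Redis (6379), PostgreSQL (5432) and MQTT (1883); treating them as the compose file's mappings is an assumption, so check `docker/docker-compose.yml` for the actual values.

```python
import socket

# Standard default ports; the compose file may map them differently
SERVICES = {"redis": 6379, "postgres": 5432, "mqtt": 1883}


def is_reachable(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolvable hosts
        return False


if __name__ == "__main__":
    for name, port in SERVICES.items():
        status = "up" if is_reachable("localhost", port) else "down"
        print(f"{name:<10} localhost:{port} {status}")
```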



## ⚙️ Configuration Overview

FlowerPower uses a layered configuration system:

*   **`conf/project.yml`:** Defines global settings for your project, primarily the `job_queue` backend (RQ or APScheduler) and configurations for integrated `adapter`s (like Hamilton Tracker, MLflow, etc.).
*   **`conf/pipelines/*.yml`:** Each file defines a specific pipeline. It contains:
    *   `params`: Input parameters for your Hamilton functions.
    *   `run`: Execution details like target outputs (`final_vars`), Hamilton runtime `config`, and `executor` settings.
    *   `schedule`: Defines when the pipeline should run automatically (using `cron`, `interval`, or `date`).
    *   `adapter`: Pipeline-specific overrides for adapter settings.
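How a pipeline's `adapter` block combines with the project-level one is not spelled out here; one common approach is a recursive merge in which the pipeline's values win. The sketch below assumes that behaviour, and the adapter keys in the example (`hamilton_tracker`, `mlflow`, `project_id`, `experiment_name`) are hypothetical.

```python
from copy import deepcopy
from typing import Any


def merge_config(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Recursively overlay `override` on `base`: nested dicts merge, other values replace."""
    merged = deepcopy(base)  # never mutate the project-level settings
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


# Hypothetical project-level adapter settings and a pipeline-level override
project_adapter = {"hamilton_tracker": {"project_id": 1}, "mlflow": {"experiment_name": "default"}}
pipeline_adapter = {"mlflow": {"experiment_name": "hello_world"}}
print(merge_config(project_adapter, pipeline_adapter))
```

Deep-copying keeps the project-level dict reusable for every pipeline, so one pipeline's override never leaks into another.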

## 🛠️ Basic Usage

The CLI is usually the quickest way to work with pipelines:

```bash
# Run a pipeline manually (from the project root)
flowerpower pipeline run <pipeline_name>

# List the available commands and their options
flowerpower --help
flowerpower pipeline --help
```

*(Note: Commands and options can change between releases; `--help` shows what your installed version provides.)*

## 🖥️ Interfaces

FlowerPower provides two main ways to interact:

*   **CLI:** A command-line interface for developers and automation.
*   **Web UI:** A browser-based interface for monitoring pipeline runs, schedules, and potentially managing configurations.

## 🤝 Contributing

Contributions are welcome! Please refer to the `CONTRIBUTING.md` file (placeholder) for guidelines.

## 📜 License

This project is licensed under the MIT License - see the `LICENSE` file for details. (Placeholder - update with actual license)
