Metadata-Version: 2.4
Name: datapipelab
Version: 0.3.5
Summary: A data pipeline library with connectors, sources, processors, and sinks.
Description-Content-Type: text/markdown
Requires-Dist: json5
Requires-Dist: loguru
Dynamic: description
Dynamic: description-content-type
Dynamic: requires-dist
Dynamic: summary

# Datapipelab

## Overview
`datapipelab` is a lightweight, flexible data pipeline framework designed for building and orchestrating complex data workflows. It supports a modular node-based architecture, allowing users to plug in source, processor, and sink nodes using technologies such as Apache Spark, Google BigQuery, Hive, Delta Lake, and Microsoft Teams.


## Installation
Clone the repository and install any required dependencies:
```bash
pip install -r requirements.txt
```

Or, if integrating as a module:
```bash
pip install datapipelab
```


## Usage Guide

To run a pipeline, you typically follow these steps:

1. **Define your pipeline configuration** using a Python list or a JSON config.
2. **Instantiate and execute the engine**.

Example:

```python
from datapipelab.engine import Engine

config = [
    {
        "type": "source",
        "format": "hive",
        "name": "load_customer_accounts",
        "options": {
            "query": "SELECT customer_id, enrollment_date FROM customer_account"
        }
    },
    {
        "type": "processor",
        "format": "spark",
        "name": "aggregate_active_users",
        "options": {
            "parents": ["load_customer_accounts"],
            "query": """
                SELECT 
                    YEAR(enrollment_date) AS enrollment_year, 
                    COUNT(*) AS active_user_count
                FROM load_customer_accounts
                GROUP BY enrollment_year
            """
        }
    },
    {
        "type": "sink",
        "format": "hive",
        "name": "store_active_user_report",
        "options": {
            "parents": ["aggregate_active_users"],
            "table": "report.active_user_summary"
        }
    },
    {
        "type": "sink",
        "format": "teams_notification",
        "name": "notify_report_ready",
        "options": {
            "parents": ["store_active_user_report"],
            "webhook_url": "{{{WEBHOOK_URL}}}",
            "message": "Active user report has been updated in Hive."
        }
    }
]

params = {"WEBHOOK_URL": "https://outlook.office.com/webhook/..."}
engine = Engine(config, spark, params)
engine.running_travelers()
```


## Pipeline Configuration

Pipelines are defined using structured configuration objects or files that specify:

* Nodes (source, processor, sink)
* Dependencies and execution order via `parents`
* Parameters for each node, e.g., SQL queries, table names, paths

## Available Node Types

### Source Nodes

* **`spark_node`**
  * Executes a Spark SQL query to read data into the pipeline.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "source",
      "format": "spark",
      "source": "spark",
      "options": {
        "query": "SELECT * FROM database_name.table_name"
      }
    }
    ```
    

* **`hive_node`**
  * Reads data from a Hive table.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "source",
      "format": "hive",
      "source": "hive",
      "options": {
        "query": "SELECT * FROM database_name.table_name"
      }
    }
    ```

### Processor Nodes

* **`bigquery_api_node`**
  * Executes a query via BigQuery API.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "processor",
      "format": "bigquery_api",
      "options": {
            "credentials_path": "creadentials.json",
            "return_as_spark_df": false,
            "return_as_python_list": false,
            "return_as_is": true,
            "project_name": "project_name",
            "query": "select * from `project_name.dataset_name.table_name`"
      }
    }
    ```
    - *`return_as_python_list` and `return_as_is` are optional
    - *`query` can be any valid BigQuery SQL query including (SELECT/DDL/DML/Scripting/Control Flow/Stored Procedure Calls/Temporary Table Usage) statements.


* **`gcp_bucket_api_node`**
  * Deletes a bucket or a directory in a GCP bucket.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "processor",
      "format": "gcp_bucket_api",
      "options": {
            "credentials_path": "creadentials.json",
            "project_name": "project_name",
            "bucket_name": "bucket_name",
            "subdirectory": "path/to/subdirectory"
      }
    }
    ```
    - *`subdirectory` is optional and can be used to specify a subdirectory within the bucket.


* **`bigquery_spark_node`**
  * Reads data from BigQuery using the Spark BigQuery connector.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "processor",
      "format": "bigquery_spark",
      "options": {
            "parent_project": "parent_project_name",
            "materialization_dataset": "materialization_dataset_name",
            "query": "select * from `project_name.dataset_name.table_name`"
      }
    }
    ```
    - *`query` does not support DDL/DML/Scripting/Control Flow/Stored Procedure Calls/Temporary Table Usage statements. Only SELECT statements are supported.


* **`shell_node`**
  * Executes a shell command or script.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "processor",
      "format": "shell",
      "options": {
        "query": "echo 'Hello, World!'"
      }
    }
    ```
    

* **`custom_node`**
  * Custom logic node written by user.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "processor",
      "format": "custom",
      "options": {
        "module_name": "CustomModuleName"
        "module_path": "path/to/custom_module",
        "class_name": "CustomNodeClassName",
        "optional_param": "value"
      }
    }
    ```


### Sink Nodes

* **`hive_node`**
  * Writes output to a Hive table.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "sink",
      "format": "hive",
      "type": "spark",
      "options": {
        "parents": ["parent_node_name"],
        "database": "database_name",
        "table": "table_name"
      }
    }
    ```
    

* **`spark_node`**
  * Writes output to a Hive table.
  * Example:

    ```json
    {
      "name": "node_name",
      "type": "sink",
      "format": "spark",
      "type": "spark",
      "options": {
        "parents": ["parent_node_name"],
        "database": "database_name",
        "table": "table_name"
      }
    }
    ```
    

* **`teams_notification_node`**

  * Sends a message to a Microsoft Teams channel.
  * Example:

    ```json
    {
      "type": "sink",
      "format": "teams_notification",
      "name": "notify_report_ready",
      "options": {
         "parents": ["store_active_user_report"],
         "webhook_url": "{{{WEBHOOK_URL}}}",
         "message": "Active user report has been updated in Hive."
      }
    }
    ```


## Extending the Framework

To create a custom node:

1. Subclass `TNode` from `app/node/tnode.py`
2. Implement the required methods (`run`, `validate`, etc.)
3. Register your node in the pipeline factory or configuration

## Logging and Monitoring

Logging is centralized in `logger.py`. Logs are categorized by node and execution stage to assist with debugging and auditing.

## Troubleshooting

---

For more advanced examples or integration guides, refer to the `examples/` folder or reach out to the maintainers.
