Metadata-Version: 2.1
Name: airflow-provider-kafka
Version: 0.2.2
Summary: Apache Airflow Kafka provider containing Deferrable Operators & Sensors.
Home-page: https://github.com/astronomer/airflow-provider-kafka
Author: Dylan Storey
Author-email: dylan.storey@astronomer.io
License: Apache License 2.0
Project-URL: Source Code, https://github.com/astronomer/airflow-provider-kafka
Project-URL: Homepage, https://github.com/astronomer/airflow-provider-kafka
Project-URL: Changelog, https://github.com/astronomer/airflow-provider-kafka/blob/main/CHANGELOG.md
Platform: UNKNOWN
Classifier: Environment :: Web Environment
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Requires-Python: >=3.7
Description-Content-Type: text/markdown
License-File: LICENSE.txt
Requires-Dist: apache-airflow (>=2.2.0)
Requires-Dist: asgiref
Requires-Dist: confluent-kafka (>=1.8.2)
Provides-Extra: dev
Requires-Dist: mypy (>=0.800) ; extra == 'dev'
Requires-Dist: pytest ; extra == 'dev'
Requires-Dist: pre-commit ; extra == 'dev'

# Kafka Airflow Provider

![GitHub release (latest by date)](https://img.shields.io/github/v/release/astronomer/airflow-provider-kafka)![PyPI](https://img.shields.io/pypi/v/airflow-provider-kafka)![PyPI - Downloads](https://img.shields.io/pypi/dm/airflow-provider-kafka)


An Airflow provider to:
- interact with Kafka clusters
- read from topics
- write to topics
- wait for specific messages to arrive on a topic

This package currently contains:

3 hooks (`airflow_provider_kafka.hooks`):
- `admin_client.KafkaAdminClientHook` - a hook that wraps the Kafka admin client
- `consumer.KafkaConsumerHook` - a hook that creates a consumer and provides it for interaction
- `producer.KafkaProducerHook` - a hook that creates a producer and provides it for interaction

4 operators (`airflow_provider_kafka.operators`):
- `await_message.AwaitKafkaMessageOperator` - a deferrable operator (sensor) that waits for a matching message to appear in the log before triggering downstream tasks.
- `consume_from_topic.ConsumeFromTopicOperator` - an operator that reads from a topic and applies a function to each message fetched.
- `produce_to_topic.ProduceToTopicOperator` - an operator that uses an iterable to produce messages as key/value pairs to a Kafka topic.
- `event_triggers_function.EventTriggersFunctionOperator` - an operator that listens for messages on a topic, triggers a downstream function when one arrives, and then goes back to listening.

1 trigger (`airflow_provider_kafka.triggers`):
- `await_message.AwaitMessageTrigger`
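
The operators above are described as taking an iterable of key/value pairs (producing) and a function applied to each message (consuming). The following is a minimal sketch of what such callables could look like; it is not taken from the package. The names `producer_function`, `apply_function`, and `FakeMessage` are hypothetical, and the JSON encoding is an assumption. `FakeMessage` only mimics the `key()` / `value()` accessors of a `confluent_kafka.Message` so the sketch runs without a broker. How these callables are passed to the operators is not shown here; see the example DAGs for the actual arguments.

```python
import json
from typing import Iterator, Tuple


def producer_function(n: int = 3) -> Iterator[Tuple[str, str]]:
    """Yield (key, value) pairs; both are JSON-encoded strings (an assumption)."""
    for i in range(n):
        yield json.dumps(i), json.dumps({"value": i + 1})


class FakeMessage:
    """Hypothetical stand-in exposing key()/value() like confluent_kafka.Message."""

    def __init__(self, key: bytes, value: bytes) -> None:
        self._key = key
        self._value = value

    def key(self) -> bytes:
        return self._key

    def value(self) -> bytes:
        return self._value


def apply_function(message) -> str:
    """Decode one message and return a short description of it."""
    key = json.loads(message.key())
    value = json.loads(message.value())
    return f"key={key} value={value}"
```

Keeping both callables free of Airflow imports, as here, lets them be unit tested without a scheduler or a Kafka server.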


## Quick start

`pip install airflow-provider-kafka`

Example usage:
- [basic read/write/sense on a topic](example_dags/listener_dag_function.py)
- [event listener pattern](example_dags/listener_dag_function.py)

## FAQs 

**Why confluent-kafka and not (other library)?** A few reasons: the [confluent-kafka](https://github.com/confluentinc/confluent-kafka-python) library is guaranteed to be 1:1 functional with librdkafka, is faster, and is maintained by a company with a commercial stake in the continued quality and upkeep of it as a product.

**Why not release this into Airflow directly?** I could probably open the PR and get it merged, but the Airflow code base is getting huge and I don't want to burden the maintainers with the maintenance of code they don't own. Also, there have been multiple attempts to get a Kafka provider in before, and this is just faster.

**Why is most of the configuration handled in a dict?** Because that's how `confluent-kafka` does it. I'd rather maintain interfaces that people already using Kafka are comfortable with as a starting point - I'm happy to add more options/interfaces later, but would prefer to be thoughtful about it so that the differences between these operators and the actual client interface stay minimal.
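
To illustrate the dict-based style, here is a minimal sketch of a consumer configuration using standard librdkafka property names. The broker address, group id, and the `build_config` helper are assumptions made for the example, not part of this package.

```python
from typing import Any, Dict, Optional

# Property names follow librdkafka's configuration; the values are placeholders.
DEFAULT_CONSUMER_CONFIG: Dict[str, Any] = {
    "bootstrap.servers": "localhost:9092",  # assumed local broker
    "group.id": "example-group",            # hypothetical consumer group
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
}


def build_config(overrides: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Return a copy of the defaults with any overrides applied (hypothetical helper)."""
    config = dict(DEFAULT_CONSUMER_CONFIG)
    config.update(overrides or {})
    return config
```

Because the result is a plain dict with the client's own keys, the same mapping could be handed to the `confluent-kafka` client directly.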

## Local Development

### Unit Tests

Unit tests are located at `tests/unit`. A Kafka server isn't required to run them.
Run them with `pytest`.


### Setup on M1 Mac
On an M1 Mac, you need to install the `librdkafka` library with Homebrew before you can `pip install confluent-kafka`:
```bash
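# Install the C library that confluent-kafka builds against
brew install librdkafka
# Point the compiler and linker at the Homebrew install (adjust 1.8.2 to the version you have)
export C_INCLUDE_PATH=/opt/homebrew/Cellar/librdkafka/1.8.2/include
export LIBRARY_PATH=/opt/homebrew/Cellar/librdkafka/1.8.2/lib
pip install confluent-kafka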
```


