1 change: 1 addition & 0 deletions .gitignore
@@ -5,6 +5,7 @@ __pycache__/

# C extensions
*.so
runtime/

# Distribution / packaging
.Python
52 changes: 51 additions & 1 deletion README.md
@@ -1 +1,51 @@
# airflow_etl
# estela-airflow-etl

This project manages an Apache Airflow deployment used with estela, installed via Helm.

## Commands
All of the commands below must be run from the `k8s_installation` directory.

### Build the Docker Image

```bash
make build
```
Builds the Airflow Docker image and tags it.
Default values:
- `PLATFORM=linux/amd64`
- `IMAGE_NAME=airflow`
- `TAG=latest`
- `REGISTRY=localhost:5001`
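
Assuming the Makefile reads these as regular `make` variables, you can override them on the command line. For example, a hypothetical build for a remote registry (adjust the registry and tag to your environment):

```bash
make build PLATFORM=linux/amd64 IMAGE_NAME=airflow TAG=v1.0 REGISTRY=registry.example.com:5000
```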

### Push the Docker Image

```bash
make push
```
Pushes the Docker image to the registry.
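
If you built the image with custom values, pass the same variables to the push target so the image reference matches. A sketch with hypothetical values, assuming `make push` uses the same variables as `make build`:

```bash
make push IMAGE_NAME=airflow TAG=v1.0 REGISTRY=registry.example.com:5000
```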

### Install Airflow

```bash
make install
```
Installs Airflow using Helm. To configure connections, specify the Airflow Docker image, manage resource limits, or adjust other settings, modify the `override.yaml` file. The file includes detailed instructions on how to apply these settings and also lets you set the Git sync credentials.

Default values:
- `RELEASE_NAME=airflow`
- `NAMESPACE=airflow`
- `SSH_KEY=~/.ssh/dags_ssh`
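
For example, to install the release into a different namespace with a dedicated deploy key for Git sync (hypothetical values, assuming these variables can be overridden like the build ones):

```bash
make install RELEASE_NAME=airflow NAMESPACE=airflow-staging SSH_KEY=~/.ssh/estela_dags
```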

### Uninstall Airflow

```bash
make uninstall
```
Removes the Airflow deployment from the cluster.

### Upgrade Airflow

```bash
make upgrade
```
If you make changes to `values.yaml`, apply them by upgrading Airflow with this command.
95 changes: 95 additions & 0 deletions consumer_manager/consumer_manager.py
@@ -0,0 +1,95 @@
import json
import requests
import base64

from confluent_kafka import Consumer, Producer

AIRFLOW_API = "http://localhost:8080"

class ConsumerProxy:
    # Per-(job, topic) message buffers and batch counters, keyed by "{jid}-{topic}".
    internal_queues = {}
    internal_cnt = {}
    batch_size = 200

    @staticmethod
    def delivery_report(err, msg):
        """Called once for each message produced to indicate the delivery result.
        Triggered by poll() or flush()."""
        if err is not None:
            print("Message delivery failed: {}".format(err))
        else:
            print("Message delivered to {} [{}]".format(msg.topic(), msg.partition()))

    def __init__(self, conf=None):
        # Fall back to the local development defaults when no configuration is provided.
        self.conf = conf or {
            "bootstrap.servers": "localhost:29092",
            "group.id": "consumer_manager",
            "auto.offset.reset": "latest",
        }
        self.consumer = Consumer(self.conf)
        # Subscribe to every topic whose name starts with "job_".
        self.consumer.subscribe(["^job_.*"])
        self.producer = Producer(self.conf)


    def process_message(self, msg):
        dsd_msg = json.loads(msg.value().decode("utf-8"))
        topic = msg.topic()
        jid = dsd_msg["jid"]
        key = f"{jid}-{topic}"
        if key not in self.internal_queues:
            self.internal_queues[key] = []
            self.internal_cnt[key] = 0
        self.internal_queues[key].append(dsd_msg)
        print(f"Queue size for {key}: {len(self.internal_queues[key])}")
        if len(self.internal_queues[key]) >= self.batch_size:
            # Re-publish the batch to a dedicated topic that the Airflow DAG will consume.
            batch_topic = f"{key}-{self.internal_cnt[key]}"
            for item in self.internal_queues[key]:
                self.producer.poll(0)
                self.producer.produce(
                    batch_topic,
                    json.dumps(item).encode("utf-8"),
                    callback=self.delivery_report,
                )
            self.internal_queues[key] = []
            self.producer.flush()
            job_id, spider_id, project_id = jid.split(".")
            payload = {
                "conf": {
                    "topic": batch_topic,
                    "batch_size": self.batch_size,
                    "mongo_database": project_id,
                    "mongo_collection": f"{job_id}-{spider_id}-{topic}",
                }
            }
            self.internal_cnt[key] += 1
            # Credentials for the Airflow REST API (default development values).
            username = "airflow"
            password = "airflow"
            # Build a basic authentication header.
            credentials = f"{username}:{password}"
            encoded_credentials = base64.b64encode(credentials.encode()).decode()
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Basic {encoded_credentials}",
            }
            path = "/api/v1/dags/etl/dagRuns"
            print(f"Triggering DAG: {AIRFLOW_API}{path}")
            response = requests.post(
                f"{AIRFLOW_API}{path}", headers=headers, data=json.dumps(payload)
            )
            # Check the response.
            if response.status_code == 200:
                print("DAG triggered successfully:", response.json())
            else:
                print(f"Failed to trigger DAG: {response.status_code} - {response.text}")

    def consume(self):
        while True:
            msg = self.consumer.poll(20.0)
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue
            self.process_message(msg)

if __name__ == "__main__":
    consumer_manager = ConsumerProxy({})
    consumer_manager.consume()
105 changes: 105 additions & 0 deletions dags/etl.py
@@ -0,0 +1,105 @@
import json
import os
import pendulum

#from kafka import KafkaConsumer
from airflow.providers.mongo.hooks.mongo import MongoHook
from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook

from airflow.decorators import task, dag
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator

now = pendulum.now()

KAFKA_BROKER = os.getenv("KAFKA_BROKER", "host.docker.internal:29092")

def split_list(lst, batch_size):
    for i in range(0, len(lst), batch_size):
        yield lst[i:i + batch_size]

default_args = {
    'start_date': days_ago(1),
}

@dag(start_date=now, schedule=None, catchup=False)
def etl():
    @task
    def consume(**kwargs):
        # Pull up to batch_size messages from the topic given in the DAG run conf.
        cnt = 0
        batch_size = int(kwargs["params"]["batch_size"])
        item_kafka_topic = kwargs["params"]["topic"]
        kafka_hook = KafkaConsumerHook([item_kafka_topic], "estela-kafka")
        consumer = kafka_hook.get_consumer()

        print(f"Getting the following conf: {kwargs['params']}")
        message_list = []
        while True:
            if cnt >= batch_size:
                break
            message = consumer.poll(2.0)
            if message is None:
                continue
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue
            cnt += 1
            message_list.append(message.value().decode("utf-8"))
            print(f"Received message: {message.value()}")

        return message_list

    @task  # Each message is expected to be a JSON-encoded item.
    def transform(items):
        item_list = []
        for item in items:
            item_json = json.loads(item)
            item_list.append(item_json)

        return item_list

    @task
    def uploading_mongodb(items, **kwargs):
        # The target database and collection are taken from the DAG run conf.
        mongo_conn_id = kwargs["params"].get("mongo_conn_id", "estela-primary")
        mongo = MongoHook(mongo_conn_id=mongo_conn_id)
        client = mongo.get_conn()
        database = client.get_database(kwargs["params"]["mongo_database"])
        collection = database.get_collection(kwargs["params"]["mongo_collection"])
        inserted = collection.insert_many([item["payload"] for item in items])
        if len(inserted.inserted_ids) == len(items):
            outcome = "All documents were successfully inserted."
        else:
            outcome = "Not all documents were successfully inserted."
        return outcome

    data = consume()
    items = transform(data)
    uploading_mongodb(items)


etl()

# with DAG(
#     dag_id="mongodb_read_dag",
#     default_args=default_args,
#     schedule_interval=None,
#     catchup=False,
# ) as dag:
#     def reading_mongodb():
#         mongo = MongoHook(mongo_conn_id="estela-primary")
#         client = mongo.get_conn()
#         database = client.get_database("185c81f4-bc89-41c6-90f6-99b8eef7d876")
#         collection = database.get_collection("129-210-job_items")
#         print("Reading from MongoDB: ")
#         print([item for item in collection.find()])
#     reading_mongodb = PythonOperator(
#         task_id="reading_mongodb",
#         python_callable=reading_mongodb,
#         provide_context=True,
#     )
#     reading_mongodb
10 changes: 10 additions & 0 deletions docker-installation/Dockerfile
@@ -0,0 +1,10 @@
FROM apache/airflow:latest

USER root

COPY --chown=airflow:root ./dags/ /usr/src/app/dags/
COPY --chown=airflow:root ./dags/ /opt/airflow/dags/
USER airflow
WORKDIR /usr/src/app
COPY requirements.txt .
RUN pip install --no-cache-dir "apache-airflow==${AIRFLOW_VERSION}" -r requirements.txt