Commit 40123ee

songzhendong committed
Add Airflow Monitor (#12071)
1 parent 3156d47 commit 40123ee

26 files changed, +3915 -7 lines changed

.github/workflows/skywalking.yaml

+2
@@ -650,6 +650,8 @@ jobs:
     config: test/e2e-v2/cases/rocketmq/e2e.yaml
   - name: ClickHouse
     config: test/e2e-v2/cases/clickhouse/clickhouse-prometheus-endpoint/e2e.yaml
+  - name: Airflow
+    config: test/e2e-v2/cases/airflow/airflow-main/e2e.yaml

   - name: UI Menu BanyanDB
     config: test/e2e-v2/cases/menu/banyandb/e2e.yaml

docs/en/changes/changes.md

+9-2
@@ -80,6 +80,9 @@
 * Support displaying the port services listen to from OAP and UI during server start.
 * Refactor data-generator to support generating metrics.
 * Fix `AvgHistogramPercentileFunction` legacy name.
+* Support Airflow monitoring.
+* Add Service Hierarchy auto matching layer relationships (upper -> lower) as follows:
+  - AIRFLOW -> K8S_SERVICE

 #### UI

@@ -103,7 +106,11 @@
 * Enhance VNode logic and support multiple Trace IDs in span's ref.
 * Add the layers field and associate layers dashboards for the service topology nodes.
 * Fix `Nginx-Instance` metrics to instance level.
-* Update tabs of the Kubernetes service page.
+* Update tabs of the Kubernetes service page.
+* Support Airflow monitoring.
+* Add Airflow menu i18n.
+* Add support for dragging in the trace panel.
+* Add workflow icon.

 #### Documentation

@@ -130,6 +137,6 @@
 * Remove `OpenTelemetry Exporter` support from meter doc, as this has been flagged as unmaintained on OTEL upstream.
 * Add doc of one-line quick start script for different storage types.
 * Add FAQ for `Why is Clickhouse or Loki or xxx not supported as a storage option?`.
-* Add Airflow monitoring.
+* Add Airflow monitoring docs.

 All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/202?closed=1)

docs/en/concepts-and-designs/service-hierarchy.md

+8
@@ -36,6 +36,7 @@ If you want to customize it according to your own needs, please refer to [Servic
 | VIRTUAL_DATABASE | CLICKHOUSE | [VIRTUAL_DATABASE On CLICKHOUSE](#virtual_database-on-clickhouse) |
 | PULSAR | K8S_SERVICE | [PULSAR On K8S_SERVICE](#pulsar-on-k8s_service) |
 | VIRTUAL_MQ | PULSAR | [VIRTUAL_MQ On PULSAR](#virtual_mq-on-pulsar) |
+| AIRFLOW | K8S_SERVICE | [AIRFLOW On K8S_SERVICE](#airflow-on-k8s_service) |

 - The following sections will describe the **default matching rules** in detail and use the `upper-layer On lower-layer` format.
 - The example service names are based on SkyWalking [Showcase](https://github.com/apache/skywalking-showcase) default deployment.
@@ -216,6 +217,13 @@ If you want to customize it according to your own needs, please refer to [Servic
 - Matched Example:
   - VIRTUAL_MQ.service.name: `pulsar.skywalking-showcase.svc.cluster.local:6650`
   - PULSAR.service.name: `pulsar::pulsar.skywalking-showcase`
+#### AIRFLOW On K8S_SERVICE
+- Rule name: `short-name`
+- Groovy script: `{ (u, l) -> u.shortName == l.shortName }`
+- Description: AIRFLOW.service.shortName == K8S_SERVICE.service.shortName
+- Matched Example:
+  - AIRFLOW.service.name: `airflow::airflow.skywalking-showcase`
+  - K8S_SERVICE.service.name: `skywalking-showcase::airflow.skywalking-showcase`

 ### Build Through Specific Agents
 Use agent tech involved (such as eBPF) and deployment tools (such as operator and agent injector) to detect the service hierarchy relations.
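
Note on the `short-name` rule added for AIRFLOW On K8S_SERVICE: the following is a minimal Java sketch of what the Groovy closure compares, not the OAP implementation. It assumes that a service's short name is the part of the name after the `::` group separator, which is consistent with the matched example in the hunk. The class and method names are hypothetical.

```java
import java.util.Objects;

class ShortNameRuleSketch {
    // Assumption: the short name is whatever follows the first "::" separator;
    // a name without a separator is its own short name.
    static String shortName(String serviceName) {
        int idx = serviceName.indexOf("::");
        return idx < 0 ? serviceName : serviceName.substring(idx + 2);
    }

    // Mirrors the rule { (u, l) -> u.shortName == l.shortName }.
    static boolean matches(String upperServiceName, String lowerServiceName) {
        return Objects.equals(shortName(upperServiceName), shortName(lowerServiceName));
    }

    public static void main(String[] args) {
        String airflow = "airflow::airflow.skywalking-showcase";
        String k8sService = "skywalking-showcase::airflow.skywalking-showcase";
        // Both short names are "airflow.skywalking-showcase", so the rule matches.
        System.out.println(matches(airflow, k8sService));
    }
}
```

Under that assumption, the two example names relate because their short names are identical even though their group prefixes differ.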

@@ -0,0 +1,58 @@
+# Apache Airflow monitoring
+## Airflow server performance from 'OpenTelemetry'
+SkyWalking collects metrics data from Airflow. It leverages OpenTelemetry Collector to transfer the metrics to
+[OpenTelemetry receiver](opentelemetry-receiver.md) and into the [Meter System](./../../concepts-and-designs/meter.md).
+
+### Data flow
+
+1. Airflow sends metrics to OpenTelemetry Collector, and OpenTelemetry Collector pushes the metrics to SkyWalking OAP Server via OpenTelemetry exporter.
+2. The SkyWalking OAP Server parses the expression with [MAL](../../concepts-and-designs/mal.md) to filter/calculate/aggregate and store the results.
+
+### Set up
+1. Set up [Airflow](https://airflow.apache.org/docs/apache-airflow/stable/start.html).
+2. Set up [Airflow[otel]](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/metrics.html).
+3. Set up [OpenTelemetry Collector](https://opentelemetry.io/docs/collector/getting-started/#docker). For details on Receiver in OpenTelemetry Collector, refer to [here](../../../../test/e2e-v2/cases/airflow/airflow-exporter/otel-collector-config.yaml).
+4. Configure SkyWalking [OpenTelemetry receiver](opentelemetry-receiver.md).
+
+### Airflow Monitoring
+Airflow monitoring provides monitoring of the status and resources of Airflow. Airflow is cataloged as a `Layer: AIRFLOW` `Service` in OAP.
+#### Airflow Service Supported Metrics
+
+| Monitoring Panel | Unit | Metric Name | Description | Data Source |
+|-----|------|-----|-----|-----|
+| Airflow Job Started | count | <job_name>_start | Number of started jobs | OpenTelemetry from Airflow |
+| Tasks Executable | count | scheduler.tasks.executable | Number of tasks that are ready for execution (set to queued) with respect to pool limits, DAG concurrency, executor state, and priority. | OpenTelemetry from Airflow |
+| Tasks Cleared | count | scheduler.orphaned_tasks.cleared | Number of Orphaned tasks cleared by the Scheduler | OpenTelemetry from Airflow |
+| Tasks Adopted | count | scheduler.orphaned_tasks.adopted | Number of Orphaned tasks adopted by the Scheduler | OpenTelemetry from Airflow |
+| Queued Tasks | count | executor.queued_tasks | Number of queued tasks on executor | OpenTelemetry from Airflow |
+| Pool Open Slots | count | executor.open_slots | Number of open slots on executor | OpenTelemetry from Airflow |
+| Pool Queued Slots | count | pool.queued_slots | Number of queued slots in the pool. Metric with pool_name tagging. | OpenTelemetry from Airflow |
+| Deferred Slots | count | pool.deferred_slots | Number of deferred slots in the pool. Metric with pool_name tagging. | OpenTelemetry from Airflow |
+| Scheduler Heartbeat | rate | scheduler_heartbeat | Scheduler heartbeats | OpenTelemetry from Airflow |
+| DAG File Queue Size | count | dag_processing.file_path_queue_size | Number of DAG files to be considered for the next scan | OpenTelemetry from Airflow |
+| Dataset Updates | count | dataset.updates | Number of updated datasets | OpenTelemetry from Airflow |
+
+#### Airflow Instance Supported Metrics
+| Monitoring Panel | Unit | Metric Name | Description | Data Source |
+|---------------------------|--------------|-----|-----|-----|
+| Airflow Job Started | count | <job_name>_start | Number of started jobs | OpenTelemetry from Airflow |
+| Pool Open Slots | count | pool.open_slots | Number of open slots in the pool. Metric with pool_name tagging. | OpenTelemetry from Airflow |
+| Pool Deferred Slots | count | pool.deferred_slots | Number of deferred slots in the pool. Metric with pool_name tagging. | OpenTelemetry from Airflow |
+| Pool Running Slots | count | pool.running_slots | Number of running slots in the pool. Metric with pool_name tagging. | OpenTelemetry from Airflow |
+| Triggerer Heartbeat | rate | triggerer_heartbeat | Triggerer heartbeats | OpenTelemetry from Airflow |
+| Triggers Main Thread | count | triggers.blocked_main_thread | Number of triggers that blocked the main thread (likely due to not being fully asynchronous) | OpenTelemetry from Airflow |
+| Triggers Succeeded | count | triggers.succeeded | Number of triggers that have fired at least one event | OpenTelemetry from Airflow |
+| Triggers Failed | count | triggers.failed | Number of triggers that errored before they could fire an event | OpenTelemetry from Airflow |
+| Tasks Executable | count | scheduler.tasks.executable | Number of tasks that are ready for execution (set to queued) with respect to pool limits, DAG concurrency, executor state, and priority. | OpenTelemetry from Airflow |
+| Tasks Cleared | count | scheduler.orphaned_tasks.cleared | Number of Orphaned tasks cleared by the Scheduler | OpenTelemetry from Airflow |
+| Tasks Adopted | count | scheduler.orphaned_tasks.adopted | Number of Orphaned tasks adopted by the Scheduler | OpenTelemetry from Airflow |
+| Queued Tasks | count | executor.queued_tasks | Number of queued tasks on executor | OpenTelemetry from Airflow |
+| Dataset Updates | count | dataset.updates | Number of updated datasets | OpenTelemetry from Airflow |
+| Dataset Orphaned | count | dataset.orphaned | Number of datasets marked as orphans because they are no longer referenced in DAG schedule parameters or task outlets | OpenTelemetry from Airflow |
+| Dataset Triggered Dagruns | count | dataset.triggered_dagruns | Number of DAG runs triggered by a dataset update | OpenTelemetry from Airflow |
+
+
+### Customizations
+You can customize your own metrics/expression/dashboard panel.
+The metrics definition and expression rules are found in `/config/otel-rules/airflow`.
+The Airflow dashboard panel configurations are found in `/config/ui-initialized-templates/airflow`.

docs/en/setup/backend/opentelemetry-receiver.md

+3-1
@@ -58,4 +58,6 @@ for identification of the metric data.
 | Metrics of ClickHouse | otel-rules/clickhouse/clickhouse-service.yaml | ClickHouse(embedded prometheus endpoint) -> OpenTelemetry Collector -- OTLP exporter --> SkyWalking OAP Server |
 | Metrics of RocketMQ | otel-rules/rocketmq/rocketmq-cluster.yaml | rocketmq-exporter -> OpenTelemetry Collector -- OTLP exporter --> SkyWalking OAP Server |
 | Metrics of RocketMQ | otel-rules/rocketmq/rocketmq-broker.yaml | rocketmq-exporter -> OpenTelemetry Collector -- OTLP exporter --> SkyWalking OAP Server |
-| Metrics of RocketMQ | otel-rules/rocketmq/rocketmq-topic.yaml | rocketmq-exporter -> OpenTelemetry Collector -- OTLP exporter --> SkyWalking OAP Server |
+| Metrics of RocketMQ | otel-rules/rocketmq/rocketmq-topic.yaml | rocketmq-exporter -> OpenTelemetry Collector -- OTLP exporter --> SkyWalking OAP Server |
+| Metrics of Airflow | otel-rules/airflow/airflow-instance.yaml | Airflow -> OpenTelemetry Collector -- OTLP exporter --> SkyWalking OAP Server |
+| Metrics of Airflow | otel-rules/airflow/airflow-service.yaml | Airflow -> OpenTelemetry Collector -- OTLP exporter --> SkyWalking OAP Server |

docs/en/swip/SWIP-7.md

+64
@@ -0,0 +1,64 @@
+## **Motivation**
+Apache Airflow is an open-source workflow management platform primarily used for scheduling and monitoring workflows. It can handle complex data pipelines and has been widely applied in data engineering and data science. Airflow allows users to write workflows, which are called DAGs (Directed Acyclic Graphs). Each DAG contains a series of tasks that are executed in a specific sequence according to their dependency relationships. Because Airflow supports multitasking in complex scenarios, monitoring its health and operational status is crucial. These metrics help analyze task health, formulate optimization plans, and design risk prevention strategies.
+
+
+## **Architecture Graph**
+
+There is no significant architecture-level change.
+
+## **Proposed Changes**
+```mermaid
+graph LR;
+    AirflowOTEL("Airflow OTEL") --> OpenTelemetryCollector("OpenTelemetry Collector") --> SkyWalkingOTELReceiver("SkyWalking OTEL Receiver") --> SkyWalkingMALEngine("SkyWalking MAL Engine")
+    --> SkyWalkingUI("SkyWalking UI")
+```
+1. Airflow sends metrics to OpenTelemetry Collector, and OpenTelemetry Collector pushes the metrics to SkyWalking OTEL
+Receiver via OpenTelemetry exporter.
+2. The SkyWalking OAP Server parses the expression with MAL to filter/calculate/aggregate and store the results.
+3. These metrics can be displayed via the SkyWalking UI, and the metrics can be customized for display on the UI dashboard.
+
+#### Airflow Service Supported Metrics
+| Monitoring Panel | Unit | Metric Name | Description | Data Source |
+|-----|------|-----|-----|-----|
+| Airflow Job Started | count | <job_name>_start | Number of started jobs | OpenTelemetry from Airflow |
+| Tasks Executable | count | scheduler.tasks.executable | Number of tasks that are ready for execution (set to queued) with respect to pool limits, DAG concurrency, executor state, and priority. | OpenTelemetry from Airflow |
+| Tasks Cleared | count | scheduler.orphaned_tasks.cleared | Number of Orphaned tasks cleared by the Scheduler | OpenTelemetry from Airflow |
+| Tasks Adopted | count | scheduler.orphaned_tasks.adopted | Number of Orphaned tasks adopted by the Scheduler | OpenTelemetry from Airflow |
+| Queued Tasks | count | executor.queued_tasks | Number of queued tasks on executor | OpenTelemetry from Airflow |
+| Pool Open Slots | count | executor.open_slots | Number of open slots on executor | OpenTelemetry from Airflow |
+| Pool Queued Slots | count | pool.queued_slots | Number of queued slots in the pool. Metric with pool_name tagging. | OpenTelemetry from Airflow |
+| Deferred Slots | count | pool.deferred_slots | Number of deferred slots in the pool. Metric with pool_name tagging. | OpenTelemetry from Airflow |
+| Scheduler Heartbeat | rate | scheduler_heartbeat | Scheduler heartbeats | OpenTelemetry from Airflow |
+| DAG File Queue Size | count | dag_processing.file_path_queue_size | Number of DAG files to be considered for the next scan | OpenTelemetry from Airflow |
+| Dataset Updates | count | dataset.updates | Number of updated datasets | OpenTelemetry from Airflow |
+
+#### Airflow Instance Supported Metrics
+
+| Monitoring Panel | Unit | Metric Name | Description | Data Source |
+|---------------------------|--------------|-----|-----|-----|
+| Airflow Job Started | count | <job_name>_start | Number of started jobs | OpenTelemetry from Airflow |
+| Pool Open Slots | count | pool.open_slots | Number of open slots in the pool. Metric with pool_name tagging. | OpenTelemetry from Airflow |
+| Pool Deferred Slots | count | pool.deferred_slots | Number of deferred slots in the pool. Metric with pool_name tagging. | OpenTelemetry from Airflow |
+| Pool Running Slots | count | pool.running_slots | Number of running slots in the pool. Metric with pool_name tagging. | OpenTelemetry from Airflow |
+| Triggerer Heartbeat | rate | triggerer_heartbeat | Triggerer heartbeats | OpenTelemetry from Airflow |
+| Triggers Main Thread | count | triggers.blocked_main_thread | Number of triggers that blocked the main thread (likely due to not being fully asynchronous) | OpenTelemetry from Airflow |
+| Triggers Succeeded | count | triggers.succeeded | Number of triggers that have fired at least one event | OpenTelemetry from Airflow |
+| Triggers Failed | count | triggers.failed | Number of triggers that errored before they could fire an event | OpenTelemetry from Airflow |
+| Tasks Executable | count | scheduler.tasks.executable | Number of tasks that are ready for execution (set to queued) with respect to pool limits, DAG concurrency, executor state, and priority. | OpenTelemetry from Airflow |
+| Tasks Cleared | count | scheduler.orphaned_tasks.cleared | Number of Orphaned tasks cleared by the Scheduler | OpenTelemetry from Airflow |
+| Tasks Adopted | count | scheduler.orphaned_tasks.adopted | Number of Orphaned tasks adopted by the Scheduler | OpenTelemetry from Airflow |
+| Queued Tasks | count | executor.queued_tasks | Number of queued tasks on executor | OpenTelemetry from Airflow |
+| Dataset Updates | count | dataset.updates | Number of updated datasets | OpenTelemetry from Airflow |
+| Dataset Orphaned | count | dataset.orphaned | Number of datasets marked as orphans because they are no longer referenced in DAG schedule parameters or task outlets | OpenTelemetry from Airflow |
+| Dataset Triggered Dagruns | count | dataset.triggered_dagruns | Number of DAG runs triggered by a dataset update | OpenTelemetry from Airflow |
+
+If a metric exists for both a service and its instances, the number displayed on the service dashboard is the sum of all instances.
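
Note on the service/instance relationship stated in SWIP-7: the sketch below illustrates, in plain Java, what "the service value is the sum of all instances" means for a single metric at one point in time. It is only an illustration; in the commit the aggregation is expressed through MAL rules, and the class name, method name, and instance names here are hypothetical.

```java
import java.util.Map;

class ServiceSumSketch {
    // Adds up the per-instance values of one metric to get the service-level value.
    static long serviceValue(Map<String, Long> instanceValues) {
        return instanceValues.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        // Hypothetical per-instance samples of executor.queued_tasks.
        Map<String, Long> queuedTasks = Map.of("airflow-instance-a", 3L, "airflow-instance-b", 5L);
        System.out.println(serviceValue(queuedTasks)); // 3 + 5
    }
}
```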
+## **Imported Dependencies libs and their licenses.**
+
+No new dependency.
+
+## **Compatibility**
+
+No breaking changes.
+
+## **General usage docs**

docs/en/swip/readme.md

+2-2
@@ -68,10 +68,10 @@ All accepted and proposed SWIPs can be found in [here](https://github.com/apache

 ## Known SWIPs

-Next SWIP Number: 6
+Next SWIP Number: 8

 ### Accepted SWIPs
-
+- [SWIP-7 Support Apache Airflow Monitoring](SWIP-7.md)
 - [SWIP-5 Support ClickHouse Monitoring](SWIP-5.md)
 - [SWIP-4 Support available layers of service in the topology](SWIP-4.md)
 - [SWIP-3 Support RocketMQ Monitoring](SWIP-3.md)

docs/menu.yml

+8
@@ -188,6 +188,10 @@ catalog:
           path: "/en/setup/service-agent/virtual-cache"
         - name: "Virtual MQ"
           path: "/en/setup/service-agent/virtual-mq"
+    - name: "Workflow Scheduler"
+      catalog:
+        - name: "Airflow"
+          path: "/en/setup/backend/backend-airflow-monitoring"
     - name: "Service Mesh"
       catalog:
         - name: "Observe Service Mesh through Access Log Service (ALS)"
@@ -264,6 +268,10 @@ catalog:
           path: "/en/setup/backend/backend-pulsar-monitoring"
         - name: "RocketMQ"
           path: "/en/setup/backend/backend-rocketmq-monitoring"
+    - name: "Workflow Scheduler"
+      catalog:
+        - name: "Airflow"
+          path: "/en/setup/backend/backend-airflow-monitoring"
     - name: "Self Observability"
       catalog:
         - name: "OAP self telemetry"

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Layer.java

+6-1
@@ -223,7 +223,12 @@ public enum Layer {
     /**
      * A high-performance, column-oriented SQL database management system (DBMS) for online analytical processing (OLAP).
      */
-    CLICKHOUSE(36, true);
+    CLICKHOUSE(36, true),
+
+    /**
+     * A tool to schedule and monitor workflows
+     */
+    AIRFLOW(37, true);

     private final int value;
     /**

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/management/ui/template/UITemplateInitializer.java

+1
@@ -74,6 +74,7 @@ public class UITemplateInitializer {
         Layer.NGINX.name(),
         Layer.ROCKETMQ.name(),
         Layer.CLICKHOUSE.name(),
+        Layer.AIRFLOW.name(),
         "custom"
     };
     private final UITemplateManagementService uiTemplateManagementService;

oap-server/server-starter/src/main/resources/application.yml

+1-1
@@ -353,7 +353,7 @@ receiver-otel:
   selector: ${SW_OTEL_RECEIVER:default}
   default:
     enabledHandlers: ${SW_OTEL_RECEIVER_ENABLED_HANDLERS:"otlp-metrics,otlp-logs"}
-    enabledOtelMetricsRules: ${SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES:"apisix,nginx/*,k8s/*,istio-controlplane,vm,mysql/*,postgresql/*,oap,aws-eks/*,windows,aws-s3/*,aws-dynamodb/*,aws-gateway/*,redis/*,elasticsearch/*,rabbitmq/*,mongodb/*,kafka/*,pulsar/*,bookkeeper/*,rocketmq/*,clickhouse/*"}
+    enabledOtelMetricsRules: ${SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES:"apisix,nginx/*,k8s/*,istio-controlplane,vm,mysql/*,postgresql/*,oap,aws-eks/*,windows,aws-s3/*,aws-dynamodb/*,aws-gateway/*,redis/*,elasticsearch/*,rabbitmq/*,mongodb/*,kafka/*,pulsar/*,bookkeeper/*,rocketmq/*,clickhouse/*,airflow/*"}

 receiver-zipkin:
   selector: ${SW_RECEIVER_ZIPKIN:-}
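
Note on the `airflow/*` entry appended to `enabledOtelMetricsRules`: the sketch below shows one plausible reading of that comma-separated list, where `dir/*` enables every rule file directly under `otel-rules/dir/` and a bare name enables a single rule file. This is an assumption for illustration, not the OAP rule loader; the class and method names are hypothetical, and rule names are written without the `.yaml` suffix.

```java
class RulePatternSketch {
    // A pattern ending in "/*" covers rules directly inside that directory;
    // any other pattern must equal the rule name exactly.
    static boolean enabled(String pattern, String ruleName) {
        if (pattern.endsWith("/*")) {
            String dir = pattern.substring(0, pattern.length() - 1); // keeps the trailing '/'
            return ruleName.startsWith(dir) && ruleName.indexOf('/', dir.length()) < 0;
        }
        return pattern.equals(ruleName);
    }

    // Checks a rule name against every pattern in the comma-separated setting.
    static boolean enabledByAny(String commaSeparatedPatterns, String ruleName) {
        for (String pattern : commaSeparatedPatterns.split(",")) {
            if (enabled(pattern.trim(), ruleName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String before = "vm,rocketmq/*,clickhouse/*";
        String after = before + ",airflow/*";
        System.out.println(enabledByAny(before, "airflow/airflow-service"));
        System.out.println(enabledByAny(after, "airflow/airflow-service"));
    }
}
```

Read this way, the one-line change is what turns on both `airflow/airflow-service.yaml` and `airflow/airflow-instance.yaml` by default.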

oap-server/server-starter/src/main/resources/hierarchy-definition.yml

+4
@@ -60,6 +60,9 @@ hierarchy:
   PULSAR:
     K8S_SERVICE: short-name

+  AIRFLOW:
+    K8S_SERVICE: short-name
+
   VIRTUAL_DATABASE:
     MYSQL: lower-short-name-with-fqdn
     POSTGRESQL: lower-short-name-with-fqdn
@@ -103,6 +106,7 @@ layer-levels:
   RABBITMQ: 2
   KAFKA: 2
   PULSAR: 2
+  AIRFLOW: 2

   MESH_DP: 1
