
Commit b563f1b

Rewrite data pipeline tutorial to no longer use deprecated PostgresOperator (#49147)
1 parent 9fcba3d commit b563f1b

10 files changed (+120 / -85 lines)
Also changed: 8 binary image files (two removed, at -64.9 KB and -134 KB; six added or updated). Previews not shown.

airflow-core/docs/tutorial/pipeline.rst

Lines changed: 119 additions & 85 deletions
@@ -18,20 +18,41 @@
 
 
 
-Building a Running Pipeline
-===========================
+Building a Simple Data Pipeline
+===============================
 
-Lets look at another example: we need to get some data from a file which is hosted online and insert it into our local database. We also need to look at removing duplicate rows while inserting.
+Welcome to the third tutorial in our series! At this point, you've already written your first DAG and used some basic
+operators. Now it's time to build a small but meaningful data pipeline -- one that retrieves data from an external
+source, loads it into a database, and cleans it up along the way.
 
-*Be advised:* The operator used in this tutorial is `deprecated <https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/_api/airflow/providers/postgres/operators/postgres/index.html>`_.
-Its recommended successor, `SQLExecuteQueryOperator <https://airflow.apache.org/docs/apache-airflow-providers-common-sql/stable/_api/airflow/providers/common/sql/operators/sql/index.html#airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator>`_ works similarly.
-You might find `this guide <https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators/postgres_operator_howto_guide.html#creating-a-postgres-database-table>`_ helpful.
+This tutorial introduces the ``SQLExecuteQueryOperator``, a flexible and modern way to execute SQL in Airflow. We'll use
+it to interact with a local Postgres database, which we'll configure in the Airflow UI.
+
+By the end of this tutorial, you'll have a working pipeline that:
+
+- Downloads a CSV file
+- Loads the data into a staging table
+- Cleans the data and upserts it into a target table
+
+Along the way, you'll gain hands-on experience with Airflow's UI, connection system, SQL execution, and DAG authoring
+patterns.
+
+Want to go deeper as you go? Here are two helpful references:
+
+- The `SQLExecuteQueryOperator <https://airflow.apache.org/docs/apache-airflow-providers-common-sql/stable/_api/airflow/providers/common/sql/operators/sql/index.html#airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator>`_ documentation
+- The `Postgres provider <https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/index.html>`_ documentation
+
+Let's get started!
 
 Initial setup
 -------------
 
-We need to have Docker installed as we will be using the :doc:`/howto/docker-compose/index` procedure for this example.
-The steps below should be sufficient, but see the quick-start documentation for full instructions.
+.. caution::
+    You'll need Docker installed to run this tutorial. We'll be using Docker Compose to launch Airflow locally. If you
+    need help setting it up, check out the :doc:`Docker Compose quickstart guide </howto/docker-compose/index>`.
+
+To run our pipeline, we need a working Airflow environment. Docker Compose makes this easy and safe -- no system-wide
+installs required. Just open your terminal and run the following:
 
 .. code-block:: bash
 
@@ -48,36 +69,58 @@ The steps below should be sufficient, but see the quick-start documentation for
     # Start up all services
     docker compose up
 
-After all services have started up, the web UI will be available at: ``http://localhost:8080``. The default account has the username ``airflow`` and the password ``airflow``.
+Once Airflow is up and running, visit the UI at ``http://localhost:8080``.
+
+Log in with:
+
+- **Username:** ``airflow``
+- **Password:** ``airflow``
+
+You'll land in the Airflow dashboard, where you can trigger DAGs, explore logs, and manage your environment.
 
-We will also need to create a `connection <https://airflow.apache.org/docs/apache-airflow/stable/concepts/connections.html>`_ to the postgres db. To create one via the web UI, from the "Admin" menu, select "Connections", then click the Plus sign to "Add a new record" to the list of connections.
+Create a Postgres Connection
+----------------------------
 
-Fill in the fields as shown below. Note the Connection Id value, which we'll pass as a parameter for the ``postgres_conn_id`` kwarg.
+Before our pipeline can write to Postgres, we need to tell Airflow how to connect to it. In the UI, open the **Admin >
+Connections** page and click the + button to add a new
+`connection <https://airflow.apache.org/docs/apache-airflow/stable/concepts/connections.html>`_.
 
-- Connection Id: tutorial_pg_conn
-- Connection Type: postgres
-- Host: postgres
-- Schema: airflow
-- Login: airflow
-- Password: airflow
-- Port: 5432
+Fill in the following details:
 
-Test your connection and if the test is successful, save your connection.
+- Connection ID: ``tutorial_pg_conn``
+- Connection Type: ``postgres``
+- Host: ``postgres``
+- Database: ``airflow`` (this is the default database in our container)
+- Login: ``airflow``
+- Password: ``airflow``
+- Port: ``5432``
 
-Table Creation Tasks
---------------------
+.. image:: ../img/ui-dark/tutorial_pipeline_add_connection.png
+  :alt: Add Connection form in Airflow's web UI with Postgres details filled in.
 
-We can use the `PostgresOperator <https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators/postgres_operator_howto_guide.html#creating-a-postgres-database-table>`_ to define tasks that create tables in our postgres db.
+|
 
-We'll create one table to facilitate data cleaning steps (``employees_temp``) and another table to store our cleaned data (``employees``).
+Save the connection. This tells Airflow how to reach the Postgres database running in your Docker environment.
+
+Next, we'll start building the pipeline that uses this connection.
+
+Create tables for staging and final data
+----------------------------------------
+
+Let's begin with table creation. We'll create two tables:
+
+- ``employees_temp``: a staging table used for raw data
+- ``employees``: the cleaned and deduplicated destination
+
+We'll use the ``SQLExecuteQueryOperator`` to run the SQL statements needed to create these tables.
 
 .. code-block:: python
 
-    from airflow.providers.postgres.operators.postgres import PostgresOperator
+    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 
-    create_employees_table = PostgresOperator(
+    create_employees_table = SQLExecuteQueryOperator(
         task_id="create_employees_table",
-        postgres_conn_id="tutorial_pg_conn",
+        conn_id="tutorial_pg_conn",
         sql="""
             CREATE TABLE IF NOT EXISTS employees (
                 "Serial Number" NUMERIC PRIMARY KEY,
@@ -88,9 +131,9 @@ We'll create one table to facilitate data cleaning steps (``employees_temp``) an
            );""",
     )
 
-    create_employees_temp_table = PostgresOperator(
+    create_employees_temp_table = SQLExecuteQueryOperator(
         task_id="create_employees_temp_table",
-        postgres_conn_id="tutorial_pg_conn",
+        conn_id="tutorial_pg_conn",
         sql="""
             DROP TABLE IF EXISTS employees_temp;
             CREATE TABLE employees_temp (
@@ -102,25 +145,13 @@ We'll create one table to facilitate data cleaning steps (``employees_temp``) an
            );""",
     )
 
-Optional: Using SQL From Files
-------------------------------
+You can optionally place these SQL statements in ``.sql`` files inside your ``dags/`` folder and pass the file path to
+the ``sql=`` argument. This can be a great way to keep your DAG code clean.
 
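For reference, the tip above can look like this with the new operator -- a minimal sketch, assuming the ``employees`` DDL lives in ``dags/sql/employees_schema.sql`` (the path, relative to ``dags/``, is illustrative):

.. code-block:: python

    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

    # The .sql path is resolved relative to the dags/ folder, as in the
    # original example that this commit removes below.
    create_employees_table = SQLExecuteQueryOperator(
        task_id="create_employees_table",
        conn_id="tutorial_pg_conn",
        sql="sql/employees_schema.sql",
    )

The same pattern applies to the ``employees_temp`` table.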
-If you want to abstract these sql statements out of your DAG, you can move the statements sql files somewhere within the ``dags/`` directory and pass the sql file_path (relative to ``dags/``) to the ``sql`` kwarg. For ``employees`` for example, create a ``sql`` directory in ``dags/``, put ``employees`` DDL in ``dags/sql/employees_schema.sql``, and modify the PostgresOperator() to:
+Load data into the staging table
+--------------------------------
 
-.. code-block:: python
-
-    create_employees_table = PostgresOperator(
-        task_id="create_employees_table",
-        postgres_conn_id="tutorial_pg_conn",
-        sql="sql/employees_schema.sql",
-    )
-
-and repeat for the ``employees_temp`` table.
-
-Data Retrieval Task
--------------------
-
-Here we retrieve data, save it to a file on our Airflow instance, and load the data from that file into an intermediate table where we can execute data cleaning steps.
+Next, we'll download a CSV file, save it locally, and load it into ``employees_temp`` using the ``PostgresHook``.
 
 .. code-block:: python
 
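The body of the ``get_data`` task is unchanged by this commit, so the diff elides it here. For orientation, a minimal sketch of what such a task can look like -- the download URL and local file path are placeholders, and the tutorial's actual code may differ in details:

.. code-block:: python

    import os

    import requests
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    from airflow.sdk import task


    @task
    def get_data():
        # Placeholder location and source URL -- adjust for your environment.
        data_path = "/opt/airflow/dags/files/employees.csv"
        os.makedirs(os.path.dirname(data_path), exist_ok=True)

        url = "https://example.com/employees.csv"
        response = requests.get(url)
        with open(data_path, "w") as file:
            file.write(response.text)

        # Bulk-load the CSV into the staging table over the connection created above.
        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        conn = postgres_hook.get_conn()
        cur = conn.cursor()
        with open(data_path, "r") as file:
            cur.copy_expert(
                "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ','",
                file,
            )
        conn.commit()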
@@ -153,10 +184,14 @@ Here we retrieve data, save it to a file on our Airflow instance, and load the d
             )
         conn.commit()
 
-Data Merge Task
----------------
+This task gives you a taste of combining Airflow with native Python and SQL hooks -- a common pattern in real-world
+pipelines.
 
-Here we select completely unique records from the retrieved data, then we check to see if any employee ``Serial Numbers`` are already in the database (if they are, we update those records with the new data).
+Merge and clean the data
+------------------------
+
+Now let's deduplicate the data and merge it into our final table. We'll write a task that runs a SQL `INSERT ... ON
+CONFLICT DO UPDATE`.
 
 .. code-block:: python
 
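The ``merge_data`` task itself is unchanged and elided from the diff. For reference, a minimal sketch of the kind of upsert it runs -- the conflict target is the ``"Serial Number"`` primary key from the DDL above, and the ``SET`` clause is left as a placeholder because the remaining column names are not shown in this diff:

.. code-block:: python

    from airflow.providers.postgres.hooks.postgres import PostgresHook
    from airflow.sdk import task


    @task
    def merge_data():
        # Deduplicate the staging rows, then upsert them into the final table.
        query = """
            INSERT INTO employees
            SELECT * FROM (
                SELECT DISTINCT * FROM employees_temp
            ) t
            ON CONFLICT ("Serial Number") DO UPDATE
            -- update the remaining columns from EXCLUDED here
            SET "Serial Number" = excluded."Serial Number";
        """
        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        conn = postgres_hook.get_conn()
        cur = conn.cursor()
        cur.execute(query)
        conn.commit()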
@@ -191,26 +226,10 @@ Here we select completely unique records from the retrieved data, then we check
 
 
 
-Completing our DAG
-------------------
-
-We've developed our tasks, now we need to wrap them in a DAG, which enables us to define when and how tasks should run, and state any dependencies that tasks have on other tasks. The DAG below is configured to:
-
-* run every day at midnight starting on Jan 1, 2021,
-* only run once in the event that days are missed, and
-* timeout after 60 minutes
-
-And from the last line in the definition of the ``process_employees`` DAG, we see:
-
-.. code-block:: python
-
-    [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()
-
-* the ``merge_data()`` task depends on the ``get_data()`` task,
-* the ``get_data()`` depends on both the ``create_employees_table`` and ``create_employees_temp_table`` tasks, and
-* the ``create_employees_table`` and ``create_employees_temp_table`` tasks can run independently.
+Defining the DAG
+----------------
 
-Putting all of the pieces together, we have our completed DAG.
+Now that we've defined all our tasks, it's time to put them together into a DAG.
 
 .. code-block:: python
 
@@ -221,7 +240,7 @@ Putting all of the pieces together, we have our completed DAG.
     import requests
     from airflow.sdk import dag, task
     from airflow.providers.postgres.hooks.postgres import PostgresHook
-    from airflow.providers.postgres.operators.postgres import PostgresOperator
+    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 
 
     @dag(
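The decorator arguments between this ``@dag(`` line and the ``dagrun_timeout`` shown in the next hunk are unchanged and elided. According to the prose removed above, they schedule the DAG daily at midnight starting Jan 1, 2021, with a single catch-up run for missed days and a 60-minute timeout -- roughly (argument names and values are sketched from that description, not quoted from the elided lines):

.. code-block:: python

    import datetime

    @dag(
        dag_id="process_employees",
        schedule="0 0 * * *",  # every day at midnight
        start_date=datetime.datetime(2021, 1, 1),
        catchup=False,  # a single run if days were missed
        dagrun_timeout=datetime.timedelta(minutes=60),
    )
    def ProcessEmployees(): ...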
@@ -232,9 +251,9 @@ Putting all of the pieces together, we have our completed DAG.
         dagrun_timeout=datetime.timedelta(minutes=60),
     )
     def ProcessEmployees():
-        create_employees_table = PostgresOperator(
+        create_employees_table = SQLExecuteQueryOperator(
             task_id="create_employees_table",
-            postgres_conn_id="tutorial_pg_conn",
+            conn_id="tutorial_pg_conn",
             sql="""
                 CREATE TABLE IF NOT EXISTS employees (
                     "Serial Number" NUMERIC PRIMARY KEY,
@@ -245,9 +264,9 @@ Putting all of the pieces together, we have our completed DAG.
                );""",
         )
 
-        create_employees_temp_table = PostgresOperator(
+        create_employees_temp_table = SQLExecuteQueryOperator(
             task_id="create_employees_temp_table",
-            postgres_conn_id="tutorial_pg_conn",
+            conn_id="tutorial_pg_conn",
             sql="""
                 DROP TABLE IF EXISTS employees_temp;
                 CREATE TABLE employees_temp (
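The rest of the DAG body -- the remaining DDL, the ``get_data`` and ``merge_data`` tasks, and the dependency wiring -- is unchanged and elided between this hunk and the next. As the prose removed earlier spelled out, the DAG's last line chains the tasks like this:

.. code-block:: python

    # Both table-creation tasks must finish before the data is fetched,
    # and the merge runs only after the fetch completes.
    [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()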
@@ -312,25 +331,40 @@ Putting all of the pieces together, we have our completed DAG.
 
     dag = ProcessEmployees()
 
-Save this code to a python file in the ``/dags`` folder (e.g. ``dags/process_employees.py``) and (after a `brief delay <https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dag-dir-list-interval>`_), the ``process_employees`` DAG will be included in the list of available dags on the web UI.
+Save this DAG as ``dags/process_employees.py``. After a short delay, it will show up in the UI.
+
+Trigger and explore your DAG
+----------------------------
+
+Open the Airflow UI and find the ``process_employees`` DAG in the list. Toggle it "on" using the slider, then trigger a
+run using the play button.
 
-.. image:: ../img/tutorial-pipeline-1.png
+You can watch each task as it runs in the **Grid** view, and explore logs for each step.
 
-You can trigger the ``process_employees`` DAG by unpausing it (via the slider on the left end) and running it (via the Run button under **Actions**).
+.. image:: ../img/ui-dark/tutorial_pipeline_dag_list.png
+  :alt: DAG List view showing the ``process_employees`` DAG
 
-.. image:: ../img/tutorial-pipeline-2.png
+|
 
-In the ``process_employees`` DAG's **Grid** view, we see all that all tasks ran successfully in all executed runs. Success!
+.. image:: ../img/ui-dark/tutorial_pipeline_dag_overview_processed.png
+  :alt: DAG Overview page for ``process_employees`` DAG showing the DAG run
+
+|
+
+Once it succeeds, you'll have a fully working pipeline that integrates data from the outside world, loads it into
+Postgres, and keeps it clean.
 
 What's Next?
 -------------
-You now have a pipeline running inside Airflow using Docker Compose. Here are a few things you might want to do next:
+
+Nice work! You've now built a real pipeline using Airflow's core patterns and tools. Here are a few ideas for where to
+go next:
+
+- Try swapping in a different SQL provider, like MySQL or SQLite.
+- Split your DAG into TaskGroups or refactor into a more usable pattern.
+- Add an alerting step or send a notification when data is processed.
 
 .. seealso::
-    - Take an in-depth tour of the UI - click all the things! see what they do!
-    - Keep reading the docs
-    - Review the :doc:`how-to guides</howto/index>`, which include a guide for writing your own operator
-    - Review the :ref:`Command Line Interface Reference<cli>`
-    - Review the :ref:`List of operators <pythonapi:operators>`
-    - Review the :ref:`Macros reference<macros>`
-    - Write your first pipeline
+    - Browse more how-to guides in the :doc:`Airflow documentation </howto/index>`
+    - Explore the `SQL provider reference <https://airflow.apache.org/docs/apache-airflow-providers-common-sql/stable/>`_
+    - Learn how to :doc:`write your own custom operator </howto/custom-operator>`

docs/spelling_wordlist.txt

Lines changed: 1 addition & 0 deletions
@@ -466,6 +466,7 @@ decrypted
 Decrypts
 dedented
 deduplicate
+deduplicated
 deduplication
 deepcopy
 deepcopying

0 commit comments
