|
20 | 20 |
|
21 | 21 | .. include:: /../../../devel-common/src/sphinx_exts/includes/providers-configurations-ref.rst |
22 | 22 | .. include:: /../../../devel-common/src/sphinx_exts/includes/sections-and-options.rst |
| 23 | + |
| 24 | + |
| 25 | +Highlighted configurations |
| 26 | +=========================== |
| 27 | + |
| 28 | +.. _configuration_transport:openlineage: |
| 29 | + |
| 30 | +Transport setup |
| 31 | +---------------- |
| 32 | + |
| 33 | +At minimum, OpenLineage needs a ``Transport`` to function: it defines where your events are sent, |
| 34 | +for example to `Marquez <https://marquezproject.ai/>`_. |
| 35 | + |
| 36 | +Transport as JSON string |
| 37 | +^^^^^^^^^^^^^^^^^^^^^^^^ |
| 38 | +The ``transport`` option in the ``[openlineage]`` section of the Airflow configuration is used for that purpose. |
| 39 | + |
| 40 | +.. code-block:: ini |
| 41 | +
|
| 42 | + [openlineage] |
| 43 | + transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"} |
| 44 | +
|
| 45 | +The ``AIRFLOW__OPENLINEAGE__TRANSPORT`` environment variable is equivalent. |
| 46 | + |
| 47 | +.. code-block:: ini |
| 48 | +
|
| 49 | + AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}' |
| 50 | +
|
| 51 | +
|
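Because the ``transport`` value must be valid JSON, hand-written strings are easy to break with a missing quote or brace. The following is a minimal sketch, not part of the provider, that builds the value with the standard ``json`` module; the dictionary simply mirrors the HTTP example above, and the variable names are illustrative.

```python
import json

# Illustrative transport settings; the keys mirror the HTTP example above.
transport = {
    "type": "http",
    "url": "http://example.com:5000",
    "endpoint": "api/v1/lineage",
}

# Serialize once so that quoting and escaping are handled by the json module.
transport_json = json.dumps(transport)

# A line suitable for an env file; it assumes no value contains a single quote.
env_line = f"AIRFLOW__OPENLINEAGE__TRANSPORT='{transport_json}'"
print(env_line)
```

Parsing the generated string back with ``json.loads`` is a cheap way to confirm it is well formed before deploying it.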
| 52 | +If you want to inspect OpenLineage events without sending them anywhere, you can set up ``ConsoleTransport``; the events will then appear in the task logs. |
| 53 | + |
| 54 | +.. code-block:: ini |
| 55 | +
|
| 56 | + [openlineage] |
| 57 | + transport = {"type": "console"} |
| 58 | +
|
| 59 | +.. note:: |
| 60 | + For the full list of built-in transport types, the options of a specific transport, or instructions on implementing a custom transport, refer to the |
| 61 | + `Python client documentation <https://openlineage.io/docs/client/python/configuration#transports>`_. |
| 62 | + |
| 63 | +Transport as config file |
| 64 | +^^^^^^^^^^^^^^^^^^^^^^^^ |
| 65 | +You can also configure the OpenLineage ``Transport`` using a YAML file (e.g. ``openlineage.yml``). |
| 66 | +Provide the path to the YAML file as the ``config_path`` option in the Airflow configuration. |
| 67 | + |
| 68 | +.. code-block:: ini |
| 69 | +
|
| 70 | + [openlineage] |
| 71 | + config_path = '/path/to/openlineage.yml' |
| 72 | +
|
| 73 | +The ``AIRFLOW__OPENLINEAGE__CONFIG_PATH`` environment variable is equivalent. |
| 74 | + |
| 75 | +.. code-block:: ini |
| 76 | +
|
| 77 | + AIRFLOW__OPENLINEAGE__CONFIG_PATH='/path/to/openlineage.yml' |
| 78 | +
|
| 79 | +Example content of config YAML file: |
| 80 | + |
| 81 | +.. code-block:: yaml |
| 82 | +
|
| 83 | + transport: |
| 84 | +   type: http |
| 85 | +   url: https://backend:5000 |
| 86 | +   endpoint: events/receive |
| 87 | +   auth: |
| 88 | +     type: api_key |
| 89 | +     apiKey: f048521b-dfe8-47cd-9c65-0cb07d57591e |
| 90 | +
|
| 91 | +.. note:: |
| 92 | + |
| 93 | + A detailed description, together with example config files, can be found in the `Python client documentation <https://openlineage.io/docs/client/python/configuration#transports>`_. |
| 94 | + |
| 95 | + |
| 96 | +Configuration precedence |
| 97 | +^^^^^^^^^^^^^^^^^^^^^^^^^ |
| 98 | + |
| 99 | +The primary and recommended method of configuring the OpenLineage Airflow provider is the Airflow configuration. |
| 100 | +Because OpenLineage can be configured in several ways, it is important to keep the precedence of the different configurations in mind. |
| 101 | +The OpenLineage Airflow provider looks for configuration in the following order: |
| 102 | + |
| 103 | +1. Check ``config_path`` in ``airflow.cfg`` under the ``openlineage`` section (or the ``AIRFLOW__OPENLINEAGE__CONFIG_PATH`` environment variable). |
| 104 | +2. Check ``transport`` in ``airflow.cfg`` under the ``openlineage`` section (or the ``AIRFLOW__OPENLINEAGE__TRANSPORT`` environment variable). |
| 105 | +3. If neither of the above options is set, the OpenLineage Python client used underneath looks for configuration in the order described in the `Python client configuration documentation <https://openlineage.io/docs/client/python/configuration>`_. Please note that **using Airflow configuration is encouraged** and is the only future-proof solution. |
| 106 | + |
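The lookup order listed above can be sketched as a small function. This is only an illustration of the documented precedence, not the provider's actual implementation; the function name and the returned labels are hypothetical.

```python
from __future__ import annotations


def resolve_config_source(config_path: str | None, transport: str | None) -> str:
    """Return which configuration source wins under the documented order."""
    # 1. A configured config_path takes precedence over everything else.
    if config_path:
        return "config_path"
    # 2. Otherwise the transport option is used when it is set.
    if transport:
        return "transport"
    # 3. Otherwise the OpenLineage Python client applies its own lookup rules.
    return "python_client_defaults"


print(resolve_config_source("/path/to/openlineage.yml", '{"type": "console"}'))
```

In this sketch, setting both options means the ``transport`` value is ignored, which is why it helps to configure only one of them.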
| 107 | + |
| 108 | +.. _configuration_selective_enable:openlineage: |
| 109 | + |
| 110 | +Enabling OpenLineage on Dag/task level |
| 111 | +--------------------------------------- |
| 112 | + |
| 113 | +You can selectively enable OpenLineage for specific Dags and tasks by using the ``selective_enable`` policy. |
| 114 | +To enable this policy, set the ``selective_enable`` option to ``True`` in the ``[openlineage]`` section of your Airflow configuration file: |
| 115 | + |
| 116 | +.. code-block:: ini |
| 117 | +
|
| 118 | + [openlineage] |
| 119 | + selective_enable = True |
| 120 | +
|
| 121 | +The ``AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE`` environment variable is equivalent. |
| 122 | + |
| 123 | +.. code-block:: ini |
| 124 | +
|
| 125 | + AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE=true |
| 126 | +
|
| 127 | +
|
| 128 | +While ``selective_enable`` enables selective control, the ``disabled`` option still takes precedence. |
| 129 | +If you set ``disabled`` to ``True`` in the configuration, OpenLineage will be disabled for all Dags and tasks regardless of the ``selective_enable`` setting. |
| 130 | + |
| 131 | +Once the ``selective_enable`` policy is enabled, you can choose to enable OpenLineage |
| 132 | +for individual Dags and tasks using the ``enable_lineage`` and ``disable_lineage`` functions. |
| 133 | + |
| 134 | +1. Enabling Lineage on a Dag: |
| 135 | + |
| 136 | +.. code-block:: python |
| 137 | +
|
| 138 | + from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage |
| 139 | +
|
| 140 | + with enable_lineage(DAG(...)): |
| 141 | +     # Tasks within this Dag will have lineage tracking enabled |
| 142 | +     MyOperator(...) |
| 143 | +
|
| 144 | +     AnotherOperator(...) |
| 145 | +
|
| 146 | +2. Enabling Lineage on a Task: |
| 147 | + |
| 148 | +While enabling lineage on a Dag implicitly enables it for all tasks within that Dag, you can still selectively disable it for specific tasks: |
| 149 | + |
| 150 | +.. code-block:: python |
| 151 | +
|
| 152 | + from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage |
| 153 | +
|
| 154 | + with DAG(...) as dag: |
| 155 | +     t1 = MyOperator(...) |
| 156 | +     t2 = AnotherOperator(...) |
| 157 | +
|
| 158 | + # Enable lineage for the entire Dag |
| 159 | + enable_lineage(dag) |
| 160 | +
|
| 161 | + # Disable lineage for task t1 |
| 162 | + disable_lineage(t1) |
| 163 | +
|
| 164 | +Enabling lineage on the Dag level automatically enables it for all tasks within that Dag unless explicitly disabled per task. |
| 165 | + |
| 166 | +Enabling lineage on the task level implicitly enables lineage on its Dag. |
| 167 | +This is because each emitting task sends a `ParentRunFacet <https://openlineage.io/docs/spec/facets/run-facets/parent_run>`_, |
| 168 | +which requires the Dag-level lineage to be enabled in some OpenLineage backend systems. |
| 169 | +Disabling Dag-level lineage while enabling task-level lineage might cause errors or inconsistencies. |
| 170 | + |
| 171 | + |
| 172 | +.. _configuration_custom_facets:openlineage: |
| 173 | + |
| 174 | +Custom Facets |
| 175 | +-------------- |
| 176 | +To learn more about facets in OpenLineage, please refer to `facet documentation <https://openlineage.io/docs/spec/facets/>`_. |
| 177 | + |
| 178 | +The OpenLineage spec might not contain all the facets you need to write your extractor, |
| 179 | +in which case you will have to make your own `custom facets <https://openlineage.io/docs/spec/facets/custom-facets>`_. |
| 180 | + |
| 181 | +You can also inject your own custom facets in the lineage event's run facet using the ``custom_run_facets`` Airflow configuration. |
| 182 | + |
| 183 | +The steps are: |
| 184 | + |
| 185 | +1. Write a function that returns the custom facets. You can write as many custom facet functions as needed. |
| 186 | +2. Register the functions using the ``custom_run_facets`` Airflow configuration. |
| 187 | + |
| 188 | +The Airflow OpenLineage listener automatically executes these functions during lineage event generation and appends their return values to the run facet in the lineage event. |
| 189 | + |
| 190 | +Writing a custom facet function |
| 191 | +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |
| 192 | + |
| 193 | +- **Input arguments:** The function should accept two input arguments: ``TaskInstance`` and ``TaskInstanceState``. |
| 194 | +- **Function body:** Perform the logic needed to generate the custom facets. The custom facets must inherit from ``RunFacet`` so that ``_producer`` and ``_schemaURL`` are added to the facet automatically. |
| 195 | +- **Return value:** The custom facets to be added to the lineage event. The return type should be ``dict[str, RunFacet]`` or ``None``. Return ``None`` when you do not want to add custom facets, for example when certain criteria are not met. |
| 196 | + |
| 197 | +**Example custom facet function** |
| 198 | + |
| 199 | +.. code-block:: python |
| 200 | +
|
| 201 | + import attrs |
| 202 | + from airflow.models.taskinstance import TaskInstance, TaskInstanceState |
| 203 | + from airflow.providers.common.compat.openlineage.facet import RunFacet |
| 204 | +
|
| 205 | +
|
| 206 | + @attrs.define |
| 207 | + class MyCustomRunFacet(RunFacet): |
| 208 | +     """Define a custom facet.""" |
| 209 | +
|
| 210 | +     name: str |
| 211 | +     jobState: str |
| 212 | +     uniqueName: str |
| 213 | +     displayName: str |
| 214 | +     dagId: str |
| 215 | +     taskId: str |
| 216 | +     cluster: str |
| 217 | +     custom_metadata: dict |
| 218 | +
|
| 219 | +
|
| 220 | + def get_my_custom_facet( |
| 221 | +     task_instance: TaskInstance, ti_state: TaskInstanceState |
| 222 | + ) -> dict[str, RunFacet] | None: |
| 223 | +     operator_name = task_instance.task.operator_name |
| 224 | +     custom_metadata = {} |
| 225 | +     if operator_name == "BashOperator": |
| 226 | +         return None |
| 227 | +     if ti_state == TaskInstanceState.FAILED: |
| 228 | +         custom_metadata["custom_key_failed"] = "custom_value" |
| 229 | +     job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}" |
| 230 | +     return { |
| 231 | +         "additional_run_facet": MyCustomRunFacet( |
| 232 | +             name="test-lineage-namespace", |
| 233 | +             jobState=task_instance.state, |
| 234 | +             uniqueName=job_unique_name, |
| 235 | +             displayName=f"{task_instance.dag_id}.{task_instance.task_id}", |
| 236 | +             dagId=task_instance.dag_id, |
| 237 | +             taskId=task_instance.task_id, |
| 238 | +             cluster="TEST", |
| 239 | +             custom_metadata=custom_metadata, |
| 240 | +         ) |
| 241 | +     } |
| 242 | +
|
| 243 | +Register the custom facet functions |
| 244 | +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |
| 245 | + |
| 246 | +Use the ``custom_run_facets`` Airflow configuration to register the custom run facet functions by passing |
| 247 | +a semicolon-separated string of full import paths to the functions. |
| 248 | + |
| 249 | +.. code-block:: ini |
| 250 | +
|
| 251 | + [openlineage] |
| 252 | + transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"} |
| 253 | + custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function |
| 254 | +
|
| 255 | +The ``AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS`` environment variable is equivalent. |
| 256 | + |
| 257 | +.. code-block:: ini |
| 258 | +
|
| 259 | + AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function' |
| 260 | +
|
| 261 | +.. note:: |
| 262 | + |
| 263 | + - The custom facet functions are executed both at the START and at the COMPLETE/FAIL of the TaskInstance, and their results are added to the corresponding OpenLineage event. |
| 264 | + - When creating conditions on the TaskInstance state, use the second argument provided (``TaskInstanceState``), which contains the state the task should be in. This may differ from ``ti.current_state()``, because the OpenLineage listener may be called before the TaskInstance's state is updated in the Airflow database. |
| 265 | + - When the path to a single function is registered more than once, the function is still executed only once. |
| 266 | + - When several registered functions return the same custom facet key, the result of an arbitrary one of them is added to the lineage event. Avoid duplicate facet keys, as they can produce unexpected behaviour. |
| 267 | + |
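The last two points of the note can be checked in your own tests before deployment. The sketch below is a hypothetical helper pair, not the provider's code: ``parse_custom_run_facets`` splits a semicolon-separated value into unique import paths, and ``find_duplicate_facet_keys`` reports facet keys returned by more than one function. Both names and the sample paths are assumptions made for illustration.

```python
from __future__ import annotations

from collections import Counter


def parse_custom_run_facets(value: str) -> list[str]:
    """Split a semicolon-separated option value into unique import paths."""
    paths = [part.strip() for part in value.split(";") if part.strip()]
    # dict.fromkeys keeps the first occurrence and preserves order.
    return list(dict.fromkeys(paths))


def find_duplicate_facet_keys(results: list[dict | None]) -> set[str]:
    """Return facet keys that appear in more than one function result."""
    counts = Counter(key for result in results if result for key in result)
    return {key for key, count in counts.items() if count > 1}


# Hypothetical import paths; the first one is registered twice.
paths = parse_custom_run_facets("pkg.mod.facet_a; pkg.mod.facet_b;pkg.mod.facet_a")
print(paths)

# Stand-ins for the dictionaries returned by two facet functions.
duplicates = find_duplicate_facet_keys([{"additional_run_facet": 1}, None, {"additional_run_facet": 2}])
print(duplicates)
```

A non-empty result from ``find_duplicate_facet_keys`` signals that two functions compete for the same key and one of the facets would be lost.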
| 268 | + |
| 269 | +.. _configuration_backwards_compatibility:openlineage: |
| 270 | + |
| 271 | +Backwards compatibility |
| 272 | +------------------------ |
| 273 | + |
| 274 | +.. warning:: |
| 275 | + |
| 276 | + The variables below **should not** be used and may be removed in the future. Consider using Airflow configuration (described above) for a future-proof solution. |
| 277 | + |
| 278 | +For backwards compatibility with the ``openlineage-airflow`` package, some environment variables are still available: |
| 279 | + |
| 280 | +- ``OPENLINEAGE_DISABLED`` is an equivalent of ``AIRFLOW__OPENLINEAGE__DISABLED``. |
| 281 | +- ``OPENLINEAGE_CONFIG`` is an equivalent of ``AIRFLOW__OPENLINEAGE__CONFIG_PATH``. |
| 282 | +- ``OPENLINEAGE_NAMESPACE`` is an equivalent of ``AIRFLOW__OPENLINEAGE__NAMESPACE``. |
| 283 | +- ``OPENLINEAGE_EXTRACTORS`` is an equivalent of ``AIRFLOW__OPENLINEAGE__EXTRACTORS``. |
| 284 | +- ``OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE`` is an equivalent of ``AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE``. |
| 285 | +- ``OPENLINEAGE_URL`` can be used to set up a simple HTTP transport. This method has some limitations and may require other environment variables to achieve the desired output. See the `Python client documentation <https://openlineage.io/docs/client/python/configuration#transports>`_. |