Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-72: Task SDK support for on_task_instance_* listeners, make OpenLineage compatible #45294

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

mobuchowski
Copy link
Contributor

@mobuchowski mobuchowski commented Dec 30, 2024

With AIP-72, there is no access to the database session from the worker process, and the runtime objects have some differences to the db models. This PR contains three commits that deal with that situation:

  • First commit adjusts on_task_instance_* listeners interface to AIP-72: drops session argument and makes task_instance argument an instance of RuntimeTaskInstance class, not database model
  • Second commit adds basic support for calling listeners in Task SDK, additionally adding some context fields that allow OL and other listeners not use DB.
  • Third commit adjusts OpenLineage integration to work with new interface and Task SDK.

Some followup work:

  • wrap listener calls into Activity to make logging better visible from UI, and distinct from task logs
  • add separate interface for API to capture manual/API task changes. Impossible to reuse current interface since object would be a different type (not RuntimeTaskInstance)
  • add support for more types of listeners, more in task Ensure Listeners work with Task SDK #45491

closes #45423

@boring-cyborg boring-cyborg bot added area:Executors-core LocalExecutor & SequentialExecutor area:providers area:Scheduler including HA (high availability) scheduler area:task-sdk provider:openlineage AIP-53 labels Dec 30, 2024
@mobuchowski mobuchowski force-pushed the tasksdk-call-listeners branch 4 times, most recently from f07a78e to 81b886b Compare January 2, 2025 08:20
@potiuk
Copy link
Member

potiuk commented Jan 2, 2025

@mobuchowski - can you please rebase that one -> we found and issue with @jscheffl with the new caching scheme - fixed in #45347 that would run "main" version of the tests. I am asking in all affected PRs to rebase.

@potiuk potiuk force-pushed the tasksdk-call-listeners branch from 81b886b to 15fd95a Compare January 2, 2025 12:26
@potiuk
Copy link
Member

potiuk commented Jan 2, 2025

Actually - I rebased it now.

@mobuchowski
Copy link
Contributor Author

I will rebase very soon as I'm working on some of the test failures anyway 🙂

@mobuchowski mobuchowski force-pushed the tasksdk-call-listeners branch 7 times, most recently from 35a94e2 to 86297af Compare January 6, 2025 16:35
@mobuchowski mobuchowski marked this pull request as ready for review January 6, 2025 16:52
@mobuchowski mobuchowski force-pushed the tasksdk-call-listeners branch from 86297af to 6d88389 Compare January 6, 2025 17:04
@mobuchowski mobuchowski changed the title [draft] AIP-72: call on_task_instance_* listeners AIP-72: call on_task_instance_* listeners Jan 6, 2025
@mobuchowski mobuchowski force-pushed the tasksdk-call-listeners branch 5 times, most recently from 2bfe26b to e9a36fd Compare January 15, 2025 12:41
@@ -181,6 +182,7 @@ class DagRun(BaseModel):
data_interval_end: UtcDateTime | None
start_date: UtcDateTime
end_date: UtcDateTime | None
clear_number: int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's odd to clear_number in the DagRun datamodel!

What do we need this for? Can we get it from somewhere else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a part of the current DagRun model :)

clear_number = Column(Integer, default=0, nullable=False, server_default="0")

We need it to properly generate DR uuid, so that events from different physical executions of a dag run aren't mixed up:

data=f"{conf.namespace()}.{dag_id}.{clear_number}".encode(),

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should have clarified better: I meant clear_number on the DagRun Runtime model feels odd i.e. it isn't required for it.

Since logical_date can now be None too based on the link below, I think, you might need to refactor the logic for generating DR uuid.

https://lists.apache.org/thread/cknldkl9pmmzr1q7ot67wborzznlwrtv

Copy link
Member

@uranusjr uranusjr Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conceptually (in OpenLineage) if you clear and rerun a DAG run, the two runs (before and after the clear) are treated as entirely different objects. I believe this is kind of what we want to do in Airflow in the long run (similar to how we added TaskInstanceHistory), but before that happens, OL needs clear_number to distinguish logically different runs that reuse the same DR row and have the exact same identity otherwise (run_id, and even the UUID pk).

Copy link
Member

@kaxil kaxil Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct me if I am wrong -- but the (dag_id, clear_number & logical_date) won't be unique anymore in AF 3.0 -- since logical_date would accept Null values, no? @uranusjr

@staticmethod
def build_dag_run_id(dag_id: str, logical_date: datetime, clear_number: int) -> str:
return str(
generate_static_uuid(
instant=logical_date,
data=f"{conf.namespace()}.{dag_id}.{clear_number}".encode(),
)
)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this needs to change - but I would leave that for followup PR after the actual change in Airflow behavior will get merged #45732

@mobuchowski mobuchowski force-pushed the tasksdk-call-listeners branch 2 times, most recently from afc2ca4 to 1e04b32 Compare January 20, 2025 15:58
@mobuchowski mobuchowski force-pushed the tasksdk-call-listeners branch 8 times, most recently from c2e2165 to 305fff0 Compare January 24, 2025 12:21
@mobuchowski mobuchowski force-pushed the tasksdk-call-listeners branch 7 times, most recently from f6625b4 to 3dc5ae4 Compare February 4, 2025 00:31
… TaskSDK, make OpenLineage provider support Airflow 3's listener interface

Signed-off-by: Maciej Obuchowski <[email protected]>
@mobuchowski mobuchowski force-pushed the tasksdk-call-listeners branch from 3dc5ae4 to 39172c3 Compare February 4, 2025 09:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:Executors-core LocalExecutor & SequentialExecutor area:providers area:Scheduler including HA (high availability) scheduler area:task-sdk provider:openlineage AIP-53
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Ensure Task Instance Listeners work with Task SDK
6 participants