Skip to content

Commit 0b341e6

Browse files
authored
Move listeners module to shared library for client server separation (#59883)
Extract the listeners infrastructure to `shared/listeners/` library to eliminate cross dependencies between airflow-core and task-sdk. - ListenerManager and hookimpl marker now in shared library - Hook specs split by callers: - shared: lifecycle, taskinstance (called from both sdk and core) - core: dagrun, asset, importerrors (called only from core) - sdk registers only specs it actually uses (lifecycle, taskinstance) - core registers all specs for full listener support
1 parent 9cab6fb commit 0b341e6

File tree

36 files changed

+616
-202
lines changed

36 files changed

+616
-202
lines changed

airflow-core/pyproject.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ dependencies = [
161161
# Start of shared configuration dependencies
162162
"pyyaml>=6.0.3",
163163
# End of shared configuration dependencies
164+
# Start of shared listeners dependencies
165+
"pluggy>=1.5.0",
166+
# End of shared listeners dependencies
164167
]
165168

166169

@@ -235,6 +238,7 @@ exclude = [
235238
"../shared/secrets_backend/src/airflow_shared/secrets_backend" = "src/airflow/_shared/secrets_backend"
236239
"../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/_shared/secrets_masker"
237240
"../shared/timezones/src/airflow_shared/timezones" = "src/airflow/_shared/timezones"
241+
"../shared/listeners/src/airflow_shared/listeners" = "src/airflow/_shared/listeners"
238242
"../shared/plugins_manager/src/airflow_shared/plugins_manager" = "src/airflow/_shared/plugins_manager"
239243

240244
[tool.hatch.build.targets.custom]
@@ -305,6 +309,7 @@ apache-airflow-devel-common = { workspace = true }
305309
shared_distributions = [
306310
"apache-airflow-shared-configuration",
307311
"apache-airflow-shared-dagnode",
312+
"apache-airflow-shared-listeners",
308313
"apache-airflow-shared-logging",
309314
"apache-airflow-shared-module-loading",
310315
"apache-airflow-shared-observability",
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../../../shared/listeners/src/airflow_shared/listeners

airflow-core/src/airflow/listeners/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,6 @@
1717
# under the License.
1818
from __future__ import annotations
1919

20-
from pluggy import HookimplMarker
20+
from airflow._shared.listeners import hookimpl
2121

22-
hookimpl = HookimplMarker("airflow")
22+
__all__ = ["hookimpl"]

airflow-core/src/airflow/listeners/listener.py

Lines changed: 23 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -17,72 +17,36 @@
1717
# under the License.
1818
from __future__ import annotations
1919

20-
import logging
2120
from functools import cache
22-
from typing import TYPE_CHECKING
23-
24-
import pluggy
2521

22+
from airflow._shared.listeners.listener import ListenerManager
23+
from airflow._shared.listeners.spec import lifecycle, taskinstance
24+
from airflow.listeners.spec import asset, dagrun, importerrors
2625
from airflow.plugins_manager import integrate_listener_plugins
2726

28-
if TYPE_CHECKING:
29-
from pluggy._hooks import _HookRelay
30-
31-
log = logging.getLogger(__name__)
32-
33-
34-
def _before_hookcall(hook_name, hook_impls, kwargs):
35-
log.debug("Calling %r with %r", hook_name, kwargs)
36-
log.debug("Hook impls: %s", hook_impls)
37-
38-
39-
def _after_hookcall(outcome, hook_name, hook_impls, kwargs):
40-
log.debug("Result from %r: %s", hook_name, outcome.get_result())
41-
42-
43-
class ListenerManager:
44-
"""Manage listener registration and provides hook property for calling them."""
45-
46-
def __init__(self):
47-
from airflow.listeners.spec import (
48-
asset,
49-
dagrun,
50-
importerrors,
51-
lifecycle,
52-
taskinstance,
53-
)
54-
55-
self.pm = pluggy.PluginManager("airflow")
56-
self.pm.add_hookcall_monitoring(_before_hookcall, _after_hookcall)
57-
self.pm.add_hookspecs(lifecycle)
58-
self.pm.add_hookspecs(dagrun)
59-
self.pm.add_hookspecs(asset)
60-
self.pm.add_hookspecs(taskinstance)
61-
self.pm.add_hookspecs(importerrors)
62-
63-
@property
64-
def has_listeners(self) -> bool:
65-
return bool(self.pm.get_plugins())
66-
67-
@property
68-
def hook(self) -> _HookRelay:
69-
"""Return hook, on which plugin methods specified in spec can be called."""
70-
return self.pm.hook
71-
72-
def add_listener(self, listener):
73-
if self.pm.is_registered(listener):
74-
return
75-
self.pm.register(listener)
76-
77-
def clear(self):
78-
"""Remove registered plugins."""
79-
for plugin in self.pm.get_plugins():
80-
self.pm.unregister(plugin)
81-
8227

8328
@cache
8429
def get_listener_manager() -> ListenerManager:
85-
"""Get singleton listener manager."""
30+
"""
31+
Get a listener manager for Airflow core.
32+
33+
Registers the following listeners:
34+
- lifecycle: on_starting, before_stopping
35+
- dagrun: on_dag_run_running, on_dag_run_success, on_dag_run_failed
36+
- taskinstance: on_task_instance_running, on_task_instance_success, etc.
37+
- asset: on_asset_created, on_asset_changed, etc.
38+
- importerrors: on_new_dag_import_error, on_existing_dag_import_error
39+
"""
8640
_listener_manager = ListenerManager()
41+
42+
_listener_manager.add_hookspecs(lifecycle)
43+
_listener_manager.add_hookspecs(dagrun)
44+
_listener_manager.add_hookspecs(taskinstance)
45+
_listener_manager.add_hookspecs(asset)
46+
_listener_manager.add_hookspecs(importerrors)
47+
8748
integrate_listener_plugins(_listener_manager)
8849
return _listener_manager
50+
51+
52+
__all__ = ["get_listener_manager", "ListenerManager"]

airflow-core/src/airflow/listeners/spec/asset.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# KIND, either express or implied. See the License for the
1616
# specific language governing permissions and limitations
1717
# under the License.
18+
1819
from __future__ import annotations
1920

2021
from typing import TYPE_CHECKING

airflow-core/src/airflow/listeners/spec/importerrors.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# KIND, either express or implied. See the License for the
1616
# specific language governing permissions and limitations
1717
# under the License.
18+
1819
from __future__ import annotations
1920

2021
from pluggy import HookspecMarker

airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727

2828
from airflow._shared.timezones import timezone
2929
from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
30-
from airflow.listeners.listener import get_listener_manager
3130
from airflow.models import DagModel, DagRun, Log
3231
from airflow.models.asset import AssetEvent, AssetModel
3332
from airflow.providers.standard.operators.empty import EmptyOperator
@@ -1285,12 +1284,6 @@ def test_patch_dag_run_bad_request(self, test_client):
12851284
body = response.json()
12861285
assert body["detail"][0]["msg"] == "Input should be 'queued', 'success' or 'failed'"
12871286

1288-
@pytest.fixture(autouse=True)
1289-
def clean_listener_manager(self):
1290-
get_listener_manager().clear()
1291-
yield
1292-
get_listener_manager().clear()
1293-
12941287
@pytest.mark.parametrize(
12951288
("state", "listener_state"),
12961289
[
@@ -1300,11 +1293,11 @@ def clean_listener_manager(self):
13001293
],
13011294
)
13021295
@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
1303-
def test_patch_dag_run_notifies_listeners(self, test_client, state, listener_state):
1296+
def test_patch_dag_run_notifies_listeners(self, test_client, state, listener_state, listener_manager):
13041297
from unit.listeners.class_listener import ClassBasedListener
13051298

13061299
listener = ClassBasedListener()
1307-
get_listener_manager().add_listener(listener)
1300+
listener_manager(listener)
13081301
response = test_client.patch(f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}", json={"state": state})
13091302
assert response.status_code == 200
13101303
assert listener.state == listener_state

airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
3434
from airflow.jobs.job import Job
3535
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
36-
from airflow.listeners.listener import get_listener_manager
3736
from airflow.models import DagRun, Log, TaskInstance
3837
from airflow.models.dag_version import DagVersion
3938
from airflow.models.hitl import HITLDetail
@@ -4084,12 +4083,6 @@ class TestPatchTaskInstance(TestTaskInstanceEndpoint):
40844083
TASK_ID = "print_the_context"
40854084
RUN_ID = "TEST_DAG_RUN_ID"
40864085

4087-
@pytest.fixture(autouse=True)
4088-
def clean_listener_manager(self):
4089-
get_listener_manager().clear()
4090-
yield
4091-
get_listener_manager().clear()
4092-
40934086
@pytest.mark.parametrize(
40944087
("state", "listener_state"),
40954088
[
@@ -4098,13 +4091,15 @@ def clean_listener_manager(self):
40984091
("skipped", []),
40994092
],
41004093
)
4101-
def test_patch_task_instance_notifies_listeners(self, test_client, session, state, listener_state):
4094+
def test_patch_task_instance_notifies_listeners(
4095+
self, test_client, session, state, listener_state, listener_manager
4096+
):
41024097
from unit.listeners.class_listener import ClassBasedListener
41034098

41044099
self.create_task_instances(session)
41054100

41064101
listener = ClassBasedListener()
4107-
get_listener_manager().add_listener(listener)
4102+
listener_manager(listener)
41084103
test_client.patch(
41094104
self.ENDPOINT_URL,
41104105
json={

airflow-core/tests/unit/assets/test_manager.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030

3131
from airflow import settings
3232
from airflow.assets.manager import AssetManager
33-
from airflow.listeners.listener import get_listener_manager
3433
from airflow.models.asset import (
3534
AssetAliasModel,
3635
AssetDagRunQueue,
@@ -183,11 +182,11 @@ def test_register_asset_change_no_downstreams(self, session, mock_task_instance)
183182
assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 0
184183

185184
def test_register_asset_change_notifies_asset_listener(
186-
self, session, mock_task_instance, testing_dag_bundle
185+
self, session, mock_task_instance, testing_dag_bundle, listener_manager
187186
):
188187
asset_manager = AssetManager()
189188
asset_listener.clear()
190-
get_listener_manager().add_listener(asset_listener)
189+
listener_manager(asset_listener)
191190

192191
bundle_name = "testing"
193192

@@ -207,10 +206,10 @@ def test_register_asset_change_notifies_asset_listener(
207206
assert len(asset_listener.changed) == 1
208207
assert asset_listener.changed[0].uri == asset.uri
209208

210-
def test_create_assets_notifies_asset_listener(self, session):
209+
def test_create_assets_notifies_asset_listener(self, session, listener_manager):
211210
asset_manager = AssetManager()
212211
asset_listener.clear()
213-
get_listener_manager().add_listener(asset_listener)
212+
listener_manager(asset_listener)
214213

215214
asset = Asset(uri="test://asset1", name="test_asset_1")
216215

airflow-core/tests/unit/dag_processing/test_collection.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
update_dag_parsing_results_in_db,
4242
)
4343
from airflow.exceptions import SerializationError
44-
from airflow.listeners.listener import get_listener_manager
4544
from airflow.models import DagModel, DagRun
4645
from airflow.models.asset import (
4746
AssetActive,
@@ -321,12 +320,11 @@ def clean_db(self, session):
321320
clear_db_import_errors()
322321

323322
@pytest.fixture(name="dag_import_error_listener")
324-
def _dag_import_error_listener(self):
323+
def _dag_import_error_listener(self, listener_manager):
325324
from unit.listeners import dag_import_error_listener
326325

327-
get_listener_manager().add_listener(dag_import_error_listener)
326+
listener_manager(dag_import_error_listener)
328327
yield dag_import_error_listener
329-
get_listener_manager().clear()
330328
dag_import_error_listener.clear()
331329

332330
@mark_fab_auth_manager_test

0 commit comments

Comments
 (0)