Skip to content

Commit 43e6c33

Browse files
committed
[v3-0-test] Move SQS message queue to Amazon provider (apache#50057)
The common.messaging abstraction should discover common messaging queue providers using the same mechanism as we have for other core extensions. Previously common.messaging had the optional (but not really) dependencies to other providers, but that was not needed and introduced unnecessary coupling. By switching to our built-in discovery mechanism we get immediately all the niceties of provider discovery mechanisms: * queue is provided by the actual provider where the service or integration already is implemented (sqs -> amazon provider, in the future kafka -> kafka provider) * queues are discovered from installed providers * there is no coupling or imports between common.messaging and the providers that implement messaging, the dependency is in the other way - providers that implement messaging depend on common.messaging * airflow providers queues CLI and providers core extensions documentation is automatically generated (cherry picked from commit 4f962f4) Co-authored-by: Jarek Potiuk <[email protected]>
1 parent 4a18c45 commit 43e6c33

File tree

36 files changed

+490
-88
lines changed

36 files changed

+490
-88
lines changed

airflow-core/docs/authoring-and-scheduling/connections.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,5 @@ Airflow allows to define custom connection types. This is what is described in d
4747
:doc:`apache-airflow-providers:index` - providers give you the capability of defining your own connections.
4848
The connection customization can be done by any provider, but also
4949
many of the providers managed by the community define custom connection types.
50-
The full list of all providers delivered by ``Apache Airflow community managed providers`` can be found in
50+
The full list of all connections delivered by ``Apache Airflow community managed providers`` can be found in
5151
:doc:`apache-airflow-providers:core-extensions/connections`.

airflow-core/docs/core-concepts/index.rst

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ Here you can find detailed documentation about each one of the core concepts of
4343
auth-manager/index
4444
objectstorage
4545
backfill
46+
message-queues
4647

4748
**Communication**
4849

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
.. Licensed to the Apache Software Foundation (ASF) under one
2+
or more contributor license agreements. See the NOTICE file
3+
distributed with this work for additional information
4+
regarding copyright ownership. The ASF licenses this file
5+
to you under the Apache License, Version 2.0 (the
6+
"License"); you may not use this file except in compliance
7+
with the License. You may obtain a copy of the License at
8+
9+
.. http://www.apache.org/licenses/LICENSE-2.0
10+
11+
.. Unless required by applicable law or agreed to in writing,
12+
software distributed under the License is distributed on an
13+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
KIND, either express or implied. See the License for the
15+
specific language governing permissions and limitations
16+
under the License.
17+
18+
.. _concepts:message-queues:
19+
20+
Message Queues
21+
==============
22+
23+
The Message Queues are a way to expose capability of external event-driven scheduling of Dags.
24+
25+
Apache Airflow is primarily designed for time-based and dependency-based scheduling of workflows. However,
26+
modern data architectures often require near real-time processing and the ability to react to
27+
events from various sources, such as message queues.
28+
29+
Airflow has native event-driven capability, allowing users to create workflows that can be
30+
triggered by external events, thus enabling more responsive data pipelines.
31+
32+
Airflow supports poll-based event-driven scheduling, where the Triggerer can poll
33+
external message queues using built-in :class:`airflow.triggers.base.BaseTrigger` classes. This allows users
34+
to create workflows that can be triggered by external events, such as messages arriving
35+
in a queue or changes in a database efficiently.
36+
37+
Airflow constantly monitors the state of an external resource and updates the asset whenever the external
38+
resource reaches a given state (if it does reach it). To achieve this, we leverage Airflow Triggers.
39+
Triggers are small, asynchronous pieces of Python code whose job is to poll an external resource state.
40+
41+
The list of supported message queues is available in :doc:`apache-airflow-providers:core-extensions/message-queues`.

airflow-core/src/airflow/cli/cli_config.py

+6
Original file line numberDiff line numberDiff line change
@@ -1590,6 +1590,12 @@ class GroupCommand(NamedTuple):
15901590
func=lazy_load_command("airflow.cli.commands.provider_command.executors_list"),
15911591
args=(ARG_OUTPUT, ARG_VERBOSE),
15921592
),
1593+
ActionCommand(
1594+
name="queues",
1595+
help="Get information about queues provided",
1596+
func=lazy_load_command("airflow.cli.commands.provider_command.queues_list"),
1597+
args=(ARG_OUTPUT, ARG_VERBOSE),
1598+
),
15931599
ActionCommand(
15941600
name="notifications",
15951601
help="Get information about notifications provided",

airflow-core/src/airflow/cli/commands/provider_command.py

+13
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,19 @@ def executors_list(args):
220220
)
221221

222222

223+
@suppress_logs_and_warning
224+
@providers_configuration_loaded
225+
def queues_list(args):
226+
"""List all queues at the command line."""
227+
AirflowConsole().print_as(
228+
data=list(ProvidersManager().queue_class_names),
229+
output=args.output,
230+
mapper=lambda x: {
231+
"queue_class_names": x,
232+
},
233+
)
234+
235+
223236
@suppress_logs_and_warning
224237
@providers_configuration_loaded
225238
def config_list(args):

airflow-core/src/airflow/provider.yaml.schema.json

+12
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,18 @@
467467
}
468468
}
469469
},
470+
"queues": {
471+
"type": "array",
472+
"description": "Message Queues exposed by the provider",
473+
"items": {
474+
"name": {
475+
"type": "string"
476+
},
477+
"message-queue-class": {
478+
"type": "string"
479+
}
480+
}
481+
},
470482
"source-date-epoch": {
471483
"type": "integer",
472484
"description": "Source date epoch - seconds since epoch (gmtime) when the release documentation was prepared. Used to generate reproducible package builds with flint.",

airflow-core/src/airflow/provider_info.schema.json

+12
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,18 @@
416416
"description": "Class to instantiate the plugin"
417417
}
418418
}
419+
},
420+
"queues": {
421+
"type": "array",
422+
"description": "Message Queues exposed by the provider",
423+
"items": {
424+
"name": {
425+
"type": "string"
426+
},
427+
"message-queue-class": {
428+
"type": "string"
429+
}
430+
}
419431
}
420432
},
421433
"definitions": {

airflow-core/src/airflow/providers_manager.py

+22
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,7 @@ def __init__(self):
416416
self._auth_manager_class_name_set: set[str] = set()
417417
self._secrets_backend_class_name_set: set[str] = set()
418418
self._executor_class_name_set: set[str] = set()
419+
self._queue_class_name_set: set[str] = set()
419420
self._provider_configs: dict[str, dict[str, Any]] = {}
420421
self._trigger_info_set: set[TriggerInfo] = set()
421422
self._notification_info_set: set[NotificationInfo] = set()
@@ -533,6 +534,12 @@ def initialize_providers_executors(self):
533534
self.initialize_providers_list()
534535
self._discover_executors()
535536

537+
@provider_info_cache("queues")
538+
def initialize_providers_queues(self):
539+
"""Lazy initialization of providers queue information."""
540+
self.initialize_providers_list()
541+
self._discover_queues()
542+
536543
@provider_info_cache("notifications")
537544
def initialize_providers_notifications(self):
538545
"""Lazy initialization of providers notifications information."""
@@ -1091,6 +1098,14 @@ def _discover_executors(self) -> None:
10911098
if _correctness_check(provider_package, executors_class_name, provider):
10921099
self._executor_class_name_set.add(executors_class_name)
10931100

1101+
def _discover_queues(self) -> None:
1102+
"""Retrieve all queues defined in the providers."""
1103+
for provider_package, provider in self._provider_dict.items():
1104+
if provider.data.get("queues"):
1105+
for queue_class_name in provider.data["queues"]:
1106+
if _correctness_check(provider_package, queue_class_name, provider):
1107+
self._queue_class_name_set.add(queue_class_name)
1108+
10941109
def _discover_config(self) -> None:
10951110
"""Retrieve all configs defined in the providers."""
10961111
for provider_package, provider in self._provider_dict.items():
@@ -1221,6 +1236,11 @@ def executor_class_names(self) -> list[str]:
12211236
self.initialize_providers_executors()
12221237
return sorted(self._executor_class_name_set)
12231238

1239+
@property
1240+
def queue_class_names(self) -> list[str]:
1241+
self.initialize_providers_queues()
1242+
return sorted(self._queue_class_name_set)
1243+
12241244
@property
12251245
def filesystem_module_names(self) -> list[str]:
12261246
self.initialize_providers_filesystems()
@@ -1268,9 +1288,11 @@ def _cleanup(self):
12681288
self._auth_manager_class_name_set.clear()
12691289
self._secrets_backend_class_name_set.clear()
12701290
self._executor_class_name_set.clear()
1291+
self._queue_class_name_set.clear()
12711292
self._provider_configs.clear()
12721293
self._trigger_info_set.clear()
12731294
self._notification_info_set.clear()
12741295
self._plugins_set.clear()
1296+
12751297
self._initialized = False
12761298
self._initialization_stack_trace = None

devel-common/src/sphinx_exts/operators_and_hooks_ref.py

+12
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,17 @@ def render_content(
481481
)
482482

483483

484+
class QueuesDirective(BaseJinjaReferenceDirective):
485+
"""Generate list of queues"""
486+
487+
def render_content(
488+
self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR
489+
) -> str:
490+
return _common_render_list_content(
491+
header_separator=header_separator, resource_type="queues", template="queues.rst.jinja2"
492+
)
493+
494+
484495
class DeferrableOperatorDirective(BaseJinjaReferenceDirective):
485496
"""Generate list of deferrable operators"""
486497

@@ -521,6 +532,7 @@ def setup(app):
521532
app.add_directive("airflow-extra-links", ExtraLinksDirective)
522533
app.add_directive("airflow-notifications", NotificationsDirective)
523534
app.add_directive("airflow-executors", ExecutorsDirective)
535+
app.add_directive("airflow-queues", QueuesDirective)
524536
app.add_directive("airflow-deferrable-operators", DeferrableOperatorDirective)
525537
app.add_directive("airflow-deprecations", DeprecationsDirective)
526538
app.add_directive("airflow-dataset-schemes", AssetSchemeDirective)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
{#
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
#}
19+
{%for provider, provider_dict in items.items() %}
20+
{{ provider_dict['name'] }}
21+
{{ header_separator * (provider_dict['name']|length) }}
22+
23+
{% for queue in provider_dict['queues'] -%}
24+
- :class:`~{{ queue }}`
25+
{% endfor -%}
26+
27+
{% endfor %}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
09c338483f511f25463933efe9540ac5270ad491bfa25de295c94dda2111a4f0
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
.. Licensed to the Apache Software Foundation (ASF) under one
2+
or more contributor license agreements. See the NOTICE file
3+
distributed with this work for additional information
4+
regarding copyright ownership. The ASF licenses this file
5+
to you under the Apache License, Version 2.0 (the
6+
"License"); you may not use this file except in compliance
7+
with the License. You may obtain a copy of the License at
8+
9+
.. http://www.apache.org/licenses/LICENSE-2.0
10+
11+
.. Unless required by applicable law or agreed to in writing,
12+
software distributed under the License is distributed on an
13+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
KIND, either express or implied. See the License for the
15+
specific language governing permissions and limitations
16+
under the License.
17+
18+
Message Queues
19+
--------------
20+
21+
This is a summary of all Apache Airflow Community provided implementations of Queues
22+
exposed via community-managed providers.
23+
24+
Airflow can be extended by providers with Queues. Each provider can define their own Queues,
25+
that can be configured to handle executing tasks
26+
27+
The queues are explained in
28+
:doc:`apache-airflow:core-concepts/message-queues` and you can also see those
29+
provided by the community-managed providers:
30+
31+
.. airflow-queues::
32+
:tags: None
33+
:header-separator: "

providers/amazon/docs/index.rst

+2-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@
4242
Logging for Tasks <logging/index>
4343
Configuration <configurations-ref>
4444
Executors <executors/index>
45-
Auth manager <auth-manager/index>
45+
Message Queues <message-queues/index>
46+
AWS Auth manager <auth-manager/index>
4647
CLI <cli-ref>
4748

4849
.. toctree::
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
.. Licensed to the Apache Software Foundation (ASF) under one
2+
or more contributor license agreements. See the NOTICE file
3+
distributed with this work for additional information
4+
regarding copyright ownership. The ASF licenses this file
5+
to you under the Apache License, Version 2.0 (the
6+
"License"); you may not use this file except in compliance
7+
with the License. You may obtain a copy of the License at
8+
9+
.. http://www.apache.org/licenses/LICENSE-2.0
10+
11+
.. Unless required by applicable law or agreed to in writing,
12+
software distributed under the License is distributed on an
13+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
KIND, either express or implied. See the License for the
15+
specific language governing permissions and limitations
16+
under the License.
17+
18+
Amazon Messaging Queues
19+
=======================
20+
21+
Amazon SQS Queue Provider
22+
-------------------------
23+
24+
Implemented by :class:`~airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider`
25+
26+
The Amazon SQS Queue Provider is a message queue provider that uses
27+
Amazon Simple Queue Service (SQS) as the underlying message queue system.
28+
It allows you to send and receive messages using SQS queues in your Airflow workflows.
29+
The provider supports both standard and FIFO queues, and it provides features
30+
such as message visibility timeout, message retention period, and dead-letter queues.
31+
32+
The queue must be matching this regex:
33+
34+
.. exampleinclude:: /../src/airflow/providers/amazon/aws/queues/sqs.py
35+
:language: python
36+
:dedent: 0
37+
:start-after: [START queue_regexp]
38+
:end-before: [END queue_regexp]
39+
40+
41+
The queue parameter is passed directly to ``sqs_queue`` parameter of the underlying
42+
:class:`~airflow.providers.amazon.aws.triggers.sqs.SqsSensorTrigger` class, and passes
43+
all the kwargs directly to the trigger constructor if added.

providers/amazon/provider.yaml

+7
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ state: ready
2525
source-date-epoch: 1744788746
2626
# note that those versions are maintained by release manager - do not update them manually
2727
versions:
28+
- 9.7.0
2829
- 9.6.1
2930
- 9.6.0
3031
- 9.5.0
@@ -1183,3 +1184,9 @@ config:
11831184

11841185
executors:
11851186
- airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor
1187+
1188+
auth-managers:
1189+
- airflow.providers.amazon.aws.auth_manager.aws_auth_manager.AwsAuthManager
1190+
1191+
queues:
1192+
- airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider

providers/amazon/pyproject.toml

+7-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi"
2525

2626
[project]
2727
name = "apache-airflow-providers-amazon"
28-
version = "9.6.1"
28+
version = "9.7.0"
2929
description = "Provider package apache-airflow-providers-amazon for Apache Airflow"
3030
readme = "README.rst"
3131
authors = [
@@ -146,6 +146,9 @@ dependencies = [
146146
"standard" = [
147147
"apache-airflow-providers-standard"
148148
]
149+
"common.messaging" = [
150+
"apache-airflow-providers-common-messaging>=1.0.1"
151+
]
149152

150153
[dependency-groups]
151154
dev = [
@@ -155,6 +158,7 @@ dev = [
155158
"apache-airflow-providers-apache-hive",
156159
"apache-airflow-providers-cncf-kubernetes",
157160
"apache-airflow-providers-common-compat",
161+
"apache-airflow-providers-common-messaging",
158162
"apache-airflow-providers-common-sql",
159163
"apache-airflow-providers-exasol",
160164
"apache-airflow-providers-ftp",
@@ -208,8 +212,8 @@ apache-airflow-providers-common-sql = {workspace = true}
208212
apache-airflow-providers-standard = {workspace = true}
209213

210214
[project.urls]
211-
"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.6.1"
212-
"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.6.1/changelog.html"
215+
"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.7.0"
216+
"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.7.0/changelog.html"
213217
"Bug Tracker" = "https://github.com/apache/airflow/issues"
214218
"Source Code" = "https://github.com/apache/airflow"
215219
"Slack Chat" = "https://s.apache.org/airflow-slack"

providers/amazon/src/airflow/providers/amazon/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
__all__ = ["__version__"]
3131

32-
__version__ = "9.6.1"
32+
__version__ = "9.7.0"
3333

3434
if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
3535
"2.9.0"

0 commit comments

Comments
 (0)