Commit 106cf78

Move SQS message queue to Amazon provider
The common.messaging abstraction should discover common messaging queue providers using the same mechanism as we have for other core extensions. Previously common.messaging had the optional (but not really) dependencies to other providers, but that was not needed and introduced unnecessary coupling.

By switching to our built-in discovery mechanism we get immediately all the niceties of provider discovery mechanisms:

* queue is provided by the actual provider where the service or integration already is implemented (sqs -> amazon provider, in the future kafka -> kafka provider)
* queues are discovered from installed providers
* there is no coupling or imports between common.messaging and the providers that implement messaging, the dependency is in the other way - providers that implement messaging depend on common.messaging
* airflow providers queues CLI and providers core extensions documentation is automatically generated
1 parent c37e6eb commit 106cf78
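
The dependency direction described in the commit message can be illustrated with a minimal sketch. Everything in it is hypothetical: `BaseMessageQueueProvider`, `queue_matches`, `SqsLikeQueueProvider`, `DISCOVERED` and the URL pattern are stand-ins chosen for illustration, not the actual classes or signatures in `common.messaging` or the Amazon provider. The only point is that the provider-side class depends on the shared base class, while the shared lookup code never imports a concrete provider.

```python
from __future__ import annotations

import re


class BaseMessageQueueProvider:
    """Hypothetical stand-in for the abstraction owned by common.messaging."""

    def queue_matches(self, queue: str) -> bool:
        raise NotImplementedError


class SqsLikeQueueProvider(BaseMessageQueueProvider):
    """Hypothetical provider-side implementation; it imports the base, not the reverse."""

    # Assumed URL shape, for illustration only.
    _pattern = re.compile(r"^https://sqs\.[^.]+\.amazonaws\.com/\d+/.+")

    def queue_matches(self, queue: str) -> bool:
        return bool(self._pattern.match(queue))


# Stand-in for whatever provider discovery returns for installed providers.
DISCOVERED: list[BaseMessageQueueProvider] = [SqsLikeQueueProvider()]


def find_provider(queue: str) -> BaseMessageQueueProvider:
    """Return the first discovered provider that claims the given queue."""
    for provider in DISCOVERED:
        if provider.queue_matches(queue):
            return provider
    raise ValueError(f"No installed provider handles queue {queue!r}")
```

With this shape, adding a Kafka-style implementation would mean appending another subclass to the discovered list; `find_provider` itself would not change.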

33 files changed: +389 -58 lines changed


airflow-core/docs/authoring-and-scheduling/connections.rst

+1-1
@@ -47,5 +47,5 @@ Airflow allows to define custom connection types. This is what is described in d
 :doc:`apache-airflow-providers:index` - providers give you the capability of defining your own connections.
 The connection customization can be done by any provider, but also
 many of the providers managed by the community define custom connection types.
-The full list of all providers delivered by ``Apache Airflow community managed providers`` can be found in
+The full list of all connections delivered by ``Apache Airflow community managed providers`` can be found in
 :doc:`apache-airflow-providers:core-extensions/connections`.

airflow-core/docs/core-concepts/index.rst

+1
@@ -43,6 +43,7 @@ Here you can find detailed documentation about each one of the core concepts of
     auth-manager/index
     objectstorage
     backfill
+    message-queues

 **Communication**

@@ -0,0 +1,41 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements. See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership. The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied. See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+.. _concepts:message-queues:
+
+Message Queues
+==============
+
+The Message Queues are a way to expose capability of external event-driven scheduling of Dags.
+
+Apache Airflow is primarily designed for time-based and dependency-based scheduling of workflows. However,
+modern data architectures often require near real-time processing and the ability to react to
+events from various sources, such as message queues.
+
+Airflow has native event-driven capability to Airflow, allowing users to create workflows that can be
+triggered by external events, thus enabling more responsive data pipelines.
+
+Airflow supports poll-based event-driven scheduling, where the Triggerer can poll
+external message queues using built-in :class:`airflow.triggers.base.BaseTrigger` classes. This allows users
+to create workflows that can be triggered by external events, such as messages arriving
+in a queue or changes in a database efficiently.
+
+Airflow constantly monitors the state of an external resource and updates the asset whenever the external
+resource reaches a given state (if it does reach it). To achieve this, we leverage Airflow Triggers.
+Triggers are small, asynchronous pieces of Python code whose job is to poll an external resource state.
+
+The list of supported message queues is available in :doc:`apache-airflow-providers:core-extensions/message-queues`.
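
The poll-based approach described in the new document can be sketched with plain `asyncio`. This is not Airflow code: `wait_for_message`, `demo` and the in-memory `deque` are hypothetical stand-ins for a trigger and an external queue, and the poll interval is arbitrary. The sketch only shows the idea of a small asynchronous coroutine that repeatedly checks an external resource and returns an event once the expected state is reached.

```python
import asyncio
from collections import deque


async def wait_for_message(queue: deque, poll_interval: float = 0.01) -> dict:
    """Poll the (hypothetical) queue until a message is available, then return an event."""
    while not queue:
        # Yield control so other coroutines can run between polls.
        await asyncio.sleep(poll_interval)
    return {"message": queue.popleft()}


async def demo() -> dict:
    queue: deque = deque()

    async def producer() -> None:
        # Simulates an external system publishing a message a little later.
        await asyncio.sleep(0.03)
        queue.append("order-created")

    task = asyncio.create_task(producer())
    event = await wait_for_message(queue)
    await task
    return event
```

Because the waiting happens in `await asyncio.sleep(...)`, many such pollers can share one event loop without blocking each other, which is the property the text attributes to Triggers.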

airflow-core/src/airflow/cli/cli_config.py

+6
@@ -1597,6 +1597,12 @@ class GroupCommand(NamedTuple):
         func=lazy_load_command("airflow.cli.commands.provider_command.executors_list"),
         args=(ARG_OUTPUT, ARG_VERBOSE),
     ),
+    ActionCommand(
+        name="queues",
+        help="Get information about queues provided",
+        func=lazy_load_command("airflow.cli.commands.provider_command.queues_list"),
+        args=(ARG_OUTPUT, ARG_VERBOSE),
+    ),
     ActionCommand(
         name="notifications",
         help="Get information about notifications provided",

airflow-core/src/airflow/cli/commands/provider_command.py

+13
@@ -220,6 +220,19 @@ def executors_list(args):
     )


+@suppress_logs_and_warning
+@providers_configuration_loaded
+def queues_list(args):
+    """List all queues at the command line."""
+    AirflowConsole().print_as(
+        data=list(ProvidersManager().queue_class_names),
+        output=args.output,
+        mapper=lambda x: {
+            "queue_class_names": x,
+        },
+    )
+
+
 @suppress_logs_and_warning
 @providers_configuration_loaded
 def config_list(args):
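
As a rough illustration of the rows the new `queues_list` command would produce, the sketch below applies the same `mapper` lambda to a list of class names. `to_rows` is a hypothetical helper written under the assumption that `print_as` maps every item to a row before formatting; the class name is the one this commit registers in `providers/amazon/provider.yaml`.

```python
from typing import Callable


def to_rows(class_names: list[str], mapper: Callable[[str], dict]) -> list[dict]:
    """Hypothetical helper: turn each discovered class name into one output row."""
    return [mapper(name) for name in class_names]


rows = to_rows(
    ["airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider"],
    # Same mapper as in queues_list: one column named "queue_class_names".
    lambda x: {"queue_class_names": x},
)
```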

airflow-core/src/airflow/provider.yaml.schema.json

+12
@@ -467,6 +467,18 @@
                 }
             }
         },
+        "queues": {
+            "type": "array",
+            "description": "Message Queues exposed by the provider",
+            "items": {
+                "name": {
+                    "type": "string"
+                },
+                "message-queue-class": {
+                    "type": "string"
+                }
+            }
+        },
         "source-date-epoch": {
             "type": "integer",
             "description": "Source date epoch - seconds since epoch (gmtime) when the release documentation was prepared. Used to generate reproducible package builds with flint.",

airflow-core/src/airflow/provider_info.schema.json

+12
@@ -416,6 +416,18 @@
                     "description": "Class to instantiate the plugin"
                 }
             }
+        },
+        "queues": {
+            "type": "array",
+            "description": "Message Queues exposed by the provider",
+            "items": {
+                "name": {
+                    "type": "string"
+                },
+                "message-queue-class": {
+                    "type": "string"
+                }
+            }
         }
     },
     "definitions": {

airflow-core/src/airflow/providers_manager.py

+22
@@ -416,6 +416,7 @@ def __init__(self):
         self._auth_manager_class_name_set: set[str] = set()
         self._secrets_backend_class_name_set: set[str] = set()
         self._executor_class_name_set: set[str] = set()
+        self._queue_class_name_set: set[str] = set()
         self._provider_configs: dict[str, dict[str, Any]] = {}
         self._trigger_info_set: set[TriggerInfo] = set()
         self._notification_info_set: set[NotificationInfo] = set()
@@ -533,6 +534,12 @@ def initialize_providers_executors(self):
         self.initialize_providers_list()
         self._discover_executors()

+    @provider_info_cache("queues")
+    def initialize_providers_queues(self):
+        """Lazy initialization of providers queue information."""
+        self.initialize_providers_list()
+        self._discover_queues()
+
     @provider_info_cache("notifications")
     def initialize_providers_notifications(self):
         """Lazy initialization of providers notifications information."""
@@ -1091,6 +1098,14 @@ def _discover_executors(self) -> None:
                     if _correctness_check(provider_package, executors_class_name, provider):
                         self._executor_class_name_set.add(executors_class_name)

+    def _discover_queues(self) -> None:
+        """Retrieve all queues defined in the providers."""
+        for provider_package, provider in self._provider_dict.items():
+            if provider.data.get("queues"):
+                for queue_class_name in provider.data["queues"]:
+                    if _correctness_check(provider_package, queue_class_name, provider):
+                        self._queue_class_name_set.add(queue_class_name)
+
     def _discover_config(self) -> None:
         """Retrieve all configs defined in the providers."""
         for provider_package, provider in self._provider_dict.items():
@@ -1221,6 +1236,11 @@ def executor_class_names(self) -> list[str]:
         self.initialize_providers_executors()
         return sorted(self._executor_class_name_set)

+    @property
+    def queue_class_names(self) -> list[str]:
+        self.initialize_providers_queues()
+        return sorted(self._queue_class_name_set)
+
     @property
     def filesystem_module_names(self) -> list[str]:
         self.initialize_providers_filesystems()
@@ -1268,9 +1288,11 @@ def _cleanup(self):
         self._auth_manager_class_name_set.clear()
         self._secrets_backend_class_name_set.clear()
         self._executor_class_name_set.clear()
+        self._queue_class_name_set.clear()
         self._provider_configs.clear()
         self._trigger_info_set.clear()
         self._notification_info_set.clear()
         self._plugins_set.clear()
+
         self._initialized = False
         self._initialization_stack_trace = None
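
The pattern these hunks add (collect class names from each provider's metadata on first access, then return them sorted) can be sketched in isolation. `MiniProvidersManager` and its `provider_data` argument are hypothetical simplifications: the real `ProvidersManager` also runs `_correctness_check` and uses the `provider_info_cache` decorator, both of which are replaced here by a plain boolean flag.

```python
from __future__ import annotations


class MiniProvidersManager:
    """Hypothetical, stripped-down model of lazy queue discovery."""

    def __init__(self, provider_data: dict[str, dict]) -> None:
        self._provider_data = provider_data
        self._queue_class_name_set: set[str] = set()
        self._queues_initialized = False

    def _discover_queues(self) -> None:
        """Collect queue class names from every provider that declares any."""
        for data in self._provider_data.values():
            for queue_class_name in data.get("queues") or []:
                self._queue_class_name_set.add(queue_class_name)

    @property
    def queue_class_names(self) -> list[str]:
        # Discovery runs once, on first access; later reads reuse the set.
        if not self._queues_initialized:
            self._discover_queues()
            self._queues_initialized = True
        return sorted(self._queue_class_name_set)


manager = MiniProvidersManager(
    {
        "provider-b": {"queues": ["pkg_b.queues.BQueue"]},
        "provider-a": {"queues": ["pkg_a.queues.AQueue"]},
        "provider-c": {},  # a provider without queues contributes nothing
    }
)
```

Storing names in a set and sorting on read keeps the result deterministic regardless of the order in which providers are visited.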

dev/breeze/src/airflow_breeze/global_constants.py

+2-1
@@ -696,7 +696,8 @@ def generate_provider_dependencies_if_needed():
     {
         "python-version": "3.9",
         "airflow-version": "3.0.0",
-        "remove-providers": "cloudant",
+        # TODO: bring back common-messaging when we bump airflow to 3.0.1
+        "remove-providers": "cloudant common.messaging",
         "run-tests": "true",
     },
 ]

devel-common/src/sphinx_exts/operators_and_hooks_ref.py

+12
@@ -481,6 +481,17 @@ def render_content(
         )


+class QueuesDirective(BaseJinjaReferenceDirective):
+    """Generate list of queues"""
+
+    def render_content(
+        self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR
+    ) -> str:
+        return _common_render_list_content(
+            header_separator=header_separator, resource_type="queues", template="queues.rst.jinja2"
+        )
+
+
 class DeferrableOperatorDirective(BaseJinjaReferenceDirective):
     """Generate list of deferrable operators"""

@@ -521,6 +532,7 @@ def setup(app):
     app.add_directive("airflow-extra-links", ExtraLinksDirective)
     app.add_directive("airflow-notifications", NotificationsDirective)
     app.add_directive("airflow-executors", ExecutorsDirective)
+    app.add_directive("airflow-queues", QueuesDirective)
     app.add_directive("airflow-deferrable-operators", DeferrableOperatorDirective)
     app.add_directive("airflow-deprecations", DeprecationsDirective)
     app.add_directive("airflow-dataset-schemes", AssetSchemeDirective)
@@ -0,0 +1,27 @@
+{#
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+#}
+{%for provider, provider_dict in items.items() %}
+{{ provider_dict['name'] }}
+{{ header_separator * (provider_dict['name']|length) }}
+
+{% for queue in provider_dict['queues'] -%}
+- :class:`~{{ queue }}`
+{% endfor -%}
+
+{% endfor %}
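
To make the template's intent easier to follow, here is a plain-Python approximation of what it renders: a heading per provider, an underline built from the separator character, and one `:class:` bullet per queue. `render_queues` is a hypothetical helper, the sample `items` dict is invented for illustration, and the exact blank-line handling of the real Jinja template (which depends on its `-%}` whitespace control) is not reproduced.

```python
def render_queues(items: dict, header_separator: str = '"') -> str:
    """Approximate the reStructuredText produced by the queues template."""
    lines = []
    for provider_dict in items.values():
        name = provider_dict["name"]
        lines.append(name)
        # The underline must be as long as the heading text.
        lines.append(header_separator * len(name))
        lines.append("")
        for queue in provider_dict["queues"]:
            lines.append(f"- :class:`~{queue}`")
        lines.append("")
    return "\n".join(lines)


# Invented sample data; the keys mirror those the template reads.
sample = {
    "apache-airflow-providers-amazon": {
        "name": "Amazon",
        "queues": ["airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider"],
    }
}
rendered = render_queues(sample)
```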
@@ -0,0 +1,33 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements. See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership. The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied. See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Message Queues
+--------------
+
+This is a summary of all Apache Airflow Community provided implementations of Queues
+exposed via community-managed providers.
+
+Airflow can be extended by providers with Queues. Each provider can define their own Queues,
+that can be configured to handle executing tasks
+
+The queues are explained in
+:doc:`apache-airflow:core-concepts/message-queues` and you can also see those
+provided by the community-managed providers:
+
+.. airflow-queues::
+   :tags: None
+   :header-separator: "

providers/amazon/docs/index.rst

+1
@@ -42,6 +42,7 @@
     Logging for Tasks <logging/index>
     Configuration <configurations-ref>
     Executors <executors/index>
+    Message Queues <message-queues/index>
     AWS Auth manager <auth-manager/index>
     CLI <cli-ref>

@@ -0,0 +1,24 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements. See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership. The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied. See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+
+
+Amazon Messaging Queues
+=======================
+
+
+* Amazon SQS Queue Provider :class:`~airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider`

providers/amazon/provider.yaml

+4
@@ -25,6 +25,7 @@ state: ready
 source-date-epoch: 1744788746
 # note that those versions are maintained by release manager - do not update them manually
 versions:
+  - 9.7.0
   - 9.6.1
   - 9.6.0
   - 9.5.0
@@ -1178,3 +1179,6 @@ executors:

 auth-managers:
   - airflow.providers.amazon.aws.auth_manager.aws_auth_manager.AwsAuthManager
+
+queues:
+  - airflow.providers.amazon.aws.queues.sqs.SqsMessageQueueProvider

providers/amazon/pyproject.toml

+7-3
@@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi"

 [project]
 name = "apache-airflow-providers-amazon"
-version = "9.6.1"
+version = "9.7.0"
 description = "Provider package apache-airflow-providers-amazon for Apache Airflow"
 readme = "README.rst"
 authors = [
@@ -149,6 +149,9 @@ dependencies = [
 "standard" = [
     "apache-airflow-providers-standard"
 ]
+"common.messaging" = [
+    "apache-airflow-providers-common-messaging"
+]

 [dependency-groups]
 dev = [
@@ -158,6 +161,7 @@ dev = [
     "apache-airflow-providers-apache-hive",
     "apache-airflow-providers-cncf-kubernetes",
     "apache-airflow-providers-common-compat",
+    "apache-airflow-providers-common-messaging",
     "apache-airflow-providers-common-sql",
     "apache-airflow-providers-exasol",
     "apache-airflow-providers-ftp",
@@ -211,8 +215,8 @@ apache-airflow-providers-common-sql = {workspace = true}
 apache-airflow-providers-standard = {workspace = true}

 [project.urls]
-"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.6.1"
-"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.6.1/changelog.html"
+"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.7.0"
+"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-amazon/9.7.0/changelog.html"
 "Bug Tracker" = "https://github.com/apache/airflow/issues"
 "Source Code" = "https://github.com/apache/airflow"
 "Slack Chat" = "https://s.apache.org/airflow-slack"

providers/amazon/src/airflow/providers/amazon/__init__.py

+1-1
@@ -29,7 +29,7 @@

 __all__ = ["__version__"]

-__version__ = "9.6.1"
+__version__ = "9.7.0"

 if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
     "2.10.0"
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
