Skip to content

[v3-0-test] Add back missing [sources] link in generated documentation's includes (#49978) #50053

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,12 @@ function environment_initialization() {

cd "${AIRFLOW_SOURCES}"

# Temporarily add /opt/airflow/providers/standard/tests to PYTHONPATH in order to see example dags
# in the UI when testing in Breeze. This might be solved differently in the future
if [[ -d /opt/airflow/providers/standard/tests ]]; then
export PYTHONPATH=${PYTHONPATH=}:/opt/airflow/providers/standard/tests
fi

if [[ ${START_AIRFLOW:="false"} == "true" || ${START_AIRFLOW} == "True" ]]; then
export AIRFLOW__CORE__LOAD_EXAMPLES=${LOAD_EXAMPLES}
wait_for_asset_compilation
Expand Down
6 changes: 3 additions & 3 deletions airflow-core/docs/tutorial/taskflow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ system-level packages. TaskFlow supports multiple execution environments to isol
Creates a temporary virtualenv at task runtime. Great for experimental or dynamic tasks, but may have cold start
overhead.

.. exampleinclude:: /../src/airflow/example_dags/example_python_decorator.py
.. exampleinclude:: /../../providers/standard/tests/system/standard/example_python_decorator.py
:language: python
:dedent: 4
:start-after: [START howto_operator_python_venv]
Expand All @@ -283,7 +283,7 @@ overhead.

Executes the task using a pre-installed Python interpreter — ideal for consistent environments or shared virtualenvs.

.. exampleinclude:: /../src/airflow/example_dags/example_python_decorator.py
.. exampleinclude:: /../../providers/standard/tests/system/standard/example_python_decorator.py
:language: python
:dedent: 4
:start-after: [START howto_operator_external_python]
Expand Down Expand Up @@ -333,7 +333,7 @@ Using Sensors
Use ``@task.sensor`` to build lightweight, reusable sensors using Python functions. These support both poke and reschedule
modes.

.. exampleinclude:: /../src/airflow/example_dags/example_sensor_decorator.py
.. exampleinclude:: /../../providers/standard/tests/system/standard/example_sensor_decorator.py
:language: python
:start-after: [START tutorial]
:end-before: [END tutorial]
Expand Down
26 changes: 26 additions & 0 deletions airflow-core/src/airflow/dag_processing/bundles/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import os
from typing import TYPE_CHECKING

from airflow.configuration import conf
Expand All @@ -34,6 +35,7 @@
from airflow.dag_processing.bundles.base import BaseDagBundle

_example_dag_bundle_name = "example_dags"
_example_standard_dag_bundle_name = "example_standard_dags"


def _bundle_item_exc(msg):
Expand Down Expand Up @@ -80,6 +82,25 @@ def _add_example_dag_bundle(config_list):
)


def _add_example_standard_dag_bundle(config_list):
# TODO(potiuk): make it more generic - for now we only add standard example_dags if they are locally available
try:
from system import standard
except ImportError:
return

example_dag_folder = next(iter(standard.__path__))
config_list.append(
{
"name": _example_standard_dag_bundle_name,
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {
"path": example_dag_folder,
},
}
)


class DagBundlesManager(LoggingMixin):
"""Manager for DAG bundles."""

Expand Down Expand Up @@ -112,6 +133,11 @@ def parse_config(self) -> None:
_validate_bundle_config(config_list)
if conf.getboolean("core", "LOAD_EXAMPLES"):
_add_example_dag_bundle(config_list)
if (
os.environ.get("BREEZE", "").lower() == "true"
or os.environ.get("_IN_UNIT_TESTS", "").lower() == "true"
):
_add_example_standard_dag_bundle(config_list)

for cfg in config_list:
name = cfg["name"]
Expand Down
8 changes: 8 additions & 0 deletions airflow-core/src/airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,14 @@ def collect_dags(
example_dag_folder = next(iter(example_dags.__path__))

files_to_parse.extend(list_py_file_paths(example_dag_folder, safe_mode=safe_mode))
try:
from system import standard

example_dag_folder_standard = next(iter(standard.__path__))
files_to_parse.extend(list_py_file_paths(example_dag_folder_standard, safe_mode=safe_mode))
except ImportError:
# Nothing happens - this should only work during tests
pass

for filepath in files_to_parse:
try:
Expand Down
1 change: 0 additions & 1 deletion airflow-core/src/airflow/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ def get_dag(bundle_names: list | None, dag_id: str, from_db: bool = False) -> DA

bundle_names = bundle_names or []
dag: DAG | None = None

if from_db:
dagbag = DagBag(read_dags_from_db=True)
dag = dagbag.get_dag(dag_id) # get_dag loads from the DB as requested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@

pytestmark = pytest.mark.db_test

EXAMPLE_DAG_FILE = AIRFLOW_CORE_SOURCES_PATH / "airflow" / "example_dags" / "example_bash_operator.py"
TEST_DAG_ID = "example_bash_operator"
EXAMPLE_DAG_FILE = AIRFLOW_CORE_SOURCES_PATH / "airflow" / "example_dags" / "example_simplest_dag.py"
TEST_DAG_ID = "example_simplest_dag"
NOT_READABLE_DAG_ID = "latest_only_with_trigger"
TEST_MULTIPLE_DAGS_ID = "asset_produces_1"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from airflow.utils.file import list_py_file_paths

import system.standard
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_dags, parse_and_sync_to_db

Expand All @@ -37,8 +38,10 @@
def get_corresponding_dag_file_count(dir: str, include_examples: bool = True) -> int:
from airflow import example_dags

return len(list_py_file_paths(directory=dir)) + (
len(list_py_file_paths(next(iter(example_dags.__path__)))) if include_examples else 0
return (
len(list_py_file_paths(directory=dir))
+ (len(list_py_file_paths(next(iter(example_dags.__path__)))) if include_examples else 0)
+ (len(list_py_file_paths(next(iter(system.standard.__path__)))) if include_examples else 0)
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@

# Example bash operator located here: airflow/example_dags/example_bash_operator.py
EXAMPLE_DAG_FILE = (
AIRFLOW_REPO_ROOT_PATH / "airflow-core" / "src" / "airflow" / "example_dags" / "example_bash_operator.py"
AIRFLOW_REPO_ROOT_PATH / "airflow-core" / "src" / "airflow" / "example_dags" / "example_simplest_dag.py"
)
TEST_DAG_ID = "example_bash_operator"
TEST_DAG_ID = "example_simplest_dag"


@pytest.fixture
Expand Down
5 changes: 3 additions & 2 deletions airflow-core/tests/unit/cli/commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_runs, parse_and_sync_to_db

pytestmark = pytest.mark.db_test
Expand Down Expand Up @@ -74,7 +75,6 @@ def move_back(old_path, new_path):
shutil.move(new_path, old_path)


# TODO: Check if tests needs side effects - locally there's missing DAG
class TestCliTasks:
run_id = "TEST_RUN_ID"
dag_id = "example_python_operator"
Expand All @@ -91,7 +91,7 @@ def setup_class(cls):
cls.parser = cli_parser.get_parser()
clear_db_runs()

cls.dagbag = DagBag(read_dags_from_db=True)
cls.dagbag = DagBag(read_dags_from_db=True, include_examples=True)
cls.dag = cls.dagbag.get_dag(cls.dag_id)
data_interval = cls.dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE)
cls.dag_run = cls.dag.create_dagrun(
Expand All @@ -108,6 +108,7 @@ def setup_class(cls):
def teardown_class(cls) -> None:
clear_db_runs()

@conf_vars({("core", "load_examples"): "true"})
@pytest.mark.execution_timeout(120)
def test_cli_list_tasks(self):
for dag_id in self.dagbag.dags:
Expand Down
8 changes: 4 additions & 4 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
from sqlalchemy import func, select, update
from sqlalchemy.orm import joinedload

import airflow.example_dags
from airflow import settings
from airflow.assets.manager import AssetManager
from airflow.callbacks.callback_requests import DagCallbackRequest, TaskCallbackRequest
Expand Down Expand Up @@ -75,6 +74,7 @@
from airflow.utils.thread_safe_dict import ThreadSafeDict
from airflow.utils.types import DagRunTriggeredByType, DagRunType

from system import standard
from tests_common.test_utils.asserts import assert_queries_count
from tests_common.test_utils.config import conf_vars, env_vars
from tests_common.test_utils.db import (
Expand Down Expand Up @@ -104,7 +104,7 @@
ELASTIC_DAG_FILE = os.path.join(PERF_DAGS_FOLDER, "elastic_dag.py")

TEST_DAG_FOLDER = os.environ["AIRFLOW__CORE__DAGS_FOLDER"]
EXAMPLE_DAGS_FOLDER = airflow.example_dags.__path__[0]
EXAMPLE_STANDARD_DAGS_FOLDER = standard.__path__[0]
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
DEFAULT_LOGICAL_DATE = timezone.coerce_datetime(DEFAULT_DATE)
TRY_NUMBER = 1
Expand Down Expand Up @@ -5707,7 +5707,7 @@ def test_find_and_purge_task_instances_without_heartbeats_nothing(self):

@pytest.mark.usefixtures("testing_dag_bundle")
def test_find_and_purge_task_instances_without_heartbeats(self, session, create_dagrun):
dagfile = os.path.join(EXAMPLE_DAGS_FOLDER, "example_branch_operator.py")
dagfile = os.path.join(EXAMPLE_STANDARD_DAGS_FOLDER, "example_branch_operator.py")
dagbag = DagBag(dagfile)
dag = dagbag.get_dag("example_branch_operator")
dm = LazyDeserializedDAG(data=SerializedDAG.to_dict(dag))
Expand Down Expand Up @@ -5773,7 +5773,7 @@ def test_task_instance_heartbeat_timeout_message(self, session, create_dagrun):
"""
Check that the task instance heartbeat timeout message comes out as expected
"""
dagfile = os.path.join(EXAMPLE_DAGS_FOLDER, "example_branch_operator.py")
dagfile = os.path.join(EXAMPLE_STANDARD_DAGS_FOLDER, "example_branch_operator.py")
dagbag = DagBag(dagfile)
dag = dagbag.get_dag("example_branch_operator")
dm = LazyDeserializedDAG(data=SerializedDAG.to_dict(dag))
Expand Down
16 changes: 13 additions & 3 deletions airflow-core/tests/unit/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils import timezone as tz
from airflow.utils.session import create_session
from scripts.ci.pre_commit.common_precommit_utils import AIRFLOW_ROOT_PATH

from tests_common.test_utils import db
from tests_common.test_utils.asserts import assert_queries_count
Expand All @@ -52,6 +53,12 @@
pytestmark = pytest.mark.db_test

example_dags_folder = pathlib.Path(airflow.example_dags.__path__[0]) # type: ignore[attr-defined]
try:
import system.standard

example_standard_dags_folder = pathlib.Path(system.standard.__path__[0]) # type: ignore[attr-defined]
except ImportError:
example_standard_dags_folder = pathlib.Path(airflow.example_dags.__path__[0]) # type: ignore[attr-defined]

PY311 = sys.version_info >= (3, 11)

Expand Down Expand Up @@ -359,13 +366,16 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
("file_to_load", "expected"),
(
pytest.param(
pathlib.Path(example_dags_folder) / "example_bash_operator.py",
{"example_bash_operator": "airflow/example_dags/example_bash_operator.py"},
pathlib.Path(example_standard_dags_folder) / "example_bash_operator.py",
{
"example_bash_operator": f"{example_standard_dags_folder.relative_to(AIRFLOW_ROOT_PATH) / 'example_bash_operator.py'}"
},
id="example_bash_operator",
),
),
)
def test_get_dag_registration(self, file_to_load, expected):
pytest.importorskip("system.standard")
dagbag = DagBag(dag_folder=os.devnull, include_examples=False)
dagbag.process_file(os.fspath(file_to_load))
for dag_id, path in expected.items():
Expand Down Expand Up @@ -420,7 +430,7 @@ def test_refresh_py_dag(self, mock_dagmodel, tmp_path):
Test that we can refresh an ordinary .py DAG
"""
dag_id = "example_bash_operator"
fileloc = str(example_dags_folder / "example_bash_operator.py")
fileloc = str(example_standard_dags_folder / "example_bash_operator.py")

mock_dagmodel.return_value = DagModel()
mock_dagmodel.return_value.last_expired = datetime.max.replace(tzinfo=timezone.utc)
Expand Down
2 changes: 1 addition & 1 deletion clients/python/test_python_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@

# Make sure in the [core] section, the `load_examples` config is set to True in your airflow.cfg
# or AIRFLOW__CORE__LOAD_EXAMPLES environment variable set to True
DAG_ID = "example_bash_operator"
DAG_ID = "example_simplest_dag"


# Enter a context with an instance of the API client
Expand Down
9 changes: 1 addition & 8 deletions devel-common/src/docs/provider_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,8 @@
"operators/_partials",
"_api/airflow/index.rst",
"_api/airflow/providers/index.rst",
"_api/airflow/providers/apache/index.rst",
"_api/airflow/providers/atlassian/index.rst",
"_api/airflow/providers/cncf/index.rst",
"_api/airflow/providers/common/index.rst",
"_api/airflow/providers/common/messaging/providers/base_provider/index.rst",
"_api/airflow/providers/common/messaging/providers/sqs/index.rst",
"_api/airflow/providers/dbt/index.rst",
"_api/airflow/providers/microsoft/index.rst",
"_api/docs/conf",
*[f"_api/airflow/providers/{subpackage}/index.rst" for subpackage in empty_subpackages],
*[f"_api/system/{subpackage}/index.rst" for subpackage in empty_subpackages],
*[f"_api/tests/system/{subpackage}/index.rst" for subpackage in empty_subpackages],
]
Expand Down
11 changes: 11 additions & 0 deletions devel-common/src/sphinx_exts/docs_build/docs_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ def _src_dir(self) -> Path:
console.print(f"[red]Unknown package name: {self.package_name}")
sys.exit(1)

@property
def pythonpath(self) -> list[Path]:
path = []
if (self._src_dir.parent / "tests").exists():
path.append(self._src_dir.parent.joinpath("tests").resolve())
return path

@property
def _generated_api_dir(self) -> Path:
return self._build_dir.resolve() / "_api"
Expand Down Expand Up @@ -167,6 +174,8 @@ def check_spelling(self, verbose: bool) -> tuple[list[SpellingError], list[DocBu
console.print("[yellow]Command to run:[/] ", " ".join([shlex.quote(arg) for arg in build_cmd]))
env = os.environ.copy()
env["AIRFLOW_PACKAGE_NAME"] = self.package_name
if self.pythonpath:
env["PYTHONPATH"] = ":".join([path.as_posix() for path in self.pythonpath])
if verbose:
console.print(
f"[bright_blue]{self.package_name:60}:[/] The output is hidden until an error occurs."
Expand Down Expand Up @@ -246,6 +255,8 @@ def build_sphinx_docs(self, verbose: bool) -> list[DocBuildError]:
console.print("[yellow]Command to run:[/] ", " ".join([shlex.quote(arg) for arg in build_cmd]))
env = os.environ.copy()
env["AIRFLOW_PACKAGE_NAME"] = self.package_name
if self.pythonpath:
env["PYTHONPATH"] = ":".join([path.as_posix() for path in self.pythonpath])
if verbose:
console.print(
f"[bright_blue]{self.package_name:60}:[/] Running sphinx. "
Expand Down
Loading