Skip to content

Commit

Permalink
feat(task_sdk): Move asset from Airflow core to task_sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W committed Nov 7, 2024
1 parent a51487d commit de35f1e
Show file tree
Hide file tree
Showing 66 changed files with 717 additions and 658 deletions.
4 changes: 2 additions & 2 deletions airflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@
"version": (".version", "", False),
# Deprecated lazy imports
"AirflowException": (".exceptions", "AirflowException", True),
"Dataset": (".assets", "Dataset", True),
"Dataset": (".sdk.definitions.asset", "Dataset", True),
}
if TYPE_CHECKING:
# These objects are imported by PEP-562, however, static analyzers and IDE's
# have no idea about typing of these objects.
# Add it under TYPE_CHECKING block should help with it.
from airflow.assets import Asset, Dataset
from airflow.models.dag import DAG
from airflow.models.xcom_arg import XComArg
from airflow.sdk.definitions.asset import Asset, Dataset


def __getattr__(name: str):
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
queued_event_collection_schema,
queued_event_schema,
)
from airflow.assets import Asset
from airflow.assets.manager import asset_manager
from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
from airflow.sdk.definitions.asset import Asset
from airflow.utils import timezone
from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, provide_session
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_internal/endpoints/rpc_api_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
@functools.lru_cache
def initialize_method_map() -> dict[str, Callable]:
from airflow.api.common.trigger_dag import trigger_dag
from airflow.assets import expand_alias_to_assets
from airflow.assets.manager import AssetManager
from airflow.cli.commands.task_command import _get_ti_db_access
from airflow.dag_processing.manager import DagFileProcessorManager
Expand All @@ -76,6 +75,7 @@ def initialize_method_map() -> dict[str, Callable]:
_update_rtif,
_xcom_pull,
)
from airflow.sdk.definitions.asset import expand_alias_to_assets
from airflow.secrets.metastore import MetastoreBackend
from airflow.utils.cli_action_loggers import _default_action_log_internal
from airflow.utils.log.file_task_handler import FileTaskHandler
Expand Down
Loading

0 comments on commit de35f1e

Please sign in to comment.