Skip to content

Commit

Permalink
feat(task_sdk): Move assets.metadata to task_sdk.definitions.asset
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W committed Nov 7, 2024
1 parent de35f1e commit 5c8d6e2
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 55 deletions.
46 changes: 0 additions & 46 deletions airflow/assets/metadata.py

This file was deleted.

3 changes: 1 addition & 2 deletions airflow/example_dags/example_outlet_event_extra.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk.definitions.asset import Asset
from airflow.sdk.definitions.asset.metadata import Metadata
from airflow.sdk.definitions.asset import Asset, Metadata

ds = Asset("s3://output/1.txt")

Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/operator_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def __init__(
def run(self, *args, **kwargs) -> Any:
import inspect

from airflow.sdk.definitions.asset.metadata import Metadata
from airflow.sdk.definitions.asset import Metadata
from airflow.utils.types import NOTSET

if not inspect.isgeneratorfunction(self.func):
Expand Down
4 changes: 2 additions & 2 deletions docs/apache-airflow/authoring-and-scheduling/datasets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ The easiest way to attach extra information to the asset event is by ``yield``-i
.. code-block:: python
from airflow.sdk.definitions.asset import Asset
from airflow.sdk.definitions.asset.metadata import Metadata
from airflow.sdk.definitions.asset import Metadata
example_s3_asset = Asset("s3://asset/example.csv")
Expand Down Expand Up @@ -452,7 +452,7 @@ The following example creates an asset event against the S3 URI ``f"s3://bucket/

.. code-block:: python
from airflow.sdk.definitions.asset.metadata import Metadata
from airflow.sdk.definitions.asset import Metadata
@task(outlets=[AssetAlias("my-task-outputs")])
Expand Down
20 changes: 20 additions & 0 deletions task_sdk/src/airflow/sdk/definitions/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from typing import TYPE_CHECKING, Any, Callable, ClassVar, cast, overload

import attr
import attrs
from sqlalchemy import select

from airflow.api_internal.internal_api_call import internal_api_call
Expand Down Expand Up @@ -528,3 +529,22 @@ def as_expression(self) -> Any:
:meta private:
"""
return {"all": [o.as_expression() for o in self.objects]}


@attrs.define(init=False)
class Metadata:
"""Metadata to attach to an AssetEvent."""

uri: str
extra: dict[str, Any]
alias_name: str | None = None

def __init__(
self, target: str | Asset, extra: dict[str, Any], alias: AssetAlias | str | None = None
) -> None:
self.uri = extract_event_key(target)
self.extra = extra
if isinstance(alias, AssetAlias):
self.alias_name = alias.name
else:
self.alias_name = alias
6 changes: 2 additions & 4 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2472,8 +2472,7 @@ def write(*, outlet_events):

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_outlet_asset_extra_yield(self, dag_maker, session):
from airflow.sdk.definitions.asset import Asset
from airflow.sdk.definitions.asset.metadata import Metadata
from airflow.sdk.definitions.asset import Asset, Metadata

with dag_maker(schedule=None, session=session) as dag:

Expand Down Expand Up @@ -2645,8 +2644,7 @@ def producer(*, outlet_events):

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_outlet_asset_alias_through_metadata(self, dag_maker, session):
from airflow.sdk.definitions.asset import AssetAlias
from airflow.sdk.definitions.asset.metadata import Metadata
from airflow.sdk.definitions.asset import AssetAlias, Metadata

asset_uri = "test_outlet_asset_alias_through_metadata_ds"
asset_alias_name = "test_outlet_asset_alias_through_metadata_asset_alias"
Expand Down

0 comments on commit 5c8d6e2

Please sign in to comment.