Skip to content

Commit

Permalink
Make AssetDefinition subclass Asset
Browse files Browse the repository at this point in the history
This seems to be the best way for 'schedule' dependencies to work. Still
not entirely sure; we'll revisit this.
  • Loading branch information
uranusjr authored and Lee-W committed Oct 22, 2024
1 parent bbb0cf5 commit 91fb531
Showing 1 changed file with 4 additions and 7 deletions.
11 changes: 4 additions & 7 deletions airflow/decorators/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,14 @@ def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]:


@attrs.define(kw_only=True)
class AssetDefinition:
class AssetDefinition(Asset):
"""
Asset representation from decorating a function with ``@asset``.
:meta private:
"""

name: str # TODO: This should be stored on Asset.
asset: Asset
function: types.FunctionType
schedule: ScheduleArg

Expand All @@ -82,7 +81,7 @@ def __attrs_post_init__(self) -> None:
task_id="__main__",
# TODO: This should use the name argument instead.
inlets=[Asset(uri=k) for k in parameters if k not in ("self", "context")],
outlets=[self.asset],
outlets=[self],
python_callable=self.function,
definition_name=self.name,
)
Expand All @@ -107,10 +106,8 @@ def __call__(self, f: types.FunctionType) -> AssetDefinition:
raise ValueError(f"prohibited name for asset: {name}")
return AssetDefinition(
name=name,
asset=Asset(
uri=name if self.uri is None else str(self.uri),
extra=self.extra,
),
uri=name if self.uri is None else str(self.uri),
extra=self.extra,
function=f,
schedule=self.schedule,
)

0 comments on commit 91fb531

Please sign in to comment.