Skip to content

Commit

Permalink
Implement asset definition creating a DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
uranusjr authored and Lee-W committed Oct 22, 2024
1 parent ae90a8f commit 5c2762f
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 3 deletions.
74 changes: 74 additions & 0 deletions airflow/decorators/assets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import types
import typing

import attrs

from airflow.datasets import Dataset as Asset
from airflow.models.dag import DAG, ScheduleArg
from airflow.operators.python import PythonOperator

if typing.TYPE_CHECKING:
from airflow.io.path import ObjectStoragePath


@attrs.define(kw_only=True)
class AssetDefinition:
    """
    Asset representation from decorating a function with ``@asset``.

    Instantiating this class has a side effect: a DAG named after ``name`` is
    created, containing one ``PythonOperator`` that runs ``function`` and
    lists ``asset`` as its outlet.

    :meta private:
    """

    name: str  # TODO: This should be stored on Asset.
    asset: Asset
    function: types.FunctionType
    schedule: ScheduleArg

    def __attrs_post_init__(self) -> None:
        """Build the DAG wrapping ``function`` once all fields are set."""
        wrapping_dag = DAG(
            dag_id=self.name,
            schedule=self.schedule,
            auto_register=True,
        )
        with wrapping_dag:
            # The only task in the DAG; it calls the decorated function and
            # declares the asset as its outlet.
            PythonOperator(
                task_id="__main__",
                python_callable=self.function,
                outlets=[self.asset],
            )
        # TODO: Currently this just gets serialized into a string.
        # When we create UI for assets, we should add logic to serde so the
        # serialized DAG contains appropriate asset information.
        wrapping_dag._wrapped_definition = self


@attrs.define(kw_only=True)
class asset:
    """
    Create an asset by decorating a materialization function.

    All fields are keyword-only. Calling the configured instance on a
    function returns an ``AssetDefinition`` built from that function.
    """

    # Schedule forwarded unchanged to the resulting AssetDefinition.
    schedule: ScheduleArg
    # URI of the asset. When left as None, the decorated function's name is
    # used as the URI instead; any other value is converted with ``str()``.
    uri: str | ObjectStoragePath | None = None
    # Extra data passed through to the created Asset.
    extra: dict[str, typing.Any] = attrs.field(factory=dict)

    def __call__(self, f: types.FunctionType) -> AssetDefinition:
        """
        Wrap *f* into an ``AssetDefinition``.

        :param f: The materialization function. Its ``__name__`` becomes the
            definition's name, and the asset URI if ``uri`` was not given.
        :return: The definition wrapping *f*.
        :raises ValueError: If ``f.__name__`` differs from ``f.__qualname__``,
            which is the case for nested functions and methods.
        """
        name = f.__name__
        # A qualified name different from the bare name means the function is
        # not defined at module top level.
        if name != f.__qualname__:
            raise ValueError("nested function not supported")
        uri = name if self.uri is None else str(self.uri)
        return AssetDefinition(
            name=name,
            asset=Asset(uri=uri, extra=self.extra),
            function=f,
            schedule=self.schedule,
        )
4 changes: 4 additions & 0 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
from sqlalchemy.orm.session import Session

from airflow.decorators import TaskDecoratorCollection
from airflow.decorators.assets import AssetDefinition
from airflow.models.dagbag import DagBag
from airflow.models.operator import Operator
from airflow.serialization.pydantic.dag import DagModelPydantic
Expand Down Expand Up @@ -693,6 +694,9 @@ def __init__(
# fileloc based only on the serialize dag
self._processor_dags_folder = None

# Additional information if this is created from an @asset definition.
self._wrapped_definition: AssetDefinition | None = None

validate_instance_args(self, DAG_ARGS_EXPECTED_TYPES)

def get_doc_md(self, doc_md: str | None) -> str | None:
Expand Down
10 changes: 8 additions & 2 deletions airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@
"_processor_dags_folder": {
"anyOf": [
{ "type": "null" },
{"type": "string"}
{ "type": "string" }
]
},
"orientation": { "type" : "string"},
Expand All @@ -180,7 +180,13 @@
{ "$ref": "#/definitions/task_group" }
]},
"edge_info": { "$ref": "#/definitions/edge_info" },
"dag_dependencies": { "$ref": "#/definitions/dag_dependencies" }
"dag_dependencies": { "$ref": "#/definitions/dag_dependencies" },
"_wrapped_definition": {
"anyOf": [
{ "type": "null" },
{ "type": "string" }
]
}
},
"required": [
"_dag_id",
Expand Down
4 changes: 3 additions & 1 deletion airflow/utils/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,9 @@ def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.Zi
with open(file_path, "rb") as dag_file:
content = dag_file.read()
content = content.lower()
return all(s in content for s in (b"dag", b"airflow"))
if b"airflow" not in content:
return False
return any(s in content for s in (b"dag", b"asset"))


def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]:
Expand Down

0 comments on commit 5c2762f

Please sign in to comment.