Add AssetActive model (apache#42612)
uranusjr authored Oct 15, 2024
1 parent cd1e9f5 commit cc0aad0
Showing 17 changed files with 878 additions and 733 deletions.
2 changes: 1 addition & 1 deletion airflow/api_fastapi/views/ui/assets.py
@@ -77,7 +77,7 @@ async def next_run_assets(
),
isouter=True,
)
.where(DagScheduleAssetReference.dag_id == dag_id, ~AssetModel.is_orphaned)
.where(DagScheduleAssetReference.dag_id == dag_id, AssetModel.active.has())
.group_by(AssetModel.id, AssetModel.uri)
.order_by(AssetModel.uri)
)
20 changes: 16 additions & 4 deletions airflow/dag_processing/collection.py
@@ -31,13 +31,13 @@
import logging
from typing import TYPE_CHECKING, NamedTuple

from sqlalchemy import func, select
from sqlalchemy import func, select, tuple_
from sqlalchemy.orm import joinedload, load_only
from sqlalchemy.sql import expression

from airflow.assets import Asset, AssetAlias
from airflow.assets.manager import asset_manager
from airflow.models.asset import (
AssetActive,
AssetAliasModel,
AssetModel,
DagScheduleAssetAliasReference,
@@ -298,8 +298,6 @@ def add_assets(self, *, session: Session) -> dict[str, AssetModel]:
orm_assets: dict[str, AssetModel] = {
am.uri: am for am in session.scalars(select(AssetModel).where(AssetModel.uri.in_(self.assets)))
}
for model in orm_assets.values():
model.is_orphaned = expression.false()
orm_assets.update(
(model.uri, model)
for model in asset_manager.create_assets(
@@ -328,6 +326,20 @@ def add_asset_aliases(self, *, session: Session) -> dict[str, AssetAliasModel]:
)
return orm_aliases

def add_asset_active_references(self, assets: Collection[AssetModel], *, session: Session) -> None:
existing_entries = set(
session.execute(
select(AssetActive.name, AssetActive.uri).where(
tuple_(AssetActive.name, AssetActive.uri).in_((asset.name, asset.uri) for asset in assets)
)
)
)
session.add_all(
AssetActive.for_asset(asset)
for asset in assets
if (asset.name, asset.uri) not in existing_entries
)

def add_dag_asset_references(
self,
dags: dict[str, DagModel],
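
The new add_asset_active_references step looks up which (name, uri) pairs already have an AssetActive row via a composite tuple_(...).in_(...) filter, then inserts rows only for assets that lack one. Below is a minimal, self-contained sketch of the same look-up-then-insert pattern; the ActivePair model and make_active_rows helper are illustrative stand-ins for this example only, not Airflow's actual models.

from sqlalchemy import String, create_engine, select, tuple_
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class ActivePair(Base):
    # Toy stand-in for AssetActive: a composite (name, uri) primary key.
    __tablename__ = "active_pair"
    name: Mapped[str] = mapped_column(String(100), primary_key=True)
    uri: Mapped[str] = mapped_column(String(100), primary_key=True)


def make_active_rows(pairs: list[tuple[str, str]], *, session: Session) -> None:
    # Fetch the pairs that already have a row, using a composite IN on (name, uri).
    existing = {
        (row.name, row.uri)
        for row in session.execute(
            select(ActivePair.name, ActivePair.uri).where(
                tuple_(ActivePair.name, ActivePair.uri).in_(pairs)
            )
        )
    }
    # Insert only the pairs that are not present yet.
    session.add_all(ActivePair(name=n, uri=u) for n, u in pairs if (n, u) not in existing)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    make_active_rows([("a", "s3://a"), ("b", "s3://b")], session=session)
    make_active_rows([("b", "s3://b"), ("c", "s3://c")], session=session)  # "b" already exists
    session.commit()

In the commit itself, the call site in DAG.bulk_write_to_db (see the dag.py hunk below) flushes the session after add_assets, so every AssetModel passed to add_asset_active_references already exists in the database.
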
21 changes: 13 additions & 8 deletions airflow/jobs/scheduler_job_runner.py
@@ -45,6 +45,7 @@
from airflow.jobs.job import Job, perform_heartbeat
from airflow.models import Log
from airflow.models.asset import (
AssetActive,
AssetDagRunQueue,
AssetEvent,
AssetModel,
@@ -2062,15 +2063,14 @@ def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None:
SerializedDagModel.remove_dag(dag_id=dag.dag_id, session=session)
session.flush()

def _set_orphaned(self, asset: AssetModel) -> int:
self.log.info("Orphaning unreferenced asset '%s'", asset.uri)
asset.is_orphaned = expression.true()
return 1
def _get_orphaning_identifier(self, asset: AssetModel) -> tuple[str, str]:
self.log.info("Orphaning unreferenced %s", asset)
return asset.name, asset.uri

@provide_session
def _orphan_unreferenced_assets(self, session: Session = NEW_SESSION) -> None:
"""
Detect orphaned assets and set is_orphaned flag to True.
Detect orphaned assets and remove their active entry.
An orphaned asset is no longer referenced in any DAG schedule parameters or task outlets.
"""
@@ -2085,7 +2085,7 @@ def _orphan_unreferenced_assets(self, session: Session = NEW_SESSION) -> None:
isouter=True,
)
.group_by(AssetModel.id)
.where(~AssetModel.is_orphaned)
.where(AssetModel.active.has())
.having(
and_(
func.count(DagScheduleAssetReference.dag_id) == 0,
@@ -2094,8 +2094,13 @@ def _orphan_unreferenced_assets(self, session: Session = NEW_SESSION) -> None:
)
)

updated_count = sum(self._set_orphaned(asset) for asset in orphaned_asset_query)
Stats.gauge("asset.orphaned", updated_count)
orphaning_identifiers = [self._get_orphaning_identifier(asset) for asset in orphaned_asset_query]
session.execute(
delete(AssetActive).where(
tuple_in_condition((AssetActive.name, AssetActive.uri), orphaning_identifiers)
)
)
Stats.gauge("asset.orphaned", len(orphaning_identifiers))

def _executor_to_tis(self, tis: list[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]:
"""Organize TIs into lists per their respective executor."""
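
Orphaning is now a single bulk delete of the matching AssetActive rows instead of a per-asset flag update. tuple_in_condition is an Airflow helper not shown in this diff; assuming it expands to a composite (name, uri) IN clause, the delete is equivalent to the plain-SQLAlchemy sketch below, with an explicit guard for the empty-list case added for illustration.

from sqlalchemy import delete, tuple_
from sqlalchemy.orm import Session

from airflow.models.asset import AssetActive


def deactivate_assets(identifiers: list[tuple[str, str]], *, session: Session) -> None:
    # Nothing became orphaned in this scheduler loop; skip the statement entirely.
    if not identifiers:
        return
    # Bulk-delete the active markers for every (name, uri) pair at once.
    session.execute(
        delete(AssetActive).where(
            tuple_(AssetActive.name, AssetActive.uri).in_(identifiers)
        )
    )
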
@@ -63,7 +63,9 @@ def upgrade():
batch_op.add_column(sa.Column("name", _STRING_COLUMN_TYPE))
batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE, default=str, nullable=False))
# Fill name from uri column.
Session(bind=op.get_bind()).execute(sa.text("update dataset set name=uri"))
with Session(bind=op.get_bind()) as session:
session.execute(sa.text("update dataset set name=uri"))
session.commit()
# Set the name column non-nullable.
# Now with values in there, we can create the new unique constraint and index.
# Due to MySQL restrictions, we are also reducing the length on uri.
88 changes: 88 additions & 0 deletions airflow/migrations/versions/0037_3_0_0_add_asset_active.py
@@ -0,0 +1,88 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Add AssetActive to track orphaning instead of a flag.
Revision ID: 5a5d66100783
Revises: c3389cd7793f
Create Date: 2024-10-01 08:39:48.997198
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from sqlalchemy.orm import Session

# revision identifiers, used by Alembic.
revision = "5a5d66100783"
down_revision = "c3389cd7793f"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"

_STRING_COLUMN_TYPE = sa.String(length=1500).with_variant(
sa.String(length=1500, collation="latin1_general_cs"),
dialect_name="mysql",
)


def upgrade():
op.create_table(
"asset_active",
sa.Column("name", _STRING_COLUMN_TYPE, nullable=False),
sa.Column("uri", _STRING_COLUMN_TYPE, nullable=False),
sa.PrimaryKeyConstraint("name", "uri", name="asset_active_pkey"),
sa.ForeignKeyConstraint(
columns=["name", "uri"],
refcolumns=["dataset.name", "dataset.uri"],
name="asset_active_asset_name_uri_fkey",
ondelete="CASCADE",
),
sa.Index("idx_asset_active_name_unique", "name", unique=True),
sa.Index("idx_asset_active_uri_unique", "uri", unique=True),
)
with Session(bind=op.get_bind()) as session:
session.execute(
sa.text(
"insert into asset_active (name, uri) "
"select name, uri from dataset where is_orphaned = false"
)
)
session.commit()
with op.batch_alter_table("dataset", schema=None) as batch_op:
batch_op.drop_column("is_orphaned")


def downgrade():
with op.batch_alter_table("dataset", schema=None) as batch_op:
batch_op.add_column(
sa.Column("is_orphaned", sa.Boolean, default=False, nullable=False, server_default="0")
)
with Session(bind=op.get_bind()) as session:
session.execute(
sa.text(
"update dataset set is_orphaned = true "
"where exists (select 1 from asset_active "
"where dataset.name = asset_active.name and dataset.uri = asset_active.uri)"
)
)
session.commit()
op.drop_table("asset_active")
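
For reference, the upgrade's data copy (raw SQL above) could also be written with SQLAlchemy Core against reflected tables; this is only a hedged equivalent for readers of the migration, and the _copy_active_rows helper name is hypothetical, not part of the commit.

import sqlalchemy as sa
from sqlalchemy.engine import Connection


def _copy_active_rows(conn: Connection) -> None:
    # Reflect the tables as they exist at this point in the migration.
    meta = sa.MetaData()
    dataset = sa.Table("dataset", meta, autoload_with=conn)
    asset_active = sa.Table("asset_active", meta, autoload_with=conn)
    # INSERT INTO asset_active (name, uri)
    #     SELECT name, uri FROM dataset WHERE is_orphaned = false
    conn.execute(
        asset_active.insert().from_select(
            ["name", "uri"],
            sa.select(dataset.c.name, dataset.c.uri).where(dataset.c.is_orphaned == sa.false()),
        )
    )
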
59 changes: 57 additions & 2 deletions airflow/models/asset.py
@@ -21,7 +21,6 @@

import sqlalchemy_jsonfield
from sqlalchemy import (
Boolean,
Column,
ForeignKey,
ForeignKeyConstraint,
@@ -192,7 +191,8 @@ class AssetModel(Base):

created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
is_orphaned = Column(Boolean, default=False, nullable=False, server_default="0")

active = relationship("AssetActive", uselist=False, viewonly=True)

consuming_dags = relationship("DagScheduleAssetReference", back_populates="dataset")
producing_tasks = relationship("TaskOutletAssetReference", back_populates="dataset")
@@ -232,6 +232,61 @@ def to_public(self) -> Asset:
return Asset(uri=self.uri, extra=self.extra)


class AssetActive(Base):
"""
Collection of active assets.
An asset is considered active if it is declared by the user in any DAG file.
AssetModel entries that are not active (also called orphaned in some parts
of the code base) are still kept in the database, but have their corresponding
entries in this table removed. This ensures we keep all possible history on
distinct assets (those with non-matching name-URI pairs), while still ensuring
that *name and URI are each unique* within active assets.
"""

name = Column(
String(length=1500).with_variant(
String(
length=1500,
# latin1 allows for more indexed length in mysql
# and this field should only be ascii chars
collation="latin1_general_cs",
),
"mysql",
),
nullable=False,
)
uri = Column(
String(length=1500).with_variant(
String(
length=1500,
# latin1 allows for more indexed length in mysql
# and this field should only be ascii chars
collation="latin1_general_cs",
),
"mysql",
),
nullable=False,
)

__tablename__ = "asset_active"
__table_args__ = (
PrimaryKeyConstraint(name, uri, name="asset_active_pkey"),
ForeignKeyConstraint(
columns=[name, uri],
refcolumns=["dataset.name", "dataset.uri"],
name="asset_active_asset_name_uri_fkey",
ondelete="CASCADE",
),
Index("idx_asset_active_name_unique", name, unique=True),
Index("idx_asset_active_uri_unique", uri, unique=True),
)

@classmethod
def for_asset(cls, asset: AssetModel) -> AssetActive:
return cls(name=asset.name, uri=asset.uri)


class DagScheduleAssetAliasReference(Base):
"""References from a DAG to an asset alias of which it is a consumer."""

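
With the Boolean column gone, 'is this asset active' becomes an EXISTS check through the new viewonly relationship. A short sketch of the query change, using the same filter this commit applies in the FastAPI and www views:

from sqlalchemy import select

from airflow.models.asset import AssetModel

# Old filter, removed by this commit:
#     select(AssetModel).where(~AssetModel.is_orphaned)
# New filter: an asset is active iff a matching asset_active row exists.
active_assets = select(AssetModel).where(AssetModel.active.has())

# Marking an asset active is now a row insert, as add_asset_active_references
# does above, e.g. session.add(AssetActive.for_asset(asset_model)).
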
1 change: 1 addition & 0 deletions airflow/models/dag.py
@@ -2593,6 +2593,7 @@ def bulk_write_to_db(
orm_asset_aliases = asset_op.add_asset_aliases(session=session)
session.flush() # This populates id so we can create fks in later calls.

asset_op.add_asset_active_references(orm_assets.values(), session=session)
asset_op.add_dag_asset_references(orm_dags, orm_assets, session=session)
asset_op.add_dag_asset_alias_references(orm_dags, orm_asset_aliases, session=session)
asset_op.add_task_asset_references(orm_dags, orm_assets, session=session)
1 change: 0 additions & 1 deletion airflow/serialization/pydantic/asset.py
@@ -51,7 +51,6 @@ class AssetPydantic(BaseModelPydantic):
extra: Optional[dict]
created_at: datetime
updated_at: datetime
is_orphaned: bool

consuming_dags: List[DagScheduleAssetReferencePydantic]
producing_tasks: List[TaskOutletAssetReferencePydantic]
2 changes: 1 addition & 1 deletion airflow/utils/db.py
@@ -96,7 +96,7 @@ class MappedClassProtocol(Protocol):
"2.9.0": "1949afb29106",
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"3.0.0": "c3389cd7793f",
"3.0.0": "5a5d66100783",
}


4 changes: 2 additions & 2 deletions airflow/www/views.py
@@ -3450,7 +3450,7 @@ def next_run_datasets(self, dag_id):
),
isouter=True,
)
.where(DagScheduleAssetReference.dag_id == dag_id, ~AssetModel.is_orphaned)
.where(DagScheduleAssetReference.dag_id == dag_id, AssetModel.active.has())
.group_by(AssetModel.id, AssetModel.uri)
.order_by(AssetModel.uri)
)
@@ -3583,7 +3583,7 @@ def datasets_summary(self):
if has_event_filters:
count_query = count_query.join(AssetEvent, AssetEvent.dataset_id == AssetModel.id)

filters = [~AssetModel.is_orphaned]
filters = [AssetModel.active.has()]
if uri_pattern:
filters.append(AssetModel.uri.ilike(f"%{uri_pattern}%"))
if updated_after:
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
8bd129828ba299ef05d70305eee66d15b6c0c79dc6ae82f654b9657464e3682a
0ed26236c783f7524416c1377638fe18ff3520bd355160db48656585ff58524e