[DENG-1844] Slack alert #2

Open · wants to merge 2 commits into base: v2.8.1
87 changes: 67 additions & 20 deletions dags/example_dag_with_taskflow_api.py
@@ -1,25 +1,68 @@
# DAG exhibiting task flow paradigm in airflow 2.0
# https://airflow.apache.org/docs/apache-airflow/2.0.2/tutorial_taskflow_api.html
# Modified for our use case

import json
import pendulum

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
from airflow.models import Variable
from airflow.hooks.base import BaseHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator


def task_failure_slack_alert(context):
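    """Airflow on_failure_callback: post the failed task's details to Slack.

    The destination channel is read from the login field of the
    slack_data_alerts connection.
    """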
print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")

    environment = Variable.get("airflow_environment", default_var="local")
    if environment not in ["dev", "prod"]:
        print(f"{environment} mode - skip sending alerts.")
        return

    ti = context['ti']  # the Task Instance
    TASK_STATE = ti.state
    TASK_ID = context.get('task_instance').task_id
    DAG_ID = context.get('task_instance').dag_id
    EXECUTION_TIME = context.get('execution_date')
    LOG_URL = context.get('task_instance').log_url

    slack_msg = f"""
    :red_circle: Task Failed.
    *Dag*: {DAG_ID}
    *Task*: {TASK_ID}
    *Task State*: {TASK_STATE}
    *Execution Time*: {EXECUTION_TIME}
    *Log URL*: <{LOG_URL}|*Logs*>
    """

    SLACK_FAILURE_ALERT_CONN_ID = "slack_data_alerts"
    CHANNEL = BaseHook.get_connection(SLACK_FAILURE_ALERT_CONN_ID).login

    slack_alert = SlackWebhookOperator(
        task_id=TASK_ID,
        slack_webhook_conn_id=SLACK_FAILURE_ALERT_CONN_ID,
        message=slack_msg,
        channel=CHANNEL,
        username='airflow'
    )

    return slack_alert.execute(context=context)

default_args = {
    'owner': 'airflow',
    'owner': 'data platform',
    'retries': 0,
    'start_date': pendulum.datetime(2021, 1, 1, tz="UTC")
}
@dag(default_args=default_args, schedule_interval="@daily", start_date=days_ago(2), tags=['example'])
def dag_with_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple ETL data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/stable/tutorial_taskflow_api.html)
    """

@dag(
    dag_id="example_taskflow_api",
    description="Taskflow api example",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    default_args=default_args,
    catchup=False,
    on_success_callback=None,
    on_failure_callback=task_failure_slack_alert,
    tags=["example"],
)

def tutorial_taskflow_api():
    @task()
    def extract():
        """
@@ -32,6 +75,7 @@ def extract():

        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
@@ -45,6 +89,7 @@ def transform(order_data_dict: dict):
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        """
@@ -53,8 +98,10 @@ def load(total_order_value: float):
        instead of saving it to end user review, just prints it out.
        """

print("Total order value is: %.2f" % total_order_value)
print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])
dag_with_taskflow_api = dag_with_taskflow_api()

tutorial_taskflow_api()
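Since Airflow forwards default_args to every operator, the same callback could alternatively be attached per task rather than at the DAG level. A minimal sketch of that variant (not part of this diff; the DAG id and task below are illustrative, and it assumes task_failure_slack_alert from this module is in scope):

import pendulum
from airflow.decorators import dag, task

per_task_default_args = {
    'owner': 'data platform',
    'retries': 0,
    # hypothetical: reuse the callback defined above so every task alerts on failure
    'on_failure_callback': task_failure_slack_alert,
}

@dag(
    dag_id="example_per_task_alerts",  # illustrative DAG id
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    default_args=per_task_default_args,
    catchup=False,
    tags=["example"],
)
def example_per_task_alerts():
    @task()
    def always_fails():
        raise ValueError("demo failure to trigger the Slack alert")

    always_fails()

example_per_task_alerts()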
16 changes: 15 additions & 1 deletion docker/config/.env.localrunner
@@ -2,10 +2,24 @@
# Example environment variables using temporary security credentials
# AWS_ACCESS_KEY_ID=XXXXXXXXXX
# AWS_SECRET_ACCESS_KEY=YYYYYYYYYYYY
# AWS_SESSION_TOKEN=ZZZZZZZZZZ
# AWS_SESSION_TOKEN=ZZZZZZZZZZ

# to change default password you'll need to delete the db-data folder (when running locally)
DEFAULT_PASSWORD="test"
S3_DAGS_PATH=""
S3_PLUGINS_PATH=""
S3_REQUIREMENTS_PATH=""
ENVIRONMENT="local"

# Airflow webserver instance name
AIRFLOW__WEBSERVER__INSTANCE_NAME="DAGs (Local)"

# Airflow variables
# Example: "AIRFLOW_VAR_{}"
AIRFLOW_VAR_AIRFLOW_ENVIRONMENT="local"

# Example of JSON format variable
# AIRFLOW_VAR_SLACK_DMS_MONITOR='{"key":"","value":""}'

# Airflow connections
# Example: "AIRFLOW_CONN_{}"
1 change: 1 addition & 0 deletions requirements/requirements.txt
@@ -4,3 +4,4 @@ apache-airflow-providers-snowflake==5.2.1
apache-airflow-providers-mysql==5.5.1
apache-airflow-providers-slack==8.5.1
boto3==1.33.13
astronomer-cosmos==1.3.2