[AGENT-5360] Create Airflow Operator for Custom Job #86

Draft · wants to merge 9 commits into main
4 changes: 2 additions & 2 deletions datarobot_provider/__init__.py
@@ -7,10 +7,10 @@
 # Released under the terms of DataRobot Tool and Utility Agreement.
 def get_provider_info():
     return {
-        "package-name": "airflow-provider-datarobot",
+        "package-name": "airflow-provider-datarobot-early-access",
         "name": "DataRobot Airflow Provider",
         "description": "DataRobot Airflow provider.",
-        "versions": ["0.0.8"],
+        "versions": ["0.0.8.1"],
         "connection-types": [
             {
                 "hook-class-name": "datarobot_provider.hooks.datarobot.DataRobotHook",
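
Note: for context, an Airflow provider package typically surfaces get_provider_info() to Airflow through the apache_airflow_provider entry point. The packaging sketch below only illustrates that wiring; the setup.py shown is an assumption for illustration, not this repository's actual packaging configuration.

# setup.py (illustrative sketch only; not taken from this PR)
from setuptools import find_packages, setup

setup(
    name="airflow-provider-datarobot-early-access",
    version="0.0.8.1",
    packages=find_packages(),
    entry_points={
        # Airflow discovers provider metadata through this entry point group.
        "apache_airflow_provider": [
            "provider_info=datarobot_provider:get_provider_info"
        ]
    },
)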
Empty file.
31 changes: 31 additions & 0 deletions datarobot_provider/example_dags/custom_job/job.py
@@ -0,0 +1,31 @@
import os
from datarobot import Deployment


def main():
    print(f"Running python code: {__file__}")

    # Read this job's runtime parameters (exposed as environment variables)
    print()
    print("Runtime parameters:")
    print("-------------------")
    string_param = os.environ.get("STRING_PARAMETER", None)
    print(f"string param: {string_param}")

    deployment_param = os.environ.get("DEPLOYMENT", None)
    print(f"deployment_param: {deployment_param}")

    model_package_param = os.environ.get("MODEL_PACKAGE", None)
    print(f"model_package_param: {model_package_param}")

    # An example of using the Python client to list deployments
    deployments = Deployment.list()
    print()
    print("List of all deployments")
    print("-----------------------")
    for deployment in deployments:
        print(deployment)


if __name__ == "__main__":
    main()
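
Note: Deployment.list() above relies on the datarobot client picking up DATAROBOT_ENDPOINT and DATAROBOT_API_TOKEN from the job's environment (run.sh below prints both names). If explicit configuration is preferred, a minimal sketch, assuming those variables are set by the custom job runtime, would be:

import os

import datarobot as dr

# Configure the client explicitly from the environment variables provided to
# the custom job, instead of relying on the client's implicit lookup.
dr.Client(
    endpoint=os.environ["DATAROBOT_ENDPOINT"],
    token=os.environ["DATAROBOT_API_TOKEN"],
)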
12 changes: 12 additions & 0 deletions datarobot_provider/example_dags/custom_job/metadata.yaml
@@ -0,0 +1,12 @@
name: runtime-params

runtimeParameterDefinitions:
  - fieldName: MODEL_PACKAGE
    type: modelPackage
    description: Model package that will be used to store key values
  - fieldName: DEPLOYMENT
    type: deployment
    description: Deployment that will be used to make predictions
  - fieldName: STRING_PARAMETER
    type: string
    description: An example of a string parameter
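
Note: each fieldName declared above reaches the job as an environment variable of the same name, as job.py above and run.sh below demonstrate. A small, hypothetical pre-flight check along those lines (the helper below is not part of this PR):

import os
import sys

# Names taken from metadata.yaml above.
EXPECTED_PARAMS = ("MODEL_PACKAGE", "DEPLOYMENT", "STRING_PARAMETER")


def check_runtime_parameters() -> None:
    """Fail fast if any declared runtime parameter is missing from the environment."""
    missing = [name for name in EXPECTED_PARAMS if not os.environ.get(name)]
    if missing:
        print(f"Missing runtime parameters: {', '.join(missing)}")
        sys.exit(1)


if __name__ == "__main__":
    check_runtime_parameters()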
60 changes: 60 additions & 0 deletions datarobot_provider/example_dags/custom_job/run.sh
@@ -0,0 +1,60 @@
#!/bin/bash

echo "Job Starting: ($0)"

echo "===== Runtime Parameters ======"
echo "Model Package: $MODEL_PACKAGE"
echo "Deployment: $DEPLOYMENT"
echo "STRING_PARAMETER: $STRING_PARAMETER"
echo
echo
echo "===== Generic Variables ==========================="
echo "CURRENT_CUSTOM_JOB_RUN_ID: $CURRENT_CUSTOM_JOB_RUN_ID"
echo "CURRENT_CUSTOM_JOB_ID: $CURRENT_CUSTOM_JOB_ID"
echo "DATAROBOT_ENDPOINT: $DATAROBOT_ENDPOINT"
echo "DATAROBOT_API_TOKEN: Use the environment variable $DATAROBOT_API_TOKEN"
echo "==================================================="

echo
echo "How to check how much memory your job has"
memory_limit_bytes=$(cat /sys/fs/cgroup/memory/memory.limit_in_bytes)
memory_limit_megabytes=$((memory_limit_bytes / 1024 / 1024))
echo "Memory Limit (in Megabytes): $memory_limit_megabytes"
echo

# Uncomment the following if you want to check if the job has network access
## Define the IP address of an external server to ping (e.g., Google's DNS)
#external_server="8.8.8.8"
#echo "Checking internet connection"
## Try to ping the external server
#ping -c 1 $external_server > /dev/null 2>&1
#
## Check the exit status of the ping command
#if [ $? -eq 0 ]; then
# echo "Internet connection is available."
#else
# echo "No internet connection."
#fi
#echo
#echo

# Run the code in job.py
dir_path=$(dirname "$0")
echo "Entrypoint is at $dir_path - cd into it"
cd "$dir_path" || exit 1

if command -v python3 &>/dev/null; then
    echo "python3 is installed and available."
else
    echo "Error: python3 is not installed or not available."
    exit 1
fi

python_file="job.py"
if [ -f "$python_file" ]; then
    echo "Found $python_file .. running it"
    python3 ./job.py
else
    echo "File $python_file does not exist"
    exit 1
fi
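
Note: the memory check in run.sh reads the cgroup v1 path only; hosts on cgroup v2 expose the limit at /sys/fs/cgroup/memory.max instead. A best-effort sketch covering both layouts (standard cgroup paths, not something this PR configures):

from pathlib import Path

# Standard cgroup locations for the container memory limit.
_CGROUP_PATHS = (
    "/sys/fs/cgroup/memory/memory.limit_in_bytes",  # cgroup v1
    "/sys/fs/cgroup/memory.max",                    # cgroup v2
)


def memory_limit_megabytes() -> str:
    """Return the container memory limit in MiB, or a placeholder if unknown."""
    for path in _CGROUP_PATHS:
        p = Path(path)
        if p.exists():
            raw = p.read_text().strip()
            if raw == "max":  # cgroup v2 reports "max" when no limit is set
                return "unlimited"
            return str(int(raw) // (1024 * 1024))
    return "unknown"


if __name__ == "__main__":
    print(f"Memory Limit (in Megabytes): {memory_limit_megabytes()}")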
92 changes: 92 additions & 0 deletions datarobot_provider/example_dags/datarobot_custom_job_dag.py
@@ -0,0 +1,92 @@
# Copyright 2024 DataRobot, Inc. and its affiliates.
#
# All rights reserved.
#
# This is proprietary source code of DataRobot, Inc. and its affiliates.
#
# Released under the terms of DataRobot Tool and Utility Agreement.

from datetime import datetime

from airflow.decorators import dag

from datarobot_provider.operators.custom_job import CreateCustomJobOperator
from datarobot_provider.operators.custom_job import AddFilesToCustomJobOperator
from datarobot_provider.operators.custom_job import SetCustomJobExecutionEnvironmentOperator
from datarobot_provider.operators.custom_job import SetCustomJobRuntimeParametersOperator
from datarobot_provider.operators.custom_job import RunCustomJobOperator
from datarobot_provider.sensors.client import BaseAsyncResolutionSensor


@dag(
    schedule=None,
    start_date=datetime(2023, 1, 1),
    tags=['example', 'custom job'],
    params={},
)
def create_custom_job():
    create_custom_job_op = CreateCustomJobOperator(
        task_id='create_custom_job',
        name="airflow-test-create-custom-job-v556",
        description="demo-test-demonstration",
    )

Review comment (on CreateCustomJobOperator): Does it make sense to create another kind of create operator which includes files, environment, and runtime parameters all in one? This way the DAG could be simplified. Having these more specific operators makes sense, but I think a super-creator operator could be useful. (A hypothetical sketch of such a combined step appears after this file.)

    add_files_to_custom_job_op = AddFilesToCustomJobOperator(
        task_id='add_files_to_custom_job',
        custom_job_id=create_custom_job_op.output,
        files_path="custom_job/",
    )

    # list_execution_env_op = ListExecutionEnvironmentOperator(
    #     task_id='list_execution_env',
    #     search_for="Python 3.9 PyTorch Drop-In"
    # )

    set_env_to_custom_job_op = SetCustomJobExecutionEnvironmentOperator(
        task_id='set_env_to_custom_job',
        custom_job_id=create_custom_job_op.output,
        environment_id='5e8c888007389fe0f466c72b',
        environment_version_id='65c1db901800cd9782d7ac07',
    )

    set_runtime_parameters_op = SetCustomJobRuntimeParametersOperator(
        task_id='set_runtime_parameters',
        custom_job_id=create_custom_job_op.output,
        runtime_parameter_values=[
            {"fieldName": "DEPLOYMENT", "type": "deployment", "value": "650ef15944f21ea1a3c91a25"},
            {
                "fieldName": "MODEL_PACKAGE",
                "type": "modelPackage",
                "value": "654b9b228404a39b5c8da5b2",
            },
            {"fieldName": "STRING_PARAMETER", "type": "string", "value": 'my test string'},
        ],
    )

    run_custom_job_op = RunCustomJobOperator(
        task_id='run_custom_job',
        custom_job_id=create_custom_job_op.output,
    )

    custom_job_complete_sensor = BaseAsyncResolutionSensor(
        task_id="check_custom_job_complete",
        job_id=run_custom_job_op.output,
        poke_interval=5,
        mode="reschedule",
        timeout=3600,
    )

    (
        create_custom_job_op
        >> add_files_to_custom_job_op
        >> set_env_to_custom_job_op
        >> set_runtime_parameters_op
        >> run_custom_job_op
        >> custom_job_complete_sensor
    )


create_custom_job_dag = create_custom_job()

if __name__ == "__main__":
    create_custom_job_dag.test()
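
Note on the review comment above about a combined creation operator: purely as an illustration, a single step could look like the sketch below. CreateConfiguredCustomJobOperator and its parameters are hypothetical; no such operator exists in this PR or in the provider.

# Hypothetical operator and arguments, sketched only to illustrate the
# "super-creator" idea from the review comment; not part of this provider.
create_and_configure_job_op = CreateConfiguredCustomJobOperator(
    task_id="create_and_configure_custom_job",
    name="airflow-test-create-custom-job-v556",
    files_path="custom_job/",
    environment_id="5e8c888007389fe0f466c72b",
    environment_version_id="65c1db901800cd9782d7ac07",
    runtime_parameter_values=[
        {"fieldName": "STRING_PARAMETER", "type": "string", "value": "my test string"},
    ],
)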