diff --git a/AUTHORS.rst b/AUTHORS.rst
index a852e7ee..6a15b77d 100644
--- a/AUTHORS.rst
+++ b/AUTHORS.rst
@@ -6,7 +6,9 @@ The list of contributors in alphabetical order:
 - `Anton Khodak `_
+- `Cody Kankel `_
 - `Diego Rodriguez `_
 - `Dinos Kousidis `_
 - `Jan Okraska `_
+- `Kenyi Hurtado-Anampa `_
 - `Rokas Maciulaitis `_
 - `Sinclert Perez `_
 - `Tibor Simko `_
diff --git a/Dockerfile b/Dockerfile
index 24a29676..c741ae1f 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -7,17 +7,19 @@ FROM python:3.6-slim
 ENV TERM=xterm
+
 RUN apt-get update && \
     apt-get install -y vim-tiny && \
     pip install --upgrade pip
 
 RUN export DEBIAN_FRONTEND=noninteractive ;\
-    apt-get -yq install krb5-user \
+    apt-get -yq install --no-install-recommends \
+        krb5-user \
         krb5-config \
         libkrb5-dev \
         libauthen-krb5-perl \
         gcc;
 
-ADD etc/krb5.conf /etc/krb5.conf
+COPY etc/krb5.conf /etc/krb5.conf
 
 ARG COMPUTE_BACKENDS=kubernetes
diff --git a/etc/htcondorvc3/job_wrapper.sh b/etc/htcondorvc3/job_wrapper.sh
new file mode 100755
index 00000000..0c5dd6e9
--- /dev/null
+++ b/etc/htcondorvc3/job_wrapper.sh
@@ -0,0 +1,236 @@
+#!/bin/bash
+
+# Job wrapper for htcondorvc3. Sets up the directory structure and
+# discovers which container technology is available, then executes
+# the payload from REANA.
+# @TODO: This could be executed in +PreCmd as a separate script.
+
+# Expected arguments from htcondorvc3_job_manager:
+# $1  : workflow_workspace
+# $2  : DOCKER_IMG
+# $3+ : cmd
+
+# Defining inputs
+DOCKER_IMG=$2
+REANA_WORKFLOW_DIR=$1
+CMD=${@:3}
+
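+# Illustrative example only (the image and payload below are assumptions, not
+# taken from the code): for a workspace of
+# /var/reana/users/<user-id>/workflows/<workflow-id>, an image of
+# reanahub/reana-env-root6 and a payload of "root -b -q gendata.C",
+# htcondorvc3_job_manager would effectively invoke this wrapper as
+#
+#   job_wrapper.sh /var/reana/users/<user-id>/workflows/<workflow-id> \
+#       reanahub/reana-env-root6 root -b -q gendata.C
+#
+# so that $1, $2 and $3+ map to REANA_WORKFLOW_DIR, DOCKER_IMG and CMD above.
+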
+# Get a static version of parrot.
+# Note: we depend on curl for this; it is assumed to be available on HPC
+# worker nodes (a static version might need to be transferred otherwise).
+get_parrot(){
+    curl --retry 5 -o parrot_static_run https://s3.ssl-hep.org/vc3-repository-70e707f3-6df8-41f6-adb2-fa5ecba1548d/builder-files/parrot_static_run_v7.0.11 > /dev/null 2>&1
+    if [ -e "parrot_static_run" ]; then
+        chmod +x parrot_static_run
+    else
+        echo "[Error] Could not download parrot" >&2
+        exit 210
+    fi
+}
+
+populate(){
+    if [ ! -x "$_CONDOR_SCRATCH_DIR/parrot_static_run" ]; then get_parrot; fi
+    mkdir -p "$_CONDOR_SCRATCH_DIR/$REANA_WORKFLOW_DIR"
+    local parent="$(dirname $REANA_WORKFLOW_DIR)"
+    if [ -z "${DOCKER_IMG##*madminer*}" ]; then
+        # In case of madminer, copy only the files and directories
+        # referenced in the arguments
+        for arg in $CMD; do
+            if [ -z "${arg##*/reana/users*}" ]; then
+                # Strip single and double quotes
+                arg=$(echo $arg | sed "s/'//g" | sed 's/"//g')
+                $_CONDOR_SCRATCH_DIR/parrot_static_run ls "/chirp/CONDOR${arg}"
+                res=$?
+                if [ $res == 0 ]; then
+                    local argparent=$(dirname $arg)
+                    mkdir -p "$_CONDOR_SCRATCH_DIR/$argparent"
+                    $_CONDOR_SCRATCH_DIR/parrot_static_run -T 30 cp --no-clobber -r "/chirp/CONDOR/$arg" "$_CONDOR_SCRATCH_DIR/$argparent"
+                fi
+            fi
+        done
+    else
+        # Otherwise, copy the whole workflow dir
+        $_CONDOR_SCRATCH_DIR/parrot_static_run -T 30 cp --no-clobber -r "/chirp/CONDOR/$REANA_WORKFLOW_DIR" "$_CONDOR_SCRATCH_DIR/$parent"
+    fi
+}
+
+find_module(){
+    module > /dev/null 2>&1
+    if [ $? == 0 ]; then
+        return 0
+    elif [ -e /etc/profile.d/modules.sh ]; then
+        source /etc/profile.d/modules.sh
+    fi
+    module > /dev/null 2>&1
+    return $?
+}
+
+# Discover the container technology available.
+# Currently searching for: Singularity or Shifter.
+# Returns 0: successful discovery of a container
+#         1: couldn't find a container
+find_container(){
+    declare -a search_list=("singularity" "shifter")
+    declare -a found_list=()
+    local default="shifter"
+    local cont_found=false
+
+    for cntr in "${search_list[@]}"; do
+        cntr_path="$(command -v $cntr)"
+        if [[ -x "$cntr_path" ]]  # Checking binaries in PATH
+        then
+            if [ "$(basename "$cntr_path")" == "$default" ]; then
+                CONTAINER_PATH="$cntr_path"
+                return 0
+            else
+                found_list+=("$cntr_path")
+                cont_found=true
+            fi
+        fi
+    done
+    # If VC3 didn't automatically load a module (fail-safe)
+    if [ "$cont_found" = false ]; then
+        for cntr in "${search_list[@]}"; do
+            find_module
+            module_found=$?
+            if [ $module_found == 0 ]; then
+                for var in ${search_list[*]}; do
+                    module load $var 2>/dev/null
+                    var_path="$(command -v $var 2>/dev/null)"
+                    if [ "$(basename "$var_path")" == "$default" ]; then
+                        CONTAINER_PATH="$var_path"
+                        return 0
+                    else
+                        found_list+=("$var_path")
+                        cont_found=true
+                    fi
+                done
+            fi
+        done
+    fi
+
+    # If the default wasn't found but another container was, use that one
+    if (( "${#found_list[@]}" >= 1 )); then
+        CONTAINER_PATH=${found_list[0]}
+        return 0
+    else
+        return 1  # No containers found
+    fi
+}
+
+# Set up the command-line arguments and environment for Singularity.
+# Sets the CONTAINER_ENV and CNTR_ARGUMENTS globals used by the execution step.
+setup_singularity(){
+    # TODO: Clean up calling of this function
+
+    # Send the cache to $SCRATCH, or to the HTCondor scratch directory otherwise
+    if [ -z "$SCRATCH" ]; then
+        CONTAINER_ENV="SINGULARITY_CACHEDIR=\"\$_CONDOR_SCRATCH_DIR\""
+    else
+        CONTAINER_ENV="SINGULARITY_CACHEDIR=\"\$SCRATCH\""
+    fi
+
+    CNTR_ARGUMENTS="exec -B ./$REANA_WORKFLOW_DIR:$REANA_WORKFLOW_DIR docker://$DOCKER_IMG"
+}
+
+# Set up Shifter. Pull the Docker image into the Shifter image gateway
+# and print the required arguments to stdout, to be collected by the caller.
+setup_shifter(){
+    # TODO: Clean up calling of this function
+    # Check for shifterimg
+    if [[ ! $(command -v shifterimg 2>/dev/null) ]]; then
+        echo "Error: shifterimg not found..." >&2
+        exit 127
+    fi
+
+    # Attempt to pull the image into the image gateway
+    if ! shifterimg pull "$DOCKER_IMG" >/dev/null 2>&1; then
+        echo "Error: Could not pull img: $DOCKER_IMG" >&2
+        exit 127
+    fi
+
+    # Print the arguments to stdout to be collected
+    echo "--image=docker:${DOCKER_IMG} --volume=$(pwd -P)/reana:/reana -- "
+}
+
+# Set up the arguments to pass to the container technology.
+# Currently able to set up: Singularity and Shifter.
+# Builds the container command-line arguments and pulls the image if needed
+# (Shifter). The resulting globals are used as the arguments to the container.
+setup_container(){
+    # Needs cleanup to make this more automated,
+    # i.e. run through the same list as in find_container
+    local container=$(basename "$CONTAINER_PATH")
+
+    if [ "$container" == "singularity" ]; then
+        setup_singularity
+    elif [ "$container" == "shifter" ]; then
+        CNTR_ARGUMENTS=$(setup_shifter)
+    else
+        echo "Error: Unrecognized container: $(basename $CONTAINER_PATH)" >&2
+        exit 127
+    fi
+}
+
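+# Illustrative example only (the image name is an assumption): for
+# DOCKER_IMG=reanahub/reana-env-root6 the functions above end up producing a
+# command line along the lines of
+#
+#   SINGULARITY_CACHEDIR="..." /path/to/singularity exec \
+#       -B ./<workspace>:<workspace> docker://reanahub/reana-env-root6 <cmd>
+#
+# on a Singularity site, or
+#
+#   /path/to/shifter --image=docker:reanahub/reana-env-root6 \
+#       --volume=$(pwd -P)/reana:/reana -- <cmd>
+#
+# on a Shifter site; the actual line is assembled in the Execution step below.
+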
+######## Setup environment #############
+# @TODO: This should be done in a prologue
+# in condor via +PreCmd, eventually.
+#########################################
+
+find_container
+if [ $? != 0 ]; then
+    echo "[Error]: Container technology could not be found on the system." >&2
+    exit 127
+fi
+populate
+setup_container
+
+######## Execution ##########
+# Note: double-quoted arguments are broken up and passed as multiple
+# arguments in bash for some reason; work around that by dumping the
+# command to a temporary wrapper file named tmpjob.
+tmpjob=$(mktemp -p .)
+chmod +x $tmpjob
+if command -v aprun > /dev/null 2>&1; then
+    echo -n "aprun -b -n 1 -- " > $tmpjob
+fi
+
+echo "$CONTAINER_ENV" "$CONTAINER_PATH" "$CNTR_ARGUMENTS" "${@:3} " >> $tmpjob
+bash $tmpjob
+res=$?
+rm $tmpjob
+
+if [ $res != 0 ]; then
+    echo "[Error] Execution failed with error code: $res" >&2
+    exit $res
+fi
+
+###### Stageout ###########
+# TODO: This should be done in an epilogue via +PostCmd, eventually.
+# Not implemented yet.
+# Read files from $reana_workflow_outputs and write them into
+# $REANA_WORKFLOW_DIR. Stage out depending on the protocol, e.g.:
+# - file: will be transferred via condor_chirp
+# - xrootd:////store/user/path:file: will be transferred via XRootD
+# Only chirp transfer is supported for now.
+# Use vc3-builder to get a static version of parrot
+# (eventually, a static version of the chirp client only).
+if [ "x$REANA_WORKFLOW_DIR" == "x" ]; then
+    echo "[Info]: Nothing to stage out"
+    exit $res
+fi
+
+parent="$(dirname $REANA_WORKFLOW_DIR)"
+# The /chirp/CONDOR/<path> namespace is served by the job's condor_chirp
+# I/O proxy (+WantIOProxy), so this copies the scratch copy of the workspace
+# back to the same path under the shared area on the submit side.
+# TODO: Check for the parrot exit code and propagate it in case of errors.
+./parrot_static_run -T 30 cp --no-clobber -r "$_CONDOR_SCRATCH_DIR/$REANA_WORKFLOW_DIR" "/chirp/CONDOR/$parent"
+
+exit $res
diff --git a/reana_job_controller/config.py b/reana_job_controller/config.py
index d2fa6dbf..1e425507 100644
--- a/reana_job_controller/config.py
+++ b/reana_job_controller/config.py
@@ -16,17 +16,18 @@ HTCondorJobManagerCERN
 from reana_job_controller.job_monitor import (JobMonitorHTCondorCERN,
                                               JobMonitorKubernetes,
-                                              JobMonitorSlurmCERN)
+                                              JobMonitorSlurmCERN,
+                                              JobMonitorHTCondorVC3)
 from reana_job_controller.kubernetes_job_manager import KubernetesJobManager
 from reana_job_controller.slurmcern_job_manager import SlurmJobManagerCERN
-
-SHARED_VOLUME_PATH_ROOT = os.getenv('SHARED_VOLUME_PATH_ROOT', '/var/reana')
-"""Root path of the shared volume ."""
+from reana_job_controller.htcondorvc3_job_manager import \
+    HTCondorJobManagerVC3
 
 COMPUTE_BACKENDS = {
     'kubernetes': KubernetesJobManager,
     'htcondorcern': HTCondorJobManagerCERN,
-    'slurmcern': SlurmJobManagerCERN
+    'slurmcern': SlurmJobManagerCERN,
+    'htcondorvc3': HTCondorJobManagerVC3
 }
 """Supported job compute backends and corresponding management class."""
@@ -34,11 +35,11 @@
     'kubernetes': JobMonitorKubernetes,
     'htcondorcern': JobMonitorHTCondorCERN,
     'slurmcern': JobMonitorSlurmCERN,
+    'htcondorvc3': JobMonitorHTCondorVC3
 }
 """Classes responsible for monitoring specific backend jobs"""
-
-DEFAULT_COMPUTE_BACKEND = 'kubernetes'
+DEFAULT_COMPUTE_BACKEND = 'htcondorvc3'
 """Default job compute backend."""
 
 JOB_HOSTPATH_MOUNTS = []
@@ -66,8 +67,10 @@
 ``/usr/local/share/mydata`` in the host machine.
 """
 
-SUPPORTED_COMPUTE_BACKENDS = os.getenv('COMPUTE_BACKENDS',
-                                       DEFAULT_COMPUTE_BACKEND).split(",")
+SUPPORTED_COMPUTE_BACKENDS = DEFAULT_COMPUTE_BACKEND.split(",")
+# SUPPORTED_COMPUTE_BACKENDS = os.getenv('COMPUTE_BACKENDS',
+#                                        DEFAULT_COMPUTE_BACKEND).split(",")
+
 """List of supported compute backends provided as docker build arg."""
 
 KRB5_CONTAINER_IMAGE = os.getenv('KRB5_CONTAINER_IMAGE',
diff --git a/reana_job_controller/factory.py b/reana_job_controller/factory.py
index 8157c2d1..815f0144 100644
--- a/reana_job_controller/factory.py
+++ b/reana_job_controller/factory.py
@@ -29,7 +29,8 @@ def create_app(JOB_DB=None, config_mapping=None):
     app.config.from_object(config)
     if config_mapping:
         app.config.from_mapping(config_mapping)
-    if 'htcondorcern' in app.config['SUPPORTED_COMPUTE_BACKENDS']:
+    if 'htcondorcern' in app.config['SUPPORTED_COMPUTE_BACKENDS'] or \
+            'htcondorvc3' in app.config['SUPPORTED_COMPUTE_BACKENDS']:
         app.htcondor_executor = ThreadPoolExecutor(max_workers=1)
     with app.app_context():
         app.config['OPENAPI_SPEC'] = build_openapi_spec()
diff --git a/reana_job_controller/htcondorvc3_job_manager.py b/reana_job_controller/htcondorvc3_job_manager.py
new file mode 100644
index 00000000..deb2bb3c
--- /dev/null
+++ b/reana_job_controller/htcondorvc3_job_manager.py
@@ -0,0 +1,174 @@
+# -*- coding: utf-8 -*-
+#
+# This file is part of REANA.
+# Copyright (C) 2019, 2020 CERN.
+#
+# REANA is free software; you can redistribute it and/or modify it
+# under the terms of the MIT License; see LICENSE file for more details.
+
+"""HTCondor VC3 Job Manager."""
+
+import logging
+import traceback
+import uuid
+import htcondor
+import classad
+import os
+import re
+import shutil
+import filecmp
+import pwd
+
+from retrying import retry
+
+from kubernetes.client.rest import ApiException
+from reana_commons.config import K8S_DEFAULT_NAMESPACE
+from reana_db.database import Session
+from reana_db.models import Workflow
+
+from reana_job_controller.job_manager import JobManager
+
+
+MAX_NUM_RETRIES = 10
+"""Number of retries for a job before considering it as failed."""
+
+
+@retry(stop_max_attempt_number=MAX_NUM_RETRIES)
+def submit(schedd, sub):
+    """Submit a condor job to the local schedd.
+
+    :param schedd: The local HTCondor schedd to submit to.
+    :param sub: The ``htcondor.Submit`` description of the job.
+    :returns: The cluster id of the submitted job.
+    """
+    try:
+        with schedd.transaction() as txn:
+            clusterid = sub.queue(txn)
+    except Exception as e:
+        logging.debug("Error submission: {0}".format(e))
+        raise e
+
+    return clusterid
+
+
+def get_input_files(workflow_workspace):
+    """Get files from the workflow workspace.
+
+    :param workflow_workspace: Workflow directory.
+    :returns: Comma-separated list of input files.
+    """
+    # First, get list of input files
+    input_files = []
+    for root, dirs, files in os.walk(workflow_workspace):
+        for filename in files:
+            input_files.append(os.path.join(root, filename))
+
+    return ",".join(input_files)
+
+
+def get_schedd():
+    """Find and return the HTCondor schedd.
+
+    :returns: htcondor.Schedd object.
+    """
+    schedd_ad = classad.ClassAd()
+    schedd_ad["MyAddress"] = os.environ.get(
+        "REANA_JOB_CONTROLLER_VC3_HTCONDOR_ADDR", None)
+    schedd = htcondor.Schedd(schedd_ad)
+    return schedd
+
+
+def get_wrapper(workflow_workspace):
+    """Get the bash job wrapper for executing on a remote HPC worker node.
+
+    Transfer the wrapper to the shared directory if it is not there yet.
+
+    :param workflow_workspace: Shared FS directory, e.g.: /var/reana.
+    :type workflow_workspace: str
+    """
+    wrapper = os.path.join(workflow_workspace, 'wrapper', 'job_wrapper.sh')
+    local_wrapper = '/code/etc/htcondorvc3/job_wrapper.sh'
+    if os.path.exists(wrapper) and filecmp.cmp(local_wrapper, wrapper):
+        return wrapper
+    try:
+        if not os.path.isdir(os.path.dirname(wrapper)):
+            os.mkdir(os.path.dirname(wrapper))
+        shutil.copy(local_wrapper, wrapper)
+    except Exception as e:
+        logging.debug("Error transferring wrapper: {0}.".format(e))
+        logging.debug("user: {0}".format(pwd.getpwuid(os.getuid()).pw_name))
+        raise e
+
+    return wrapper
+
+
+class HTCondorJobManagerVC3(JobManager):
+    """HTCondor VC3 job management."""
+
+    def __init__(self, docker_img=None, cmd=None, prettified_cmd=None,
+                 env_vars=None, workflow_uuid=None, workflow_workspace=None,
+                 cvmfs_mounts='false', shared_file_system=False,
+                 job_name=None, kerberos=False, kubernetes_uid=None):
+        """Instantiate the HTCondor VC3 job manager.
+
+        :param docker_img: Docker image.
+        :type docker_img: str
+        :param cmd: Command to execute.
+        :type cmd: list
+        :param prettified_cmd: Prettified version of the command to execute.
+        :type prettified_cmd: str
+        :param env_vars: Environment variables.
+        :type env_vars: dict
+        :param workflow_uuid: Unique workflow id.
+        :type workflow_uuid: str
+        :param workflow_workspace: Workflow workspace path.
+        :type workflow_workspace: str
+        :param cvmfs_mounts: List of CVMFS mounts as a string.
+        :type cvmfs_mounts: str
+        :param shared_file_system: Whether a shared file system is available.
+        :type shared_file_system: bool
+        :param job_name: Name of the job.
+        :type job_name: str
+        """
+        self.docker_img = docker_img or ''
+        self.cmd = cmd or ''
+        self.env_vars = env_vars or {}
+        self.workflow_uuid = workflow_uuid
+        self.compute_backend = "HTCondorVC3"
+        self.workflow_workspace = workflow_workspace
+        self.cvmfs_mounts = cvmfs_mounts
+        self.shared_file_system = shared_file_system
+        self.schedd = get_schedd()
+        self.wrapper = get_wrapper(workflow_workspace)
+        self.job_name = job_name
+        self.kerberos = kerberos
+        self.prettified_cmd = prettified_cmd
+
+    @JobManager.execution_hook
+    def execute(self):
+        """Execute / submit a job with HTCondor."""
+        sub = htcondor.Submit()
+        sub['executable'] = self.wrapper
+        # Arguments are passed to the wrapper as:
+        # <workflow_workspace> <docker_img> <cmd...>
+        sub['arguments'] = "{0} {1} {2}".format(
+            self.workflow_workspace, self.docker_img,
+            re.sub(r'"', '\\"', self.cmd))
+        sub['Output'] = '/tmp/$(Cluster)-$(Process).out'
+        sub['Error'] = '/tmp/$(Cluster)-$(Process).err'
+        sub['InitialDir'] = '/tmp'
+        sub['+WantIOProxy'] = 'true'
+        job_env = 'reana_workflow_dir={0}'.format(self.workflow_workspace)
+        for key, value in self.env_vars.items():
+            job_env += '; {0}={1}'.format(key, value)
+        sub['environment'] = job_env
+        sub['on_exit_remove'] = (
+            '(ExitBySignal == False) && ((ExitCode == 0) || '
+            '(ExitCode != 0 && NumJobStarts > {0}))'.format(MAX_NUM_RETRIES))
+        clusterid = submit(self.schedd, sub)
+        logging.warning("Submitting job clusterid: {0}".format(clusterid))
+        return str(clusterid)
+
+    def add_shared_volume(self, job):
+        """Add shared CephFS volume to a given job."""
+        pass  # Not implemented yet
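+
+    # Illustrative example only (paths, image and command are assumptions,
+    # not taken from the code): for a workspace of
+    # /var/reana/users/<user-id>/workflows/<workflow-id> and an image of
+    # reanahub/reana-env-root6, execute() above builds roughly this submit
+    # description before handing it to submit():
+    #
+    #   executable   = <workspace>/wrapper/job_wrapper.sh
+    #   arguments    = <workspace> reanahub/reana-env-root6 <cmd>
+    #   Output       = /tmp/$(Cluster)-$(Process).out
+    #   Error        = /tmp/$(Cluster)-$(Process).err
+    #   InitialDir   = /tmp
+    #   +WantIOProxy = true
+    #   environment  = reana_workflow_dir=<workspace>; <env_vars...>
+    #
+    # plus the on_exit_remove retry expression based on MAX_NUM_RETRIES.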
+
+    def stop(backend_job_id):
+        """Stop HTCondor job execution.
+
+        :param backend_job_id: HTCondor cluster ID of the job to be removed.
+        :type backend_job_id: str
+        """
+        try:
+            get_schedd().act(
+                htcondor.JobAction.Remove,
+                'ClusterId=={}'.format(backend_job_id))
+        except Exception as e:
+            logging.error(e, exc_info=True)
diff --git a/reana_job_controller/job_db.py b/reana_job_controller/job_db.py
index 861d2e2a..62d91343 100644
--- a/reana_job_controller/job_db.py
+++ b/reana_job_controller/job_db.py
@@ -43,6 +43,13 @@ def retrieve_k8s_job(job_id):
     """
     return JOB_DB[job_id]['obj']
 
+def retrieve_condor_job(job_id):
+    """Retrieve the HTCondor job.
+
+    :param job_id: String which represents the ID of the job.
+    :returns: The HTCondor job assigned to the given job_id.
+    """
+    return JOB_DB[job_id]['obj']
 
 def retrieve_backend_job_id(job_id):
     """Retrieve compute backend job id.
diff --git a/reana_job_controller/job_monitor.py b/reana_job_controller/job_monitor.py
index dd31ef60..0b7e526c 100644
--- a/reana_job_controller/job_monitor.py
+++ b/reana_job_controller/job_monitor.py
@@ -23,6 +23,8 @@ HTCondorJobManagerCERN
 from reana_job_controller.job_db import JOB_DB
 from reana_job_controller.kubernetes_job_manager import KubernetesJobManager
+from reana_job_controller.htcondorvc3_job_manager import \
+    HTCondorJobManagerVC3
 from reana_job_controller.slurmcern_job_manager import SlurmJobManagerCERN
 from reana_job_controller.utils import SSHClient, singleton
 
@@ -260,6 +262,98 @@ def watch_jobs(self, job_db, app):
             logging.error("Unexpected error: {}".format(e), exc_info=True)
             time.sleep(120)
 
+
+@singleton
+class JobMonitorHTCondorVC3(JobMonitor):
+    """HTCondor VC3 jobs monitor."""
+
+    def __init__(self, app):
+        """Initialize the HTCondor VC3 job monitor thread."""
+        from reana_job_controller.htcondorvc3_job_manager import get_schedd
+        self.schedd = get_schedd()
+        super(__class__, self).__init__(
+            thread_name='htcondorvc3_job_monitor',
+            app=app
+        )
+
+    def query_condor_jobs(self, backend_job_ids):
+        """Query condor jobs by cluster id. Return an iterable."""
+        logging.debug("Will query jobs: {0}".format(backend_job_ids))
+        schedd = self.schedd
+        ads = ['ClusterId', 'JobStatus', 'ExitCode']
+        base_query = 'ClusterId == {} ||'
+        query = ''
+        for job_id in backend_job_ids:
+            query += base_query.format(job_id)
+        # Drop the trailing '||'
+        query = query[:-2]
+        try:
+            condor_jobs = schedd.history(query, ads)
+            # schedd history is not sorted by ClusterId, as opposed to
+            # xquery, so we need to work around that, though we lose the
+            # iterator feature and get a list.
+            condor_jobs = sorted(condor_jobs, key=lambda x: x['ClusterId'])
+            return condor_jobs
+        except Exception as e:
+            logging.debug(e)
+            return
+
+    def watch_jobs(self, job_db, app):
+        """Watch currently running HTCondor VC3 jobs.
+
+        :param job_db: Dictionary which contains all current jobs.
+        """
+        ads = ['ClusterId', 'JobStatus', 'ExitCode']
+        statuses_to_skip = ['succeeded', 'failed']
+        while True:
+            try:
+                logging.info(
+                    'Starting a new stream request to watch Condor Jobs')
+                backend_job_ids = \
+                    [job_dict['backend_job_id'] for id, job_dict in
+                     job_db.items()
+                     if not job_db[id]['deleted'] and
+                     job_db[id]['compute_backend'] == 'htcondorvc3']
+                future_condor_jobs = app.htcondor_executor.submit(
+                    self.query_condor_jobs,
+                    backend_job_ids)
+                condor_jobs = future_condor_jobs.result()
+                for job_id, job_dict in job_db.items():
+                    if job_db[job_id]['deleted'] or \
+                            job_db[job_id]['compute_backend'] != 'htcondorvc3' or \
+                            job_db[job_id]['status'] in statuses_to_skip:
+                        continue
+                    try:
+                        logging.debug(
+                            'Looking for job {} in schedd history'
+                            .format(job_dict['backend_job_id']))
+                        condor_job = \
+                            next(job for job in condor_jobs
+                                 if str(job['ClusterId']) ==
+                                 str(job_dict['backend_job_id']))
+                    except Exception as e:
+                        # Did not match any job in the history queue yet
+                        msg = ('Job with id {} was not found in schedd '
+                               'history yet.'.format(
+                                   job_dict['backend_job_id']))
+                        logging.debug(msg)
+                        logging.debug(e)
+                        continue
+                    if condor_job['JobStatus'] == condorJobStatus['Completed']:
+                        if condor_job['ExitCode'] == 0:
+                            job_db[job_id]['status'] = 'succeeded'
+                        else:
+                            logging.info(
+                                'Job job_id: {0}, condor_job_id: {1} '
+                                'failed'.format(job_id,
+                                                condor_job['ClusterId']))
+                            job_db[job_id]['status'] = 'failed'
+                        # @todo: Grab/save logs when the job succeeds or fails.
+                        job_db[job_id]['deleted'] = True
+                    elif condor_job['JobStatus'] == condorJobStatus['Held']:
+                        logging.info(
+                            'Job was held, will delete it and set it as failed')
+                        HTCondorJobManagerVC3.stop(condor_job['ClusterId'])
+                        job_db[job_id]['status'] = 'failed'
+                        job_db[job_id]['deleted'] = True
+                time.sleep(120)
+            except Exception as e:
+                logging.error("Unexpected error: {}".format(e), exc_info=True)
+                time.sleep(120)
+
 
 slurmJobStatus = {
     'failed': ['BOOT_FAIL', 'CANCELLED', 'DEADLINE', 'FAILED', 'NODE_FAIL',
diff --git a/setup.py b/setup.py
index 3819e3b2..f6e23d81 100644
--- a/setup.py
+++ b/setup.py
@@ -48,9 +48,10 @@
     'Flask>=0.11',
     'fs>=2.0',
     'marshmallow>2.13.0,<=2.20.1',
+    'kubernetes==10.0.1',
     'reana-commons[kubernetes]>=0.6.0,<0.7.0',
     'reana-db>=0.6.0,<0.7.0',
-    'htcondor==8.9.2',
+    'htcondor==8.9.6',
     'retrying>=1.3.3',
     'paramiko[gssapi]>=2.6.0',
 ]