Add support for running post-processing on GCP Cloud Run #21

Merged · 7 commits · Nov 15, 2023
Changes from all commits
168 changes: 150 additions & 18 deletions buildstockbatch/gcp/gcp.py
@@ -18,6 +18,7 @@
import argparse
import collections
import csv
from dask.distributed import Client as DaskClient
from fsspec.implementations.local import LocalFileSystem
from gcsfs import GCSFileSystem
import gzip
@@ -43,6 +44,7 @@
from google.cloud import batch_v1, storage
from google.cloud.storage import transfer_manager
from google.cloud import compute_v1
from google.cloud import run_v2

from buildstockbatch import postprocessing
from buildstockbatch.cloud.docker_base import DockerBatchBase
@@ -234,6 +236,20 @@ def repository_uri(self):
"""
return f"{self.region}-docker.pkg.dev/{self.gcp_project}/{self.ar_repo}/buildstockbatch"

@property
def postprocessing_job_id(self):
return f"{self.job_identifier}-pp"

@property
def postprocessing_job_name(self):
return f"projects/{self.gcp_project}/locations/{self.region}" \
f"/jobs/{self.postprocessing_job_id}"

@property
def postprocessing_job_console_url(self):
return f"https://console.cloud.google.com/run/jobs/details/{self.region}" \
f"/{self.postprocessing_job_id}/executions?project={self.gcp_project}"

# todo: aws-shared (see file comment)
def build_image(self):
"""
@@ -355,9 +371,8 @@ def gcp_batch_job_name(self):

def clean(self):
delete_job(self.gcp_batch_job_name)
# TODO: Clean up docker images in AR (and locally?)
self.clean_postprocessing_job()

logger.warning("TODO: clean() not fully implemented yet!")

def list_jobs(self):
"""
@@ -819,28 +834,139 @@ def process_results(self, skip_combine=False, use_dask_cluster=True):
Here, where writing to GCS is (currently) coupled to running on GCP, the writing
to GCS will happen indirectly (via `postprocessing.combine_results()`), and we don't need to
also try to explicitly upload results.

TODO: `use_dask_cluster` (which comes from the parent implementation) is ignored. The job,
run on Cloud Run, always uses Dask, in part because `postprocessing.combine_results` fails
if `DaskClient()` is not initialized. Once `combine_results` is fixed to work without
DaskClient, the `use_dask_cluster` param needs to be piped through environment variables to
`run_combine_results_on_cloud`.
"""

wfg_args = self.cfg["workflow_generator"].get("args", {})
if self.cfg["workflow_generator"]["type"] == "residential_hpxml":
if "simulation_output_report" in wfg_args.keys():
if "timeseries_frequency" in wfg_args["simulation_output_report"].keys():
do_timeseries = wfg_args["simulation_output_report"]["timeseries_frequency"] != "none"
else:
do_timeseries = "timeseries_csv_export" in wfg_args.keys()

if not skip_combine:
self.start_combine_results_job_on_cloud(self.results_dir, do_timeseries=do_timeseries)


@classmethod
def run_combine_results_on_cloud(cls, gcs_bucket, gcs_prefix, results_dir, do_timeseries):
"""This is the function that is run on the cloud to actually perform `combine_results` on
the cloud.
"""
logger.info("run_combine_results_on_cloud starting")
client = storage.Client()
bucket = client.get_bucket(gcs_bucket)

logger.debug("Reading config")
blob = bucket.blob(f"{gcs_prefix}/config.json")
cfg = json.loads(blob.download_as_string())

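# Note: DaskClient() with no arguments starts a local Dask cluster on this machine, so all
# of combine_results' Dask work runs inside this single Cloud Run container.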
DaskClient()
postprocessing.combine_results(GCSFileSystem(), results_dir, cfg, do_timeseries=do_timeseries)


def start_combine_results_job_on_cloud(self, results_dir, do_timeseries=True):
"""Set up `combine_results` to be run on GCP Cloud Run.

Parameters are passed through to `combine_results` (see that function for parameter documentation).
"""
logger.info("Creating job to run combine_results on Cloud Run...")

# Define the Job
pp_env_cfg = self.cfg["gcp"].get("postprocessing_environment", {})
job = run_v2.Job(
template=run_v2.ExecutionTemplate(
template=run_v2.TaskTemplate(
containers=[
run_v2.Container(
name=self.job_identifier,
image=self.repository_uri + ":" + self.job_identifier,
resources=run_v2.ResourceRequirements(
limits={
"memory": f"{pp_env_cfg.get('memory_mib', 4096)}Mi",
"cpu": str(pp_env_cfg.get('cpus', 2)),
}
),
command=["/bin/sh"],
args=["-c", "python3 -m buildstockbatch.gcp.gcp"],
env=[
run_v2.EnvVar(name="JOB_TYPE", value="POSTPROCESS"),
run_v2.EnvVar(name="GCS_PREFIX", value=self.gcs_prefix),
run_v2.EnvVar(name="GCS_BUCKET", value=self.gcs_bucket),
run_v2.EnvVar(name="RESULTS_DIR", value=results_dir),
run_v2.EnvVar(name="DO_TIMESERIES", value="True" if do_timeseries else "False")
],
)
],
timeout=f"{60 * 60 * 24}s", # 24h
max_retries=0,
)
)
)

# Create the job
jobs_client = run_v2.JobsClient()
jobs_client.create_job(
run_v2.CreateJobRequest(
parent=f"projects/{self.gcp_project}/locations/{self.region}",
job_id=self.postprocessing_job_id,
job=job
)
)
logger.info("Cloud Run job created (but not yet started). See status at:"
f" {self.postprocessing_job_console_url}")

# Start the job!
jobs_client.run_job(name=self.postprocessing_job_name)
logger.info("Post-processing Cloud Run job started! You will need to run this script with "
"--clean to clean up GCP environment after post-processing is complete.")

def clean_postprocessing_job(self):
jobs_client = run_v2.JobsClient()
logger.info("Cleaning post-processing Cloud Run job with "
f"job_identifier='{self.job_identifier}'; "
f"job name={self.postprocessing_job_name}...")
try:
job = jobs_client.get_job(name=self.postprocessing_job_name)
except Exception:
logger.warning("Post-processing Cloud Run job not found for "
f"job_identifier='{self.job_identifier}' "
f"(postprocessing_job_name='{self.postprocessing_job_name}').")
return

# Ask for confirmation to delete if it is not completed
if int(job.latest_created_execution.completion_time.timestamp()) == 0:
answer = input(
"Post-processing job does not appear to be completed. "
"Are you sure you want to cancel and delete it? (y/n) "
)
if answer[:1] not in ("y", "Y"):
return

# Delete execution first
executions_client = run_v2.ExecutionsClient()
try:
executions_client.cancel_execution(name=job.latest_created_execution.name)
except Exception:
logger.warning("Failed to cancel execution with"
f" name={job.latest_created_execution.name}.", exc_info=True)
logger.warning(f"You may want to try deleting the job via the console:"
f" {self.postprocessing_job_console_url}")
return

# ... The job succeeded or its execution was deleted successfully; it can be deleted
try:
jobs_client.delete_job(name=self.postprocessing_job_name)
except Exception:
logger.warning("Failed to deleted post-processing Cloud Run job.", exc_info=True)
logger.info(f"Post-processing Cloud Run job deleted: '{self.postprocessing_job_name}'")


def upload_results(self, *args, **kwargs):
"""
@@ -888,6 +1014,12 @@ def main():
gcs_prefix = os.environ["GCS_PREFIX"]
job_name = os.environ["JOB_NAME"]
GcpBatch.run_task(task_index, job_name, gcs_bucket, gcs_prefix)
elif "POSTPROCESS" == os.environ.get("JOB_TYPE", ""):
gcs_bucket = os.environ["GCS_BUCKET"]
gcs_prefix = os.environ["GCS_PREFIX"]
results_dir = os.environ["RESULTS_DIR"]
do_timeseries = os.environ.get("DO_TIMESERIES", "False") == "True"
GcpBatch.run_combine_results_on_cloud(gcs_bucket, gcs_prefix, results_dir, do_timeseries)
else:
parser = argparse.ArgumentParser()
parser.add_argument("project_filename")
@@ -958,7 +1090,7 @@ def main():
batch.push_image()
batch.run_batch()
batch.process_results()
# process_results is async, so don't do a clean (which would clean before it's done)


if __name__ == "__main__":
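For reference, a hedged sketch of exercising the new POSTPROCESS dispatch in main() locally (all values below are hypothetical placeholders, not defaults from this PR):

import os

# Emulate the environment that start_combine_results_job_on_cloud sets on the Cloud Run job.
os.environ["JOB_TYPE"] = "POSTPROCESS"
os.environ["GCS_BUCKET"] = "my-buildstock-bucket"
os.environ["GCS_PREFIX"] = "runs/my_job"
os.environ["RESULTS_DIR"] = "my-buildstock-bucket/runs/my_job/results"
os.environ["DO_TIMESERIES"] = "True"

# main() in buildstockbatch/gcp/gcp.py would then dispatch to:
#   GcpBatch.run_combine_results_on_cloud(
#       os.environ["GCS_BUCKET"], os.environ["GCS_PREFIX"],
#       os.environ["RESULTS_DIR"], os.environ["DO_TIMESERIES"] == "True")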
5 changes: 5 additions & 0 deletions buildstockbatch/schemas/v0.3.yaml
@@ -29,6 +29,7 @@ gcp-spec:
batch_array_size: num(min=1, max=10000, required=True)
gcs: include('gcs-spec', required=True)
job_environment: include('gcp-job-environment-spec', required=False)
postprocessing_environment: include('gcp-postprocessing_environment-spec', required=False)

gcs-spec:
bucket: str(required=True)
@@ -43,6 +44,10 @@
machine_type: str(required=False)
use_spot: bool(required=False)

gcp-postprocessing_environment-spec:
cpus: int(min=1, max=224, required=False)
memory_mib: int(min=512, required=False)

aws-spec:
job_identifier: regex('^[a-zA-Z]\w{,9}$', required=True)
s3: include('s3-aws-postprocessing-spec', required=True)
9 changes: 9 additions & 0 deletions docs/project_defn.rst
@@ -261,6 +261,15 @@ on the `GCP Batch <https://cloud.google.com/batch>`_ service.
* ``use_spot``: true or false. Defaults to false if missing. This tells the project
to use `Spot VMs <https://cloud.google.com/spot-vms>`_ for data
simulations, which can reduce costs by up to 91%.
* ``postprocessing_environment``: Optional. Specifies the Cloud Run computing environment for
  postprocessing (see the example sketch below).

* ``cpus``: `Number of CPUs`_ to use. Default: 2.
* ``memory_mib``: `Amount of RAM`_ needed in MiB. 2048 MiB per CPU is recommended. Default:
4096.

.. _Number of CPUs: https://cloud.google.com/run/docs/configuring/services/cpu
.. _Amount of RAM: https://cloud.google.com/run/docs/configuring/services/memory-limits
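As an illustrative sketch only (values are hypothetical, and the other required ``gcp`` keys
such as ``gcs`` and ``batch_array_size`` are omitted), a project file could request a larger
post-processing environment like this::

    gcp:
      # ... other required gcp keys ...
      postprocessing_environment:
        cpus: 4
        memory_mib: 8192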

.. _postprocessing:

1 change: 1 addition & 0 deletions setup.py
@@ -46,6 +46,7 @@
"google-cloud-artifact-registry",
"google-cloud-batch",
"google-cloud-compute",
"google-cloud-run",
"google-cloud-storage",
"tqdm",
],