diff --git a/openquake/calculators/base.py b/openquake/calculators/base.py index 746636153035..e3482190b6d9 100644 --- a/openquake/calculators/base.py +++ b/openquake/calculators/base.py @@ -26,6 +26,7 @@ import logging import operator import traceback +import getpass from datetime import datetime from shapely import wkt import psutil @@ -71,6 +72,9 @@ ('min', F32), ('max', F32), ('len', U16)]) +USER = getpass.getuser() +MB = 1024 ** 2 + def get_aelo_changelog(): dic = collections.defaultdict(list) @@ -1544,3 +1548,69 @@ def run_calc(job_ini, **kw): calc = calculators(log.get_oqparam(), log.calc_id) calc.run() return calc + + +def expose_outputs(dstore, owner=USER, status='complete'): + """ + Build a correspondence between the outputs in the datastore and the + ones in the database. + + :param dstore: datastore + """ + oq = dstore['oqparam'] + exportable = set(ekey[0] for ekey in exp) + calcmode = oq.calculation_mode + dskeys = set(dstore) & exportable # exportable datastore keys + dskeys.add('fullreport') + if 'avg_gmf' in dskeys: + dskeys.remove('avg_gmf') # hide + rlzs = dstore['full_lt'].rlzs + if len(rlzs) > 1: + dskeys.add('realizations') + hdf5 = dstore.hdf5 + if 'hcurves-stats' in hdf5 or 'hcurves-rlzs' in hdf5: + if oq.hazard_stats() or oq.individual_rlzs or len(rlzs) == 1: + dskeys.add('hcurves') + if oq.uniform_hazard_spectra: + dskeys.add('uhs') # export them + if oq.hazard_maps: + dskeys.add('hmaps') # export them + if len(rlzs) > 1 and not oq.collect_rlzs: + if 'aggrisk' in dstore: + dskeys.add('aggrisk-stats') + if 'aggcurves' in dstore: + dskeys.add('aggcurves-stats') + if not oq.individual_rlzs: + for out in ['avg_losses-rlzs', 'aggrisk', 'aggcurves']: + if out in dskeys: + dskeys.remove(out) + if 'curves-rlzs' in dstore and len(rlzs) == 1: + dskeys.add('loss_curves-rlzs') + if 'curves-stats' in dstore and len(rlzs) > 1: + dskeys.add('loss_curves-stats') + if oq.conditional_loss_poes: # expose loss_maps outputs + if 'loss_curves-stats' in dstore: + dskeys.add('loss_maps-stats') + if 'ruptures' in dskeys: + if 'scenario' in calcmode or len(dstore['ruptures']) == 0: + # do not export, as requested by Vitor + exportable.remove('ruptures') + else: + dskeys.add('event_based_mfd') + if 'hmaps' in dskeys and not oq.hazard_maps: + dskeys.remove('hmaps') # do not export the hazard maps + if logs.dbcmd('get_job', dstore.calc_id) is None: + # the calculation has not been imported in the db yet + logs.dbcmd('import_job', dstore.calc_id, oq.calculation_mode, + oq.description + ' [parent]', owner, status, + oq.hazard_calculation_id, dstore.datadir) + keysize = [] + for key in sorted(dskeys & exportable): + try: + size_mb = dstore.getsize(key) / MB + except (KeyError, AttributeError): + size_mb = -1 + if size_mb: + keysize.append((key, size_mb)) + ds_size = dstore.getsize() / MB + logs.dbcmd('create_outputs', dstore.calc_id, keysize, ds_size) diff --git a/openquake/calculators/event_based.py b/openquake/calculators/event_based.py index fea4ff86cb49..d252f4df90e1 100644 --- a/openquake/calculators/event_based.py +++ b/openquake/calculators/event_based.py @@ -28,7 +28,6 @@ from openquake.baselib import config, hdf5, parallel, python3compat from openquake.baselib.general import ( AccumDict, humansize, groupby, block_splitter) -from openquake.engine.aristotle import get_close_mosaic_models from openquake.hazardlib.geo.packager import fiona from openquake.hazardlib.map_array import MapArray, get_mean_curve from openquake.hazardlib.stats import geom_avg_std, compute_stats @@ -45,14 +44,14 @@ from openquake.commonlib import util, logs, readinput, datastore from openquake.commonlib.calc import ( gmvs_to_poes, make_hmaps, slice_dt, build_slice_by_event, RuptureImporter, - SLICE_BY_EVENT_NSITES) + SLICE_BY_EVENT_NSITES, get_close_mosaic_models) from openquake.risklib.riskinput import str2rsi, rsi2str from openquake.calculators import base, views from openquake.calculators.getters import get_rupture_getters, sig_eps_dt from openquake.calculators.classical import ClassicalCalculator from openquake.calculators.extract import Extractor from openquake.calculators.postproc.plots import plot_avg_gmf -from openquake.engine import engine +from openquake.calculators.base import expose_outputs from PIL import Image U8 = numpy.uint8 @@ -809,7 +808,7 @@ def post_execute(self, dummy): # source model, however usually this is quite fast and # does not dominate the computation self.cl.run() - engine.expose_outputs(self.cl.datastore) + expose_outputs(self.cl.datastore) all = slice(None) for imt in oq.imtls: cl_mean_curves = get_mean_curve( diff --git a/openquake/calculators/post_risk.py b/openquake/calculators/post_risk.py index 6714266818e0..0702faf7ccd3 100644 --- a/openquake/calculators/post_risk.py +++ b/openquake/calculators/post_risk.py @@ -26,8 +26,8 @@ from openquake.baselib import general, parallel, python3compat from openquake.commonlib import datastore, logs from openquake.risklib import asset, scientific, reinsurance -from openquake.engine import engine from openquake.calculators import base, views +from openquake.calculators.base import expose_outputs U8 = numpy.uint8 F32 = numpy.float32 @@ -659,4 +659,4 @@ def post_aggregate(calc_id: int, aggregate_by): parallel.Starmap.init() prc = PostRiskCalculator(oqp, log.calc_id) prc.run(aggregate_by=[aggby]) - engine.expose_outputs(prc.datastore) + expose_outputs(prc.datastore) diff --git a/openquake/commands/importcalc.py b/openquake/commands/importcalc.py index 17655cd3e910..97569ab9b7ae 100644 --- a/openquake/commands/importcalc.py +++ b/openquake/commands/importcalc.py @@ -20,7 +20,7 @@ import logging from openquake.commonlib import logs, datastore from openquake.calculators.extract import WebExtractor -from openquake.engine import engine +from openquake.calculators.base import expose_outputs def main(calc_id): @@ -51,7 +51,7 @@ def main(calc_id): webex.close() with datastore.read(calc_id) as dstore: pprint.pprint(dstore.get_attrs('/')) - engine.expose_outputs(dstore, status='complete') + expose_outputs(dstore, status='complete') logging.info('Imported calculation %s successfully', calc_id) diff --git a/openquake/commands/tests/independence_test.py b/openquake/commands/tests/independence_test.py index a6e6ac11fd11..016a39e0e625 100644 --- a/openquake/commands/tests/independence_test.py +++ b/openquake/commands/tests/independence_test.py @@ -50,6 +50,7 @@ def test_commonlib(self): def test_engine(self): assert_independent('openquake.engine', 'openquake.server') + assert_independent('openquake.calculators', 'openquake.engine') class CaseConsistencyTestCase(unittest.TestCase): diff --git a/openquake/commonlib/calc.py b/openquake/commonlib/calc.py index ca04d555ab8e..e2e1af7389c3 100644 --- a/openquake/commonlib/calc.py +++ b/openquake/commonlib/calc.py @@ -20,12 +20,13 @@ import operator import functools import numpy +from shapely.geometry import Point from openquake.baselib import performance, parallel, hdf5, general from openquake.hazardlib.source import rupture -from openquake.hazardlib import map_array +from openquake.hazardlib import map_array, geo from openquake.hazardlib.source.rupture import get_events -from openquake.commonlib import util +from openquake.commonlib import util, readinput TWO16 = 2 ** 16 TWO24 = 2 ** 24 @@ -49,6 +50,7 @@ # ############## utilities for the classical calculator ############### # + # used only in the view global_hcurves def convert_to_array(pmap, nsites, imtls, inner_idx=0): """ @@ -218,7 +220,7 @@ def import_rups_events(self, rup_array, get_rupture_getters): ne, nr, int(eff_time), mag)) def _save_events(self, rup_array, rgetters): - oq = self.oqparam + oq = self.oqparam # this is very fast compared to saving the ruptures E = rup_array['n_occ'].sum() events = numpy.zeros(E, rupture.events_dt) @@ -430,7 +432,7 @@ def starmap_from_gmfs(task_func, oq, dstore, mon): slices.append(get_slices(sbe[slc], data, num_assets)) slices = numpy.concatenate(slices, dtype=slices[0].dtype) dstore.swmr_on() - maxw = slices['weight'].sum()/ (oq.concurrent_tasks or 1) or 1. + maxw = slices['weight'].sum() / (oq.concurrent_tasks or 1) or 1. logging.info('maxw = {:_d}'.format(int(maxw))) smap = parallel.Starmap.apply( task_func, (slices, oq, ds), @@ -439,3 +441,31 @@ def starmap_from_gmfs(task_func, oq, dstore, mon): weight=operator.itemgetter('weight'), h5=dstore.hdf5) return smap + + +def get_close_mosaic_models(lon, lat, buffer_radius): + """ + :param lon: longitude + :param lat: latitude + :param buffer_radius: radius of the buffer around the point. + This distance is in the same units as the point's + coordinates (i.e. degrees), and it defines how far from + the point the buffer should extend in all directions, + creating a circular buffer region around the point + :returns: list of mosaic models intersecting the circle + centered on the given coordinates having the specified radius + """ + mosaic_df = readinput.read_mosaic_df(buffer=1) + hypocenter = Point(lon, lat) + hypo_buffer = hypocenter.buffer(buffer_radius) + geoms = numpy.array([hypo_buffer]) + [close_mosaic_models] = geo.utils.geolocate_geometries(geoms, mosaic_df) + if not close_mosaic_models: + raise ValueError( + f'({lon}, {lat}) is farther than {buffer_radius} deg' + f' from any mosaic model!') + elif len(close_mosaic_models) > 1: + logging.info( + '(%s, %s) is closer than %s deg with respect to the following' + ' mosaic models: %s' % (lon, lat, buffer_radius, close_mosaic_models)) + return close_mosaic_models diff --git a/openquake/engine/aristotle.py b/openquake/engine/aristotle.py index a5d6fb40cd36..0bddf2225a7f 100644 --- a/openquake/engine/aristotle.py +++ b/openquake/engine/aristotle.py @@ -24,14 +24,14 @@ import logging from dataclasses import dataclass import numpy -from shapely.geometry import Point from json.decoder import JSONDecodeError from urllib.error import HTTPError from openquake.baselib import config, hdf5, sap -from openquake.hazardlib import geo, nrml, sourceconverter +from openquake.hazardlib import nrml, sourceconverter from openquake.hazardlib.shakemap.parsers import ( download_rupture_dict, download_station_data_file) from openquake.commonlib import readinput +from openquake.commonlib.calc import get_close_mosaic_models from openquake.engine import engine CDIR = os.path.dirname(__file__) # openquake/engine @@ -55,34 +55,6 @@ class AristotleParam: ignore_shakemap: bool = False -def get_close_mosaic_models(lon, lat, buffer_radius): - """ - :param lon: longitude - :param lat: latitude - :param buffer_radius: radius of the buffer around the point. - This distance is in the same units as the point's - coordinates (i.e. degrees), and it defines how far from - the point the buffer should extend in all directions, - creating a circular buffer region around the point - :returns: list of mosaic models intersecting the circle - centered on the given coordinates having the specified radius - """ - mosaic_df = readinput.read_mosaic_df(buffer=1) - hypocenter = Point(lon, lat) - hypo_buffer = hypocenter.buffer(buffer_radius) - geoms = numpy.array([hypo_buffer]) - [close_mosaic_models] = geo.utils.geolocate_geometries(geoms, mosaic_df) - if not close_mosaic_models: - raise ValueError( - f'({lon}, {lat}) is farther than {buffer_radius} deg' - f' from any mosaic model!') - elif len(close_mosaic_models) > 1: - logging.info( - '(%s, %s) is closer than %s deg with respect to the following' - ' mosaic models: %s' % (lon, lat, buffer_radius, close_mosaic_models)) - return close_mosaic_models - - def get_trts_around(mosaic_model, exposure_hdf5): """ :returns: list of TRTs for the given mosaic model diff --git a/openquake/engine/engine.py b/openquake/engine/engine.py index 0eadbe8fda83..6477a0bb568f 100644 --- a/openquake/engine/engine.py +++ b/openquake/engine/engine.py @@ -46,7 +46,8 @@ def setproctitle(title): from openquake.baselib import parallel, general, config, slurm, workerpool as w from openquake.commonlib.oqvalidation import OqParam from openquake.commonlib import readinput, logs -from openquake.calculators import base, export +from openquake.calculators import base +from openquake.calculators.base import expose_outputs USER = getpass.getuser() @@ -91,72 +92,6 @@ def set_concurrent_tasks_default(calc): logging.warning('Using %d %s workers', num_workers, dist) -def expose_outputs(dstore, owner=USER, status='complete'): - """ - Build a correspondence between the outputs in the datastore and the - ones in the database. - - :param dstore: datastore - """ - oq = dstore['oqparam'] - exportable = set(ekey[0] for ekey in export.export) - calcmode = oq.calculation_mode - dskeys = set(dstore) & exportable # exportable datastore keys - dskeys.add('fullreport') - if 'avg_gmf' in dskeys: - dskeys.remove('avg_gmf') # hide - rlzs = dstore['full_lt'].rlzs - if len(rlzs) > 1: - dskeys.add('realizations') - hdf5 = dstore.hdf5 - if 'hcurves-stats' in hdf5 or 'hcurves-rlzs' in hdf5: - if oq.hazard_stats() or oq.individual_rlzs or len(rlzs) == 1: - dskeys.add('hcurves') - if oq.uniform_hazard_spectra: - dskeys.add('uhs') # export them - if oq.hazard_maps: - dskeys.add('hmaps') # export them - if len(rlzs) > 1 and not oq.collect_rlzs: - if 'aggrisk' in dstore: - dskeys.add('aggrisk-stats') - if 'aggcurves' in dstore: - dskeys.add('aggcurves-stats') - if not oq.individual_rlzs: - for out in ['avg_losses-rlzs', 'aggrisk', 'aggcurves']: - if out in dskeys: - dskeys.remove(out) - if 'curves-rlzs' in dstore and len(rlzs) == 1: - dskeys.add('loss_curves-rlzs') - if 'curves-stats' in dstore and len(rlzs) > 1: - dskeys.add('loss_curves-stats') - if oq.conditional_loss_poes: # expose loss_maps outputs - if 'loss_curves-stats' in dstore: - dskeys.add('loss_maps-stats') - if 'ruptures' in dskeys: - if 'scenario' in calcmode or len(dstore['ruptures']) == 0: - # do not export, as requested by Vitor - exportable.remove('ruptures') - else: - dskeys.add('event_based_mfd') - if 'hmaps' in dskeys and not oq.hazard_maps: - dskeys.remove('hmaps') # do not export the hazard maps - if logs.dbcmd('get_job', dstore.calc_id) is None: - # the calculation has not been imported in the db yet - logs.dbcmd('import_job', dstore.calc_id, oq.calculation_mode, - oq.description + ' [parent]', owner, status, - oq.hazard_calculation_id, dstore.datadir) - keysize = [] - for key in sorted(dskeys & exportable): - try: - size_mb = dstore.getsize(key) / MB - except (KeyError, AttributeError): - size_mb = -1 - if size_mb: - keysize.append((key, size_mb)) - ds_size = dstore.getsize() / MB - logs.dbcmd('create_outputs', dstore.calc_id, keysize, ds_size) - - class MasterKilled(KeyboardInterrupt): "Exception raised when a job is killed manually" @@ -326,7 +261,7 @@ def stop_workers(job_id): """ print(w.WorkerMaster(job_id).stop()) - + def run_jobs(jobctxs, concurrent_jobs=None, nodes=1, sbatch=False, precalc=False): """ Run jobs using the specified config file and other options. @@ -385,7 +320,7 @@ def run_jobs(jobctxs, concurrent_jobs=None, nodes=1, sbatch=False, precalc=False 'start_time': datetime.utcnow()} logs.dbcmd('update_job', job.calc_id, dic) try: - if dist in ('zmq', 'slurm') and w.WorkerMaster(job_id).status() == []: + if dist in ('zmq', 'slurm') and w.WorkerMaster(job_id).status() == []: start_workers(job_id, dist, nodes) # run the jobs sequentially or in parallel, with slurm or without diff --git a/openquake/server/views.py b/openquake/server/views.py index 502e7862ed84..c8152761ab13 100644 --- a/openquake/server/views.py +++ b/openquake/server/views.py @@ -49,6 +49,7 @@ from openquake.baselib.general import groupby, gettemp, zipfiles, mp from openquake.hazardlib import nrml, gsim, valid from openquake.commonlib import readinput, oqvalidation, logs, datastore, dbapi +from openquake.commonlib.calc import get_close_mosaic_models from openquake.calculators import base, views from openquake.calculators.getters import NotFound from openquake.calculators.export import export @@ -61,8 +62,7 @@ from openquake.engine.export.core import DataStoreExportError from openquake.hazardlib.shakemap.parsers import download_station_data_file from openquake.engine.aristotle import ( - get_close_mosaic_models, get_trts_around, get_aristotle_params, - get_rupture_dict) + get_trts_around, get_aristotle_params, get_rupture_dict) from openquake.server import utils from django.conf import settings diff --git a/utils/csm2rup b/utils/csm2rup index 71e855477d50..e045f5e4d729 100755 --- a/utils/csm2rup +++ b/utils/csm2rup @@ -7,9 +7,8 @@ import logging import operator import numpy from openquake.baselib import sap, parallel, general, hdf5 -from openquake.engine.engine import expose_outputs - from openquake.calculators import base +from openquake.calculators.base import expose_outputs rupfields = dict( mag=numpy.float32,