diff --git a/pysteps/blending/steps.py b/pysteps/blending/steps.py index 7e4535dd..a6ecc09f 100644 --- a/pysteps/blending/steps.py +++ b/pysteps/blending/steps.py @@ -46,6 +46,8 @@ import math import time from copy import deepcopy +from functools import partial +from multiprocessing.pool import ThreadPool import numpy as np from scipy.linalg import inv @@ -117,20 +119,29 @@ def forecast( ordered by timestamp from oldest to newest. The time steps between the inputs are assumed to be regular. precip_models: array-like - Array of shape (n_models,timesteps+1) containing, per timestep (t=0 to - lead time here) and per (NWP) model or model ensemble member, a - dictionary with a list of cascades obtained by calling a method - implemented in :py:mod:`pysteps.cascade.decomposition`. In case of one - (deterministic) model as input, add an extra dimension to make sure - precip_models is five dimensional prior to calling this function. - It is also possible to supply the original (NWP) model forecasts containing - rainfall fields as an array of shape (n_models,timestep+1,m,n), which will - then be decomposed in this function. Note that for an operational application - or for testing with multiple model runs, it is recommended to decompose - the model forecasts outside beforehand, as this reduces calculation times. + Either raw (NWP) model forecast data or decomposed (NWP) model forecast data. + If you supply decomposed data, it needs to be an array of shape + (n_models,timesteps+1) containing, per timestep (t=0 to lead time here) and + per (NWP) model or model ensemble member, a dictionary with a list of cascades + obtained by calling a method implemented in :py:mod:`pysteps.cascade.decomposition`. + If you supply the original (NWP) model forecast data, it needs to be an array of shape + (n_models,timestep+1,m,n) containing precipitation (or other) fields, which will + then be decomposed in this function. + + Depending on your use case it can be advantageous to decompose the model + forecasts outside beforehand, as this slightly reduces calculation times. This is possible with :py:func:`pysteps.blending.utils.decompose_NWP`, :py:func:`pysteps.blending.utils.compute_store_nwp_motion`, and - :py:func:`pysteps.blending.utils.load_NWP`. + :py:func:`pysteps.blending.utils.load_NWP`. However, if you have a lot of (NWP) model + members (e.g. 1 model member per nowcast member), this can lead to excessive memory + usage. + + To further reduce memory usage, both this array and the ``velocity_models`` array + can be given as float32. They will then be converted to float64 before computations + to minimize loss in precision. + + In case of one (deterministic) model as input, add an extra dimension to make sure + precip_models is four dimensional prior to calling this function. velocity: array-like Array of shape (2,m,n) containing the x- and y-components of the advection field. The velocities are assumed to represent one time step between the @@ -139,6 +150,10 @@ def forecast( Array of shape (n_models,timestep,2,m,n) containing the x- and y-components of the advection field for the (NWP) model field per forecast lead time. All values are required to be finite. + + To reduce memory usage, this array + can be given as float32. They will then be converted to float64 before computations + to minimize loss in precision. timesteps: int or list of floats Number of time steps to forecast or a list of time steps for which the forecasts are computed (relative to the input time step). The elements of @@ -557,22 +572,19 @@ def forecast( fft, ) - # 2.2 If necessary, decompose (NWP) model forecasts and stack cascades - ( - precip_models_cascade, - mu_models, - sigma_models, - precip_models_pm, - ) = _compute_cascade_decomposition_nwp( - precip_models, bp_filter, decompositor, recompositor, fft, domain - ) + # 2.2 If necessary, recompose (NWP) model forecasts + precip_models_cascade = None + + if precip_models.ndim != 4: + precip_models_cascade = precip_models + precip_models = _compute_cascade_recomposition_nwp(precip_models, recompositor) # 2.3 Check for zero input fields in the radar and NWP data. zero_precip_radar = blending.utils.check_norain(precip, precip_thr, norain_thr) # The norain fraction threshold used for nwp is the default value of 0.0, # since nwp does not suffer from clutter. zero_model_fields = blending.utils.check_norain( - precip_models_pm, precip_thr, norain_thr + precip_models, precip_thr, norain_thr ) if isinstance(timesteps, int): @@ -583,7 +595,7 @@ def forecast( timesteps = nowcast_utils.binned_timesteps(original_timesteps) timestep_type = "list" - # 2.3.1 If precip is below the norain threshold and precip_models_pm is zero, + # 2.3.1 If precip is below the norain threshold and precip_models is zero, # we consider it as no rain in the domain. # The forecast will directly return an array filled with the minimum # value present in precip (which equals zero rainfall in the used @@ -633,37 +645,41 @@ def forecast( # 2.3.3 If zero_precip_radar, make sure that precip_cascade does not contain # only nans or infs. If so, fill it with the zero value. if zero_precip_radar: - precip_cascade = np.nan_to_num( - precip_cascade, - copy=True, - nan=np.nanmin(precip_models_cascade), - posinf=np.nanmin(precip_models_cascade), - neginf=np.nanmin(precip_models_cascade), - ) - - # 2.3.4 Check if the NWP fields contain nans or infinite numbers. If so, - # fill these with the minimum value present in precip (corresponding to - # zero rainfall in the radar observations) - ( - precip_models_cascade, - precip_models_pm, - mu_models, - sigma_models, - ) = _fill_nans_infs_nwp_cascade( - precip_models_cascade, - precip_models_pm, - precip_cascade, - precip, - mu_models, - sigma_models, - ) + # Look for a timestep and member with rain so that we have a sensible decomposition + done = False + for t in timesteps: + if done: + break + for j in range(precip_models.shape[0]): + if not blending.utils.check_norain( + precip_models[j, t], precip_thr, norain_thr + ): + if precip_models_cascade is not None: + precip_cascade[~np.isfinite(precip_cascade)] = np.nanmin( + precip_models_cascade[j, t]["cascade_levels"] + ) + continue + precip_models_cascade_temp = decompositor( + precip_models[j, t, :, :], + bp_filter=bp_filter, + fft_method=fft, + output_domain=domain, + normalize=True, + compute_stats=True, + compact_output=True, + )["cascade_levels"] + precip_cascade[~np.isfinite(precip_cascade)] = np.nanmin( + precip_models_cascade_temp + ) + done = True + break # 2.3.5 If zero_precip_radar is True, only use the velocity field of the NWP # forecast. I.e., velocity (radar) equals velocity_model at the first time # step. if zero_precip_radar: # Use the velocity from velocity_models at time step 0 - velocity = velocity_models[:, 0, :, :, :] + velocity = velocity_models[:, 0, :, :, :].astype(np.float64, copy=False) # Take the average over the first axis, which corresponds to n_models # (hence, the model average) velocity = np.mean(velocity, axis=0) @@ -675,7 +691,7 @@ def forecast( # rainfall data if zero_precip_radar: precip_noise_input = _determine_max_nr_rainy_cells_nwp( - precip_models_pm, precip_thr, precip_models_pm.shape[0], timesteps + precip_models, precip_thr, precip_models.shape[0], timesteps ) # Make sure precip_noise_input is three dimensional if len(precip_noise_input.shape) != 3: @@ -809,23 +825,80 @@ def forecast( if measure_time: starttime = time.time() + if precip_models_cascade is not None: + decomp_precip_models = list(precip_models_cascade[:, t]) + else: + if precip_models.shape[0] == 1: + decomp_precip_models = [ + decompositor( + precip_models[0, t, :, :], + bp_filter=bp_filter, + fft_method=fft, + output_domain=domain, + normalize=True, + compute_stats=True, + compact_output=True, + ) + ] + else: + with ThreadPool(num_workers) as pool: + decomp_precip_models = pool.map( + partial( + decompositor, + bp_filter=bp_filter, + fft_method=fft, + output_domain=domain, + normalize=True, + compute_stats=True, + compact_output=True, + ), + list(precip_models[:, t, :, :]), + ) + + precip_models_cascade_temp = np.array( + [decomp["cascade_levels"] for decomp in decomp_precip_models] + ) + mu_models_temp = np.array( + [decomp["means"] for decomp in decomp_precip_models] + ) + sigma_models_temp = np.array( + [decomp["stds"] for decomp in decomp_precip_models] + ) + + # 2.3.4 Check if the NWP fields contain nans or infinite numbers. If so, + # fill these with the minimum value present in precip (corresponding to + # zero rainfall in the radar observations) + ( + precip_models_cascade_temp, + precip_models_temp, + mu_models_temp, + sigma_models_temp, + ) = _fill_nans_infs_nwp_cascade( + precip_models_cascade_temp, + precip_models[:, t, :, :].astype(np.float64, copy=False), + precip_cascade, + precip, + mu_models_temp, + sigma_models_temp, + ) + # 8.1.1 Before calling the worker for the forecast loop, determine which (NWP) # models will be combined with which nowcast ensemble members. With the # way it is implemented at this moment: n_ens_members of the output equals # the maximum number of (ensemble) members in the input (either the nowcasts or NWP). ( precip_models_cascade_temp, - precip_models_pm_temp, + precip_models_temp, velocity_models_temp, mu_models_temp, sigma_models_temp, n_model_indices, ) = _find_nwp_combination( - precip_models_cascade[:, t, :, :, :], - precip_models_pm[:, t, :, :], - velocity_models[:, t, :, :, :], - mu_models[:, t, :], - sigma_models[:, t, :], + precip_models_cascade_temp, + precip_models_temp, + velocity_models[:, t, :, :, :].astype(np.float64, copy=False), + mu_models_temp, + sigma_models_temp, n_ens_members, ar_order, n_cascade_levels, @@ -1421,7 +1494,7 @@ def worker(j): R_pm_stacked = np.concatenate( ( R_pm_ep[None, t_index], - precip_models_pm_temp, + precip_models_temp, ), axis=0, ) @@ -1429,7 +1502,7 @@ def worker(j): R_pm_stacked = np.concatenate( ( R_pm_ep[None, t_index], - precip_models_pm_temp[None, j], + precip_models_temp[None, j], ), axis=0, ) @@ -1446,11 +1519,11 @@ def worker(j): weights_pm_normalized_mod_only.reshape( weights_pm_normalized_mod_only.shape[0], 1, 1 ) - * precip_models_pm_temp, + * precip_models_temp, axis=0, ) else: - R_pm_blended_mod_only = precip_models_pm_temp[j] + R_pm_blended_mod_only = precip_models_temp[j] # The extrapolation components are NaN outside the advected # radar domain. This results in NaN values in the blended @@ -1541,7 +1614,7 @@ def worker(j): # that for the probability matching. if probmatching_method is not None and resample_distribution: arr1 = R_pm_ep[t_index] - arr2 = precip_models_pm_temp[j] + arr2 = precip_models_temp[j] # resample weights based on cascade level 2. # Areas where one of the fields is nan are not included. R_pm_resampled = probmatching.resample_distributions( @@ -2043,78 +2116,24 @@ def _compute_cascade_decomposition_radar( return R_c, mu_extrapolation, sigma_extrapolation -def _compute_cascade_decomposition_nwp( - precip_models, bp_filter, decompositor, recompositor, fft, domain -): - """If necessary, decompose (NWP) model forecasts and stack cascades.""" - if precip_models.ndim == 4: - # Keep the model fields for the probability matching later on - precip_models_pm = precip_models.copy() - # Decompose the (NWP) model forecasts - precip_models_cascade = [] - # Loop through the n_models - for i in range(precip_models.shape[0]): - precip_model_cascade = [] - # Loop through the time steps - for j in range(precip_models.shape[1]): - precip_model_cascade.append( - decompositor( - field=precip_models[i, j, :, :], - bp_filter=bp_filter, - fft_method=fft, - output_domain=domain, - normalize=True, - compute_stats=True, - compact_output=True, - ) - ) - precip_models_cascade.append(precip_model_cascade) - precip_models_cascade = np.array(precip_models_cascade) - - precip_models, precip_model_cascade = None, None - else: - precip_models_cascade = precip_models - precip_models_pm = None +def _compute_cascade_recomposition_nwp(precip_models_cascade, recompositor): + """If necessary, recompose (NWP) model forecasts.""" + precip_models = None - # Stack the (NWP) model cascades in separate normalized cascades and return - # the means and sigmas. - # The normalized model cascade should have the shape: - # [n_models, n_timesteps, n_cascade_levels, m, n] + # Recompose the (NWP) model cascades to have rainfall fields per + # model and time step, which will be used in the probability matching steps. + # Recomposed cascade will have shape: [n_models, n_timesteps, m, n] precip_models = [] - mu_models = [] - sigma_models = [] - # Stack it per model and combine that for i in range(precip_models_cascade.shape[0]): - precip_model, mu_model, sigma_model = blending.utils.stack_cascades( - R_d=precip_models_cascade[i, :], donorm=False - ) + precip_model = [] + for time_step in range(precip_models_cascade.shape[1]): + precip_model.append(recompositor(precip_models_cascade[i, time_step])) precip_models.append(precip_model) - mu_models.append(mu_model) - sigma_models.append(sigma_model) precip_models = np.stack(precip_models) - mu_models = np.stack(mu_models) - sigma_models = np.stack(sigma_models) - - precip_model, mu_model, sigma_model = None, None, None - - if precip_models_pm is None: - # Finally, recompose the (NWP) model cascades to have rainfall fields per - # model and time step, which will be used in the probability matching steps. - # Recomposed cascade will have shape: [n_models, n_timesteps, m, n] - precip_models_pm = [] - for i in range(precip_models_cascade.shape[0]): - precip_model_pm = [] - for time_step in range(precip_models_cascade.shape[1]): - precip_model_pm.append( - recompositor(precip_models_cascade[i, time_step]) - ) - precip_models_pm.append(precip_model_pm) - - precip_models_pm = np.stack(precip_models_pm) - precip_model_pm = None + precip_model = None - return precip_models, mu_models, sigma_models, precip_models_pm + return precip_models def _estimate_ar_parameters_radar( @@ -2434,7 +2453,7 @@ def _init_noise_cascade( def _fill_nans_infs_nwp_cascade( precip_models_cascade, - precip_models_pm, + precip_models, precip_cascade, precip, mu_models, @@ -2446,34 +2465,30 @@ def _fill_nans_infs_nwp_cascade( min_cascade = np.nanmin(precip_cascade) min_precip = np.nanmin(precip) precip_models_cascade[~np.isfinite(precip_models_cascade)] = min_cascade - precip_models_pm[~np.isfinite(precip_models_pm)] = min_precip + precip_models[~np.isfinite(precip_models)] = min_precip # Also set any nans or infs in the mean and sigma of the cascade to # respectively 0.0 and 1.0 mu_models[~np.isfinite(mu_models)] = 0.0 sigma_models[~np.isfinite(sigma_models)] = 0.0 - return precip_models_cascade, precip_models_pm, mu_models, sigma_models + return precip_models_cascade, precip_models, mu_models, sigma_models -def _determine_max_nr_rainy_cells_nwp( - precip_models_pm, precip_thr, n_models, timesteps -): +def _determine_max_nr_rainy_cells_nwp(precip_models, precip_thr, n_models, timesteps): """Initialize noise based on the NWP field time step where the fraction of rainy cells is highest""" if precip_thr is None: - precip_thr = np.nanmin(precip_models_pm) + precip_thr = np.nanmin(precip_models) max_rain_pixels = -1 max_rain_pixels_j = -1 max_rain_pixels_t = -1 for j in range(n_models): for t in timesteps: - rain_pixels = precip_models_pm[j][t][ - precip_models_pm[j][t] > precip_thr - ].size + rain_pixels = precip_models[j][t][precip_models[j][t] > precip_thr].size if rain_pixels > max_rain_pixels: max_rain_pixels = rain_pixels max_rain_pixels_j = j max_rain_pixels_t = t - precip_noise_input = precip_models_pm[max_rain_pixels_j][max_rain_pixels_t] + precip_noise_input = precip_models[max_rain_pixels_j][max_rain_pixels_t] - return precip_noise_input + return precip_noise_input.astype(np.float64, copy=False)