diff --git a/src/bathymetric_adjustment.py b/src/bathymetric_adjustment.py index d21680415..26c2a8b73 100644 --- a/src/bathymetric_adjustment.py +++ b/src/bathymetric_adjustment.py @@ -13,15 +13,17 @@ import geopandas as gpd import glob import pandas as pd +import numpy as np import matplotlib.pyplot as plt -from synthesize_test_cases import progress_bar_handler +# ############################################from synthesize_test_cases import progress_bar_handler path_aib_bathy_parquet = "/efs-drives/fim-dev-efs/fim-home/heidi.safa/aib_bathy_adjustment/data/ml_outputs_v1.01.parquet" fim_dir = "/efs-drives/fim-dev-efs/fim-home/heidi.safa/aib_bathy_adjustment/data/fim_4_4_15_0/" huc = "07130003" +bathy_file = "/efs-drives/fim-dev-efs/fim-home/bathymetry_processing/bathymetry_illinois.gpkg" # ------------------------------------------------------- -def correct_rating_for_ai_based_bathymetry(path_aib_bathy_parquet, fim_dir, huc, ploting_flag): +def correct_rating_for_ai_based_bathymetry(path_aib_bathy_parquet, fim_dir, huc): # Load AI-based bathymetry data ml_bathy_data = pd.read_parquet(path_aib_bathy_parquet, engine='pyarrow') # @@ -64,6 +66,9 @@ def correct_rating_for_ai_based_bathymetry(path_aib_bathy_parquet, fim_dir, huc, aib_bathy_data_gdf = aib_bathy_data_gdf.rename(columns={'COMID': 'feature_id'}) aib_bathy_data_gdf = aib_bathy_data_gdf.rename(columns={'owp_inchan_channel_area': 'missing_xs_area_m2'}) + feature_id_aib = aib_bathy_data_gdf["feature_id"].drop_duplicates(keep="first") + feature_id_aib.index = range(len(feature_id_aib)) + # Calculating missing_wet_perimeter_m and adding it to aib_bathy_data_gdf missing_wet_perimeter_m = aib_bathy_data_gdf['owp_inchan_channel_perimeter'] - aib_bathy_data_gdf['owp_tw_inchan'] aib_bathy_data_gdf['missing_wet_perimeter_m'] = missing_wet_perimeter_m @@ -72,35 +77,22 @@ def correct_rating_for_ai_based_bathymetry(path_aib_bathy_parquet, fim_dir, huc, log_text = f'Calculating bathymetry adjustment: {huc}\n' # Get src_full from each branch - src_all_branches_path = [] - branches = os.listdir(join(fim_huc_dir, 'branches')) - for branch in branches: - src_full = join(fim_huc_dir, 'branches', str(branch), f'src_full_crosswalked_{branch}.csv') - if os.path.isfile(src_full): - src_all_branches_path.append(src_full) + processing_files = join(fim_huc_dir, "src_processing", "original_srcs") + src_all_branches_path = glob.glob(join(processing_files, "*.csv")) - # Make a copy of original srcs - destination_dir = join(fim_huc_dir, "original_srcs") - os.mkdir(destination_dir) - for file_path in src_all_branches_path: - shutil.copy(file_path, destination_dir) - - # make a directory for ai-based bathy adjusted SRCs - path_aib_src = join(fim_huc_dir, "aib_srcs") + # Make a directory for ai-based bathy adjusted SRCs + path_aib_src = join(join(fim_huc_dir, "src_processing"), "aib_srcs") os.mkdir(path_aib_src) # Update src parameters with bathymetric data for src in src_all_branches_path: src_df = pd.read_csv(src) if 'Bathymetry_source' in src_df.columns: src_df = src_df.drop(columns='Bathymetry_source') - branch = re.search(r'branches/(\d{10}|0)/', src).group()[9:-1] + + src_name = os.path.basename(src) + branch = src_name.split(".")[0].split("_")[-1] log_text += f' Branch: {branch}\n' - # # testing parameters - # fids = src_df['feature_id'].drop_duplicates(keep = 'first') - # aib_bathy = aib_bathy_data_gdf[['feature_id', 'missing_xs_area_m2', 'missing_wet_perimeter_m']] - # aib_bathy_branch = aib_bathy[aib_bathy['feature_id'].isin(fids)] - # Merge in missing bathy data and fill Nans try: src_df = src_df.merge( @@ -151,22 +143,18 @@ def correct_rating_for_ai_based_bathymetry(path_aib_bathy_parquet, fim_dir, huc, src_df.loc[src_df['Stage'] == 0, ['Discharge (m3s-1)']] = 0 # Calculate number of adjusted HydroIDs - if ploting_flag == "True": - # Write src in ai-based file - src_name = os.path.basename(src) - path2save = join(path_aib_src, src_name) - src_df.to_csv(path2save, index=False) - - else: - # Write src back to file - src_df.to_csv(src, index=False) + # Write src in ai-based file + path2save = join(path_aib_src, src_name) + src_df.to_csv(path2save, index=False) + + return feature_id_aib def plot_aib_original_src(fim_dir, huc): fim_huc_dir = join(fim_dir, huc) - path_aib = join(fim_huc_dir, "aib_srcs") - path_original = join(fim_huc_dir, "original_srcs") + path_aib = join(fim_huc_dir, "src_processing", "aib_srcs") + path_original = join(fim_huc_dir, "src_processing", "original_srcs") csv_files_aib = glob.glob(join(path_aib,'*.csv')) # Plot original srcs and ai-based srcs @@ -219,10 +207,9 @@ def plot_aib_original_src(fim_dir, huc): def plot_ehydro_aib_original_srcs(fim_dir, huc): fim_huc_dir = join(fim_dir, huc) - path_aib = join(fim_huc_dir, "aib_srcs") - path_ehydro = join(fim_huc_dir, "ehydro_srcs") - path_original = join(fim_huc_dir, "original_srcs") - # csv_files_aib = glob.glob(join(path_aib,'*.csv')) + path_aib = join(fim_huc_dir, "src_processing", "aib_srcs") + path_ehydro = join(fim_huc_dir, "src_processing", "ehydro_srcs") + path_original = join(fim_huc_dir, "src_processing", "original_srcs") csv_files_ehydro = glob.glob(join(path_ehydro,'*.csv')) # Plot original srcs and ai-based and ehydro srcs @@ -280,7 +267,7 @@ def plot_ehydro_aib_original_srcs(fim_dir, huc): # ------------------------------------------------------- # Adjusting synthetic rating curves using 'USACE eHydro' bathymetry data -def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, ploting_flag, verbose): +def correct_rating_for_ehydro_bathymetry(fim_dir, huc, bathy_file, verbose): """Function for correcting synthetic rating curves. It will correct each branch's SRCs in serial based on the feature_ids in the input bathy_file. @@ -293,9 +280,6 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, ploting_flag, verbos bathy_file : str Path to bathymetric adjustment geopackage, e.g. "/data/inputs/bathymetry/bathymetry_adjustment_data.gpkg". - verbose : bool - Verbose printing. - Returns ---------- log_text : str @@ -309,6 +293,8 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, ploting_flag, verbos wbd8_clp = gpd.read_file(join(fim_huc_dir, 'wbd8_clp.gpkg'), engine="pyogrio", use_arrow=True) bathy_data = gpd.read_file(bathy_file, mask=wbd8_clp, engine="fiona") bathy_data = bathy_data.rename(columns={'ID': 'feature_id'}) + feature_id_ehydro = bathy_data["feature_id"].drop_duplicates(keep="first") + feature_id_ehydro.index = range(len(feature_id_ehydro)) # Get src_full from each branch src_all_branches = [] @@ -317,14 +303,8 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, ploting_flag, verbos src_full = join(fim_huc_dir, 'branches', str(branch), f'src_full_crosswalked_{branch}.csv') if os.path.isfile(src_full): src_all_branches.append(src_full) - - # Make a copy of original srcs - destination_dir = join(fim_huc_dir, "original_srcs") - os.mkdir(destination_dir) - for file_path in src_all_branches: - shutil.copy(file_path, destination_dir) - path_ehydro_src = join(fim_huc_dir, "ehydro_srcs") + path_ehydro_src = join(join(fim_huc_dir, "src_processing"), "ehydro_srcs") os.mkdir(path_ehydro_src) # Update src parameters with bathymetric data for src in src_all_branches: @@ -335,7 +315,7 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, ploting_flag, verbos log_text += f' Branch: {branch}\n' if bathy_data.empty: - log_text += ' There were no bathymetry feature_ids for this branch' + log_text += ' There were no eHydro bathymetry feature_ids for this branch' src_df['Bathymetry_source'] = [""] * len(src_df) src_df.to_csv(src, index=False) return log_text @@ -359,6 +339,7 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, ploting_flag, verbos 'Bathymetry_source' ].first() src_df = src_df.merge(reconciled_bathy_data, on='feature_id', how='left', validate='many_to_one') + # Exit if there are no recalculations to be made if ~src_df['Bathymetry_source'].any(axis=None): log_text += ' No matching feature_ids in this branch\n' @@ -389,18 +370,17 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, ploting_flag, verbos # Calculate number of adjusted HydroIDs count = len(src_df.loc[(src_df['Stage'] == 0) & (src_df['Bathymetry_source'] == 'USACE eHydro')]) - if ploting_flag == "True": - # Write src in ai-based file - src_name = os.path.basename(src) - path2save = join(path_ehydro_src, src_name) - src_df.to_csv(path2save, index=False) - else: - # Write src back to file - src_df.to_csv(src, index=False) + # Write src in ai-based file + src_name = os.path.basename(src) + path2save = join(path_ehydro_src, src_name) + src_df.to_csv(path2save, index=False) + + # Write src back to file + src_df.to_csv(src, index=False) log_text += f' Successfully recalculated {count} HydroIDs\n' - return log_text + return [feature_id_ehydro, log_text] def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, number_of_jobs, verbose): @@ -490,32 +470,71 @@ def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, numb log_file.write('TOTAL RUN TIME: ' + str(tot_run_time)) log_file.close() -# wbd_buffer = 50 -# bathy_file = "/efs-drives/fim-dev-efs/fim-home/bathymetry_processing/bathymetry_illinois.gpkg" -# output_suffix = "eHydro" -# number_of_jobs = 6 -# verbose = False +verbose = False # correct_rating_for_bathymetry(fim_dir, huc, bathy_file, verbose) +bathymetry_source = "USACE eHydro" -def apply_bathy_data_to_srcs ( +def apply_bathy_data_to_srcs( fim_dir, huc, path_aib_bathy_parquet, bathy_file, - wbd_buffer, - wbd, - output_suffix, - number_of_jobs, verbose, bathymetry_source, ploting_flag ): - if bathymetry_source == "AI_Based": + fim_huc_dir = join(fim_dir, huc) + path_src_processing = join(fim_huc_dir, "src_processing") + os.mkdir(path_src_processing) + + # Get src_full from each branch + src_all_branches = [] + branches = os.listdir(join(fim_huc_dir, 'branches')) + for branch in branches: + src_full = join(fim_huc_dir, 'branches', str(branch), f'src_full_crosswalked_{branch}.csv') + if os.path.isfile(src_full): + src_all_branches.append(src_full) + + # Make a copy of original srcs + destination_dir = join(path_src_processing, "original_srcs") + os.mkdir(destination_dir) + for file_path in src_all_branches: + shutil.copy(file_path, destination_dir) + + if bathymetry_source == "USACE eHydro": + output_ehydro = correct_rating_for_ehydro_bathymetry(fim_dir, huc, bathy_file, verbose) + feature_id_ehydro = output_ehydro[0] + feature_id_aib = correct_rating_for_ai_based_bathymetry(path_aib_bathy_parquet, fim_dir, huc) + else: + feature_id_aib = correct_rating_for_ai_based_bathymetry(path_aib_bathy_parquet, fim_dir, huc) + + # feature_id_target = feature_id_aib[~feature_id_aib.isin(feature_id_ehydro)] + path_aib_src = join(path_src_processing, "aib_srcs") + src_all_branches_path_aib = glob.glob(join(path_aib_src, "*.csv")) + + src_all_branches_path_aib.sort() + src_all_branches.sort() + + for src in range(len(src_all_branches)): + src_df_ehy = pd.read_csv(src_all_branches[src]) + src_df_ehy = src_df_ehy.sort_index(axis=1) + src_df_aib = pd.read_csv(src_all_branches_path_aib[src]) + src_df_aib = src_df_aib.sort_index(axis=1) + + # src_df_ehy["Bathymetry_source"][3:5] = 'n' + # src_df_ehy["BedArea (m2)"][3:5] = 0 + mask = src_df_ehy['Bathymetry_source'] != 'USACE eHydro' + # Use np.where to replace rows in df1 with rows from df2 where mask is True + src_ar_ehy = np.where(mask[:, None], src_df_aib, src_df_ehy) + src_df_ehy = pd.DataFrame(src_ar_ehy, columns = src_df_ehy.columns) + + src_name = os.path.basename(src) + branch = src_name.split(".")[0].split("_")[-1] + log_text += f' Branch: {branch}\n' + - correct_rating_for_ai_based_bathymetry(path_aib_bathy_parquet, fim_dir, huc, ploting_flag) - else: multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, number_of_jobs, verbose, ploting_flag) if __name__ == '__main__':