Skip to content

Commit

Permalink
replacing original src with ai-based
Browse files Browse the repository at this point in the history
  • Loading branch information
hhs732 committed Jun 7, 2024
1 parent 015b228 commit 40bf892
Showing 1 changed file with 90 additions and 71 deletions.
161 changes: 90 additions & 71 deletions src/bathymetric_adjustment.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@
import geopandas as gpd
import glob
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from synthesize_test_cases import progress_bar_handler
# ############################################from synthesize_test_cases import progress_bar_handler

path_aib_bathy_parquet = "/efs-drives/fim-dev-efs/fim-home/heidi.safa/aib_bathy_adjustment/data/ml_outputs_v1.01.parquet"
fim_dir = "/efs-drives/fim-dev-efs/fim-home/heidi.safa/aib_bathy_adjustment/data/fim_4_4_15_0/"
huc = "07130003"
bathy_file = "/efs-drives/fim-dev-efs/fim-home/bathymetry_processing/bathymetry_illinois.gpkg"

# -------------------------------------------------------
def correct_rating_for_ai_based_bathymetry(path_aib_bathy_parquet, fim_dir, huc, ploting_flag):
def correct_rating_for_ai_based_bathymetry(path_aib_bathy_parquet, fim_dir, huc):

# Load AI-based bathymetry data
ml_bathy_data = pd.read_parquet(path_aib_bathy_parquet, engine='pyarrow') #
Expand Down Expand Up @@ -64,6 +66,9 @@ def correct_rating_for_ai_based_bathymetry(path_aib_bathy_parquet, fim_dir, huc,
aib_bathy_data_gdf = aib_bathy_data_gdf.rename(columns={'COMID': 'feature_id'})
aib_bathy_data_gdf = aib_bathy_data_gdf.rename(columns={'owp_inchan_channel_area': 'missing_xs_area_m2'})

feature_id_aib = aib_bathy_data_gdf["feature_id"].drop_duplicates(keep="first")
feature_id_aib.index = range(len(feature_id_aib))

# Calculating missing_wet_perimeter_m and adding it to aib_bathy_data_gdf
missing_wet_perimeter_m = aib_bathy_data_gdf['owp_inchan_channel_perimeter'] - aib_bathy_data_gdf['owp_tw_inchan']
aib_bathy_data_gdf['missing_wet_perimeter_m'] = missing_wet_perimeter_m
Expand All @@ -72,35 +77,22 @@ def correct_rating_for_ai_based_bathymetry(path_aib_bathy_parquet, fim_dir, huc,
log_text = f'Calculating bathymetry adjustment: {huc}\n'

# Get src_full from each branch
src_all_branches_path = []
branches = os.listdir(join(fim_huc_dir, 'branches'))
for branch in branches:
src_full = join(fim_huc_dir, 'branches', str(branch), f'src_full_crosswalked_{branch}.csv')
if os.path.isfile(src_full):
src_all_branches_path.append(src_full)
processing_files = join(fim_huc_dir, "src_processing", "original_srcs")
src_all_branches_path = glob.glob(join(processing_files, "*.csv"))

# Make a copy of original srcs
destination_dir = join(fim_huc_dir, "original_srcs")
os.mkdir(destination_dir)
for file_path in src_all_branches_path:
shutil.copy(file_path, destination_dir)

# make a directory for ai-based bathy adjusted SRCs
path_aib_src = join(fim_huc_dir, "aib_srcs")
# Make a directory for ai-based bathy adjusted SRCs
path_aib_src = join(join(fim_huc_dir, "src_processing"), "aib_srcs")
os.mkdir(path_aib_src)
# Update src parameters with bathymetric data
for src in src_all_branches_path:
src_df = pd.read_csv(src)
if 'Bathymetry_source' in src_df.columns:
src_df = src_df.drop(columns='Bathymetry_source')
branch = re.search(r'branches/(\d{10}|0)/', src).group()[9:-1]

src_name = os.path.basename(src)
branch = src_name.split(".")[0].split("_")[-1]
log_text += f' Branch: {branch}\n'

# # testing parameters
# fids = src_df['feature_id'].drop_duplicates(keep = 'first')
# aib_bathy = aib_bathy_data_gdf[['feature_id', 'missing_xs_area_m2', 'missing_wet_perimeter_m']]
# aib_bathy_branch = aib_bathy[aib_bathy['feature_id'].isin(fids)]

# Merge in missing bathy data and fill Nans
try:
src_df = src_df.merge(
Expand Down Expand Up @@ -151,22 +143,18 @@ def correct_rating_for_ai_based_bathymetry(path_aib_bathy_parquet, fim_dir, huc,
src_df.loc[src_df['Stage'] == 0, ['Discharge (m3s-1)']] = 0
# Calculate number of adjusted HydroIDs

if ploting_flag == "True":
# Write src in ai-based file
src_name = os.path.basename(src)
path2save = join(path_aib_src, src_name)
src_df.to_csv(path2save, index=False)

else:
# Write src back to file
src_df.to_csv(src, index=False)
# Write src in ai-based file
path2save = join(path_aib_src, src_name)
src_df.to_csv(path2save, index=False)

return feature_id_aib


def plot_aib_original_src(fim_dir, huc):

fim_huc_dir = join(fim_dir, huc)
path_aib = join(fim_huc_dir, "aib_srcs")
path_original = join(fim_huc_dir, "original_srcs")
path_aib = join(fim_huc_dir, "src_processing", "aib_srcs")
path_original = join(fim_huc_dir, "src_processing", "original_srcs")
csv_files_aib = glob.glob(join(path_aib,'*.csv'))

# Plot original srcs and ai-based srcs
Expand Down Expand Up @@ -219,10 +207,9 @@ def plot_aib_original_src(fim_dir, huc):
def plot_ehydro_aib_original_srcs(fim_dir, huc):

fim_huc_dir = join(fim_dir, huc)
path_aib = join(fim_huc_dir, "aib_srcs")
path_ehydro = join(fim_huc_dir, "ehydro_srcs")
path_original = join(fim_huc_dir, "original_srcs")
# csv_files_aib = glob.glob(join(path_aib,'*.csv'))
path_aib = join(fim_huc_dir, "src_processing", "aib_srcs")
path_ehydro = join(fim_huc_dir, "src_processing", "ehydro_srcs")
path_original = join(fim_huc_dir, "src_processing", "original_srcs")
csv_files_ehydro = glob.glob(join(path_ehydro,'*.csv'))

# Plot original srcs and ai-based and ehydro srcs
Expand Down Expand Up @@ -280,7 +267,7 @@ def plot_ehydro_aib_original_srcs(fim_dir, huc):

# -------------------------------------------------------
# Adjusting synthetic rating curves using 'USACE eHydro' bathymetry data
def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, ploting_flag, verbose):
def correct_rating_for_ehydro_bathymetry(fim_dir, huc, bathy_file, verbose):
"""Function for correcting synthetic rating curves. It will correct each branch's
SRCs in serial based on the feature_ids in the input bathy_file.
Expand All @@ -293,9 +280,6 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, ploting_flag, verbos
bathy_file : str
Path to bathymetric adjustment geopackage, e.g.
"/data/inputs/bathymetry/bathymetry_adjustment_data.gpkg".
verbose : bool
Verbose printing.
Returns
----------
log_text : str
Expand All @@ -309,6 +293,8 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, ploting_flag, verbos
wbd8_clp = gpd.read_file(join(fim_huc_dir, 'wbd8_clp.gpkg'), engine="pyogrio", use_arrow=True)
bathy_data = gpd.read_file(bathy_file, mask=wbd8_clp, engine="fiona")
bathy_data = bathy_data.rename(columns={'ID': 'feature_id'})
feature_id_ehydro = bathy_data["feature_id"].drop_duplicates(keep="first")
feature_id_ehydro.index = range(len(feature_id_ehydro))

# Get src_full from each branch
src_all_branches = []
Expand All @@ -317,14 +303,8 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, ploting_flag, verbos
src_full = join(fim_huc_dir, 'branches', str(branch), f'src_full_crosswalked_{branch}.csv')
if os.path.isfile(src_full):
src_all_branches.append(src_full)

# Make a copy of original srcs
destination_dir = join(fim_huc_dir, "original_srcs")
os.mkdir(destination_dir)
for file_path in src_all_branches:
shutil.copy(file_path, destination_dir)

path_ehydro_src = join(fim_huc_dir, "ehydro_srcs")
path_ehydro_src = join(join(fim_huc_dir, "src_processing"), "ehydro_srcs")
os.mkdir(path_ehydro_src)
# Update src parameters with bathymetric data
for src in src_all_branches:
Expand All @@ -335,7 +315,7 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, ploting_flag, verbos
log_text += f' Branch: {branch}\n'

if bathy_data.empty:
log_text += ' There were no bathymetry feature_ids for this branch'
log_text += ' There were no eHydro bathymetry feature_ids for this branch'
src_df['Bathymetry_source'] = [""] * len(src_df)
src_df.to_csv(src, index=False)
return log_text
Expand All @@ -359,6 +339,7 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, ploting_flag, verbos
'Bathymetry_source'
].first()
src_df = src_df.merge(reconciled_bathy_data, on='feature_id', how='left', validate='many_to_one')

# Exit if there are no recalculations to be made
if ~src_df['Bathymetry_source'].any(axis=None):
log_text += ' No matching feature_ids in this branch\n'
Expand Down Expand Up @@ -389,18 +370,17 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, ploting_flag, verbos
# Calculate number of adjusted HydroIDs
count = len(src_df.loc[(src_df['Stage'] == 0) & (src_df['Bathymetry_source'] == 'USACE eHydro')])

if ploting_flag == "True":
# Write src in ai-based file
src_name = os.path.basename(src)
path2save = join(path_ehydro_src, src_name)
src_df.to_csv(path2save, index=False)
else:
# Write src back to file
src_df.to_csv(src, index=False)
# Write src in ai-based file
src_name = os.path.basename(src)
path2save = join(path_ehydro_src, src_name)
src_df.to_csv(path2save, index=False)

# Write src back to file
src_df.to_csv(src, index=False)

log_text += f' Successfully recalculated {count} HydroIDs\n'

return log_text
return [feature_id_ehydro, log_text]


def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, number_of_jobs, verbose):
Expand Down Expand Up @@ -490,32 +470,71 @@ def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, numb
log_file.write('TOTAL RUN TIME: ' + str(tot_run_time))
log_file.close()

# wbd_buffer = 50
# bathy_file = "/efs-drives/fim-dev-efs/fim-home/bathymetry_processing/bathymetry_illinois.gpkg"
# output_suffix = "eHydro"
# number_of_jobs = 6
# verbose = False
verbose = False
# correct_rating_for_bathymetry(fim_dir, huc, bathy_file, verbose)
bathymetry_source = "USACE eHydro"

def apply_bathy_data_to_srcs (
def apply_bathy_data_to_srcs(
fim_dir,
huc,
path_aib_bathy_parquet,
bathy_file,
wbd_buffer,
wbd,
output_suffix,
number_of_jobs,
verbose,
bathymetry_source,
ploting_flag
):

if bathymetry_source == "AI_Based":
fim_huc_dir = join(fim_dir, huc)
path_src_processing = join(fim_huc_dir, "src_processing")
os.mkdir(path_src_processing)

# Get src_full from each branch
src_all_branches = []
branches = os.listdir(join(fim_huc_dir, 'branches'))
for branch in branches:
src_full = join(fim_huc_dir, 'branches', str(branch), f'src_full_crosswalked_{branch}.csv')
if os.path.isfile(src_full):
src_all_branches.append(src_full)

# Make a copy of original srcs
destination_dir = join(path_src_processing, "original_srcs")
os.mkdir(destination_dir)
for file_path in src_all_branches:
shutil.copy(file_path, destination_dir)

if bathymetry_source == "USACE eHydro":
output_ehydro = correct_rating_for_ehydro_bathymetry(fim_dir, huc, bathy_file, verbose)
feature_id_ehydro = output_ehydro[0]
feature_id_aib = correct_rating_for_ai_based_bathymetry(path_aib_bathy_parquet, fim_dir, huc)
else:
feature_id_aib = correct_rating_for_ai_based_bathymetry(path_aib_bathy_parquet, fim_dir, huc)

# feature_id_target = feature_id_aib[~feature_id_aib.isin(feature_id_ehydro)]
path_aib_src = join(path_src_processing, "aib_srcs")
src_all_branches_path_aib = glob.glob(join(path_aib_src, "*.csv"))

src_all_branches_path_aib.sort()
src_all_branches.sort()

for src in range(len(src_all_branches)):
src_df_ehy = pd.read_csv(src_all_branches[src])
src_df_ehy = src_df_ehy.sort_index(axis=1)
src_df_aib = pd.read_csv(src_all_branches_path_aib[src])
src_df_aib = src_df_aib.sort_index(axis=1)

# src_df_ehy["Bathymetry_source"][3:5] = 'n'
# src_df_ehy["BedArea (m2)"][3:5] = 0
mask = src_df_ehy['Bathymetry_source'] != 'USACE eHydro'
# Use np.where to replace rows in df1 with rows from df2 where mask is True
src_ar_ehy = np.where(mask[:, None], src_df_aib, src_df_ehy)
src_df_ehy = pd.DataFrame(src_ar_ehy, columns = src_df_ehy.columns)

src_name = os.path.basename(src)
branch = src_name.split(".")[0].split("_")[-1]
log_text += f' Branch: {branch}\n'


correct_rating_for_ai_based_bathymetry(path_aib_bathy_parquet, fim_dir, huc, ploting_flag)

else: multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, number_of_jobs, verbose, ploting_flag)


if __name__ == '__main__':
Expand Down

0 comments on commit 40bf892

Please sign in to comment.