import os from multiprocessing import Pool import click import numpy as np import pandas as pd from scipy import stats from analysis import runup_models from utils import crossings from logs import setup_logging logger = setup_logging() MULTIPROCESS_THREADS = int(os.environ.get("MULTIPROCESS_THREADS", 4)) def forecast_twl( df_tides, df_profiles, df_waves, df_profile_features, df_grain_size, runup_function, n_processes=MULTIPROCESS_THREADS, slope="foreshore", profile_type="prestorm", ): # Use df_waves as a base df_twl = df_waves.copy() # Merge tides logger.info("Merging tides") df_twl = df_twl.merge(df_tides, left_index=True, right_index=True) # Estimate foreshore slope. Do the analysis per site_id. This is so we only have to query the x and z # cross-section profiles once per site. site_ids = df_twl.index.get_level_values("site_id").unique() if slope == "foreshore": logger.info("Calculating foreshore slopes") # Process each site_id with a different process and combine results at the end with Pool(processes=n_processes) as pool: results = pool.starmap( foreshore_slope_for_site_id, [(site_id, df_twl, df_profiles) for site_id in site_ids], ) df_twl["beta"] = pd.concat(results) elif slope == "mean": logger.info("Calculating mean (dune toe to MHW) slopes") btm_z = 0.5 # m AHD # When calculating mean slope, we go from the dune toe to mhw. However, in some profiles, the dune toe is not # defined. In these cases, we should go to the dune crest. Let's make a temporary dataframe which has this # already calculated. df_top_ele = df_profile_features.xs(profile_type, level="profile_type").copy() df_top_ele.loc[:, "top_ele"] = df_top_ele.dune_toe_z df_top_ele.loc[ df_top_ele.top_ele.isnull().values, "top_ele" ] = df_top_ele.dune_crest_z n_no_top_ele = len(df_top_ele[df_top_ele.top_ele.isnull()].index) if n_no_top_ele != 0: logger.warning( "{} sites do not have dune toes/crests to calculate mean slope".format( n_no_top_ele ) ) df_slopes = ( df_profiles.xs(profile_type, level="profile_type") .dropna(subset=["z"]) .groupby("site_id") .apply( lambda x: slope_from_profile( profile_x=x.index.get_level_values("x").tolist(), profile_z=x.z.tolist(), top_elevation=df_top_ele.loc[x.index[0][0], :].top_ele, btm_elevation=btm_z, method="least_squares", ) ) .rename("beta") .to_frame() ) # Merge calculated slopes onto each twl timestep df_twl = df_twl.merge(df_slopes, left_index=True, right_index=True) elif slope == "intertidal": logger.info("Calculating intertidal slopes") top_z = 1.15 # m AHD = HAT from MHL annual ocean tides summary report btm_z = -0.9 # m AHD = HAT from MHL annual ocean tides summary report # Calculate slopes for each profile df_slopes = ( df_profiles.xs(profile_type, level="profile_type") .dropna(subset=["z"]) .groupby("site_id") .apply( lambda x: slope_from_profile( profile_x=x.index.get_level_values("x").tolist(), profile_z=x.z.tolist(), top_elevation=top_z, btm_elevation=max(min(x.z), btm_z), method="least_squares", ) ) .rename("beta") .to_frame() ) # Merge calculated slopes onto each twl timestep df_twl = df_twl.merge(df_slopes, left_index=True, right_index=True) # Estimate runup R2, setup, S_total, S_inc, S_ig = runup_function( Hs0=df_twl["Hs0"].tolist(), Tp=df_twl["Tp"].tolist(), beta=df_twl["beta"].tolist(), r=df_twl.merge(df_grain_size, on="site_id").r.tolist(), ) df_twl["R2"] = R2 df_twl["setup"] = setup df_twl["S_total"] = S_total # Estimate TWL df_twl["R_high"] = df_twl["tide"] + df_twl["R2"] df_twl["R_low"] = ( df_twl["tide"] + 1.1 * df_twl["setup"] - 1.1 / 2 * df_twl["S_total"] ) # Drop unneeded columns # df_twl.drop(columns=["E", "Exs", "P", "Pxs", "dir"], inplace=True, errors="ignore") return df_twl def mean_slope_for_site_id( site_id, df_twl, df_profiles, top_elevation_col, top_x_col, btm_elevation_col, profile_type="prestorm", ): """ Calculates the foreshore slope values a given site_id. Returns a series (with same indicies as df_twl) of foreshore slopes. This function is used to parallelize getting foreshore slopes as it is computationally expensive, given the need to iterate for the foreshore slope. :param site_id: :param df_twl: :param df_profiles: :return: A dataframe with slope values calculated """ # Get the prestorm beach profile profile = df_profiles.loc[(site_id, profile_type)] profile_x = profile.index.get_level_values("x").tolist() profile_z = profile.z.tolist() idx = pd.IndexSlice df_twl_site = df_twl.loc[idx[site_id, :], :] df_beta = df_twl_site.apply( lambda row: slope_from_profile( profile_x=profile_x, profile_z=profile_z, top_elevation=row[top_elevation_col], btm_elevation=row[btm_elevation_col], method="least_squares", top_x=row[top_x_col], ), axis=1, ) return df_beta def foreshore_slope_for_site_id(site_id, df_twl, df_profiles): """ Calculates the foreshore slope values a given site_id. Returns a series (with same indicies as df_twl) of foreshore slopes. This function is used to parallelize getting foreshore slopes as it is computationally expensive, given the need to iterate for the foreshore slope. :param site_id: :param df_twl: :param df_profiles: :return: A dataframe with slope values calculated """ # Get the prestorm beach profile profile = df_profiles.query( "site_id =='{}' and profile_type == 'prestorm'".format(site_id) ) profile_x = profile.index.get_level_values("x").tolist() profile_z = profile.z.tolist() df_twl_site = df_twl.query("site_id == '{}'".format(site_id)) df_beta = df_twl_site.apply( lambda row: foreshore_slope_from_profile( profile_x=profile_x, profile_z=profile_z, tide=row.tide, runup_function=runup_models.sto06, Hs0=row.Hs0, Tp=row.Tp, ), axis=1, ) return df_beta def foreshore_slope_from_profile(profile_x, profile_z, tide, runup_function, **kwargs): """ Returns the foreshore slope given the beach profile, water level (tide) and runup_function. Since foreshore slope is dependant on the setup elevation and swash magnitude, which in tern is dependant on the foreshore slope, the process requires iteration to solve. :param profile_x: :param profile_z: :param tide: :param runup_function: The name of a function which will return runup values (refer to runup_models.py) :param kwargs: Additional keyword arguments which will be passed to the runup_function (usually Hs0, Tp). :return: """ # Sometimes there is no tide value for a record, so return None if np.isnan(tide): return None # Initalize estimates max_number_iterations = 30 iteration_count = 0 averaged_accuracy = ( 0.03 ) # if slopes within this amount, average after max number of iterations acceptable_accuracy = ( 0.01 ) # if slopes within this amount, accept after max number of iterations preferred_accuracy = 0.001 # if slopes within this amount, accept beta = 0.05 while True: R2, setup, S_total, _, _ = runup_function(beta=beta, **kwargs) beta_new = slope_from_profile( profile_x=profile_x, profile_z=profile_z, method="least_squares", top_elevation=tide + setup + S_total / 2, btm_elevation=tide + setup - S_total / 2, ) # Return None if we can't find a slope, usually because the elevations we've specified are above/below our # profile x and z coordinates. if beta_new is None: return None # If slopes do not change much between interactions, return the slope if abs(beta_new - beta) < preferred_accuracy: return beta # If we can't converge a solution, return None if iteration_count > max_number_iterations: if abs(beta_new - beta) < acceptable_accuracy: return beta elif abs(beta_new - beta) < averaged_accuracy: return (beta_new + beta) / 2 else: return None beta = beta_new iteration_count += 1 def slope_from_profile( profile_x, profile_z, top_elevation, btm_elevation, method="end_points", top_x=None, btm_x=None, ): """ Returns a slope (beta) from a bed profile, given the top and bottom elevations of where the slope should be taken. :param x: List of x bed profile coordinates :param z: List of z bed profile coordinates :param top_elevation: Top elevation of where to take the slope :param btm_elevation: Bottom elevation of where to take the slope :param method: Method used to calculate slope (end_points or least_squares) :param top_x: x-coordinate of the top end point. May be needed, as there may be multiple crossings of the top_elevation. :param btm_x: x-coordinate of the bottom end point :return: """ # Need all data to get the slope if any([x is None for x in [profile_x, profile_z, top_elevation, btm_elevation]]): return None end_points = {"top": {"z": top_elevation}, "btm": {"z": btm_elevation}} for end_type in end_points.keys(): # Add x coordinates if they are specified if top_x and end_type == "top": end_points["top"]["x"] = top_x continue if btm_x and end_type == "top": end_points["btm"]["x"] = btm_x continue elevation = end_points[end_type]["z"] intersection_x = crossings(profile_x, profile_z, elevation) # No intersections found if len(intersection_x) == 0: return None # One intersection elif len(intersection_x) == 1: end_points[end_type]["x"] = intersection_x[0] # More than on intersection else: if end_type == "top": # For top elevation, take most seaward intersection end_points[end_type]["x"] = intersection_x[-1] else: # For bottom elevation, take most landward intersection that is seaward of top elevation end_point_btm = [ x for x in intersection_x if x > end_points["top"]["x"] ] if len(end_point_btm) == 0: # If there doesn't seem to be an intersection seaward of the top elevation, return none. logger.warning("No intersections found seaward of top elevation") return None else: end_points[end_type]["x"] = end_point_btm[0] # Ensure that top point is landward of bottom point if end_points["top"]["x"] > end_points["btm"]["x"]: logger.warning("Top point is not landward of bottom point") return None if method == "end_points": x_top = end_points["top"]["x"] x_btm = end_points["btm"]["x"] z_top = end_points["top"]["z"] z_btm = end_points["btm"]["z"] return -(z_top - z_btm) / (x_top - x_btm) elif method == "least_squares": profile_mask = [ True if end_points["top"]["x"] < pts < end_points["btm"]["x"] else False for pts in profile_x ] slope_x = np.array(profile_x)[profile_mask].tolist() slope_z = np.array(profile_z)[profile_mask].tolist() slope, _, _, _, _ = stats.linregress(slope_x, slope_z) return -slope @click.command() @click.option("--waves-csv", required=True, help="") @click.option("--tides-csv", required=True, help="") @click.option("--profiles-csv", required=True, help="") @click.option("--profile-features-csv", required=True, help="") @click.option( "--runup-function", required=True, help="", type=click.Choice(["sto06", "hol86", "nie91", "pow18"]), ) @click.option( "--slope", required=True, help="", type=click.Choice(["foreshore", "mean", "intertidal"]), ) @click.option( "--profile-type", required=True, help="", type=click.Choice(["prestorm", "poststorm"]), ) @click.option("--output-file", required=True, help="") @click.option("--grain-size-csv", required=False, help="") def create_twl_forecast( waves_csv, tides_csv, profiles_csv, profile_features_csv, runup_function, slope, profile_type, output_file, grain_size_csv, ): logger.info("Creating forecast of total water levels") logger.info("Importing data") df_waves = pd.read_csv(waves_csv, index_col=[0, 1]) df_tides = pd.read_csv(tides_csv, index_col=[0, 1]) df_profiles = pd.read_csv(profiles_csv, index_col=[0, 1, 2]) df_profile_features = pd.read_csv(profile_features_csv, index_col=[0, 1]) df_grain_size = pd.read_csv(grain_size_csv, index_col=[0]) logger.info("Forecasting TWL") df_twl = forecast_twl( df_tides, df_profiles, df_waves, df_profile_features, df_grain_size, runup_function=getattr(runup_models, runup_function), slope=slope, profile_type=profile_type, ) df_twl.to_csv(output_file) logger.info("Saved to %s", output_file) logger.info("Done!")