"""
Estimates the forecasted storm impacts based on the forecasted water level and dune crest/toe.
"""

import click
import pandas as pd

from logs import setup_logging

logger = setup_logging()


def forecasted_impacts(df_profile_features, df_forecasted_twl):
    """
    Combines our profile features (containing dune toes and crests) with water levels, to get the forecasted storm
    impacts.

    :param df_profile_features: DataFrame whose index has "site_id" and "profile_type" levels and which has
        "dune_toe_z" and "dune_crest_z" columns. Only rows with profile_type == "prestorm" are used.
    :param df_forecasted_twl: DataFrame whose index has "site_id" and "datetime" levels and which has "R_high" and
        "R_low" columns.
    :return: DataFrame keyed by site_id holding the datetime, R_high and R_low of each site's peak R_high, the
        prestorm dune_toe_z and dune_crest_z, and the resulting "storm_regime".
    """
    logger.info("Getting forecasted storm impacts")

    # Start from every site that has profile features so that sites without water levels are kept (left merges).
    df_forecasted_impacts = pd.DataFrame(
        index=df_profile_features.index.get_level_values("site_id").unique()
    )

    # For each site, find the maximum R_high value and the corresponding R_low value.
    idx = df_forecasted_twl.groupby(level=["site_id"])["R_high"].idxmax().dropna()
    df_r_vals = df_forecasted_twl.loc[idx, ["R_high", "R_low"]].reset_index(
        ["datetime"]
    )
    df_forecasted_impacts = df_forecasted_impacts.merge(
        df_r_vals, how="left", left_index=True, right_index=True
    )

    # Join with df_profile features to find dune toe and crest elevations
    df_forecasted_impacts = df_forecasted_impacts.merge(
        df_profile_features.query("profile_type=='prestorm'").reset_index(
            "profile_type"
        )[["dune_toe_z", "dune_crest_z"]],
        how="left",
        left_on=["site_id"],
        right_on=["site_id"],
    )

    # Compare R_high and R_low with dune crest and toe elevations
    df_forecasted_impacts = storm_regime(df_forecasted_impacts)

    return df_forecasted_impacts


def storm_regime(df_forecasted_impacts):
    """
    Returns the dataframe with an additional column of storm impacts based on the Storm Impact Scale. Refer to
    Sallenger (2000) for details.

    The regimes are assigned in order of increasing severity, so a later assignment overrides an earlier one:
    "swash" -> "collision" -> "overwash" -> "inundation". Rows where the required values are missing (NaN) never
    satisfy a comparison and are left unclassified, apart from the no-dune-toe fallback described below.

    :param df_forecasted_impacts: DataFrame with "R_high", "R_low", "dune_toe_z" and "dune_crest_z" columns. It is
        modified in place (the "storm_regime" column is written to it).
    :return: The same dataframe, with the "storm_regime" column set.
    """
    logger.info("Getting forecasted storm regimes")

    r_high = df_forecasted_impacts.R_high
    r_low = df_forecasted_impacts.R_low
    dune_toe_z = df_forecasted_impacts.dune_toe_z
    dune_crest_z = df_forecasted_impacts.dune_crest_z

    # The order of the assignments below matters: each one overwrites the previous regime where it applies.
    df_forecasted_impacts.loc[r_high <= dune_toe_z, "storm_regime"] = "swash"
    df_forecasted_impacts.loc[dune_toe_z <= r_high, "storm_regime"] = "collision"
    df_forecasted_impacts.loc[dune_crest_z <= r_high, "storm_regime"] = "overwash"
    df_forecasted_impacts.loc[
        (dune_crest_z <= r_low) & (dune_crest_z <= r_high), "storm_regime"
    ] = "inundation"

    # If there is no dune toe defined, R_high should be compared to dune crest to determine if swash or overwash.
    # The comparison is strict so that R_high == dune_crest_z keeps the "overwash"/"inundation" regime assigned
    # above, consistent with how sites that do have a dune toe are classified.
    df_forecasted_impacts.loc[
        dune_toe_z.isnull() & (r_high < dune_crest_z), "storm_regime"
    ] = "swash"

    return df_forecasted_impacts


def twl_exceedence_time(
    df_profile_features,
    df_forecasted_twl,
    z_twl_col="R_high",
    z_exceedence_col="dune_toe_z",
):
    """
    Returns a dataframe of number of hours the twl exceeded a certain z elevation.
    May need to use this https://stackoverflow.com/a/53656968 if datetimes are not consistent.

    The value is a count of the water level records at or above the elevation, so it only equals a number of hours
    if df_forecasted_twl has one record per hour for each site.

    :param df_profile_features: DataFrame whose index has "site_id" and "profile_type" levels and which contains
        the z_exceedence_col column. Only rows with profile_type == "prestorm" are used.
    :param df_forecasted_twl: DataFrame with a "site_id" index level and the z_twl_col column.
    :param z_twl_col: Name of the water level column to test.
    :param z_exceedence_col: Name of the profile feature elevation column to test against.
    :return: DataFrame grouped by site_id with a single column named "twl_<z_exceedence_col>_exceedance_hrs".
    """
    logger.info("Getting twl exceedence time")

    # Get a dataframe of prestorm dune toes organised by site_id
    df_dune_toes = (
        df_profile_features.query('profile_type=="prestorm"')
        .reset_index("profile_type")[z_exceedence_col]
        .to_frame()
    )

    # Merge dune toes into site_id
    df_merged = df_forecasted_twl.merge(
        df_dune_toes, left_on=["site_id"], right_on=["site_id"]
    )

    # Return the sum of hours that twl exceeded the level
    return (
        (df_merged[z_twl_col] >= df_merged[z_exceedence_col])
        .groupby("site_id")
        .sum()
        .rename("twl_{}_exceedance_hrs".format(z_exceedence_col))
        .to_frame()
    )


@click.command()
@click.option("--profile-features-csv", required=True, help="")
@click.option("--forecasted-twl-csv", required=True, help="")
@click.option("--output-file", required=True, help="")
def create_forecasted_impacts(profile_features_csv, forecasted_twl_csv, output_file):
    """
    Create the forecasted storm impacts csv from the profile features csv and the forecasted twl csv.
    """
    logger.info("Creating forecasted storm impacts")
    logger.info("Importing existing data")

    # Both csv files use their first two columns as the index.
    df_profile_features = pd.read_csv(profile_features_csv, index_col=[0, 1])
    df_forecasted_twl = pd.read_csv(forecasted_twl_csv, index_col=[0, 1])

    df_forecasted_impacts = forecasted_impacts(df_profile_features, df_forecasted_twl)

    # Add the count of records where the twl reached the dune toe (default columns of twl_exceedence_time).
    df_forecasted_impacts = df_forecasted_impacts.merge(
        twl_exceedence_time(df_profile_features, df_forecasted_twl),
        left_on=["site_id"],
        right_on=["site_id"],
    )

    df_forecasted_impacts.to_csv(output_file, float_format="%.4f")
    logger.info("Saved to %s", output_file)
    logger.info("Done!")