import click
import numpy as np
import pandas as pd
from scipy.integrate import simps
from scipy.signal import savgol_filter
from scipy.interpolate import interp1d
from numpy import ma as ma
from itertools import groupby
from tqdm import tqdm

from logs import setup_logging
from utils import crossings, get_i_or_default
from analysis.forecast_twl import get_mean_slope, get_intertidal_slope

logger = setup_logging()


def return_first_or_nan(l):
    """
    Returns the first value of a list, or nan if the list is empty. Used for
    getting dune toe and crest values.
    :param l:
    :return:
    """
    if len(l) == 0:
        return np.nan
    else:
        return l[0]


def round_up_to_odd(f):
    """
    Rounds a number up to the nearest odd integer.
    https://stackoverflow.com/a/31648815
    :param f:
    :return:
    """
    return int(np.ceil(f) // 2 * 2 + 1)


def volume_change(df_profiles, df_profile_features):
    """
    Calculates the volume change between prestorm and poststorm profiles.
    :param df_profiles:
    :param df_profile_features:
    :return:
    """
    logger.info("Calculating change in beach volume")
    df_vol_changes = pd.DataFrame(
        index=df_profile_features.index.get_level_values("site_id").unique()
    )
    df_profiles = df_profiles.dropna(subset=["z"])
    df_profiles = df_profiles.sort_index()
    sites = df_profiles.groupby(level=["site_id"])

    for site_id, df_site in tqdm(sites):
        params = {}

        # Calculate pre and post storm volume seawards of our profile change point
        # and store in dictionary and dataframe.
        df_prestorm = df_profiles.loc[(site_id, "prestorm")]
        df_poststorm = df_profiles.loc[(site_id, "poststorm")]

        # Calculate total subaerial volume change.
        # First, calculate the difference between pre and post storm profiles.
        z_diff = (
            df_profiles.loc[(site_id, "poststorm")].z
            - df_profiles.loc[(site_id, "prestorm")].z
        )

        # If there are no common points between pre and poststorm profiles,
        # skip this site.
        if z_diff.dropna().size == 0:
            continue

        # # # Debug
        # import matplotlib.pyplot as plt
        # plt.plot(z_diff)
        # plt.plot(df_profiles.loc[(site_id, "prestorm")].z)
        # plt.plot(df_profiles.loc[(site_id, "poststorm")].z)
        # plt.show()

        # First, find locations where we have a good match between pre and post
        # storm profiles.
        lidar_accuracy = 0.2  # m
        good_match = start_stop(
            (abs(z_diff) < lidar_accuracy).values, trigger_val=True, len_thresh=20
        )

        # If the entire profile has changed (usually at a lagoon entrance), take
        # the change point as the first x-coord where we have both pre and
        # poststorm profiles.
        if good_match.size == 0:
            x_change_point = min(
                set(df_prestorm.z.dropna().index.get_level_values("x"))
                & set(df_poststorm.z.dropna().index.get_level_values("x"))
            )
            z_change_point = df_prestorm.loc[x_change_point].z
        else:
            # The minimum idx_change_point should be the first place where we
            # have a good match.
            idx_change_point_min = good_match[0][0]

            # Identify locations where z_diff is negative, i.e. the profile has
            # been eroded by the storm. Then group them by the number of
            # consecutive values.
            grouped = start_stop((z_diff < 0).values, trigger_val=True, len_thresh=1)

            # Sort by streak length, then get the start index of the longest
            # streak of true values. x_change_point is then the x-coordinate
            # where our pre and post storm profiles start to change.
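            # An illustrative (hypothetical) example of the selection below: if
            # grouped returned erosion streaks [(10, 25), (40, 120), (130, 135)]
            # and idx_change_point_min were 30, the first streak would be
            # discarded, the remaining two would sort by descending length to
            # [(40, 120), (130, 135)], and the change point would be taken at
            # index 40.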
            idx_change_points = sorted(
                [x for x in grouped if x[0] > idx_change_point_min],
                key=lambda x: x[1] - x[0],
                reverse=True,
            )

            if len(idx_change_points) == 0:
                continue
            else:
                idx_change_point = idx_change_points[0][0]
                x_change_point = z_diff.index[idx_change_point]
                z_change_point = df_prestorm.loc[x_change_point].z

        # Landward of the change point, set the difference in pre/post storm
        # profiles equal to zero.
        z_diff[z_diff.index < x_change_point] = 0

        params["prestorm_total_subaerial_vol"] = beach_volume(
            x=df_prestorm.index.get_level_values("x"),
            z=df_prestorm.z.values,
            x_min=x_change_point,
        )

        params["poststorm_total_subaerial_vol"] = beach_volume(
            x=df_poststorm.index.get_level_values("x"),
            z=df_poststorm.z.values,
            x_min=x_change_point,
        )

        params["total_vol_change"] = (
            params["poststorm_total_subaerial_vol"]
            - params["prestorm_total_subaerial_vol"]
        )
        params["x_change_point"] = x_change_point
        params["z_change_point"] = z_change_point

        df_vol_changes.loc[site_id, "total_vol_change"] = params["total_vol_change"]
        df_vol_changes.loc[site_id, "x_change_point"] = params["x_change_point"]
        df_vol_changes.loc[site_id, "z_change_point"] = params["z_change_point"]

        for zone in ["dune", "swash"]:
            params[zone] = {}

            for profile_type in ["prestorm", "poststorm"]:
                # Store zone/profile_type results in a dictionary, then append
                # at the end to the params dictionary.
                d = {}

                # Get variables, this helps simplify the code below.
                df_profile_feature = df_profile_features.loc[(site_id, profile_type)]
                df_profile = df_profiles.loc[(site_id, profile_type)]

                # Define the edges of the swash and dune zones where we want to
                # calculate the subaerial volume.
                if zone == "swash":
                    d["x_min"] = df_profile_feature.dune_toe_x
                    d["x_max"] = max(df_profile.index.get_level_values("x"))

                    # For profiles with no Dlow value, we take Dhigh as the
                    # landward limit of the swash zone.
                    if np.isnan(d["x_min"]):
                        d["x_min"] = df_profile_feature.dune_crest_x

                elif zone == "dune":
                    d["x_min"] = df_profile_feature.dune_crest_x

                    if np.isnan(df_profile_feature.dune_toe_x):
                        # If there's no dune toe, take the most seaward value.
                        d["x_max"] = max(df_profile.index.get_level_values("x"))
                    else:
                        d["x_max"] = df_profile_feature.dune_toe_x

                    # For profiles with no Dlow value, the dune is undefined and
                    # we cannot calculate a dune volume.

                # Calculate subaerial volume based on our x min and max.
                d["subaerial_vol"] = beach_volume(
                    x=df_profile.index.get_level_values("x"),
                    z=df_profile.z.values,
                    x_min=d["x_min"],
                    x_max=d["x_max"],
                )

                params[zone][profile_type] = d

            # Calculate change in volumes. Use the z_diff array which has been
            # zeroed out landward of the x_change_point.

            # Zero out nans so we can calculate the change in beach volume.
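            # For example (hypothetical values): with a prestorm dune crest at
            # x = 150 m and dune toe at x = 180 m, the dune volume is integrated
            # over 150 < x < 180 and the swash volume over 180 < x < the most
            # seaward profile point. Note that beach_volume() masks with strict
            # inequalities, so the boundary coordinates themselves are excluded.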
            z_diff.loc[np.isnan(z_diff)] = 0
            params[zone]["vol_change"] = beach_volume(
                x=z_diff.index.values,
                z=z_diff.values,
                x_min=params[zone]["prestorm"]["x_min"],
                x_max=params[zone]["prestorm"]["x_max"],
            )

            if zone == "dune" and np.isnan(params[zone]["vol_change"]):
                logger.warning("Dune volume change is nan for site %s", site_id)

            params[zone]["pct_change"] = (
                params[zone]["vol_change"] / params[zone]["prestorm"]["subaerial_vol"]
            )

            # params[zone]["vol_loss"] = (params[zone]["prestorm"]["subaerial_vol"] -
            #                             params[zone]["poststorm"]["subaerial_vol"])
            # params[zone]["pct_loss"] = \
            #     params[zone]["vol_loss"] / params[zone]["prestorm"]["subaerial_vol"]

            # Save results in our data frame.
            df_vol_changes.loc[site_id, "prestorm_{}_vol".format(zone)] = params[zone][
                "prestorm"
            ]["subaerial_vol"]
            df_vol_changes.loc[site_id, "poststorm_{}_vol".format(zone)] = params[zone][
                "poststorm"
            ]["subaerial_vol"]
            df_vol_changes.loc[site_id, "{}_vol_change".format(zone)] = params[zone][
                "vol_change"
            ]
            df_vol_changes.loc[site_id, "{}_pct_change".format(zone)] = params[zone][
                "pct_change"
            ]

    return df_vol_changes


def beach_volume(x, z, x_min=np.NINF, x_max=np.inf):
    """
    Returns the beach volume of a profile, calculated with Simpson's rule.
    :param x: x-coordinates of beach profile
    :param z: z-coordinates of beach profile
    :param x_min: Minimum x-coordinate to consider when calculating volume
    :param x_max: Maximum x-coordinate to consider when calculating volume
    :return:
    """
    profile_mask = [x_min < x_coord < x_max for x_coord in x]
    x_masked = np.array(x)[profile_mask]
    z_masked = np.array(z)[profile_mask]

    if len(x_masked) == 0 or len(z_masked) == 0:
        return np.nan
    else:
        return simps(z_masked, x_masked)


def storm_regime(df_observed_impacts):
    """
    Returns the dataframe with an additional column of storm impacts based on
    the Storm Impact Scale. Refer to Sallenger (2000) for details.
    :param df_observed_impacts:
    :return:
    """
    logger.info("Getting observed storm regimes")
    swash = df_observed_impacts.dune_vol_change > -3
    collision = df_observed_impacts.dune_vol_change < -3
    df_observed_impacts.loc[swash, "storm_regime"] = "swash"
    df_observed_impacts.loc[collision, "storm_regime"] = "collision"

    # TODO We may be able to identify observed regimes by looking at the change
    # in crest and toe elevation. This would be useful for locations where we
    # have overwash and cannot calculate the change in volume correctly.
    # Otherwise, maybe it's better to put it in manually.

    return df_observed_impacts


def overwrite_impacts(df_observed_impacts, df_raw_features):
    """
    Overwrites calculated impacts with impacts manually specified in the
    profile_features file.
    :param df_observed_impacts:
    :param df_raw_features:
    :return:
    """
    # Get manually specified impacts from the profile features ./data/raw/
    # folder. Note that sites which need to be overwritten with a NaN use the
    # string 'none' in the csv. This is because when we use the df.update()
    # command, it doesn't overwrite NaN values. So we'll put in the 'none'
    # string, then overwrite that with the NaN.
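    # A minimal illustration of the df.update() behaviour described above
    # (values are hypothetical):
    #   s = pd.Series(["swash", "collision"])
    #   s.update(pd.Series([np.nan, "swash"]))
    #   # s is now ["swash", "swash"]: the NaN at index 0 did not overwrite,
    #   # whereas a 'none' placeholder string would have.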
    df_overwritten_impacts = df_raw_features.rename(
        columns={"observed_storm_regime": "storm_regime"}
    ).storm_regime.to_frame()

    df_observed_impacts.update(df_overwritten_impacts)

    # Replace 'none' with nan
    df_observed_impacts.loc[
        df_observed_impacts.storm_regime == "none", "storm_regime"
    ] = np.nan

    return df_observed_impacts


@click.command()
@click.option("--profiles-csv", required=True, help="")
@click.option("--profile-features-crest-toes-csv", required=True, help="")
@click.option("--raw-profile-features-csv", required=True, help="")
@click.option("--output-file", required=True, help="")
def create_observed_impacts(
    profiles_csv, profile_features_crest_toes_csv, raw_profile_features_csv, output_file
):
    logger.info("Creating observed wave impacts")
    logger.info("Importing data")
    df_profiles = pd.read_csv(profiles_csv, index_col=[0, 1, 2])
    df_profile_features = pd.read_csv(profile_features_crest_toes_csv, index_col=[0, 1])

    logger.info("Creating new dataframe for observed impacts")
    df_observed_impacts = pd.DataFrame(
        index=df_profile_features.index.get_level_values("site_id").unique()
    )

    # TODO Review volume change with changing dune toes/crests

    logger.info("Getting pre/post storm volumes")
    df_vol_changes = volume_change(df_profiles, df_profile_features)
    df_observed_impacts = df_observed_impacts.join(df_vol_changes)

    # Classify regime based on volume changes
    df_observed_impacts = storm_regime(df_observed_impacts)

    # Overwrite storm impacts with manually picked impacts
    df_raw_features = pd.read_csv(raw_profile_features_csv, index_col=[0])
    df_observed_impacts = overwrite_impacts(df_observed_impacts, df_raw_features)

    # Calculate change in mean slope
    df_prestorm_mean_slopes = get_mean_slope(
        df_profile_features, df_profiles, profile_type="prestorm"
    )
    df_poststorm_mean_slopes = get_mean_slope(
        df_profile_features, df_profiles, profile_type="poststorm"
    )
    df_diff_mean_slopes = df_poststorm_mean_slopes - df_prestorm_mean_slopes

    # Calculate change in intertidal slope
    df_prestorm_intertidal_slopes = get_intertidal_slope(
        df_profiles, profile_type="prestorm"
    )
    df_poststorm_intertidal_slopes = get_intertidal_slope(
        df_profiles, profile_type="poststorm"
    )
    df_diff_intertidal_slopes = (
        df_poststorm_intertidal_slopes - df_prestorm_intertidal_slopes
    )

    # Rename slope columns and merge into observed impacts
    renames = [
        {"df": df_prestorm_mean_slopes, "new_col_name": "beta_prestorm_mean"},
        {"df": df_poststorm_mean_slopes, "new_col_name": "beta_poststorm_mean"},
        {"df": df_diff_mean_slopes, "new_col_name": "beta_diff_mean"},
        {
            "df": df_prestorm_intertidal_slopes,
            "new_col_name": "beta_prestorm_intertidal",
        },
        {
            "df": df_poststorm_intertidal_slopes,
            "new_col_name": "beta_poststorm_intertidal",
        },
        {"df": df_diff_intertidal_slopes, "new_col_name": "beta_diff_intertidal"},
    ]

    for rename in renames:
        rename["df"].rename(
            {"beta": rename["new_col_name"]}, axis="columns", inplace=True
        )

    # Join all our slopes into the observed impacts
    df_observed_impacts = pd.concat(
        [
            df_prestorm_mean_slopes,
            df_poststorm_mean_slopes,
            df_diff_mean_slopes,
            df_prestorm_intertidal_slopes,
            df_poststorm_intertidal_slopes,
            df_diff_intertidal_slopes,
            df_observed_impacts,
        ],
        axis=1,
    )

    # Calculate change in beach width
    df_width_msl_prestorm = get_beach_width(
        df_profile_features,
        df_profiles,
        profile_type="prestorm",
        ele=0,
        col_name="width_msl_prestorm",
    )
    df_width_msl_poststorm = get_beach_width(
        df_profile_features,
        df_profiles,
        profile_type="poststorm",
        ele=0,
        col_name="width_msl_poststorm",
    )
    df_width_msl_change_m = (df_width_msl_poststorm - df_width_msl_prestorm).rename(
        "width_msl_change_m"
    )
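    # For example (hypothetical numbers): with the prestorm dune toe at
    # x = 150 m and the profiles crossing z = 0 (MSL) at x = 195 m prestorm and
    # x = 180 m poststorm, the widths are 45 m and 30 m respectively, giving
    # width_msl_change_m = -15 m, i.e. 15 m of shoreline retreat.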
    df_width_msl_change_pct = (
        df_width_msl_change_m / df_width_msl_prestorm * 100
    ).rename("width_msl_change_pct")

    # Join beach width change onto observed impacts dataframe
    df_observed_impacts = pd.concat(
        [
            df_observed_impacts,
            df_width_msl_prestorm,
            df_width_msl_poststorm,
            df_width_msl_change_m,
            df_width_msl_change_pct,
        ],
        axis=1,
    )

    # Save dataframe to csv
    df_observed_impacts.to_csv(output_file, float_format="%.4f")

    logger.info("Saved to %s", output_file)
    logger.info("Done!")


def get_beach_width(df_profile_features, df_profiles, profile_type, ele, col_name):
    # For each site, take the most seaward crossing of the given elevation as
    # the shoreline position, then measure the width from the prestorm dune toe.
    df_x_position = (
        df_profiles.xs(profile_type, level="profile_type")
        .dropna(subset=["z"])
        .groupby("site_id")
        .apply(
            lambda x: get_i_or_default(
                crossings(
                    profile_x=x.index.get_level_values("x").tolist(),
                    profile_z=x.z.tolist(),
                    constant_z=ele,
                ),
                -1,
                default=np.nan,
            )
        )
        .rename("x_position")
    )
    df_x_prestorm_dune_toe = df_profile_features.xs(
        "prestorm", level="profile_type"
    ).dune_toe_x
    df_width = (df_x_position - df_x_prestorm_dune_toe).rename(col_name)
    return df_width


def start_stop(a, trigger_val, len_thresh=2):
    """
    Finds the start and stop indices of runs of a trigger value in an array,
    keeping only runs longer than len_thresh.
    https://stackoverflow.com/a/51259253

    In [47]: myArray
    Out[47]: array([1, 1, 0, 2, 0, 1, 1, 1, 1, 0, 0, 1, 2, 1, 1, 1])

    In [48]: start_stop(myArray, trigger_val=1, len_thresh=2)
    Out[48]:
    array([[ 5,  8],
           [13, 15]])

    :param a:
    :param trigger_val:
    :param len_thresh:
    :return:
    """
    # "Enclose" mask with sentinels to catch shifts later on
    mask = np.r_[False, np.equal(a, trigger_val), False]

    # Get the shifting indices
    idx = np.flatnonzero(mask[1:] != mask[:-1])

    # Get lengths
    lens = idx[1::2] - idx[::2]

    # Return [start, stop] pairs with an inclusive stop index
    return idx.reshape(-1, 2)[lens > len_thresh] - [0, 1]
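# A minimal sketch of the two numerical helpers above on synthetic data
# (illustrative only, not executed by the pipeline):
#
#   x = np.arange(0, 100.0)
#   z = np.clip(10 - 0.1 * x, 0, None)     # planar beach sloping down towards z = 0
#   beach_volume(x, z)                     # ~= 500 m^3 per metre of beach
#   start_stop(z > 5, trigger_val=True, len_thresh=10)
#   # -> array([[ 0, 49]]), i.e. z exceeds 5 m over indices 0 through 49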