# Performs probabilstic assessment on *.yaml files, # and saves output shoreline chainages to csv import os import yaml import argparse import numpy as np import pandas as pd import matplotlib import matplotlib.patheffects import matplotlib.pyplot as plt import matplotlib.gridspec as gridspec from scipy.optimize import curve_fit from scipy import interpolate from tqdm import tqdm def choose(options): """ Get the user to select one (or all) items from a list of options. Args: x (list): list of options Returns: the selected option(s) in a list """ options.append('All') for i, option in enumerate(options): print('{:3d}) {:s}'.format(i + 1, option)) while True: try: val = input("Choose> ") if int(val) == len(options): selected = options[:-1] else: selected = [options[int(val) - 1]] return selected break except (ValueError, IndexError): print("Invalid input, please try again") def parse_args(): """ Check if a parameter file was provided at the command prompt Returns: the parsed input arguments in a dict """ parser = argparse.ArgumentParser(usage=__doc__) parser.add_argument('-f', '--file', help='name of parameter file', default=None) parser.add_argument('-a', '--all', help='process all *.yaml files in folder', action='store_true') return parser.parse_args() def read_parameter_file(fname): """ Read yaml file Args: fname (str): name of yaml file Returns: the file contents in a dict """ with open(fname, 'r') as stream: try: params = yaml.safe_load(stream) except yaml.YAMLError as exc: print(exc, "\nCheck the formatting of the yaml file.") return params def ari_to_aep(x): """ Convert annual recurrance interval to annual exceedance probability. Args: x (array_like): input array Returns: the calculated AEP """ return 1 - np.exp(-1 / x) def aep_to_ari(x): """ Convert annual exceedance probability to annual recurrance interval. Args: x (array_like): input array Returns: the calculated ARI """ return -1 / np.log(1 - x) def get_value_from_ep(x, p): """ Calculate percentile value from array based on exceedance probability. Args: x (array_like): input array p (float): probability (e.g. p=0.05 for 5% exceedance) Returns: the calculated percentile value """ return np.percentile(x, (1 - p) * 100) def get_cumulative_distribution(x, min_ep=1e-5, num=1000): """ Calculate cumulative distrubution function of a random variable. Args: x (array_like): input array min_ep (float): minimum exceedance probability num (int): number of points to calculate distribution over Returns: the cumulative distribution for plotting, e.g. plt.plot(exceedance_probability, value) """ exceedance_probability = np.logspace(np.log10(min_ep), 0, num) value = get_value_from_ep(x, exceedance_probability) return exceedance_probability, value def constrained_log_fit(x, p1, p2): """ Fit log curve based on AEP values. Gordon (1987) provides two cases: 1. Low demand, open beaches: p1 = 5 p2 = 30 2. High demand, rip heads p1 = 40 p2 = 40 Args: x (array_like): input array of AEP values p1 (float): parameter 1 p2 (float): parameter 2 Returns: the fitted values for the given AEPs """ ari = aep_to_ari(x) return p1 + p2 * np.log(ari) def get_ongoing_recession(n_runs, start_year, end_year, sea_level_rise, bruun_factor, underlying_recession): """ Use Monte Carlo simulation to calculate ongoing shoreline recession. Args: n_runs (int): number of runs start_year (int): first year of model end_year (int): last year of model sea_level_rise (dict): 'year' (array_like): years 'min' (array_like): minimum value 'mode' (array_like): most likely value 'max' (array_like): maximum value bruun_factor (dict): 'min' (float): minimum value 'mode' (float): most likely value 'max' (float): maximum value underlying_recession (dict): 'min' (float): minimum value 'mode' (float): most likely value 'max' (float): maximum value Returns: the simulated recession distance (m) """ # Get time interval from input file years = np.arange(start_year, end_year + 1) n_years = len(years) # Interpolate sea level rise projections (m) slr_mode = np.interp(years, xp=sea_level_rise['year'], fp=sea_level_rise['mode'])[:, np.newaxis] try: slr_min = np.interp(years, xp=sea_level_rise['year'], fp=sea_level_rise['min'])[:, np.newaxis] except ValueError: # Use mode for deterministic beaches slr_min = slr_mode try: slr_max = np.interp(years, xp=sea_level_rise['year'], fp=sea_level_rise['max'])[:, np.newaxis] except ValueError: # Use mode for deterministic beaches slr_max = slr_mode # Initialise sea level rise array slr = np.zeros([n_years, n_runs]) for i in range(n_years): # Use triangular distribution for SLR in each year (m) try: slr[i, :] = np.random.triangular(left=slr_min[i], mode=slr_mode[i], right=slr_max[i], size=n_runs) except ValueError: # Use constant value if slr_min == slr_max slr[i, :] = np.ones([1, n_runs]) * slr_mode[i] # Sort each row, so SLR follows a smooth trajectory for each model run slr.sort(axis=1) # Shuffle columns, so the order of model runs is randomised np.random.shuffle(slr.T) # Shift sea level so it is zero in the start year slr -= slr[0, :].mean(axis=0) # Simulate probabilistic Bruun factors (-) if (bruun_factor['min'] < bruun_factor['mode'] < bruun_factor['max']): # Use probabilistic method if min and max are provided bruun_factor = np.random.triangular(left=bruun_factor['min'], mode=bruun_factor['mode'], right=bruun_factor['max'], size=n_runs) else: # Ensure values were not given in reverse order if bruun_factor['min'] > bruun_factor['mode']: raise ValueError( "bruun_factor['min'] must be less than bruun_factor['mode']") if bruun_factor['mode'] > bruun_factor['max']: raise ValueError( "bruun_factor['mode'] must be less than bruun_factor['max']") # Use deterministic method if only mode is provided bruun_factor = np.ones([1, n_runs]) * bruun_factor['mode'] # Simulate probabilistic underlying recession rate (m/y) if (underlying_recession['min'] < underlying_recession['mode'] < underlying_recession['max']): # Use probabilistic method if min and max are provided underlying_recession_rate = np.random.triangular( left=underlying_recession['min'], mode=underlying_recession['mode'], right=underlying_recession['max'], size=n_runs) else: # Ensure values were not given in reverse order if underlying_recession['min'] > underlying_recession['mode']: raise ValueError(("underlying_recession['min'] must be " "less than underlying_recession['mode']")) if underlying_recession['mode'] > underlying_recession['max']: raise ValueError(("underlying_recession['mode'] must be " "less than underlying_recession['max']")) # Use deterministic method if only mode is provided underlying_recession_rate = np.ones([1, n_runs ]) * underlying_recession['mode'] # Repeat Bruun factors for each year bruun_factor = np.tile(bruun_factor, [n_years, 1]) # Calculate total underlying recession year_factor = np.arange(1, n_years + 1)[:, np.newaxis] underlying_recession = underlying_recession_rate * year_factor underlying_recession_rate = np.tile(underlying_recession_rate, [n_years, 1]) # Remove probabilistic component from start year slr[0, :] = slr[0, :].mean() underlying_recession[0, :] = underlying_recession[0, :].mean() bruun_factor[0, :] = bruun_factor[0, :].mean() underlying_recession_rate[0, :] = underlying_recession_rate[0, :].mean() # Calculate total ongoing recession (m) ongoing_recession = slr * bruun_factor + underlying_recession return (ongoing_recession, slr, bruun_factor, underlying_recession, underlying_recession_rate) def get_storm_demand_volume(ref_aep, ref_vol, n, mode='fit'): """ Calculate storm demand volume, either by fitting a log curve to the supplied values (mode='fit'), or by simulating storm events with a poisson distribution (mode='simulate'). Note: 'simulate' mode tends to overestimate storm demand for high ARIs to account for storm clustering. Args: ref_aep (array_like): input AEP values ref_vol (array_like): input volumes (m3) n (int): number of model runs mode (str): 'fit' (default) or 'simulate' Returns: the storm demand volume (m3/m) """ if mode == 'fit': # Fit curve based on reference storm demand volumes p, _ = curve_fit(constrained_log_fit, ref_aep, ref_vol, p0=(5., 30.)) rand_aep = np.random.rand(n) volume = constrained_log_fit(rand_aep, *p) volume[volume < 0] = 0 elif mode == 'simulate': # Simulate number of storms of each ARI magnitude lam = np.tile(ref_aep[:, np.newaxis], [1, n]) n_storms = np.random.poisson(lam) # Calculate storm demand volume (m3/m) vol_storms = ref_vol[:, np.newaxis] * n_storms volume = vol_storms.sum(axis=0) else: raise ValueError("mode must be 'fit' or 'simulate'") return volume def process(beach_name, beach_scenario, n_runs, start_year, end_year, output_years, output_ep, zsa_profile_file, zrfc_profile_file, output_folder, figure_folder, sea_level_rise, bruun_factor, underlying_recession, storm_demand, diagnostics, omit, min_chainage, segment_gaps, insert_points, append_points): """ Calculate probabilistic shorelines using Monte Carlo simulation Args: beach_name (str): name of beach in photogrammetry database beach_scenario (str): name for saved csv profile chainage file n_runs (int): number of runs start_year (int): first year of model end_year (int): last year of model output_years (list): years to save profiles output_ep (list): EP values for saved profiles zsa_profile_file (str): path to storm demand vs chainge data (ZSA) zrfc_profile_file (str): path to storm demand vs chainge data (ZRFC) output_folder (str): where to save profiles sea_level_rise (dict): 'year' (list): years 'min' (list): minimum value 'mode' (list): most likely value 'max' (list): maximum value bruun_factor (dict): 'min' (float): minimum value 'mode' (float): most likely value 'max' (float): maximum value underlying_recession (dict): 'min' (float): minimum value 'mode' (float): most likely value 'max' (float): maximum value diagnostics (dict): 'block' (list): block IDs for outputting diagnostics 'profile' (list): profile IDs for outputting diagnostics omit (dict): 'block' (list): block IDs to omit from analysis 'profile' (list): profile IDs to omit from analysis min_chainage (dict): 'block' (list): block IDs for trimming chainage 'profile' (list): profile IDs for trimming chainage 'chainage' (list): minimum chainage, at non-erodable barrier segment_gaps (dict): 'block' (list): block IDs for breaking segment 'profile' (list): profile IDs for breaking segments insert_points (dict): 'before' (dict): points to add to (south/west) end of hazard lines 'x' (list): eastings 'y' (list): northings append_points (dict): points to add to (north/east) end of lines 'x' (list): eastings 'y' (list): nortings """ # Get reference storm demand (m3/m) ref_ari = np.asarray(storm_demand['ari']) ref_aep = ari_to_aep(ref_ari) ref_vol = storm_demand['vol'] # Get year index years = np.arange(start_year, end_year + 1) # Check if beach is probabilistic if n_runs == 1: probabilistic = False else: probabilistic = True # Simulate ongoing shoreline recession r, slr, bf, ur, ur_rate = get_ongoing_recession(n_runs, start_year, end_year, sea_level_rise, bruun_factor, underlying_recession) ongoing_recession = r.copy() # Pre-allocate storm demand volume for each year (m3/m) storm_demand_volume = np.zeros([len(years), n_runs]) if probabilistic: for i in range(len(years)): # Generate synthetic storm demands for each year storm_demand_volume[i, :] = get_storm_demand_volume(ref_aep, ref_vol, n=n_runs, mode='fit') else: # Get storm demand for 1% AEP event sort_idx = np.argsort(ref_aep) storm_demand_volume[:] = np.interp(output_ep, np.array(ref_aep)[sort_idx], np.array(ref_vol)[sort_idx]) # Load profile data for current beach pbar_profiles = tqdm(['ZSA', 'ZRFC'], leave=False) for profile_type in pbar_profiles: pbar_profiles.set_description('{}'.format(profile_type)) if profile_type == 'ZSA': df_in = pd.read_csv(zsa_profile_file) if profile_type == 'ZRFC': df_in = pd.read_csv(zrfc_profile_file) col_names = [c for c in df_in.columns if c.isdigit()] # Loop through profiles dff = df_in[df_in['beach'] == beach_name] # Remove omitted profiles dff = pd.merge( pd.DataFrame(omit), dff, how='outer', indicator='source').query('source!="both"').drop(columns='source') pbar_profile = tqdm(dff.iterrows(), total=dff.shape[0], leave=False) for i, prof in pbar_profile: pbar_profile.set_description( ('Block: {}, profile: {}'.format(prof['block'], prof['profile']))) # Convert storm demand volume to a profile chainage (m) profile_volume = np.array([int(c) for c in col_names]) profile_chainage = np.array(prof[col_names], dtype=float) valid_idx = np.isfinite(profile_chainage) storm_demand_chainage = np.interp(storm_demand_volume, xp=profile_volume[valid_idx], fp=profile_chainage[valid_idx]) # Loop through years pbar_year = tqdm(output_years, leave=False) for year in pbar_year: pbar_year.set_description('Year: {}'.format(year)) # Create output dataframe df_out = df_in[['beach', 'block', 'profile']] # Add info on non-erodable sections df_out = df_out.assign(min_chainage=np.nan) for b, p, ch in zip(min_chainage['block'], min_chainage['profile'], min_chainage['chainage']): idx = (df_out['block'] == str(b)) & (df_out['profile'] == p) df_out.loc[idx, 'min_chainage'] = ch # Specify which segments to break df_out = df_out.assign(segment_gaps=False) for b, p, in zip(segment_gaps['block'], segment_gaps['profile']): idx = (df_out['block'] == str(b)) & (df_out['profile'] == p) df_out.loc[idx, 'segment_gaps'] = True # Specify which profiles to plot df_out = df_out.assign(diagnostics=False) for b, p in zip(diagnostics['block'], diagnostics['profile']): idx = (df_out['block'] == str(b)) & (df_out['profile'] == p) df_out.loc[idx, 'diagnostics'] = True # Specify which profiles to omit from shapefiles df_out = df_out.assign(omit=False) for b, p in zip(omit['block'], omit['profile']): idx = (df_out['block'] == b) & (df_out['profile'] == p) df_out.loc[idx, 'omit'] = True # Specify additional points to be included in shapefiles df_out = df_out.assign(insert_points='').astype('object') for b, p, x, y in zip(insert_points['block'], insert_points['profile'], insert_points['x'], insert_points['y']): idx = np.where((df_out['block'] == str(b)) & (df_out['profile'] == p) & (df_out['beach'] == beach_name))[0][0] if not df_out.loc[idx, 'insert_points']: df_out.loc[idx, 'insert_points'] = [] df_out.loc[idx, 'insert_points'].append((x, y)) df_out = df_out.assign(append_points='').astype('object') for b, p, x, y in zip(append_points['block'], append_points['profile'], append_points['x'], append_points['y']): idx = np.where((df_out['block'] == str(b)) & (df_out['profile'] == p) & (df_out['beach'] == beach_name))[0][0] if not df_out.loc[idx, 'append_points']: df_out.loc[idx, 'append_points'] = [] df_out.loc[idx, 'append_points'].append((x, y)) # Exit if no profiles found if (df_in['beach'] == beach_name).sum() == 0: raise KeyError('"{}" not found in {}'.format( beach_name, zsa_profile_file)) # Loop through EP values ep_cols = [] for ep in output_ep: # Add column for current EP ep_col_name = 'ep_{}'.format(ep) df_out.loc[:, ep_col_name] = 0 ep_cols.append(ep_col_name) # Combine recession and storm demand chainage # Subtract because chainages decrease landward chainage_with_recession = ( storm_demand_chainage[year >= years, :] - ongoing_recession[year >= years, :]) # Calculate maximum recession that has occured so far most_receeded_chainage = chainage_with_recession.min(axis=0) # Apply recession to current chainage (m) # Use negative exceedance probability, # because chainages increase seaward ch_out = get_value_from_ep(most_receeded_chainage, 1 - np.array(output_ep)) df_out.loc[i, ep_cols] = ch_out # Check if profile is non-erodable ch_min = df_out.loc[i, 'min_chainage'] if np.isfinite(ch_min): ch_out[ch_out < ch_min] = ch_min # Stop erosion past maximum limit df_out.loc[i, ep_cols] = ch_out # Calculate chainage for zero storm demand zero_chainage = interpolate.interp1d( profile_volume[valid_idx], profile_chainage[valid_idx], fill_value='extrapolate')(0) storm_demand_dist = zero_chainage - storm_demand_chainage # Include additional points for shapefile df_csv = df_out[df_out['beach'] == beach_name] # Save values for current beach csv_name = os.path.join( output_folder, '{} {} {}.csv'.format(beach_scenario, year, profile_type)) # Write header for csv file on first iteration if df_out[df_out['beach'] == beach_name].index[0] == i: df_csv.loc[[], :].to_csv(csv_name, index=False) # Only save points if they are going into shapefile df_csv = df_csv[~df_csv['omit'].astype(bool)] # Append data for current profile df_csv[df_csv.index == i].to_csv(csv_name, mode='a', index=False, header=False, float_format='%g') def main(): # Choose yaml file, if not already specified args = parse_args() if args.file is not None: param_files = [args.file] else: yaml_files = [f for f in os.listdir('.') if f.endswith('.yaml')] # Choose file, unless 'all' option specified if not args.all: param_files = choose(yaml_files) else: param_files = yaml_files # Loop through beaches pbar_beach = tqdm(param_files) for p in pbar_beach: params = read_parameter_file(p) pbar_beach.set_description('Beach: {}'.format(params['beach_name'])) process(**params) print('\n') if __name__ == '__main__': main()