import os
import json

import numpy as np
import pandas as pd
import yaml
from scipy.optimize import curve_fit

fname = 'adopted-input-values.xlsx'
output_path = '../probabilistic-analysis'

# Load input file
df = pd.read_excel(fname)

# Remove calculation columns (headers that begin with '#')
df = df.drop([c for c in df.columns if c.startswith('#')], axis=1)

# Parse lists and other objects in dataframe
for col_name in [c for c in df.columns if df[c].dtype == 'O']:
    parsed_vals = []
    for i, cell_contents in enumerate(df[col_name]):
        if not isinstance(cell_contents, str):
            parsed_vals.append([])
        else:
            try:
                parsed_vals.append(json.loads(cell_contents))
            except json.decoder.JSONDecodeError as e:
                raise Exception(
                    ('\n\nInvalid JSON string in column "{}" on row {}: {}'
                     '\nEnsure brackets are matched, and strings are '
                     'double-quoted, e.g. ["A", "B", "C"]\n').format(
                         col_name, i + 2, cell_contents)
                ).with_traceback(e.__traceback__)
    df[col_name] = parsed_vals
    df = df.rename(columns={col_name: col_name.replace('#list', '')})

parameters = {
    'beach_scenario': '# Beach scenario name, including north/south or other special conditions',
    'beach_name': '# Beach name as it appears in photogrammetry database',
    'n_runs': '# Number of runs for Monte Carlo simulation',
    'start_year': '# Beginning of period for simulation',
    'end_year': '# End of period for simulation',
    'output_years': '# Years used for reporting',
    'output_ep': '# Encounter probability values for reporting',
    'output_folder': '# Path to output folder',
    'figure_folder': '# Path to figure folder',
    'zsa_profile_file': '# Path to storm demand file (zone of slope adjustment)',
    'zrfc_profile_file': '# Path to storm demand file (zone of reduced foundation capacity)',
    'sea_level_rise': '# Projected sea levels (m)',
    'storm_demand': '# Storm demand annual recurrence intervals (years) and volumes (m3/m)',
    'bruun_factor': '# Bruun factor (-)',
    'underlying_recession': '# Underlying shoreline recession rate (m/year)',
    'diagnostics': '# Choose profiles to output probabilistic diagnostics',
    'omit': '# Choose profiles to omit from analysis',
    'min_chainage': '# Set minimum chainages for non-erodible sections of profiles',
    'segment_gaps': '# Break shoreline into multiple segments (with a gap at this profile)',
    'insert_points': '# Insert points before specific profile',
    'append_points': '# Add points after specific profile',
}


def gordon_log_fit(ari, p1, p2):
    """Fit log curve based on ARI values.

    Gordon (1987) provides two cases:

    1. Low demand, open beaches: p1 = 5, p2 = 30
    2. High demand, rip heads:   p1 = 40, p2 = 40

    Args:
        ari (array_like): input array of ARI values
        p1 (float): parameter 1
        p2 (float): parameter 2

    Returns:
        the fitted values for the given ARIs
    """
    return p1 + p2 * np.log(ari)


def get_storm_demand(x):
    """Get storm demand based on a 100 year ARI storm demand volume.

    Args:
        x (float): storm demand volume for 100 year ARI event

    Returns:
        storm demand volumes for 1, 10, 100, 1000 year ARI events
    """
    if x > 140:
        # Repeat values to suppress scipy covariance warning
        ref_ari = np.array([0.03, 0.03, 100, 100])
        ref_vol = np.array([-100, -100, x, x])
    else:
        # Repeat values to suppress scipy covariance warning
        ref_ari = np.array([1, 1, 100, 100])
        ref_vol = np.array([1, 1, x, x])

    ari = np.array([1, 10, 100, 1000])
    try:
        p, _ = curve_fit(gordon_log_fit, ref_ari, ref_vol, p0=(5., 30.))
        vol = gordon_log_fit(ari, *p)
    except ValueError:
        vol = ari * np.nan

    return ari, vol
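
# Illustrative sanity check (not part of the pipeline; the 250 m3/m input is
# an assumed value). Because the model is linear in p1 and p2 and is anchored
# by two distinct reference points, the least-squares fit passes through them
# exactly. A 100 year ARI demand of 250 m3/m (above the 140 m3/m threshold)
# pins the curve through (0.03, -100) and (100, 250), giving approximately
# 51, 151, 250 and 349 m3/m for the 1, 10, 100 and 1000 year ARIs:
#
#     >>> ari, vol = get_storm_demand(250)
#     >>> ari.tolist()
#     [1, 10, 100, 1000]
#     >>> [round(v) for v in vol]  # approximately [51, 151, 250, 349]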

# Calculate storm demand volumes for the standard ARIs
df = df.assign(storm_demand_ari=0).astype('object')
df = df.assign(storm_demand_vol=0).astype('object')
for i in range(len(df)):
    vol_ari_100 = df.at[i, 'storm_demand']
    ari, vol = get_storm_demand(vol_ari_100)
    df.at[i, 'storm_demand_ari'] = ari.tolist()
    df.at[i, 'storm_demand_vol'] = vol.round(1).tolist()

# Remove original storm demand column
df = df.drop('storm_demand', axis=1)

# Write one commented YAML file per beach scenario
for i, row in df.sort_values(by='beach_scenario').iterrows():
    output_file = os.path.join(output_path, row['beach_scenario'] + '.yaml')
    with open(output_file, 'w') as f:
        f.write('# {}\n'.format(row['beach_scenario']))
        for parameter, description in parameters.items():
            col_names = [c for c in df.columns if parameter in c]

            # Write parameter description
            f.write('\n{}\n'.format(description))

            # Collect parameter values, keyed by column name suffix
            values = {c.split('_')[-1]: row[c] for c in col_names}
            if len(values) == 1:
                # Unwrap the dictionary if the parameter has a single value
                values = list(values.values())[0]
            yaml.dump({parameter: values}, f, default_flow_style=False)
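
# For reference, each output file interleaves the comment descriptions from
# `parameters` with the row's values, so a fragment might look like the
# following (values shown are hypothetical):
#
#     # Number of runs for Monte Carlo simulation
#     n_runs: 10000
#
#     # Storm demand annual recurrence intervals (years) and volumes (m3/m)
#     storm_demand:
#       ari:
#       - 1
#       - 10
#       - 100
#       - 1000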