You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
roches-probabilistic-hazard.../probabilistic-analysis/probabilistic_assessment.py

623 lines
23 KiB
Python

# Performs probabilstic assessment on *.yaml files,
# and saves output shoreline chainages to csv
import os
import yaml
import argparse
import numpy as np
import pandas as pd
import matplotlib
import matplotlib.patheffects
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from scipy.optimize import curve_fit
from scipy import interpolate
from tqdm import tqdm
def choose(options):
"""
Get the user to select one (or all) items from a list of options.
Args:
x (list): list of options
Returns:
the selected option(s) in a list
"""
options.append('All')
for i, option in enumerate(options):
print('{:3d}) {:s}'.format(i + 1, option))
while True:
try:
val = input("Choose> ")
if int(val) == len(options):
selected = options[:-1]
else:
selected = [options[int(val) - 1]]
return selected
break
except (ValueError, IndexError):
print("Invalid input, please try again")
def parse_args():
"""
Check if a parameter file was provided at the command prompt
Returns:
the parsed input arguments in a dict
"""
parser = argparse.ArgumentParser(usage=__doc__)
parser.add_argument('-f',
'--file',
help='name of parameter file',
default=None)
parser.add_argument('-a',
'--all',
help='process all *.yaml files in folder',
action='store_true')
return parser.parse_args()
def read_parameter_file(fname):
"""
Read yaml file
Args:
fname (str): name of yaml file
Returns:
the file contents in a dict
"""
with open(fname, 'r') as stream:
try:
params = yaml.safe_load(stream)
except yaml.YAMLError as exc:
print(exc, "\nCheck the formatting of the yaml file.")
return params
def ari_to_aep(x):
"""
Convert annual recurrance interval to annual exceedance probability.
Args:
x (array_like): input array
Returns:
the calculated AEP
"""
return 1 - np.exp(-1 / x)
def aep_to_ari(x):
"""
Convert annual exceedance probability to annual recurrance interval.
Args:
x (array_like): input array
Returns:
the calculated ARI
"""
return -1 / np.log(1 - x)
def get_value_from_ep(x, p):
"""
Calculate percentile value from array based on exceedance probability.
Args:
x (array_like): input array
p (float): probability (e.g. p=0.05 for 5% exceedance)
Returns:
the calculated percentile value
"""
return np.percentile(x, (1 - p) * 100)
def get_cumulative_distribution(x, min_ep=1e-5, num=1000):
"""
Calculate cumulative distrubution function of a random variable.
Args:
x (array_like): input array
min_ep (float): minimum exceedance probability
num (int): number of points to calculate distribution over
Returns:
the cumulative distribution for plotting, e.g.
plt.plot(exceedance_probability, value)
"""
exceedance_probability = np.logspace(np.log10(min_ep), 0, num)
value = get_value_from_ep(x, exceedance_probability)
return exceedance_probability, value
def constrained_log_fit(x, p1, p2):
"""
Fit log curve based on AEP values. Gordon (1987) provides two cases:
1. Low demand, open beaches:
p1 = 5
p2 = 30
2. High demand, rip heads
p1 = 40
p2 = 40
Args:
x (array_like): input array of AEP values
p1 (float): parameter 1
p2 (float): parameter 2
Returns:
the fitted values for the given AEPs
"""
ari = aep_to_ari(x)
return p1 + p2 * np.log(ari)
def get_ongoing_recession(n_runs, start_year, end_year, sea_level_rise,
bruun_factor, underlying_recession):
"""
Use Monte Carlo simulation to calculate ongoing shoreline recession.
Args:
n_runs (int): number of runs
start_year (int): first year of model
end_year (int): last year of model
sea_level_rise (dict):
'year' (array_like): years
'min' (array_like): minimum value
'mode' (array_like): most likely value
'max' (array_like): maximum value
bruun_factor (dict):
'min' (float): minimum value
'mode' (float): most likely value
'max' (float): maximum value
underlying_recession (dict):
'min' (float): minimum value
'mode' (float): most likely value
'max' (float): maximum value
Returns:
the simulated recession distance (m)
"""
# Get time interval from input file
years = np.arange(start_year, end_year + 1)
n_years = len(years)
# Interpolate sea level rise projections (m)
slr_mode = np.interp(years,
xp=sea_level_rise['year'],
fp=sea_level_rise['mode'])[:, np.newaxis]
try:
slr_min = np.interp(years,
xp=sea_level_rise['year'],
fp=sea_level_rise['min'])[:, np.newaxis]
except ValueError:
# Use mode for deterministic beaches
slr_min = slr_mode
try:
slr_max = np.interp(years,
xp=sea_level_rise['year'],
fp=sea_level_rise['max'])[:, np.newaxis]
except ValueError:
# Use mode for deterministic beaches
slr_max = slr_mode
# Initialise sea level rise array
slr = np.zeros([n_years, n_runs])
for i in range(n_years):
# Use triangular distribution for SLR in each year (m)
try:
slr[i, :] = np.random.triangular(left=slr_min[i],
mode=slr_mode[i],
right=slr_max[i],
size=n_runs)
except ValueError:
# Use constant value if slr_min == slr_max
slr[i, :] = np.ones([1, n_runs]) * slr_mode[i]
# Sort each row, so SLR follows a smooth trajectory for each model run
slr.sort(axis=1)
# Shuffle columns, so the order of model runs is randomised
np.random.shuffle(slr.T)
# Shift sea level so it is zero in the start year
slr -= slr[0, :].mean(axis=0)
# Simulate probabilistic Bruun factors (-)
if (bruun_factor['min'] < bruun_factor['mode'] < bruun_factor['max']):
# Use probabilistic method if min and max are provided
bruun_factor = np.random.triangular(left=bruun_factor['min'],
mode=bruun_factor['mode'],
right=bruun_factor['max'],
size=n_runs)
else:
# Ensure values were not given in reverse order
if bruun_factor['min'] > bruun_factor['mode']:
raise ValueError(
"bruun_factor['min'] must be less than bruun_factor['mode']")
if bruun_factor['mode'] > bruun_factor['max']:
raise ValueError(
"bruun_factor['mode'] must be less than bruun_factor['max']")
# Use deterministic method if only mode is provided
bruun_factor = np.ones([1, n_runs]) * bruun_factor['mode']
# Simulate probabilistic underlying recession rate (m/y)
if (underlying_recession['min'] < underlying_recession['mode'] <
underlying_recession['max']):
# Use probabilistic method if min and max are provided
underlying_recession_rate = np.random.triangular(
left=underlying_recession['min'],
mode=underlying_recession['mode'],
right=underlying_recession['max'],
size=n_runs)
else:
# Ensure values were not given in reverse order
if underlying_recession['min'] > underlying_recession['mode']:
raise ValueError(("underlying_recession['min'] must be "
"less than underlying_recession['mode']"))
if underlying_recession['mode'] > underlying_recession['max']:
raise ValueError(("underlying_recession['mode'] must be "
"less than underlying_recession['max']"))
# Use deterministic method if only mode is provided
underlying_recession_rate = np.ones([1, n_runs
]) * underlying_recession['mode']
# Repeat Bruun factors for each year
bruun_factor = np.tile(bruun_factor, [n_years, 1])
# Calculate total underlying recession
year_factor = np.arange(1, n_years + 1)[:, np.newaxis]
underlying_recession = underlying_recession_rate * year_factor
underlying_recession_rate = np.tile(underlying_recession_rate,
[n_years, 1])
# Remove probabilistic component from start year
slr[0, :] = slr[0, :].mean()
underlying_recession[0, :] = underlying_recession[0, :].mean()
bruun_factor[0, :] = bruun_factor[0, :].mean()
underlying_recession_rate[0, :] = underlying_recession_rate[0, :].mean()
# Calculate total ongoing recession (m)
ongoing_recession = slr * bruun_factor + underlying_recession
return (ongoing_recession, slr, bruun_factor, underlying_recession,
underlying_recession_rate)
def get_storm_demand_volume(ref_aep, ref_vol, n, mode='fit'):
"""
Calculate storm demand volume, either by fitting a log curve to the
supplied values (mode='fit'), or by simulating storm events with a
poisson distribution (mode='simulate').
Note: 'simulate' mode tends to overestimate storm demand for high ARIs
to account for storm clustering.
Args:
ref_aep (array_like): input AEP values
ref_vol (array_like): input volumes (m3)
n (int): number of model runs
mode (str): 'fit' (default) or 'simulate'
Returns:
the storm demand volume (m3/m)
"""
if mode == 'fit':
# Fit curve based on reference storm demand volumes
p, _ = curve_fit(constrained_log_fit, ref_aep, ref_vol, p0=(5., 30.))
rand_aep = np.random.rand(n)
volume = constrained_log_fit(rand_aep, *p)
volume[volume < 0] = 0
elif mode == 'simulate':
# Simulate number of storms of each ARI magnitude
lam = np.tile(ref_aep[:, np.newaxis], [1, n])
n_storms = np.random.poisson(lam)
# Calculate storm demand volume (m3/m)
vol_storms = ref_vol[:, np.newaxis] * n_storms
volume = vol_storms.sum(axis=0)
else:
raise ValueError("mode must be 'fit' or 'simulate'")
return volume
def process(beach_name, beach_scenario, n_runs, start_year, end_year,
output_years, output_ep, zsa_profile_file, zrfc_profile_file,
output_folder, figure_folder, sea_level_rise, bruun_factor,
underlying_recession, storm_demand, diagnostics, omit,
min_chainage, segment_gaps, insert_points, append_points):
"""
Calculate probabilistic shorelines using Monte Carlo simulation
Args:
beach_name (str): name of beach in photogrammetry database
beach_scenario (str): name for saved csv profile chainage file
n_runs (int): number of runs
start_year (int): first year of model
end_year (int): last year of model
output_years (list): years to save profiles
output_ep (list): EP values for saved profiles
zsa_profile_file (str): path to storm demand vs chainge data (ZSA)
zrfc_profile_file (str): path to storm demand vs chainge data (ZRFC)
output_folder (str): where to save profiles
sea_level_rise (dict):
'year' (list): years
'min' (list): minimum value
'mode' (list): most likely value
'max' (list): maximum value
bruun_factor (dict):
'min' (float): minimum value
'mode' (float): most likely value
'max' (float): maximum value
underlying_recession (dict):
'min' (float): minimum value
'mode' (float): most likely value
'max' (float): maximum value
diagnostics (dict):
'block' (list): block IDs for outputting diagnostics
'profile' (list): profile IDs for outputting diagnostics
omit (dict):
'block' (list): block IDs to omit from analysis
'profile' (list): profile IDs to omit from analysis
min_chainage (dict):
'block' (list): block IDs for trimming chainage
'profile' (list): profile IDs for trimming chainage
'chainage' (list): minimum chainage, at non-erodable barrier
segment_gaps (dict):
'block' (list): block IDs for breaking segment
'profile' (list): profile IDs for breaking segments
insert_points (dict):
'before' (dict): points to add to (south/west) end of hazard lines
'x' (list): eastings
'y' (list): northings
append_points (dict): points to add to (north/east) end of lines
'x' (list): eastings
'y' (list): nortings
"""
# Get reference storm demand (m3/m)
ref_ari = np.asarray(storm_demand['ari'])
ref_aep = ari_to_aep(ref_ari)
ref_vol = storm_demand['vol']
# Get year index
years = np.arange(start_year, end_year + 1)
# Check if beach is probabilistic
if n_runs == 1:
probabilistic = False
else:
probabilistic = True
# Simulate ongoing shoreline recession
r, slr, bf, ur, ur_rate = get_ongoing_recession(n_runs, start_year,
end_year, sea_level_rise,
bruun_factor,
underlying_recession)
ongoing_recession = r.copy()
# Pre-allocate storm demand volume for each year (m3/m)
storm_demand_volume = np.zeros([len(years), n_runs])
if probabilistic:
for i in range(len(years)):
# Generate synthetic storm demands for each year
storm_demand_volume[i, :] = get_storm_demand_volume(ref_aep,
ref_vol,
n=n_runs,
mode='fit')
else:
# Get storm demand for 1% AEP event
sort_idx = np.argsort(ref_aep)
storm_demand_volume[:] = np.interp(output_ep,
np.array(ref_aep)[sort_idx],
np.array(ref_vol)[sort_idx])
# Load profile data for current beach
pbar_profiles = tqdm(['ZSA', 'ZRFC'], leave=False)
for profile_type in pbar_profiles:
pbar_profiles.set_description('{}'.format(profile_type))
if profile_type == 'ZSA':
df_in = pd.read_csv(zsa_profile_file)
if profile_type == 'ZRFC':
df_in = pd.read_csv(zrfc_profile_file)
col_names = [c for c in df_in.columns if c.isdigit()]
# Loop through profiles
dff = df_in[df_in['beach'] == beach_name]
# Remove omitted profiles
dff = pd.merge(
pd.DataFrame(omit), dff, how='outer',
indicator='source').query('source!="both"').drop(columns='source')
pbar_profile = tqdm(dff.iterrows(), total=dff.shape[0], leave=False)
for i, prof in pbar_profile:
pbar_profile.set_description(
('Block: {}, profile: {}'.format(prof['block'],
prof['profile'])))
# Convert storm demand volume to a profile chainage (m)
profile_volume = np.array([int(c) for c in col_names])
profile_chainage = np.array(prof[col_names], dtype=float)
valid_idx = np.isfinite(profile_chainage)
storm_demand_chainage = np.interp(storm_demand_volume,
xp=profile_volume[valid_idx],
fp=profile_chainage[valid_idx])
# Loop through years
pbar_year = tqdm(output_years, leave=False)
for year in pbar_year:
pbar_year.set_description('Year: {}'.format(year))
# Create output dataframe
df_out = df_in[['beach', 'block', 'profile']]
# Add info on non-erodable sections
df_out = df_out.assign(min_chainage=np.nan)
for b, p, ch in zip(min_chainage['block'],
min_chainage['profile'],
min_chainage['chainage']):
idx = (df_out['block'] == str(b)) & (df_out['profile']
== p)
df_out.loc[idx, 'min_chainage'] = ch
# Specify which segments to break
df_out = df_out.assign(segment_gaps=False)
for b, p, in zip(segment_gaps['block'],
segment_gaps['profile']):
idx = (df_out['block'] == str(b)) & (df_out['profile']
== p)
df_out.loc[idx, 'segment_gaps'] = True
# Specify which profiles to plot
df_out = df_out.assign(plot_stats=False)
for b, p in zip(plot_stats['block'], plot_stats['profile']):
idx = (df_out['block'] == str(b)) & (df_out['profile']
== p)
df_out.loc[idx, 'plot_stats'] = True
# Specify which profiles to omit from shapefiles
df_out = df_out.assign(omit_from_shp=False)
for b, p in zip(omit_from_shp['block'],
omit_from_shp['profile']):
idx = (df_out['block'] == b) & (df_out['profile'] == p)
df_out.loc[idx, 'omit_from_shp'] = True
# Specify additional points to be included in shapefiles
df_out = df_out.assign(insert_points='').astype('object')
for b, p, x, y in zip(insert_points['block'],
insert_points['profile'],
insert_points['x'], insert_points['y']):
idx = np.where((df_out['block'] == str(b))
& (df_out['profile'] == p)
& (df_out['beach'] == beach_name))[0][0]
if not df_out.loc[idx, 'insert_points']:
df_out.loc[idx, 'insert_points'] = []
df_out.loc[idx, 'insert_points'].append((x, y))
df_out = df_out.assign(append_points='').astype('object')
for b, p, x, y in zip(append_points['block'],
append_points['profile'],
append_points['x'], append_points['y']):
idx = np.where((df_out['block'] == str(b))
& (df_out['profile'] == p)
& (df_out['beach'] == beach_name))[0][0]
if not df_out.loc[idx, 'append_points']:
df_out.loc[idx, 'append_points'] = []
df_out.loc[idx, 'append_points'].append((x, y))
# Exit if no profiles found
if (df_in['beach'] == beach_name).sum() == 0:
raise KeyError('"{}" not found in {}'.format(
beach_name, zsa_profile_file))
# Loop through EP values
ep_cols = []
for ep in output_ep:
# Add column for current EP
ep_col_name = 'ep_{}'.format(ep)
df_out.loc[:, ep_col_name] = 0
ep_cols.append(ep_col_name)
# Combine recession and storm demand chainage
# Subtract because chainages decrease landward
chainage_with_recession = (
storm_demand_chainage[year >= years, :] -
ongoing_recession[year >= years, :])
# Calculate maximum recession that has occured so far
most_receeded_chainage = chainage_with_recession.min(axis=0)
# Apply recession to current chainage (m)
# Use negative exceedance probability,
# because chainages increase seaward
ch_out = get_value_from_ep(most_receeded_chainage,
1 - np.array(output_ep))
df_out.loc[i, ep_cols] = ch_out
# Check if profile is non-erodable
ch_min = df_out.loc[i, 'min_chainage']
if np.isfinite(ch_min):
ch_out[ch_out < ch_min] = ch_min
# Stop erosion past maximum limit
df_out.loc[i, ep_cols] = ch_out
# Calculate chainage for zero storm demand
zero_chainage = interpolate.interp1d(
profile_volume[valid_idx],
profile_chainage[valid_idx],
fill_value='extrapolate')(0)
storm_demand_dist = zero_chainage - storm_demand_chainage
# Include additional points for shapefile
df_csv = df_out[df_out['beach'] == beach_name]
# Save values for current beach
csv_name = os.path.join(
output_folder,
'{} {} {}.csv'.format(beach_scenario, year, profile_type))
# Write header for csv file on first iteration
if df_out[df_out['beach'] == beach_name].index[0] == i:
df_csv.loc[[], :].to_csv(csv_name, index=False)
# Only save points if they are going into shapefile
df_csv = df_csv[~df_csv['omit'].astype(bool)]
# Append data for current profile
df_csv[df_csv.index == i].to_csv(csv_name,
mode='a',
index=False,
header=False,
float_format='%g')
def main():
# Choose yaml file, if not already specified
args = parse_args()
if args.file is not None:
param_files = [args.file]
else:
yaml_files = [f for f in os.listdir('.') if f.endswith('.yaml')]
# Choose file, unless 'all' option specified
if not args.all:
param_files = choose(yaml_files)
else:
param_files = yaml_files
# Loop through beaches
pbar_beach = tqdm(param_files)
for p in pbar_beach:
params = read_parameter_file(p)
pbar_beach.set_description('Beach: {}'.format(params['beach_name']))
process(**params)
print('\n')
if __name__ == '__main__':
main()