Initial commit of forecasted TWLs function

master
Chris Leaman 6 years ago
parent 5159239b66
commit 91f8b8bae6

@ -0,0 +1,267 @@
import logging.config
import os
from multiprocessing import Pool
import numpy as np
import numpy.ma as ma
import pandas as pd
from scipy import stats
from src.analysis.runup_models import sto06_individual, sto06
logging.config.fileConfig('./src/logging.conf', disable_existing_loggers=False)
logger = logging.getLogger(__name__)
def forecast_twl(df_tides, df_profiles, df_waves, df_profile_features, runup_function, n_processes=4,
slope='foreshore'):
# Use df_waves as a base
df_twl = df_waves.copy()
# Merge tides
logger.info('Merging tides')
df_twl = df_twl.merge(df_tides, left_index=True, right_index=True)
# Estimate foreshore slope. Do the analysis per site_id. This is so we only have to query the x and z
# cross-section profiles once per site.
logger.info('Calculating beach slopes')
site_ids = df_twl.index.get_level_values('site_id').unique()
# site_ids = [x for x in site_ids if 'NARRA' in x] # todo remove this - for testing narrabeen only
if slope == 'foreshore':
# Process each site_id with a different process and combine results at the end
with Pool(processes=n_processes) as pool:
results = pool.starmap(foreshore_slope_for_site_id,
[(site_id, df_twl, df_profiles) for site_id in site_ids])
df_twl['beta'] = pd.concat(results)
elif slope == 'mean':
# todo mean beach profile
df_temp = df_twl.join(df_profile_features, how='inner')
df_temp['mhw'] = 0.5
with Pool(processes=n_processes) as pool:
results = pool.starmap(mean_slope_for_site_id,
[(site_id, df_temp, df_profiles, 'dune_toe_z', 'mhw') for site_id in site_ids])
df_twl['beta'] = pd.concat(results)
# Estimate runup
R2, setup, S_total, S_inc, S_ig = runup_function(df_twl, Hs0_col='Hs0', Tp_col='Tp', beta_col='beta')
df_twl['R2'] = R2
df_twl['setup'] = setup
df_twl['S_total'] = S_total
# Estimate TWL
df_twl['R_high'] = df_twl['tide'] + df_twl['R2']
df_twl['R_low'] = df_twl['tide'] + 1.1 * df_twl['setup'] - 1.1 / 2 * df_twl['S_total']
# Drop unneeded columns
df_twl.drop(columns=['E', 'Exs', 'P', 'Pxs', 'dir'], inplace=True, errors='ignore')
return df_twl
def mean_slope_for_site_id(site_id, df_twl, df_profiles, top_elevation_col, btm_elevation_col):
"""
Calculates the foreshore slope values a given site_id. Returns a series (with same indicies as df_twl) of
foreshore slopes. This function is used to parallelize getting foreshore slopes as it is computationally
expensive, given the need to iterate for the foreshore slope.
:param site_id:
:param df_twl:
:param df_profiles:
:return: A dataframe with slope values calculated
"""
# Get the prestorm beach profile
profile = df_profiles.query("site_id =='{}' and profile_type == 'prestorm'".format(site_id))
profile_x = profile.index.get_level_values('x').tolist()
profile_z = profile.z.tolist()
df_twl_site = df_twl.query("site_id == '{}'".format(site_id))
df_beta = df_twl_site.apply(lambda row: slope_from_profile(profile_x=profile_x, profile_z=profile_z,
top_elevation=row[top_elevation_col],
btm_elevation=row[btm_elevation_col],
method='end_points'), axis=1)
return df_beta
def foreshore_slope_for_site_id(site_id, df_twl, df_profiles):
"""
Calculates the foreshore slope values a given site_id. Returns a series (with same indicies as df_twl) of
foreshore slopes. This function is used to parallelize getting foreshore slopes as it is computationally
expensive, given the need to iterate for the foreshore slope.
:param site_id:
:param df_twl:
:param df_profiles:
:return: A dataframe with slope values calculated
"""
# Get the prestorm beach profile
profile = df_profiles.query("site_id =='{}' and profile_type == 'prestorm'".format(site_id))
profile_x = profile.index.get_level_values('x').tolist()
profile_z = profile.z.tolist()
df_twl_site = df_twl.query("site_id == '{}'".format(site_id))
df_beta = df_twl_site.apply(lambda row: foreshore_slope_from_profile(profile_x=profile_x, profile_z=profile_z,
tide=row.tide,
runup_function=sto06_individual,
Hs0=row.Hs0,
Tp=row.Tp), axis=1)
return df_beta
def foreshore_slope_from_profile(profile_x, profile_z, tide, runup_function, **kwargs):
"""
Returns the foreshore slope given the beach profile, water level (tide) and runup_function. Since foreshore slope is
dependant on the setup elevation and swash magnitude, which in tern is dependant on the foreshore slope, the process
requires iteration to solve.
:param profile_x:
:param profile_z:
:param tide:
:param runup_function: The name of a function which will return runup values (refer to runup_models.py)
:param kwargs: Additional keyword arguments which will be passed to the runup_function (usually Hs0, Tp).
:return:
"""
# Sometimes there is no tide value for a record, so return None
if np.isnan(tide):
return None
# Initalize estimates
max_number_iterations = 20
iteration_count = 0
min_accuracy = 0.001
beta = 0.05
while True:
R2, setup, S_total, _, _ = runup_function(beta=beta, **kwargs)
beta_new = slope_from_profile(profile_x=profile_x, profile_z=profile_z, method='end_points',
top_elevation=tide + setup + S_total / 2,
btm_elevation=tide + setup - S_total / 2)
# Return None if we can't find a slope, usually because the elevations we've specified are above/below our
# profile x and z coordinates.
if beta_new is None:
return None
# If slopes do not change much between interactions, return the slope
if abs(beta_new - beta) < min_accuracy:
return beta
# If we can't converge a solution, return None
if iteration_count > max_number_iterations:
return None
beta = beta_new
iteration_count += 1
def slope_from_profile(profile_x, profile_z, top_elevation, btm_elevation, method='end_points'):
"""
Returns a slope (beta) from a bed profile, given the top and bottom elevations of where the slope should be taken.
:param x: List of x bed profile coordinates
:param z: List of z bed profile coordinates
:param top_elevation: Top elevation of where to take the slope
:param btm_elevation: Bottom elevation of where to take the slope
:param method: Method used to calculate slope (end_points or least_squares)
:return:
"""
# Need all data to get the slope
if any([x is None for x in [profile_x, profile_z, top_elevation, btm_elevation]]):
return None
end_points = {
'top': {
'z': top_elevation,
},
'btm': {
'z': btm_elevation,
}}
for end_type in end_points.keys():
elevation = end_points[end_type]['z']
intersection_x = crossings(profile_x, profile_z, elevation)
# No intersections found
if len(intersection_x) == 0:
return None
# One intersection
elif len(intersection_x) == 1:
end_points[end_type]['x'] = intersection_x[0]
# More than on intersection
else:
if end_type == 'top':
# For top elevation, take most seaward intersection
end_points[end_type]['x'] = intersection_x[-1]
else:
# For bottom elevation, take most landward intersection that is seaward of top elevation
end_points[end_type]['x'] = [x for x in intersection_x if x > end_points['top']['x']][0]
if method == 'end_points':
x_top = end_points['top']['x']
x_btm = end_points['btm']['x']
z_top = end_points['top']['z']
z_btm = end_points['btm']['z']
return -(z_top - z_btm) / (x_top - x_btm)
elif method == 'least_squares':
profile_mask = [True if end_points['top']['x'] < pts < end_points['btm']['x'] else False for pts in x]
slope_x = np.array(profile_x)[profile_mask].tolist()
slope_z = np.array(profile_z)[profile_mask].tolist()
slope, _, _, _, _ = stats.linregress(slope_x, slope_z)
return -slope
def crossings(profile_x, profile_z, constant_z):
"""
Finds the x coordinate of a z elevation for a beach profile. Much faster than using shapely to calculate
intersections since we are only interested in intersections of a constant value. Will return multiple
intersections if found. Used in calculating beach slope.
Adapted from https://stackoverflow.com/a/34745789
:param profile_x: List of x coordinates for the beach profile section
:param profile_z: List of z coordinates for the beach profile section
:param constant_z: Float of the elevation to find corresponding x coordinates
:return: List of x coordinates which correspond to the constant_z
"""
# Remove nans to suppress warning messages
valid = ~ma.masked_invalid(profile_z).mask
profile_z = np.array(profile_z)[valid]
profile_x = np.array(profile_x)[valid]
# Normalize the 'signal' to zero.
# Use np.subtract rather than a list comprehension for performance reasons
z = np.subtract(profile_z, constant_z)
# Find all indices right before any crossing.
indicies = np.where(z[:-1] * z[1:] < 0)[0]
# Use linear interpolation to find intersample crossings.
return [profile_x[i] - (profile_x[i] - profile_x[i + 1]) / (z[i] - z[i + 1]) * (z[i]) for i in indicies]
if __name__ == '__main__':
logger.info('Importing data')
data_folder = './data/interim'
df_waves = pd.read_csv(os.path.join(data_folder, 'waves.csv'), index_col=[0, 1])
df_tides = pd.read_csv(os.path.join(data_folder, 'tides.csv'), index_col=[0, 1])
df_profiles = pd.read_csv(os.path.join(data_folder, 'profiles.csv'), index_col=[0, 1, 2])
df_sites = pd.read_csv(os.path.join(data_folder, 'sites.csv'), index_col=[0])
df_profile_features = pd.read_csv(os.path.join(data_folder, 'profile_features.csv'), index_col=[0])
logger.info('Forecasting TWL')
df_twl_foreshore_slope_sto06 = forecast_twl(df_tides, df_profiles, df_waves, df_profile_features,
runup_function=sto06, slope='foreshore')
df_twl_foreshore_slope_sto06.to_csv(os.path.join(data_folder, 'twl_foreshore_slope_sto06.csv'))
df_twl_mean_slope_sto06 = forecast_twl(df_tides, df_profiles, df_waves, df_profile_features,
runup_function=sto06, slope='mean')
df_twl_mean_slope_sto06.to_csv(os.path.join(data_folder, 'twl_mean_slope_sto06.csv'))
logger.info('Done')

@ -5,7 +5,6 @@ import fiona
import numpy as np
import pandas as pd
import pyproj
from scipy import stats
from shapely.geometry import LineString, Point
from shapely.geometry import shape
from shapely.ops import transform
@ -45,61 +44,6 @@ def convert_coord_systems(g1, in_coord_system='EPSG:4326', out_coord_system='EPS
return g2
def get_slope(x, z, top_elevation, btm_elevation, method='end_points'):
"""
Returns a slope from a bed profile
:param x: List of x bed profile coordinates
:param z: List of z bed profile coordinates
:param top_elevation: Top elevation of where to take the slope
:param btm_elevation: Bottom elevation of where to take the slope
:param method: Method used to calculate slope (end_points or least_squares)
:return:
"""
profile_line = LineString([[x, z] for x, z in zip(x, z)])
end_points = {
'top': {
'elevation': top_elevation,
},
'btm': {
'elevation': btm_elevation,
}}
# If there are multiple intersections, take most seaward of landward point?
multiple_points = 'landward'
for end_type in end_points.keys():
elevation = end_points[end_type]['elevation']
elevation_line = LineString([[min(x), elevation], [max(x), elevation]])
intersection_points = profile_line.intersection(elevation_line)
if intersection_points.is_empty:
return None
if multiple_points == 'landward':
intersection_point = list(intersection_points)[0]
elif multiple_points == 'seaward':
intersection_point = list(intersection_points)[-1]
end_points[end_type]['x'] = intersection_point.xy[0][0]
end_points[end_type]['z'] = intersection_point.xy[1][0]
if method == 'end_points':
x_top = end_points['top']['x']
x_btm = end_points['btm']['x']
z_top = end_points['top']['z']
z_btm = end_points['btm']['z']
return -(z_top - z_btm) / (x_top - x_btm)
elif method == 'least_squares':
profile_mask = [True if end_points['top']['x'] < pts < end_points['btm']['x'] else False for pts in x]
profile_x = np.array(x)[profile_mask].tolist()
profile_z = np.array(z)[profile_mask].tolist()
slope, _, _, _, _ = stats.linregress(profile_x, profile_z)
return -slope
def distance_to_intersection(lat, lon, landward_orientation, beach, line_strings, line_properties):
"""
Returns the distance at whjch a line drawn from a lat/lon at an orientation intersects a line stinrg

Loading…
Cancel
Save