You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

332 lines
12 KiB
Python

import os
from multiprocessing import Pool
import click
import numpy as np
import numpy.ma as ma
import pandas as pd
from scipy import stats
from analysis import runup_models
from utils import setup_logging
logger = setup_logging()
MULTIPROCESS_THREADS = int(os.environ.get("MULTIPROCESS_THREADS", 4))
def forecast_twl(
df_tides,
df_profiles,
df_waves,
df_profile_features,
runup_function,
n_processes=MULTIPROCESS_THREADS,
slope="foreshore",
profile_type='prestorm'
):
# Use df_waves as a base
df_twl = df_waves.copy()
# Merge tides
logger.info("Merging tides")
df_twl = df_twl.merge(df_tides, left_index=True, right_index=True)
# Estimate foreshore slope. Do the analysis per site_id. This is so we only have to query the x and z
# cross-section profiles once per site.
logger.info("Calculating beach slopes")
site_ids = df_twl.index.get_level_values("site_id").unique()
if slope == "foreshore":
# Process each site_id with a different process and combine results at the end
with Pool(processes=n_processes) as pool:
results = pool.starmap(
foreshore_slope_for_site_id, [(site_id, df_twl, df_profiles) for site_id in site_ids]
)
df_twl["beta"] = pd.concat(results)
elif slope == "mean":
df_temp = df_twl.join(df_profile_features.query("profile_type=='{}'".format(profile_type)).reset_index(
level='profile_type')
, how="inner")
df_temp["mhw"] = 0.5
with Pool(processes=n_processes) as pool:
results = pool.starmap(
mean_slope_for_site_id, [(site_id, df_temp, df_profiles, "dune_toe_z", "dune_toe_x", "mhw") for
site_id in site_ids]
)
df_twl["beta"] = pd.concat(results)
# Estimate runup
R2, setup, S_total, S_inc, S_ig = runup_function(df_twl, Hs0_col="Hs0", Tp_col="Tp", beta_col="beta")
df_twl["R2"] = R2
df_twl["setup"] = setup
df_twl["S_total"] = S_total
# Estimate TWL
df_twl["R_high"] = df_twl["tide"] + df_twl["R2"]
df_twl["R_low"] = df_twl["tide"] + 1.1 * df_twl["setup"] - 1.1 / 2 * df_twl["S_total"]
# Drop unneeded columns
df_twl.drop(columns=["E", "Exs", "P", "Pxs", "dir"], inplace=True, errors="ignore")
return df_twl
def mean_slope_for_site_id(site_id, df_twl, df_profiles, top_elevation_col, top_x_col, btm_elevation_col,
profile_type='prestorm'):
"""
Calculates the foreshore slope values a given site_id. Returns a series (with same indicies as df_twl) of
foreshore slopes. This function is used to parallelize getting foreshore slopes as it is computationally
expensive, given the need to iterate for the foreshore slope.
:param site_id:
:param df_twl:
:param df_profiles:
:return: A dataframe with slope values calculated
"""
# Get the prestorm beach profile
profile = df_profiles.query("site_id =='{}' and profile_type == '{}'".format(site_id, profile_type))
profile_x = profile.index.get_level_values("x").tolist()
profile_z = profile.z.tolist()
df_twl_site = df_twl.query("site_id == '{}'".format(site_id))
df_beta = df_twl_site.apply(
lambda row: slope_from_profile(
profile_x=profile_x,
profile_z=profile_z,
top_elevation=row[top_elevation_col],
btm_elevation=row[btm_elevation_col],
method="end_points",
top_x= row[top_x_col]
),
axis=1,
)
return df_beta
def foreshore_slope_for_site_id(site_id, df_twl, df_profiles):
"""
Calculates the foreshore slope values a given site_id. Returns a series (with same indicies as df_twl) of
foreshore slopes. This function is used to parallelize getting foreshore slopes as it is computationally
expensive, given the need to iterate for the foreshore slope.
:param site_id:
:param df_twl:
:param df_profiles:
:return: A dataframe with slope values calculated
"""
# Get the prestorm beach profile
profile = df_profiles.query("site_id =='{}' and profile_type == 'prestorm'".format(site_id))
profile_x = profile.index.get_level_values("x").tolist()
profile_z = profile.z.tolist()
df_twl_site = df_twl.query("site_id == '{}'".format(site_id))
df_beta = df_twl_site.apply(
lambda row: foreshore_slope_from_profile(
profile_x=profile_x,
profile_z=profile_z,
tide=row.tide,
runup_function=runup_models.sto06_individual,
Hs0=row.Hs0,
Tp=row.Tp,
),
axis=1,
)
return df_beta
def foreshore_slope_from_profile(profile_x, profile_z, tide, runup_function, **kwargs):
"""
Returns the foreshore slope given the beach profile, water level (tide) and runup_function. Since foreshore slope is
dependant on the setup elevation and swash magnitude, which in tern is dependant on the foreshore slope, the process
requires iteration to solve.
:param profile_x:
:param profile_z:
:param tide:
:param runup_function: The name of a function which will return runup values (refer to runup_models.py)
:param kwargs: Additional keyword arguments which will be passed to the runup_function (usually Hs0, Tp).
:return:
"""
# Sometimes there is no tide value for a record, so return None
if np.isnan(tide):
return None
# Initalize estimates
max_number_iterations = 30
iteration_count = 0
averaged_accuracy = 0.03 # if slopes within this amount, average after max number of iterations
acceptable_accuracy = 0.01 # if slopes within this amount, accept after max number of iterations
preferred_accuracy = 0.001 # if slopes within this amount, accept
beta = 0.05
while True:
R2, setup, S_total, _, _ = runup_function(beta=beta, **kwargs)
beta_new = slope_from_profile(
profile_x=profile_x,
profile_z=profile_z,
method="end_points",
top_elevation=tide + setup + S_total / 2,
btm_elevation=tide + setup - S_total / 2,
)
# Return None if we can't find a slope, usually because the elevations we've specified are above/below our
# profile x and z coordinates.
if beta_new is None:
return None
# If slopes do not change much between interactions, return the slope
if abs(beta_new - beta) < preferred_accuracy:
return beta
# If we can't converge a solution, return None
if iteration_count > max_number_iterations:
if abs(beta_new - beta) < acceptable_accuracy:
return beta
elif abs(beta_new - beta) < averaged_accuracy:
return (beta_new + beta) / 2
else:
return None
beta = beta_new
iteration_count += 1
def slope_from_profile(profile_x, profile_z, top_elevation, btm_elevation, method="end_points", top_x=None, btm_x=None):
"""
Returns a slope (beta) from a bed profile, given the top and bottom elevations of where the slope should be taken.
:param x: List of x bed profile coordinates
:param z: List of z bed profile coordinates
:param top_elevation: Top elevation of where to take the slope
:param btm_elevation: Bottom elevation of where to take the slope
:param method: Method used to calculate slope (end_points or least_squares)
:param top_x: x-coordinate of the top end point. May be needed, as there may be multiple crossings of the
top_elevation.
:param btm_x: x-coordinate of the bottom end point
:return:
"""
# Need all data to get the slope
if any([x is None for x in [profile_x, profile_z, top_elevation, btm_elevation]]):
return None
end_points = {"top": {"z": top_elevation}, "btm": {"z": btm_elevation}}
for end_type in end_points.keys():
# Add x coordinates if they are specified
if top_x and end_type == 'top':
end_points['top']['x'] = top_x
continue
if btm_x and end_type == 'top':
end_points['btm']['x'] = btm_x
continue
elevation = end_points[end_type]["z"]
intersection_x = crossings(profile_x, profile_z, elevation)
# No intersections found
if len(intersection_x) == 0:
return None
# One intersection
elif len(intersection_x) == 1:
end_points[end_type]["x"] = intersection_x[0]
# More than on intersection
else:
if end_type == "top":
# For top elevation, take most seaward intersection
end_points[end_type]["x"] = intersection_x[-1]
else:
# For bottom elevation, take most landward intersection that is seaward of top elevation
end_point_btm = [x for x in intersection_x if x > end_points["top"]["x"]]
if len(end_point_btm) == 0:
# If there doesn't seem to be an intersection seaward of the top elevation, return none.
return None
else:
end_points[end_type]["x"] = end_point_btm[0]
if method == "end_points":
x_top = end_points["top"]["x"]
x_btm = end_points["btm"]["x"]
z_top = end_points["top"]["z"]
z_btm = end_points["btm"]["z"]
return -(z_top - z_btm) / (x_top - x_btm)
elif method == "least_squares":
profile_mask = [True if end_points["top"]["x"] < pts < end_points["btm"]["x"] else False for pts in profile_x]
slope_x = np.array(profile_x)[profile_mask].tolist()
slope_z = np.array(profile_z)[profile_mask].tolist()
slope, _, _, _, _ = stats.linregress(slope_x, slope_z)
return -slope
def crossings(profile_x, profile_z, constant_z):
"""
Finds the x coordinate of a z elevation for a beach profile. Much faster than using shapely to calculate
intersections since we are only interested in intersections of a constant value. Will return multiple
intersections if found. Used in calculating beach slope.
Adapted from https://stackoverflow.com/a/34745789
:param profile_x: List of x coordinates for the beach profile section
:param profile_z: List of z coordinates for the beach profile section
:param constant_z: Float of the elevation to find corresponding x coordinates
:return: List of x coordinates which correspond to the constant_z
"""
# Remove nans to suppress warning messages
valid = ~ma.masked_invalid(profile_z).mask
profile_z = np.array(profile_z)[valid]
profile_x = np.array(profile_x)[valid]
# Normalize the 'signal' to zero.
# Use np.subtract rather than a list comprehension for performance reasons
z = np.subtract(profile_z, constant_z)
# Find all indices right before any crossing.
# TODO Sometimes this can give a runtime warning https://stackoverflow.com/a/36489085
indicies = np.where(z[:-1] * z[1:] < 0)[0]
# Use linear interpolation to find intersample crossings.
return [profile_x[i] - (profile_x[i] - profile_x[i + 1]) / (z[i] - z[i + 1]) * (z[i]) for i in indicies]
@click.command()
@click.option("--waves-csv", required=True, help="")
@click.option("--tides-csv", required=True, help="")
@click.option("--profiles-csv", required=True, help="")
@click.option("--profile-features-csv", required=True, help="")
@click.option("--runup-function", required=True, help="", type=click.Choice(["sto06"]))
@click.option("--slope", required=True, help="", type=click.Choice(["foreshore", "mean"]))
@click.option("--profile-type", required=True, help="", type=click.Choice(["prestorm", "poststorm"]))
@click.option("--output-file", required=True, help="")
def create_twl_forecast(waves_csv, tides_csv, profiles_csv, profile_features_csv, runup_function, slope,
profile_type,output_file):
logger.info("Creating forecast of total water levels")
logger.info("Importing data")
df_waves = pd.read_csv(waves_csv, index_col=[0, 1])
df_tides = pd.read_csv(tides_csv, index_col=[0, 1])
df_profiles = pd.read_csv(profiles_csv, index_col=[0, 1, 2])
df_profile_features = pd.read_csv(profile_features_csv, index_col=[0,1])
logger.info("Forecasting TWL")
df_twl = forecast_twl(
df_tides,
df_profiles,
df_waves,
df_profile_features,
runup_function=getattr(runup_models, runup_function),
slope=slope,
profile_type=profile_type
)
df_twl.to_csv(output_file)
logger.info("Saved to %s", output_file)
logger.info("Done!")