You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
189 lines
7.2 KiB
Python
189 lines
7.2 KiB
Python
import logging.config
|
|
import os
|
|
import click
|
|
import numpy as np
|
|
import pandas as pd
|
|
from scipy.integrate import simps
|
|
|
|
logging.config.fileConfig("./src/logging.conf", disable_existing_loggers=False)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def return_first_or_nan(l):
|
|
"""
|
|
Returns the first value of a list if empty or returns nan. Used for getting dune/toe and crest values.
|
|
:param l:
|
|
:return:
|
|
"""
|
|
if len(l) == 0:
|
|
return np.nan
|
|
else:
|
|
return l[0]
|
|
|
|
|
|
def volume_change(df_profiles, df_profile_features, zone):
|
|
"""
|
|
Calculates how much the volume change there is between prestrom and post storm profiles.
|
|
:param df_profiles:
|
|
:param df_profile_features:
|
|
:param zone: Either 'swash' or 'dune_face'
|
|
:return:
|
|
"""
|
|
logger.info("Calculating change in beach volume in {} zone".format(zone))
|
|
|
|
df_vol_changes = pd.DataFrame(index=df_profile_features.index)
|
|
df_profiles = df_profiles.sort_index()
|
|
sites = df_profiles.groupby(level=["site_id"])
|
|
|
|
for site_id, df_site in sites:
|
|
logger.debug("Calculating change in beach volume at {} in {} zone".format(site_id, zone))
|
|
prestorm_dune_toe_x = df_profile_features.loc[df_profile_features.index == site_id].dune_toe_x.tolist()
|
|
prestorm_dune_crest_x = df_profile_features.loc[df_profile_features.index == site_id].dune_crest_x.tolist()
|
|
|
|
# We may not have a dune toe or crest defined, or there may be multiple defined.
|
|
prestorm_dune_crest_x = return_first_or_nan(prestorm_dune_crest_x)
|
|
prestorm_dune_toe_x = return_first_or_nan(prestorm_dune_toe_x)
|
|
|
|
# If no dune to has been defined, Dlow = Dhigh. Refer to Sallenger (2000).
|
|
if np.isnan(prestorm_dune_toe_x):
|
|
prestorm_dune_toe_x = prestorm_dune_crest_x
|
|
|
|
# Find last x coordinate where we have both prestorm and poststorm measurements. If we don't do this,
|
|
# the prestorm and poststorm values are going to be calculated over different lengths.
|
|
df_zone = df_site.dropna(subset=["z"])
|
|
x_last_obs = min(
|
|
[
|
|
max(df_zone.query("profile_type == '{}'".format(profile_type)).index.get_level_values("x"))
|
|
for profile_type in ["prestorm", "poststorm"]
|
|
]
|
|
)
|
|
|
|
# Where we want to measure pre and post storm volume is dependant on the zone selected
|
|
if zone == "swash":
|
|
x_min = prestorm_dune_toe_x
|
|
x_max = x_last_obs
|
|
elif zone == "dune_face":
|
|
x_min = prestorm_dune_crest_x
|
|
x_max = prestorm_dune_toe_x
|
|
else:
|
|
logger.warning("Zone argument not properly specified. Please check")
|
|
x_min = None
|
|
x_max = None
|
|
|
|
# Now, compute the volume of sand between the x-coordinates prestorm_dune_toe_x and x_swash_last for both prestorm
|
|
# and post storm profiles.
|
|
prestorm_vol = beach_volume(
|
|
x=df_zone.query("profile_type=='prestorm'").index.get_level_values("x"),
|
|
z=df_zone.query("profile_type=='prestorm'").z,
|
|
x_min=x_min,
|
|
x_max=x_max,
|
|
)
|
|
poststorm_vol = beach_volume(
|
|
x=df_zone.query("profile_type=='poststorm'").index.get_level_values("x"),
|
|
z=df_zone.query("profile_type=='poststorm'").z,
|
|
x_min=x_min,
|
|
x_max=x_max,
|
|
)
|
|
|
|
# No point keeping so many decimal places, let's round them
|
|
prestorm_vol = np.round(prestorm_vol, 2)
|
|
poststorm_vol = np.round(poststorm_vol, 2)
|
|
|
|
df_vol_changes.loc[site_id, "prestorm_{}_vol".format(zone)] = prestorm_vol
|
|
df_vol_changes.loc[site_id, "poststorm_{}_vol".format(zone)] = poststorm_vol
|
|
df_vol_changes.loc[site_id, "{}_vol_change".format(zone)] = prestorm_vol - poststorm_vol
|
|
|
|
return df_vol_changes
|
|
|
|
|
|
def beach_volume(x, z, x_min=np.NINF, x_max=np.inf):
|
|
"""
|
|
Returns the beach volume of a profile, calculated with Simpsons rule
|
|
:param x: x-coordinates of beach profile
|
|
:param z: z-coordinates of beach profile
|
|
:param x_min: Minimum x-coordinate to consider when calculating volume
|
|
:param x_max: Maximum x-coordinate to consider when calculating volume
|
|
:return:
|
|
"""
|
|
profile_mask = [True if x_min < x_coord < x_max else False for x_coord in x]
|
|
x_masked = np.array(x)[profile_mask]
|
|
z_masked = np.array(z)[profile_mask]
|
|
|
|
if len(x_masked) == 0 or len(z_masked) == 0:
|
|
return np.nan
|
|
else:
|
|
return simps(z_masked, x_masked)
|
|
|
|
|
|
def storm_regime(df_observed_impacts):
|
|
"""
|
|
Returns the dataframe with an additional column of storm impacts based on the Storm Impact Scale. Refer to
|
|
Sallenger (2000) for details.
|
|
:param df_observed_impacts:
|
|
:return:
|
|
"""
|
|
logger.info("Getting observed storm regimes")
|
|
df_observed_impacts.loc[df_observed_impacts.dune_face_vol_change <= 5, "storm_regime"] = "swash"
|
|
df_observed_impacts.loc[df_observed_impacts.dune_face_vol_change > 5, "storm_regime"] = "collision"
|
|
return df_observed_impacts
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logger.info("Importing existing data")
|
|
data_folder = "./data/interim"
|
|
df_profiles = pd.read_csv(os.path.join(data_folder, "profiles.csv"), index_col=[0, 1, 2])
|
|
df_profile_features = pd.read_csv(os.path.join(data_folder, "profile_features.csv"), index_col=[0])
|
|
|
|
logger.info("Creating new dataframe for observed impacts")
|
|
df_observed_impacts = pd.DataFrame(index=df_profile_features.index)
|
|
|
|
logger.info("Getting pre/post storm volumes")
|
|
df_swash_vol_changes = volume_change(df_profiles, df_profile_features, zone="swash")
|
|
df_dune_face_vol_changes = volume_change(df_profiles, df_profile_features, zone="dune_face")
|
|
df_observed_impacts = df_observed_impacts.join([df_swash_vol_changes, df_dune_face_vol_changes])
|
|
|
|
# Classify regime based on volume changes
|
|
df_observed_impacts = storm_regime(df_observed_impacts)
|
|
|
|
# Save dataframe to csv
|
|
df_observed_impacts.to_csv(os.path.join(data_folder, "impacts_observed.csv"))
|
|
|
|
|
|
@click.command()
|
|
@click.option("--profiles-csv", required=True, help="")
|
|
@click.option("--profile-features-csv", required=True, help="")
|
|
@click.option("--output-file", required=True, help="")
|
|
def create_observed_impacts(profiles_csv, profile_features_csv, output_file):
|
|
|
|
logger.info("Creating observed wave impacts")
|
|
logger.info("Importing data")
|
|
df_profiles = pd.read_csv(profiles_csv, index_col=[0, 1, 2])
|
|
df_profile_features = pd.read_csv(profile_features_csv, index_col=[0])
|
|
|
|
logger.info("Creating new dataframe for observed impacts")
|
|
df_observed_impacts = pd.DataFrame(index=df_profile_features.index)
|
|
|
|
logger.info("Getting pre/post storm volumes")
|
|
df_swash_vol_changes = volume_change(df_profiles, df_profile_features, zone="swash")
|
|
df_dune_face_vol_changes = volume_change(df_profiles, df_profile_features, zone="dune_face")
|
|
df_observed_impacts = df_observed_impacts.join([df_swash_vol_changes, df_dune_face_vol_changes])
|
|
|
|
# Classify regime based on volume changes
|
|
df_observed_impacts = storm_regime(df_observed_impacts)
|
|
|
|
# Save dataframe to csv
|
|
df_observed_impacts.to_csv(output_file)
|
|
|
|
logger.info("Saved to %s", output_file)
|
|
logger.info("Done!")
|
|
|
|
|
|
@click.group()
|
|
def cli():
|
|
pass
|
|
|
|
|
|
if __name__ == "__main__":
|
|
cli.add_command(create_observed_impacts)
|
|
cli()
|