|
|
|
"""
|
|
|
|
Estimates the forecasted storm impacts based on the forecasted water level and dune crest/toe.
|
|
|
|
"""
|
|
|
|
|
|
|
|
import logging.config
|
|
|
|
import os
|
|
|
|
|
|
|
|
import pandas as pd
|
|
|
|
|
|
|
|
# Configure logging from the project's config file as soon as this module is loaded. The path is relative, so it is
# resolved against the current working directory. Loggers created before this call are left enabled.
logging.config.fileConfig(fname="./src/logging.conf", disable_existing_loggers=False)

# Module-level logger shared by the functions in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
def forecasted_impacts(df_profile_features, df_forecasted_twl):
    """
    Builds the per-site table of forecasted storm impacts.

    For each site, the largest forecasted R_high and the R_low at that same index entry are taken from the water
    level forecast, the dune toe and crest elevations are attached, and the combined table is passed to
    storm_regime() for classification.
    :param df_profile_features: Dataframe whose index identifies the sites and which provides the `dune_toe_z` and
        `dune_crest_z` columns.
    :param df_forecasted_twl: Dataframe with `R_high` and `R_low` columns. It is grouped by its `site_id` index
        level and its `datetime` index level is moved into a column (expected index: site_id, datetime).
    :return: Dataframe on the index of df_profile_features holding `datetime`, `R_high`, `R_low`, `dune_toe_z` and
        `dune_crest_z`, plus whatever storm_regime() adds.
    """
    logger.info("Getting forecasted storm regimes")

    # Both merges below are left joins on the index, so every site in df_profile_features is kept.
    on_site_index = {"how": "left", "left_index": True, "right_index": True}
    sites = pd.DataFrame(index=df_profile_features.index)

    # For each site, find the index label of the maximum R_high value; entries without a label are dropped.
    peak_labels = df_forecasted_twl.groupby(level=["site_id"])["R_high"].idxmax().dropna()

    # Look up R_high and the corresponding R_low at those labels, keeping the datetime as an ordinary column.
    peak_levels = df_forecasted_twl.loc[peak_labels, ["R_high", "R_low"]].reset_index(["datetime"])
    sites = sites.merge(peak_levels, **on_site_index)

    # Join with df_profile_features to find dune toe and crest elevations
    dune_columns = ["dune_toe_z", "dune_crest_z"]
    sites = sites.merge(df_profile_features[dune_columns], **on_site_index)

    # Compare R_high and R_low with dune crest and toe elevations
    return storm_regime(sites)
|
|
|
|
|
|
|
|
|
|
|
|
def storm_regime(df_forecasted_impacts):
    """
    Labels each row with a storm regime from the Storm Impact Scale. Refer to Sallenger (2000) for details.

    The `storm_regime` column is written on the dataframe that is passed in, and that same dataframe is returned.
    Rules are applied in order and a later rule overwrites an earlier one for any row it matches:
    swash, then collision, then overwash, then inundation. Rows matching none of the rules are not assigned here.
    :param df_forecasted_impacts: Dataframe with `R_high`, `R_low`, `dune_toe_z` and `dune_crest_z` columns.
    :return: The input dataframe, with the `storm_regime` column set.
    """
    logger.info("Getting forecasted storm regimes")

    frame = df_forecasted_impacts

    # Ordered (label, mask builder) pairs. The masks overlap on purpose, so the order below decides the final label.
    # Each mask is only built when its rule is applied.
    regime_rules = (
        ("swash", lambda: frame.R_high <= frame.dune_toe_z),
        ("collision", lambda: frame.R_high >= frame.dune_toe_z),
        ("overwash", lambda: (frame.R_high >= frame.dune_crest_z) & (frame.R_low <= frame.dune_crest_z)),
        ("inundation", lambda: (frame.R_low >= frame.dune_crest_z) & (frame.R_high >= frame.dune_crest_z)),
    )

    for regime_label, build_mask in regime_rules:
        frame.loc[build_mask(), "storm_regime"] = regime_label

    return frame
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: read the interim CSVs, compute the forecasted impacts and write them back next to the
    # inputs.
    logger.info("Importing existing data")
    data_folder = "./data/interim"

    # Only the profile features and the forecasted water levels are needed; the raw profiles are not read.
    df_profile_features = pd.read_csv(os.path.join(data_folder, "profile_features.csv"), index_col=[0])

    # The first two CSV columns form the index that forecasted_impacts() groups and resets by name.
    df_forecasted_twl = pd.read_csv(os.path.join(data_folder, "twl_mean_slope_sto06.csv"), index_col=[0, 1])

    df_forecasted_impacts = forecasted_impacts(df_profile_features, df_forecasted_twl)
    df_forecasted_impacts.to_csv(os.path.join(data_folder, "impacts_forecasted_mean_slope_sto06.csv"))
|