Add module for parsing observed storm impacts
parent e7d6aa8761
commit 1b521a0524
@@ -0,0 +1,138 @@
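"""
Parses observed storm impacts from pre and post storm beach profiles: calculates swash and dune
face volume changes for each site and classifies the observed storm regime.
"""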
import logging.config
import os

import numpy as np
import pandas as pd
from scipy.integrate import simps

logging.config.fileConfig('./src/logging.conf', disable_existing_loggers=False)
logger = logging.getLogger(__name__)


def return_first_or_nan(l):
    """
    Returns the first value of a list, or np.nan if the list is empty. Used for getting dune toe and crest values.
    :param l: list of values
    :return: the first element of l, or np.nan if l is empty
    """
    if len(l) == 0:
        return np.nan
    else:
        return l[0]


def volume_change(df_profiles, df_profile_features, zone):
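    """
    Calculates the prestorm volume, poststorm volume and volume change for each site in the given zone.
    :param df_profiles: dataframe of prestorm and poststorm beach profile elevations
    :param df_profile_features: dataframe of profile features (dune crest and toe x-coordinates) per site
    :param zone: zone to calculate volumes over, either 'swash' or 'dune_face'
    :return: dataframe of prestorm volume, poststorm volume and volume change for each site
    """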
    logger.info('Calculating change in beach volume in {} zone'.format(zone))

    df_vol_changes = pd.DataFrame(index=df_profile_features.index)
    df_profiles = df_profiles.sort_index()
    sites = df_profiles.groupby(level=['site_id'])

    for site_id, df_site in sites:
        logger.debug('Calculating change in beach volume at {} in {} zone'.format(site_id, zone))
        prestorm_dune_toe_x = df_profile_features.loc[df_profile_features.index == site_id].dune_toe_x.tolist()
        prestorm_dune_crest_x = df_profile_features.loc[df_profile_features.index == site_id].dune_crest_x.tolist()

        # We may not have a dune toe or crest defined, or there may be multiple defined.
        prestorm_dune_crest_x = return_first_or_nan(prestorm_dune_crest_x)
        prestorm_dune_toe_x = return_first_or_nan(prestorm_dune_toe_x)

        # If no dune toe has been defined, Dlow = Dhigh. Refer to Sallenger (2000).
        if np.isnan(prestorm_dune_toe_x):
            prestorm_dune_toe_x = prestorm_dune_crest_x

        # Find the last x-coordinate where we have both prestorm and poststorm measurements. If we don't do this,
        # the prestorm and poststorm volumes are going to be calculated over different profile lengths.
        df_zone = df_site.dropna(subset=['z'])
        x_last_obs = min([max(df_zone.query("profile_type == '{}'".format(profile_type)).index.get_level_values('x'))
                          for profile_type in ['prestorm', 'poststorm']])

        # Where we measure the pre and post storm volumes depends on the zone selected.
        if zone == 'swash':
            x_min = prestorm_dune_toe_x
            x_max = x_last_obs
        elif zone == 'dune_face':
            x_min = prestorm_dune_crest_x
            x_max = prestorm_dune_toe_x
        else:
            logger.warning('Zone argument not properly specified. Please check')
            x_min = None
            x_max = None

        # Now, compute the volume of sand between x_min and x_max for both the prestorm and poststorm profiles.
        prestorm_vol = beach_volume(x=df_zone.query("profile_type=='prestorm'").index.get_level_values('x'),
                                    z=df_zone.query("profile_type=='prestorm'").z,
                                    x_min=x_min,
                                    x_max=x_max)
        poststorm_vol = beach_volume(x=df_zone.query("profile_type=='poststorm'").index.get_level_values('x'),
                                     z=df_zone.query("profile_type=='poststorm'").z,
                                     x_min=x_min,
                                     x_max=x_max)
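        # Volume change is reported as prestorm minus poststorm volume, so a positive value
        # indicates sand lost from the zone over the storm.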
        df_vol_changes.loc[site_id, 'prestorm_{}_vol'.format(zone)] = prestorm_vol
        df_vol_changes.loc[site_id, 'poststorm_{}_vol'.format(zone)] = poststorm_vol
        df_vol_changes.loc[site_id, '{}_vol_change'.format(zone)] = prestorm_vol - poststorm_vol

    return df_vol_changes


def beach_volume(x, z, x_min=np.NINF, x_max=np.inf):
    """
    Returns the beach volume of a profile, calculated with Simpson's rule.
    :param x: x-coordinates of beach profile
    :param z: z-coordinates of beach profile
    :param x_min: minimum x-coordinate to consider when calculating volume
    :param x_max: maximum x-coordinate to consider when calculating volume
    :return: beach volume, or np.nan if no points lie between x_min and x_max
    """
    profile_mask = [x_min < x_coord < x_max for x_coord in x]
    x_masked = np.array(x)[profile_mask]
    z_masked = np.array(z)[profile_mask]

    if len(x_masked) == 0 or len(z_masked) == 0:
        return np.nan
    else:
        return simps(z_masked, x_masked)


def impacts_from_profiles(df_profiles, df_profile_features):
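    # Note: this function is an unfinished work in progress and is not called by the
    # __main__ block below, which performs these steps directly.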
    # Impacts should be per site, so use the profile_features as the base index.
    df_observed_impacts = pd.DataFrame(index=df_profile_features.index)

    # Swash zone volume change (volume_change returns a single dataframe of volumes and changes)
    df_swash_vol_changes = volume_change(df_profiles, df_profile_features, zone='swash')

    # TODO Dune face volume change

    # TODO If there is no dune face volume change, classify the site as swash regime

    pass


def storm_regime(df_observed_impacts):
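    """
    Classifies the observed storm regime at each site from its volume changes, consistent with the
    storm impact regimes of Sallenger (2000): sites with limited swash zone volume change are
    classified as 'swash', and sites with significant dune face volume change are classified as
    'collision' (which overrides the swash classification where both conditions hold). The cutoff
    of 3 used below is the volume-change threshold adopted here.
    :param df_observed_impacts: dataframe of observed volume changes per site
    :return: the same dataframe with a 'storm_regime' column added
    """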
    logger.info('Getting observed storm regimes')
    df_observed_impacts.loc[df_observed_impacts.swash_vol_change < 3, 'storm_regime'] = 'swash'
    df_observed_impacts.loc[df_observed_impacts.dune_face_vol_change > 3, 'storm_regime'] = 'collision'
    return df_observed_impacts


if __name__ == '__main__':
    logger.info('Importing existing data')
    data_folder = './data/interim'
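    # profiles.csv is read with a three-level index (index_col=[0, 1, 2]); based on how it is used
    # below, this is presumably site_id, profile_type and x. profile_features.csv is indexed by site_id.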
    df_profiles = pd.read_csv(os.path.join(data_folder, 'profiles.csv'), index_col=[0, 1, 2])
    df_profile_features = pd.read_csv(os.path.join(data_folder, 'profile_features.csv'), index_col=[0])

    logger.info('Creating new dataframe for observed impacts')
    df_observed_impacts = pd.DataFrame(index=df_profile_features.index)

    logger.info('Getting pre/post storm volumes')
    df_swash_vol_changes = volume_change(df_profiles, df_profile_features, zone='swash')
    df_dune_face_vol_changes = volume_change(df_profiles, df_profile_features, zone='dune_face')
    df_observed_impacts = df_observed_impacts.join([df_swash_vol_changes, df_dune_face_vol_changes])

    # Classify regime based on volume changes
    df_observed_impacts = storm_regime(df_observed_impacts)

    # Save dataframe to csv
    df_observed_impacts.to_csv(os.path.join(data_folder, 'impacts_observed.csv'))