Rename mat parsing file and convert to callable CLI commands

develop
Chris Leaman 6 years ago
parent 67b7043ec3
commit 99e036a4cd

@ -1,263 +0,0 @@
"""
Converts raw .mat files into a flattened .csv structure which can be imported into python pandas.
"""
import logging.config
from datetime import datetime, timedelta
import pandas as pd
from mat4py import loadmat
import numpy as np
logging.config.fileConfig('./src/logging.conf', disable_existing_loggers=False)
logger = logging.getLogger(__name__)
def parse_orientations(orientations_mat):
"""
Parses the raw orientations.mat file and returns a pandas dataframe. Note that orientations are the direction
towards land measured in degrees anti-clockwise from east.
:param orientations_mat:
:return:
"""
logger.info('Parsing %s', orientations_mat)
mat_data = loadmat(orientations_mat)['output']
rows = []
for i in range(0, len(mat_data['beach'])):
rows.append({
'beach': mat_data['beach'][i],
'orientation': mat_data['orientation'][i],
'lat_center': mat_data['lat_center'][i],
'lon_center': mat_data['lon_center'][i],
'lat_land': mat_data['lat_land'][i],
'lon_land': mat_data['lon_land'][i],
'lat_sea': mat_data['lat_sea'][i],
'lon_sea': mat_data['lon_sea'][i],
})
df = pd.DataFrame(rows)
return df
def combine_sites_and_orientaions(df_sites, df_orientations):
"""
Replaces beach/lat/lon columns with the unique site_id.
:param dfs:
:param df_sites:
:return:
"""
df_merged_sites = df_sites.merge(df_orientations[['beach', 'lat_center', 'lon_center', 'orientation']],
left_on=['beach', 'lat', 'lon'],
right_on=['beach', 'lat_center', 'lon_center'])
# Check that all our records have a unique site identifier
n_unmatched = len(df_sites) - len(df_merged_sites)
if n_unmatched > 0:
logger.warning('Not all records (%d of %d) matched with an orientation', n_unmatched, len(df_sites))
# Drop extra columns
df_merged_sites = df_merged_sites.drop(columns = ['lat_center', 'lon_center'])
return df_merged_sites
def specify_lat_lon_profile_center(df_sites, x_val=200):
"""
Specify which x-coordinate in the beach profile cross section the lat/lon corresponds to
:param df_sites:
:return:
"""
df_sites['profile_x_lat_lon'] = x_val
return df_sites
def parse_waves(waves_mat):
"""
Parses the raw waves.mat file and returns a pandas dataframe
:param waves_mat:
:return:
"""
logger.info('Parsing %s', waves_mat)
mat_data = loadmat(waves_mat)['data']
rows = []
for i in range(0, len(mat_data['site'])):
for j in range(0, len(mat_data['dates'][i])):
rows.append({
'beach': mat_data['site'][i],
'lon': mat_data['lon'][i],
'lat': mat_data['lat'][i],
'datetime': matlab_datenum_to_datetime(mat_data['dates'][i][j][0]),
'Hs': mat_data['H'][i][j][0],
'Hs0': mat_data['Ho'][i][j][0],
'Tp': mat_data['T'][i][j][0],
'dir': mat_data['D'][i][j][0],
'E': mat_data['E'][i][j][0],
'P': mat_data['P'][i][j][0],
'Exs': mat_data['Exs'][i][j][0],
'Pxs': mat_data['Pxs'][i][j][0],
})
df = pd.DataFrame(rows)
df['datetime'] = df['datetime'].dt.round('1s')
return df
def parse_tides(tides_mat):
"""
Parses the raw tides.mat file and returns a pandas dataframe
:param tides_mat:
:return:
"""
logger.info('Parsing %s', tides_mat)
mat_data = loadmat(tides_mat)['data']
rows = []
for i in range(0, len(mat_data['site'])):
for j in range(0, len(mat_data['time'])):
rows.append({
'beach': mat_data['site'][i][0],
'lon': mat_data['lons'][i][0],
'lat': mat_data['lats'][i][0],
'datetime': matlab_datenum_to_datetime(mat_data['time'][j][0]),
'tide': mat_data['tide'][i][j]
})
df = pd.DataFrame(rows)
df['datetime'] = df['datetime'].dt.round('1s')
return df
def parse_profiles(profiles_mat):
"""
Parses the raw profiles.mat file and returns a pandas dataframe
:param tides_mat:
:return:
"""
logger.info('Parsing %s', profiles_mat)
mat_data = loadmat(profiles_mat)['data']
rows = []
for i in range(0, len(mat_data['site'])):
for j in range(0, len(mat_data['pfx'][i])):
for profile_type in ['prestorm', 'poststorm']:
if profile_type == 'prestorm':
z = mat_data['pf1'][i][j][0]
if profile_type == 'poststorm':
z = mat_data['pf2'][i][j][0]
rows.append({
'beach': mat_data['site'][i],
'lon': mat_data['lon'][i],
'lat': mat_data['lat'][i],
'profile_type': profile_type,
'x': mat_data['pfx'][i][j][0],
'z': z,
})
df = pd.DataFrame(rows)
return df
def remove_zeros(df_profiles):
"""
When parsing the pre/post storm profiles, the end of some profiles have constant values of zero. Let's change
these to NaNs for consistancy. Didn't use pandas fillnan because 0 may still be a valid value.
:param df:
:return:
"""
df_profiles = df_profiles.sort_index()
groups = df_profiles.groupby(level=['site_id','profile_type'])
for key, _ in groups:
logger.debug('Removing zeros from {} profile at {}'.format(key[1], key[0]))
idx_site = (df_profiles.index.get_level_values('site_id') == key[0]) & \
(df_profiles.index.get_level_values('profile_type') == key[1])
df_profile = df_profiles[idx_site]
x_last_ele = df_profile[df_profile.z!=0].index.get_level_values('x')[-1]
df_profiles.loc[idx_site & (df_profiles.index.get_level_values('x')>x_last_ele), 'z'] = np.nan
return df_profiles
def matlab_datenum_to_datetime(matlab_datenum):
# https://stackoverflow.com/a/13965852
return datetime.fromordinal(int(matlab_datenum)) + timedelta(days=matlab_datenum % 1) - timedelta(
days=366)
def get_unique_sites(dfs, cols=['beach', 'lat', 'lon']):
"""
Generates a dataframe of unique sites based on beach names, lats and lons. Creates a unique site ID for each.
:param dfs:
:param cols:
:return:
"""
rows = []
df_all = pd.concat([df[cols] for df in dfs])
beach_groups = df_all.groupby(['beach'])
for beach_name, beach_group in beach_groups:
site_groups = beach_group.groupby(['lat', 'lon'])
siteNo = 1
for site_name, site_group in site_groups:
site = '{}{:04d}'.format(beach_name, siteNo)
rows.append({'site_id': site,
'lat': site_name[0],
'lon': site_name[1],
'beach': beach_name})
siteNo += 1
df = pd.DataFrame(rows)
return df
def replace_unique_sites(df, df_sites, cols=['beach', 'lat', 'lon']):
"""
Replaces beach/lat/lon columns with the unique site_id
:param dfs:
:param df_sites:
:return:
"""
df_merged = df.merge(df_sites, on=cols)
# Check that all our records have a unique site identifier
n_unmatched = len(df) - len(df_merged)
if n_unmatched > 0:
logger.warning('Not all records (%d of %d) matched with a unique site', n_unmatched, len(df))
df_merged = df_merged.drop(columns=cols)
return df_merged
def main():
df_waves = parse_waves(waves_mat='./data/raw/processed_shorelines/waves.mat')
df_tides = parse_tides(tides_mat='./data/raw/processed_shorelines/tides.mat')
df_profiles = parse_profiles(profiles_mat='./data/raw/processed_shorelines/profiles.mat')
df_sites = get_unique_sites(dfs=[df_waves, df_tides, df_profiles])
df_orientations = parse_orientations(orientations_mat='./data/raw/processed_shorelines/orientations.mat')
logger.info('Identifying unique sites')
df_waves = replace_unique_sites(df_waves, df_sites)
df_tides = replace_unique_sites(df_tides, df_sites)
df_profiles = replace_unique_sites(df_profiles, df_sites)
logger.info('Combine orientations into sites')
df_sites = combine_sites_and_orientaions(df_sites, df_orientations)
df_sites = specify_lat_lon_profile_center(df_sites)
logger.info('Setting pandas index')
df_profiles.set_index(['site_id', 'profile_type', 'x'], inplace=True)
df_waves.set_index(['site_id', 'datetime'], inplace=True)
df_tides.set_index(['site_id', 'datetime'], inplace=True)
df_sites.set_index(['site_id'], inplace=True)
logger.info('Nanning profile zero elevations')
df_profiles = remove_zeros(df_profiles)
logger.info('Outputting .csv files')
df_profiles.to_csv('./data/interim/profiles.csv')
df_tides.to_csv('./data/interim/tides.csv')
df_waves.to_csv('./data/interim/waves.csv')
df_sites.to_csv('./data/interim/sites.csv')
logger.info('Done!')
if __name__ == '__main__':
main()

@ -0,0 +1,345 @@
"""
Converts raw .mat files into a flattened .csv structure which can be imported into python pandas.
"""
import logging.config
from datetime import datetime, timedelta
import click
import pandas as pd
from mat4py import loadmat
import numpy as np
logging.config.fileConfig("./src/logging.conf", disable_existing_loggers=False)
logger = logging.getLogger(__name__)
def parse_orientations(orientations_mat):
"""
Parses the raw orientations.mat file and returns a pandas dataframe. Note that orientations are the direction
towards land measured in degrees anti-clockwise from east.
:param orientations_mat:
:return:
"""
logger.info("Parsing %s", orientations_mat)
mat_data = loadmat(orientations_mat)["output"]
rows = []
for i in range(0, len(mat_data["beach"])):
rows.append(
{
"beach": mat_data["beach"][i],
"orientation": mat_data["orientation"][i],
"lat_center": mat_data["lat_center"][i],
"lon_center": mat_data["lon_center"][i],
"lat_land": mat_data["lat_land"][i],
"lon_land": mat_data["lon_land"][i],
"lat_sea": mat_data["lat_sea"][i],
"lon_sea": mat_data["lon_sea"][i],
}
)
df = pd.DataFrame(rows)
return df
def combine_sites_and_orientaions(df_sites, df_orientations):
"""
Replaces beach/lat/lon columns with the unique site_id.
:param dfs:
:param df_sites:
:return:
"""
df_merged_sites = df_sites.merge(
df_orientations[["beach", "lat_center", "lon_center", "orientation"]],
left_on=["beach", "lat", "lon"],
right_on=["beach", "lat_center", "lon_center"],
)
# Check that all our records have a unique site identifier
n_unmatched = len(df_sites) - len(df_merged_sites)
if n_unmatched > 0:
logger.warning("Not all records (%d of %d) matched with an orientation", n_unmatched, len(df_sites))
# Drop extra columns
df_merged_sites = df_merged_sites.drop(columns=["lat_center", "lon_center"])
return df_merged_sites
def specify_lat_lon_profile_center(df_sites, x_val=200):
"""
Specify which x-coordinate in the beach profile cross section the lat/lon corresponds to
:param df_sites:
:return:
"""
df_sites["profile_x_lat_lon"] = x_val
return df_sites
def parse_waves(waves_mat):
"""
Parses the raw waves.mat file and returns a pandas dataframe
:param waves_mat:
:return:
"""
logger.info("Parsing %s", waves_mat)
mat_data = loadmat(waves_mat)["data"]
rows = []
for i in range(0, len(mat_data["site"])):
for j in range(0, len(mat_data["dates"][i])):
rows.append(
{
"beach": mat_data["site"][i],
"lon": mat_data["lon"][i],
"lat": mat_data["lat"][i],
"datetime": matlab_datenum_to_datetime(mat_data["dates"][i][j][0]),
"Hs": mat_data["H"][i][j][0],
"Hs0": mat_data["Ho"][i][j][0],
"Tp": mat_data["T"][i][j][0],
"dir": mat_data["D"][i][j][0],
"E": mat_data["E"][i][j][0],
"P": mat_data["P"][i][j][0],
"Exs": mat_data["Exs"][i][j][0],
"Pxs": mat_data["Pxs"][i][j][0],
}
)
df = pd.DataFrame(rows)
df["datetime"] = df["datetime"].dt.round("1s")
return df
def parse_tides(tides_mat):
"""
Parses the raw tides.mat file and returns a pandas dataframe
:param tides_mat:
:return:
"""
logger.info("Parsing %s", tides_mat)
mat_data = loadmat(tides_mat)["data"]
rows = []
for i in range(0, len(mat_data["site"])):
for j in range(0, len(mat_data["time"])):
rows.append(
{
"beach": mat_data["site"][i][0],
"lon": mat_data["lons"][i][0],
"lat": mat_data["lats"][i][0],
"datetime": matlab_datenum_to_datetime(mat_data["time"][j][0]),
"tide": mat_data["tide"][i][j],
}
)
df = pd.DataFrame(rows)
df["datetime"] = df["datetime"].dt.round("1s")
return df
def parse_profiles(profiles_mat):
"""
Parses the raw profiles.mat file and returns a pandas dataframe
:param tides_mat:
:return:
"""
logger.info("Parsing %s", profiles_mat)
mat_data = loadmat(profiles_mat)["data"]
rows = []
for i in range(0, len(mat_data["site"])):
for j in range(0, len(mat_data["pfx"][i])):
for profile_type in ["prestorm", "poststorm"]:
if profile_type == "prestorm":
z = mat_data["pf1"][i][j][0]
if profile_type == "poststorm":
z = mat_data["pf2"][i][j][0]
rows.append(
{
"beach": mat_data["site"][i],
"lon": mat_data["lon"][i],
"lat": mat_data["lat"][i],
"profile_type": profile_type,
"x": mat_data["pfx"][i][j][0],
"z": z,
}
)
df = pd.DataFrame(rows)
return df
def remove_zeros(df_profiles):
"""
When parsing the pre/post storm profiles, the end of some profiles have constant values of zero. Let's change
these to NaNs for consistancy. Didn't use pandas fillnan because 0 may still be a valid value.
:param df:
:return:
"""
df_profiles = df_profiles.sort_index()
groups = df_profiles.groupby(level=["site_id", "profile_type"])
for key, _ in groups:
logger.debug("Removing zeros from {} profile at {}".format(key[1], key[0]))
idx_site = (df_profiles.index.get_level_values("site_id") == key[0]) & (
df_profiles.index.get_level_values("profile_type") == key[1]
)
df_profile = df_profiles[idx_site]
x_last_ele = df_profile[df_profile.z != 0].index.get_level_values("x")[-1]
df_profiles.loc[idx_site & (df_profiles.index.get_level_values("x") > x_last_ele), "z"] = np.nan
return df_profiles
def matlab_datenum_to_datetime(matlab_datenum):
"""
Adapted from https://stackoverflow.com/a/13965852
:param matlab_datenum:
:return:
"""
return datetime.fromordinal(int(matlab_datenum)) + timedelta(days=matlab_datenum % 1) - timedelta(days=366)
def get_unique_sites(dfs, cols=["beach", "lat", "lon"]):
"""
Generates a dataframe of unique sites based on beach names, lats and lons. Creates a unique site ID for each.
:param dfs:
:param cols:
:return:
"""
rows = []
df_all = pd.concat([df[cols] for df in dfs])
beach_groups = df_all.groupby(["beach"])
for beach_name, beach_group in beach_groups:
site_groups = beach_group.groupby(["lat", "lon"])
siteNo = 1
for site_name, site_group in site_groups:
site = "{}{:04d}".format(beach_name, siteNo)
rows.append({"site_id": site, "lat": site_name[0], "lon": site_name[1], "beach": beach_name})
siteNo += 1
df = pd.DataFrame(rows)
return df
def replace_unique_sites(df, df_sites, cols=["lat", "lon"]):
"""
Replaces beach/lat/lon columns with the unique site_id
:param dfs:
:param df_sites:
:return:
"""
# Make the sites index a column, so it can be merged into df
df_sites["site_id"] = df_sites.index.get_level_values("site_id")
# Merging on a float can lead to subtle bugs. Lets convert lat/lons to integers and merge on that instead
precision = 8
df_sites["lat_int"] = np.round(df_sites["lat"] * 10 ** precision).astype(np.int64)
df_sites["lon_int"] = np.round(df_sites["lon"] * 10 ** precision).astype(np.int64)
df["lat_int"] = np.round(df["lat"] * 10 ** precision).astype(np.int64)
df["lon_int"] = np.round(df["lon"] * 10 ** precision).astype(np.int64)
df_merged = df.merge(df_sites, on=["lat_int", "lon_int"])
# Check that all our records have a unique site identifier
n_unmatched = len(df) - len(df_merged)
if n_unmatched > 0:
logger.warning("Not all records (%d of %d) matched with a unique site", n_unmatched, len(df))
df_merged = df_merged.drop(
columns=[
"lat_x",
"lon_x",
"lat_int",
"lon_int",
"beach_y",
"beach_x",
"lat_y",
"lon_y",
"orientation",
"profile_x_lat_lon",
]
)
return df_merged
@click.command(short_help="create sites.csv")
@click.option("--waves-mat", required=True, help=".mat file containing wave records")
@click.option("--tides-mat", required=True, help=".mat file containing tide records")
@click.option("--profiles-mat", required=True, help=".mat file containing beach profiles")
@click.option("--orientations-mat", required=True, help=".mat file containing orientation of beach profiles")
@click.option("--output-file", required=True, help="where to save sites.csv")
def create_sites_csv(waves_mat, tides_mat, profiles_mat, orientations_mat, output_file):
logger.info("Creating %s", output_file)
df_waves = parse_waves(waves_mat=waves_mat)
df_tides = parse_tides(tides_mat=tides_mat)
df_profiles = parse_profiles(profiles_mat=profiles_mat)
df_orientations = parse_orientations(orientations_mat=orientations_mat)
df_sites = get_unique_sites(dfs=[df_waves, df_tides, df_profiles])
df_sites = combine_sites_and_orientaions(df_sites, df_orientations)
df_sites = specify_lat_lon_profile_center(df_sites)
df_sites.set_index(["site_id"], inplace=True)
df_sites.to_csv(output_file)
logger.info("Created %s", output_file)
@click.command(short_help="create waves.csv")
@click.option("--waves-mat", required=True, help=".mat file containing wave records")
@click.option("--sites-csv", required=True, help=".csv file description of cross section sites")
@click.option("--output-file", required=True, help="where to save waves.csv")
def create_waves_csv(waves_mat, sites_csv, output_file):
logger.info("Creating %s", output_file)
df_waves = parse_waves(waves_mat=waves_mat)
df_sites = pd.read_csv(sites_csv, index_col=[0])
df_waves = replace_unique_sites(df_waves, df_sites)
df_waves.set_index(["site_id", "datetime"], inplace=True)
df_waves.sort_index(inplace=True)
df_waves.to_csv(output_file)
logger.info("Created %s", output_file)
@click.command(short_help="create profiles.csv")
@click.option("--profiles-mat", required=True, help=".mat file containing beach profiles")
@click.option("--sites-csv", required=True, help=".csv file description of cross section sites")
@click.option("--output-file", required=True, help="where to save profiles.csv")
def create_profiles_csv(profiles_mat, sites_csv, output_file):
logger.info("Creating %s", output_file)
df_profiles = parse_profiles(profiles_mat=profiles_mat)
df_sites = pd.read_csv(sites_csv, index_col=[0])
df_profiles = replace_unique_sites(df_profiles, df_sites)
df_profiles.set_index(["site_id", "profile_type", "x"], inplace=True)
df_profiles.sort_index(inplace=True)
df_profiles.to_csv(output_file)
logger.info("Created %s", output_file)
@click.command(short_help="create profiles.csv")
@click.option("--tides-mat", required=True, help=".mat file containing tides")
@click.option("--sites-csv", required=True, help=".csv file description of cross section sites")
@click.option("--output-file", required=True, help="where to save tides.csv")
def create_tides_csv(tides_mat, sites_csv, output_file):
logger.info("Creating %s", output_file)
df_tides = parse_tides(tides_mat=tides_mat)
df_sites = pd.read_csv(sites_csv, index_col=[0])
df_tides = replace_unique_sites(df_tides, df_sites)
df_tides.set_index(["site_id", "datetime"], inplace=True)
df_tides.sort_index(inplace=True)
df_tides.to_csv(output_file)
logger.info("Created %s", output_file)
@click.group()
def cli():
pass
if __name__ == "__main__":
cli.add_command(create_sites_csv)
cli.add_command(create_waves_csv)
cli.add_command(create_profiles_csv)
cli.add_command(create_tides_csv)
cli()
pd.set_option("display.precision", 8)
pd.set_option("display.max_columns", None)
Loading…
Cancel
Save