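# NOTE(review): the following lines appear to be residue from the repository web viewer this file was copied
# from (page banner and file-size header); they are not part of the module.
# You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
#
# 441 lines
# 16 KiB
# Python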

"""
Converts raw .mat files into a flattened .csv structure which can be imported into python pandas.
"""
import math
from datetime import datetime, timedelta
import click
import numpy as np
import pandas as pd
from mat4py import loadmat
from shapely.geometry import Point
from data.profile_features import convert_coord_systems
from utils import setup_logging
logger = setup_logging()
def parse_orientations(orientations_mat):
    """
    Reads the raw orientations.mat file into a pandas dataframe with one row per record. Note that orientations
    are the direction towards land measured in degrees anti-clockwise from east.

    :param orientations_mat: path of the .mat file; its records are read from the "output" key
    :return: dataframe with the beach, orientation and centre/land/sea lat/lon columns
    """
    logger.info("Parsing %s", orientations_mat)
    output = loadmat(orientations_mat)["output"]

    # Every field is a list with one entry per record, so each row is built by taking the same position
    # from each field.
    fields = (
        "beach",
        "orientation",
        "lat_center",
        "lon_center",
        "lat_land",
        "lon_land",
        "lat_sea",
        "lon_sea",
    )
    n_records = len(output["beach"])
    records = [{field: output[field][idx] for field in fields} for idx in range(n_records)]
    return pd.DataFrame(records)
def parse_dune_crest_toes(df_sites, crest_mat, toe_mat):
    """
    Parses the dune crest and dune toe .mat files into a single dataframe of profile features.

    The n-th entry of each list in the .mat files is assigned to site number n + 1. Keys ending in "1"
    (xc1/zc1/xt1/zt1) are stored as the "prestorm" profile and keys ending in "2" (xc2/zc2/xt2/zt2) as the
    "poststorm" profile.

    :param df_sites: sites dataframe which, once its index is reset, has "site_no" and "site_id" columns.
        It is not modified by this function.
    :param crest_mat: path of the .mat file with the xc1, zc1, xc2 and zc2 lists
    :param toe_mat: path of the .mat file with the xt1, zt1, xt2 and zt2 lists
    :return: dataframe indexed by (site_id, profile_type) with dune_crest_x, dune_crest_z, dune_toe_x and
        dune_toe_z columns, rounded to 3 decimal places
    """
    logger.info("Parsing dune crests and toes")
    crest_data = loadmat(crest_mat)
    toe_data = loadmat(toe_mat)

    rows = []
    for n, _ in enumerate(crest_data["xc1"]):
        for suffix, profile_type in (("1", "prestorm"), ("2", "poststorm")):
            rows.append(
                {
                    "dune_crest_x": crest_data["xc" + suffix][n],
                    "dune_crest_z": crest_data["zc" + suffix][n],
                    "dune_toe_x": toe_data["xt" + suffix][n],
                    "dune_toe_z": toe_data["zt" + suffix][n],
                    "profile_type": profile_type,
                    "site_no": n + 1,
                }
            )
    df_profile_features = pd.DataFrame(rows)

    # Want the site_id instead of the site_no, so merge in df_sites. reset_index() returns a new frame, so the
    # caller's df_sites keeps its index (the previous inplace reset silently changed the caller's dataframe).
    df_sites = df_sites.reset_index()
    df_profile_features = df_sites[["site_no", "site_id"]].merge(df_profile_features, how="outer", on=["site_no"])
    df_profile_features = df_profile_features.drop(columns=["site_no"])
    df_profile_features = df_profile_features.set_index(["site_id", "profile_type"]).sort_index()
    return df_profile_features.round(3)
def combine_sites_and_orientaions(df_sites, df_orientations):
    """
    Adds the orientation to each site by matching sites with orientation records.

    A site is matched with an orientation record when the beach names are equal and the site lat/lon equals the
    record's lat_center/lon_center. Sites without a match are not returned; a warning is logged when that happens.

    :param df_sites: dataframe with "beach", "lat" and "lon" columns
    :param df_orientations: dataframe with "beach", "lat_center", "lon_center" and "orientation" columns
    :return: the matched sites with the "orientation" column added
    """
    orientation_columns = df_orientations[["beach", "lat_center", "lon_center", "orientation"]]
    matched = df_sites.merge(
        orientation_columns,
        left_on=["beach", "lat", "lon"],
        right_on=["beach", "lat_center", "lon_center"],
    )

    # Fewer rows after the merge means some sites had no orientation record
    n_sites = len(df_sites)
    n_missing = n_sites - len(matched)
    if n_missing > 0:
        logger.warning("Not all records (%d of %d) matched with an orientation", n_missing, n_sites)

    # The join keys from the orientation side duplicate lat/lon, so they are not kept
    return matched.drop(columns=["lat_center", "lon_center"])
def specify_lat_lon_profile_center(df_sites, x_val=200):
    """
    Records which x-coordinate of the beach profile cross section the site lat/lon refers to.

    :param df_sites: sites dataframe; the "profile_x_lat_lon" column is set on this dataframe in place
    :param x_val: x-coordinate written to every row (200 by default)
    :return: the same dataframe that was passed in
    """
    column_name = "profile_x_lat_lon"
    df_sites[column_name] = x_val
    return df_sites
def parse_waves(waves_mat):
    """
    Reads the raw waves.mat file into a pandas dataframe with one row per site and time step.

    :param waves_mat: path of the .mat file; its records are read from the "data" key
    :return: dataframe with beach, lon, lat, datetime (rounded to the nearest second) and the wave parameter
        columns Hs, Hs0, Tp, dir, E, P, Exs and Pxs
    """
    logger.info("Parsing %s", waves_mat)
    wave_data = loadmat(waves_mat)["data"]

    # (output column, key in the .mat structure) for the values stored per site and per time step
    series_fields = (
        ("Hs", "H"),
        ("Hs0", "Ho"),
        ("Tp", "T"),
        ("dir", "D"),
        ("E", "E"),
        ("P", "P"),
        ("Exs", "Exs"),
        ("Pxs", "Pxs"),
    )

    records = []
    for site_idx, beach in enumerate(wave_data["site"]):
        for time_idx, datenum in enumerate(wave_data["dates"][site_idx]):
            record = {
                "beach": beach,
                "lon": wave_data["lon"][site_idx],
                "lat": wave_data["lat"][site_idx],
                "datetime": matlab_datenum_to_datetime(datenum[0]),
            }
            for column, mat_key in series_fields:
                record[column] = wave_data[mat_key][site_idx][time_idx][0]
            records.append(record)

    df_waves = pd.DataFrame(records)
    df_waves["datetime"] = df_waves["datetime"].dt.round("1s")
    return df_waves
def parse_tides(tides_mat):
    """
    Parses the raw tides.mat file and returns a pandas dataframe with one row per site and time step.

    The "time" list is shared by all sites: the tide for site i at time step j is read from tide[i][j].

    :param tides_mat: path of the .mat file; its records are read from the "data" key
    :return: dataframe with beach, lon, lat, datetime (rounded to the nearest second) and tide columns
    """
    logger.info("Parsing %s", tides_mat)
    mat_data = loadmat(tides_mat)["data"]

    # The time axis does not depend on the site, so convert it once instead of once per site
    datetimes = [matlab_datenum_to_datetime(time[0]) for time in mat_data["time"]]

    rows = []
    for i, site in enumerate(mat_data["site"]):
        beach = site[0]
        lon = mat_data["lons"][i][0]
        lat = mat_data["lats"][i][0]
        site_tides = mat_data["tide"][i]
        for j, timestamp in enumerate(datetimes):
            rows.append(
                {
                    "beach": beach,
                    "lon": lon,
                    "lat": lat,
                    "datetime": timestamp,
                    "tide": site_tides[j],
                }
            )

    df = pd.DataFrame(rows)
    df["datetime"] = df["datetime"].dt.round("1s")
    return df
def parse_profiles_and_sites(profiles_mat):
    """
    Parses the raw profiles.mat file and returns a profiles dataframe and a sites dataframe.

    Every entry of the "site" list becomes one site. The site_id is the beach name followed by a four digit
    counter, which restarts at 1 whenever the beach name differs from the previous entry. Only the prestorm
    and poststorm profiles flagged as good ("isgood" equal to 1) contribute rows to the profiles dataframe.

    The site lat/lon is taken from the profile point at x = 200. The orientation is the angle in degrees of the
    vector from the point at x = 400 (sea) to the point at x = 0 (land), calculated with atan2 from the
    eastings and northings. When the point at x = 200 is missing the lat/lon are NaN, and when either of the
    land/sea points is missing the orientation is NaN and a warning is logged.

    :param profiles_mat: path of the .mat file; its records are read from the "data" key
    :return: tuple of (df_profiles, df_sites)
    """
    logger.info("Parsing %s", profiles_mat)
    mat_data = loadmat(profiles_mat)["data"]
    profile_rows = []
    site_rows = []
    site_counter = 0
    n_sites = len(mat_data["site"])

    for i, site in enumerate(mat_data["site"]):
        logger.debug("Processing site %d of %d", i + 1, n_sites)

        # Give each site a unique id
        if not site_rows or site_rows[-1]["beach"] != site:
            site_counter = 1
        else:
            site_counter += 1
        site_id = "{}{:04d}".format(site, site_counter)

        # Only extract pre and post storm profiles which are flagged as good. The land limit and survey date
        # belong to the profile, not to each point, so they are resolved once here rather than per point.
        good_profiles = [
            (
                j,
                profile_type,
                mat_data["landlims"][i][j],
                matlab_datenum_to_datetime(mat_data["surveydates"][i][j]),
            )
            for j, profile_type in enumerate(["prestorm", "poststorm"])
            if mat_data["isgood"][i][j] == 1
        ]

        # Initialize location of x=200m latitude and longitude
        x_200_lat = np.nan
        x_200_lon = np.nan

        # Eastings/northings of the land (x=0) and sea (x=400) ends, used to calculate the orientation
        endpoints = {}

        for x, lat, lon, z_prestorm, z_poststorm, easting, northing in zip(
            mat_data["x"][i],
            mat_data["lats"][i],
            mat_data["lons"][i],
            mat_data["Zpre"][i],
            mat_data["Zpost"][i],
            mat_data["eastings"][i],
            mat_data["northings"][i],
        ):
            # Points are only used when the site has at least one good profile
            if not good_profiles:
                break

            # Keep a record of the where the center of the profile is located, and the locations of the land
            # and sea
            # TODO: This code isn't very transferrable. What if we don't have lat/lons at 200 m? Relook at this
            if x[0] == 200:
                x_200_lat = lat[0]
                x_200_lon = lon[0]
            elif x[0] == 0:
                endpoints["land_easting"] = easting[0]
                endpoints["land_northing"] = northing[0]
            elif x[0] == 400:
                endpoints["sea_easting"] = easting[0]
                endpoints["sea_northing"] = northing[0]

            z_by_profile = (z_prestorm, z_poststorm)
            for j, profile_type, land_lim, survey_datetime in good_profiles:
                profile_rows.append(
                    {
                        "site_id": site_id,
                        "lon": lon[0],
                        "lat": lat[0],
                        "profile_type": profile_type,
                        "x": x[0],
                        "z": z_by_profile[j][0],
                        "land_lim": land_lim,
                        "survey_datetime": survey_datetime,
                    }
                )

        if "land_easting" in endpoints and "sea_easting" in endpoints:
            orientation = math.degrees(
                math.atan2(
                    endpoints["land_northing"] - endpoints["sea_northing"],
                    endpoints["land_easting"] - endpoints["sea_easting"],
                )
            )
        else:
            # Without both ends of the profile there is nothing to calculate the angle from
            logger.warning("Could not calculate orientation for site %s, no profile points at x=0 and x=400", site_id)
            orientation = np.nan

        site_rows.append(
            {
                "site_id": site_id,
                "site_no": i + 1,
                "beach": site,
                "lat": x_200_lat,
                "lon": x_200_lon,
                "orientation": orientation,
                "profile_x_lat_lon": 200,
            }
        )

    df_profiles = pd.DataFrame(profile_rows)
    df_sites = pd.DataFrame(site_rows)
    logger.info("Parsed profiles and sites")
    return df_profiles, df_sites
def remove_zeros(df_profiles):
    """
    When parsing the pre/post storm profiles, the end of some profiles have constant values of zero. Let's change
    these to NaNs for consistency. Didn't use pandas fillna because 0 may still be a valid value.

    For each (site_id, profile_type) profile, every z value located at an x greater than the x of the last
    non-zero z value is set to NaN. A profile which has no non-zero z value at all is set to NaN entirely and a
    warning is logged.

    :param df_profiles: dataframe with a "z" column and an index containing the "site_id", "profile_type" and
        "x" levels
    :return: a sorted copy of df_profiles with the trailing zeros replaced by NaN
    """
    logger.info("Removing zeros from end of profiles")
    df_profiles = df_profiles.sort_index()

    # The index does not change inside the loop, so the level values are extracted once
    site_ids = df_profiles.index.get_level_values("site_id")
    profile_types = df_profiles.index.get_level_values("profile_type")
    x_values = df_profiles.index.get_level_values("x")

    # Only zeros are ever overwritten (with NaN) and each profile only touches its own rows, so the non-zero
    # mask of profiles which are still to be processed is not affected by earlier iterations.
    is_nonzero = (df_profiles["z"] != 0).values

    groups = df_profiles.groupby(level=["site_id", "profile_type"])
    for (site_id, profile_type), _ in groups:
        logger.debug("Removing zeros from %s profile at %s", profile_type, site_id)
        idx_profile = (site_ids == site_id) & (profile_types == profile_type)

        nonzero_x = x_values[idx_profile & is_nonzero]
        if len(nonzero_x) == 0:
            # Previously this raised an IndexError; a profile of only zeros holds no elevation data
            logger.warning("The %s profile at %s only contains zeros, setting it to NaN", profile_type, site_id)
            df_profiles.loc[idx_profile, "z"] = np.nan
            continue

        # The index is sorted, so the last entry is the largest x with a non-zero elevation
        x_last_ele = nonzero_x[-1]
        df_profiles.loc[idx_profile & (x_values > x_last_ele), "z"] = np.nan

    logger.info("Removed zeros from end of profiles")
    return df_profiles
def matlab_datenum_to_datetime(matlab_datenum):
    """
    Converts a MATLAB datenum (days, with the time of day as the fractional part) to a python datetime.
    Adapted from https://stackoverflow.com/a/13965852

    :param matlab_datenum: MATLAB serial date number
    :return: the corresponding datetime
    """
    whole_days = int(matlab_datenum)
    fraction_of_day = matlab_datenum % 1
    # MATLAB datenums are 366 days ahead of python ordinals
    epoch_offset = timedelta(days=366)
    return datetime.fromordinal(whole_days) + timedelta(days=fraction_of_day) - epoch_offset
def replace_unique_sites(df, df_sites):
    """
    Replaces beach/lat/lon columns with the unique site_id of the closest site.

    Each unique lat/lon combination in df is converted with convert_coord_systems and assigned to the site with
    the smallest straight line distance in those converted coordinates. Matches which are more than 1 unit away
    (the log message reports the distance as metres) are logged as warnings, all others as info. Neither df nor
    df_sites is modified.

    :param df: dataframe with "beach", "lat" and "lon" columns
    :param df_sites: sites dataframe with "lat" and "lon" columns, whose index labels are the site ids
    :return: new dataframe without the "lat", "lon" and "beach" columns and with a "site_id" column
    """
    # Create eastings and northings so we can calculate distances. These are kept in local series (indexed like
    # df_sites) rather than being added as columns to the caller's dataframe.
    site_points = [convert_coord_systems(Point(lon, lat)).xy for lon, lat in zip(df_sites["lon"], df_sites["lat"])]
    site_eastings = pd.Series([xy[0][0] for xy in site_points], index=df_sites.index)
    site_northings = pd.Series([xy[1][0] for xy in site_points], index=df_sites.index)

    # Starts as all NaN so records which never get matched can be counted afterwards
    site_ids = pd.Series(np.nan, index=df.index, dtype=object)

    # Process each unique combination lat/lons in groups
    for (lat, lon), df_group in df.groupby(["lat", "lon"]):
        # Calculate distances from each point to each site and determine closest site
        easting, northing = [coords[0] for coords in convert_coord_systems(Point(lon, lat)).xy]
        distances_to_sites = np.sqrt((site_eastings - easting) ** 2 + (site_northings - northing) ** 2)
        min_distance = distances_to_sites.min()
        closest_site = distances_to_sites.idxmin()

        # Do some logging so we can check later.
        log = logger.warning if min_distance > 1 else logger.info
        log("Closest site to (%.4f,%.4f) is %s (%.2f m away)", lat, lon, closest_site, min_distance)

        # Assign site_id based on closest site
        site_ids.loc[df_group.index] = closest_site

    nan_count = site_ids.isna().sum()
    if nan_count > 0:
        logger.warning("Not all records (%d of %d) matched with a unique site", nan_count, len(df))

    # drop() returns a new dataframe, so adding the column afterwards leaves the caller's df untouched
    df = df.drop(columns=["lat", "lon", "beach"])
    df["site_id"] = site_ids
    return df
@click.command(short_help="create waves.csv")
@click.option("--waves-mat", required=True, help=".mat file containing wave records")
@click.option("--sites-csv", required=True, help=".csv file description of cross section sites")
@click.option("--output-file", required=True, help="where to save waves.csv")
def create_waves_csv(waves_mat, sites_csv, output_file):
    # Parses the wave records, swaps beach/lat/lon for the closest site_id and saves the result as a csv
    # indexed by site_id and datetime. (Written as a comment because a docstring would become the command's
    # help text.)
    logger.info("Creating %s", output_file)
    waves = parse_waves(waves_mat=waves_mat)
    sites = pd.read_csv(sites_csv, index_col=[0])
    waves = replace_unique_sites(waves, sites)
    waves = waves.set_index(["site_id", "datetime"]).sort_index()
    waves.to_csv(output_file)
    logger.info("Created %s", output_file)
@click.command(short_help="create profile_features.csv")
@click.option("--crest-mat", required=True, help=".mat file containing dune crests")
@click.option("--toe-mat", required=True, help=".mat file containing dune toes")
@click.option("--sites-csv", required=True, help=".csv file description of cross section sites")
@click.option("--output-file", required=True, help="where to save profile_features.csv")
def create_profile_features(crest_mat, toe_mat, sites_csv, output_file):
    # Parses the dune crest and dune toe .mat files and saves them as a csv indexed by site_id and
    # profile_type. The option help texts previously described wave records and waves.csv (copied from
    # create_waves_csv). (Written as a comment because a docstring would become the command's help text.)
    logger.info("Creating %s", output_file)
    df_sites = pd.read_csv(sites_csv, index_col=[0])
    df_profile_features = parse_dune_crest_toes(df_sites, crest_mat, toe_mat)
    df_profile_features.to_csv(output_file)
    logger.info("Created %s", output_file)
@click.command(short_help="create profiles.csv")
@click.option("--profiles-mat", required=True, help=".mat file containing beach profiles")
@click.option("--profiles-output-file", required=True, help="where to save profiles.csv")
@click.option("--sites-output-file", required=True, help="where to save sites.csv")
def create_sites_and_profiles_csv(profiles_mat, profiles_output_file, sites_output_file):
    # Parses the profiles .mat file once and saves two csvs: the profiles (indexed by site_id, profile_type
    # and x, with trailing zeros replaced by NaN) and the sites (indexed by site_id). (Written as a comment
    # because a docstring would become the command's help text.)
    logger.info("Creating sites and profiles csvs")
    profiles, sites = parse_profiles_and_sites(profiles_mat=profiles_mat)

    profiles = profiles.set_index(["site_id", "profile_type", "x"]).sort_index()
    profiles = remove_zeros(profiles)
    sites = sites.set_index(["site_id"]).sort_index()

    profiles.to_csv(profiles_output_file)
    logger.info("Created %s", profiles_output_file)

    sites.to_csv(sites_output_file)
    logger.info("Created %s", sites_output_file)
@click.command(short_help="create tides.csv")
@click.option("--tides-mat", required=True, help=".mat file containing tides")
@click.option("--sites-csv", required=True, help=".csv file description of cross section sites")
@click.option("--output-file", required=True, help="where to save tides.csv")
def create_tides_csv(tides_mat, sites_csv, output_file):
    # Parses the tide records, swaps beach/lat/lon for the closest site_id and saves the result as a csv
    # indexed by site_id and datetime. The short help previously read "create profiles.csv" (copied from
    # create_sites_and_profiles_csv). (Written as a comment because a docstring would become the command's
    # help text.)
    logger.info("Creating %s", output_file)
    df_tides = parse_tides(tides_mat=tides_mat)
    df_sites = pd.read_csv(sites_csv, index_col=[0])
    df_tides = replace_unique_sites(df_tides, df_sites)
    df_tides = df_tides.set_index(["site_id", "datetime"]).sort_index()
    df_tides.to_csv(output_file)
    logger.info("Created %s", output_file)
@click.group()
def cli():
    # Root command group with no behaviour of its own; the sub-commands are attached to it in the
    # __main__ guard. (Written as a comment because a docstring would become the group's help text.)
    return None
if __name__ == "__main__":
    # Register every command defined in this module. create_profile_features was previously defined but never
    # added to the group, so it could not be invoked from the command line.
    for command in (
        create_waves_csv,
        create_sites_and_profiles_csv,
        create_tides_csv,
        create_profile_features,
    ):
        cli.add_command(command)
    cli()