""" Converts raw .mat files into a flattened .csv structure which can be imported into python pandas. """ import logging.config from datetime import datetime, timedelta import click import pandas as pd from mat4py import loadmat import numpy as np logging.config.fileConfig("./src/logging.conf", disable_existing_loggers=False) logger = logging.getLogger(__name__) def parse_orientations(orientations_mat): """ Parses the raw orientations.mat file and returns a pandas dataframe. Note that orientations are the direction towards land measured in degrees anti-clockwise from east. :param orientations_mat: :return: """ logger.info("Parsing %s", orientations_mat) mat_data = loadmat(orientations_mat)["output"] rows = [] for i in range(0, len(mat_data["beach"])): rows.append( { "beach": mat_data["beach"][i], "orientation": mat_data["orientation"][i], "lat_center": mat_data["lat_center"][i], "lon_center": mat_data["lon_center"][i], "lat_land": mat_data["lat_land"][i], "lon_land": mat_data["lon_land"][i], "lat_sea": mat_data["lat_sea"][i], "lon_sea": mat_data["lon_sea"][i], } ) df = pd.DataFrame(rows) return df def combine_sites_and_orientaions(df_sites, df_orientations): """ Replaces beach/lat/lon columns with the unique site_id. :param dfs: :param df_sites: :return: """ df_merged_sites = df_sites.merge( df_orientations[["beach", "lat_center", "lon_center", "orientation"]], left_on=["beach", "lat", "lon"], right_on=["beach", "lat_center", "lon_center"], ) # Check that all our records have a unique site identifier n_unmatched = len(df_sites) - len(df_merged_sites) if n_unmatched > 0: logger.warning("Not all records (%d of %d) matched with an orientation", n_unmatched, len(df_sites)) # Drop extra columns df_merged_sites = df_merged_sites.drop(columns=["lat_center", "lon_center"]) return df_merged_sites def specify_lat_lon_profile_center(df_sites, x_val=200): """ Specify which x-coordinate in the beach profile cross section the lat/lon corresponds to :param df_sites: :return: """ df_sites["profile_x_lat_lon"] = x_val return df_sites def parse_waves(waves_mat): """ Parses the raw waves.mat file and returns a pandas dataframe :param waves_mat: :return: """ logger.info("Parsing %s", waves_mat) mat_data = loadmat(waves_mat)["data"] rows = [] for i in range(0, len(mat_data["site"])): for j in range(0, len(mat_data["dates"][i])): rows.append( { "beach": mat_data["site"][i], "lon": mat_data["lon"][i], "lat": mat_data["lat"][i], "datetime": matlab_datenum_to_datetime(mat_data["dates"][i][j][0]), "Hs": mat_data["H"][i][j][0], "Hs0": mat_data["Ho"][i][j][0], "Tp": mat_data["T"][i][j][0], "dir": mat_data["D"][i][j][0], "E": mat_data["E"][i][j][0], "P": mat_data["P"][i][j][0], "Exs": mat_data["Exs"][i][j][0], "Pxs": mat_data["Pxs"][i][j][0], } ) df = pd.DataFrame(rows) df["datetime"] = df["datetime"].dt.round("1s") return df def parse_tides(tides_mat): """ Parses the raw tides.mat file and returns a pandas dataframe :param tides_mat: :return: """ logger.info("Parsing %s", tides_mat) mat_data = loadmat(tides_mat)["data"] rows = [] for i in range(0, len(mat_data["site"])): for j in range(0, len(mat_data["time"])): rows.append( { "beach": mat_data["site"][i][0], "lon": mat_data["lons"][i][0], "lat": mat_data["lats"][i][0], "datetime": matlab_datenum_to_datetime(mat_data["time"][j][0]), "tide": mat_data["tide"][i][j], } ) df = pd.DataFrame(rows) df["datetime"] = df["datetime"].dt.round("1s") return df def parse_profiles(profiles_mat): """ Parses the raw profiles.mat file and returns a pandas dataframe :param tides_mat: :return: """ logger.info("Parsing %s", profiles_mat) mat_data = loadmat(profiles_mat)["data"] rows = [] for i in range(0, len(mat_data["site"])): for j in range(0, len(mat_data["pfx"][i])): for profile_type in ["prestorm", "poststorm"]: if profile_type == "prestorm": z = mat_data["pf1"][i][j][0] if profile_type == "poststorm": z = mat_data["pf2"][i][j][0] rows.append( { "beach": mat_data["site"][i], "lon": mat_data["lon"][i], "lat": mat_data["lat"][i], "profile_type": profile_type, "x": mat_data["pfx"][i][j][0], "z": z, } ) df = pd.DataFrame(rows) return df def remove_zeros(df_profiles): """ When parsing the pre/post storm profiles, the end of some profiles have constant values of zero. Let's change these to NaNs for consistancy. Didn't use pandas fillnan because 0 may still be a valid value. :param df: :return: """ df_profiles = df_profiles.sort_index() groups = df_profiles.groupby(level=["site_id", "profile_type"]) for key, _ in groups: logger.debug("Removing zeros from {} profile at {}".format(key[1], key[0])) idx_site = (df_profiles.index.get_level_values("site_id") == key[0]) & ( df_profiles.index.get_level_values("profile_type") == key[1] ) df_profile = df_profiles[idx_site] x_last_ele = df_profile[df_profile.z != 0].index.get_level_values("x")[-1] df_profiles.loc[idx_site & (df_profiles.index.get_level_values("x") > x_last_ele), "z"] = np.nan return df_profiles def matlab_datenum_to_datetime(matlab_datenum): """ Adapted from https://stackoverflow.com/a/13965852 :param matlab_datenum: :return: """ return datetime.fromordinal(int(matlab_datenum)) + timedelta(days=matlab_datenum % 1) - timedelta(days=366) def get_unique_sites(dfs, cols=["beach", "lat", "lon"]): """ Generates a dataframe of unique sites based on beach names, lats and lons. Creates a unique site ID for each. :param dfs: :param cols: :return: """ rows = [] df_all = pd.concat([df[cols] for df in dfs]) beach_groups = df_all.groupby(["beach"]) for beach_name, beach_group in beach_groups: site_groups = beach_group.groupby(["lat", "lon"]) siteNo = 1 for site_name, site_group in site_groups: site = "{}{:04d}".format(beach_name, siteNo) rows.append({"site_id": site, "lat": site_name[0], "lon": site_name[1], "beach": beach_name}) siteNo += 1 df = pd.DataFrame(rows) return df def replace_unique_sites(df, df_sites, cols=["lat", "lon"]): """ Replaces beach/lat/lon columns with the unique site_id :param dfs: :param df_sites: :return: """ # Make the sites index a column, so it can be merged into df df_sites["site_id"] = df_sites.index.get_level_values("site_id") # Merging on a float can lead to subtle bugs. Lets convert lat/lons to integers and merge on that instead precision = 8 df_sites["lat_int"] = np.round(df_sites["lat"] * 10 ** precision).astype(np.int64) df_sites["lon_int"] = np.round(df_sites["lon"] * 10 ** precision).astype(np.int64) df["lat_int"] = np.round(df["lat"] * 10 ** precision).astype(np.int64) df["lon_int"] = np.round(df["lon"] * 10 ** precision).astype(np.int64) df_merged = df.merge(df_sites, on=["lat_int", "lon_int"]) # Check that all our records have a unique site identifier n_unmatched = len(df) - len(df_merged) if n_unmatched > 0: logger.warning("Not all records (%d of %d) matched with a unique site", n_unmatched, len(df)) df_merged = df_merged.drop( columns=[ "lat_x", "lon_x", "lat_int", "lon_int", "beach_y", "beach_x", "lat_y", "lon_y", "orientation", "profile_x_lat_lon", ] ) return df_merged @click.command(short_help="create sites.csv") @click.option("--waves-mat", required=True, help=".mat file containing wave records") @click.option("--tides-mat", required=True, help=".mat file containing tide records") @click.option("--profiles-mat", required=True, help=".mat file containing beach profiles") @click.option("--orientations-mat", required=True, help=".mat file containing orientation of beach profiles") @click.option("--output-file", required=True, help="where to save sites.csv") def create_sites_csv(waves_mat, tides_mat, profiles_mat, orientations_mat, output_file): logger.info("Creating %s", output_file) df_waves = parse_waves(waves_mat=waves_mat) df_tides = parse_tides(tides_mat=tides_mat) df_profiles = parse_profiles(profiles_mat=profiles_mat) df_orientations = parse_orientations(orientations_mat=orientations_mat) df_sites = get_unique_sites(dfs=[df_waves, df_tides, df_profiles]) df_sites = combine_sites_and_orientaions(df_sites, df_orientations) df_sites = specify_lat_lon_profile_center(df_sites) df_sites.set_index(["site_id"], inplace=True) df_sites.to_csv(output_file) logger.info("Created %s", output_file) @click.command(short_help="create waves.csv") @click.option("--waves-mat", required=True, help=".mat file containing wave records") @click.option("--sites-csv", required=True, help=".csv file description of cross section sites") @click.option("--output-file", required=True, help="where to save waves.csv") def create_waves_csv(waves_mat, sites_csv, output_file): logger.info("Creating %s", output_file) df_waves = parse_waves(waves_mat=waves_mat) df_sites = pd.read_csv(sites_csv, index_col=[0]) df_waves = replace_unique_sites(df_waves, df_sites) df_waves.set_index(["site_id", "datetime"], inplace=True) df_waves.sort_index(inplace=True) df_waves.to_csv(output_file) logger.info("Created %s", output_file) @click.command(short_help="create profiles.csv") @click.option("--profiles-mat", required=True, help=".mat file containing beach profiles") @click.option("--sites-csv", required=True, help=".csv file description of cross section sites") @click.option("--output-file", required=True, help="where to save profiles.csv") def create_profiles_csv(profiles_mat, sites_csv, output_file): logger.info("Creating %s", output_file) df_profiles = parse_profiles(profiles_mat=profiles_mat) df_sites = pd.read_csv(sites_csv, index_col=[0]) df_profiles = replace_unique_sites(df_profiles, df_sites) df_profiles.set_index(["site_id", "profile_type", "x"], inplace=True) df_profiles.sort_index(inplace=True) df_profiles.to_csv(output_file) logger.info("Created %s", output_file) @click.command(short_help="create profiles.csv") @click.option("--tides-mat", required=True, help=".mat file containing tides") @click.option("--sites-csv", required=True, help=".csv file description of cross section sites") @click.option("--output-file", required=True, help="where to save tides.csv") def create_tides_csv(tides_mat, sites_csv, output_file): logger.info("Creating %s", output_file) df_tides = parse_tides(tides_mat=tides_mat) df_sites = pd.read_csv(sites_csv, index_col=[0]) df_tides = replace_unique_sites(df_tides, df_sites) df_tides.set_index(["site_id", "datetime"], inplace=True) df_tides.sort_index(inplace=True) df_tides.to_csv(output_file) logger.info("Created %s", output_file) @click.group() def cli(): pass if __name__ == "__main__": cli.add_command(create_sites_csv) cli.add_command(create_waves_csv) cli.add_command(create_profiles_csv) cli.add_command(create_tides_csv) cli()