""" Converts raw .mat files into a flattened .csv structure which can be imported into python pandas. """ import os import sys sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from datetime import datetime, timedelta import math import click import numpy as np import pandas as pd from mat4py import loadmat from shapely.geometry import Point from profile_features import convert_coord_systems from utils import setup_logging logger = setup_logging() def parse_orientations(orientations_mat): """ Parses the raw orientations.mat file and returns a pandas dataframe. Note that orientations are the direction towards land measured in degrees anti-clockwise from east. :param orientations_mat: :return: """ logger.info("Parsing %s", orientations_mat) mat_data = loadmat(orientations_mat)["output"] rows = [] for i in range(0, len(mat_data["beach"])): rows.append( { "beach": mat_data["beach"][i], "orientation": mat_data["orientation"][i], "lat_center": mat_data["lat_center"][i], "lon_center": mat_data["lon_center"][i], "lat_land": mat_data["lat_land"][i], "lon_land": mat_data["lon_land"][i], "lat_sea": mat_data["lat_sea"][i], "lon_sea": mat_data["lon_sea"][i], } ) df = pd.DataFrame(rows) return df def combine_sites_and_orientaions(df_sites, df_orientations): """ Replaces beach/lat/lon columns with the unique site_id. :param dfs: :param df_sites: :return: """ df_merged_sites = df_sites.merge( df_orientations[["beach", "lat_center", "lon_center", "orientation"]], left_on=["beach", "lat", "lon"], right_on=["beach", "lat_center", "lon_center"], ) # Check that all our records have a unique site identifier n_unmatched = len(df_sites) - len(df_merged_sites) if n_unmatched > 0: logger.warning("Not all records (%d of %d) matched with an orientation", n_unmatched, len(df_sites)) # Drop extra columns df_merged_sites = df_merged_sites.drop(columns=["lat_center", "lon_center"]) return df_merged_sites def specify_lat_lon_profile_center(df_sites, x_val=200): """ Specify which x-coordinate in the beach profile cross section the lat/lon corresponds to :param df_sites: :return: """ df_sites["profile_x_lat_lon"] = x_val return df_sites def parse_waves(waves_mat): """ Parses the raw waves.mat file and returns a pandas dataframe :param waves_mat: :return: """ logger.info("Parsing %s", waves_mat) mat_data = loadmat(waves_mat)["data"] rows = [] for i in range(0, len(mat_data["site"])): for j in range(0, len(mat_data["dates"][i])): rows.append( { "beach": mat_data["site"][i], "lon": mat_data["lon"][i], "lat": mat_data["lat"][i], "datetime": matlab_datenum_to_datetime(mat_data["dates"][i][j][0]), "Hs": mat_data["H"][i][j][0], "Hs0": mat_data["Ho"][i][j][0], "Tp": mat_data["T"][i][j][0], "dir": mat_data["D"][i][j][0], "E": mat_data["E"][i][j][0], "P": mat_data["P"][i][j][0], "Exs": mat_data["Exs"][i][j][0], "Pxs": mat_data["Pxs"][i][j][0], } ) df = pd.DataFrame(rows) df["datetime"] = df["datetime"].dt.round("1s") return df def parse_tides(tides_mat): """ Parses the raw tides.mat file and returns a pandas dataframe :param tides_mat: :return: """ logger.info("Parsing %s", tides_mat) mat_data = loadmat(tides_mat)["data"] rows = [] for i in range(0, len(mat_data["site"])): for j in range(0, len(mat_data["time"])): rows.append( { "beach": mat_data["site"][i][0], "lon": mat_data["lons"][i][0], "lat": mat_data["lats"][i][0], "datetime": matlab_datenum_to_datetime(mat_data["time"][j][0]), "tide": mat_data["tide"][i][j], } ) df = pd.DataFrame(rows) df["datetime"] = df["datetime"].dt.round("1s") return df def 
def parse_profiles_and_sites(profiles_mat):
    """
    Parses the raw profiles.mat file and returns pandas dataframes of profiles and sites.
    :param profiles_mat:
    :return:
    """
    logger.info("Parsing %s", profiles_mat)
    mat_data = loadmat(profiles_mat)["data"]
    profile_rows = []
    site_rows = []
    site_counter = 0

    for i, site in enumerate(mat_data["site"]):

        # Give each site a unique id
        if len(site_rows) == 0 or site_rows[-1]["beach"] != site:
            site_counter = 1
        else:
            site_counter += 1
        site_id = "{}{:04d}".format(site, site_counter)

        # Initialize location of x=200m latitude and longitude
        x_200_lat = np.nan
        x_200_lon = np.nan

        # Collect the coordinates needed to calculate the profile orientation
        orientation = {}

        for x, lat, lon, z_prestorm, z_poststorm, easting, northing in zip(
            mat_data["x"][i],
            mat_data["lats"][i],
            mat_data["lons"][i],
            mat_data["Zpre"][i],
            mat_data["Zpost"][i],
            mat_data["eastings"][i],
            mat_data["northings"][i],
        ):

            # Only extract pre and post storm profiles
            for j, profile_type in enumerate(["prestorm", "poststorm"]):

                if mat_data["isgood"][i][j] == 1:

                    land_lim = mat_data["landlims"][i][j]
                    survey_datetime = matlab_datenum_to_datetime(mat_data["surveydates"][i][j])

                    if profile_type == "prestorm":
                        z = z_prestorm
                    else:
                        z = z_poststorm

                    # Keep a record of where the center of the profile is located, and the locations of the
                    # land and sea ends.
                    # TODO: This code isn't very transferable. What if we don't have lat/lons at 200 m?
                    # Relook at this.
                    if x[0] == 200:
                        x_200_lat = lat[0]
                        x_200_lon = lon[0]
                    elif x[0] == 0:
                        orientation["land_easting"] = easting[0]
                        orientation["land_northing"] = northing[0]
                    elif x[0] == 400:
                        orientation["sea_easting"] = easting[0]
                        orientation["sea_northing"] = northing[0]

                    profile_rows.append(
                        {
                            "site_id": site_id,
                            "lon": lon[0],
                            "lat": lat[0],
                            "profile_type": profile_type,
                            "x": x[0],
                            "z": z[0],
                            "land_lim": land_lim,
                            "survey_datetime": survey_datetime,
                        }
                    )

        orientation = math.degrees(
            math.atan2(
                orientation["land_northing"] - orientation["sea_northing"],
                orientation["land_easting"] - orientation["sea_easting"],
            )
        )

        site_rows.append(
            {
                "site_id": site_id,
                "beach": site,
                "lat": x_200_lat,
                "lon": x_200_lon,
                "orientation": orientation,
                "profile_x_lat_lon": 200,
            }
        )

    df_profiles = pd.DataFrame(profile_rows)
    df_sites = pd.DataFrame(site_rows)

    logger.info("Parsed profiles and sites")
    return df_profiles, df_sites
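
# NOTE (illustrative, assumed convention): the orientation computed in
# parse_profiles_and_sites() is the bearing from the seaward end of the
# profile (x=400 m) towards the landward end (x=0 m), in degrees anticlockwise
# from east -- consistent with the convention stated in parse_orientations().
# For example, if the landward end is due west of the seaward end:
#
#   >>> math.degrees(math.atan2(0 - 0, 0 - 100))  # (land_n - sea_n, land_e - sea_e)
#   180.0
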
def remove_zeros(df_profiles):
    """
    When parsing the pre/post storm profiles, the ends of some profiles have constant values of zero. Change these
    to NaNs for consistency. Didn't use pandas fillna because 0 may still be a valid value.
    :param df_profiles:
    :return:
    """
    logger.info("Removing zeros from end of profiles")
    df_profiles = df_profiles.sort_index()
    groups = df_profiles.groupby(level=["site_id", "profile_type"])
    for key, _ in groups:
        logger.debug("Removing zeros from {} profile at {}".format(key[1], key[0]))
        idx_site = (df_profiles.index.get_level_values("site_id") == key[0]) & (
            df_profiles.index.get_level_values("profile_type") == key[1]
        )
        df_profile = df_profiles[idx_site]

        # Guard against profiles that are entirely zeros, which would otherwise raise an IndexError below
        non_zero_x = df_profile[df_profile.z != 0].index.get_level_values("x")
        if len(non_zero_x) == 0:
            continue

        x_last_ele = non_zero_x[-1]
        df_profiles.loc[idx_site & (df_profiles.index.get_level_values("x") > x_last_ele), "z"] = np.nan
    logger.info("Removed zeros from end of profiles")

    return df_profiles


def matlab_datenum_to_datetime(matlab_datenum):
    """
    Converts a MATLAB datenum to a Python datetime. Adapted from https://stackoverflow.com/a/13965852
    :param matlab_datenum:
    :return:
    """
    return datetime.fromordinal(int(matlab_datenum)) + timedelta(days=matlab_datenum % 1) - timedelta(days=366)
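
# NOTE (worked example): MATLAB datenums count days from 00-Jan-0000, whereas
# Python ordinals count days from 01-Jan-0001, hence the 366-day correction in
# matlab_datenum_to_datetime(). MATLAB's datenum(2018, 1, 1) is 737061, so:
#
#   >>> matlab_datenum_to_datetime(737061.5)
#   datetime.datetime(2018, 1, 1, 12, 0)
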
def replace_unique_sites(df, df_sites):
    """
    Replaces beach/lat/lon columns with the unique site_id of the closest site.
    :param df:
    :param df_sites:
    :return:
    """
    # Make the sites index a column, so it can be merged into df
    df_sites["site_id"] = df_sites.index.get_level_values("site_id")

    # Create eastings and northings so we can calculate distances
    site_points = [convert_coord_systems(Point(lon, lat)).xy for lon, lat in zip(df_sites["lon"], df_sites["lat"])]
    df_sites["easting"] = [x[0][0] for x in site_points]
    df_sites["northing"] = [x[1][0] for x in site_points]

    # Process each unique combination of lat/lons in groups
    groups = df.groupby(["lat", "lon"])
    for (lat, lon), df_group in groups:

        # Calculate distances from each point to each site and determine the closest site
        easting, northing = [x[0] for x in convert_coord_systems(Point(lon, lat)).xy]
        distances_to_sites = np.sqrt((df_sites["easting"] - easting) ** 2 + (df_sites["northing"] - northing) ** 2)
        min_distance = distances_to_sites.min()
        closest_site = distances_to_sites.idxmin()

        # Do some logging so we can check later
        if min_distance > 1:
            logger.warning("Closest site to (%.4f,%.4f) is %s (%.2f m away)", lat, lon, closest_site, min_distance)
        else:
            logger.info("Closest site to (%.4f,%.4f) is %s (%.2f m away)", lat, lon, closest_site, min_distance)

        # Assign site_id based on the closest site
        df.loc[df_group.index, "site_id"] = closest_site

    nan_count = df.site_id.isna().sum()
    if nan_count > 0:
        logger.warning("Not all records (%d of %d) matched with a unique site", nan_count, len(df))

    df = df.drop(columns=["lat", "lon", "beach"])

    return df


@click.command(short_help="create waves.csv")
@click.option("--waves-mat", required=True, help=".mat file containing wave records")
@click.option("--sites-csv", required=True, help=".csv file description of cross section sites")
@click.option("--output-file", required=True, help="where to save waves.csv")
def create_waves_csv(waves_mat, sites_csv, output_file):
    logger.info("Creating %s", output_file)
    df_waves = parse_waves(waves_mat=waves_mat)
    df_sites = pd.read_csv(sites_csv, index_col=[0])
    df_waves = replace_unique_sites(df_waves, df_sites)
    df_waves.set_index(["site_id", "datetime"], inplace=True)
    df_waves.sort_index(inplace=True)
    df_waves.to_csv(output_file)
    logger.info("Created %s", output_file)


@click.command(short_help="create sites.csv and profiles.csv")
@click.option("--profiles-mat", required=True, help=".mat file containing beach profiles")
@click.option("--profiles-output-file", required=True, help="where to save profiles.csv")
@click.option("--sites-output-file", required=True, help="where to save sites.csv")
def create_sites_and_profiles_csv(profiles_mat, profiles_output_file, sites_output_file):
    logger.info("Creating sites and profiles csvs")
    df_profiles, df_sites = parse_profiles_and_sites(profiles_mat=profiles_mat)
    df_profiles.set_index(["site_id", "profile_type", "x"], inplace=True)
    df_profiles.sort_index(inplace=True)
    df_profiles = remove_zeros(df_profiles)

    df_sites.set_index(["site_id"], inplace=True)
    df_sites.sort_index(inplace=True)

    df_profiles.to_csv(profiles_output_file)
    logger.info("Created %s", profiles_output_file)
    df_sites.to_csv(sites_output_file)
    logger.info("Created %s", sites_output_file)


@click.command(short_help="create tides.csv")
@click.option("--tides-mat", required=True, help=".mat file containing tides")
@click.option("--sites-csv", required=True, help=".csv file description of cross section sites")
@click.option("--output-file", required=True, help="where to save tides.csv")
def create_tides_csv(tides_mat, sites_csv, output_file):
    logger.info("Creating %s", output_file)
    df_tides = parse_tides(tides_mat=tides_mat)
    df_sites = pd.read_csv(sites_csv, index_col=[0])
    df_tides = replace_unique_sites(df_tides, df_sites)
    df_tides.set_index(["site_id", "datetime"], inplace=True)
    df_tides.sort_index(inplace=True)
    df_tides.to_csv(output_file)
    logger.info("Created %s", output_file)


@click.group()
def cli():
    pass


if __name__ == "__main__":
    cli.add_command(create_waves_csv)
    cli.add_command(create_sites_and_profiles_csv)
    cli.add_command(create_tides_csv)
    cli()
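
# Example invocations (hypothetical script and file names; with click >= 7 the
# command names are the function names with underscores replaced by dashes):
#
#   python parse_mat.py create-waves-csv \
#       --waves-mat "./data/raw/waves.mat" \
#       --sites-csv "./data/interim/sites.csv" \
#       --output-file "./data/interim/waves.csv"
#
#   python parse_mat.py create-sites-and-profiles-csv \
#       --profiles-mat "./data/raw/profiles.mat" \
#       --profiles-output-file "./data/interim/profiles.csv" \
#       --sites-output-file "./data/interim/sites.csv"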