"""
Converts raw .mat files into a flattened .csv structure which can be imported into python pandas.
"""

import math
from datetime import datetime, timedelta

import click
import numpy as np
import pandas as pd
from mat4py import loadmat
from shapely.geometry import Point

from data.profile_features import convert_coord_systems
from utils import setup_logging

logger = setup_logging()


def parse_orientations(orientations_mat):
    """
    Parses the raw orientations.mat file and returns a pandas dataframe.

    Note that orientations are the direction towards land measured in degrees anti-clockwise from east.

    :param orientations_mat: path of the .mat file; its "output" struct is read
    :return: dataframe with one row per entry of the "beach" field and one column per field listed below
    """
    logger.info("Parsing %s", orientations_mat)
    mat_data = loadmat(orientations_mat)["output"]

    # Fields copied verbatim from the .mat struct, in output column order
    columns = (
        "beach",
        "orientation",
        "lat_center",
        "lon_center",
        "lat_land",
        "lon_land",
        "lat_sea",
        "lon_sea",
    )
    rows = [{column: mat_data[column][i] for column in columns} for i in range(len(mat_data["beach"]))]
    return pd.DataFrame(rows)


def parse_dune_crest_toes(df_sites, crest_mat, toe_mat):
    """
    Parses the dune crest and dune toe .mat files into a single dataframe of profile features.

    Entry ``n`` of each .mat array is assigned ``site_no = n + 1`` and then mapped onto ``site_id`` through
    ``df_sites``. The passed in ``df_sites`` is not modified.

    :param df_sites: dataframe of sites which provides ``site_no`` and ``site_id`` (as columns or as the index)
    :param crest_mat: path of the .mat file holding the ``xc1``/``zc1`` (prestorm) and ``xc2``/``zc2``
        (poststorm) arrays
    :param toe_mat: path of the .mat file holding the ``xt1``/``zt1`` (prestorm) and ``xt2``/``zt2``
        (poststorm) arrays
    :return: dataframe indexed by (site_id, profile_type) with the dune crest/toe x and z values rounded to
        3 decimal places
    """
    logger.info("Parsing dune crests and toes")

    crest_data = loadmat(crest_mat)
    toe_data = loadmat(toe_mat)

    # (profile_type, crest x key, crest z key, toe x key, toe z key)
    profile_fields = (
        ("prestorm", "xc1", "zc1", "xt1", "zt1"),
        ("poststorm", "xc2", "zc2", "xt2", "zt2"),
    )

    rows = []
    for n, _ in enumerate(crest_data["xc1"]):
        for profile_type, crest_x, crest_z, toe_x, toe_z in profile_fields:
            rows.append(
                {
                    "dune_crest_x": crest_data[crest_x][n],
                    "dune_crest_z": crest_data[crest_z][n],
                    "dune_toe_x": toe_data[toe_x][n],
                    "dune_toe_z": toe_data[toe_z][n],
                    "profile_type": profile_type,
                    "site_no": n + 1,
                }
            )

    df_profile_features = pd.DataFrame(rows)

    # Want the site_id instead of the site_no, so merge in df_sites. reset_index() returns a new frame, so the
    # caller's df_sites keeps its index.
    df_site_ids = df_sites.reset_index()[["site_no", "site_id"]]
    df_profile_features = df_site_ids.merge(df_profile_features, how="outer", on=["site_no"])
    df_profile_features = df_profile_features.drop(columns=["site_no"])
    df_profile_features = df_profile_features.set_index(["site_id", "profile_type"]).sort_index()
    return df_profile_features.round(3)


def combine_sites_and_orientaions(df_sites, df_orientations):
    """
    Adds the orientation of each site to the sites dataframe.

    Sites and orientations are matched (inner merge) on the beach name and on the site lat/lon being equal to
    the orientation lat_center/lon_center. A warning is logged if the merged result has fewer rows than
    ``df_sites``.

    :param df_sites: dataframe with ``beach``, ``lat`` and ``lon`` columns
    :param df_orientations: dataframe with ``beach``, ``lat_center``, ``lon_center`` and ``orientation`` columns
    :return: merged dataframe, without the ``lat_center`` and ``lon_center`` columns
    """
    df_merged_sites = df_sites.merge(
        df_orientations[["beach", "lat_center", "lon_center", "orientation"]],
        left_on=["beach", "lat", "lon"],
        right_on=["beach", "lat_center", "lon_center"],
    )

    # Check that all our records have a unique site identifier
    n_unmatched = len(df_sites) - len(df_merged_sites)
    if n_unmatched > 0:
        logger.warning("Not all records (%d of %d) matched with an orientation", n_unmatched, len(df_sites))

    # Drop extra columns
    df_merged_sites = df_merged_sites.drop(columns=["lat_center", "lon_center"])

    return df_merged_sites


# Correctly spelled alias. The misspelled name above is kept so existing callers keep working.
combine_sites_and_orientations = combine_sites_and_orientaions


def specify_lat_lon_profile_center(df_sites, x_val=200):
    """
    Specify which x-coordinate in the beach profile cross section the lat/lon corresponds to.

    Note that the column is written onto the passed in dataframe, which is also returned.

    :param df_sites: dataframe of sites
    :param x_val: value stored in the ``profile_x_lat_lon`` column for every site
    :return: ``df_sites`` with the ``profile_x_lat_lon`` column set
    """
    df_sites["profile_x_lat_lon"] = x_val
    return df_sites


def parse_waves(waves_mat):
    """
    Parses the raw waves.mat file and returns a pandas dataframe.

    :param waves_mat: path of the .mat file; its "data" struct is read
    :return: dataframe with one row per (site, date) record and the columns beach, lon, lat, datetime, Hs,
        Hs0, Tp, dir, E, P, Exs and Pxs. Datetimes are rounded to the nearest second.
    """
    logger.info("Parsing %s", waves_mat)
    mat_data = loadmat(waves_mat)["data"]

    # Output column name -> field name in the .mat struct
    wave_fields = {
        "Hs": "H",
        "Hs0": "Ho",
        "Tp": "T",
        "dir": "D",
        "E": "E",
        "P": "P",
        "Exs": "Exs",
        "Pxs": "Pxs",
    }

    rows = []
    for i, beach in enumerate(mat_data["site"]):
        for j, date in enumerate(mat_data["dates"][i]):
            row = {
                "beach": beach,
                "lon": mat_data["lon"][i],
                "lat": mat_data["lat"][i],
                "datetime": matlab_datenum_to_datetime(date[0]),
            }
            # Each record is stored as a single element list, hence the trailing [0]
            row.update({column: mat_data[field][i][j][0] for column, field in wave_fields.items()})
            rows.append(row)

    df = pd.DataFrame(rows)
    df["datetime"] = df["datetime"].dt.round("1s")
    return df


def parse_tides(tides_mat):
    """
    Parses the raw tides.mat file and returns a pandas dataframe.

    :param tides_mat: path of the .mat file; its "data" struct is read
    :return: dataframe with one row per (site, time) record and the columns beach, lon, lat, datetime and
        tide. Datetimes are rounded to the nearest second.
    """
    logger.info("Parsing %s", tides_mat)
    mat_data = loadmat(tides_mat)["data"]

    # The time vector is shared by all sites, so only convert it once
    datetimes = [matlab_datenum_to_datetime(time[0]) for time in mat_data["time"]]

    rows = []
    for i, site in enumerate(mat_data["site"]):
        beach = site[0]
        lon = mat_data["lons"][i][0]
        lat = mat_data["lats"][i][0]
        site_tides = mat_data["tide"][i]
        for j, tide_datetime in enumerate(datetimes):
            rows.append({"beach": beach, "lon": lon, "lat": lat, "datetime": tide_datetime, "tide": site_tides[j]})

    df = pd.DataFrame(rows)
    df["datetime"] = df["datetime"].dt.round("1s")
    return df


def _orientation_from_endpoints(endpoints, site_id):
    """
    Returns the angle (degrees, anti-clockwise from east) of the vector from the sea point to the land point.

    :param endpoints: dict which may contain land_easting, land_northing, sea_easting and sea_northing
    :param site_id: site identifier, only used for logging
    :return: orientation in degrees, or NaN (with a warning logged) if the land or sea point is missing
    """
    if "land_easting" not in endpoints or "sea_easting" not in endpoints:
        logger.warning("Could not calculate orientation for %s (no profile point at x=0 and/or x=400)", site_id)
        return np.nan

    return math.degrees(
        math.atan2(
            endpoints["land_northing"] - endpoints["sea_northing"],
            endpoints["land_easting"] - endpoints["sea_easting"],
        )
    )


def parse_profiles_and_sites(profiles_mat):
    """
    Parses the raw profiles.mat file and returns a dataframe of profiles and a dataframe of sites.

    Sites are numbered consecutively within each beach, e.g. the third consecutive site named ``BEACH`` gets
    the site_id ``BEACH0003``. Only profiles flagged as good (``isgood == 1``) are extracted. If a site has no
    good profile, or no points at x=0, x=200 or x=400, the corresponding lat/lon/orientation is NaN.

    :param profiles_mat: path of the .mat file; its "data" struct is read
    :return: tuple (df_profiles, df_sites). df_profiles has one row per (site, profile type, x) point with
        the columns site_id, lon, lat, profile_type, x, z, land_lim and survey_datetime. df_sites has one row
        per site with the columns site_id, site_no, beach, lat, lon, orientation and profile_x_lat_lon.
    """
    logger.info("Parsing %s", profiles_mat)
    mat_data = loadmat(profiles_mat)["data"]
    profile_rows = []
    site_rows = []
    site_counter = 0
    n_sites = len(mat_data["site"])

    for i, site in enumerate(mat_data["site"]):
        logger.debug("Processing site %d of %d", i + 1, n_sites)

        # Give each site a unique id
        if not site_rows or site_rows[-1]["beach"] != site:
            site_counter = 1
        else:
            site_counter += 1
        site_id = "{}{:04d}".format(site, site_counter)

        # Initalize location of x=200m latitude and longitude
        x_200_lat = np.nan
        x_200_lon = np.nan

        # Eastings/northings of the land (x=0) and sea (x=400) points, used to calculate the orientation
        endpoints = {}

        # Only extract pre and post storm profiles which are flagged as good. The land limit and survey date
        # are the same for every point of a profile, so look them up once per site.
        good_profiles = [
            (
                profile_type,
                mat_data["landlims"][i][j],
                matlab_datenum_to_datetime(mat_data["surveydates"][i][j]),
            )
            for j, profile_type in enumerate(["prestorm", "poststorm"])
            if mat_data["isgood"][i][j] == 1
        ]

        if good_profiles:
            for x, lat, lon, z_prestorm, z_poststorm, easting, northing in zip(
                mat_data["x"][i],
                mat_data["lats"][i],
                mat_data["lons"][i],
                mat_data["Zpre"][i],
                mat_data["Zpost"][i],
                mat_data["eastings"][i],
                mat_data["northings"][i],
            ):
                # Keep a record of the where the center of the profile is located, and the locations of the
                # land and sea
                # TODO: This code isn't very transferrable. What if we don't have lat/lons at 200 m? Relook
                # at this
                if x[0] == 200:
                    x_200_lat = lat[0]
                    x_200_lon = lon[0]
                elif x[0] == 0:
                    endpoints["land_easting"] = easting[0]
                    endpoints["land_northing"] = northing[0]
                elif x[0] == 400:
                    endpoints["sea_easting"] = easting[0]
                    endpoints["sea_northing"] = northing[0]

                for profile_type, land_lim, survey_datetime in good_profiles:
                    z = z_prestorm if profile_type == "prestorm" else z_poststorm
                    profile_rows.append(
                        {
                            "site_id": site_id,
                            "lon": lon[0],
                            "lat": lat[0],
                            "profile_type": profile_type,
                            "x": x[0],
                            "z": z[0],
                            "land_lim": land_lim,
                            "survey_datetime": survey_datetime,
                        }
                    )

        site_rows.append(
            {
                "site_id": site_id,
                "site_no": i + 1,
                "beach": site,
                "lat": x_200_lat,
                "lon": x_200_lon,
                "orientation": _orientation_from_endpoints(endpoints, site_id),
                "profile_x_lat_lon": 200,
            }
        )

    df_profiles = pd.DataFrame(profile_rows)
    df_sites = pd.DataFrame(site_rows)

    logger.info("Parsed profiles and sites")
    return df_profiles, df_sites


def remove_zeros(df_profiles):
    """
    When parsing the pre/post storm profiles, the end of some profiles have constant values of zero. Let's
    change these to NaNs for consistancy. Didn't use pandas fillnan because 0 may still be a valid value.

    For each (site_id, profile_type) profile, every z value located at an x greater than the x of the last
    non-zero z value is set to NaN. A profile which contains no non-zero z value at all is left untouched and
    a warning is logged.

    :param df_profiles: dataframe indexed by (site_id, profile_type, x) with a ``z`` column
    :return: sorted copy of ``df_profiles`` with the trailing zeros replaced by NaN
    """
    logger.info("Removing zeros from end of profiles")

    # sort_index() returns a new dataframe, so the caller's dataframe is not modified
    df_profiles = df_profiles.sort_index()

    # The index does not change inside the loop, so extract the level values once
    site_ids = df_profiles.index.get_level_values("site_id")
    profile_types = df_profiles.index.get_level_values("profile_type")
    x_values = df_profiles.index.get_level_values("x")

    groups = df_profiles.groupby(level=["site_id", "profile_type"])
    for key, _ in groups:
        site_id, profile_type = key
        logger.debug("Removing zeros from %s profile at %s", profile_type, site_id)

        idx_site = (site_ids == site_id) & (profile_types == profile_type)
        df_profile = df_profiles[idx_site]

        nonzero_x = df_profile[df_profile.z != 0].index.get_level_values("x")
        if len(nonzero_x) == 0:
            logger.warning("%s profile at %s only contains zeros, leaving it unchanged", profile_type, site_id)
            continue

        # The dataframe is sorted, so the last element is the most seaward non-zero point
        x_last_ele = nonzero_x[-1]
        df_profiles.loc[idx_site & (x_values > x_last_ele), "z"] = np.nan

    logger.info("Removed zeros from end of profiles")
    return df_profiles


def matlab_datenum_to_datetime(matlab_datenum):
    """
    Converts a MATLAB datenum (days, with the time of day as the fractional part) into a python datetime.

    Adapted from https://stackoverflow.com/a/13965852

    :param matlab_datenum: MATLAB serial date number
    :return: naive ``datetime`` corresponding to the datenum
    """
    # The integer part is the day and the fractional part the time of day. The 366 days correct for the
    # different origin of MATLAB datenums and python ordinals.
    return datetime.fromordinal(int(matlab_datenum)) + timedelta(days=matlab_datenum % 1) - timedelta(days=366)


def replace_unique_sites(df, df_sites):
    """
    Replaces beach/lat/lon columns with the unique site_id.

    Each unique lat/lon combination in ``df`` is assigned the site_id of the closest site (by distance between
    the converted easting/northing coordinates). A warning is logged when the closest site is more than 1
    away, presumably in metres since that is the unit used in the log message.

    Note that the ``site_id`` column is written onto the passed in ``df``. ``df_sites`` is not modified.

    :param df: dataframe with ``lat``, ``lon`` and ``beach`` columns
    :param df_sites: dataframe of sites with ``lat`` and ``lon`` columns and an index (level) named ``site_id``
    :return: dataframe with a ``site_id`` column and without the ``lat``, ``lon`` and ``beach`` columns
    """
    # Label the site coordinates with their site_id, so idxmin() below returns the site_id directly
    site_ids = df_sites.index.get_level_values("site_id")

    # Create eastings and northings so we can calculate distances
    site_points = [convert_coord_systems(Point(lon, lat)).xy for lon, lat in zip(df_sites["lon"], df_sites["lat"])]
    site_eastings = pd.Series([xy[0][0] for xy in site_points], index=site_ids)
    site_northings = pd.Series([xy[1][0] for xy in site_points], index=site_ids)

    # Process each unique combination lat/lons in groups
    groups = df.groupby(["lat", "lon"])
    for (lat, lon), df_group in groups:

        # Calculate distances from each point to each site and determine closest site
        easting, northing = [coords[0] for coords in convert_coord_systems(Point(lon, lat)).xy]
        distances_to_sites = np.sqrt((site_eastings - easting) ** 2 + (site_northings - northing) ** 2)
        min_distance = distances_to_sites.min()
        closest_site = distances_to_sites.idxmin()

        # Do some logging so we can check later.
        if min_distance > 1:
            logger.warning("Closest site to (%.4f,%.4f) is %s (%.2f m away)", lat, lon, closest_site, min_distance)
        else:
            logger.info("Closest site to (%.4f,%.4f) is %s (%.2f m away)", lat, lon, closest_site, min_distance)

        # Assign site_id based on closest site
        df.loc[df_group.index, "site_id"] = closest_site

    # If there was nothing to group, the column has not been created yet
    if "site_id" not in df.columns:
        df["site_id"] = np.nan

    nan_count = df["site_id"].isna().sum()
    if nan_count > 0:
        logger.warning("Not all records (%d of %d) matched with a unique site", nan_count, len(df))

    df = df.drop(columns=["lat", "lon", "beach"])

    return df


@click.command(short_help="create waves.csv")
@click.option("--waves-mat", required=True, help=".mat file containing wave records")
@click.option("--sites-csv", required=True, help=".csv file description of cross section sites")
@click.option("--output-file", required=True, help="where to save waves.csv")
def create_waves_csv(waves_mat, sites_csv, output_file):
    """Parse the waves .mat file, attach the closest site_id to each record and save the result as a .csv."""
    logger.info("Creating %s", output_file)
    df_waves = parse_waves(waves_mat=waves_mat)
    df_sites = pd.read_csv(sites_csv, index_col=[0])
    df_waves = replace_unique_sites(df_waves, df_sites)
    df_waves.set_index(["site_id", "datetime"], inplace=True)
    df_waves.sort_index(inplace=True)
    df_waves.to_csv(output_file)
    logger.info("Created %s", output_file)


@click.command(short_help="create profile_features.csv")
@click.option("--crest-mat", required=True, help=".mat file containing dune crests")
@click.option("--toe-mat", required=True, help=".mat file containing dune toes")
@click.option("--sites-csv", required=True, help=".csv file description of cross section sites")
@click.option("--output-file", required=True, help="where to save profile_features.csv")
def create_profile_features(crest_mat, toe_mat, sites_csv, output_file):
    """Parse the dune crest and dune toe .mat files and save them as a .csv indexed by site and profile type."""
    logger.info("Creating %s", output_file)
    df_sites = pd.read_csv(sites_csv, index_col=[0])
    df_profile_features = parse_dune_crest_toes(df_sites, crest_mat, toe_mat)
    df_profile_features.to_csv(output_file)
    logger.info("Created %s", output_file)


@click.command(short_help="create profiles.csv")
@click.option("--profiles-mat", required=True, help=".mat file containing beach profiles")
@click.option("--profiles-output-file", required=True, help="where to save profiles.csv")
@click.option("--sites-output-file", required=True, help="where to save sites.csv")
def create_sites_and_profiles_csv(profiles_mat, profiles_output_file, sites_output_file):
    """Parse the profiles .mat file and save the profiles and the sites as two separate .csv files."""
    logger.info("Creating sites and profiles csvs")
    df_profiles, df_sites = parse_profiles_and_sites(profiles_mat=profiles_mat)
    df_profiles.set_index(["site_id", "profile_type", "x"], inplace=True)
    df_profiles.sort_index(inplace=True)
    df_profiles = remove_zeros(df_profiles)

    df_sites.set_index(["site_id"], inplace=True)
    df_sites.sort_index(inplace=True)

    df_profiles.to_csv(profiles_output_file)
    logger.info("Created %s", profiles_output_file)

    df_sites.to_csv(sites_output_file)
    logger.info("Created %s", sites_output_file)


@click.command(short_help="create tides.csv")
@click.option("--tides-mat", required=True, help=".mat file containing tides")
@click.option("--sites-csv", required=True, help=".csv file description of cross section sites")
@click.option("--output-file", required=True, help="where to save tides.csv")
def create_tides_csv(tides_mat, sites_csv, output_file):
    """Parse the tides .mat file, attach the closest site_id to each record and save the result as a .csv."""
    logger.info("Creating %s", output_file)
    df_tides = parse_tides(tides_mat=tides_mat)
    df_sites = pd.read_csv(sites_csv, index_col=[0])
    df_tides = replace_unique_sites(df_tides, df_sites)
    df_tides.set_index(["site_id", "datetime"], inplace=True)
    df_tides.sort_index(inplace=True)
    df_tides.to_csv(output_file)
    logger.info("Created %s", output_file)


@click.group()
def cli():
    """Command group which the .mat to .csv conversion commands are attached to."""


if __name__ == "__main__":
    cli.add_command(create_waves_csv)
    cli.add_command(create_sites_and_profiles_csv)
    cli.add_command(create_tides_csv)
    # Was defined but never registered, so it could not be run from the command line
    cli.add_command(create_profile_features)
    cli()