"""
Converts raw .mat files into a flattened .csv structure which can be imported into python pandas.
"""
import logging.config
from datetime import datetime, timedelta
import pandas as pd
from mat4py import loadmat
logging.config.fileConfig('../logging.conf', disable_existing_loggers=False)
logger = logging.getLogger(__name__)
def parse_waves(waves_mat):
"""
Parses the raw waves.mat file and returns a pandas dataframe
:param waves_mat:
:return:
"""
logger.info('Parsing %s', waves_mat)
mat_data = loadmat(waves_mat)['data']
    rows = []
    # One record per site (i) per timestep (j)
    for i in range(len(mat_data['site'])):
        for j in range(len(mat_data['dates'][i])):
rows.append({
'beach': mat_data['site'][i],
'lon': mat_data['lon'][i],
'lat': mat_data['lat'][i],
'datetime': matlab_datenum_to_datetime(mat_data['dates'][i][j][0]),
'Hs': mat_data['H'][i][j][0],
'Hs0': mat_data['Ho'][i][j][0],
'Tp': mat_data['T'][i][j][0],
'dir': mat_data['D'][i][j][0],
'E': mat_data['E'][i][j][0],
'P': mat_data['P'][i][j][0],
'Exs': mat_data['Exs'][i][j][0],
'Pxs': mat_data['Pxs'][i][j][0],
})
    df = pd.DataFrame(rows)
    # Round to the nearest second to remove sub-second jitter from the datenum conversion
    df['datetime'] = df['datetime'].dt.round('1s')
return df
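# Illustrative use of parse_waves (the path and the .mat field layout are
# assumptions taken from the defaults in main() below):
#   df_waves = parse_waves('../../data/raw/waves.mat')
#   df_waves[['beach', 'datetime', 'Hs', 'Tp']].head()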
def parse_tides(tides_mat):
"""
Parses the raw tides.mat file and returns a pandas dataframe
:param tides_mat:
:return:
"""
logger.info('Parsing %s', tides_mat)
mat_data = loadmat(tides_mat)['data']
    rows = []
    # Tide timestamps are shared across sites: iterate sites (i) by timesteps (j)
    for i in range(len(mat_data['site'])):
        for j in range(len(mat_data['time'])):
rows.append({
'beach': mat_data['site'][i][0],
'lon': mat_data['lons'][i][0],
'lat': mat_data['lats'][i][0],
'datetime': matlab_datenum_to_datetime(mat_data['time'][j][0]),
'tide': mat_data['tide'][i][j]
})
df = pd.DataFrame(rows)
df['datetime'] = df['datetime'].dt.round('1s')
return df
def parse_profiles(profiles_mat):
    """
    Parses the raw profiles.mat file and returns a pandas DataFrame.
    :param profiles_mat: path to the raw profiles .mat file
    :return: DataFrame with one row per site, cross-shore point and profile type
    """
logger.info('Parsing %s', profiles_mat)
mat_data = loadmat(profiles_mat)['data']
rows = []
    for i in range(len(mat_data['site'])):
        for j in range(len(mat_data['pfx'][i])):
            # Each cross-shore point has a pre-storm (pf1) and a post-storm (pf2) elevation
            for profile_type in ['prestorm', 'poststorm']:
                if profile_type == 'prestorm':
                    z = mat_data['pf1'][i][j][0]
                else:
                    z = mat_data['pf2'][i][j][0]
rows.append({
'beach': mat_data['site'][i],
'lon': mat_data['lon'][i],
'lat': mat_data['lat'][i],
'profile_type': profile_type,
'x': mat_data['pfx'][i][j][0],
'z': z,
})
df = pd.DataFrame(rows)
return df
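# Shape note: the output is long format, so a profile sampled at N cross-shore
# x-locations contributes 2*N rows (N prestorm plus N poststorm).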
def matlab_datenum_to_datetime(matlab_datenum):
    """
    Converts a MATLAB datenum (days since 00-Jan-0000) into a Python datetime.
    MATLAB's epoch sits a 366-day (leap) year before Python's ordinal day 1
    (01-Jan-0001), hence the correction. https://stackoverflow.com/a/13965852
    """
    return datetime.fromordinal(int(matlab_datenum)) + timedelta(days=matlab_datenum % 1) - timedelta(
        days=366)
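# Worked example, using MATLAB's convention that datenum(2000, 1, 1) == 730486:
#   matlab_datenum_to_datetime(730486.5) -> datetime(2000, 1, 1, 12, 0)
# i.e. fromordinal(730486) gives 2001-01-01, plus half a day, minus 366 days.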
def get_unique_sites(dfs, cols=['beach', 'lat', 'lon']):
    """
    Generates a dataframe of unique sites based on beach names, lats and lons,
    and assigns each site a unique site_id.
    :param dfs: list of DataFrames that each contain the columns in cols
    :param cols: columns that together identify a unique site
    :return: DataFrame with one row per unique site
    """
rows = []
df_all = pd.concat([df[cols] for df in dfs])
    beach_groups = df_all.groupby(['beach'])
    for beach_name, beach_group in beach_groups:
        site_groups = beach_group.groupby(['lat', 'lon'])
        # Number sites sequentially within each beach: BEACH0001, BEACH0002, ...
        site_no = 1
        for (lat, lon), site_group in site_groups:
            site = '{}{:04d}'.format(beach_name, site_no)
            rows.append({'site_id': site,
                         'lat': lat,
                         'lon': lon,
                         'beach': beach_name})
            site_no += 1
df = pd.DataFrame(rows)
return df
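# Illustrative output (beach names are hypothetical): a beach 'AVOCA' surveyed
# at three distinct lat/lon pairs yields site_ids AVOCA0001, AVOCA0002 and
# AVOCA0003.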
def replace_unique_sites(df, df_sites, cols=['beach', 'lat', 'lon']):
    """
    Replaces the beach/lat/lon columns in df with the corresponding unique site_id.
    :param df: DataFrame containing the columns in cols
    :param df_sites: DataFrame of unique sites, as returned by get_unique_sites
    :param cols: columns to match on and then drop
    :return: df with cols replaced by a site_id column
    """
    df_merged = df.merge(df_sites, on=cols)
    # The (inner) merge silently drops records with no matching site, so warn if rows were lost
    n_unmatched = len(df) - len(df_merged)
    if n_unmatched > 0:
        logger.warning('%d of %d records did not match a unique site', n_unmatched, len(df))
df_merged = df_merged.drop(columns=cols)
return df_merged
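# Sketch of the transformation on one record (values are illustrative):
#   before: beach='AVOCA', lat=-33.46, lon=151.43, Hs=1.2, ...
#   after:  site_id='AVOCA0001', Hs=1.2, ...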
def main():
df_waves = parse_waves(waves_mat='../../data/raw/waves.mat')
df_tides = parse_tides(tides_mat='../../data/raw/tides.mat')
df_profiles = parse_profiles(profiles_mat='../../data/raw/profiles.mat')
    logger.info('Identifying unique sites')
    df_sites = get_unique_sites(dfs=[df_waves, df_tides, df_profiles])
df_waves = replace_unique_sites(df_waves, df_sites)
df_tides = replace_unique_sites(df_tides, df_sites)
df_profiles = replace_unique_sites(df_profiles, df_sites)
logger.info('Setting pandas index')
df_profiles.set_index(['site_id', 'profile_type', 'x'], inplace=True)
df_waves.set_index(['site_id', 'datetime'], inplace=True)
df_tides.set_index(['site_id', 'datetime'], inplace=True)
df_sites.set_index(['site_id'], inplace=True)
logger.info('Outputting .csv files')
df_profiles.to_csv('../../data/interim/profiles.csv')
df_tides.to_csv('../../data/interim/tides.csv')
df_waves.to_csv('../../data/interim/waves.csv')
df_sites.to_csv('../../data/interim/sites.csv')
logger.info('Done!')
if __name__ == '__main__':
main()