You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

593 lines
19 KiB
Python

"""waternsw_grabber.py
Download bore records from the WaterNSW data portal.
"""
import os
import re
import time
import shutil
import logging
import warnings
import requests
import numpy as np
import pandas as pd
from tqdm import tqdm
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
TimeoutException, StaleElementReferenceException, NoSuchElementException)
def has_admin():
    """Report whether the current user appears to have admin rights.

    On Windows the check is whether the 'temp' folder under the system
    root can be listed; elsewhere it is whether the process was started
    through sudo and runs with effective uid 0.
    https://stackoverflow.com/questions/2946746

    Returns:
        True if the user has admin rights, otherwise False
    """
    if os.name != 'nt':
        # Both conditions are required: launched via sudo AND running as root
        return 'SUDO_USER' in os.environ and os.geteuid() == 0

    # Listing <systemroot>/temp raises PermissionError for regular users
    temp_dir = os.path.join(os.environ.get('systemroot'), 'temp')
    try:
        os.listdir(temp_dir)
    except PermissionError:
        return False
    return True
def wait_for_element(driver, by, x, timeout=180):
    """Block until an element is present on the page.

    Args:
        driver: selenium webdriver object
        by: locator strategy (e.g. By.ID)
        x: locator string
        timeout: maximum wait time (seconds)

    Raises:
        TimeoutException if element does not load within timeout period
    """
    locator = (by, x)
    waiter = WebDriverWait(driver, timeout)
    waiter.until(EC.presence_of_element_located(locator))
def wait_for_body_text(driver, timeout=180):
    """Wait for body text element on page to load, and not be empty.

    The page body is polled every 0.5 seconds until it contains text.

    Args:
        driver: selenium webdriver object
        timeout: maximum wait time (seconds)

    Returns:
        Body text

    Raises:
        TimeoutException if the body is still missing or empty after
        the timeout period
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Get contents of body text
            body_text = driver.find_element(By.XPATH, '//*/body').text
        except (StaleElementReferenceException, NoSuchElementException):
            # Frame is still (re)loading; try again on the next poll
            body_text = None

        if body_text:
            return body_text

        if time.monotonic() >= deadline:
            raise TimeoutException(
                'Body text did not load within {} seconds'.format(timeout))

        time.sleep(0.5)
def get_telemetered_bore(driver, bore_id, start_date, end_date):
    """Download single record from telemetered bore.

    Drives the WaterNSW real-time data site: opens the page for the
    bore, switches to the outputs tab, requests a custom date range as a
    download with all points, and dismisses the resulting popup. The
    file itself is saved by the browser.

    Args:
        driver: selenium webdriver object
        bore_id: bore ID (string)
        start_date: start date (string in YYYY-MM-DD format), or None to
            use the earliest date listed on the page
        end_date: end date (string in YYYY-MM-DD format), or None to use
            the latest date listed on the page

    Raises:
        ValueError if the site has no record, has no variables, or the
            expected output controls are missing from the page
        TimeoutException if a page element does not load in time
    """
    url = 'https://realtimedata.waternsw.com.au/water.stm'
    driver.get(url)
    driver.switch_to.default_content()
    webhyd = driver.find_element(By.ID, 'webhyd')
    driver.switch_to.frame(webhyd)

    # Load site specific page
    driver.execute_script("go('{}','gw', 1)".format(bore_id))

    # Wait for results frame to load
    wait_for_element(driver, By.ID, 'gwgwlf_org')
    driver.switch_to.frame('gwgwlf_org')

    # Wait until body text of iframe has loaded
    body_text = wait_for_body_text(driver)

    # Detect if bore record does not exist
    if 'No SITE record found for site' in body_text:
        raise ValueError('No SITE record found for site {}'.format(bore_id))

    # Wait for navigation tabs
    wait_for_element(driver, By.XPATH, '//*[@id="tabstext"]')

    # Activate outputs tab
    driver.execute_script("menuloc.display_frame('gw', 'gwcf_org', '1')")
    driver.switch_to.parent_frame()
    wait_for_element(driver, By.ID, 'gwgwcf_org')
    driver.switch_to.frame('gwgwcf_org')

    # Wait until body text of iframe has loaded
    body_text = wait_for_body_text(driver)

    # Detect if no variables are available
    if 'No variables data found for this site.' in body_text:
        raise ValueError('No variables data found for site {}'.format(bore_id))

    # Wait for 'Get Output' button
    wait_for_element(driver, By.ID, 'submit')

    # Get output select controls, identified by the label of their first
    # option (if a label occurs more than once, the last control wins)
    roles = {'All data': 'period', 'Plot': 'output', 'Annual': 'interval'}
    controls = {}
    for select in driver.find_elements(By.XPATH, '//*/select'):
        s = Select(select)
        label = s.options[0].get_attribute('label')
        if label in roles:
            controls[roles[label]] = s

    # Fail with a loggable error rather than an unbound-variable crash
    missing = [role for role in roles.values() if role not in controls]
    if missing:
        raise ValueError('Could not find {} control(s) for site {}'.format(
            '/'.join(missing), bore_id))

    # Change period dropdown to 'Custom'
    controls['period'].select_by_visible_text('Custom')

    # Get date input fields
    fields = driver.find_elements(By.XPATH, '//*[starts-with(@id,"cdate")]')

    # Get available date ranges
    datestr = driver.find_elements(By.XPATH, '//*/tr/td[4]/span')
    dates = np.array([d.text.split(' to ') for d in datestr])

    if start_date is not None:
        start_date = pd.to_datetime(start_date)
    else:
        # Get date from page, if not provided
        start_date = pd.to_datetime(dates[:, 0], dayfirst=True).min()

    if end_date is not None:
        end_date = pd.to_datetime(end_date)
    else:
        # Get date from page, if not provided
        end_date = pd.to_datetime(dates[:, 1], dayfirst=True).max()

    # Update fields with specified dates (fields are paired in page order
    # with the start date, then the end date)
    for field, date in zip(fields, [start_date, end_date]):
        field.clear()
        field.send_keys(date.strftime('%H:%M_%d/%m/%Y'))

    # Set output dropdown to 'Download'
    controls['output'].select_by_visible_text('Download')

    # Set interval dropdown to 'All points'
    controls['interval'].select_by_visible_text('All points')

    # Make sure 'Groundwater Level - AHD' is selected as an output
    try:
        checkbox = driver.find_element(
            By.XPATH, '//*/input[contains(@name, "sel__110.00_115.00")]')
        if not checkbox.is_selected():
            checkbox.click()
    except NoSuchElementException:
        # Best effort: not every site offers this variable
        pass

    # Download data
    driver.execute_script("get_output()")
    driver.execute_script("hide_object('confirm');co(level,tab,1)")

    # Close popup
    wait_for_element(
        driver,
        By.XPATH,
        "//div[contains(@class, 'lity-container')]",
        timeout=60)
    webdriver.ActionChains(driver).send_keys(Keys.ESCAPE).perform()
def open_browser(download_dir):
    """Start an automated Firefox session that saves downloads silently.

    Args:
        download_dir: path to where downloaded files will be saved

    Returns:
        A selenium web browser object
    """
    # Firefox needs an absolute path for its download folder
    target_dir = os.path.abspath(download_dir)

    # The folder must exist before the browser starts saving into it
    os.makedirs(target_dir, exist_ok=True)

    # Content types that are saved without showing a prompt
    silent_types = ','.join([
        'application/zip',
        'application/octet-stream',
        'application/x-zip-compressed',
        'multipart/x-zip',
    ])

    preferences = {
        'browser.download.folderList': 2,
        'browser.download.manager.showWhenStarting': False,
        'browser.download.dir': target_dir,
        'browser.helperApps.neverAsk.saveToDisk': silent_types,
    }

    profile = webdriver.FirefoxProfile()
    for pref_name, pref_value in preferences.items():
        profile.set_preference(pref_name, pref_value)

    return webdriver.Firefox(firefox_profile=profile)
def telemetered_bore_downloader(basin_name,
                                download_dir=None,
                                start_date=None,
                                end_date=None):
    """Download multiple records from telemetered bore.

    Every bore listed for the basin is downloaded in turn. Failures for
    individual bores are written to 'errors.log' inside the basin's
    download folder and do not stop the remaining downloads.

    Args:
        basin_name: basin name (string)
        download_dir: path to where downloaded files will be saved
            (defaults to the current working directory)
        start_date: start date (string YYYY-MM-DD format)
        end_date: end date (string YYYY-MM-DD format)

    Raises:
        ValueError when basin name is invalid
    """
    # Get full name of basin
    basins = get_basins()

    # Check if full basin name was provided
    basin_names = basins['Basin name'].unique()
    if basin_name not in basin_names:
        raise ValueError("'basin_name' must be one of:\n" + '\n'.join(basin_names))

    # Get list of bore IDs from selected basin
    bore_ids = basins[basins['Basin name'] == basin_name].index.values

    # Add basin name to root download directory
    if download_dir is None:
        download_dir = os.getcwd()
    download_dir = os.path.join(download_dir, basin_name)

    # Open browser (this also creates the download directory)
    driver = open_browser(download_dir)

    # Use a dedicated file handler per call. logging.basicConfig() does
    # nothing once the root logger has handlers, so a second call for
    # another basin would otherwise keep writing to the first log file.
    log_name = os.path.join(download_dir, 'errors.log')
    logger = logging.getLogger(__name__)
    handler = None
    try:
        handler = logging.FileHandler(log_name)
        handler.setLevel(logging.ERROR)
        handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
        logger.addHandler(handler)

        # Download bore logs
        pbar = tqdm(bore_ids)
        for bore_id in pbar:
            pbar.set_description(bore_id)
            try:
                get_telemetered_bore(driver, bore_id, start_date, end_date)
            except ValueError as e:
                logger.error(e)
            except TimeoutException:
                logger.error(
                    'Request timed out on {}. Try again later?'.format(bore_id))

        # Tidy up console after tqdm
        print('\n')

        # Wait for downloads to finish
        time.sleep(10)
    finally:
        # Stop logging and close browser, even if something unexpected
        # went wrong part-way through
        if handler is not None:
            logger.removeHandler(handler)
            handler.close()
        driver.quit()

    # Check contents of log file
    if os.path.isfile(log_name):
        if os.path.getsize(log_name) > 0:
            warnings.warn(
                'Some files failed to download. See log for details.',
                stacklevel=2)
        else:
            os.remove(log_name)
def get_basins():
    """Read the telemetered-site table shipped next to this module.

    Returns:
        DataFrame read from 'data/telemetered-sites.csv', indexed by the
        first column of that file
    """
    module_dir = os.path.dirname(__file__)
    csv_path = os.path.join(module_dir, 'data', 'telemetered-sites.csv')
    return pd.read_csv(csv_path, index_col=0)
def _parse_metadata_sections(lines):
    """Group metadata lines under the section heading that precedes them."""
    keys = ['Sites:', 'Variables:', 'Qualities:']
    meta = {k: [] for k in keys}
    section = None
    for line in lines:
        # Skip empty cells (NaN) and blank separator lines
        if not isinstance(line, str) or not line.strip():
            continue
        if line in keys:
            section = line
        elif section is not None:
            meta[section].append(line)
    return meta


def _definition_table(lines):
    """Turn 'code - description' lines into a table indexed by integer code."""
    table = pd.DataFrame(
        [line.split(' - ', 1) for line in lines],
        columns=['Code', 'Description'])
    table['Code'] = table['Code'].astype(int)
    return table.set_index('Code')


def _merge_saved_table(table, csv_name):
    """Merge table with rows already saved in csv_name; rows in table win."""
    try:
        saved = pd.read_csv(csv_name, index_col=0)
    except FileNotFoundError:
        # Nothing saved yet
        return table
    merged = pd.concat([table, saved])
    merged = merged[~merged.index.duplicated(keep='first')]
    return merged.sort_index()


def extract_definitions(basin_name, download_dir):
    """Extract variable and quality metadata from bore records.

    Reads the metadata column of each downloaded zip archive and updates
    'sites.csv', 'variables.csv' and 'qualities.csv' in the basin folder.
    Archives whose name contains a '(n)' duplicate-download suffix are
    ignored.

    Args:
        basin_name: basin name (string)
        download_dir: path to downloaded zip archives

    Returns:
        DataFrame of site details, indexed by bore ID

    Raises:
        ValueError if there are no zip archives to process
    """
    # Get basin info for telemetered site data
    basins = get_basins()

    # Prepare output directory
    output_dir = os.path.join(download_dir, basin_name)
    os.makedirs(output_dir, exist_ok=True)

    # Find zip files, skipping duplicate downloads
    zip_names = [
        f for f in os.listdir(output_dir)
        if f.endswith('.zip') and not re.search(r'\([0-9]+\)', f)
    ]
    if not zip_names:
        raise ValueError('No zip files found')

    # Get basin from master site dataframe (same for every archive)
    codes = basins.groupby('Basin name').first()['Basin code']
    basin_code = codes[basin_name]

    csv_name_s = os.path.join(output_dir, 'sites.csv')
    csv_name_v = os.path.join(output_dir, 'variables.csv')
    csv_name_q = os.path.join(output_dir, 'qualities.csv')

    for zip_name in zip_names:
        zip_path = os.path.join(output_dir, zip_name)

        # Rename '.part' file if zip was not correctly downloaded
        if os.path.getsize(zip_path) == 0:
            shutil.move(zip_path + '.part', zip_path)

        # Read csv file inside zip archive
        df = pd.read_csv(
            zip_path,
            header=2,
            skiprows=[3],
            parse_dates=['Date'],
            compression='zip',
            dayfirst=True,
            nrows=100)

        # Extract metadata from last column
        meta = _parse_metadata_sections(df.iloc[:, -1])

        # Get bore specifics
        site_data = meta['Sites:'][0]
        lat = float(re.search(r'(?<=Lat:)\S+', site_data).group())
        lon = float(re.search(r'(?<=Long:)\S+', site_data).group())
        try:
            elev = float(re.search(r'(?<=Elev:).+(?=m)', site_data).group())
        except AttributeError:
            # No elevation given for this site
            elev = np.nan
        address = re.search(r'(?<=\d\.\d\.\d - ).+(?=\sLat)',
                            site_data).group()
        bore_id = re.search(r'^\S+', site_data).group()
        site, hole, pipe = bore_id.split('.')

        sites = pd.DataFrame({
            'ID': [bore_id],
            'Site': [site],
            'Hole': [hole],
            'Pipe': [pipe],
            'Lat': [lat],
            'Lon': [lon],
            'Elev': [elev],
            'Address': [address],
        }).set_index('ID')
        sites['Basin name'] = basin_name
        sites['Basin code'] = basin_code

        # Build variable and quality definitions
        variables = _definition_table(meta['Variables:'])
        qualities = _definition_table(meta['Qualities:'])

        # Update existing values
        sites = _merge_saved_table(sites, csv_name_s)
        variables = _merge_saved_table(variables, csv_name_v)
        qualities = _merge_saved_table(qualities, csv_name_q)

        # Export updated tables
        sites.to_csv(csv_name_s)
        variables.to_csv(csv_name_v)
        qualities.to_csv(csv_name_q)

    sites = sites[~sites.index.duplicated(keep='first')]
    return sites
def extract_records(basin_name, download_dir, clean_up=False):
    """Extract downloaded bore records.

    Each zip archive in the basin folder is read and added to three
    tables per basin code: all points, daily means and weekly means.
    Every non-empty table is written to
    '<basin code>-<latest date>-<period>.csv' in the basin folder.
    Archives whose name contains a '(n)' duplicate-download suffix are
    ignored.

    Args:
        basin_name: basin name (string)
        download_dir: path to downloaded zip archives
        clean_up: delete original zip archive after extracting it
    """
    # Update definition tables
    sites = extract_definitions(basin_name, download_dir)

    # Keep unique basin codes
    basin_codes = sites['Basin code'].unique()

    # Prepare output directory
    output_dir = os.path.join(download_dir, basin_name)
    os.makedirs(output_dir, exist_ok=True)

    # Find zip files
    zip_names = [f for f in os.listdir(output_dir) if f.endswith('.zip')]

    # Collect the frames for each basin and period, and join them once
    # at the end rather than re-concatenating for every archive
    periods = ['all', 'daily', 'weekly']
    resample_rules = {'daily': '1d', 'weekly': '1w'}
    collected = {
        code: {period: [] for period in periods}
        for code in basin_codes
    }

    for zip_name in tqdm(zip_names):
        # Skip duplicate downloads
        if re.search(r'\([0-9]+\)', zip_name):
            continue

        zip_path = os.path.join(output_dir, zip_name)

        # Rename '.part' file if zip was not correctly downloaded
        if os.path.getsize(zip_path) == 0:
            shutil.move(zip_path + '.part', zip_path)

        # Read header
        header = pd.read_csv(zip_path, compression='zip', nrows=3)

        # Remove comments
        header = header.iloc[:, 1:-1].T

        # Apply product codes to all columns
        header.iloc[1::2, 0] = header.iloc[::2, 0].values
        header[0] = header[0].astype(float).astype(int).astype(str)

        # Move quality label
        header.iloc[1::2, 1] = header.iloc[1::2, 2]

        # Combine labels
        columns = [' '.join(c) for c in header.iloc[:, :-1].values]

        # Read csv file inside zip archive
        df = pd.read_csv(
            zip_path,
            header=2,
            skiprows=[3],
            parse_dates=['Date'],
            index_col=['Date'],
            compression='zip',
            dayfirst=True)

        # Convert quality codes to integers
        for col in df.columns:
            if 'Quality' in col:
                df[col] = df[col].astype(int)

        # Update column names
        df.columns = columns + ['Metadata']

        # Get bore ID from the second metadata row
        meta = df['Metadata'].iloc[1]
        bore_id = re.search(r'^\S+', meta).group()
        df_all = df.drop(columns='Metadata')

        # Get basin ID
        basin_code = sites.loc[bore_id, 'Basin code']

        # Get quality columns
        q_idx = ['Quality' in col for col in df_all.columns]

        for period in periods:
            rule = resample_rules.get(period)
            if rule is None:
                # 'all': keep every point
                frame = df_all.copy()
            else:
                # Resample to daily/weekly timestamps
                frame = df_all.resample(rule).mean()
                # Get first quality code for each period, as mean doesn't work
                frame.loc[:, q_idx] = df_all.loc[:, q_idx].resample(
                    rule).first()

            # Add specific borehole details
            frame['Site'] = sites.loc[bore_id, 'Site']
            frame['Hole'] = sites.loc[bore_id, 'Hole']
            frame['Pipe'] = sites.loc[bore_id, 'Pipe']
            frame['Basin'] = basin_code
            frame = frame[['Site', 'Hole', 'Pipe', 'Basin'] + columns]

            # Remove empty rows
            frame = frame.dropna()

            collected[basin_code][period].append(frame)

        if clean_up:
            # Remove original zip archive
            os.remove(zip_path)

    for basin_code in basin_codes:
        for period in periods:
            frames = collected[basin_code][period]
            if not frames:
                continue
            combined = pd.concat(frames)

            # Ignore empty dataframes
            if len(combined) == 0:
                continue

            # Get latest date from dataframe. Rows are grouped by bore,
            # so the last row is not necessarily the most recent one.
            latest_date = combined.index.max().strftime('%Y-%m-%d')
            csv_name = os.path.join(
                output_dir, '{}-{}-{}.csv'.format(basin_code, latest_date,
                                                  period))

            # Export to csv
            combined.to_csv(csv_name, index=True, float_format='%0.3f')