You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
608 lines
19 KiB
Python
608 lines
19 KiB
Python
"""waternsw_grabber.py
|
|
Download bore records from the WaterNSW data portal.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import time
|
|
import shutil
|
|
import logging
|
|
import warnings
|
|
import requests
|
|
import numpy as np
|
|
import pandas as pd
|
|
from tqdm import tqdm
|
|
from selenium import webdriver
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.common.keys import Keys
|
|
from selenium.webdriver.support.ui import WebDriverWait, Select
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.common.exceptions import (
|
|
TimeoutException, StaleElementReferenceException, NoSuchElementException)
|
|
|
|
|
|
def has_admin():
    """Check if current user has admin rights.

    Adapted from https://stackoverflow.com/questions/2946746

    Returns:
        True if the process is running with administrator (Windows) or
        root (other platforms) privileges, otherwise False.
    """
    if os.name == 'nt':
        # Fall back to the default install location when the environment
        # variable is missing, so os.path.join() is never handed None.
        system_root = os.environ.get('SystemRoot', r'C:\Windows')
        try:
            # Check if C:/Windows/temp is readable for current user
            os.listdir(os.path.join(system_root, 'temp'))
        except PermissionError:
            return False
        return True

    # An effective user ID of 0 is root, whether it was reached through sudo
    # or by logging in as root directly (in which case SUDO_USER is not set).
    return os.geteuid() == 0
|
|
|
|
|
|
def wait_for_element(driver, by, x, timeout=180):
    """Block until an element is present on the page.

    Args:
        driver: selenium webdriver object
        by: locator strategy (e.g. By.ID)
        x: locator string
        timeout: maximum wait time (seconds)

    Raises:
        TimeoutException if element does not load within timeout period
    """
    locator = (by, x)
    waiter = WebDriverWait(driver, timeout)
    waiter.until(EC.presence_of_element_located(locator))
|
|
|
|
|
|
def wait_for_body_text(driver, timeout=180):
    """Wait for body text element on page to load, and not be empty.

    Args:
        driver: selenium webdriver object
        timeout: maximum wait time (seconds)

    Returns:
        Body text (non-empty string)

    Raises:
        TimeoutException if the body text is still empty after the timeout
        period
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Get contents of body text
            body_text = driver.find_element(By.XPATH, '//*/body').text
        except (StaleElementReferenceException, NoSuchElementException):
            # The body is missing or was replaced while reading it; retry
            body_text = None

        if body_text:
            return body_text

        # Give up instead of polling forever when the page never fills in
        if time.monotonic() >= deadline:
            raise TimeoutException(
                'Body text did not load within {} seconds'.format(timeout))

        time.sleep(0.5)
|
|
|
|
|
|
def get_telemetered_bore(driver, bore_id, start_date, end_date):
    """Download single record from telemetered bore.

    Loads the WaterNSW real-time data page for the bore, fills in the output
    form (custom period, 'Download' output, 'All points' interval) and runs
    the page's own scripts to request the data. Nothing is returned; the
    download itself is handled by the browser.

    Args:
        driver: selenium webdriver object
        bore_id: bore ID (string)
        start_date: start date (string in YYYY-MM-DD format), or None to use
            the earliest date listed on the page
        end_date: end date (string in YYYY-MM-DD format), or None to use the
            latest date listed on the page

    Raises:
        ValueError if the site has no record or no variables, if the output
            form controls cannot be found, or if a date has to be read from
            the page but no date ranges are listed
        TimeoutException if a page element does not load in time
    """

    url = 'https://realtimedata.waternsw.com.au/water.stm'
    driver.get(url)

    driver.switch_to.default_content()
    webhyd = driver.find_element(By.ID, 'webhyd')
    driver.switch_to.frame(webhyd)

    # Load site specific page
    driver.execute_script("go('{}','gw', 1)".format(bore_id))

    # Wait for results frame to load
    wait_for_element(driver, By.ID, 'gwgwlf_org')
    driver.switch_to.frame('gwgwlf_org')

    # Wait until body text of iframe has loaded
    body_text = wait_for_body_text(driver)

    # Detect if bore record does not exist
    if 'No SITE record found for site' in body_text:
        raise ValueError('No SITE record found for site {}'.format(bore_id))

    # Wait for navigation tabs
    wait_for_element(driver, By.XPATH, '//*[@id="tabstext"]')

    # Activate outputs tab
    driver.execute_script("menuloc.display_frame('gw', 'gwcf_org', '1')")
    driver.switch_to.parent_frame()
    wait_for_element(driver, By.ID, 'gwgwcf_org')
    driver.switch_to.frame('gwgwcf_org')

    # Wait until body text of iframe has loaded
    body_text = wait_for_body_text(driver)

    # Detect if no variables are available
    if 'No variables data found for this site.' in body_text:
        raise ValueError('No variables data found for site {}'.format(bore_id))

    # Wait for 'Get Output' button
    wait_for_element(driver, By.ID, 'submit')

    # Get output select controls, identified by the label of their first
    # option
    period = output = interval = None
    for element in driver.find_elements(By.XPATH, '//*/select'):
        dropdown = Select(element)
        label = dropdown.options[0].get_attribute('label')
        if label == 'All data':
            period = dropdown
        elif label == 'Plot':
            output = dropdown
        elif label == 'Annual':
            interval = dropdown

    # Report a missing control as a per-bore error rather than crashing with
    # an unbound variable further down
    if period is None or output is None or interval is None:
        raise ValueError(
            'Output controls not found for site {}'.format(bore_id))

    # Change period dropdown to 'Custom'
    period.select_by_visible_text('Custom')

    # Get date input fields
    fields = driver.find_elements(
        By.XPATH, '//*[starts-with(@id,"cdate")]')

    # Get available date ranges (only needed when a date was not provided)
    if start_date is None or end_date is None:
        spans = driver.find_elements(By.XPATH, '//*/tr/td[4]/span')
        ranges = [span.text.split(' to ') for span in spans]
        # Keep only entries of the form '<start> to <end>'
        ranges = [r for r in ranges if len(r) == 2]
        if not ranges:
            raise ValueError(
                'No date ranges found for site {}'.format(bore_id))
        dates = np.array(ranges)

    if start_date is not None:
        start_date = pd.to_datetime(start_date)
    else:
        # Get date from page, if not provided
        start_date = pd.to_datetime(dates[:, 0], dayfirst=True).min()

    if end_date is not None:
        end_date = pd.to_datetime(end_date)
    else:
        # Get date from page, if not provided
        end_date = pd.to_datetime(dates[:, 1], dayfirst=True).max()

    # Update fields with specified dates
    for field, date in zip(fields, [start_date, end_date]):
        field.clear()
        field.send_keys(date.strftime('%H:%M_%d/%m/%Y'))

    # Set output dropdown to 'Download'
    output.select_by_visible_text('Download')

    # Set interval dropdown to 'All points'
    interval.select_by_visible_text('All points')

    # Make sure 'Groundwater Level - AHD' is selected as an output
    try:
        checkbox = driver.find_element(
            By.XPATH, '//*/input[contains(@name, "sel__110.00_115.00")]')
        if not checkbox.is_selected():
            checkbox.click()
    except NoSuchElementException:
        # This bore does not offer that variable; download what is there
        pass

    # Download data
    driver.execute_script("get_output()")
    driver.execute_script("hide_object('confirm');co(level,tab,1)")

    # Close popup
    wait_for_element(
        driver,
        By.XPATH,
        "//div[contains(@class, 'lity-container')]",
        timeout=60)
    webdriver.ActionChains(driver).send_keys(Keys.ESCAPE).perform()
|
|
|
|
|
|
def open_browser(download_dir):
    """Opens an automated Firefox browser instance.

    Args:
        download_dir: path to where downloaded files will be saved

    Returns:
        A selenium web browser object
    """

    # Make download directory absolute
    download_dir = os.path.abspath(download_dir)

    # Create download folder if it does not exist
    os.makedirs(download_dir, exist_ok=True)

    # Set up Firefox to silently download files to specified folder.
    # Preferences are passed through an options object because the
    # 'firefox_profile' keyword is no longer accepted by recent selenium
    # releases.
    options = webdriver.FirefoxOptions()
    options.set_preference('browser.download.folderList', 2)
    options.set_preference('browser.download.manager.showWhenStarting', False)
    options.set_preference('browser.download.dir', download_dir)
    options.set_preference('browser.helperApps.neverAsk.saveToDisk',
                           ('application/zip,'
                            'application/octet-stream,'
                            'application/x-zip-compressed,'
                            'multipart/x-zip'))

    # Open browser
    driver = webdriver.Firefox(options=options)

    return driver
|
|
|
|
|
|
def telemetered_bore_downloader(basin_name,
                                download_dir=None,
                                start_date=None,
                                end_date=None):
    """Download multiple records from telemetered bore.

    Downloads every bore listed for the basin into
    '<download_dir>/<basin_name>' (or './<basin_name>' when no directory is
    given). Bores that fail are written to 'errors.log' in that folder and a
    warning is issued at the end; the log is deleted if it is empty.

    Args:
        basin_name: basin name (string)
        download_dir: path to where downloaded files will be saved
        start_date: start date (string YYYY-MM-DD format)
        end_date: end date (string YYYY-MM-DD format)

    Raises:
        ValueError when basin_name is not one of the known basin names
    """

    # Get full name of basin
    basins = get_basins()

    # Check if full basin name was provided
    basin_names = basins['Basin name'].unique()
    if basin_name not in basin_names:
        raise ValueError(
            "'basin_name' must be one of:\n" + '\n'.join(basin_names))

    # Get list of bore IDs from selected basin
    bore_ids = basins[basins['Basin name'] == basin_name].index.values

    # Add basin name to root download directory
    if not download_dir:
        download_dir = basin_name
    else:
        download_dir = os.path.join(download_dir, basin_name)

    # Open browser
    driver = open_browser(download_dir)

    try:
        # Set up log File
        # NOTE: basicConfig() does nothing if the root logger already has
        # handlers, in which case errors go wherever those handlers send them.
        log_name = os.path.join(download_dir, 'errors.log')
        logging.basicConfig(filename=log_name, level=logging.ERROR)

        # Download bore logs
        pbar = tqdm(bore_ids)
        for bore_id in pbar:
            pbar.set_description('Downloading {}'.format(bore_id))
            try:
                get_telemetered_bore(driver, bore_id, start_date, end_date)
            except ValueError as e:
                logging.error(e)
            except TimeoutException:
                e = 'Request timed out on {}. Try again later?'.format(bore_id)
                logging.error(e)

        # Tidy up console after tqdm
        print('\n')

        # Stop logging
        logging.shutdown()

        if os.path.isfile(log_name):
            with open(log_name, 'r') as f:
                log_data = f.read()

            # Check contents of log file
            if log_data:
                warnings.warn(
                    'Some files failed to download. See log for details.',
                    stacklevel=2)
            else:
                os.remove(log_name)

        # Wait for downloads to finish
        time.sleep(10)
    finally:
        # Close browser, even if an unexpected error interrupted the run
        driver.quit()
|
|
|
|
|
|
def get_basins():
    """Read the telemetered site table stored alongside this module.

    Returns:
        DataFrame loaded from 'data/telemetered-sites.csv' (relative to the
        folder containing this file), indexed by the first column of the csv
    """
    module_dir = os.path.dirname(__file__)
    sites_csv = os.path.join(module_dir, 'data', 'telemetered-sites.csv')
    return pd.read_csv(sites_csv, index_col=0)
|
|
|
|
|
|
def extract_definitions(basin_name, download_dir=None):
    """Extract variable and quality metadata from bore records.

    Reads the metadata stored in the last column of each downloaded zip
    archive and merges it into 'sites.csv', 'variables.csv' and
    'qualities.csv' inside the basin folder. Entries already present in
    those files are kept unless a newly read record has the same key.

    Args:
        basin_name: basin name (string)
        download_dir: path to downloaded zip archives

    Returns:
        DataFrame of site details (one row per bore, indexed by bore ID),
        including any sites already saved in 'sites.csv'

    Raises:
        ValueError if there are no zip archives to read, or an archive
            contains no site metadata
    """

    # Get basin info for telemetered site data
    basins = get_basins()

    # Check if download directory was provided
    if not download_dir:
        output_dir = basin_name
    else:
        output_dir = os.path.join(download_dir, basin_name)

    # Prepare output directory
    os.makedirs(output_dir, exist_ok=True)

    # Find zip files, skipping duplicate downloads (e.g. 'name(1).zip')
    zip_names = [
        f for f in os.listdir(output_dir)
        if f.endswith('.zip') and not re.search(r'\([0-9]+\)', f)
    ]

    if not zip_names:
        raise ValueError('No zip files found')

    # Get basin code from master site dataframe (same for every bore)
    codes = basins.groupby('Basin name').first()['Basin code']
    basin_code = codes[basin_name]

    csv_name_s = os.path.join(output_dir, 'sites.csv')
    csv_name_v = os.path.join(output_dir, 'variables.csv')
    csv_name_q = os.path.join(output_dir, 'qualities.csv')

    for zip_name in zip_names:
        zip_path = os.path.join(output_dir, zip_name)

        # Rename '.part' file if zip was not correctly downloaded
        if os.path.getsize(zip_path) == 0:
            shutil.move(zip_path + '.part', zip_path)

        # Read csv file inside zip archive (the first rows are enough to
        # cover the metadata)
        df = pd.read_csv(
            zip_path,
            header=2,
            skiprows=[3],
            parse_dates=['Date'],
            compression='zip',
            dayfirst=True,
            nrows=100)

        # Extract metadata from last column
        meta = _parse_metadata(df.iloc[:, -1])
        if not meta['Sites:']:
            raise ValueError(
                'No site metadata found in {}'.format(zip_name))

        # Get bore specifics
        site_data = meta['Sites:'][0]
        lat = float(re.search(r'(?<=Lat:)\S+', site_data).group())
        lon = float(re.search(r'(?<=Long:)\S+', site_data).group())
        try:
            elev = float(re.search(r'(?<=Elev:).+(?=m)', site_data).group())
        except (AttributeError, ValueError):
            # Elevation missing or not a number
            elev = np.nan
        address = re.search(r'(?<=\d\.\d\.\d - ).+(?=\sLat)',
                            site_data).group()
        bore_id = re.search(r'^\S+', site_data).group()
        site, hole, pipe = bore_id.split('.')

        sites = pd.DataFrame(
            {
                'Site': [site],
                'Hole': [hole],
                'Pipe': [pipe],
                'Lat': [lat],
                'Lon': [lon],
                'Elev': [elev],
                'Address': [address],
                'Basin name': [basin_name],
                'Basin code': [basin_code],
            },
            index=pd.Index([bore_id], name='ID'))

        # Build variable and quality definition tables
        variables = _definition_table(meta['Variables:'])
        qualities = _definition_table(meta['Qualities:'])

        # Update existing values
        sites = _merge_with_saved(sites, csv_name_s)
        variables = _merge_with_saved(variables, csv_name_v)
        qualities = _merge_with_saved(qualities, csv_name_q)

        # Export updated tables
        sites.to_csv(csv_name_s)
        variables.to_csv(csv_name_v)
        qualities.to_csv(csv_name_q)

    sites = sites[~sites.index.duplicated(keep='first')]
    return sites


def _parse_metadata(lines):
    """Group metadata lines under the heading they appear beneath."""
    keys = ['Sites:', 'Variables:', 'Qualities:']
    meta = {k: [] for k in keys}
    section = None
    for line in lines:
        # Skip missing values and blank separator lines
        if not isinstance(line, str) or line == ' ':
            continue
        if line in keys:
            section = line
        elif section is not None:
            meta[section].append(line)
    return meta


def _definition_table(lines):
    """Turn '<code> - <description>' lines into a table indexed by code."""
    rows = [line.split(' - ', 1) for line in lines]
    table = pd.DataFrame(rows, columns=['Code', 'Description'])
    table['Code'] = table['Code'].astype(int)
    return table.set_index('Code')


def _merge_with_saved(table, csv_name):
    """Add rows saved in csv_name to table; rows in table win on clashes."""
    try:
        saved = pd.read_csv(csv_name, index_col=0)
    except FileNotFoundError:
        # Nothing saved yet
        return table
    merged = pd.concat([table, saved])
    merged = merged[~merged.index.duplicated(keep='first')]
    return merged.sort_index()
|
|
|
|
|
|
def extract_records(basin_name, download_dir=None, clean_up=False):
    """Extract downloaded bore records.

    Updates the definition tables, then reads every zip archive in the basin
    folder and combines the records into one csv file per basin code and
    period ('all', 'daily', 'weekly'). Output files are named
    '<basin code>-<latest date>-<period>.csv'.

    Args:
        basin_name: basin name (string)
        download_dir: path to downloaded zip archives
        clean_up: delete original zip archive after extracting it
    """

    # Update definition tables
    sites = extract_definitions(basin_name, download_dir)

    # Keep unique basin codes
    basin_codes = sites['Basin code'].unique()

    # Check if download directory was provided
    if not download_dir:
        output_dir = basin_name
    else:
        output_dir = os.path.join(download_dir, basin_name)

    # Prepare output directory
    os.makedirs(output_dir, exist_ok=True)

    # List zip files
    zip_names = [f for f in os.listdir(output_dir) if f.endswith('.zip')]

    # Resampling frequency for each period ('all' keeps the original
    # timestamps)
    periods = ['all', 'daily', 'weekly']
    freqs = {'daily': '1D', 'weekly': '1W'}

    # Collect the records for each basin code and period; they are joined
    # once at the end rather than growing a dataframe one bore at a time
    master = {
        code: {period: [] for period in periods}
        for code in basin_codes
    }

    pbar = tqdm(zip_names)
    for zip_name in pbar:
        pbar.set_description('Extracting {}'.format(zip_name))

        # Skip duplicate downloads
        if re.search(r'\([0-9]+\)', zip_name):
            continue

        zip_path = os.path.join(output_dir, zip_name)

        # Rename '.part' file if zip was not correctly downloaded
        if os.path.getsize(zip_path) == 0:
            shutil.move(zip_path + '.part', zip_path)

        # Read header
        header = pd.read_csv(zip_path, compression='zip', nrows=3)

        # Remove comments
        header = header.iloc[:, 1:-1].T

        # Apply product codes to all columns
        header.iloc[1::2, 0] = header.iloc[::2, 0].values
        header[0] = header[0].astype(float).astype(int).astype(str)

        # Move quality label
        header.iloc[1::2, 1] = header.iloc[1::2, 2]

        # Combine labels
        columns = [' '.join(c) for c in header.iloc[:, :-1].values]

        # Read csv file inside zip archive
        df = pd.read_csv(
            zip_path,
            header=2,
            skiprows=[3],
            parse_dates=['Date'],
            index_col=['Date'],
            compression='zip',
            dayfirst=True)

        # Convert quality codes to integers
        for col in df.columns:
            if 'Quality' in col:
                df[col] = df[col].astype(int)

        # Update column names
        df.columns = columns + ['Metadata']

        # Get bore ID from the metadata column
        meta = df['Metadata'].iloc[1]
        bore_id = re.search(r'^\S+', meta).group()
        df = df.drop(columns='Metadata')

        # Get basin ID
        basin_code = sites.loc[bore_id, 'Basin code']

        # Keep the original record untouched for resampling
        df_all = df

        # Get quality columns
        q_idx = ['Quality' in col for col in df_all.columns]

        for period in periods:
            # Resample if necessary
            if period in freqs:
                df = _resample_record(df_all, q_idx, freqs[period])
            else:
                df = df_all.copy()

            # Add specific borehole details
            df['Site'] = sites.loc[bore_id, 'Site']
            df['Hole'] = sites.loc[bore_id, 'Hole']
            df['Pipe'] = sites.loc[bore_id, 'Pipe']
            df['Basin'] = sites.loc[bore_id, 'Basin code']
            df = df[['Site', 'Hole', 'Pipe', 'Basin'] + columns]

            # Remove empty rows
            df = df.dropna()

            # Add to master collection
            master[basin_code][period].append(df)

        if clean_up:
            # Remove original zip archive
            os.remove(zip_path)

    for basin_code in basin_codes:
        for period in periods:
            frames = master[basin_code][period]

            # Ignore basin codes with no records
            if not frames:
                continue
            table = pd.concat(frames)
            if len(table) == 0:
                continue

            # Get latest date from dataframe. The rows are grouped by bore,
            # not sorted by date, so the last row is not necessarily the
            # latest.
            latest_date = table.index.max().strftime('%Y-%m-%d')
            csv_name = os.path.join(
                output_dir, '{}-{}-{}.csv'.format(basin_code, latest_date,
                                                  period))

            # Export to csv
            table.to_csv(csv_name, index=True, float_format='%0.3f')


def _resample_record(record, is_quality, freq):
    """Average a record over freq, keeping the first quality code per bin."""
    resampled = record.resample(freq).mean()
    # Quality codes are categories, so a mean is meaningless for them
    resampled.loc[:, is_quality] = (
        record.loc[:, is_quality].resample(freq).first())
    return resampled
|