You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

234 lines
7.1 KiB
Python

"""waternsw_grabber.py
Download bore records from the WaterNSW data portal.
"""
import os
import re
import time
import logging
import requests
import warnings
import pandas as pd
from tqdm import tqdm
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
TimeoutException, StaleElementReferenceException, NoSuchElementException)
def has_admin():
"""Check if current user has admin rights.
https://stackoverflow.com/questions/2946746
"""
if os.name == 'nt':
try:
# Check if C:/Windows/temp is readable for current user
os.listdir(os.path.join(os.environ.get('systemroot'), 'temp'))
except PermissionError:
return False
else:
return True
else:
if 'SUDO_USER' in os.environ and os.geteuid() == 0:
return True
else:
return False
def return_to_master_frame(driver):
driver.switch_to.default_content()
webhyd = driver.find_element_by_id('webhyd')
driver.switch_to.frame(webhyd)
def wait_for_element(driver, by, x, timeout=60):
try:
element_present = EC.presence_of_element_located((by, x))
WebDriverWait(driver, timeout).until(element_present)
except TimeoutException:
print("Timed out waiting for page to load")
def get_telemetered_bore(driver, bore_id, start_date, end_date):
"""Download single record from telemetered bore.
Args:
driver: selenium webdriver object
bore_id: bore ID (string)
start_date: start date (string in YYYY-MM-DD format)
end_date: end date (string in YYYY-MM-DD format)
"""
url = 'https://realtimedata.waternsw.com.au/water.stm'
driver.get(url)
driver.switch_to.default_content()
webhyd = driver.find_element_by_id('webhyd')
driver.switch_to.frame(webhyd)
# Load site specific page
driver.execute_script("go('{}','gw', 1)".format(bore_id))
# Wait for results frame to load
WebDriverWait(
driver, timeout=60).until(
EC.frame_to_be_available_and_switch_to_it('gwgwlf_org'))
# Wait until body text of iframe has loaded
body_text = None
while not body_text:
try:
# Get contents of body text
body_text = driver.find_element_by_xpath('//*/body').text
except (StaleElementReferenceException, NoSuchElementException):
pass
time.sleep(0.5)
# Detect if bore record does not exist
if body_text.startswith('No SITE record found for site'):
raise ValueError('No SITE record found for site {}'.format(bore_id))
# Wait for navigation tabs
wait_for_element(driver, By.XPATH, '//*[@id="tabstext"]')
# Activate outputs tab, and wait for 'Get Output' button
driver.execute_script("menuloc.display_frame('gw','gwcf_org','1')")
driver.switch_to.parent_frame()
WebDriverWait(
driver, timeout=60).until(
EC.frame_to_be_available_and_switch_to_it('gwgwcf_org'))
wait_for_element(driver, By.ID, 'submit')
# Get output select controls
selects = driver.find_elements_by_xpath('//*/select')
for select in selects:
s = Select(select)
label = s.options[0].get_attribute('label')
if label == 'All data':
period = s
elif label == 'Plot':
output = s
elif label == 'Annual':
interval = s
# Change period dropdown to 'Custom'
period.select_by_visible_text('Custom')
# Get date input fields
fields = driver.find_elements_by_xpath('//*[starts-with(@id,"cdate")]')
# Parse dates
start_date = pd.to_datetime(start_date)
end_date = pd.to_datetime(end_date)
# Update fields with specified dates
for field, date in zip(fields, [start_date, end_date]):
field.clear()
field.send_keys(pd.datetime.strftime(date, '%H:%M_%d/%m/%Y'))
# Set output dropdown to 'Download'
output.select_by_visible_text('Download')
# Set interval dropdown to 'All points'
interval.select_by_visible_text('All points')
# Make sure 'Groundwater Level - AHD' is selected as an output
checkbox = driver.find_element_by_xpath(
'//*/input[contains(@name, "sel__110.00_115.00")]')
if not checkbox.get_attribute('selected'):
checkbox.click()
# Download data
driver.execute_script("get_output()")
driver.execute_script("hide_object('confirm');co(level,tab,1)")
# Close popup
wait_for_element(
driver,
By.XPATH,
"//div[contains(@class, 'lity-container')]",
timeout=60)
webdriver.ActionChains(driver).send_keys(Keys.ESCAPE).perform()
def open_browser(download_dir):
"""Opens an automated Firefox browser instance.
Args:
download_dir: path to where downloaded files will be saved
Returns:
A selenium web browser object
"""
# Make download directory absolute
download_dir = os.path.abspath(download_dir)
# Set up Firefox to silently download files to specified folder
profile = webdriver.FirefoxProfile()
profile.set_preference('browser.download.folderList', 2)
profile.set_preference('browser.download.manager.showWhenStarting', False)
profile.set_preference('browser.download.dir', download_dir)
profile.set_preference('browser.helperApps.neverAsk.saveToDisk',
('application/zip,'
'application/octet-stream,'
'application/x-zip-compressed,'
'multipart/x-zip'))
# Create download folder if it does not exist
os.makedirs(download_dir, exist_ok=True)
# Open browser
driver = webdriver.Firefox(firefox_profile=profile)
return driver
def telemetered_bore_downloader(bore_ids, start_date, end_date, download_dir):
"""Download multiple records from telemetered bore.
Args:
bore_ids: bore ID values (array-like)
start_date: start date (string YYYY-MM-DD format)
end_date: end date (string YYYY-MM-DD format)
download_dir: path to where downloaded files will be saved
Raises:
ValueError when bore ID is invalid
"""
driver = open_browser(download_dir)
# Set up log File
log_name = os.path.join(download_dir, 'errors.log')
logging.basicConfig(filename=log_name, level=logging.ERROR)
# Download bore logs
pbar = tqdm(bore_ids)
for bore_id in pbar:
pbar.set_description(bore_id)
try:
get_telemetered_bore(driver, bore_id, start_date, end_date)
except ValueError:
logging.error('Failed to download {}\n'.format(bore_id))
# Tidy up console after tqdm
print('\n')
# Stop logging
logging.shutdown()
with open(log_name, 'r') as f:
log_data = f.read()
# Check contents of log file
if log_data:
warnings.warn(
'Some files failed to download. See log for details.',
stacklevel=2)
else:
os.remove(log_name)
driver.quit()