You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

160 lines
5.0 KiB
Python

import os
import re
import requests
import warnings
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
def has_admin():
    """Check whether the current user has administrator rights.

    On Windows the check tries to list the ``temp`` directory under the
    system root; a ``PermissionError`` is treated as "no admin rights".
    On every other platform the check is whether the effective user id
    is 0 (root).

    Adapted from https://stackoverflow.com/questions/2946746

    Returns:
        bool: True if the current user appears to have admin rights,
        otherwise False.
    """
    if os.name == 'nt':
        # Fall back to the conventional location when the variable is not
        # set; os.path.join(None, 'temp') would raise a TypeError.
        system_root = os.environ.get('systemroot') or 'C:\\Windows'
        try:
            # Check if the Windows temp directory is readable for this user
            os.listdir(os.path.join(system_root, 'temp'))
        except PermissionError:
            return False
        return True

    # Effective uid 0 is root whether or not it was reached through sudo,
    # so SUDO_USER does not need to be present in the environment.
    return os.geteuid() == 0
def return_to_master_frame(driver):
    """Switch the driver's focus to the frame with id 'webhyd'.

    The driver is first reset to the default (top-level) content so the
    frame lookup works no matter which nested frame is currently active.

    Args:
        driver: Selenium WebDriver instance to switch.
    """
    driver.switch_to.default_content()
    # find_element(By.ID, ...) is used instead of find_element_by_id, which
    # is no longer available in current Selenium releases.
    webhyd = driver.find_element(By.ID, 'webhyd')
    driver.switch_to.frame(webhyd)
def wait_for_element(driver, by, x, timeout=10):
    """Wait until an element matching the locator is present.

    Args:
        driver: Selenium WebDriver instance to poll.
        by: Locator strategy (for example ``By.ID`` or ``By.XPATH``).
        x: Locator value used together with ``by``.
        timeout: Maximum number of seconds to wait.

    Returns:
        The located element, or None if the wait timed out. A timeout is
        reported with a printed message rather than an exception, so
        callers that need to react to it should check for None.
    """
    element_present = EC.presence_of_element_located((by, x))
    try:
        # until() hands back the value produced by the condition, which for
        # presence_of_element_located is the element itself.
        return WebDriverWait(driver, timeout).until(element_present)
    except TimeoutException:
        # Best effort: keep going and let the caller decide what to do
        print("Timed out waiting for page to load")
        return None
def _find_output_selects(driver):
    """Locate the period, output and interval dropdowns in the current frame.

    Each dropdown is recognised by the ``label`` attribute of its first
    option ('All data', 'Plot' and 'Annual' respectively).

    Args:
        driver: Selenium WebDriver focused on the frame holding the controls.

    Returns:
        tuple: ``(period, output, interval)`` as ``Select`` wrappers.

    Raises:
        RuntimeError: If any of the three dropdowns cannot be found.
    """
    roles_by_label = {
        'All data': 'period',
        'Plot': 'output',
        'Annual': 'interval',
    }
    found = {}
    for element in driver.find_elements(By.XPATH, '//*/select'):
        select = Select(element)
        options = select.options
        if not options:
            # An empty dropdown cannot be one of the controls we need
            continue
        role = roles_by_label.get(options[0].get_attribute('label'))
        if role is not None:
            found[role] = select

    missing = sorted(set(roles_by_label.values()) - set(found))
    if missing:
        raise RuntimeError(
            'Could not find dropdown(s): {}'.format(', '.join(missing)))
    return found['period'], found['output'], found['interval']


def get_telemetered_bore(driver, bore_id, start_date, end_date):
    """Request a download of groundwater level data for a single bore.

    Drives the already-open web page: loads the page for ``bore_id``,
    opens the outputs tab, sets a custom period, chooses 'Download' with
    'All points', makes sure the 'Groundwater Level - AHD' output is
    ticked, starts the download and closes the popup that follows.

    Args:
        driver: Selenium WebDriver with the site already loaded.
        bore_id: Bore identifier passed to the page's ``go`` function.
        start_date: Start of the period; anything ``pandas.to_datetime``
            accepts.
        end_date: End of the period; anything ``pandas.to_datetime``
            accepts.

    Raises:
        RuntimeError: If the expected dropdowns or date fields are missing.
        TimeoutException: If the results or outputs frame does not become
            available within 20 seconds.
    """
    # Start from the master frame. The lookup is done inline with
    # find_element(By.ID, ...) because find_element_by_id is no longer
    # available in current Selenium releases.
    driver.switch_to.default_content()
    webhyd = driver.find_element(By.ID, 'webhyd')
    driver.switch_to.frame(webhyd)

    # Load site specific page. The id is passed as a script argument so
    # quotes in it cannot break out of the JavaScript string.
    driver.execute_script("go(arguments[0],'gw', 1)", bore_id)

    # Wait for results frame to load
    WebDriverWait(driver, timeout=20).until(
        EC.frame_to_be_available_and_switch_to_it('gwgwlf_org'))
    wait_for_element(driver, By.XPATH, '//*[@id="tabstext"]')

    # Activate outputs tab, and wait for 'Get Output' button
    driver.execute_script("menuloc.display_frame('gw','gwcf_org','1')")
    driver.switch_to.parent_frame()
    WebDriverWait(driver, timeout=20).until(
        EC.frame_to_be_available_and_switch_to_it('gwgwcf_org'))
    wait_for_element(driver, By.ID, 'submit')

    # Get output select controls
    period, output, interval = _find_output_selects(driver)

    # Change period dropdown to 'Custom'
    period.select_by_visible_text('Custom')

    # Get date input fields
    fields = driver.find_elements(
        By.XPATH, '//*[starts-with(@id,"cdate")]')
    if len(fields) < 2:
        # zip() below would otherwise silently leave a date unset
        raise RuntimeError('Could not find the start and end date fields')

    # Parse dates
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)

    # Update fields with specified dates. The timestamp's own strftime is
    # used because the pandas.datetime alias no longer exists.
    for field, date in zip(fields, [start_date, end_date]):
        field.clear()
        field.send_keys(date.strftime('%H:%M_%d/%m/%Y'))

    # Set output dropdown to 'Download'
    output.select_by_visible_text('Download')

    # Set interval dropdown to 'All points'
    interval.select_by_visible_text('All points')

    # Make sure 'Groundwater Level - AHD' is selected as an output
    checkbox = driver.find_element(
        By.XPATH, '//*/input[contains(@name, "sel__110.00_115.00")]')
    if not checkbox.is_selected():
        checkbox.click()

    # Download data
    driver.execute_script("get_output()")
    driver.execute_script("hide_object('confirm');co(level,tab,1)")

    # Close popup
    wait_for_element(driver, By.XPATH,
                     "//div[contains(@class, 'lity-container')]")
    webdriver.ActionChains(driver).send_keys(Keys.ESCAPE).perform()
def open_browser(download_dir):
    """Start Firefox set up for unattended downloads and open the data site.

    A warning is issued (the browser is still started) when the current
    user does not appear to have admin rights.

    Args:
        download_dir: Directory that downloaded files are saved to. It is
            converted to an absolute path before being handed to Firefox.

    Returns:
        The Firefox WebDriver, with the WaterNSW real-time data page loaded.
    """
    # Warn if user does not have admin privileges
    if not has_admin():
        warnings.warn('This program should be run as an administrator.')

    # Preferences are set through FirefoxOptions; passing a FirefoxProfile
    # via the firefox_profile keyword is not supported by current Selenium.
    options = webdriver.FirefoxOptions()
    # NOTE(review): 2 is understood to mean "use browser.download.dir" --
    # confirm against the Firefox preference documentation.
    options.set_preference('browser.download.folderList', 2)
    options.set_preference('browser.download.manager.showWhenStarting', False)
    options.set_preference('browser.download.dir',
                           os.path.abspath(download_dir))
    # Save these MIME types straight to disk without asking
    options.set_preference('browser.helperApps.neverAsk.saveToDisk',
                           ('application/zip,'
                            'application/octet-stream,'
                            'application/x-zip-compressed,'
                            'multipart/x-zip'))

    # Open browser
    driver = webdriver.Firefox(options=options)
    url = 'https://realtimedata.waternsw.com.au/water.stm'
    try:
        driver.get(url)
    except Exception:
        # Do not leave an orphaned browser behind if the page fails to load
        driver.quit()
        raise
    return driver
def main():
    """Download telemetered bore data for a fixed set of bores and dates.

    Opens the browser, requests a download for every bore id in the list
    and always closes the browser afterwards, even if a download fails.
    Files are saved to the current working directory.
    """
    # open_browser() requires a download directory; calling it without one
    # raises a TypeError before anything is downloaded.
    download_dir = os.getcwd()
    driver = open_browser(download_dir)

    bore_ids = [
        'GW036367.1.1',
        'GW036211.1.1',
        'GW036588.4.4',
        'GW036572.1.1',
    ]
    start_date = '2018-01-01'
    end_date = '2018-06-01'

    try:
        # Download bore logs
        for bore_id in bore_ids:
            print('Downloading {}...'.format(bore_id))
            get_telemetered_bore(driver, bore_id, start_date, end_date)
    finally:
        # Close the browser even when one of the downloads raises
        driver.quit()