"""waternsw_grabber.py Download bore records from the WaterNSW data portal. """ import os import re import time import shutil import logging import warnings import requests import pandas as pd from tqdm import tqdm from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.support.ui import WebDriverWait, Select from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import ( TimeoutException, StaleElementReferenceException, NoSuchElementException) def has_admin(): """Check if current user has admin rights. https://stackoverflow.com/questions/2946746 """ if os.name == 'nt': try: # Check if C:/Windows/temp is readable for current user os.listdir(os.path.join(os.environ.get('systemroot'), 'temp')) except PermissionError: return False else: return True else: if 'SUDO_USER' in os.environ and os.geteuid() == 0: return True else: return False def wait_for_element(driver, by, x, timeout=180): """Wait for element on page to load. Args: driver: selenium webdriver object by: locator strategy (e.g. By.ID) x: locator string timeout: maximum wait time (seconds) Raises TimeoutException if element does not load within timeout period """ try: element_present = EC.presence_of_element_located((by, x)) WebDriverWait(driver, timeout).until(element_present) except TimeoutError: raise TimeoutError( 'Request timed out on {}. Try again later?'.format(bore_id)) def get_telemetered_bore(driver, bore_id, start_date, end_date): """Download single record from telemetered bore. Args: driver: selenium webdriver object bore_id: bore ID (string) start_date: start date (string in YYYY-MM-DD format) end_date: end date (string in YYYY-MM-DD format) """ url = 'https://realtimedata.waternsw.com.au/water.stm' driver.get(url) driver.switch_to.default_content() webhyd = driver.find_element_by_id('webhyd') driver.switch_to.frame(webhyd) # Load site specific page driver.execute_script("go('{}','gw', 1)".format(bore_id)) # Wait for results frame to load wait_for_element(driver, By.ID, 'gwgwlf_org') driver.switch_to.frame('gwgwlf_org') # Wait until body text of iframe has loaded body_text = None while not body_text: try: # Get contents of body text body_text = driver.find_element_by_xpath('//*/body').text except (StaleElementReferenceException, NoSuchElementException): pass time.sleep(0.5) # Detect if bore record does not exist if 'No SITE record found for site' in body_text: raise ValueError('No SITE record found for site {}'.format(bore_id)) elif 'No variables data found for this site.' in body_text: raise ValueError('No variables data found for site {}'.format(bore_id)) # Wait for navigation tabs wait_for_element(driver, By.XPATH, '//*[@id="tabstext"]') # Activate outputs tab, and wait for 'Get Output' button driver.execute_script("menuloc.display_frame('gw', 'gwcf_org', '1')") driver.switch_to.parent_frame() wait_for_element(driver, By.ID, 'gwgwcf_org') driver.switch_to.frame('gwgwcf_org') wait_for_element(driver, By.ID, 'submit') # Get output select controls selects = driver.find_elements_by_xpath('//*/select') for select in selects: s = Select(select) label = s.options[0].get_attribute('label') if label == 'All data': period = s elif label == 'Plot': output = s elif label == 'Annual': interval = s # Change period dropdown to 'Custom' period.select_by_visible_text('Custom') # Get date input fields fields = driver.find_elements_by_xpath('//*[starts-with(@id,"cdate")]') # Parse dates start_date = pd.to_datetime(start_date) end_date = pd.to_datetime(end_date) # Update fields with specified dates for field, date in zip(fields, [start_date, end_date]): field.clear() field.send_keys(pd.datetime.strftime(date, '%H:%M_%d/%m/%Y')) # Set output dropdown to 'Download' output.select_by_visible_text('Download') # Set interval dropdown to 'All points' interval.select_by_visible_text('All points') # Make sure 'Groundwater Level - AHD' is selected as an output try: checkbox = driver.find_element_by_xpath( '//*/input[contains(@name, "sel__110.00_115.00")]') if not checkbox.get_attribute('selected'): checkbox.click() except NoSuchElementException: pass # Download data driver.execute_script("get_output()") driver.execute_script("hide_object('confirm');co(level,tab,1)") # Close popup wait_for_element( driver, By.XPATH, "//div[contains(@class, 'lity-container')]", timeout=60) webdriver.ActionChains(driver).send_keys(Keys.ESCAPE).perform() def open_browser(download_dir): """Opens an automated Firefox browser instance. Args: download_dir: path to where downloaded files will be saved Returns: A selenium web browser object """ # Make download directory absolute download_dir = os.path.abspath(download_dir) # Set up Firefox to silently download files to specified folder profile = webdriver.FirefoxProfile() profile.set_preference('browser.download.folderList', 2) profile.set_preference('browser.download.manager.showWhenStarting', False) profile.set_preference('browser.download.dir', download_dir) profile.set_preference('browser.helperApps.neverAsk.saveToDisk', ('application/zip,' 'application/octet-stream,' 'application/x-zip-compressed,' 'multipart/x-zip')) # Create download folder if it does not exist os.makedirs(download_dir, exist_ok=True) # Open browser driver = webdriver.Firefox(firefox_profile=profile) return driver def telemetered_bore_downloader(bore_ids, start_date, end_date, download_dir): """Download multiple records from telemetered bore. Args: bore_ids: bore ID values (array-like) start_date: start date (string YYYY-MM-DD format) end_date: end date (string YYYY-MM-DD format) download_dir: path to where downloaded files will be saved Raises: ValueError when bore ID is invalid """ driver = open_browser(download_dir) # Set up log File log_name = os.path.join(download_dir, 'errors.log') logging.basicConfig(filename=log_name, level=logging.ERROR) # Download bore logs pbar = tqdm(bore_ids) for bore_id in pbar: pbar.set_description(bore_id) try: get_telemetered_bore(driver, bore_id, start_date, end_date) except (ValueError, TimeoutError) as e: logging.error(e) # Tidy up console after tqdm print('\n') # Stop logging logging.shutdown() with open(log_name, 'r') as f: log_data = f.read() # Check contents of log file if log_data: warnings.warn( 'Some files failed to download. See log for details.', stacklevel=2) else: os.remove(log_name) driver.quit() def extract_records(input_dir, output_dir, clean_up=False): """Extract downloaded bore records. Args: input_dir: path to downloaded zip archives output_dir: path to save csv files clean_up: delete original zip archive after extracting it """ # Find zip files zip_names = [f for f in os.listdir(input_dir) if f.endswith('.zip')] # Prepare output directory os.makedirs(output_dir, exist_ok=True) # Create master dataframe periods = ['all', 'weekly'] master = {} for period in periods: master[period] = pd.DataFrame() for zip_name in tqdm(zip_names): # Skip duplicate downloads if re.search('\([0-9]+\)', zip_name): continue # Rename '.part' file if zip was not correctly downloaded if os.path.getsize(os.path.join(input_dir, zip_name)) == 0: shutil.move( os.path.join(input_dir, zip_name) + '.part', os.path.join(input_dir, zip_name)) # Read csv file inside zip archive df = pd.read_csv( os.path.join(input_dir, zip_name), header=2, skiprows=[3], parse_dates=['Date'], compression='zip', dayfirst=True) # Get bore specifics meta = df.iloc[1, -1] lat = float(re.search('(?<=Lat:)\S+', meta).group()) lon = float(re.search('(?<=Long:)\S+', meta).group()) elev = float(re.search('(?<=Elev:).+(?=m)', meta).group()) address = re.search('(?<=\d\.\d\.\d - ).+(?=\sLat)', meta).group() bore_id = re.search('^\S+', meta).group() site, hole, pipe = bore_id.split('.') # FIXME: detect basin automatically basin_id = 'MB' # Rename columns df = df.rename( columns={ 'Date': 'Date time', 'Bore level below MP': 'Below Measuring Point', 'GW Level - m AHD': 'Above Sea Level' }) # Select output columns df = df[[ 'Date time', 'Below Measuring Point', 'Above Sea Level', ]] # Set date index for resampling df.index = df['Date time'] # Append to master dataframe for period in periods: if period == 'weekly': # Resample to weekly timestamps df = df.resample('1w').mean() df['Date time'] = df.index # Add bore specifics to dataframe df['Site'] = site df['Hole'] = hole df['Pipe'] = pipe df['Lat'] = lat df['Lon'] = lon df['Elev'] = elev df['Basin'] = basin_id master[period] = pd.concat([master[period], df]) if clean_up: # Remove original zip archive os.remove(os.path.join(input_dir, zip_name)) for period in periods: # Set column order master[period] = master[period][[ 'Date time', 'Basin', 'Site', 'Hole', 'Pipe', 'Below Measuring Point', 'Above Sea Level', 'Lat', 'Lon', 'Elev' ]] # Get latest date from dataframe latest_date = master[period]['Date time'].iloc[-1].strftime('%Y-%m-%d') csv_name = os.path.join( output_dir, '{}-{}-{}.csv'.format(basin_id, latest_date, period)) # Export to csv master[period].to_csv(csv_name, index=False, float_format='%0.3f')