"""spectur_live_view Open an automated selenium webdriver, and download images from Live View mode. """ import os import glob import time import pytz import subprocess from io import BytesIO from datetime import datetime import requests import yaml import numpy as np from PIL import Image from selenium import webdriver from selenium.webdriver.firefox.options import Options from selenium.common.exceptions import (NoSuchElementException, StaleElementReferenceException, WebDriverException) CAM_ID = 0 URL = 'https://camera.uwatchit.com.au/UWatchitWebV2/User/LiveView#' OUTPUT_DIR = 'images' TIMEZONE = 'Australia/Sydney' WAIT_TIME = 0.1 RECORDING_DURATION = 20 SILENT = True username = '' password = '' # Get water level of next high tide pwd = os.path.dirname(os.path.abspath(__file__)) next_high_tide = glob.glob(pwd + '/NEXT_HIGH_TIDE=*')[0] height = next_high_tide.rsplit('_', 1)[-1] # Get current time now = datetime.now() year = now.strftime('%Y') timestamp = now.strftime('%Y-%m-%d_%H-%M') current_hightide_dir = os.path.join(OUTPUT_DIR, year, f'{timestamp}_{height}') def start_session(): """Start automated browser session. Returns: selenium webdriver object """ # Start webdriver options = Options() options.headless = True driver = webdriver.Firefox(options=options) return driver def login(driver): """Log into Spectur camera portal. Args: driver: selenium webdriver object Returns: cookies from current browser session """ driver.get(URL) # Get login fields usr = driver.find_element_by_id('UserName') pss = driver.find_element_by_id('Password') btn = driver.find_element_by_id('submitlogin') if (username and password): # Login automatically if details provided usr.send_keys(username) pss.send_keys(password) btn.click() else: # Try to get credientials saved locally try: credential_path = os.path.join(os.path.expanduser('~'), '.spectur') with open(credential_path, 'r') as f: credentials = yaml.safe_load(f.read()) # Input login details usr.send_keys(credentials['USERNAME']) pss.send_keys(credentials['PASSWORD']) btn.click() except FileNotFoundError: msg = f"""Provide spectur login details either: 1. In the header of this script; or 2. In the file {credential_path}. """ ValueError(msg) # Extract cookies session = requests.Session() for c in driver.get_cookies(): session.cookies.set(c['name'], c['value']) return session.cookies def return_to_live_view(driver): """Return to live camera feed in Spectur portal. """ driver.get(URL) driver.execute_script('ChangeCam({})'.format(CAM_ID)) def get_image_timestamp(driver): """Get timestamp from image link. Args: driver: selenium webdriver object Returns: UNIX timestamp, in seconds """ im_url = driver.find_element_by_id('camimage').get_property('src') timestamp = int(im_url.rsplit('&', 1)[-1]) / 1000 return timestamp def get_next_image(driver, cookies): """Get next image. Args: driver: selenium webdriver object cookies: cookies from current browser session """ # Get timestamp t_current = get_image_timestamp(driver) # Wait until image timestamp is updated while t_current == get_image_timestamp(driver): time.sleep(WAIT_TIME) # Get image URL im_url = driver.find_element_by_id('camimage').get_property('src') # Get jpeg data, and convert to image object page = requests.get(im_url, cookies=cookies) im = Image.open(BytesIO(page.content)) # Get EXIF timestamp datestr = im._getexif()[36867] # Convert to local time t_naive = datetime.strptime(datestr, '%Y:%m:%d %H:%M:%S') t_utc = t_naive.replace(tzinfo=pytz.utc) t_local = t_utc.astimezone(pytz.timezone(TIMEZONE)) # Save image jpg_name = os.path.join(current_hightide_dir, 'jpg', t_local.strftime('%Y-%m-%d_%H-%M-%S%z') + '.jpg') os.makedirs(os.path.dirname(jpg_name), exist_ok=True) im.save(jpg_name) # Get battery level with open(os.path.join(OUTPUT_DIR, 'battery.txt'), 'a') as f: battery_str = driver.find_element_by_id('battery').text f.write('{}\t{}\n'.format(t_local.isoformat(), battery_str[:-1])) # Update time counter current_time = float(time.mktime(t_local.timetuple())) if current_time not in t: t.append(current_time) # Check if recording has completed if t[-1] - t[0] > RECORDING_DURATION: raise (StopIteration) # Count images collected n = len(t) if (n > 2) and not SILENT: # Get frames per second fps = 1 / np.median(np.diff(t)) # Count dropped frames n_expected = fps * (t[-1] - t[0]) n_dropped = np.max([0, int(n_expected - n + 1)]) msg = '{} total images: {} fps: {} dropped frames: {}'.format( t_local.isoformat(), n, fps, n_dropped) print(msg, end='\r', flush=True) def get_images(): # Start browser session driver = start_session() cookies = login(driver) return_to_live_view(driver) while True: try: # Get latest image get_next_image(driver, cookies) except (NoSuchElementException, StaleElementReferenceException, WebDriverException): # Navigate to Live View return_to_live_view(driver) except ConnectionError: # Attempt to re-login if server goes down time.sleep(5) cookies = login(driver) except StopIteration: # Stop when recording has completed break def create_video(): """Combine all images into timelapse video with ffmpeg """ mp4_name = os.path.join(current_hightide_dir, f'{timestamp}_{height}.mp4') now.strftime('%Y-%m-%d_%H-%M') command = [ 'ffmpeg', '-hide_banner', '-loglevel', 'panic', '-pattern_type', 'glob', '-framerate', '10', '-i', current_hightide_dir + '/jpg/*.jpg', mp4_name ] subprocess.run(command) # Create counter t = [] # Open browser get_images() # Create video create_video()