You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
spectur/spectur_live_view.py

245 lines
6.4 KiB
Python

"""spectur_live_view
Open an automated selenium webdriver, and download images from Live View mode.
"""
import os
import glob
import time
import pytz
import subprocess
from io import BytesIO
from datetime import datetime
import requests
import yaml
import numpy as np
from PIL import Image
from selenium import webdriver
from selenium.webdriver.firefox.options import Options
from selenium.common.exceptions import (NoSuchElementException,
StaleElementReferenceException,
WebDriverException)
# Camera index passed to the portal's ChangeCam() script
CAM_ID = 0
# Address of the Live View page in the camera portal
URL = 'https://camera.uwatchit.com.au/UWatchitWebV2/User/LiveView#'
# Root folder for the downloaded images and the battery log
OUTPUT_DIR = 'images'
# Time zone used for image file names and battery log entries
TIMEZONE = 'Australia/Sydney'
# Pause between checks for a new image, in seconds
WAIT_TIME = 0.1
# Time span to record before stopping, in seconds
RECORDING_DURATION = 20
# Set to False to print a progress line while recording
SILENT = True
# Portal login details (leave both empty to read them from ~/.spectur)
username = ''
password = ''

# Get water level of next high tide, from the marker file beside this script
pwd = os.path.dirname(os.path.abspath(__file__))
next_high_tide = glob.glob(f'{pwd}/NEXT_HIGH_TIDE=*')[0]
# Keep everything after the last underscore in the marker path
height = next_high_tide.rpartition('_')[2]

# Name the output folder after the time the script was started
now = datetime.now()
year = f'{now:%Y}'
timestamp = f'{now:%Y-%m-%d_%H-%M}'
current_hightide_dir = os.path.join(OUTPUT_DIR, year, f'{timestamp}_{height}')
def start_session():
    """Start automated browser session (Firefox, without a visible window).

    Returns:
        selenium webdriver object
    """
    options = Options()
    # Pass the command line flag directly rather than assigning
    # 'options.headless': that property is deprecated and later removed in
    # Selenium 4, while add_argument() is available in old and new releases.
    options.add_argument('-headless')
    return webdriver.Firefox(options=options)
def login(driver):
    """Log into Spectur camera portal.

    The login details are taken from the 'username' and 'password' variables
    in the header of this script when both are set. Otherwise they are read
    from the 'USERNAME' and 'PASSWORD' entries of the YAML file '~/.spectur'.

    Args:
        driver: selenium webdriver object

    Returns:
        cookies from current browser session

    Raises:
        ValueError: if no login details are set in the script header and the
            credentials file does not exist
    """
    driver.get(URL)

    # Get login fields. 'id' is the value of selenium's By.ID locator;
    # find_element() is used because the find_element_by_* shortcuts were
    # removed in Selenium 4.3.
    usr = driver.find_element('id', 'UserName')
    pss = driver.find_element('id', 'Password')
    btn = driver.find_element('id', 'submitlogin')

    if username and password:
        # Use the details provided in the header of this script
        login_name, login_password = username, password
    else:
        # Try to get credentials saved locally
        credential_path = os.path.join(os.path.expanduser('~'), '.spectur')
        try:
            with open(credential_path, 'r') as f:
                credentials = yaml.safe_load(f.read())
        except FileNotFoundError as err:
            msg = (
                'Provide spectur login details either:\n'
                '1. In the header of this script; or\n'
                f'2. In the file {credential_path}.\n'
            )
            # Previously the exception was created but never raised, so the
            # function carried on without logging in.
            raise ValueError(msg) from err
        login_name = credentials['USERNAME']
        login_password = credentials['PASSWORD']

    # Input login details
    usr.send_keys(login_name)
    pss.send_keys(login_password)
    btn.click()

    # Copy the browser cookies into a cookie jar that requests can use
    session = requests.Session()
    for cookie in driver.get_cookies():
        session.cookies.set(cookie['name'], cookie['value'])
    return session.cookies
def return_to_live_view(driver):
    """Reload the Live View page and switch to the configured camera.

    Args:
        driver: selenium webdriver object
    """
    driver.get(URL)
    # Run ChangeCam() in the page, with the camera index set in CAM_ID
    script = f'ChangeCam({CAM_ID})'
    driver.execute_script(script)
def get_image_timestamp(driver):
    """Get timestamp from image link.

    The text after the last '&' in the 'src' of the 'camimage' element is
    read as a whole number of milliseconds.

    Args:
        driver: selenium webdriver object

    Returns:
        UNIX timestamp, in seconds

    Raises:
        ValueError: if the text after the last '&' is not an integer
    """
    # 'id' is the value of selenium's By.ID locator; find_element() is used
    # because the find_element_by_* shortcuts were removed in Selenium 4.3.
    im_url = driver.find_element('id', 'camimage').get_property('src')
    milliseconds = int(im_url.rsplit('&', 1)[-1])
    return milliseconds / 1000
def get_next_image(driver, cookies):
    """Get next image.

    Waits until the Live View image changes, downloads it, and saves it in
    the 'jpg' folder of 'current_hightide_dir'. The file is named after the
    EXIF timestamp of the image, which is treated as UTC and converted to
    TIMEZONE. The battery reading shown in the portal is appended to
    'battery.txt' in OUTPUT_DIR, and the capture time is added to the
    module-level list 't'.

    Args:
        driver: selenium webdriver object
        cookies: cookies from current browser session

    Raises:
        StopIteration: when the capture times collected so far span more
            than RECORDING_DURATION seconds
    """
    # Wait until image timestamp is updated
    t_current = get_image_timestamp(driver)
    while t_current == get_image_timestamp(driver):
        time.sleep(WAIT_TIME)

    # Get image URL. 'id' is the value of selenium's By.ID locator;
    # find_element() is used because the find_element_by_* shortcuts were
    # removed in Selenium 4.3.
    im_url = driver.find_element('id', 'camimage').get_property('src')

    # Get jpeg data, and convert to image object
    page = requests.get(im_url, cookies=cookies)
    im = Image.open(BytesIO(page.content))

    # Get EXIF timestamp (tag 36867 is DateTimeOriginal)
    datestr = im._getexif()[36867]

    # Convert to local time
    t_naive = datetime.strptime(datestr, '%Y:%m:%d %H:%M:%S')
    t_utc = t_naive.replace(tzinfo=pytz.utc)
    t_local = t_utc.astimezone(pytz.timezone(TIMEZONE))

    # Save image
    jpg_dir = os.path.join(current_hightide_dir, 'jpg')
    os.makedirs(jpg_dir, exist_ok=True)
    jpg_name = os.path.join(
        jpg_dir, t_local.strftime('%Y-%m-%d_%H-%M-%S%z') + '.jpg')
    im.save(jpg_name)

    # Get battery level (the last character of the text is dropped,
    # presumably a unit symbol -- confirm against the portal page)
    battery_str = driver.find_element('id', 'battery').text
    with open(os.path.join(OUTPUT_DIR, 'battery.txt'), 'a') as f:
        f.write('{}\t{}\n'.format(t_local.isoformat(), battery_str[:-1]))

    # Update time counter. The epoch time is taken from the aware datetime:
    # time.mktime() on the local wall-clock fields would interpret them in
    # the time zone of this machine, which is wrong whenever that differs
    # from TIMEZONE.
    current_time = t_utc.timestamp()
    if current_time not in t:
        t.append(current_time)

    # Check if recording has completed
    if t[-1] - t[0] > RECORDING_DURATION:
        raise StopIteration

    # Count images collected
    n = len(t)
    if n > 2 and not SILENT:
        # Get frames per second
        fps = 1 / np.median(np.diff(t))

        # Count dropped frames
        n_expected = fps * (t[-1] - t[0])
        n_dropped = max(0, int(n_expected - n + 1))

        msg = '{} total images: {} fps: {} dropped frames: {}'.format(
            t_local.isoformat(), n, fps, n_dropped)
        print(msg, end='\r', flush=True)
def get_images():
    """Download Live View images until the recording has completed.

    Starts a browser session, logs in, and calls get_next_image() in a loop
    until it raises StopIteration. The browser is closed before returning,
    including when an unexpected error ends the recording early.
    """
    # Start browser session
    driver = start_session()
    try:
        cookies = login(driver)
        return_to_live_view(driver)

        while True:
            try:
                # Get latest image
                get_next_image(driver, cookies)
            except (NoSuchElementException, StaleElementReferenceException,
                    WebDriverException):
                # Navigate to Live View
                return_to_live_view(driver)
            except (ConnectionError, requests.exceptions.ConnectionError):
                # Attempt to re-login if server goes down. The requests
                # exception is listed explicitly because it does not inherit
                # from the builtin ConnectionError.
                time.sleep(5)
                cookies = login(driver)
            except StopIteration:
                # Stop when recording has completed
                break
    finally:
        # Close the browser, otherwise its process is left running
        driver.quit()
def create_video():
    """Combine all images into timelapse video with ffmpeg.

    Every jpg file in the 'jpg' folder of 'current_hightide_dir' is read at
    10 frames per second. The mp4 is written to 'current_hightide_dir', and
    is named after the script start time and the high tide height.
    """
    mp4_name = os.path.join(current_hightide_dir, f'{timestamp}_{height}.mp4')

    command = [
        'ffmpeg',
        # Treat the input path as a glob pattern
        '-pattern_type',
        'glob',
        # Frame rate of the input image sequence
        '-framerate',
        '10',
        '-i',
        current_hightide_dir + '/jpg/*.jpg',
        mp4_name,
    ]

    # The standard output of ffmpeg is discarded
    subprocess.run(command, stdout=subprocess.DEVNULL)
# Capture times of the images collected so far; filled in by get_next_image(),
# so it has to exist before get_images() is called
t = []
# Open browser and download images until the recording has completed
get_images()
# Create video from the downloaded images
create_video()