You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
spectur/spectur_live_view.py

286 lines
7.6 KiB
Python

#!/home/cronbot/anaconda3/bin/python
"""spectur_live_view
Open an automated selenium webdriver, and download images from Live View mode.
"""
import os
import sys
import glob
import time
import pytz
import shutil
import subprocess
from io import BytesIO
from datetime import datetime
import requests
import yaml
import numpy as np
from PIL import Image
from selenium import webdriver
from selenium.webdriver.firefox.options import Options
from selenium.common.exceptions import (NoSuchElementException,
StaleElementReferenceException,
WebDriverException)
CAM_ID = 0
URL = 'https://camera.uwatchit.com.au/UWatchitWebV2/User/LiveView#'
OUTPUT_DIR = 'images'
TIMEZONE = 'Australia/Sydney'
WAIT_TIME = 0.1
RECORDING_DURATION = 120
SILENT = True
IMAGE_MASK = 'mask/overtopping-mask.png'
username = ''
password = ''
# Get water level of next high tide
pwd = os.path.dirname(os.path.abspath(__file__))
next_high_tide = glob.glob(pwd + '/NEXT_HIGH_TIDE=*')[0]
height = next_high_tide.rsplit('_', 1)[-1]
# Get current time
now = datetime.now()
year = now.strftime('%Y')
timestamp = now.strftime('%Y-%m-%d_%H-%M')
current_hightide_dir = os.path.join(pwd, OUTPUT_DIR, year,
f'{timestamp}_{height}')
def start_session():
"""Start automated browser session.
Returns:
selenium webdriver object
"""
# Start webdriver
options = Options()
options.headless = True
executable_path = os.path.join(pwd, 'geckodriver')
driver = webdriver.Firefox(executable_path=executable_path,
options=options)
return driver
def login(driver):
"""Log into Spectur camera portal.
Args:
driver: selenium webdriver object
Returns:
cookies from current browser session
"""
driver.get(URL)
# Get login fields
usr = driver.find_element_by_id('UserName')
pss = driver.find_element_by_id('Password')
btn = driver.find_element_by_id('submitlogin')
if (username and password):
# Login automatically if details provided
usr.send_keys(username)
pss.send_keys(password)
btn.click()
else:
# Try to get credientials saved locally
try:
credential_path = os.path.join(os.path.expanduser('~'), '.spectur')
with open(credential_path, 'r') as f:
credentials = yaml.safe_load(f.read())
# Input login details
usr.send_keys(credentials['USERNAME'])
pss.send_keys(credentials['PASSWORD'])
btn.click()
except FileNotFoundError:
msg = f"""Provide spectur login details either:
1. In the header of this script; or
2. In the file {credential_path}.
"""
ValueError(msg)
# Extract cookies
session = requests.Session()
for c in driver.get_cookies():
session.cookies.set(c['name'], c['value'])
return session.cookies
def return_to_live_view(driver):
"""Return to live camera feed in Spectur portal.
"""
driver.get(URL)
driver.execute_script('ChangeCam({})'.format(CAM_ID))
def get_image_timestamp(driver):
"""Get timestamp from image link.
Args:
driver: selenium webdriver object
Returns:
UNIX timestamp, in seconds
"""
im_url = driver.find_element_by_id('camimage').get_property('src')
timestamp = int(im_url.rsplit('&', 1)[-1]) / 1000
return timestamp
def get_next_image(driver, cookies):
"""Get next image.
Args:
driver: selenium webdriver object
cookies: cookies from current browser session
"""
# Get timestamp
t_current = get_image_timestamp(driver)
# Wait until image timestamp is updated
while t_current == get_image_timestamp(driver):
time.sleep(WAIT_TIME)
# Get image URL
im_url = driver.find_element_by_id('camimage').get_property('src')
# Get jpeg data, and convert to image object
page = requests.get(im_url, cookies=cookies)
im = Image.open(BytesIO(page.content))
# Get EXIF timestamp
datestr = im._getexif()[36867]
# Convert to local time
t_naive = datetime.strptime(datestr, '%Y:%m:%d %H:%M:%S')
t_utc = t_naive.replace(tzinfo=pytz.utc)
t_local = t_utc.astimezone(pytz.timezone(TIMEZONE))
# Save image
jpg_name = os.path.join(current_hightide_dir, 'jpg',
t_local.strftime('%Y-%m-%d_%H-%M-%S%z') + '.jpg')
os.makedirs(os.path.dirname(jpg_name), exist_ok=True)
im.save(jpg_name)
# Get battery level
with open(os.path.join(OUTPUT_DIR, 'battery.txt'), 'a') as f:
battery_str = driver.find_element_by_id('battery').text
f.write('{}\t{}\n'.format(t_local.isoformat(), battery_str[:-1]))
# Update time counter
current_time = float(time.mktime(t_local.timetuple()))
if current_time not in t:
t.append(current_time)
# Check if recording has completed
if t[-1] - t[0] > RECORDING_DURATION:
raise (StopIteration)
# Count images collected
n = len(t)
if (n > 2) and not SILENT:
# Get frames per second
fps = 1 / np.median(np.diff(t))
# Count dropped frames
n_expected = fps * (t[-1] - t[0])
n_dropped = np.max([0, int(n_expected - n + 1)])
msg = '{} total images: {} fps: {} dropped frames: {}'.format(
t_local.isoformat(), n, fps, n_dropped)
print(msg, end='\r', flush=True)
def get_images():
# Start browser session
driver = start_session()
cookies = login(driver)
return_to_live_view(driver)
while True:
try:
# Get latest image
get_next_image(driver, cookies)
except (NoSuchElementException, StaleElementReferenceException,
WebDriverException):
# Navigate to Live View
return_to_live_view(driver)
except ConnectionError:
# Attempt to re-login if server goes down
time.sleep(5)
cookies = login(driver)
except StopIteration:
# Stop when recording has completed
break
def create_video():
"""Combine all images into timelapse video with ffmpeg
"""
mp4_name = os.path.join(current_hightide_dir, f'{timestamp}_{height}.mp4')
now.strftime('%Y-%m-%d_%H-%M')
command = [
'ffmpeg', '-hide_banner', '-loglevel', 'panic', '-pattern_type',
'glob', '-framerate', '10', '-i', current_hightide_dir + '/jpg/*.jpg',
mp4_name
]
subprocess.run(command)
def get_brightness(jpg_name, mask):
"""Get mean brighness of an image with a mask applied
"""
# Load image
im = Image.open(jpg_name)
# Load mask and resize of necessary
width, height = mask.size
im = im.resize((width, height), Image.LANCZOS)
# Apply mask to image
im.paste(mask, mask=mask)
# Convert to greyscale
im_grey = im.convert('LA')
# Calculate mean intensity
b = np.array(im_grey).mean()
return b
# Create counter
t = []
# Open browser
get_images()
# Force browser process to end (bug with selenium)
subprocess.run(['killall', 'firefox'])
# Create video
create_video()
# Load image mask
mask = Image.open(os.path.join(pwd, IMAGE_MASK))
# Find brightest image for current high tide
jpg_names = glob.glob(current_hightide_dir + '/jpg/*')
b = np.zeros(len(jpg_names))
for i, jpg_name in enumerate(jpg_names):
b[i] = get_brightness(jpg_name, mask)
# Copy brightest image to root directory for current year
src_name = jpg_names[b.argmax()]
dst_name = current_hightide_dir + '.jpg'
shutil.copy(src_name, dst_name)