You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
spectur/spectur_live_view.py

279 lines
7.4 KiB
Python

5 years ago
"""spectur_live_view
Open an automated selenium webdriver, and download images from Live View mode.
"""
import os
import glob
5 years ago
import time
import pytz
import shutil
import subprocess
5 years ago
from io import BytesIO
from datetime import datetime
import requests
import yaml
5 years ago
import numpy as np
from PIL import Image
from selenium import webdriver
from selenium.webdriver.firefox.options import Options
from selenium.common.exceptions import (NoSuchElementException,
StaleElementReferenceException,
WebDriverException)
5 years ago
CAM_ID = 0
5 years ago
URL = 'https://camera.uwatchit.com.au/UWatchitWebV2/User/LiveView#'
OUTPUT_DIR = 'images'
TIMEZONE = 'Australia/Sydney'
WAIT_TIME = 0.1
RECORDING_DURATION = 20
SILENT = True
IMAGE_MASK = 'mask/overtopping-mask.png'
username = ''
password = ''
5 years ago
# Get water level of next high tide
pwd = os.path.dirname(os.path.abspath(__file__))
next_high_tide = glob.glob(pwd + '/NEXT_HIGH_TIDE=*')[0]
height = next_high_tide.rsplit('_', 1)[-1]
# Get current time
now = datetime.now()
year = now.strftime('%Y')
timestamp = now.strftime('%Y-%m-%d_%H-%M')
current_hightide_dir = os.path.join(OUTPUT_DIR, year, f'{timestamp}_{height}')
5 years ago
def start_session():
"""Start automated browser session.
Returns:
selenium webdriver object
"""
# Start webdriver
options = Options()
options.headless = True
executable_path = os.path.join(pwd, 'geockdriver')
driver = webdriver.Firefox(executable_path=executable_path, options=options)
5 years ago
return driver
def login(driver):
"""Log into Spectur camera portal.
Args:
driver: selenium webdriver object
Returns:
cookies from current browser session
"""
driver.get(URL)
# Get login fields
usr = driver.find_element_by_id('UserName')
pss = driver.find_element_by_id('Password')
btn = driver.find_element_by_id('submitlogin')
5 years ago
if (username and password):
# Login automatically if details provided
usr.send_keys(username)
pss.send_keys(password)
btn.click()
else:
# Try to get credientials saved locally
try:
credential_path = os.path.join(os.path.expanduser('~'), '.spectur')
with open(credential_path, 'r') as f:
credentials = yaml.safe_load(f.read())
# Input login details
usr.send_keys(credentials['USERNAME'])
pss.send_keys(credentials['PASSWORD'])
btn.click()
except FileNotFoundError:
msg = f"""Provide spectur login details either:
1. In the header of this script; or
2. In the file {credential_path}.
"""
ValueError(msg)
5 years ago
# Extract cookies
session = requests.Session()
for c in driver.get_cookies():
session.cookies.set(c['name'], c['value'])
return session.cookies
def return_to_live_view(driver):
"""Return to live camera feed in Spectur portal.
"""
driver.get(URL)
driver.execute_script('ChangeCam({})'.format(CAM_ID))
def get_image_timestamp(driver):
"""Get timestamp from image link.
Args:
driver: selenium webdriver object
Returns:
UNIX timestamp, in seconds
"""
im_url = driver.find_element_by_id('camimage').get_property('src')
timestamp = int(im_url.rsplit('&', 1)[-1]) / 1000
return timestamp
def get_next_image(driver, cookies):
"""Get next image.
Args:
driver: selenium webdriver object
cookies: cookies from current browser session
"""
# Get timestamp
t_current = get_image_timestamp(driver)
# Wait until image timestamp is updated
while t_current == get_image_timestamp(driver):
time.sleep(WAIT_TIME)
# Get image URL
im_url = driver.find_element_by_id('camimage').get_property('src')
# Get jpeg data, and convert to image object
page = requests.get(im_url, cookies=cookies)
im = Image.open(BytesIO(page.content))
# Get EXIF timestamp
datestr = im._getexif()[36867]
# Convert to local time
t_naive = datetime.strptime(datestr, '%Y:%m:%d %H:%M:%S')
t_utc = t_naive.replace(tzinfo=pytz.utc)
t_local = t_utc.astimezone(pytz.timezone(TIMEZONE))
# Save image
jpg_name = os.path.join(current_hightide_dir, 'jpg',
t_local.strftime('%Y-%m-%d_%H-%M-%S%z') + '.jpg')
5 years ago
os.makedirs(os.path.dirname(jpg_name), exist_ok=True)
im.save(jpg_name)
# Get battery level
with open(os.path.join(OUTPUT_DIR, 'battery.txt'), 'a') as f:
battery_str = driver.find_element_by_id('battery').text
f.write('{}\t{}\n'.format(t_local.isoformat(), battery_str[:-1]))
# Update time counter
current_time = float(time.mktime(t_local.timetuple()))
if current_time not in t:
t.append(current_time)
5 years ago
# Check if recording has completed
if t[-1] - t[0] > RECORDING_DURATION:
raise (StopIteration)
5 years ago
# Count images collected
n = len(t)
if (n > 2) and not SILENT:
5 years ago
# Get frames per second
fps = 1 / np.median(np.diff(t))
# Count dropped frames
n_expected = fps * (t[-1] - t[0])
n_dropped = np.max([0, int(n_expected - n + 1)])
5 years ago
msg = '{} total images: {} fps: {} dropped frames: {}'.format(
t_local.isoformat(), n, fps, n_dropped)
print(msg, end='\r', flush=True)
5 years ago
def get_images():
5 years ago
# Start browser session
driver = start_session()
cookies = login(driver)
return_to_live_view(driver)
while True:
try:
# Get latest image
get_next_image(driver, cookies)
except (NoSuchElementException, StaleElementReferenceException,
WebDriverException):
# Navigate to Live View
return_to_live_view(driver)
except ConnectionError:
# Attempt to re-login if server goes down
time.sleep(5)
cookies = login(driver)
except StopIteration:
# Stop when recording has completed
break
5 years ago
def create_video():
"""Combine all images into timelapse video with ffmpeg
"""
mp4_name = os.path.join(current_hightide_dir, f'{timestamp}_{height}.mp4')
now.strftime('%Y-%m-%d_%H-%M')
command = [
'ffmpeg', '-hide_banner', '-loglevel', 'panic', '-pattern_type',
'glob', '-framerate', '10', '-i', current_hightide_dir + '/jpg/*.jpg',
mp4_name
]
subprocess.run(command)
def get_brightness(jpg_name, mask):
"""Get mean brighness of an image with a mask applied
"""
# Load image
im = Image.open(jpg_name)
# Load mask and resize of necessary
width, height = mask.size
im = im.resize((width, height), Image.LANCZOS)
# Apply mask to image
im.paste(mask, mask=mask)
# Convert to greyscale
im_grey = im.convert('LA')
# Calculate mean intensity
b = np.array(im_grey).mean()
return b
5 years ago
# Create counter
t = []
# Open browser
get_images()
# Create video
create_video()
# Load image mask
mask = Image.open(os.path.join(pwd, IMAGE_MASK))
# Find brightest image for current high tide
jpg_names = glob.glob(current_hightide_dir + '/jpg/*')
b = np.zeros(len(jpg_names))
for i, jpg_name in enumerate(jpg_names):
b[i] = get_brightness(jpg_name, mask)
# Copy brightest image to root directory for current year
src_name = jpg_names[b.argmax()]
dst_name = current_hightide_dir + '.jpg'
shutil.copy(src_name, dst_name)