"""
Test download of CoastSnap images from the Spotteron API.
"""

import datetime
import re
from pathlib import Path
from urllib.parse import urljoin
from os import path, makedirs

import attr
import pytz
import requests
# import typer
import pandas as pd
from loguru import logger
from timezonefinder import TimezoneFinder
from werkzeug.utils import secure_filename

# app = typer.Typer()
coastsnap_sites = pd.read_csv("C:/Users/z5079346/OneDrive - UNSW/Projects/Coastsnap_test/CoastSnap_Sites.csv")
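# CoastSnap_Sites.csv provides the per-site settings used by the batch loop at the
# bottom of this file: parent_directory, site_name, root_id and limit.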


@attr.s()
class SpotteronImage:
    """
    Wraps a single spot dictionary returned by the Spotteron API and exposes
    the parameters needed to name and download the image.
    """

    raw_data = attr.ib()
    site_name = attr.ib()

    __img_url = "https://files.spotteron.com/images/spots/"

    def exists(self, folder):
        """
        Check if image has already been downloaded
        """
        folder = path.join(folder, str(self.dt.year))
        output_filepath = Path(folder, self.output_filename)

        return output_filepath.is_file()

    def save(self, folder):

        # Concatenate year to parent dir
        # For example: "C:\Users\z5079346\OneDrive - UNSW\My files\CoastSnap\Images\alex\Processed"
        # + "\2022"
        folder = path.join(folder, str(self.dt.year))

        # Check if the folder already exists
        if not path.exists(folder):
            makedirs(folder)

        # Concatenate filename to parent dir
        # For example: "C:\Users\z5079346\OneDrive - UNSW\My files\CoastSnap\Images\alex\Processed\2022"
        # + "\1641158046.Mon.Jan.03_07_14_06.AEST.2022.alex.snap.Raymond_b.jpg"
        output_filepath = Path(folder, self.output_filename)

        logger.info(f"Downloading {output_filepath}")
        response = requests.get(self.url, stream=True)
        if response.status_code == 200:
            with open(output_filepath, "wb") as f:
                f.write(response.content)
        else:
            logger.warning(f"Could not download {self.url} (HTTP {response.status_code})")

    @property
    def id(self):
        return self.raw_data["id"]

    @property
    def lat(self):
        return self.raw_data["attributes"]["latitude"]

    @property
    def lon(self):
        return self.raw_data["attributes"]["longitude"]

    @property
    def tz(self):
        """
        Finds timezone based on lon/lat
        """
        tf = TimezoneFinder()
        return tf.timezone_at(lng=self.lon, lat=self.lat)

    @property
    def dt(self):
        """
        Parses the 'spotted_at' attribute and returns a timezone-aware Python datetime
        """
        spotted_at = self.raw_data["attributes"]["spotted_at"]
        spotted_dt = datetime.datetime.strptime(spotted_at, "%Y-%m-%d %H:%M:%S")
        spotted_dt_tz = pytz.timezone(self.tz).localize(spotted_dt)
        return spotted_dt_tz

    @property
    def timestamp(self):
        return datetime.datetime.timestamp(self.dt)

    @property
    def url(self):
        """
        URL to download the image
        """
        img_name = f"{self.raw_data['attributes']['image']}.jpg"
        return urljoin(self.__img_url, img_name)

    @property
    def author(self):
        author = self.raw_data["attributes"]["spotted_by_name"]

        # Sanitize author and remove spaces
        author = secure_filename(author)
        author = re.sub(r"\s+", "", author)
        return author

    @property
    def output_filename(self):
        """
        Define the name of the image depending on its properties. Optional site_name
        can be included.
        """

        if self.site_name:
            return (
                f"{int(self.timestamp)}."
                f'{self.dt.strftime("%a.%b.%d_%H_%M_%S.")}{self.dt.tzname()}.{self.dt.strftime("%Y")}.'
                f"{self.site_name}.snap.{self.author}.jpg"
            )
        else:
            logger.warning(
                "No site_name provided. File names won't follow the recommended naming convention."
            )
            return (
                f"{int(self.timestamp)}."
                f'{self.dt.strftime("%a.%b.%d_%H_%M_%S.%z.%Y")}.{self.author}.jpg'
            )

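
# Illustrative sketch, not part of the original module: shows how SpotteronImage
# is built from one record of the API response. The record below is a made-up
# example containing only the fields the properties above actually read; real
# Spotteron spots carry many more attributes.
def _example_spotteron_image():
    sample_spot = {
        "id": 123456,  # hypothetical spot id
        "attributes": {
            "latitude": -33.89,  # hypothetical coordinates near Sydney
            "longitude": 151.27,
            "spotted_at": "2022-01-03 07:14:06",
            "image": "spot_123456_1",  # hypothetical image name on files.spotteron.com
            "spotted_by_name": "Jane Citizen",
        },
    }
    img = SpotteronImage(raw_data=sample_spot, site_name="example_site")
    # Produces a name like "<unix timestamp>.Mon.Jan.03_07_14_06.<tzname>.2022.example_site.snap.Jane_Citizen.jpg"
    return img.output_filename
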

@attr.s
class Spotteron:
    """
    Refer to https://www.spotteron.com/docs/api/v2?topic_id=37&key=LDazWbK5n62lbNA4hRNHtLa6hkyqz6Tr
    for API documentation
    """

    api_url = "https://www.spotteron.com/api/v2/spots"

    def save_images(self, root_id, output_folder, site_name, limit, overwrite):

        page = 1
        n_downloaded = 0
        while True:
            json_data = self.get_data(page=page, root_id=root_id)
            images = [
                SpotteronImage(raw_data=x, site_name=site_name)
                for x in json_data["data"]
            ]

            if not images:
                logger.info("No images returned. Check that the correct root_id is supplied")
                break

            for img in images:

                if img.exists(output_folder) and not overwrite:
                    logger.info("Existing image found. Stopping download")
                    break
                else:
                    img.save(output_folder)  # THIS SHOULD BE THE PARENT DIR
                    n_downloaded += 1

                if n_downloaded >= limit:
                    logger.info(f"Downloaded limit of {limit} images. Stopping.")
                    break

            # The for/else runs only if no image triggered a break; in that case
            # move to the next page, otherwise break out of the while loop too.
            else:
                page += 1
                continue
            break

        logger.info("Download completed")

    @classmethod
    def get_data(cls, page, root_id=None):
        """
        Gets the json data for a particular topic_id and root_id. Returns a dictionary
        containing the data returned by the API.
        """

        # Defined by Spotteron for coastsnap stations
        topic_id = 37

        payload = {
            "filter[topic_id]": topic_id,
            "limit": 5,  # number of spots requested per page
            "page": page,
        }

        if root_id:
            payload["filter[root_id]"] = root_id

        r = requests.get(cls.api_url, params=payload)
        return r.json()

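
# Illustrative sketch, not part of the original module: fetch one page from the
# Spotteron API and list the records. Only the "data" key is assumed here, since
# that is the only part of the response save_images() consumes.
def _example_get_data(root_id=None):
    json_data = Spotteron.get_data(page=1, root_id=root_id)
    spots = json_data["data"]
    logger.info(f"Fetched {len(spots)} spots on page 1")
    return spots
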

# @app.command()
# def from_spotteron(
#     root_id: int = typer.Argument(..., help="Spotteron id of Coastsnap station."),
#     output_folder: str = typer.Argument(..., help="Path to save images to."),
#     site_name: str = typer.Option(None, help="Add site to filename."),
#     limit: int = typer.Option(30, help="Max number of images to save."),
#     overwrite: bool = typer.Option(False, help="Overwrite downloaded images?"),
# ):
def from_spotteron(root_id, output_folder, site_name, limit, overwrite):
    """
    Downloads images from Spotteron API and saves to folder
    """
    spot = Spotteron()
    spot.save_images(root_id, output_folder, site_name, limit, overwrite)

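
# Illustrative sketch, not part of the original module: how a single-site download
# could be invoked directly. The root_id and folder below are placeholders; real
# values come from CoastSnap_Sites.csv.
def _example_single_site_download():
    from_spotteron(
        root_id=12345,  # hypothetical Spotteron root_id
        output_folder="C:/CoastSnap/Images/example_site/Processed",  # placeholder path
        site_name="example_site",
        limit=5,
        overwrite=False,
    )
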

# @app.command()
# def from_spotteron_batch(
#     overwrite: bool = typer.Option(False, help="Overwrite downloaded images?"),
# ):
#     """
#     Downloads images from Spotteron API for all beaches specified in batch_download.csv
#     """
#
#     # all_beaches = pd.read_csv(r"C:\Users\z5079346\OneDrive - UNSW\Code\coastsnap\coastsnap\spotteron_batch_download\batch_download.csv")
#
#     # Retrieve the parent directory from the sites table
#     parent_directory = coastsnap_sites.parent_directory[0]
#     print(parent_directory)
#
#     for index, beach in coastsnap_sites.iterrows():
#
#         # Concatenate the parent directory, site name and 'Processed'
#         # to create the output site_path
#         site_name = beach.site_name
#         site_path = path.join(parent_directory, site_name, 'Processed')
#
#         # Download the images for a given site
#         logger.info(f"Downloading images for {beach.site_name}")
#         from_spotteron(beach.root_id, site_path, site_name, limit=beach.limit, overwrite=overwrite)
#
# if __name__ == "__main__":
#     app()

"""
|
||
|
Downloads images from Spotteron API for all beaches specified in batch_download.csv
|
||
|
"""
|
||
|
|
||
|
#all_beaches = pd.read_csv(r"C:\Users\z5079346\OneDrive - UNSW\Code\coastsnap\coastsnap\spotteron_batch_download\batch_download.csv")
|
||
|
|
||
|
# Retrieve Parent Directory in batch_download.csv
|
||
|
parent_directory = coastsnap_sites.parent_directory[0]
|
||
|
print(parent_directory)
|
||
|
|
||
|
for index, beach in coastsnap_sites.iterrows():
|
||
|
|
||
|
# Concatentate the parent directory, site name and 'Processed'
|
||
|
# to create the output site_path
|
||
|
site_name = beach.site_name
|
||
|
site_path = path.join(parent_directory, site_name, 'Processed')
|
||
|
|
||
|
# Download the images for a given site
|
||
|
logger.info(f"Downloading images for {beach.site_name}")
|
||
|
from_spotteron(beach.root_id, site_path, site_name, limit = beach.limit, overwrite = False)
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|