You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
236 lines
7.5 KiB
Python
236 lines
7.5 KiB
Python
"""major_projects_grabber.py
Download documents from the NSW DPE Major Projects website.

Example usage:
    # Grab a single project modification using its job id, and save in 'files'
    major_projects_grabber -i 1746 -o files

    # Grab all modifications in search results page, and save in current folder
    major_projects_grabber -u "http://majorprojects.planning.nsw.gov.au/index.pl?action=search&authority_id=547"

"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import shutil
|
|
import logging
|
|
import requests
|
|
import argparse
|
|
import warnings
|
|
from lxml import html
|
|
from tqdm import tqdm
|
|
from requests.exceptions import ConnectionError, InvalidURL
|
|
|
|
|
|
def make_safe(s):
    """Remove characters that would be invalid in a filepath.

    Leading and trailing whitespace is stripped; backslash, '*', '"', '<',
    '>' and '|' are removed; ':' becomes ' -' and '/' becomes '-'.

    Args:
        s: Raw name, e.g. a project or modification title.

    Returns:
        The sanitised string.
    """
    # Remove leading and trailing whitespace
    s_safe = s.strip()

    # Remove '\', '*', '"', '<', '>' and '|'.
    # A character class matches each of these on its own; an alternation
    # ending in '>\|' would only match the two-character sequence '>|'.
    s_safe = re.sub(r'[\\*"<>|]', '', s_safe)

    # Replace ':' with ' -' and '/' with '-'
    s_safe = re.sub(':', ' -', s_safe)
    s_safe = re.sub('/', '-', s_safe)

    return s_safe
|
|
|
|
|
|
def mod_ids_from_search(search_results_url):
    """Get modification job IDs from search results URL.

    Args:
        search_results_url: URL of a search results page.

    Returns:
        List of job ID strings, in the order their links appear in the
        results list. Links without a 'job_id=<digits>' component are
        skipped.
    """
    # Get HTML of search results page
    page = requests.get(search_results_url)

    # Create HTML tree
    tree = html.fromstring(page.content)

    # Find job ids of items in results list (links in the first cell of
    # each row)
    mods = tree.xpath('//*[@class="vpa_list"]/tr/td[1]/a')

    mod_ids = []
    for mod in mods:
        # An anchor may have no href, or may not link to a job page; skip
        # those instead of failing on a missing match.
        match = re.search(r'(?<=job_id=)\d+', mod.get('href') or '')
        if match:
            mod_ids.append(match.group())

    return mod_ids
|
|
|
|
|
|
def get_document_list(mod_id, output_dir):
    """Get list of documents from project modification ID.

    Fetches the modification page, creates the folder
    ``<output_dir>/<project name>/<modification name>`` and writes a
    'DPE-portal-page.html' redirect file into it. When the page lists no
    document folders, an empty marker text file is created there as well.

    Args:
        mod_id: Modification job ID as a string (it is concatenated into
            the page URL).
        output_dir: Root output directory.

    Returns:
        List of dicts, one per document link, with the keys 'url' (the
        link's href), 'name' (the link text) and 'document_path' (the
        destination file path, built from sanitised names).
    """
    # Get html from mod page
    mod_url = ('http://majorprojects.planning.nsw.gov.au/'
               'index.pl?action=view_job&job_id=' + mod_id)
    mod_page = requests.get(mod_url)
    mod_tree = html.fromstring(mod_page.content)

    # Get mod details
    project_name = mod_tree.xpath(
        '//*[@class="vpa_header vpa_header_contrast"]/h2')[0].text
    mod_name = mod_tree.xpath(
        '//*[@class="vpa_header vpa_header_contrast"]/h1')[0].text

    # Get list of document folders
    folders = mod_tree.xpath('//div[@class="folder_row"]')

    # Remove invalid characters before creating folders
    project_name = make_safe(project_name)
    mod_name = make_safe(mod_name)

    # Create modification folder
    mod_dir = os.path.join(output_dir, project_name, mod_name)
    try:
        os.makedirs(mod_dir, exist_ok=True)
    except FileNotFoundError:
        # Fix destination path if longer than 255 characters (Windows only)
        mod_dir = '\\\\?\\' + os.path.abspath(mod_dir)
        os.makedirs(mod_dir, exist_ok=True)

    # Add note if no documents are found on portal
    if not folders:
        txt_name = 'No documents on DPE portal for this modification.txt'
        open(os.path.join(mod_dir, txt_name), 'a').close()

    # Create link to DPE Major Projects page for current modification
    text = ('<html>\n'
            '<meta http-equiv="refresh" content="0; url={}">\n'
            '</html>').format(mod_url)
    with open(os.path.join(mod_dir, 'DPE-portal-page.html'), 'w') as f:
        f.write(text)

    document_data = []
    for folder in folders:
        # Folder names become path components, so sanitise them the same
        # way as the project and modification names (make_safe also strips
        # surrounding whitespace).
        folder_name = make_safe(folder.xpath('a[2]')[0].text)

        # Get documents in current folder
        for document in folder.xpath('ul/li/a'):
            name = document.text

            # The link text becomes the file name; separators or reserved
            # characters in it would change the directory layout or make
            # the path invalid.
            safe_name = make_safe(name or '')
            if not safe_name:
                # A link without usable text cannot be given a file name
                continue

            document_data.append({
                'url': document.get('href'),
                'name': name,
                'document_path': os.path.join(
                    output_dir, project_name, mod_name, folder_name,
                    safe_name),
            })

    return document_data
|
|
|
|
|
|
def download_document(url, document_path):
    """Download document from given url.

    The download is skipped when a non-empty file already exists at
    ``document_path``. A connection failure or invalid URL is reported
    with ``logging.error`` instead of being raised.

    Args:
        url: Address of the document.
        document_path: Destination file path. Its last four components are
            reported as project, modification, folder and file name when
            the download fails.

    Returns:
        None.
    """
    # Check if destination path is too long (Windows filename limitation)
    try:
        # Create output directories as required
        os.makedirs(os.path.dirname(document_path), exist_ok=True)

        # Check if empty file can be created in this location
        open(document_path, 'a').close()
    except FileNotFoundError:
        # Tweak long path for Windows
        document_path = '\\\\?\\' + os.path.abspath(document_path)
        os.makedirs(os.path.dirname(document_path), exist_ok=True)

    # Nothing to do if a non-empty file exists
    if os.path.isfile(document_path) and os.path.getsize(document_path) > 0:
        return

    try:
        # Attempt to download file
        r = requests.get(url, stream=True)
    except (ConnectionError, InvalidURL):
        # Take the components from the END of the path: the number of
        # leading components depends on the output directory (and on the
        # Windows long-path prefix), so counting from the front would
        # report the wrong fields. Pad short paths so unpacking never fails.
        parts = document_path.split(os.sep)
        parts = [''] * (4 - len(parts)) + parts
        project, modification, folder, name = parts[-4:]
        logging.error(
            ('Failed to download {0}\n'
             ' Project: {1}\n'
             ' Modification: {2}\n'
             ' Folder: {3}\n').format(name, project, modification, folder))
        return

    # Write file to disk, then release the streamed connection
    try:
        with open(document_path, 'wb') as f:
            shutil.copyfileobj(r.raw, f)
    finally:
        r.close()
|
|
|
|
|
|
def main():
    """Run the command-line interface.

    Parses the '-o' (root output directory), '-i' (modification job ids)
    and '-u' (search results URL) options, downloads every document of
    each requested modification, and records failures in 'errors.log'
    inside the output directory. A warning is issued when this run added
    entries to the log; an empty log file is removed.

    Prints the help text and exits with status 1 when called without
    arguments.
    """
    example_text = (
        'examples:\n'
        '\n'
        "    # Grab a single project modification using its job id, "
        "and save in 'files'\n"
        '    major_projects_grabber -i 1746 -o files\n'
        '\n'
        '    # Grab all modifications in search results page, '
        'and save in current folder\n'
        '    major_projects_grabber -u '
        '"http://majorprojects.planning.nsw.gov.au/'
        'index.pl?action=search&authority_id=547"\n'
    )

    # Set up command line arguments
    parser = argparse.ArgumentParser(
        epilog=example_text,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument(
        '-o', metavar='OUTPUT_DIR', default='.', help='root output directory')
    parser.add_argument(
        '-i',
        metavar='ID',
        default=[],
        help='modification job id(s)',
        nargs='*')
    parser.add_argument('-u', metavar='URL', help='url of search results page')

    # Print usage if no arguments are provided
    if len(sys.argv) == 1:
        parser.print_help(sys.stderr)
        sys.exit(1)

    # Parse arguments
    args = parser.parse_args()
    search_results_url = args.u
    output_dir = args.o
    # Copy, so that extending the list below never mutates the parser's
    # shared default list
    mod_ids = list(args.i)

    # Set up log file
    os.makedirs(output_dir, exist_ok=True)
    log_name = os.path.join(output_dir, 'errors.log')

    # basicConfig opens the file in append mode, so entries from an earlier
    # run may already be present. Remember the current size so that only
    # failures from this run trigger the warning below.
    try:
        previous_log_size = os.path.getsize(log_name)
    except OSError:
        previous_log_size = 0
    logging.basicConfig(filename=log_name, level=logging.ERROR)

    # Get mod IDs from search results
    if search_results_url:
        search_mod_ids = mod_ids_from_search(search_results_url)
        mod_ids.extend(search_mod_ids)

    # Get list of documents from given modification IDs
    mod_pbar = tqdm(mod_ids)
    for mod_id in mod_pbar:
        document_data = get_document_list(mod_id, output_dir)

        # Download documents for current modification
        doc_pbar = tqdm(document_data)

        for doc in doc_pbar:
            # Update progress bars
            path_parts = doc['document_path'].split(os.sep)
            mod_pbar.set_description(path_parts[-3])
            doc_pbar.set_description(path_parts[-1])

            # Download document
            download_document(doc['url'], doc['document_path'])

    # Tidy up console after tqdm
    print('\n')

    # Stop logging (closes the log file so it can be inspected and removed)
    logging.shutdown()
    try:
        log_size = os.path.getsize(log_name)
    except OSError:
        log_size = 0

    # Check contents of log file
    if log_size > previous_log_size:
        warnings.warn(
            'Some files failed to download. See log for details.',
            stacklevel=2)
    elif log_size == 0 and os.path.exists(log_name):
        os.remove(log_name)
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a script
if __name__ == '__main__':
    main()
|