You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

233 lines
7.3 KiB
Python

"""major_projects_grabber.py
Download documents from the NSW DPE Major Projects website.
Example usage:
# Grab a single project modification using its job id, and save in 'files'
major_projects_grabber -i 1746 -o files
# Grab all modifications in search results page, and save in current folder
major_projects_grabber -u "http://majorprojects.planning.nsw.gov.au/index.pl?action=search&authority_id=547"
"""
import os
import re
import sys
import shutil
import logging
import requests
import argparse
import warnings
from lxml import html
from tqdm import tqdm
from requests.exceptions import ConnectionError, InvalidURL
def make_safe(s):
"""Remove characters that would be invalid in a filepath"""
# Remove trailing spaces
s_safe = s.strip()
# Remove '\', '*', '"', '<', '>' '|'
s_safe = re.sub('\\\|\*|"|<|>\|', '', s_safe)
# Replace '/' and ':' with '-'
s_safe = re.sub(':', ' -', s_safe)
s_safe = re.sub('/', '-', s_safe)
return s_safe
def mod_ids_from_search(search_results_url):
"""Get modification job IDs from search results URL"""
# Get HTML of search results page
page = requests.get(search_results_url)
# Create HTML tree
tree = html.fromstring(page.content)
# Find job ids of items in results list
mod_ids = []
mods = tree.xpath('//*[@class="vpa_list"]/tr/td[1]/a')
for mod in mods:
mod_ids.append(re.search('(?<=job_id=)\d+', mod.get('href')).group())
return mod_ids
def get_document_list(mod_id, output_dir):
"""Get list of documents from project modification ID"""
# Get html from mod page
mod_url = ('http://majorprojects.planning.nsw.gov.au/'
'index.pl?action=view_job&job_id=' + mod_id)
mod_page = requests.get(mod_url)
mod_tree = html.fromstring(mod_page.content)
# Get mod details
project_name = mod_tree.xpath(
'//*[@class="vpa_header vpa_header_contrast"]/h2')[0].text
mod_name = mod_tree.xpath(
'//*[@class="vpa_header vpa_header_contrast"]/h1')[0].text
# Get list of document folders
folders = mod_tree.xpath('//div[@class="folder_row"]')
# Remove invalid characters before creating folders
project_name = make_safe(project_name)
mod_name = make_safe(mod_name)
# Create modification folder
mod_dir = os.path.join(output_dir, project_name, mod_name)
try:
os.makedirs(mod_dir, exist_ok=True)
except FileNotFoundError:
# Fix destination path if longer than 255 characters (Windows only)
mod_dir = '\\\\?\\' + os.path.abspath(mod_dir)
os.makedirs(mod_dir, exist_ok=True)
# Add note if no documents are found on portal
if not folders:
txt_name = 'No documents on DPE portal for this modification.txt'
open(os.path.join(mod_dir, txt_name), 'a').close()
# Create link to DPE Major Projects page for current modification
text = """<html>
<meta http-equiv="refresh" content="0; url={}">
</html>""".format(mod_url)
with open(os.path.join(mod_dir, 'DPE-portal-page.html'), 'w') as f:
f.write(text)
document_data = []
for folder in folders:
folder_name = folder.xpath('a[2]')[0].text.strip()
# Get documents in current folder
documents = folder.xpath('ul/li/a')
for document in documents:
doc = {}
doc['url'] = document.get('href')
doc['name'] = document.text
doc['document_path'] = os.path.join(
output_dir, project_name, mod_name, folder_name, doc['name'])
document_data.append(doc)
return document_data
def download_document(url, document_path):
"""Download document from given url"""
# Check if destination path is too long (Windows filename limitation)
try:
open(document_path, 'a').close()
except FileNotFoundError:
document_path = '\\\\?\\' + os.path.abspath(document_path)
# Create output directories as required
os.makedirs(os.path.dirname(document_path), exist_ok=True)
# Check if file exists
if os.path.isfile(document_path):
pass
else:
try:
# Attempt to download file
r = requests.get(url, stream=True)
except (ConnectionError, InvalidURL):
logging.error(
('Failed to download {4}\n'
' Project: {1}\n'
' Modification: {2}\n'
' Folder: {3}\n').format(*document_path.split(os.sep)))
return
# Write file to disk
with open(document_path, 'wb') as f:
shutil.copyfileobj(r.raw, f)
def main():
example_text = """examples:
# Grab a single project modification using its job id, and save in 'files'
major_projects_grabber -i 1746 -o files
# Grab all modifications in search results page, and save in current folder
major_projects_grabber -u "http://majorprojects.planning.nsw.gov.au/index.pl?action=search&authority_id=547"
"""
# Set up command line arguments
parser = argparse.ArgumentParser(
epilog=example_text,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument(
'-o', metavar='OUTPUT_DIR', default='.', help='root output directory')
parser.add_argument(
'-i',
metavar='ID',
default=[],
help='modification job id(s)',
nargs='*')
parser.add_argument('-u', metavar='URL', help='url of search results page')
# Print usage if no arguments are provided
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
sys.exit(1)
# Parse arguments
args = parser.parse_args()
search_results_url = args.u
output_dir = args.o
mod_ids = args.i
# Set up log File
os.makedirs(output_dir, exist_ok=True)
log_name = os.path.join(output_dir, 'errors.log')
logging.basicConfig(filename=log_name, level=logging.ERROR)
# Get mod IDs from search results
if search_results_url:
search_mod_ids = mod_ids_from_search(search_results_url)
mod_ids.extend(search_mod_ids)
# Get list of documents from given modification IDs
mod_pbar = tqdm(mod_ids)
for mod_id in mod_pbar:
document_data = get_document_list(mod_id, output_dir)
# Download documents for current modification
doc_pbar = tqdm(document_data)
for doc in doc_pbar:
# Update progress bars
mod_name = doc['document_path'].split(os.sep)[-3]
doc_name = doc['document_path'].split(os.sep)[-1]
mod_pbar.set_description(mod_name)
doc_pbar.set_description(doc_name)
# Download document
download_document(doc['url'], doc['document_path'])
# Tidy up console after tqdm
print('\n')
# Stop logging
logging.shutdown()
with open(log_name, 'r') as f:
log_data = f.read()
# Check contents of log file
if log_data:
warnings.warn(
'Some files failed to download. See log for details.',
stacklevel=2)
else:
os.remove(log_name)
if __name__ == '__main__':
main()