You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

233 lines
7.3 KiB
Python

6 years ago
"""major_projects_grabber.py
Download documents from the NSW DPE Major Projects website.
Example usage:
# Grab a single project modification using its job id, and save in 'files'
6 years ago
major_projects_grabber -i 1746 -o files
6 years ago
# Grab all modifications in search results page, and save in current folder
6 years ago
major_projects_grabber -u "http://majorprojects.planning.nsw.gov.au/index.pl?action=search&authority_id=547"
6 years ago
"""
import os
import re
import sys
import shutil
import logging
import requests
import argparse
import warnings
6 years ago
from lxml import html
from tqdm import tqdm
6 years ago
from requests.exceptions import ConnectionError, InvalidURL
def make_safe(s):
"""Remove characters that would be invalid in a filepath"""
# Remove trailing spaces
s_safe = s.strip()
# Remove '\', '*', '"', '<', '>' '|'
s_safe = re.sub('\\\|\*|"|<|>\|', '', s_safe)
# Replace '/' and ':' with '-'
s_safe = re.sub(':', ' -', s_safe)
s_safe = re.sub('/', '-', s_safe)
return s_safe
def mod_ids_from_search(search_results_url):
"""Get modification job IDs from search results URL"""
# Get HTML of search results page
page = requests.get(search_results_url)
# Create HTML tree
tree = html.fromstring(page.content)
# Find job ids of items in results list
mod_ids = []
mods = tree.xpath('//*[@class="vpa_list"]/tr/td[1]/a')
for mod in mods:
mod_ids.append(re.search('(?<=job_id=)\d+', mod.get('href')).group())
return mod_ids
def get_document_list(mod_id, output_dir):
"""Get list of documents from project modification ID"""
6 years ago
# Get html from mod page
mod_url = ('http://majorprojects.planning.nsw.gov.au/'
'index.pl?action=view_job&job_id=' + mod_id)
mod_page = requests.get(mod_url)
mod_tree = html.fromstring(mod_page.content)
# Get mod details
project_name = mod_tree.xpath(
'//*[@class="vpa_header vpa_header_contrast"]/h2')[0].text
mod_name = mod_tree.xpath(
'//*[@class="vpa_header vpa_header_contrast"]/h1')[0].text
# Get list of document folders
folders = mod_tree.xpath('//div[@class="folder_row"]')
# Remove invalid characters before creating folders
project_name = make_safe(project_name)
mod_name = make_safe(mod_name)
# Create modification folder
mod_dir = os.path.join(output_dir, project_name, mod_name)
try:
os.makedirs(mod_dir, exist_ok=True)
except FileNotFoundError:
# Fix destination path if longer than 255 characters (Windows only)
mod_dir = '\\\\?\\' + os.path.abspath(mod_dir)
os.makedirs(mod_dir, exist_ok=True)
# Add note if no documents are found on portal
if not folders:
txt_name = 'No documents on DPE portal for this modification.txt'
open(os.path.join(mod_dir, txt_name), 'a').close()
# Create link to DPE Major Projects page for current modification
text = """<html>
<meta http-equiv="refresh" content="0; url={}">
</html>""".format(mod_url)
with open(os.path.join(mod_dir, 'DPE-portal-page.html'), 'w') as f:
f.write(text)
document_data = []
6 years ago
for folder in folders:
folder_name = folder.xpath('a[2]')[0].text.strip()
# Get documents in current folder
documents = folder.xpath('ul/li/a')
for document in documents:
doc = {}
doc['url'] = document.get('href')
doc['name'] = document.text
doc['document_path'] = os.path.join(
output_dir, project_name, mod_name, folder_name, doc['name'])
document_data.append(doc)
return document_data
def download_document(url, document_path):
"""Download document from given url"""
# Check if destination path is too long (Windows filename limitation)
try:
open(document_path, 'a').close()
except FileNotFoundError:
document_path = '\\\\?\\' + os.path.abspath(document_path)
# Create output directories as required
os.makedirs(os.path.dirname(document_path), exist_ok=True)
# Check if a non-empty file exists
if os.path.isfile(document_path) and os.path.getsize(document_path) > 0:
pass
else:
try:
# Attempt to download file
r = requests.get(url, stream=True)
except (ConnectionError, InvalidURL):
logging.error(
('Failed to download {4}\n'
' Project: {1}\n'
' Modification: {2}\n'
' Folder: {3}\n').format(*document_path.split(os.sep)))
return
# Write file to disk
with open(document_path, 'wb') as f:
shutil.copyfileobj(r.raw, f)
6 years ago
def main():
example_text = """examples:
# Grab a single project modification using its job id, and save in 'files'
major_projects_grabber -i 1746 -o files
6 years ago
# Grab all modifications in search results page, and save in current folder
major_projects_grabber -u "http://majorprojects.planning.nsw.gov.au/index.pl?action=search&authority_id=547"
6 years ago
"""
# Set up command line arguments
parser = argparse.ArgumentParser(
epilog=example_text,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument(
'-o', metavar='OUTPUT_DIR', default='.', help='root output directory')
parser.add_argument(
'-i',
metavar='ID',
default=[],
help='modification job id(s)',
nargs='*')
6 years ago
parser.add_argument('-u', metavar='URL', help='url of search results page')
# Print usage if no arguments are provided
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
sys.exit(1)
# Parse arguments
args = parser.parse_args()
search_results_url = args.u
output_dir = args.o
mod_ids = args.i
# Set up log File
os.makedirs(output_dir, exist_ok=True)
log_name = os.path.join(output_dir, 'errors.log')
logging.basicConfig(filename=log_name, level=logging.ERROR)
# Get mod IDs from search results
if search_results_url:
search_mod_ids = mod_ids_from_search(search_results_url)
mod_ids.extend(search_mod_ids)
# Get list of documents from given modification IDs
mod_pbar = tqdm(mod_ids)
for mod_id in mod_pbar:
document_data = get_document_list(mod_id, output_dir)
# Download documents for current modification
doc_pbar = tqdm(document_data)
for doc in doc_pbar:
# Update progress bars
mod_name = doc['document_path'].split(os.sep)[-3]
doc_name = doc['document_path'].split(os.sep)[-1]
mod_pbar.set_description(mod_name)
doc_pbar.set_description(doc_name)
# Download document
download_document(doc['url'], doc['document_path'])
# Tidy up console after tqdm
print('\n')
6 years ago
# Stop logging
logging.shutdown()
with open(log_name, 'r') as f:
log_data = f.read()
# Check contents of log file
if log_data:
warnings.warn(
'Some files failed to download. See log for details.',
stacklevel=2)
else:
os.remove(log_name)
6 years ago
if __name__ == '__main__':
main()