# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

# 184 lines
# 5.6 KiB
# Python

import io
import os
import subprocess
import pandas as pd
def call_lastools(tool_name, input, output=None, args=None, verbose=True):
    """Send commands to the lastools library.

    Requires lastools in system path.

    Args:
        tool_name: name of lastools binary
        input: las data as bytes (piped to the tool's stdin), or path to
            main input data
        output: '-stdout' to pipe output, or path to main output data
        args: list of additional arguments, formatted for lastools
        verbose: show all warnings and messages from lastools (boolean)

    Returns:
        bytes of output las, if output='-stdout' (whatever the tool wrote
        to stdout is returned even when the tool reported a failure)
        None, if output='path/to/las/file'

    Raises:
        FileNotFoundError: if the tool binary cannot be started
            (raised by subprocess.Popen).

    Examples:
        # Convert xyz file to las and pipe stdout to a python bytes object
        las_data = call_lastools('txt2las', input='points.xyz',
                                 output='-stdout', args=['-parse', 'sxyz'])

        # Clip las_data with a shapefile, and save to a new las file
        call_lastools('lasclip', input=las_data, output='points.las',
                      args=['-poly', 'boundary.shp'])
    """
    # Build the command as an argument list (no shell is involved)
    cmd = [tool_name]

    # Parse input
    piped_input = isinstance(input, (bytes, bytearray))
    if piped_input:
        # Pipe input las bytes to stdin
        cmd += ['-stdin']
        stdin = input
    else:
        # Load las from file path
        cmd += ['-i', input]
        stdin = None

    # Parse output
    if output == '-stdout':
        # Pipe output las to stdout
        cmd += ['-stdout']
    elif output:
        # Save output las to file
        cmd += ['-o', output]

    # Append additional lastools arguments, if provided
    if args:
        cmd += [str(a) for a in args]

    # stdin is always a pipe so the child never reads from the parent's
    # terminal; communicate() closes it once any input has been sent
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        stdin=subprocess.PIPE)
    stdout, stderr = process.communicate(input=stdin)

    # Tool messages are not guaranteed to be valid UTF-8, so never let the
    # decoding of diagnostics raise
    messages = stderr.decode(errors='replace')

    # Handle errors, if detected
    if process.returncode != 0:
        # Piped input has no file name; printing the raw las bytes here
        # would flood the console with binary data
        if piped_input:
            input_label = '<stdin>'
        else:
            input_label = os.path.basename(input)
        print("Error: {} failed on {}".format(tool_name, input_label))
        print(messages)
    elif verbose:
        # Print additional messages if verbose mode is being used
        print(messages)

    # Output piped stdout if required
    if output == '-stdout':
        return stdout
    return None
def extract_pts(las_in, cp_in, survey_date, beach, args=None, verbose=True):
    """Extract elevations from a las surface based on x and y coordinates.

    Requires lastools in system path.

    Args:
        las_in: bytes from stdout, or path to main input data
        cp_in: point coordinates with columns: id, x, y, z (csv)
        survey_date: survey date string, e.g. '19700101'
        beach: beach name
        args: list (or other sequence) of additional arguments, formatted
            for lastools
        verbose: show all warnings and messages from lastools (boolean)

    Returns:
        Dataframe containing input coordinates with extracted elevations.
        Columns are 'Beach', 'Profile', 'Easting', 'Northing', 'Chainage'
        and 'Elevation_<survey_date>'; elevations that could not be
        converted to a number are NaN.

    Examples:
        # Extract elevations from 'points.las', using control points from
        # 'cp.csv'
        # Specify control point format as: id, x, y, z ('-parse', 'sxyz')
        # Only use points classified as 'ground' ('-keep_class', '2')
        extract_pts('points.las', 'cp.csv', survey_date='20001231',
                    beach='manly',
                    args=['-parse', 'sxyz', '-keep_class', '2'])
    """
    # Assemble lastools arguments: the control point file comes first,
    # followed by any caller-supplied options. list() accepts tuples too
    # and leaves the caller's sequence untouched.
    tool_args = ['-cp', cp_in]
    if args:
        tool_args += list(args)

    # Honour the caller's verbosity choice instead of always silencing
    # lastools messages
    las_data = call_lastools(
        'lascontrol', input=las_in, output='-stdout', args=tool_args,
        verbose=verbose)

    # Load result into pandas dataframe
    df = pd.read_csv(io.BytesIO(las_data))

    # Create empty dataframe if no control points intersect point cloud
    if (df.iloc[:, 0] == '-').all():
        df = pd.read_csv(cp_in)
        df['diff'] = '-'
        df['lidar_z'] = '-'

    # Tidy up dataframe; '-' placeholders become NaN via errors='coerce'
    df = df.drop(columns=['diff'])
    df['lidar_z'] = pd.to_numeric(df['lidar_z'], errors='coerce')
    df['Beach'] = beach
    df = df[[
        'Beach', 'ProfileNum', 'Easting', 'Northing', 'Chainage', 'lidar_z'
    ]]

    # Rename columns
    new_names = {
        'ProfileNum': 'Profile',
        'lidar_z': 'Elevation_{}'.format(survey_date),
    }
    return df.rename(columns=new_names)
def update_survey_output(df, output_dir):
    """Update survey profile output csv files with current survey.

    One csv file per profile ('<Profile>.csv') is kept in output_dir. The
    last column of df is taken as the current survey; it is added to (or
    overwritten in) each profile's file, and the 'Elevation' columns are
    written in sorted order after all other columns.

    Values are assigned to an existing file by position, so an existing csv
    must have the same number of rows as the current survey for that
    profile (pandas raises ValueError otherwise).

    Args:
        df: dataframe containing current survey elevations
        output_dir: directory where csv files are saved

    Returns:
        True if current survey is latest, otherwise False. "Latest" means
        the current survey column sorts last among the 'Elevation' columns
        of every profile file written (True when df has no profiles).
    """
    # Prepare output directory once, before any file is written
    os.makedirs(output_dir, exist_ok=True)

    is_latest = True

    # Merge current survey with existing data
    for profile in df['Profile'].unique():
        # format() rather than '+' so numeric profile ids also work
        csv_name = os.path.join(output_dir, '{}.csv'.format(profile))

        # Extract survey data for current profile
        current_profile = df[df['Profile'] == profile]

        try:
            # Load existing results
            master = pd.read_csv(csv_name)
        except FileNotFoundError:
            # First survey for this profile
            master = current_profile.copy()

        # Add (or update) current survey
        current_survey_col = current_profile.columns[-1]
        master[current_survey_col] = current_profile[current_survey_col].values

        # Ensure survey dates are in correct order
        elev_cols = sorted(col for col in master.columns if 'Elevation' in col)
        other_cols = [col for col in master.columns if 'Elevation' not in col]
        master = master[other_cols + elev_cols]

        # Export updated results
        master.to_csv(csv_name, index=False, float_format='%0.3f')

        # Slice comparison stays safe when there are no elevation columns
        is_latest = is_latest and elev_cols[-1:] == [current_survey_col]

    return is_latest