You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

460 lines
16 KiB
Python

"""las_manipulation.py
Clip, classify, and detect swash zone for an input las file.
Example usage:
# Process single survey at specific beach
python las_manipulation.py survey-1-avoca.yaml
# Process single survey at multiple beaches
python las_manipulation.py survey-1-avoca.yaml survey-1-pearl.yaml
# Process all surveys at specific beach
python las_manipulation.py *avoca.yaml
# Process all beaches for specific survey date
python las_manipulation.py survey-1*.yaml
"""
import os
import sys
import yaml
import argparse
import subprocess
from glob import glob
from tqdm import tqdm
import numpy as np
from scipy import interpolate
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, Polygon
from survey_tools import call_lastools
def remove_problems(x_list, y_list, z_list, x_now, y_now, check_value):
z_ave=nine_pt_moving_average(z_list)
deriv_ave, chainage=forward_derivative(x_list, y_list, z_ave)
deriv_real, chainage=two_point_derivative(x_list, y_list, z_list)
#first find the reference contour on the beach
#index_contour, x_now, y_now, distance=find_beach_reference_contour_choose_closest(chainage, z_ave, x_list, y_list, x_now, y_now,deriv_ave, check_value)
index_contour, x_now, y_now, distance=find_beach_reference_contour(chainage, z_ave, x_list, y_list, x_now, y_now,deriv_ave,deriv_real,check_value)
if index_contour<len(chainage): #other wise keep everthing
#find the beach slope, get the interpolated line (beach line) and the index of the reference contour +1
beach_slope, beach_line, index_high=find_beach_slope(chainage, z_ave,index_contour, check_value)
#find the natural deviation of the lower beach
nat_dev=get_natural_deviation(chainage, z_list, index_contour, index_high, beach_line)
for i in range(index_contour, len(z_list)):
if abs(z_list[i]-float(beach_line(chainage[i])))>nat_dev:
break
else:
i=index_contour
z_return=z_list[0:i]
chainage_return=chainage[0:i]
return z_return, chainage_return, x_now, y_now, distance
def two_point_derivative(x_list, y_list, z_list):
chain=[((x_list[0]-x_list[i])**2+(y_list[0]-y_list[i])**2)**0.5 for i in range(0,len(x_list))]
deriv=[(z_list[i+1]-z_list[i-1])/(chain[i+1]-chain[i-1]) for i in range(1, len(z_list)-1)]
deriv.insert(0,0)
deriv.append(0)
return deriv, chain
def forward_derivative(x_list, y_list, z_list):
chain=[((x_list[0]-x_list[i])**2+(y_list[0]-y_list[i])**2)**0.5 for i in range(0,len(x_list))]
deriv=[(z_list[i]-z_list[i-1])/(chain[i]-chain[i-1]) for i in range(0, len(z_list)-1)]
deriv.insert(0,0)
return deriv, chain
def find_first_over_reference(z_list, value):
i=len(z_list)-1
while i>0 and z_list[i]<value:
i=i-1
return i
def nine_pt_moving_average(z_list):
i=0
move_ave=[]
while i<len(z_list):
if i<5:
ave=np.mean([z_list[j] for j in range(0,i+5)])
elif i>len(z_list)-5:
ave=np.mean([z_list[j] for j in range(i-4,len(z_list))])
else:
ave=np.mean([z_list[j] for j in range(i-4,i+5)])
move_ave.append(ave)
i=i+1
return move_ave
def find_neg_derivative(z_list, deriv_list):
i=len(z_list)-5
while z_list[i]>=0 and z_list[i+1]>=0 and z_list[i+2]>=0 and z_list[i+3]>=0 and z_list[i+4]>=0:
i=i-1
return i
def find_beach_reference_contour_choose_closest(chain_list, z_ave_list, x_list, y_list, x_last, y_last, deriv_ave_list, check_value):
#note that z_list should be the 9 point moving average
#assumes that beaches are shallow (|derivative|<0.3), sloping and between 0 - 4 m AHD
i=0
choice_list=[]
distance_list=[]
if z_ave_list[i]>check_value:
state_now='over'
else:
state_now='under'
while i<len(z_ave_list):
if state_now=='under' and z_ave_list[i]>check_value: #only keep if it is downward sloping
state_now='over'
elif state_now=='over' and z_ave_list[i]<check_value:
choice_list.append(i)
state_now='under'
if x_last!=None:
distance_list.append(((x_last - x_list[i])**2+(y_last - y_list[i])**2)**0.5)
i=i+1
if len(choice_list)>0 and x_last==None: #choose the first time for the first point
i=choice_list[0]
distance=0
elif len(choice_list)>0 and x_last!=None:
assert(len(choice_list)==len(distance_list))
i=choice_list[distance_list.index(min(distance_list))]
distance=min(distance_list)
if i>=len(x_list):
i=len(x_list)-1
if x_last!=None:
distance=((x_last - x_list[i])**2+(y_last - y_list[i])**2)**0.5
else:
distance=0
x=x_list[i]
y=y_list[i]
return i, x, y, distance
def find_beach_reference_contour(chain_list, z_ave_list, x_list, y_list, x_last, y_last, deriv_ave_list,deriv_real_list, check_value):
#note that z_list should be the 9 point moving average
#assumes that beaches are shallow (|derivative|<0.3), sloping and between 0 - 4 m AHD
i=len(z_ave_list)-1
while i>=0 and (z_ave_list[i]>check_value+2 or z_ave_list[i]<check_value-2 or deriv_ave_list[i]>0 or max([abs(i) for i in deriv_real_list[max(0,i-7):i]]+[0])>0.3):#beaches are shallow sloping, low
i=i-1
#find the first time it gets to check_value after this
while i>=0 and z_ave_list[i]<check_value:
i=i-1
if i==0:
i=len(z_ave_list)-1 # the whole this is above the beach
if x_last!=None:
distance=((x_last - x_list[i])**2+(y_last - y_list[i])**2)**0.5
else:
distance=0
x=x_list[i]
y=y_list[i]
return i, x, y, distance
def find_beach_slope(chain_list, z_ave_list, ref_index, check_value):
#ref index is the index of the check value
#find the beach slope between this point and 1 m above this point
i=ref_index
while i>0 and z_ave_list[i]<check_value+1:
i=i-1
# Hide numpy floating point arithmetic warnings
np.seterr(all='ignore')
slope=(z_ave_list[i]-z_ave_list[ref_index])/(chain_list[i]-chain_list[ref_index])
# Show numpy floating point arithmetic warnings
np.seterr(all=None)
beach_ave=interpolate.interp1d([min(chain_list),max(chain_list)], [(min(chain_list)-chain_list[ref_index])*slope+z_ave_list[ref_index], (z_ave_list[ref_index]-(chain_list[ref_index]-max(chain_list))*slope)])
return slope, beach_ave, i
def get_natural_deviation(chain_list, z_list, ref_index, ref_high, beach_ave):
#for the points considered to be on the beach (reference contour to reference contour +1), find the average natural deviation
deviation=[]
for i in range(ref_high, ref_index+1):
dev_tmp=abs(z_list[i] - float(beach_ave(chain_list[i])))
deviation.append(dev_tmp)
natural_deviation=min(np.max(deviation),0.4) #THIS MAY BE TOO CONSERVATIVE
return natural_deviation
def distance_point_to_poly(x_list, y_list, x_now, y_now):
#make a line from the mid of x_list, y_list
end=Point(x_list[-1], y_list[-1])
point=Point(x_now, y_now)
dist=point.distance(end)
return dist
def polygon_wave_runup(xyz_1m, direction, shp_name, set_check_value, distance_check, zone):
#print('starting processing of wave runup')
all_data=pd.read_csv(xyz_1m, header=None, names=['X','Y','Z'])
if direction=='north_south':
all_data_sorted=all_data.sort_values(by=['X', 'Y'], ascending=[1,0])
elif direction=='west_east':
all_data_sorted=all_data.sort_values(by=['Y', 'X'], ascending=[0,1])
fixed_now=0
a=0
X_tmp=[]
processed_data = pd.DataFrame(columns=['X','Y','Z'])
list_to_print=[10,20,30,40,50,60,70,80,90]
crop_line=[]
top_line=[]
tmp_x_last=None
tmp_y_last=None
exceed_list=[]
# Create progress bar
pbar = tqdm(all_data_sorted.iterrows(), total=all_data_sorted.shape[0])
for index, line in pbar:
a=a+1
percent_done=round(a/len(all_data_sorted)*100,1)
if percent_done in list_to_print:
#print("Finished %s%% of the processing" % percent_done)
list_to_print=list_to_print[1:len(list_to_print)]
if direction=='north_south':
check_this=line['X']
elif direction=='west_east':
check_this=line['Y']
if check_this==fixed_now:
X_tmp.append(line['X'])
Y_tmp.append(line['Y'])
Z_tmp.append(line['Z'])
else:
if len(X_tmp)!=0:
#try: ########may need to change!~!
if len(X_tmp)>10:
Z_set, chainage_tmp, temp_x, temp_y, distance=remove_problems(X_tmp, Y_tmp, Z_tmp,tmp_x_last, tmp_y_last, set_check_value)
#except:
else:
Z_set=Z_tmp
temp_x=X_tmp[len(Z_set)-1]
temp_y=Y_tmp[len(Z_set)-1]
distance=0
distance_2_old=distance_point_to_poly(X_tmp, Y_tmp, temp_x, temp_y)
if distance_2_old<distance_check: # find a way to change so it is checking the distance from the first crop polyogn, concave_now.buffer(buffer)
tmp_x_last=temp_x
tmp_y_last=temp_y
crop_line.append([X_tmp[len(Z_set)-1], Y_tmp[len(Z_set)-1]])
top_line.append([X_tmp[0], Y_tmp[0]])
#otherwise crop by the distance_check
else:
exceed_list.append(1)
try:
tmp_x_last=X_tmp[len(X_tmp)-distance_check] #beacuse this is a 1m DSM, this works
tmp_y_last=Y_tmp[len(Y_tmp)-distance_check]
crop_line.append([tmp_x_last, tmp_y_last])
top_line.append([X_tmp[0], Y_tmp[0]])
except:
print('problem with the last crop point, keeping whole line')
crop_line.append([X_tmp[-1], Y_tmp[-1]])
top_line.append([X_tmp[0], Y_tmp[0]])
if direction=='north_south':
fixed_now=line['X']
elif direction=='west_east':
fixed_now=line['Y']
X_tmp=[line['X']]
Y_tmp=[line['Y']]
Z_tmp=[line['Z']]
else:
if direction=='north_south':
fixed_now=line['X']
elif direction=='west_east':
fixed_now=line['Y']
X_tmp=[line['X']]
Y_tmp=[line['Y']]
Z_tmp=[line['Z']]
#for the last line
derivative, chainage=forward_derivative(X_tmp, Y_tmp, Z_tmp)
if len(X_tmp)>10:
Z_set, chainage_tmp, temp_x, temp_y, distance=remove_problems(X_tmp, Y_tmp, Z_tmp,tmp_x_last, tmp_y_last, set_check_value)
#except:
else:
Z_set=Z_tmp
temp_x=X_tmp[len(Z_set)-1]
temp_y=Y_tmp[len(Z_set)-1]
distance=0
X_set=X_tmp[0:len(Z_set)]
Y_set=Y_tmp[0:len(Z_set)]
#write to new data frame
#if len(Z_set)>0:
# for i in range(0, len(Z_set)):
# processed_data =processed_data.append({'X':X_set[i],'Y':Y_set[i],'Z':Z_set[i],'r':r_set[i],'g':g_set[i],'b':b_set[i]}, ignore_index=True)
#add to crop line
distance_2_old=distance_point_to_poly(X_tmp, Y_tmp, temp_x, temp_y)
if distance_2_old<distance_check: # find a way to change so it is checking the distance from the first crop polyogn, concave_now.buffer(buffer)
tmp_x_last=temp_x
tmp_y_last=temp_y
crop_line.append([X_tmp[len(Z_set)-1], Y_tmp[len(Z_set)-1]])
top_line.append([X_tmp[0], Y_tmp[0]])
#otherwise crop by the distance_check
else:
exceed_list.append(1)
tmp_x_last=X_tmp[len(X_tmp)-distance_check]
tmp_y_last=Y_tmp[len(Y_tmp)-distance_check]
crop_line.append(tmp_x_last, tmp_y_last)
top_line.append([X_tmp[0], Y_tmp[0]])
#otherwise dont add. straight line is better
if direction=='north_south':
y_filtered=nine_pt_moving_average([i[1] for i in crop_line])
crop_new=[[crop_line[i][0],y_filtered[i]] for i in range(0, len(crop_line))]
elif direction=='west_east':
x_filtered=nine_pt_moving_average([i[0] for i in crop_line])
crop_new=[[x_filtered[i],crop_line[i][1]] for i in range(0, len(crop_line))]
for_shape=crop_new+top_line[::-1]
for_shape.append(crop_new[0])
#print('exceeded the manual distance_check %s%% of the time. manually cropping undertaken' % (round(len(exceed_list)/a,2)*100))
#making the cropping shapefile
#print('making the crop polygon')
# Simplify polygon to remove invalid geometry
#geom = Polygon(for_shape).simplify(10)
geom = Polygon(for_shape)
# Export polygon as shapefile
df = gpd.GeoDataFrame(geometry=[geom])
df.crs = {'init': 'epsg:283{}'.format(zone), 'no_defs': True}
df.to_file(shp_name, driver='ESRI Shapefile')
return None
def remove_temp_files(directory):
for f in os.listdir(directory):
os.unlink(os.path.join(directory, f))
return None
def process(yaml_file):
with open(yaml_file, 'r') as f:
params = yaml.safe_load(f)
print("Starting to process %s" % params['BEACH'])
input_las = params['INPUT LAS']
initial_crop_poly = params['INITIAL CROP POLY']
lasground_step = params['LASGROUND STEP']
zone_MGA = params['ZONE MGA']
check_value = params['CHECK VALUE']
direct = params['DIRECTION']
check_distance = params['CHECK DISTANCE']
las_dir = params['LAS CLASSIFIED FOLDER']
shp_dir = params['SHP SWASH FOLDER']
tmp_dir = params['TMP FOLDER']
survey_date=params['SURVEY DATE']
beach=params['BEACH']
# Get base name of input las
#las_basename = os.path.splitext(os.path.basename(input_las))[0]
las_basename='%s_%s' % (beach.lower().replace(" ","_"), survey_date)
# # Crop to beach boundary
print('Clipping...')
las_clipped_name = os.path.join(tmp_dir, las_basename + '_clipped.las')
call_lastools('lasclip', input=input_las, output=las_clipped_name,
args=['-poly', initial_crop_poly], verbose=False)
# Classify ground points
print('Classifying ground...')
las_classified_name = os.path.join(las_dir, las_basename + '.las')
call_lastools('lasground_new', input=las_clipped_name, output=las_classified_name,
args=['-step', lasground_step], verbose=False)
# Interpolate point cloud onto a grid
print('Interpolating to grid...')
xyz_name = os.path.join(tmp_dir, las_basename + '.xyz')
call_lastools('blast2dem', input=las_classified_name, output=xyz_name,
args=['-step', 1], verbose=False)
# Make runup clipping mask from gridded point cloud
print('Calculating runup clipping mask...')
shp_name = os.path.join(shp_dir, las_basename + '.shp')
polygon_wave_runup(xyz_name, direct, shp_name, check_value, check_distance, zone_MGA)
#NOTE THAT YOU NEED TO CHECK THE OUTPUT SHP FILE AND ADJUST AS REQUIRED
#delete the temp files
remove_temp_files(tmp_dir)
def main():
example_text = """examples:
# Process single survey at specific beach
python las_manipulation.py survey-1-avoca.yaml
# Process single survey at multiple beaches
python las_manipulation.py survey-1-avoca.yaml survey-1-pearl.yaml
# Process all surveys at specific beach
python las_manipulation.py *avoca.yaml
# Process all beaches for specific survey date
python las_manipulation.py survey-1*.yaml
"""
# Set up command line arguments
parser = argparse.ArgumentParser(
epilog=example_text,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('input', help='path to yaml file(s)', nargs='*')
# Print usage if no arguments are provided
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
sys.exit(1)
# Parse arguments
args = parser.parse_args()
yaml_files = []
[yaml_files.extend(glob(f)) for f in args.input]
for yaml_file in yaml_files:
process(yaml_file)
if __name__ == '__main__':
main()