"""las_manipulation.py Clip, classify, and detect swash zone for an input las file. Example usage: # Process single survey at specific beach python las_manipulation.py survey-1-avoca.yaml # Process single survey at multiple beaches python las_manipulation.py survey-1-avoca.yaml survey-1-pearl.yaml # Process all surveys at specific beach python las_manipulation.py *avoca.yaml # Process all beaches for specific survey date python las_manipulation.py survey-1*.yaml """ import os import sys import yaml import argparse import subprocess from glob import glob from tqdm import tqdm import numpy as np from scipy import interpolate import pandas as pd import geopandas as gpd from shapely.geometry import Point, Polygon from survey_tools import call_lastools def remove_problems(x_list, y_list, z_list, x_now, y_now, check_value): z_ave=nine_pt_moving_average(z_list) deriv_ave, chainage=forward_derivative(x_list, y_list, z_ave) deriv_real, chainage=two_point_derivative(x_list, y_list, z_list) #first find the reference contour on the beach #index_contour, x_now, y_now, distance=find_beach_reference_contour_choose_closest(chainage, z_ave, x_list, y_list, x_now, y_now,deriv_ave, check_value) index_contour, x_now, y_now, distance=find_beach_reference_contour(chainage, z_ave, x_list, y_list, x_now, y_now,deriv_ave,deriv_real,check_value) if index_contournat_dev: break else: i=index_contour z_return=z_list[0:i] chainage_return=chainage[0:i] return z_return, chainage_return, x_now, y_now, distance def two_point_derivative(x_list, y_list, z_list): chain=[((x_list[0]-x_list[i])**2+(y_list[0]-y_list[i])**2)**0.5 for i in range(0,len(x_list))] deriv=[(z_list[i+1]-z_list[i-1])/(chain[i+1]-chain[i-1]) for i in range(1, len(z_list)-1)] deriv.insert(0,0) deriv.append(0) return deriv, chain def forward_derivative(x_list, y_list, z_list): chain=[((x_list[0]-x_list[i])**2+(y_list[0]-y_list[i])**2)**0.5 for i in range(0,len(x_list))] deriv=[(z_list[i]-z_list[i-1])/(chain[i]-chain[i-1]) for i in range(0, len(z_list)-1)] deriv.insert(0,0) return deriv, chain def find_first_over_reference(z_list, value): i=len(z_list)-1 while i>0 and z_list[i]len(z_list)-5: ave=np.mean([z_list[j] for j in range(i-4,len(z_list))]) else: ave=np.mean([z_list[j] for j in range(i-4,i+5)]) move_ave.append(ave) i=i+1 return move_ave def find_neg_derivative(z_list, deriv_list): i=len(z_list)-5 while z_list[i]>=0 and z_list[i+1]>=0 and z_list[i+2]>=0 and z_list[i+3]>=0 and z_list[i+4]>=0: i=i-1 return i def find_beach_reference_contour_choose_closest(chain_list, z_ave_list, x_list, y_list, x_last, y_last, deriv_ave_list, check_value): #note that z_list should be the 9 point moving average #assumes that beaches are shallow (|derivative|<0.3), sloping and between 0 - 4 m AHD i=0 choice_list=[] distance_list=[] if z_ave_list[i]>check_value: state_now='over' else: state_now='under' while icheck_value: #only keep if it is downward sloping state_now='over' elif state_now=='over' and z_ave_list[i]0 and x_last==None: #choose the first time for the first point i=choice_list[0] distance=0 elif len(choice_list)>0 and x_last!=None: assert(len(choice_list)==len(distance_list)) i=choice_list[distance_list.index(min(distance_list))] distance=min(distance_list) if i>=len(x_list): i=len(x_list)-1 if x_last!=None: distance=((x_last - x_list[i])**2+(y_last - y_list[i])**2)**0.5 else: distance=0 x=x_list[i] y=y_list[i] return i, x, y, distance def find_beach_reference_contour(chain_list, z_ave_list, x_list, y_list, x_last, y_last, deriv_ave_list,deriv_real_list, check_value): #note that z_list should be the 9 point moving average #assumes that beaches are shallow (|derivative|<0.3), sloping and between 0 - 4 m AHD i=len(z_ave_list)-1 while i>=0 and (z_ave_list[i]>check_value+2 or z_ave_list[i]0 or max([abs(i) for i in deriv_real_list[max(0,i-7):i]]+[0])>0.3):#beaches are shallow sloping, low i=i-1 #find the first time it gets to check_value after this while i>=0 and z_ave_list[i]0 and z_ave_list[i]10: Z_set, chainage_tmp, temp_x, temp_y, distance=remove_problems(X_tmp, Y_tmp, Z_tmp,tmp_x_last, tmp_y_last, set_check_value) #except: else: Z_set=Z_tmp temp_x=X_tmp[len(Z_set)-1] temp_y=Y_tmp[len(Z_set)-1] distance=0 distance_2_old=distance_point_to_poly(X_tmp, Y_tmp, temp_x, temp_y) if distance_2_old10: Z_set, chainage_tmp, temp_x, temp_y, distance=remove_problems(X_tmp, Y_tmp, Z_tmp,tmp_x_last, tmp_y_last, set_check_value) #except: else: Z_set=Z_tmp temp_x=X_tmp[len(Z_set)-1] temp_y=Y_tmp[len(Z_set)-1] distance=0 X_set=X_tmp[0:len(Z_set)] Y_set=Y_tmp[0:len(Z_set)] #write to new data frame #if len(Z_set)>0: # for i in range(0, len(Z_set)): # processed_data =processed_data.append({'X':X_set[i],'Y':Y_set[i],'Z':Z_set[i],'r':r_set[i],'g':g_set[i],'b':b_set[i]}, ignore_index=True) #add to crop line distance_2_old=distance_point_to_poly(X_tmp, Y_tmp, temp_x, temp_y) if distance_2_old