import os
import sys
import argparse
import subprocess
from tqdm import tqdm
import numpy as np
from scipy import interpolate
import pandas as pd
import geopandas as gpd
from shapely . geometry import Point , Polygon
from survey_tools import call_lastools
def remove_problems ( x_list , y_list , z_list , x_now , y_now , check_value ) :
z_ave = nine_pt_moving_average ( z_list )
deriv_ave , chainage = forward_derivative ( x_list , y_list , z_ave )
deriv_real , chainage = two_point_derivative ( x_list , y_list , z_list )
#first find the reference contour on the beach
#index_contour, x_now, y_now, distance=find_beach_reference_contour_choose_closest(chainage, z_ave, x_list, y_list, x_now, y_now,deriv_ave, check_value)
index_contour , x_now , y_now , distance = find_beach_reference_contour ( chainage , z_ave , x_list , y_list , x_now , y_now , deriv_ave , deriv_real , check_value )
if index_contour < len ( chainage ) : #other wise keep everthing
#find the beach slope, get the interpolated line (beach line) and the index of the reference contour +1
beach_slope , beach_line , index_high = find_beach_slope ( chainage , z_ave , index_contour , check_value )
#find the natural deviation of the lower beach
nat_dev = get_natural_deviation ( chainage , z_list , index_contour , index_high , beach_line )
for i in range ( index_contour , len ( z_list ) ) :
if abs ( z_list [ i ] - float ( beach_line ( chainage [ i ] ) ) ) > nat_dev :
break
else :
i = index_contour
z_return = z_list [ 0 : i ]
chainage_return = chainage [ 0 : i ]
return z_return , chainage_return , x_now , y_now , distance
def two_point_derivative ( x_list , y_list , z_list ) :
chain = [ ( ( x_list [ 0 ] - x_list [ i ] ) * * 2 + ( y_list [ 0 ] - y_list [ i ] ) * * 2 ) * * 0.5 for i in range ( 0 , len ( x_list ) ) ]
deriv = [ ( z_list [ i + 1 ] - z_list [ i - 1 ] ) / ( chain [ i + 1 ] - chain [ i - 1 ] ) for i in range ( 1 , len ( z_list ) - 1 ) ]
deriv . insert ( 0 , 0 )
deriv . append ( 0 )
return deriv , chain
def forward_derivative ( x_list , y_list , z_list ) :
chain = [ ( ( x_list [ 0 ] - x_list [ i ] ) * * 2 + ( y_list [ 0 ] - y_list [ i ] ) * * 2 ) * * 0.5 for i in range ( 0 , len ( x_list ) ) ]
deriv = [ ( z_list [ i ] - z_list [ i - 1 ] ) / ( chain [ i ] - chain [ i - 1 ] ) for i in range ( 0 , len ( z_list ) - 1 ) ]
deriv . insert ( 0 , 0 )
return deriv , chain
def find_first_over_reference ( z_list , value ) :
i = len ( z_list ) - 1
while i > 0 and z_list [ i ] < value :
i = i - 1
return i
def nine_pt_moving_average ( z_list ) :
i = 0
move_ave = [ ]
while i < len ( z_list ) :
if i < 5 :
ave = np . mean ( [ z_list [ j ] for j in range ( 0 , i + 5 ) ] )
elif i > len ( z_list ) - 5 :
ave = np . mean ( [ z_list [ j ] for j in range ( i - 4 , len ( z_list ) ) ] )
else :
ave = np . mean ( [ z_list [ j ] for j in range ( i - 4 , i + 5 ) ] )
move_ave . append ( ave )
i = i + 1
return move_ave
def find_neg_derivative ( z_list , deriv_list ) :
i = len ( z_list ) - 5
while z_list [ i ] > = 0 and z_list [ i + 1 ] > = 0 and z_list [ i + 2 ] > = 0 and z_list [ i + 3 ] > = 0 and z_list [ i + 4 ] > = 0 :
i = i - 1
return i
def find_beach_reference_contour_choose_closest ( chain_list , z_ave_list , x_list , y_list , x_last , y_last , deriv_ave_list , check_value ) :
#note that z_list should be the 9 point moving average
#assumes that beaches are shallow (|derivative|<0.3), sloping and between 0 - 4 m AHD
i = 0
choice_list = [ ]
distance_list = [ ]
if z_ave_list [ i ] > check_value :
state_now = ' over '
else :
state_now = ' under '
while i < len ( z_ave_list ) :
if state_now == ' under ' and z_ave_list [ i ] > check_value : #only keep if it is downward sloping
state_now = ' over '
elif state_now == ' over ' and z_ave_list [ i ] < check_value :
choice_list . append ( i )
state_now = ' under '
if x_last != None :
distance_list . append ( ( ( x_last - x_list [ i ] ) * * 2 + ( y_last - y_list [ i ] ) * * 2 ) * * 0.5 )
i = i + 1
if len ( choice_list ) > 0 and x_last == None : #choose the first time for the first point
i = choice_list [ 0 ]
distance = 0
elif len ( choice_list ) > 0 and x_last != None :
assert ( len ( choice_list ) == len ( distance_list ) )
i = choice_list [ distance_list . index ( min ( distance_list ) ) ]
distance = min ( distance_list )
if i > = len ( x_list ) :
i = len ( x_list ) - 1
if x_last != None :
distance = ( ( x_last - x_list [ i ] ) * * 2 + ( y_last - y_list [ i ] ) * * 2 ) * * 0.5
else :
distance = 0
x = x_list [ i ]
y = y_list [ i ]
return i , x , y , distance
def find_beach_reference_contour ( chain_list , z_ave_list , x_list , y_list , x_last , y_last , deriv_ave_list , deriv_real_list , check_value ) :
#note that z_list should be the 9 point moving average
#assumes that beaches are shallow (|derivative|<0.3), sloping and between 0 - 4 m AHD
i = len ( z_ave_list ) - 1
while i > = 0 and ( z_ave_list [ i ] > check_value + 2 or z_ave_list [ i ] < check_value - 2 or deriv_ave_list [ i ] > 0 or max ( [ abs ( i ) for i in deriv_real_list [ max ( 0 , i - 7 ) : i ] ] + [ 0 ] ) > 0.3 ) : #beaches are shallow sloping, low
i = i - 1
#find the first time it gets to check_value after this
while i > = 0 and z_ave_list [ i ] < check_value :
i = i - 1
if i == 0 :
i = len ( z_ave_list ) - 1 # the whole this is above the beach
if x_last != None :
distance = ( ( x_last - x_list [ i ] ) * * 2 + ( y_last - y_list [ i ] ) * * 2 ) * * 0.5
else :
distance = 0
x = x_list [ i ]
y = y_list [ i ]
return i , x , y , distance
def find_beach_slope ( chain_list , z_ave_list , ref_index , check_value ) :
#ref index is the index of the check value
#find the beach slope between this point and 1 m above this point
i = ref_index
while i > 0 and z_ave_list [ i ] < check_value + 1 :
i = i - 1
# Hide numpy floating point arithmetic warnings
np . seterr ( all = ' ignore ' )
slope = ( z_ave_list [ i ] - z_ave_list [ ref_index ] ) / ( chain_list [ i ] - chain_list [ ref_index ] )
# Show numpy floating point arithmetic warnings
np . seterr ( all = None )
beach_ave = interpolate . interp1d ( [ min ( chain_list ) , max ( chain_list ) ] , [ ( min ( chain_list ) - chain_list [ ref_index ] ) * slope + z_ave_list [ ref_index ] , ( z_ave_list [ ref_index ] - ( chain_list [ ref_index ] - max ( chain_list ) ) * slope ) ] )
return slope , beach_ave , i
def get_natural_deviation ( chain_list , z_list , ref_index , ref_high , beach_ave ) :
#for the points considered to be on the beach (reference contour to reference contour +1), find the average natural deviation
deviation = [ ]
for i in range ( ref_high , ref_index + 1 ) :
dev_tmp = abs ( z_list [ i ] - float ( beach_ave ( chain_list [ i ] ) ) )
deviation . append ( dev_tmp )
natural_deviation = min ( np . max ( deviation ) , 0.4 ) #THIS MAY BE TOO CONSERVATIVE
return natural_deviation
def distance_point_to_poly ( x_list , y_list , x_now , y_now ) :
#make a line from the mid of x_list, y_list
end = Point ( x_list [ - 1 ] , y_list [ - 1 ] )
point = Point ( x_now , y_now )
dist = point . distance ( end )
return dist
def polygon_wave_runup ( xyz_1m , direction , shp_name , set_check_value , distance_check , zone ) :
#print('starting processing of wave runup')
all_data = pd . read_csv ( xyz_1m , header = None , names = [ ' X ' , ' Y ' , ' Z ' ] )
if direction == ' north_south ' :
all_data_sorted = all_data . sort_values ( by = [ ' X ' , ' Y ' ] , ascending = [ 1 , 0 ] )
elif direction == ' west_east ' :
all_data_sorted = all_data . sort_values ( by = [ ' Y ' , ' X ' ] , ascending = [ 0 , 1 ] )
fixed_now = 0
a = 0
X_tmp = [ ]
processed_data = pd . DataFrame ( columns = [ ' X ' , ' Y ' , ' Z ' ] )
list_to_print = [ 10 , 20 , 30 , 40 , 50 , 60 , 70 , 80 , 90 ]
crop_line = [ ]
top_line = [ ]
tmp_x_last = None
tmp_y_last = None
exceed_list = [ ]
# Create progress bar
pbar = tqdm ( all_data_sorted . iterrows ( ) , total = all_data_sorted . shape [ 0 ] )
for index , line in pbar :
a = a + 1
percent_done = round ( a / len ( all_data_sorted ) * 100 , 1 )
if percent_done in list_to_print :
#print("Finished %s%% of the processing" % percent_done)
list_to_print = list_to_print [ 1 : len ( list_to_print ) ]
if direction == ' north_south ' :
check_this = line [ ' X ' ]
elif direction == ' west_east ' :
check_this = line [ ' Y ' ]
if check_this == fixed_now :
X_tmp . append ( line [ ' X ' ] )
Y_tmp . append ( line [ ' Y ' ] )
Z_tmp . append ( line [ ' Z ' ] )
else :
if len ( X_tmp ) != 0 :
#try: ########may need to change!~!
if len ( X_tmp ) > 10 :
Z_set , chainage_tmp , temp_x , temp_y , distance = remove_problems ( X_tmp , Y_tmp , Z_tmp , tmp_x_last , tmp_y_last , set_check_value )
#except:
else :
Z_set = Z_tmp
temp_x = X_tmp [ len ( Z_set ) - 1 ]
temp_y = Y_tmp [ len ( Z_set ) - 1 ]
distance = 0
distance_2_old = distance_point_to_poly ( X_tmp , Y_tmp , temp_x , temp_y )
if distance_2_old < distance_check : # find a way to change so it is checking the distance from the first crop polyogn, concave_now.buffer(buffer)
tmp_x_last = temp_x
tmp_y_last = temp_y
crop_line . append ( [ X_tmp [ len ( Z_set ) - 1 ] , Y_tmp [ len ( Z_set ) - 1 ] ] )
top_line . append ( [ X_tmp [ 0 ] , Y_tmp [ 0 ] ] )
#otherwise crop by the distance_check
else :
exceed_list . append ( 1 )
try :
tmp_x_last = X_tmp [ len ( X_tmp ) - distance_check ] #beacuse this is a 1m DSM, this works
tmp_y_last = Y_tmp [ len ( Y_tmp ) - distance_check ]
crop_line . append ( [ tmp_x_last , tmp_y_last ] )
top_line . append ( [ X_tmp [ 0 ] , Y_tmp [ 0 ] ] )
except :
print ( ' problem with the last crop point, keeping whole line ' )
crop_line . append ( [ X_tmp [ - 1 ] , Y_tmp [ - 1 ] ] )
top_line . append ( [ X_tmp [ 0 ] , Y_tmp [ 0 ] ] )
if direction == ' north_south ' :
fixed_now = line [ ' X ' ]
elif direction == ' west_east ' :
fixed_now = line [ ' Y ' ]
X_tmp = [ line [ ' X ' ] ]
Y_tmp = [ line [ ' Y ' ] ]
Z_tmp = [ line [ ' Z ' ] ]
else :
if direction == ' north_south ' :
fixed_now = line [ ' X ' ]
elif direction == ' west_east ' :
fixed_now = line [ ' Y ' ]
X_tmp = [ line [ ' X ' ] ]
Y_tmp = [ line [ ' Y ' ] ]
Z_tmp = [ line [ ' Z ' ] ]
#for the last line
derivative , chainage = forward_derivative ( X_tmp , Y_tmp , Z_tmp )
if len ( X_tmp ) > 10 :
Z_set , chainage_tmp , temp_x , temp_y , distance = remove_problems ( X_tmp , Y_tmp , Z_tmp , tmp_x_last , tmp_y_last , set_check_value )
#except:
else :
Z_set = Z_tmp
temp_x = X_tmp [ len ( Z_set ) - 1 ]
temp_y = Y_tmp [ len ( Z_set ) - 1 ]
distance = 0
X_set = X_tmp [ 0 : len ( Z_set ) ]
Y_set = Y_tmp [ 0 : len ( Z_set ) ]
#write to new data frame
#if len(Z_set)>0:
# for i in range(0, len(Z_set)):
# processed_data =processed_data.append({'X':X_set[i],'Y':Y_set[i],'Z':Z_set[i],'r':r_set[i],'g':g_set[i],'b':b_set[i]}, ignore_index=True)
#add to crop line
distance_2_old = distance_point_to_poly ( X_tmp , Y_tmp , temp_x , temp_y )
if distance_2_old < distance_check : # find a way to change so it is checking the distance from the first crop polyogn, concave_now.buffer(buffer)
tmp_x_last = temp_x
tmp_y_last = temp_y
crop_line . append ( [ X_tmp [ len ( Z_set ) - 1 ] , Y_tmp [ len ( Z_set ) - 1 ] ] )
top_line . append ( [ X_tmp [ 0 ] , Y_tmp [ 0 ] ] )
#otherwise crop by the distance_check
else :
exceed_list . append ( 1 )
tmp_x_last = X_tmp [ len ( X_tmp ) - distance_check ]
tmp_y_last = Y_tmp [ len ( Y_tmp ) - distance_check ]
crop_line . append ( tmp_x_last , tmp_y_last )
top_line . append ( [ X_tmp [ 0 ] , Y_tmp [ 0 ] ] )
#otherwise dont add. straight line is better
if direction == ' north_south ' :
y_filtered = nine_pt_moving_average ( [ i [ 1 ] for i in crop_line ] )
crop_new = [ [ crop_line [ i ] [ 0 ] , y_filtered [ i ] ] for i in range ( 0 , len ( crop_line ) ) ]
elif direction == ' west_east ' :
x_filtered = nine_pt_moving_average ( [ i [ 0 ] for i in crop_line ] )
crop_new = [ [ x_filtered [ i ] , crop_line [ i ] [ 1 ] ] for i in range ( 0 , len ( crop_line ) ) ]
for_shape = crop_new + top_line [ : : - 1 ]
for_shape . append ( crop_new [ 0 ] )
#print('exceeded the manual distance_check %s%% of the time. manually cropping undertaken' % (round(len(exceed_list)/a,2)*100))
#making the cropping shapefile
#print('making the crop polygon')
# Export polygon as shapefile
df = gpd . GeoDataFrame ( geometry = [ Polygon ( for_shape ) ] )
df . crs = { ' init ' : ' epsg:283 {} ' . format ( zone ) , ' no_defs ' : True }
df . to_file ( shp_name , driver = ' ESRI Shapefile ' )
return None
def remove_temp_files ( directory ) :
for f in os . listdir ( directory ) :
os . unlink ( os . path . join ( directory , f ) )
return None
def main ( ) :
parser = argparse . ArgumentParser ( )
parser . add_argument (
' input_file ' ,
metavar = ' PARAMS_FILE ' ,
help = ' name of parameter file ' ,
default = None )
# Print usage if no arguments are provided
if len ( sys . argv ) == 1 :
parser . print_help ( sys . stderr )
sys . exit ( 1 )
args = parser . parse_args ( )
# read the parameters file and scroll through it
input_file = args . input_file
params_file = pd . read_excel ( input_file , sheet_name = " PARAMS " )
for i , row in params_file . iterrows ( ) :
print ( " Starting to process %s " % row [ ' Beach ' ] )
input_las = row [ ' INPUT LAS ' ]
initial_crop_poly = row [ ' INITIAL CROP POLY ' ]
lasground_step = row [ ' LASGROUND STEP ' ]
zone_MGA = row [ ' ZONE MGA ' ]
check_value = row [ ' CHECK VALUE ' ]
direct = row [ ' DIRECTION ' ]
check_distance = row [ ' CHECK DISTANCE ' ]
las_dir = row [ ' LAS CLASSIFIED FOLDER ' ]
shp_dir = row [ ' SHP SWASH FOLDER ' ]
tmp_dir = row [ ' TMP FOLDER ' ]
# Get base name of input las
las_basename = os . path . splitext ( os . path . basename ( input_las ) ) [ 0 ]
# Crop to beach boundary
print ( ' Clipping... ' )
las_data = call_lastools ( ' lasclip ' , input = input_las , output = ' -stdout ' ,
args = [ ' -poly ' , initial_crop_poly ] , verbose = False )
# Classify ground points
print ( ' Classifying ground... ' )
las_data = call_lastools ( ' lasground_new ' , input = las_data , output = ' -stdout ' ,
args = [ ' -step ' , lasground_step ] , verbose = False )
# Save classified point cloud
las_name = os . path . join ( las_dir , las_basename + ' .las ' )
with open ( las_name , ' wb ' ) as f :
f . write ( las_data )
# Interpolate point cloud onto a grid
print ( ' Interpolating to grid... ' )
xyz_name = os . path . join ( tmp_dir , las_basename + ' .xyz ' )
call_lastools ( ' las2dem ' , input = las_data , output = xyz_name ,
args = [ ' -step ' , 1 ] , verbose = False )
# Make runup clipping mask from gridded point cloud
print ( ' Calculating runup clipping mask... ' )
shp_name = os . path . join ( shp_dir , las_basename + ' .shp ' )
polygon_wave_runup ( xyz_name , direct , shp_name , check_value , check_distance , zone_MGA )
#NOTE THAT YOU NEED TO CHECK THE OUTPUT SHP FILE AND ADJUST AS REQUIRED
#delete the temp files
remove_temp_files ( tmp_dir )
if __name__ == ' __main__ ' :
main ( )