"""Least squares smooting and interpolation. Implements a penalised least-squares fit and associated interpolations. The penalty term (or smoothing term) is controlled by the smoothing parameter alpha. With a value of alpha=0, the fit function will attempt to interpolate as closely as possible in the least-squares sense. With values alpha > 0, a certain amount of smoothing will be applied. A positive alpha is essential in cases where there are too few data points. A negative alpha is not allowed. A typical value of alpha is 1.0e-6 Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou Geoscience Australia, 2004. """ import exceptions class ShapeError(exceptions.Exception): pass #from general_mesh import General_mesh from Numeric import zeros, array, Float, Int, dot, transpose, concatenate, ArrayType from mesh import Mesh from Numeric import zeros, take, array, Float, Int, dot, transpose, concatenate, ArrayType from sparse import Sparse, Sparse_CSR from cg_solve import conjugate_gradient, VectorShapeError from coordinate_transforms.geo_reference import Geo_reference import time try: from util import gradient except ImportError, e: #FIXME reduce the dependency of modules in pyvolution # Have util in a dir, working like load_mesh, and get rid of this def gradient(x0, y0, x1, y1, x2, y2, q0, q1, q2): """ """ det = (y2-y0)*(x1-x0) - (y1-y0)*(x2-x0) a = (y2-y0)*(q1-q0) - (y1-y0)*(q2-q0) a /= det b = (x1-x0)*(q2-q0) - (x2-x0)*(q1-q0) b /= det return a, b DEFAULT_ALPHA = 0.001 def fit_to_mesh_file(mesh_file, point_file, mesh_output_file, alpha=DEFAULT_ALPHA, verbose= False, expand_search = False, data_origin = None, mesh_origin = None, precrop = False): """ Given a mesh file (tsh) and a point attribute file (xya), fit point attributes to the mesh and write a mesh file with the results. If data_origin is not None it is assumed to be a 3-tuple with geo referenced UTM coordinates (zone, easting, northing) mesh_origin is the same but refers to the input tsh file. FIXME: When the tsh format contains it own origin, these parameters can go. FIXME: And both origins should be obtained from the specified files. """ from load_mesh.loadASCII import mesh_file_to_mesh_dictionary, \ load_points_file, export_mesh_file, \ concatinate_attributelist mesh_dict = mesh_file_to_mesh_dictionary(mesh_file) vertex_coordinates = mesh_dict['vertices'] triangles = mesh_dict['triangles'] if type(mesh_dict['vertex_attributes']) == ArrayType: old_point_attributes = mesh_dict['vertex_attributes'].tolist() else: old_point_attributes = mesh_dict['vertex_attributes'] if type(mesh_dict['vertex_attribute_titles']) == ArrayType: old_title_list = mesh_dict['vertex_attribute_titles'].tolist() else: old_title_list = mesh_dict['vertex_attribute_titles'] if verbose: print 'tsh file %s loaded' %mesh_file # load in the .pts file try: point_dict = load_points_file(point_file, delimiter = ',', verbose=verbose) except SyntaxError,e: point_dict = load_points_file(point_file, delimiter = ' ', verbose=verbose) point_coordinates = point_dict['pointlist'] title_list,point_attributes = concatinate_attributelist(point_dict['attributelist']) if point_dict.has_key('geo_reference') and not point_dict['geo_reference'] is None: data_origin = point_dict['geo_reference'].get_origin() else: data_origin = (56, 0, 0) #FIXME(DSG-DSG) if mesh_dict.has_key('geo_reference') and not mesh_dict['geo_reference'] is None: mesh_origin = mesh_dict['geo_reference'].get_origin() else: mesh_origin = (56, 0, 0) #FIXME(DSG-DSG) if verbose: print "points file loaded" if verbose:print "fitting to mesh" f = fit_to_mesh(vertex_coordinates, triangles, point_coordinates, point_attributes, alpha = alpha, verbose = verbose, expand_search = expand_search, data_origin = data_origin, mesh_origin = mesh_origin, precrop = precrop) if verbose: print "finished fitting to mesh" # convert array to list of lists new_point_attributes = f.tolist() #FIXME have this overwrite attributes with the same title - DSG #Put the newer attributes last if old_title_list <> []: old_title_list.extend(title_list) #FIXME can this be done a faster way? - DSG for i in range(len(old_point_attributes)): old_point_attributes[i].extend(new_point_attributes[i]) mesh_dict['vertex_attributes'] = old_point_attributes mesh_dict['vertex_attribute_titles'] = old_title_list else: mesh_dict['vertex_attributes'] = new_point_attributes mesh_dict['vertex_attribute_titles'] = title_list #FIXME (Ole): Remember to output mesh_origin as well if verbose: print "exporting to file ",mesh_output_file export_mesh_file(mesh_output_file, mesh_dict) def fit_to_mesh(vertex_coordinates, triangles, point_coordinates, point_attributes, alpha = DEFAULT_ALPHA, verbose = False, expand_search = False, data_origin = None, mesh_origin = None, precrop = False): """ Fit a smooth surface to a triangulation, given data points with attributes. Inputs: vertex_coordinates: List of coordinate pairs [xi, eta] of points constituting mesh (or a an m x 2 Numeric array) triangles: List of 3-tuples (or a Numeric array) of integers representing indices of all vertices in the mesh. point_coordinates: List of coordinate pairs [x, y] of data points (or an nx2 Numeric array) alpha: Smoothing parameter. point_attributes: Vector or array of data at the point_coordinates. data_origin and mesh_origin are 3-tuples consisting of UTM zone, easting and northing. If specified point coordinates and vertex coordinates are assumed to be relative to their respective origins. """ interp = Interpolation(vertex_coordinates, triangles, point_coordinates, alpha = alpha, verbose = verbose, expand_search = expand_search, data_origin = data_origin, mesh_origin = mesh_origin, precrop = precrop) vertex_attributes = interp.fit_points(point_attributes, verbose = verbose) return vertex_attributes def pts2rectangular(pts_name, M, N, alpha = DEFAULT_ALPHA, verbose = False, reduction = 1, format = 'netcdf'): """Fits attributes from pts file to MxN rectangular mesh Read pts file and create rectangular mesh of resolution MxN such that it covers all points specified in pts file. FIXME: This may be a temporary function until we decide on netcdf formats etc FIXME: Uses elevation hardwired """ import util, mesh_factory if verbose: print 'Read pts' points, attributes = util.read_xya(pts_name, format) #Reduce number of points a bit points = points[::reduction] elevation = attributes['elevation'] #Must be elevation elevation = elevation[::reduction] if verbose: print 'Got %d data points' %len(points) if verbose: print 'Create mesh' #Find extent max_x = min_x = points[0][0] max_y = min_y = points[0][1] for point in points[1:]: x = point[0] if x > max_x: max_x = x if x < min_x: min_x = x y = point[1] if y > max_y: max_y = y if y < min_y: min_y = y #Create appropriate mesh vertex_coordinates, triangles, boundary =\ mesh_factory.rectangular(M, N, max_x-min_x, max_y-min_y, (min_x, min_y)) #Fit attributes to mesh vertex_attributes = fit_to_mesh(vertex_coordinates, triangles, points, elevation, alpha=alpha, verbose=verbose) return vertex_coordinates, triangles, boundary, vertex_attributes class Interpolation: def __init__(self, vertex_coordinates, triangles, point_coordinates = None, alpha = DEFAULT_ALPHA, verbose = False, expand_search = True, max_points_per_cell = 30, mesh_origin = None, data_origin = None, precrop = False): """ Build interpolation matrix mapping from function values at vertices to function values at data points Inputs: vertex_coordinates: List of coordinate pairs [xi, eta] of points constituting mesh (or a an m x 2 Numeric array) triangles: List of 3-tuples (or a Numeric array) of integers representing indices of all vertices in the mesh. point_coordinates: List of coordinate pairs [x, y] of data points (or an nx2 Numeric array) If point_coordinates is absent, only smoothing matrix will be built alpha: Smoothing parameter data_origin and mesh_origin are 3-tuples consisting of UTM zone, easting and northing. If specified point coordinates and vertex coordinates are assumed to be relative to their respective origins. """ from util import ensure_numeric #Convert input to Numeric arrays triangles = ensure_numeric(triangles, Int) vertex_coordinates = ensure_numeric(vertex_coordinates, Float) #Build underlying mesh if verbose: print 'Building mesh' #self.mesh = General_mesh(vertex_coordinates, triangles, #FIXME: Trying the normal mesh while testing precrop, # The functionality of boundary_polygon is needed for that #FIXME - geo ref does not have to go into mesh. # Change the point co-ords to conform to the # mesh co-ords early in the code if mesh_origin == None: geo = None else: geo = Geo_reference(mesh_origin[0],mesh_origin[1],mesh_origin[2]) self.mesh = Mesh(vertex_coordinates, triangles, geo_reference = geo) #FIXME, remove if mesh checks it. self.mesh.check_integrity() self.data_origin = data_origin self.point_indices = None #Smoothing parameter self.alpha = alpha #Build coefficient matrices self.build_coefficient_matrix_B(point_coordinates, verbose = verbose, expand_search = expand_search, max_points_per_cell =\ max_points_per_cell, data_origin = data_origin, precrop = precrop) def set_point_coordinates(self, point_coordinates, data_origin = None): """ A public interface to setting the point co-ordinates. """ self.build_coefficient_matrix_B(point_coordinates, data_origin) def build_coefficient_matrix_B(self, point_coordinates=None, verbose = False, expand_search = True, max_points_per_cell=30, data_origin = None, precrop = False): """Build final coefficient matrix""" if self.alpha <> 0: if verbose: print 'Building smoothing matrix' self.build_smoothing_matrix_D() if point_coordinates is not None: if verbose: print 'Building interpolation matrix' self.build_interpolation_matrix_A(point_coordinates, verbose = verbose, expand_search = expand_search, max_points_per_cell =\ max_points_per_cell, data_origin = data_origin, precrop = precrop) if self.alpha <> 0: self.B = self.AtA + self.alpha*self.D else: self.B = self.AtA #Convert self.B matrix to CSR format for faster matrix vector self.B = Sparse_CSR(self.B) def build_interpolation_matrix_A(self, point_coordinates, verbose = False, expand_search = True, max_points_per_cell=30, data_origin = None, precrop = False): """Build n x m interpolation matrix, where n is the number of data points and m is the number of basis functions phi_k (one per vertex) This algorithm uses a quad tree data structure for fast binning of data points origin is a 3-tuple consisting of UTM zone, easting and northing. If specified coordinates are assumed to be relative to this origin. This one will override any data_origin that may be specified in interpolation instance """ from quad import build_quadtree from util import ensure_numeric if data_origin is None: data_origin = self.data_origin #Use the one from #interpolation instance #Convert input to Numeric arrays just in case. point_coordinates = ensure_numeric(point_coordinates, Float) #Shift data points to same origin as mesh (if specified) #FIXME this will shift if there was no geo_ref. #But all this should be removed amyhow. #change coords before this point mesh_origin = self.mesh.geo_reference.get_origin() if point_coordinates is not None: if data_origin is not None: if mesh_origin is not None: #Transformation: # #Let x_0 be the reference point of the point coordinates #and xi_0 the reference point of the mesh. # #A point coordinate (x + x_0) is then made relative #to xi_0 by # # x_new = x + x_0 - xi_0 # #and similarly for eta x_offset = data_origin[1] - mesh_origin[1] y_offset = data_origin[2] - mesh_origin[2] else: #Shift back to a zero origin x_offset = data_origin[1] y_offset = data_origin[2] point_coordinates[:,0] += x_offset point_coordinates[:,1] += y_offset else: if mesh_origin is not None: #Use mesh origin for data points point_coordinates[:,0] -= mesh_origin[1] point_coordinates[:,1] -= mesh_origin[2] #Remove points falling outside mesh boundary #This reduced one example from 1356 seconds to 825 seconds #And more could be had by writing util.inside_polygon in C if precrop is True: from Numeric import take from util import inside_polygon if verbose: print 'Getting boundary polygon' P = self.mesh.get_boundary_polygon() if verbose: print 'Getting indices inside mesh boundary' indices = inside_polygon(point_coordinates, P, verbose = verbose) if verbose: print 'Done' if len(indices) != point_coordinates.shape[0]: print '%d points outside mesh have been cropped.'\ %(point_coordinates.shape[0] - len(indices)) point_coordinates = take(point_coordinates, indices) self.point_indices = indices #Build n x m interpolation matrix m = self.mesh.coordinates.shape[0] #Nbr of basis functions (1/vertex) n = point_coordinates.shape[0] #Nbr of data points if verbose: print 'Number of datapoints: %d' %n if verbose: print 'Number of basis functions: %d' %m #FIXME (Ole): We should use CSR here since mat-mat mult is now OK. #However, Sparse_CSR does not have the same methods as Sparse yet #The tests will reveal what needs to be done self.A = Sparse(n,m) self.AtA = Sparse(m,m) #Build quad tree of vertices (FIXME: Is this the right spot for that?) root = build_quadtree(self.mesh, max_points_per_cell = max_points_per_cell) #Compute matrix elements for i in range(n): #For each data_coordinate point if verbose and i%((n+10)/10)==0: print 'Doing %d of %d' %(i, n) x = point_coordinates[i] #Find vertices near x candidate_vertices = root.search(x[0], x[1]) is_more_elements = True element_found, sigma0, sigma1, sigma2, k = \ self.search_triangles_of_vertices(candidate_vertices, x) while not element_found and is_more_elements and expand_search: #if verbose: print 'Expanding search' candidate_vertices, branch = root.expand_search() if branch == []: # Searching all the verts from the root cell that haven't # been searched. This is the last try element_found, sigma0, sigma1, sigma2, k = \ self.search_triangles_of_vertices(candidate_vertices, x) is_more_elements = False else: element_found, sigma0, sigma1, sigma2, k = \ self.search_triangles_of_vertices(candidate_vertices, x) #Update interpolation matrix A if necessary if element_found is True: #Assign values to matrix A j0 = self.mesh.triangles[k,0] #Global vertex id for sigma0 j1 = self.mesh.triangles[k,1] #Global vertex id for sigma1 j2 = self.mesh.triangles[k,2] #Global vertex id for sigma2 sigmas = {j0:sigma0, j1:sigma1, j2:sigma2} js = [j0,j1,j2] for j in js: self.A[i,j] = sigmas[j] for k in js: self.AtA[j,k] += sigmas[j]*sigmas[k] else: pass #Ok if there is no triangle for datapoint #(as in brute force version) #raise 'Could not find triangle for point', x def search_triangles_of_vertices(self, candidate_vertices, x): #Find triangle containing x: element_found = False # This will be returned if element_found = False sigma2 = -10.0 sigma0 = -10.0 sigma1 = -10.0 k = -10.0 #For all vertices in same cell as point x for v in candidate_vertices: #for each triangle id (k) which has v as a vertex for k, _ in self.mesh.vertexlist[v]: #Get the three vertex_points of candidate triangle xi0 = self.mesh.get_vertex_coordinate(k, 0) xi1 = self.mesh.get_vertex_coordinate(k, 1) xi2 = self.mesh.get_vertex_coordinate(k, 2) #print "PDSG - k", k #print "PDSG - xi0", xi0 #print "PDSG - xi1", xi1 #print "PDSG - xi2", xi2 #print "PDSG element %i verts((%f, %f),(%f, %f),(%f, %f))"\ # % (k, xi0[0], xi0[1], xi1[0], xi1[1], xi2[0], xi2[1]) #Get the three normals n0 = self.mesh.get_normal(k, 0) n1 = self.mesh.get_normal(k, 1) n2 = self.mesh.get_normal(k, 2) #Compute interpolation sigma2 = dot((x-xi0), n2)/dot((xi2-xi0), n2) sigma0 = dot((x-xi1), n0)/dot((xi0-xi1), n0) sigma1 = dot((x-xi2), n1)/dot((xi1-xi2), n1) #print "PDSG - sigma0", sigma0 #print "PDSG - sigma1", sigma1 #print "PDSG - sigma2", sigma2 #FIXME: Maybe move out to test or something epsilon = 1.0e-6 assert abs(sigma0 + sigma1 + sigma2 - 1.0) < epsilon #Check that this triangle contains the data point #Sigmas can get negative within #machine precision on some machines (e.g nautilus) #Hence the small eps eps = 1.0e-15 if sigma0 >= -eps and sigma1 >= -eps and sigma2 >= -eps: element_found = True break if element_found is True: #Don't look for any other triangle break return element_found, sigma0, sigma1, sigma2, k def build_interpolation_matrix_A_brute(self, point_coordinates): """Build n x m interpolation matrix, where n is the number of data points and m is the number of basis functions phi_k (one per vertex) This is the brute force which is too slow for large problems, but could be used for testing """ from util import ensure_numeric #Convert input to Numeric arrays point_coordinates = ensure_numeric(point_coordinates, Float) #Build n x m interpolation matrix m = self.mesh.coordinates.shape[0] #Nbr of basis functions (1/vertex) n = point_coordinates.shape[0] #Nbr of data points self.A = Sparse(n,m) self.AtA = Sparse(m,m) #Compute matrix elements for i in range(n): #For each data_coordinate point x = point_coordinates[i] element_found = False k = 0 while not element_found and k < len(self.mesh): #For each triangle (brute force) #FIXME: Real algorithm should only visit relevant triangles #Get the three vertex_points xi0 = self.mesh.get_vertex_coordinate(k, 0) xi1 = self.mesh.get_vertex_coordinate(k, 1) xi2 = self.mesh.get_vertex_coordinate(k, 2) #Get the three normals n0 = self.mesh.get_normal(k, 0) n1 = self.mesh.get_normal(k, 1) n2 = self.mesh.get_normal(k, 2) #Compute interpolation sigma2 = dot((x-xi0), n2)/dot((xi2-xi0), n2) sigma0 = dot((x-xi1), n0)/dot((xi0-xi1), n0) sigma1 = dot((x-xi2), n1)/dot((xi1-xi2), n1) #FIXME: Maybe move out to test or something epsilon = 1.0e-6 assert abs(sigma0 + sigma1 + sigma2 - 1.0) < epsilon #Check that this triangle contains data point if sigma0 >= 0 and sigma1 >= 0 and sigma2 >= 0: element_found = True #Assign values to matrix A j0 = self.mesh.triangles[k,0] #Global vertex id #self.A[i, j0] = sigma0 j1 = self.mesh.triangles[k,1] #Global vertex id #self.A[i, j1] = sigma1 j2 = self.mesh.triangles[k,2] #Global vertex id #self.A[i, j2] = sigma2 sigmas = {j0:sigma0, j1:sigma1, j2:sigma2} js = [j0,j1,j2] for j in js: self.A[i,j] = sigmas[j] for k in js: self.AtA[j,k] += sigmas[j]*sigmas[k] k = k+1 def get_A(self): return self.A.todense() def get_B(self): return self.B.todense() def get_D(self): return self.D.todense() #FIXME: Remember to re-introduce the 1/n factor in the #interpolation term def build_smoothing_matrix_D(self): """Build m x m smoothing matrix, where m is the number of basis functions phi_k (one per vertex) The smoothing matrix is defined as D = D1 + D2 where [D1]_{k,l} = \int_\Omega \frac{\partial \phi_k}{\partial x} \frac{\partial \phi_l}{\partial x}\, dx dy [D2]_{k,l} = \int_\Omega \frac{\partial \phi_k}{\partial y} \frac{\partial \phi_l}{\partial y}\, dx dy The derivatives \frac{\partial \phi_k}{\partial x}, \frac{\partial \phi_k}{\partial x} for a particular triangle are obtained by computing the gradient a_k, b_k for basis function k """ #FIXME: algorithm might be optimised by computing local 9x9 #"element stiffness matrices: m = self.mesh.coordinates.shape[0] #Nbr of basis functions (1/vertex) self.D = Sparse(m,m) #For each triangle compute contributions to D = D1+D2 for i in range(len(self.mesh)): #Get area area = self.mesh.areas[i] #Get global vertex indices v0 = self.mesh.triangles[i,0] v1 = self.mesh.triangles[i,1] v2 = self.mesh.triangles[i,2] #Get the three vertex_points xi0 = self.mesh.get_vertex_coordinate(i, 0) xi1 = self.mesh.get_vertex_coordinate(i, 1) xi2 = self.mesh.get_vertex_coordinate(i, 2) #Compute gradients for each vertex a0, b0 = gradient(xi0[0], xi0[1], xi1[0], xi1[1], xi2[0], xi2[1], 1, 0, 0) a1, b1 = gradient(xi0[0], xi0[1], xi1[0], xi1[1], xi2[0], xi2[1], 0, 1, 0) a2, b2 = gradient(xi0[0], xi0[1], xi1[0], xi1[1], xi2[0], xi2[1], 0, 0, 1) #Compute diagonal contributions self.D[v0,v0] += (a0*a0 + b0*b0)*area self.D[v1,v1] += (a1*a1 + b1*b1)*area self.D[v2,v2] += (a2*a2 + b2*b2)*area #Compute contributions for basis functions sharing edges e01 = (a0*a1 + b0*b1)*area self.D[v0,v1] += e01 self.D[v1,v0] += e01 e12 = (a1*a2 + b1*b2)*area self.D[v1,v2] += e12 self.D[v2,v1] += e12 e20 = (a2*a0 + b2*b0)*area self.D[v2,v0] += e20 self.D[v0,v2] += e20 def fit(self, z): """Fit a smooth surface to given 1d array of data points z. The smooth surface is computed at each vertex in the underlying mesh using the formula given in the module doc string. Pre Condition: self.A, self.At and self.B have been initialised Inputs: z: Single 1d vector or array of data at the point_coordinates. """ #Convert input to Numeric arrays from util import ensure_numeric z = ensure_numeric(z, Float) if len(z.shape) > 1 : raise VectorShapeError, 'Can only deal with 1d data vector' if self.point_indices is not None: #Remove values for any points that were outside mesh z = take(z, self.point_indices) #Compute right hand side based on data Atz = self.A.trans_mult(z) #Check sanity n, m = self.A.shape if n 4: if sys.argv[4][0] == "e" or sys.argv[4][0] == "E": expand_search = True else: expand_search = False verbose = False if len(sys.argv) > 5: if sys.argv[5][0] == "n" or sys.argv[5][0] == "N": verbose = False else: verbose = True if len(sys.argv) > 6: alpha = sys.argv[6] else: alpha = DEFAULT_ALPHA print "alpha = ",alpha t0 = time.time() fit_to_mesh_file(mesh_file, point_file, mesh_output_file, alpha, verbose= verbose, expand_search = expand_search) print 'That took %.2f seconds' %(time.time()-t0)