"""Least squares interpolation. Implements a least-squares interpolation. Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou Geoscience Australia, 2004. DESIGN ISSUES * what variables should be global? - if there are no global vars functions can be moved around alot easier * What will be the public interface to this class? TO DO * remove points outside the mesh ?(in interpolate_block)? * geo-ref (in interpolate_block) * add functional interpolate interface - in mesh and points, out interp data """ import time import os from warnings import warn from Numeric import zeros, array, Float, Int, dot, transpose, concatenate, \ ArrayType, allclose, take, NewAxis from pyvolution.mesh import Mesh from utilities.sparse import Sparse, Sparse_CSR from utilities.cg_solve import conjugate_gradient, VectorShapeError from coordinate_transforms.geo_reference import Geo_reference from pyvolution.quad import build_quadtree from utilities.numerical_tools import ensure_numeric, mean, INF from utilities.polygon import inside_polygon from search_functions import search_tree_of_vertices class Interpolate: def __init__(self, vertex_coordinates, triangles, mesh_origin = None, verbose=False, max_vertices_per_cell=30): """ Build interpolation matrix mapping from function values at vertices to function values at data points Inputs: vertex_coordinates: List of coordinate pairs [xi, eta] of points constituting a mesh (or an m x 2 Numeric array) Points may appear multiple times (e.g. if vertices have discontinuities) triangles: List of 3-tuples (or a Numeric array) of integers representing indices of all vertices in the mesh. mesh_origin: 3-tuples consisting of UTM zone, easting and northing. If specified vertex coordinates are assumed to be relative to their respective origins. max_vertices_per_cell: Number of vertices in a quad tree cell at which the cell is split into 4. """ # why no alpha in parameter list? # Initialise variabels self._A_can_be_reused = False self._point_coordinates = None #Convert input to Numeric arrays triangles = ensure_numeric(triangles, Int) vertex_coordinates = ensure_numeric(vertex_coordinates, Float) #Build underlying mesh if verbose: print 'Building mesh' #self.mesh = General_mesh(vertex_coordinates, triangles, #FIXME: Trying the normal mesh while testing precrop, # The functionality of boundary_polygon is needed for that #FIXME: geo_ref can also be a geo_ref object #FIXME: move this to interpolate_block #Note: not so keen to use geospacial to represent the mesh, # since the data structure is # points, geo-ref and triangles, rather than just points and geo-ref # this info will usually be coming from a domain instance. if mesh_origin is None: geo = None else: geo = Geo_reference(mesh_origin[0],mesh_origin[1],mesh_origin[2]) self.mesh = Mesh(vertex_coordinates, triangles, geo_reference = geo) self.mesh.check_integrity() self.root = build_quadtree(self.mesh, max_points_per_cell = max_vertices_per_cell) #print "self.root",self.root.show() def _build_interpolation_matrix_A(self, point_coordinates, verbose = False): """Build n x m interpolation matrix, where n is the number of data points and m is the number of basis functions phi_k (one per vertex) This algorithm uses a quad tree data structure for fast binning of data points origin is a 3-tuple consisting of UTM zone, easting and northing. If specified coordinates are assumed to be relative to this origin. This one will override any data_origin that may be specified in instance interpolation Preconditions Point_coordindates and mesh vertices have the same origin. """ #Convert point_coordinates to Numeric arrays, in case it was a list. point_coordinates = ensure_numeric(point_coordinates, Float) #Remove points falling outside mesh boundary # do this bit later - that sorta means this becomes an object # get a list of what indices are outside the boundary # maybe fill these rows with n/a? #Build n x m interpolation matrix m = self.mesh.coordinates.shape[0] #Nbr of basis functions (1/vertex) n = point_coordinates.shape[0] #Nbr of data points if verbose: print 'Number of datapoints: %d' %n if verbose: print 'Number of basis functions: %d' %m A = Sparse(n,m) #Compute matrix elements for i in range(n): #For each data_coordinate point if verbose and i%((n+10)/10)==0: print 'Doing %d of %d' %(i, n) x = point_coordinates[i] element_found, sigma0, sigma1, sigma2, k = \ search_tree_of_vertices(self.root, self.mesh, x) #Update interpolation matrix A if necessary if element_found is True: #Assign values to matrix A j0 = self.mesh.triangles[k,0] #Global vertex id for sigma0 j1 = self.mesh.triangles[k,1] #Global vertex id for sigma1 j2 = self.mesh.triangles[k,2] #Global vertex id for sigma2 sigmas = {j0:sigma0, j1:sigma1, j2:sigma2} js = [j0,j1,j2] for j in js: A[i,j] = sigmas[j] else: # Hack so this message only appears when the test is #run in fit_interpoate directory. #Error message reminds me to do something #with points outside the polygon if os.getcwd()[-15:] == "fit_interpolate": print 'Could not find triangle for point', x return A # FIXME: What is a good start_blocking_len value? def interpolate(self, f, point_coordinates = None, start_blocking_len = 500000, verbose=False): """Interpolate mesh data f to determine values, z, at points. f is the data on the mesh vertices. The mesh values representing a smooth surface are assumed to be specified in f. Inputs: f: Vector or array of data at the mesh vertices. If f is an array, interpolation will be done for each column as per underlying matrix-matrix multiplication point_coordinates: Interpolate mesh data to these positions. List of coordinate pairs [x, y] of data points (or an nx2 Numeric array) If point_coordinates is absent, the points inputted last time this method was called are used, if possible. start_blocking_len: If the # of points is more or greater than this, start blocking Output: Interpolated values at inputted points (z). """ #print "point_coordinates interpolate.interpolate",point_coordinates # Can I interpolate, based on previous point_coordinates? if point_coordinates is None: if self._A_can_be_reused is True and \ len(self._point_coordinates) < start_blocking_len: z = self._get_point_data_z(f, verbose=verbose) elif self._point_coordinates is not None: # if verbose, give warning if verbose: print 'WARNING: Recalculating A matrix, due to blocking.' point_coordinates = self._point_coordinates else: #There are no good point_coordinates. import sys; sys.exit() msg = 'ERROR (interpolate.py): No point_coordinates inputted' raise Exception(msg) if point_coordinates is not None: self._point_coordinates = point_coordinates if len(point_coordinates) < start_blocking_len or \ start_blocking_len == 0: self._A_can_be_reused = True z = self.interpolate_block(f, point_coordinates, verbose=verbose) else: #Handle blocking self._A_can_be_reused = False start=0 z = self.interpolate_block(f, point_coordinates[0:0]) for end in range(start_blocking_len ,len(point_coordinates) ,start_blocking_len): t = self.interpolate_block(f, point_coordinates[start:end], verbose=verbose) z = concatenate((z,t)) start = end end = len(point_coordinates) t = self.interpolate_block(f, point_coordinates[start:end], verbose=verbose) z = concatenate((z,t)) return z def interpolate_block(self, f, point_coordinates = None, verbose=False): """ Call this if you want to control the blocking or make sure blocking doesn't occur. See interpolate for doc info. """ if point_coordinates is not None: self._A =self._build_interpolation_matrix_A(point_coordinates, verbose=verbose) return self._get_point_data_z(f) def _get_point_data_z(self, f, verbose=False): return self._A * f class Interpolation_function: """Interpolation_interface - creates callable object f(t, id) or f(t,x,y) which is interpolated from time series defined at vertices of triangular mesh (such as those stored in sww files) Let m be the number of vertices, n the number of triangles and p the number of timesteps. Mandatory input time: px1 array of monotonously increasing times (Float) quantities: Dictionary of arrays or 1 array (Float) The arrays must either have dimensions pxm or mx1. The resulting function will be time dependent in the former case while it will be constan with respect to time in the latter case. Optional input: quantity_names: List of keys into the quantities dictionary vertex_coordinates: mx2 array of coordinates (Float) triangles: nx3 array of indices into vertex_coordinates (Int) interpolation_points: Nx2 array of coordinates to be interpolated to verbose: Level of reporting The quantities returned by the callable object are specified by the list quantities which must contain the names of the quantities to be returned and also reflect the order, e.g. for the shallow water wave equation, on would have quantities = ['stage', 'xmomentum', 'ymomentum'] The parameter interpolation_points decides at which points interpolated quantities are to be computed whenever object is called. If None, return average value """ def __init__(self, time, quantities, quantity_names = None, vertex_coordinates = None, triangles = None, interpolation_points = None, verbose = False): """Initialise object and build spatial interpolation if required """ from Numeric import array, zeros, Float, alltrue, concatenate,\ reshape, ArrayType #from util import mean, ensure_numeric from config import time_format import types #Check temporal info time = ensure_numeric(time) msg = 'Time must be a monotonuosly ' msg += 'increasing sequence %s' %time assert alltrue(time[1:] - time[:-1] >= 0 ), msg #Check if quantities is a single array only if type(quantities) != types.DictType: quantities = ensure_numeric(quantities) quantity_names = ['Attribute'] #Make it a dictionary quantities = {quantity_names[0]: quantities} #Use keys if no names are specified if quantity_names is None: quantity_names = quantities.keys() #Check spatial info if vertex_coordinates is None: self.spatial = False else: vertex_coordinates = ensure_numeric(vertex_coordinates) assert triangles is not None, 'Triangles array must be specified' triangles = ensure_numeric(triangles) self.spatial = True #Save for use with statistics self.quantity_names = quantity_names self.quantities = quantities self.vertex_coordinates = vertex_coordinates self.interpolation_points = interpolation_points self.T = time[:] # Time assumed to be relative to starttime self.index = 0 # Initial time index self.precomputed_values = {} #Precomputed spatial interpolation if requested if interpolation_points is not None: if self.spatial is False: raise 'Triangles and vertex_coordinates must be specified' try: self.interpolation_points = ensure_numeric(interpolation_points) except: msg = 'Interpolation points must be an N x 2 Numeric array '+\ 'or a list of points\n' msg += 'I got: %s.' %(str(self.interpolation_points)[:60] +\ '...') raise msg m = len(self.interpolation_points) p = len(self.T) for name in quantity_names: self.precomputed_values[name] = zeros((p, m), Float) #Build interpolator interpol = Interpolate(vertex_coordinates, triangles, #point_coordinates = \ #self.interpolation_points, #alpha = 0, verbose = verbose) if verbose: print 'Interpolate' for i, t in enumerate(self.T): #Interpolate quantities at this timestep if verbose and i%((p+10)/10)==0: print ' time step %d of %d' %(i, p) for name in quantity_names: if len(quantities[name].shape) == 2: result = interpol.interpolate(quantities[name][i,:], point_coordinates = \ self.interpolation_points) else: #Assume no time dependency result = interpol.interpolate(quantities[name][:], point_coordinates = \ self.interpolation_points) self.precomputed_values[name][i, :] = result #Report if verbose: print self.statistics() #self.print_statistics() else: #Store quantitites as is for name in quantity_names: self.precomputed_values[name] = quantities[name] def __repr__(self): #return 'Interpolation function (spatio-temporal)' return self.statistics() def __call__(self, t, point_id = None, x = None, y = None): """Evaluate f(t), f(t, point_id) or f(t, x, y) Inputs: t: time - Model time. Must lie within existing timesteps point_id: index of one of the preprocessed points. x, y: Overrides location, point_id ignored If spatial info is present and all of x,y,point_id are None an exception is raised If no spatial info is present, point_id and x,y arguments are ignored making f a function of time only. FIXME: point_id could also be a slice FIXME: What if x and y are vectors? FIXME: What about f(x,y) without t? """ from math import pi, cos, sin, sqrt from Numeric import zeros, Float from util import mean if self.spatial is True: if point_id is None: if x is None or y is None: msg = 'Either point_id or x and y must be specified' raise Exception(msg) else: if self.interpolation_points is None: msg = 'Interpolation_function must be instantiated ' +\ 'with a list of interpolation points before parameter ' +\ 'point_id can be used' raise Exception(msg) msg = 'Time interval [%s:%s]' %(self.T[0], self.T[1]) msg += ' does not match model time: %s\n' %t if t < self.T[0]: raise Exception(msg) if t > self.T[-1]: raise Exception(msg) oldindex = self.index #Time index #Find current time slot while t > self.T[self.index]: self.index += 1 while t < self.T[self.index]: self.index -= 1 if t == self.T[self.index]: #Protect against case where t == T[-1] (last time) # - also works in general when t == T[i] ratio = 0 else: #t is now between index and index+1 ratio = (t - self.T[self.index])/\ (self.T[self.index+1] - self.T[self.index]) #Compute interpolated values q = zeros(len(self.quantity_names), Float) for i, name in enumerate(self.quantity_names): Q = self.precomputed_values[name] if self.spatial is False: #If there is no spatial info assert len(Q.shape) == 1 Q0 = Q[self.index] if ratio > 0: Q1 = Q[self.index+1] else: if x is not None and y is not None: #Interpolate to x, y raise 'x,y interpolation not yet implemented' else: #Use precomputed point Q0 = Q[self.index, point_id] if ratio > 0: Q1 = Q[self.index+1, point_id] #Linear temporal interpolation if ratio > 0: q[i] = Q0 + ratio*(Q1 - Q0) else: q[i] = Q0 #Return vector of interpolated values #if len(q) == 1: # return q[0] #else: # return q #Return vector of interpolated values #FIXME: if self.spatial is True: return q else: #Replicate q according to x and y #This is e.g used for Wind_stress if x is None or y is None: return q else: try: N = len(x) except: return q else: from Numeric import ones, Float #x is a vector - Create one constant column for each value N = len(x) assert len(y) == N, 'x and y must have same length' res = [] for col in q: res.append(col*ones(N, Float)) return res def statistics(self): """Output statistics about interpolation_function """ vertex_coordinates = self.vertex_coordinates interpolation_points = self.interpolation_points quantity_names = self.quantity_names quantities = self.quantities precomputed_values = self.precomputed_values x = vertex_coordinates[:,0] y = vertex_coordinates[:,1] str = '------------------------------------------------\n' str += 'Interpolation_function (spatio-temporal) statistics:\n' str += ' Extent:\n' str += ' x in [%f, %f], len(x) == %d\n'\ %(min(x), max(x), len(x)) str += ' y in [%f, %f], len(y) == %d\n'\ %(min(y), max(y), len(y)) str += ' t in [%f, %f], len(t) == %d\n'\ %(min(self.T), max(self.T), len(self.T)) str += ' Quantities:\n' for name in quantity_names: q = quantities[name][:].flat str += ' %s in [%f, %f]\n' %(name, min(q), max(q)) if interpolation_points is not None: str += ' Interpolation points (xi, eta):'\ ' number of points == %d\n' %interpolation_points.shape[0] str += ' xi in [%f, %f]\n' %(min(interpolation_points[:,0]), max(interpolation_points[:,0])) str += ' eta in [%f, %f]\n' %(min(interpolation_points[:,1]), max(interpolation_points[:,1])) str += ' Interpolated quantities (over all timesteps):\n' for name in quantity_names: q = precomputed_values[name][:].flat str += ' %s at interpolation points in [%f, %f]\n'\ %(name, min(q), max(q)) str += '------------------------------------------------\n' return str def interpolate_sww(sww_file, time, interpolation_points, quantity_names = None, verbose = False): """ obsolete. use file_function in utils """ #open sww file x, y, volumes, time, quantities = read_sww(sww_file) print "x",x print "y",y print "time", time print "quantities", quantities #Add the x and y together vertex_coordinates = concatenate((x[:,NewAxis], y[:,NewAxis]),axis=1) #Will return the quantity values at the specified times and locations interp = Interpolation_interface( time, quantities, quantity_names = quantity_names, vertex_coordinates = vertex_coordinates, triangles = volumes, interpolation_points = interpolation_points, verbose = verbose) def read_sww(file_name): """ obsolete - Nothing should be calling this Read in an sww file. Input; file_name - the sww file Output; x - Vector of x values y - Vector of y values z - Vector of bed elevation volumes - Array. Each row has 3 values, representing the vertices that define the volume time - Vector of the times where there is stage information stage - array with respect to time and vertices (x,y) """ #FIXME Have this reader as part of data_manager? from Scientific.IO.NetCDF import NetCDFFile import tempfile import sys import os #Check contents #Get NetCDF # see if the file is there. Throw a QUIET IO error if it isn't # I don't think I could implement the above #throws prints to screen if file not present junk = tempfile.mktemp(".txt") fd = open(junk,'w') stdout = sys.stdout sys.stdout = fd fid = NetCDFFile(file_name, 'r') sys.stdout = stdout fd.close() #clean up os.remove(junk) # Get the variables x = fid.variables['x'][:] y = fid.variables['y'][:] volumes = fid.variables['volumes'][:] time = fid.variables['time'][:] keys = fid.variables.keys() keys.remove("x") keys.remove("y") keys.remove("volumes") keys.remove("time") #Turn NetCDF objects into Numeric arrays quantities = {} for name in keys: quantities[name] = fid.variables[name][:] fid.close() return x, y, volumes, time, quantities #------------------------------------------------------------- if __name__ == "__main__": a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] points = [a, b, c] vertices = [ [1,0,2] ] #bac data = [ [2.0/3, 2.0/3] ] #Use centroid as one data point interp = Interpolate(points, vertices) #, data) A = interp._build_interpolation_matrix_A(data, verbose=True) A = A.todense() print "A",A assert allclose(A, [[1./3, 1./3, 1./3]]) print "finished"