"""This modul contains various auxiliary function used by pyvolution. It is also a clearing house for function tat may later earn a module of their own. """ def angle(v): """Compute angle between e1 (the unit vector in the x-direction) and the specified vector """ from math import acos, pi, sqrt from Numeric import sum, array l = sqrt( sum (array(v)**2)) v1 = v[0]/l v2 = v[1]/l theta = acos(v1) if v2 < 0: #Quadrant 3 or 4 theta = 2*pi-theta return theta def anglediff(v0, v1): """Compute difference between angle of vector x0, y0 and x1, y1. This is used for determining the ordering of vertices, e.g. for checking if they are counter clockwise. Always return a positive value """ from math import pi a0 = angle(v0) a1 = angle(v1) #Ensure that difference will be positive if a0 < a1: a0 += 2*pi return a0-a1 def mean(x): from Numeric import sum return sum(x)/len(x) def point_on_line(x, y, x0, y0, x1, y1): """Determine whether a point is on a line segment Input: x, y, x0, x0, x1, y1: where point is given by x, y line is given by (x0, y0) and (x1, y1) """ from Numeric import array, dot, allclose from math import sqrt a = array([x - x0, y - y0]) a_normal = array([a[1], -a[0]]) b = array([x1 - x0, y1 - y0]) if dot(a_normal, b) == 0: #Point is somewhere on the infinite extension of the line len_a = sqrt(sum(a**2)) len_b = sqrt(sum(b**2)) if dot(a, b) >= 0 and len_a <= len_b: return True else: return False else: return False def file_function(filename, domain=None, quantities = None, interpolation_points = None, verbose = False): """If domain is specified, don't specify quantites as they are automatically derived """ assert type(filename) == type(''),\ 'First argument to File_function must be a string' try: fid = open(filename) except Exception, e: msg = 'File "%s" could not be opened: Error="%s"'\ %(filename, e) raise msg line = fid.readline() fid.close() if domain is not None: quantities = domain.conserved_quantities else: quantities = None #Choose format #FIXME: Maybe these can be merged later on if line[:3] == 'CDF': return File_function_NetCDF(filename, domain, quantities, interpolation_points, verbose = verbose) else: return File_function_ASCII(filename, domain, quantities, interpolation_points) class File_function_NetCDF: """Read time history of spatial data from NetCDF sww file and return a callable object f(t,x,y) which will return interpolated values based on the input file. x, y may be either scalars or vectors #FIXME: More about format, interpolation and order of quantities The quantities returned by the callable objects are specified by the list quantities which must contain the names of the quantities to be returned and also reflect the order, e.g. for the shallow water wave equation, on would have quantities = ['stage', 'xmomentum', 'ymomentum'] interpolation_points decides at which points interpolated quantities are to be computed whenever object is called. If None, return average value """ def __init__(self, filename, domain=None, quantities=None, interpolation_points=None, verbose = False): """Initialise callable object from a file. If domain is specified, model time (domain.starttime) will be checked and possibly modified All times are assumed to be in UTC """ #FIXME: Check that model origin is the same as file's origin #(both in UTM coordinates) #If not - modify those from file to match domain import time, calendar from config import time_format from Scientific.IO.NetCDF import NetCDFFile #Open NetCDF file if verbose: print 'Reading', filename fid = NetCDFFile(filename, 'r') if quantities is None or len(quantities) < 1: msg = 'ERROR: File must contain at least one independent value' raise msg missing = [] for quantity in ['x', 'y', 'time'] + quantities: if not fid.variables.has_key(quantity): missing.append(quantity) if len(missing) > 0: msg = 'Quantities %s could not be found in file %s'\ %(str(missing), filename) raise msg #Get first timestep try: starttime = fid.starttime[0] except ValueError: msg = 'Could not read starttime from file %s' %filename raise msg self.number_of_values = len(quantities) self.fid = fid self.filename = filename self.starttime = starttime self.quantities = quantities self.domain = domain self.interpolation_points = interpolation_points if domain is not None: msg = 'WARNING: Start time as specified in domain (%s)'\ %domain.starttime msg += ' is earlier than the starttime of file %s: %s.'\ %(self.filename, self.starttime) msg += 'Modifying starttime accordingly.' if self.starttime > domain.starttime: #FIXME: Print depending on some verbosity setting #print msg domain.starttime = self.starttime #Modifying model time #Read all data in and produce values for desired data points at each timestep self.spatial_interpolation(interpolation_points, verbose = verbose) fid.close() def spatial_interpolation(self, interpolation_points, verbose = False): """For each timestep in fid: read surface, interpolate to desired points and store values for use when object is called. """ from Numeric import array, zeros, Float, alltrue, concatenate, reshape from config import time_format from least_squares import Interpolation import time, calendar fid = self.fid #Get variables if verbose: print 'Get varibles' x = fid.variables['x'][:] y = fid.variables['y'][:] z = fid.variables['elevation'][:] triangles = fid.variables['volumes'][:] time = fid.variables['time'][:] #Check msg = 'File %s must list time as a monotonuosly ' %self.filename msg += 'increasing sequence' assert alltrue(time[1:] - time[:-1] > 0 ), msg if interpolation_points is not None: try: P = array(interpolation_points) except: msg = 'Interpolation points must be an N x 2 Numeric array or a list of points\n' msg += 'I got: %s.' %( str(interpolation_points)[:60] + '...') raise msg ####self.Q = zeros( (len(time), self.T = time[:] self.index = 0 #Initial index self.values = {} for name in self.quantities: self.values[name] = zeros( (len(self.T), len(interpolation_points)), Float) #Build interpolator if verbose: print 'Build interpolation matrix' x = reshape(x, (len(x),1)) y = reshape(y, (len(y),1)) vertex_coordinates = concatenate((x,y), axis=1) #m x 2 array interpol = Interpolation(vertex_coordinates, triangles, point_coordinates = P, alpha = 0, verbose = verbose) if verbose: print 'Interpolate' for i, t in enumerate(self.T): #Interpolate quantities at this timestep #print ' time step %d of %d' %(i, len(self.T)) for name in self.quantities: self.values[name][i, :] =\ interpol.interpolate(fid.variables[name][i,:]) else: pass def __repr__(self): return 'File function' def __call__(self, t, x=None, y=None, point_id = None): """Evaluate f(t, point_id) Inputs: t: time - Model time (tau = domain.starttime-self.starttime+t) must lie within existing timesteps point_id: index of one of the preprocessed points. If point_id is None all preprocessed points are computed FIXME: point_id could also be a slice FIXME: One could allow arbitrary x, y coordinates (requires computation of a new interpolator) FIXME: What if x and y are vectors? """ from math import pi, cos, sin, sqrt from Numeric import zeros, Float #Find time tau such that # # domain.starttime + t == self.starttime + tau if self.domain is not None: tau = self.domain.starttime-self.starttime+t else: tau = t msg = 'Time interval derived from file %s (%s:%s) does not match model time: %s'\ %(self.filename, self.T[0], self.T[1], tau) if tau < self.T[0]: raise msg if tau > self.T[-1]: raise msg oldindex = self.index #Time index #Find time slot while tau > self.T[self.index]: self.index += 1 while tau < self.T[self.index]: self.index -= 1 if tau == self.T[self.index]: #Protect against case where tau == T[-1] (last time) # - also works in general when tau == T[i] ratio = 0 else: #t is now between index and index+1 ratio = (tau - self.T[self.index])/\ (self.T[self.index+1] - self.T[self.index]) #Compute interpolated values q = zeros( len(self.quantities), Float) for i, name in enumerate(self.quantities): Q = self.values[name] if ratio > 0: q[i] = Q[self.index, point_id] +\ ratio*(Q[self.index+1, point_id] -\ Q[self.index, point_id]) else: q[i] = Q[self.index, point_id] #Return vector of interpolated values return q class File_function_ASCII: """Read time series from file and return a callable object: f(t,x,y) which will return interpolated values based on the input file. The file format is assumed to be either two fields separated by a comma: time [DD/MM/YY hh:mm:ss], value0 value1 value2 ... or four comma separated fields time [DD/MM/YY hh:mm:ss], x, y, value0 value1 value2 ... In either case, the callable object will return a tuple of interpolated values, one each value stated in the file. E.g 31/08/04 00:00:00, 1.328223 0 0 31/08/04 00:15:00, 1.292912 0 0 will provide a time dependent function f(t,x=None,y=None), ignoring x and y, which returns three values per call. NOTE: At this stage the function is assumed to depend on time only, i.e no spatial dependency!!!!! When that is needed we can use the least_squares interpolation. #FIXME: This should work with netcdf (e.g. sww) and thus render the #spatio-temporal boundary condition in shallow water fully general #FIXME: Specified quantites not used here - #always return whatever is in the file """ def __init__(self, filename, domain=None, quantities = None, interpolation_points=None): """Initialise callable object from a file. See docstring for class File_function If domain is specified, model time (domain,starttime) will be checked and possibly modified All times are assumed to be in UTC """ import time, calendar from Numeric import array from config import time_format fid = open(filename) line = fid.readline() fid.close() fields = line.split(',') msg = 'File %s must have the format date, value0 value1 value2 ...' msg += ' or date, x, y, value0 value1 value2 ...' mode = len(fields) assert mode in [2,4], msg try: starttime = calendar.timegm(time.strptime(fields[0], time_format)) except ValueError: msg = 'First field in file %s must be' %filename msg += ' date-time with format %s.\n' %time_format msg += 'I got %s instead.' %fields[0] raise msg #Split values values = [] for value in fields[mode-1].split(): values.append(float(value)) q = array(values) msg = 'ERROR: File must contain at least one independent value' assert len(q.shape) == 1, msg self.number_of_values = len(q) self.filename = filename self.starttime = starttime self.domain = domain if domain is not None: msg = 'WARNING: Start time as specified in domain (%s)'\ %domain.starttime msg += ' is earlier than the starttime of file %s: %s.'\ %(self.filename, self.starttime) msg += 'Modifying starttime accordingly.' if self.starttime > domain.starttime: #FIXME: Print depending on some verbosity setting #print msg domain.starttime = self.starttime #Modifying model time #if domain.starttime is None: # domain.starttime = self.starttime #else: # msg = 'WARNING: Start time as specified in domain (%s)'\ # %domain.starttime # msg += ' is earlier than the starttime of file %s: %s.'\ # %(self.filename, self.starttime) # msg += 'Modifying starttime accordingly.' # if self.starttime > domain.starttime: # #FIXME: Print depending on some verbosity setting # #print msg # domain.starttime = self.starttime #Modifying model time if mode == 2: self.read_times() #Now read all times in. else: raise 'x,y dependency not yet implemented' def read_times(self): """Read time and values """ from Numeric import zeros, Float, alltrue from config import time_format import time, calendar fid = open(self.filename) lines = fid.readlines() fid.close() N = len(lines) d = self.number_of_values T = zeros(N, Float) #Time Q = zeros((N, d), Float) #Values for i, line in enumerate(lines): fields = line.split(',') realtime = calendar.timegm(time.strptime(fields[0], time_format)) T[i] = realtime - self.starttime for j, value in enumerate(fields[1].split()): Q[i, j] = float(value) msg = 'File %s must list time as a monotonuosly ' %self.filename msg += 'increasing sequence' assert alltrue( T[1:] - T[:-1] > 0 ), msg self.T = T #Time self.Q = Q #Values self.index = 0 #Initial index def __repr__(self): return 'File function' def __call__(self, t, x=None, y=None, point_id=None): """Evaluate f(t,x,y) FIXME: x, y dependency not yet implemented except that result is a vector of same length as x and y FIXME: Naaaa FIXME: Rethink semantics when x,y are vectors. """ from math import pi, cos, sin, sqrt #Find time tau such that # # domain.starttime + t == self.starttime + tau if self.domain is not None: tau = self.domain.starttime-self.starttime+t else: tau = t msg = 'Time interval derived from file %s (%s:%s) does not match model time: %s'\ %(self.filename, self.T[0], self.T[1], tau) if tau < self.T[0]: raise msg if tau > self.T[-1]: raise msg oldindex = self.index #Find slot while tau > self.T[self.index]: self.index += 1 while tau < self.T[self.index]: self.index -= 1 #t is now between index and index+1 ratio = (tau - self.T[self.index])/\ (self.T[self.index+1] - self.T[self.index]) #Compute interpolated values q = self.Q[self.index,:] +\ ratio*(self.Q[self.index+1,:] - self.Q[self.index,:]) #Return vector of interpolated values if x == None and y == None: return q else: try: N = len(x) except: return q else: from Numeric import ones, Float #x is a vector - Create one constant column for each value N = len(x) assert len(y) == N, 'x and y must have same length' res = [] for col in q: res.append(col*ones(N, Float)) return res #FIXME: TEMP class File_function_copy: """Read time series from file and return a callable object: f(t,x,y) which will return interpolated values based on the input file. The file format is assumed to be either two fields separated by a comma: time [DD/MM/YY hh:mm:ss], value0 value1 value2 ... or four comma separated fields time [DD/MM/YY hh:mm:ss], x, y, value0 value1 value2 ... In either case, the callable object will return a tuple of interpolated values, one each value stated in the file. E.g 31/08/04 00:00:00, 1.328223 0 0 31/08/04 00:15:00, 1.292912 0 0 will provide a time dependent function f(t,x=None,y=None), ignoring x and y, which returns three values per call. NOTE: At this stage the function is assumed to depend on time only, i.e no spatial dependency!!!!! When that is needed we can use the least_squares interpolation. #FIXME: This should work with netcdf (e.g. sww) and thus render the #spatio-temporal boundary condition in shallow water fully general """ def __init__(self, filename, domain=None): """Initialise callable object from a file. See docstring for class File_function If domain is specified, model time (domain,starttime) will be checked and possibly modified All times are assumed to be in UTC """ import time, calendar from Numeric import array from config import time_format assert type(filename) == type(''),\ 'First argument to File_function must be a string' try: fid = open(filename) except Exception, e: msg = 'File "%s" could not be opened: Error="%s"'\ %(filename, e) raise msg line = fid.readline() fid.close() fields = line.split(',') msg = 'File %s must have the format date, value0 value1 value2 ...' msg += ' or date, x, y, value0 value1 value2 ...' mode = len(fields) assert mode in [2,4], msg try: starttime = calendar.timegm(time.strptime(fields[0], time_format)) except ValueError: msg = 'First field in file %s must be' %filename msg += ' date-time with format %s.\n' %time_format msg += 'I got %s instead.' %fields[0] raise msg #Split values values = [] for value in fields[mode-1].split(): values.append(float(value)) q = array(values) msg = 'ERROR: File must contain at least one independent value' assert len(q.shape) == 1, msg self.number_of_values = len(q) self.filename = filename self.starttime = starttime self.domain = domain if domain is not None: if domain.starttime is None: domain.starttime = self.starttime else: msg = 'WARNING: Start time as specified in domain (%s)'\ %domain.starttime msg += ' is earlier than the starttime of file %s: %s.'\ %(self.filename, self.starttime) msg += 'Modifying starttime accordingly.' if self.starttime > domain.starttime: #FIXME: Print depending on some verbosity setting #print msg domain.starttime = self.starttime #Modifying model time if mode == 2: self.read_times() #Now read all times in. else: raise 'x,y dependency not yet implemented' def read_times(self): """Read time and values """ from Numeric import zeros, Float, alltrue from config import time_format import time, calendar fid = open(self.filename) lines = fid.readlines() fid.close() N = len(lines) d = self.number_of_values T = zeros(N, Float) #Time Q = zeros((N, d), Float) #Values for i, line in enumerate(lines): fields = line.split(',') realtime = calendar.timegm(time.strptime(fields[0], time_format)) T[i] = realtime - self.starttime for j, value in enumerate(fields[1].split()): Q[i, j] = float(value) msg = 'File %s must list time as a monotonuosly ' %self.filename msg += 'increasing sequence' assert alltrue( T[1:] - T[:-1] > 0 ), msg self.T = T #Time self.Q = Q #Values self.index = 0 #Initial index def __repr__(self): return 'File function' def __call__(self, t, x=None, y=None): """Evaluate f(t,x,y) FIXME: x, y dependency not yet implemented except that result is a vector of same length as x and y FIXME: Naaaa """ from math import pi, cos, sin, sqrt #Find time tau such that # # domain.starttime + t == self.starttime + tau if self.domain is not None: tau = self.domain.starttime-self.starttime+t else: tau = t msg = 'Time interval derived from file %s (%s:%s) does not match model time: %s'\ %(self.filename, self.T[0], self.T[1], tau) if tau < self.T[0]: raise msg if tau > self.T[-1]: raise msg oldindex = self.index #Find slot while tau > self.T[self.index]: self.index += 1 while tau < self.T[self.index]: self.index -= 1 #t is now between index and index+1 ratio = (tau - self.T[self.index])/\ (self.T[self.index+1] - self.T[self.index]) #Compute interpolated values q = self.Q[self.index,:] +\ ratio*(self.Q[self.index+1,:] - self.Q[self.index,:]) #Return vector of interpolated values if x == None and y == None: return q else: try: N = len(x) except: return q else: from Numeric import ones, Float #x is a vector - Create one constant column for each value N = len(x) assert len(y) == N, 'x and y must have same length' res = [] for col in q: res.append(col*ones(N, Float)) return res def read_xya(filename, format = 'netcdf'): """Read simple xya file, possibly with a header in the first line, just x y [attributes] separated by whitespace Format can be either ASCII or NetCDF Return list of points, list of attributes and attribute names if present otherwise None """ #FIXME: Probably obsoleted by similar function in load_ASCII from Scientific.IO.NetCDF import NetCDFFile if format.lower() == 'netcdf': #Get NetCDF fid = NetCDFFile(filename, 'r') # Get the variables points = fid.variables['points'] keys = fid.variables.keys() attributes = {} for key in keys: attributes[key] = fid.variables[key] #Don't close - arrays are needed outside this function, #alternatively take a copy here else: #Read as ASCII file assuming that it is separated by whitespace fid = open(filename) lines = fid.readlines() fid.close() #Check if there is a header line fields = lines[0].strip().split() try: float(fields[0]) except: #This must be a header line attribute_names = fields lines = lines[1:] else: attribute_names = ['elevation'] #HACK attributes = {} for key in attribute_names: attributes[key] = [] points = [] for line in lines: fields = line.strip().split() points.append( (float(fields[0]), float(fields[1])) ) for i, key in enumerate(attribute_names): attributes[key].append( float(fields[2+i]) ) return points, attributes ##################################### #POLYGON STUFF # #FIXME: ALl these should be put into new module polygon.py def inside_polygon(point, polygon, closed = True): """Determine whether points are inside or outside a polygon Input: point - Tuple of (x, y) coordinates, or list of tuples polygon - list of vertices of polygon closed - (optional) determine whether points on boundary should be regarded as belonging to the polygon (closed = True) or not (closed = False) Output: If one point is considered, True or False is returned. If multiple points are passed in, the function returns indices of those points that fall inside the polygon Examples: inside_polygon( [0.5, 0.5], [[0,0], [1,0], [1,1], [0,1]] ) will evaluate to True as the point 0.5, 0.5 is inside the unit square inside_polygon( [[0.5, 0.5], [1, -0.5], [0.3, 0.2]] ) will return the indices [0, 2] as only the first and the last point is inside the unit square Remarks: The vertices may be listed clockwise or counterclockwise and the first point may optionally be repeated. Polygons do not need to be convex. Polygons can have holes in them and points inside a hole is regarded as being outside the polygon. Algorithm is based on work by Darel Finley, http://www.alienryderflex.com/polygon/ """ from Numeric import array, Float, reshape #Input checks try: point = array(point).astype(Float) except: msg = 'Point %s could not be converted to Numeric array' %point raise msg try: polygon = array(polygon).astype(Float) except: msg = 'Polygon %s could not be converted to Numeric array' %polygon raise msg assert len(polygon.shape) == 2,\ 'Polygon array must be a 2d array of vertices: %s' %polygon assert polygon.shape[1] == 2,\ 'Polygon array must have two columns: %s' %polygon if len(point.shape) == 1: one_point = True points = reshape(point, (1,2)) else: one_point = False points = point N = polygon.shape[0] #Number of vertices in polygon px = polygon[:,0] py = polygon[:,1] #Begin algorithm indices = [] for k in range(points.shape[0]): #for each point x = points[k, 0] y = points[k, 1] inside = False for i in range(N): j = (i+1)%N #Check for case where point is contained in line segment if point_on_line(x, y, px[i], py[i], px[j], py[j]): if closed: inside = True else: inside = False break else: #Check if truly inside polygon if py[i] < y and py[j] >= y or\ py[j] < y and py[i] >= y: if px[i] + (y-py[i])/(py[j]-py[i])*(px[j]-px[i]) < x: inside = not inside if inside: indices.append(k) if one_point: return inside else: return indices class Polygon_function: """Create callable object f: x,y -> z, where a,y,z are vectors and where f will return different values depending on whether x,y belongs to specified polygons. To instantiate: Polygon_function(polygons) where polygons is a dictionary of the form {P0: v0, P1: v1, ...} with Pi being lists of vertices defining polygons and vi either constants or functions of x,y to be applied to points with the polygon. The function takes an optional argument, default which is the value (or function) to used for points not belonging to any polygon. For example: Polygon_function(polygons, default = 0.03) If omitted the default value will be 0.0 Note: If two polygons overlap, the one last in the list takes precedence """ def __init__(self, regions, default = 0.0): try: len(regions) except: msg = 'Polygon_function takes a list of pairs (polygon, value). Got %s' %polygons raise msg T = regions[0] try: a = len(T) except: msg = 'Polygon_function takes a list of pairs (polygon, value). Got %s' %polygons raise msg assert a == 2, 'Must have two component each: %s' %T self.regions = regions self.default = default def __call__(self, x, y): from util import inside_polygon from Numeric import ones, Float, concatenate, array, reshape, choose x = array(x).astype(Float) y = array(y).astype(Float) N = len(x) assert len(y) == N points = concatenate( (reshape(x, (N, 1)), reshape(y, (N, 1))), axis=1 ) if callable(self.default): z = self.default(x,y) else: z = ones(N, Float) * self.default for polygon, value in self.regions: indices = inside_polygon(points, polygon) #FIXME: This needs to be vectorised if callable(value): for i in indices: xx = array([x[i]]) yy = array([y[i]]) z[i] = value(xx, yy)[0] else: for i in indices: z[i] = value return z def read_polygon(filename): """Read points assumed to form a polygon There must be exactly two numbers in each line """ #Get polygon fid = open(filename) lines = fid.readlines() fid.close() polygon = [] for line in lines: fields = line.split(',') polygon.append( [float(fields[0]), float(fields[1])] ) return polygon def populate_polygon(polygon, number_of_points, seed = None): """Populate given polygon with uniformly distributed points. Input: polygon - list of vertices of polygon number_of_points - (optional) number of points seed - seed for random number generator (default=None) Output: points - list of points inside polygon Examples: populate_polygon( [[0,0], [1,0], [1,1], [0,1]], 5 ) will return five randomly sleected points inside the unit square """ from random import uniform, seed seed(seed) points = [] #Find outer extent of polygon max_x = min_x = polygon[0][0] max_y = min_y = polygon[0][1] for point in polygon[1:]: x = point[0] if x > max_x: max_x = x if x < min_x: min_x = x y = point[1] if y > max_y: max_y = y if y < min_y: min_y = y while len(points) < number_of_points: x = uniform(min_x, max_x) y = uniform(min_y, max_y) if inside_polygon( [x,y], polygon ): points.append([x,y]) return points #################################################################### #Python versions of function that are also implemented in util_gateway.c # def gradient_python(x0, y0, x1, y1, x2, y2, q0, q1, q2): """ """ det = (y2-y0)*(x1-x0) - (y1-y0)*(x2-x0) a = (y2-y0)*(q1-q0) - (y1-y0)*(q2-q0) a /= det b = (x1-x0)*(q2-q0) - (x2-x0)*(q1-q0) b /= det return a, b ############################################## #Initialise module import compile if compile.can_use_C_extension('util_ext.c'): from util_ext import gradient, point_on_line else: gradient = gradient_python if __name__ == "__main__": pass