"""This module contains various auxiliary function used by pyvolution. It is also a clearing house for functions that may later earn a module of their own. """ #FIXME: Deprecate this shortcut from utilities.numerical_tools import ensure_numeric import utilities.polygon from warnings import warn def point_on_line(*args, **kwargs): """Temporary Interface to new location""" msg = 'point_on_line has moved from util.py. ' msg += 'Please use "from utilities.polygon import point_on_line"' warn(msg, DeprecationWarning) return utilities.polygon.point_on_line(*args, **kwargs) def inside_polygon(*args, **kwargs): """Temporary Interface to new location""" print 'inside_polygon has moved from util.py. ', print 'Please use "from utilities.polygon import inside_polygon"' return utilities.polygon.inside_polygon(*args, **kwargs) def outside_polygon(*args, **kwargs): """Temporary Interface to new location""" print 'outside_polygon has moved from util.py. ', print 'Please use "from utilities.polygon import outside_polygon"' return utilities.polygon.outside_polygon(*args, **kwargs) def separate_points_by_polygon(*args, **kwargs): """Temporary Interface to new location""" print 'separate_points_by_polygon has moved from util.py. ', print 'Please use "from utilities.polygon import separate_points_by_polygon"' return utilities.polygon.separate_points_by_polygon(*args, **kwargs) from utilities.polygon import Polygon_function #No warning def read_polygon(*args, **kwargs): """Temporary Interface to new location""" print 'read_polygon has moved from util.py. ', print 'Please use "from utilities.polygon import read_polygon"' return utilities.polygon.read_polygon(*args, **kwargs) def populate_polygon(*args, **kwargs): """Temporary Interface to new location""" print 'populate_polygon has moved from util.py. ', print 'Please use "from utilities.polygon import populate_polygon"' return utilities.polygon.populate_polygon(*args, **kwargs) def read_xya(filename): """Read simple xya file, possibly with a header in the first line, just x y [attributes] separated by whitespace Format can be either ASCII or NetCDF Return list of points, list of attributes and attribute names if present otherwise None """ print "read_xya is obsolete. Use import_points_file from load_mesh.loadASCII" #FIXME: Probably obsoleted by similar function in load_ASCII #FIXME: Name read_data_points (or see 'load' in pylab) from load_mesh.loadASCII import import_points_file points_dict = import_points_file(filename) return points_dict['pointlist'], points_dict['attributelist'] #Normal util stuff def angle(v): """Compute angle between e1 (the unit vector in the x-direction) and the specified vector """ from math import acos, pi, sqrt from Numeric import sum, array l = sqrt( sum (array(v)**2)) v1 = v[0]/l v2 = v[1]/l theta = acos(v1) #try: # theta = acos(v1) #except ValueError, e: # print 'WARNING (util.py): Angle acos(%s) failed: %s' %(str(v1), e) # # #FIXME, hack to avoid acos(1.0) Value error # # why is it happening? # # is it catching something we should avoid? # #FIXME (Ole): When did this happen? We need a unit test to # #reveal this condition or otherwise remove the hack # # s = 1e-6 # if (v1+s > 1.0) and (v1-s < 1.0) : # theta = 0.0 # elif (v1+s > -1.0) and (v1-s < -1.0): # theta = 3.1415926535897931 # print 'WARNING (util.py): angle v1 is %f, setting acos to %f '\ # %(v1, theta) if v2 < 0: #Quadrant 3 or 4 theta = 2*pi-theta return theta def anglediff(v0, v1): """Compute difference between angle of vector x0, y0 and x1, y1. This is used for determining the ordering of vertices, e.g. for checking if they are counter clockwise. Always return a positive value """ from math import pi a0 = angle(v0) a1 = angle(v1) #Ensure that difference will be positive if a0 < a1: a0 += 2*pi return a0-a1 def mean(x): from Numeric import sum return sum(x)/len(x) def file_function(filename, domain = None, quantities = None, interpolation_points = None, verbose = False, use_cache = False): """Read time history of spatial data from NetCDF file and return a callable object. Input variables: filename - Name of sww or tms file If the file has extension 'sww' then it is assumed to be spatio-temporal or temporal and the callable object will have the form f(t,x,y) or f(t) depending on whether the file contains spatial data If the file has extension 'tms' then it is assumed to be temporal only and the callable object will have the form f(t) Either form will return interpolated values based on the input file using the underlying interpolation_function. domain - Associated domain object If domain is specified, model time (domain.starttime) will be checked and possibly modified. All times are assumed to be in UTC All spatial information is assumed to be in absolute UTM coordinates. quantities - the name of the quantity to be interpolated or a list of quantity names. The resulting function will return a tuple of values - one for each quantity interpolation_points - list of absolute UTM coordinates for points at which values are sought use_cache: True means that caching of intermediate result of Interpolation_function is attempted See Interpolation function for further documentation """ if use_cache is True: try: from caching import cache except: msg = 'Caching was requested, but caching module'+\ 'could not be imported' raise msg f = cache(_file_function, filename, {'domain': domain, 'quantities': quantities, 'interpolation_points': interpolation_points, 'verbose': verbose}, dependencies = [filename], compression = False, verbose = verbose) #FIXME (Ole): Pass cache arguments, such as compression, in some sort of #structure else: f = _file_function(filename, domain, quantities, interpolation_points, verbose) return f def _file_function(filename, domain = None, quantities = None, interpolation_points = None, verbose = False): """Internal function See file_function for documentatiton """ #FIXME (OLE): Should check origin of domain against that of file #In fact, this is where origin should be converted to that of domain #Also, check that file covers domain fully. #If we use the suggested Point_set class for interpolation points #here it would be easier #Take into account: #- domain's georef #- sww file's georef #- interpolation points as absolute UTM coordinates assert type(filename) == type(''),\ 'First argument to File_function must be a string' try: fid = open(filename) except Exception, e: msg = 'File "%s" could not be opened: Error="%s"'\ %(filename, e) raise msg line = fid.readline() fid.close() if quantities is None: if domain is not None: quantities = domain.conserved_quantities if line[:3] == 'CDF': return get_netcdf_file_function(filename, domain, quantities, interpolation_points, verbose = verbose) else: raise 'Must be a NetCDF File' def get_netcdf_file_function(filename, domain=None, quantity_names=None, interpolation_points=None, verbose = False): """Read time history of spatial data from NetCDF sww file and return a callable object f(t,x,y) which will return interpolated values based on the input file. If domain is specified, model time (domain.starttime) will be checked and possibly modified All times are assumed to be in UTC See Interpolation function for further documetation """ #FIXME: Check that model origin is the same as file's origin #(both in UTM coordinates) #If not - modify those from file to match domain #Take this code from e.g. dem2pts in data_manager.py #FIXME: Use geo_reference to read and write xllcorner... #FIXME: Maybe move caching out to this level rather than at the #Interpolation_function level (below) import time, calendar, types from config import time_format from Scientific.IO.NetCDF import NetCDFFile from Numeric import array, zeros, Float, alltrue, concatenate, reshape from util import ensure_numeric #Open NetCDF file if verbose: print 'Reading', filename fid = NetCDFFile(filename, 'r') if type(quantity_names) == types.StringType: quantity_names = [quantity_names] if quantity_names is None or len(quantity_names) < 1: #If no quantities are specified get quantities from file #x, y, time are assumed as the independent variables so #they are excluded as potentiol quantities quantity_names = [] for name in fid.variables.keys(): if name not in ['x', 'y', 'time']: quantity_names.append(name) if len(quantity_names) < 1: msg = 'ERROR: At least one independent value must be specified' raise msg if interpolation_points is not None: interpolation_points = ensure_numeric(interpolation_points, Float) #Now assert that requested quantitites (and the independent ones) #are present in file missing = [] for quantity in ['time'] + quantity_names: if not fid.variables.has_key(quantity): missing.append(quantity) if len(missing) > 0: msg = 'Quantities %s could not be found in file %s'\ %(str(missing), filename) fid.close() raise msg #Decide whether this data has a spatial dimension spatial = True for quantity in ['x', 'y']: if not fid.variables.has_key(quantity): spatial = False if filename[-3:] == 'tms' and spatial is True: msg = 'Files of type tms must not contain spatial information' raise msg if filename[-3:] == 'sww' and spatial is False: msg = 'Files of type sww must contain spatial information' raise msg #Get first timestep try: starttime = fid.starttime[0] except ValueError: msg = 'Could not read starttime from file %s' %filename raise msg #Get variables if verbose: print 'Get variables' time = fid.variables['time'][:] if spatial: #Get origin xllcorner = fid.xllcorner[0] yllcorner = fid.yllcorner[0] zone = fid.zone[0] x = fid.variables['x'][:] y = fid.variables['y'][:] triangles = fid.variables['volumes'][:] x = reshape(x, (len(x),1)) y = reshape(y, (len(y),1)) vertex_coordinates = concatenate((x,y), axis=1) #m x 2 array if interpolation_points is not None: #Adjust for georef interpolation_points[:,0] -= xllcorner interpolation_points[:,1] -= yllcorner if domain is not None: #Update domain.startime if it is *earlier* than starttime if starttime > domain.starttime: msg = 'WARNING: Start time as specified in domain (%f)'\ %domain.starttime msg += ' is earlier than the starttime of file %s (%f).'\ %(filename, starttime) msg += ' Modifying domain starttime accordingly.' if verbose: print msg domain.starttime = starttime #Modifying model time if verbose: print 'Domain starttime is now set to %f'\ %domain.starttime #If domain.startime is *later* than starttime, #move time back - relative to domain's time if domain.starttime > starttime: time = time - domain.starttime + starttime #FIXME Use method in geo to reconcile #if spatial: #assert domain.geo_reference.xllcorner == xllcorner #assert domain.geo_reference.yllcorner == yllcorner #assert domain.geo_reference.zone == zone if verbose: print 'File_function data obtained from: %s' %filename print ' References:' #print ' Datum ....' #FIXME if spatial: print ' Lower left corner: [%f, %f]'\ %(xllcorner, yllcorner) print ' Start time: %f' %starttime #Produce values for desired data points at #each timestep quantities = {} for i, name in enumerate(quantity_names): quantities[name] = fid.variables[name][:] fid.close() from least_squares import Interpolation_function if not spatial: vertex_coordinates = triangles = interpolation_points = None return Interpolation_function(time, quantities, quantity_names, vertex_coordinates, triangles, interpolation_points, verbose = verbose) def tsh2sww(filename, verbose=True): """ to check if a tsh/msh file 'looks' good. """ #FIXME Move to data_manager from shallow_water import Domain from pmesh2domain import pmesh_to_domain_instance import time, os from data_manager import get_dataobject from os import sep, path #from util import mean if verbose == True:print 'Creating domain from', filename domain = pmesh_to_domain_instance(filename, Domain) if verbose == True:print "Number of triangles = ", len(domain) domain.smooth = True domain.format = 'sww' #Native netcdf visualisation format file_path, filename = path.split(filename) filename, ext = path.splitext(filename) domain.filename = filename domain.reduction = mean if verbose == True:print "file_path",file_path if file_path == "":file_path = "." domain.set_datadir(file_path) if verbose == True: print "Output written to " + domain.get_datadir() + sep + \ domain.filename + "." + domain.format sww = get_dataobject(domain) sww.store_connectivity() sww.store_timestep('stage') def multiple_replace(text, dictionary): """Multiple replace of words in text text: String to be modified dictionary: Mapping of words that are to be substituted Python Cookbook 3.14 page 88 and page 90 """ import re #Create a regular expression from all of the dictionary keys #matching only entire words regex = re.compile(r'\b'+ \ r'\b|\b'.join(map(re.escape, dictionary.keys()))+ \ r'\b' ) #For each match, lookup the corresponding value in the dictionary return regex.sub(lambda match: dictionary[match.group(0)], text) def apply_expression_to_dictionary(expression, dictionary):#dictionary): """Apply arbitrary expression to values of dictionary Given an expression in terms of the keys, replace key by the corresponding values and evaluate. expression: Arbitrary, e.g. arithmetric, expression relating keys from dictionary. dictionary: Mapping between symbols in expression and objects that will be evaluated by expression. Values in dictionary must support operators given in expression e.g. by overloading """ import types import re assert isinstance(expression, basestring) assert type(dictionary) == types.DictType #Convert dictionary values to textual representations suitable for eval D = {} for key in dictionary: D[key] = 'dictionary["%s"]' %key #Perform substitution of variables expression = multiple_replace(expression, D) #Evaluate and return try: return eval(expression) except NameError, e: msg = 'Expression "%s" could not be evaluated: %s' %(expression, e) raise NameError, msg #################################################################### #Python versions of function that are also implemented in util_gateway.c # def gradient_python(x0, y0, x1, y1, x2, y2, q0, q1, q2): """ """ det = (y2-y0)*(x1-x0) - (y1-y0)*(x2-x0) a = (y2-y0)*(q1-q0) - (y1-y0)*(q2-q0) a /= det b = (x1-x0)*(q2-q0) - (x2-x0)*(q1-q0) b /= det return a, b def gradient2_python(x0, y0, x1, y1, q0, q1): """Compute radient based on two points and enforce zero gradient in the direction orthogonal to (x1-x0), (y1-y0) """ #Old code #det = x0*y1 - x1*y0 #if det != 0.0: # a = (y1*q0 - y0*q1)/det # b = (x0*q1 - x1*q0)/det #Correct code (ON) det = (x1-x0)**2 + (y1-y0)**2 if det != 0.0: a = (x1-x0)*(q1-q0)/det b = (y1-y0)*(q1-q0)/det return a, b ############################################## #Initialise module from utilities import compile if compile.can_use_C_extension('util_ext.c'): from util_ext import gradient, gradient2#, point_on_line #separate_points_by_polygon = separate_points_by_polygon_c else: gradient = gradient_python gradient2 = gradient2_python if __name__ == "__main__": pass