"""Class Quantity - Implements values at each triangular element To create: Quantity(domain, vertex_values) domain: Associated domain structure. Required. vertex_values: N x 3 array of values at each vertex for each element. Default None If vertex_values are None Create array of zeros compatible with domain. Otherwise check that it is compatible with dimenions of domain. Otherwise raise an exception """ class Quantity: def __init__(self, domain, vertex_values=None): from mesh import Mesh from Numeric import array, zeros, Float msg = 'First argument in Quantity.__init__ ' msg += 'must be of class Mesh (or a subclass thereof)' assert isinstance(domain, Mesh), msg if vertex_values is None: N = domain.number_of_elements self.vertex_values = zeros((N, 3), Float) else: self.vertex_values = array(vertex_values).astype(Float) N, V = self.vertex_values.shape assert V == 3,\ 'Three vertex values per element must be specified' msg = 'Number of vertex values (%d) must be consistent with'\ %N msg += 'number of elements in specified domain (%d).'\ %domain.number_of_elements assert N == domain.number_of_elements, msg self.domain = domain #Allocate space for other quantities self.centroid_values = zeros(N, Float) self.edge_values = zeros((N, 3), Float) #Intialise centroid and edge_values self.interpolate() def __len__(self): return self.centroid_values.shape[0] def interpolate(self): """Compute interpolated values at edges and centroid Pre-condition: vertex_values have been set """ N = self.vertex_values.shape[0] for i in range(N): v0 = self.vertex_values[i, 0] v1 = self.vertex_values[i, 1] v2 = self.vertex_values[i, 2] self.centroid_values[i] = (v0 + v1 + v2)/3 self.interpolate_from_vertices_to_edges() def interpolate_from_vertices_to_edges(self): #Call correct module function #(either from this module or C-extension) interpolate_from_vertices_to_edges(self) def set_values(self, X, location='vertices', indexes = None): """Set values for quantity X: Compatible list, Numeric array (see below), constant or function location: Where values are to be stored. Permissible options are: vertices, edges, centroids Default is "vertices" In case of location == 'centroids' the dimension values must be a list of a Numerical array of length N, N being the number of elements. Otherwise it must be of dimension Nx3 The values will be stored in elements following their internal ordering. If values are described a function, it will be evaluated at specified points If indexex is not 'unique vertices' Indexes is the set of element ids that the operation applies to. If indexex is 'unique vertices' Indexes is the set of vertex ids that the operation applies to. If selected location is vertices, values for centroid and edges will be assigned interpolated values. In any other case, only values for the specified locations will be assigned and the others will be left undefined. """ if location not in ['vertices', 'centroids', 'edges', 'unique vertices']: msg = 'Invalid location: %s' %location raise msg if X is None: msg = 'Given values are None' raise msg import types, Numeric assert type(indexes) in [types.ListType, types.NoneType, Numeric.ArrayType],\ 'Indices must be a list or None' if callable(X): #Use function specific method self.set_function_values(X, location, indexes = indexes) elif type(X) in [types.FloatType, types.IntType, types.LongType]: if location == 'centroids': if (indexes == None): self.centroid_values[:] = X else: #Brute force for i in indexes: self.centroid_values[i,:] = X elif location == 'edges': if (indexes == None): self.edge_values[:] = X else: #Brute force for i in indexes: self.edge_values[i,:] = X elif location == 'unique vertices': if (indexes == None): self.edge_values[:] = X else: #Go through list of unique vertices for unique_vert_id in indexes: triangles = self.domain.vertexlist[unique_vert_id] #In case there are unused points if triangles is None: continue #Go through all triangle, vertex pairs #and set corresponding vertex value for triangle_id, vertex_id in triangles: self.vertex_values[triangle_id, vertex_id] = X #Intialise centroid and edge_values self.interpolate() else: if (indexes == None): self.vertex_values[:] = X else: #Brute force for i_vertex in indexes: self.vertex_values[i_vertex,:] = X else: #Use array specific method self.set_array_values(X, location, indexes = indexes) if location == 'vertices' or location == 'unique vertices': #Intialise centroid and edge_values self.interpolate() if location == 'centroids': #Extrapolate 1st order - to capture notion of area being specified self.extrapolate_first_order() def get_values(self, location='vertices', indexes = None): """get values for quantity return X, Compatible list, Numeric array (see below) location: Where values are to be stored. Permissible options are: vertices, edges, centroid Default is "vertices" In case of location == 'centroid' the dimension values must be a list of a Numerical array of length N, N being the number of elements. Otherwise it must be of dimension Nx3 The returned values with be a list the length of indexes (N if indexes = None). Each value will be a list of the three vertex values for this quantity. Indexes is the set of element ids that the operation applies to. """ from Numeric import take if location not in ['vertices', 'centroids', 'edges', 'unique vertices']: msg = 'Invalid location: %s' %location raise msg import types, Numeric assert type(indexes) in [types.ListType, types.NoneType, Numeric.ArrayType],\ 'Indices must be a list or None' if location == 'centroids': if (indexes == None): indexes = range(len(self)) return take(self.centroid_values,indexes) elif location == 'edges': if (indexes == None): indexes = range(len(self)) return take(self.edge_values,indexes) elif location == 'unique vertices': if (indexes == None): indexes=range(self.domain.coordinates.shape[0]) vert_values = [] #Go through list of unique vertices for unique_vert_id in indexes: triangles = self.domain.vertexlist[unique_vert_id] #In case there are unused points if triangles is None: msg = 'Unique vertex not associated with triangles' raise msg # Go through all triangle, vertex pairs # Average the values sum = 0 for triangle_id, vertex_id in triangles: sum += self.vertex_values[triangle_id, vertex_id] vert_values.append(sum/len(triangles)) return Numeric.array(vert_values) else: if (indexes == None): indexes = range(len(self)) return take(self.vertex_values,indexes) def set_function_values(self, f, location='vertices', indexes = None): """Set values for quantity using specified function f: x, y -> z Function where x, y and z are arrays location: Where values are to be stored. Permissible options are: vertices, centroid Default is "vertices" """ #FIXME: Should check that function returns something sensible and #raise a meaningfol exception if it returns None for example from Numeric import take if (indexes == None): indexes = range(len(self)) is_subset = False else: is_subset = True if location == 'centroids': P = take(self.domain.centroid_coordinates,indexes) if is_subset: self.set_values(f(P[:,0], P[:,1]), location, indexes = indexes) else: self.set_values(f(P[:,0], P[:,1]), location) elif location == 'vertices': P = self.domain.vertex_coordinates if is_subset: #Brute force for e in indexes: for i in range(3): self.vertex_values[e,i] = f(P[e,2*i], P[e,2*i+1]) else: for i in range(3): self.vertex_values[:,i] = f(P[:,2*i], P[:,2*i+1]) else: raise 'Not implemented: %s' %location def set_array_values(self, values, location='vertices', indexes = None): """Set values for quantity values: Numeric array location: Where values are to be stored. Permissible options are: vertices, edges, centroid, unique vertices Default is "vertices" indexes - if this action is carried out on a subset of elements or unique vertices The element/unique vertex indexes are specified here. In case of location == 'centroid' the dimension values must be a list of a Numerical array of length N, N being the number of elements. Otherwise it must be of dimension Nx3 The values will be stored in elements following their internal ordering. If selected location is vertices, values for centroid and edges will be assigned interpolated values. In any other case, only values for the specified locations will be assigned and the others will be left undefined. """ from Numeric import array, Float, Int, allclose values = array(values).astype(Float) if (indexes <> None): indexes = array(indexes).astype(Int) msg = 'Number of values must match number of indexes' assert values.shape[0] == indexes.shape[0], msg N = self.centroid_values.shape[0] if location == 'centroids': assert len(values.shape) == 1, 'Values array must be 1d' if indexes == None: msg = 'Number of values must match number of elements' assert values.shape[0] == N, msg self.centroid_values = values else: msg = 'Number of values must match number of indexes' assert values.shape[0] == indexes.shape[0], msg #Brute force for i in range(len(indexes)): self.centroid_values[indexes[i]] = values[i] elif location == 'edges': assert len(values.shape) == 2, 'Values array must be 2d' msg = 'Number of values must match number of elements' assert values.shape[0] == N, msg msg = 'Array must be N x 3' assert values.shape[1] == 3, msg self.edge_values = values elif location == 'unique vertices': assert len(values.shape) == 1 or allclose(values.shape[1:], 1),\ 'Values array must be 1d' self.set_vertex_values(values.flat, indexes = indexes) else: if len(values.shape) == 1: self.set_vertex_values(values, indexes = indexes) #if indexes == None: #Values are being specified once for each unique vertex # msg = 'Number of values must match number of vertices' # assert values.shape[0] == self.domain.coordinates.shape[0], msg # self.set_vertex_values(values) #else: # for element_index, value in map(None, indexes, values): # self.vertex_values[element_index, :] = value elif len(values.shape) == 2: #Vertex values are given as a triplet for each triangle msg = 'Array must be N x 3' assert values.shape[1] == 3, msg if indexes == None: self.vertex_values = values else: for element_index, value in map(None, indexes, values): self.vertex_values[element_index] = value else: msg = 'Values array must be 1d or 2d' raise msg # FIXME have a get_vertex_values as well, so the 'stage' quantity can be # set, based on the elevation def set_vertex_values(self, A, indexes = None): """Set vertex values for all unique vertices based on input array A which has one entry per unique vertex, i.e. one value for each row in array self.domain.coordinates or one value for each row in vertexlist. indexes is the list of vertex_id's that will be set. Note: Functions not allowed """ from Numeric import array, Float #Assert that A can be converted to a Numeric array of appropriate dim A = array(A, Float) #print 'SHAPE A', A.shape assert len(A.shape) == 1 if indexes == None: assert A.shape[0] == self.domain.coordinates.shape[0] vertex_list = range(A.shape[0]) else: assert A.shape[0] == len(indexes) vertex_list = indexes #Go through list of unique vertices for i_index,unique_vert_id in enumerate(vertex_list): triangles = self.domain.vertexlist[unique_vert_id] if triangles is None: continue #In case there are unused points #Go through all triangle, vertex pairs #touching vertex unique_vert_id and set corresponding vertex value for triangle_id, vertex_id in triangles: self.vertex_values[triangle_id, vertex_id] = A[i_index] #Intialise centroid and edge_values self.interpolate() def smooth_vertex_values(self, value_array='field_values', precision = None): """ Smooths field_values or conserved_quantities data. TODO: be able to smooth individual fields NOTE: This function does not have a test. FIXME: NOT DONE - do we need it? FIXME: this function isn't called by anything. Maybe it should be removed..-DSG """ from Numeric import concatenate, zeros, Float, Int, array, reshape A,V = self.get_vertex_values(xy=False, value_array=value_array, smooth = True, precision = precision) #Set some field values for volume in self: for i,v in enumerate(volume.vertices): if value_array == 'field_values': volume.set_field_values('vertex', i, A[v,:]) elif value_array == 'conserved_quantities': volume.set_conserved_quantities('vertex', i, A[v,:]) if value_array == 'field_values': self.precompute() elif value_array == 'conserved_quantities': Volume.interpolate_conserved_quantities() #Method for outputting model results #FIXME: Split up into geometric and numeric stuff. #FIXME: Geometric (X,Y,V) should live in mesh.py #FIXME: STill remember to move XY to mesh def get_vertex_values(self, xy=True, smooth = None, precision = None, reduction = None): """Return vertex values like an OBJ format The vertex values are returned as one sequence in the 1D float array A. If requested the coordinates will be returned in 1D arrays X and Y. The connectivity is represented as an integer array, V, of dimension M x 3, where M is the number of volumes. Each row has three indices into the X, Y, A arrays defining the triangle. if smooth is True, vertex values corresponding to one common coordinate set will be smoothed according to the given reduction operator. In this case vertex coordinates will be de-duplicated. If no smoothings is required, vertex coordinates and values will be aggregated as a concatenation of values at vertices 0, vertices 1 and vertices 2 Calling convention if xy is True: X,Y,A,V = get_vertex_values else: A,V = get_vertex_values """ from Numeric import concatenate, zeros, Float, Int, array, reshape if smooth is None: smooth = self.domain.smooth if precision is None: precision = Float if reduction is None: reduction = self.domain.reduction #Create connectivity if smooth == True: V = self.domain.get_vertices() N = len(self.domain.vertexlist) A = zeros(N, precision) #Smoothing loop for k in range(N): L = self.domain.vertexlist[k] #Go through all triangle, vertex pairs #contributing to vertex k and register vertex value if L is None: continue #In case there are unused points contributions = [] for volume_id, vertex_id in L: v = self.vertex_values[volume_id, vertex_id] contributions.append(v) A[k] = reduction(contributions) if xy is True: X = self.domain.coordinates[:,0].astype(precision) Y = self.domain.coordinates[:,1].astype(precision) return X, Y, A, V else: return A, V else: #Don't smooth #obj machinery moved to general_mesh # Create a V like [[0 1 2], [3 4 5]....[3*m-2 3*m-1 3*m]] # These vert_id's will relate to the verts created below #m = len(self.domain) #Number of volumes #M = 3*m #Total number of unique vertices #V = reshape(array(range(M)).astype(Int), (m,3)) V = self.domain.get_triangles(obj=True) #FIXME use get_vertices, when ready A = self.vertex_values.flat #Do vertex coordinates if xy is True: C = self.domain.get_vertex_coordinates() X = C[:,0:6:2].copy() Y = C[:,1:6:2].copy() return X.flat, Y.flat, A, V else: return A, V def extrapolate_first_order(self): """Extrapolate conserved quantities from centroid to vertices for each volume using first order scheme. """ qc = self.centroid_values qv = self.vertex_values for i in range(3): qv[:,i] = qc def get_integral(self): """Compute the integral of quantity across entire domain """ integral = 0 for k in range(self.domain.number_of_elements): area = self.domain.areas[k] qc = self.centroid_values[k] integral += qc*area return integral class Conserved_quantity(Quantity): """Class conserved quantity adds to Quantity: boundary values, storage and method for updating, and methods for (second order) extrapolation from centroid to vertices inluding gradients and limiters """ def __init__(self, domain, vertex_values=None): Quantity.__init__(self, domain, vertex_values) from Numeric import zeros, Float #Allocate space for boundary values L = len(domain.boundary) self.boundary_values = zeros(L, Float) #Allocate space for updates of conserved quantities by #flux calculations and forcing functions N = domain.number_of_elements self.explicit_update = zeros(N, Float ) self.semi_implicit_update = zeros(N, Float ) def update(self, timestep): #Call correct module function #(either from this module or C-extension) return update(self, timestep) def compute_gradients(self): #Call correct module function #(either from this module or C-extension) return compute_gradients(self) def limit(self): #Call correct module function #(either from this module or C-extension) limit(self) def extrapolate_second_order(self): #Call correct module function #(either from this module or C-extension) extrapolate_second_order(self) def update(quantity, timestep): """Update centroid values based on values stored in explicit_update and semi_implicit_update as well as given timestep Function implementing forcing terms must take on argument which is the domain and they must update either explicit or implicit updates, e,g,: def gravity(domain): .... domain.quantities['xmomentum'].explicit_update = ... domain.quantities['ymomentum'].explicit_update = ... Explicit terms must have the form G(q, t) and explicit scheme is q^{(n+1}) = q^{(n)} + delta_t G(q^{n}, n delta_t) Semi implicit forcing terms are assumed to have the form G(q, t) = H(q, t) q and the semi implicit scheme will then be q^{(n+1}) = q^{(n)} + delta_t H(q^{n}, n delta_t) q^{(n+1}) """ from Numeric import sum, equal, ones, Float N = quantity.centroid_values.shape[0] #Divide H by conserved quantity to obtain G (see docstring above) for k in range(N): x = quantity.centroid_values[k] if x == 0.0: #FIXME: Is this right quantity.semi_implicit_update[k] = 0.0 else: quantity.semi_implicit_update[k] /= x #Explicit updates quantity.centroid_values += timestep*quantity.explicit_update #Semi implicit updates denominator = ones(N, Float)-timestep*quantity.semi_implicit_update if sum(equal(denominator, 0.0)) > 0.0: msg = 'Zero division in semi implicit update. Call Stephen :-)' raise msg else: #Update conserved_quantities from semi implicit updates quantity.centroid_values /= denominator def interpolate_from_vertices_to_edges(quantity): """Compute edge values from vertex values using linear interpolation """ for k in range(quantity.vertex_values.shape[0]): q0 = quantity.vertex_values[k, 0] q1 = quantity.vertex_values[k, 1] q2 = quantity.vertex_values[k, 2] quantity.edge_values[k, 0] = 0.5*(q1+q2) quantity.edge_values[k, 1] = 0.5*(q0+q2) quantity.edge_values[k, 2] = 0.5*(q0+q1) def extrapolate_second_order(quantity): """Extrapolate conserved quantities from centroid to vertices for each volume using second order scheme. """ a, b = quantity.compute_gradients() X = quantity.domain.get_vertex_coordinates() qc = quantity.centroid_values qv = quantity.vertex_values #Check each triangle for k in range(quantity.domain.number_of_elements): #Centroid coordinates x, y = quantity.domain.centroid_coordinates[k] #vertex coordinates x0, y0, x1, y1, x2, y2 = X[k,:] #Extrapolate qv[k,0] = qc[k] + a[k]*(x0-x) + b[k]*(y0-y) qv[k,1] = qc[k] + a[k]*(x1-x) + b[k]*(y1-y) qv[k,2] = qc[k] + a[k]*(x2-x) + b[k]*(y2-y) def compute_gradients(quantity): """Compute gradients of triangle surfaces defined by centroids of neighbouring volumes. If one edge is on the boundary, use own centroid as neighbour centroid. If two or more are on the boundary, fall back to first order scheme. """ from Numeric import zeros, Float from util import gradient centroid_coordinates = quantity.domain.centroid_coordinates surrogate_neighbours = quantity.domain.surrogate_neighbours centroid_values = quantity.centroid_values number_of_boundaries = quantity.domain.number_of_boundaries N = centroid_values.shape[0] a = zeros(N, Float) b = zeros(N, Float) for k in range(N): if number_of_boundaries[k] < 2: #Two or three true neighbours #Get indices of neighbours (or self when used as surrogate) k0, k1, k2 = surrogate_neighbours[k,:] #Get data q0 = centroid_values[k0] q1 = centroid_values[k1] q2 = centroid_values[k2] x0, y0 = centroid_coordinates[k0] #V0 centroid x1, y1 = centroid_coordinates[k1] #V1 centroid x2, y2 = centroid_coordinates[k2] #V2 centroid #Gradient a[k], b[k] = gradient(x0, y0, x1, y1, x2, y2, q0, q1, q2) elif number_of_boundaries[k] == 2: #One true neighbour #Get index of the one neighbour for k0 in surrogate_neighbours[k,:]: if k0 != k: break assert k0 != k k1 = k #self #Get data q0 = centroid_values[k0] q1 = centroid_values[k1] x0, y0 = centroid_coordinates[k0] #V0 centroid x1, y1 = centroid_coordinates[k1] #V1 centroid #Gradient a[k], b[k] = gradient2(x0, y0, x1, y1, q0, q1) else: #No true neighbours - #Fall back to first order scheme pass return a, b def limit(quantity): """Limit slopes for each volume to eliminate artificial variance introduced by e.g. second order extrapolator This is an unsophisticated limiter as it does not take into account dependencies among quantities. precondition: vertex values are estimated from gradient postcondition: vertex values are updated """ from Numeric import zeros, Float N = quantity.domain.number_of_elements beta_w = quantity.domain.beta_w qc = quantity.centroid_values qv = quantity.vertex_values #Find min and max of this and neighbour's centroid values qmax = zeros(qc.shape, Float) qmin = zeros(qc.shape, Float) for k in range(N): qmax[k] = qmin[k] = qc[k] for i in range(3): n = quantity.domain.neighbours[k,i] if n >= 0: qn = qc[n] #Neighbour's centroid value qmin[k] = min(qmin[k], qn) qmax[k] = max(qmax[k], qn) #Diffences between centroids and maxima/minima dqmax = qmax - qc dqmin = qmin - qc #Deltas between vertex and centroid values dq = zeros(qv.shape, Float) for i in range(3): dq[:,i] = qv[:,i] - qc #Phi limiter for k in range(N): #Find the gradient limiter (phi) across vertices phi = 1.0 for i in range(3): r = 1.0 if (dq[k,i] > 0): r = dqmax[k]/dq[k,i] if (dq[k,i] < 0): r = dqmin[k]/dq[k,i] phi = min( min(r*beta_w, 1), phi ) #Then update using phi limiter for i in range(3): qv[k,i] = qc[k] + phi*dq[k,i] import compile if compile.can_use_C_extension('quantity_ext.c'): #Replace python version with c implementations from quantity_ext import limit, compute_gradients,\ extrapolate_second_order, interpolate_from_vertices_to_edges, update