"""Class Domain - 2D triangular domains for finite-volume computations of the shallow water wave equation Copyright 2004 Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou Geoscience Australia """ from mesh import Mesh from generic_boundary_conditions import * import types class Domain(Mesh): def __init__(self, coordinates, vertices, boundary = None, conserved_quantities = None, other_quantities = None, tagged_elements = None): Mesh.__init__(self, coordinates, vertices, boundary, tagged_elements) from Numeric import zeros, Float from quantity import Quantity, Conserved_quantity #List of quantity names entering #the conservation equations #(Must be a subset of quantities) if conserved_quantities is None: self.conserved_quantities = [] else: self.conserved_quantities = conserved_quantities if other_quantities is None: self.other_quantities = [] else: self.other_quantities = other_quantities #Build dictionary of Quantity instances keyed by quantity names self.quantities = {} #FIXME: remove later for name in self.conserved_quantities: self.quantities[name] = Conserved_quantity(self) for name in self.other_quantities: self.quantities[name] = Quantity(self) #Create an empty list for explicit forcing terms self.forcing_terms = [] #Defaults from config import max_smallsteps, beta, epsilon self.beta = beta self.epsilon = epsilon self.default_order = 1 self.order = self.default_order self.smallsteps = 0 self.max_smallsteps = max_smallsteps self.number_of_steps = 0 self.number_of_first_order_steps = 0 #Model time self.time = 0.0 self.finaltime = None self.min_timestep = self.max_timestep = 0.0 self.starttime = None #Checkpointing self.filename = 'domain' self.checkpoint = False #Public interface to Domain def get_conserved_quantities(self, vol_id, vertex=None, edge=None): """Get conserved quantities at volume vol_id If vertex is specified use it as index for vertex values If edge is specified use it as index for edge values If neither are specified use centroid values If both are specified an exeception is raised Return value: Vector of length == number_of_conserved quantities """ from Numeric import zeros, Float if not (vertex is None or edge is None): msg = 'Values for both vertex and edge was specified.' msg += 'Only one (or none) is allowed.' raise msg q = zeros( len(self.conserved_quantities), Float) for i, name in enumerate(self.conserved_quantities): Q = self.quantities[name] if vertex is not None: q[i] = Q.vertex_values[vol_id, vertex] elif edge is not None: q[i] = Q.edge_values[vol_id, edge] else: q[i] = Q.centroid_values[vol_id] return q def set_quantity_vertices_dict(self, quantity_dict): """Set values for named quantities. The index is the quantity name: Name of quantity X: Compatible list, Numeric array, const or function (see below) The values will be stored in elements following their internal ordering. """ for key in quantity_dict.keys(): self.set_quantity(key, quantity_dict[key], location='vertices') def set_quantity(self, name, X, location='vertices', indexes = None): """Set values for named quantity name: Name of quantity X: Compatible list, Numeric array, const or function (see below) location: Where values are to be stored. Permissible options are: vertices, edges, centroid In case of location == 'centroid' the dimension values must be a list of a Numerical array of length N, N being the number of elements. Otherwise it must be of dimension Nx3. Indexes is the set of element ids that the operation applies to The values will be stored in elements following their internal ordering. """ #from quantity import Quantity, Conserved_quantity #Create appropriate quantity object ##if name in self.conserved_quantities: ## self.quantities[name] = Conserved_quantity(self) ##else: ## self.quantities[name] = Quantity(self) #Set value self.quantities[name].set_values(X, location, indexes = indexes) def get_quantity(self, name, location='vertices', indexes = None): """Get values for named quantity name: Name of quantity In case of location == 'centroid' the dimension values must be a list of a Numerical array of length N, N being the number of elements. Otherwise it must be of dimension Nx3. Indexes is the set of element ids that the operation applies to. The values will be stored in elements following their internal ordering. """ return self.quantities[name].get_values( location, indexes = indexes) def set_boundary(self, boundary_map): """Associate boundary objects with tagged boundary segments. Input boundary_map is a dictionary of boundary objects keyed by symbolic tags to matched against tags in the internal dictionary self.boundary. As result one pointer to a boundary object is stored for each vertex in the list self.boundary_objects. More entries may point to the same boundary object Schematically the mapping is from two dictionaries to one list where the index is used as pointer to the boundary_values arrays within each quantity. self.boundary: (vol_id, edge_id): tag boundary_map (input): tag: boundary_object ---------------------------------------------- self.boundary_objects: ((vol_id, edge_id), boundary_object) Pre-condition: self.boundary has been built. Post-condition: self.boundary_objects is built If a tag from the domain doesn't appear in the input dictionary an exception is raised. However, if a tag is not used to the domain, no error is thrown. FIXME: This would lead to implementation of a default boundary condition Note: If a segment is listed in the boundary dictionary and if it is not None, it *will* become a boundary - even if there is a neighbouring triangle. This would be the case for internal boundaries Boundary objects that are None will be skipped. FIXME: If set_boundary is called multiple times and if Boundary object is changed into None, the neighbour structure will not be restored!!! """ self.boundary_objects = [] #FIXME: Try to remove the sorting and fix test_mesh.py x = self.boundary.keys() x.sort() for k, (vol_id, edge_id) in enumerate(x): tag = self.boundary[ (vol_id, edge_id) ] if boundary_map.has_key(tag): B = boundary_map[tag] if B is not None: self.boundary_objects.append( ((vol_id, edge_id), B) ) self.neighbours[vol_id, edge_id] = -len(self.boundary_objects) else: pass #FIXME: Check and perhaps fix neighbour structure else: msg = 'ERROR (domain.py): Tag "%s" has not been ' %tag msg += 'bound to a boundary object.\n' msg += 'All boundary tags defined in domain must appear ' msg += 'in the supplied dictionary.\n' msg += 'The tags are: %s' %self.get_boundary_tags() raise msg def set_region(self, functions): # The order of functions in the list is used. if type(functions) not in [types.ListType,types.TupleType]: functions = [functions] for function in functions: for tag in self.tagged_elements.keys(): function(tag, self.tagged_elements[tag], self) #Do we need to do this sort of thing? #self = function(tag, self.tagged_elements[tag], self) #MISC def check_integrity(self): Mesh.check_integrity(self) for quantity in self.conserved_quantities: msg = 'Conserved quantities must be a subset of all quantities' assert quantity in self.quantities, msg ##assert hasattr(self, 'boundary_objects') def write_time(self): if self.min_timestep == self.max_timestep: print 'Time = %.4f, delta t = %.8f, steps=%d (%d)'\ %(self.time, self.min_timestep, self.number_of_steps, self.number_of_first_order_steps) elif self.min_timestep > self.max_timestep: print 'Time = %.4f, steps=%d (%d)'\ %(self.time, self.number_of_steps, self.number_of_first_order_steps) else: print 'Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\ %(self.time, self.min_timestep, self.max_timestep, self.number_of_steps, self.number_of_first_order_steps) def get_name(self): return self.filename def set_name(self, name): self.filename = name #def set_defaults(self): # """Set default values for uninitialised quantities. # Should be overridden or specialised by specific modules # """# # # for name in self.conserved_quantities + self.other_quantities: # self.set_quantity(name, 0.0) ########################### #Main components of evolve def evolve(self, yieldstep = None, finaltime = None): """Evolve model from time=0.0 to finaltime yielding results every yieldstep. Internally, smaller timesteps may be taken. Evolve is implemented as a generator and is to be called as such, e.g. for t in domain.evolve(timestep, yieldstep, finaltime): """ #import data_manager from config import min_timestep, max_timestep, epsilon #FIXME: Maybe lump into a larger check prior to evolving msg = 'Boundary tags must be bound to boundary objects before evolving system, ' msg += 'e.g. using the method set_boundary.\n' msg += 'This system has the boundary tags %s ' %self.get_boundary_tags() assert hasattr(self, 'boundary_objects'), msg ##self.set_defaults() if yieldstep is None: yieldstep = max_timestep self.order = self.default_order self.yieldtime = 0.0 #Time between 'yields' #Initialise interval of timestep sizes (for reporting only) self.min_timestep = max_timestep self.max_timestep = min_timestep self.finaltime = finaltime self.number_of_steps = 0 self.number_of_first_order_steps = 0 #Initial update of vertex and edge values self.distribute_to_vertices_and_edges() #Or maybe restore from latest checkpoint if self.checkpoint is True: self.goto_latest_checkpoint() yield(self.time) #Yield initial values while True: #Update boundary values self.update_boundary() #Compute fluxes across each element edge self.compute_fluxes() #Update timestep to fit yieldstep and finaltime self.update_timestep(yieldstep, finaltime) #Update conserved quantities self.update_conserved_quantities() #Update vertex and edge values self.distribute_to_vertices_and_edges() #Update time self.time += self.timestep self.yieldtime += self.timestep self.number_of_steps += 1 if self.order == 1: self.number_of_first_order_steps += 1 #Yield results if finaltime is not None and abs(self.time - finaltime) < epsilon: # Yield final time and stop yield(self.time) break if abs(self.yieldtime - yieldstep) < epsilon: # Yield (intermediate) time and allow inspection of domain if self.checkpoint is True: self.store_checkpoint() self.delete_old_checkpoints() #Pass control on to outer loop for more specific actions yield(self.time) # Reinitialise self.yieldtime = 0.0 self.min_timestep = max_timestep self.max_timestep = min_timestep self.number_of_steps = 0 self.number_of_first_order_steps = 0 def evolve_to_end(self, finaltime = 1.0): """Iterate evolve all the way to the end """ for _ in self.evolve(yieldstep=None, finaltime=finaltime): pass def update_boundary(self): """Go through list of boundary objects and update boundary values for all conserved quantities on boundary. """ #FIXME: Update only those that change (if that can be worked out) for i, ((vol_id, edge_id), B) in enumerate(self.boundary_objects): q = B.evaluate(vol_id, edge_id) for j, name in enumerate(self.conserved_quantities): Q = self.quantities[name] Q.boundary_values[i] = q[j] def compute_fluxes(self): msg = 'Method compute_fluxes must be overridden by Domain subclass' raise msg def update_timestep(self, yieldstep, finaltime): from config import min_timestep timestep = self.timestep #Record maximal and minimal values of timestep for reporting self.max_timestep = max(timestep, self.max_timestep) self.min_timestep = min(timestep, self.min_timestep) #Protect against degenerate time steps if timestep < min_timestep: #Number of consecutive small steps taken b4 taking action self.smallsteps += 1 if self.smallsteps > self.max_smallsteps: self.smallsteps = 0 #Reset if self.order == 1: msg = 'Too small timestep %.16f reached ' %timestep msg += 'even after %d steps of 1 order scheme'\ %self.max_smallsteps raise msg else: #Try to overcome situation by switching to 1 order self.order = 1 else: self.smallsteps = 0 if self.order == 1 and self.default_order == 2: self.order = 2 #Ensure that final time is not exceeded if finaltime is not None and self.time + timestep > finaltime: timestep = finaltime-self.time #Ensure that model time is aligned with yieldsteps if self.yieldtime + timestep > yieldstep: timestep = yieldstep-self.yieldtime self.timestep = timestep def compute_forcing_terms(self): """If there are any forcing functions driving the system they should be defined in Domain subclass and appended to the list self.forcing_terms """ for f in self.forcing_terms: f(self) def update_conserved_quantities(self): """Update vectors of conserved quantities using previously computed fluxes specified forcing functions. """ from Numeric import ones, sum, equal, Float N = self.number_of_elements d = len(self.conserved_quantities) timestep = self.timestep #Compute forcing terms self.compute_forcing_terms() #Update conserved_quantities for name in self.conserved_quantities: Q = self.quantities[name] Q.update(timestep) #Clean up #Note that Q.explicit_update is reset by compute_fluxes Q.semi_implicit_update[:] = 0.0 def distribute_to_vertices_and_edges(self): """Extrapolate conserved quantities from centroid to vertices and edge-midpoints for each volume Default implementation is straight first order, i.e. constant values throughout each element and no reference to non-conserved quantities. """ for name in self.conserved_quantities: Q = self.quantities[name] if self.order == 1: Q.extrapolate_first_order() elif self.order == 2: Q.extrapolate_second_order() Q.limit() else: raise 'Unknown order' Q.interpolate_from_vertices_to_edges() ############################################## #Initialise module #Optimisation with psyco from config import use_psyco if use_psyco: try: import psyco except: msg = 'WARNING: psyco (speedup) could not import'+\ ', you may want to consider installing it' print msg else: psyco.bind(Domain.update_boundary) #psyco.bind(Domain.update_timestep) #Not worth it psyco.bind(Domain.update_conserved_quantities) psyco.bind(Domain.distribute_to_vertices_and_edges) if __name__ == "__main__": pass