import sys from os import sep sys.path.append('..'+sep+'pyvolution') """Class Domain - 2D triangular domains for finite-volume computations of the advection equation. This module contains a specialisation of class Domain from module domain.py consisting of methods specific to the advection equantion The equation is u_t + (v_1 u)_x + (v_2 u)_y = 0 There is only one conserved quantity, the stage u The advection equation is a very simple specialisation of the generic domain and may serve as an instructive example or a test of other components such as visualisation. Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou Geoscience Australia, 2004 """ import logging, logging.config logger = logging.getLogger('advection') logger.setLevel(logging.WARNING) try: logging.config.fileConfig('log.ini') except: pass from domain import * Generic_domain = Domain #Rename class Domain(Generic_domain): def __init__(self, coordinates, vertices, boundary = None, tagged_elements = None, geo_reference = None, use_inscribed_circle=False, velocity = None): conserved_quantities = ['stage'] other_quantities = [] Generic_domain.__init__(self, coordinates, vertices, boundary, conserved_quantities, other_quantities, tagged_elements, geo_reference, use_inscribed_circle) if velocity is None: self.velocity = [1,0] else: self.velocity = velocity #Only first is implemented for advection self.default_order = self.order = 1 #Realtime visualisation self.visualiser = None self.visualise = False self.visualise_color_stage = False self.visualise_timer = True self.visualise_range_z = None self.smooth = True def initialise_visualiser(self,scale_z=1.0,rect=None): #Realtime visualisation if self.visualiser is None: from realtime_visualisation_new import Visualiser self.visualiser = Visualiser(self,scale_z,rect) self.visualise = True def check_integrity(self): Generic_domain.check_integrity(self) msg = 'Conserved quantity must be "stage"' assert self.conserved_quantities[0] == 'stage', msg def flux_function(self, normal, ql, qr, zl=None, zr=None): """Compute outward flux as inner product between velocity vector v=(v_1, v_2) and normal vector n. if > 0 flux direction is outward bound and its magnitude is determined by the quantity inside volume: ql. Otherwise it is inbound and magnitude is determined by the quantity outside the volume: qr. """ v1 = self.velocity[0] v2 = self.velocity[1] normal_velocity = v1*normal[0] + v2*normal[1] if normal_velocity < 0: flux = qr * normal_velocity else: flux = ql * normal_velocity max_speed = abs(normal_velocity) return flux, max_speed def compute_fluxes(self): try: import weave self.weave_available = True except: self.weave_available = False if self.weave_available: self.compute_fluxes_weave() else: self.compute_fluxes_python() def compute_fluxes_python(self): """Compute all fluxes and the timestep suitable for all volumes in domain. Compute total flux for each conserved quantity using "flux_function" Fluxes across each edge are scaled by edgelengths and summed up Resulting flux is then scaled by area and stored in domain.explicit_update The maximal allowable speed computed by the flux_function for each volume is converted to a timestep that must not be exceeded. The minimum of those is computed as the next overall timestep. Post conditions: domain.explicit_update is reset to computed flux values domain.timestep is set to the largest step satisfying all volumes. """ import sys from Numeric import zeros, Float from config import max_timestep N = self.number_of_elements neighbours = self.neighbours neighbour_edges = self.neighbour_edges normals = self.normals areas = self.areas radii = self.radii edgelengths = self.edgelengths timestep = max_timestep #FIXME: Get rid of this #Shortcuts Stage = self.quantities['stage'] #Arrays stage = Stage.edge_values stage_bdry = Stage.boundary_values flux = zeros(1, Float) #Work array for summing up fluxes #Loop for k in range(N): optimal_timestep = float(sys.maxint) flux[:] = 0. #Reset work array for i in range(3): #Quantities inside volume facing neighbour i ql = stage[k, i] #Quantities at neighbour on nearest face n = neighbours[k,i] if n < 0: m = -n-1 #Convert neg flag to index qr = stage_bdry[m] else: m = neighbour_edges[k,i] qr = stage[n, m] #Outward pointing normal vector normal = normals[k, 2*i:2*i+2] #Flux computation using provided function edgeflux, max_speed = self.flux_function(normal, ql, qr) flux -= edgeflux * edgelengths[k,i] #Update optimal_timestep try: optimal_timestep = min(optimal_timestep, radii[k]/max_speed) except ZeroDivisionError: pass #Normalise by area and store for when all conserved #quantities get updated flux /= areas[k] Stage.explicit_update[k] = flux[0] timestep = min(timestep, optimal_timestep) self.timestep = timestep def compute_fluxes_weave(self): """Compute all fluxes and the timestep suitable for all volumes in domain. Compute total flux for each conserved quantity using "flux_function" Fluxes across each edge are scaled by edgelengths and summed up Resulting flux is then scaled by area and stored in domain.explicit_update The maximal allowable speed computed by the flux_function for each volume is converted to a timestep that must not be exceeded. The minimum of those is computed as the next overall timestep. Post conditions: domain.explicit_update is reset to computed flux values domain.timestep is set to the largest step satisfying all volumes. """ import sys from Numeric import zeros, Float from config import max_timestep import weave from weave import converters N = self.number_of_elements timestep = zeros( 1, Float); timestep[0] = float(max_timestep) #FIXME: Get rid of this #Shortcuts Stage = self.quantities['stage'] #Arrays neighbours = self.neighbours neighbour_edges = self.neighbour_edges normals = self.normals areas = self.areas radii = self.radii edgelengths = self.edgelengths stage_edge = Stage.edge_values stage_bdry = Stage.boundary_values stage_update = Stage.explicit_update huge_timestep = float(sys.maxint) v1 = self.velocity[0] v2 = self.velocity[1] code = """ //Loop double qr,ql; int m,n; double normal[2]; double normal_velocity; double flux, edgeflux; double max_speed; double optimal_timestep; for (int k=0; kradii(k)/max_speed) ? radii(k)/max_speed : optimal_timestep; } } //Normalise by area and store for when all conserved //quantities get updated stage_update(k) = flux/areas(k); timestep(0) = (timestep(0)>optimal_timestep) ? optimal_timestep : timestep(0); } """ logger.debug('Trying to weave advection.compute_fluxes') weave.inline(code, ['stage_edge','stage_bdry','stage_update', 'neighbours','neighbour_edges','normals', 'areas','radii','edgelengths','huge_timestep', 'timestep','v1','v2','N'], type_converters = converters.blitz, compiler='gcc'); self.timestep = timestep[0] def evolve(self, yieldstep = None, finaltime = None): """Specialisation of basic evolve method from parent class """ #Initialise real time viz if requested if self.visualise is True and self.time == 0.0: #import realtime_visualisation_new as visualise self.initialise_visualiser() #self.visualiser.update_quantity('stage') self.visualiser.update_timer() #Call basic machinery from parent class for t in Generic_domain.evolve(self, yieldstep, finaltime): #Real time viz if self.visualise is True: #self.visualiser.update_quantity('stage') self.visualiser.update_timer() #Pass control on to outer loop for more specific actions yield(t)