"""Example of shallow water wave equation. Specific methods pertaining to the 2D shallow water equation are imported from shallow_water for use with the generic finite volume framework Conserved quantities are h, uh and vh stored as elements 0, 1 and 2 in the numerical vector named conserved_quantities. """ #------------------------------------------------------------------------------ # Import necessary modules #------------------------------------------------------------------------------ from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross from anuga.shallow_water import Domain from anuga.shallow_water.shallow_water_domain import Reflective_boundary from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary #------------------------------------------------------------------------------ # Setup computational domain #------------------------------------------------------------------------------ length = 40. width = 5. #dx = dy = 1 # Resolution: Length of subdivisions on both axes #dx = dy = .5 # Resolution: Length of subdivisions on both axes dx = dy = .1 # Resolution: Length of subdivisions on both axes points, vertices, boundary = rectangular_cross(int(length/dx), int(width/dy), len1=length, len2=width) domain = Domain(points, vertices, boundary) domain.set_name('culvert_example') # Output name print 'Size', len(domain) #------------------------------------------------------------------------------ # Setup initial conditions #------------------------------------------------------------------------------ def topography(x, y): """Set up a weir A culvert will connect either side """ z = -x/10 N = len(x) for i in range(N): # Weir from 10 to 12 m if 10 < x[i] < 12: z[i] += 0.4 - 0.05*y[i] # Constriction #if 27 < x[i] < 29 and y[i] > 3: # z[i] += 2 # Pole #if (x[i] - 34)**2 + (y[i] - 2)**2 < 0.4**2: # z[i] += 2 return z domain.set_quantity('elevation', topography) # Use function for elevation domain.set_quantity('friction', 0.01) # Constant friction domain.set_quantity('stage', expression='elevation') # Dry initial condition #------------------------------------------------------------------------------ # Setup specialised forcing terms #------------------------------------------------------------------------------ class Inflow: """Class Inflow - general 'rain and drain' forcing term. Useful for implementing flows in and out of the domain. Inflow(center, radius, flow) center [m]: Coordinates at center of flow point radius [m]: Size of circular area flow [m/s]: Rate of change of quantity over the specified area. This parameter can be either a constant or a function of time. Positive values indicate inflow, negative values indicate outflow. Examples Inflow((0.7, 0.4), 0.07, -0.2) # Constant drain at 0.2 m/s. # This corresponds to a flow of # 0.07**2*pi*0.2 = 0.00314 m^3/s Inflow((0.5, 0.5), 0.001, lambda t: min(4*t, 5)) # Tap turning up to # a maximum inflow of # 5 m/s over the # specified area """ # FIXME (OLE): Add a polygon as an alternative. # FIXME (OLE): Generalise to all quantities # FIXME (OLE): def __init__(self, domain, center=None, radius=None, flow=0.0, quantity_name = 'stage'): from anuga.utilities.numerical_tools import ensure_numeric if center is not None and radius is not None: assert len(center) == 2 else: msg = 'Both center and radius must be specified' raise Exception, msg self.domain = domain self.center = ensure_numeric(center) self.radius = radius self.flow = flow self.quantity = domain.quantities[quantity_name].explicit_update # Determine indices in flow area N = len(domain) self.indices = [] coordinates = domain.get_centroid_coordinates() for k in range(N): x, y = coordinates[k,:] # Centroid if ((x-center[0])**2+(y-center[1])**2) < radius**2: self.indices.append(k) def __call__(self, domain): # Update inflow if callable(self.flow): flow = self.flow(domain.get_time()) else: flow = self.flow for k in self.indices: self.quantity[k] += flow class Culvert_flow: """Culvert flow - transfer water from one hole to another """ def __init__(self, domain, center0=None, radius0=None, center1=None, radius1=None, transfer_flow=None): from Numeric import sqrt, sum self.openings = [] self.openings.append(Inflow(domain, center=center0, radius=radius0, flow=None)) self.openings.append(Inflow(domain, center=center1, radius=radius1, flow=None)) # Compute distance between opening centers. I assume there are only two. self.distance = sqrt(sum( (self.openings[0].center - self.openings[1].center)**2 )) print 'Distance between openings is', self.distance def __call__(self, domain): from anuga.utilities.numerical_tools import mean # Get average water depths at each opening for opening in self.openings: stage = domain.quantities['stage'].get_values(location='centroids', indices=opening.indices) elevation = domain.quantities['elevation'].get_values(location='centroids', indices=opening.indices) # Store current avg depth with opening object opening.depth = mean(stage-elevation) # print 'Depth', opening.depth # Here's a simple transfer function hardwired into this Culvert class: # Let flow rate depend on difference in depth # Flow is instantaneous for now but one might use # the precomputed distance between the two openings # to somehow introduce a delay. delta_h = self.openings[0].depth - self.openings[1].depth flow_rate = 0.5*9.81*delta_h**2 # Just an example of flow rate flow_rate /= (self.openings[0].radius + self.openings[1].radius)/2 # if delta_h > 0: # Opening 0 has the greatest depth. # Flow will go from opening 0 to opening 1 self.openings[0].flow = -flow_rate self.openings[1].flow = flow_rate else: # Opening 1 has the greatest depth. # Flow will go from opening 1 to opening 0 self.openings[0].flow = flow_rate self.openings[1].flow = -flow_rate # Execute flow term for each opening for opening in self.openings: opening(domain) #sink = Inflow(domain, center=[0.7, 0.4], radius=0.0707, flow = 0.0) #tap = Inflow(domain, center=(0.5, 0.5), radius=0.0316, # flow=lambda t: min(4*t, 5)) # Tap turning up #domain.forcing_terms.append(tap) #domain.forcing_terms.append(sink) culvert = Culvert_flow(domain, center0=[5.0, 2.5], radius0=0.3, center1=[20., 2.5], radius1=0.3, transfer_flow=None) # Hardwired for now domain.forcing_terms.append(culvert) #------------------------------------------------------------------------------ # Setup boundary conditions #------------------------------------------------------------------------------ Bi = Dirichlet_boundary([0.4, 0, 0]) # Inflow Br = Reflective_boundary(domain) # Solid reflective wall Bo = Dirichlet_boundary([-5, 0, 0]) # Outflow domain.set_boundary({'left': Bi, 'right': Bo, 'top': Br, 'bottom': Br}) #------------------------------------------------------------------------------ # Evolve system through time #------------------------------------------------------------------------------ for t in domain.evolve(yieldstep = 0.01, finaltime = 15): domain.write_time() #if domain.get_time() >= 4 and tap.flow != 0.0: # print 'Turning tap off' # tap.flow = 0.0 #if domain.get_time() >= 3 and sink.flow < 0.0: # print 'Turning drain on' # sink.flow = -0.8