#!/usr/bin/env python import unittest, os from os.path import join from math import pi, sqrt import tempfile from anuga.config import g, epsilon from anuga.config import netcdf_mode_r, netcdf_mode_w, netcdf_mode_a import Numeric as num from anuga.utilities.numerical_tools import mean from anuga.utilities.polygon import is_inside_polygon from anuga.coordinate_transforms.geo_reference import Geo_reference from anuga.abstract_2d_finite_volumes.quantity import Quantity from anuga.geospatial_data.geospatial_data import Geospatial_data from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross from anuga.utilities.system_tools import get_pathname_from_package from shallow_water_domain import * # Get gateway to C implementation of flux function for direct testing from shallow_water_ext import flux_function_central as flux_function # For test_fitting_using_shallow_water_domain example def linear_function(point): point = num.array(point) return point[:,0]+point[:,1] class Weir: """Set a bathymetry for weir with a hole and a downstream gutter x,y are assumed to be in the unit square """ def __init__(self, stage): self.inflow_stage = stage def __call__(self, x, y): N = len(x) assert N == len(y) z = num.zeros(N, num.Float) for i in range(N): z[i] = -x[i]/2 #General slope #Flattish bit to the left if x[i] < 0.3: z[i] = -x[i]/10 #Weir if x[i] >= 0.3 and x[i] < 0.4: z[i] = -x[i]+0.9 #Dip x0 = 0.6 #depth = -1.3 depth = -1.0 #plateaux = -0.9 plateaux = -0.6 if y[i] < 0.7: if x[i] > x0 and x[i] < 0.9: z[i] = depth #RHS plateaux if x[i] >= 0.9: z[i] = plateaux elif y[i] >= 0.7 and y[i] < 1.5: #Restrict and deepen if x[i] >= x0 and x[i] < 0.8: z[i] = depth-(y[i]/3-0.3) #z[i] = depth-y[i]/5 #z[i] = depth elif x[i] >= 0.8: #RHS plateaux z[i] = plateaux elif y[i] >= 1.5: if x[i] >= x0 and x[i] < 0.8 + (y[i]-1.5)/1.2: #Widen up and stay at constant depth z[i] = depth-1.5/5 elif x[i] >= 0.8 + (y[i]-1.5)/1.2: #RHS plateaux z[i] = plateaux #Hole in weir (slightly higher than inflow condition) if x[i] >= 0.3 and x[i] < 0.4 and y[i] > 0.2 and y[i] < 0.4: z[i] = -x[i]+self.inflow_stage + 0.02 #Channel behind weir x0 = 0.5 if x[i] >= 0.4 and x[i] < x0 and y[i] > 0.2 and y[i] < 0.4: z[i] = -x[i]+self.inflow_stage + 0.02 if x[i] >= x0 and x[i] < 0.6 and y[i] > 0.2 and y[i] < 0.4: #Flatten it out towards the end z[i] = -x0+self.inflow_stage + 0.02 + (x0-x[i])/5 #Hole to the east x0 = 1.1; y0 = 0.35 #if x[i] < -0.2 and y < 0.5: if num.sqrt((2*(x[i]-x0))**2 + (2*(y[i]-y0))**2) < 0.2: z[i] = num.sqrt(((x[i]-x0))**2 + ((y[i]-y0))**2)-1.0 #Tiny channel draining hole if x[i] >= 1.14 and x[i] < 1.2 and y[i] >= 0.4 and y[i] < 0.6: z[i] = -0.9 #North south if x[i] >= 0.9 and x[i] < 1.18 and y[i] >= 0.58 and y[i] < 0.65: z[i] = -1.0 + (x[i]-0.9)/3 #East west #Stuff not in use #Upward slope at inlet to the north west #if x[i] < 0.0: # and y[i] > 0.5: # #z[i] = -y[i]+0.5 #-x[i]/2 # z[i] = x[i]/4 - y[i]**2 + 0.5 #Hole to the west #x0 = -0.4; y0 = 0.35 # center #if sqrt((2*(x[i]-x0))**2 + (2*(y[i]-y0))**2) < 0.2: # z[i] = sqrt(((x[i]-x0))**2 + ((y[i]-y0))**2)-0.2 return z/2 class Weir_simple: """Set a bathymetry for weir with a hole and a downstream gutter x,y are assumed to be in the unit square """ def __init__(self, stage): self.inflow_stage = stage def __call__(self, x, y): N = len(x) assert N == len(y) z = num.zeros(N, num.Float) for i in range(N): z[i] = -x[i] #General slope #Flat bit to the left if x[i] < 0.3: z[i] = -x[i]/10 #General slope #Weir if x[i] > 0.3 and x[i] < 0.4: z[i] = -x[i]+0.9 #Dip if x[i] > 0.6 and x[i] < 0.9: z[i] = -x[i]-0.5 #-y[i]/5 #Hole in weir (slightly higher than inflow condition) if x[i] > 0.3 and x[i] < 0.4 and y[i] > 0.2 and y[i] < 0.4: z[i] = -x[i]+self.inflow_stage + 0.05 return z/2 #Variable windfield implemented using functions def speed(t,x,y): """Large speeds halfway between center and edges Low speeds at center and edges """ from math import exp, cos, pi x = num.array(x) y = num.array(y) N = len(x) s = 0*x #New array for k in range(N): r = num.sqrt(x[k]**2 + y[k]**2) factor = exp( -(r-0.15)**2 ) s[k] = 4000 * factor * (cos(t*2*pi/150) + 2) return s def scalar_func(t,x,y): """Function that returns a scalar. Used to test error message when Numeric array is expected """ return 17.7 def angle(t,x,y): """Rotating field """ from math import atan, pi x = num.array(x) y = num.array(y) N = len(x) a = 0*x #New array for k in range(N): r = num.sqrt(x[k]**2 + y[k]**2) angle = atan(y[k]/x[k]) if x[k] < 0: angle+=pi #atan in ]-pi/2; pi/2[ #Take normal direction angle -= pi/2 #Ensure positive radians if angle < 0: angle += 2*pi a[k] = angle/pi*180 return a class Test_Shallow_Water(unittest.TestCase): def setUp(self): pass def tearDown(self): pass def test_rotate(self): normal = num.array([0.0,-1.0]) q = num.array([1.0,2.0,3.0]) r = rotate(q, normal, direction = 1) assert r[0] == 1 assert r[1] == -3 assert r[2] == 2 w = rotate(r, normal, direction = -1) assert num.allclose(w, q) #Check error check try: rotate(r, num.array([1,1,1], num.Int)) #array default# except: pass else: raise 'Should have raised an exception' # Individual flux tests def test_flux_zero_case(self): ql = num.zeros( 3, num.Float ) qr = num.zeros( 3, num.Float ) normal = num.zeros( 2, num.Float ) edgeflux = num.zeros( 3, num.Float ) zl = zr = 0. H0 = 0.0 max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux, epsilon, g, H0) assert num.allclose(edgeflux, [0,0,0]) assert max_speed == 0. def test_flux_constants(self): w = 2.0 normal = num.array([1.,0]) ql = num.array([w, 0, 0]) qr = num.array([w, 0, 0]) edgeflux = num.zeros(3, num.Float) zl = zr = 0. h = w - (zl+zr)/2 H0 = 0.0 max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux, epsilon, g, H0) assert num.allclose(edgeflux, [0., 0.5*g*h**2, 0.]) assert max_speed == num.sqrt(g*h) #def test_flux_slope(self): # #FIXME: TODO # w = 2.0 # # normal = array([1.,0]) # ql = array([w, 0, 0]) # qr = array([w, 0, 0]) # zl = zr = 0. # h = w - (zl+zr)/2 # # flux, max_speed = flux_function(normal, ql, qr, zl, zr) # # assert allclose(flux, [0., 0.5*g*h**2, 0.]) # assert max_speed == sqrt(g*h) def test_flux1(self): #Use data from previous version of abstract_2d_finite_volumes normal = num.array([1.,0]) ql = num.array([-0.2, 2, 3]) qr = num.array([-0.2, 2, 3]) zl = zr = -0.5 edgeflux = num.zeros(3, num.Float) H0 = 0.0 max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux, epsilon, g, H0) assert num.allclose(edgeflux, [2.,13.77433333, 20.]) assert num.allclose(max_speed, 8.38130948661) def test_flux2(self): #Use data from previous version of abstract_2d_finite_volumes normal = num.array([0., -1.]) ql = num.array([-0.075, 2, 3]) qr = num.array([-0.075, 2, 3]) zl = zr = -0.375 edgeflux = num.zeros(3, num.Float) H0 = 0.0 max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux, epsilon, g, H0) assert num.allclose(edgeflux, [-3.,-20.0, -30.441]) assert num.allclose(max_speed, 11.7146428199) def test_flux3(self): #Use data from previous version of abstract_2d_finite_volumes normal = num.array([-sqrt(2)/2, sqrt(2)/2]) ql = num.array([-0.075, 2, 3]) qr = num.array([-0.075, 2, 3]) zl = zr = -0.375 edgeflux = num.zeros(3, num.Float) H0 = 0.0 max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux, epsilon, g, H0) assert num.allclose(edgeflux, [sqrt(2)/2, 4.40221112, 7.3829019]) assert num.allclose(max_speed, 4.0716654239) def test_flux4(self): #Use data from previous version of abstract_2d_finite_volumes normal = num.array([-sqrt(2)/2, sqrt(2)/2]) ql = num.array([-0.34319278, 0.10254161, 0.07273855]) qr = num.array([-0.30683287, 0.1071986, 0.05930515]) zl = zr = -0.375 edgeflux = num.zeros(3, num.Float) H0 = 0.0 max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux, epsilon, g, H0) assert num.allclose(edgeflux, [-0.04072676, -0.07096636, -0.01604364]) assert num.allclose(max_speed, 1.31414103233) def test_flux_computation(self): """test_flux_computation - test flux calculation (actual C implementation) This one tests the constant case where only the pressure term contributes to each edge and cancels out once the total flux has been summed up. """ a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe, daf, dae vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) domain.check_integrity() # The constant case domain.set_quantity('elevation', -1) domain.set_quantity('stage', 1) domain.compute_fluxes() assert num.allclose(domain.get_quantity('stage').explicit_update[1], 0) # Central triangle # The more general case def surface(x,y): return -x/2 domain.set_quantity('elevation', -10) domain.set_quantity('stage', surface) domain.set_quantity('xmomentum', 1) domain.compute_fluxes() #print domain.get_quantity('stage').explicit_update # FIXME (Ole): TODO the general case #assert allclose(domain.get_quantity('stage').explicit_update[1], ........??) def test_sw_domain_simple(self): a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe, daf, dae vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] #from anuga.abstract_2d_finite_volumes.domain import Domain as Generic_domain #msg = 'The class %s is not a subclass of the generic domain class %s'\ # %(DomainClass, Domain) #assert issubclass(DomainClass, Domain), msg domain = Domain(points, vertices) domain.check_integrity() for name in ['stage', 'xmomentum', 'ymomentum', 'elevation', 'friction']: assert domain.quantities.has_key(name) assert domain.get_conserved_quantities(0, edge=1) == 0. def test_boundary_conditions(self): a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4] ] boundary = { (0, 0): 'Third', (0, 2): 'First', (2, 0): 'Second', (2, 1): 'Second', (3, 1): 'Second', (3, 2): 'Third'} domain = Domain(points, vertices, boundary) domain.check_integrity() domain.set_quantity('stage', [[1,2,3], [5,5,5], [0,0,9], [-6, 3, 3]]) domain.set_quantity('xmomentum', [[1,1,1], [2,2,2], [3,3,3], [4, 4, 4]]) domain.set_quantity('ymomentum', [[10,10,10], [20,20,20], [30,30,30], [40, 40, 40]]) D = Dirichlet_boundary([5,2,1]) T = Transmissive_boundary(domain) R = Reflective_boundary(domain) domain.set_boundary( {'First': D, 'Second': T, 'Third': R}) domain.update_boundary() #Stage assert domain.quantities['stage'].boundary_values[0] == 2.5 assert domain.quantities['stage'].boundary_values[0] ==\ domain.get_conserved_quantities(0, edge=0)[0] #Reflective (2.5) assert domain.quantities['stage'].boundary_values[1] == 5. #Dirichlet assert domain.quantities['stage'].boundary_values[2] ==\ domain.get_conserved_quantities(2, edge=0)[0] #Transmissive (4.5) assert domain.quantities['stage'].boundary_values[3] ==\ domain.get_conserved_quantities(2, edge=1)[0] #Transmissive (4.5) assert domain.quantities['stage'].boundary_values[4] ==\ domain.get_conserved_quantities(3, edge=1)[0] #Transmissive (-1.5) assert domain.quantities['stage'].boundary_values[5] ==\ domain.get_conserved_quantities(3, edge=2)[0] #Reflective (-1.5) #Xmomentum assert domain.quantities['xmomentum'].boundary_values[0] == 1.0 #Reflective assert domain.quantities['xmomentum'].boundary_values[1] == 2. #Dirichlet assert domain.quantities['xmomentum'].boundary_values[2] ==\ domain.get_conserved_quantities(2, edge=0)[1] #Transmissive assert domain.quantities['xmomentum'].boundary_values[3] ==\ domain.get_conserved_quantities(2, edge=1)[1] #Transmissive assert domain.quantities['xmomentum'].boundary_values[4] ==\ domain.get_conserved_quantities(3, edge=1)[1] #Transmissive assert domain.quantities['xmomentum'].boundary_values[5] == -4.0 #Reflective #Ymomentum assert domain.quantities['ymomentum'].boundary_values[0] == -10.0 #Reflective assert domain.quantities['ymomentum'].boundary_values[1] == 1. #Dirichlet assert domain.quantities['ymomentum'].boundary_values[2] == 30. #Transmissive assert domain.quantities['ymomentum'].boundary_values[3] == 30. #Transmissive assert domain.quantities['ymomentum'].boundary_values[4] == 40. #Transmissive assert domain.quantities['ymomentum'].boundary_values[5] == 40. #Reflective def test_boundary_conditionsII(self): a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4] ] boundary = { (0, 0): 'Third', (0, 2): 'First', (2, 0): 'Second', (2, 1): 'Second', (3, 1): 'Second', (3, 2): 'Third', (0, 1): 'Internal'} domain = Domain(points, vertices, boundary) domain.check_integrity() domain.set_quantity('stage', [[1,2,3], [5,5,5], [0,0,9], [-6, 3, 3]]) domain.set_quantity('xmomentum', [[1,1,1], [2,2,2], [3,3,3], [4, 4, 4]]) domain.set_quantity('ymomentum', [[10,10,10], [20,20,20], [30,30,30], [40, 40, 40]]) D = Dirichlet_boundary([5,2,1]) T = Transmissive_boundary(domain) R = Reflective_boundary(domain) domain.set_boundary( {'First': D, 'Second': T, 'Third': R, 'Internal': None}) domain.update_boundary() domain.check_integrity() def test_boundary_conditionsIII(self): """test_boundary_conditionsIII Test Transmissive_stage_zero_momentum_boundary """ a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4] ] boundary = { (0, 0): 'Third', (0, 2): 'First', (2, 0): 'Second', (2, 1): 'Second', (3, 1): 'Second', (3, 2): 'Third'} domain = Domain(points, vertices, boundary) domain.check_integrity() domain.set_quantity('stage', [[1,2,3], [5,5,5], [0,0,9], [-6, 3, 3]]) domain.set_quantity('xmomentum', [[1,1,1], [2,2,2], [3,3,3], [4, 4, 4]]) domain.set_quantity('ymomentum', [[10,10,10], [20,20,20], [30,30,30], [40, 40, 40]]) D = Dirichlet_boundary([5,2,1]) T = Transmissive_stage_zero_momentum_boundary(domain) R = Reflective_boundary(domain) domain.set_boundary( {'First': D, 'Second': T, 'Third': R}) domain.update_boundary() # Stage assert domain.quantities['stage'].boundary_values[0] == 2.5 assert domain.quantities['stage'].boundary_values[0] ==\ domain.get_conserved_quantities(0, edge=0)[0] #Reflective (2.5) assert domain.quantities['stage'].boundary_values[1] == 5. #Dirichlet assert domain.quantities['stage'].boundary_values[2] ==\ domain.get_conserved_quantities(2, edge=0)[0] #Transmissive (4.5) assert domain.quantities['stage'].boundary_values[3] ==\ domain.get_conserved_quantities(2, edge=1)[0] #Transmissive (4.5) assert domain.quantities['stage'].boundary_values[4] ==\ domain.get_conserved_quantities(3, edge=1)[0] #Transmissive (-1.5) assert domain.quantities['stage'].boundary_values[5] ==\ domain.get_conserved_quantities(3, edge=2)[0] #Reflective (-1.5) # Xmomentum assert domain.quantities['xmomentum'].boundary_values[0] == 1.0 #Reflective assert domain.quantities['xmomentum'].boundary_values[1] == 2. #Dirichlet assert domain.quantities['xmomentum'].boundary_values[2] == 0.0 assert domain.quantities['xmomentum'].boundary_values[3] == 0.0 assert domain.quantities['xmomentum'].boundary_values[4] == 0.0 assert domain.quantities['xmomentum'].boundary_values[5] == -4.0 #Reflective # Ymomentum assert domain.quantities['ymomentum'].boundary_values[0] == -10.0 #Reflective assert domain.quantities['ymomentum'].boundary_values[1] == 1. #Dirichlet assert domain.quantities['ymomentum'].boundary_values[2] == 0.0 assert domain.quantities['ymomentum'].boundary_values[3] == 0.0 assert domain.quantities['ymomentum'].boundary_values[4] == 0.0 assert domain.quantities['ymomentum'].boundary_values[5] == 40. #Reflective def test_boundary_condition_time(self): """test_boundary_condition_time This tests that boundary conditions are evaluated using the right time from domain. """ # Setup computational domain from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross #-------------------------------------------------------------- # Setup computational domain #-------------------------------------------------------------- N = 5 points, vertices, boundary = rectangular_cross(N, N) domain = Domain(points, vertices, boundary) #-------------------------------------------------------------- # Setup initial conditions #-------------------------------------------------------------- domain.set_quantity('elevation', 0.0) domain.set_quantity('friction', 0.0) domain.set_quantity('stage', 0.0) #-------------------------------------------------------------- # Setup boundary conditions #-------------------------------------------------------------- Bt = Time_boundary(domain=domain, # Time dependent boundary f=lambda t: [t, 0.0, 0.0]) Br = Reflective_boundary(domain) # Reflective wall domain.set_boundary({'left': Bt, 'right': Br, 'top': Br, 'bottom': Br}) for t in domain.evolve(yieldstep = 10, finaltime = 20.0): q = Bt.evaluate() # FIXME (Ole): This test would not have passed in # changeset:5846. msg = 'Time boundary not evaluated correctly' assert num.allclose(t, q[0]), msg def test_compute_fluxes0(self): # Do a full triangle and check that fluxes cancel out for # the constant stage case a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) domain.set_quantity('stage', [[2,2,2], [2,2,2], [2,2,2], [2,2,2]]) domain.check_integrity() assert num.allclose(domain.neighbours, [[-1,1,-1], [2,3,0], [-1,-1,1],[1,-1,-1]]) assert num.allclose(domain.neighbour_edges, [[-1,2,-1], [2,0,1], [-1,-1,0],[1,-1,-1]]) zl=zr=0. # Assume flat bed edgeflux = num.zeros(3, num.Float) edgeflux0 = num.zeros(3, num.Float) edgeflux1 = num.zeros(3, num.Float) edgeflux2 = num.zeros(3, num.Float) H0 = 0.0 # Flux across right edge of volume 1 normal = domain.get_normal(1,0) ql = domain.get_conserved_quantities(vol_id=1, edge=0) qr = domain.get_conserved_quantities(vol_id=2, edge=2) max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux0, epsilon, g, H0) # Check that flux seen from other triangles is inverse tmp = qr; qr=ql; ql=tmp normal = domain.get_normal(2,2) max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux, epsilon, g, H0) assert num.allclose(edgeflux0 + edgeflux, 0.) # Flux across upper edge of volume 1 normal = domain.get_normal(1,1) ql = domain.get_conserved_quantities(vol_id=1, edge=1) qr = domain.get_conserved_quantities(vol_id=3, edge=0) max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux1, epsilon, g, H0) # Check that flux seen from other triangles is inverse tmp = qr; qr=ql; ql=tmp normal = domain.get_normal(3,0) max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux, epsilon, g, H0) assert num.allclose(edgeflux1 + edgeflux, 0.) # Flux across lower left hypotenuse of volume 1 normal = domain.get_normal(1,2) ql = domain.get_conserved_quantities(vol_id=1, edge=2) qr = domain.get_conserved_quantities(vol_id=0, edge=1) max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux2, epsilon, g, H0) # Check that flux seen from other triangles is inverse tmp = qr; qr=ql; ql=tmp normal = domain.get_normal(0,1) max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux, epsilon, g, H0) assert num.allclose(edgeflux2 + edgeflux, 0.) # Scale by edgelengths, add up anc check that total flux is zero e0 = domain.edgelengths[1, 0] e1 = domain.edgelengths[1, 1] e2 = domain.edgelengths[1, 2] assert num.allclose(e0*edgeflux0+e1*edgeflux1+e2*edgeflux2, 0.) # Now check that compute_flux yields zeros as well domain.compute_fluxes() for name in ['stage', 'xmomentum', 'ymomentum']: #print name, domain.quantities[name].explicit_update assert num.allclose(domain.quantities[name].explicit_update[1], 0) def test_compute_fluxes1(self): #Use values from previous version a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) val0 = 2.+2.0/3 val1 = 4.+4.0/3 val2 = 8.+2.0/3 val3 = 2.+8.0/3 domain.set_quantity('stage', [[val0, val0, val0], [val1, val1, val1], [val2, val2, val2], [val3, val3, val3]]) domain.check_integrity() zl=zr=0. #Assume flat bed edgeflux = num.zeros(3, num.Float) edgeflux0 = num.zeros(3, num.Float) edgeflux1 = num.zeros(3, num.Float) edgeflux2 = num.zeros(3, num.Float) H0 = 0.0 # Flux across right edge of volume 1 normal = domain.get_normal(1,0) #Get normal 0 of triangle 1 assert num.allclose(normal, [1, 0]) ql = domain.get_conserved_quantities(vol_id=1, edge=0) assert num.allclose(ql, [val1, 0, 0]) qr = domain.get_conserved_quantities(vol_id=2, edge=2) assert num.allclose(qr, [val2, 0, 0]) max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux0, epsilon, g, H0) # Flux across edge in the east direction (as per normal vector) assert num.allclose(edgeflux0, [-15.3598804, 253.71111111, 0.]) assert num.allclose(max_speed, 9.21592824046) #Flux across edge in the west direction (opposite sign for xmomentum) normal_opposite = domain.get_normal(2,2) #Get normal 2 of triangle 2 assert num.allclose(normal_opposite, [-1, 0]) max_speed = flux_function(normal_opposite, ql, qr, zl, zr, edgeflux, epsilon, g, H0) #flux_opposite, max_speed = flux_function([-1, 0], ql, qr, zl, zr) assert num.allclose(edgeflux, [-15.3598804, -253.71111111, 0.]) #Flux across upper edge of volume 1 normal = domain.get_normal(1,1) ql = domain.get_conserved_quantities(vol_id=1, edge=1) qr = domain.get_conserved_quantities(vol_id=3, edge=0) max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux1, epsilon, g, H0) assert num.allclose(edgeflux1, [2.4098563, 0., 123.04444444]) assert num.allclose(max_speed, 7.22956891292) #Flux across lower left hypotenuse of volume 1 normal = domain.get_normal(1,2) ql = domain.get_conserved_quantities(vol_id=1, edge=2) qr = domain.get_conserved_quantities(vol_id=0, edge=1) max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux2, epsilon, g, H0) assert num.allclose(edgeflux2, [9.63942522, -61.59685738, -61.59685738]) assert num.allclose(max_speed, 7.22956891292) #Scale, add up and check that compute_fluxes is correct for vol 1 e0 = domain.edgelengths[1, 0] e1 = domain.edgelengths[1, 1] e2 = domain.edgelengths[1, 2] total_flux = -(e0*edgeflux0+e1*edgeflux1+e2*edgeflux2)/domain.areas[1] assert num.allclose(total_flux, [-0.68218178, -166.6, -35.93333333]) domain.compute_fluxes() #assert num.allclose(total_flux, domain.explicit_update[1,:]) for i, name in enumerate(['stage', 'xmomentum', 'ymomentum']): assert num.allclose(total_flux[i], domain.quantities[name].explicit_update[1]) #assert allclose(domain.explicit_update, [ # [0., -69.68888889, -69.68888889], # [-0.68218178, -166.6, -35.93333333], # [-111.77316251, 69.68888889, 0.], # [-35.68522449, 0., 69.68888889]]) assert num.allclose(domain.quantities['stage'].explicit_update, [0., -0.68218178, -111.77316251, -35.68522449]) assert num.allclose(domain.quantities['xmomentum'].explicit_update, [-69.68888889, -166.6, 69.68888889, 0]) assert num.allclose(domain.quantities['ymomentum'].explicit_update, [-69.68888889, -35.93333333, 0., 69.68888889]) #assert allclose(domain.quantities[name].explicit_update def test_compute_fluxes2(self): #Random values, incl momentum a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) val0 = 2.+2.0/3 val1 = 4.+4.0/3 val2 = 8.+2.0/3 val3 = 2.+8.0/3 zl=zr=0 #Assume flat zero bed edgeflux = num.zeros(3, num.Float) edgeflux0 = num.zeros(3, num.Float) edgeflux1 = num.zeros(3, num.Float) edgeflux2 = num.zeros(3, num.Float) H0 = 0.0 domain.set_quantity('elevation', zl*num.ones( (4,3) )) domain.set_quantity('stage', [[val0, val0-1, val0-2], [val1, val1+1, val1], [val2, val2-2, val2], [val3-0.5, val3, val3]]) domain.set_quantity('xmomentum', [[1, 2, 3], [3, 4, 5], [1, -1, 0], [0, -2, 2]]) domain.set_quantity('ymomentum', [[1, -1, 0], [0, -3, 2], [0, 1, 0], [-1, 2, 2]]) domain.check_integrity() #Flux across right edge of volume 1 normal = domain.get_normal(1,0) ql = domain.get_conserved_quantities(vol_id=1, edge=0) qr = domain.get_conserved_quantities(vol_id=2, edge=2) max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux0, epsilon, g, H0) #Flux across upper edge of volume 1 normal = domain.get_normal(1,1) ql = domain.get_conserved_quantities(vol_id=1, edge=1) qr = domain.get_conserved_quantities(vol_id=3, edge=0) max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux1, epsilon, g, H0) #Flux across lower left hypotenuse of volume 1 normal = domain.get_normal(1,2) ql = domain.get_conserved_quantities(vol_id=1, edge=2) qr = domain.get_conserved_quantities(vol_id=0, edge=1) max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux2, epsilon, g, H0) #Scale, add up and check that compute_fluxes is correct for vol 1 e0 = domain.edgelengths[1, 0] e1 = domain.edgelengths[1, 1] e2 = domain.edgelengths[1, 2] total_flux = -(e0*edgeflux0+e1*edgeflux1+e2*edgeflux2)/domain.areas[1] domain.compute_fluxes() for i, name in enumerate(['stage', 'xmomentum', 'ymomentum']): assert num.allclose(total_flux[i], domain.quantities[name].explicit_update[1]) #assert allclose(total_flux, domain.explicit_update[1,:]) # FIXME (Ole): Need test like this for fluxes in very shallow water. def test_compute_fluxes3(self): #Random values, incl momentum a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) val0 = 2.+2.0/3 val1 = 4.+4.0/3 val2 = 8.+2.0/3 val3 = 2.+8.0/3 zl=zr=-3.75 #Assume constant bed (must be less than stage) domain.set_quantity('elevation', zl*num.ones( (4,3) )) edgeflux = num.zeros(3, num.Float) edgeflux0 = num.zeros(3, num.Float) edgeflux1 = num.zeros(3, num.Float) edgeflux2 = num.zeros(3, num.Float) H0 = 0.0 domain.set_quantity('stage', [[val0, val0-1, val0-2], [val1, val1+1, val1], [val2, val2-2, val2], [val3-0.5, val3, val3]]) domain.set_quantity('xmomentum', [[1, 2, 3], [3, 4, 5], [1, -1, 0], [0, -2, 2]]) domain.set_quantity('ymomentum', [[1, -1, 0], [0, -3, 2], [0, 1, 0], [-1, 2, 2]]) domain.check_integrity() #Flux across right edge of volume 1 normal = domain.get_normal(1,0) ql = domain.get_conserved_quantities(vol_id=1, edge=0) qr = domain.get_conserved_quantities(vol_id=2, edge=2) max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux0, epsilon, g, H0) #Flux across upper edge of volume 1 normal = domain.get_normal(1,1) ql = domain.get_conserved_quantities(vol_id=1, edge=1) qr = domain.get_conserved_quantities(vol_id=3, edge=0) max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux1, epsilon, g, H0) #Flux across lower left hypotenuse of volume 1 normal = domain.get_normal(1,2) ql = domain.get_conserved_quantities(vol_id=1, edge=2) qr = domain.get_conserved_quantities(vol_id=0, edge=1) max_speed = flux_function(normal, ql, qr, zl, zr, edgeflux2, epsilon, g, H0) #Scale, add up and check that compute_fluxes is correct for vol 1 e0 = domain.edgelengths[1, 0] e1 = domain.edgelengths[1, 1] e2 = domain.edgelengths[1, 2] total_flux = -(e0*edgeflux0+e1*edgeflux1+e2*edgeflux2)/domain.areas[1] domain.compute_fluxes() for i, name in enumerate(['stage', 'xmomentum', 'ymomentum']): assert num.allclose(total_flux[i], domain.quantities[name].explicit_update[1]) def xtest_catching_negative_heights(self): #OBSOLETE a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) val0 = 2.+2.0/3 val1 = 4.+4.0/3 val2 = 8.+2.0/3 val3 = 2.+8.0/3 zl=zr=4 #Too large domain.set_quantity('elevation', zl*num.ones( (4,3) )) domain.set_quantity('stage', [[val0, val0-1, val0-2], [val1, val1+1, val1], [val2, val2-2, val2], [val3-0.5, val3, val3]]) #Should fail try: domain.check_integrity() except: pass def test_get_wet_elements(self): a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) val0 = 2.+2.0/3 val1 = 4.+4.0/3 val2 = 8.+2.0/3 val3 = 2.+8.0/3 zl=zr=5 domain.set_quantity('elevation', zl*num.ones( (4,3) )) domain.set_quantity('stage', [[val0, val0-1, val0-2], [val1, val1+1, val1], [val2, val2-2, val2], [val3-0.5, val3, val3]]) #print domain.get_quantity('elevation').get_values(location='centroids') #print domain.get_quantity('stage').get_values(location='centroids') domain.check_integrity() indices = domain.get_wet_elements() assert num.allclose(indices, [1,2]) indices = domain.get_wet_elements(indices=[0,1,3]) assert num.allclose(indices, [1]) def test_get_maximum_inundation_1(self): a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) domain.set_quantity('elevation', lambda x, y: x+2*y) #2 4 4 6 domain.set_quantity('stage', 3) domain.check_integrity() indices = domain.get_wet_elements() assert num.allclose(indices, [0]) q = domain.get_maximum_inundation_elevation() assert num.allclose(q, domain.get_quantity('elevation').get_values(location='centroids')[0]) x, y = domain.get_maximum_inundation_location() assert num.allclose([x, y], domain.get_centroid_coordinates()[0]) def test_get_maximum_inundation_2(self): """test_get_maximum_inundation_2(self) Test multiple wet cells with same elevation """ a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) domain.set_quantity('elevation', lambda x, y: x+2*y) #2 4 4 6 domain.set_quantity('stage', 4.1) domain.check_integrity() indices = domain.get_wet_elements() assert num.allclose(indices, [0,1,2]) q = domain.get_maximum_inundation_elevation() assert num.allclose(q, 4) x, y = domain.get_maximum_inundation_location() assert num.allclose([x, y], domain.get_centroid_coordinates()[1]) def test_get_maximum_inundation_3(self): """test_get_maximum_inundation_3(self) Test of real runup example: """ from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross initial_runup_height = -0.4 final_runup_height = -0.3 #-------------------------------------------------------------- # Setup computational domain #-------------------------------------------------------------- N = 5 points, vertices, boundary = rectangular_cross(N, N) domain = Domain(points, vertices, boundary) domain.set_maximum_allowed_speed(1.0) #-------------------------------------------------------------- # Setup initial conditions #-------------------------------------------------------------- def topography(x,y): return -x/2 # linear bed slope domain.set_quantity('elevation', topography) # Use function for elevation domain.set_quantity('friction', 0.) # Zero friction domain.set_quantity('stage', initial_runup_height) # Constant negative initial stage #-------------------------------------------------------------- # Setup boundary conditions #-------------------------------------------------------------- Br = Reflective_boundary(domain) # Reflective wall Bd = Dirichlet_boundary([final_runup_height, # Constant inflow 0, 0]) # All reflective to begin with (still water) domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br}) #-------------------------------------------------------------- # Test initial inundation height #-------------------------------------------------------------- indices = domain.get_wet_elements() z = domain.get_quantity('elevation').\ get_values(location='centroids', indices=indices) assert num.alltrue(z q_max: q_max = q #-------------------------------------------------------------- # Test inundation height again #-------------------------------------------------------------- indices = domain.get_wet_elements() z = domain.get_quantity('elevation').\ get_values(location='centroids', indices=indices) assert num.alltrue(z 20: msg = 'Model time exceeded.' raise Modeltime_too_late, msg else: return 3*t + 7 domain.forcing_terms = [] R = Rainfall(domain, rate=main_rate, polygon = [[1,1], [2,1], [2,2], [1,2]], default_rate=5.0) assert num.allclose(R.exchange_area, 2) domain.forcing_terms.append(R) domain.time = 10. domain.compute_forcing_terms() #print domain.quantities['stage'].explicit_update assert num.allclose(domain.quantities['stage'].explicit_update[1], (3*domain.time+7)/1000) assert num.allclose(domain.quantities['stage'].explicit_update[0], 0) assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) domain.time = 100. domain.quantities['stage'].explicit_update[:] = 0.0 # Reset domain.compute_forcing_terms() #print domain.quantities['stage'].explicit_update assert num.allclose(domain.quantities['stage'].explicit_update[1], 5.0/1000) # Default value assert num.allclose(domain.quantities['stage'].explicit_update[0], 0) assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) def test_rainfall_forcing_with_evolve(self): """test_rainfall_forcing_with_evolve Test how forcing terms are called within evolve """ # FIXME(Ole): This test is just to experiment a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) #Flat surface with 1m of water domain.set_quantity('elevation', 0) domain.set_quantity('stage', 1.0) domain.set_quantity('friction', 0) Br = Reflective_boundary(domain) domain.set_boundary({'exterior': Br}) # Setup only one forcing term, time dependent rainfall that expires at t==20 from anuga.fit_interpolate.interpolate import Modeltime_too_late def main_rate(t): if t > 20: msg = 'Model time exceeded.' raise Modeltime_too_late, msg else: return 3*t + 7 domain.forcing_terms = [] R = Rainfall(domain, rate=main_rate, polygon=[[1,1], [2,1], [2,2], [1,2]], default_rate=5.0) assert num.allclose(R.exchange_area, 2) domain.forcing_terms.append(R) for t in domain.evolve(yieldstep=1, finaltime=25): pass #print t, domain.quantities['stage'].explicit_update, (3*t+7)/1000 #FIXME(Ole): A test here is hard because explicit_update also # receives updates from the flux calculation. def test_inflow_using_circle(self): from math import pi, cos, sin a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) # Flat surface with 1m of water domain.set_quantity('elevation', 0) domain.set_quantity('stage', 1.0) domain.set_quantity('friction', 0) Br = Reflective_boundary(domain) domain.set_boundary({'exterior': Br}) # Setup only one forcing term, constant inflow of 2 m^3/s on a circle affecting triangles #0 and #1 (bac and bce) domain.forcing_terms = [] I = Inflow(domain, rate=2.0, center=(1,1), radius=1) domain.forcing_terms.append(I) domain.compute_forcing_terms() A = I.exchange_area assert num.allclose(A, 4) # Two triangles assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A) assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A) assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) def test_inflow_using_circle_function(self): from math import pi, cos, sin a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) # Flat surface with 1m of water domain.set_quantity('elevation', 0) domain.set_quantity('stage', 1.0) domain.set_quantity('friction', 0) Br = Reflective_boundary(domain) domain.set_boundary({'exterior': Br}) # Setup only one forcing term, time dependent inflow of 2 m^3/s on a circle affecting triangles #0 and #1 (bac and bce) domain.forcing_terms = [] I = Inflow(domain, rate=lambda t: 2., center=(1,1), radius=1) domain.forcing_terms.append(I) domain.compute_forcing_terms() A = I.exchange_area assert num.allclose(A, 4) # Two triangles assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A) assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A) assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) def test_inflow_catch_too_few_triangles(self): """test_inflow_catch_too_few_triangles Test that exception is thrown if no triangles are covered by the inflow area """ from math import pi, cos, sin a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) # Flat surface with 1m of water domain.set_quantity('elevation', 0) domain.set_quantity('stage', 1.0) domain.set_quantity('friction', 0) Br = Reflective_boundary(domain) domain.set_boundary({'exterior': Br}) # Setup only one forcing term, constant inflow of 2 m^3/s on a circle affecting triangles #0 and #1 (bac and bce) try: Inflow(domain, rate=2.0, center=(1,1.1), radius=0.01) except: pass else: msg = 'Should have raised exception' raise Exception, msg def Xtest_inflow_outflow_conservation(self): """test_inflow_outflow_conservation Test what happens if water is abstracted from one area and injected into another - especially if there is not enough water to match the abstraction. This tests that the total volume is kept constant under a range of scenarios. This test will fail as the problem was only fixed for culverts. FIXME(Ole): It'd be nice to turn it into a volume conservation test """ from math import pi, cos, sin length = 20. width = 10. dx = dy = 2 # 1 or 2 OK points, vertices, boundary = rectangular_cross(int(length/dx), int(width/dy), len1=length, len2=width) domain = Domain(points, vertices, boundary) domain.set_name('test_inflow_conservation') # Output name domain.set_default_order(2) # Flat surface with 1m of water stage = 1.0 domain.set_quantity('elevation', 0) domain.set_quantity('stage', stage) domain.set_quantity('friction', 0) Br = Reflective_boundary(domain) domain.set_boundary({'left': Br, 'right': Br, 'bottom': Br, 'top': Br}) # Setup one forcing term, constant inflow of 2 m^3/s on a circle domain.forcing_terms = [] domain.forcing_terms.append(Inflow(domain, rate=2.0, center=(5,5), radius=1)) domain.compute_forcing_terms() #print domain.quantities['stage'].explicit_update # Check that update values are correct for x in domain.quantities['stage'].explicit_update: assert num.allclose(x, 2.0/pi) or num.allclose(x, 0.0) # Check volumes without inflow domain.forcing_terms = [] initial_volume = domain.quantities['stage'].get_integral() assert num.allclose(initial_volume, width*length*stage) for t in domain.evolve(yieldstep = 0.05, finaltime = 5.0): volume = domain.quantities['stage'].get_integral() assert num.allclose (volume, initial_volume) # Now apply the inflow and check volumes for a range of stage values for stage in [2.0, 1.0, 0.5, 0.25, 0.1, 0.0]: domain.time = 0.0 domain.set_quantity('stage', stage) domain.forcing_terms = [] domain.forcing_terms.append(Inflow(domain, rate=2.0, center=(5,5), radius=1)) initial_volume = domain.quantities['stage'].get_integral() predicted_volume = initial_volume dt = 0.05 for t in domain.evolve(yieldstep = dt, finaltime = 5.0): volume = domain.quantities['stage'].get_integral() assert num.allclose (volume, predicted_volume) predicted_volume = predicted_volume + 2.0/pi/100/dt # Why 100? # Apply equivalent outflow only and check volumes for a range of stage values for stage in [2.0, 1.0, 0.5, 0.25, 0.1, 0.0]: print stage domain.time = 0.0 domain.set_quantity('stage', stage) domain.forcing_terms = [] domain.forcing_terms.append(Inflow(domain, rate=-2.0, center=(15,5), radius=1)) initial_volume = domain.quantities['stage'].get_integral() predicted_volume = initial_volume dt = 0.05 for t in domain.evolve(yieldstep = dt, finaltime = 5.0): volume = domain.quantities['stage'].get_integral() print t, volume, predicted_volume assert num.allclose (volume, predicted_volume) predicted_volume = predicted_volume - 2.0/pi/100/dt # Why 100? # Apply both inflow and outflow and check volumes being constant for a # range of stage values for stage in [2.0, 1.0, 0.5, 0.25, 0.1, 0.0]: print stage domain.time = 0.0 domain.set_quantity('stage', stage) domain.forcing_terms = [] domain.forcing_terms.append(Inflow(domain, rate=2.0, center=(5,5), radius=1)) domain.forcing_terms.append(Inflow(domain, rate=-2.0, center=(15,5), radius=1)) initial_volume = domain.quantities['stage'].get_integral() dt = 0.05 for t in domain.evolve(yieldstep = dt, finaltime = 5.0): volume = domain.quantities['stage'].get_integral() print t, volume assert num.allclose (volume, initial_volume) ##################################################### def test_first_order_extrapolator_const_z(self): a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) val0 = 2.+2.0/3 val1 = 4.+4.0/3 val2 = 8.+2.0/3 val3 = 2.+8.0/3 zl=zr=-3.75 #Assume constant bed (must be less than stage) domain.set_quantity('elevation', zl*num.ones( (4,3) )) domain.set_quantity('stage', [[val0, val0-1, val0-2], [val1, val1+1, val1], [val2, val2-2, val2], [val3-0.5, val3, val3]]) domain._order_ = 1 domain.distribute_to_vertices_and_edges() #Check that centroid values were distributed to vertices C = domain.quantities['stage'].centroid_values for i in range(3): assert num.allclose( domain.quantities['stage'].vertex_values[:,i], C) def test_first_order_limiter_variable_z(self): #Check that first order limiter follows bed_slope from anuga.config import epsilon a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) val0 = 2.+2.0/3 val1 = 4.+4.0/3 val2 = 8.+2.0/3 val3 = 2.+8.0/3 domain.set_quantity('elevation', [[0,0,0], [6,0,0], [6,6,6], [6,6,6]]) domain.set_quantity('stage', [[val0, val0, val0], [val1, val1, val1], [val2, val2, val2], [val3, val3, val3]]) E = domain.quantities['elevation'].vertex_values L = domain.quantities['stage'].vertex_values #Check that some stages are not above elevation (within eps) #- so that the limiter has something to work with assert not num.alltrue(num.alltrue(num.greater_equal(L,E-epsilon))) domain._order_ = 1 domain.distribute_to_vertices_and_edges() #Check that all stages are above elevation (within eps) assert num.alltrue(num.alltrue(num.greater_equal(L,E-epsilon))) ##################################################### def test_distribute_basic(self): #Using test data generated by abstract_2d_finite_volumes-2 #Assuming no friction and flat bed (0.0) a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) val0 = 2. val1 = 4. val2 = 8. val3 = 2. domain.set_quantity('stage', [val0, val1, val2, val3], location='centroids') L = domain.quantities['stage'].vertex_values #First order domain._order_ = 1 domain.distribute_to_vertices_and_edges() assert num.allclose(L[1], val1) #Second order domain._order_ = 2 domain.beta_w = 0.9 domain.beta_w_dry = 0.9 domain.beta_uh = 0.9 domain.beta_uh_dry = 0.9 domain.beta_vh = 0.9 domain.beta_vh_dry = 0.9 domain.distribute_to_vertices_and_edges() assert num.allclose(L[1], [2.2, 4.9, 4.9]) def test_distribute_away_from_bed(self): #Using test data generated by abstract_2d_finite_volumes-2 #Assuming no friction and flat bed (0.0) a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) L = domain.quantities['stage'].vertex_values def stage(x,y): return x**2 domain.set_quantity('stage', stage, location='centroids') domain.quantities['stage'].compute_gradients() a, b = domain.quantities['stage'].get_gradients() assert num.allclose(a[1], 3.33333334) assert num.allclose(b[1], 0.0) domain._order_ = 1 domain.distribute_to_vertices_and_edges() assert num.allclose(L[1], 1.77777778) domain._order_ = 2 domain.beta_w = 0.9 domain.beta_w_dry = 0.9 domain.beta_uh = 0.9 domain.beta_uh_dry = 0.9 domain.beta_vh = 0.9 domain.beta_vh_dry = 0.9 domain.distribute_to_vertices_and_edges() assert num.allclose(L[1], [0.57777777, 2.37777778, 2.37777778]) def test_distribute_away_from_bed1(self): #Using test data generated by abstract_2d_finite_volumes-2 #Assuming no friction and flat bed (0.0) a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) L = domain.quantities['stage'].vertex_values def stage(x,y): return x**4+y**2 domain.set_quantity('stage', stage, location='centroids') #print domain.quantities['stage'].centroid_values domain.quantities['stage'].compute_gradients() a, b = domain.quantities['stage'].get_gradients() assert num.allclose(a[1], 25.18518519) assert num.allclose(b[1], 3.33333333) domain._order_ = 1 domain.distribute_to_vertices_and_edges() assert num.allclose(L[1], 4.9382716) domain._order_ = 2 domain.beta_w = 0.9 domain.beta_w_dry = 0.9 domain.beta_uh = 0.9 domain.beta_uh_dry = 0.9 domain.beta_vh = 0.9 domain.beta_vh_dry = 0.9 domain.distribute_to_vertices_and_edges() assert num.allclose(L[1], [1.07160494, 6.46058131, 7.28262855]) def test_distribute_near_bed(self): a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) #Set up for a gradient of (10,0) at mid triangle (bce) def slope(x, y): return 10*x h = 0.1 def stage(x, y): return slope(x, y) + h domain.set_quantity('elevation', slope) domain.set_quantity('stage', stage, location='centroids') #print domain.quantities['elevation'].centroid_values #print domain.quantities['stage'].centroid_values E = domain.quantities['elevation'].vertex_values L = domain.quantities['stage'].vertex_values # Get reference values volumes = [] for i in range(len(L)): volumes.append(num.sum(L[i])/3) assert num.allclose(volumes[i], domain.quantities['stage'].centroid_values[i]) domain._order_ = 1 domain.tight_slope_limiters = 0 domain.distribute_to_vertices_and_edges() assert num.allclose(L[1], [0.1, 20.1, 20.1]) for i in range(len(L)): assert num.allclose(volumes[i], num.sum(L[i])/3) domain.tight_slope_limiters = 1 # Allow triangle to be flatter (closer to bed) domain.distribute_to_vertices_and_edges() assert num.allclose(L[1], [0.298, 20.001, 20.001]) for i in range(len(L)): assert num.allclose(volumes[i], num.sum(L[i])/3) domain._order_ = 2 domain.tight_slope_limiters = 0 domain.distribute_to_vertices_and_edges() assert num.allclose(L[1], [0.1, 20.1, 20.1]) for i in range(len(L)): assert num.allclose(volumes[i], num.sum(L[i])/3) domain.tight_slope_limiters = 1 # Allow triangle to be flatter (closer to bed) domain.distribute_to_vertices_and_edges() assert num.allclose(L[1], [0.298, 20.001, 20.001]) for i in range(len(L)): assert num.allclose(volumes[i], num.sum(L[i])/3) def test_distribute_near_bed1(self): a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) #Set up for a gradient of (8,2) at mid triangle (bce) def slope(x, y): return x**4+y**2 h = 0.1 def stage(x,y): return slope(x,y)+h domain.set_quantity('elevation', slope) domain.set_quantity('stage', stage) #print domain.quantities['elevation'].centroid_values #print domain.quantities['stage'].centroid_values E = domain.quantities['elevation'].vertex_values L = domain.quantities['stage'].vertex_values # Get reference values volumes = [] for i in range(len(L)): volumes.append(num.sum(L[i])/3) assert num.allclose(volumes[i], domain.quantities['stage'].centroid_values[i]) #print E domain._order_ = 1 domain.tight_slope_limiters = 0 domain.distribute_to_vertices_and_edges() assert num.allclose(L[1], [4.1, 16.1, 20.1]) for i in range(len(L)): assert num.allclose(volumes[i], num.sum(L[i])/3) domain.tight_slope_limiters = 1 # Allow triangle to be flatter (closer to bed) domain.distribute_to_vertices_and_edges() assert num.allclose(L[1], [4.2386, 16.0604, 20.001]) for i in range(len(L)): assert num.allclose(volumes[i], num.sum(L[i])/3) domain._order_ = 2 domain.tight_slope_limiters = 0 domain.distribute_to_vertices_and_edges() assert num.allclose(L[1], [4.1, 16.1, 20.1]) for i in range(len(L)): assert num.allclose(volumes[i], num.sum(L[i])/3) domain.tight_slope_limiters = 1 # Allow triangle to be flatter (closer to bed) domain.distribute_to_vertices_and_edges() #print L[1] assert num.allclose(L[1], [4.23370103, 16.06529897, 20.001]) or\ num.allclose(L[1], [4.18944138, 16.10955862, 20.001]) or\ num.allclose(L[1], [4.19351461, 16.10548539, 20.001]) # old limiters for i in range(len(L)): assert num.allclose(volumes[i], num.sum(L[i])/3) def test_second_order_distribute_real_data(self): #Using test data generated by abstract_2d_finite_volumes-2 #Assuming no friction and flat bed (0.0) a = [0.0, 0.0] b = [0.0, 1.0/5] c = [0.0, 2.0/5] d = [1.0/5, 0.0] e = [1.0/5, 1.0/5] f = [1.0/5, 2.0/5] g = [2.0/5, 2.0/5] points = [a, b, c, d, e, f, g] #bae, efb, cbf, feg vertices = [ [1,0,4], [4,5,1], [2,1,5], [5,4,6]] domain = Domain(points, vertices) def slope(x, y): return -x/3 domain.set_quantity('elevation', slope) domain.set_quantity('stage', [0.01298164, 0.00365611, 0.01440365, -0.0381856437096], location='centroids') domain.set_quantity('xmomentum', [0.00670439, 0.01263789, 0.00647805, 0.0178180740668], location='centroids') domain.set_quantity('ymomentum', [-7.23510980e-004, -6.30413883e-005, 6.30413883e-005, 0.000200907255866], location='centroids') E = domain.quantities['elevation'].vertex_values L = domain.quantities['stage'].vertex_values X = domain.quantities['xmomentum'].vertex_values Y = domain.quantities['ymomentum'].vertex_values #print E domain._order_ = 2 domain.beta_w = 0.9 domain.beta_w_dry = 0.9 domain.beta_uh = 0.9 domain.beta_uh_dry = 0.9 domain.beta_vh = 0.9 domain.beta_vh_dry = 0.9 # FIXME (Ole): Need tests where this is commented out domain.tight_slope_limiters = 0 # Backwards compatibility (14/4/7) domain.use_centroid_velocities = 0 # Backwards compatibility (7/5/8) domain.distribute_to_vertices_and_edges() #print L[1,:] #print X[1,:] #print Y[1,:] assert num.allclose(L[1,:], [-0.00825735775384, -0.00801881482869, 0.0272445025825]) assert num.allclose(X[1,:], [0.0143507718962, 0.0142502147066, 0.00931268339717]) assert num.allclose(Y[1,:], [-0.000117062180693, 7.94434448109e-005, -0.000151505429018]) def test_balance_deep_and_shallow(self): """Test that balanced limiters preserve conserved quantites. This test is using old depth based balanced limiters """ import copy a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe elements = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4] ] domain = Domain(points, elements) domain.check_integrity() #Create a deliberate overshoot domain.set_quantity('stage', [[3,0,3], [2,2,6], [5,3,8], [8,3,5]]) domain.set_quantity('elevation', 0) #Flat bed stage = domain.quantities['stage'] ref_centroid_values = copy.copy(stage.centroid_values[:]) #Copy #Limit domain.tight_slope_limiters = 0 domain.distribute_to_vertices_and_edges() #Assert that quantities are conserved for k in range(len(domain)): assert num.allclose (ref_centroid_values[k], num.sum(stage.vertex_values[k,:])/3) #Now try with a non-flat bed - closely hugging initial stage in places #This will create alphas in the range [0, 0.478260, 1] domain.set_quantity('stage', [[3,0,3], [2,2,6], [5,3,8], [8,3,5]]) domain.set_quantity('elevation', [[0,0,0], [1.8,1.9,5.9], [4.6,0,0], [0,2,4]]) stage = domain.quantities['stage'] ref_centroid_values = copy.copy(stage.centroid_values[:]) #Copy ref_vertex_values = copy.copy(stage.vertex_values[:]) #Copy #Limit domain.tight_slope_limiters = 0 domain.distribute_to_vertices_and_edges() #Assert that all vertex quantities have changed for k in range(len(domain)): #print ref_vertex_values[k,:], stage.vertex_values[k,:] assert not num.allclose (ref_vertex_values[k,:], stage.vertex_values[k,:]) #and assert that quantities are still conserved for k in range(len(domain)): assert num.allclose (ref_centroid_values[k], num.sum(stage.vertex_values[k,:])/3) # Check actual results assert num.allclose (stage.vertex_values, [[2,2,2], [1.93333333, 2.03333333, 6.03333333], [6.93333333, 4.53333333, 4.53333333], [5.33333333, 5.33333333, 5.33333333]]) def test_balance_deep_and_shallow_tight_SL(self): """Test that balanced limiters preserve conserved quantites. This test is using Tight Slope Limiters """ import copy a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe elements = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4] ] domain = Domain(points, elements) domain.check_integrity() #Create a deliberate overshoot domain.set_quantity('stage', [[3,0,3], [2,2,6], [5,3,8], [8,3,5]]) domain.set_quantity('elevation', 0) #Flat bed stage = domain.quantities['stage'] ref_centroid_values = copy.copy(stage.centroid_values[:]) #Copy #Limit domain.tight_slope_limiters = 1 domain.distribute_to_vertices_and_edges() #Assert that quantities are conserved for k in range(len(domain)): assert num.allclose (ref_centroid_values[k], num.sum(stage.vertex_values[k,:])/3) #Now try with a non-flat bed - closely hugging initial stage in places #This will create alphas in the range [0, 0.478260, 1] domain.set_quantity('stage', [[3,0,3], [2,2,6], [5,3,8], [8,3,5]]) domain.set_quantity('elevation', [[0,0,0], [1.8,1.9,5.9], [4.6,0,0], [0,2,4]]) stage = domain.quantities['stage'] ref_centroid_values = copy.copy(stage.centroid_values[:]) #Copy ref_vertex_values = copy.copy(stage.vertex_values[:]) #Copy #Limit domain.tight_slope_limiters = 1 domain.distribute_to_vertices_and_edges() #Assert that all vertex quantities have changed for k in range(len(domain)): #print ref_vertex_values[k,:], stage.vertex_values[k,:] assert not num.allclose (ref_vertex_values[k,:], stage.vertex_values[k,:]) #and assert that quantities are still conserved for k in range(len(domain)): assert num.allclose (ref_centroid_values[k], num.sum(stage.vertex_values[k,:])/3) #Also check that Python and C version produce the same # No longer applicable if tight_slope_limiters == 1 #print stage.vertex_values #assert allclose (stage.vertex_values, # [[2,2,2], # [1.93333333, 2.03333333, 6.03333333], # [6.93333333, 4.53333333, 4.53333333], # [5.33333333, 5.33333333, 5.33333333]]) def test_balance_deep_and_shallow_Froude(self): """Test that balanced limiters preserve conserved quantites - and also that excessive Froude numbers are dealt with. This test is using tight slope limiters. """ import copy a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] # bac, bce, ecf, dbe elements = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4] ] domain = Domain(points, elements) domain.check_integrity() domain.tight_slope_limiters = True domain.use_centroid_velocities = True # Create non-flat bed - closely hugging initial stage in places # This will create alphas in the range [0, 0.478260, 1] domain.set_quantity('stage', [[3,0,3], [2,2,6], [5,3,8], [8,3,5]]) domain.set_quantity('elevation', [[0,0,0], [1.8,1.999,5.999], [4.6,0,0], [0,2,4]]) # Create small momenta, that nonetheless will generate large speeds # due to shallow depth at isolated vertices domain.set_quantity('xmomentum', -0.0058) domain.set_quantity('ymomentum', 0.0890) stage = domain.quantities['stage'] elevation = domain.quantities['elevation'] xmomentum = domain.quantities['xmomentum'] ymomentum = domain.quantities['ymomentum'] # Setup triangle #1 to mimick real Froude explosion observed # in the Onslow example 13 Nov 2007. stage.vertex_values[1,:] = [1.6385, 1.6361, 1.2953] elevation.vertex_values[1,:] = [1.6375, 1.6336, 0.4647] xmomentum.vertex_values[1,:] = [-0.0058, -0.0050, -0.0066] ymomentum.vertex_values[1,:] = [0.0890, 0.0890, 0.0890] xmomentum.interpolate() ymomentum.interpolate() stage.interpolate() elevation.interpolate() # Verify interpolation assert num.allclose(stage.centroid_values[1], 1.5233) assert num.allclose(elevation.centroid_values[1], 1.2452667) assert num.allclose(xmomentum.centroid_values[1], -0.0058) assert num.allclose(ymomentum.centroid_values[1], 0.089) # Derived quantities depth = stage-elevation u = xmomentum/depth v = ymomentum/depth denom = (depth*g)**0.5 Fx = u/denom Fy = v/denom # Verify against Onslow example (14 Nov 2007) assert num.allclose(depth.centroid_values[1], 0.278033) assert num.allclose(u.centroid_values[1], -0.0208608) assert num.allclose(v.centroid_values[1], 0.3201055) assert num.allclose(denom.centroid_values[1], num.sqrt(depth.centroid_values[1]*g)) assert num.allclose(u.centroid_values[1]/denom.centroid_values[1], -0.012637746977) assert num.allclose(Fx.centroid_values[1], u.centroid_values[1]/denom.centroid_values[1]) # Check that Froude numbers are small at centroids. assert num.allclose(Fx.centroid_values[1], -0.012637746977) assert num.allclose(Fy.centroid_values[1], 0.193924048435) # But Froude numbers are huge at some vertices and edges assert num.allclose(Fx.vertex_values[1,:], [-5.85888475e+01, -1.27775313e+01, -2.78511420e-03]) assert num.allclose(Fx.edge_values[1,:], [-6.89150773e-03, -7.38672488e-03, -2.35626238e+01]) assert num.allclose(Fy.vertex_values[1,:], [8.99035764e+02, 2.27440057e+02, 3.75568430e-02]) assert num.allclose(Fy.edge_values[1,:], [1.05748998e-01, 1.06035244e-01, 3.88346947e+02]) # The task is now to arrange the limiters such that Froude numbers # remain under control whil at the same time obeying the conservation # laws. ref_centroid_values = copy.copy(stage.centroid_values[:]) #Copy ref_vertex_values = copy.copy(stage.vertex_values[:]) #Copy # Limit (and invoke balance_deep_and_shallow) domain.tight_slope_limiters = 1 domain.distribute_to_vertices_and_edges() # Redo derived quantities depth = stage-elevation u = xmomentum/depth v = ymomentum/depth # Assert that all vertex velocities stay within one # order of magnitude of centroid velocities. #print u.vertex_values[1,:] #print u.centroid_values[1] assert num.alltrue(num.absolute(u.vertex_values[1,:]) <= num.absolute(u.centroid_values[1])*10) assert num.alltrue(num.absolute(v.vertex_values[1,:]) <= num.absolute(v.centroid_values[1])*10) denom = (depth*g)**0.5 Fx = u/denom Fy = v/denom # Assert that Froude numbers are less than max value (TBA) # at vertices, edges and centroids. from anuga.config import maximum_froude_number assert num.alltrue(num.absolute(Fx.vertex_values[1,:]) < maximum_froude_number) assert num.alltrue(num.absolute(Fy.vertex_values[1,:]) < maximum_froude_number) # Assert that all vertex quantities have changed for k in range(len(domain)): #print ref_vertex_values[k,:], stage.vertex_values[k,:] assert not num.allclose (ref_vertex_values[k,:], stage.vertex_values[k,:]) # Assert that quantities are still conserved for k in range(len(domain)): assert num.allclose (ref_centroid_values[k], num.sum(stage.vertex_values[k,:])/3) return qwidth = 12 for k in [1]: #range(len(domain)): print 'Triangle %d (C, V, E)' %k print 'stage'.ljust(qwidth), stage.centroid_values[k],\ stage.vertex_values[k,:], stage.edge_values[k,:] print 'elevation'.ljust(qwidth), elevation.centroid_values[k],\ elevation.vertex_values[k,:], elevation.edge_values[k,:] print 'depth'.ljust(qwidth), depth.centroid_values[k],\ depth.vertex_values[k,:], depth.edge_values[k,:] print 'xmomentum'.ljust(qwidth), xmomentum.centroid_values[k],\ xmomentum.vertex_values[k,:], xmomentum.edge_values[k,:] print 'ymomentum'.ljust(qwidth), ymomentum.centroid_values[k],\ ymomentum.vertex_values[k,:], ymomentum.edge_values[k,:] print 'u'.ljust(qwidth),u.centroid_values[k],\ u.vertex_values[k,:], u.edge_values[k,:] print 'v'.ljust(qwidth), v.centroid_values[k],\ v.vertex_values[k,:], v.edge_values[k,:] print 'Fx'.ljust(qwidth), Fx.centroid_values[k],\ Fx.vertex_values[k,:], Fx.edge_values[k,:] print 'Fy'.ljust(qwidth), Fy.centroid_values[k],\ Fy.vertex_values[k,:], Fy.edge_values[k,:] def test_conservation_1(self): """Test that stage is conserved globally This one uses a flat bed, reflective bdries and a suitable initial condition """ from mesh_factory import rectangular #Create basic mesh points, vertices, boundary = rectangular(6, 6) #Create shallow water domain domain = Domain(points, vertices, boundary) domain.smooth = False domain.default_order=2 #IC def x_slope(x, y): return x/3 domain.set_quantity('elevation', 0) domain.set_quantity('friction', 0) domain.set_quantity('stage', x_slope) # Boundary conditions (reflective everywhere) Br = Reflective_boundary(domain) domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br}) domain.check_integrity() initial_volume = domain.quantities['stage'].get_integral() initial_xmom = domain.quantities['xmomentum'].get_integral() #print initial_xmom #Evolution for t in domain.evolve(yieldstep = 0.05, finaltime = 5.0): volume = domain.quantities['stage'].get_integral() assert num.allclose (volume, initial_volume) #I don't believe that the total momentum should be the same #It starts with zero and ends with zero though #xmom = domain.quantities['xmomentum'].get_integral() #print xmom #assert allclose (xmom, initial_xmom) os.remove(domain.get_name() + '.sww') def test_conservation_2(self): """Test that stage is conserved globally This one uses a slopy bed, reflective bdries and a suitable initial condition """ from mesh_factory import rectangular #Create basic mesh points, vertices, boundary = rectangular(6, 6) #Create shallow water domain domain = Domain(points, vertices, boundary) domain.smooth = False domain.default_order=2 #IC def x_slope(x, y): return x/3 domain.set_quantity('elevation', x_slope) domain.set_quantity('friction', 0) domain.set_quantity('stage', 0.4) #Steady # Boundary conditions (reflective everywhere) Br = Reflective_boundary(domain) domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br}) domain.check_integrity() initial_volume = domain.quantities['stage'].get_integral() initial_xmom = domain.quantities['xmomentum'].get_integral() #print initial_xmom #Evolution for t in domain.evolve(yieldstep = 0.05, finaltime = 5.0): volume = domain.quantities['stage'].get_integral() assert num.allclose (volume, initial_volume) #FIXME: What would we expect from momentum #xmom = domain.quantities['xmomentum'].get_integral() #print xmom #assert allclose (xmom, initial_xmom) os.remove(domain.get_name() + '.sww') def test_conservation_3(self): """Test that stage is conserved globally This one uses a larger grid, convoluted bed, reflective bdries and a suitable initial condition """ from mesh_factory import rectangular #Create basic mesh points, vertices, boundary = rectangular(2, 1) #Create shallow water domain domain = Domain(points, vertices, boundary) domain.smooth = False domain.default_order = 2 domain.set_quantities_to_be_stored(['stage', 'xmomentum', 'ymomentum']) #IC def x_slope(x, y): z = 0*x for i in range(len(x)): if x[i] < 0.3: z[i] = x[i]/3 if 0.3 <= x[i] < 0.5: z[i] = -0.5 if 0.5 <= x[i] < 0.7: z[i] = 0.39 if 0.7 <= x[i]: z[i] = x[i]/3 return z domain.set_quantity('elevation', x_slope) domain.set_quantity('friction', 0) domain.set_quantity('stage', 0.4) #Steady # Boundary conditions (reflective everywhere) Br = Reflective_boundary(domain) domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br}) domain.check_integrity() initial_volume = domain.quantities['stage'].get_integral() initial_xmom = domain.quantities['xmomentum'].get_integral() import copy ref_centroid_values =\ copy.copy(domain.quantities['stage'].centroid_values) #print 'ORG', domain.quantities['stage'].centroid_values domain.distribute_to_vertices_and_edges() #print domain.quantities['stage'].centroid_values assert num.allclose(domain.quantities['stage'].centroid_values, ref_centroid_values) #Check that initial limiter doesn't violate cons quan assert num.allclose(domain.quantities['stage'].get_integral(), initial_volume) #Evolution for t in domain.evolve(yieldstep = 0.05, finaltime = 10): volume = domain.quantities['stage'].get_integral() #print t, volume, initial_volume assert num.allclose (volume, initial_volume) os.remove(domain.get_name() + '.sww') def test_conservation_4(self): """Test that stage is conserved globally This one uses a larger grid, convoluted bed, reflective bdries and a suitable initial condition """ from mesh_factory import rectangular #Create basic mesh points, vertices, boundary = rectangular(6, 6) #Create shallow water domain domain = Domain(points, vertices, boundary) domain.smooth = False domain.default_order=2 #IC def x_slope(x, y): z = 0*x for i in range(len(x)): if x[i] < 0.3: z[i] = x[i]/3 if 0.3 <= x[i] < 0.5: z[i] = -0.5 if 0.5 <= x[i] < 0.7: #z[i] = 0.3 #OK with beta == 0.2 z[i] = 0.34 #OK with beta == 0.0 #z[i] = 0.35#Fails after 80 timesteps with an error #of the order 1.0e-5 if 0.7 <= x[i]: z[i] = x[i]/3 return z domain.set_quantity('elevation', x_slope) domain.set_quantity('friction', 0) domain.set_quantity('stage', 0.4) #Steady # Boundary conditions (reflective everywhere) Br = Reflective_boundary(domain) domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br}) domain.check_integrity() initial_volume = domain.quantities['stage'].get_integral() initial_xmom = domain.quantities['xmomentum'].get_integral() import copy ref_centroid_values =\ copy.copy(domain.quantities['stage'].centroid_values) #Test limiter by itself domain.distribute_to_vertices_and_edges() #Check that initial limiter doesn't violate cons quan assert num.allclose (domain.quantities['stage'].get_integral(), initial_volume) #NOTE: This would fail if any initial stage was less than the #corresponding bed elevation - but that is reasonable. #Evolution for t in domain.evolve(yieldstep = 0.05, finaltime = 10.0): volume = domain.quantities['stage'].get_integral() #print t, volume, initial_volume assert num.allclose (volume, initial_volume) os.remove(domain.get_name() + '.sww') def test_conservation_5(self): """Test that momentum is conserved globally in steady state scenario This one uses a slopy bed, dirichlet and reflective bdries """ from mesh_factory import rectangular # Create basic mesh points, vertices, boundary = rectangular(6, 6) # Create shallow water domain domain = Domain(points, vertices, boundary) domain.smooth = False domain.default_order = 2 # IC def x_slope(x, y): return x/3 domain.set_quantity('elevation', x_slope) domain.set_quantity('friction', 0) domain.set_quantity('stage', 0.4) # Steady # Boundary conditions (reflective everywhere) Br = Reflective_boundary(domain) Bleft = Dirichlet_boundary([0.5,0,0]) Bright = Dirichlet_boundary([0.1,0,0]) domain.set_boundary({'left': Bleft, 'right': Bright, 'top': Br, 'bottom': Br}) domain.check_integrity() initial_volume = domain.quantities['stage'].get_integral() initial_xmom = domain.quantities['xmomentum'].get_integral() # Evolution for t in domain.evolve(yieldstep = 0.05, finaltime = 15.0): stage = domain.quantities['stage'].get_integral() xmom = domain.quantities['xmomentum'].get_integral() ymom = domain.quantities['ymomentum'].get_integral() if num.allclose(t, 6): # Steady state reached steady_xmom = domain.quantities['xmomentum'].get_integral() steady_ymom = domain.quantities['ymomentum'].get_integral() steady_stage = domain.quantities['stage'].get_integral() if t > 6: #print '%.2f %14.8f %14.8f' %(t, ymom, steady_ymom) msg = 'xmom=%.2f, steady_xmom=%.2f' %(xmom, steady_xmom) assert num.allclose(xmom, steady_xmom), msg assert num.allclose(ymom, steady_ymom) assert num.allclose(stage, steady_stage) os.remove(domain.get_name() + '.sww') def test_conservation_real(self): """Test that momentum is conserved globally Stephen finally made a test that revealed the problem. This test failed with code prior to 25 July 2005 """ yieldstep = 0.01 finaltime = 0.05 min_depth = 1.0e-2 import sys from os import sep; sys.path.append('..'+sep+'abstract_2d_finite_volumes') from mesh_factory import rectangular #Create shallow water domain points, vertices, boundary = rectangular(10, 10, len1=500, len2=500) domain = Domain(points, vertices, boundary) domain.smooth = False domain.default_order = 1 domain.minimum_allowed_height = min_depth # Set initial condition class Set_IC: """Set an initial condition with a constant value, for x0self.x0)&(x= 0.0 assert depth['max'] >= 0.0 ##assert depth[1] <= ?? initial_runup_height #-------------------------------------------------------------- # Update boundary to allow inflow #-------------------------------------------------------------- domain.set_boundary({'right': Bd}) #-------------------------------------------------------------- # Evolve system through time #-------------------------------------------------------------- for t in domain.evolve(yieldstep = 0.1, finaltime = 3.0): #domain.write_time() domain.quantity_statistics() # Run it silently #-------------------------------------------------------------- # Test extrema again #-------------------------------------------------------------- stage = domain.quantities_to_be_monitored['stage'] assert stage['min'] <= stage['max'] assert num.allclose(stage['min'], initial_runup_height, rtol = 1.0/N) # First order accuracy depth = domain.quantities_to_be_monitored['stage-elevation'] assert depth['min'] <= depth['max'] assert depth['min'] >= 0.0 assert depth['max'] >= 0.0 #Cleanup os.remove(domain.get_name() + '.' + domain.format) def test_tight_slope_limiters(self): """Test that new slope limiters (Feb 2007) don't induce extremely small timesteps. This test actually reveals the problem as it was in March-April 2007 """ import time, os from Scientific.IO.NetCDF import NetCDFFile from data_manager import get_dataobject, extent_sww from mesh_factory import rectangular #Create basic mesh points, vertices, boundary = rectangular(2, 2) #Create shallow water domain domain = Domain(points, vertices, boundary) domain.default_order = 2 # This will pass #domain.tight_slope_limiters = 1 #domain.H0 = 0.01 # This will fail #domain.tight_slope_limiters = 1 #domain.H0 = 0.001 # This will pass provided C extension implements limiting of # momentum in _compute_speeds domain.tight_slope_limiters = 1 domain.H0 = 0.001 domain.protect_against_isolated_degenerate_timesteps = True #Set some field values domain.set_quantity('elevation', lambda x,y: -x) domain.set_quantity('friction', 0.03) ###################### # Boundary conditions B = Transmissive_boundary(domain) domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B}) ###################### #Initial condition - with jumps bed = domain.quantities['elevation'].vertex_values stage = num.zeros(bed.shape, num.Float) h = 0.3 for i in range(stage.shape[0]): if i % 2 == 0: stage[i,:] = bed[i,:] + h else: stage[i,:] = bed[i,:] domain.set_quantity('stage', stage) domain.distribute_to_vertices_and_edges() domain.set_name('tight_limiters') domain.format = 'sww' domain.smooth = True domain.reduction = mean domain.set_datadir('.') domain.smooth = False domain.store = True #Evolution for t in domain.evolve(yieldstep = 0.1, finaltime = 0.3): #domain.write_time(track_speeds=True) stage = domain.quantities['stage'].vertex_values #Get NetCDF fid = NetCDFFile(domain.writer.filename, netcdf_mode_r) stage_file = fid.variables['stage'] fid.close() os.remove(domain.writer.filename) def test_pmesh2Domain(self): import os import tempfile fileName = tempfile.mktemp(".tsh") file = open(fileName,"w") file.write("4 3 # [attributes]\n \ 0 0.0 0.0 0.0 0.0 0.01 \n \ 1 1.0 0.0 10.0 10.0 0.02 \n \ 2 0.0 1.0 0.0 10.0 0.03 \n \ 3 0.5 0.25 8.0 12.0 0.04 \n \ # Vert att title \n \ elevation \n \ stage \n \ friction \n \ 2 # [] [] \n\ 0 0 3 2 -1 -1 1 dsg\n\ 1 0 1 3 -1 0 -1 ole nielsen\n\ 4 # [boundary tag] \n\ 0 1 0 2 \n\ 1 0 2 3 \n\ 2 2 3 \n\ 3 3 1 1 \n\ 3 0 # [attributes] ...Mesh Vertices... \n \ 0 216.0 -86.0 \n \ 1 160.0 -167.0 \n \ 2 114.0 -91.0 \n \ 3 # [boundary tag] ...Mesh Segments... \n \ 0 0 1 0 \n \ 1 1 2 0 \n \ 2 2 0 0 \n \ 0 # ...Mesh Holes... \n \ 0 # ...Mesh Regions... \n \ 0 # ...Mesh Regions, area... \n\ #Geo reference \n \ 56 \n \ 140 \n \ 120 \n") file.close() tags = {} b1 = Dirichlet_boundary(conserved_quantities = num.array([0.0])) b2 = Dirichlet_boundary(conserved_quantities = num.array([1.0])) b3 = Dirichlet_boundary(conserved_quantities = num.array([2.0])) tags["1"] = b1 tags["2"] = b2 tags["3"] = b3 #from anuga.abstract_2d_finite_volumes.pmesh2domain import pmesh_to_domain_instance #domain = pmesh_to_domain_instance(fileName, Domain) domain = Domain(mesh_filename=fileName) #verbose=True, use_cache=True) #print "domain.tagged_elements", domain.tagged_elements ## check the quantities #print domain.quantities['elevation'].vertex_values answer = [[0., 8., 0.], [0., 10., 8.]] assert num.allclose(domain.quantities['elevation'].vertex_values, answer) #print domain.quantities['stage'].vertex_values answer = [[0., 12., 10.], [0., 10., 12.]] assert num.allclose(domain.quantities['stage'].vertex_values, answer) #print domain.quantities['friction'].vertex_values answer = [[0.01, 0.04, 0.03], [0.01, 0.02, 0.04]] assert num.allclose(domain.quantities['friction'].vertex_values, answer) #print domain.quantities['friction'].vertex_values tagged_elements = domain.get_tagged_elements() assert num.allclose(tagged_elements['dsg'][0],0) assert num.allclose(tagged_elements['ole nielsen'][0],1) self.failUnless( domain.boundary[(1, 0)] == '1', "test_tags_to_boundaries failed. Single boundary wasn't added.") self.failUnless( domain.boundary[(1, 2)] == '2', "test_tags_to_boundaries failed. Single boundary wasn't added.") self.failUnless( domain.boundary[(0, 1)] == '3', "test_tags_to_boundaries failed. Single boundary wasn't added.") self.failUnless( domain.boundary[(0, 0)] == 'exterior', "test_tags_to_boundaries failed. Single boundary wasn't added.") #print "domain.boundary",domain.boundary self.failUnless( len(domain.boundary) == 4, "test_pmesh2Domain Too many boundaries") #FIXME change to use get_xllcorner #print "d.geo_reference.xllcorner",domain.geo_reference.xllcorner self.failUnless(domain.geo_reference.xllcorner == 140.0, "bad geo_referece") #************ domain = Domain(fileName) #print "domain.tagged_elements", domain.tagged_elements ## check the quantities #print domain.quantities['elevation'].vertex_values answer = [[0., 8., 0.], [0., 10., 8.]] assert num.allclose(domain.quantities['elevation'].vertex_values, answer) #print domain.quantities['stage'].vertex_values answer = [[0., 12., 10.], [0., 10., 12.]] assert num.allclose(domain.quantities['stage'].vertex_values, answer) #print domain.quantities['friction'].vertex_values answer = [[0.01, 0.04, 0.03], [0.01, 0.02, 0.04]] assert num.allclose(domain.quantities['friction'].vertex_values, answer) #print domain.quantities['friction'].vertex_values tagged_elements = domain.get_tagged_elements() assert num.allclose(tagged_elements['dsg'][0],0) assert num.allclose(tagged_elements['ole nielsen'][0],1) self.failUnless( domain.boundary[(1, 0)] == '1', "test_tags_to_boundaries failed. Single boundary wasn't added.") self.failUnless( domain.boundary[(1, 2)] == '2', "test_tags_to_boundaries failed. Single boundary wasn't added.") self.failUnless( domain.boundary[(0, 1)] == '3', "test_tags_to_boundaries failed. Single boundary wasn't added.") self.failUnless( domain.boundary[(0, 0)] == 'exterior', "test_tags_to_boundaries failed. Single boundary wasn't added.") #print "domain.boundary",domain.boundary self.failUnless( len(domain.boundary) == 4, "test_pmesh2Domain Too many boundaries") #FIXME change to use get_xllcorner #print "d.geo_reference.xllcorner",domain.geo_reference.xllcorner self.failUnless(domain.geo_reference.xllcorner == 140.0, "bad geo_referece") #************ os.remove(fileName) #------------------------------------------------------------- def test_get_lone_vertices(self): a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4] ] boundary = { (0, 0): 'Third', (0, 2): 'First', (2, 0): 'Second', (2, 1): 'Second', (3, 1): 'Second', (3, 2): 'Third'} domain = Domain(points, vertices, boundary) #domain.check_integrity() domain.get_lone_vertices() def test_fitting_using_shallow_water_domain(self): #Mesh in zone 56 (absolute coords) x0 = 314036.58727982 y0 = 6224951.2960092 a = [x0+0.0, y0+0.0] b = [x0+0.0, y0+2.0] c = [x0+2.0, y0+0.0] d = [x0+0.0, y0+4.0] e = [x0+2.0, y0+2.0] f = [x0+4.0, y0+0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe elements = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4] ] #absolute going in .. mesh4 = Domain(points, elements, geo_reference = Geo_reference(56, 0, 0)) mesh4.check_integrity() quantity = Quantity(mesh4) #Get (enough) datapoints (relative to georef) data_points_rel = [[ 0.66666667, 0.66666667], [ 1.33333333, 1.33333333], [ 2.66666667, 0.66666667], [ 0.66666667, 2.66666667], [ 0.0, 1.0], [ 0.0, 3.0], [ 1.0, 0.0], [ 1.0, 1.0], [ 1.0, 2.0], [ 1.0, 3.0], [ 2.0, 1.0], [ 3.0, 0.0], [ 3.0, 1.0]] data_geo_spatial = Geospatial_data(data_points_rel, geo_reference = Geo_reference(56, x0, y0)) data_points_absolute = data_geo_spatial.get_data_points(absolute=True) attributes = linear_function(data_points_absolute) att = 'spam_and_eggs' #Create .txt file ptsfile = tempfile.mktemp(".txt") file = open(ptsfile,"w") file.write(" x,y," + att + " \n") for data_point, attribute in map(None, data_points_absolute ,attributes): row = str(data_point[0]) + ',' + str(data_point[1]) \ + ',' + str(attribute) file.write(row + "\n") file.close() #file = open(ptsfile, 'r') #lines = file.readlines() #file.close() #Check that values can be set from file quantity.set_values(filename = ptsfile, attribute_name = att, alpha = 0) answer = linear_function(quantity.domain.get_vertex_coordinates()) assert num.allclose(quantity.vertex_values.flat, answer) #Check that values can be set from file using default attribute quantity.set_values(filename = ptsfile, alpha = 0) assert num.allclose(quantity.vertex_values.flat, answer) #Cleanup import os os.remove(ptsfile) def test_fitting_example_that_crashed_1(self): """test_fitting_example_that_crashed_1 This unit test has been derived from a real world example (the Towradgi '98 rainstorm simulation). It shows a condition where fitting as called from set_quantity crashes when ANUGA mesh is reused. The test passes in the case where a new mesh is created. See ticket:314 """ verbose = False from anuga.shallow_water import Domain from anuga.pmesh.mesh_interface import create_mesh_from_regions from anuga.geospatial_data.geospatial_data import Geospatial_data # Get path where this test is run path = get_pathname_from_package('anuga.shallow_water') #---------------------------------------------------------------------- # Create domain #---------------------------------------------------------------------- W=303400 N=6195800 E=308640 S=6193120 bounding_polygon = [[W, S], [E, S], [E, N], [W, N]] offending_regions = [] # From culvert 8 offending_regions.append([[307611.43896231, 6193631.6894806], [307600.11394969, 6193608.2855474], [307597.41349586, 6193609.59227963], [307608.73850848, 6193632.99621282]]) offending_regions.append([[307633.69143231, 6193620.9216536], [307622.36641969, 6193597.5177204], [307625.06687352, 6193596.21098818], [307636.39188614, 6193619.61492137]]) # From culvert 9 offending_regions.append([[306326.69660524, 6194818.62900522], [306324.67939476, 6194804.37099478], [306323.75856492, 6194804.50127295], [306325.7757754, 6194818.7592834]]) offending_regions.append([[306365.57160524, 6194813.12900522], [306363.55439476, 6194798.87099478], [306364.4752246, 6194798.7407166], [306366.49243508, 6194812.99872705]]) # From culvert 10 offending_regions.append([[306955.071019428608, 6194465.704096679576], [306951.616980571358, 6194457.295903320424], [306950.044491164153, 6194457.941873183474], [306953.498530021403, 6194466.350066542625]]) offending_regions.append([[307002.540019428649, 6194446.204096679576], [306999.085980571399, 6194437.795903320424], [307000.658469978604, 6194437.149933457375], [307004.112508835853, 6194445.558126816526]]) interior_regions = [] for polygon in offending_regions: interior_regions.append( [polygon, 100] ) meshname = join(path, 'offending_mesh.msh') create_mesh_from_regions(bounding_polygon, boundary_tags={'south': [0], 'east': [1], 'north': [2], 'west': [3]}, maximum_triangle_area=1000000, interior_regions=interior_regions, filename=meshname, use_cache=False, verbose=verbose) domain = Domain(meshname, use_cache=False, verbose=verbose) #---------------------------------------------------------------------- # Fit data point to mesh #---------------------------------------------------------------------- points_file = join(path, 'offending_point.pts') # Offending point G=Geospatial_data(data_points=[[306953.344, 6194461.5]], attributes=[1]) G.export_points_file(points_file) try: domain.set_quantity('elevation', filename=points_file, use_cache=False, verbose=verbose, alpha=0.01) except RuntimeError, e: msg = 'Test failed: %s' % str(e) raise Exception, msg os.remove(meshname) os.remove(points_file) else: os.remove(meshname) os.remove(points_file) #finally: # Cleanup regardless #FIXME(Ole): Finally does not work like this in python2.4 #FIXME(Ole): Reinstate this when Python2.4 is out of the way #FIXME(Ole): Python 2.6 apparently introduces something called 'with' #os.remove(meshname) #os.remove(points_file) def test_fitting_example_that_crashed_2(self): """test_fitting_example_that_crashed_2 This unit test has been derived from a real world example (the JJKelly study, by Petar Milevski). It shows a condition where set_quantity crashes due to AtA not being built properly See ticket:314 """ verbose = False from anuga.shallow_water import Domain from anuga.pmesh.mesh_interface import create_mesh_from_regions from anuga.geospatial_data import Geospatial_data # Get path where this test is run path = get_pathname_from_package('anuga.shallow_water') meshname = join(path, 'test_mesh.msh') W=304180 S=6185270 E=307650 N=6189040 maximum_triangle_area = 1000000 bounding_polygon = [[W, S], [E, S], [E, N], [W, N]] create_mesh_from_regions(bounding_polygon, boundary_tags={'south': [0], 'east': [1], 'north': [2], 'west': [3]}, maximum_triangle_area=maximum_triangle_area, filename=meshname, use_cache=False, verbose=verbose) domain = Domain(meshname, use_cache=True, verbose=verbose) # Large test set revealed one problem points_file = join(path, 'test_points_large.csv') try: domain.set_quantity('elevation', filename=points_file, use_cache=False, verbose=verbose) except AssertionError, e: msg = 'Test failed: %s' % str(e) raise Exception, msg # Cleanup in case this failed os.remove(meshname) # Small test set revealed another problem points_file = join(path, 'test_points_small.csv') try: domain.set_quantity('elevation', filename=points_file, use_cache=False, verbose=verbose) except AssertionError, e: msg = 'Test failed: %s' % str(e) raise Exception, msg os.remove(meshname) else: os.remove(meshname) def test_total_volume(self): """test_total_volume Test that total volume can be computed correctly """ #---------------------------------------------------------------------- # Import necessary modules #---------------------------------------------------------------------- from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross from anuga.shallow_water import Domain #---------------------------------------------------------------------- # Setup computational domain #---------------------------------------------------------------------- length = 100. width = 20. dx = dy = 5 # Resolution: of grid on both axes points, vertices, boundary = rectangular_cross(int(length/dx), int(width/dy), len1=length, len2=width) domain = Domain(points, vertices, boundary) #---------------------------------------------------------------------- # Simple flat bottom bathtub #---------------------------------------------------------------------- d = 1.0 domain.set_quantity('elevation', 0.0) domain.set_quantity('stage', d) assert num.allclose(domain.compute_total_volume(), length*width*d) #---------------------------------------------------------------------- # Slope #---------------------------------------------------------------------- slope = 1.0/10 # RHS drops to -10m def topography(x, y): return -x * slope domain.set_quantity('elevation', topography) domain.set_quantity('stage', 0.0) # Domain full V = domain.compute_total_volume() assert num.allclose(V, length*width*10/2) domain.set_quantity('stage', -5.0) # Domain 'half' full # IMPORTANT: Adjust stage to match elevation domain.distribute_to_vertices_and_edges() V = domain.compute_total_volume() assert num.allclose(V, width*(length/2)*5.0/2) def test_volumetric_balance_computation(self): """test_volumetric_balance_computation Test that total in and out flows are computed correctly FIXME(Ole): This test is more about looking at the printed report """ verbose = False #--------------------------------------------------------------------- # Import necessary modules #--------------------------------------------------------------------- from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross from anuga.shallow_water import Domain from anuga.shallow_water.shallow_water_domain import Reflective_boundary from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary from anuga.shallow_water.shallow_water_domain import Inflow from anuga.shallow_water.data_manager import get_flow_through_cross_section #---------------------------------------------------------------------- # Setup computational domain #---------------------------------------------------------------------- finaltime = 300.0 length = 300. width = 20. dx = dy = 5 # Resolution: of grid on both axes # Input parameters uh = 1.0 vh = 0.0 d = 1.0 ref_flow = uh*d*width # 20 m^3/s in the x direction across entire domain points, vertices, boundary = rectangular_cross(int(length/dx), int(width/dy), len1=length, len2=width) domain = Domain(points, vertices, boundary) domain.set_name('Inflow_flowline_test') # Output name #---------------------------------------------------------------------- # Setup initial conditions #---------------------------------------------------------------------- slope = 0.0 def topography(x, y): z=-x * slope return z domain.set_quantity('elevation', topography) # Use function for elevation domain.set_quantity('friction', 0.0) # Constant friction domain.set_quantity('stage', expression='elevation') #---------------------------------------------------------------------- # Setup boundary conditions #---------------------------------------------------------------------- Br = Reflective_boundary(domain) # Solid reflective wall # Constant flow into domain # Depth = 1m, uh=1 m/s, i.e. a flow of 20 m^3/s Bi = Dirichlet_boundary([d, uh, vh]) Bo = Dirichlet_boundary([0, 0, 0]) domain.set_boundary({'left': Bi, 'right': Bo, 'top': Br, 'bottom': Br}) #---------------------------------------------------------------------- # Evolve system through time #---------------------------------------------------------------------- for t in domain.evolve(yieldstep=100.0, finaltime=finaltime): S = domain.volumetric_balance_statistics() if verbose : print domain.timestepping_statistics() print S def test_volume_conservation_inflow(self): """test_volume_conservation Test that total volume in domain is as expected, based on questions raised by Petar Milevski in May 2009. This test adds inflow at a known rate and verifies that the total terminal volume is as expected. """ verbose = False #--------------------------------------------------------------------- # Import necessary modules #--------------------------------------------------------------------- from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross from anuga.shallow_water import Domain from anuga.shallow_water.shallow_water_domain import Reflective_boundary from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary from anuga.shallow_water.shallow_water_domain import Inflow from anuga.shallow_water.data_manager import get_flow_through_cross_section #---------------------------------------------------------------------- # Setup computational domain #---------------------------------------------------------------------- finaltime = 200.0 length = 300. width = 20. dx = dy = 5 # Resolution: of grid on both axes points, vertices, boundary = rectangular_cross(int(length/dx), int(width/dy), len1=length, len2=width) domain = Domain(points, vertices, boundary) domain.set_name('Inflow_volume_test') # Output name #---------------------------------------------------------------------- # Setup initial conditions #---------------------------------------------------------------------- slope = 0.0 def topography(x, y): z=-x * slope return z domain.set_quantity('elevation', topography) # Use function for elevation domain.set_quantity('friction', 0.0) # Constant friction domain.set_quantity('stage', expression='elevation') # Dry initially #-------------------------------------------------------------- # Setup Inflow #-------------------------------------------------------------- # Fixed Flowrate onto Area fixed_inflow = Inflow(domain, center=(10.0, 10.0), radius=5.00, rate=10.00) domain.forcing_terms.append(fixed_inflow) #---------------------------------------------------------------------- # Setup boundary conditions #---------------------------------------------------------------------- Br = Reflective_boundary(domain) # Solid reflective wall domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br}) #---------------------------------------------------------------------- # Evolve system through time #---------------------------------------------------------------------- ref_volume = 0.0 ys = 10.0 # Yieldstep for t in domain.evolve(yieldstep=ys, finaltime=finaltime): # Check volume assert num.allclose(domain.compute_total_volume(), ref_volume) if verbose : print domain.timestepping_statistics() print domain.volumetric_balance_statistics() print 'reference volume', ref_volume # Update reference volume ref_volume += ys * fixed_inflow.rate def test_volume_conservation_rain(self): """test_volume_conservation Test that total volume in domain is as expected, based on questions raised by Petar Milevski in May 2009. This test adds rain at a known rate and verifies that the total terminal volume is as expected. """ verbose = False #--------------------------------------------------------------------- # Import necessary modules #--------------------------------------------------------------------- from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross from anuga.shallow_water import Domain from anuga.shallow_water.shallow_water_domain import Reflective_boundary from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary from anuga.shallow_water.shallow_water_domain import Rainfall from anuga.shallow_water.data_manager import get_flow_through_cross_section #---------------------------------------------------------------------- # Setup computational domain #---------------------------------------------------------------------- finaltime = 200.0 length = 300. width = 20. dx = dy = 5 # Resolution: of grid on both axes points, vertices, boundary = rectangular_cross(int(length/dx), int(width/dy), len1=length, len2=width) domain = Domain(points, vertices, boundary) domain.set_name('Rain_volume_test') # Output name #---------------------------------------------------------------------- # Setup initial conditions #---------------------------------------------------------------------- slope = 0.0 def topography(x, y): z=-x * slope return z domain.set_quantity('elevation', topography) # Use function for elevation domain.set_quantity('friction', 0.0) # Constant friction domain.set_quantity('stage', expression='elevation') # Dry initially #-------------------------------------------------------------- # Setup rain #-------------------------------------------------------------- # Fixed rain onto small circular area fixed_rain = Rainfall(domain, center=(10.0, 10.0), radius=5.00, rate=10.00) # 10 mm/s domain.forcing_terms.append(fixed_rain) #---------------------------------------------------------------------- # Setup boundary conditions #---------------------------------------------------------------------- Br = Reflective_boundary(domain) # Solid reflective wall domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br}) #---------------------------------------------------------------------- # Evolve system through time #---------------------------------------------------------------------- ref_volume = 0.0 ys = 10.0 # Yieldstep for t in domain.evolve(yieldstep=ys, finaltime=finaltime): # Check volume V = domain.compute_total_volume() msg = 'V = %e, Ref = %e' % (V, ref_volume) assert num.allclose(V, ref_volume), msg if verbose : print domain.timestepping_statistics() print domain.volumetric_balance_statistics() print 'reference volume', ref_volume print V # Update reference volume. # FIXME: Note that rate has now been redefined # as m/s internally. This is a little confusing # when it was specfied as mm/s. delta_V = fixed_rain.rate*fixed_rain.exchange_area ref_volume += ys * delta_V def Xtest_rain_conservation_and_runoff(self): """test_rain_conservation_and_runoff Test that total volume in domain is as expected, based on questions raised by Petar Milevski in May 2009. This test adds rain at a known rate and verifies that the total volume and outflows are as expected. """ # FIXME (Ole): Does not work yet. Investigate boundary flows verbose = True #False #--------------------------------------------------------------------- # Import necessary modules #--------------------------------------------------------------------- from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross from anuga.shallow_water import Domain from anuga.shallow_water.shallow_water_domain import Reflective_boundary from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary from anuga.shallow_water.shallow_water_domain import Rainfall from anuga.shallow_water.data_manager import get_flow_through_cross_section #---------------------------------------------------------------------- # Setup computational domain #---------------------------------------------------------------------- finaltime = 500.0 length = 300. width = 20. dx = dy = 5 # Resolution: of grid on both axes points, vertices, boundary = rectangular_cross(int(length/dx), int(width/dy), len1=length, len2=width) domain = Domain(points, vertices, boundary) domain.set_name('Rain_volume_runoff_test') # Output name #---------------------------------------------------------------------- # Setup initial conditions #---------------------------------------------------------------------- slope = 0.0 def topography(x, y): z=-x * slope return z domain.set_quantity('elevation', topography) # Use function for elevation domain.set_quantity('friction', 0.0) # Constant friction domain.set_quantity('stage', expression='elevation') # Dry initially #-------------------------------------------------------------- # Setup rain #-------------------------------------------------------------- # Fixed rain onto small circular area fixed_rain = Rainfall(domain, center=(10.0, 10.0), radius=5.00, rate=10.00) # 10 mm/s domain.forcing_terms.append(fixed_rain) #---------------------------------------------------------------------- # Setup boundary conditions #---------------------------------------------------------------------- Br = Reflective_boundary(domain) # Solid reflective wall Bt = Transmissive_stage_zero_momentum_boundary(domain) Bd = Dirichlet_boundary([-10, 0, 0]) domain.set_boundary({'left': Bt, 'right': Bd, 'top': Bt, 'bottom': Bt}) #---------------------------------------------------------------------- # Evolve system through time #---------------------------------------------------------------------- ref_volume = 0.0 ys = 10.0 # Yieldstep for t in domain.evolve(yieldstep=ys, finaltime=finaltime): # Check volume V = domain.compute_total_volume() msg = 'V = %e, Ref = %e' % (V, ref_volume) #assert num.allclose(V, ref_volume) or V < ref_volume, msg if verbose: print domain.timestepping_statistics() print domain.volumetric_balance_statistics() print 'reference volume', ref_volume print V # Update reference volume. # FIXME: Note that rate has now been redefined # as m/s internally. This is a little confusing # when it was specfied as mm/s. delta_V = fixed_rain.rate*fixed_rain.exchange_area ref_volume += ys * delta_V # Compute outflow at right hand downstream boundary boundary_flows, inflow , outflow = domain.compute_boundary_flows() net_outflow = outflow - inflow outflow = boundary_flows['right'] if verbose: print 'Outflow', outflow print 'Net outflow', net_outflow # Update reference volume ref_volume += ys * outflow def test_inflow_using_flowline(self): """test_inflow_using_flowline Test the ability of a flowline to match inflow above the flowline by creating constant inflow onto a circle at the head of a 20m wide by 300m long plane dipping at various slopes with a perpendicular flowline and gauge downstream of the inflow and a 45 degree flowlines at 200m downstream A more substantial version of this test with finer resolution and including the depth calculation using Manning's equation is available under the validate_all suite in the directory anuga_validation/automated_validation_tests/flow_tests """ verbose = False #---------------------------------------------------------------------- # Import necessary modules #---------------------------------------------------------------------- from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross from anuga.shallow_water import Domain from anuga.shallow_water.shallow_water_domain import Reflective_boundary from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary from anuga.shallow_water.shallow_water_domain import Inflow from anuga.shallow_water.data_manager import get_flow_through_cross_section from anuga.abstract_2d_finite_volumes.util import sww2csv_gauges, csv2timeseries_graphs #---------------------------------------------------------------------- # Setup computational domain #---------------------------------------------------------------------- number_of_inflows = 2 # Number of inflows on top of each other finaltime = 500 #700.0 # If this is too short, steady state will not be achieved length = 250. width = 20. dx = dy = 5 # Resolution: of grid on both axes points, vertices, boundary = rectangular_cross(int(length/dx), int(width/dy), len1=length, len2=width) for mannings_n in [0.1, 0.01]: # Loop over a range of roughnesses for slope in [1.0/300, 1.0/100]: # Loop over a range of bedslopes representing sub to super critical flows domain = Domain(points, vertices, boundary) domain.set_name('inflow_flowline_test') #------------------------------------------------------------- # Setup initial conditions #------------------------------------------------------------- def topography(x, y): z=-x * slope return z domain.set_quantity('elevation', topography) domain.set_quantity('friction', mannings_n) domain.set_quantity('stage', expression='elevation') #-------------------------------------------------------------- # Setup Inflow #-------------------------------------------------------------- # Fixed Flowrate onto Area fixed_inflow = Inflow(domain, center=(10.0, 10.0), radius=5.00, rate=10.00) # Stack this flow for i in range(number_of_inflows): domain.forcing_terms.append(fixed_inflow) ref_flow = fixed_inflow.rate*number_of_inflows # Compute normal depth on plane using Mannings equation # v=1/n*(r^2/3)*(s^0.5) or r=(Q*n/(s^0.5*W))^0.6 normal_depth=(ref_flow*mannings_n/(slope**0.5*width))**0.6 if verbose: print print 'Slope:', slope, 'Mannings n:', mannings_n #-------------------------------------------------------------- # Setup boundary conditions #-------------------------------------------------------------- Br = Reflective_boundary(domain) # Define downstream boundary based on predicted depth def normal_depth_stage_downstream(t): return (-slope*length) + normal_depth Bt = Transmissive_momentum_set_stage_boundary(domain=domain, function=normal_depth_stage_downstream) domain.set_boundary({'left': Br, 'right': Bt, 'top': Br, 'bottom': Br}) #-------------------------------------------------------------- # Evolve system through time #-------------------------------------------------------------- for t in domain.evolve(yieldstep=100.0, finaltime=finaltime): pass #if verbose : # print domain.timestepping_statistics() # print domain.volumetric_balance_statistics() #-------------------------------------------------------------- # Compute flow thru flowlines ds of inflow #-------------------------------------------------------------- # Square on flowline at 200m q=domain.get_flow_through_cross_section([[200.0,0.0], [200.0,20.0]]) msg = 'Predicted flow was %f, should have been %f' %\ (q, ref_flow) if verbose: print '90 degree flowline: ANUGA = %f, Ref = %f' %\ (q, ref_flow) assert num.allclose(q, ref_flow, rtol=1.0e-2), msg # 45 degree flowline at 200m q=domain.get_flow_through_cross_section([[200.0,0.0], [220.0,20.0]]) msg = 'Predicted flow was %f, should have been %f' %\ (q, ref_flow) if verbose: print '45 degree flowline: ANUGA = %f, Ref = %f' %\ (q, ref_flow) assert num.allclose(q, ref_flow, rtol=1.0e-2), msg def Xtest_inflow_boundary_using_flowline(self): """test_inflow_boundary_using_flowline Test the ability of a flowline to match inflow above the flowline by creating constant inflow into the boundary at the head of a 20m wide by 300m long plane dipping at various slopes with a perpendicular flowline and gauge downstream of the inflow and a 45 degree flowlines at 200m downstream """ # FIXME (Ole): Work in progress verbose = False #---------------------------------------------------------------------- # Import necessary modules #---------------------------------------------------------------------- from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross from anuga.shallow_water import Domain from anuga.shallow_water.shallow_water_domain import Reflective_boundary from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary from anuga.shallow_water.shallow_water_domain import Inflow_boundary from anuga.shallow_water.data_manager import get_flow_through_cross_section from anuga.abstract_2d_finite_volumes.util import sww2csv_gauges, csv2timeseries_graphs #---------------------------------------------------------------------- # Setup computational domain #---------------------------------------------------------------------- finaltime = 500 #700.0 # If this is too short, steady state will not be achieved length = 250. width = 20. dx = dy = 5 # Resolution: of grid on both axes points, vertices, boundary = rectangular_cross(int(length/dx), int(width/dy), len1=length, len2=width) for mannings_n in [0.1, 0.01]: # Loop over a range of roughnesses for slope in [1.0/300, 1.0/100]: # Loop over a range of bedslopes representing sub to super critical flows domain = Domain(points, vertices, boundary) domain.set_name('inflow_boundary_flowline_test') #------------------------------------------------------------- # Setup initial conditions #------------------------------------------------------------- def topography(x, y): z=-x * slope return z domain.set_quantity('elevation', topography) domain.set_quantity('friction', mannings_n) domain.set_quantity('stage', expression='elevation') #-------------------------------------------------------------- # Setup boundary conditions #-------------------------------------------------------------- ref_flow = 10.00 # Compute normal depth on plane using Mannings equation # v=1/n*(r^2/3)*(s^0.5) or r=(Q*n/(s^0.5*W))^0.6 normal_depth=(ref_flow*mannings_n/(slope**0.5*width))**0.6 if verbose: print print 'Slope:', slope, 'Mannings n:', mannings_n Bi = Inflow_boundary(domain, rate=ref_flow) Br = Reflective_boundary(domain) # Define downstream boundary based on predicted depth def normal_depth_stage_downstream(t): return (-slope*length) + normal_depth Bt = Transmissive_momentum_set_stage_boundary(domain=domain, function=normal_depth_stage_downstream) domain.set_boundary({'left': Bi, 'right': Bt, 'top': Br, 'bottom': Br}) #-------------------------------------------------------------- # Evolve system through time #-------------------------------------------------------------- for t in domain.evolve(yieldstep=100.0, finaltime=finaltime): pass #if verbose : # print domain.timestepping_statistics() # print domain.volumetric_balance_statistics() #-------------------------------------------------------------- # Compute flow thru flowlines ds of inflow #-------------------------------------------------------------- # Square on flowline at 200m q=domain.get_flow_through_cross_section([[200.0,0.0],[200.0,20.0]]) msg = 'Predicted flow was %f, should have been %f' % (q, ref_flow) if verbose: print '90 degree flowline: ANUGA = %f, Ref = %f' % (q, ref_flow) assert num.allclose(q, ref_flow, rtol=1.0e-2), msg # 45 degree flowline at 200m q=domain.get_flow_through_cross_section([[200.0,0.0],[220.0,20.0]]) msg = 'Predicted flow was %f, should have been %f' % (q, ref_flow) if verbose: print '45 degree flowline: ANUGA = %f, Ref = %f' % (q, ref_flow) assert num.allclose(q, ref_flow, rtol=1.0e-2), msg def Xtest_friction_dependent_flow_using_flowline(self): """test_friction_dependent_flow_using_flowline Test the internal flow (using flowline) as a function of different values of Mannings n and different slopes. Flow is applied in the form of boundary conditions with fixed momentum. """ # FIXME(Ole): This is not finished and maybe not necessary # anymore (see test_inflow_using_flowline) verbose = True #------------------------------------------------------------------------------ # Import necessary modules #------------------------------------------------------------------------------ from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross from anuga.shallow_water import Domain from anuga.shallow_water.shallow_water_domain import Reflective_boundary from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary from anuga.shallow_water.shallow_water_domain import Inflow from anuga.shallow_water.data_manager import get_flow_through_cross_section from anuga.abstract_2d_finite_volumes.util import sww2csv_gauges, csv2timeseries_graphs #------------------------------------------------------------------------------ # Setup computational domain #------------------------------------------------------------------------------ finaltime = 1000.0 length = 300. width = 20. dx = dy = 5 # Resolution: of grid on both axes # Input parameters uh = 1.0 vh = 0.0 d = 1.0 ref_flow = uh*d*width # 20 m^3/s in the x direction across entire domain points, vertices, boundary = rectangular_cross(int(length/dx), int(width/dy), len1=length, len2=width) for mannings_n in [0.035]: #[0.0, 0.012, 0.035]: for slope in [1.0/300]: #[0.0, 1.0/300, 1.0/150]: # Loop over a range of bedslopes representing sub to super critical flows if verbose: print print 'Slope:', slope, 'Mannings n:', mannings_n domain = Domain(points, vertices, boundary) domain.set_name('Inflow_flowline_test') # Output name #------------------------------------------------------------------------------ # Setup initial conditions #------------------------------------------------------------------------------ def topography(x, y): z=-x * slope return z domain.set_quantity('elevation', topography) # Use function for elevation domain.set_quantity('friction', mannings_n) # Constant friction #domain.set_quantity('stage', # expression='elevation') # Set initial flow as depth=1m, uh=1.0 m/s, vh = 0.0 # making it 20 m^3/s across entire domain domain.set_quantity('stage', expression='elevation + %f' % d) domain.set_quantity('xmomentum', uh) domain.set_quantity('ymomentum', vh) #------------------------------------------------------------------------------ # Setup boundary conditions #------------------------------------------------------------------------------ Br = Reflective_boundary(domain) # Solid reflective wall # Constant flow in and out of domain # Depth = 1m, uh=1 m/s, i.e. a flow of 20 m^3/s # across boundaries Bi = Dirichlet_boundary([d, uh, vh]) Bo = Dirichlet_boundary([-length*slope+d, uh, vh]) #Bo = Dirichlet_boundary([-100, 0, 0]) domain.set_boundary({'left': Bi, 'right': Bo, 'top': Br, 'bottom': Br}) #------------------------------------------------------------------------------ # Evolve system through time #------------------------------------------------------------------------------ for t in domain.evolve(yieldstep=100.0, finaltime=finaltime): if verbose : print domain.timestepping_statistics() print domain.volumetric_balance_statistics() # 90 degree flowline at 200m q=domain.get_flow_through_cross_section([[200.0,0.0],[200.0,20.0]]) msg = 'Predicted flow was %f, should have been %f' % (q, ref_flow) if verbose: print '90 degree flowline: ANUGA = %f, Ref = %f' % (q, ref_flow) # 45 degree flowline at 200m q=domain.get_flow_through_cross_section([[200.0,0.0],[220.0,20.0]]) msg = 'Predicted flow was %f, should have been %f' % (q, ref_flow) if verbose: print '45 degree flowline: ANUGA = %f, Ref = %f' % (q, ref_flow) # Stage recorder (gauge) in middle of plane at 200m x=200.0 y=10.00 w = domain.get_quantity('stage').get_values(interpolation_points=[[x, y]])[0] z = domain.get_quantity('elevation').get_values(interpolation_points=[[x, y]])[0] domain_depth = w-z xmom = domain.get_quantity('xmomentum').get_values(interpolation_points=[[x, y]])[0] ymom = domain.get_quantity('ymomentum').get_values(interpolation_points=[[x, y]])[0] if verbose: print 'At interpolation point (h, uh, vh): ', domain_depth, xmom, ymom print 'uh * d * width = ', xmom*domain_depth*width if slope > 0.0: # Compute normal depth at gauge location using Manning equation # v=1/n*(r^2/3)*(s^0.5) or r=(Q*n/(s^0.5*W))^0.6 normal_depth=(ref_flow*mannings_n/(slope**0.5*width))**0.6 if verbose: print 'Depth: ANUGA = %f, Mannings = %f' % (domain_depth, normal_depth) if __name__ == "__main__": suite = unittest.makeSuite(Test_Shallow_Water, 'test') runner = unittest.TextTestRunner(verbosity=1) runner.run(suite)