#!/usr/bin/env python import unittest, os from math import sqrt, pi from config import g, epsilon from Numeric import allclose, array, zeros, ones, Float, take from utilities.numerical_tools import mean from shallow_water import * #Variable windfield implemented using functions def speed(t,x,y): """Large speeds halfway between center and edges Low speeds at center and edges """ from math import sqrt, exp, cos, pi x = array(x) y = array(y) N = len(x) s = 0*x #New array for k in range(N): r = sqrt(x[k]**2 + y[k]**2) factor = exp( -(r-0.15)**2 ) s[k] = 4000 * factor * (cos(t*2*pi/150) + 2) return s def scalar_func(t,x,y): """Function that returns a scalar. Used to test error message when Numeric array is expected """ return 17.7 def angle(t,x,y): """Rotating field """ from math import sqrt, atan, pi x = array(x) y = array(y) N = len(x) a = 0*x #New array for k in range(N): r = sqrt(x[k]**2 + y[k]**2) angle = atan(y[k]/x[k]) if x[k] < 0: angle+=pi #atan in ]-pi/2; pi/2[ #Take normal direction angle -= pi/2 #Ensure positive radians if angle < 0: angle += 2*pi a[k] = angle/pi*180 return a class Test_Shallow_Water(unittest.TestCase): def setUp(self): pass def tearDown(self): pass def test_rotate(self): normal = array([0.0,-1.0]) q = array([1.0,2.0,3.0]) r = rotate(q, normal, direction = 1) assert r[0] == 1 assert r[1] == -3 assert r[2] == 2 w = rotate(r, normal, direction = -1) assert allclose(w, q) #Check error check try: rotate(r, array([1,1,1]) ) except: pass else: raise 'Should have raised an exception' def test_flux_zero_case(self): ql = zeros( 3, Float ) qr = zeros( 3, Float ) normal = zeros( 2, Float ) zl = zr = 0. flux, max_speed = flux_function(normal, ql, qr, zl, zr) assert allclose(flux, [0,0,0]) assert max_speed == 0. def test_flux_constants(self): w = 2.0 normal = array([1.,0]) ql = array([w, 0, 0]) qr = array([w, 0, 0]) zl = zr = 0. h = w - (zl+zr)/2 flux, max_speed = flux_function(normal, ql, qr, zl, zr) assert allclose(flux, [0., 0.5*g*h**2, 0.]) assert max_speed == sqrt(g*h) #def test_flux_slope(self): # #FIXME: TODO # w = 2.0 # # normal = array([1.,0]) # ql = array([w, 0, 0]) # qr = array([w, 0, 0]) # zl = zr = 0. # h = w - (zl+zr)/2 # # flux, max_speed = flux_function(normal, ql, qr, zl, zr) # # assert allclose(flux, [0., 0.5*g*h**2, 0.]) # assert max_speed == sqrt(g*h) def test_flux1(self): #Use data from previous version of pyvolution normal = array([1.,0]) ql = array([-0.2, 2, 3]) qr = array([-0.2, 2, 3]) zl = zr = -0.5 flux, max_speed = flux_function(normal, ql, qr, zl, zr) assert allclose(flux, [2.,13.77433333, 20.]) assert allclose(max_speed, 8.38130948661) def test_flux2(self): #Use data from previous version of pyvolution normal = array([0., -1.]) ql = array([-0.075, 2, 3]) qr = array([-0.075, 2, 3]) zl = zr = -0.375 flux, max_speed = flux_function(normal, ql, qr, zl, zr) assert allclose(flux, [-3.,-20.0, -30.441]) assert allclose(max_speed, 11.7146428199) def test_flux3(self): #Use data from previous version of pyvolution normal = array([-sqrt(2)/2, sqrt(2)/2]) ql = array([-0.075, 2, 3]) qr = array([-0.075, 2, 3]) zl = zr = -0.375 flux, max_speed = flux_function(normal, ql, qr, zl, zr) assert allclose(flux, [sqrt(2)/2, 4.40221112, 7.3829019]) assert allclose(max_speed, 4.0716654239) def test_flux4(self): #Use data from previous version of pyvolution normal = array([-sqrt(2)/2, sqrt(2)/2]) ql = array([-0.34319278, 0.10254161, 0.07273855]) qr = array([-0.30683287, 0.1071986, 0.05930515]) zl = zr = -0.375 flux, max_speed = flux_function(normal, ql, qr, zl, zr) assert allclose(flux, [-0.04072676, -0.07096636, -0.01604364]) assert allclose(max_speed, 1.31414103233) def test_sw_domain_simple(self): a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe, daf, dae vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) domain.check_integrity() for name in ['stage', 'xmomentum', 'ymomentum', 'elevation', 'friction']: assert domain.quantities.has_key(name) assert domain.get_conserved_quantities(0, edge=1) == 0. def test_boundary_conditions(self): a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4] ] boundary = { (0, 0): 'Third', (0, 2): 'First', (2, 0): 'Second', (2, 1): 'Second', (3, 1): 'Second', (3, 2): 'Third'} domain = Domain(points, vertices, boundary) domain.check_integrity() domain.set_quantity('stage', [[1,2,3], [5,5,5], [0,0,9], [-6, 3, 3]]) domain.set_quantity('xmomentum', [[1,1,1], [2,2,2], [3,3,3], [4, 4, 4]]) domain.set_quantity('ymomentum', [[10,10,10], [20,20,20], [30,30,30], [40, 40, 40]]) D = Dirichlet_boundary([5,2,1]) T = Transmissive_boundary(domain) R = Reflective_boundary(domain) domain.set_boundary( {'First': D, 'Second': T, 'Third': R}) domain.update_boundary() #Stage assert domain.quantities['stage'].boundary_values[0] == 2.5 assert domain.quantities['stage'].boundary_values[0] ==\ domain.get_conserved_quantities(0, edge=0)[0] #Reflective (2.5) assert domain.quantities['stage'].boundary_values[1] == 5. #Dirichlet assert domain.quantities['stage'].boundary_values[2] ==\ domain.get_conserved_quantities(2, edge=0)[0] #Transmissive (4.5) assert domain.quantities['stage'].boundary_values[3] ==\ domain.get_conserved_quantities(2, edge=1)[0] #Transmissive (4.5) assert domain.quantities['stage'].boundary_values[4] ==\ domain.get_conserved_quantities(3, edge=1)[0] #Transmissive (-1.5) assert domain.quantities['stage'].boundary_values[5] ==\ domain.get_conserved_quantities(3, edge=2)[0] #Reflective (-1.5) #Xmomentum assert domain.quantities['xmomentum'].boundary_values[0] == 1.0 #Reflective assert domain.quantities['xmomentum'].boundary_values[1] == 2. #Dirichlet assert domain.quantities['xmomentum'].boundary_values[2] ==\ domain.get_conserved_quantities(2, edge=0)[1] #Transmissive assert domain.quantities['xmomentum'].boundary_values[3] ==\ domain.get_conserved_quantities(2, edge=1)[1] #Transmissive assert domain.quantities['xmomentum'].boundary_values[4] ==\ domain.get_conserved_quantities(3, edge=1)[1] #Transmissive assert domain.quantities['xmomentum'].boundary_values[5] == -4.0 #Reflective #Ymomentum assert domain.quantities['ymomentum'].boundary_values[0] == -10.0 #Reflective assert domain.quantities['ymomentum'].boundary_values[1] == 1. #Dirichlet assert domain.quantities['ymomentum'].boundary_values[2] == 30. #Transmissive assert domain.quantities['ymomentum'].boundary_values[3] == 30. #Transmissive assert domain.quantities['ymomentum'].boundary_values[4] == 40. #Transmissive assert domain.quantities['ymomentum'].boundary_values[5] == 40. #Reflective def test_boundary_conditionsII(self): a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4] ] boundary = { (0, 0): 'Third', (0, 2): 'First', (2, 0): 'Second', (2, 1): 'Second', (3, 1): 'Second', (3, 2): 'Third', (0, 1): 'Internal'} domain = Domain(points, vertices, boundary) domain.check_integrity() domain.set_quantity('stage', [[1,2,3], [5,5,5], [0,0,9], [-6, 3, 3]]) domain.set_quantity('xmomentum', [[1,1,1], [2,2,2], [3,3,3], [4, 4, 4]]) domain.set_quantity('ymomentum', [[10,10,10], [20,20,20], [30,30,30], [40, 40, 40]]) D = Dirichlet_boundary([5,2,1]) T = Transmissive_boundary(domain) R = Reflective_boundary(domain) domain.set_boundary( {'First': D, 'Second': T, 'Third': R, 'Internal': None}) domain.update_boundary() domain.check_integrity() def test_compute_fluxes0(self): #Do a full triangle and check that fluxes cancel out for #the constant stage case a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) domain.set_quantity('stage', [[2,2,2], [2,2,2], [2,2,2], [2,2,2]]) domain.check_integrity() assert allclose(domain.neighbours, [[-1,1,-1], [2,3,0], [-1,-1,1],[1,-1,-1]]) assert allclose(domain.neighbour_edges, [[-1,2,-1], [2,0,1], [-1,-1,0],[1,-1,-1]]) zl=zr=0. #Assume flat bed #Flux across right edge of volume 1 normal = domain.get_normal(1,0) ql = domain.get_conserved_quantities(vol_id=1, edge=0) qr = domain.get_conserved_quantities(vol_id=2, edge=2) flux0, max_speed = flux_function(normal, ql, qr, zl, zr) #Check that flux seen from other triangles is inverse tmp = qr; qr=ql; ql=tmp normal = domain.get_normal(2,2) flux, max_speed = flux_function(normal, ql, qr, zl, zr) assert allclose(flux + flux0, 0.) #Flux across upper edge of volume 1 normal = domain.get_normal(1,1) ql = domain.get_conserved_quantities(vol_id=1, edge=1) qr = domain.get_conserved_quantities(vol_id=3, edge=0) flux1, max_speed = flux_function(normal, ql, qr, zl, zr) #Check that flux seen from other triangles is inverse tmp = qr; qr=ql; ql=tmp normal = domain.get_normal(3,0) flux, max_speed = flux_function(normal, ql, qr, zl, zr) assert allclose(flux + flux1, 0.) #Flux across lower left hypotenuse of volume 1 normal = domain.get_normal(1,2) ql = domain.get_conserved_quantities(vol_id=1, edge=2) qr = domain.get_conserved_quantities(vol_id=0, edge=1) flux2, max_speed = flux_function(normal, ql, qr, zl, zr) #Check that flux seen from other triangles is inverse tmp = qr; qr=ql; ql=tmp normal = domain.get_normal(0,1) flux, max_speed = flux_function(normal, ql, qr, zl, zr) assert allclose(flux + flux2, 0.) #Scale by edgelengths, add up anc check that total flux is zero e0 = domain.edgelengths[1, 0] e1 = domain.edgelengths[1, 1] e2 = domain.edgelengths[1, 2] assert allclose(e0*flux0+e1*flux1+e2*flux2, 0.) #Now check that compute_flux yields zeros as well domain.compute_fluxes() for name in ['stage', 'xmomentum', 'ymomentum']: #print name, domain.quantities[name].explicit_update assert allclose(domain.quantities[name].explicit_update[1], 0) def test_compute_fluxes1(self): #Use values from previous version a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) val0 = 2.+2.0/3 val1 = 4.+4.0/3 val2 = 8.+2.0/3 val3 = 2.+8.0/3 domain.set_quantity('stage', [[val0, val0, val0], [val1, val1, val1], [val2, val2, val2], [val3, val3, val3]]) domain.check_integrity() zl=zr=0. #Assume flat bed #Flux across right edge of volume 1 normal = domain.get_normal(1,0) ql = domain.get_conserved_quantities(vol_id=1, edge=0) qr = domain.get_conserved_quantities(vol_id=2, edge=2) flux0, max_speed = flux_function(normal, ql, qr, zl, zr) assert allclose(flux0, [-15.3598804, 253.71111111, 0.]) assert allclose(max_speed, 9.21592824046) #Flux across upper edge of volume 1 normal = domain.get_normal(1,1) ql = domain.get_conserved_quantities(vol_id=1, edge=1) qr = domain.get_conserved_quantities(vol_id=3, edge=0) flux1, max_speed = flux_function(normal, ql, qr, zl, zr) assert allclose(flux1, [2.4098563, 0., 123.04444444]) assert allclose(max_speed, 7.22956891292) #Flux across lower left hypotenuse of volume 1 normal = domain.get_normal(1,2) ql = domain.get_conserved_quantities(vol_id=1, edge=2) qr = domain.get_conserved_quantities(vol_id=0, edge=1) flux2, max_speed = flux_function(normal, ql, qr, zl, zr) assert allclose(flux2, [9.63942522, -61.59685738, -61.59685738]) assert allclose(max_speed, 7.22956891292) #Scale, add up and check that compute_fluxes is correct for vol 1 e0 = domain.edgelengths[1, 0] e1 = domain.edgelengths[1, 1] e2 = domain.edgelengths[1, 2] total_flux = -(e0*flux0+e1*flux1+e2*flux2)/domain.areas[1] assert allclose(total_flux, [-0.68218178, -166.6, -35.93333333]) domain.compute_fluxes() #assert allclose(total_flux, domain.explicit_update[1,:]) for i, name in enumerate(['stage', 'xmomentum', 'ymomentum']): assert allclose(total_flux[i], domain.quantities[name].explicit_update[1]) #assert allclose(domain.explicit_update, [ # [0., -69.68888889, -69.68888889], # [-0.68218178, -166.6, -35.93333333], # [-111.77316251, 69.68888889, 0.], # [-35.68522449, 0., 69.68888889]]) assert allclose(domain.quantities['stage'].explicit_update, [0., -0.68218178, -111.77316251, -35.68522449]) assert allclose(domain.quantities['xmomentum'].explicit_update, [-69.68888889, -166.6, 69.68888889, 0]) assert allclose(domain.quantities['ymomentum'].explicit_update, [-69.68888889, -35.93333333, 0., 69.68888889]) #assert allclose(domain.quantities[name].explicit_update def test_compute_fluxes2(self): #Random values, incl momentum a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) val0 = 2.+2.0/3 val1 = 4.+4.0/3 val2 = 8.+2.0/3 val3 = 2.+8.0/3 zl=zr=0 #Assume flat zero bed domain.set_quantity('elevation', zl*ones( (4,3) )) domain.set_quantity('stage', [[val0, val0-1, val0-2], [val1, val1+1, val1], [val2, val2-2, val2], [val3-0.5, val3, val3]]) domain.set_quantity('xmomentum', [[1, 2, 3], [3, 4, 5], [1, -1, 0], [0, -2, 2]]) domain.set_quantity('ymomentum', [[1, -1, 0], [0, -3, 2], [0, 1, 0], [-1, 2, 2]]) domain.check_integrity() #Flux across right edge of volume 1 normal = domain.get_normal(1,0) ql = domain.get_conserved_quantities(vol_id=1, edge=0) qr = domain.get_conserved_quantities(vol_id=2, edge=2) flux0, max_speed = flux_function(normal, ql, qr, zl, zr) #Flux across upper edge of volume 1 normal = domain.get_normal(1,1) ql = domain.get_conserved_quantities(vol_id=1, edge=1) qr = domain.get_conserved_quantities(vol_id=3, edge=0) flux1, max_speed = flux_function(normal, ql, qr, zl, zr) #Flux across lower left hypotenuse of volume 1 normal = domain.get_normal(1,2) ql = domain.get_conserved_quantities(vol_id=1, edge=2) qr = domain.get_conserved_quantities(vol_id=0, edge=1) flux2, max_speed = flux_function(normal, ql, qr, zl, zr) #Scale, add up and check that compute_fluxes is correct for vol 1 e0 = domain.edgelengths[1, 0] e1 = domain.edgelengths[1, 1] e2 = domain.edgelengths[1, 2] total_flux = -(e0*flux0+e1*flux1+e2*flux2)/domain.areas[1] domain.compute_fluxes() for i, name in enumerate(['stage', 'xmomentum', 'ymomentum']): assert allclose(total_flux[i], domain.quantities[name].explicit_update[1]) #assert allclose(total_flux, domain.explicit_update[1,:]) def test_compute_fluxes3(self): #Random values, incl momentum a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) val0 = 2.+2.0/3 val1 = 4.+4.0/3 val2 = 8.+2.0/3 val3 = 2.+8.0/3 zl=zr=-3.75 #Assume constant bed (must be less than stage) domain.set_quantity('elevation', zl*ones( (4,3) )) domain.set_quantity('stage', [[val0, val0-1, val0-2], [val1, val1+1, val1], [val2, val2-2, val2], [val3-0.5, val3, val3]]) domain.set_quantity('xmomentum', [[1, 2, 3], [3, 4, 5], [1, -1, 0], [0, -2, 2]]) domain.set_quantity('ymomentum', [[1, -1, 0], [0, -3, 2], [0, 1, 0], [-1, 2, 2]]) domain.check_integrity() #Flux across right edge of volume 1 normal = domain.get_normal(1,0) ql = domain.get_conserved_quantities(vol_id=1, edge=0) qr = domain.get_conserved_quantities(vol_id=2, edge=2) flux0, max_speed = flux_function(normal, ql, qr, zl, zr) #Flux across upper edge of volume 1 normal = domain.get_normal(1,1) ql = domain.get_conserved_quantities(vol_id=1, edge=1) qr = domain.get_conserved_quantities(vol_id=3, edge=0) flux1, max_speed = flux_function(normal, ql, qr, zl, zr) #Flux across lower left hypotenuse of volume 1 normal = domain.get_normal(1,2) ql = domain.get_conserved_quantities(vol_id=1, edge=2) qr = domain.get_conserved_quantities(vol_id=0, edge=1) flux2, max_speed = flux_function(normal, ql, qr, zl, zr) #Scale, add up and check that compute_fluxes is correct for vol 1 e0 = domain.edgelengths[1, 0] e1 = domain.edgelengths[1, 1] e2 = domain.edgelengths[1, 2] total_flux = -(e0*flux0+e1*flux1+e2*flux2)/domain.areas[1] domain.compute_fluxes() for i, name in enumerate(['stage', 'xmomentum', 'ymomentum']): assert allclose(total_flux[i], domain.quantities[name].explicit_update[1]) def test_catching_negative_heights(self): a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) val0 = 2.+2.0/3 val1 = 4.+4.0/3 val2 = 8.+2.0/3 val3 = 2.+8.0/3 zl=zr=4 #Too large domain.set_quantity('elevation', zl*ones( (4,3) )) domain.set_quantity('stage', [[val0, val0-1, val0-2], [val1, val1+1, val1], [val2, val2-2, val2], [val3-0.5, val3, val3]]) #Should fail try: domain.check_integrity() except: pass ##################################################### def test_initial_condition(self): """Test that initial condition is output at time == 0 """ from config import g import copy a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) #Set up for a gradient of (3,0) at mid triangle def slope(x, y): return 3*x h = 0.1 def stage(x,y): return slope(x,y)+h domain.set_quantity('elevation', slope) domain.set_quantity('stage', stage) initial_stage = copy.copy(domain.quantities['stage'].vertex_values) domain.set_boundary({'exterior': Reflective_boundary(domain)}) #Evolution for t in domain.evolve(yieldstep = 1.0, finaltime = 2.0): stage = domain.quantities['stage'].vertex_values if t == 0.0: assert allclose(stage, initial_stage) else: assert not allclose(stage, initial_stage) os.remove(domain.filename + '.sww') ##################################################### def test_gravity(self): #Assuming no friction from config import g a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) #Set up for a gradient of (3,0) at mid triangle def slope(x, y): return 3*x h = 0.1 def stage(x,y): return slope(x,y)+h domain.set_quantity('elevation', slope) domain.set_quantity('stage', stage) for name in domain.conserved_quantities: assert allclose(domain.quantities[name].explicit_update, 0) assert allclose(domain.quantities[name].semi_implicit_update, 0) domain.compute_forcing_terms() assert allclose(domain.quantities['stage'].explicit_update, 0) assert allclose(domain.quantities['xmomentum'].explicit_update, -g*h*3) assert allclose(domain.quantities['ymomentum'].explicit_update, 0) def test_manning_friction(self): from config import g a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) #Set up for a gradient of (3,0) at mid triangle def slope(x, y): return 3*x h = 0.1 def stage(x,y): return slope(x,y)+h eta = 0.07 domain.set_quantity('elevation', slope) domain.set_quantity('stage', stage) domain.set_quantity('friction', eta) for name in domain.conserved_quantities: assert allclose(domain.quantities[name].explicit_update, 0) assert allclose(domain.quantities[name].semi_implicit_update, 0) domain.compute_forcing_terms() assert allclose(domain.quantities['stage'].explicit_update, 0) assert allclose(domain.quantities['xmomentum'].explicit_update, -g*h*3) assert allclose(domain.quantities['ymomentum'].explicit_update, 0) assert allclose(domain.quantities['stage'].semi_implicit_update, 0) assert allclose(domain.quantities['xmomentum'].semi_implicit_update, 0) assert allclose(domain.quantities['ymomentum'].semi_implicit_update, 0) #Create some momentum for friction to work with domain.set_quantity('xmomentum', 1) S = -g * eta**2 / h**(7.0/3) domain.compute_forcing_terms() assert allclose(domain.quantities['stage'].semi_implicit_update, 0) assert allclose(domain.quantities['xmomentum'].semi_implicit_update, S) assert allclose(domain.quantities['ymomentum'].semi_implicit_update, 0) #A more complex example domain.quantities['stage'].semi_implicit_update[:] = 0.0 domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0 domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0 domain.set_quantity('xmomentum', 3) domain.set_quantity('ymomentum', 4) S = -g * eta**2 * 5 / h**(7.0/3) domain.compute_forcing_terms() assert allclose(domain.quantities['stage'].semi_implicit_update, 0) assert allclose(domain.quantities['xmomentum'].semi_implicit_update, 3*S) assert allclose(domain.quantities['ymomentum'].semi_implicit_update, 4*S) def test_constant_wind_stress(self): from config import rho_a, rho_w, eta_w from math import pi, cos, sin, sqrt a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) #Flat surface with 1m of water domain.set_quantity('elevation', 0) domain.set_quantity('stage', 1.0) domain.set_quantity('friction', 0) Br = Reflective_boundary(domain) domain.set_boundary({'exterior': Br}) #Setup only one forcing term, constant wind stress s = 100 phi = 135 domain.forcing_terms = [] domain.forcing_terms.append( Wind_stress(s, phi) ) domain.compute_forcing_terms() const = eta_w*rho_a/rho_w #Convert to radians phi = phi*pi/180 #Compute velocity vector (u, v) u = s*cos(phi) v = s*sin(phi) #Compute wind stress S = const * sqrt(u**2 + v**2) assert allclose(domain.quantities['stage'].explicit_update, 0) assert allclose(domain.quantities['xmomentum'].explicit_update, S*u) assert allclose(domain.quantities['ymomentum'].explicit_update, S*v) def test_variable_wind_stress(self): from config import rho_a, rho_w, eta_w from math import pi, cos, sin, sqrt a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) #Flat surface with 1m of water domain.set_quantity('elevation', 0) domain.set_quantity('stage', 1.0) domain.set_quantity('friction', 0) Br = Reflective_boundary(domain) domain.set_boundary({'exterior': Br}) domain.time = 5.54 #Take a random time (not zero) #Setup only one forcing term, constant wind stress s = 100 phi = 135 domain.forcing_terms = [] domain.forcing_terms.append( Wind_stress(s = speed, phi = angle) ) domain.compute_forcing_terms() #Compute reference solution const = eta_w*rho_a/rho_w N = domain.number_of_elements xc = domain.get_centroid_coordinates() t = domain.time x = xc[:,0] y = xc[:,1] s_vec = speed(t,x,y) phi_vec = angle(t,x,y) for k in range(N): #Convert to radians phi = phi_vec[k]*pi/180 s = s_vec[k] #Compute velocity vector (u, v) u = s*cos(phi) v = s*sin(phi) #Compute wind stress S = const * sqrt(u**2 + v**2) assert allclose(domain.quantities['stage'].explicit_update[k], 0) assert allclose(domain.quantities['xmomentum'].explicit_update[k], S*u) assert allclose(domain.quantities['ymomentum'].explicit_update[k], S*v) def test_windfield_from_file(self): from config import rho_a, rho_w, eta_w from math import pi, cos, sin, sqrt from config import time_format from util import file_function import time a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) #Flat surface with 1m of water domain.set_quantity('elevation', 0) domain.set_quantity('stage', 1.0) domain.set_quantity('friction', 0) Br = Reflective_boundary(domain) domain.set_boundary({'exterior': Br}) domain.time = 7 #Take a time that is represented in file (not zero) #Write wind stress file (ensure that domain.time is covered) #Take x=1 and y=0 filename = 'test_windstress_from_file' start = time.mktime(time.strptime('2000', '%Y')) fid = open(filename + '.txt', 'w') dt = 1 #One second interval t = 0.0 while t <= 10.0: t_string = time.strftime(time_format, time.gmtime(t+start)) fid.write('%s, %f %f\n' %(t_string, speed(t,[1],[0])[0], angle(t,[1],[0])[0])) t += dt fid.close() #Convert ASCII file to NetCDF (Which is what we really like!) from data_manager import timefile2netcdf timefile2netcdf(filename) os.remove(filename + '.txt') #Setup wind stress F = file_function(filename + '.tms', quantities = ['Attribute0', 'Attribute1']) os.remove(filename + '.tms') #print 'F(5)', F(5) #print 'F(5,x,y)', F(5,x=zeros(3),y=zeros(3)) #print dir(F) #print F.T #print F.precomputed_values # #F = file_function(filename + '.txt') # #print dir(F) #print F.T #print F.Q W = Wind_stress(F) domain.forcing_terms = [] domain.forcing_terms.append(W) domain.compute_forcing_terms() #Compute reference solution const = eta_w*rho_a/rho_w N = domain.number_of_elements t = domain.time s = speed(t,[1],[0])[0] phi = angle(t,[1],[0])[0] #Convert to radians phi = phi*pi/180 #Compute velocity vector (u, v) u = s*cos(phi) v = s*sin(phi) #Compute wind stress S = const * sqrt(u**2 + v**2) for k in range(N): assert allclose(domain.quantities['stage'].explicit_update[k], 0) assert allclose(domain.quantities['xmomentum'].explicit_update[k], S*u) assert allclose(domain.quantities['ymomentum'].explicit_update[k], S*v) def test_wind_stress_error_condition(self): """Test that windstress reacts properly when forcing functions are wrong - e.g. returns a scalar """ from config import rho_a, rho_w, eta_w from math import pi, cos, sin, sqrt a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) #Flat surface with 1m of water domain.set_quantity('elevation', 0) domain.set_quantity('stage', 1.0) domain.set_quantity('friction', 0) Br = Reflective_boundary(domain) domain.set_boundary({'exterior': Br}) domain.time = 5.54 #Take a random time (not zero) #Setup only one forcing term, bad func domain.forcing_terms = [] try: domain.forcing_terms.append(Wind_stress(s = scalar_func, phi = angle)) except AssertionError: pass else: msg = 'Should have raised exception' raise msg try: domain.forcing_terms.append(Wind_stress(s = speed, phi = scalar_func)) except AssertionError: pass else: msg = 'Should have raised exception' raise msg try: domain.forcing_terms.append(Wind_stress(s = speed, phi = 'xx')) except: pass else: msg = 'Should have raised exception' raise msg ##################################################### def test_first_order_extrapolator_const_z(self): a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) val0 = 2.+2.0/3 val1 = 4.+4.0/3 val2 = 8.+2.0/3 val3 = 2.+8.0/3 zl=zr=-3.75 #Assume constant bed (must be less than stage) domain.set_quantity('elevation', zl*ones( (4,3) )) domain.set_quantity('stage', [[val0, val0-1, val0-2], [val1, val1+1, val1], [val2, val2-2, val2], [val3-0.5, val3, val3]]) domain.order = 1 domain.distribute_to_vertices_and_edges() #Check that centroid values were distributed to vertices C = domain.quantities['stage'].centroid_values for i in range(3): assert allclose( domain.quantities['stage'].vertex_values[:,i], C) def test_first_order_limiter_variable_z(self): #Check that first order limiter follows bed_slope from Numeric import alltrue, greater_equal from config import epsilon a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) val0 = 2.+2.0/3 val1 = 4.+4.0/3 val2 = 8.+2.0/3 val3 = 2.+8.0/3 domain.set_quantity('elevation', [[0,0,0], [6,0,0], [6,6,6], [6,6,6]]) domain.set_quantity('stage', [[val0, val0, val0], [val1, val1, val1], [val2, val2, val2], [val3, val3, val3]]) E = domain.quantities['elevation'].vertex_values L = domain.quantities['stage'].vertex_values #Check that some stages are not above elevation (within eps) #- so that the limiter has something to work with assert not alltrue(alltrue(greater_equal(L,E-epsilon))) domain.order = 1 domain.distribute_to_vertices_and_edges() #Check that all stages are above elevation (within eps) assert alltrue(alltrue(greater_equal(L,E-epsilon))) ##################################################### def test_distribute_basic(self): #Using test data generated by pyvolution-2 #Assuming no friction and flat bed (0.0) a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) val0 = 2. val1 = 4. val2 = 8. val3 = 2. domain.set_quantity('stage', [val0, val1, val2, val3], location='centroids') L = domain.quantities['stage'].vertex_values #First order domain.order = 1 domain.distribute_to_vertices_and_edges() assert allclose(L[1], val1) #Second order domain.order = 2 domain.distribute_to_vertices_and_edges() assert allclose(L[1], [2.2, 4.9, 4.9]) def test_distribute_away_from_bed(self): #Using test data generated by pyvolution-2 #Assuming no friction and flat bed (0.0) a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) L = domain.quantities['stage'].vertex_values def stage(x,y): return x**2 domain.set_quantity('stage', stage, location='centroids') a, b = domain.quantities['stage'].compute_gradients() assert allclose(a[1], 3.33333334) assert allclose(b[1], 0.0) domain.order = 1 domain.distribute_to_vertices_and_edges() assert allclose(L[1], 1.77777778) domain.order = 2 domain.distribute_to_vertices_and_edges() assert allclose(L[1], [0.57777777, 2.37777778, 2.37777778]) def test_distribute_away_from_bed1(self): #Using test data generated by pyvolution-2 #Assuming no friction and flat bed (0.0) a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) L = domain.quantities['stage'].vertex_values def stage(x,y): return x**4+y**2 domain.set_quantity('stage', stage, location='centroids') #print domain.quantities['stage'].centroid_values a, b = domain.quantities['stage'].compute_gradients() assert allclose(a[1], 25.18518519) assert allclose(b[1], 3.33333333) domain.order = 1 domain.distribute_to_vertices_and_edges() assert allclose(L[1], 4.9382716) domain.order = 2 domain.distribute_to_vertices_and_edges() assert allclose(L[1], [1.07160494, 6.46058131, 7.28262855]) def test_distribute_near_bed(self): #Using test data generated by pyvolution-2 #Assuming no friction and flat bed (0.0) a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) #Set up for a gradient of (3,0) at mid triangle def slope(x, y): return 10*x h = 0.1 def stage(x,y): return slope(x,y)+h domain.set_quantity('elevation', slope) domain.set_quantity('stage', stage, location='centroids') #print domain.quantities['elevation'].centroid_values #print domain.quantities['stage'].centroid_values E = domain.quantities['elevation'].vertex_values L = domain.quantities['stage'].vertex_values #print E domain.order = 1 domain.distribute_to_vertices_and_edges() ##assert allclose(L[1], [0.19999999, 20.05, 20.05]) assert allclose(L[1], [0.1, 20.1, 20.1]) domain.order = 2 domain.distribute_to_vertices_and_edges() assert allclose(L[1], [0.1, 20.1, 20.1]) def test_distribute_near_bed1(self): #Using test data generated by pyvolution-2 #Assuming no friction and flat bed (0.0) a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe vertices = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4]] domain = Domain(points, vertices) #Set up for a gradient of (3,0) at mid triangle def slope(x, y): return x**4+y**2 h = 0.1 def stage(x,y): return slope(x,y)+h domain.set_quantity('elevation', slope) domain.set_quantity('stage', stage) #print domain.quantities['elevation'].centroid_values #print domain.quantities['stage'].centroid_values E = domain.quantities['elevation'].vertex_values L = domain.quantities['stage'].vertex_values #print E domain.order = 1 domain.distribute_to_vertices_and_edges() ##assert allclose(L[1], [4.19999999, 16.07142857, 20.02857143]) assert allclose(L[1], [4.1, 16.1, 20.1]) domain.order = 2 domain.distribute_to_vertices_and_edges() assert allclose(L[1], [4.1, 16.1, 20.1]) def test_second_order_distribute_real_data(self): #Using test data generated by pyvolution-2 #Assuming no friction and flat bed (0.0) a = [0.0, 0.0] b = [0.0, 1.0/5] c = [0.0, 2.0/5] d = [1.0/5, 0.0] e = [1.0/5, 1.0/5] f = [1.0/5, 2.0/5] g = [2.0/5, 2.0/5] points = [a, b, c, d, e, f, g] #bae, efb, cbf, feg vertices = [ [1,0,4], [4,5,1], [2,1,5], [5,4,6]] domain = Domain(points, vertices) def slope(x, y): return -x/3 domain.set_quantity('elevation', slope) domain.set_quantity('stage', [0.01298164, 0.00365611, 0.01440365, -0.0381856437096], location='centroids') domain.set_quantity('xmomentum', [0.00670439, 0.01263789, 0.00647805, 0.0178180740668], location='centroids') domain.set_quantity('ymomentum', [-7.23510980e-004, -6.30413883e-005, 6.30413883e-005, 0.000200907255866], location='centroids') E = domain.quantities['elevation'].vertex_values L = domain.quantities['stage'].vertex_values X = domain.quantities['xmomentum'].vertex_values Y = domain.quantities['ymomentum'].vertex_values #print E domain.order = 2 domain.beta_h = 0.0 #Use first order in h-limiter domain.distribute_to_vertices_and_edges() #print L[1,:] #print X[1,:] #print Y[1,:] assert allclose(L[1,:], [-0.00825735775384, -0.00801881482869, 0.0272445025825]) assert allclose(X[1,:], [0.0143507718962, 0.0142502147066, 0.00931268339717]) assert allclose(Y[1,:], [-0.000117062180693, 7.94434448109e-005, -0.000151505429018]) def test_balance_deep_and_shallow(self): """Test that balanced limiters preserve conserved quantites. """ import copy a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe elements = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4] ] mesh = Domain(points, elements) mesh.check_integrity() #Create a deliberate overshoot mesh.set_quantity('stage', [[3,0,3], [2,2,6], [5,3,8], [8,3,5]]) mesh.set_quantity('elevation', 0) #Flat bed stage = mesh.quantities['stage'] ref_centroid_values = copy.copy(stage.centroid_values[:]) #Copy #Limit mesh.distribute_to_vertices_and_edges() #Assert that quantities are conserved from Numeric import sum for k in range(mesh.number_of_elements): assert allclose (ref_centroid_values[k], sum(stage.vertex_values[k,:])/3) #Now try with a non-flat bed - closely hugging initial stage in places #This will create alphas in the range [0, 0.478260, 1] mesh.set_quantity('stage', [[3,0,3], [2,2,6], [5,3,8], [8,3,5]]) mesh.set_quantity('elevation', [[0,0,0], [1.8,1.9,5.9], [4.6,0,0], [0,2,4]]) stage = mesh.quantities['stage'] ref_centroid_values = copy.copy(stage.centroid_values[:]) #Copy ref_vertex_values = copy.copy(stage.vertex_values[:]) #Copy #Limit mesh.distribute_to_vertices_and_edges() #Assert that all vertex quantities have changed for k in range(mesh.number_of_elements): #print ref_vertex_values[k,:], stage.vertex_values[k,:] assert not allclose (ref_vertex_values[k,:], stage.vertex_values[k,:]) #and assert that quantities are still conserved from Numeric import sum for k in range(mesh.number_of_elements): assert allclose (ref_centroid_values[k], sum(stage.vertex_values[k,:])/3) #Also check that Python and C version produce the same assert allclose (stage.vertex_values, [[2,2,2], [1.93333333, 2.03333333, 6.03333333], [6.93333333, 4.53333333, 4.53333333], [5.33333333, 5.33333333, 5.33333333]]) def test_conservation_1(self): """Test that stage is conserved globally This one uses a flat bed, reflective bdries and a suitable initial condition """ from mesh_factory import rectangular from shallow_water import Domain, Reflective_boundary,\ Dirichlet_boundary, Constant_height from Numeric import array #Create basic mesh points, vertices, boundary = rectangular(6, 6) #Create shallow water domain domain = Domain(points, vertices, boundary) domain.smooth = False domain.default_order=2 #IC def x_slope(x, y): return x/3 domain.set_quantity('elevation', 0) domain.set_quantity('friction', 0) domain.set_quantity('stage', x_slope) # Boundary conditions (reflective everywhere) Br = Reflective_boundary(domain) domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br}) domain.check_integrity() #domain.visualise = True #If you want to take a sticky beak initial_volume = domain.quantities['stage'].get_integral() initial_xmom = domain.quantities['xmomentum'].get_integral() #print initial_xmom #Evolution for t in domain.evolve(yieldstep = 0.05, finaltime = 5.0): volume = domain.quantities['stage'].get_integral() assert allclose (volume, initial_volume) #I don't believe that the total momentum should be the same #It starts with zero and ends with zero though #xmom = domain.quantities['xmomentum'].get_integral() #print xmom #assert allclose (xmom, initial_xmom) os.remove(domain.filename + '.sww') def test_conservation_2(self): """Test that stage is conserved globally This one uses a slopy bed, reflective bdries and a suitable initial condition """ from mesh_factory import rectangular from shallow_water import Domain, Reflective_boundary,\ Dirichlet_boundary, Constant_height from Numeric import array #Create basic mesh points, vertices, boundary = rectangular(6, 6) #Create shallow water domain domain = Domain(points, vertices, boundary) domain.smooth = False domain.default_order=2 #IC def x_slope(x, y): return x/3 domain.set_quantity('elevation', x_slope) domain.set_quantity('friction', 0) domain.set_quantity('stage', 0.4) #Steady # Boundary conditions (reflective everywhere) Br = Reflective_boundary(domain) domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br}) domain.check_integrity() #domain.visualise = True #If you want to take a sticky beak initial_volume = domain.quantities['stage'].get_integral() initial_xmom = domain.quantities['xmomentum'].get_integral() #print initial_xmom #Evolution for t in domain.evolve(yieldstep = 0.05, finaltime = 5.0): volume = domain.quantities['stage'].get_integral() assert allclose (volume, initial_volume) #FIXME: What would we expect from momentum #xmom = domain.quantities['xmomentum'].get_integral() #print xmom #assert allclose (xmom, initial_xmom) os.remove(domain.filename + '.sww') def test_conservation_3(self): """Test that stage is conserved globally This one uses a larger grid, convoluted bed, reflective bdries and a suitable initial condition """ from mesh_factory import rectangular from shallow_water import Domain, Reflective_boundary,\ Dirichlet_boundary, Constant_height from Numeric import array #Create basic mesh points, vertices, boundary = rectangular(2, 1) #Create shallow water domain domain = Domain(points, vertices, boundary) domain.smooth = False domain.default_order = 2 domain.beta_h = 0.2 domain.set_quantities_to_be_stored(['stage']) #IC def x_slope(x, y): z = 0*x for i in range(len(x)): if x[i] < 0.3: z[i] = x[i]/3 if 0.3 <= x[i] < 0.5: z[i] = -0.5 if 0.5 <= x[i] < 0.7: z[i] = 0.39 if 0.7 <= x[i]: z[i] = x[i]/3 return z domain.set_quantity('elevation', x_slope) domain.set_quantity('friction', 0) domain.set_quantity('stage', 0.4) #Steady # Boundary conditions (reflective everywhere) Br = Reflective_boundary(domain) domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br}) domain.check_integrity() #domain.visualise = True #If you want to take a sticky beak initial_volume = domain.quantities['stage'].get_integral() initial_xmom = domain.quantities['xmomentum'].get_integral() import copy ref_centroid_values =\ copy.copy(domain.quantities['stage'].centroid_values) #print 'ORG', domain.quantities['stage'].centroid_values domain.distribute_to_vertices_and_edges() #print domain.quantities['stage'].centroid_values assert allclose(domain.quantities['stage'].centroid_values, ref_centroid_values) #Check that initial limiter doesn't violate cons quan assert allclose (domain.quantities['stage'].get_integral(), initial_volume) #Evolution for t in domain.evolve(yieldstep = 0.05, finaltime = 10): volume = domain.quantities['stage'].get_integral() #print t, volume, initial_volume assert allclose (volume, initial_volume) os.remove(domain.filename + '.sww') def test_conservation_4(self): """Test that stage is conserved globally This one uses a larger grid, convoluted bed, reflective bdries and a suitable initial condition """ from mesh_factory import rectangular from shallow_water import Domain, Reflective_boundary,\ Dirichlet_boundary, Constant_height from Numeric import array #Create basic mesh points, vertices, boundary = rectangular(6, 6) #Create shallow water domain domain = Domain(points, vertices, boundary) domain.smooth = False domain.default_order=2 domain.beta_h = 0.0 #IC def x_slope(x, y): z = 0*x for i in range(len(x)): if x[i] < 0.3: z[i] = x[i]/3 if 0.3 <= x[i] < 0.5: z[i] = -0.5 if 0.5 <= x[i] < 0.7: #z[i] = 0.3 #OK with beta == 0.2 z[i] = 0.34 #OK with beta == 0.0 #z[i] = 0.35#Fails after 80 timesteps with an error #of the order 1.0e-5 if 0.7 <= x[i]: z[i] = x[i]/3 return z domain.set_quantity('elevation', x_slope) domain.set_quantity('friction', 0) domain.set_quantity('stage', 0.4) #Steady # Boundary conditions (reflective everywhere) Br = Reflective_boundary(domain) domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br}) domain.check_integrity() #domain.visualise = True #If you want to take a sticky beak initial_volume = domain.quantities['stage'].get_integral() initial_xmom = domain.quantities['xmomentum'].get_integral() import copy ref_centroid_values =\ copy.copy(domain.quantities['stage'].centroid_values) #Test limiter by itself domain.distribute_to_vertices_and_edges() #Check that initial limiter doesn't violate cons quan assert allclose (domain.quantities['stage'].get_integral(), initial_volume) #NOTE: This would fail if any initial stage was less than the #corresponding bed elevation - but that is reasonable. #Evolution for t in domain.evolve(yieldstep = 0.05, finaltime = 10.0): volume = domain.quantities['stage'].get_integral() #print t, volume, initial_volume #if not allclose (volume, initial_volume): # print 't==4.05' # for k in range(domain.number_of_elements): # pass # print domain.quantities['stage'].centroid_values[k] -\ # ref_centroid_values[k] assert allclose (volume, initial_volume) os.remove(domain.filename + '.sww') def test_conservation_5(self): """Test that momentum is conserved globally in steady state scenario This one uses a slopy bed, dirichlet and reflective bdries """ from mesh_factory import rectangular from shallow_water import Domain, Reflective_boundary,\ Dirichlet_boundary, Constant_height from Numeric import array #Create basic mesh points, vertices, boundary = rectangular(6, 6) #Create shallow water domain domain = Domain(points, vertices, boundary) domain.smooth = False domain.default_order=2 #IC def x_slope(x, y): return x/3 domain.set_quantity('elevation', x_slope) domain.set_quantity('friction', 0) domain.set_quantity('stage', 0.4) #Steady # Boundary conditions (reflective everywhere) Br = Reflective_boundary(domain) Bleft = Dirichlet_boundary([0.5,0,0]) Bright = Dirichlet_boundary([0.1,0,0]) domain.set_boundary({'left': Bleft, 'right': Bright, 'top': Br, 'bottom': Br}) domain.check_integrity() #domain.visualise = True #If you want to take a sticky beak initial_volume = domain.quantities['stage'].get_integral() initial_xmom = domain.quantities['xmomentum'].get_integral() #Evolution for t in domain.evolve(yieldstep = 0.05, finaltime = 15.0): stage = domain.quantities['stage'].get_integral() xmom = domain.quantities['xmomentum'].get_integral() ymom = domain.quantities['ymomentum'].get_integral() if allclose(t, 6): #Steady state reached steady_xmom = domain.quantities['xmomentum'].get_integral() steady_ymom = domain.quantities['ymomentum'].get_integral() steady_stage = domain.quantities['stage'].get_integral() if t > 6: #print '%.2f %14.8f %14.8f' %(t, ymom, steady_ymom) assert allclose(xmom, steady_xmom) assert allclose(ymom, steady_ymom) assert allclose(stage, steady_stage) os.remove(domain.filename + '.sww') def test_conservation_real(self): """Test that momentum is conserved globally Stephen finally made a test that revealed the problem. This test failed with code prior to 25 July 2005 """ yieldstep = 0.01 finaltime = 0.05 min_depth = 1.0e-2 import sys from os import sep; sys.path.append('..'+sep+'pyvolution') from mesh_factory import rectangular from shallow_water import Domain, Reflective_boundary #Create shallow water domain points, vertices, boundary = rectangular(10, 10, len1=500, len2=500) domain = Domain(points, vertices, boundary) domain.smooth = False domain.visualise = False domain.default_order = 1 domain.minimum_allowed_height = min_depth # Set initial condition class Set_IC: """Set an initial condition with a constant value, for x0self.x0)&(x [attributes]\n \ 0 0.0 0.0 0.0 0.0 0.01 \n \ 1 1.0 0.0 10.0 10.0 0.02 \n \ 2 0.0 1.0 0.0 10.0 0.03 \n \ 3 0.5 0.25 8.0 12.0 0.04 \n \ # Vert att title \n \ elevation \n \ stage \n \ friction \n \ 2 # [] [] \n\ 0 0 3 2 -1 -1 1 dsg\n\ 1 0 1 3 -1 0 -1 ole nielsen\n\ 4 # [boundary tag] \n\ 0 1 0 2 \n\ 1 0 2 3 \n\ 2 2 3 \n\ 3 3 1 1 \n\ 3 0 # [attributes] ...Mesh Vertices... \n \ 0 216.0 -86.0 \n \ 1 160.0 -167.0 \n \ 2 114.0 -91.0 \n \ 3 # [boundary tag] ...Mesh Segments... \n \ 0 0 1 0 \n \ 1 1 2 0 \n \ 2 2 0 0 \n \ 0 # ...Mesh Holes... \n \ 0 # ...Mesh Regions... \n \ 0 # ...Mesh Regions, area... \n\ #Geo reference \n \ 56 \n \ 140 \n \ 120 \n") file.close() tags = {} b1 = Dirichlet_boundary(conserved_quantities = array([0.0])) b2 = Dirichlet_boundary(conserved_quantities = array([1.0])) b3 = Dirichlet_boundary(conserved_quantities = array([2.0])) tags["1"] = b1 tags["2"] = b2 tags["3"] = b3 #from pmesh2domain import pmesh_to_domain_instance #domain = pmesh_to_domain_instance(fileName, Domain) domain = Domain(mesh_filename=fileName) os.remove(fileName) #print "domain.tagged_elements", domain.tagged_elements ## check the quantities #print domain.quantities['elevation'].vertex_values answer = [[0., 8., 0.], [0., 10., 8.]] assert allclose(domain.quantities['elevation'].vertex_values, answer) #print domain.quantities['stage'].vertex_values answer = [[0., 12., 10.], [0., 10., 12.]] assert allclose(domain.quantities['stage'].vertex_values, answer) #print domain.quantities['friction'].vertex_values answer = [[0.01, 0.04, 0.03], [0.01, 0.02, 0.04]] assert allclose(domain.quantities['friction'].vertex_values, answer) #print domain.quantities['friction'].vertex_values assert allclose(domain.tagged_elements['dsg'][0],0) assert allclose(domain.tagged_elements['ole nielsen'][0],1) self.failUnless( domain.boundary[(1, 0)] == '1', "test_tags_to_boundaries failed. Single boundary wasn't added.") self.failUnless( domain.boundary[(1, 2)] == '2', "test_tags_to_boundaries failed. Single boundary wasn't added.") self.failUnless( domain.boundary[(0, 1)] == '3', "test_tags_to_boundaries failed. Single boundary wasn't added.") self.failUnless( domain.boundary[(0, 0)] == 'exterior', "test_tags_to_boundaries failed. Single boundary wasn't added.") #print "domain.boundary",domain.boundary self.failUnless( len(domain.boundary) == 4, "test_pmesh2Domain Too many boundaries") #FIXME change to use get_xllcorner #print "d.geo_reference.xllcorner",domain.geo_reference.xllcorner self.failUnless(domain.geo_reference.xllcorner == 140.0, "bad geo_referece") #************ #------------------------------------------------------------- if __name__ == "__main__": suite = unittest.makeSuite(Test_Shallow_Water,'test') runner = unittest.TextTestRunner() runner.run(suite)