#!/usr/bin/env python import unittest from math import sqrt, pi from shallow_water_vel_domain import * from Numeric import allclose, array, ones, Float, maximum, zeros class Test_Shallow_Water(unittest.TestCase): def setUp(self): self.points = [0.0, 1.0, 2.0, 3.0] self.vertex_values = [[1.0,2.0],[4.0,5.0],[-1.0,2.0]] self.points2 = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0] def tearDown(self): pass #print " Tearing down" def test_creation(self): domain = Domain(self.points) assert allclose(domain.centroids, [0.5, 1.5, 2.5]) def test_reflective_boundary(self): """ Test setting reflective boundary """ domain = Domain(self.points) domain.set_quantity('stage',2.0) domain.set_quantity('xmomentum',6.0) domain.set_boundary({'exterior' : Reflective_boundary(domain)}) domain.distribute_to_vertices_and_edges() domain.update_boundary() ## print 'In test reflective' ## print domain.quantities['stage'].vertex_values ## print domain.quantities['xmomentum'].vertex_values ## print domain.quantities['elevation'].vertex_values ## print domain.quantities['height'].vertex_values ## print domain.quantities['velocity'].vertex_values ## print domain.quantities['stage'].boundary_values ## print domain.quantities['xmomentum'].boundary_values ## print domain.quantities['elevation'].boundary_values ## print domain.quantities['height'].boundary_values ## print domain.quantities['velocity'].boundary_values assert allclose( domain.quantities['stage' ].boundary_values, [2.0, 2.0]) assert allclose( domain.quantities['xmomentum'].boundary_values, [-6.0, -6.0]) assert allclose( domain.quantities['elevation'].boundary_values, [0.0, 0.0]) assert allclose( domain.quantities['height' ].boundary_values, [2.0, 2.0]) assert allclose( domain.quantities['velocity' ].boundary_values, [-3.0, -3.0]) def test_dirichlet_boundary(self): """ Test setting dirichlet boundary """ domain = Domain(self.points) domain.set_quantity('stage',2.0) domain.set_quantity('xmomentum',6.0) domain.set_boundary({'exterior' : Dirichlet_boundary([3.0, 8.0, 1.0, 2.0, 4.0])}) domain.distribute_to_vertices_and_edges() domain.update_boundary() ## print 'In test dirichlet' ## print domain.quantities['stage'].vertex_values ## print domain.quantities['xmomentum'].vertex_values ## print domain.quantities['elevation'].vertex_values ## print domain.quantities['height'].vertex_values ## print domain.quantities['velocity'].vertex_values ## print domain.quantities['stage'].boundary_values ## print domain.quantities['xmomentum'].boundary_values ## print domain.quantities['elevation'].boundary_values ## print domain.quantities['height'].boundary_values ## print domain.quantities['velocity'].boundary_values assert allclose( domain.quantities['stage' ].boundary_values, [3.0, 3.0]) assert allclose( domain.quantities['xmomentum'].boundary_values, [8.0, 8.0]) assert allclose( domain.quantities['elevation'].boundary_values, [1.0, 1.0]) assert allclose( domain.quantities['height' ].boundary_values, [2.0, 2.0]) assert allclose( domain.quantities['velocity' ].boundary_values, [4.0, 4.0]) def test_compute_fluxes(self): """ Compare shallow_water_domain flux calculation against a previous Python implementation (defined in this file) """ domain = Domain(self.points) domain.set_quantity('stage',2.0) domain.set_boundary({'exterior' : Dirichlet_boundary([0.0, 0.0, 0.0, 0.0, 0.0])}) domain.distribute_to_vertices_and_edges() domain.update_boundary() stage_ud, xmom_ud = local_compute_fluxes(domain) domain.compute_fluxes() # print domain.quantities['stage'].vertex_values # print domain.quantities['xmomentum'].vertex_values # print domain.quantities['elevation'].vertex_values # print domain.quantities['height'].vertex_values # print domain.quantities['velocity'].vertex_values # # print domain.quantities['stage'].boundary_values # print domain.quantities['xmomentum'].boundary_values # print domain.quantities['elevation'].boundary_values # print domain.quantities['height'].boundary_values # print domain.quantities['velocity'].boundary_values print domain.quantities['stage'].explicit_update print domain.quantities['xmomentum'].explicit_update print stage_ud print xmom_ud assert allclose( domain.quantities['stage'].explicit_update, stage_ud ) assert allclose( domain.quantities['xmomentum'].explicit_update, xmom_ud ) def test_local_flux_function(self): normal = 1.0 ql = array([1.0, 2.0],Float) qr = array([1.0, 2.0],Float) zl = 0.0 zr = 0.0 #This assumes h0 = 1.0e-3!! edgeflux, maxspeed = local_flux_function(normal, ql,qr,zl,zr) #print maxspeed #print edgeflux assert allclose(array([2.0, 8.9],Float), edgeflux, rtol=1.0e-005) assert allclose(5.1305, maxspeed, rtol=1.0e-005) normal = -1.0 ql = array([1.0, 2.0],Float) qr = array([1.0, 2.0],Float) zl = 0.0 zr = 0.0 edgeflux, maxspeed = local_flux_function(normal, ql,qr,zl,zr) #print maxspeed #print edgeflux assert allclose(array([-2.0, -8.9],Float), edgeflux, rtol=1.0e-005) assert allclose(5.1305, maxspeed, rtol=1.0e-005) def test_gravity(self): """ Compare shallow_water_domain gravity calculation """ def slope_one(x): return x domain = Domain(self.points) domain.set_quantity('stage',4.0) domain.set_quantity('elevation',slope_one) domain.set_boundary({'exterior' : Reflective_boundary(domain)}) domain.distribute_to_vertices_and_edges() domain.update_boundary() gravity(domain) #print domain.quantities['stage'].vertex_values #print domain.quantities['elevation'].vertex_values #print domain.quantities['xmomentum'].explicit_update assert allclose( array([-34.3, -24.5, -14.7], Float), domain.quantities['xmomentum'].explicit_update ) def test_evolve_first_order(self): """ Compare still lake solution for various versions of shallow_water_domain """ def slope_square(x): return maximum(4.0-(x-5.0)*(x-5.0), 0.0) domain = Domain(self.points2) domain.set_quantity('stage',10.0) domain.set_quantity('elevation',slope_square) domain.set_boundary({'exterior' : Reflective_boundary(domain)}) domain.default_order = 1 domain.set_timestepping_method('euler') yieldstep=0.25 finaltime=1.0 for t in domain.evolve(yieldstep=yieldstep, finaltime=finaltime): pass ## print domain.quantities['stage'].vertex_values ## print domain.quantities['elevation'].vertex_values ## print domain.quantities['xmomentum'].vertex_values ## ## ## print domain.quantities['stage'].centroid_values ## print domain.quantities['elevation'].centroid_values ## print domain.quantities['xmomentum'].centroid_values #assert allclose( 10.0*ones(10), domain.quantities['stage'].centroid_values ) #assert allclose( zeros(10), domain.quantities['xmomentum'].centroid_values ) ## def test_evolve_euler_second_order_space(self): ## """ ## Compare still lake solution for various versions of shallow_water_domain ## """ ## def slope_square(x): ## return maximum(4.0-(x-5.0)*(x-5.0), 0.0) ## domain = Domain(self.points2) ## domain.set_quantity('stage',10.0) ## domain.set_quantity('elevation',slope_square) ## domain.set_boundary({'exterior' : Reflective_boundary(domain)}) ## domain.default_order = 2 ## domain.set_timestepping_method('rk2') ## yieldstep=1.0 ## finaltime=1.0 ## for t in domain.evolve(yieldstep=yieldstep, finaltime=finaltime): ## pass ## assert allclose( 10.0*ones(10), domain.quantities['stage'].centroid_values ) ## assert allclose( zeros(10), domain.quantities['xmomentum'].centroid_values ) ## def test_evolve_second_order_space_time(self): ## """ ## Compare still lake solution for various versions of shallow_water_domain ## """ ## def slope_square(x): ## return maximum(4.0-(x-5.0)*(x-5.0), 0.0) ## domain = Domain(self.points2) ## domain.set_quantity('stage',10.0) ## domain.set_quantity('elevation',slope_square) ## domain.set_boundary({'exterior' : Reflective_boundary(domain)}) ## domain.default_order = 2 ## domain.set_timestepping_method('rk3') ## yieldstep=1.0 ## finaltime=1.0 ## for t in domain.evolve(yieldstep=yieldstep, finaltime=finaltime): ## pass ## assert allclose( 10.0*ones(10), domain.quantities['stage'].centroid_values ) ## assert allclose( zeros(10), domain.quantities['xmomentum'].centroid_values ) #============================================================================== def local_compute_fluxes(domain): """Compute all fluxes and the timestep suitable for all volumes in domain. Compute total flux for each conserved quantity using "flux_function" Fluxes across each edge are scaled by edgelengths and summed up Resulting flux is then scaled by area and stored in explicit_update for each of the three conserved quantities stage, xmomentum and ymomentum The maximal allowable speed computed by the flux_function for each volume is converted to a timestep that must not be exceeded. The minimum of those is computed as the next overall timestep. Post conditions: domain.explicit_update is reset to computed flux values domain.timestep is set to the largest step satisfying all volumes. """ import sys from Numeric import zeros, Float N = domain.number_of_elements tmp0 = zeros(N,Float) tmp1 = zeros(N,Float) #Shortcuts Stage = domain.quantities['stage'] Xmom = domain.quantities['xmomentum'] # Ymom = domain.quantities['ymomentum'] Bed = domain.quantities['elevation'] #Arrays #stage = Stage.edge_values #xmom = Xmom.edge_values # ymom = Ymom.edge_values #bed = Bed.edge_values stage = Stage.vertex_values xmom = Xmom.vertex_values bed = Bed.vertex_values #print 'stage edge values', stage #print 'xmom edge values', xmom #print 'bed values', bed stage_bdry = Stage.boundary_values xmom_bdry = Xmom.boundary_values #print 'stage_bdry',stage_bdry #print 'xmom_bdry', xmom_bdry # ymom_bdry = Ymom.boundary_values # flux = zeros(3, Float) #Work array for summing up fluxes flux = zeros(2, Float) #Work array for summing up fluxes ql = zeros(2, Float) qr = zeros(2, Float) #Loop timestep = float(sys.maxint) enter = True for k in range(N): flux[:,] = 0. #Reset work array #for i in range(3): for i in range(2): #Quantities inside volume facing neighbour i #ql[0] = stage[k, i] #ql[1] = xmom[k, i] ql = [stage[k, i], xmom[k, i]] zl = bed[k, i] #Quantities at neighbour on nearest face n = domain.neighbours[k,i] if n < 0: m = -n-1 #Convert negative flag to index qr[0] = stage_bdry[m] qr[1] = xmom_bdry[m] zr = zl #Extend bed elevation to boundary else: #m = domain.neighbour_edges[k,i] m = domain.neighbour_vertices[k,i] #print i, ' ' , m #qr = [stage[n, m], xmom[n, m], ymom[n, m]] qr[0] = stage[n, m] qr[1] = xmom[n, m] zr = bed[n, m] #Outward pointing normal vector normal = domain.normals[k, i] #Flux computation using provided function #edgeflux, max_speed = flux_function(normal, ql, qr, zl, zr) #print 'ql',ql #print 'qr',qr edgeflux, max_speed = local_flux_function(normal, ql, qr, zl, zr) #print 'edgeflux', edgeflux # THIS IS THE LINE TO DEAL WITH LEFT AND RIGHT FLUXES # flux = edgefluxleft - edgefluxright flux -= edgeflux #* domain.edgelengths[k,i] #Update optimal_timestep try: #timestep = min(timestep, 0.5*domain.radii[k]/max_speed) timestep = min(timestep, domain.CFL*0.5*domain.areas[k]/max_speed) except ZeroDivisionError: pass #Normalise by area and store for when all conserved #quantities get updated flux /= domain.areas[k] #Stage.explicit_update[k] = flux[0] tmp0[k] = flux[0] tmp1[k] = flux[1] return tmp0, tmp1 def local_flux_function(normal, ql, qr, zl, zr): """Compute fluxes between volumes for the shallow water wave equation cast in terms of w = h+z using the 'central scheme' as described in Kurganov, Noelle, Petrova. 'Semidiscrete Central-Upwind Schemes For Hyperbolic Conservation Laws and Hamilton-Jacobi Equations'. Siam J. Sci. Comput. Vol. 23, No. 3, pp. 707-740. The implemented formula is given in equation (3.15) on page 714 Conserved quantities w, uh, are stored as elements 0 and 1 in the numerical vectors ql an qr. Bed elevations zl and zr. """ from config import g, epsilon, h0 from math import sqrt from Numeric import array #print 'ql',ql #Align momentums with x-axis #q_left = rotate(ql, normal, direction = 1) #q_right = rotate(qr, normal, direction = 1) q_left = ql q_left[1] = q_left[1]*normal q_right = qr q_right[1] = q_right[1]*normal #z = (zl+zr)/2 #Take average of field values z = 0.5*(zl+zr) #Take average of field values w_left = q_left[0] #w=h+z h_left = w_left-z uh_left = q_left[1] if h_left < epsilon: u_left = 0.0 #Could have been negative h_left = 0.0 else: u_left = uh_left/(h_left + h0/h_left) uh_left = u_left*h_left w_right = q_right[0] #w=h+z h_right = w_right-z uh_right = q_right[1] if h_right < epsilon: u_right = 0.0 #Could have been negative h_right = 0.0 else: u_right = uh_right/(h_right + h0/h_right) uh_right = u_right*h_right #vh_left = q_left[2] #vh_right = q_right[2] #print h_right #print u_right #print h_left #print u_right soundspeed_left = sqrt(g*h_left) soundspeed_right = sqrt(g*h_right) #Maximal wave speed s_max = max(u_left+soundspeed_left, u_right+soundspeed_right, 0) #Minimal wave speed s_min = min(u_left-soundspeed_left, u_right-soundspeed_right, 0) #Flux computation #flux_left = array([u_left*h_left, # u_left*uh_left + 0.5*g*h_left**2]) #flux_right = array([u_right*h_right, # u_right*uh_right + 0.5*g*h_right**2]) flux_left = array([u_left*h_left, u_left*uh_left + 0.5*g*h_left*h_left]) flux_right = array([u_right*h_right, u_right*uh_right + 0.5*g*h_right*h_right]) denom = s_max-s_min if denom == 0.0: edgeflux = array([0.0, 0.0]) max_speed = 0.0 else: edgeflux = (s_max*flux_left - s_min*flux_right)/denom edgeflux += s_max*s_min*(q_right-q_left)/denom edgeflux[1] = edgeflux[1]*normal max_speed = max(abs(s_max), abs(s_min)) return edgeflux, max_speed #------------------------------------------------------------- if __name__ == "__main__": suite = unittest.makeSuite(Test_Shallow_Water, 'test') #suite = unittest.makeSuite(Test_Quantity, 'test_set_values_from_file_using_polygon') #suite = unittest.makeSuite(Test_Quantity, 'test_set_vertex_values_using_general_interface_with_subset') #print "restricted test" #suite = unittest.makeSuite(Test_Quantity,'verbose_test_set_values_from_UTM_pts') runner = unittest.TextTestRunner() runner.run(suite)