#!/usr/bin/env python import unittest from math import sqrt, pi from shallow_water_domain import * from Numeric import allclose, array, ones, Float class Test_Shallow_Water(unittest.TestCase): def setUp(self): self.points = [0.0, 1.0, 2.0, 3.0] self.vertex_values = [[1.0,2.0],[4.0,5.0],[-1.0,2.0]] def tearDown(self): pass #print " Tearing down" def test_creation(self): domain = Domain(self.points) assert allclose(domain.centroids, [0.5, 1.5, 2.5]) def test_compute_fluxes(self): """ Compare shallow_water_domain flux calculation against a previous Python implementation (defined in this file) """ domain = Domain(self.points) domain.set_quantity('stage',2.0) domain.set_boundary({'exterior' : Reflective_boundary(domain)}) stage_ud, xmom_ud = compute_fluxes_python(domain) domain.compute_fluxes() #print doamin.quantities['xmomentum'].explicit_update #print compute_fluxes_python(domain) assert allclose( domain.quantities['stage'].explicit_update, stage_ud ) assert allclose( domain.quantities['xmomentum'].explicit_update, xmom_ud ) def test_local_flux_function(self): normal = 1.0 ql = array([1.0, 2.0],Float) qr = array([1.0, 2.0],Float) zl = 0.0 zr = 0.0 #This assumes h0 = 1.0e-3!! edgeflux, maxspeed = flux_function(normal, ql,qr,zl,zr) #print maxspeed #print edgeflux assert allclose(array([1.998002, 8.89201198],Float), edgeflux) assert allclose(5.1284971665, maxspeed) normal = -1.0 ql = array([1.0, 2.0],Float) qr = array([1.0, 2.0],Float) zl = 0.0 zr = 0.0 edgeflux, maxspeed = flux_function(normal, ql,qr,zl,zr) #print maxspeed #print edgeflux assert allclose(array([-1.998002, -8.89201198],Float), edgeflux) assert allclose(5.1284971665, maxspeed) def test_domain_flux_function(self): normal = 1.0 ql = array([1.0, 2.0],Float) qr = array([1.0, 2.0],Float) zl = 0.0 zr = 0.0 edgeflux, maxspeed = flux_function(normal, ql,qr,zl,zr) #print edgeflux from shallow_water_domain import flux_function as domain_flux_function domainedgeflux, domainmaxspeed = domain_flux_function(normal, ql,qr,zl,zr) #print domainedgeflux assert allclose(domainedgeflux, edgeflux) assert allclose(domainmaxspeed, maxspeed) #============================================================================== def compute_fluxes_python(domain): """Compute all fluxes and the timestep suitable for all volumes in domain. Compute total flux for each conserved quantity using "flux_function" Fluxes across each edge are scaled by edgelengths and summed up Resulting flux is then scaled by area and stored in explicit_update for each of the three conserved quantities stage, xmomentum and ymomentum The maximal allowable speed computed by the flux_function for each volume is converted to a timestep that must not be exceeded. The minimum of those is computed as the next overall timestep. Post conditions: domain.explicit_update is reset to computed flux values domain.timestep is set to the largest step satisfying all volumes. """ import sys from Numeric import zeros, Float N = domain.number_of_elements tmp0 = zeros((N,),Float) tmp1 = zeros((N,),Float) #Shortcuts Stage = domain.quantities['stage'] Xmom = domain.quantities['xmomentum'] # Ymom = domain.quantities['ymomentum'] Bed = domain.quantities['elevation'] #Arrays #stage = Stage.edge_values #xmom = Xmom.edge_values # ymom = Ymom.edge_values #bed = Bed.edge_values stage = Stage.vertex_values xmom = Xmom.vertex_values bed = Bed.vertex_values #print 'stage edge values', stage #print 'xmom edge values', xmom #print 'bed values', bed stage_bdry = Stage.boundary_values xmom_bdry = Xmom.boundary_values #print 'stage_bdry',stage_bdry #print 'xmom_bdry', xmom_bdry # ymom_bdry = Ymom.boundary_values # flux = zeros(3, Float) #Work array for summing up fluxes flux = zeros(2, Float) #Work array for summing up fluxes ql = zeros(2, Float) qr = zeros(2, Float) #Loop timestep = float(sys.maxint) enter = True for k in range(N): flux[:] = 0. #Reset work array #for i in range(3): for i in range(2): #Quantities inside volume facing neighbour i #ql[0] = stage[k, i] #ql[1] = xmom[k, i] ql = [stage[k, i], xmom[k, i]] zl = bed[k, i] #Quantities at neighbour on nearest face n = domain.neighbours[k,i] if n < 0: m = -n-1 #Convert negative flag to index qr[0] = stage_bdry[m] qr[1] = xmom_bdry[m] zr = zl #Extend bed elevation to boundary else: #m = domain.neighbour_edges[k,i] m = domain.neighbour_vertices[k,i] #print i, ' ' , m #qr = [stage[n, m], xmom[n, m], ymom[n, m]] qr[0] = stage[n, m] qr[1] = xmom[n, m] zr = bed[n, m] #Outward pointing normal vector normal = domain.normals[k, i] #Flux computation using provided function #edgeflux, max_speed = flux_function(normal, ql, qr, zl, zr) #print 'ql',ql #print 'qr',qr edgeflux, max_speed = flux_function(normal, ql, qr, zl, zr) #print 'edgeflux', edgeflux # THIS IS THE LINE TO DEAL WITH LEFT AND RIGHT FLUXES # flux = edgefluxleft - edgefluxright flux -= edgeflux #* domain.edgelengths[k,i] #Update optimal_timestep try: #timestep = min(timestep, 0.5*domain.radii[k]/max_speed) timestep = min(timestep, domain.cfl*0.5*domain.areas[k]/max_speed) except ZeroDivisionError: pass #Normalise by area and store for when all conserved #quantities get updated flux /= domain.areas[k] #Stage.explicit_update[k] = flux[0] tmp0[k] = flux[0] tmp1[k] = flux[1] return tmp0, tmp1 def flux_function(normal, ql, qr, zl, zr): """Compute fluxes between volumes for the shallow water wave equation cast in terms of w = h+z using the 'central scheme' as described in Kurganov, Noelle, Petrova. 'Semidiscrete Central-Upwind Schemes For Hyperbolic Conservation Laws and Hamilton-Jacobi Equations'. Siam J. Sci. Comput. Vol. 23, No. 3, pp. 707-740. The implemented formula is given in equation (3.15) on page 714 Conserved quantities w, uh, are stored as elements 0 and 1 in the numerical vectors ql an qr. Bed elevations zl and zr. """ from config import g, epsilon, h0 from math import sqrt from Numeric import array #print 'ql',ql #Align momentums with x-axis #q_left = rotate(ql, normal, direction = 1) #q_right = rotate(qr, normal, direction = 1) q_left = ql q_left[1] = q_left[1]*normal q_right = qr q_right[1] = q_right[1]*normal #z = (zl+zr)/2 #Take average of field values z = 0.5*(zl+zr) #Take average of field values w_left = q_left[0] #w=h+z h_left = w_left-z uh_left = q_left[1] if h_left < epsilon: u_left = 0.0 #Could have been negative h_left = 0.0 else: u_left = uh_left/(h_left + h0/h_left) uh_left = u_left*h_left w_right = q_right[0] #w=h+z h_right = w_right-z uh_right = q_right[1] if h_right < epsilon: u_right = 0.0 #Could have been negative h_right = 0.0 else: u_right = uh_right/(h_right + h0/h_right) uh_right = u_right*h_right #vh_left = q_left[2] #vh_right = q_right[2] #print h_right #print u_right #print h_left #print u_right soundspeed_left = sqrt(g*h_left) soundspeed_right = sqrt(g*h_right) #Maximal wave speed s_max = max(u_left+soundspeed_left, u_right+soundspeed_right, 0) #Minimal wave speed s_min = min(u_left-soundspeed_left, u_right-soundspeed_right, 0) #Flux computation #flux_left = array([u_left*h_left, # u_left*uh_left + 0.5*g*h_left**2]) #flux_right = array([u_right*h_right, # u_right*uh_right + 0.5*g*h_right**2]) flux_left = array([u_left*h_left, u_left*uh_left + 0.5*g*h_left*h_left]) flux_right = array([u_right*h_right, u_right*uh_right + 0.5*g*h_right*h_right]) denom = s_max-s_min if denom == 0.0: edgeflux = array([0.0, 0.0]) max_speed = 0.0 else: edgeflux = (s_max*flux_left - s_min*flux_right)/denom edgeflux += s_max*s_min*(q_right-q_left)/denom edgeflux[1] = edgeflux[1]*normal max_speed = max(abs(s_max), abs(s_min)) return edgeflux, max_speed #------------------------------------------------------------- if __name__ == "__main__": suite = unittest.makeSuite(Test_Shallow_Water, 'test') #suite = unittest.makeSuite(Test_Quantity, 'test_set_values_from_file_using_polygon') #suite = unittest.makeSuite(Test_Quantity, 'test_set_vertex_values_using_general_interface_with_subset') #print "restricted test" #suite = unittest.makeSuite(Test_Quantity,'verbose_test_set_values_from_UTM_pts') runner = unittest.TextTestRunner() runner.run(suite)