source: anuga_work/development/anuga_1d/test_shallow_water.py @ 5562

Last change on this file since 5562 was 5538, checked in by steve, 16 years ago
File size: 5.6 KB
Line 
1#!/usr/bin/env python
2
3import unittest
4from math import sqrt, pi
5
6
7from shallow_water_domain import *
8from Numeric import allclose, array, ones, Float
9
10
11class Test_Shallow_Water(unittest.TestCase):
12    def setUp(self):
13        self.points = [0.0, 1.0, 2.0, 3.0]
14        self.vertex_values = [[1.0,2.0],[4.0,5.0],[-1.0,2.0]]
15       
16    def tearDown(self):
17        pass
18        #print "  Tearing down"
19
20
21    def test_creation(self):
22        domain = Domain(self.points)
23        assert allclose(domain.centroids, [0.5, 1.5, 2.5])
24
25    def test_compute_fluxes(self):
26        """
27        Compare shallow_water_domain flux calculation against a previous
28        Python implementation (defined in this file)
29        """
30        domain = Domain(self.points)
31        domain.set_quantity('stage',2.0)
32        domain.set_boundary({'exterior' : Reflective_boundary(domain)})
33
34        stage_ud, xmom_ud = compute_fluxes_python(domain)
35       
36        domain.compute_fluxes()
37
38        #print doamin.quantities['xmomentum'].explicit_update
39        #print compute_fluxes_python(domain)
40
41        assert allclose( domain.quantities['stage'].explicit_update, stage_ud )
42        assert allclose( domain.quantities['xmomentum'].explicit_update, xmom_ud )
43
44
45#==============================================================================
46
47def compute_fluxes_python(domain):
48    """Compute all fluxes and the timestep suitable for all volumes
49    in domain.
50
51    Compute total flux for each conserved quantity using "flux_function"
52
53    Fluxes across each edge are scaled by edgelengths and summed up
54    Resulting flux is then scaled by area and stored in
55    explicit_update for each of the three conserved quantities
56    stage, xmomentum and ymomentum
57
58    The maximal allowable speed computed by the flux_function for each volume
59    is converted to a timestep that must not be exceeded. The minimum of
60    those is computed as the next overall timestep.
61
62    Post conditions:
63      domain.explicit_update is reset to computed flux values
64      domain.timestep is set to the largest step satisfying all volumes.
65    """
66
67    import sys
68    from Numeric import zeros, Float
69
70    N = domain.number_of_elements
71
72    tmp0 = zeros((N,),Float)
73    tmp1 = zeros((N,),Float)
74
75    #Shortcuts
76    Stage = domain.quantities['stage']
77    Xmom = domain.quantities['xmomentum']
78#    Ymom = domain.quantities['ymomentum']
79    Bed = domain.quantities['elevation']
80
81    #Arrays
82    #stage = Stage.edge_values
83    #xmom =  Xmom.edge_values
84 #   ymom =  Ymom.edge_values
85    #bed =   Bed.edge_values
86   
87    stage = Stage.vertex_values
88    xmom =  Xmom.vertex_values
89    bed =   Bed.vertex_values
90   
91    #print 'stage edge values', stage
92    #print 'xmom edge values', xmom
93    #print 'bed values', bed
94
95    stage_bdry = Stage.boundary_values
96    xmom_bdry =  Xmom.boundary_values
97    #print 'stage_bdry',stage_bdry
98    #print 'xmom_bdry', xmom_bdry
99#    ymom_bdry =  Ymom.boundary_values
100
101#    flux = zeros(3, Float) #Work array for summing up fluxes
102    flux = zeros(2, Float) #Work array for summing up fluxes
103    ql = zeros(2, Float)
104    qr = zeros(2, Float)
105
106    #Loop
107    timestep = float(sys.maxint)
108    enter = True
109    for k in range(N):
110
111        flux[:] = 0.  #Reset work array
112        #for i in range(3):
113        for i in range(2):
114            #Quantities inside volume facing neighbour i
115            #ql[0] = stage[k, i]
116            #ql[1] = xmom[k, i]
117            ql = [stage[k, i], xmom[k, i]]
118            zl = bed[k, i]
119
120            #Quantities at neighbour on nearest face
121            n = domain.neighbours[k,i]
122            if n < 0:
123                m = -n-1 #Convert negative flag to index
124                qr[0] = stage_bdry[m]
125                qr[1] = xmom_bdry[m]
126                zr = zl #Extend bed elevation to boundary
127            else:
128                #m = domain.neighbour_edges[k,i]
129                m = domain.neighbour_vertices[k,i]
130                #qr = [stage[n, m], xmom[n, m], ymom[n, m]]
131                qr[0] = stage[n, m]
132                qr[1] = xmom[n, m]
133                zr = bed[n, m]
134
135
136            #Outward pointing normal vector
137            normal = domain.normals[k, i]
138       
139            #Flux computation using provided function
140            #edgeflux, max_speed = flux_function(normal, ql, qr, zl, zr)
141            #print 'ql',ql
142            #print 'qr',qr
143           
144
145            edgeflux, max_speed = flux_function(normal, ql, qr, zl, zr)
146
147            #print 'edgeflux', edgeflux
148
149            # THIS IS THE LINE TO DEAL WITH LEFT AND RIGHT FLUXES
150            # flux = edgefluxleft - edgefluxright
151            flux -= edgeflux #* domain.edgelengths[k,i]
152            #Update optimal_timestep
153            try:
154                #timestep = min(timestep, 0.5*domain.radii[k]/max_speed)
155                timestep = min(timestep, domain.cfl*0.5*domain.areas[k]/max_speed)
156            except ZeroDivisionError:
157                pass
158
159        #Normalise by area and store for when all conserved
160        #quantities get updated
161        flux /= domain.areas[k]
162
163        #Stage.explicit_update[k] = flux[0]
164        tmp0[k] = flux[0]
165        tmp1[k] = flux[1]
166
167
168    return tmp0, tmp1
169
170
171
172#-------------------------------------------------------------
173if __name__ == "__main__":
174    suite = unittest.makeSuite(Test_Shallow_Water, 'test')   
175    #suite = unittest.makeSuite(Test_Quantity, 'test_set_values_from_file_using_polygon')
176
177    #suite = unittest.makeSuite(Test_Quantity, 'test_set_vertex_values_using_general_interface_with_subset')
178    #print "restricted test"
179    #suite = unittest.makeSuite(Test_Quantity,'verbose_test_set_values_from_UTM_pts')
180    runner = unittest.TextTestRunner()
181    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.