source: anuga_work/development/anuga_1d/test_shallow_water.py @ 5536

Last change on this file since 5536 was 5536, checked in by steve, 16 years ago

Added unit test files for quantity and shallow water

File size: 5.4 KB
Line 
1#!/usr/bin/env python
2
3import unittest
4from math import sqrt, pi
5
6
7from shallow_water_domain import *
8from Numeric import allclose, array, ones, Float
9
10
11class Test_Shallow_Water(unittest.TestCase):
12    def setUp(self):
13        self.points1 = [0.0, 1.0, 2.0, 3.0]
14        self.vertex_values = [[1.0,2.0],[4.0,5.0],[-1.0,2.0]]
15
16
17        pass
18       
19    def tearDown(self):
20        pass
21        #print "  Tearing down"
22
23
24    def test_creation(self):
25
26
27        D1 = Domain(self.points1)
28
29        assert allclose(D1.centroids, [0.5, 1.5, 2.5])
30
31
32    def test_compute_fluxes(self):
33       
34
35        D1 = Domain(self.points1)
36        D1.set_quantity('stage',2.0)
37        D1.set_boundary({'exterior' : Reflective_boundary(D1)})
38
39        stage_ud, xmom_ud = compute_fluxes_python(D1)
40       
41        D1.compute_fluxes()
42
43        #print D1.quantities['xmomentum'].explicit_update
44        #print compute_fluxes_python(D1)
45
46        assert allclose( D1.quantities['stage'].explicit_update, stage_ud )
47        assert allclose( D1.quantities['xmomentum'].explicit_update, xmom_ud )
48
49
50#==============================================================================
51
52def compute_fluxes_python(domain):
53    """Compute all fluxes and the timestep suitable for all volumes
54    in domain.
55
56    Compute total flux for each conserved quantity using "flux_function"
57
58    Fluxes across each edge are scaled by edgelengths and summed up
59    Resulting flux is then scaled by area and stored in
60    explicit_update for each of the three conserved quantities
61    stage, xmomentum and ymomentum
62
63    The maximal allowable speed computed by the flux_function for each volume
64    is converted to a timestep that must not be exceeded. The minimum of
65    those is computed as the next overall timestep.
66
67    Post conditions:
68      domain.explicit_update is reset to computed flux values
69      domain.timestep is set to the largest step satisfying all volumes.
70    """
71
72    import sys
73    from Numeric import zeros, Float
74
75    N = domain.number_of_elements
76
77    tmp0 = zeros((N,),Float)
78    tmp1 = zeros((N,),Float)
79
80    #Shortcuts
81    Stage = domain.quantities['stage']
82    Xmom = domain.quantities['xmomentum']
83#    Ymom = domain.quantities['ymomentum']
84    Bed = domain.quantities['elevation']
85
86    #Arrays
87    #stage = Stage.edge_values
88    #xmom =  Xmom.edge_values
89 #   ymom =  Ymom.edge_values
90    #bed =   Bed.edge_values
91   
92    stage = Stage.vertex_values
93    xmom =  Xmom.vertex_values
94    bed =   Bed.vertex_values
95   
96    #print 'stage edge values', stage
97    #print 'xmom edge values', xmom
98    #print 'bed values', bed
99
100    stage_bdry = Stage.boundary_values
101    xmom_bdry =  Xmom.boundary_values
102    #print 'stage_bdry',stage_bdry
103    #print 'xmom_bdry', xmom_bdry
104#    ymom_bdry =  Ymom.boundary_values
105
106#    flux = zeros(3, Float) #Work array for summing up fluxes
107    flux = zeros(2, Float) #Work array for summing up fluxes
108    ql = zeros(2, Float)
109    qr = zeros(2, Float)
110
111    #Loop
112    timestep = float(sys.maxint)
113    enter = True
114    for k in range(N):
115
116        flux[:] = 0.  #Reset work array
117        #for i in range(3):
118        for i in range(2):
119            #Quantities inside volume facing neighbour i
120            #ql[0] = stage[k, i]
121            #ql[1] = xmom[k, i]
122            ql = [stage[k, i], xmom[k, i]]
123            zl = bed[k, i]
124
125            #Quantities at neighbour on nearest face
126            n = domain.neighbours[k,i]
127            if n < 0:
128                m = -n-1 #Convert negative flag to index
129                qr[0] = stage_bdry[m]
130                qr[1] = xmom_bdry[m]
131                zr = zl #Extend bed elevation to boundary
132            else:
133                #m = domain.neighbour_edges[k,i]
134                m = domain.neighbour_vertices[k,i]
135                #qr = [stage[n, m], xmom[n, m], ymom[n, m]]
136                qr[0] = stage[n, m]
137                qr[1] = xmom[n, m]
138                zr = bed[n, m]
139
140
141            #Outward pointing normal vector
142            normal = domain.normals[k, i]
143       
144            #Flux computation using provided function
145            #edgeflux, max_speed = flux_function(normal, ql, qr, zl, zr)
146            #print 'ql',ql
147            #print 'qr',qr
148           
149
150            edgeflux, max_speed = flux_function(normal, ql, qr, zl, zr)
151
152            #print 'edgeflux', edgeflux
153
154            # THIS IS THE LINE TO DEAL WITH LEFT AND RIGHT FLUXES
155            # flux = edgefluxleft - edgefluxright
156            flux -= edgeflux #* domain.edgelengths[k,i]
157            #Update optimal_timestep
158            try:
159                #timestep = min(timestep, 0.5*domain.radii[k]/max_speed)
160                timestep = min(timestep, domain.cfl*0.5*domain.areas[k]/max_speed)
161            except ZeroDivisionError:
162                pass
163
164        #Normalise by area and store for when all conserved
165        #quantities get updated
166        flux /= domain.areas[k]
167
168        #Stage.explicit_update[k] = flux[0]
169        tmp0[k] = flux[0]
170        tmp1[k] = flux[1]
171
172
173    return tmp0, tmp1
174
175
176
177#-------------------------------------------------------------
178if __name__ == "__main__":
179    suite = unittest.makeSuite(Test_Shallow_Water, 'test')   
180    #suite = unittest.makeSuite(Test_Quantity, 'test_set_values_from_file_using_polygon')
181
182    #suite = unittest.makeSuite(Test_Quantity, 'test_set_vertex_values_using_general_interface_with_subset')
183    #print "restricted test"
184    #suite = unittest.makeSuite(Test_Quantity,'verbose_test_set_values_from_UTM_pts')
185    runner = unittest.TextTestRunner()
186    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.