source: inundation/ga/storm_surge/pyvolution/test_interpolate_sww.py @ 994

Last change on this file since 994 was 907, checked in by ole, 20 years ago

Addressed problem with artificial momentum generated by discontinuous water depths in the presence of steep slopes.
Now very shallow water is limited with a separate h-limiter controlled by beta_h
(see config.py) for details.

File size: 6.9 KB
Line 
1#!/usr/bin/env python
2#
3"""
4Testing interpolate_sww, based on test_data_manageer, so there maybe code
5that isn't needed, eg in the setup file
6"""
7
8import unittest
9from Numeric import zeros, array, allclose, Float
10from util import mean
11
12from interpolate_sww import *
13from shallow_water import *
14from data_manager import *
15#from config import epsilon
16
17
18class dataTestCase(unittest.TestCase):
19    def setUp(self):
20        import time
21        from mesh_factory import rectangular
22
23
24        #Create basic mesh
25        points, vertices, boundary = rectangular(2, 2)
26
27        #Create shallow water domain
28        domain = Domain(points, vertices, boundary)
29        domain.default_order=2
30        domain.beta_h = 0
31
32       
33        #Set some field values
34        domain.set_quantity('elevation', lambda x,y: -x)       
35        domain.set_quantity('friction', 0.03)
36
37
38        ######################
39        # Boundary conditions
40        B = Transmissive_boundary(domain)
41        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
42       
43
44        ######################
45        #Initial condition - with jumps
46
47        bed = domain.quantities['elevation'].vertex_values
48        stage = zeros(bed.shape, Float)
49
50        h = 0.3
51        for i in range(stage.shape[0]):
52            if i % 2 == 0:           
53                stage[i,:] = bed[i,:] + h
54            else:
55                stage[i,:] = bed[i,:]
56               
57        domain.set_quantity('stage', stage)
58
59        domain.distribute_to_vertices_and_edges()   
60
61
62        self.domain = domain
63
64        C = domain.get_vertex_coordinates()
65        self.X = C[:,0:6:2].copy()
66        self.Y = C[:,1:6:2].copy()
67       
68        self.F = bed
69
70       
71    def tearDown(self):
72        pass
73
74    def test_sww_DSG(self):
75        """Not a test, rather a look at the sww format
76        """
77
78        import time, os
79        from Numeric import array, zeros, allclose, Float, concatenate
80        from Scientific.IO.NetCDF import NetCDFFile       
81
82        self.domain.filename = 'datatest' + str(time.time())
83        self.domain.format = 'sww'
84        self.domain.smooth = True
85        self.domain.reduction = mean             
86     
87        sww = get_dataobject(self.domain)
88        sww.store_connectivity()
89        sww.store_timestep('stage')
90        self.domain.time = 2. 
91        sww.store_timestep('stage')
92
93        #Check contents
94        #Get NetCDF
95        fid = NetCDFFile(sww.filename, 'r') 
96     
97        # Get the variables
98        x = fid.variables['x']
99        y = fid.variables['y']
100        z = fid.variables['elevation']
101
102        volumes = fid.variables['volumes']   
103        time = fid.variables['time']
104
105        # 2D
106        stage = fid.variables['stage']       
107
108        X = x[:]
109        Y = y[:]
110        Z = z[:]
111        V = volumes[:]
112        T = time[:]
113        S = stage[:,:]
114
115        if False:
116            print "****************************"
117            print "X ",X
118            print "****************************"
119            print "Y ",Y
120            print "****************************"
121            print "Z ",Z
122            print "****************************"
123            print "V ",V
124            print "****************************"
125            print "Time ",T
126            print "****************************"
127            print "Stage ",S
128            print "****************************"
129           
130
131       
132       
133        fid.close()
134     
135        #Cleanup
136        os.remove(sww.filename)
137 
138    def test_interpolate_sww(self):
139        """Not reaa unit test, rather a system test for
140        """
141
142        import time, os
143        from Numeric import array, zeros, allclose, Float, concatenate, \
144             transpose
145        from Scientific.IO.NetCDF import NetCDFFile   
146        import tempfile
147        from load_mesh.loadASCII import  load_points_file, \
148             concatinate_attributelist
149
150        self.domain.filename = 'datatest' + str(time.time())
151        self.domain.format = 'sww'
152        self.domain.smooth = True
153        self.domain.reduction = mean             
154     
155        sww = get_dataobject(self.domain)
156        sww.store_connectivity()
157        sww.store_timestep('stage')
158        self.domain.time = 2. 
159        sww.store_timestep('stage')
160
161        #print "self.domain.filename",self.domain.filename
162        interp = Interpolate_sww(sww.filename, 'depth')
163       
164        assert allclose(interp.time,[0.0,2.0])
165
166        #answer = [ 0.15, 0.1, 0., -0.3, -0.35, -0.4, -0.7, -0.8, -0.850]
167        #print "answer",answer
168        #print interp.stage[0]
169        #stage_t = transpose(interp.stage)
170        #assert allclose(stage_t[0], answer)
171        #assert allclose(stage_t[1],stage_t[0])
172
173        # create an .xya file
174        point_file = tempfile.mktemp(".xya")
175        fd = open(point_file,'w')
176        fd.write("colour, thickness \n 0.0, 0.6,2.,4 \n 0.0, 0.9,4,8 \n 0.0,0.1,4.,8 \n 0.4,1.0,4.,8 \n")
177        fd.close()
178
179        interp.interpolate_xya(point_file)
180
181        answer = [[0.08, 0.08], [0.02, 0.02], [0.14, 0.14], [.08,.08]]
182        #print "answer",answer
183        assert allclose(interp.interpolated_quantity,answer)
184
185        # create an output .xya file
186        point_file_out = tempfile.mktemp(".xya")
187        interp.write_depth_xya(point_file_out)
188       
189        #check the output file
190        xya_dict = load_points_file(point_file_out)
191        #print "xya_dict", xya_dict
192        title_list,point_attributes = concatinate_attributelist(xya_dict['attributelist']) 
193        assert allclose(interp.point_coordinates, xya_dict['pointlist'])
194        assert allclose(interp.interpolated_quantity,
195                        point_attributes )
196
197        # FIXME -remove
198        #title = xya_dict['title'].split('\n')
199        #print "title",title
200        #string_list = title[0].split(',') # assume a title has only one line
201        #time_list = []
202
203        # Try another quantity     
204        interp = Interpolate_sww(sww.filename, 'stage')
205        interp.interpolate_xya(point_file)
206
207        answer = [[0.08, 0.08], [0.02, 0.02], [0.14, 0.14], [-.32,-.32]]
208        #print "answer",answer
209        assert allclose(interp.interpolated_quantity,answer)
210
211       
212        # look at error catching
213        try:
214            interp = Interpolate_sww(sww.filename, 'funky!')
215        except KeyError:
216            pass
217        else:
218            self.failUnless(0==1,
219                        'bad key did not raise an error!')
220       
221        # look at error catching
222        try:
223            interp = Interpolate_sww(sww.filename, 'elevation')
224        except KeyError:
225            pass
226        else:
227            self.failUnless(0==1,
228                        'bad key did not raise an error!')
229               
230        #Cleanup
231        os.remove(sww.filename)
232        os.remove(point_file_out)
233        os.remove(point_file)
234 
235#-------------------------------------------------------------
236if __name__ == "__main__":
237    suite = unittest.makeSuite(dataTestCase,'test')
238    runner = unittest.TextTestRunner()
239    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.