source: inundation/ga/storm_surge/pyvolution/test_data_manager.py @ 303

Last change on this file since 303 was 282, checked in by ole, 21 years ago

Storing of sww format OK and tested

File size: 10.9 KB
Line 
1#!/usr/bin/env python
2#
3
4import unittest
5from Numeric import zeros, array, allclose, Float
6from util import mean
7
8from data_manager import *
9from shallow_water import *
10from config import epsilon
11
12class dataTestCase(unittest.TestCase):
13    def setUp(self):
14        import time
15        from mesh_factory import rectangular
16
17
18        #Create basic mesh
19        points, vertices, boundary = rectangular(2, 2)
20
21        #Create shallow water domain
22        domain = Domain(points, vertices, boundary)
23        domain.default_order=2
24
25       
26        #Set some field values
27        domain.set_quantity('elevation', lambda x,y: -x)       
28        domain.set_quantity('friction', 0.03)
29
30
31        ######################
32        # Boundary conditions
33        B = Transmissive_boundary(domain)
34        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
35       
36
37        ######################
38        #Initial condition - with jumps
39
40
41        bed = domain.quantities['elevation'].vertex_values
42        level = zeros(bed.shape, Float)
43
44        h = 0.3
45        for i in range(level.shape[0]):
46            if i % 2 == 0:           
47                level[i,:] = bed[i,:] + h
48            else:
49                level[i,:] = bed[i,:]
50               
51        domain.set_quantity('level', level)
52
53        domain.distribute_to_vertices_and_edges()   
54
55
56        self.domain = domain
57
58        C = domain.get_vertex_coordinates()
59        self.X = C[:,0:6:2].copy()
60        self.Y = C[:,1:6:2].copy()
61       
62        self.F = bed
63
64       
65    def tearDown(self):
66        pass
67
68
69       
70
71#     def test_xya(self):
72#         import os
73#         from Numeric import concatenate
74
75#         import time, os
76#         from Numeric import array, zeros, allclose, Float, concatenate
77
78#         domain = self.domain
79       
80#         domain.filename = 'datatest' + str(time.time())
81#         domain.format = 'xya'
82#         domain.smooth = True
83       
84#         xya = get_dataobject(self.domain)
85#         xya.store_all()
86
87
88#         #Read back
89#         file = open(xya.filename)
90#         lFile = file.read().split('\n')
91#         lFile = lFile[:-1]
92
93#         file.close()
94#         os.remove(xya.filename)
95
96#         #Check contents
97#         if domain.smooth:
98#             self.failUnless(lFile[0] == '9 3 # <vertex #> <x> <y> [attributes]')
99#         else:   
100#             self.failUnless(lFile[0] == '24 3 # <vertex #> <x> <y> [attributes]')           
101
102#         #Get smoothed field values with X and Y
103#         X,Y,F,V = domain.get_vertex_values(xy=True, value_array='field_values',
104#                                            indices = (0,1), precision = Float)
105
106
107#         Q,V = domain.get_vertex_values(xy=False, value_array='conserved_quantities',
108#                                            indices = (0,), precision = Float)       
109
110
111       
112#         for i, line in enumerate(lFile[1:]):
113#             fields = line.split()
114
115#             assert len(fields) == 5
116
117#             assert allclose(float(fields[0]), X[i])
118#             assert allclose(float(fields[1]), Y[i])
119#             assert allclose(float(fields[2]), F[i,0])
120#             assert allclose(float(fields[3]), Q[i,0])
121#             assert allclose(float(fields[4]), F[i,1])           
122
123
124
125
126    def test_sww_constant(self):
127        """Test that constant sww information can be written correctly
128        (non smooth)
129        """
130
131        import time, os
132        from Numeric import array, zeros, allclose, Float, concatenate
133        from Scientific.IO.NetCDF import NetCDFFile       
134
135        self.domain.filename = 'datatest' + str(time.time())
136        self.domain.format = 'sww'
137        self.domain.smooth = False
138     
139        sww = get_dataobject(self.domain)
140        sww.store_connectivity()
141
142        #Check contents
143        #Get NetCDF
144        fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
145     
146        # Get the variables
147        x = fid.variables['x']
148        y = fid.variables['y']
149        z = fid.variables['z']
150
151        volumes = fid.variables['volumes']       
152
153
154        assert allclose (x[:], self.X.flat)
155        assert allclose (y[:], self.Y.flat)
156        assert allclose (z[:], self.F.flat)
157
158        V = volumes
159
160        P = len(self.domain)
161        for k in range(P):
162            assert V[k, 0] == 3*k
163            assert V[k, 1] == 3*k+1
164            assert V[k, 2] == 3*k+2           
165     
166     
167        fid.close()
168     
169        #Cleanup
170        os.remove(sww.filename)
171
172
173    def test_sww_constant_smooth(self):
174        """Test that constant sww information can be written correctly
175        (non smooth)
176        """
177
178        import time, os
179        from Numeric import array, zeros, allclose, Float, concatenate
180        from Scientific.IO.NetCDF import NetCDFFile       
181
182        self.domain.filename = 'datatest' + str(time.time())
183        self.domain.format = 'sww'
184        self.domain.smooth = True
185     
186        sww = get_dataobject(self.domain)
187        sww.store_connectivity()       
188
189        #Check contents
190        #Get NetCDF
191        fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
192     
193        # Get the variables
194        x = fid.variables['x']
195        y = fid.variables['y']
196        z = fid.variables['z']
197
198        volumes = fid.variables['volumes']       
199
200        X = x[:]
201        Y = y[:]
202     
203        assert allclose([X[0], Y[0]], array([0.0, 0.0]))
204        assert allclose([X[1], Y[1]], array([0.0, 0.5]))
205        assert allclose([X[2], Y[2]], array([0.0, 1.0]))
206
207        assert allclose([X[4], Y[4]], array([0.5, 0.5]))
208
209        assert allclose([X[7], Y[7]], array([1.0, 0.5]))       
210
211        Z = z[:]
212        assert Z[4] == -0.5
213
214        V = volumes
215        assert V[2,0] == 4
216        assert V[2,1] == 5
217        assert V[2,2] == 1
218
219        assert V[4,0] == 6
220        assert V[4,1] == 7
221        assert V[4,2] == 3                     
222     
223     
224     
225        fid.close()
226     
227        #Cleanup
228        os.remove(sww.filename)
229
230
231
232    def test_sww_variable(self):
233        """Test that sww information can be written correctly
234        """
235
236        import time, os
237        from Numeric import array, zeros, allclose, Float, concatenate
238        from Scientific.IO.NetCDF import NetCDFFile       
239
240        self.domain.filename = 'datatest' + str(time.time())
241        self.domain.format = 'sww'
242        self.domain.smooth = True
243        self.domain.reduction = mean             
244     
245        sww = get_dataobject(self.domain)
246        sww.store_connectivity()
247        sww.store_timestep('level')
248
249        #Check contents
250        #Get NetCDF
251        fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
252     
253
254        # Get the variables
255        x = fid.variables['x']
256        y = fid.variables['y']
257        z = fid.variables['z']       
258        time = fid.variables['time']
259        stage = fid.variables['stage']
260
261
262        Q = self.domain.quantities['level']     
263        Q0 = Q.vertex_values[:,0]
264        Q1 = Q.vertex_values[:,1]
265        Q2 = Q.vertex_values[:,2]
266
267        A = stage[0,:]
268        #print A[0], (Q2[0,0] + Q1[1,0])/2
269        assert allclose(A[0], (Q2[0] + Q1[1])/2) 
270        assert allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3)
271        assert allclose(A[2], Q0[3])
272        assert allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3)
273
274        #Center point
275        assert allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\
276                                 Q0[5] + Q2[6] + Q1[7])/6)
277                               
278     
279     
280        fid.close()
281     
282        #Cleanup
283        os.remove(sww.filename)
284       
285       
286    def test_sww_variable2(self):
287        """Test that sww information can be written correctly
288        multiple timesteps. Use average as reduction operator
289        """
290
291        import time, os
292        from Numeric import array, zeros, allclose, Float, concatenate
293        from Scientific.IO.NetCDF import NetCDFFile
294
295        self.domain.filename = 'datatest' + str(time.time())
296        self.domain.format = 'sww'
297        self.domain.smooth = True
298
299        self.domain.reduction = mean     
300     
301        sww = get_dataobject(self.domain)
302        sww.store_connectivity()
303        sww.store_timestep('level')     
304        self.domain.evolve_to_end(finaltime = 0.01)
305        sww.store_timestep('level')
306
307
308        #Check contents
309        #Get NetCDF
310        fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
311
312        # Get the variables
313        x = fid.variables['x']
314        y = fid.variables['y']
315        z = fid.variables['z']
316        time = fid.variables['time']
317        stage = fid.variables['stage']       
318
319        #Check values
320        Q = self.domain.quantities['level']     
321        Q0 = Q.vertex_values[:,0]
322        Q1 = Q.vertex_values[:,1]
323        Q2 = Q.vertex_values[:,2]
324
325        A = stage[1,:]
326        assert allclose(A[0], (Q2[0] + Q1[1])/2) 
327        assert allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3)
328        assert allclose(A[2], Q0[3])
329        assert allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3)
330
331        #Center point
332        assert allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\
333                                 Q0[5] + Q2[6] + Q1[7])/6)
334                               
335     
336     
337         
338     
339        fid.close()
340     
341        #Cleanup
342        os.remove(sww.filename)
343       
344    def test_sww_variable3(self):
345        """Test that sww information can be written correctly
346        multiple timesteps using a different reduction operator (min)
347        """
348
349        import time, os
350        from Numeric import array, zeros, allclose, Float, concatenate
351        from Scientific.IO.NetCDF import NetCDFFile
352
353        self.domain.filename = 'datatest' + str(time.time())
354        self.domain.format = 'sww'
355        self.domain.smooth = True
356        self.domain.reduction = min
357     
358        sww = get_dataobject(self.domain)
359        sww.store_connectivity()
360        sww.store_timestep('level')
361     
362        self.domain.evolve_to_end(finaltime = 0.01)
363        sww.store_timestep('level')
364
365
366
367        #Check contents
368        #Get NetCDF
369        fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
370     
371
372        # Get the variables
373        x = fid.variables['x']
374        y = fid.variables['y']
375        z = fid.variables['z']
376        time = fid.variables['time']
377        stage = fid.variables['stage']       
378
379        #Check values
380        #Check values
381        Q = self.domain.quantities['level']     
382        Q0 = Q.vertex_values[:,0]
383        Q1 = Q.vertex_values[:,1]
384        Q2 = Q.vertex_values[:,2]
385
386        A = stage[1,:]
387        assert allclose(A[0], min(Q2[0], Q1[1])) 
388        assert allclose(A[1], min(Q0[1], Q1[3], Q2[2]))
389        assert allclose(A[2], Q0[3])
390        assert allclose(A[3], min(Q0[0], Q1[5], Q2[4]))
391
392        #Center point
393        assert allclose(A[4], min(Q1[0], Q2[1], Q0[2],\
394                                  Q0[5], Q2[6], Q1[7]))
395                               
396     
397        fid.close()
398     
399        #Cleanup
400        os.remove(sww.filename)
401
402
403#-------------------------------------------------------------
404if __name__ == "__main__":
405    suite = unittest.makeSuite(dataTestCase,'test')
406    runner = unittest.TextTestRunner()
407    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.