source: inundation/ga/storm_surge/pyvolution/test_data_manager.py @ 393

Last change on this file since 393 was 353, checked in by duncan, 20 years ago

added more least squares tests, investigating sww files

File size: 12.5 KB
Line 
1#!/usr/bin/env python
2#
3
4import unittest
5from Numeric import zeros, array, allclose, Float
6from util import mean
7
8from data_manager import *
9from shallow_water import *
10from config import epsilon
11
12class dataTestCase(unittest.TestCase):
13    def setUp(self):
14        import time
15        from mesh_factory import rectangular
16
17
18        #Create basic mesh
19        points, vertices, boundary = rectangular(2, 2)
20
21        #Create shallow water domain
22        domain = Domain(points, vertices, boundary)
23        domain.default_order=2
24
25       
26        #Set some field values
27        domain.set_quantity('elevation', lambda x,y: -x)       
28        domain.set_quantity('friction', 0.03)
29
30
31        ######################
32        # Boundary conditions
33        B = Transmissive_boundary(domain)
34        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
35       
36
37        ######################
38        #Initial condition - with jumps
39
40
41        bed = domain.quantities['elevation'].vertex_values
42        level = zeros(bed.shape, Float)
43
44        h = 0.3
45        for i in range(level.shape[0]):
46            if i % 2 == 0:           
47                level[i,:] = bed[i,:] + h
48            else:
49                level[i,:] = bed[i,:]
50               
51        domain.set_quantity('level', level)
52
53        domain.distribute_to_vertices_and_edges()   
54
55
56        self.domain = domain
57
58        C = domain.get_vertex_coordinates()
59        self.X = C[:,0:6:2].copy()
60        self.Y = C[:,1:6:2].copy()
61       
62        self.F = bed
63
64       
65    def tearDown(self):
66        pass
67
68
69       
70
71#     def test_xya(self):
72#         import os
73#         from Numeric import concatenate
74
75#         import time, os
76#         from Numeric import array, zeros, allclose, Float, concatenate
77
78#         domain = self.domain
79       
80#         domain.filename = 'datatest' + str(time.time())
81#         domain.format = 'xya'
82#         domain.smooth = True
83       
84#         xya = get_dataobject(self.domain)
85#         xya.store_all()
86
87
88#         #Read back
89#         file = open(xya.filename)
90#         lFile = file.read().split('\n')
91#         lFile = lFile[:-1]
92
93#         file.close()
94#         os.remove(xya.filename)
95
96#         #Check contents
97#         if domain.smooth:
98#             self.failUnless(lFile[0] == '9 3 # <vertex #> <x> <y> [attributes]')
99#         else:   
100#             self.failUnless(lFile[0] == '24 3 # <vertex #> <x> <y> [attributes]')           
101
102#         #Get smoothed field values with X and Y
103#         X,Y,F,V = domain.get_vertex_values(xy=True, value_array='field_values',
104#                                            indices = (0,1), precision = Float)
105
106
107#         Q,V = domain.get_vertex_values(xy=False, value_array='conserved_quantities',
108#                                            indices = (0,), precision = Float)       
109
110
111       
112#         for i, line in enumerate(lFile[1:]):
113#             fields = line.split()
114
115#             assert len(fields) == 5
116
117#             assert allclose(float(fields[0]), X[i])
118#             assert allclose(float(fields[1]), Y[i])
119#             assert allclose(float(fields[2]), F[i,0])
120#             assert allclose(float(fields[3]), Q[i,0])
121#             assert allclose(float(fields[4]), F[i,1])           
122
123
124
125
126    def test_sww_constant(self):
127        """Test that constant sww information can be written correctly
128        (non smooth)
129        """
130
131        import time, os
132        from Numeric import array, zeros, allclose, Float, concatenate
133        from Scientific.IO.NetCDF import NetCDFFile       
134
135        self.domain.filename = 'datatest' + str(time.time())
136        self.domain.format = 'sww'
137        self.domain.smooth = False
138     
139        sww = get_dataobject(self.domain)
140        sww.store_connectivity()
141
142        #Check contents
143        #Get NetCDF
144        fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
145     
146        # Get the variables
147        x = fid.variables['x']
148        y = fid.variables['y']
149        z = fid.variables['z']
150
151        volumes = fid.variables['volumes']       
152
153
154        assert allclose (x[:], self.X.flat)
155        assert allclose (y[:], self.Y.flat)
156        assert allclose (z[:], self.F.flat)
157
158        V = volumes
159
160        P = len(self.domain)
161        for k in range(P):
162            assert V[k, 0] == 3*k
163            assert V[k, 1] == 3*k+1
164            assert V[k, 2] == 3*k+2           
165     
166     
167        fid.close()
168     
169        #Cleanup
170        os.remove(sww.filename)
171
172
173    def test_sww_constant_smooth(self):
174        """Test that constant sww information can be written correctly
175        (non smooth)
176        """
177
178        import time, os
179        from Numeric import array, zeros, allclose, Float, concatenate
180        from Scientific.IO.NetCDF import NetCDFFile       
181
182        self.domain.filename = 'datatest' + str(time.time())
183        self.domain.format = 'sww'
184        self.domain.smooth = True
185     
186        sww = get_dataobject(self.domain)
187        sww.store_connectivity()       
188
189        #Check contents
190        #Get NetCDF
191        fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
192     
193        # Get the variables
194        x = fid.variables['x']
195        y = fid.variables['y']
196        z = fid.variables['z']
197
198        volumes = fid.variables['volumes']       
199
200        X = x[:]
201        Y = y[:]
202     
203        assert allclose([X[0], Y[0]], array([0.0, 0.0]))
204        assert allclose([X[1], Y[1]], array([0.0, 0.5]))
205        assert allclose([X[2], Y[2]], array([0.0, 1.0]))
206
207        assert allclose([X[4], Y[4]], array([0.5, 0.5]))
208
209        assert allclose([X[7], Y[7]], array([1.0, 0.5]))       
210
211        Z = z[:]
212        assert Z[4] == -0.5
213
214        V = volumes
215        assert V[2,0] == 4
216        assert V[2,1] == 5
217        assert V[2,2] == 1
218
219        assert V[4,0] == 6
220        assert V[4,1] == 7
221        assert V[4,2] == 3                     
222     
223
224        fid.close()
225     
226        #Cleanup
227        os.remove(sww.filename)
228
229
230
231    def test_sww_variable(self):
232        """Test that sww information can be written correctly
233        """
234
235        import time, os
236        from Numeric import array, zeros, allclose, Float, concatenate
237        from Scientific.IO.NetCDF import NetCDFFile       
238
239        self.domain.filename = 'datatest' + str(time.time())
240        self.domain.format = 'sww'
241        self.domain.smooth = True
242        self.domain.reduction = mean             
243     
244        sww = get_dataobject(self.domain)
245        sww.store_connectivity()
246        sww.store_timestep('level')
247
248        #Check contents
249        #Get NetCDF
250        fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
251     
252
253        # Get the variables
254        x = fid.variables['x']
255        y = fid.variables['y']
256        z = fid.variables['z']       
257        time = fid.variables['time']
258        stage = fid.variables['stage']
259     
260
261        Q = self.domain.quantities['level']     
262        Q0 = Q.vertex_values[:,0]
263        Q1 = Q.vertex_values[:,1]
264        Q2 = Q.vertex_values[:,2]
265
266        A = stage[0,:]
267        #print A[0], (Q2[0,0] + Q1[1,0])/2
268        assert allclose(A[0], (Q2[0] + Q1[1])/2) 
269        assert allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3)
270        assert allclose(A[2], Q0[3])
271        assert allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3)
272
273        #Center point
274        assert allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\
275                                 Q0[5] + Q2[6] + Q1[7])/6)
276                               
277     
278     
279        fid.close()
280     
281        #Cleanup
282        os.remove(sww.filename)
283       
284       
285    def test_sww_variable2(self):
286        """Test that sww information can be written correctly
287        multiple timesteps. Use average as reduction operator
288        """
289
290        import time, os
291        from Numeric import array, zeros, allclose, Float, concatenate
292        from Scientific.IO.NetCDF import NetCDFFile
293
294        self.domain.filename = 'datatest' + str(time.time())
295        self.domain.format = 'sww'
296        self.domain.smooth = True
297
298        self.domain.reduction = mean     
299     
300        sww = get_dataobject(self.domain)
301        sww.store_connectivity()
302        sww.store_timestep('level')     
303        self.domain.evolve_to_end(finaltime = 0.01)
304        sww.store_timestep('level')
305
306
307        #Check contents
308        #Get NetCDF
309        fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
310
311        # Get the variables
312        x = fid.variables['x']
313        y = fid.variables['y']
314        z = fid.variables['z']
315        time = fid.variables['time']
316        stage = fid.variables['stage']       
317
318        #Check values
319        Q = self.domain.quantities['level']     
320        Q0 = Q.vertex_values[:,0]
321        Q1 = Q.vertex_values[:,1]
322        Q2 = Q.vertex_values[:,2]
323
324        A = stage[1,:]
325        assert allclose(A[0], (Q2[0] + Q1[1])/2) 
326        assert allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3)
327        assert allclose(A[2], Q0[3])
328        assert allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3)
329
330        #Center point
331        assert allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\
332                                 Q0[5] + Q2[6] + Q1[7])/6)
333                               
334     
335     
336         
337     
338        fid.close()
339     
340        #Cleanup
341        os.remove(sww.filename)
342       
343    def test_sww_variable3(self):
344        """Test that sww information can be written correctly
345        multiple timesteps using a different reduction operator (min)
346        """
347
348        import time, os
349        from Numeric import array, zeros, allclose, Float, concatenate
350        from Scientific.IO.NetCDF import NetCDFFile
351
352        self.domain.filename = 'datatest' + str(time.time())
353        self.domain.format = 'sww'
354        self.domain.smooth = True
355        self.domain.reduction = min
356     
357        sww = get_dataobject(self.domain)
358        sww.store_connectivity()
359        sww.store_timestep('level')
360     
361        self.domain.evolve_to_end(finaltime = 0.01)
362        sww.store_timestep('level')
363
364
365
366        #Check contents
367        #Get NetCDF
368        fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
369     
370
371        # Get the variables
372        x = fid.variables['x']
373        y = fid.variables['y']
374        z = fid.variables['z']
375        time = fid.variables['time']
376        stage = fid.variables['stage']       
377
378        #Check values
379        #Check values
380        Q = self.domain.quantities['level']     
381        Q0 = Q.vertex_values[:,0]
382        Q1 = Q.vertex_values[:,1]
383        Q2 = Q.vertex_values[:,2]
384
385        A = stage[1,:]
386        assert allclose(A[0], min(Q2[0], Q1[1])) 
387        assert allclose(A[1], min(Q0[1], Q1[3], Q2[2]))
388        assert allclose(A[2], Q0[3])
389        assert allclose(A[3], min(Q0[0], Q1[5], Q2[4]))
390
391        #Center point
392        assert allclose(A[4], min(Q1[0], Q2[1], Q0[2],\
393                                  Q0[5], Q2[6], Q1[7]))
394                               
395     
396        fid.close()
397     
398        #Cleanup
399        os.remove(sww.filename)
400
401
402    def test_sww_DSG(self):
403        """Not a test, rather a look at the sww format
404        """
405
406        import time, os
407        from Numeric import array, zeros, allclose, Float, concatenate
408        from Scientific.IO.NetCDF import NetCDFFile       
409
410        self.domain.filename = 'datatest' + str(time.time())
411        self.domain.format = 'sww'
412        self.domain.smooth = True
413        self.domain.reduction = mean             
414     
415        sww = get_dataobject(self.domain)
416        sww.store_connectivity()
417        sww.store_timestep('level')
418
419        #Check contents
420        #Get NetCDF
421        fid = NetCDFFile(sww.filename, 'r') 
422     
423        # Get the variables
424        x = fid.variables['x']
425        y = fid.variables['y']
426        z = fid.variables['z']
427
428        volumes = fid.variables['volumes']   
429        time = fid.variables['time']
430
431        # 2D
432        stage = fid.variables['stage']       
433
434        X = x[:]
435        Y = y[:]
436        Z = z[:]
437        V = volumes[:]
438        T = time[:]
439        S = stage[:,:]
440     
441#         print "****************************"
442#         print "X ",X
443#         print "****************************"
444#         print "Y ",Y
445#         print "****************************"
446#         print "Z ",Z
447#         print "****************************"
448#         print "V ",V
449#         print "****************************"
450#         print "Time ",T
451#         print "****************************"
452#         print "Stage ",S
453#         print "****************************"
454
455
456       
457       
458        fid.close()
459     
460        #Cleanup
461        os.remove(sww.filename)
462       
463#-------------------------------------------------------------
464if __name__ == "__main__":
465    suite = unittest.makeSuite(dataTestCase,'test')
466    runner = unittest.TextTestRunner()
467    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.