source: inundation/ga/storm_surge/pyvolution/test_data_manager.py @ 839

Last change on this file since 839 was 839, checked in by ole, 20 years ago

Slight change to pts format having attribute names as variables

File size: 15.4 KB
Line 
1#!/usr/bin/env python
2#
3
4import unittest
5import copy
6from Numeric import zeros, array, allclose, Float
7from util import mean
8
9from data_manager import *
10from shallow_water import *
11from config import epsilon
12
13class dataTestCase(unittest.TestCase):
14    def setUp(self):
15        import time
16        from mesh_factory import rectangular
17
18
19        #Create basic mesh
20        points, vertices, boundary = rectangular(2, 2)
21
22        #Create shallow water domain
23        domain = Domain(points, vertices, boundary)
24        domain.default_order=2
25
26       
27        #Set some field values
28        domain.set_quantity('elevation', lambda x,y: -x)       
29        domain.set_quantity('friction', 0.03)
30
31
32        ######################
33        # Boundary conditions
34        B = Transmissive_boundary(domain)
35        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
36       
37
38        ######################
39        #Initial condition - with jumps
40
41
42        bed = domain.quantities['elevation'].vertex_values
43        stage = zeros(bed.shape, Float)
44
45        h = 0.3
46        for i in range(stage.shape[0]):
47            if i % 2 == 0:           
48                stage[i,:] = bed[i,:] + h
49            else:
50                stage[i,:] = bed[i,:]
51               
52        domain.set_quantity('stage', stage)
53        self.initial_stage = copy.copy(domain.quantities['stage'].vertex_values)
54
55        domain.distribute_to_vertices_and_edges()   
56
57
58        self.domain = domain
59
60        C = domain.get_vertex_coordinates()
61        self.X = C[:,0:6:2].copy()
62        self.Y = C[:,1:6:2].copy()
63       
64        self.F = bed
65
66       
67    def tearDown(self):
68        pass
69
70
71       
72
73#     def test_xya(self):
74#         import os
75#         from Numeric import concatenate
76
77#         import time, os
78#         from Numeric import array, zeros, allclose, Float, concatenate
79
80#         domain = self.domain
81       
82#         domain.filename = 'datatest' + str(time.time())
83#         domain.format = 'xya'
84#         domain.smooth = True
85       
86#         xya = get_dataobject(self.domain)
87#         xya.store_all()
88
89
90#         #Read back
91#         file = open(xya.filename)
92#         lFile = file.read().split('\n')
93#         lFile = lFile[:-1]
94
95#         file.close()
96#         os.remove(xya.filename)
97
98#         #Check contents
99#         if domain.smooth:
100#             self.failUnless(lFile[0] == '9 3 # <vertex #> <x> <y> [attributes]')
101#         else:   
102#             self.failUnless(lFile[0] == '24 3 # <vertex #> <x> <y> [attributes]')           
103
104#         #Get smoothed field values with X and Y
105#         X,Y,F,V = domain.get_vertex_values(xy=True, value_array='field_values',
106#                                            indices = (0,1), precision = Float)
107
108
109#         Q,V = domain.get_vertex_values(xy=False, value_array='conserved_quantities',
110#                                            indices = (0,), precision = Float)       
111
112
113       
114#         for i, line in enumerate(lFile[1:]):
115#             fields = line.split()
116
117#             assert len(fields) == 5
118
119#             assert allclose(float(fields[0]), X[i])
120#             assert allclose(float(fields[1]), Y[i])
121#             assert allclose(float(fields[2]), F[i,0])
122#             assert allclose(float(fields[3]), Q[i,0])
123#             assert allclose(float(fields[4]), F[i,1])           
124
125
126
127
128    def test_sww_constant(self):
129        """Test that constant sww information can be written correctly
130        (non smooth)
131        """
132
133        import time, os
134        from Numeric import array, zeros, allclose, Float, concatenate
135        from Scientific.IO.NetCDF import NetCDFFile       
136
137        self.domain.filename = 'datatest' + str(time.time())
138        self.domain.format = 'sww'
139        self.domain.smooth = False
140     
141        sww = get_dataobject(self.domain)
142        sww.store_connectivity()
143
144        #Check contents
145        #Get NetCDF
146        fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
147     
148        # Get the variables
149        x = fid.variables['x']
150        y = fid.variables['y']
151        z = fid.variables['z']
152
153        volumes = fid.variables['volumes']       
154
155
156        assert allclose (x[:], self.X.flat)
157        assert allclose (y[:], self.Y.flat)
158        assert allclose (z[:], self.F.flat)
159
160        V = volumes
161
162        P = len(self.domain)
163        for k in range(P):
164            assert V[k, 0] == 3*k
165            assert V[k, 1] == 3*k+1
166            assert V[k, 2] == 3*k+2           
167     
168     
169        fid.close()
170     
171        #Cleanup
172        os.remove(sww.filename)
173
174
175    def test_sww_constant_smooth(self):
176        """Test that constant sww information can be written correctly
177        (non smooth)
178        """
179
180        import time, os
181        from Numeric import array, zeros, allclose, Float, concatenate
182        from Scientific.IO.NetCDF import NetCDFFile       
183
184        self.domain.filename = 'datatest' + str(time.time())
185        self.domain.format = 'sww'
186        self.domain.smooth = True
187     
188        sww = get_dataobject(self.domain)
189        sww.store_connectivity()       
190
191        #Check contents
192        #Get NetCDF
193        fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
194     
195        # Get the variables
196        x = fid.variables['x']
197        y = fid.variables['y']
198        z = fid.variables['z']
199
200        volumes = fid.variables['volumes']       
201
202        X = x[:]
203        Y = y[:]
204     
205        assert allclose([X[0], Y[0]], array([0.0, 0.0]))
206        assert allclose([X[1], Y[1]], array([0.0, 0.5]))
207        assert allclose([X[2], Y[2]], array([0.0, 1.0]))
208
209        assert allclose([X[4], Y[4]], array([0.5, 0.5]))
210
211        assert allclose([X[7], Y[7]], array([1.0, 0.5]))       
212
213        Z = z[:]
214        assert Z[4] == -0.5
215
216        V = volumes
217        assert V[2,0] == 4
218        assert V[2,1] == 5
219        assert V[2,2] == 1
220
221        assert V[4,0] == 6
222        assert V[4,1] == 7
223        assert V[4,2] == 3                     
224     
225
226        fid.close()
227     
228        #Cleanup
229        os.remove(sww.filename)
230
231
232
233    def test_sww_variable(self):
234        """Test that sww information can be written correctly
235        """
236
237        import time, os
238        from Numeric import array, zeros, allclose, Float, concatenate
239        from Scientific.IO.NetCDF import NetCDFFile       
240
241        self.domain.filename = 'datatest' + str(time.time())
242        self.domain.format = 'sww'
243        self.domain.smooth = True
244        self.domain.reduction = mean             
245     
246        sww = get_dataobject(self.domain)
247        sww.store_connectivity()
248        sww.store_timestep('stage')
249
250        #Check contents
251        #Get NetCDF
252        fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
253     
254
255        # Get the variables
256        x = fid.variables['x']
257        y = fid.variables['y']
258        z = fid.variables['z']       
259        time = fid.variables['time']
260        stage = fid.variables['stage']
261     
262
263        Q = self.domain.quantities['stage']     
264        Q0 = Q.vertex_values[:,0]
265        Q1 = Q.vertex_values[:,1]
266        Q2 = Q.vertex_values[:,2]
267
268        A = stage[0,:]
269        #print A[0], (Q2[0,0] + Q1[1,0])/2
270        assert allclose(A[0], (Q2[0] + Q1[1])/2) 
271        assert allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3)
272        assert allclose(A[2], Q0[3])
273        assert allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3)
274
275        #Center point
276        assert allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\
277                                 Q0[5] + Q2[6] + Q1[7])/6)
278                               
279     
280     
281        fid.close()
282     
283        #Cleanup
284        os.remove(sww.filename)
285       
286       
287    def test_sww_variable2(self):
288        """Test that sww information can be written correctly
289        multiple timesteps. Use average as reduction operator
290        """
291
292        import time, os
293        from Numeric import array, zeros, allclose, Float, concatenate
294        from Scientific.IO.NetCDF import NetCDFFile
295
296        self.domain.filename = 'datatest' + str(time.time())
297        self.domain.format = 'sww'
298        self.domain.smooth = True
299
300        self.domain.reduction = mean     
301     
302        sww = get_dataobject(self.domain)
303        sww.store_connectivity()
304        sww.store_timestep('stage')     
305        self.domain.evolve_to_end(finaltime = 0.01)
306        sww.store_timestep('stage')
307
308
309        #Check contents
310        #Get NetCDF
311        fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
312
313        # Get the variables
314        x = fid.variables['x']
315        y = fid.variables['y']
316        z = fid.variables['z']
317        time = fid.variables['time']
318        stage = fid.variables['stage']       
319
320        #Check values
321        Q = self.domain.quantities['stage']     
322        Q0 = Q.vertex_values[:,0]
323        Q1 = Q.vertex_values[:,1]
324        Q2 = Q.vertex_values[:,2]
325
326        A = stage[1,:]
327        assert allclose(A[0], (Q2[0] + Q1[1])/2) 
328        assert allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3)
329        assert allclose(A[2], Q0[3])
330        assert allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3)
331
332        #Center point
333        assert allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\
334                                 Q0[5] + Q2[6] + Q1[7])/6)
335                               
336     
337     
338         
339     
340        fid.close()
341     
342        #Cleanup
343        os.remove(sww.filename)
344       
345    def test_sww_variable3(self):
346        """Test that sww information can be written correctly
347        multiple timesteps using a different reduction operator (min)
348        """
349
350        import time, os
351        from Numeric import array, zeros, allclose, Float, concatenate
352        from Scientific.IO.NetCDF import NetCDFFile
353
354        self.domain.filename = 'datatest' + str(time.time())
355        self.domain.format = 'sww'
356        self.domain.smooth = True
357        self.domain.reduction = min
358     
359        sww = get_dataobject(self.domain)
360        sww.store_connectivity()
361        sww.store_timestep('stage')
362     
363        self.domain.evolve_to_end(finaltime = 0.01)
364        sww.store_timestep('stage')
365
366
367
368        #Check contents
369        #Get NetCDF
370        fid = NetCDFFile(sww.filename, 'r')
371     
372
373        # Get the variables
374        x = fid.variables['x']
375        y = fid.variables['y']
376        z = fid.variables['z']
377        time = fid.variables['time']
378        stage = fid.variables['stage']       
379
380        #Check values
381        #Check values
382        Q = self.domain.quantities['stage']     
383        Q0 = Q.vertex_values[:,0]
384        Q1 = Q.vertex_values[:,1]
385        Q2 = Q.vertex_values[:,2]
386
387        A = stage[1,:]
388        assert allclose(A[0], min(Q2[0], Q1[1])) 
389        assert allclose(A[1], min(Q0[1], Q1[3], Q2[2]))
390        assert allclose(A[2], Q0[3])
391        assert allclose(A[3], min(Q0[0], Q1[5], Q2[4]))
392
393        #Center point
394        assert allclose(A[4], min(Q1[0], Q2[1], Q0[2],\
395                                  Q0[5], Q2[6], Q1[7]))
396                               
397     
398        fid.close()
399     
400        #Cleanup
401        os.remove(sww.filename)
402
403
404    def test_sync(self):
405        """Test info stored at each timestep is as expected (incl initial condition)
406        """
407
408        import time, os, config
409        from Numeric import array, zeros, allclose, Float, concatenate
410        from Scientific.IO.NetCDF import NetCDFFile       
411
412        self.domain.filename = 'synctest'
413        self.domain.format = 'sww'
414        self.domain.smooth = False
415        self.domain.store = True
416
417        #Evolution
418        for t in self.domain.evolve(yieldstep = 1.0, finaltime = 4.0):
419            stage = self.domain.quantities['stage'].vertex_values
420
421            #Get NetCDF
422            fid = NetCDFFile(self.domain.writer.filename, 'r')
423            stage_file = fid.variables['stage']
424
425            if t == 0.0:
426                assert allclose(stage, self.initial_stage)
427                assert allclose(stage_file[:], stage.flat) 
428            else:
429                assert not allclose(stage, self.initial_stage)
430                assert not allclose(stage_file[:], stage.flat)                 
431
432            fid.close()
433        os.remove(self.domain.writer.filename)
434
435
436
437    def test_sww_DSG(self):
438        """Not a test, rather a look at the sww format
439        """
440
441        import time, os
442        from Numeric import array, zeros, allclose, Float, concatenate
443        from Scientific.IO.NetCDF import NetCDFFile       
444
445        self.domain.filename = 'datatest' + str(time.time())
446        self.domain.format = 'sww'
447        self.domain.smooth = True
448        self.domain.reduction = mean             
449     
450        sww = get_dataobject(self.domain)
451        sww.store_connectivity()
452        sww.store_timestep('stage')
453
454        #Check contents
455        #Get NetCDF
456        fid = NetCDFFile(sww.filename, 'r') 
457     
458        # Get the variables
459        x = fid.variables['x']
460        y = fid.variables['y']
461        z = fid.variables['z']
462
463        volumes = fid.variables['volumes']   
464        time = fid.variables['time']
465
466        # 2D
467        stage = fid.variables['stage']       
468
469        X = x[:]
470        Y = y[:]
471        Z = z[:]
472        V = volumes[:]
473        T = time[:]
474        S = stage[:,:]
475     
476#         print "****************************"
477#         print "X ",X
478#         print "****************************"
479#         print "Y ",Y
480#         print "****************************"
481#         print "Z ",Z
482#         print "****************************"
483#         print "V ",V
484#         print "****************************"
485#         print "Time ",T
486#         print "****************************"
487#         print "Stage ",S
488#         print "****************************"
489
490
491       
492       
493        fid.close()
494     
495        #Cleanup
496        os.remove(sww.filename)
497
498
499
500    def test_dem2pts(self):
501        """Test conversion from dem in ascii format to native NetCDF xya format
502        """
503
504        import time, os
505        from Numeric import array, zeros, allclose, Float, concatenate
506        from Scientific.IO.NetCDF import NetCDFFile
507
508        #Write test dem file
509        root = 'demtest'+str(time.time())
510
511        filename = root+'.asc'
512        fid = open(filename, 'w')
513        fid.write("""ncols         5
514nrows         6
515xllcorner     2000.5
516yllcorner     3000.5
517cellsize      25
518NODATA_value  -9999
519""")
520        #Create linear function
521
522        ref_points = []
523        ref_elevation = []
524        for i in range(6):
525            y = (6-i)*25.0           
526            for j in range(5):
527                x = j*25.0
528                z = x+2*y
529
530                ref_points.append( [x,y] )
531                ref_elevation.append(z)
532                fid.write('%f ' %z)
533            fid.write('\n')   
534
535        fid.close()
536
537        #Convert to NetCDF xya
538        convert_dem_from_ascii2netcdf(filename)
539        dem2pts(root + '.dem')
540
541        #Check contents
542        #Get NetCDF
543        fid = NetCDFFile(root+'.pts', 'r')
544     
545        # Get the variables
546        #print fid.variables.keys()
547        points = fid.variables['points']
548        elevation = fid.variables['elevation']
549
550        #Check values
551
552        #print points[:]
553        #print ref_points
554        assert allclose(points, ref_points)
555
556        #print attributes[:]
557        #print ref_elevation
558        assert allclose(elevation, ref_elevation)       
559     
560        #Cleanup
561        fid.close()
562       
563
564        os.remove(root + '.pts')
565        os.remove(root + '.dem')               
566        os.remove(root + '.asc')       
567       
568       
569#-------------------------------------------------------------
570if __name__ == "__main__":
571    suite = unittest.makeSuite(dataTestCase,'test')
572    runner = unittest.TextTestRunner()
573    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.