source: inundation/ga/storm_surge/pyvolution/test_data_manager.py @ 278

Last change on this file since 278 was 276, checked in by ole, 21 years ago

Added old data manager and test for future integration

File size: 12.5 KB
Line 
1#!/usr/bin/env python
2#
3
4import unittest
5from Numeric import zeros, array, allclose, Float
6from util import mean
7
8from data_manager import *
9from shallow_water import *
10from config import epsilon
11
12class dataTestCase(unittest.TestCase):
13    def setUp(self):
14        import time
15        from mesh_factory import rectangular
16
17
18        #Create basic mesh
19        points, vertices, boundary = rectangular(2, 2)
20
21        #Create shallow water domain
22        domain = Domain(points, vertices, boundary)
23        domain.default_order=2
24
25       
26        #Set some field values
27        domain.set_quantity('elevation', lambda x,y: -x)       
28        domain.set_quantity('friction', 0.03)
29
30
31        ######################
32        # Boundary conditions
33        B = Transmissive_boundary(domain)
34        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
35       
36
37        ######################
38        #Initial condition - with jumps
39
40
41        bed = domain.quantities['elevation'].vertex_values
42        level = zeros(bed.shape, Float)
43
44        h = 0.3
45        for i in range(level.shape[0]):
46            if i % 2 == 0:           
47                level[i,:] = bed[i,:] + h
48            else:
49                level[i,:] = bed[i,:]
50               
51        domain.set_quantity('level', level)
52
53        domain.distribute_to_vertices_and_edges()   
54
55
56        self.domain = domain
57
58        self.X = domain.get_vertex_coordinates()
59        self.X0 = self.X[:,0]
60        self.Y0 = self.X[:,1]
61        self.X1 = self.X[:,2]
62        self.Y1 = self.X[:,3]
63        self.X2 = self.X[:,4]
64        self.Y2 = self.X[:,5]               
65       
66        self.F = bed
67
68       
69    def tearDown(self):
70        pass
71
72
73       
74
75#     def test_xya(self):
76#         import os
77#         from Numeric import concatenate
78
79#         import time, os
80#         from Numeric import array, zeros, allclose, Float, concatenate
81
82#         domain = self.domain
83       
84#         domain.filename = 'datatest' + str(time.time())
85#         domain.format = 'xya'
86#         domain.smooth = True
87       
88#         xya = get_dataobject(self.domain)
89#         xya.store_all()
90
91
92#         #Read back
93#         file = open(xya.filename)
94#         lFile = file.read().split('\n')
95#         lFile = lFile[:-1]
96
97#         file.close()
98#         os.remove(xya.filename)
99
100#         #Check contents
101#         if domain.smooth:
102#             self.failUnless(lFile[0] == '9 3 # <vertex #> <x> <y> [attributes]')
103#         else:   
104#             self.failUnless(lFile[0] == '24 3 # <vertex #> <x> <y> [attributes]')           
105
106#         #Get smoothed field values with X and Y
107#         X,Y,F,V = domain.get_vertex_values(xy=True, value_array='field_values',
108#                                            indices = (0,1), precision = Float)
109
110
111#         Q,V = domain.get_vertex_values(xy=False, value_array='conserved_quantities',
112#                                            indices = (0,), precision = Float)       
113
114
115       
116#         for i, line in enumerate(lFile[1:]):
117#             fields = line.split()
118
119#             assert len(fields) == 5
120
121#             assert allclose(float(fields[0]), X[i])
122#             assert allclose(float(fields[1]), Y[i])
123#             assert allclose(float(fields[2]), F[i,0])
124#             assert allclose(float(fields[3]), Q[i,0])
125#             assert allclose(float(fields[4]), F[i,1])           
126
127
128
129
130#     def test_sww_constant(self):
131#         """Test that constant sww information can be written correctly
132#         (non smooth)
133#         """
134
135#         import time, os
136#         from Numeric import array, zeros, allclose, Float, concatenate
137#         from Scientific.IO.NetCDF import NetCDFFile       
138
139#         self.domain.filename = 'datatest' + str(time.time())
140#         self.domain.format = 'sww'
141#         self.domain.smooth = False
142       
143#         sww = get_dataobject(self.domain)
144#         sww.store_all()
145
146#         #Check contents
147#         #Get NetCDF
148#         fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
149       
150#         # Get the variables
151#         x = fid.variables['x']
152#         y = fid.variables['y']
153#         z = fid.variables['z']
154
155#         volumes = fid.variables['volumes']       
156
157#         assert allclose (x[:], concatenate( (self.X0, self.X1, self.X2) ))
158#         assert allclose (y[:], concatenate( (self.Y0, self.Y1, self.Y2) ))
159#         assert allclose (z[:], concatenate( (self.F0[:,0],
160#                                              self.F1[:,0],
161#                                              self.F2[:,0]) ))
162
163#         V = volumes
164
165#         P = len(self.domain)
166#         for k in range(P):
167#              assert V[k,0] == V[k,1] - P
168#              assert V[k,1] == V[k,2] - P
169       
170
171
172       
173#         fid.close()
174       
175#         #Cleanup
176#         os.remove(sww.filename)
177
178
179#     def test_sww_constant_smooth(self):
180#         """Test that constant sww information can be written correctly
181#         (non smooth)
182#         """
183
184
185#         import time, os
186#         from Numeric import array, zeros, allclose, Float, concatenate
187#         from Scientific.IO.NetCDF import NetCDFFile       
188
189#         self.domain.filename = 'datatest' + str(time.time())
190#         self.domain.format = 'sww'
191#         self.domain.smooth = True
192       
193#         sww = get_dataobject(self.domain)
194#         sww.store_all()
195
196#         #Check contents
197#         #Get NetCDF
198#         fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
199       
200#         # Get the variables
201#         x = fid.variables['x']
202#         y = fid.variables['y']
203#         z = fid.variables['z']
204
205#         volumes = fid.variables['volumes']       
206
207#         X = x[:]
208#         Y = y[:]
209       
210#         assert allclose([X[0], Y[0]], array([0.0, 0.0]))
211#         assert allclose([X[1], Y[1]], array([0.0, 0.5]))
212#         assert allclose([X[2], Y[2]], array([0.0, 1.0]))
213
214#         assert allclose([X[4], Y[4]], array([0.5, 0.5]))
215
216#         assert allclose([X[7], Y[7]], array([1.0, 0.5]))       
217
218#         Z = z[:]
219#         assert Z[4] == -0.5
220
221
222
223#         V = volumes
224#         assert V[2,0] == 4
225#         assert V[2,1] == 5
226#         assert V[2,2] == 1
227
228#         assert V[4,0] == 6
229#         assert V[4,1] == 7
230#         assert V[4,2] == 3                     
231       
232       
233
234
235       
236#         fid.close()
237       
238#         #Cleanup
239#         os.remove(sww.filename)
240
241
242
243#     def test_sww_variable(self):
244#         """Test that sww information can be written correctly
245#         """
246
247#         import time, os
248#         from Numeric import array, zeros, allclose, Float, concatenate
249#         from Scientific.IO.NetCDF import NetCDFFile       
250
251#         self.domain.filename = 'datatest' + str(time.time())
252#         self.domain.format = 'sww'
253#         self.domain.smooth = True
254#         self.domain.reduction = mean             
255       
256#         sww = get_dataobject(self.domain)
257#         sww.store_all()
258#         sww.store_timestep()
259
260#         #Check contents
261#         #Get NetCDF
262#         fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
263       
264
265#         # Get the variables
266#         x = fid.variables['x']
267#         y = fid.variables['y']
268#         z = fid.variables['z']       
269#         time = fid.variables['time']
270#         stage = fid.variables['stage']
271
272
273#         Q0 = self.domain.volume_class.conserved_quantities_vertex0
274#         Q1 = self.domain.volume_class.conserved_quantities_vertex1
275#         Q2 = self.domain.volume_class.conserved_quantities_vertex2       
276
277#         A = stage[0,:]
278#         #print A[0], (Q2[0,0] + Q1[1,0])/2
279#         assert allclose(A[0], (Q2[0,0] + Q1[1,0])/2) 
280#         assert allclose(A[1], (Q0[1,0] + Q1[3,0] + Q2[2,0])/3)
281#         assert allclose(A[2], Q0[3,0])
282#         assert allclose(A[3], (Q0[0,0] + Q1[5,0] + Q2[4,0])/3)
283
284#         #Center point
285#         assert allclose(A[4], (Q1[0,0] + Q2[1,0] + Q0[2,0] +\
286#                                  Q0[5,0] + Q2[6,0] + Q1[7,0])/6)
287                                 
288       
289       
290#         fid.close()
291       
292#         #Cleanup
293#         os.remove(sww.filename)
294       
295       
296#     def test_sww_variable2(self):
297#         """Test that sww information can be written correctly
298#         multiple timesteps. Use average as reduction operator
299#         """
300
301#         import time, os
302#         from Numeric import array, zeros, allclose, Float, concatenate
303#         from Scientific.IO.NetCDF import NetCDFFile
304
305#         self.domain.filename = 'datatest' + str(time.time())
306#         self.domain.format = 'sww'
307#         self.domain.smooth = True
308
309#         self.domain.reduction = mean     
310       
311#         sww = get_dataobject(self.domain)
312#         sww.store_all()
313#         sww.store_timestep()
314       
315#         self.domain.evolve_to_end(max_timestep = 0.01, finaltime = 0.01)
316#         sww.store_timestep()
317
318
319
320#         #Check contents
321#         #Get NetCDF
322#         fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
323       
324
325#         # Get the variables
326#         x = fid.variables['x']
327#         y = fid.variables['y']
328#         z = fid.variables['z']
329#         time = fid.variables['time']
330#         stage = fid.variables['stage']       
331
332#         #Check values
333
334#         Q0 = self.domain.volume_class.conserved_quantities_vertex0
335#         Q1 = self.domain.volume_class.conserved_quantities_vertex1
336#         Q2 = self.domain.volume_class.conserved_quantities_vertex2       
337
338#         A = stage[1,:]
339#         assert allclose(A[0], (Q2[0,0] + Q1[1,0])/2) 
340#         assert allclose(A[1], (Q0[1,0] + Q1[3,0] + Q2[2,0])/3)
341#         assert allclose(A[2], Q0[3,0])
342#         assert allclose(A[3], (Q0[0,0] + Q1[5,0] + Q2[4,0])/3)
343
344#         #Center point
345#         assert allclose(A[4], (Q1[0,0] + Q2[1,0] + Q0[2,0] +\
346#                                  Q0[5,0] + Q2[6,0] + Q1[7,0])/6)
347                                 
348       
349       
350           
351       
352#         fid.close()
353       
354#         #Cleanup
355#         os.remove(sww.filename)
356       
357#     def test_sww_variable3(self):
358#         """Test that sww information can be written correctly
359#         multiple timesteps using a different reduction operator (min)
360#         """
361
362#         import time, os
363#         from Numeric import array, zeros, allclose, Float, concatenate
364#         from Scientific.IO.NetCDF import NetCDFFile
365
366#         self.domain.filename = 'datatest' + str(time.time())
367#         self.domain.format = 'sww'
368#         self.domain.smooth = True
369#         self.domain.reduction = min
370       
371#         sww = get_dataobject(self.domain)
372#         sww.store_all()
373#         sww.store_timestep()
374       
375#         self.domain.evolve_to_end(max_timestep = 0.01, finaltime = 0.01)
376#         sww.store_timestep()
377
378
379
380#         #Check contents
381#         #Get NetCDF
382#         fid = NetCDFFile(sww.filename, 'r')  #Open existing file for append
383       
384
385#         # Get the variables
386#         x = fid.variables['x']
387#         y = fid.variables['y']
388#         z = fid.variables['z']
389#         time = fid.variables['time']
390#         stage = fid.variables['stage']       
391
392#         #Check values
393
394#         Q0 = self.domain.volume_class.conserved_quantities_vertex0
395#         Q1 = self.domain.volume_class.conserved_quantities_vertex1
396#         Q2 = self.domain.volume_class.conserved_quantities_vertex2       
397
398#         A = stage[1,:]
399#         assert allclose(A[0], min(Q2[0,0], Q1[1,0]) ) 
400#         assert allclose(A[1], min(Q0[1,0], Q1[3,0], Q2[2,0]) )
401#         assert allclose(A[2], Q0[3,0])
402#         assert allclose(A[3], min(Q0[0,0], Q1[5,0], Q2[4,0]) )
403
404#         #Center point
405#         assert allclose(A[4], min(Q1[0,0], Q2[1,0], Q0[2,0],
406#                                   Q0[5,0], Q2[6,0], Q1[7,0]) )
407                                 
408       
409       
410           
411       
412#         fid.close()
413       
414#         #Cleanup
415#         os.remove(sww.filename)
416
417
418    def test_dummy(self):
419        pass
420
421#-------------------------------------------------------------
422if __name__ == "__main__":
423    #suite = unittest.makeSuite(dataTestCase,'test_sww_constant')
424    suite = unittest.makeSuite(dataTestCase,'test_dummy')
425    runner = unittest.TextTestRunner()
426    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.