source: inundation/pyvolution/test_interpolate_sww.py @ 2920

Last change on this file since 2920 was 1814, checked in by duncan, 19 years ago

added more error checking to files loaded

File size: 8.9 KB
Line 
1#!/usr/bin/env python
2#
3"""
4Testing interpolate_sww, based on test_data_manageer, so there maybe code
5that isn't needed, eg in the setup file
6"""
7
8import unittest
9from Numeric import zeros, array, allclose, Float
10from util import mean
11
12from interpolate_sww import *
13from shallow_water import *
14from data_manager import *
15#from config import epsilon
16
17
18class Test_Interpolate_sww(unittest.TestCase):
19    def setUp(self):
20
21        import time
22        from mesh_factory import rectangular
23
24
25        #Create basic mesh
26        points, vertices, boundary = rectangular(2, 2)
27
28        #Create shallow water domain
29        domain = Domain(points, vertices, boundary)
30        domain.default_order=2
31        domain.beta_h = 0
32
33
34        #Set some field values
35        domain.set_quantity('elevation', lambda x,y: -x)
36        domain.set_quantity('friction', 0.03)
37
38
39        ######################
40        # Boundary conditions
41        B = Transmissive_boundary(domain)
42        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
43
44
45        ######################
46        #Initial condition - with jumps
47
48        bed = domain.quantities['elevation'].vertex_values
49        stage = zeros(bed.shape, Float)
50
51        h = 0.3
52        for i in range(stage.shape[0]):
53            if i % 2 == 0:
54                stage[i,:] = bed[i,:] + h
55            else:
56                stage[i,:] = bed[i,:]
57
58        domain.set_quantity('stage', stage)
59
60        domain.distribute_to_vertices_and_edges()
61
62
63        self.domain = domain
64
65        C = domain.get_vertex_coordinates()
66        self.X = C[:,0:6:2].copy()
67        self.Y = C[:,1:6:2].copy()
68
69        self.F = bed
70
71
72    def tearDown(self):
73        pass
74
75    def test_sww_DSG(self):
76        """Not a test, rather a look at the sww format
77        """
78
79        import time, os
80        from Numeric import array, zeros, allclose, Float, concatenate
81        from Scientific.IO.NetCDF import NetCDFFile
82
83        self.domain.filename = 'datatest' + str(time.time())
84        self.domain.format = 'sww'
85        self.domain.smooth = True
86        self.domain.reduction = mean
87
88        sww = get_dataobject(self.domain)
89        sww.store_connectivity()
90        sww.store_timestep('stage')
91        self.domain.time = 2.
92        sww.store_timestep('stage')
93
94        #Check contents
95        #Get NetCDF
96        fid = NetCDFFile(sww.filename, 'r')
97
98        # Get the variables
99        x = fid.variables['x']
100        y = fid.variables['y']
101        z = fid.variables['elevation']
102
103        volumes = fid.variables['volumes']
104        time = fid.variables['time']
105
106        # 2D
107        stage = fid.variables['stage']
108
109        X = x[:]
110        Y = y[:]
111        Z = z[:]
112        V = volumes[:]
113        T = time[:]
114        S = stage[:,:]
115
116        if False:
117            print "****************************"
118            print "X ",X
119            print "****************************"
120            print "Y ",Y
121            print "****************************"
122            print "Z ",Z
123            print "****************************"
124            print "V ",V
125            print "****************************"
126            print "Time ",T
127            print "****************************"
128            print "Stage ",S
129            print "****************************"
130
131
132
133
134        fid.close()
135
136        #Cleanup
137        os.remove(sww.filename)
138
139    def test_interpolate_sww(self):
140        """Not reaa unit test, rather a system test for
141        """
142
143        import time, os
144        from Numeric import array, zeros, allclose, Float, concatenate, \
145             transpose
146        from Scientific.IO.NetCDF import NetCDFFile
147        import tempfile
148        from load_mesh.loadASCII import  import_points_file, \
149             concatinate_attributelist
150
151        self.domain.filename = 'datatest' + str(time.time())
152        self.domain.format = 'sww'
153        self.domain.smooth = True
154        self.domain.reduction = mean
155
156        sww = get_dataobject(self.domain)
157        sww.store_connectivity()
158        sww.store_timestep('stage')
159        self.domain.time = 2.
160        sww.store_timestep('stage')
161
162        #print "self.domain.filename",self.domain.filename
163        interp = Interpolate_sww(sww.filename, 'depth')
164
165        assert allclose(interp.time,[0.0,2.0])
166
167        # create an .xya file
168        point_file = tempfile.mktemp(".xya")
169        fd = open(point_file,'w')
170        fd.write("colour, thickness \n 0.0, 0.6,2.,4 \n 0.0, 0.9,4,8 \n 0.0,0.1,4.,8 \n 0.4,1.0,4.,8 \n")
171        fd.close()
172
173        interp.interpolate_xya(point_file)
174
175        answer = {}
176        answer['0.0'] =  [ 0.08, 0.02, 0.14,0.08]
177        answer['2.0'] =  [ 0.08, 0.02, 0.14,0.08]
178       
179        #print "answer",answer
180        #print "interp.interpolated_quantity",interp.interpolated_quantity
181        assert allclose(interp.interpolated_quantity['0.0'],answer['0.0'])
182        assert allclose(interp.interpolated_quantity['2.0'],answer['2.0'])
183
184        # create an output .xya file
185        point_file_out = tempfile.mktemp(".xya")
186        interp.write_depth_xya(point_file_out)
187
188        #check the output file
189        xya_dict = import_points_file(point_file_out)
190        assert allclose(xya_dict['attributelist']['0.0'],answer['0.0'])
191        assert allclose(xya_dict['attributelist']['2.0'],answer['2.0'])
192        assert allclose(xya_dict['pointlist'],[[ 0.0, 0.6],[0.0, 0.9],[ 0.0,0.1],[ 0.4,1.0]])
193
194        # Try another quantity
195        interp = Interpolate_sww(sww.filename, 'stage')
196        interp.interpolate_xya(point_file)
197
198        answer['0.0'] =  [ 0.08, 0.02, 0.14,-0.32]
199        answer['2.0'] =  [ 0.08, 0.02, 0.14,-0.32]       
200        #print "answer",answer
201        #print "interp.interpolated_quantity",interp.interpolated_quantity
202        assert allclose(interp.interpolated_quantity['0.0'],answer['0.0'])
203        assert allclose(interp.interpolated_quantity['2.0'],answer['2.0'])
204
205        # look at error catching
206        try:
207            interp = Interpolate_sww(sww.filename, 'funky!')
208        except KeyError:
209            pass
210        else:
211            self.failUnless(0==1,
212                        'bad key did not raise an error!')
213
214        # look at error catching
215        try:
216            interp = Interpolate_sww(sww.filename, 'elevation')
217        except KeyError:
218            pass
219        else:
220            self.failUnless(0==1,
221                        'bad key did not raise an error!')
222
223        #Cleanup
224        os.remove(sww.filename)
225        os.remove(point_file_out)
226        os.remove(point_file)
227
228    def test_Interpolate_sww_errors(self):
229        import tempfile
230        import os
231        try:
232            interpolate_sww2xya('??ffd??', 'stage','yeah','yeah.x',
233                                display_errors = False)
234        except SystemExit:  pass
235        else:
236            self.failUnless(0 ==1,  'Bad file did not raise error!') 
237       
238    def DISABLED_TEST_test_Interpolate_sww_errorsII(self):
239        """
240        THIS TEST HAS BEEN DISABLED, SINCE IT PRINTS TO SCREEN,
241        but is should still work.  test it sometimes!
242        """
243        import tempfile
244        import os
245        import sys
246       
247        sww_file = tempfile.mktemp(".sww")
248        fd = open(sww_file,'w')
249        fd.write("unit testing a bad .sww file \n")
250        fd.close()
251       
252        try:
253            interpolate_sww2xya(sww_file, 'stage','yeah','yeah.x',
254                                display_errors = False)
255           
256        except SystemExit:  pass
257        else:
258            self.failUnless(0 ==1,  'Bad file did not raise error!')       
259        #clean up
260        os.remove(sww_file)
261       
262    def test_Interpolate_sww_errorsIII(self):
263        import time, os
264        from Numeric import array, zeros, allclose, Float, concatenate, \
265             transpose
266        from Scientific.IO.NetCDF import NetCDFFile
267        from load_mesh.loadASCII import  import_points_file, \
268             concatinate_attributelist
269
270        self.domain.filename = 'datatest' + str(time.time())
271        self.domain.format = 'sww'
272        self.domain.smooth = True
273        self.domain.reduction = mean
274
275        sww = get_dataobject(self.domain)
276        sww.store_connectivity()
277        sww.store_timestep('stage')
278        self.domain.time = 2.
279        sww.store_timestep('stage')
280       
281        try:
282            interpolate_sww2xya(self.domain.filename,
283                                'stage','yeah','yeah.x',
284                                display_errors = False)
285        except SystemExit:  pass
286        else:
287            self.failUnless(0 ==1,  'Bad file did not raise error!')       
288        #clean up
289        os.remove(sww.filename)
290#-------------------------------------------------------------
291if __name__ == "__main__":
292    suite = unittest.makeSuite(Test_Interpolate_sww,'test')
293    runner = unittest.TextTestRunner(verbosity=1)
294    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.