source: inundation/pyvolution/test_interpolate_sww.py @ 3298

Last change on this file since 3298 was 3298, checked in by duncan, 18 years ago

minor changes

File size: 9.0 KB
Line 
1#!/usr/bin/env python
2#
3"""
4Testing interpolate_sww, based on test_data_manageer, so there maybe code
5that isn't needed, eg in the setup file
6"""
7
8import unittest
9from Numeric import zeros, array, allclose, Float
10from utilities.numerical_tools import mean
11
12from interpolate_sww import *
13from pyvolution.shallow_water import Domain, Transmissive_boundary
14from pyvolution.data_manager import get_dataobject
15
16
17class Test_Interpolate_sww(unittest.TestCase):
18    def setUp(self):
19
20        import time
21        from mesh_factory import rectangular
22
23
24        #Create basic mesh
25        points, vertices, boundary = rectangular(2, 2)
26
27        #Create shallow water domain
28        domain = Domain(points, vertices, boundary)
29        domain.default_order=2
30        domain.beta_h = 0
31
32
33        #Set some field values
34        domain.set_quantity('elevation', lambda x,y: -x)
35        domain.set_quantity('friction', 0.03)
36
37
38        ######################
39        # Boundary conditions
40        B = Transmissive_boundary(domain)
41        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
42
43
44        ######################
45        #Initial condition - with jumps
46
47        bed = domain.quantities['elevation'].vertex_values
48        stage = zeros(bed.shape, Float)
49
50        h = 0.3
51        for i in range(stage.shape[0]):
52            if i % 2 == 0:
53                stage[i,:] = bed[i,:] + h
54            else:
55                stage[i,:] = bed[i,:]
56
57        domain.set_quantity('stage', stage)
58
59        domain.distribute_to_vertices_and_edges()
60
61
62        self.domain = domain
63
64        C = domain.get_vertex_coordinates()
65        self.X = C[:,0:6:2].copy()
66        self.Y = C[:,1:6:2].copy()
67
68        self.F = bed
69
70
71    def tearDown(self):
72        pass
73
74    def test_sww_DSG(self):
75        """Not a test, rather a look at the sww format
76        """
77
78        import time, os
79        from Numeric import array, zeros, allclose, Float, concatenate
80        from Scientific.IO.NetCDF import NetCDFFile
81
82        self.domain.filename = 'datatest' + str(time.time())
83        self.domain.format = 'sww'
84        self.domain.smooth = True
85        self.domain.reduction = mean
86
87        sww = get_dataobject(self.domain)
88        sww.store_connectivity()
89        sww.store_timestep('stage')
90        self.domain.time = 2.
91        sww.store_timestep('stage')
92
93        #Check contents
94        #Get NetCDF
95        fid = NetCDFFile(sww.filename, 'r')
96
97        # Get the variables
98        x = fid.variables['x']
99        y = fid.variables['y']
100        z = fid.variables['elevation']
101
102        volumes = fid.variables['volumes']
103        time = fid.variables['time']
104
105        # 2D
106        stage = fid.variables['stage']
107
108        X = x[:]
109        Y = y[:]
110        Z = z[:]
111        V = volumes[:]
112        T = time[:]
113        S = stage[:,:]
114
115        if False:
116            print "****************************"
117            print "X ",X
118            print "****************************"
119            print "Y ",Y
120            print "****************************"
121            print "Z ",Z
122            print "****************************"
123            print "V ",V
124            print "****************************"
125            print "Time ",T
126            print "****************************"
127            print "Stage ",S
128            print "****************************"
129
130
131
132
133        fid.close()
134
135        #Cleanup
136        os.remove(sww.filename)
137
138    def test_interpolate_sww(self):
139        ### Not really a unit test, rather a system test for
140        ### 
141
142        import time, os
143        from Numeric import array, zeros, allclose, Float, concatenate, \
144             transpose
145        from Scientific.IO.NetCDF import NetCDFFile
146        import tempfile
147        from load_mesh.loadASCII import  import_points_file, \
148             concatinate_attributelist
149
150        self.domain.filename = 'datatest' + str(time.time())
151        self.domain.format = 'sww'
152        self.domain.smooth = True
153        self.domain.reduction = mean
154
155        sww = get_dataobject(self.domain)
156        sww.store_connectivity()
157        sww.store_timestep('stage')
158        self.domain.time = 2.
159        sww.store_timestep('stage')
160
161        #print "self.domain.filename",self.domain.filename
162        interp = Interpolate_sww(sww.filename, 'depth')
163
164        assert allclose(interp.time,[0.0,2.0])
165
166        # create an .xya file
167        point_file = tempfile.mktemp(".xya")
168        fd = open(point_file,'w')
169        fd.write("colour, thickness \n 0.0, 0.6,2.,4 \n 0.0, 0.9,4,8 \n 0.0,0.1,4.,8 \n 0.4,1.0,4.,8 \n")
170        fd.close()
171
172        interp.interpolate_xya(point_file)
173
174        answer = {}
175        answer['0.0'] =  [ 0.08, 0.02, 0.14,0.08]
176        answer['2.0'] =  [ 0.08, 0.02, 0.14,0.08]
177       
178        #print "answer",answer
179        #print "interp.interpolated_quantity",interp.interpolated_quantity
180        assert allclose(interp.interpolated_quantity['0.0'],answer['0.0'])
181        assert allclose(interp.interpolated_quantity['2.0'],answer['2.0'])
182
183        # create an output .xya file
184        point_file_out = tempfile.mktemp(".xya")
185        interp.write_depth_xya(point_file_out)
186
187        #check the output file
188        xya_dict = import_points_file(point_file_out)
189        assert allclose(xya_dict['attributelist']['0.0'],answer['0.0'])
190        assert allclose(xya_dict['attributelist']['2.0'],answer['2.0'])
191        assert allclose(xya_dict['pointlist'],[[ 0.0, 0.6],[0.0, 0.9],[ 0.0,0.1],[ 0.4,1.0]])
192
193        # Try another quantity
194        interp = Interpolate_sww(sww.filename, 'stage')
195        interp.interpolate_xya(point_file)
196
197        answer['0.0'] =  [ 0.08, 0.02, 0.14,-0.32]
198        answer['2.0'] =  [ 0.08, 0.02, 0.14,-0.32]       
199        #print "answer",answer
200        #print "interp.interpolated_quantity",interp.interpolated_quantity
201        assert allclose(interp.interpolated_quantity['0.0'],answer['0.0'])
202        assert allclose(interp.interpolated_quantity['2.0'],answer['2.0'])
203
204        # look at error catching
205        try:
206            interp = Interpolate_sww(sww.filename, 'funky!')
207        except KeyError:
208            pass
209        else:
210            self.failUnless(0==1,
211                        'bad key did not raise an error!')
212
213        # look at error catching
214        try:
215            interp = Interpolate_sww(sww.filename, 'elevation')
216        except KeyError:
217            pass
218        else:
219            self.failUnless(0==1,
220                        'bad key did not raise an error!')
221
222        #Cleanup
223        os.remove(sww.filename)
224        os.remove(point_file_out)
225        os.remove(point_file)
226
227    def test_Interpolate_sww_errors(self):
228        import tempfile
229        import os
230        try:
231            interpolate_sww2xya('??ffd??', 'stage','yeah','yeah.x',
232                                verbose = False)
233        except SystemExit:  pass
234        else:
235            self.failUnless(0 ==1,  'Bad file did not raise error!') 
236       
237    def DISABLED_TEST_test_Interpolate_sww_errorsII(self):
238        """
239        THIS TEST HAS BEEN DISABLED, SINCE IT PRINTS TO SCREEN,
240        but is should still work.  test it sometimes!
241        """
242        import tempfile
243        import os
244        import sys
245       
246        sww_file = tempfile.mktemp(".sww")
247        fd = open(sww_file,'w')
248        fd.write("unit testing a bad .sww file \n")
249        fd.close()
250       
251        try:
252            interpolate_sww2xya(sww_file, 'stage','yeah','yeah.x',
253                                verbose = False)
254           
255        except SystemExit:  pass
256        else:
257            self.failUnless(0 ==1,  'Bad file did not raise error!')       
258        #clean up
259        os.remove(sww_file)
260       
261    def test_Interpolate_sww_errorsIII(self):
262        import time, os
263        from Numeric import array, zeros, allclose, Float, concatenate, \
264             transpose
265        from Scientific.IO.NetCDF import NetCDFFile
266        from load_mesh.loadASCII import  import_points_file, \
267             concatinate_attributelist
268
269        self.domain.filename = 'datatest' + str(time.time())
270        self.domain.format = 'sww'
271        self.domain.smooth = True
272        self.domain.reduction = mean
273
274        sww = get_dataobject(self.domain)
275        sww.store_connectivity()
276        sww.store_timestep('stage')
277        self.domain.time = 2.
278        sww.store_timestep('stage')
279       
280        try:
281            interpolate_sww2xya(self.domain.filename,
282                                'stage','yeah','yeah.x',
283                                verbose = False)
284        except SystemExit:  pass
285        else:
286            self.failUnless(0 ==1,  'Bad file did not raise error!')       
287        #clean up
288        os.remove(sww.filename)
289#-------------------------------------------------------------
290if __name__ == "__main__":
291    suite = unittest.makeSuite(Test_Interpolate_sww,'test')
292    runner = unittest.TextTestRunner(verbosity=1)
293    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.