source: anuga_core/source/anuga/shallow_water/test_interpolate_sww.py @ 3846

Last change on this file since 3846 was 3846, checked in by ole, 17 years ago

Refactored references to domain.filename away.
Use

domain.set_name()
domain.get_name()

instead.

File size: 9.0 KB
Line 
1#!/usr/bin/env python
2#
3"""
4Testing interpolate_sww, based on test_data_manageer, so there maybe code
5that isn't needed, eg in the setup file
6"""
7
8import unittest
9from Numeric import zeros, array, allclose, Float
10from anuga.utilities.numerical_tools import mean
11
12from interpolate_sww import *
13from anuga.shallow_water import Domain, Transmissive_boundary
14from anuga.shallow_water.data_manager import get_dataobject
15
16
17class Test_Interpolate_sww(unittest.TestCase):
18    def setUp(self):
19
20        import time
21        from mesh_factory import rectangular
22
23
24        #Create basic mesh
25        points, vertices, boundary = rectangular(2, 2)
26
27        #Create shallow water domain
28        domain = Domain(points, vertices, boundary)
29        domain.default_order=2
30        domain.beta_h = 0
31
32
33        #Set some field values
34        domain.set_quantity('elevation', lambda x,y: -x)
35        domain.set_quantity('friction', 0.03)
36
37
38        ######################
39        # Boundary conditions
40        B = Transmissive_boundary(domain)
41        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
42
43
44        ######################
45        #Initial condition - with jumps
46
47        bed = domain.quantities['elevation'].vertex_values
48        stage = zeros(bed.shape, Float)
49
50        h = 0.3
51        for i in range(stage.shape[0]):
52            if i % 2 == 0:
53                stage[i,:] = bed[i,:] + h
54            else:
55                stage[i,:] = bed[i,:]
56
57        domain.set_quantity('stage', stage)
58
59        domain.distribute_to_vertices_and_edges()
60
61
62        self.domain = domain
63
64        C = domain.get_vertex_coordinates()
65        self.X = C[:,0:6:2].copy()
66        self.Y = C[:,1:6:2].copy()
67
68        self.F = bed
69
70
71    def tearDown(self):
72        pass
73
74    def test_sww_DSG(self):
75        """Not a test, rather a look at the sww format
76        """
77
78        import time, os
79        from Numeric import array, zeros, allclose, Float, concatenate
80        from Scientific.IO.NetCDF import NetCDFFile
81
82        #FIXME (Ole): This test still passes if commented out
83        #self.domain.set_name('datatest' + str(time.time()))
84       
85       
86        self.domain.format = 'sww'
87        self.domain.smooth = True
88        self.domain.reduction = mean
89
90        sww = get_dataobject(self.domain)
91        sww.store_connectivity()
92        sww.store_timestep('stage')
93        self.domain.time = 2.
94        sww.store_timestep('stage')
95
96        #Check contents
97        #Get NetCDF
98        fid = NetCDFFile(sww.filename, 'r')
99
100        # Get the variables
101        x = fid.variables['x']
102        y = fid.variables['y']
103        z = fid.variables['elevation']
104
105        volumes = fid.variables['volumes']
106        time = fid.variables['time']
107
108        # 2D
109        stage = fid.variables['stage']
110
111        X = x[:]
112        Y = y[:]
113        Z = z[:]
114        V = volumes[:]
115        T = time[:]
116        S = stage[:,:]
117
118        if False:
119            print "****************************"
120            print "X ",X
121            print "****************************"
122            print "Y ",Y
123            print "****************************"
124            print "Z ",Z
125            print "****************************"
126            print "V ",V
127            print "****************************"
128            print "Time ",T
129            print "****************************"
130            print "Stage ",S
131            print "****************************"
132
133
134
135
136        fid.close()
137
138        #Cleanup
139        os.remove(sww.filename)
140
141    def test_interpolate_sww(self):
142        ### Not really a unit test, rather a system test for
143        ### 
144
145        import time, os
146        from Numeric import array, zeros, allclose, Float, concatenate, \
147             transpose
148        from Scientific.IO.NetCDF import NetCDFFile
149        import tempfile
150        from load_mesh.loadASCII import  import_points_file, \
151             concatinate_attributelist
152
153        self.domain.set_name('datatest' + str(time.time()))
154        self.domain.format = 'sww'
155        self.domain.smooth = True
156        self.domain.reduction = mean
157
158        sww = get_dataobject(self.domain)
159        sww.store_connectivity()
160        sww.store_timestep('stage')
161        self.domain.time = 2.
162        sww.store_timestep('stage')
163
164        interp = Interpolate_sww(sww.filename, 'depth')
165
166        assert allclose(interp.time,[0.0,2.0])
167
168        # create an .xya file
169        point_file = tempfile.mktemp(".xya")
170        fd = open(point_file,'w')
171        fd.write("colour, thickness \n 0.0, 0.6,2.,4 \n 0.0, 0.9,4,8 \n 0.0,0.1,4.,8 \n 0.4,1.0,4.,8 \n")
172        fd.close()
173
174        interp.interpolate_xya(point_file)
175
176        answer = {}
177        answer['0.0'] =  [ 0.08, 0.02, 0.14,0.08]
178        answer['2.0'] =  [ 0.08, 0.02, 0.14,0.08]
179       
180        #print "answer",answer
181        #print "interp.interpolated_quantity",interp.interpolated_quantity
182        assert allclose(interp.interpolated_quantity['0.0'],answer['0.0'])
183        assert allclose(interp.interpolated_quantity['2.0'],answer['2.0'])
184
185        # create an output .xya file
186        point_file_out = tempfile.mktemp(".xya")
187        interp.write_depth_xya(point_file_out)
188
189        #check the output file
190        xya_dict = import_points_file(point_file_out)
191        assert allclose(xya_dict['attributelist']['0.0'],answer['0.0'])
192        assert allclose(xya_dict['attributelist']['2.0'],answer['2.0'])
193        assert allclose(xya_dict['pointlist'],[[ 0.0, 0.6],[0.0, 0.9],[ 0.0,0.1],[ 0.4,1.0]])
194
195        # Try another quantity
196        interp = Interpolate_sww(sww.filename, 'stage')
197        interp.interpolate_xya(point_file)
198
199        answer['0.0'] =  [ 0.08, 0.02, 0.14,-0.32]
200        answer['2.0'] =  [ 0.08, 0.02, 0.14,-0.32]       
201        #print "answer",answer
202        #print "interp.interpolated_quantity",interp.interpolated_quantity
203        assert allclose(interp.interpolated_quantity['0.0'],answer['0.0'])
204        assert allclose(interp.interpolated_quantity['2.0'],answer['2.0'])
205
206        # look at error catching
207        try:
208            interp = Interpolate_sww(sww.filename, 'funky!')
209        except KeyError:
210            pass
211        else:
212            self.failUnless(0==1,
213                        'bad key did not raise an error!')
214
215        # look at error catching
216        try:
217            interp = Interpolate_sww(sww.filename, 'elevation')
218        except KeyError:
219            pass
220        else:
221            self.failUnless(0==1,
222                        'bad key did not raise an error!')
223
224        #Cleanup
225        os.remove(sww.filename)
226        os.remove(point_file_out)
227        os.remove(point_file)
228
229    def test_Interpolate_sww_errors(self):
230        import tempfile
231        import os
232        try:
233            interpolate_sww2xya('??ffd??', 'stage','yeah','yeah.x',
234                                verbose = False)
235        except SystemExit:  pass
236        else:
237            self.failUnless(0 ==1,  'Bad file did not raise error!') 
238       
239    def DISABLED_TEST_test_Interpolate_sww_errorsII(self):
240        """
241        THIS TEST HAS BEEN DISABLED, SINCE IT PRINTS TO SCREEN,
242        but is should still work.  test it sometimes!
243        """
244        import tempfile
245        import os
246        import sys
247       
248        sww_file = tempfile.mktemp(".sww")
249        fd = open(sww_file,'w')
250        fd.write("unit testing a bad .sww file \n")
251        fd.close()
252       
253        try:
254            interpolate_sww2xya(sww_file, 'stage','yeah','yeah.x',
255                                verbose = False)
256           
257        except SystemExit:  pass
258        else:
259            self.failUnless(0 ==1,  'Bad file did not raise error!')       
260        #clean up
261        os.remove(sww_file)
262       
263    def test_Interpolate_sww_errorsIII(self):
264        import time, os
265        from Numeric import array, zeros, allclose, Float, concatenate, \
266             transpose
267        from Scientific.IO.NetCDF import NetCDFFile
268        from load_mesh.loadASCII import  import_points_file, \
269             concatinate_attributelist
270
271        self.domain.set_name('datatest' + str(time.time()))
272        self.domain.format = 'sww'
273        self.domain.smooth = True
274        self.domain.reduction = mean
275
276        sww = get_dataobject(self.domain)
277        sww.store_connectivity()
278        sww.store_timestep('stage')
279        self.domain.time = 2.
280        sww.store_timestep('stage')
281       
282        try:
283            interpolate_sww2xya(self.domain.get_name(),
284                                'stage','yeah','yeah.x',
285                                verbose = False)
286        except SystemExit:  pass
287        else:
288            self.failUnless(0 ==1,  'Bad file did not raise error!')       
289        #clean up
290        os.remove(sww.filename)
291#-------------------------------------------------------------
292if __name__ == "__main__":
293    suite = unittest.makeSuite(Test_Interpolate_sww,'test')
294    runner = unittest.TextTestRunner(verbosity=1)
295    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.