source: inundation/ga/storm_surge/pyvolution/test_interpolate_sww.py @ 631

Last change on this file since 631 was 631, checked in by duncan, 19 years ago

user can choose the quantity to interpolate

File size: 6.7 KB
Line 
1#!/usr/bin/env python
2#
3"""
4Testing interpolate_sww, based on test_data_manageer, so there maybe code
5that isn't needed, eg in the setup file
6"""
7
8import unittest
9from Numeric import zeros, array, allclose, Float
10from util import mean
11
12from interpolate_sww import *
13from shallow_water import *
14from data_manager import *
15#from config import epsilon
16
17
18class dataTestCase(unittest.TestCase):
19    def setUp(self):
20        import time
21        from mesh_factory import rectangular
22
23
24        #Create basic mesh
25        points, vertices, boundary = rectangular(2, 2)
26
27        #Create shallow water domain
28        domain = Domain(points, vertices, boundary)
29        domain.default_order=2
30
31       
32        #Set some field values
33        domain.set_quantity('elevation', lambda x,y: -x)       
34        domain.set_quantity('friction', 0.03)
35
36
37        ######################
38        # Boundary conditions
39        B = Transmissive_boundary(domain)
40        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
41       
42
43        ######################
44        #Initial condition - with jumps
45
46
47        bed = domain.quantities['elevation'].vertex_values
48        level = zeros(bed.shape, Float)
49
50        h = 0.3
51        for i in range(level.shape[0]):
52            if i % 2 == 0:           
53                level[i,:] = bed[i,:] + h
54            else:
55                level[i,:] = bed[i,:]
56               
57        domain.set_quantity('level', level)
58
59        domain.distribute_to_vertices_and_edges()   
60
61
62        self.domain = domain
63
64        C = domain.get_vertex_coordinates()
65        self.X = C[:,0:6:2].copy()
66        self.Y = C[:,1:6:2].copy()
67       
68        self.F = bed
69
70       
71    def tearDown(self):
72        pass
73
74    def test_sww_DSG(self):
75        """Not a test, rather a look at the sww format
76        """
77
78        import time, os
79        from Numeric import array, zeros, allclose, Float, concatenate
80        from Scientific.IO.NetCDF import NetCDFFile       
81
82        self.domain.filename = 'datatest' + str(time.time())
83        self.domain.format = 'sww'
84        self.domain.smooth = True
85        self.domain.reduction = mean             
86     
87        sww = get_dataobject(self.domain)
88        sww.store_connectivity()
89        sww.store_timestep('level')
90        self.domain.time = 2. 
91        sww.store_timestep('level')
92
93        #Check contents
94        #Get NetCDF
95        fid = NetCDFFile(sww.filename, 'r') 
96     
97        # Get the variables
98        x = fid.variables['x']
99        y = fid.variables['y']
100        z = fid.variables['z']
101
102        volumes = fid.variables['volumes']   
103        time = fid.variables['time']
104
105        # 2D
106        stage = fid.variables['stage']       
107
108        X = x[:]
109        Y = y[:]
110        Z = z[:]
111        V = volumes[:]
112        T = time[:]
113        S = stage[:,:]
114
115        if False:
116            print "****************************"
117            print "X ",X
118            print "****************************"
119            print "Y ",Y
120            print "****************************"
121            print "Z ",Z
122            print "****************************"
123            print "V ",V
124            print "****************************"
125            print "Time ",T
126            print "****************************"
127            print "Stage ",S
128            print "****************************"
129           
130
131       
132       
133        fid.close()
134     
135        #Cleanup
136        os.remove(sww.filename)
137 
138    def test_interpolate_sww(self):
139        """Not reaa unit test, rather a system test for
140        """
141
142        import time, os
143        from Numeric import array, zeros, allclose, Float, concatenate, \
144             transpose
145        from Scientific.IO.NetCDF import NetCDFFile   
146        import tempfile
147        from load_mesh.loadASCII import  load_xya_file     
148
149        self.domain.filename = 'datatest' + str(time.time())
150        self.domain.format = 'sww'
151        self.domain.smooth = True
152        self.domain.reduction = mean             
153     
154        sww = get_dataobject(self.domain)
155        sww.store_connectivity()
156        sww.store_timestep('level')
157        self.domain.time = 2. 
158        sww.store_timestep('level')
159
160        #print "self.domain.filename",self.domain.filename
161        interp = Interpolate_sww(sww.filename, 'height')
162       
163        assert allclose(interp.time,[0.0,2.0])
164
165        #answer = [ 0.15, 0.1, 0., -0.3, -0.35, -0.4, -0.7, -0.8, -0.850]
166        #print "answer",answer
167        #print interp.stage[0]
168        #stage_t = transpose(interp.stage)
169        #assert allclose(stage_t[0], answer)
170        #assert allclose(stage_t[1],stage_t[0])
171
172        # create an .xya file
173        point_file = tempfile.mktemp(".xya")
174        fd = open(point_file,'w')
175        fd.write("# demo \n 0.0, 0.6,2.,4 \n 0.0, 0.9,4,8 \n 0.0,0.1,4.,8 \n 0.4,1.0,4.,8 \n")
176        fd.close()
177
178        interp.interpolate_xya(point_file)
179
180        answer = [[0.08, 0.08], [0.02, 0.02], [0.14, 0.14], [.08,.08]]
181        #print "answer",answer
182        assert allclose(interp.interpolated_quantity,answer)
183
184        # create an output .xya file
185        point_file_out = tempfile.mktemp(".xya")
186        interp.write_depth_xya(point_file_out)
187       
188        #check the output file
189        xya_dict = load_xya_file(point_file_out)
190
191        assert allclose(interp.point_coordinates, xya_dict['pointlist'])
192        assert allclose(interp.interpolated_quantity,
193                        xya_dict['pointattributelist'] )
194       
195        title = xya_dict['title'].split('\n')
196        #print "title",title
197        string_list = title[0].split(',') # assume a title has only one line
198        time_list = []
199
200        # Try another quantity     
201        interp = Interpolate_sww(sww.filename, 'stage')
202        interp.interpolate_xya(point_file)
203
204        answer = [[0.08, 0.08], [0.02, 0.02], [0.14, 0.14], [-.32,-.32]]
205        #print "answer",answer
206        assert allclose(interp.interpolated_quantity,answer)
207
208       
209        # look at error catching
210        try:
211            interp = Interpolate_sww(sww.filename, 'funky!')
212        except KeyError:
213            pass
214        else:
215            self.failUnless(0==1,
216                        'bad key did not raise an error!')
217       
218        # look at error catching
219        try:
220            interp = Interpolate_sww(sww.filename, 'z')
221        except KeyError:
222            pass
223        else:
224            self.failUnless(0==1,
225                        'bad key did not raise an error!')
226               
227        #Cleanup
228        os.remove(sww.filename)
229        os.remove(point_file_out)
230        os.remove(point_file)
231 
232#-------------------------------------------------------------
233if __name__ == "__main__":
234    suite = unittest.makeSuite(dataTestCase,'test')
235    runner = unittest.TextTestRunner()
236    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.