source: inundation/ga/storm_surge/pyvolution/test_interpolate_sww.py @ 1102

Last change on this file since 1102 was 1018, checked in by steve, 20 years ago

Cleaned up test function

File size: 6.9 KB
Line 
1#!/usr/bin/env python
2#
3"""
4Testing interpolate_sww, based on test_data_manageer, so there maybe code
5that isn't needed, eg in the setup file
6"""
7
8import unittest
9from Numeric import zeros, array, allclose, Float
10from util import mean
11
12from interpolate_sww import *
13from shallow_water import *
14from data_manager import *
15#from config import epsilon
16
17
18class Test_Interpolate_sww(unittest.TestCase):
19    def setUp(self):
20        import time
21        from mesh_factory import rectangular
22
23
24        #Create basic mesh
25        points, vertices, boundary = rectangular(2, 2)
26
27        #Create shallow water domain
28        domain = Domain(points, vertices, boundary)
29        domain.default_order=2
30        domain.beta_h = 0
31
32
33        #Set some field values
34        domain.set_quantity('elevation', lambda x,y: -x)
35        domain.set_quantity('friction', 0.03)
36
37
38        ######################
39        # Boundary conditions
40        B = Transmissive_boundary(domain)
41        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
42
43
44        ######################
45        #Initial condition - with jumps
46
47        bed = domain.quantities['elevation'].vertex_values
48        stage = zeros(bed.shape, Float)
49
50        h = 0.3
51        for i in range(stage.shape[0]):
52            if i % 2 == 0:
53                stage[i,:] = bed[i,:] + h
54            else:
55                stage[i,:] = bed[i,:]
56
57        domain.set_quantity('stage', stage)
58
59        domain.distribute_to_vertices_and_edges()
60
61
62        self.domain = domain
63
64        C = domain.get_vertex_coordinates()
65        self.X = C[:,0:6:2].copy()
66        self.Y = C[:,1:6:2].copy()
67
68        self.F = bed
69
70
71    def tearDown(self):
72        pass
73
74    def test_sww_DSG(self):
75        """Not a test, rather a look at the sww format
76        """
77
78        import time, os
79        from Numeric import array, zeros, allclose, Float, concatenate
80        from Scientific.IO.NetCDF import NetCDFFile
81
82        self.domain.filename = 'datatest' + str(time.time())
83        self.domain.format = 'sww'
84        self.domain.smooth = True
85        self.domain.reduction = mean
86
87        sww = get_dataobject(self.domain)
88        sww.store_connectivity()
89        sww.store_timestep('stage')
90        self.domain.time = 2.
91        sww.store_timestep('stage')
92
93        #Check contents
94        #Get NetCDF
95        fid = NetCDFFile(sww.filename, 'r')
96
97        # Get the variables
98        x = fid.variables['x']
99        y = fid.variables['y']
100        z = fid.variables['elevation']
101
102        volumes = fid.variables['volumes']
103        time = fid.variables['time']
104
105        # 2D
106        stage = fid.variables['stage']
107
108        X = x[:]
109        Y = y[:]
110        Z = z[:]
111        V = volumes[:]
112        T = time[:]
113        S = stage[:,:]
114
115        if False:
116            print "****************************"
117            print "X ",X
118            print "****************************"
119            print "Y ",Y
120            print "****************************"
121            print "Z ",Z
122            print "****************************"
123            print "V ",V
124            print "****************************"
125            print "Time ",T
126            print "****************************"
127            print "Stage ",S
128            print "****************************"
129
130
131
132
133        fid.close()
134
135        #Cleanup
136        os.remove(sww.filename)
137
138    def test_interpolate_sww(self):
139        """Not reaa unit test, rather a system test for
140        """
141
142        import time, os
143        from Numeric import array, zeros, allclose, Float, concatenate, \
144             transpose
145        from Scientific.IO.NetCDF import NetCDFFile
146        import tempfile
147        from load_mesh.loadASCII import  load_points_file, \
148             concatinate_attributelist
149
150        self.domain.filename = 'datatest' + str(time.time())
151        self.domain.format = 'sww'
152        self.domain.smooth = True
153        self.domain.reduction = mean
154
155        sww = get_dataobject(self.domain)
156        sww.store_connectivity()
157        sww.store_timestep('stage')
158        self.domain.time = 2.
159        sww.store_timestep('stage')
160
161        #print "self.domain.filename",self.domain.filename
162        interp = Interpolate_sww(sww.filename, 'depth')
163
164        assert allclose(interp.time,[0.0,2.0])
165
166        #answer = [ 0.15, 0.1, 0., -0.3, -0.35, -0.4, -0.7, -0.8, -0.850]
167        #print "answer",answer
168        #print interp.stage[0]
169        #stage_t = transpose(interp.stage)
170        #assert allclose(stage_t[0], answer)
171        #assert allclose(stage_t[1],stage_t[0])
172
173        # create an .xya file
174        point_file = tempfile.mktemp(".xya")
175        fd = open(point_file,'w')
176        fd.write("colour, thickness \n 0.0, 0.6,2.,4 \n 0.0, 0.9,4,8 \n 0.0,0.1,4.,8 \n 0.4,1.0,4.,8 \n")
177        fd.close()
178
179        interp.interpolate_xya(point_file)
180
181        answer = [[0.08, 0.08], [0.02, 0.02], [0.14, 0.14], [.08,.08]]
182        #print "answer",answer
183        assert allclose(interp.interpolated_quantity,answer)
184
185        # create an output .xya file
186        point_file_out = tempfile.mktemp(".xya")
187        interp.write_depth_xya(point_file_out)
188
189        #check the output file
190        xya_dict = load_points_file(point_file_out)
191        #print "xya_dict", xya_dict
192        title_list,point_attributes = concatinate_attributelist(xya_dict['attributelist'])
193        assert allclose(interp.point_coordinates, xya_dict['pointlist'])
194        assert allclose(interp.interpolated_quantity,
195                        point_attributes )
196
197        # FIXME -remove
198        #title = xya_dict['title'].split('\n')
199        #print "title",title
200        #string_list = title[0].split(',') # assume a title has only one line
201        #time_list = []
202
203        # Try another quantity
204        interp = Interpolate_sww(sww.filename, 'stage')
205        interp.interpolate_xya(point_file)
206
207        answer = [[0.08, 0.08], [0.02, 0.02], [0.14, 0.14], [-.32,-.32]]
208        #print "answer",answer
209        assert allclose(interp.interpolated_quantity,answer)
210
211
212        # look at error catching
213        try:
214            interp = Interpolate_sww(sww.filename, 'funky!')
215        except KeyError:
216            pass
217        else:
218            self.failUnless(0==1,
219                        'bad key did not raise an error!')
220
221        # look at error catching
222        try:
223            interp = Interpolate_sww(sww.filename, 'elevation')
224        except KeyError:
225            pass
226        else:
227            self.failUnless(0==1,
228                        'bad key did not raise an error!')
229
230        #Cleanup
231        os.remove(sww.filename)
232        os.remove(point_file_out)
233        os.remove(point_file)
234
235#-------------------------------------------------------------
236if __name__ == "__main__":
237    suite = unittest.makeSuite(Test_Interpolate_sww,'test')
238    runner = unittest.TextTestRunner()
239    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.