source: inundation/pyvolution/test_interpolate_sww.py @ 1794

Last change on this file since 1794 was 1396, checked in by duncan, 19 years ago

load_mesh refactoring

File size: 6.8 KB
Line 
1#!/usr/bin/env python
2#
3"""
4Testing interpolate_sww, based on test_data_manageer, so there maybe code
5that isn't needed, eg in the setup file
6"""
7
8import unittest
9from Numeric import zeros, array, allclose, Float
10from util import mean
11
12from interpolate_sww import *
13from shallow_water import *
14from data_manager import *
15#from config import epsilon
16
17
18class Test_Interpolate_sww(unittest.TestCase):
19    def setUp(self):
20
21        import time
22        from mesh_factory import rectangular
23
24
25        #Create basic mesh
26        points, vertices, boundary = rectangular(2, 2)
27
28        #Create shallow water domain
29        domain = Domain(points, vertices, boundary)
30        domain.default_order=2
31        domain.beta_h = 0
32
33
34        #Set some field values
35        domain.set_quantity('elevation', lambda x,y: -x)
36        domain.set_quantity('friction', 0.03)
37
38
39        ######################
40        # Boundary conditions
41        B = Transmissive_boundary(domain)
42        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
43
44
45        ######################
46        #Initial condition - with jumps
47
48        bed = domain.quantities['elevation'].vertex_values
49        stage = zeros(bed.shape, Float)
50
51        h = 0.3
52        for i in range(stage.shape[0]):
53            if i % 2 == 0:
54                stage[i,:] = bed[i,:] + h
55            else:
56                stage[i,:] = bed[i,:]
57
58        domain.set_quantity('stage', stage)
59
60        domain.distribute_to_vertices_and_edges()
61
62
63        self.domain = domain
64
65        C = domain.get_vertex_coordinates()
66        self.X = C[:,0:6:2].copy()
67        self.Y = C[:,1:6:2].copy()
68
69        self.F = bed
70
71
72    def tearDown(self):
73        pass
74
75    def test_sww_DSG(self):
76        """Not a test, rather a look at the sww format
77        """
78
79        import time, os
80        from Numeric import array, zeros, allclose, Float, concatenate
81        from Scientific.IO.NetCDF import NetCDFFile
82
83        self.domain.filename = 'datatest' + str(time.time())
84        self.domain.format = 'sww'
85        self.domain.smooth = True
86        self.domain.reduction = mean
87
88        sww = get_dataobject(self.domain)
89        sww.store_connectivity()
90        sww.store_timestep('stage')
91        self.domain.time = 2.
92        sww.store_timestep('stage')
93
94        #Check contents
95        #Get NetCDF
96        fid = NetCDFFile(sww.filename, 'r')
97
98        # Get the variables
99        x = fid.variables['x']
100        y = fid.variables['y']
101        z = fid.variables['elevation']
102
103        volumes = fid.variables['volumes']
104        time = fid.variables['time']
105
106        # 2D
107        stage = fid.variables['stage']
108
109        X = x[:]
110        Y = y[:]
111        Z = z[:]
112        V = volumes[:]
113        T = time[:]
114        S = stage[:,:]
115
116        if False:
117            print "****************************"
118            print "X ",X
119            print "****************************"
120            print "Y ",Y
121            print "****************************"
122            print "Z ",Z
123            print "****************************"
124            print "V ",V
125            print "****************************"
126            print "Time ",T
127            print "****************************"
128            print "Stage ",S
129            print "****************************"
130
131
132
133
134        fid.close()
135
136        #Cleanup
137        os.remove(sww.filename)
138
139    def test_interpolate_sww(self):
140        """Not reaa unit test, rather a system test for
141        """
142
143        import time, os
144        from Numeric import array, zeros, allclose, Float, concatenate, \
145             transpose
146        from Scientific.IO.NetCDF import NetCDFFile
147        import tempfile
148        from load_mesh.loadASCII import  import_points_file, \
149             concatinate_attributelist
150
151        self.domain.filename = 'datatest' + str(time.time())
152        self.domain.format = 'sww'
153        self.domain.smooth = True
154        self.domain.reduction = mean
155
156        sww = get_dataobject(self.domain)
157        sww.store_connectivity()
158        sww.store_timestep('stage')
159        self.domain.time = 2.
160        sww.store_timestep('stage')
161
162        #print "self.domain.filename",self.domain.filename
163        interp = Interpolate_sww(sww.filename, 'depth')
164
165        assert allclose(interp.time,[0.0,2.0])
166
167        # create an .xya file
168        point_file = tempfile.mktemp(".xya")
169        fd = open(point_file,'w')
170        fd.write("colour, thickness \n 0.0, 0.6,2.,4 \n 0.0, 0.9,4,8 \n 0.0,0.1,4.,8 \n 0.4,1.0,4.,8 \n")
171        fd.close()
172
173        interp.interpolate_xya(point_file)
174
175        answer = {}
176        answer['0.0'] =  [ 0.08, 0.02, 0.14,0.08]
177        answer['2.0'] =  [ 0.08, 0.02, 0.14,0.08]
178       
179        #print "answer",answer
180        #print "interp.interpolated_quantity",interp.interpolated_quantity
181        assert allclose(interp.interpolated_quantity['0.0'],answer['0.0'])
182        assert allclose(interp.interpolated_quantity['2.0'],answer['2.0'])
183
184        # create an output .xya file
185        point_file_out = tempfile.mktemp(".xya")
186        interp.write_depth_xya(point_file_out)
187
188        #check the output file
189        xya_dict = import_points_file(point_file_out)
190        assert allclose(xya_dict['attributelist']['0.0'],answer['0.0'])
191        assert allclose(xya_dict['attributelist']['2.0'],answer['2.0'])
192        assert allclose(xya_dict['pointlist'],[[ 0.0, 0.6],[0.0, 0.9],[ 0.0,0.1],[ 0.4,1.0]])
193
194        # Try another quantity
195        interp = Interpolate_sww(sww.filename, 'stage')
196        interp.interpolate_xya(point_file)
197
198        answer['0.0'] =  [ 0.08, 0.02, 0.14,-0.32]
199        answer['2.0'] =  [ 0.08, 0.02, 0.14,-0.32]       
200        #print "answer",answer
201        #print "interp.interpolated_quantity",interp.interpolated_quantity
202        assert allclose(interp.interpolated_quantity['0.0'],answer['0.0'])
203        assert allclose(interp.interpolated_quantity['2.0'],answer['2.0'])
204
205        # look at error catching
206        try:
207            interp = Interpolate_sww(sww.filename, 'funky!')
208        except KeyError:
209            pass
210        else:
211            self.failUnless(0==1,
212                        'bad key did not raise an error!')
213
214        # look at error catching
215        try:
216            interp = Interpolate_sww(sww.filename, 'elevation')
217        except KeyError:
218            pass
219        else:
220            self.failUnless(0==1,
221                        'bad key did not raise an error!')
222
223        #Cleanup
224        os.remove(sww.filename)
225        os.remove(point_file_out)
226        os.remove(point_file)
227
228#-------------------------------------------------------------
229if __name__ == "__main__":
230    suite = unittest.makeSuite(Test_Interpolate_sww,'test')
231    runner = unittest.TextTestRunner()
232    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.