source: inundation/ga/storm_surge/pyvolution/test_interpolate_sww.py @ 451

Last change on this file since 451 was 374, checked in by duncan, 20 years ago

minor changes - vert att titles

File size: 6.2 KB
Line 
1#!/usr/bin/env python
2#
3"""
4Testing interpolate_sww, based on test_data_manageer, so there maybe code
5that isn't needed, eg in the setup file
6"""
7
8import unittest
9from Numeric import zeros, array, allclose, Float
10from util import mean
11
12from interpolate_sww import *
13from shallow_water import *
14from data_manager import *
15#from config import epsilon
16
17
18class dataTestCase(unittest.TestCase):
19    def setUp(self):
20        import time
21        from mesh_factory import rectangular
22
23
24        #Create basic mesh
25        points, vertices, boundary = rectangular(2, 2)
26
27        #Create shallow water domain
28        domain = Domain(points, vertices, boundary)
29        domain.default_order=2
30
31       
32        #Set some field values
33        domain.set_quantity('elevation', lambda x,y: -x)       
34        domain.set_quantity('friction', 0.03)
35
36
37        ######################
38        # Boundary conditions
39        B = Transmissive_boundary(domain)
40        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
41       
42
43        ######################
44        #Initial condition - with jumps
45
46
47        bed = domain.quantities['elevation'].vertex_values
48        level = zeros(bed.shape, Float)
49
50        h = 0.3
51        for i in range(level.shape[0]):
52            if i % 2 == 0:           
53                level[i,:] = bed[i,:] + h
54            else:
55                level[i,:] = bed[i,:]
56               
57        domain.set_quantity('level', level)
58
59        domain.distribute_to_vertices_and_edges()   
60
61
62        self.domain = domain
63
64        C = domain.get_vertex_coordinates()
65        self.X = C[:,0:6:2].copy()
66        self.Y = C[:,1:6:2].copy()
67       
68        self.F = bed
69
70       
71    def tearDown(self):
72        pass
73
74    def test_sww_DSG(self):
75        """Not a test, rather a look at the sww format
76        """
77
78        import time, os
79        from Numeric import array, zeros, allclose, Float, concatenate
80        from Scientific.IO.NetCDF import NetCDFFile       
81
82        self.domain.filename = 'datatest' + str(time.time())
83        self.domain.format = 'sww'
84        self.domain.smooth = True
85        self.domain.reduction = mean             
86     
87        sww = get_dataobject(self.domain)
88        sww.store_connectivity()
89        sww.store_timestep('level')
90        self.domain.time = 2. 
91        sww.store_timestep('level')
92
93        #Check contents
94        #Get NetCDF
95        fid = NetCDFFile(sww.filename, 'r') 
96     
97        # Get the variables
98        x = fid.variables['x']
99        y = fid.variables['y']
100        z = fid.variables['z']
101
102        volumes = fid.variables['volumes']   
103        time = fid.variables['time']
104
105        # 2D
106        stage = fid.variables['stage']       
107
108        X = x[:]
109        Y = y[:]
110        Z = z[:]
111        V = volumes[:]
112        T = time[:]
113        S = stage[:,:]
114
115        if False:
116            print "****************************"
117            print "X ",X
118            print "****************************"
119            print "Y ",Y
120            print "****************************"
121            print "Z ",Z
122            print "****************************"
123            print "V ",V
124            print "****************************"
125            print "Time ",T
126            print "****************************"
127            print "Stage ",S
128            print "****************************"
129           
130
131       
132       
133        fid.close()
134     
135        #Cleanup
136        os.remove(sww.filename)
137 
138    def test_interpolate_sww(self):
139        """Not reaa unit test, rather a system test for
140        """
141
142        import time, os
143        from Numeric import array, zeros, allclose, Float, concatenate, \
144             transpose
145        from Scientific.IO.NetCDF import NetCDFFile   
146        import tempfile
147        from load_mesh.loadASCII import  load_xya_file     
148
149        self.domain.filename = 'datatest' + str(time.time())
150        self.domain.format = 'sww'
151        self.domain.smooth = True
152        self.domain.reduction = mean             
153     
154        sww = get_dataobject(self.domain)
155        sww.store_connectivity()
156        sww.store_timestep('level')
157        self.domain.time = 2. 
158        sww.store_timestep('level')
159
160        #print "self.domain.filename",self.domain.filename
161        interp = Interpolate_sww(sww.filename)
162       
163        assert allclose(interp.time,[0.0,2.0])
164        answer = [ 0.15, 0.1, 0., -0.3, -0.35, -0.4, -0.7, -0.8, -0.850]
165        #print "answer",answer
166        #print interp.stage[0]
167        stage_t = transpose(interp.stage)
168        assert allclose(stage_t[0], answer)
169        assert allclose(stage_t[1],stage_t[0])
170
171        # create an .xya file
172        point_file = tempfile.mktemp(".xya")
173        fd = open(point_file,'w')
174        fd.write("# demo \n 0.0, 0.6,2.,4 \n 0.0, 0.9,4,8 \n 0.0,0.1,4.,8 \n 0.4,1.0,4.,8 \n")
175        fd.close()
176
177        interp.interpolate_xya(point_file)
178
179
180        answer = [[0.08, 0.08], [0.02, 0.02], [0.14, 0.14], [.08,.08]]
181        #print "answer",answer
182        assert allclose(interp.point_heights,answer)
183
184        # create an output .xya file
185        point_file_out = tempfile.mktemp(".xya")
186        interp.write_point_heights_xya(point_file_out)
187       
188        #check the output file
189        xya_dict = load_xya_file(point_file_out)
190
191        assert allclose(interp.point_coordinates, xya_dict['pointlist'])
192        assert allclose(interp.point_heights,
193                        xya_dict['pointattributelist'] )
194       
195        title = xya_dict['title'].split('\n')
196        #print "title",title
197        string_list = title[0].split(',') # assume a title has only one line
198        time_list = []
199
200        # this is if titles start with x,y
201        #answer = string_list.pop(0)
202        #self.failUnless( answer == 'x', 'Title is wrong!')
203        #self.failUnless( string_list.pop(0) == 'y', 'Title is wrong!')
204       
205        for time in string_list:
206            time_list.append(float(time))
207        #print "interp.time", interp.time
208        #print "time_list", time_list
209        assert allclose(interp.time,
210                        time_list)
211       
212       
213        #Cleanup
214        os.remove(sww.filename)
215        os.remove(point_file_out)
216 
217#-------------------------------------------------------------
218if __name__ == "__main__":
219    suite = unittest.makeSuite(dataTestCase,'test')
220    runner = unittest.TextTestRunner()
221    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.