source: trunk/anuga_core/source/anuga/file/test_sww.py @ 7871

Last change on this file since 7871 was 7871, checked in by hudson, 14 years ago

Fixed a couple of errors in the unit test.

File size: 16.4 KB
Line 
1import os
2import unittest
3import tempfile
4import numpy as num
5
6from anuga.coordinate_transforms.geo_reference import Geo_reference
7from csv_file import load_csv_as_array, load_csv_as_dict
8from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular
9from anuga.shallow_water.shallow_water_domain import Domain
10from sww import load_sww_as_domain, weed, get_mesh_and_quantities_from_file
11from Scientific.IO.NetCDF import NetCDFFile
12
13from anuga.config import netcdf_mode_w
14
15# boundary functions
16from anuga.shallow_water.boundaries import Reflective_boundary, \
17            Field_boundary, Transmissive_momentum_set_stage_boundary, \
18            Transmissive_stage_zero_momentum_boundary
19from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\
20     import Transmissive_boundary, Dirichlet_boundary, \
21            Time_boundary, File_boundary, AWI_boundary
22
23
24class Test_sww(unittest.TestCase):
25    def setUp(self):
26        self.verbose = False
27        pass
28
29    def tearDown(self):
30        pass
31       
32    def test_sww2domain1(self):
33        ################################################
34        #Create a test domain, and evolve and save it.
35        ################################################
36        from mesh_factory import rectangular
37
38        #Create basic mesh
39
40        yiel=0.01
41        points, vertices, boundary = rectangular(10,10)
42
43        #Create shallow water domain
44        domain = Domain(points, vertices, boundary)
45        domain.geo_reference = Geo_reference(56,11,11)
46        domain.smooth = False
47        domain.store = True
48        domain.set_name('bedslope')
49        domain.default_order=2
50        #Bed-slope and friction
51        domain.set_quantity('elevation', lambda x,y: -x/3)
52        domain.set_quantity('friction', 0.1)
53        # Boundary conditions
54        from math import sin, pi
55        Br = Reflective_boundary(domain)
56        Bt = Transmissive_boundary(domain)
57        Bd = Dirichlet_boundary([0.2,0.,0.])
58        Bw = Time_boundary(domain=domain,f=lambda t: [(0.1*sin(t*2*pi)), 0.0, 0.0])
59
60        #domain.set_boundary({'left': Bd, 'right': Br, 'top': Br, 'bottom': Br})
61        domain.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd})
62
63        domain.quantities_to_be_stored['xmomentum'] = 2
64        domain.quantities_to_be_stored['ymomentum'] = 2
65        #Initial condition
66        h = 0.05
67        elevation = domain.quantities['elevation'].vertex_values
68        domain.set_quantity('stage', elevation + h)
69
70        domain.check_integrity()
71        #Evolution
72        #domain.tight_slope_limiters = 1
73        for t in domain.evolve(yieldstep = yiel, finaltime = 0.05):
74            #domain.write_time()
75            pass
76
77
78
79        filename = domain.datadir + os.sep + domain.get_name() + '.sww'
80        domain2 = load_sww_as_domain(filename, None, fail_if_NaN=False,
81                                        verbose=self.verbose)
82        #points, vertices, boundary = rectangular(15,15)
83        #domain2.boundary = boundary
84        ###################
85        ##NOW TEST IT!!!
86        ###################
87
88        os.remove(filename)
89
90        bits = ['vertex_coordinates']
91        for quantity in domain.quantities_to_be_stored:
92            bits.append('get_quantity("%s").get_integral()' % quantity)
93            bits.append('get_quantity("%s").get_values()' % quantity)
94
95        for bit in bits:
96            #print 'testing that domain.'+bit+' has been restored'
97            #print bit
98            #print 'done'
99            assert num.allclose(eval('domain.'+bit),eval('domain2.'+bit))
100
101        ######################################
102        #Now evolve them both, just to be sure
103        ######################################x
104        domain.time = 0.
105        from time import sleep
106
107        final = .1
108        domain.set_quantity('friction', 0.1)
109        domain.store = False
110        domain.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd})
111
112
113        for t in domain.evolve(yieldstep = yiel, finaltime = final):
114            #domain.write_time()
115            pass
116
117        final = final - (domain2.starttime-domain.starttime)
118        #BUT since domain1 gets time hacked back to 0:
119        final = final + (domain2.starttime-domain.starttime)
120
121        domain2.smooth = False
122        domain2.store = False
123        domain2.default_order=2
124        domain2.set_quantity('friction', 0.1)
125        #Bed-slope and friction
126        # Boundary conditions
127        Bd2=Dirichlet_boundary([0.2,0.,0.])
128        domain2.boundary = domain.boundary
129        #print 'domain2.boundary'
130        #print domain2.boundary
131        domain2.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd})
132        #domain2.set_boundary({'exterior': Bd})
133
134        domain2.check_integrity()
135
136        for t in domain2.evolve(yieldstep = yiel, finaltime = final):
137            #domain2.write_time()
138            pass
139
140        ###################
141        ##NOW TEST IT!!!
142        ##################
143
144        bits = ['vertex_coordinates']
145
146        for quantity in ['elevation','stage', 'ymomentum','xmomentum']:
147            bits.append('get_quantity("%s").get_integral()' %quantity)
148            bits.append('get_quantity("%s").get_values()' %quantity)
149
150        #print bits
151        for bit in bits:
152            #print bit
153            #print eval('domain.'+bit)
154            #print eval('domain2.'+bit)
155           
156            #print eval('domain.'+bit+'-domain2.'+bit)
157            msg = 'Values in the two domains are different for ' + bit
158            assert num.allclose(eval('domain.'+bit),eval('domain2.'+bit),
159                                rtol=1.e-5, atol=3.e-8), msg
160
161
162
163    def test_get_mesh_and_quantities_from_sww_file(self):
164        """test_get_mesh_and_quantities_from_sww_file(self):
165        """     
166       
167        # Generate a test sww file with non trivial georeference
168       
169        import time, os
170        from Scientific.IO.NetCDF import NetCDFFile
171
172        # Setup
173        from mesh_factory import rectangular
174
175        # Create basic mesh (100m x 5m)
176        width = 5
177        length = 50
178        t_end = 10
179        points, vertices, boundary = rectangular(length, width, 50, 5)
180
181        # Create shallow water domain
182        domain = Domain(points, vertices, boundary,
183                        geo_reference = Geo_reference(56,308500,6189000))
184
185        domain.set_name('flowtest')
186        swwfile = domain.get_name() + '.sww'
187        domain.set_datadir('.')
188
189        Br = Reflective_boundary(domain)    # Side walls
190        Bd = Dirichlet_boundary([1, 0, 0])  # inflow
191
192        domain.set_boundary( {'left': Bd, 'right': Bd, 'top': Br, 'bottom': Br})
193
194        for t in domain.evolve(yieldstep=1, finaltime = t_end):
195            pass
196
197       
198        # Read it
199
200        # Get mesh and quantities from sww file
201        X = get_mesh_and_quantities_from_file(swwfile,
202                                              quantities=['elevation',
203                                                          'stage',
204                                                          'xmomentum',
205                                                          'ymomentum'], 
206                                              verbose=False)
207        mesh, quantities, time = X
208       
209
210        # Check that mesh has been recovered
211        assert num.alltrue(mesh.triangles == domain.get_triangles())
212        assert num.allclose(mesh.nodes, domain.get_nodes())
213
214        # Check that time has been recovered
215        assert num.allclose(time, range(t_end+1))
216
217        # Check that quantities have been recovered
218        # (sww files use single precision)
219        z=domain.get_quantity('elevation').get_values(location='unique vertices')
220        assert num.allclose(quantities['elevation'], z)
221
222        for q in ['stage', 'xmomentum', 'ymomentum']:
223            # Get quantity at last timestep
224            q_ref=domain.get_quantity(q).get_values(location='unique vertices')
225
226            #print q,quantities[q]
227            q_sww=quantities[q][-1,:]
228
229            msg = 'Quantity %s failed to be recovered' %q
230            assert num.allclose(q_ref, q_sww, atol=1.0e-6), msg
231           
232        # Cleanup
233        os.remove(swwfile)
234       
235       
236
237    def test_weed(self):
238        coordinates1 = [[0.,0.],[1.,0.],[1.,1.],[1.,0.],[2.,0.],[1.,1.]]
239        volumes1 = [[0,1,2],[3,4,5]]
240        boundary1= {(0,1): 'external',(1,2): 'not external',(2,0): 'external',(3,4): 'external',(4,5): 'external',(5,3): 'not external'}
241        coordinates2,volumes2,boundary2=weed(coordinates1,volumes1,boundary1)
242
243        points2 = {(0.,0.):None,(1.,0.):None,(1.,1.):None,(2.,0.):None}
244
245        assert len(points2)==len(coordinates2)
246        for i in range(len(coordinates2)):
247            coordinate = tuple(coordinates2[i])
248            assert points2.has_key(coordinate)
249            points2[coordinate]=i
250
251        for triangle in volumes1:
252            for coordinate in triangle:
253                assert coordinates2[points2[tuple(coordinates1[coordinate])]][0]==coordinates1[coordinate][0]
254                assert coordinates2[points2[tuple(coordinates1[coordinate])]][1]==coordinates1[coordinate][1]
255
256
257    def test_triangulation(self):
258        #
259       
260       
261        filename = tempfile.mktemp("_data_manager.sww")
262        outfile = NetCDFFile(filename, netcdf_mode_w)
263        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
264        volumes = (0,1,2)
265        elevation = [0,1,2]
266        new_origin = None
267        new_origin = Geo_reference(56, 0, 0)
268        times = [0, 10]
269        number_of_volumes = len(volumes)
270        number_of_points = len(points_utm)
271        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])
272        sww.store_header(outfile, times, number_of_volumes,
273                         number_of_points, description='fully sick testing',
274                         verbose=self.verbose,sww_precision=netcdf_float)
275        sww.store_triangulation(outfile, points_utm, volumes,
276                                elevation,  new_origin=new_origin,
277                                verbose=self.verbose)       
278        outfile.close()
279        fid = NetCDFFile(filename)
280
281        x = fid.variables['x'][:]
282        y = fid.variables['y'][:]
283        fid.close()
284
285        assert num.allclose(num.array(map(None, x,y)), points_utm)
286        os.remove(filename)
287
288       
289    def test_triangulationII(self):
290        #
291       
292       
293        filename = tempfile.mktemp("_data_manager.sww")
294        outfile = NetCDFFile(filename, netcdf_mode_w)
295        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
296        volumes = (0,1,2)
297        elevation = [0,1,2]
298        new_origin = None
299        #new_origin = Geo_reference(56, 0, 0)
300        times = [0, 10]
301        number_of_volumes = len(volumes)
302        number_of_points = len(points_utm)
303        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
304        sww.store_header(outfile, times, number_of_volumes,
305                         number_of_points, description='fully sick testing',
306                         verbose=self.verbose,sww_precision=netcdf_float)
307        sww.store_triangulation(outfile, points_utm, volumes,
308                                new_origin=new_origin,
309                                verbose=self.verbose)
310        sww.store_static_quantities(outfile, elevation=elevation)                               
311                               
312        outfile.close()
313        fid = NetCDFFile(filename)
314
315        x = fid.variables['x'][:]
316        y = fid.variables['y'][:]
317        results_georef = Geo_reference()
318        results_georef.read_NetCDF(fid)
319        assert results_georef == Geo_reference(DEFAULT_ZONE, 0, 0)
320        fid.close()
321
322        assert num.allclose(num.array(map(None, x,y)), points_utm)
323        os.remove(filename)
324
325       
326    def test_triangulation_new_origin(self):
327        #
328       
329       
330        filename = tempfile.mktemp("_data_manager.sww")
331        outfile = NetCDFFile(filename, netcdf_mode_w)
332        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
333        volumes = (0,1,2)
334        elevation = [0,1,2]
335        new_origin = None
336        new_origin = Geo_reference(56, 1, 554354)
337        points_utm = new_origin.change_points_geo_ref(points_utm)
338        times = [0, 10]
339        number_of_volumes = len(volumes)
340        number_of_points = len(points_utm)
341        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
342        sww.store_header(outfile, times, number_of_volumes,
343                         number_of_points, description='fully sick testing',
344                         verbose=self.verbose,sww_precision=netcdf_float)
345        sww.store_triangulation(outfile, points_utm, volumes,
346                                elevation,  new_origin=new_origin,
347                                verbose=self.verbose)
348        outfile.close()
349        fid = NetCDFFile(filename)
350
351        x = fid.variables['x'][:]
352        y = fid.variables['y'][:]
353        results_georef = Geo_reference()
354        results_georef.read_NetCDF(fid)
355        assert results_georef == new_origin
356        fid.close()
357
358        absolute = Geo_reference(56, 0,0)
359        assert num.allclose(num.array(
360            absolute.change_points_geo_ref(map(None, x,y),
361                                           new_origin)),points_utm)
362       
363        os.remove(filename)
364       
365    def test_triangulation_points_georeference(self):
366        #
367       
368       
369        filename = tempfile.mktemp("_data_manager.sww")
370        outfile = NetCDFFile(filename, netcdf_mode_w)
371        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
372        volumes = (0,1,2)
373        elevation = [0,1,2]
374        new_origin = None
375        points_georeference = Geo_reference(56, 1, 554354)
376        points_utm = points_georeference.change_points_geo_ref(points_utm)
377        times = [0, 10]
378        number_of_volumes = len(volumes)
379        number_of_points = len(points_utm)
380        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
381        sww.store_header(outfile, times, number_of_volumes,
382                         number_of_points, description='fully sick testing',
383                         verbose=self.verbose,sww_precision=netcdf_float)
384        sww.store_triangulation(outfile, points_utm, volumes,
385                                elevation,  new_origin=new_origin,
386                                points_georeference=points_georeference,
387                                verbose=self.verbose)       
388        outfile.close()
389        fid = NetCDFFile(filename)
390
391        x = fid.variables['x'][:]
392        y = fid.variables['y'][:]
393        results_georef = Geo_reference()
394        results_georef.read_NetCDF(fid)
395        assert results_georef == points_georeference
396        fid.close()
397
398        assert num.allclose(num.array(map(None, x,y)), points_utm)
399        os.remove(filename)
400       
401    def test_triangulation_2_geo_refs(self):
402        #
403       
404       
405        filename = tempfile.mktemp("_data_manager.sww")
406        outfile = NetCDFFile(filename, netcdf_mode_w)
407        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
408        volumes = (0,1,2)
409        elevation = [0,1,2]
410        new_origin = Geo_reference(56, 1, 1)
411        points_georeference = Geo_reference(56, 0, 0)
412        points_utm = points_georeference.change_points_geo_ref(points_utm)
413        times = [0, 10]
414        number_of_volumes = len(volumes)
415        number_of_points = len(points_utm)
416        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
417        sww.store_header(outfile, times, number_of_volumes,
418                         number_of_points, description='fully sick testing',
419                         verbose=self.verbose,sww_precision=netcdf_float)
420        sww.store_triangulation(outfile, points_utm, volumes,
421                                elevation,  new_origin=new_origin,
422                                points_georeference=points_georeference,
423                                verbose=self.verbose)       
424        outfile.close()
425        fid = NetCDFFile(filename)
426
427        x = fid.variables['x'][:]
428        y = fid.variables['y'][:]
429        results_georef = Geo_reference()
430        results_georef.read_NetCDF(fid)
431        assert results_georef == new_origin
432        fid.close()
433
434
435        absolute = Geo_reference(56, 0,0)
436        assert num.allclose(num.array(
437            absolute.change_points_geo_ref(map(None, x,y),
438                                           new_origin)),points_utm)
439        os.remove(filename)
440
441#################################################################################
442
443if __name__ == "__main__":
444    suite = unittest.makeSuite(Test_sww, 'test')
445    runner = unittest.TextTestRunner(verbosity=1)
446    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.