source: trunk/anuga_core/source/anuga/file/test_sww.py @ 7870

Last change on this file since 7870 was 7870, checked in by hudson, 13 years ago

Cleaned up pylint warnings.

File size: 16.3 KB
Line 
1import os
2import unittest
3import tempfile
4import numpy as num
5
6from anuga.coordinate_transforms.geo_reference import Geo_reference
7from csv_file import load_csv_as_array, load_csv_as_dict
8from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular
9from anuga.shallow_water.shallow_water_domain import Domain
10from sww import load_sww_as_domain, weed, get_mesh_and_quantities_from_file
11from Scientific.IO.NetCDF import NetCDFFile
12
13# boundary functions
14from anuga.shallow_water.boundaries import Reflective_boundary, \
15            Field_boundary, Transmissive_momentum_set_stage_boundary, \
16            Transmissive_stage_zero_momentum_boundary
17from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\
18     import Transmissive_boundary, Dirichlet_boundary, \
19            Time_boundary, File_boundary, AWI_boundary
20
21
22class Test_sww(unittest.TestCase):
23    def setUp(self):
24        self.verbose = False
25        pass
26
27    def tearDown(self):
28        pass
29       
30    def test_sww2domain1(self):
31        ################################################
32        #Create a test domain, and evolve and save it.
33        ################################################
34        from mesh_factory import rectangular
35
36        #Create basic mesh
37
38        yiel=0.01
39        points, vertices, boundary = rectangular(10,10)
40
41        #Create shallow water domain
42        domain = Domain(points, vertices, boundary)
43        domain.geo_reference = Geo_reference(56,11,11)
44        domain.smooth = False
45        domain.store = True
46        domain.set_name('bedslope')
47        domain.default_order=2
48        #Bed-slope and friction
49        domain.set_quantity('elevation', lambda x,y: -x/3)
50        domain.set_quantity('friction', 0.1)
51        # Boundary conditions
52        from math import sin, pi
53        Br = Reflective_boundary(domain)
54        Bt = Transmissive_boundary(domain)
55        Bd = Dirichlet_boundary([0.2,0.,0.])
56        Bw = Time_boundary(domain=domain,f=lambda t: [(0.1*sin(t*2*pi)), 0.0, 0.0])
57
58        #domain.set_boundary({'left': Bd, 'right': Br, 'top': Br, 'bottom': Br})
59        domain.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd})
60
61        domain.quantities_to_be_stored['xmomentum'] = 2
62        domain.quantities_to_be_stored['ymomentum'] = 2
63        #Initial condition
64        h = 0.05
65        elevation = domain.quantities['elevation'].vertex_values
66        domain.set_quantity('stage', elevation + h)
67
68        domain.check_integrity()
69        #Evolution
70        #domain.tight_slope_limiters = 1
71        for t in domain.evolve(yieldstep = yiel, finaltime = 0.05):
72            #domain.write_time()
73            pass
74
75
76
77        filename = domain.datadir + os.sep + domain.get_name() + '.sww'
78        domain2 = load_sww_as_domain(filename, None, fail_if_NaN=False,
79                                        verbose=self.verbose)
80        #points, vertices, boundary = rectangular(15,15)
81        #domain2.boundary = boundary
82        ###################
83        ##NOW TEST IT!!!
84        ###################
85
86        os.remove(filename)
87
88        bits = ['vertex_coordinates']
89        for quantity in domain.quantities_to_be_stored:
90            bits.append('get_quantity("%s").get_integral()' % quantity)
91            bits.append('get_quantity("%s").get_values()' % quantity)
92
93        for bit in bits:
94            #print 'testing that domain.'+bit+' has been restored'
95            #print bit
96            #print 'done'
97            assert num.allclose(eval('domain.'+bit),eval('domain2.'+bit))
98
99        ######################################
100        #Now evolve them both, just to be sure
101        ######################################x
102        domain.time = 0.
103        from time import sleep
104
105        final = .1
106        domain.set_quantity('friction', 0.1)
107        domain.store = False
108        domain.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd})
109
110
111        for t in domain.evolve(yieldstep = yiel, finaltime = final):
112            #domain.write_time()
113            pass
114
115        final = final - (domain2.starttime-domain.starttime)
116        #BUT since domain1 gets time hacked back to 0:
117        final = final + (domain2.starttime-domain.starttime)
118
119        domain2.smooth = False
120        domain2.store = False
121        domain2.default_order=2
122        domain2.set_quantity('friction', 0.1)
123        #Bed-slope and friction
124        # Boundary conditions
125        Bd2=Dirichlet_boundary([0.2,0.,0.])
126        domain2.boundary = domain.boundary
127        #print 'domain2.boundary'
128        #print domain2.boundary
129        domain2.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd})
130        #domain2.set_boundary({'exterior': Bd})
131
132        domain2.check_integrity()
133
134        for t in domain2.evolve(yieldstep = yiel, finaltime = final):
135            #domain2.write_time()
136            pass
137
138        ###################
139        ##NOW TEST IT!!!
140        ##################
141
142        bits = ['vertex_coordinates']
143
144        for quantity in ['elevation','stage', 'ymomentum','xmomentum']:
145            bits.append('get_quantity("%s").get_integral()' %quantity)
146            bits.append('get_quantity("%s").get_values()' %quantity)
147
148        #print bits
149        for bit in bits:
150            #print bit
151            #print eval('domain.'+bit)
152            #print eval('domain2.'+bit)
153           
154            #print eval('domain.'+bit+'-domain2.'+bit)
155            msg = 'Values in the two domains are different for ' + bit
156            assert num.allclose(eval('domain.'+bit),eval('domain2.'+bit),
157                                rtol=1.e-5, atol=3.e-8), msg
158
159
160
161    def test_get_mesh_and_quantities_from_sww_file(self):
162        """test_get_mesh_and_quantities_from_sww_file(self):
163        """     
164       
165        # Generate a test sww file with non trivial georeference
166       
167        import time, os
168        from Scientific.IO.NetCDF import NetCDFFile
169
170        # Setup
171        from mesh_factory import rectangular
172
173        # Create basic mesh (100m x 5m)
174        width = 5
175        length = 50
176        t_end = 10
177        points, vertices, boundary = rectangular(length, width, 50, 5)
178
179        # Create shallow water domain
180        domain = Domain(points, vertices, boundary,
181                        geo_reference = Geo_reference(56,308500,6189000))
182
183        domain.set_name('flowtest')
184        swwfile = domain.get_name() + '.sww'
185        domain.set_datadir('.')
186
187        Br = Reflective_boundary(domain)    # Side walls
188        Bd = Dirichlet_boundary([1, 0, 0])  # inflow
189
190        domain.set_boundary( {'left': Bd, 'right': Bd, 'top': Br, 'bottom': Br})
191
192        for t in domain.evolve(yieldstep=1, finaltime = t_end):
193            pass
194
195       
196        # Read it
197
198        # Get mesh and quantities from sww file
199        X = get_mesh_and_quantities_from_file(swwfile,
200                                              quantities=['elevation',
201                                                          'stage',
202                                                          'xmomentum',
203                                                          'ymomentum'], 
204                                              verbose=False)
205        mesh, quantities, time = X
206       
207
208        # Check that mesh has been recovered
209        assert num.alltrue(mesh.triangles == domain.get_triangles())
210        assert num.allclose(mesh.nodes, domain.get_nodes())
211
212        # Check that time has been recovered
213        assert num.allclose(time, range(t_end+1))
214
215        # Check that quantities have been recovered
216        # (sww files use single precision)
217        z=domain.get_quantity('elevation').get_values(location='unique vertices')
218        assert num.allclose(quantities['elevation'], z)
219
220        for q in ['stage', 'xmomentum', 'ymomentum']:
221            # Get quantity at last timestep
222            q_ref=domain.get_quantity(q).get_values(location='unique vertices')
223
224            #print q,quantities[q]
225            q_sww=quantities[q][-1,:]
226
227            msg = 'Quantity %s failed to be recovered' %q
228            assert num.allclose(q_ref, q_sww, atol=1.0e-6), msg
229           
230        # Cleanup
231        os.remove(swwfile)
232       
233       
234
235    def test_weed(self):
236        coordinates1 = [[0.,0.],[1.,0.],[1.,1.],[1.,0.],[2.,0.],[1.,1.]]
237        volumes1 = [[0,1,2],[3,4,5]]
238        boundary1= {(0,1): 'external',(1,2): 'not external',(2,0): 'external',(3,4): 'external',(4,5): 'external',(5,3): 'not external'}
239        coordinates2,volumes2,boundary2=weed(coordinates1,volumes1,boundary1)
240
241        points2 = {(0.,0.):None,(1.,0.):None,(1.,1.):None,(2.,0.):None}
242
243        assert len(points2)==len(coordinates2)
244        for i in range(len(coordinates2)):
245            coordinate = tuple(coordinates2[i])
246            assert points2.has_key(coordinate)
247            points2[coordinate]=i
248
249        for triangle in volumes1:
250            for coordinate in triangle:
251                assert coordinates2[points2[tuple(coordinates1[coordinate])]][0]==coordinates1[coordinate][0]
252                assert coordinates2[points2[tuple(coordinates1[coordinate])]][1]==coordinates1[coordinate][1]
253
254
255    def test_triangulation(self):
256        #
257       
258       
259        filename = tempfile.mktemp("_data_manager.sww")
260        outfile = NetCDFFile(filename, netcdf_mode_w)
261        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
262        volumes = (0,1,2)
263        elevation = [0,1,2]
264        new_origin = None
265        new_origin = Geo_reference(56, 0, 0)
266        times = [0, 10]
267        number_of_volumes = len(volumes)
268        number_of_points = len(points_utm)
269        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])
270        sww.store_header(outfile, times, number_of_volumes,
271                         number_of_points, description='fully sick testing',
272                         verbose=self.verbose,sww_precision=netcdf_float)
273        sww.store_triangulation(outfile, points_utm, volumes,
274                                elevation,  new_origin=new_origin,
275                                verbose=self.verbose)       
276        outfile.close()
277        fid = NetCDFFile(filename)
278
279        x = fid.variables['x'][:]
280        y = fid.variables['y'][:]
281        fid.close()
282
283        assert num.allclose(num.array(map(None, x,y)), points_utm)
284        os.remove(filename)
285
286       
287    def test_triangulationII(self):
288        #
289       
290       
291        filename = tempfile.mktemp("_data_manager.sww")
292        outfile = NetCDFFile(filename, netcdf_mode_w)
293        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
294        volumes = (0,1,2)
295        elevation = [0,1,2]
296        new_origin = None
297        #new_origin = Geo_reference(56, 0, 0)
298        times = [0, 10]
299        number_of_volumes = len(volumes)
300        number_of_points = len(points_utm)
301        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
302        sww.store_header(outfile, times, number_of_volumes,
303                         number_of_points, description='fully sick testing',
304                         verbose=self.verbose,sww_precision=netcdf_float)
305        sww.store_triangulation(outfile, points_utm, volumes,
306                                new_origin=new_origin,
307                                verbose=self.verbose)
308        sww.store_static_quantities(outfile, elevation=elevation)                               
309                               
310        outfile.close()
311        fid = NetCDFFile(filename)
312
313        x = fid.variables['x'][:]
314        y = fid.variables['y'][:]
315        results_georef = Geo_reference()
316        results_georef.read_NetCDF(fid)
317        assert results_georef == Geo_reference(DEFAULT_ZONE, 0, 0)
318        fid.close()
319
320        assert num.allclose(num.array(map(None, x,y)), points_utm)
321        os.remove(filename)
322
323       
324    def test_triangulation_new_origin(self):
325        #
326       
327       
328        filename = tempfile.mktemp("_data_manager.sww")
329        outfile = NetCDFFile(filename, netcdf_mode_w)
330        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
331        volumes = (0,1,2)
332        elevation = [0,1,2]
333        new_origin = None
334        new_origin = Geo_reference(56, 1, 554354)
335        points_utm = new_origin.change_points_geo_ref(points_utm)
336        times = [0, 10]
337        number_of_volumes = len(volumes)
338        number_of_points = len(points_utm)
339        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
340        sww.store_header(outfile, times, number_of_volumes,
341                         number_of_points, description='fully sick testing',
342                         verbose=self.verbose,sww_precision=netcdf_float)
343        sww.store_triangulation(outfile, points_utm, volumes,
344                                elevation,  new_origin=new_origin,
345                                verbose=self.verbose)
346        outfile.close()
347        fid = NetCDFFile(filename)
348
349        x = fid.variables['x'][:]
350        y = fid.variables['y'][:]
351        results_georef = Geo_reference()
352        results_georef.read_NetCDF(fid)
353        assert results_georef == new_origin
354        fid.close()
355
356        absolute = Geo_reference(56, 0,0)
357        assert num.allclose(num.array(
358            absolute.change_points_geo_ref(map(None, x,y),
359                                           new_origin)),points_utm)
360       
361        os.remove(filename)
362       
363    def test_triangulation_points_georeference(self):
364        #
365       
366       
367        filename = tempfile.mktemp("_data_manager.sww")
368        outfile = NetCDFFile(filename, netcdf_mode_w)
369        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
370        volumes = (0,1,2)
371        elevation = [0,1,2]
372        new_origin = None
373        points_georeference = Geo_reference(56, 1, 554354)
374        points_utm = points_georeference.change_points_geo_ref(points_utm)
375        times = [0, 10]
376        number_of_volumes = len(volumes)
377        number_of_points = len(points_utm)
378        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
379        sww.store_header(outfile, times, number_of_volumes,
380                         number_of_points, description='fully sick testing',
381                         verbose=self.verbose,sww_precision=netcdf_float)
382        sww.store_triangulation(outfile, points_utm, volumes,
383                                elevation,  new_origin=new_origin,
384                                points_georeference=points_georeference,
385                                verbose=self.verbose)       
386        outfile.close()
387        fid = NetCDFFile(filename)
388
389        x = fid.variables['x'][:]
390        y = fid.variables['y'][:]
391        results_georef = Geo_reference()
392        results_georef.read_NetCDF(fid)
393        assert results_georef == points_georeference
394        fid.close()
395
396        assert num.allclose(num.array(map(None, x,y)), points_utm)
397        os.remove(filename)
398       
399    def test_triangulation_2_geo_refs(self):
400        #
401       
402       
403        filename = tempfile.mktemp("_data_manager.sww")
404        outfile = NetCDFFile(filename, netcdf_mode_w)
405        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
406        volumes = (0,1,2)
407        elevation = [0,1,2]
408        new_origin = Geo_reference(56, 1, 1)
409        points_georeference = Geo_reference(56, 0, 0)
410        points_utm = points_georeference.change_points_geo_ref(points_utm)
411        times = [0, 10]
412        number_of_volumes = len(volumes)
413        number_of_points = len(points_utm)
414        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
415        sww.store_header(outfile, times, number_of_volumes,
416                         number_of_points, description='fully sick testing',
417                         verbose=self.verbose,sww_precision=netcdf_float)
418        sww.store_triangulation(outfile, points_utm, volumes,
419                                elevation,  new_origin=new_origin,
420                                points_georeference=points_georeference,
421                                verbose=self.verbose)       
422        outfile.close()
423        fid = NetCDFFile(filename)
424
425        x = fid.variables['x'][:]
426        y = fid.variables['y'][:]
427        results_georef = Geo_reference()
428        results_georef.read_NetCDF(fid)
429        assert results_georef == new_origin
430        fid.close()
431
432
433        absolute = Geo_reference(56, 0,0)
434        assert num.allclose(num.array(
435            absolute.change_points_geo_ref(map(None, x,y),
436                                           new_origin)),points_utm)
437        os.remove(filename)
438
439#################################################################################
440
441if __name__ == "__main__":
442    suite = unittest.makeSuite(Test_sww, 'test')
443    runner = unittest.TextTestRunner(verbosity=1)
444    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.