source: trunk/anuga_core/source/anuga/file/test_sww.py @ 7866

Last change on this file since 7866 was 7866, checked in by hudson, 14 years ago

More swb tests passing. Cleaned up some pylint errors.

File size: 16.3 KB
Line 
1import os
2import unittest
3import tempfile
4import numpy as num
5
6from anuga.coordinate_transforms.geo_reference import Geo_reference
7from csv_file import load_csv_as_array, load_csv_as_dict
8from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular
9from anuga.shallow_water.shallow_water_domain import Domain
10from sww import load_sww_as_domain, weed, get_mesh_and_quantities_from_file
11
12# boundary functions
13from anuga.shallow_water.boundaries import Reflective_boundary, \
14            Field_boundary, Transmissive_momentum_set_stage_boundary, \
15            Transmissive_stage_zero_momentum_boundary
16from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\
17     import Transmissive_boundary, Dirichlet_boundary, \
18            Time_boundary, File_boundary, AWI_boundary
19
20
21class Test_sww(unittest.TestCase):
22    def setUp(self):
23        self.verbose = False
24        pass
25
26    def tearDown(self):
27        pass
28       
29    def test_sww2domain1(self):
30        ################################################
31        #Create a test domain, and evolve and save it.
32        ################################################
33        from mesh_factory import rectangular
34
35        #Create basic mesh
36
37        yiel=0.01
38        points, vertices, boundary = rectangular(10,10)
39
40        #Create shallow water domain
41        domain = Domain(points, vertices, boundary)
42        domain.geo_reference = Geo_reference(56,11,11)
43        domain.smooth = False
44        domain.store = True
45        domain.set_name('bedslope')
46        domain.default_order=2
47        #Bed-slope and friction
48        domain.set_quantity('elevation', lambda x,y: -x/3)
49        domain.set_quantity('friction', 0.1)
50        # Boundary conditions
51        from math import sin, pi
52        Br = Reflective_boundary(domain)
53        Bt = Transmissive_boundary(domain)
54        Bd = Dirichlet_boundary([0.2,0.,0.])
55        Bw = Time_boundary(domain=domain,f=lambda t: [(0.1*sin(t*2*pi)), 0.0, 0.0])
56
57        #domain.set_boundary({'left': Bd, 'right': Br, 'top': Br, 'bottom': Br})
58        domain.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd})
59
60        domain.quantities_to_be_stored['xmomentum'] = 2
61        domain.quantities_to_be_stored['ymomentum'] = 2
62        #Initial condition
63        h = 0.05
64        elevation = domain.quantities['elevation'].vertex_values
65        domain.set_quantity('stage', elevation + h)
66
67        domain.check_integrity()
68        #Evolution
69        #domain.tight_slope_limiters = 1
70        for t in domain.evolve(yieldstep = yiel, finaltime = 0.05):
71            #domain.write_time()
72            pass
73
74
75
76        filename = domain.datadir + os.sep + domain.get_name() + '.sww'
77        domain2 = load_sww_as_domain(filename, None, fail_if_NaN=False,
78                                        verbose=self.verbose)
79        #points, vertices, boundary = rectangular(15,15)
80        #domain2.boundary = boundary
81        ###################
82        ##NOW TEST IT!!!
83        ###################
84
85        os.remove(filename)
86
87        bits = ['vertex_coordinates']
88        for quantity in domain.quantities_to_be_stored:
89            bits.append('get_quantity("%s").get_integral()' % quantity)
90            bits.append('get_quantity("%s").get_values()' % quantity)
91
92        for bit in bits:
93            #print 'testing that domain.'+bit+' has been restored'
94            #print bit
95            #print 'done'
96            assert num.allclose(eval('domain.'+bit),eval('domain2.'+bit))
97
98        ######################################
99        #Now evolve them both, just to be sure
100        ######################################x
101        domain.time = 0.
102        from time import sleep
103
104        final = .1
105        domain.set_quantity('friction', 0.1)
106        domain.store = False
107        domain.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd})
108
109
110        for t in domain.evolve(yieldstep = yiel, finaltime = final):
111            #domain.write_time()
112            pass
113
114        final = final - (domain2.starttime-domain.starttime)
115        #BUT since domain1 gets time hacked back to 0:
116        final = final + (domain2.starttime-domain.starttime)
117
118        domain2.smooth = False
119        domain2.store = False
120        domain2.default_order=2
121        domain2.set_quantity('friction', 0.1)
122        #Bed-slope and friction
123        # Boundary conditions
124        Bd2=Dirichlet_boundary([0.2,0.,0.])
125        domain2.boundary = domain.boundary
126        #print 'domain2.boundary'
127        #print domain2.boundary
128        domain2.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd})
129        #domain2.set_boundary({'exterior': Bd})
130
131        domain2.check_integrity()
132
133        for t in domain2.evolve(yieldstep = yiel, finaltime = final):
134            #domain2.write_time()
135            pass
136
137        ###################
138        ##NOW TEST IT!!!
139        ##################
140
141        bits = ['vertex_coordinates']
142
143        for quantity in ['elevation','stage', 'ymomentum','xmomentum']:
144            bits.append('get_quantity("%s").get_integral()' %quantity)
145            bits.append('get_quantity("%s").get_values()' %quantity)
146
147        #print bits
148        for bit in bits:
149            #print bit
150            #print eval('domain.'+bit)
151            #print eval('domain2.'+bit)
152           
153            #print eval('domain.'+bit+'-domain2.'+bit)
154            msg = 'Values in the two domains are different for ' + bit
155            assert num.allclose(eval('domain.'+bit),eval('domain2.'+bit),
156                                rtol=1.e-5, atol=3.e-8), msg
157
158
159
160    def test_get_mesh_and_quantities_from_sww_file(self):
161        """test_get_mesh_and_quantities_from_sww_file(self):
162        """     
163       
164        # Generate a test sww file with non trivial georeference
165       
166        import time, os
167        from Scientific.IO.NetCDF import NetCDFFile
168
169        # Setup
170        from mesh_factory import rectangular
171
172        # Create basic mesh (100m x 5m)
173        width = 5
174        length = 50
175        t_end = 10
176        points, vertices, boundary = rectangular(length, width, 50, 5)
177
178        # Create shallow water domain
179        domain = Domain(points, vertices, boundary,
180                        geo_reference = Geo_reference(56,308500,6189000))
181
182        domain.set_name('flowtest')
183        swwfile = domain.get_name() + '.sww'
184        domain.set_datadir('.')
185
186        Br = Reflective_boundary(domain)    # Side walls
187        Bd = Dirichlet_boundary([1, 0, 0])  # inflow
188
189        domain.set_boundary( {'left': Bd, 'right': Bd, 'top': Br, 'bottom': Br})
190
191        for t in domain.evolve(yieldstep=1, finaltime = t_end):
192            pass
193
194       
195        # Read it
196
197        # Get mesh and quantities from sww file
198        X = get_mesh_and_quantities_from_file(swwfile,
199                                              quantities=['elevation',
200                                                          'stage',
201                                                          'xmomentum',
202                                                          'ymomentum'], 
203                                              verbose=False)
204        mesh, quantities, time = X
205       
206
207        # Check that mesh has been recovered
208        assert num.alltrue(mesh.triangles == domain.get_triangles())
209        assert num.allclose(mesh.nodes, domain.get_nodes())
210
211        # Check that time has been recovered
212        assert num.allclose(time, range(t_end+1))
213
214        # Check that quantities have been recovered
215        # (sww files use single precision)
216        z=domain.get_quantity('elevation').get_values(location='unique vertices')
217        assert num.allclose(quantities['elevation'], z)
218
219        for q in ['stage', 'xmomentum', 'ymomentum']:
220            # Get quantity at last timestep
221            q_ref=domain.get_quantity(q).get_values(location='unique vertices')
222
223            #print q,quantities[q]
224            q_sww=quantities[q][-1,:]
225
226            msg = 'Quantity %s failed to be recovered' %q
227            assert num.allclose(q_ref, q_sww, atol=1.0e-6), msg
228           
229        # Cleanup
230        os.remove(swwfile)
231       
232       
233
234    def test_weed(self):
235        coordinates1 = [[0.,0.],[1.,0.],[1.,1.],[1.,0.],[2.,0.],[1.,1.]]
236        volumes1 = [[0,1,2],[3,4,5]]
237        boundary1= {(0,1): 'external',(1,2): 'not external',(2,0): 'external',(3,4): 'external',(4,5): 'external',(5,3): 'not external'}
238        coordinates2,volumes2,boundary2=weed(coordinates1,volumes1,boundary1)
239
240        points2 = {(0.,0.):None,(1.,0.):None,(1.,1.):None,(2.,0.):None}
241
242        assert len(points2)==len(coordinates2)
243        for i in range(len(coordinates2)):
244            coordinate = tuple(coordinates2[i])
245            assert points2.has_key(coordinate)
246            points2[coordinate]=i
247
248        for triangle in volumes1:
249            for coordinate in triangle:
250                assert coordinates2[points2[tuple(coordinates1[coordinate])]][0]==coordinates1[coordinate][0]
251                assert coordinates2[points2[tuple(coordinates1[coordinate])]][1]==coordinates1[coordinate][1]
252
253
254    def test_triangulation(self):
255        #
256       
257       
258        filename = tempfile.mktemp("_data_manager.sww")
259        outfile = NetCDFFile(filename, netcdf_mode_w)
260        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
261        volumes = (0,1,2)
262        elevation = [0,1,2]
263        new_origin = None
264        new_origin = Geo_reference(56, 0, 0)
265        times = [0, 10]
266        number_of_volumes = len(volumes)
267        number_of_points = len(points_utm)
268        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])
269        sww.store_header(outfile, times, number_of_volumes,
270                         number_of_points, description='fully sick testing',
271                         verbose=self.verbose,sww_precision=netcdf_float)
272        sww.store_triangulation(outfile, points_utm, volumes,
273                                elevation,  new_origin=new_origin,
274                                verbose=self.verbose)       
275        outfile.close()
276        fid = NetCDFFile(filename)
277
278        x = fid.variables['x'][:]
279        y = fid.variables['y'][:]
280        fid.close()
281
282        assert num.allclose(num.array(map(None, x,y)), points_utm)
283        os.remove(filename)
284
285       
286    def test_triangulationII(self):
287        #
288       
289       
290        filename = tempfile.mktemp("_data_manager.sww")
291        outfile = NetCDFFile(filename, netcdf_mode_w)
292        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
293        volumes = (0,1,2)
294        elevation = [0,1,2]
295        new_origin = None
296        #new_origin = Geo_reference(56, 0, 0)
297        times = [0, 10]
298        number_of_volumes = len(volumes)
299        number_of_points = len(points_utm)
300        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
301        sww.store_header(outfile, times, number_of_volumes,
302                         number_of_points, description='fully sick testing',
303                         verbose=self.verbose,sww_precision=netcdf_float)
304        sww.store_triangulation(outfile, points_utm, volumes,
305                                new_origin=new_origin,
306                                verbose=self.verbose)
307        sww.store_static_quantities(outfile, elevation=elevation)                               
308                               
309        outfile.close()
310        fid = NetCDFFile(filename)
311
312        x = fid.variables['x'][:]
313        y = fid.variables['y'][:]
314        results_georef = Geo_reference()
315        results_georef.read_NetCDF(fid)
316        assert results_georef == Geo_reference(DEFAULT_ZONE, 0, 0)
317        fid.close()
318
319        assert num.allclose(num.array(map(None, x,y)), points_utm)
320        os.remove(filename)
321
322       
323    def test_triangulation_new_origin(self):
324        #
325       
326       
327        filename = tempfile.mktemp("_data_manager.sww")
328        outfile = NetCDFFile(filename, netcdf_mode_w)
329        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
330        volumes = (0,1,2)
331        elevation = [0,1,2]
332        new_origin = None
333        new_origin = Geo_reference(56, 1, 554354)
334        points_utm = new_origin.change_points_geo_ref(points_utm)
335        times = [0, 10]
336        number_of_volumes = len(volumes)
337        number_of_points = len(points_utm)
338        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
339        sww.store_header(outfile, times, number_of_volumes,
340                         number_of_points, description='fully sick testing',
341                         verbose=self.verbose,sww_precision=netcdf_float)
342        sww.store_triangulation(outfile, points_utm, volumes,
343                                elevation,  new_origin=new_origin,
344                                verbose=self.verbose)
345        outfile.close()
346        fid = NetCDFFile(filename)
347
348        x = fid.variables['x'][:]
349        y = fid.variables['y'][:]
350        results_georef = Geo_reference()
351        results_georef.read_NetCDF(fid)
352        assert results_georef == new_origin
353        fid.close()
354
355        absolute = Geo_reference(56, 0,0)
356        assert num.allclose(num.array(
357            absolute.change_points_geo_ref(map(None, x,y),
358                                           new_origin)),points_utm)
359       
360        os.remove(filename)
361       
362    def test_triangulation_points_georeference(self):
363        #
364       
365       
366        filename = tempfile.mktemp("_data_manager.sww")
367        outfile = NetCDFFile(filename, netcdf_mode_w)
368        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
369        volumes = (0,1,2)
370        elevation = [0,1,2]
371        new_origin = None
372        points_georeference = Geo_reference(56, 1, 554354)
373        points_utm = points_georeference.change_points_geo_ref(points_utm)
374        times = [0, 10]
375        number_of_volumes = len(volumes)
376        number_of_points = len(points_utm)
377        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
378        sww.store_header(outfile, times, number_of_volumes,
379                         number_of_points, description='fully sick testing',
380                         verbose=self.verbose,sww_precision=netcdf_float)
381        sww.store_triangulation(outfile, points_utm, volumes,
382                                elevation,  new_origin=new_origin,
383                                points_georeference=points_georeference,
384                                verbose=self.verbose)       
385        outfile.close()
386        fid = NetCDFFile(filename)
387
388        x = fid.variables['x'][:]
389        y = fid.variables['y'][:]
390        results_georef = Geo_reference()
391        results_georef.read_NetCDF(fid)
392        assert results_georef == points_georeference
393        fid.close()
394
395        assert num.allclose(num.array(map(None, x,y)), points_utm)
396        os.remove(filename)
397       
398    def test_triangulation_2_geo_refs(self):
399        #
400       
401       
402        filename = tempfile.mktemp("_data_manager.sww")
403        outfile = NetCDFFile(filename, netcdf_mode_w)
404        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
405        volumes = (0,1,2)
406        elevation = [0,1,2]
407        new_origin = Geo_reference(56, 1, 1)
408        points_georeference = Geo_reference(56, 0, 0)
409        points_utm = points_georeference.change_points_geo_ref(points_utm)
410        times = [0, 10]
411        number_of_volumes = len(volumes)
412        number_of_points = len(points_utm)
413        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
414        sww.store_header(outfile, times, number_of_volumes,
415                         number_of_points, description='fully sick testing',
416                         verbose=self.verbose,sww_precision=netcdf_float)
417        sww.store_triangulation(outfile, points_utm, volumes,
418                                elevation,  new_origin=new_origin,
419                                points_georeference=points_georeference,
420                                verbose=self.verbose)       
421        outfile.close()
422        fid = NetCDFFile(filename)
423
424        x = fid.variables['x'][:]
425        y = fid.variables['y'][:]
426        results_georef = Geo_reference()
427        results_georef.read_NetCDF(fid)
428        assert results_georef == new_origin
429        fid.close()
430
431
432        absolute = Geo_reference(56, 0,0)
433        assert num.allclose(num.array(
434            absolute.change_points_geo_ref(map(None, x,y),
435                                           new_origin)),points_utm)
436        os.remove(filename)
437
438#################################################################################
439
440if __name__ == "__main__":
441    suite = unittest.makeSuite(Test_sww, 'test')
442    runner = unittest.TextTestRunner(verbosity=1)
443    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.