source: trunk/anuga_core/source/anuga/file/test_sww.py @ 8455

Last change on this file since 8455 was 8455, checked in by steve, 13 years ago

Added in a Set_stage_operator

File size: 16.9 KB
Line 
1import os
2import unittest
3import tempfile
4import numpy as num
5
6from anuga.coordinate_transforms.geo_reference import Geo_reference
7from csv_file import load_csv_as_array, load_csv_as_dict
8from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular
9from anuga.shallow_water.shallow_water_domain import Domain
10from sww import load_sww_as_domain, weed, get_mesh_and_quantities_from_file, \
11                Write_sww
12from Scientific.IO.NetCDF import NetCDFFile
13
14from anuga.config import netcdf_mode_w, netcdf_float
15
16# boundary functions
17from anuga.shallow_water.boundaries import Reflective_boundary, \
18            Field_boundary, Transmissive_momentum_set_stage_boundary, \
19            Transmissive_stage_zero_momentum_boundary
20from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\
21     import Transmissive_boundary, Dirichlet_boundary, \
22            Time_boundary, File_boundary, AWI_boundary
23
24
25class Test_sww(unittest.TestCase):
26    def setUp(self):
27        self.verbose = False
28        pass
29
30    def tearDown(self):
31        pass
32       
33    def test_sww2domain1(self):
34        ################################################
35        #Create a test domain, and evolve and save it.
36        ################################################
37        from mesh_factory import rectangular
38
39        #Create basic mesh
40
41        yiel=0.01
42        points, vertices, boundary = rectangular(10,10)
43
44        #print "=============== boundary rect ======================="
45        #print boundary
46
47        #Create shallow water domain
48        domain = Domain(points, vertices, boundary)
49        domain.geo_reference = Geo_reference(56,11,11)
50        domain.smooth = False
51        domain.store = True
52        domain.set_name('bedslope')
53        domain.default_order=2
54        #Bed-slope and friction
55        domain.set_quantity('elevation', lambda x,y: -x/3)
56        domain.set_quantity('friction', 0.1)
57        # Boundary conditions
58        from math import sin, pi
59        Br = Reflective_boundary(domain)
60        Bt = Transmissive_boundary(domain)
61        Bd = Dirichlet_boundary([0.2,0.,0.])
62        Bw = Time_boundary(domain=domain,f=lambda t: [(0.1*sin(t*2*pi)), 0.0, 0.0])
63
64        #domain.set_boundary({'left': Bd, 'right': Br, 'top': Br, 'bottom': Br})
65        domain.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd})
66
67        domain.quantities_to_be_stored['xmomentum'] = 2
68        domain.quantities_to_be_stored['ymomentum'] = 2
69        #Initial condition
70        h = 0.05
71        elevation = domain.quantities['elevation'].vertex_values
72        domain.set_quantity('stage', elevation + h)
73
74        domain.check_integrity()
75        #Evolution
76        #domain.tight_slope_limiters = 1
77        for t in domain.evolve(yieldstep = yiel, finaltime = 0.05):
78            #domain.write_time()
79            pass
80
81        #print boundary
82
83
84        filename = domain.datadir + os.sep + domain.get_name() + '.sww'
85        domain2 = load_sww_as_domain(filename, None, fail_if_NaN=False,
86                                        verbose=self.verbose)
87
88        # Unfortunately we loss the boundaries top, bottom, left and right,
89        # they are now all lumped into "exterior"
90
91        #print "=============== boundary domain2 ======================="
92        #print domain2.boundary
93       
94
95        #print domain2.get_boundary_tags()
96       
97        #points, vertices, boundary = rectangular(15,15)
98        #domain2.boundary = boundary
99        ###################
100        ##NOW TEST IT!!!
101        ###################
102
103        os.remove(filename)
104
105        bits = ['vertex_coordinates']
106        for quantity in ['stage']:
107            bits.append('get_quantity("%s").get_integral()' % quantity)
108            bits.append('get_quantity("%s").get_values()' % quantity)
109
110        for bit in bits:
111            #print 'testing that domain.'+bit+' has been restored'
112            #print bit
113            #print 'done'
114            #print eval('domain.'+bit)
115            #print eval('domain2.'+bit)
116            assert num.allclose(eval('domain.'+bit),eval('domain2.'+bit))
117
118        ######################################
119        #Now evolve them both, just to be sure
120        ######################################x
121        from time import sleep
122
123        final = .1
124        domain.set_quantity('friction', 0.1)
125        domain.store = False
126        domain.set_boundary({'exterior': Bd, 'left' : Bd, 'right': Bd, 'top': Bd, 'bottom': Bd})
127
128
129        for t in domain.evolve(yieldstep = yiel, finaltime = final):
130            #domain.write_time()
131            pass
132
133        #BUT since domain1 gets time hacked back to 0:
134       
135        final = final + (domain2.get_starttime() - domain.get_starttime())
136
137        domain2.smooth = False
138        domain2.store = False
139        domain2.default_order=2
140        domain2.set_quantity('friction', 0.1)
141        #Bed-slope and friction
142        # Boundary conditions
143        Bd2=Dirichlet_boundary([0.2,0.,0.])
144        domain2.boundary = domain.boundary
145        #print 'domain2.boundary'
146        #print domain2.boundary
147        domain2.set_boundary({'exterior': Bd, 'left' : Bd,  'right': Bd, 'top': Bd, 'bottom': Bd})
148        #domain2.set_boundary({'exterior': Bd})
149
150        domain2.check_integrity()
151
152        for t in domain2.evolve(yieldstep = yiel, finaltime = final):
153            #domain2.write_time()
154            pass
155
156        ###################
157        ##NOW TEST IT!!!
158        ##################
159
160        bits = ['vertex_coordinates']
161
162        for quantity in ['elevation','stage', 'ymomentum','xmomentum']:
163            bits.append('get_quantity("%s").get_integral()' %quantity)
164            bits.append('get_quantity("%s").get_values()' %quantity)
165
166        #print bits
167        for bit in bits:
168            #print bit
169            #print eval('domain.'+bit)
170            #print eval('domain2.'+bit)
171           
172            #print eval('domain.'+bit+'-domain2.'+bit)
173            msg = 'Values in the two domains are different for ' + bit
174            assert num.allclose(eval('domain.'+bit),eval('domain2.'+bit),
175                                rtol=5.e-2, atol=5.e-2), msg
176
177
178
179    def test_get_mesh_and_quantities_from_sww_file(self):
180        """test_get_mesh_and_quantities_from_sww_file(self):
181        """     
182       
183        # Generate a test sww file with non trivial georeference
184       
185        import time, os
186        from Scientific.IO.NetCDF import NetCDFFile
187
188        # Setup
189        from mesh_factory import rectangular
190
191        # Create basic mesh (100m x 5m)
192        width = 5
193        length = 50
194        t_end = 10
195        points, vertices, boundary = rectangular(length, width, 50, 5)
196
197        # Create shallow water domain
198        domain = Domain(points, vertices, boundary,
199                        geo_reference = Geo_reference(56,308500,6189000))
200
201        domain.set_name('flowtest')
202        swwfile = domain.get_name() + '.sww'
203        domain.set_datadir('.')
204
205        Br = Reflective_boundary(domain)    # Side walls
206        Bd = Dirichlet_boundary([1, 0, 0])  # inflow
207
208        domain.set_boundary( {'left': Bd, 'right': Bd, 'top': Br, 'bottom': Br})
209
210        for t in domain.evolve(yieldstep=1, finaltime = t_end):
211            pass
212
213       
214        # Read it
215
216        # Get mesh and quantities from sww file
217        X = get_mesh_and_quantities_from_file(swwfile,
218                                              quantities=['elevation',
219                                                          'stage',
220                                                          'xmomentum',
221                                                          'ymomentum'], 
222                                              verbose=False)
223        mesh, quantities, time = X
224       
225
226        # Check that mesh has been recovered
227        assert num.alltrue(mesh.triangles == domain.get_triangles())
228        assert num.allclose(mesh.nodes, domain.get_nodes())
229
230        # Check that time has been recovered
231        assert num.allclose(time, range(t_end+1))
232
233        # Check that quantities have been recovered
234        # (sww files use single precision)
235        z=domain.get_quantity('elevation').get_values(location='unique vertices')
236        assert num.allclose(quantities['elevation'], z)
237
238        for q in ['stage', 'xmomentum', 'ymomentum']:
239            # Get quantity at last timestep
240            q_ref=domain.get_quantity(q).get_values(location='unique vertices')
241
242            #print q,quantities[q]
243            q_sww=quantities[q][-1,:]
244
245            msg = 'Quantity %s failed to be recovered' %q
246            assert num.allclose(q_ref, q_sww, atol=1.0e-6), msg
247           
248        # Cleanup
249        os.remove(swwfile)
250       
251       
252
253    def test_weed(self):
254        coordinates1 = [[0.,0.],[1.,0.],[1.,1.],[1.,0.],[2.,0.],[1.,1.]]
255        volumes1 = [[0,1,2],[3,4,5]]
256        boundary1= {(0,1): 'external',(1,2): 'not external',(2,0): 'external',(3,4): 'external',(4,5): 'external',(5,3): 'not external'}
257        coordinates2,volumes2,boundary2=weed(coordinates1,volumes1,boundary1)
258
259        points2 = {(0.,0.):None,(1.,0.):None,(1.,1.):None,(2.,0.):None}
260
261        assert len(points2)==len(coordinates2)
262        for i in range(len(coordinates2)):
263            coordinate = tuple(coordinates2[i])
264            assert points2.has_key(coordinate)
265            points2[coordinate]=i
266
267        for triangle in volumes1:
268            for coordinate in triangle:
269                assert coordinates2[points2[tuple(coordinates1[coordinate])]][0]==coordinates1[coordinate][0]
270                assert coordinates2[points2[tuple(coordinates1[coordinate])]][1]==coordinates1[coordinate][1]
271
272
273    def test_triangulation(self):
274        #
275       
276       
277        filename = tempfile.mktemp("_data_manager.sww")
278        outfile = NetCDFFile(filename, netcdf_mode_w)
279        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
280        volumes = (0,1,2)
281        elevation = [0,1,2]
282        new_origin = None
283        new_origin = Geo_reference(56, 0, 0)
284        times = [0, 10]
285        number_of_volumes = len(volumes)
286        number_of_points = len(points_utm)
287        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])
288        sww.store_header(outfile, times, number_of_volumes,
289                         number_of_points, description='fully sick testing',
290                         verbose=self.verbose,sww_precision=netcdf_float)
291        sww.store_triangulation(outfile, points_utm, volumes,
292                                elevation,  new_origin=new_origin,
293                                verbose=self.verbose)       
294        outfile.close()
295        fid = NetCDFFile(filename)
296
297        x = fid.variables['x'][:]
298        y = fid.variables['y'][:]
299        fid.close()
300
301        assert num.allclose(num.array(map(None, x,y)), points_utm)
302        os.remove(filename)
303
304       
305    def test_triangulationII(self):
306        #
307       
308
309        DEFAULT_ZONE = 0 # Not documented anywhere what this should be.
310       
311        filename = tempfile.mktemp("_data_manager.sww")
312        outfile = NetCDFFile(filename, netcdf_mode_w)
313        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
314        volumes = (0,1,2)
315        elevation = [0,1,2]
316        new_origin = None
317        #new_origin = Geo_reference(56, 0, 0)
318        times = [0, 10]
319        number_of_volumes = len(volumes)
320        number_of_points = len(points_utm)
321        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
322        sww.store_header(outfile, times, number_of_volumes,
323                         number_of_points, description='fully sick testing',
324                         verbose=self.verbose,sww_precision=netcdf_float)
325        sww.store_triangulation(outfile, points_utm, volumes,
326                                new_origin=new_origin,
327                                verbose=self.verbose)
328        sww.store_static_quantities(outfile, elevation=elevation)                               
329                               
330        outfile.close()
331        fid = NetCDFFile(filename)
332
333        x = fid.variables['x'][:]
334        y = fid.variables['y'][:]
335        results_georef = Geo_reference()
336        results_georef.read_NetCDF(fid)
337        assert results_georef == Geo_reference(DEFAULT_ZONE, 0, 0)
338        fid.close()
339
340        assert num.allclose(num.array(map(None, x,y)), points_utm)
341        os.remove(filename)
342
343       
344    def test_triangulation_new_origin(self):
345        #
346       
347       
348        filename = tempfile.mktemp("_data_manager.sww")
349        outfile = NetCDFFile(filename, netcdf_mode_w)
350        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
351        volumes = (0,1,2)
352        elevation = [0,1,2]
353        new_origin = None
354        new_origin = Geo_reference(56, 1, 554354)
355        points_utm = new_origin.change_points_geo_ref(points_utm)
356        times = [0, 10]
357        number_of_volumes = len(volumes)
358        number_of_points = len(points_utm)
359        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
360        sww.store_header(outfile, times, number_of_volumes,
361                         number_of_points, description='fully sick testing',
362                         verbose=self.verbose,sww_precision=netcdf_float)
363        sww.store_triangulation(outfile, points_utm, volumes,
364                                elevation,  new_origin=new_origin,
365                                verbose=self.verbose)
366        outfile.close()
367        fid = NetCDFFile(filename)
368
369        x = fid.variables['x'][:]
370        y = fid.variables['y'][:]
371        results_georef = Geo_reference()
372        results_georef.read_NetCDF(fid)
373        assert results_georef == new_origin
374        fid.close()
375
376        absolute = Geo_reference(56, 0,0)
377        assert num.allclose(num.array(
378            absolute.change_points_geo_ref(map(None, x,y),
379                                           new_origin)),points_utm)
380       
381        os.remove(filename)
382       
383    def test_triangulation_points_georeference(self):
384        #
385       
386       
387        filename = tempfile.mktemp("_data_manager.sww")
388        outfile = NetCDFFile(filename, netcdf_mode_w)
389        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
390        volumes = (0,1,2)
391        elevation = [0,1,2]
392        new_origin = None
393        points_georeference = Geo_reference(56, 1, 554354)
394        points_utm = points_georeference.change_points_geo_ref(points_utm)
395        times = [0, 10]
396        number_of_volumes = len(volumes)
397        number_of_points = len(points_utm)
398        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
399        sww.store_header(outfile, times, number_of_volumes,
400                         number_of_points, description='fully sick testing',
401                         verbose=self.verbose,sww_precision=netcdf_float)
402        sww.store_triangulation(outfile, points_utm, volumes,
403                                elevation,  new_origin=new_origin,
404                                points_georeference=points_georeference,
405                                verbose=self.verbose)       
406        outfile.close()
407        fid = NetCDFFile(filename)
408
409        x = fid.variables['x'][:]
410        y = fid.variables['y'][:]
411        results_georef = Geo_reference()
412        results_georef.read_NetCDF(fid)
413        assert results_georef == points_georeference
414        fid.close()
415
416        assert num.allclose(num.array(map(None, x,y)), points_utm)
417        os.remove(filename)
418       
419    def test_triangulation_2_geo_refs(self):
420        #
421       
422       
423        filename = tempfile.mktemp("_data_manager.sww")
424        outfile = NetCDFFile(filename, netcdf_mode_w)
425        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
426        volumes = (0,1,2)
427        elevation = [0,1,2]
428        new_origin = Geo_reference(56, 1, 1)
429        points_georeference = Geo_reference(56, 0, 0)
430        points_utm = points_georeference.change_points_geo_ref(points_utm)
431        times = [0, 10]
432        number_of_volumes = len(volumes)
433        number_of_points = len(points_utm)
434        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
435        sww.store_header(outfile, times, number_of_volumes,
436                         number_of_points, description='fully sick testing',
437                         verbose=self.verbose,sww_precision=netcdf_float)
438        sww.store_triangulation(outfile, points_utm, volumes,
439                                elevation,  new_origin=new_origin,
440                                points_georeference=points_georeference,
441                                verbose=self.verbose)       
442        outfile.close()
443        fid = NetCDFFile(filename)
444
445        x = fid.variables['x'][:]
446        y = fid.variables['y'][:]
447        results_georef = Geo_reference()
448        results_georef.read_NetCDF(fid)
449        assert results_georef == new_origin
450        fid.close()
451
452
453        absolute = Geo_reference(56, 0,0)
454        assert num.allclose(num.array(
455            absolute.change_points_geo_ref(map(None, x,y),
456                                           new_origin)),points_utm)
457        os.remove(filename)
458
459#################################################################################
460
461if __name__ == "__main__":
462    suite = unittest.makeSuite(Test_sww, 'test')
463    runner = unittest.TextTestRunner(verbosity=1)
464    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.