source: anuga_core/source/anuga/abstract_2d_finite_volumes/test_gauge.py @ 7673

Last change on this file since 7673 was 7673, checked in by hudson, 13 years ago

Ticket 113 - added an output_centroids parameter to allow centroid data to be written to gauges, rather than the data at the sampled point.

Added 2 new unit tests for this functionality.

File size: 17.9 KB
Line 
1#!/usr/bin/env python
2
3
4import unittest
5from math import sqrt, pi
6import tempfile, os
7from os import access, F_OK,sep, removedirs,remove,mkdir,getcwd
8
9#from anuga.abstract_2d_finite_volumes.util import *
10from anuga.abstract_2d_finite_volumes.gauge import *
11from anuga.config import epsilon
12from anuga.shallow_water.data_manager import timefile2netcdf,del_dir
13
14from anuga.utilities.numerical_tools import NAN,mean
15
16from sys import platform
17
18from anuga.pmesh.mesh import Mesh
19from anuga.shallow_water import Domain, Transmissive_boundary
20from anuga.shallow_water.data_manager import SWW_file
21from csv import reader,writer
22import time
23import string
24
25import numpy as num
26
27
28def test_function(x, y):
29    return x+y
30
31class Test_Util(unittest.TestCase):
32    def setUp(self):
33        pass
34
35    def tearDown(self):
36        pass
37
38    def test_sww2csv(self):
39
40        def elevation_function(x, y):
41            return -x
42       
43        """Most of this test was copied from test_interpolate
44        test_interpole_sww2csv
45       
46        This is testing the gauge_sww2csv function, by creating a sww file and
47        then exporting the gauges and checking the results.
48        """
49       
50        # Create mesh
51        mesh_file = tempfile.mktemp(".tsh")   
52        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
53        m = Mesh()
54        m.add_vertices(points)
55        m.auto_segment()
56        m.generate_mesh(verbose=False)
57        m.export_mesh_file(mesh_file)
58       
59        # Create shallow water domain
60        domain = Domain(mesh_file)
61        os.remove(mesh_file)
62       
63        domain.default_order=2
64       
65        # This test was made before tight_slope_limiters were introduced
66        # Since were are testing interpolation values this is OK
67        domain.tight_slope_limiters = 0 
68       
69
70        # Set some field values
71        domain.set_quantity('elevation', elevation_function)
72        domain.set_quantity('friction', 0.03)
73        domain.set_quantity('xmomentum', 3.0)
74        domain.set_quantity('ymomentum', 4.0)
75
76        ######################
77        # Boundary conditions
78        B = Transmissive_boundary(domain)
79        domain.set_boundary( {'exterior': B})
80
81        # This call mangles the stage values.
82        domain.distribute_to_vertices_and_edges()
83        domain.set_quantity('stage', 1.0)
84
85
86        domain.set_name('datatest' + str(time.time()))
87        domain.smooth = True
88        domain.reduction = mean
89
90
91        sww = SWW_file(domain)
92        sww.store_connectivity()
93        sww.store_timestep()
94        domain.set_quantity('stage', 10.0) # This is automatically limited
95        # so it will not be less than the elevation
96        domain.time = 2.
97        sww.store_timestep()
98
99        # test the function
100        points = [[5.0,1.],[0.5,2.]]
101
102        points_file = tempfile.mktemp(".csv")
103#        points_file = 'test_point.csv'
104        file_id = open(points_file,"w")
105        file_id.write("name, easting, northing, elevation \n\
106point1, 5.0, 1.0, 3.0\n\
107point2, 0.5, 2.0, 9.0\n")
108        file_id.close()
109
110       
111        gauge_sww2csv(sww.filename, 
112                       points_file,
113                       verbose=False,
114                       use_cache=False)
115
116#        point1_answers_array = [[0.0,1.0,-5.0,3.0,4.0], [2.0,10.0,-5.0,3.0,4.0]]
117        point1_answers_array = [[0.0,0.0,1.0,6.0,-5.0,3.0,4.0], [2.0,2.0/3600.,10.0,15.0,-5.0,3.0,4.0]]
118        point1_filename = 'gauge_point1.csv'
119        point1_handle = file(point1_filename)
120        point1_reader = reader(point1_handle)
121        point1_reader.next()
122
123        line=[]
124        for i,row in enumerate(point1_reader):
125#            print 'i',i,'row',row
126            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
127                         float(row[4]),float(row[5]),float(row[6])])
128#            print 'assert line',line[i],'point1',point1_answers_array[i]
129            assert num.allclose(line[i], point1_answers_array[i])
130
131        point2_answers_array = [[0.0,0.0,1.0,1.5,-0.5,3.0,4.0], [2.0,2.0/3600.,10.0,10.5,-0.5,3.0,4.0]]
132        point2_filename = 'gauge_point2.csv' 
133        point2_handle = file(point2_filename)
134        point2_reader = reader(point2_handle)
135        point2_reader.next()
136                       
137        line=[]
138        for i,row in enumerate(point2_reader):
139#            print 'i',i,'row',row
140            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
141                         float(row[4]),float(row[5]),float(row[6])])
142#            print 'assert line',line[i],'point1',point1_answers_array[i]
143            assert num.allclose(line[i], point2_answers_array[i])
144                         
145        # clean up
146        point1_handle.close()
147        point2_handle.close()
148        #print "sww.filename",sww.filename
149        os.remove(sww.filename)
150        os.remove(points_file)
151        os.remove(point1_filename)
152        os.remove(point2_filename)
153
154
155    def test_sww2csv_gauges1(self):
156        from anuga.pmesh.mesh import Mesh
157        from anuga.shallow_water import Domain, Transmissive_boundary
158        from csv import reader,writer
159        import time
160        import string
161
162        def elevation_function(x, y):
163            return -x
164       
165        """Most of this test was copied from test_interpolate
166        test_interpole_sww2csv
167       
168        This is testing the gauge_sww2csv function, by creating a sww file and
169        then exporting the gauges and checking the results.
170       
171        This tests the ablity not to have elevation in the points file and
172        not store xmomentum and ymomentum
173        """
174       
175        # Create mesh
176        mesh_file = tempfile.mktemp(".tsh")   
177        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
178        m = Mesh()
179        m.add_vertices(points)
180        m.auto_segment()
181        m.generate_mesh(verbose=False)
182        m.export_mesh_file(mesh_file)
183       
184        # Create shallow water domain
185        domain = Domain(mesh_file)
186        os.remove(mesh_file)
187       
188        domain.default_order=2
189
190        # Set some field values
191        domain.set_quantity('elevation', elevation_function)
192        domain.set_quantity('friction', 0.03)
193        domain.set_quantity('xmomentum', 3.0)
194        domain.set_quantity('ymomentum', 4.0)
195
196        ######################
197        # Boundary conditions
198        B = Transmissive_boundary(domain)
199        domain.set_boundary( {'exterior': B})
200
201        # This call mangles the stage values.
202        domain.distribute_to_vertices_and_edges()
203        domain.set_quantity('stage', 1.0)
204
205
206        domain.set_name('datatest' + str(time.time()))
207        domain.smooth = True
208        domain.reduction = mean
209
210        sww = SWW_file(domain)
211        sww.store_connectivity()
212        sww.store_timestep()
213        domain.set_quantity('stage', 10.0) # This is automatically limited
214        # so it will not be less than the elevation
215        domain.time = 2.
216        sww.store_timestep()
217
218        # test the function
219        points = [[5.0,1.],[0.5,2.]]
220
221        points_file = tempfile.mktemp(".csv")
222#        points_file = 'test_point.csv'
223        file_id = open(points_file,"w")
224        file_id.write("name,easting,northing \n\
225point1, 5.0, 1.0\n\
226point2, 0.5, 2.0\n")
227        file_id.close()
228
229        gauge_sww2csv(sww.filename, 
230                            points_file,
231                            quantities=['stage', 'elevation'],
232                            use_cache=False,
233                            verbose=False)
234
235        point1_answers_array = [[0.0,1.0,-5.0], [2.0,10.0,-5.0]]
236        point1_filename = 'gauge_point1.csv'
237        point1_handle = file(point1_filename)
238        point1_reader = reader(point1_handle)
239        point1_reader.next()
240
241        line=[]
242        for i,row in enumerate(point1_reader):
243#            print 'i',i,'row',row
244            # note the 'hole' (element 1) below - skip the new 'hours' field
245            line.append([float(row[0]),float(row[2]),float(row[3])])
246            #print 'line',line[i],'point1',point1_answers_array[i]
247            assert num.allclose(line[i], point1_answers_array[i])
248
249        point2_answers_array = [[0.0,1.0,-0.5], [2.0,10.0,-0.5]]
250        point2_filename = 'gauge_point2.csv' 
251        point2_handle = file(point2_filename)
252        point2_reader = reader(point2_handle)
253        point2_reader.next()
254                       
255        line=[]
256        for i,row in enumerate(point2_reader):
257#            print 'i',i,'row',row
258            # note the 'hole' (element 1) below - skip the new 'hours' field
259            line.append([float(row[0]),float(row[2]),float(row[3])])
260#            print 'line',line[i],'point1',point1_answers_array[i]
261            assert num.allclose(line[i], point2_answers_array[i])
262                         
263        # clean up
264        point1_handle.close()
265        point2_handle.close()
266        #print "sww.filename",sww.filename
267        os.remove(sww.filename)
268        os.remove(points_file)
269        os.remove(point1_filename)
270        os.remove(point2_filename)
271
272
273    def test_sww2csv_gauges2(self):
274
275        def elevation_function(x, y):
276            return -x
277       
278        """Most of this test was copied from test_interpolate
279        test_interpole_sww2csv
280       
281        This is testing the gauge_sww2csv function, by creating a sww file and
282        then exporting the gauges and checking the results.
283       
284        This is the same as sww2csv_gauges except set domain.set_starttime to 5.
285        Therefore testing the storing of the absolute time in the csv files
286        """
287       
288        # Create mesh
289        mesh_file = tempfile.mktemp(".tsh")   
290        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
291        m = Mesh()
292        m.add_vertices(points)
293        m.auto_segment()
294        m.generate_mesh(verbose=False)
295        m.export_mesh_file(mesh_file)
296       
297        # Create shallow water domain
298        domain = Domain(mesh_file)
299        os.remove(mesh_file)
300       
301        domain.default_order=2
302
303        # This test was made before tight_slope_limiters were introduced
304        # Since were are testing interpolation values this is OK
305        domain.tight_slope_limiters = 0         
306
307        # Set some field values
308        domain.set_quantity('elevation', elevation_function)
309        domain.set_quantity('friction', 0.03)
310        domain.set_quantity('xmomentum', 3.0)
311        domain.set_quantity('ymomentum', 4.0)
312        domain.set_starttime(5)
313
314        ######################
315        # Boundary conditions
316        B = Transmissive_boundary(domain)
317        domain.set_boundary( {'exterior': B})
318
319        # This call mangles the stage values.
320        domain.distribute_to_vertices_and_edges()
321        domain.set_quantity('stage', 1.0)
322       
323
324
325        domain.set_name('datatest' + str(time.time()))
326        domain.smooth = True
327        domain.reduction = mean
328
329        sww = SWW_file(domain)
330        sww.store_connectivity()
331        sww.store_timestep()
332        domain.set_quantity('stage', 10.0) # This is automatically limited
333        # so it will not be less than the elevation
334        domain.time = 2.
335        sww.store_timestep()
336
337        # test the function
338        points = [[5.0,1.],[0.5,2.]]
339
340        points_file = tempfile.mktemp(".csv")
341#        points_file = 'test_point.csv'
342        file_id = open(points_file,"w")
343        file_id.write("name, easting, northing, elevation \n\
344point1, 5.0, 1.0, 3.0\n\
345point2, 0.5, 2.0, 9.0\n")
346        file_id.close()
347
348       
349        gauge_sww2csv(sww.filename, 
350                            points_file,
351                            verbose=False,
352                            use_cache=False)
353
354#        point1_answers_array = [[0.0,1.0,-5.0,3.0,4.0], [2.0,10.0,-5.0,3.0,4.0]]
355        point1_answers_array = [[5.0,5.0/3600.,1.0,6.0,-5.0,3.0,4.0], [7.0,7.0/3600.,10.0,15.0,-5.0,3.0,4.0]]
356        point1_filename = 'gauge_point1.csv'
357        point1_handle = file(point1_filename)
358        point1_reader = reader(point1_handle)
359        point1_reader.next()
360
361        line=[]
362        for i,row in enumerate(point1_reader):
363            #print 'i',i,'row',row
364            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
365                         float(row[4]), float(row[5]), float(row[6])])
366            #print 'assert line',line[i],'point1',point1_answers_array[i]
367            assert num.allclose(line[i], point1_answers_array[i])
368
369        point2_answers_array = [[5.0,5.0/3600.,1.0,1.5,-0.5,3.0,4.0], [7.0,7.0/3600.,10.0,10.5,-0.5,3.0,4.0]]
370        point2_filename = 'gauge_point2.csv' 
371        point2_handle = file(point2_filename)
372        point2_reader = reader(point2_handle)
373        point2_reader.next()
374                       
375        line=[]
376        for i,row in enumerate(point2_reader):
377            #print 'i',i,'row',row
378            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
379                         float(row[4]),float(row[5]), float(row[6])])
380            #print 'assert line',line[i],'point1',point1_answers_array[i]
381            assert num.allclose(line[i], point2_answers_array[i])
382                         
383        # clean up
384        point1_handle.close()
385        point2_handle.close()
386        #print "sww.filename",sww.filename
387        os.remove(sww.filename)
388        os.remove(points_file)
389        os.remove(point1_filename)
390        os.remove(point2_filename)
391               
392
393    def test_sww2csv_centroid(self):
394
395        def elevation_function(x, y):
396            return -x
397       
398        """Check sww2csv timeseries at centroid.
399       
400        Test the ability to get a timeseries at the centroid of a triangle, rather
401                than the given gauge point.
402        """
403       
404        # Create rectangular mesh
405        mesh_file = tempfile.mktemp(".tsh")   
406        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
407        m = Mesh()
408        m.add_vertices(points)
409        m.auto_segment()
410        m.generate_mesh(verbose=False)
411        m.export_mesh_file(mesh_file)
412       
413        # Create shallow water domain
414        domain = Domain(mesh_file)
415       
416        domain.default_order=2
417       
418        # This test was made before tight_slope_limiters were introduced
419        # Since were are testing interpolation values this is OK
420        domain.tight_slope_limiters = 0 
421       
422
423        # Set some field values
424        domain.set_quantity('elevation', elevation_function)
425        domain.set_quantity('friction', 0.03)
426        domain.set_quantity('xmomentum', 3.0)
427        domain.set_quantity('ymomentum', 4.0)
428
429        ######################
430        # Boundary conditions
431        B = Transmissive_boundary(domain)
432        domain.set_boundary( {'exterior': B})
433
434        # This call mangles the stage values.
435        domain.distribute_to_vertices_and_edges()
436        domain.set_quantity('stage', 1.0)
437
438
439        domain.set_name('datatest' + str(time.time()))
440        domain.smooth = True
441        domain.reduction = mean
442
443
444        sww = SWW_file(domain)
445        sww.store_connectivity()
446        sww.store_timestep()
447        domain.set_quantity('stage', 10.0) # This is automatically limited
448        # so it will not be less than the elevation
449        domain.time = 2.
450        sww.store_timestep()
451
452        # create a csv file containing our gauge points
453        points = [[2.0,1.0],[4.5,4.0]]
454
455        points_file = tempfile.mktemp(".csv")
456        file_id = open(points_file,"w")
457# These values are where the centroids should be               
458#        file_id.write("name, easting, northing, elevation \n\
459#point1, 2.0, 2.0, 3.0\n\
460#point2, 4.0, 4.0, 9.0\n")
461 
462# These values are slightly off the centroids - will it find the centroids?
463        file_id.write("name, easting, northing, elevation \n\
464point1, 2.0, 1.0, 3.0\n\
465point2, 4.5, 4.0, 9.0\n")
466
467 
468        file_id.close()
469
470        gauge_sww2csv(sww.filename, 
471                       points_file,
472                       verbose=False,
473                       use_cache=False,
474                       output_centroids=True)
475
476        point1_answers_array = [[0.0,0.0,1.0,3.0,-2.0,3.0,4.0], [2.0,2.0/3600.,10.0,12.0,-2.0,3.0,4.0]]
477        point1_filename = 'gauge_point1.csv'
478        point1_handle = file(point1_filename)
479        point1_reader = reader(point1_handle)
480        point1_reader.next()
481
482        line=[]
483        for i,row in enumerate(point1_reader):
484            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
485                         float(row[4]),float(row[5]),float(row[6])])
486#           print 'assert line',line[i],'point1',point1_answers_array[i]
487            assert num.allclose(line[i], point1_answers_array[i])
488
489        point2_answers_array = [[0.0,0.0,1.0,5.0,-4.0,3.0,4.0], [2.0,2.0/3600.,10.0,14.0,-4.0,3.0,4.0]]
490        point2_filename = 'gauge_point2.csv' 
491        point2_handle = file(point2_filename)
492        point2_reader = reader(point2_handle)
493        point2_reader.next()
494                       
495        line=[]
496        for i,row in enumerate(point2_reader):
497            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
498                         float(row[4]),float(row[5]),float(row[6])])
499#           print i, 'assert line',line[i],'point2',point2_answers_array[i]
500            assert num.allclose(line[i], point2_answers_array[i])
501                         
502        # clean up
503        point1_handle.close()
504        point2_handle.close()
505        os.remove(mesh_file)           
506        os.remove(sww.filename)
507        os.remove(points_file)
508        os.remove(point1_filename)
509        os.remove(point2_filename)
510
511
512
513#-------------------------------------------------------------
514
515if __name__ == "__main__":
516    suite = unittest.makeSuite(Test_Util, 'test')
517#    runner = unittest.TextTestRunner(verbosity=2)
518    runner = unittest.TextTestRunner(verbosity=1)
519    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.