source: anuga_core/source/anuga/abstract_2d_finite_volumes/test_gauge.py @ 7675

Last change on this file since 7675 was 7675, checked in by hudson, 14 years ago

Ticket 113 is complete, and all tests pass.
A centroid list is built by Interpolation_function as it calculates the interpolation matrix, and this is passed out as extra quantities which are output into the gauge.csv file.

File size: 21.0 KB
RevLine 
[7672]1#!/usr/bin/env python
2
3
4import unittest
5from math import sqrt, pi
6import tempfile, os
7from os import access, F_OK,sep, removedirs,remove,mkdir,getcwd
8
9#from anuga.abstract_2d_finite_volumes.util import *
10from anuga.abstract_2d_finite_volumes.gauge import *
11from anuga.config import epsilon
12from anuga.shallow_water.data_manager import timefile2netcdf,del_dir
13
14from anuga.utilities.numerical_tools import NAN,mean
15
16from sys import platform
17
18from anuga.pmesh.mesh import Mesh
19from anuga.shallow_water import Domain, Transmissive_boundary
20from anuga.shallow_water.data_manager import SWW_file
21from csv import reader,writer
22import time
23import string
24
25import numpy as num
26
27
28def test_function(x, y):
29    return x+y
30
31class Test_Util(unittest.TestCase):
32    def setUp(self):
33        pass
34
35    def tearDown(self):
36        pass
37
38    def test_sww2csv(self):
39
40        def elevation_function(x, y):
41            return -x
42       
43        """Most of this test was copied from test_interpolate
44        test_interpole_sww2csv
45       
46        This is testing the gauge_sww2csv function, by creating a sww file and
47        then exporting the gauges and checking the results.
48        """
49       
50        # Create mesh
51        mesh_file = tempfile.mktemp(".tsh")   
52        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
53        m = Mesh()
54        m.add_vertices(points)
55        m.auto_segment()
56        m.generate_mesh(verbose=False)
57        m.export_mesh_file(mesh_file)
58       
59        # Create shallow water domain
60        domain = Domain(mesh_file)
61        os.remove(mesh_file)
62       
63        domain.default_order=2
64       
65        # This test was made before tight_slope_limiters were introduced
66        # Since were are testing interpolation values this is OK
67        domain.tight_slope_limiters = 0 
68       
69
70        # Set some field values
71        domain.set_quantity('elevation', elevation_function)
72        domain.set_quantity('friction', 0.03)
73        domain.set_quantity('xmomentum', 3.0)
74        domain.set_quantity('ymomentum', 4.0)
75
76        ######################
77        # Boundary conditions
78        B = Transmissive_boundary(domain)
79        domain.set_boundary( {'exterior': B})
80
81        # This call mangles the stage values.
82        domain.distribute_to_vertices_and_edges()
83        domain.set_quantity('stage', 1.0)
84
85
86        domain.set_name('datatest' + str(time.time()))
87        domain.smooth = True
88        domain.reduction = mean
89
90
91        sww = SWW_file(domain)
92        sww.store_connectivity()
93        sww.store_timestep()
94        domain.set_quantity('stage', 10.0) # This is automatically limited
95        # so it will not be less than the elevation
96        domain.time = 2.
97        sww.store_timestep()
98
99        # test the function
100        points = [[5.0,1.],[0.5,2.]]
101
102        points_file = tempfile.mktemp(".csv")
103#        points_file = 'test_point.csv'
104        file_id = open(points_file,"w")
105        file_id.write("name, easting, northing, elevation \n\
106point1, 5.0, 1.0, 3.0\n\
107point2, 0.5, 2.0, 9.0\n")
108        file_id.close()
109
110       
111        gauge_sww2csv(sww.filename, 
112                       points_file,
113                       verbose=False,
114                       use_cache=False)
115
116#        point1_answers_array = [[0.0,1.0,-5.0,3.0,4.0], [2.0,10.0,-5.0,3.0,4.0]]
117        point1_answers_array = [[0.0,0.0,1.0,6.0,-5.0,3.0,4.0], [2.0,2.0/3600.,10.0,15.0,-5.0,3.0,4.0]]
118        point1_filename = 'gauge_point1.csv'
119        point1_handle = file(point1_filename)
120        point1_reader = reader(point1_handle)
121        point1_reader.next()
122
123        line=[]
124        for i,row in enumerate(point1_reader):
125#            print 'i',i,'row',row
126            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
127                         float(row[4]),float(row[5]),float(row[6])])
128#            print 'assert line',line[i],'point1',point1_answers_array[i]
129            assert num.allclose(line[i], point1_answers_array[i])
130
131        point2_answers_array = [[0.0,0.0,1.0,1.5,-0.5,3.0,4.0], [2.0,2.0/3600.,10.0,10.5,-0.5,3.0,4.0]]
132        point2_filename = 'gauge_point2.csv' 
133        point2_handle = file(point2_filename)
134        point2_reader = reader(point2_handle)
135        point2_reader.next()
136                       
137        line=[]
138        for i,row in enumerate(point2_reader):
139#            print 'i',i,'row',row
140            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
141                         float(row[4]),float(row[5]),float(row[6])])
142#            print 'assert line',line[i],'point1',point1_answers_array[i]
143            assert num.allclose(line[i], point2_answers_array[i])
144                         
145        # clean up
146        point1_handle.close()
147        point2_handle.close()
148        #print "sww.filename",sww.filename
149        os.remove(sww.filename)
150        os.remove(points_file)
151        os.remove(point1_filename)
152        os.remove(point2_filename)
153
154
155    def test_sww2csv_gauges1(self):
156        from anuga.pmesh.mesh import Mesh
157        from anuga.shallow_water import Domain, Transmissive_boundary
158        from csv import reader,writer
159        import time
160        import string
161
162        def elevation_function(x, y):
163            return -x
164       
165        """Most of this test was copied from test_interpolate
166        test_interpole_sww2csv
167       
168        This is testing the gauge_sww2csv function, by creating a sww file and
169        then exporting the gauges and checking the results.
170       
171        This tests the ablity not to have elevation in the points file and
172        not store xmomentum and ymomentum
173        """
174       
175        # Create mesh
176        mesh_file = tempfile.mktemp(".tsh")   
177        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
178        m = Mesh()
179        m.add_vertices(points)
180        m.auto_segment()
181        m.generate_mesh(verbose=False)
182        m.export_mesh_file(mesh_file)
183       
184        # Create shallow water domain
185        domain = Domain(mesh_file)
186        os.remove(mesh_file)
187       
188        domain.default_order=2
189
190        # Set some field values
191        domain.set_quantity('elevation', elevation_function)
192        domain.set_quantity('friction', 0.03)
193        domain.set_quantity('xmomentum', 3.0)
194        domain.set_quantity('ymomentum', 4.0)
195
196        ######################
197        # Boundary conditions
198        B = Transmissive_boundary(domain)
199        domain.set_boundary( {'exterior': B})
200
201        # This call mangles the stage values.
202        domain.distribute_to_vertices_and_edges()
203        domain.set_quantity('stage', 1.0)
204
205
206        domain.set_name('datatest' + str(time.time()))
207        domain.smooth = True
208        domain.reduction = mean
209
210        sww = SWW_file(domain)
211        sww.store_connectivity()
212        sww.store_timestep()
213        domain.set_quantity('stage', 10.0) # This is automatically limited
214        # so it will not be less than the elevation
215        domain.time = 2.
216        sww.store_timestep()
217
218        # test the function
219        points = [[5.0,1.],[0.5,2.]]
220
221        points_file = tempfile.mktemp(".csv")
222#        points_file = 'test_point.csv'
223        file_id = open(points_file,"w")
224        file_id.write("name,easting,northing \n\
225point1, 5.0, 1.0\n\
226point2, 0.5, 2.0\n")
227        file_id.close()
228
229        gauge_sww2csv(sww.filename, 
230                            points_file,
231                            quantities=['stage', 'elevation'],
232                            use_cache=False,
233                            verbose=False)
234
235        point1_answers_array = [[0.0,1.0,-5.0], [2.0,10.0,-5.0]]
236        point1_filename = 'gauge_point1.csv'
237        point1_handle = file(point1_filename)
238        point1_reader = reader(point1_handle)
239        point1_reader.next()
240
241        line=[]
242        for i,row in enumerate(point1_reader):
243#            print 'i',i,'row',row
244            # note the 'hole' (element 1) below - skip the new 'hours' field
245            line.append([float(row[0]),float(row[2]),float(row[3])])
246            #print 'line',line[i],'point1',point1_answers_array[i]
247            assert num.allclose(line[i], point1_answers_array[i])
248
249        point2_answers_array = [[0.0,1.0,-0.5], [2.0,10.0,-0.5]]
250        point2_filename = 'gauge_point2.csv' 
251        point2_handle = file(point2_filename)
252        point2_reader = reader(point2_handle)
253        point2_reader.next()
254                       
255        line=[]
256        for i,row in enumerate(point2_reader):
257#            print 'i',i,'row',row
258            # note the 'hole' (element 1) below - skip the new 'hours' field
259            line.append([float(row[0]),float(row[2]),float(row[3])])
260#            print 'line',line[i],'point1',point1_answers_array[i]
261            assert num.allclose(line[i], point2_answers_array[i])
262                         
263        # clean up
264        point1_handle.close()
265        point2_handle.close()
266        #print "sww.filename",sww.filename
267        os.remove(sww.filename)
268        os.remove(points_file)
269        os.remove(point1_filename)
270        os.remove(point2_filename)
271
272
273    def test_sww2csv_gauges2(self):
274
275        def elevation_function(x, y):
276            return -x
277       
278        """Most of this test was copied from test_interpolate
279        test_interpole_sww2csv
280       
281        This is testing the gauge_sww2csv function, by creating a sww file and
282        then exporting the gauges and checking the results.
283       
284        This is the same as sww2csv_gauges except set domain.set_starttime to 5.
285        Therefore testing the storing of the absolute time in the csv files
286        """
287       
288        # Create mesh
289        mesh_file = tempfile.mktemp(".tsh")   
290        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
291        m = Mesh()
292        m.add_vertices(points)
293        m.auto_segment()
294        m.generate_mesh(verbose=False)
295        m.export_mesh_file(mesh_file)
296       
297        # Create shallow water domain
298        domain = Domain(mesh_file)
299        os.remove(mesh_file)
300       
301        domain.default_order=2
302
303        # This test was made before tight_slope_limiters were introduced
304        # Since were are testing interpolation values this is OK
305        domain.tight_slope_limiters = 0         
306
307        # Set some field values
308        domain.set_quantity('elevation', elevation_function)
309        domain.set_quantity('friction', 0.03)
310        domain.set_quantity('xmomentum', 3.0)
311        domain.set_quantity('ymomentum', 4.0)
312        domain.set_starttime(5)
313
314        ######################
315        # Boundary conditions
316        B = Transmissive_boundary(domain)
317        domain.set_boundary( {'exterior': B})
318
319        # This call mangles the stage values.
320        domain.distribute_to_vertices_and_edges()
321        domain.set_quantity('stage', 1.0)
322       
323
324
325        domain.set_name('datatest' + str(time.time()))
326        domain.smooth = True
327        domain.reduction = mean
328
329        sww = SWW_file(domain)
330        sww.store_connectivity()
331        sww.store_timestep()
332        domain.set_quantity('stage', 10.0) # This is automatically limited
333        # so it will not be less than the elevation
334        domain.time = 2.
335        sww.store_timestep()
336
337        # test the function
338        points = [[5.0,1.],[0.5,2.]]
339
340        points_file = tempfile.mktemp(".csv")
341#        points_file = 'test_point.csv'
342        file_id = open(points_file,"w")
343        file_id.write("name, easting, northing, elevation \n\
344point1, 5.0, 1.0, 3.0\n\
345point2, 0.5, 2.0, 9.0\n")
346        file_id.close()
347
348       
349        gauge_sww2csv(sww.filename, 
350                            points_file,
351                            verbose=False,
352                            use_cache=False)
353
354#        point1_answers_array = [[0.0,1.0,-5.0,3.0,4.0], [2.0,10.0,-5.0,3.0,4.0]]
355        point1_answers_array = [[5.0,5.0/3600.,1.0,6.0,-5.0,3.0,4.0], [7.0,7.0/3600.,10.0,15.0,-5.0,3.0,4.0]]
356        point1_filename = 'gauge_point1.csv'
357        point1_handle = file(point1_filename)
358        point1_reader = reader(point1_handle)
359        point1_reader.next()
360
361        line=[]
362        for i,row in enumerate(point1_reader):
363            #print 'i',i,'row',row
364            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
365                         float(row[4]), float(row[5]), float(row[6])])
366            #print 'assert line',line[i],'point1',point1_answers_array[i]
367            assert num.allclose(line[i], point1_answers_array[i])
368
369        point2_answers_array = [[5.0,5.0/3600.,1.0,1.5,-0.5,3.0,4.0], [7.0,7.0/3600.,10.0,10.5,-0.5,3.0,4.0]]
370        point2_filename = 'gauge_point2.csv' 
371        point2_handle = file(point2_filename)
372        point2_reader = reader(point2_handle)
373        point2_reader.next()
374                       
375        line=[]
376        for i,row in enumerate(point2_reader):
377            #print 'i',i,'row',row
378            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
379                         float(row[4]),float(row[5]), float(row[6])])
380            #print 'assert line',line[i],'point1',point1_answers_array[i]
381            assert num.allclose(line[i], point2_answers_array[i])
382                         
383        # clean up
384        point1_handle.close()
385        point2_handle.close()
386        #print "sww.filename",sww.filename
387        os.remove(sww.filename)
388        os.remove(points_file)
389        os.remove(point1_filename)
390        os.remove(point2_filename)
391               
392
393    def test_sww2csv_centroid(self):
394
395        def elevation_function(x, y):
396            return -x
397       
398        """Check sww2csv timeseries at centroid.
399       
400        Test the ability to get a timeseries at the centroid of a triangle, rather
401                than the given gauge point.
402        """
403       
404        # Create rectangular mesh
405        mesh_file = tempfile.mktemp(".tsh")   
406        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
407        m = Mesh()
408        m.add_vertices(points)
409        m.auto_segment()
410        m.generate_mesh(verbose=False)
411        m.export_mesh_file(mesh_file)
412       
413        # Create shallow water domain
414        domain = Domain(mesh_file)
415       
416        domain.default_order=2
417       
418        # This test was made before tight_slope_limiters were introduced
419        # Since were are testing interpolation values this is OK
420        domain.tight_slope_limiters = 0 
421       
422
423        # Set some field values
424        domain.set_quantity('elevation', elevation_function)
425        domain.set_quantity('friction', 0.03)
426        domain.set_quantity('xmomentum', 3.0)
427        domain.set_quantity('ymomentum', 4.0)
428
429        ######################
430        # Boundary conditions
431        B = Transmissive_boundary(domain)
432        domain.set_boundary( {'exterior': B})
433
434        # This call mangles the stage values.
435        domain.distribute_to_vertices_and_edges()
436        domain.set_quantity('stage', 1.0)
437
438
439        domain.set_name('datatest' + str(time.time()))
440        domain.smooth = True
441        domain.reduction = mean
442
443
444        sww = SWW_file(domain)
445        sww.store_connectivity()
446        sww.store_timestep()
447        domain.set_quantity('stage', 10.0) # This is automatically limited
448        # so it will not be less than the elevation
449        domain.time = 2.
450        sww.store_timestep()
451
452        # create a csv file containing our gauge points
453        points_file = tempfile.mktemp(".csv")
454        file_id = open(points_file,"w")
[7673]455# These values are where the centroids should be               
[7672]456#        file_id.write("name, easting, northing, elevation \n\
[7673]457#point1, 2.0, 2.0, 3.0\n\
458#point2, 4.0, 4.0, 9.0\n")
[7672]459 
[7673]460# These values are slightly off the centroids - will it find the centroids?
[7672]461        file_id.write("name, easting, northing, elevation \n\
462point1, 2.0, 1.0, 3.0\n\
[7673]463point2, 4.5, 4.0, 9.0\n")
[7672]464
465 
466        file_id.close()
467
468        gauge_sww2csv(sww.filename, 
469                       points_file,
470                       verbose=False,
471                       use_cache=False,
472                       output_centroids=True)
473
[7673]474        point1_answers_array = [[0.0,0.0,1.0,3.0,-2.0,3.0,4.0], [2.0,2.0/3600.,10.0,12.0,-2.0,3.0,4.0]]
[7672]475        point1_filename = 'gauge_point1.csv'
476        point1_handle = file(point1_filename)
477        point1_reader = reader(point1_handle)
478        point1_reader.next()
479
480        line=[]
481        for i,row in enumerate(point1_reader):
482            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
483                         float(row[4]),float(row[5]),float(row[6])])
484#           print 'assert line',line[i],'point1',point1_answers_array[i]
485            assert num.allclose(line[i], point1_answers_array[i])
486
[7673]487        point2_answers_array = [[0.0,0.0,1.0,5.0,-4.0,3.0,4.0], [2.0,2.0/3600.,10.0,14.0,-4.0,3.0,4.0]]
[7672]488        point2_filename = 'gauge_point2.csv' 
489        point2_handle = file(point2_filename)
490        point2_reader = reader(point2_handle)
491        point2_reader.next()
492                       
493        line=[]
494        for i,row in enumerate(point2_reader):
495            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
496                         float(row[4]),float(row[5]),float(row[6])])
[7673]497#           print i, 'assert line',line[i],'point2',point2_answers_array[i]
[7672]498            assert num.allclose(line[i], point2_answers_array[i])
499                         
500        # clean up
501        point1_handle.close()
502        point2_handle.close()
503        os.remove(mesh_file)           
504        os.remove(sww.filename)
505        os.remove(points_file)
506        os.remove(point1_filename)
507        os.remove(point2_filename)
508
509
[7675]510    def test_sww2csv_output_centroid_attribute(self):
[7672]511
[7675]512        def elevation_function(x, y):
513            return -x
514       
515        """Check sww2csv timeseries at centroid, then output the centroid coordinates.
516       
517        Test the ability to get a timeseries at the centroid of a triangle, rather
518                than the given gauge point, then output the results.
519        """
520       
521        # Create rectangular mesh
522        mesh_file = tempfile.mktemp(".tsh")   
523        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
524        m = Mesh()
525        m.add_vertices(points)
526        m.auto_segment()
527        m.generate_mesh(verbose=False)
528        m.export_mesh_file(mesh_file)
529       
530        # Create shallow water domain
531        domain = Domain(mesh_file)
532       
533        domain.default_order=2
534       
535        # This test was made before tight_slope_limiters were introduced
536        # Since were are testing interpolation values this is OK
537        domain.tight_slope_limiters = 0 
538       
539
540        # Set some field values
541        domain.set_quantity('elevation', elevation_function)
542        domain.set_quantity('friction', 0.03)
543
544        ######################
545        # Boundary conditions
546        B = Transmissive_boundary(domain)
547        domain.set_boundary( {'exterior': B})
548
549        # This call mangles the stage values.
550        domain.distribute_to_vertices_and_edges()
551        domain.set_quantity('stage', 1.0)
552
553
554        domain.set_name('datatest' + str(time.time()))
555        domain.smooth = True
556        domain.reduction = mean
557
558
559        sww = SWW_file(domain)
560        sww.store_connectivity()
561        sww.store_timestep()
562        domain.set_quantity('stage', 10.0) # This is automatically limited
563        # so it will not be less than the elevation
564        domain.time = 2.
565        sww.store_timestep()
566
567        # create a csv file containing our gauge points
568        points_file = tempfile.mktemp(".csv")
569        file_id = open(points_file,"w")
570 
571# These values are slightly off the centroids - will it find the centroids?
572        file_id.write("name, easting, northing, elevation \n\
573point1, 2.5, 4.25, 3.0\n")
574
575        file_id.close()
576
577        gauge_sww2csv(sww.filename, 
578                       points_file,
579                       quantities=['stage', 'xcentroid', 'ycentroid'],
580                       verbose=False,
581                       use_cache=False,
582                       output_centroids=True)
583
584        point1_answers_array = [[0.0,0.0,1.0,4.0,4.0], [2.0,2.0/3600.,10.0,4.0,4.0]]
585        point1_filename = 'gauge_point1.csv'
586        point1_handle = file(point1_filename)
587        point1_reader = reader(point1_handle)
588        point1_reader.next()
589
590        line=[]
591        for i,row in enumerate(point1_reader):
592            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),float(row[4])])
593#            print 'assert line',line[i],'point1',point1_answers_array[i]
594            assert num.allclose(line[i], point1_answers_array[i])
595
596        # clean up
597        point1_handle.close()
598        os.remove(mesh_file)           
599        os.remove(sww.filename)
600        os.remove(points_file)
601        os.remove(point1_filename)
602               
603               
604
[7672]605#-------------------------------------------------------------
606
607if __name__ == "__main__":
608    suite = unittest.makeSuite(Test_Util, 'test')
609#    runner = unittest.TextTestRunner(verbosity=2)
610    runner = unittest.TextTestRunner(verbosity=1)
611    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.