source: trunk/anuga_core/source/anuga/abstract_2d_finite_volumes/test_gauge.py @ 8068

Last change on this file since 8068 was 8068, checked in by habili, 13 years ago

Fixed bug where starttime and duration in the evolve function did not produce correct results. Starttime is now set in the constructor of Domain and is 0 by default. time is is to starttime. Also a bug was fixed where duration did not correctly calculate the finaltime. Associated unit tests have also been fixed to reflect the change.

File size: 19.7 KB
Line 
1#!/usr/bin/env python
2
3
4import unittest
5from math import sqrt, pi
6import tempfile, os
7from os import access, F_OK,sep, removedirs,remove,mkdir,getcwd
8
9import anuga
10#from anuga.abstract_2d_finite_volumes.util import *
11from anuga.abstract_2d_finite_volumes.gauge import *
12from anuga.config import epsilon
13
14from anuga.utilities.numerical_tools import NAN,mean
15
16from sys import platform
17
18from anuga.pmesh.mesh import Mesh
19
20from anuga.file.sww import SWW_file
21from anuga.file_conversion.file_conversion import timefile2netcdf
22from anuga.utilities.file_utils import del_dir
23from csv import reader, writer
24import time
25import string
26
27import numpy as num
28
29
30def test_function(x, y):
31    return x+y
32
33class Test_Gauge(unittest.TestCase):
34    def setUp(self):
35
36        def elevation_function(x, y):
37            return -x
38       
39        """ Setup for all tests. """
40       
41        mesh_file = tempfile.mktemp(".tsh")   
42        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
43        m = Mesh()
44        m.add_vertices(points)
45        m.auto_segment()
46        m.generate_mesh(verbose=False)
47        m.export_mesh_file(mesh_file)
48       
49        # Create shallow water domain
50        domain = anuga.Domain(mesh_file)
51        os.remove(mesh_file)
52 
53        domain.default_order = 2
54
55        # This test was made before tight_slope_limiters were introduced
56        # Since were are testing interpolation values this is OK
57        domain.tight_slope_limiters = 0
58               
59        # Set some field values
60        domain.set_quantity('elevation', elevation_function)
61        domain.set_quantity('friction', 0.03)
62        domain.set_quantity('xmomentum', 3.0)
63        domain.set_quantity('ymomentum', 4.0)
64
65        ######################
66        # Boundary conditions
67        B = anuga.Transmissive_boundary(domain)
68        domain.set_boundary( {'exterior': B})
69
70        # This call mangles the stage values.
71        domain.distribute_to_vertices_and_edges()
72        domain.set_quantity('stage', 1.0)
73
74        domain.set_name('datatest' + str(time.time()))
75        domain.smooth = True
76        domain.reduction = mean
77       
78        self.domain = domain
79       
80       
81    def tearDown(self):
82        """Called at end of each test."""
83        if self.sww:
84            os.remove(self.sww.filename)
85
86    def _create_sww(self,stage=10.0, timestep=2.0):
87        self.sww = SWW_file(self.domain)
88        self.sww.store_connectivity()
89        self.sww.store_timestep()
90        self.domain.set_quantity('stage', stage) # This is automatically limited
91        # so it will not be less than the elevation
92        self.domain.set_time(self.domain.get_time()-self.domain.starttime+timestep)
93        self.sww.store_timestep()
94       
95       
96    def test_sww2csv(self):
97
98        """Most of this test was copied from test_interpolate
99        test_interpole_sww2csv
100       
101        This is testing the sww2csv_gauges function, by creating a sww file and
102        then exporting the gauges and checking the results.
103        """
104       
105        domain = self.domain
106        self._create_sww()
107       
108        # test the function
109        points = [[5.0,1.],[0.5,2.]]
110
111        points_file = tempfile.mktemp(".csv")
112#        points_file = 'test_point.csv'
113        file_id = open(points_file,"w")
114        file_id.write("name, easting, northing, elevation \n\
115point1, 5.0, 1.0, 3.0\n\
116point2, 0.5, 2.0, 9.0\n")
117        file_id.close()
118
119       
120        sww2csv_gauges(self.sww.filename, 
121                       points_file,
122                       verbose=False,
123                       use_cache=False)
124
125#        point1_answers_array = [[0.0,1.0,-5.0,3.0,4.0], [2.0,10.0,-5.0,3.0,4.0]]
126        point1_answers_array = [[0.0,0.0,1.0,6.0,-5.0,3.0,4.0], [2.0,2.0/3600.,10.0,15.0,-5.0,3.0,4.0]]
127        point1_filename = 'gauge_point1.csv'
128        point1_handle = file(point1_filename)
129        point1_reader = reader(point1_handle)
130        point1_reader.next()
131
132        line=[]
133        for i,row in enumerate(point1_reader):
134#            print 'i',i,'row',row
135            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
136                         float(row[4]),float(row[5]),float(row[6])])
137#            print 'assert line',line[i],'point1',point1_answers_array[i]
138            assert num.allclose(line[i], point1_answers_array[i])
139
140        point2_answers_array = [[0.0,0.0,1.0,1.5,-0.5,3.0,4.0], [2.0,2.0/3600.,10.0,10.5,-0.5,3.0,4.0]]
141        point2_filename = 'gauge_point2.csv' 
142        point2_handle = file(point2_filename)
143        point2_reader = reader(point2_handle)
144        point2_reader.next()
145                       
146        line=[]
147        for i,row in enumerate(point2_reader):
148#            print 'i',i,'row',row
149            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
150                         float(row[4]),float(row[5]),float(row[6])])
151#            print 'assert line',line[i],'point1',point1_answers_array[i]
152            assert num.allclose(line[i], point2_answers_array[i])
153                         
154        # clean up
155        point1_handle.close()
156        point2_handle.close()
157        os.remove(points_file)
158        os.remove(point1_filename)
159        os.remove(point2_filename)
160
161
162    def test_sww2csv_gauges1(self):
163        from anuga.pmesh.mesh import Mesh
164        from csv import reader,writer
165        import time
166        import string
167       
168        """Most of this test was copied from test_interpolate
169        test_interpole_sww2csv
170       
171        This is testing the sww2csv_gauges function, by creating a sww file and
172        then exporting the gauges and checking the results.
173       
174        This tests the ablity not to have elevation in the points file and
175        not store xmomentum and ymomentum
176        """
177       
178        domain = self.domain
179        self._create_sww()
180       
181        # test the function
182        points = [[5.0,1.],[0.5,2.]]
183
184        points_file = tempfile.mktemp(".csv")
185#        points_file = 'test_point.csv'
186        file_id = open(points_file,"w")
187        file_id.write("name,easting,northing \n\
188point1, 5.0, 1.0\n\
189point2, 0.5, 2.0\n")
190        file_id.close()
191
192        sww2csv_gauges(self.sww.filename, 
193                            points_file,
194                            quantities=['stage', 'elevation'],
195                            use_cache=False,
196                            verbose=False)
197
198        point1_answers_array = [[0.0,1.0,-5.0], [2.0,10.0,-5.0]]
199        point1_filename = 'gauge_point1.csv'
200        point1_handle = file(point1_filename)
201        point1_reader = reader(point1_handle)
202        point1_reader.next()
203
204        line=[]
205        for i,row in enumerate(point1_reader):
206#            print 'i',i,'row',row
207            # note the 'hole' (element 1) below - skip the new 'hours' field
208            line.append([float(row[0]),float(row[2]),float(row[3])])
209            #print 'line',line[i],'point1',point1_answers_array[i]
210            assert num.allclose(line[i], point1_answers_array[i])
211
212        point2_answers_array = [[0.0,1.0,-0.5], [2.0,10.0,-0.5]]
213        point2_filename = 'gauge_point2.csv' 
214        point2_handle = file(point2_filename)
215        point2_reader = reader(point2_handle)
216        point2_reader.next()
217                       
218        line=[]
219        for i,row in enumerate(point2_reader):
220#            print 'i',i,'row',row
221            # note the 'hole' (element 1) below - skip the new 'hours' field
222            line.append([float(row[0]),float(row[2]),float(row[3])])
223#            print 'line',line[i],'point1',point1_answers_array[i]
224            assert num.allclose(line[i], point2_answers_array[i])
225                         
226        # clean up
227        point1_handle.close()
228        point2_handle.close() 
229        os.remove(points_file)
230        os.remove(point1_filename)
231        os.remove(point2_filename)       
232       
233
234    def test_sww2csv_gauges2(self):
235       
236        """Most of this test was copied from test_interpolate
237        test_interpole_sww2csv
238       
239        This is testing the sww2csv_gauges function, by creating a sww file and
240        then exporting the gauges and checking the results.
241       
242        This is the same as sww2csv_gauges except set domain.set_starttime to 5.
243        Therefore testing the storing of the absolute time in the csv files
244        """
245       
246        domain = self.domain
247        domain.set_starttime(1)
248       
249        self._create_sww(timestep=2)
250       
251        # test the function
252        points = [[5.0,1.],[0.5,2.]]
253
254        points_file = tempfile.mktemp(".csv")
255#        points_file = 'test_point.csv'
256        file_id = open(points_file,"w")
257        file_id.write("name, easting, northing, elevation \n\
258point1, 5.0, 1.0, 3.0\n\
259point2, 0.5, 2.0, 9.0\n")
260        file_id.close()
261       
262        sww2csv_gauges(self.sww.filename, 
263                            points_file,
264                            verbose=False,
265                            use_cache=False)
266
267#        point1_answers_array = [[0.0,1.0,-5.0,3.0,4.0], [2.0,10.0,-5.0,3.0,4.0]]
268        point1_answers_array = [[2.0,2.0/3600.,1.0,6.0,-5.0,3.0,4.0], [3.0,3.0/3600.,10.0,15.0,-5.0,3.0,4.0]]
269        point1_filename = 'gauge_point1.csv'
270        point1_handle = file(point1_filename)
271        point1_reader = reader(point1_handle)
272        point1_reader.next()
273
274        line=[]
275        for i,row in enumerate(point1_reader):
276            #print 'i',i,'row',row
277            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
278                         float(row[4]), float(row[5]), float(row[6])])
279            #print 'assert line',line[i],'answer',point1_answers_array[i]
280            assert num.allclose(line[i], point1_answers_array[i])
281
282        point2_answers_array = [[2.0,2.0/3600.,1.0,1.5,-0.5,3.0,4.0], [3.0,3.0/3600.,10.0,10.5,-0.5,3.0,4.0]]
283        point2_filename = 'gauge_point2.csv' 
284        point2_handle = file(point2_filename)
285        point2_reader = reader(point2_handle)
286        point2_reader.next()
287                       
288        line=[]
289        for i,row in enumerate(point2_reader):
290            #print 'i',i,'row',row
291            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
292                         float(row[4]),float(row[5]), float(row[6])])
293            #print 'assert line',line[i],'point1',point1_answers_array[i]
294            assert num.allclose(line[i], point2_answers_array[i])
295                         
296        # clean up
297        point1_handle.close()
298        point2_handle.close()
299        os.remove(points_file)
300        os.remove(point1_filename)
301        os.remove(point2_filename)
302
303
304       
305    def test_sww2csv_gauge_point_off_mesh(self):
306        from anuga.pmesh.mesh import Mesh
307        from csv import reader,writer
308        import time
309        import string
310       
311        """Most of this test was copied from test_interpolate
312        test_interpole_sww2csv
313       
314        This is testing the sww2csv_gauges function with one gauge off the mesh, by creating a sww file and
315        then exporting the gauges and checking the results.
316       
317        This tests the correct values for when a gauge is off the mesh, which is important for parallel.
318        """
319
320        domain = self.domain
321        sww = self._create_sww()
322     
323        # test the function
324        points = [[50.0,1.],[50.5,-20.25]]
325
326#        points_file = tempfile.mktemp(".csv")
327        points_file = 'test_point.csv'
328        file_id = open(points_file,"w")
329        file_id.write("name,easting,northing \n\
330offmesh1, 50.0, 1.0\n\
331offmesh2, 50.5, 20.25\n")
332        file_id.close()
333
334        points_files = ['offmesh1.csv', 'offmesh2.csv']       
335       
336        for point_filename in points_files:
337            if os.path.exists(point_filename): os.remove(point_filename)         
338       
339        sww2csv_gauges(self.sww.filename, 
340                            points_file,
341                            quantities=['stage', 'elevation', 'bearing'],
342                            use_cache=False,
343                            verbose=False)
344
345        for point_filename in points_files: 
346            assert not os.path.exists(point_filename)
347           
348        os.remove(points_file)
349       
350       
351    def test_sww2csv_centroid(self):
352       
353        """Check sww2csv timeseries at centroid.
354       
355        Test the ability to get a timeseries at the centroid of a triangle, rather
356        than the given gauge point.
357        """
358       
359        domain = self.domain
360        sww = self._create_sww()
361       
362        # create a csv file containing our gauge points
363        points_file = tempfile.mktemp(".csv")
364        file_id = open(points_file,"w")
365# These values are where the centroids should be       
366#        file_id.write("name, easting, northing, elevation \n\
367#point1, 2.0, 2.0, 3.0\n\
368#point2, 4.0, 4.0, 9.0\n")
369 
370# These values are slightly off the centroids - will it find the centroids?
371        file_id.write("name, easting, northing, elevation \n\
372point1, 2.0, 1.0, 3.0\n\
373point2, 4.5, 4.0, 9.0\n")
374
375 
376        file_id.close()
377
378        sww2csv_gauges(self.sww.filename, 
379                       points_file,
380                       verbose=False,
381                       use_cache=False,
382                       output_centroids=True)
383
384        point1_answers_array = [[0.0,0.0,1.0,3.0,-2.0,3.0,4.0], [2.0,2.0/3600.,10.0,12.0,-2.0,3.0,4.0]]
385        point1_filename = 'gauge_point1.csv'
386        point1_handle = file(point1_filename)
387        point1_reader = reader(point1_handle)
388        point1_reader.next()
389
390        line=[]
391        for i,row in enumerate(point1_reader):
392            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
393                         float(row[4]),float(row[5]),float(row[6])])
394#           print 'assert line',line[i],'point1',point1_answers_array[i]
395            assert num.allclose(line[i], point1_answers_array[i])
396
397        point2_answers_array = [[0.0,0.0,1.0,5.0,-4.0,3.0,4.0], [2.0,2.0/3600.,10.0,14.0,-4.0,3.0,4.0]]
398        point2_filename = 'gauge_point2.csv' 
399        point2_handle = file(point2_filename)
400        point2_reader = reader(point2_handle)
401        point2_reader.next()
402                       
403        line=[]
404        for i,row in enumerate(point2_reader):
405            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
406                         float(row[4]),float(row[5]),float(row[6])])
407#           print i, 'assert line',line[i],'point2',point2_answers_array[i]
408            assert num.allclose(line[i], point2_answers_array[i])
409                         
410        # clean up
411        point1_handle.close()
412        point2_handle.close()
413        os.remove(points_file)
414        os.remove(point1_filename)
415        os.remove(point2_filename)
416
417
418    def test_sww2csv_output_centroid_attribute(self):
419       
420        """Check sww2csv timeseries at centroid, then output the centroid coordinates.
421       
422        Test the ability to get a timeseries at the centroid of a triangle, rather
423        than the given gauge point, then output the results.
424        """
425       
426        domain = self.domain       
427        self._create_sww()
428       
429        # create a csv file containing our gauge points
430        points_file = tempfile.mktemp(".csv")
431        file_id = open(points_file,"w")
432 
433# These values are slightly off the centroids - will it find the centroids?
434        file_id.write("name, easting, northing, elevation \n\
435point1, 2.5, 4.25, 3.0\n")
436
437        file_id.close()
438
439        sww2csv_gauges(self.sww.filename, 
440                       points_file,
441                       quantities=['stage', 'xcentroid', 'ycentroid'],
442                       verbose=False,
443                       use_cache=False,
444                       output_centroids=True)
445
446        point1_answers_array = [[0.0,0.0,1.0,4.0,4.0], [2.0,2.0/3600.,10.0,4.0,4.0]]
447        point1_filename = 'gauge_point1.csv'
448        point1_handle = file(point1_filename)
449        point1_reader = reader(point1_handle)
450        point1_reader.next()
451
452        line=[]
453        for i,row in enumerate(point1_reader):
454            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),float(row[4])])
455#            print 'assert line',line[i],'point1',point1_answers_array[i]
456            assert num.allclose(line[i], point1_answers_array[i])
457
458        # clean up
459        point1_handle.close()       
460        os.remove(points_file)
461        os.remove(point1_filename)
462
463    def test_sww2csv_multiple_files(self):
464        """
465        This is testing the sww2csv_gauges function, by creating a multiple
466        sww file and then exporting the gauges and checking the results.
467        """
468        timestep=2.0
469        domain = self.domain
470        domain.set_starttime(0.)
471        # Create two sww files with timestep at end. These are to be
472        # stored consecutively in the gauge csv files
473        basename='datatest'
474        domain.set_name(basename) 
475        self._create_sww(stage=10.,timestep=timestep)
476
477        domain.set_name(basename+str(time.time())) 
478        domain.set_time(domain.get_time()+timestep)
479        self._create_sww(stage=20.,timestep=timestep)
480
481        points_file = tempfile.mktemp(".csv")
482        file_id = open(points_file,"w")
483
484        # test the function at these points
485        points = [[5.0,1.],[0.5,2.]]
486
487        # create a csv file containing our gauge points
488        points_file = tempfile.mktemp(".csv")
489        file_id = open(points_file,"w")
490        file_id.write("name,easting,northing \n\
491point1, 5.0, 1.0\n\
492point2, 0.5, 2.0\n")
493        file_id.close()
494
495        sww2csv_gauges(basename+".sww", 
496                       points_file,
497                       quantities=['stage', 'elevation'],
498                       use_cache=False,
499                       verbose=False)
500
501        point1_answers_array = [[0.0,1.0,-5.0], [2.0,10.0,-5.0],[4.0,10.0,-5.0],
502                                [6.0,20.0,-5.0]]
503        point1_filename = 'gauge_point1.csv'
504        point1_handle = file(point1_filename)
505        point1_reader = reader(point1_handle)
506        point1_reader.next()
507
508        line=[]
509        for i,row in enumerate(point1_reader):
510            # note the 'hole' (element 1) below - skip the new 'hours' field
511            line.append([float(row[0]),float(row[2]),float(row[3])])
512            #print 'line',line[i],'point1',point1_answers_array[i]
513            assert num.allclose(line[i], point1_answers_array[i])
514
515        point2_answers_array = [[0.0,1.0,-0.5], [2.0,10.0,-0.5],[4.0,10.0,-0.5],
516                                [6.0,20.0,-0.5]]
517        point2_filename = 'gauge_point2.csv' 
518        point2_handle = file(point2_filename)
519        point2_reader = reader(point2_handle)
520        point2_reader.next()
521                       
522        line=[]
523        for i,row in enumerate(point2_reader):
524            # note the 'hole' (element 1) below - skip the new 'hours' field
525            line.append([float(row[0]),float(row[2]),float(row[3])])
526            #print 'line',line[i],'point2',point2_answers_array[i]
527            assert num.allclose(line[i], point2_answers_array[i])
528                         
529        # clean up
530        point1_handle.close()
531        point2_handle.close() 
532        os.remove(points_file)
533        os.remove(point1_filename)
534        os.remove(point2_filename)       
535
536        #remove second swwfile not removed by tearDown
537        os.remove(basename+".sww")
538
539#-------------------------------------------------------------
540
541if __name__ == "__main__":
542    suite = unittest.makeSuite(Test_Gauge, 'test')
543#    runner = unittest.TextTestRunner(verbosity=2)
544    runner = unittest.TextTestRunner(verbosity=1)
545    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.