source: trunk/anuga_core/source/anuga/abstract_2d_finite_volumes/test_gauge.py @ 7778

Last change on this file since 7778 was 7778, checked in by hudson, 14 years ago

Cleaned up unit tests to use API.

File size: 16.7 KB
Line 
1#!/usr/bin/env python
2
3
4import unittest
5from math import sqrt, pi
6import tempfile, os
7from os import access, F_OK,sep, removedirs,remove,mkdir,getcwd
8
9import anuga
10#from anuga.abstract_2d_finite_volumes.util import *
11from anuga.abstract_2d_finite_volumes.gauge import *
12from anuga.config import epsilon
13
14from anuga.utilities.numerical_tools import NAN,mean
15
16from sys import platform
17
18from anuga.pmesh.mesh import Mesh
19
20from anuga.file.sww import SWW_file
21from anuga.file_conversion.file_conversion import timefile2netcdf
22from anuga.utilities.file_utils import del_dir
23from csv import reader, writer
24import time
25import string
26
27import numpy as num
28
29
30def test_function(x, y):
31    return x+y
32
33class Test_Gauge(unittest.TestCase):
34    def setUp(self):
35
36        def elevation_function(x, y):
37            return -x
38       
39        """ Setup for all tests. """
40       
41        mesh_file = tempfile.mktemp(".tsh")   
42        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
43        m = Mesh()
44        m.add_vertices(points)
45        m.auto_segment()
46        m.generate_mesh(verbose=False)
47        m.export_mesh_file(mesh_file)
48       
49        # Create shallow water domain
50        domain = anuga.Domain(mesh_file)
51        os.remove(mesh_file)
52 
53        domain.default_order = 2
54
55        # This test was made before tight_slope_limiters were introduced
56        # Since were are testing interpolation values this is OK
57        domain.tight_slope_limiters = 0
58               
59        # Set some field values
60        domain.set_quantity('elevation', elevation_function)
61        domain.set_quantity('friction', 0.03)
62        domain.set_quantity('xmomentum', 3.0)
63        domain.set_quantity('ymomentum', 4.0)
64
65        ######################
66        # Boundary conditions
67        B = anuga.Transmissive_boundary(domain)
68        domain.set_boundary( {'exterior': B})
69
70        # This call mangles the stage values.
71        domain.distribute_to_vertices_and_edges()
72        domain.set_quantity('stage', 1.0)
73
74        domain.set_name('datatest' + str(time.time()))
75        domain.smooth = True
76        domain.reduction = mean
77       
78        self.domain = domain
79       
80       
81    def tearDown(self):
82        """Called at end of each test."""
83        if self.sww:
84            os.remove(self.sww.filename)
85
86    def _create_sww(self):
87        self.sww = SWW_file(self.domain)
88        self.sww.store_connectivity()
89        self.sww.store_timestep()
90        self.domain.set_quantity('stage', 10.0) # This is automatically limited
91        # so it will not be less than the elevation
92        self.domain.time = 2.
93        self.sww.store_timestep()
94       
95       
96    def test_sww2csv(self):
97
98        """Most of this test was copied from test_interpolate
99        test_interpole_sww2csv
100       
101        This is testing the sww2csv_gauges function, by creating a sww file and
102        then exporting the gauges and checking the results.
103        """
104       
105        domain = self.domain
106        self._create_sww()
107       
108        # test the function
109        points = [[5.0,1.],[0.5,2.]]
110
111        points_file = tempfile.mktemp(".csv")
112#        points_file = 'test_point.csv'
113        file_id = open(points_file,"w")
114        file_id.write("name, easting, northing, elevation \n\
115point1, 5.0, 1.0, 3.0\n\
116point2, 0.5, 2.0, 9.0\n")
117        file_id.close()
118
119       
120        sww2csv_gauges(self.sww.filename, 
121                       points_file,
122                       verbose=False,
123                       use_cache=False)
124
125#        point1_answers_array = [[0.0,1.0,-5.0,3.0,4.0], [2.0,10.0,-5.0,3.0,4.0]]
126        point1_answers_array = [[0.0,0.0,1.0,6.0,-5.0,3.0,4.0], [2.0,2.0/3600.,10.0,15.0,-5.0,3.0,4.0]]
127        point1_filename = 'gauge_point1.csv'
128        point1_handle = file(point1_filename)
129        point1_reader = reader(point1_handle)
130        point1_reader.next()
131
132        line=[]
133        for i,row in enumerate(point1_reader):
134#            print 'i',i,'row',row
135            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
136                         float(row[4]),float(row[5]),float(row[6])])
137#            print 'assert line',line[i],'point1',point1_answers_array[i]
138            assert num.allclose(line[i], point1_answers_array[i])
139
140        point2_answers_array = [[0.0,0.0,1.0,1.5,-0.5,3.0,4.0], [2.0,2.0/3600.,10.0,10.5,-0.5,3.0,4.0]]
141        point2_filename = 'gauge_point2.csv' 
142        point2_handle = file(point2_filename)
143        point2_reader = reader(point2_handle)
144        point2_reader.next()
145                       
146        line=[]
147        for i,row in enumerate(point2_reader):
148#            print 'i',i,'row',row
149            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
150                         float(row[4]),float(row[5]),float(row[6])])
151#            print 'assert line',line[i],'point1',point1_answers_array[i]
152            assert num.allclose(line[i], point2_answers_array[i])
153                         
154        # clean up
155        point1_handle.close()
156        point2_handle.close()
157        os.remove(points_file)
158        os.remove(point1_filename)
159        os.remove(point2_filename)
160
161
162    def test_sww2csv_gauges1(self):
163        from anuga.pmesh.mesh import Mesh
164        from csv import reader,writer
165        import time
166        import string
167       
168        """Most of this test was copied from test_interpolate
169        test_interpole_sww2csv
170       
171        This is testing the sww2csv_gauges function, by creating a sww file and
172        then exporting the gauges and checking the results.
173       
174        This tests the ablity not to have elevation in the points file and
175        not store xmomentum and ymomentum
176        """
177       
178        domain = self.domain
179        self._create_sww()
180       
181        # test the function
182        points = [[5.0,1.],[0.5,2.]]
183
184        points_file = tempfile.mktemp(".csv")
185#        points_file = 'test_point.csv'
186        file_id = open(points_file,"w")
187        file_id.write("name,easting,northing \n\
188point1, 5.0, 1.0\n\
189point2, 0.5, 2.0\n")
190        file_id.close()
191
192        sww2csv_gauges(self.sww.filename, 
193                            points_file,
194                            quantities=['stage', 'elevation'],
195                            use_cache=False,
196                            verbose=False)
197
198        point1_answers_array = [[0.0,1.0,-5.0], [2.0,10.0,-5.0]]
199        point1_filename = 'gauge_point1.csv'
200        point1_handle = file(point1_filename)
201        point1_reader = reader(point1_handle)
202        point1_reader.next()
203
204        line=[]
205        for i,row in enumerate(point1_reader):
206#            print 'i',i,'row',row
207            # note the 'hole' (element 1) below - skip the new 'hours' field
208            line.append([float(row[0]),float(row[2]),float(row[3])])
209            #print 'line',line[i],'point1',point1_answers_array[i]
210            assert num.allclose(line[i], point1_answers_array[i])
211
212        point2_answers_array = [[0.0,1.0,-0.5], [2.0,10.0,-0.5]]
213        point2_filename = 'gauge_point2.csv' 
214        point2_handle = file(point2_filename)
215        point2_reader = reader(point2_handle)
216        point2_reader.next()
217                       
218        line=[]
219        for i,row in enumerate(point2_reader):
220#            print 'i',i,'row',row
221            # note the 'hole' (element 1) below - skip the new 'hours' field
222            line.append([float(row[0]),float(row[2]),float(row[3])])
223#            print 'line',line[i],'point1',point1_answers_array[i]
224            assert num.allclose(line[i], point2_answers_array[i])
225                         
226        # clean up
227        point1_handle.close()
228        point2_handle.close() 
229        os.remove(points_file)
230        os.remove(point1_filename)
231        os.remove(point2_filename)       
232       
233
234    def test_sww2csv_gauges2(self):
235       
236        """Most of this test was copied from test_interpolate
237        test_interpole_sww2csv
238       
239        This is testing the sww2csv_gauges function, by creating a sww file and
240        then exporting the gauges and checking the results.
241       
242        This is the same as sww2csv_gauges except set domain.set_starttime to 5.
243        Therefore testing the storing of the absolute time in the csv files
244        """
245       
246        domain = self.domain
247        domain.set_starttime(5)
248        self._create_sww()
249       
250        # test the function
251        points = [[5.0,1.],[0.5,2.]]
252
253        points_file = tempfile.mktemp(".csv")
254#        points_file = 'test_point.csv'
255        file_id = open(points_file,"w")
256        file_id.write("name, easting, northing, elevation \n\
257point1, 5.0, 1.0, 3.0\n\
258point2, 0.5, 2.0, 9.0\n")
259        file_id.close()
260
261        sww2csv_gauges(self.sww.filename, 
262                            points_file,
263                            verbose=False,
264                            use_cache=False)
265
266#        point1_answers_array = [[0.0,1.0,-5.0,3.0,4.0], [2.0,10.0,-5.0,3.0,4.0]]
267        point1_answers_array = [[5.0,5.0/3600.,1.0,6.0,-5.0,3.0,4.0], [7.0,7.0/3600.,10.0,15.0,-5.0,3.0,4.0]]
268        point1_filename = 'gauge_point1.csv'
269        point1_handle = file(point1_filename)
270        point1_reader = reader(point1_handle)
271        point1_reader.next()
272
273        line=[]
274        for i,row in enumerate(point1_reader):
275            #print 'i',i,'row',row
276            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
277                         float(row[4]), float(row[5]), float(row[6])])
278            #print 'assert line',line[i],'answer',point1_answers_array[i]
279            assert num.allclose(line[i], point1_answers_array[i])
280
281        point2_answers_array = [[5.0,5.0/3600.,1.0,1.5,-0.5,3.0,4.0], [7.0,7.0/3600.,10.0,10.5,-0.5,3.0,4.0]]
282        point2_filename = 'gauge_point2.csv' 
283        point2_handle = file(point2_filename)
284        point2_reader = reader(point2_handle)
285        point2_reader.next()
286                       
287        line=[]
288        for i,row in enumerate(point2_reader):
289            #print 'i',i,'row',row
290            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
291                         float(row[4]),float(row[5]), float(row[6])])
292            #print 'assert line',line[i],'point1',point1_answers_array[i]
293            assert num.allclose(line[i], point2_answers_array[i])
294                         
295        # clean up
296        point1_handle.close()
297        point2_handle.close()
298        os.remove(points_file)
299        os.remove(point1_filename)
300        os.remove(point2_filename)
301
302
303       
304    def test_sww2csv_gauge_point_off_mesh(self):
305        from anuga.pmesh.mesh import Mesh
306        from csv import reader,writer
307        import time
308        import string
309       
310        """Most of this test was copied from test_interpolate
311        test_interpole_sww2csv
312       
313        This is testing the sww2csv_gauges function with one gauge off the mesh, by creating a sww file and
314        then exporting the gauges and checking the results.
315       
316        This tests the correct values for when a gauge is off the mesh, which is important for parallel.
317        """
318
319        domain = self.domain
320        sww = self._create_sww()
321     
322        # test the function
323        points = [[50.0,1.],[50.5,-20.25]]
324
325#        points_file = tempfile.mktemp(".csv")
326        points_file = 'test_point.csv'
327        file_id = open(points_file,"w")
328        file_id.write("name,easting,northing \n\
329offmesh1, 50.0, 1.0\n\
330offmesh2, 50.5, 20.25\n")
331        file_id.close()
332
333        points_files = ['offmesh1.csv', 'offmesh2.csv']       
334       
335        for point_filename in points_files:
336            if os.path.exists(point_filename): os.remove(point_filename)         
337       
338        sww2csv_gauges(self.sww.filename, 
339                            points_file,
340                            quantities=['stage', 'elevation', 'bearing'],
341                            use_cache=False,
342                            verbose=False)
343
344        for point_filename in points_files: 
345            assert not os.path.exists(point_filename)
346           
347        os.remove(points_file)
348       
349       
350    def test_sww2csv_centroid(self):
351       
352        """Check sww2csv timeseries at centroid.
353       
354        Test the ability to get a timeseries at the centroid of a triangle, rather
355        than the given gauge point.
356        """
357       
358        domain = self.domain
359        sww = self._create_sww()
360       
361        # create a csv file containing our gauge points
362        points_file = tempfile.mktemp(".csv")
363        file_id = open(points_file,"w")
364# These values are where the centroids should be       
365#        file_id.write("name, easting, northing, elevation \n\
366#point1, 2.0, 2.0, 3.0\n\
367#point2, 4.0, 4.0, 9.0\n")
368 
369# These values are slightly off the centroids - will it find the centroids?
370        file_id.write("name, easting, northing, elevation \n\
371point1, 2.0, 1.0, 3.0\n\
372point2, 4.5, 4.0, 9.0\n")
373
374 
375        file_id.close()
376
377        sww2csv_gauges(self.sww.filename, 
378                       points_file,
379                       verbose=False,
380                       use_cache=False,
381                       output_centroids=True)
382
383        point1_answers_array = [[0.0,0.0,1.0,3.0,-2.0,3.0,4.0], [2.0,2.0/3600.,10.0,12.0,-2.0,3.0,4.0]]
384        point1_filename = 'gauge_point1.csv'
385        point1_handle = file(point1_filename)
386        point1_reader = reader(point1_handle)
387        point1_reader.next()
388
389        line=[]
390        for i,row in enumerate(point1_reader):
391            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
392                         float(row[4]),float(row[5]),float(row[6])])
393#           print 'assert line',line[i],'point1',point1_answers_array[i]
394            assert num.allclose(line[i], point1_answers_array[i])
395
396        point2_answers_array = [[0.0,0.0,1.0,5.0,-4.0,3.0,4.0], [2.0,2.0/3600.,10.0,14.0,-4.0,3.0,4.0]]
397        point2_filename = 'gauge_point2.csv' 
398        point2_handle = file(point2_filename)
399        point2_reader = reader(point2_handle)
400        point2_reader.next()
401                       
402        line=[]
403        for i,row in enumerate(point2_reader):
404            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
405                         float(row[4]),float(row[5]),float(row[6])])
406#           print i, 'assert line',line[i],'point2',point2_answers_array[i]
407            assert num.allclose(line[i], point2_answers_array[i])
408                         
409        # clean up
410        point1_handle.close()
411        point2_handle.close()
412        os.remove(points_file)
413        os.remove(point1_filename)
414        os.remove(point2_filename)
415
416
417    def test_sww2csv_output_centroid_attribute(self):
418       
419        """Check sww2csv timeseries at centroid, then output the centroid coordinates.
420       
421        Test the ability to get a timeseries at the centroid of a triangle, rather
422        than the given gauge point, then output the results.
423        """
424       
425        domain = self.domain       
426        self._create_sww()
427       
428        # create a csv file containing our gauge points
429        points_file = tempfile.mktemp(".csv")
430        file_id = open(points_file,"w")
431 
432# These values are slightly off the centroids - will it find the centroids?
433        file_id.write("name, easting, northing, elevation \n\
434point1, 2.5, 4.25, 3.0\n")
435
436        file_id.close()
437
438        sww2csv_gauges(self.sww.filename, 
439                       points_file,
440                       quantities=['stage', 'xcentroid', 'ycentroid'],
441                       verbose=False,
442                       use_cache=False,
443                       output_centroids=True)
444
445        point1_answers_array = [[0.0,0.0,1.0,4.0,4.0], [2.0,2.0/3600.,10.0,4.0,4.0]]
446        point1_filename = 'gauge_point1.csv'
447        point1_handle = file(point1_filename)
448        point1_reader = reader(point1_handle)
449        point1_reader.next()
450
451        line=[]
452        for i,row in enumerate(point1_reader):
453            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),float(row[4])])
454#            print 'assert line',line[i],'point1',point1_answers_array[i]
455            assert num.allclose(line[i], point1_answers_array[i])
456
457        # clean up
458        point1_handle.close()       
459        os.remove(points_file)
460        os.remove(point1_filename)
461       
462       
463
464#-------------------------------------------------------------
465
466if __name__ == "__main__":
467    suite = unittest.makeSuite(Test_Gauge, 'test')
468#    runner = unittest.TextTestRunner(verbosity=2)
469    runner = unittest.TextTestRunner(verbosity=1)
470    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.