source: anuga_core/source/anuga/abstract_2d_finite_volumes/test_gauge.py @ 7735

Last change on this file since 7735 was 7735, checked in by hudson, 15 years ago

Split up some of the huge modules in shallow_water, fixed most of the unit test dependencies.

File size: 16.9 KB
Line 
1#!/usr/bin/env python
2
3
4import unittest
5from math import sqrt, pi
6import tempfile, os
7from os import access, F_OK,sep, removedirs,remove,mkdir,getcwd
8
9#from anuga.abstract_2d_finite_volumes.util import *
10from anuga.abstract_2d_finite_volumes.gauge import *
11from anuga.config import epsilon
12
13from anuga.utilities.numerical_tools import NAN,mean
14
15from sys import platform
16
17from anuga.pmesh.mesh import Mesh
18from anuga.shallow_water import Domain, Transmissive_boundary
19from anuga.shallow_water.sww_file import SWW_file
20from anuga.shallow_water.file_conversion import timefile2netcdf
21from anuga.utilities.file_utils import del_dir
22from csv import reader,writer
23import time
24import string
25
26import numpy as num
27
28
29def test_function(x, y):
30    return x+y
31
32class Test_Gauge(unittest.TestCase):
33    def setUp(self):
34
35        def elevation_function(x, y):
36            return -x
37       
38        """ Setup for all tests. """
39       
40        mesh_file = tempfile.mktemp(".tsh")   
41        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
42        m = Mesh()
43        m.add_vertices(points)
44        m.auto_segment()
45        m.generate_mesh(verbose=False)
46        m.export_mesh_file(mesh_file)
47       
48        # Create shallow water domain
49        domain = Domain(mesh_file)
50        os.remove(mesh_file)
51 
52        domain.default_order = 2
53
54        # This test was made before tight_slope_limiters were introduced
55        # Since were are testing interpolation values this is OK
56        domain.tight_slope_limiters = 0
57               
58        # Set some field values
59        domain.set_quantity('elevation', elevation_function)
60        domain.set_quantity('friction', 0.03)
61        domain.set_quantity('xmomentum', 3.0)
62        domain.set_quantity('ymomentum', 4.0)
63
64        ######################
65        # Boundary conditions
66        B = Transmissive_boundary(domain)
67        domain.set_boundary( {'exterior': B})
68
69        # This call mangles the stage values.
70        domain.distribute_to_vertices_and_edges()
71        domain.set_quantity('stage', 1.0)
72
73        domain.set_name('datatest' + str(time.time()))
74        domain.smooth = True
75        domain.reduction = mean
76       
77        self.domain = domain
78       
79       
80    def tearDown(self):
81        """Called at end of each test."""
82        if self.sww:
83            os.remove(self.sww.filename)
84
85    def _create_sww(self):
86        self.sww = SWW_file(self.domain)
87        self.sww.store_connectivity()
88        self.sww.store_timestep()
89        self.domain.set_quantity('stage', 10.0) # This is automatically limited
90        # so it will not be less than the elevation
91        self.domain.time = 2.
92        self.sww.store_timestep()
93       
94       
95    def test_sww2csv(self):
96
97        """Most of this test was copied from test_interpolate
98        test_interpole_sww2csv
99       
100        This is testing the sww2csv_gauges function, by creating a sww file and
101        then exporting the gauges and checking the results.
102        """
103       
104        domain = self.domain
105        self._create_sww()
106       
107        # test the function
108        points = [[5.0,1.],[0.5,2.]]
109
110        points_file = tempfile.mktemp(".csv")
111#        points_file = 'test_point.csv'
112        file_id = open(points_file,"w")
113        file_id.write("name, easting, northing, elevation \n\
114point1, 5.0, 1.0, 3.0\n\
115point2, 0.5, 2.0, 9.0\n")
116        file_id.close()
117
118       
119        sww2csv_gauges(self.sww.filename, 
120                       points_file,
121                       verbose=False,
122                       use_cache=False)
123
124#        point1_answers_array = [[0.0,1.0,-5.0,3.0,4.0], [2.0,10.0,-5.0,3.0,4.0]]
125        point1_answers_array = [[0.0,0.0,1.0,6.0,-5.0,3.0,4.0], [2.0,2.0/3600.,10.0,15.0,-5.0,3.0,4.0]]
126        point1_filename = 'gauge_point1.csv'
127        point1_handle = file(point1_filename)
128        point1_reader = reader(point1_handle)
129        point1_reader.next()
130
131        line=[]
132        for i,row in enumerate(point1_reader):
133#            print 'i',i,'row',row
134            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
135                         float(row[4]),float(row[5]),float(row[6])])
136#            print 'assert line',line[i],'point1',point1_answers_array[i]
137            assert num.allclose(line[i], point1_answers_array[i])
138
139        point2_answers_array = [[0.0,0.0,1.0,1.5,-0.5,3.0,4.0], [2.0,2.0/3600.,10.0,10.5,-0.5,3.0,4.0]]
140        point2_filename = 'gauge_point2.csv' 
141        point2_handle = file(point2_filename)
142        point2_reader = reader(point2_handle)
143        point2_reader.next()
144                       
145        line=[]
146        for i,row in enumerate(point2_reader):
147#            print 'i',i,'row',row
148            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
149                         float(row[4]),float(row[5]),float(row[6])])
150#            print 'assert line',line[i],'point1',point1_answers_array[i]
151            assert num.allclose(line[i], point2_answers_array[i])
152                         
153        # clean up
154        point1_handle.close()
155        point2_handle.close()
156        os.remove(points_file)
157        os.remove(point1_filename)
158        os.remove(point2_filename)
159
160
161    def test_sww2csv_gauges1(self):
162        from anuga.pmesh.mesh import Mesh
163        from anuga.shallow_water import Domain, Transmissive_boundary
164        from csv import reader,writer
165        import time
166        import string
167       
168        """Most of this test was copied from test_interpolate
169        test_interpole_sww2csv
170       
171        This is testing the sww2csv_gauges function, by creating a sww file and
172        then exporting the gauges and checking the results.
173       
174        This tests the ablity not to have elevation in the points file and
175        not store xmomentum and ymomentum
176        """
177       
178        domain = self.domain
179        self._create_sww()
180       
181        # test the function
182        points = [[5.0,1.],[0.5,2.]]
183
184        points_file = tempfile.mktemp(".csv")
185#        points_file = 'test_point.csv'
186        file_id = open(points_file,"w")
187        file_id.write("name,easting,northing \n\
188point1, 5.0, 1.0\n\
189point2, 0.5, 2.0\n")
190        file_id.close()
191
192        sww2csv_gauges(self.sww.filename, 
193                            points_file,
194                            quantities=['stage', 'elevation'],
195                            use_cache=False,
196                            verbose=False)
197
198        point1_answers_array = [[0.0,1.0,-5.0], [2.0,10.0,-5.0]]
199        point1_filename = 'gauge_point1.csv'
200        point1_handle = file(point1_filename)
201        point1_reader = reader(point1_handle)
202        point1_reader.next()
203
204        line=[]
205        for i,row in enumerate(point1_reader):
206#            print 'i',i,'row',row
207            # note the 'hole' (element 1) below - skip the new 'hours' field
208            line.append([float(row[0]),float(row[2]),float(row[3])])
209            #print 'line',line[i],'point1',point1_answers_array[i]
210            assert num.allclose(line[i], point1_answers_array[i])
211
212        point2_answers_array = [[0.0,1.0,-0.5], [2.0,10.0,-0.5]]
213        point2_filename = 'gauge_point2.csv' 
214        point2_handle = file(point2_filename)
215        point2_reader = reader(point2_handle)
216        point2_reader.next()
217                       
218        line=[]
219        for i,row in enumerate(point2_reader):
220#            print 'i',i,'row',row
221            # note the 'hole' (element 1) below - skip the new 'hours' field
222            line.append([float(row[0]),float(row[2]),float(row[3])])
223#            print 'line',line[i],'point1',point1_answers_array[i]
224            assert num.allclose(line[i], point2_answers_array[i])
225                         
226        # clean up
227        point1_handle.close()
228        point2_handle.close() 
229        os.remove(points_file)
230        os.remove(point1_filename)
231        os.remove(point2_filename)       
232       
233
234    def test_sww2csv_gauges2(self):
235       
236        """Most of this test was copied from test_interpolate
237        test_interpole_sww2csv
238       
239        This is testing the sww2csv_gauges function, by creating a sww file and
240        then exporting the gauges and checking the results.
241       
242        This is the same as sww2csv_gauges except set domain.set_starttime to 5.
243        Therefore testing the storing of the absolute time in the csv files
244        """
245       
246        domain = self.domain
247        domain.set_starttime(5)
248        self._create_sww()
249       
250        # test the function
251        points = [[5.0,1.],[0.5,2.]]
252
253        points_file = tempfile.mktemp(".csv")
254#        points_file = 'test_point.csv'
255        file_id = open(points_file,"w")
256        file_id.write("name, easting, northing, elevation \n\
257point1, 5.0, 1.0, 3.0\n\
258point2, 0.5, 2.0, 9.0\n")
259        file_id.close()
260
261        sww2csv_gauges(self.sww.filename, 
262                            points_file,
263                            verbose=False,
264                            use_cache=False)
265
266#        point1_answers_array = [[0.0,1.0,-5.0,3.0,4.0], [2.0,10.0,-5.0,3.0,4.0]]
267        point1_answers_array = [[5.0,5.0/3600.,1.0,6.0,-5.0,3.0,4.0], [7.0,7.0/3600.,10.0,15.0,-5.0,3.0,4.0]]
268        point1_filename = 'gauge_point1.csv'
269        point1_handle = file(point1_filename)
270        point1_reader = reader(point1_handle)
271        point1_reader.next()
272
273        line=[]
274        for i,row in enumerate(point1_reader):
275            #print 'i',i,'row',row
276            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
277                         float(row[4]), float(row[5]), float(row[6])])
278            #print 'assert line',line[i],'answer',point1_answers_array[i]
279            assert num.allclose(line[i], point1_answers_array[i])
280
281        point2_answers_array = [[5.0,5.0/3600.,1.0,1.5,-0.5,3.0,4.0], [7.0,7.0/3600.,10.0,10.5,-0.5,3.0,4.0]]
282        point2_filename = 'gauge_point2.csv' 
283        point2_handle = file(point2_filename)
284        point2_reader = reader(point2_handle)
285        point2_reader.next()
286                       
287        line=[]
288        for i,row in enumerate(point2_reader):
289            #print 'i',i,'row',row
290            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
291                         float(row[4]),float(row[5]), float(row[6])])
292            #print 'assert line',line[i],'point1',point1_answers_array[i]
293            assert num.allclose(line[i], point2_answers_array[i])
294                         
295        # clean up
296        point1_handle.close()
297        point2_handle.close()
298        os.remove(points_file)
299        os.remove(point1_filename)
300        os.remove(point2_filename)
301
302
303       
304    def test_sww2csv_gauge_point_off_mesh(self):
305        from anuga.pmesh.mesh import Mesh
306        from anuga.shallow_water import Domain, Transmissive_boundary
307        from csv import reader,writer
308        import time
309        import string
310       
311        """Most of this test was copied from test_interpolate
312        test_interpole_sww2csv
313       
314        This is testing the sww2csv_gauges function with one gauge off the mesh, by creating a sww file and
315        then exporting the gauges and checking the results.
316       
317        This tests the correct values for when a gauge is off the mesh, which is important for parallel.
318        """
319
320        domain = self.domain
321        sww = self._create_sww()
322     
323        # test the function
324        points = [[50.0,1.],[50.5,-20.25]]
325
326#        points_file = tempfile.mktemp(".csv")
327        points_file = 'test_point.csv'
328        file_id = open(points_file,"w")
329        file_id.write("name,easting,northing \n\
330offmesh1, 50.0, 1.0\n\
331offmesh2, 50.5, 20.25\n")
332        file_id.close()
333
334        points_files = ['offmesh1.csv', 'offmesh2.csv']       
335       
336        for point_filename in points_files:
337            if os.path.exists(point_filename): os.remove(point_filename)         
338       
339        sww2csv_gauges(self.sww.filename, 
340                            points_file,
341                            quantities=['stage', 'elevation', 'bearing'],
342                            use_cache=False,
343                            verbose=False)
344
345        for point_filename in points_files: 
346            assert not os.path.exists(point_filename)
347           
348        os.remove(points_file)
349       
350       
351    def test_sww2csv_centroid(self):
352       
353        """Check sww2csv timeseries at centroid.
354       
355        Test the ability to get a timeseries at the centroid of a triangle, rather
356        than the given gauge point.
357        """
358       
359        domain = self.domain
360        sww = self._create_sww()
361       
362        # create a csv file containing our gauge points
363        points_file = tempfile.mktemp(".csv")
364        file_id = open(points_file,"w")
365# These values are where the centroids should be       
366#        file_id.write("name, easting, northing, elevation \n\
367#point1, 2.0, 2.0, 3.0\n\
368#point2, 4.0, 4.0, 9.0\n")
369 
370# These values are slightly off the centroids - will it find the centroids?
371        file_id.write("name, easting, northing, elevation \n\
372point1, 2.0, 1.0, 3.0\n\
373point2, 4.5, 4.0, 9.0\n")
374
375 
376        file_id.close()
377
378        sww2csv_gauges(self.sww.filename, 
379                       points_file,
380                       verbose=False,
381                       use_cache=False,
382                       output_centroids=True)
383
384        point1_answers_array = [[0.0,0.0,1.0,3.0,-2.0,3.0,4.0], [2.0,2.0/3600.,10.0,12.0,-2.0,3.0,4.0]]
385        point1_filename = 'gauge_point1.csv'
386        point1_handle = file(point1_filename)
387        point1_reader = reader(point1_handle)
388        point1_reader.next()
389
390        line=[]
391        for i,row in enumerate(point1_reader):
392            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
393                         float(row[4]),float(row[5]),float(row[6])])
394#           print 'assert line',line[i],'point1',point1_answers_array[i]
395            assert num.allclose(line[i], point1_answers_array[i])
396
397        point2_answers_array = [[0.0,0.0,1.0,5.0,-4.0,3.0,4.0], [2.0,2.0/3600.,10.0,14.0,-4.0,3.0,4.0]]
398        point2_filename = 'gauge_point2.csv' 
399        point2_handle = file(point2_filename)
400        point2_reader = reader(point2_handle)
401        point2_reader.next()
402                       
403        line=[]
404        for i,row in enumerate(point2_reader):
405            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),
406                         float(row[4]),float(row[5]),float(row[6])])
407#           print i, 'assert line',line[i],'point2',point2_answers_array[i]
408            assert num.allclose(line[i], point2_answers_array[i])
409                         
410        # clean up
411        point1_handle.close()
412        point2_handle.close()
413        os.remove(points_file)
414        os.remove(point1_filename)
415        os.remove(point2_filename)
416
417
418    def test_sww2csv_output_centroid_attribute(self):
419       
420        """Check sww2csv timeseries at centroid, then output the centroid coordinates.
421       
422        Test the ability to get a timeseries at the centroid of a triangle, rather
423        than the given gauge point, then output the results.
424        """
425       
426        domain = self.domain       
427        self._create_sww()
428       
429        # create a csv file containing our gauge points
430        points_file = tempfile.mktemp(".csv")
431        file_id = open(points_file,"w")
432 
433# These values are slightly off the centroids - will it find the centroids?
434        file_id.write("name, easting, northing, elevation \n\
435point1, 2.5, 4.25, 3.0\n")
436
437        file_id.close()
438
439        sww2csv_gauges(self.sww.filename, 
440                       points_file,
441                       quantities=['stage', 'xcentroid', 'ycentroid'],
442                       verbose=False,
443                       use_cache=False,
444                       output_centroids=True)
445
446        point1_answers_array = [[0.0,0.0,1.0,4.0,4.0], [2.0,2.0/3600.,10.0,4.0,4.0]]
447        point1_filename = 'gauge_point1.csv'
448        point1_handle = file(point1_filename)
449        point1_reader = reader(point1_handle)
450        point1_reader.next()
451
452        line=[]
453        for i,row in enumerate(point1_reader):
454            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),float(row[4])])
455#            print 'assert line',line[i],'point1',point1_answers_array[i]
456            assert num.allclose(line[i], point1_answers_array[i])
457
458        # clean up
459        point1_handle.close()       
460        os.remove(points_file)
461        os.remove(point1_filename)
462       
463       
464
465#-------------------------------------------------------------
466
467if __name__ == "__main__":
468    suite = unittest.makeSuite(Test_Gauge, 'test')
469#    runner = unittest.TextTestRunner(verbosity=2)
470    runner = unittest.TextTestRunner(verbosity=1)
471    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.