source: anuga_core/source/anuga/abstract_2d_finite_volumes/test_util.py @ 5182

Last change on this file since 5182 was 5182, checked in by ole, 16 years ago

Removed test that only executed on Windows
Also made any further reference to make_plots_from_csv_file raise an Exception.

File size: 60.1 KB
Line 
1#!/usr/bin/env python
2
3
4import unittest
5from Numeric import zeros, array, allclose, Float
6from math import sqrt, pi
7import tempfile, os
8from os import access, F_OK,sep, removedirs,remove,mkdir,getcwd
9
10from anuga.abstract_2d_finite_volumes.util import *
11from anuga.config import epsilon
12from anuga.shallow_water.data_manager import timefile2netcdf,del_dir
13
14from anuga.utilities.numerical_tools import NAN
15
16from sys import platform
17
18from anuga.pmesh.mesh import Mesh
19from anuga.shallow_water import Domain, Transmissive_boundary
20from anuga.shallow_water.data_manager import get_dataobject
21from csv import reader,writer
22import time
23import string
24
25def test_function(x, y):
26    return x+y
27
28class Test_Util(unittest.TestCase):
29    def setUp(self):
30        pass
31
32    def tearDown(self):
33        pass
34
35
36
37
38    #Geometric
39    #def test_distance(self):
40    #    from anuga.abstract_2d_finite_volumes.util import distance#
41    #
42    #    self.failUnless( distance([4,2],[7,6]) == 5.0,
43    #                     'Distance is wrong!')
44    #    self.failUnless( allclose(distance([7,6],[9,8]), 2.82842712475),
45    #                    'distance is wrong!')
46    #    self.failUnless( allclose(distance([9,8],[4,2]), 7.81024967591),
47    #                    'distance is wrong!')
48    #
49    #    self.failUnless( distance([9,8],[4,2]) == distance([4,2],[9,8]),
50    #                    'distance is wrong!')
51
52
53    def test_file_function_time1(self):
54        """Test that File function interpolates correctly
55        between given times. No x,y dependency here.
56        """
57
58        #Write file
59        import os, time
60        from anuga.config import time_format
61        from math import sin, pi
62
63        #Typical ASCII file
64        finaltime = 1200
65        filename = 'test_file_function'
66        fid = open(filename + '.txt', 'w')
67        start = time.mktime(time.strptime('2000', '%Y'))
68        dt = 60  #One minute intervals
69        t = 0.0
70        while t <= finaltime:
71            t_string = time.strftime(time_format, time.gmtime(t+start))
72            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
73            t += dt
74
75        fid.close()
76
77        #Convert ASCII file to NetCDF (Which is what we really like!)
78        timefile2netcdf(filename)
79
80
81        #Create file function from time series
82        F = file_function(filename + '.tms',
83                          quantities = ['Attribute0',
84                                        'Attribute1',
85                                        'Attribute2'])
86       
87        #Now try interpolation
88        for i in range(20):
89            t = i*10
90            q = F(t)
91
92            #Exact linear intpolation
93            assert allclose(q[0], 2*t)
94            if i%6 == 0:
95                assert allclose(q[1], t**2)
96                assert allclose(q[2], sin(t*pi/600))
97
98        #Check non-exact
99
100        t = 90 #Halfway between 60 and 120
101        q = F(t)
102        assert allclose( (120**2 + 60**2)/2, q[1] )
103        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
104
105
106        t = 100 #Two thirds of the way between between 60 and 120
107        q = F(t)
108        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
109        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
110
111        os.remove(filename + '.txt')
112        os.remove(filename + '.tms')       
113
114
115       
116    def test_spatio_temporal_file_function_basic(self):
117        """Test that spatio temporal file function performs the correct
118        interpolations in both time and space
119        NetCDF version (x,y,t dependency)       
120        """
121        import time
122
123        #Create sww file of simple propagation from left to right
124        #through rectangular domain
125        from shallow_water import Domain, Dirichlet_boundary
126        from mesh_factory import rectangular
127        from Numeric import take, concatenate, reshape
128
129        #Create basic mesh and shallow water domain
130        points, vertices, boundary = rectangular(3, 3)
131        domain1 = Domain(points, vertices, boundary)
132
133        from anuga.utilities.numerical_tools import mean
134        domain1.reduction = mean
135        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
136                              # only one value.
137
138        domain1.default_order = 2
139        domain1.store = True
140        domain1.set_datadir('.')
141        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
142        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
143
144        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
145        domain1.set_quantity('elevation', 0)
146        domain1.set_quantity('friction', 0)
147        domain1.set_quantity('stage', 0)
148
149        # Boundary conditions
150        B0 = Dirichlet_boundary([0,0,0])
151        B6 = Dirichlet_boundary([0.6,0,0])
152        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
153        domain1.check_integrity()
154
155        finaltime = 8
156        #Evolution
157        t0 = -1
158        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
159            #print 'Timesteps: %.16f, %.16f' %(t0, t)
160            #if t == t0:
161            #    msg = 'Duplicate timestep found: %f, %f' %(t0, t)
162            #   raise msg
163            t0 = t
164             
165            #domain1.write_time()
166
167
168        #Now read data from sww and check
169        from Scientific.IO.NetCDF import NetCDFFile
170        filename = domain1.get_name() + '.' + domain1.format
171        fid = NetCDFFile(filename)
172
173        x = fid.variables['x'][:]
174        y = fid.variables['y'][:]
175        stage = fid.variables['stage'][:]
176        xmomentum = fid.variables['xmomentum'][:]
177        ymomentum = fid.variables['ymomentum'][:]
178        time = fid.variables['time'][:]
179
180        #Take stage vertex values at last timestep on diagonal
181        #Diagonal is identified by vertices: 0, 5, 10, 15
182
183        last_time_index = len(time)-1 #Last last_time_index
184        d_stage = reshape(take(stage[last_time_index, :], [0,5,10,15]), (4,1))
185        d_uh = reshape(take(xmomentum[last_time_index, :], [0,5,10,15]), (4,1))
186        d_vh = reshape(take(ymomentum[last_time_index, :], [0,5,10,15]), (4,1))
187        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
188
189        #Reference interpolated values at midpoints on diagonal at
190        #this timestep are
191        r0 = (D[0] + D[1])/2
192        r1 = (D[1] + D[2])/2
193        r2 = (D[2] + D[3])/2
194
195        #And the midpoints are found now
196        Dx = take(reshape(x, (16,1)), [0,5,10,15])
197        Dy = take(reshape(y, (16,1)), [0,5,10,15])
198
199        diag = concatenate( (Dx, Dy), axis=1)
200        d_midpoints = (diag[1:] + diag[:-1])/2
201
202        #Let us see if the file function can find the correct
203        #values at the midpoints at the last timestep:
204        f = file_function(filename, domain1,
205                          interpolation_points = d_midpoints)
206
207        T = f.get_time()
208        msg = 'duplicate timesteps: %.16f and %.16f' %(T[-1], T[-2])
209        assert not T[-1] == T[-2], msg
210        t = time[last_time_index]
211        q = f(t, point_id=0); assert allclose(r0, q)
212        q = f(t, point_id=1); assert allclose(r1, q)
213        q = f(t, point_id=2); assert allclose(r2, q)
214
215
216        ##################
217        #Now do the same for the first timestep
218
219        timestep = 0 #First timestep
220        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
221        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
222        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
223        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
224
225        #Reference interpolated values at midpoints on diagonal at
226        #this timestep are
227        r0 = (D[0] + D[1])/2
228        r1 = (D[1] + D[2])/2
229        r2 = (D[2] + D[3])/2
230
231        #Let us see if the file function can find the correct
232        #values
233        q = f(0, point_id=0); assert allclose(r0, q)
234        q = f(0, point_id=1); assert allclose(r1, q)
235        q = f(0, point_id=2); assert allclose(r2, q)
236
237
238        ##################
239        #Now do it again for a timestep in the middle
240
241        timestep = 33
242        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
243        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
244        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
245        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
246
247        #Reference interpolated values at midpoints on diagonal at
248        #this timestep are
249        r0 = (D[0] + D[1])/2
250        r1 = (D[1] + D[2])/2
251        r2 = (D[2] + D[3])/2
252
253        q = f(timestep/10., point_id=0); assert allclose(r0, q)
254        q = f(timestep/10., point_id=1); assert allclose(r1, q)
255        q = f(timestep/10., point_id=2); assert allclose(r2, q)
256
257
258        ##################
259        #Now check temporal interpolation
260        #Halfway between timestep 15 and 16
261
262        timestep = 15
263        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
264        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
265        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
266        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
267
268        #Reference interpolated values at midpoints on diagonal at
269        #this timestep are
270        r0_0 = (D[0] + D[1])/2
271        r1_0 = (D[1] + D[2])/2
272        r2_0 = (D[2] + D[3])/2
273
274        #
275        timestep = 16
276        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
277        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
278        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
279        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
280
281        #Reference interpolated values at midpoints on diagonal at
282        #this timestep are
283        r0_1 = (D[0] + D[1])/2
284        r1_1 = (D[1] + D[2])/2
285        r2_1 = (D[2] + D[3])/2
286
287        # The reference values are
288        r0 = (r0_0 + r0_1)/2
289        r1 = (r1_0 + r1_1)/2
290        r2 = (r2_0 + r2_1)/2
291
292        q = f((timestep - 0.5)/10., point_id=0); assert allclose(r0, q)
293        q = f((timestep - 0.5)/10., point_id=1); assert allclose(r1, q)
294        q = f((timestep - 0.5)/10., point_id=2); assert allclose(r2, q)
295
296        ##################
297        #Finally check interpolation 2 thirds of the way
298        #between timestep 15 and 16
299
300        # The reference values are
301        r0 = (r0_0 + 2*r0_1)/3
302        r1 = (r1_0 + 2*r1_1)/3
303        r2 = (r2_0 + 2*r2_1)/3
304
305        #And the file function gives
306        q = f((timestep - 1.0/3)/10., point_id=0); assert allclose(r0, q)
307        q = f((timestep - 1.0/3)/10., point_id=1); assert allclose(r1, q)
308        q = f((timestep - 1.0/3)/10., point_id=2); assert allclose(r2, q)
309
310        fid.close()
311        import os
312        os.remove(filename)
313
314
315
316    def test_spatio_temporal_file_function_different_origin(self):
317        """Test that spatio temporal file function performs the correct
318        interpolations in both time and space where space is offset by
319        xllcorner and yllcorner
320        NetCDF version (x,y,t dependency)       
321        """
322        import time
323
324        #Create sww file of simple propagation from left to right
325        #through rectangular domain
326        from shallow_water import Domain, Dirichlet_boundary
327        from mesh_factory import rectangular
328        from Numeric import take, concatenate, reshape
329
330
331        from anuga.coordinate_transforms.geo_reference import Geo_reference
332        xllcorner = 2048
333        yllcorner = 11000
334        zone = 2
335
336        #Create basic mesh and shallow water domain
337        points, vertices, boundary = rectangular(3, 3)
338        domain1 = Domain(points, vertices, boundary,
339                         geo_reference = Geo_reference(xllcorner = xllcorner,
340                                                       yllcorner = yllcorner))
341       
342
343        from anuga.utilities.numerical_tools import mean       
344        domain1.reduction = mean
345        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
346                              # only one value.
347
348        domain1.default_order = 2
349        domain1.store = True
350        domain1.set_datadir('.')
351        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
352        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
353
354        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
355        domain1.set_quantity('elevation', 0)
356        domain1.set_quantity('friction', 0)
357        domain1.set_quantity('stage', 0)
358
359        # Boundary conditions
360        B0 = Dirichlet_boundary([0,0,0])
361        B6 = Dirichlet_boundary([0.6,0,0])
362        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
363        domain1.check_integrity()
364
365        finaltime = 8
366        #Evolution
367        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
368            pass
369            #domain1.write_time()
370
371
372        #Now read data from sww and check
373        from Scientific.IO.NetCDF import NetCDFFile
374        filename = domain1.get_name() + '.' + domain1.format
375        fid = NetCDFFile(filename)
376
377        x = fid.variables['x'][:]
378        y = fid.variables['y'][:]
379        stage = fid.variables['stage'][:]
380        xmomentum = fid.variables['xmomentum'][:]
381        ymomentum = fid.variables['ymomentum'][:]
382        time = fid.variables['time'][:]
383
384        #Take stage vertex values at last timestep on diagonal
385        #Diagonal is identified by vertices: 0, 5, 10, 15
386
387        last_time_index = len(time)-1 #Last last_time_index     
388        d_stage = reshape(take(stage[last_time_index, :], [0,5,10,15]), (4,1))
389        d_uh = reshape(take(xmomentum[last_time_index, :], [0,5,10,15]), (4,1))
390        d_vh = reshape(take(ymomentum[last_time_index, :], [0,5,10,15]), (4,1))
391        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
392
393        #Reference interpolated values at midpoints on diagonal at
394        #this timestep are
395        r0 = (D[0] + D[1])/2
396        r1 = (D[1] + D[2])/2
397        r2 = (D[2] + D[3])/2
398
399        #And the midpoints are found now
400        Dx = take(reshape(x, (16,1)), [0,5,10,15])
401        Dy = take(reshape(y, (16,1)), [0,5,10,15])
402
403        diag = concatenate( (Dx, Dy), axis=1)
404        d_midpoints = (diag[1:] + diag[:-1])/2
405
406
407        #Adjust for georef - make interpolation points absolute
408        d_midpoints[:,0] += xllcorner
409        d_midpoints[:,1] += yllcorner               
410
411        #Let us see if the file function can find the correct
412        #values at the midpoints at the last timestep:
413        f = file_function(filename, domain1,
414                          interpolation_points = d_midpoints)
415
416        t = time[last_time_index]                         
417        q = f(t, point_id=0); assert allclose(r0, q)
418        q = f(t, point_id=1); assert allclose(r1, q)
419        q = f(t, point_id=2); assert allclose(r2, q)
420
421
422        ##################
423        #Now do the same for the first timestep
424
425        timestep = 0 #First timestep
426        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
427        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
428        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
429        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
430
431        #Reference interpolated values at midpoints on diagonal at
432        #this timestep are
433        r0 = (D[0] + D[1])/2
434        r1 = (D[1] + D[2])/2
435        r2 = (D[2] + D[3])/2
436
437        #Let us see if the file function can find the correct
438        #values
439        q = f(0, point_id=0); assert allclose(r0, q)
440        q = f(0, point_id=1); assert allclose(r1, q)
441        q = f(0, point_id=2); assert allclose(r2, q)
442
443
444        ##################
445        #Now do it again for a timestep in the middle
446
447        timestep = 33
448        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
449        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
450        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
451        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
452
453        #Reference interpolated values at midpoints on diagonal at
454        #this timestep are
455        r0 = (D[0] + D[1])/2
456        r1 = (D[1] + D[2])/2
457        r2 = (D[2] + D[3])/2
458
459        q = f(timestep/10., point_id=0); assert allclose(r0, q)
460        q = f(timestep/10., point_id=1); assert allclose(r1, q)
461        q = f(timestep/10., point_id=2); assert allclose(r2, q)
462
463
464        ##################
465        #Now check temporal interpolation
466        #Halfway between timestep 15 and 16
467
468        timestep = 15
469        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
470        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
471        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
472        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
473
474        #Reference interpolated values at midpoints on diagonal at
475        #this timestep are
476        r0_0 = (D[0] + D[1])/2
477        r1_0 = (D[1] + D[2])/2
478        r2_0 = (D[2] + D[3])/2
479
480        #
481        timestep = 16
482        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
483        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
484        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
485        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
486
487        #Reference interpolated values at midpoints on diagonal at
488        #this timestep are
489        r0_1 = (D[0] + D[1])/2
490        r1_1 = (D[1] + D[2])/2
491        r2_1 = (D[2] + D[3])/2
492
493        # The reference values are
494        r0 = (r0_0 + r0_1)/2
495        r1 = (r1_0 + r1_1)/2
496        r2 = (r2_0 + r2_1)/2
497
498        q = f((timestep - 0.5)/10., point_id=0); assert allclose(r0, q)
499        q = f((timestep - 0.5)/10., point_id=1); assert allclose(r1, q)
500        q = f((timestep - 0.5)/10., point_id=2); assert allclose(r2, q)
501
502        ##################
503        #Finally check interpolation 2 thirds of the way
504        #between timestep 15 and 16
505
506        # The reference values are
507        r0 = (r0_0 + 2*r0_1)/3
508        r1 = (r1_0 + 2*r1_1)/3
509        r2 = (r2_0 + 2*r2_1)/3
510
511        #And the file function gives
512        q = f((timestep - 1.0/3)/10., point_id=0); assert allclose(r0, q)
513        q = f((timestep - 1.0/3)/10., point_id=1); assert allclose(r1, q)
514        q = f((timestep - 1.0/3)/10., point_id=2); assert allclose(r2, q)
515
516        fid.close()
517        import os
518        os.remove(filename)
519
520       
521
522
523    def qtest_spatio_temporal_file_function_time(self):
524        """Test that File function interpolates correctly
525        between given times.
526        NetCDF version (x,y,t dependency)
527        """
528
529        #Create NetCDF (sww) file to be read
530        # x: 0, 5, 10, 15
531        # y: -20, -10, 0, 10
532        # t: 0, 60, 120, ...., 1200
533        #
534        # test quantities (arbitrary but non-trivial expressions):
535        #
536        #   stage     = 3*x - y**2 + 2*t
537        #   xmomentum = exp( -((x-7)**2 + (y+5)**2)/20 ) * t**2
538        #   ymomentum = x**2 + y**2 * sin(t*pi/600)
539
540        #NOTE: Nice test that may render some of the others redundant.
541
542        import os, time
543        from anuga.config import time_format
544        from Numeric import sin, pi, exp
545        from mesh_factory import rectangular
546        from shallow_water import Domain
547        import anuga.shallow_water.data_manager
548
549        finaltime = 1200
550        filename = 'test_file_function'
551
552        #Create a domain to hold test grid
553        #(0:15, -20:10)
554        points, vertices, boundary =\
555                rectangular(4, 4, 15, 30, origin = (0, -20))
556        print "points", points
557
558        #print 'Number of elements', len(vertices)
559        domain = Domain(points, vertices, boundary)
560        domain.smooth = False
561        domain.default_order = 2
562        domain.set_datadir('.')
563        domain.set_name(filename)
564        domain.store = True
565        domain.format = 'sww'   #Native netcdf visualisation format
566
567        #print points
568        start = time.mktime(time.strptime('2000', '%Y'))
569        domain.starttime = start
570
571
572        #Store structure
573        domain.initialise_storage()
574
575        #Compute artificial time steps and store
576        dt = 60  #One minute intervals
577        t = 0.0
578        while t <= finaltime:
579            #Compute quantities
580            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
581            domain.set_quantity('stage', f1)
582
583            f2 = lambda x,y: x+y+t**2
584            domain.set_quantity('xmomentum', f2)
585
586            f3 = lambda x,y: x**2 + y**2 * sin(t*pi/600)
587            domain.set_quantity('ymomentum', f3)
588
589            #Store and advance time
590            domain.time = t
591            domain.store_timestep(domain.conserved_quantities)
592            t += dt
593
594
595        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5]]
596     
597        #Deliberately set domain.starttime to too early
598        domain.starttime = start - 1
599
600        #Create file function
601        F = file_function(filename + '.sww', domain,
602                          quantities = domain.conserved_quantities,
603                          interpolation_points = interpolation_points)
604
605        #Check that FF updates fixes domain starttime
606        assert allclose(domain.starttime, start)
607
608        #Check that domain.starttime isn't updated if later
609        domain.starttime = start + 1
610        F = file_function(filename + '.sww', domain,
611                          quantities = domain.conserved_quantities,
612                          interpolation_points = interpolation_points)
613        assert allclose(domain.starttime, start+1)
614        domain.starttime = start
615
616
617        #Check linear interpolation in time
618        F = file_function(filename + '.sww', domain,
619                          quantities = domain.conserved_quantities,
620                          interpolation_points = interpolation_points)               
621        for id in range(len(interpolation_points)):
622            x = interpolation_points[id][0]
623            y = interpolation_points[id][1]
624
625            for i in range(20):
626                t = i*10
627                k = i%6
628
629                if k == 0:
630                    q0 = F(t, point_id=id)
631                    q1 = F(t+60, point_id=id)
632
633                if q0 == NAN:
634                    actual = q0
635                else:
636                    actual = (k*q1 + (6-k)*q0)/6
637                q = F(t, point_id=id)
638                #print i, k, t, q
639                #print ' ', q0
640                #print ' ', q1
641                print "q",q
642                print "actual", actual
643                #print
644                if q0 == NAN:
645                     self.failUnless( q == actual, 'Fail!')
646                else:
647                    assert allclose(q, actual)
648
649
650        #Another check of linear interpolation in time
651        for id in range(len(interpolation_points)):
652            q60 = F(60, point_id=id)
653            q120 = F(120, point_id=id)
654
655            t = 90 #Halfway between 60 and 120
656            q = F(t, point_id=id)
657            assert allclose( (q120+q60)/2, q )
658
659            t = 100 #Two thirds of the way between between 60 and 120
660            q = F(t, point_id=id)
661            assert allclose(q60/3 + 2*q120/3, q)
662
663
664
665        #Check that domain.starttime isn't updated if later than file starttime but earlier
666        #than file end time
667        delta = 23
668        domain.starttime = start + delta
669        F = file_function(filename + '.sww', domain,
670                          quantities = domain.conserved_quantities,
671                          interpolation_points = interpolation_points)
672        assert allclose(domain.starttime, start+delta)
673
674
675
676
677        #Now try interpolation with delta offset
678        for id in range(len(interpolation_points)):           
679            x = interpolation_points[id][0]
680            y = interpolation_points[id][1]
681
682            for i in range(20):
683                t = i*10
684                k = i%6
685
686                if k == 0:
687                    q0 = F(t-delta, point_id=id)
688                    q1 = F(t+60-delta, point_id=id)
689
690                q = F(t-delta, point_id=id)
691                assert allclose(q, (k*q1 + (6-k)*q0)/6)
692
693
694        os.remove(filename + '.sww')
695
696
697
698    def NOtest_spatio_temporal_file_function_time(self):
699        # FIXME: This passes but needs some TLC
700        # Test that File function interpolates correctly
701        # When some points are outside the mesh
702
703        import os, time
704        from anuga.config import time_format
705        from Numeric import sin, pi, exp
706        from mesh_factory import rectangular
707        from shallow_water import Domain
708        import anuga.shallow_water.data_manager 
709        from anuga.pmesh.mesh_interface import create_mesh_from_regions
710        finaltime = 1200
711       
712        filename = tempfile.mktemp()
713        #print "filename",filename
714        filename = 'test_file_function'
715
716        meshfilename = tempfile.mktemp(".tsh")
717
718        boundary_tags = {'walls':[0,1],'bom':[2]}
719       
720        polygon_absolute = [[0,-20],[10,-20],[10,15],[-20,15]]
721       
722        create_mesh_from_regions(polygon_absolute,
723                                 boundary_tags,
724                                 10000000,
725                                 filename=meshfilename)
726        domain = Domain(mesh_filename=meshfilename)
727        domain.smooth = False
728        domain.default_order = 2
729        domain.set_datadir('.')
730        domain.set_name(filename)
731        domain.store = True
732        domain.format = 'sww'   #Native netcdf visualisation format
733
734        #print points
735        start = time.mktime(time.strptime('2000', '%Y'))
736        domain.starttime = start
737       
738
739        #Store structure
740        domain.initialise_storage()
741
742        #Compute artificial time steps and store
743        dt = 60  #One minute intervals
744        t = 0.0
745        while t <= finaltime:
746            #Compute quantities
747            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
748            domain.set_quantity('stage', f1)
749
750            f2 = lambda x,y: x+y+t**2
751            domain.set_quantity('xmomentum', f2)
752
753            f3 = lambda x,y: x**2 + y**2 * sin(t*pi/600)
754            domain.set_quantity('ymomentum', f3)
755
756            #Store and advance time
757            domain.time = t
758            domain.store_timestep(domain.conserved_quantities)
759            t += dt
760
761        interpolation_points = [[1,0]]
762        interpolation_points = [[100,1000]]
763       
764        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5],
765                                [78787,78787],[7878,3432]]
766           
767        #Deliberately set domain.starttime to too early
768        domain.starttime = start - 1
769
770        #Create file function
771        F = file_function(filename + '.sww', domain,
772                          quantities = domain.conserved_quantities,
773                          interpolation_points = interpolation_points)
774
775        #Check that FF updates fixes domain starttime
776        assert allclose(domain.starttime, start)
777
778        #Check that domain.starttime isn't updated if later
779        domain.starttime = start + 1
780        F = file_function(filename + '.sww', domain,
781                          quantities = domain.conserved_quantities,
782                          interpolation_points = interpolation_points)
783        assert allclose(domain.starttime, start+1)
784        domain.starttime = start
785
786
787        #Check linear interpolation in time
788        # checking points inside and outside the mesh
789        F = file_function(filename + '.sww', domain,
790                          quantities = domain.conserved_quantities,
791                          interpolation_points = interpolation_points)
792       
793        for id in range(len(interpolation_points)):
794            x = interpolation_points[id][0]
795            y = interpolation_points[id][1]
796
797            for i in range(20):
798                t = i*10
799                k = i%6
800
801                if k == 0:
802                    q0 = F(t, point_id=id)
803                    q1 = F(t+60, point_id=id)
804
805                if q0 == NAN:
806                    actual = q0
807                else:
808                    actual = (k*q1 + (6-k)*q0)/6
809                q = F(t, point_id=id)
810                #print i, k, t, q
811                #print ' ', q0
812                #print ' ', q1
813                #print "q",q
814                #print "actual", actual
815                #print
816                if q0 == NAN:
817                     self.failUnless( q == actual, 'Fail!')
818                else:
819                    assert allclose(q, actual)
820
821        # now lets check points inside the mesh
822        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14]] #, [10,-12.5]] - this point doesn't work WHY?
823        interpolation_points = [[10,-12.5]]
824           
825        print "len(interpolation_points)",len(interpolation_points) 
826        F = file_function(filename + '.sww', domain,
827                          quantities = domain.conserved_quantities,
828                          interpolation_points = interpolation_points)
829
830        domain.starttime = start
831
832
833        #Check linear interpolation in time
834        F = file_function(filename + '.sww', domain,
835                          quantities = domain.conserved_quantities,
836                          interpolation_points = interpolation_points)               
837        for id in range(len(interpolation_points)):
838            x = interpolation_points[id][0]
839            y = interpolation_points[id][1]
840
841            for i in range(20):
842                t = i*10
843                k = i%6
844
845                if k == 0:
846                    q0 = F(t, point_id=id)
847                    q1 = F(t+60, point_id=id)
848
849                if q0 == NAN:
850                    actual = q0
851                else:
852                    actual = (k*q1 + (6-k)*q0)/6
853                q = F(t, point_id=id)
854                print "############"
855                print "id, x, y ", id, x, y #k, t, q
856                print "t", t
857                #print ' ', q0
858                #print ' ', q1
859                print "q",q
860                print "actual", actual
861                #print
862                if q0 == NAN:
863                     self.failUnless( q == actual, 'Fail!')
864                else:
865                    assert allclose(q, actual)
866
867
868        #Another check of linear interpolation in time
869        for id in range(len(interpolation_points)):
870            q60 = F(60, point_id=id)
871            q120 = F(120, point_id=id)
872
873            t = 90 #Halfway between 60 and 120
874            q = F(t, point_id=id)
875            assert allclose( (q120+q60)/2, q )
876
877            t = 100 #Two thirds of the way between between 60 and 120
878            q = F(t, point_id=id)
879            assert allclose(q60/3 + 2*q120/3, q)
880
881
882
883        #Check that domain.starttime isn't updated if later than file starttime but earlier
884        #than file end time
885        delta = 23
886        domain.starttime = start + delta
887        F = file_function(filename + '.sww', domain,
888                          quantities = domain.conserved_quantities,
889                          interpolation_points = interpolation_points)
890        assert allclose(domain.starttime, start+delta)
891
892
893
894
895        #Now try interpolation with delta offset
896        for id in range(len(interpolation_points)):           
897            x = interpolation_points[id][0]
898            y = interpolation_points[id][1]
899
900            for i in range(20):
901                t = i*10
902                k = i%6
903
904                if k == 0:
905                    q0 = F(t-delta, point_id=id)
906                    q1 = F(t+60-delta, point_id=id)
907
908                q = F(t-delta, point_id=id)
909                assert allclose(q, (k*q1 + (6-k)*q0)/6)
910
911
912        os.remove(filename + '.sww')
913
914    def test_file_function_time_with_domain(self):
915        """Test that File function interpolates correctly
916        between given times. No x,y dependency here.
917        Use domain with starttime
918        """
919
920        #Write file
921        import os, time, calendar
922        from anuga.config import time_format
923        from math import sin, pi
924        from domain import Domain
925
926        finaltime = 1200
927        filename = 'test_file_function'
928        fid = open(filename + '.txt', 'w')
929        start = time.mktime(time.strptime('2000', '%Y'))
930        dt = 60  #One minute intervals
931        t = 0.0
932        while t <= finaltime:
933            t_string = time.strftime(time_format, time.gmtime(t+start))
934            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
935            t += dt
936
937        fid.close()
938
939
940        #Convert ASCII file to NetCDF (Which is what we really like!)
941        timefile2netcdf(filename)
942
943
944
945        a = [0.0, 0.0]
946        b = [4.0, 0.0]
947        c = [0.0, 3.0]
948
949        points = [a, b, c]
950        vertices = [[0,1,2]]
951        domain = Domain(points, vertices)
952
953        #Check that domain.starttime is updated if non-existing
954        F = file_function(filename + '.tms', domain)
955       
956        assert allclose(domain.starttime, start)
957
958        #Check that domain.starttime is updated if too early
959        domain.starttime = start - 1
960        F = file_function(filename + '.tms', domain)
961        assert allclose(domain.starttime, start)
962
963        #Check that domain.starttime isn't updated if later
964        domain.starttime = start + 1
965        F = file_function(filename + '.tms', domain)       
966        assert allclose(domain.starttime, start+1)
967
968        domain.starttime = start
969        F = file_function(filename + '.tms', domain,
970                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'],
971                          use_cache=True)
972       
973
974        #print F.precomputed_values
975        #print 'F(60)', F(60)
976       
977        #Now try interpolation
978        for i in range(20):
979            t = i*10
980            q = F(t)
981
982            #Exact linear intpolation
983            assert allclose(q[0], 2*t)
984            if i%6 == 0:
985                assert allclose(q[1], t**2)
986                assert allclose(q[2], sin(t*pi/600))
987
988        #Check non-exact
989
990        t = 90 #Halfway between 60 and 120
991        q = F(t)
992        assert allclose( (120**2 + 60**2)/2, q[1] )
993        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
994
995
996        t = 100 #Two thirds of the way between between 60 and 120
997        q = F(t)
998        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
999        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
1000
1001        os.remove(filename + '.tms')
1002        os.remove(filename + '.txt')       
1003
1004    def test_file_function_time_with_domain_different_start(self):
1005        """Test that File function interpolates correctly
1006        between given times. No x,y dependency here.
1007        Use domain with a starttime later than that of file
1008
1009        ASCII version
1010        """
1011
1012        #Write file
1013        import os, time, calendar
1014        from anuga.config import time_format
1015        from math import sin, pi
1016        from domain import Domain
1017
1018        finaltime = 1200
1019        filename = 'test_file_function'
1020        fid = open(filename + '.txt', 'w')
1021        start = time.mktime(time.strptime('2000', '%Y'))
1022        dt = 60  #One minute intervals
1023        t = 0.0
1024        while t <= finaltime:
1025            t_string = time.strftime(time_format, time.gmtime(t+start))
1026            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
1027            t += dt
1028
1029        fid.close()
1030
1031        #Convert ASCII file to NetCDF (Which is what we really like!)
1032        timefile2netcdf(filename)       
1033
1034        a = [0.0, 0.0]
1035        b = [4.0, 0.0]
1036        c = [0.0, 3.0]
1037
1038        points = [a, b, c]
1039        vertices = [[0,1,2]]
1040        domain = Domain(points, vertices)
1041
1042        #Check that domain.starttime isn't updated if later than file starttime but earlier
1043        #than file end time
1044        delta = 23
1045        domain.starttime = start + delta
1046        F = file_function(filename + '.tms', domain,
1047                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])       
1048        assert allclose(domain.starttime, start+delta)
1049
1050
1051
1052
1053        #Now try interpolation with delta offset
1054        for i in range(20):
1055            t = i*10
1056            q = F(t-delta)
1057
1058            #Exact linear intpolation
1059            assert allclose(q[0], 2*t)
1060            if i%6 == 0:
1061                assert allclose(q[1], t**2)
1062                assert allclose(q[2], sin(t*pi/600))
1063
1064        #Check non-exact
1065
1066        t = 90 #Halfway between 60 and 120
1067        q = F(t-delta)
1068        assert allclose( (120**2 + 60**2)/2, q[1] )
1069        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
1070
1071
1072        t = 100 #Two thirds of the way between between 60 and 120
1073        q = F(t-delta)
1074        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
1075        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
1076
1077
1078        os.remove(filename + '.tms')
1079        os.remove(filename + '.txt')               
1080
1081
1082
1083    def test_apply_expression_to_dictionary(self):
1084
1085        #FIXME: Division is not expected to work for integers.
1086        #This must be caught.
1087        foo = array([[1,2,3],
1088                     [4,5,6]], Float)
1089
1090        bar = array([[-1,0,5],
1091                     [6,1,1]], Float)                 
1092
1093        D = {'X': foo, 'Y': bar}
1094
1095        Z = apply_expression_to_dictionary('X+Y', D)       
1096        assert allclose(Z, foo+bar)
1097
1098        Z = apply_expression_to_dictionary('X*Y', D)       
1099        assert allclose(Z, foo*bar)       
1100
1101        Z = apply_expression_to_dictionary('4*X+Y', D)       
1102        assert allclose(Z, 4*foo+bar)       
1103
1104        # test zero division is OK
1105        Z = apply_expression_to_dictionary('X/Y', D)
1106        assert allclose(1/Z, 1/(foo/bar)) # can't compare inf to inf
1107
1108        # make an error for zero on zero
1109        # this is really an error in Numeric, SciPy core can handle it
1110        # Z = apply_expression_to_dictionary('0/Y', D)
1111
1112        #Check exceptions
1113        try:
1114            #Wrong name
1115            Z = apply_expression_to_dictionary('4*X+A', D)       
1116        except NameError:
1117            pass
1118        else:
1119            msg = 'Should have raised a NameError Exception'
1120            raise msg
1121
1122
1123        try:
1124            #Wrong order
1125            Z = apply_expression_to_dictionary(D, '4*X+A')       
1126        except AssertionError:
1127            pass
1128        else:
1129            msg = 'Should have raised a AssertionError Exception'
1130            raise msg       
1131       
1132
1133    def test_multiple_replace(self):
1134        """Hard test that checks a true word-by-word simultaneous replace
1135        """
1136       
1137        D = {'x': 'xi', 'y': 'eta', 'xi':'lam'}
1138        exp = '3*x+y + xi'
1139       
1140        new = multiple_replace(exp, D)
1141       
1142        assert new == '3*xi+eta + lam'
1143                         
1144
1145
1146    def test_point_on_line_obsolete(self):
1147        """Test that obsolete call issues appropriate warning"""
1148
1149        #Turn warning into an exception
1150        import warnings
1151        warnings.filterwarnings('error')
1152
1153        try:
1154            assert point_on_line( 0, 0.5, 0,1, 0,0 )
1155        except DeprecationWarning:
1156            pass
1157        else:
1158            msg = 'point_on_line should have issued a DeprecationWarning'
1159            raise Exception(msg)   
1160
1161        warnings.resetwarnings()
1162   
1163    def test_get_revision_number(self):
1164        """test_get_revision_number(self):
1165
1166        Test that revision number can be retrieved.
1167        """
1168        if os.environ.has_key('USER') and os.environ['USER'] == 'dgray':
1169            # I have a known snv incompatability issue,
1170            # so I'm skipping this test.
1171            # FIXME when SVN is upgraded on our clusters
1172            pass
1173        else:   
1174            n = get_revision_number()
1175            assert n>=0
1176
1177
1178       
1179    def test_add_directories(self):
1180       
1181        import tempfile
1182        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
1183        directories = ['ja','ne','ke']
1184        kens_dir = add_directories(root_dir, directories)
1185        assert kens_dir == root_dir + sep + 'ja' + sep + 'ne' + \
1186               sep + 'ke'
1187        assert access(root_dir,F_OK)
1188
1189        add_directories(root_dir, directories)
1190        assert access(root_dir,F_OK)
1191       
1192        #clean up!
1193        os.rmdir(kens_dir)
1194        os.rmdir(root_dir + sep + 'ja' + sep + 'ne')
1195        os.rmdir(root_dir + sep + 'ja')
1196        os.rmdir(root_dir)
1197
1198    def test_add_directories_bad(self):
1199       
1200        import tempfile
1201        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
1202        directories = ['/\/!@#@#$%^%&*((*:*:','ne','ke']
1203       
1204        try:
1205            kens_dir = add_directories(root_dir, directories)
1206        except OSError:
1207            pass
1208        else:
1209            msg = 'bad dir name should give OSError'
1210            raise Exception(msg)   
1211           
1212        #clean up!
1213        os.rmdir(root_dir)
1214
1215    def test_check_list(self):
1216
1217        check_list(['stage','xmomentum'])
1218
1219       
1220    def test_add_directories(self):
1221       
1222        import tempfile
1223        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
1224        directories = ['ja','ne','ke']
1225        kens_dir = add_directories(root_dir, directories)
1226        assert kens_dir == root_dir + sep + 'ja' + sep + 'ne' + \
1227               sep + 'ke'
1228        assert access(root_dir,F_OK)
1229
1230        add_directories(root_dir, directories)
1231        assert access(root_dir,F_OK)
1232       
1233        #clean up!
1234        os.rmdir(kens_dir)
1235        os.rmdir(root_dir + sep + 'ja' + sep + 'ne')
1236        os.rmdir(root_dir + sep + 'ja')
1237        os.rmdir(root_dir)
1238
1239    def test_add_directories_bad(self):
1240       
1241        import tempfile
1242        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
1243        directories = ['/\/!@#@#$%^%&*((*:*:','ne','ke']
1244       
1245        try:
1246            kens_dir = add_directories(root_dir, directories)
1247        except OSError:
1248            pass
1249        else:
1250            msg = 'bad dir name should give OSError'
1251            raise Exception(msg)   
1252           
1253        #clean up!
1254        os.rmdir(root_dir)
1255
1256    def test_check_list(self):
1257
1258        check_list(['stage','xmomentum'])
1259       
1260    def test_remove_lone_verts_d(self):
1261        verts = [[0,0],[1,0],[0,1]]
1262        tris = [[0,1,2]]
1263        new_verts, new_tris = remove_lone_verts(verts, tris)
1264        assert new_verts == verts
1265        assert new_tris == tris
1266     
1267
1268    def test_remove_lone_verts_e(self):
1269        verts = [[0,0],[1,0],[0,1],[99,99]]
1270        tris = [[0,1,2]]
1271        new_verts, new_tris = remove_lone_verts(verts, tris)
1272        assert new_verts == verts[0:3]
1273        assert new_tris == tris
1274       
1275    def test_remove_lone_verts_a(self):
1276        verts = [[99,99],[0,0],[1,0],[99,99],[0,1],[99,99]]
1277        tris = [[1,2,4]]
1278        new_verts, new_tris = remove_lone_verts(verts, tris)
1279        #print "new_verts", new_verts
1280        assert new_verts == [[0,0],[1,0],[0,1]]
1281        assert new_tris == [[0,1,2]]
1282     
1283    def test_remove_lone_verts_c(self):
1284        verts = [[0,0],[1,0],[99,99],[0,1]]
1285        tris = [[0,1,3]]
1286        new_verts, new_tris = remove_lone_verts(verts, tris)
1287        #print "new_verts", new_verts
1288        assert new_verts == [[0,0],[1,0],[0,1]]
1289        assert new_tris == [[0,1,2]]
1290       
1291    def test_remove_lone_verts_b(self):
1292        verts = [[0,0],[1,0],[0,1],[99,99],[99,99],[99,99]]
1293        tris = [[0,1,2]]
1294        new_verts, new_tris = remove_lone_verts(verts, tris)
1295        assert new_verts == verts[0:3]
1296        assert new_tris == tris
1297     
1298
1299    def test_remove_lone_verts_e(self):
1300        verts = [[0,0],[1,0],[0,1],[99,99]]
1301        tris = [[0,1,2]]
1302        new_verts, new_tris = remove_lone_verts(verts, tris)
1303        assert new_verts == verts[0:3]
1304        assert new_tris == tris
1305       
1306    def test_get_min_max_values(self):
1307       
1308        list=[8,9,6,1,4]
1309        min1, max1 = get_min_max_values(list)
1310       
1311        assert min1==1 
1312        assert max1==9
1313       
1314    def test_get_min_max_values1(self):
1315       
1316        list=[-8,-9,-6,-1,-4]
1317        min1, max1 = get_min_max_values(list)
1318       
1319#        print 'min1,max1',min1,max1
1320        assert min1==-9 
1321        assert max1==-1
1322
1323#    def test_get_min_max_values2(self):
1324#        '''
1325#        The min and max supplied are greater than the ones in the
1326#        list and therefore are the ones returned
1327#        '''
1328#        list=[-8,-9,-6,-1,-4]
1329#        min1, max1 = get_min_max_values(list,-10,10)
1330#       
1331##        print 'min1,max1',min1,max1
1332#        assert min1==-10
1333#        assert max1==10
1334       
1335    def test_make_plots_from_csv_files(self):
1336       
1337        #if sys.platform == 'win32':  #Windows
1338            try: 
1339                import pylab
1340            except ImportError:
1341                #ANUGA don't need pylab to work so the system doesn't
1342                #rely on pylab being installed
1343                return
1344           
1345       
1346            current_dir=getcwd()+sep+'abstract_2d_finite_volumes'
1347            temp_dir = tempfile.mkdtemp('','figures')
1348    #        print 'temp_dir',temp_dir
1349            fileName = temp_dir+sep+'time_series_3.csv'
1350            file = open(fileName,"w")
1351            file.write("time,stage,speed,momentum,elevation\n\
13521.0, 0, 0, 0, 10 \n\
13532.0, 5, 2, 4, 10 \n\
13543.0, 3, 3, 5, 10 \n")
1355            file.close()
1356   
1357            fileName1 = temp_dir+sep+'time_series_4.csv'
1358            file1 = open(fileName1,"w")
1359            file1.write("time,stage,speed,momentum,elevation\n\
13601.0, 0, 0, 0, 5 \n\
13612.0, -5, -2, -4, 5 \n\
13623.0, -4, -3, -5, 5 \n")
1363            file1.close()
1364   
1365            fileName2 = temp_dir+sep+'time_series_5.csv'
1366            file2 = open(fileName2,"w")
1367            file2.write("time,stage,speed,momentum,elevation\n\
13681.0, 0, 0, 0, 7 \n\
13692.0, 4, -0.45, 57, 7 \n\
13703.0, 6, -0.5, 56, 7 \n")
1371            file2.close()
1372           
1373            dir, name=os.path.split(fileName)
1374            csv2timeseries_graphs(directories_dic={dir:['gauge', 0, 0]},
1375                                  output_dir=temp_dir,
1376                                  base_name='time_series_',
1377                                  plot_numbers=['3-5'],
1378                                  quantities=['speed','stage','momentum'],
1379                                  assess_all_csv_files=True,
1380                                  extra_plot_name='test')
1381           
1382            #print dir+sep+name[:-4]+'_stage_test.png'
1383            assert(access(dir+sep+name[:-4]+'_stage_test.png',F_OK)==True)
1384            assert(access(dir+sep+name[:-4]+'_speed_test.png',F_OK)==True)
1385            assert(access(dir+sep+name[:-4]+'_momentum_test.png',F_OK)==True)
1386   
1387            dir1, name1=os.path.split(fileName1)
1388            assert(access(dir+sep+name1[:-4]+'_stage_test.png',F_OK)==True)
1389            assert(access(dir+sep+name1[:-4]+'_speed_test.png',F_OK)==True)
1390            assert(access(dir+sep+name1[:-4]+'_momentum_test.png',F_OK)==True)
1391   
1392   
1393            dir2, name2=os.path.split(fileName2)
1394            assert(access(dir+sep+name2[:-4]+'_stage_test.png',F_OK)==True)
1395            assert(access(dir+sep+name2[:-4]+'_speed_test.png',F_OK)==True)
1396            assert(access(dir+sep+name2[:-4]+'_momentum_test.png',F_OK)==True)
1397   
1398            del_dir(temp_dir)
1399       
1400
1401    def test_sww2csv_gauges(self):
1402
1403        def elevation_function(x, y):
1404            return -x
1405       
1406        """Most of this test was copied from test_interpolate
1407        test_interpole_sww2csv
1408       
1409        This is testing the gauge_sww2csv function, by creating a sww file and
1410        then exporting the gauges and checking the results.
1411        """
1412       
1413        # Create mesh
1414        mesh_file = tempfile.mktemp(".tsh")   
1415        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
1416        m = Mesh()
1417        m.add_vertices(points)
1418        m.auto_segment()
1419        m.generate_mesh(verbose=False)
1420        m.export_mesh_file(mesh_file)
1421       
1422        # Create shallow water domain
1423        domain = Domain(mesh_file)
1424        os.remove(mesh_file)
1425       
1426        domain.default_order=2
1427        domain.beta_h = 0
1428       
1429        # This test was made before tight_slope_limiters were introduced
1430        # Since were are testing interpolation values this is OK
1431        domain.tight_slope_limiters = 0 
1432       
1433
1434        # Set some field values
1435        domain.set_quantity('elevation', elevation_function)
1436        domain.set_quantity('friction', 0.03)
1437        domain.set_quantity('xmomentum', 3.0)
1438        domain.set_quantity('ymomentum', 4.0)
1439
1440        ######################
1441        # Boundary conditions
1442        B = Transmissive_boundary(domain)
1443        domain.set_boundary( {'exterior': B})
1444
1445        # This call mangles the stage values.
1446        domain.distribute_to_vertices_and_edges()
1447        domain.set_quantity('stage', 1.0)
1448
1449
1450        domain.set_name('datatest' + str(time.time()))
1451        domain.format = 'sww'
1452        domain.smooth = True
1453        domain.reduction = mean
1454
1455
1456        sww = get_dataobject(domain)
1457        sww.store_connectivity()
1458        sww.store_timestep(['stage', 'xmomentum', 'ymomentum','elevation'])
1459        domain.set_quantity('stage', 10.0) # This is automatically limited
1460        # so it will not be less than the elevation
1461        domain.time = 2.
1462        sww.store_timestep(['stage','elevation', 'xmomentum', 'ymomentum'])
1463
1464        # test the function
1465        points = [[5.0,1.],[0.5,2.]]
1466
1467        points_file = tempfile.mktemp(".csv")
1468#        points_file = 'test_point.csv'
1469        file_id = open(points_file,"w")
1470        file_id.write("name, easting, northing, elevation \n\
1471point1, 5.0, 1.0, 3.0\n\
1472point2, 0.5, 2.0, 9.0\n")
1473        file_id.close()
1474
1475       
1476        sww2csv_gauges(sww.filename, 
1477                       points_file,
1478                       verbose=False,
1479                       use_cache=False)
1480
1481#        point1_answers_array = [[0.0,1.0,-5.0,3.0,4.0], [2.0,10.0,-5.0,3.0,4.0]]
1482        point1_answers_array = [[0.0,1.0,6.0,-5.0,3.0,4.0], [2.0,10.0,15.0,-5.0,3.0,4.0]]
1483        point1_filename = 'gauge_point1.csv'
1484        point1_handle = file(point1_filename)
1485        point1_reader = reader(point1_handle)
1486        point1_reader.next()
1487
1488        line=[]
1489        for i,row in enumerate(point1_reader):
1490            #print 'i',i,'row',row
1491            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),float(row[4]),float(row[5])])
1492            #print 'assert line',line[i],'point1',point1_answers_array[i]
1493            assert allclose(line[i], point1_answers_array[i])
1494
1495        point2_answers_array = [[0.0,1.0,1.5,-0.5,3.0,4.0], [2.0,10.0,10.5,-0.5,3.0,4.0]]
1496        point2_filename = 'gauge_point2.csv' 
1497        point2_handle = file(point2_filename)
1498        point2_reader = reader(point2_handle)
1499        point2_reader.next()
1500                       
1501        line=[]
1502        for i,row in enumerate(point2_reader):
1503            #print 'i',i,'row',row
1504            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),float(row[4]),float(row[5])])
1505            #print 'assert line',line[i],'point1',point1_answers_array[i]
1506            assert allclose(line[i], point2_answers_array[i])
1507                         
1508        # clean up
1509        point1_handle.close()
1510        point2_handle.close()
1511        #print "sww.filename",sww.filename
1512        os.remove(sww.filename)
1513        os.remove(points_file)
1514        os.remove(point1_filename)
1515        os.remove(point2_filename)
1516
1517
1518
1519    def test_sww2csv_gauges1(self):
1520        from anuga.pmesh.mesh import Mesh
1521        from anuga.shallow_water import Domain, Transmissive_boundary
1522        from anuga.shallow_water.data_manager import get_dataobject
1523        from csv import reader,writer
1524        import time
1525        import string
1526
1527        def elevation_function(x, y):
1528            return -x
1529       
1530        """Most of this test was copied from test_interpolate
1531        test_interpole_sww2csv
1532       
1533        This is testing the gauge_sww2csv function, by creating a sww file and
1534        then exporting the gauges and checking the results.
1535       
1536        This tests the ablity not to have elevation in the points file and
1537        not store xmomentum and ymomentum
1538        """
1539       
1540        # Create mesh
1541        mesh_file = tempfile.mktemp(".tsh")   
1542        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
1543        m = Mesh()
1544        m.add_vertices(points)
1545        m.auto_segment()
1546        m.generate_mesh(verbose=False)
1547        m.export_mesh_file(mesh_file)
1548       
1549        # Create shallow water domain
1550        domain = Domain(mesh_file)
1551        os.remove(mesh_file)
1552       
1553        domain.default_order=2
1554        domain.beta_h = 0
1555
1556        # Set some field values
1557        domain.set_quantity('elevation', elevation_function)
1558        domain.set_quantity('friction', 0.03)
1559        domain.set_quantity('xmomentum', 3.0)
1560        domain.set_quantity('ymomentum', 4.0)
1561
1562        ######################
1563        # Boundary conditions
1564        B = Transmissive_boundary(domain)
1565        domain.set_boundary( {'exterior': B})
1566
1567        # This call mangles the stage values.
1568        domain.distribute_to_vertices_and_edges()
1569        domain.set_quantity('stage', 1.0)
1570
1571
1572        domain.set_name('datatest' + str(time.time()))
1573        domain.format = 'sww'
1574        domain.smooth = True
1575        domain.reduction = mean
1576
1577        sww = get_dataobject(domain)
1578        sww.store_connectivity()
1579        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
1580        domain.set_quantity('stage', 10.0) # This is automatically limited
1581        # so it will not be less than the elevation
1582        domain.time = 2.
1583        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
1584
1585        # test the function
1586        points = [[5.0,1.],[0.5,2.]]
1587
1588        points_file = tempfile.mktemp(".csv")
1589#        points_file = 'test_point.csv'
1590        file_id = open(points_file,"w")
1591        file_id.write("name,easting,northing \n\
1592point1, 5.0, 1.0\n\
1593point2, 0.5, 2.0\n")
1594        file_id.close()
1595
1596        sww2csv_gauges(sww.filename, 
1597                            points_file,
1598                            quantities=['stage', 'elevation'],
1599                            use_cache=False,
1600                            verbose=False)
1601
1602        point1_answers_array = [[0.0,1.0,-5.0], [2.0,10.0,-5.0]]
1603        point1_filename = 'gauge_point1.csv'
1604        point1_handle = file(point1_filename)
1605        point1_reader = reader(point1_handle)
1606        point1_reader.next()
1607
1608        line=[]
1609        for i,row in enumerate(point1_reader):
1610#            print 'i',i,'row',row
1611            line.append([float(row[0]),float(row[1]),float(row[2])])
1612            #print 'line',line[i],'point1',point1_answers_array[i]
1613            assert allclose(line[i], point1_answers_array[i])
1614
1615        point2_answers_array = [[0.0,1.0,-0.5], [2.0,10.0,-0.5]]
1616        point2_filename = 'gauge_point2.csv' 
1617        point2_handle = file(point2_filename)
1618        point2_reader = reader(point2_handle)
1619        point2_reader.next()
1620                       
1621        line=[]
1622        for i,row in enumerate(point2_reader):
1623#            print 'i',i,'row',row
1624            line.append([float(row[0]),float(row[1]),float(row[2])])
1625#            print 'line',line[i],'point1',point1_answers_array[i]
1626            assert allclose(line[i], point2_answers_array[i])
1627                         
1628        # clean up
1629        point1_handle.close()
1630        point2_handle.close()
1631        #print "sww.filename",sww.filename
1632        os.remove(sww.filename)
1633        os.remove(points_file)
1634        os.remove(point1_filename)
1635        os.remove(point2_filename)
1636
1637
1638    def test_sww2csv_gauges2(self):
1639
1640        def elevation_function(x, y):
1641            return -x
1642       
1643        """Most of this test was copied from test_interpolate
1644        test_interpole_sww2csv
1645       
1646        This is testing the gauge_sww2csv function, by creating a sww file and
1647        then exporting the gauges and checking the results.
1648       
1649        This is the same as sww2csv_gauges except set domain.set_starttime to 5.
1650        Therefore testing the storing of the absolute time in the csv files
1651        """
1652       
1653        # Create mesh
1654        mesh_file = tempfile.mktemp(".tsh")   
1655        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
1656        m = Mesh()
1657        m.add_vertices(points)
1658        m.auto_segment()
1659        m.generate_mesh(verbose=False)
1660        m.export_mesh_file(mesh_file)
1661       
1662        # Create shallow water domain
1663        domain = Domain(mesh_file)
1664        os.remove(mesh_file)
1665       
1666        domain.default_order=2
1667        domain.beta_h = 0
1668
1669        # This test was made before tight_slope_limiters were introduced
1670        # Since were are testing interpolation values this is OK
1671        domain.tight_slope_limiters = 0         
1672
1673        # Set some field values
1674        domain.set_quantity('elevation', elevation_function)
1675        domain.set_quantity('friction', 0.03)
1676        domain.set_quantity('xmomentum', 3.0)
1677        domain.set_quantity('ymomentum', 4.0)
1678        domain.set_starttime(5)
1679
1680        ######################
1681        # Boundary conditions
1682        B = Transmissive_boundary(domain)
1683        domain.set_boundary( {'exterior': B})
1684
1685        # This call mangles the stage values.
1686        domain.distribute_to_vertices_and_edges()
1687        domain.set_quantity('stage', 1.0)
1688       
1689
1690
1691        domain.set_name('datatest' + str(time.time()))
1692        domain.format = 'sww'
1693        domain.smooth = True
1694        domain.reduction = mean
1695
1696        sww = get_dataobject(domain)
1697        sww.store_connectivity()
1698        sww.store_timestep(['stage', 'xmomentum', 'ymomentum','elevation'])
1699        domain.set_quantity('stage', 10.0) # This is automatically limited
1700        # so it will not be less than the elevation
1701        domain.time = 2.
1702        sww.store_timestep(['stage','elevation', 'xmomentum', 'ymomentum'])
1703
1704        # test the function
1705        points = [[5.0,1.],[0.5,2.]]
1706
1707        points_file = tempfile.mktemp(".csv")
1708#        points_file = 'test_point.csv'
1709        file_id = open(points_file,"w")
1710        file_id.write("name, easting, northing, elevation \n\
1711point1, 5.0, 1.0, 3.0\n\
1712point2, 0.5, 2.0, 9.0\n")
1713        file_id.close()
1714
1715       
1716        sww2csv_gauges(sww.filename, 
1717                            points_file,
1718                            verbose=False,
1719                            use_cache=False)
1720
1721#        point1_answers_array = [[0.0,1.0,-5.0,3.0,4.0], [2.0,10.0,-5.0,3.0,4.0]]
1722        point1_answers_array = [[5.0,1.0,6.0,-5.0,3.0,4.0], [7.0,10.0,15.0,-5.0,3.0,4.0]]
1723        point1_filename = 'gauge_point1.csv'
1724        point1_handle = file(point1_filename)
1725        point1_reader = reader(point1_handle)
1726        point1_reader.next()
1727
1728        line=[]
1729        for i,row in enumerate(point1_reader):
1730            #print 'i',i,'row',row
1731            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),float(row[4]),float(row[5])])
1732            #print 'assert line',line[i],'point1',point1_answers_array[i]
1733            assert allclose(line[i], point1_answers_array[i])
1734
1735        point2_answers_array = [[5.0,1.0,1.5,-0.5,3.0,4.0], [7.0,10.0,10.5,-0.5,3.0,4.0]]
1736        point2_filename = 'gauge_point2.csv' 
1737        point2_handle = file(point2_filename)
1738        point2_reader = reader(point2_handle)
1739        point2_reader.next()
1740                       
1741        line=[]
1742        for i,row in enumerate(point2_reader):
1743            #print 'i',i,'row',row
1744            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),float(row[4]),float(row[5])])
1745            #print 'assert line',line[i],'point1',point1_answers_array[i]
1746            assert allclose(line[i], point2_answers_array[i])
1747                         
1748        # clean up
1749        point1_handle.close()
1750        point2_handle.close()
1751        #print "sww.filename",sww.filename
1752        os.remove(sww.filename)
1753        os.remove(points_file)
1754        os.remove(point1_filename)
1755        os.remove(point2_filename)
1756
1757
1758    def test_greens_law(self):
1759
1760        from math import sqrt
1761       
1762        d1 = 80.0
1763        d2 = 20.0
1764        h1 = 1.0
1765        h2 = greens_law(d1,d2,h1)
1766
1767        assert h2==sqrt(2.0)
1768       
1769    def test_calc_bearings(self):
1770 
1771        from math import atan, degrees
1772        #Test East
1773        uh = 1
1774        vh = 1.e-15
1775        angle = calc_bearing(uh, vh)
1776        if 89 < angle < 91: v=1
1777        assert v==1
1778        #Test West
1779        uh = -1
1780        vh = 1.e-15
1781        angle = calc_bearing(uh, vh)
1782        if 269 < angle < 271: v=1
1783        assert v==1
1784        #Test North
1785        uh = 1.e-15
1786        vh = 1
1787        angle = calc_bearing(uh, vh)
1788        if -1 < angle < 1: v=1
1789        assert v==1
1790        #Test South
1791        uh = 1.e-15
1792        vh = -1
1793        angle = calc_bearing(uh, vh)
1794        if 179 < angle < 181: v=1
1795        assert v==1
1796        #Test South-East
1797        uh = 1
1798        vh = -1
1799        angle = calc_bearing(uh, vh)
1800        if 134 < angle < 136: v=1
1801        assert v==1
1802        #Test North-East
1803        uh = 1
1804        vh = 1
1805        angle = calc_bearing(uh, vh)
1806        if 44 < angle < 46: v=1
1807        assert v==1
1808        #Test South-West
1809        uh = -1
1810        vh = -1
1811        angle = calc_bearing(uh, vh)
1812        if 224 < angle < 226: v=1
1813        assert v==1
1814        #Test North-West
1815        uh = -1
1816        vh = 1
1817        angle = calc_bearing(uh, vh)
1818        if 314 < angle < 316: v=1
1819        assert v==1
1820
1821 
1822
1823
1824       
1825
1826#-------------------------------------------------------------
1827if __name__ == "__main__":
1828    suite = unittest.makeSuite(Test_Util,'test')
1829#    suite = unittest.makeSuite(Test_Util,'test_sww2csv')
1830#    runner = unittest.TextTestRunner(verbosity=2)
1831    runner = unittest.TextTestRunner(verbosity=1)
1832    runner.run(suite)
1833
1834
1835
1836
Note: See TracBrowser for help on using the repository browser.