source: anuga_core/source/anuga/abstract_2d_finite_volumes/test_util.py @ 5442

Last change on this file since 5442 was 5442, checked in by ole, 15 years ago

Retired h-limiter and beta_h as per ticket:194.
All unit tests and validation tests pass.

File size: 60.4 KB
RevLine 
[1093]1#!/usr/bin/env python
2
3
4import unittest
[2314]5from Numeric import zeros, array, allclose, Float
[1093]6from math import sqrt, pi
[3983]7import tempfile, os
[4634]8from os import access, F_OK,sep, removedirs,remove,mkdir,getcwd
[1093]9
[3560]10from anuga.abstract_2d_finite_volumes.util import *
[3514]11from anuga.config import epsilon
[4634]12from anuga.shallow_water.data_manager import timefile2netcdf,del_dir
[1093]13
[3514]14from anuga.utilities.numerical_tools import NAN
[1093]15
[4148]16from sys import platform
17
[4876]18from anuga.pmesh.mesh import Mesh
19from anuga.shallow_water import Domain, Transmissive_boundary
20from anuga.shallow_water.data_manager import get_dataobject
21from csv import reader,writer
22import time
23import string
24
[1093]25def test_function(x, y):
26    return x+y
27
28class Test_Util(unittest.TestCase):
29    def setUp(self):
30        pass
31
32    def tearDown(self):
33        pass
34
35
36
37
38    #Geometric
39    #def test_distance(self):
[3560]40    #    from anuga.abstract_2d_finite_volumes.util import distance#
[1093]41    #
42    #    self.failUnless( distance([4,2],[7,6]) == 5.0,
43    #                     'Distance is wrong!')
44    #    self.failUnless( allclose(distance([7,6],[9,8]), 2.82842712475),
45    #                    'distance is wrong!')
46    #    self.failUnless( allclose(distance([9,8],[4,2]), 7.81024967591),
47    #                    'distance is wrong!')
48    #
49    #    self.failUnless( distance([9,8],[4,2]) == distance([4,2],[9,8]),
50    #                    'distance is wrong!')
51
52
[1671]53    def test_file_function_time1(self):
[1093]54        """Test that File function interpolates correctly
55        between given times. No x,y dependency here.
56        """
57
58        #Write file
59        import os, time
[3514]60        from anuga.config import time_format
[1093]61        from math import sin, pi
62
[1671]63        #Typical ASCII file
[1093]64        finaltime = 1200
[1671]65        filename = 'test_file_function'
66        fid = open(filename + '.txt', 'w')
[1093]67        start = time.mktime(time.strptime('2000', '%Y'))
68        dt = 60  #One minute intervals
69        t = 0.0
70        while t <= finaltime:
71            t_string = time.strftime(time_format, time.gmtime(t+start))
72            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
73            t += dt
74
75        fid.close()
76
[1671]77        #Convert ASCII file to NetCDF (Which is what we really like!)
[1835]78        timefile2netcdf(filename)
[1093]79
[1671]80
[1835]81        #Create file function from time series
82        F = file_function(filename + '.tms',
83                          quantities = ['Attribute0',
84                                        'Attribute1',
85                                        'Attribute2'])
[1671]86       
[1093]87        #Now try interpolation
88        for i in range(20):
89            t = i*10
90            q = F(t)
91
92            #Exact linear intpolation
93            assert allclose(q[0], 2*t)
94            if i%6 == 0:
95                assert allclose(q[1], t**2)
96                assert allclose(q[2], sin(t*pi/600))
97
98        #Check non-exact
99
100        t = 90 #Halfway between 60 and 120
101        q = F(t)
102        assert allclose( (120**2 + 60**2)/2, q[1] )
103        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
104
105
106        t = 100 #Two thirds of the way between between 60 and 120
107        q = F(t)
108        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
109        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
110
[1671]111        os.remove(filename + '.txt')
[1835]112        os.remove(filename + '.tms')       
[1093]113
114
[1137]115       
[2852]116    def test_spatio_temporal_file_function_basic(self):
[1664]117        """Test that spatio temporal file function performs the correct
118        interpolations in both time and space
119        NetCDF version (x,y,t dependency)       
120        """
121        import time
122
[4160]123        #Create sww file of simple propagation from left to right
124        #through rectangular domain
125        from shallow_water import Domain, Dirichlet_boundary
[1664]126        from mesh_factory import rectangular
127        from Numeric import take, concatenate, reshape
128
129        #Create basic mesh and shallow water domain
130        points, vertices, boundary = rectangular(3, 3)
131        domain1 = Domain(points, vertices, boundary)
132
[3514]133        from anuga.utilities.numerical_tools import mean
[1664]134        domain1.reduction = mean
135        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
136                              # only one value.
137
138        domain1.default_order = 2
[4160]139        domain1.store = True
[1664]140        domain1.set_datadir('.')
141        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
142        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
143
144        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
145        domain1.set_quantity('elevation', 0)
146        domain1.set_quantity('friction', 0)
147        domain1.set_quantity('stage', 0)
148
149        # Boundary conditions
150        B0 = Dirichlet_boundary([0,0,0])
151        B6 = Dirichlet_boundary([0.6,0,0])
152        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
153        domain1.check_integrity()
154
155        finaltime = 8
156        #Evolution
[4160]157        t0 = -1
[1664]158        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
[2852]159            #print 'Timesteps: %.16f, %.16f' %(t0, t)
[2783]160            #if t == t0:
161            #    msg = 'Duplicate timestep found: %f, %f' %(t0, t)
[4160]162            #   raise msg
163            t0 = t
164             
[1664]165            #domain1.write_time()
166
167
168        #Now read data from sww and check
169        from Scientific.IO.NetCDF import NetCDFFile
170        filename = domain1.get_name() + '.' + domain1.format
171        fid = NetCDFFile(filename)
172
173        x = fid.variables['x'][:]
174        y = fid.variables['y'][:]
175        stage = fid.variables['stage'][:]
176        xmomentum = fid.variables['xmomentum'][:]
177        ymomentum = fid.variables['ymomentum'][:]
178        time = fid.variables['time'][:]
179
180        #Take stage vertex values at last timestep on diagonal
181        #Diagonal is identified by vertices: 0, 5, 10, 15
182
[2852]183        last_time_index = len(time)-1 #Last last_time_index
184        d_stage = reshape(take(stage[last_time_index, :], [0,5,10,15]), (4,1))
185        d_uh = reshape(take(xmomentum[last_time_index, :], [0,5,10,15]), (4,1))
186        d_vh = reshape(take(ymomentum[last_time_index, :], [0,5,10,15]), (4,1))
[1664]187        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
188
189        #Reference interpolated values at midpoints on diagonal at
190        #this timestep are
191        r0 = (D[0] + D[1])/2
192        r1 = (D[1] + D[2])/2
193        r2 = (D[2] + D[3])/2
194
195        #And the midpoints are found now
196        Dx = take(reshape(x, (16,1)), [0,5,10,15])
197        Dy = take(reshape(y, (16,1)), [0,5,10,15])
198
199        diag = concatenate( (Dx, Dy), axis=1)
200        d_midpoints = (diag[1:] + diag[:-1])/2
201
202        #Let us see if the file function can find the correct
203        #values at the midpoints at the last timestep:
204        f = file_function(filename, domain1,
205                          interpolation_points = d_midpoints)
206
[2884]207        T = f.get_time()
208        msg = 'duplicate timesteps: %.16f and %.16f' %(T[-1], T[-2])
209        assert not T[-1] == T[-2], msg
[4160]210        t = time[last_time_index]
[2852]211        q = f(t, point_id=0); assert allclose(r0, q)
212        q = f(t, point_id=1); assert allclose(r1, q)
213        q = f(t, point_id=2); assert allclose(r2, q)
[1664]214
215
216        ##################
217        #Now do the same for the first timestep
218
219        timestep = 0 #First timestep
220        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
221        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
222        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
223        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
224
225        #Reference interpolated values at midpoints on diagonal at
226        #this timestep are
227        r0 = (D[0] + D[1])/2
228        r1 = (D[1] + D[2])/2
229        r2 = (D[2] + D[3])/2
230
231        #Let us see if the file function can find the correct
232        #values
233        q = f(0, point_id=0); assert allclose(r0, q)
234        q = f(0, point_id=1); assert allclose(r1, q)
235        q = f(0, point_id=2); assert allclose(r2, q)
236
237
238        ##################
239        #Now do it again for a timestep in the middle
240
241        timestep = 33
242        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
243        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
244        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
245        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
246
247        #Reference interpolated values at midpoints on diagonal at
248        #this timestep are
249        r0 = (D[0] + D[1])/2
250        r1 = (D[1] + D[2])/2
251        r2 = (D[2] + D[3])/2
252
253        q = f(timestep/10., point_id=0); assert allclose(r0, q)
254        q = f(timestep/10., point_id=1); assert allclose(r1, q)
255        q = f(timestep/10., point_id=2); assert allclose(r2, q)
256
257
258        ##################
259        #Now check temporal interpolation
260        #Halfway between timestep 15 and 16
261
262        timestep = 15
263        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
264        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
265        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
266        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
267
268        #Reference interpolated values at midpoints on diagonal at
269        #this timestep are
270        r0_0 = (D[0] + D[1])/2
271        r1_0 = (D[1] + D[2])/2
272        r2_0 = (D[2] + D[3])/2
273
274        #
275        timestep = 16
276        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
277        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
278        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
279        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
280
281        #Reference interpolated values at midpoints on diagonal at
282        #this timestep are
283        r0_1 = (D[0] + D[1])/2
284        r1_1 = (D[1] + D[2])/2
285        r2_1 = (D[2] + D[3])/2
286
287        # The reference values are
288        r0 = (r0_0 + r0_1)/2
289        r1 = (r1_0 + r1_1)/2
290        r2 = (r2_0 + r2_1)/2
291
292        q = f((timestep - 0.5)/10., point_id=0); assert allclose(r0, q)
293        q = f((timestep - 0.5)/10., point_id=1); assert allclose(r1, q)
294        q = f((timestep - 0.5)/10., point_id=2); assert allclose(r2, q)
295
296        ##################
297        #Finally check interpolation 2 thirds of the way
298        #between timestep 15 and 16
299
300        # The reference values are
301        r0 = (r0_0 + 2*r0_1)/3
302        r1 = (r1_0 + 2*r1_1)/3
303        r2 = (r2_0 + 2*r2_1)/3
304
305        #And the file function gives
306        q = f((timestep - 1.0/3)/10., point_id=0); assert allclose(r0, q)
307        q = f((timestep - 1.0/3)/10., point_id=1); assert allclose(r1, q)
308        q = f((timestep - 1.0/3)/10., point_id=2); assert allclose(r2, q)
309
310        fid.close()
311        import os
312        os.remove(filename)
313
[1884]314
315
316    def test_spatio_temporal_file_function_different_origin(self):
317        """Test that spatio temporal file function performs the correct
318        interpolations in both time and space where space is offset by
319        xllcorner and yllcorner
320        NetCDF version (x,y,t dependency)       
321        """
322        import time
323
[4160]324        #Create sww file of simple propagation from left to right
325        #through rectangular domain
326        from shallow_water import Domain, Dirichlet_boundary
[1884]327        from mesh_factory import rectangular
328        from Numeric import take, concatenate, reshape
329
330
[3514]331        from anuga.coordinate_transforms.geo_reference import Geo_reference
[1884]332        xllcorner = 2048
333        yllcorner = 11000
334        zone = 2
335
336        #Create basic mesh and shallow water domain
337        points, vertices, boundary = rectangular(3, 3)
338        domain1 = Domain(points, vertices, boundary,
339                         geo_reference = Geo_reference(xllcorner = xllcorner,
340                                                       yllcorner = yllcorner))
[1137]341       
342
[4160]343        from anuga.utilities.numerical_tools import mean       
[1884]344        domain1.reduction = mean
345        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
346                              # only one value.
347
348        domain1.default_order = 2
[4160]349        domain1.store = True
[1884]350        domain1.set_datadir('.')
351        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
352        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
353
354        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
355        domain1.set_quantity('elevation', 0)
356        domain1.set_quantity('friction', 0)
357        domain1.set_quantity('stage', 0)
358
359        # Boundary conditions
360        B0 = Dirichlet_boundary([0,0,0])
361        B6 = Dirichlet_boundary([0.6,0,0])
362        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
363        domain1.check_integrity()
364
365        finaltime = 8
366        #Evolution
367        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
368            pass
369            #domain1.write_time()
370
371
372        #Now read data from sww and check
373        from Scientific.IO.NetCDF import NetCDFFile
374        filename = domain1.get_name() + '.' + domain1.format
375        fid = NetCDFFile(filename)
376
377        x = fid.variables['x'][:]
378        y = fid.variables['y'][:]
379        stage = fid.variables['stage'][:]
380        xmomentum = fid.variables['xmomentum'][:]
381        ymomentum = fid.variables['ymomentum'][:]
382        time = fid.variables['time'][:]
383
384        #Take stage vertex values at last timestep on diagonal
385        #Diagonal is identified by vertices: 0, 5, 10, 15
386
[4160]387        last_time_index = len(time)-1 #Last last_time_index     
[2852]388        d_stage = reshape(take(stage[last_time_index, :], [0,5,10,15]), (4,1))
389        d_uh = reshape(take(xmomentum[last_time_index, :], [0,5,10,15]), (4,1))
390        d_vh = reshape(take(ymomentum[last_time_index, :], [0,5,10,15]), (4,1))
[1884]391        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
392
393        #Reference interpolated values at midpoints on diagonal at
394        #this timestep are
395        r0 = (D[0] + D[1])/2
396        r1 = (D[1] + D[2])/2
397        r2 = (D[2] + D[3])/2
398
399        #And the midpoints are found now
400        Dx = take(reshape(x, (16,1)), [0,5,10,15])
401        Dy = take(reshape(y, (16,1)), [0,5,10,15])
402
403        diag = concatenate( (Dx, Dy), axis=1)
404        d_midpoints = (diag[1:] + diag[:-1])/2
405
406
407        #Adjust for georef - make interpolation points absolute
408        d_midpoints[:,0] += xllcorner
409        d_midpoints[:,1] += yllcorner               
410
411        #Let us see if the file function can find the correct
412        #values at the midpoints at the last timestep:
413        f = file_function(filename, domain1,
414                          interpolation_points = d_midpoints)
415
[4160]416        t = time[last_time_index]                         
[2852]417        q = f(t, point_id=0); assert allclose(r0, q)
418        q = f(t, point_id=1); assert allclose(r1, q)
419        q = f(t, point_id=2); assert allclose(r2, q)
[1884]420
421
422        ##################
423        #Now do the same for the first timestep
424
425        timestep = 0 #First timestep
426        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
427        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
428        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
429        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
430
431        #Reference interpolated values at midpoints on diagonal at
432        #this timestep are
433        r0 = (D[0] + D[1])/2
434        r1 = (D[1] + D[2])/2
435        r2 = (D[2] + D[3])/2
436
437        #Let us see if the file function can find the correct
438        #values
439        q = f(0, point_id=0); assert allclose(r0, q)
440        q = f(0, point_id=1); assert allclose(r1, q)
441        q = f(0, point_id=2); assert allclose(r2, q)
442
443
444        ##################
445        #Now do it again for a timestep in the middle
446
447        timestep = 33
448        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
449        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
450        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
451        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
452
453        #Reference interpolated values at midpoints on diagonal at
454        #this timestep are
455        r0 = (D[0] + D[1])/2
456        r1 = (D[1] + D[2])/2
457        r2 = (D[2] + D[3])/2
458
459        q = f(timestep/10., point_id=0); assert allclose(r0, q)
460        q = f(timestep/10., point_id=1); assert allclose(r1, q)
461        q = f(timestep/10., point_id=2); assert allclose(r2, q)
462
463
464        ##################
465        #Now check temporal interpolation
466        #Halfway between timestep 15 and 16
467
468        timestep = 15
469        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
470        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
471        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
472        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
473
474        #Reference interpolated values at midpoints on diagonal at
475        #this timestep are
476        r0_0 = (D[0] + D[1])/2
477        r1_0 = (D[1] + D[2])/2
478        r2_0 = (D[2] + D[3])/2
479
480        #
481        timestep = 16
482        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
483        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
484        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
485        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
486
487        #Reference interpolated values at midpoints on diagonal at
488        #this timestep are
489        r0_1 = (D[0] + D[1])/2
490        r1_1 = (D[1] + D[2])/2
491        r2_1 = (D[2] + D[3])/2
492
493        # The reference values are
494        r0 = (r0_0 + r0_1)/2
495        r1 = (r1_0 + r1_1)/2
496        r2 = (r2_0 + r2_1)/2
497
498        q = f((timestep - 0.5)/10., point_id=0); assert allclose(r0, q)
499        q = f((timestep - 0.5)/10., point_id=1); assert allclose(r1, q)
500        q = f((timestep - 0.5)/10., point_id=2); assert allclose(r2, q)
501
502        ##################
503        #Finally check interpolation 2 thirds of the way
504        #between timestep 15 and 16
505
506        # The reference values are
507        r0 = (r0_0 + 2*r0_1)/3
508        r1 = (r1_0 + 2*r1_1)/3
509        r2 = (r2_0 + 2*r2_1)/3
510
511        #And the file function gives
512        q = f((timestep - 1.0/3)/10., point_id=0); assert allclose(r0, q)
513        q = f((timestep - 1.0/3)/10., point_id=1); assert allclose(r1, q)
514        q = f((timestep - 1.0/3)/10., point_id=2); assert allclose(r2, q)
515
516        fid.close()
517        import os
518        os.remove(filename)
519
520       
521
522
[2679]523    def qtest_spatio_temporal_file_function_time(self):
[1093]524        """Test that File function interpolates correctly
525        between given times.
526        NetCDF version (x,y,t dependency)
527        """
528
529        #Create NetCDF (sww) file to be read
530        # x: 0, 5, 10, 15
531        # y: -20, -10, 0, 10
532        # t: 0, 60, 120, ...., 1200
533        #
534        # test quantities (arbitrary but non-trivial expressions):
535        #
536        #   stage     = 3*x - y**2 + 2*t
537        #   xmomentum = exp( -((x-7)**2 + (y+5)**2)/20 ) * t**2
538        #   ymomentum = x**2 + y**2 * sin(t*pi/600)
539
[1664]540        #NOTE: Nice test that may render some of the others redundant.
[1093]541
542        import os, time
[3514]543        from anuga.config import time_format
[1093]544        from Numeric import sin, pi, exp
545        from mesh_factory import rectangular
546        from shallow_water import Domain
[3563]547        import anuga.shallow_water.data_manager
[1093]548
549        finaltime = 1200
550        filename = 'test_file_function'
551
552        #Create a domain to hold test grid
[1670]553        #(0:15, -20:10)
[1093]554        points, vertices, boundary =\
555                rectangular(4, 4, 15, 30, origin = (0, -20))
[2679]556        print "points", points
[1093]557
558        #print 'Number of elements', len(vertices)
559        domain = Domain(points, vertices, boundary)
560        domain.smooth = False
561        domain.default_order = 2
562        domain.set_datadir('.')
563        domain.set_name(filename)
564        domain.store = True
565        domain.format = 'sww'   #Native netcdf visualisation format
566
567        #print points
568        start = time.mktime(time.strptime('2000', '%Y'))
569        domain.starttime = start
570
571
572        #Store structure
573        domain.initialise_storage()
574
575        #Compute artificial time steps and store
576        dt = 60  #One minute intervals
577        t = 0.0
578        while t <= finaltime:
579            #Compute quantities
580            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
581            domain.set_quantity('stage', f1)
582
583            f2 = lambda x,y: x+y+t**2
584            domain.set_quantity('xmomentum', f2)
585
586            f3 = lambda x,y: x**2 + y**2 * sin(t*pi/600)
587            domain.set_quantity('ymomentum', f3)
588
589            #Store and advance time
590            domain.time = t
591            domain.store_timestep(domain.conserved_quantities)
592            t += dt
593
594
595        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5]]
[2679]596     
597        #Deliberately set domain.starttime to too early
598        domain.starttime = start - 1
[1093]599
[2679]600        #Create file function
601        F = file_function(filename + '.sww', domain,
602                          quantities = domain.conserved_quantities,
603                          interpolation_points = interpolation_points)
[1093]604
[2679]605        #Check that FF updates fixes domain starttime
606        assert allclose(domain.starttime, start)
[1093]607
[2679]608        #Check that domain.starttime isn't updated if later
609        domain.starttime = start + 1
610        F = file_function(filename + '.sww', domain,
611                          quantities = domain.conserved_quantities,
612                          interpolation_points = interpolation_points)
613        assert allclose(domain.starttime, start+1)
614        domain.starttime = start
615
616
617        #Check linear interpolation in time
618        F = file_function(filename + '.sww', domain,
619                          quantities = domain.conserved_quantities,
620                          interpolation_points = interpolation_points)               
621        for id in range(len(interpolation_points)):
622            x = interpolation_points[id][0]
623            y = interpolation_points[id][1]
624
625            for i in range(20):
626                t = i*10
627                k = i%6
628
629                if k == 0:
630                    q0 = F(t, point_id=id)
631                    q1 = F(t+60, point_id=id)
632
[3452]633                if q0 == NAN:
[2679]634                    actual = q0
635                else:
636                    actual = (k*q1 + (6-k)*q0)/6
637                q = F(t, point_id=id)
638                #print i, k, t, q
639                #print ' ', q0
640                #print ' ', q1
641                print "q",q
642                print "actual", actual
643                #print
[3452]644                if q0 == NAN:
[2679]645                     self.failUnless( q == actual, 'Fail!')
646                else:
647                    assert allclose(q, actual)
648
649
650        #Another check of linear interpolation in time
651        for id in range(len(interpolation_points)):
652            q60 = F(60, point_id=id)
653            q120 = F(120, point_id=id)
654
655            t = 90 #Halfway between 60 and 120
656            q = F(t, point_id=id)
657            assert allclose( (q120+q60)/2, q )
658
659            t = 100 #Two thirds of the way between between 60 and 120
660            q = F(t, point_id=id)
661            assert allclose(q60/3 + 2*q120/3, q)
662
663
664
665        #Check that domain.starttime isn't updated if later than file starttime but earlier
666        #than file end time
[4160]667        delta = 23
[2679]668        domain.starttime = start + delta
669        F = file_function(filename + '.sww', domain,
670                          quantities = domain.conserved_quantities,
671                          interpolation_points = interpolation_points)
672        assert allclose(domain.starttime, start+delta)
673
674
675
676
677        #Now try interpolation with delta offset
678        for id in range(len(interpolation_points)):           
679            x = interpolation_points[id][0]
680            y = interpolation_points[id][1]
681
682            for i in range(20):
683                t = i*10
684                k = i%6
685
686                if k == 0:
687                    q0 = F(t-delta, point_id=id)
688                    q1 = F(t+60-delta, point_id=id)
689
690                q = F(t-delta, point_id=id)
691                assert allclose(q, (k*q1 + (6-k)*q0)/6)
692
693
694        os.remove(filename + '.sww')
695
696
697
[4588]698    def NOtest_spatio_temporal_file_function_time(self):
699        # FIXME: This passes but needs some TLC
[2679]700        # Test that File function interpolates correctly
701        # When some points are outside the mesh
702
703        import os, time
[3514]704        from anuga.config import time_format
[2679]705        from Numeric import sin, pi, exp
706        from mesh_factory import rectangular
707        from shallow_water import Domain
[3563]708        import anuga.shallow_water.data_manager 
[3514]709        from anuga.pmesh.mesh_interface import create_mesh_from_regions
[2679]710        finaltime = 1200
711       
712        filename = tempfile.mktemp()
713        #print "filename",filename
714        filename = 'test_file_function'
715
716        meshfilename = tempfile.mktemp(".tsh")
717
718        boundary_tags = {'walls':[0,1],'bom':[2]}
719       
720        polygon_absolute = [[0,-20],[10,-20],[10,15],[-20,15]]
721       
722        create_mesh_from_regions(polygon_absolute,
723                                 boundary_tags,
724                                 10000000,
725                                 filename=meshfilename)
726        domain = Domain(mesh_filename=meshfilename)
727        domain.smooth = False
728        domain.default_order = 2
729        domain.set_datadir('.')
730        domain.set_name(filename)
731        domain.store = True
732        domain.format = 'sww'   #Native netcdf visualisation format
733
734        #print points
735        start = time.mktime(time.strptime('2000', '%Y'))
736        domain.starttime = start
737       
738
739        #Store structure
740        domain.initialise_storage()
741
742        #Compute artificial time steps and store
743        dt = 60  #One minute intervals
744        t = 0.0
745        while t <= finaltime:
746            #Compute quantities
747            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
748            domain.set_quantity('stage', f1)
749
750            f2 = lambda x,y: x+y+t**2
751            domain.set_quantity('xmomentum', f2)
752
753            f3 = lambda x,y: x**2 + y**2 * sin(t*pi/600)
754            domain.set_quantity('ymomentum', f3)
755
756            #Store and advance time
757            domain.time = t
758            domain.store_timestep(domain.conserved_quantities)
759            t += dt
760
761        interpolation_points = [[1,0]]
762        interpolation_points = [[100,1000]]
763       
764        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5],
765                                [78787,78787],[7878,3432]]
766           
[1664]767        #Deliberately set domain.starttime to too early
[1093]768        domain.starttime = start - 1
769
770        #Create file function
771        F = file_function(filename + '.sww', domain,
772                          quantities = domain.conserved_quantities,
773                          interpolation_points = interpolation_points)
774
775        #Check that FF updates fixes domain starttime
776        assert allclose(domain.starttime, start)
777
778        #Check that domain.starttime isn't updated if later
779        domain.starttime = start + 1
780        F = file_function(filename + '.sww', domain,
781                          quantities = domain.conserved_quantities,
782                          interpolation_points = interpolation_points)
783        assert allclose(domain.starttime, start+1)
784        domain.starttime = start
785
786
787        #Check linear interpolation in time
[2679]788        # checking points inside and outside the mesh
[1668]789        F = file_function(filename + '.sww', domain,
790                          quantities = domain.conserved_quantities,
[2679]791                          interpolation_points = interpolation_points)
792       
[1670]793        for id in range(len(interpolation_points)):
[1093]794            x = interpolation_points[id][0]
795            y = interpolation_points[id][1]
796
797            for i in range(20):
798                t = i*10
799                k = i%6
800
801                if k == 0:
802                    q0 = F(t, point_id=id)
803                    q1 = F(t+60, point_id=id)
804
[3452]805                if q0 == NAN:
[2679]806                    actual = q0
807                else:
808                    actual = (k*q1 + (6-k)*q0)/6
[1093]809                q = F(t, point_id=id)
[1668]810                #print i, k, t, q
811                #print ' ', q0
812                #print ' ', q1
[2679]813                #print "q",q
814                #print "actual", actual
[1668]815                #print
[3452]816                if q0 == NAN:
[2679]817                     self.failUnless( q == actual, 'Fail!')
818                else:
819                    assert allclose(q, actual)
[1093]820
[2679]821        # now lets check points inside the mesh
822        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14]] #, [10,-12.5]] - this point doesn't work WHY?
823        interpolation_points = [[10,-12.5]]
824           
825        print "len(interpolation_points)",len(interpolation_points) 
826        F = file_function(filename + '.sww', domain,
827                          quantities = domain.conserved_quantities,
828                          interpolation_points = interpolation_points)
[1093]829
[2679]830        domain.starttime = start
831
832
833        #Check linear interpolation in time
834        F = file_function(filename + '.sww', domain,
835                          quantities = domain.conserved_quantities,
836                          interpolation_points = interpolation_points)               
837        for id in range(len(interpolation_points)):
838            x = interpolation_points[id][0]
839            y = interpolation_points[id][1]
840
841            for i in range(20):
842                t = i*10
843                k = i%6
844
845                if k == 0:
846                    q0 = F(t, point_id=id)
847                    q1 = F(t+60, point_id=id)
848
[3452]849                if q0 == NAN:
[2679]850                    actual = q0
851                else:
852                    actual = (k*q1 + (6-k)*q0)/6
853                q = F(t, point_id=id)
854                print "############"
855                print "id, x, y ", id, x, y #k, t, q
856                print "t", t
857                #print ' ', q0
858                #print ' ', q1
859                print "q",q
860                print "actual", actual
861                #print
[3452]862                if q0 == NAN:
[2679]863                     self.failUnless( q == actual, 'Fail!')
864                else:
865                    assert allclose(q, actual)
866
867
[1093]868        #Another check of linear interpolation in time
869        for id in range(len(interpolation_points)):
870            q60 = F(60, point_id=id)
871            q120 = F(120, point_id=id)
872
873            t = 90 #Halfway between 60 and 120
[1668]874            q = F(t, point_id=id)
[1093]875            assert allclose( (q120+q60)/2, q )
876
877            t = 100 #Two thirds of the way between between 60 and 120
878            q = F(t, point_id=id)
879            assert allclose(q60/3 + 2*q120/3, q)
880
881
882
883        #Check that domain.starttime isn't updated if later than file starttime but earlier
884        #than file end time
[4160]885        delta = 23
[1093]886        domain.starttime = start + delta
887        F = file_function(filename + '.sww', domain,
888                          quantities = domain.conserved_quantities,
889                          interpolation_points = interpolation_points)
890        assert allclose(domain.starttime, start+delta)
891
892
893
894
895        #Now try interpolation with delta offset
[1670]896        for id in range(len(interpolation_points)):           
[1093]897            x = interpolation_points[id][0]
898            y = interpolation_points[id][1]
899
900            for i in range(20):
901                t = i*10
902                k = i%6
903
904                if k == 0:
905                    q0 = F(t-delta, point_id=id)
906                    q1 = F(t+60-delta, point_id=id)
907
908                q = F(t-delta, point_id=id)
909                assert allclose(q, (k*q1 + (6-k)*q0)/6)
910
911
912        os.remove(filename + '.sww')
913
914    def test_file_function_time_with_domain(self):
915        """Test that File function interpolates correctly
916        between given times. No x,y dependency here.
917        Use domain with starttime
918        """
919
920        #Write file
921        import os, time, calendar
[3514]922        from anuga.config import time_format
[1093]923        from math import sin, pi
924        from domain import Domain
925
926        finaltime = 1200
[1671]927        filename = 'test_file_function'
928        fid = open(filename + '.txt', 'w')
[1093]929        start = time.mktime(time.strptime('2000', '%Y'))
930        dt = 60  #One minute intervals
931        t = 0.0
932        while t <= finaltime:
933            t_string = time.strftime(time_format, time.gmtime(t+start))
934            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
935            t += dt
936
937        fid.close()
938
[1671]939
940        #Convert ASCII file to NetCDF (Which is what we really like!)
[1835]941        timefile2netcdf(filename)
[1671]942
943
944
[1093]945        a = [0.0, 0.0]
946        b = [4.0, 0.0]
947        c = [0.0, 3.0]
948
949        points = [a, b, c]
950        vertices = [[0,1,2]]
951        domain = Domain(points, vertices)
952
[5221]953        # Check that domain.starttime is updated if non-existing
954        F = file_function(filename + '.tms',
955                          domain,
956                          quantities = ['Attribute0', 'Attribute1', 'Attribute2']) 
[1093]957        assert allclose(domain.starttime, start)
958
[5221]959        # Check that domain.starttime is updated if too early
[1093]960        domain.starttime = start - 1
[5221]961        F = file_function(filename + '.tms',
962                          domain,
963                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])
[1093]964        assert allclose(domain.starttime, start)
965
[5221]966        # Check that domain.starttime isn't updated if later
[1093]967        domain.starttime = start + 1
[5221]968        F = file_function(filename + '.tms',
969                          domain,
970                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])
[1093]971        assert allclose(domain.starttime, start+1)
972
973        domain.starttime = start
[5221]974        F = file_function(filename + '.tms',
975                          domain,
[4278]976                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'],
977                          use_cache=True)
[1671]978       
[1093]979
[1671]980        #print F.precomputed_values
981        #print 'F(60)', F(60)
982       
[1093]983        #Now try interpolation
984        for i in range(20):
985            t = i*10
986            q = F(t)
987
988            #Exact linear intpolation
989            assert allclose(q[0], 2*t)
990            if i%6 == 0:
991                assert allclose(q[1], t**2)
992                assert allclose(q[2], sin(t*pi/600))
993
994        #Check non-exact
995
996        t = 90 #Halfway between 60 and 120
997        q = F(t)
998        assert allclose( (120**2 + 60**2)/2, q[1] )
999        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
1000
1001
1002        t = 100 #Two thirds of the way between between 60 and 120
1003        q = F(t)
1004        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
1005        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
1006
[1835]1007        os.remove(filename + '.tms')
[1671]1008        os.remove(filename + '.txt')       
[1093]1009
1010    def test_file_function_time_with_domain_different_start(self):
1011        """Test that File function interpolates correctly
1012        between given times. No x,y dependency here.
1013        Use domain with a starttime later than that of file
1014
1015        ASCII version
1016        """
1017
1018        #Write file
1019        import os, time, calendar
[3514]1020        from anuga.config import time_format
[1093]1021        from math import sin, pi
1022        from domain import Domain
1023
1024        finaltime = 1200
[1671]1025        filename = 'test_file_function'
1026        fid = open(filename + '.txt', 'w')
[1093]1027        start = time.mktime(time.strptime('2000', '%Y'))
1028        dt = 60  #One minute intervals
1029        t = 0.0
1030        while t <= finaltime:
1031            t_string = time.strftime(time_format, time.gmtime(t+start))
1032            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
1033            t += dt
1034
1035        fid.close()
1036
[1671]1037        #Convert ASCII file to NetCDF (Which is what we really like!)
[1835]1038        timefile2netcdf(filename)       
[1671]1039
[1093]1040        a = [0.0, 0.0]
1041        b = [4.0, 0.0]
1042        c = [0.0, 3.0]
1043
1044        points = [a, b, c]
1045        vertices = [[0,1,2]]
1046        domain = Domain(points, vertices)
1047
1048        #Check that domain.starttime isn't updated if later than file starttime but earlier
1049        #than file end time
[4160]1050        delta = 23
[1093]1051        domain.starttime = start + delta
[1835]1052        F = file_function(filename + '.tms', domain,
[1671]1053                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])       
[1093]1054        assert allclose(domain.starttime, start+delta)
1055
1056
1057
1058
1059        #Now try interpolation with delta offset
1060        for i in range(20):
1061            t = i*10
1062            q = F(t-delta)
1063
1064            #Exact linear intpolation
1065            assert allclose(q[0], 2*t)
1066            if i%6 == 0:
1067                assert allclose(q[1], t**2)
1068                assert allclose(q[2], sin(t*pi/600))
1069
1070        #Check non-exact
1071
1072        t = 90 #Halfway between 60 and 120
1073        q = F(t-delta)
1074        assert allclose( (120**2 + 60**2)/2, q[1] )
1075        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
1076
1077
1078        t = 100 #Two thirds of the way between between 60 and 120
1079        q = F(t-delta)
1080        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
1081        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
1082
1083
[1835]1084        os.remove(filename + '.tms')
[1671]1085        os.remove(filename + '.txt')               
[1093]1086
1087
[1671]1088
[1919]1089    def test_apply_expression_to_dictionary(self):
[1093]1090
[2314]1091        #FIXME: Division is not expected to work for integers.
1092        #This must be caught.
[1919]1093        foo = array([[1,2,3],
[2314]1094                     [4,5,6]], Float)
[1093]1095
[1919]1096        bar = array([[-1,0,5],
[2314]1097                     [6,1,1]], Float)                 
[1093]1098
[1919]1099        D = {'X': foo, 'Y': bar}
[1093]1100
[1919]1101        Z = apply_expression_to_dictionary('X+Y', D)       
1102        assert allclose(Z, foo+bar)
1103
1104        Z = apply_expression_to_dictionary('X*Y', D)       
1105        assert allclose(Z, foo*bar)       
1106
1107        Z = apply_expression_to_dictionary('4*X+Y', D)       
1108        assert allclose(Z, 4*foo+bar)       
1109
[2314]1110        # test zero division is OK
1111        Z = apply_expression_to_dictionary('X/Y', D)
1112        assert allclose(1/Z, 1/(foo/bar)) # can't compare inf to inf
1113
1114        # make an error for zero on zero
1115        # this is really an error in Numeric, SciPy core can handle it
1116        # Z = apply_expression_to_dictionary('0/Y', D)
1117
[1919]1118        #Check exceptions
1119        try:
1120            #Wrong name
1121            Z = apply_expression_to_dictionary('4*X+A', D)       
1122        except NameError:
1123            pass
1124        else:
1125            msg = 'Should have raised a NameError Exception'
1126            raise msg
1127
1128
1129        try:
1130            #Wrong order
1131            Z = apply_expression_to_dictionary(D, '4*X+A')       
1132        except AssertionError:
1133            pass
1134        else:
1135            msg = 'Should have raised a AssertionError Exception'
1136            raise msg       
1137       
1138
[1927]1139    def test_multiple_replace(self):
1140        """Hard test that checks a true word-by-word simultaneous replace
1141        """
1142       
1143        D = {'x': 'xi', 'y': 'eta', 'xi':'lam'}
1144        exp = '3*x+y + xi'
1145       
1146        new = multiple_replace(exp, D)
1147       
1148        assert new == '3*xi+eta + lam'
1149                         
[1919]1150
1151
[1932]1152    def test_point_on_line_obsolete(self):
1153        """Test that obsolete call issues appropriate warning"""
1154
[4160]1155        #Turn warning into an exception
[1932]1156        import warnings
[4160]1157        warnings.filterwarnings('error')
[1932]1158
1159        try:
[4160]1160            assert point_on_line( 0, 0.5, 0,1, 0,0 )
1161        except DeprecationWarning:
1162            pass
1163        else:
1164            msg = 'point_on_line should have issued a DeprecationWarning'
1165            raise Exception(msg)   
[1932]1166
[4160]1167        warnings.resetwarnings()
[2929]1168   
[4170]1169    def test_get_revision_number(self):
1170        """test_get_revision_number(self):
1171
1172        Test that revision number can be retrieved.
1173        """
[4644]1174        if os.environ.has_key('USER') and os.environ['USER'] == 'dgray':
1175            # I have a known snv incompatability issue,
1176            # so I'm skipping this test.
1177            # FIXME when SVN is upgraded on our clusters
1178            pass
1179        else:   
1180            n = get_revision_number()
1181            assert n>=0
[4159]1182
[4170]1183
1184       
1185    def test_add_directories(self):
1186       
1187        import tempfile
1188        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
1189        directories = ['ja','ne','ke']
1190        kens_dir = add_directories(root_dir, directories)
1191        assert kens_dir == root_dir + sep + 'ja' + sep + 'ne' + \
1192               sep + 'ke'
1193        assert access(root_dir,F_OK)
1194
1195        add_directories(root_dir, directories)
1196        assert access(root_dir,F_OK)
1197       
1198        #clean up!
1199        os.rmdir(kens_dir)
1200        os.rmdir(root_dir + sep + 'ja' + sep + 'ne')
1201        os.rmdir(root_dir + sep + 'ja')
1202        os.rmdir(root_dir)
1203
1204    def test_add_directories_bad(self):
1205       
1206        import tempfile
1207        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
1208        directories = ['/\/!@#@#$%^%&*((*:*:','ne','ke']
1209       
[4159]1210        try:
[4170]1211            kens_dir = add_directories(root_dir, directories)
1212        except OSError:
1213            pass
1214        else:
1215            msg = 'bad dir name should give OSError'
1216            raise Exception(msg)   
1217           
1218        #clean up!
1219        os.rmdir(root_dir)
1220
1221    def test_check_list(self):
1222
1223        check_list(['stage','xmomentum'])
1224
[4160]1225       
[3983]1226    def test_add_directories(self):
1227       
1228        import tempfile
1229        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
1230        directories = ['ja','ne','ke']
1231        kens_dir = add_directories(root_dir, directories)
1232        assert kens_dir == root_dir + sep + 'ja' + sep + 'ne' + \
1233               sep + 'ke'
1234        assert access(root_dir,F_OK)
[1932]1235
[3994]1236        add_directories(root_dir, directories)
1237        assert access(root_dir,F_OK)
1238       
[3983]1239        #clean up!
1240        os.rmdir(kens_dir)
1241        os.rmdir(root_dir + sep + 'ja' + sep + 'ne')
1242        os.rmdir(root_dir + sep + 'ja')
1243        os.rmdir(root_dir)
1244
[3994]1245    def test_add_directories_bad(self):
1246       
1247        import tempfile
1248        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
[3995]1249        directories = ['/\/!@#@#$%^%&*((*:*:','ne','ke']
[3994]1250       
1251        try:
1252            kens_dir = add_directories(root_dir, directories)
[4160]1253        except OSError:
1254            pass
1255        else:
1256            msg = 'bad dir name should give OSError'
1257            raise Exception(msg)   
[3994]1258           
1259        #clean up!
1260        os.rmdir(root_dir)
[4076]1261
1262    def test_check_list(self):
1263
1264        check_list(['stage','xmomentum'])
[4490]1265       
1266    def test_remove_lone_verts_d(self):
1267        verts = [[0,0],[1,0],[0,1]]
1268        tris = [[0,1,2]]
1269        new_verts, new_tris = remove_lone_verts(verts, tris)
1270        assert new_verts == verts
1271        assert new_tris == tris
1272     
1273
1274    def test_remove_lone_verts_e(self):
1275        verts = [[0,0],[1,0],[0,1],[99,99]]
1276        tris = [[0,1,2]]
1277        new_verts, new_tris = remove_lone_verts(verts, tris)
1278        assert new_verts == verts[0:3]
1279        assert new_tris == tris
1280       
1281    def test_remove_lone_verts_a(self):
1282        verts = [[99,99],[0,0],[1,0],[99,99],[0,1],[99,99]]
1283        tris = [[1,2,4]]
1284        new_verts, new_tris = remove_lone_verts(verts, tris)
1285        #print "new_verts", new_verts
1286        assert new_verts == [[0,0],[1,0],[0,1]]
1287        assert new_tris == [[0,1,2]]
1288     
1289    def test_remove_lone_verts_c(self):
1290        verts = [[0,0],[1,0],[99,99],[0,1]]
1291        tris = [[0,1,3]]
1292        new_verts, new_tris = remove_lone_verts(verts, tris)
1293        #print "new_verts", new_verts
1294        assert new_verts == [[0,0],[1,0],[0,1]]
1295        assert new_tris == [[0,1,2]]
1296       
1297    def test_remove_lone_verts_b(self):
1298        verts = [[0,0],[1,0],[0,1],[99,99],[99,99],[99,99]]
1299        tris = [[0,1,2]]
1300        new_verts, new_tris = remove_lone_verts(verts, tris)
1301        assert new_verts == verts[0:3]
1302        assert new_tris == tris
1303     
1304
1305    def test_remove_lone_verts_e(self):
1306        verts = [[0,0],[1,0],[0,1],[99,99]]
1307        tris = [[0,1,2]]
1308        new_verts, new_tris = remove_lone_verts(verts, tris)
1309        assert new_verts == verts[0:3]
1310        assert new_tris == tris
[4634]1311       
1312    def test_get_min_max_values(self):
1313       
1314        list=[8,9,6,1,4]
1315        min1, max1 = get_min_max_values(list)
1316       
1317        assert min1==1 
1318        assert max1==9
[4889]1319       
[4634]1320    def test_get_min_max_values1(self):
1321       
1322        list=[-8,-9,-6,-1,-4]
[4935]1323        min1, max1 = get_min_max_values(list)
[4634]1324       
1325#        print 'min1,max1',min1,max1
1326        assert min1==-9 
1327        assert max1==-1
1328
[4935]1329#    def test_get_min_max_values2(self):
1330#        '''
1331#        The min and max supplied are greater than the ones in the
1332#        list and therefore are the ones returned
1333#        '''
1334#        list=[-8,-9,-6,-1,-4]
1335#        min1, max1 = get_min_max_values(list,-10,10)
1336#       
1337##        print 'min1,max1',min1,max1
1338#        assert min1==-10
1339#        assert max1==10
[4634]1340       
[5136]1341    def test_make_plots_from_csv_files(self):
[4634]1342       
[5182]1343        #if sys.platform == 'win32':  #Windows
[5136]1344            try: 
1345                import pylab
1346            except ImportError:
1347                #ANUGA don't need pylab to work so the system doesn't
1348                #rely on pylab being installed
1349                return
1350           
[4647]1351       
1352            current_dir=getcwd()+sep+'abstract_2d_finite_volumes'
1353            temp_dir = tempfile.mkdtemp('','figures')
1354    #        print 'temp_dir',temp_dir
1355            fileName = temp_dir+sep+'time_series_3.csv'
1356            file = open(fileName,"w")
[5166]1357            file.write("time,stage,speed,momentum,elevation\n\
[4634]13581.0, 0, 0, 0, 10 \n\
13592.0, 5, 2, 4, 10 \n\
13603.0, 3, 3, 5, 10 \n")
[4647]1361            file.close()
1362   
1363            fileName1 = temp_dir+sep+'time_series_4.csv'
1364            file1 = open(fileName1,"w")
[5166]1365            file1.write("time,stage,speed,momentum,elevation\n\
[4634]13661.0, 0, 0, 0, 5 \n\
13672.0, -5, -2, -4, 5 \n\
13683.0, -4, -3, -5, 5 \n")
[4647]1369            file1.close()
1370   
1371            fileName2 = temp_dir+sep+'time_series_5.csv'
1372            file2 = open(fileName2,"w")
[5166]1373            file2.write("time,stage,speed,momentum,elevation\n\
[4634]13741.0, 0, 0, 0, 7 \n\
13752.0, 4, -0.45, 57, 7 \n\
13763.0, 6, -0.5, 56, 7 \n")
[4647]1377            file2.close()
1378           
1379            dir, name=os.path.split(fileName)
[5182]1380            csv2timeseries_graphs(directories_dic={dir:['gauge', 0, 0]},
1381                                  output_dir=temp_dir,
1382                                  base_name='time_series_',
1383                                  plot_numbers=['3-5'],
1384                                  quantities=['speed','stage','momentum'],
1385                                  assess_all_csv_files=True,
1386                                  extra_plot_name='test')
[4647]1387           
[5166]1388            #print dir+sep+name[:-4]+'_stage_test.png'
1389            assert(access(dir+sep+name[:-4]+'_stage_test.png',F_OK)==True)
1390            assert(access(dir+sep+name[:-4]+'_speed_test.png',F_OK)==True)
1391            assert(access(dir+sep+name[:-4]+'_momentum_test.png',F_OK)==True)
[4647]1392   
1393            dir1, name1=os.path.split(fileName1)
[5166]1394            assert(access(dir+sep+name1[:-4]+'_stage_test.png',F_OK)==True)
1395            assert(access(dir+sep+name1[:-4]+'_speed_test.png',F_OK)==True)
1396            assert(access(dir+sep+name1[:-4]+'_momentum_test.png',F_OK)==True)
[4647]1397   
1398   
1399            dir2, name2=os.path.split(fileName2)
[5166]1400            assert(access(dir+sep+name2[:-4]+'_stage_test.png',F_OK)==True)
1401            assert(access(dir+sep+name2[:-4]+'_speed_test.png',F_OK)==True)
1402            assert(access(dir+sep+name2[:-4]+'_momentum_test.png',F_OK)==True)
[4647]1403   
1404            del_dir(temp_dir)
[4634]1405       
1406
[4910]1407    def test_sww2csv_gauges(self):
[4876]1408
1409        def elevation_function(x, y):
1410            return -x
[4306]1411       
[5181]1412        """Most of this test was copied from test_interpolate
1413        test_interpole_sww2csv
[4876]1414       
1415        This is testing the gauge_sww2csv function, by creating a sww file and
1416        then exporting the gauges and checking the results.
1417        """
1418       
[5181]1419        # Create mesh
[4876]1420        mesh_file = tempfile.mktemp(".tsh")   
1421        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
1422        m = Mesh()
1423        m.add_vertices(points)
1424        m.auto_segment()
1425        m.generate_mesh(verbose=False)
1426        m.export_mesh_file(mesh_file)
1427       
[5181]1428        # Create shallow water domain
[4876]1429        domain = Domain(mesh_file)
1430        os.remove(mesh_file)
1431       
1432        domain.default_order=2
[5181]1433       
1434        # This test was made before tight_slope_limiters were introduced
1435        # Since were are testing interpolation values this is OK
1436        domain.tight_slope_limiters = 0 
1437       
[4876]1438
[5181]1439        # Set some field values
[4876]1440        domain.set_quantity('elevation', elevation_function)
1441        domain.set_quantity('friction', 0.03)
1442        domain.set_quantity('xmomentum', 3.0)
1443        domain.set_quantity('ymomentum', 4.0)
1444
1445        ######################
1446        # Boundary conditions
1447        B = Transmissive_boundary(domain)
1448        domain.set_boundary( {'exterior': B})
1449
1450        # This call mangles the stage values.
1451        domain.distribute_to_vertices_and_edges()
1452        domain.set_quantity('stage', 1.0)
1453
1454
1455        domain.set_name('datatest' + str(time.time()))
1456        domain.format = 'sww'
1457        domain.smooth = True
1458        domain.reduction = mean
1459
[5181]1460
[4876]1461        sww = get_dataobject(domain)
1462        sww.store_connectivity()
1463        sww.store_timestep(['stage', 'xmomentum', 'ymomentum','elevation'])
1464        domain.set_quantity('stage', 10.0) # This is automatically limited
1465        # so it will not be less than the elevation
1466        domain.time = 2.
1467        sww.store_timestep(['stage','elevation', 'xmomentum', 'ymomentum'])
1468
1469        # test the function
1470        points = [[5.0,1.],[0.5,2.]]
1471
1472        points_file = tempfile.mktemp(".csv")
1473#        points_file = 'test_point.csv'
1474        file_id = open(points_file,"w")
1475        file_id.write("name, easting, northing, elevation \n\
1476point1, 5.0, 1.0, 3.0\n\
1477point2, 0.5, 2.0, 9.0\n")
1478        file_id.close()
1479
1480       
[4910]1481        sww2csv_gauges(sww.filename, 
[5181]1482                       points_file,
1483                       verbose=False,
1484                       use_cache=False)
[4876]1485
[4935]1486#        point1_answers_array = [[0.0,1.0,-5.0,3.0,4.0], [2.0,10.0,-5.0,3.0,4.0]]
1487        point1_answers_array = [[0.0,1.0,6.0,-5.0,3.0,4.0], [2.0,10.0,15.0,-5.0,3.0,4.0]]
[4876]1488        point1_filename = 'gauge_point1.csv'
1489        point1_handle = file(point1_filename)
1490        point1_reader = reader(point1_handle)
1491        point1_reader.next()
1492
1493        line=[]
1494        for i,row in enumerate(point1_reader):
[4935]1495            #print 'i',i,'row',row
1496            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),float(row[4]),float(row[5])])
[4876]1497            #print 'assert line',line[i],'point1',point1_answers_array[i]
1498            assert allclose(line[i], point1_answers_array[i])
1499
[4935]1500        point2_answers_array = [[0.0,1.0,1.5,-0.5,3.0,4.0], [2.0,10.0,10.5,-0.5,3.0,4.0]]
[4876]1501        point2_filename = 'gauge_point2.csv' 
1502        point2_handle = file(point2_filename)
1503        point2_reader = reader(point2_handle)
1504        point2_reader.next()
1505                       
1506        line=[]
1507        for i,row in enumerate(point2_reader):
[4935]1508            #print 'i',i,'row',row
1509            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),float(row[4]),float(row[5])])
[4876]1510            #print 'assert line',line[i],'point1',point1_answers_array[i]
1511            assert allclose(line[i], point2_answers_array[i])
1512                         
1513        # clean up
1514        point1_handle.close()
1515        point2_handle.close()
1516        #print "sww.filename",sww.filename
1517        os.remove(sww.filename)
1518        os.remove(points_file)
1519        os.remove(point1_filename)
1520        os.remove(point2_filename)
1521
1522
1523
[4910]1524    def test_sww2csv_gauges1(self):
[4876]1525        from anuga.pmesh.mesh import Mesh
1526        from anuga.shallow_water import Domain, Transmissive_boundary
1527        from anuga.shallow_water.data_manager import get_dataobject
1528        from csv import reader,writer
1529        import time
1530        import string
1531
1532        def elevation_function(x, y):
1533            return -x
1534       
[5181]1535        """Most of this test was copied from test_interpolate
1536        test_interpole_sww2csv
[4876]1537       
1538        This is testing the gauge_sww2csv function, by creating a sww file and
1539        then exporting the gauges and checking the results.
1540       
1541        This tests the ablity not to have elevation in the points file and
1542        not store xmomentum and ymomentum
1543        """
1544       
[5181]1545        # Create mesh
[4876]1546        mesh_file = tempfile.mktemp(".tsh")   
1547        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
1548        m = Mesh()
1549        m.add_vertices(points)
1550        m.auto_segment()
1551        m.generate_mesh(verbose=False)
1552        m.export_mesh_file(mesh_file)
1553       
[5181]1554        # Create shallow water domain
[4876]1555        domain = Domain(mesh_file)
1556        os.remove(mesh_file)
1557       
1558        domain.default_order=2
1559
[5181]1560        # Set some field values
[4876]1561        domain.set_quantity('elevation', elevation_function)
1562        domain.set_quantity('friction', 0.03)
1563        domain.set_quantity('xmomentum', 3.0)
1564        domain.set_quantity('ymomentum', 4.0)
1565
1566        ######################
1567        # Boundary conditions
1568        B = Transmissive_boundary(domain)
1569        domain.set_boundary( {'exterior': B})
1570
1571        # This call mangles the stage values.
1572        domain.distribute_to_vertices_and_edges()
1573        domain.set_quantity('stage', 1.0)
1574
1575
1576        domain.set_name('datatest' + str(time.time()))
1577        domain.format = 'sww'
1578        domain.smooth = True
1579        domain.reduction = mean
1580
1581        sww = get_dataobject(domain)
1582        sww.store_connectivity()
1583        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
1584        domain.set_quantity('stage', 10.0) # This is automatically limited
1585        # so it will not be less than the elevation
1586        domain.time = 2.
1587        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
1588
1589        # test the function
1590        points = [[5.0,1.],[0.5,2.]]
1591
1592        points_file = tempfile.mktemp(".csv")
1593#        points_file = 'test_point.csv'
1594        file_id = open(points_file,"w")
[4918]1595        file_id.write("name,easting,northing \n\
[4876]1596point1, 5.0, 1.0\n\
1597point2, 0.5, 2.0\n")
1598        file_id.close()
1599
[4910]1600        sww2csv_gauges(sww.filename, 
[4876]1601                            points_file,
[4935]1602                            quantities=['stage', 'elevation'],
[4876]1603                            use_cache=False,
1604                            verbose=False)
1605
1606        point1_answers_array = [[0.0,1.0,-5.0], [2.0,10.0,-5.0]]
1607        point1_filename = 'gauge_point1.csv'
1608        point1_handle = file(point1_filename)
1609        point1_reader = reader(point1_handle)
1610        point1_reader.next()
1611
1612        line=[]
1613        for i,row in enumerate(point1_reader):
1614#            print 'i',i,'row',row
1615            line.append([float(row[0]),float(row[1]),float(row[2])])
1616            #print 'line',line[i],'point1',point1_answers_array[i]
1617            assert allclose(line[i], point1_answers_array[i])
1618
1619        point2_answers_array = [[0.0,1.0,-0.5], [2.0,10.0,-0.5]]
1620        point2_filename = 'gauge_point2.csv' 
1621        point2_handle = file(point2_filename)
1622        point2_reader = reader(point2_handle)
1623        point2_reader.next()
1624                       
1625        line=[]
1626        for i,row in enumerate(point2_reader):
1627#            print 'i',i,'row',row
1628            line.append([float(row[0]),float(row[1]),float(row[2])])
1629#            print 'line',line[i],'point1',point1_answers_array[i]
1630            assert allclose(line[i], point2_answers_array[i])
1631                         
1632        # clean up
1633        point1_handle.close()
1634        point2_handle.close()
1635        #print "sww.filename",sww.filename
1636        os.remove(sww.filename)
1637        os.remove(points_file)
1638        os.remove(point1_filename)
1639        os.remove(point2_filename)
[4889]1640
[4936]1641
1642    def test_sww2csv_gauges2(self):
1643
1644        def elevation_function(x, y):
1645            return -x
1646       
[5181]1647        """Most of this test was copied from test_interpolate
1648        test_interpole_sww2csv
[4936]1649       
1650        This is testing the gauge_sww2csv function, by creating a sww file and
1651        then exporting the gauges and checking the results.
1652       
[5181]1653        This is the same as sww2csv_gauges except set domain.set_starttime to 5.
1654        Therefore testing the storing of the absolute time in the csv files
[4936]1655        """
1656       
[5181]1657        # Create mesh
[4936]1658        mesh_file = tempfile.mktemp(".tsh")   
1659        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
1660        m = Mesh()
1661        m.add_vertices(points)
1662        m.auto_segment()
1663        m.generate_mesh(verbose=False)
1664        m.export_mesh_file(mesh_file)
1665       
[5181]1666        # Create shallow water domain
[4936]1667        domain = Domain(mesh_file)
1668        os.remove(mesh_file)
1669       
1670        domain.default_order=2
1671
[5181]1672        # This test was made before tight_slope_limiters were introduced
1673        # Since were are testing interpolation values this is OK
1674        domain.tight_slope_limiters = 0         
1675
1676        # Set some field values
[4936]1677        domain.set_quantity('elevation', elevation_function)
1678        domain.set_quantity('friction', 0.03)
1679        domain.set_quantity('xmomentum', 3.0)
1680        domain.set_quantity('ymomentum', 4.0)
1681        domain.set_starttime(5)
1682
1683        ######################
1684        # Boundary conditions
1685        B = Transmissive_boundary(domain)
1686        domain.set_boundary( {'exterior': B})
1687
1688        # This call mangles the stage values.
1689        domain.distribute_to_vertices_and_edges()
1690        domain.set_quantity('stage', 1.0)
1691       
1692
1693
1694        domain.set_name('datatest' + str(time.time()))
1695        domain.format = 'sww'
1696        domain.smooth = True
1697        domain.reduction = mean
1698
1699        sww = get_dataobject(domain)
1700        sww.store_connectivity()
1701        sww.store_timestep(['stage', 'xmomentum', 'ymomentum','elevation'])
1702        domain.set_quantity('stage', 10.0) # This is automatically limited
1703        # so it will not be less than the elevation
1704        domain.time = 2.
1705        sww.store_timestep(['stage','elevation', 'xmomentum', 'ymomentum'])
1706
1707        # test the function
1708        points = [[5.0,1.],[0.5,2.]]
1709
1710        points_file = tempfile.mktemp(".csv")
1711#        points_file = 'test_point.csv'
1712        file_id = open(points_file,"w")
1713        file_id.write("name, easting, northing, elevation \n\
1714point1, 5.0, 1.0, 3.0\n\
1715point2, 0.5, 2.0, 9.0\n")
1716        file_id.close()
1717
1718       
1719        sww2csv_gauges(sww.filename, 
1720                            points_file,
1721                            verbose=False,
1722                            use_cache=False)
1723
1724#        point1_answers_array = [[0.0,1.0,-5.0,3.0,4.0], [2.0,10.0,-5.0,3.0,4.0]]
1725        point1_answers_array = [[5.0,1.0,6.0,-5.0,3.0,4.0], [7.0,10.0,15.0,-5.0,3.0,4.0]]
1726        point1_filename = 'gauge_point1.csv'
1727        point1_handle = file(point1_filename)
1728        point1_reader = reader(point1_handle)
1729        point1_reader.next()
1730
1731        line=[]
1732        for i,row in enumerate(point1_reader):
1733            #print 'i',i,'row',row
1734            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),float(row[4]),float(row[5])])
1735            #print 'assert line',line[i],'point1',point1_answers_array[i]
1736            assert allclose(line[i], point1_answers_array[i])
1737
1738        point2_answers_array = [[5.0,1.0,1.5,-0.5,3.0,4.0], [7.0,10.0,10.5,-0.5,3.0,4.0]]
1739        point2_filename = 'gauge_point2.csv' 
1740        point2_handle = file(point2_filename)
1741        point2_reader = reader(point2_handle)
1742        point2_reader.next()
1743                       
1744        line=[]
1745        for i,row in enumerate(point2_reader):
1746            #print 'i',i,'row',row
1747            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),float(row[4]),float(row[5])])
1748            #print 'assert line',line[i],'point1',point1_answers_array[i]
1749            assert allclose(line[i], point2_answers_array[i])
1750                         
1751        # clean up
1752        point1_handle.close()
1753        point2_handle.close()
1754        #print "sww.filename",sww.filename
1755        os.remove(sww.filename)
1756        os.remove(points_file)
1757        os.remove(point1_filename)
1758        os.remove(point2_filename)
1759
1760
[4889]1761    def test_greens_law(self):
1762
1763        from math import sqrt
[4876]1764       
[4889]1765        d1 = 80.0
1766        d2 = 20.0
1767        h1 = 1.0
1768        h2 = greens_law(d1,d2,h1)
1769
1770        assert h2==sqrt(2.0)
[4984]1771       
1772    def test_calc_bearings(self):
1773 
1774        from math import atan, degrees
1775        #Test East
1776        uh = 1
1777        vh = 1.e-15
1778        angle = calc_bearing(uh, vh)
1779        if 89 < angle < 91: v=1
1780        assert v==1
1781        #Test West
1782        uh = -1
1783        vh = 1.e-15
1784        angle = calc_bearing(uh, vh)
1785        if 269 < angle < 271: v=1
1786        assert v==1
1787        #Test North
1788        uh = 1.e-15
1789        vh = 1
1790        angle = calc_bearing(uh, vh)
1791        if -1 < angle < 1: v=1
1792        assert v==1
1793        #Test South
1794        uh = 1.e-15
1795        vh = -1
1796        angle = calc_bearing(uh, vh)
1797        if 179 < angle < 181: v=1
1798        assert v==1
1799        #Test South-East
1800        uh = 1
1801        vh = -1
1802        angle = calc_bearing(uh, vh)
1803        if 134 < angle < 136: v=1
1804        assert v==1
1805        #Test North-East
1806        uh = 1
1807        vh = 1
1808        angle = calc_bearing(uh, vh)
1809        if 44 < angle < 46: v=1
1810        assert v==1
1811        #Test South-West
1812        uh = -1
1813        vh = -1
1814        angle = calc_bearing(uh, vh)
1815        if 224 < angle < 226: v=1
1816        assert v==1
1817        #Test North-West
1818        uh = -1
1819        vh = 1
1820        angle = calc_bearing(uh, vh)
1821        if 314 < angle < 316: v=1
1822        assert v==1
1823
1824 
1825
1826
[4889]1827       
1828
[4486]1829#-------------------------------------------------------------
1830if __name__ == "__main__":
[4635]1831    suite = unittest.makeSuite(Test_Util,'test')
[4935]1832#    suite = unittest.makeSuite(Test_Util,'test_sww2csv')
[4876]1833#    runner = unittest.TextTestRunner(verbosity=2)
1834    runner = unittest.TextTestRunner(verbosity=1)
[4486]1835    runner.run(suite)
[4076]1836
[4468]1837
[4306]1838
[4480]1839
Note: See TracBrowser for help on using the repository browser.