source: anuga_core/source/anuga/abstract_2d_finite_volumes/test_util.py @ 4170

Last change on this file since 4170 was 4170, checked in by ole, 18 years ago

Implemented stored revision information for distributions and allowed get_revision_number to get that information from either stored file or svn as per ticket:125

File size: 40.7 KB
Line 
1#!/usr/bin/env python
2
3
4import unittest
5from Numeric import zeros, array, allclose, Float
6from math import sqrt, pi
7import tempfile, os
8from os import access, F_OK,sep, removedirs
9
10from anuga.abstract_2d_finite_volumes.util import *
11from anuga.config import epsilon
12from anuga.shallow_water.data_manager import timefile2netcdf
13
14from anuga.utilities.numerical_tools import NAN
15
16from sys import platform
17
18def test_function(x, y):
19    return x+y
20
21class Test_Util(unittest.TestCase):
22    def setUp(self):
23        pass
24
25    def tearDown(self):
26        pass
27
28
29
30
31    #Geometric
32    #def test_distance(self):
33    #    from anuga.abstract_2d_finite_volumes.util import distance#
34    #
35    #    self.failUnless( distance([4,2],[7,6]) == 5.0,
36    #                     'Distance is wrong!')
37    #    self.failUnless( allclose(distance([7,6],[9,8]), 2.82842712475),
38    #                    'distance is wrong!')
39    #    self.failUnless( allclose(distance([9,8],[4,2]), 7.81024967591),
40    #                    'distance is wrong!')
41    #
42    #    self.failUnless( distance([9,8],[4,2]) == distance([4,2],[9,8]),
43    #                    'distance is wrong!')
44
45
46    def test_file_function_time1(self):
47        """Test that File function interpolates correctly
48        between given times. No x,y dependency here.
49        """
50
51        #Write file
52        import os, time
53        from anuga.config import time_format
54        from math import sin, pi
55
56        #Typical ASCII file
57        finaltime = 1200
58        filename = 'test_file_function'
59        fid = open(filename + '.txt', 'w')
60        start = time.mktime(time.strptime('2000', '%Y'))
61        dt = 60  #One minute intervals
62        t = 0.0
63        while t <= finaltime:
64            t_string = time.strftime(time_format, time.gmtime(t+start))
65            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
66            t += dt
67
68        fid.close()
69
70        #Convert ASCII file to NetCDF (Which is what we really like!)
71        timefile2netcdf(filename)
72
73
74        #Create file function from time series
75        F = file_function(filename + '.tms',
76                          quantities = ['Attribute0',
77                                        'Attribute1',
78                                        'Attribute2'])
79       
80        #Now try interpolation
81        for i in range(20):
82            t = i*10
83            q = F(t)
84
85            #Exact linear intpolation
86            assert allclose(q[0], 2*t)
87            if i%6 == 0:
88                assert allclose(q[1], t**2)
89                assert allclose(q[2], sin(t*pi/600))
90
91        #Check non-exact
92
93        t = 90 #Halfway between 60 and 120
94        q = F(t)
95        assert allclose( (120**2 + 60**2)/2, q[1] )
96        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
97
98
99        t = 100 #Two thirds of the way between between 60 and 120
100        q = F(t)
101        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
102        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
103
104        os.remove(filename + '.txt')
105        os.remove(filename + '.tms')       
106
107
108       
109    def test_spatio_temporal_file_function_basic(self):
110        """Test that spatio temporal file function performs the correct
111        interpolations in both time and space
112        NetCDF version (x,y,t dependency)       
113        """
114        import time
115
116        #Create sww file of simple propagation from left to right
117        #through rectangular domain
118        from shallow_water import Domain, Dirichlet_boundary
119        from mesh_factory import rectangular
120        from Numeric import take, concatenate, reshape
121
122        #Create basic mesh and shallow water domain
123        points, vertices, boundary = rectangular(3, 3)
124        domain1 = Domain(points, vertices, boundary)
125
126        from anuga.utilities.numerical_tools import mean
127        domain1.reduction = mean
128        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
129                              # only one value.
130
131        domain1.default_order = 2
132        domain1.store = True
133        domain1.set_datadir('.')
134        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
135        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
136
137        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
138        domain1.set_quantity('elevation', 0)
139        domain1.set_quantity('friction', 0)
140        domain1.set_quantity('stage', 0)
141
142        # Boundary conditions
143        B0 = Dirichlet_boundary([0,0,0])
144        B6 = Dirichlet_boundary([0.6,0,0])
145        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
146        domain1.check_integrity()
147
148        finaltime = 8
149        #Evolution
150        t0 = -1
151        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
152            #print 'Timesteps: %.16f, %.16f' %(t0, t)
153            #if t == t0:
154            #    msg = 'Duplicate timestep found: %f, %f' %(t0, t)
155            #   raise msg
156            t0 = t
157             
158            #domain1.write_time()
159
160
161        #Now read data from sww and check
162        from Scientific.IO.NetCDF import NetCDFFile
163        filename = domain1.get_name() + '.' + domain1.format
164        fid = NetCDFFile(filename)
165
166        x = fid.variables['x'][:]
167        y = fid.variables['y'][:]
168        stage = fid.variables['stage'][:]
169        xmomentum = fid.variables['xmomentum'][:]
170        ymomentum = fid.variables['ymomentum'][:]
171        time = fid.variables['time'][:]
172
173        #Take stage vertex values at last timestep on diagonal
174        #Diagonal is identified by vertices: 0, 5, 10, 15
175
176        last_time_index = len(time)-1 #Last last_time_index
177        d_stage = reshape(take(stage[last_time_index, :], [0,5,10,15]), (4,1))
178        d_uh = reshape(take(xmomentum[last_time_index, :], [0,5,10,15]), (4,1))
179        d_vh = reshape(take(ymomentum[last_time_index, :], [0,5,10,15]), (4,1))
180        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
181
182        #Reference interpolated values at midpoints on diagonal at
183        #this timestep are
184        r0 = (D[0] + D[1])/2
185        r1 = (D[1] + D[2])/2
186        r2 = (D[2] + D[3])/2
187
188        #And the midpoints are found now
189        Dx = take(reshape(x, (16,1)), [0,5,10,15])
190        Dy = take(reshape(y, (16,1)), [0,5,10,15])
191
192        diag = concatenate( (Dx, Dy), axis=1)
193        d_midpoints = (diag[1:] + diag[:-1])/2
194
195        #Let us see if the file function can find the correct
196        #values at the midpoints at the last timestep:
197        f = file_function(filename, domain1,
198                          interpolation_points = d_midpoints)
199
200        T = f.get_time()
201        msg = 'duplicate timesteps: %.16f and %.16f' %(T[-1], T[-2])
202        assert not T[-1] == T[-2], msg
203        t = time[last_time_index]
204        q = f(t, point_id=0); assert allclose(r0, q)
205        q = f(t, point_id=1); assert allclose(r1, q)
206        q = f(t, point_id=2); assert allclose(r2, q)
207
208
209        ##################
210        #Now do the same for the first timestep
211
212        timestep = 0 #First timestep
213        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
214        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
215        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
216        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
217
218        #Reference interpolated values at midpoints on diagonal at
219        #this timestep are
220        r0 = (D[0] + D[1])/2
221        r1 = (D[1] + D[2])/2
222        r2 = (D[2] + D[3])/2
223
224        #Let us see if the file function can find the correct
225        #values
226        q = f(0, point_id=0); assert allclose(r0, q)
227        q = f(0, point_id=1); assert allclose(r1, q)
228        q = f(0, point_id=2); assert allclose(r2, q)
229
230
231        ##################
232        #Now do it again for a timestep in the middle
233
234        timestep = 33
235        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
236        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
237        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
238        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
239
240        #Reference interpolated values at midpoints on diagonal at
241        #this timestep are
242        r0 = (D[0] + D[1])/2
243        r1 = (D[1] + D[2])/2
244        r2 = (D[2] + D[3])/2
245
246        q = f(timestep/10., point_id=0); assert allclose(r0, q)
247        q = f(timestep/10., point_id=1); assert allclose(r1, q)
248        q = f(timestep/10., point_id=2); assert allclose(r2, q)
249
250
251        ##################
252        #Now check temporal interpolation
253        #Halfway between timestep 15 and 16
254
255        timestep = 15
256        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
257        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
258        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
259        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
260
261        #Reference interpolated values at midpoints on diagonal at
262        #this timestep are
263        r0_0 = (D[0] + D[1])/2
264        r1_0 = (D[1] + D[2])/2
265        r2_0 = (D[2] + D[3])/2
266
267        #
268        timestep = 16
269        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
270        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
271        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
272        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
273
274        #Reference interpolated values at midpoints on diagonal at
275        #this timestep are
276        r0_1 = (D[0] + D[1])/2
277        r1_1 = (D[1] + D[2])/2
278        r2_1 = (D[2] + D[3])/2
279
280        # The reference values are
281        r0 = (r0_0 + r0_1)/2
282        r1 = (r1_0 + r1_1)/2
283        r2 = (r2_0 + r2_1)/2
284
285        q = f((timestep - 0.5)/10., point_id=0); assert allclose(r0, q)
286        q = f((timestep - 0.5)/10., point_id=1); assert allclose(r1, q)
287        q = f((timestep - 0.5)/10., point_id=2); assert allclose(r2, q)
288
289        ##################
290        #Finally check interpolation 2 thirds of the way
291        #between timestep 15 and 16
292
293        # The reference values are
294        r0 = (r0_0 + 2*r0_1)/3
295        r1 = (r1_0 + 2*r1_1)/3
296        r2 = (r2_0 + 2*r2_1)/3
297
298        #And the file function gives
299        q = f((timestep - 1.0/3)/10., point_id=0); assert allclose(r0, q)
300        q = f((timestep - 1.0/3)/10., point_id=1); assert allclose(r1, q)
301        q = f((timestep - 1.0/3)/10., point_id=2); assert allclose(r2, q)
302
303        fid.close()
304        import os
305        os.remove(filename)
306
307
308
309    def test_spatio_temporal_file_function_different_origin(self):
310        """Test that spatio temporal file function performs the correct
311        interpolations in both time and space where space is offset by
312        xllcorner and yllcorner
313        NetCDF version (x,y,t dependency)       
314        """
315        import time
316
317        #Create sww file of simple propagation from left to right
318        #through rectangular domain
319        from shallow_water import Domain, Dirichlet_boundary
320        from mesh_factory import rectangular
321        from Numeric import take, concatenate, reshape
322
323
324        from anuga.coordinate_transforms.geo_reference import Geo_reference
325        xllcorner = 2048
326        yllcorner = 11000
327        zone = 2
328
329        #Create basic mesh and shallow water domain
330        points, vertices, boundary = rectangular(3, 3)
331        domain1 = Domain(points, vertices, boundary,
332                         geo_reference = Geo_reference(xllcorner = xllcorner,
333                                                       yllcorner = yllcorner))
334       
335
336        from anuga.utilities.numerical_tools import mean       
337        domain1.reduction = mean
338        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
339                              # only one value.
340
341        domain1.default_order = 2
342        domain1.store = True
343        domain1.set_datadir('.')
344        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
345        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
346
347        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
348        domain1.set_quantity('elevation', 0)
349        domain1.set_quantity('friction', 0)
350        domain1.set_quantity('stage', 0)
351
352        # Boundary conditions
353        B0 = Dirichlet_boundary([0,0,0])
354        B6 = Dirichlet_boundary([0.6,0,0])
355        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
356        domain1.check_integrity()
357
358        finaltime = 8
359        #Evolution
360        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
361            pass
362            #domain1.write_time()
363
364
365        #Now read data from sww and check
366        from Scientific.IO.NetCDF import NetCDFFile
367        filename = domain1.get_name() + '.' + domain1.format
368        fid = NetCDFFile(filename)
369
370        x = fid.variables['x'][:]
371        y = fid.variables['y'][:]
372        stage = fid.variables['stage'][:]
373        xmomentum = fid.variables['xmomentum'][:]
374        ymomentum = fid.variables['ymomentum'][:]
375        time = fid.variables['time'][:]
376
377        #Take stage vertex values at last timestep on diagonal
378        #Diagonal is identified by vertices: 0, 5, 10, 15
379
380        last_time_index = len(time)-1 #Last last_time_index     
381        d_stage = reshape(take(stage[last_time_index, :], [0,5,10,15]), (4,1))
382        d_uh = reshape(take(xmomentum[last_time_index, :], [0,5,10,15]), (4,1))
383        d_vh = reshape(take(ymomentum[last_time_index, :], [0,5,10,15]), (4,1))
384        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
385
386        #Reference interpolated values at midpoints on diagonal at
387        #this timestep are
388        r0 = (D[0] + D[1])/2
389        r1 = (D[1] + D[2])/2
390        r2 = (D[2] + D[3])/2
391
392        #And the midpoints are found now
393        Dx = take(reshape(x, (16,1)), [0,5,10,15])
394        Dy = take(reshape(y, (16,1)), [0,5,10,15])
395
396        diag = concatenate( (Dx, Dy), axis=1)
397        d_midpoints = (diag[1:] + diag[:-1])/2
398
399
400        #Adjust for georef - make interpolation points absolute
401        d_midpoints[:,0] += xllcorner
402        d_midpoints[:,1] += yllcorner               
403
404        #Let us see if the file function can find the correct
405        #values at the midpoints at the last timestep:
406        f = file_function(filename, domain1,
407                          interpolation_points = d_midpoints)
408
409        t = time[last_time_index]                         
410        q = f(t, point_id=0); assert allclose(r0, q)
411        q = f(t, point_id=1); assert allclose(r1, q)
412        q = f(t, point_id=2); assert allclose(r2, q)
413
414
415        ##################
416        #Now do the same for the first timestep
417
418        timestep = 0 #First timestep
419        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
420        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
421        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
422        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
423
424        #Reference interpolated values at midpoints on diagonal at
425        #this timestep are
426        r0 = (D[0] + D[1])/2
427        r1 = (D[1] + D[2])/2
428        r2 = (D[2] + D[3])/2
429
430        #Let us see if the file function can find the correct
431        #values
432        q = f(0, point_id=0); assert allclose(r0, q)
433        q = f(0, point_id=1); assert allclose(r1, q)
434        q = f(0, point_id=2); assert allclose(r2, q)
435
436
437        ##################
438        #Now do it again for a timestep in the middle
439
440        timestep = 33
441        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
442        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
443        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
444        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
445
446        #Reference interpolated values at midpoints on diagonal at
447        #this timestep are
448        r0 = (D[0] + D[1])/2
449        r1 = (D[1] + D[2])/2
450        r2 = (D[2] + D[3])/2
451
452        q = f(timestep/10., point_id=0); assert allclose(r0, q)
453        q = f(timestep/10., point_id=1); assert allclose(r1, q)
454        q = f(timestep/10., point_id=2); assert allclose(r2, q)
455
456
457        ##################
458        #Now check temporal interpolation
459        #Halfway between timestep 15 and 16
460
461        timestep = 15
462        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
463        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
464        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
465        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
466
467        #Reference interpolated values at midpoints on diagonal at
468        #this timestep are
469        r0_0 = (D[0] + D[1])/2
470        r1_0 = (D[1] + D[2])/2
471        r2_0 = (D[2] + D[3])/2
472
473        #
474        timestep = 16
475        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
476        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
477        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
478        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
479
480        #Reference interpolated values at midpoints on diagonal at
481        #this timestep are
482        r0_1 = (D[0] + D[1])/2
483        r1_1 = (D[1] + D[2])/2
484        r2_1 = (D[2] + D[3])/2
485
486        # The reference values are
487        r0 = (r0_0 + r0_1)/2
488        r1 = (r1_0 + r1_1)/2
489        r2 = (r2_0 + r2_1)/2
490
491        q = f((timestep - 0.5)/10., point_id=0); assert allclose(r0, q)
492        q = f((timestep - 0.5)/10., point_id=1); assert allclose(r1, q)
493        q = f((timestep - 0.5)/10., point_id=2); assert allclose(r2, q)
494
495        ##################
496        #Finally check interpolation 2 thirds of the way
497        #between timestep 15 and 16
498
499        # The reference values are
500        r0 = (r0_0 + 2*r0_1)/3
501        r1 = (r1_0 + 2*r1_1)/3
502        r2 = (r2_0 + 2*r2_1)/3
503
504        #And the file function gives
505        q = f((timestep - 1.0/3)/10., point_id=0); assert allclose(r0, q)
506        q = f((timestep - 1.0/3)/10., point_id=1); assert allclose(r1, q)
507        q = f((timestep - 1.0/3)/10., point_id=2); assert allclose(r2, q)
508
509        fid.close()
510        import os
511        os.remove(filename)
512
513       
514
515
516    def qtest_spatio_temporal_file_function_time(self):
517        """Test that File function interpolates correctly
518        between given times.
519        NetCDF version (x,y,t dependency)
520        """
521
522        #Create NetCDF (sww) file to be read
523        # x: 0, 5, 10, 15
524        # y: -20, -10, 0, 10
525        # t: 0, 60, 120, ...., 1200
526        #
527        # test quantities (arbitrary but non-trivial expressions):
528        #
529        #   stage     = 3*x - y**2 + 2*t
530        #   xmomentum = exp( -((x-7)**2 + (y+5)**2)/20 ) * t**2
531        #   ymomentum = x**2 + y**2 * sin(t*pi/600)
532
533        #NOTE: Nice test that may render some of the others redundant.
534
535        import os, time
536        from anuga.config import time_format
537        from Numeric import sin, pi, exp
538        from mesh_factory import rectangular
539        from shallow_water import Domain
540        import anuga.shallow_water.data_manager
541
542        finaltime = 1200
543        filename = 'test_file_function'
544
545        #Create a domain to hold test grid
546        #(0:15, -20:10)
547        points, vertices, boundary =\
548                rectangular(4, 4, 15, 30, origin = (0, -20))
549        print "points", points
550
551        #print 'Number of elements', len(vertices)
552        domain = Domain(points, vertices, boundary)
553        domain.smooth = False
554        domain.default_order = 2
555        domain.set_datadir('.')
556        domain.set_name(filename)
557        domain.store = True
558        domain.format = 'sww'   #Native netcdf visualisation format
559
560        #print points
561        start = time.mktime(time.strptime('2000', '%Y'))
562        domain.starttime = start
563
564
565        #Store structure
566        domain.initialise_storage()
567
568        #Compute artificial time steps and store
569        dt = 60  #One minute intervals
570        t = 0.0
571        while t <= finaltime:
572            #Compute quantities
573            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
574            domain.set_quantity('stage', f1)
575
576            f2 = lambda x,y: x+y+t**2
577            domain.set_quantity('xmomentum', f2)
578
579            f3 = lambda x,y: x**2 + y**2 * sin(t*pi/600)
580            domain.set_quantity('ymomentum', f3)
581
582            #Store and advance time
583            domain.time = t
584            domain.store_timestep(domain.conserved_quantities)
585            t += dt
586
587
588        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5]]
589     
590        #Deliberately set domain.starttime to too early
591        domain.starttime = start - 1
592
593        #Create file function
594        F = file_function(filename + '.sww', domain,
595                          quantities = domain.conserved_quantities,
596                          interpolation_points = interpolation_points)
597
598        #Check that FF updates fixes domain starttime
599        assert allclose(domain.starttime, start)
600
601        #Check that domain.starttime isn't updated if later
602        domain.starttime = start + 1
603        F = file_function(filename + '.sww', domain,
604                          quantities = domain.conserved_quantities,
605                          interpolation_points = interpolation_points)
606        assert allclose(domain.starttime, start+1)
607        domain.starttime = start
608
609
610        #Check linear interpolation in time
611        F = file_function(filename + '.sww', domain,
612                          quantities = domain.conserved_quantities,
613                          interpolation_points = interpolation_points)               
614        for id in range(len(interpolation_points)):
615            x = interpolation_points[id][0]
616            y = interpolation_points[id][1]
617
618            for i in range(20):
619                t = i*10
620                k = i%6
621
622                if k == 0:
623                    q0 = F(t, point_id=id)
624                    q1 = F(t+60, point_id=id)
625
626                if q0 == NAN:
627                    actual = q0
628                else:
629                    actual = (k*q1 + (6-k)*q0)/6
630                q = F(t, point_id=id)
631                #print i, k, t, q
632                #print ' ', q0
633                #print ' ', q1
634                print "q",q
635                print "actual", actual
636                #print
637                if q0 == NAN:
638                     self.failUnless( q == actual, 'Fail!')
639                else:
640                    assert allclose(q, actual)
641
642
643        #Another check of linear interpolation in time
644        for id in range(len(interpolation_points)):
645            q60 = F(60, point_id=id)
646            q120 = F(120, point_id=id)
647
648            t = 90 #Halfway between 60 and 120
649            q = F(t, point_id=id)
650            assert allclose( (q120+q60)/2, q )
651
652            t = 100 #Two thirds of the way between between 60 and 120
653            q = F(t, point_id=id)
654            assert allclose(q60/3 + 2*q120/3, q)
655
656
657
658        #Check that domain.starttime isn't updated if later than file starttime but earlier
659        #than file end time
660        delta = 23
661        domain.starttime = start + delta
662        F = file_function(filename + '.sww', domain,
663                          quantities = domain.conserved_quantities,
664                          interpolation_points = interpolation_points)
665        assert allclose(domain.starttime, start+delta)
666
667
668
669
670        #Now try interpolation with delta offset
671        for id in range(len(interpolation_points)):           
672            x = interpolation_points[id][0]
673            y = interpolation_points[id][1]
674
675            for i in range(20):
676                t = i*10
677                k = i%6
678
679                if k == 0:
680                    q0 = F(t-delta, point_id=id)
681                    q1 = F(t+60-delta, point_id=id)
682
683                q = F(t-delta, point_id=id)
684                assert allclose(q, (k*q1 + (6-k)*q0)/6)
685
686
687        os.remove(filename + '.sww')
688
689
690
691    def xtest_spatio_temporal_file_function_time(self):
692        # Test that File function interpolates correctly
693        # When some points are outside the mesh
694
695        import os, time
696        from anuga.config import time_format
697        from Numeric import sin, pi, exp
698        from mesh_factory import rectangular
699        from shallow_water import Domain
700        import anuga.shallow_water.data_manager 
701        from anuga.pmesh.mesh_interface import create_mesh_from_regions
702        finaltime = 1200
703       
704        filename = tempfile.mktemp()
705        #print "filename",filename
706        filename = 'test_file_function'
707
708        meshfilename = tempfile.mktemp(".tsh")
709
710        boundary_tags = {'walls':[0,1],'bom':[2]}
711       
712        polygon_absolute = [[0,-20],[10,-20],[10,15],[-20,15]]
713       
714        create_mesh_from_regions(polygon_absolute,
715                                 boundary_tags,
716                                 10000000,
717                                 filename=meshfilename)
718        domain = Domain(mesh_filename=meshfilename)
719        domain.smooth = False
720        domain.default_order = 2
721        domain.set_datadir('.')
722        domain.set_name(filename)
723        domain.store = True
724        domain.format = 'sww'   #Native netcdf visualisation format
725
726        #print points
727        start = time.mktime(time.strptime('2000', '%Y'))
728        domain.starttime = start
729       
730
731        #Store structure
732        domain.initialise_storage()
733
734        #Compute artificial time steps and store
735        dt = 60  #One minute intervals
736        t = 0.0
737        while t <= finaltime:
738            #Compute quantities
739            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
740            domain.set_quantity('stage', f1)
741
742            f2 = lambda x,y: x+y+t**2
743            domain.set_quantity('xmomentum', f2)
744
745            f3 = lambda x,y: x**2 + y**2 * sin(t*pi/600)
746            domain.set_quantity('ymomentum', f3)
747
748            #Store and advance time
749            domain.time = t
750            domain.store_timestep(domain.conserved_quantities)
751            t += dt
752
753        interpolation_points = [[1,0]]
754        interpolation_points = [[100,1000]]
755       
756        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5],
757                                [78787,78787],[7878,3432]]
758           
759        #Deliberately set domain.starttime to too early
760        domain.starttime = start - 1
761
762        #Create file function
763        F = file_function(filename + '.sww', domain,
764                          quantities = domain.conserved_quantities,
765                          interpolation_points = interpolation_points)
766
767        #Check that FF updates fixes domain starttime
768        assert allclose(domain.starttime, start)
769
770        #Check that domain.starttime isn't updated if later
771        domain.starttime = start + 1
772        F = file_function(filename + '.sww', domain,
773                          quantities = domain.conserved_quantities,
774                          interpolation_points = interpolation_points)
775        assert allclose(domain.starttime, start+1)
776        domain.starttime = start
777
778
779        #Check linear interpolation in time
780        # checking points inside and outside the mesh
781        F = file_function(filename + '.sww', domain,
782                          quantities = domain.conserved_quantities,
783                          interpolation_points = interpolation_points)
784       
785        for id in range(len(interpolation_points)):
786            x = interpolation_points[id][0]
787            y = interpolation_points[id][1]
788
789            for i in range(20):
790                t = i*10
791                k = i%6
792
793                if k == 0:
794                    q0 = F(t, point_id=id)
795                    q1 = F(t+60, point_id=id)
796
797                if q0 == NAN:
798                    actual = q0
799                else:
800                    actual = (k*q1 + (6-k)*q0)/6
801                q = F(t, point_id=id)
802                #print i, k, t, q
803                #print ' ', q0
804                #print ' ', q1
805                #print "q",q
806                #print "actual", actual
807                #print
808                if q0 == NAN:
809                     self.failUnless( q == actual, 'Fail!')
810                else:
811                    assert allclose(q, actual)
812
813        # now lets check points inside the mesh
814        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14]] #, [10,-12.5]] - this point doesn't work WHY?
815        interpolation_points = [[10,-12.5]]
816           
817        print "len(interpolation_points)",len(interpolation_points) 
818        F = file_function(filename + '.sww', domain,
819                          quantities = domain.conserved_quantities,
820                          interpolation_points = interpolation_points)
821
822        domain.starttime = start
823
824
825        #Check linear interpolation in time
826        F = file_function(filename + '.sww', domain,
827                          quantities = domain.conserved_quantities,
828                          interpolation_points = interpolation_points)               
829        for id in range(len(interpolation_points)):
830            x = interpolation_points[id][0]
831            y = interpolation_points[id][1]
832
833            for i in range(20):
834                t = i*10
835                k = i%6
836
837                if k == 0:
838                    q0 = F(t, point_id=id)
839                    q1 = F(t+60, point_id=id)
840
841                if q0 == NAN:
842                    actual = q0
843                else:
844                    actual = (k*q1 + (6-k)*q0)/6
845                q = F(t, point_id=id)
846                print "############"
847                print "id, x, y ", id, x, y #k, t, q
848                print "t", t
849                #print ' ', q0
850                #print ' ', q1
851                print "q",q
852                print "actual", actual
853                #print
854                if q0 == NAN:
855                     self.failUnless( q == actual, 'Fail!')
856                else:
857                    assert allclose(q, actual)
858
859
860        #Another check of linear interpolation in time
861        for id in range(len(interpolation_points)):
862            q60 = F(60, point_id=id)
863            q120 = F(120, point_id=id)
864
865            t = 90 #Halfway between 60 and 120
866            q = F(t, point_id=id)
867            assert allclose( (q120+q60)/2, q )
868
869            t = 100 #Two thirds of the way between between 60 and 120
870            q = F(t, point_id=id)
871            assert allclose(q60/3 + 2*q120/3, q)
872
873
874
875        #Check that domain.starttime isn't updated if later than file starttime but earlier
876        #than file end time
877        delta = 23
878        domain.starttime = start + delta
879        F = file_function(filename + '.sww', domain,
880                          quantities = domain.conserved_quantities,
881                          interpolation_points = interpolation_points)
882        assert allclose(domain.starttime, start+delta)
883
884
885
886
887        #Now try interpolation with delta offset
888        for id in range(len(interpolation_points)):           
889            x = interpolation_points[id][0]
890            y = interpolation_points[id][1]
891
892            for i in range(20):
893                t = i*10
894                k = i%6
895
896                if k == 0:
897                    q0 = F(t-delta, point_id=id)
898                    q1 = F(t+60-delta, point_id=id)
899
900                q = F(t-delta, point_id=id)
901                assert allclose(q, (k*q1 + (6-k)*q0)/6)
902
903
904        os.remove(filename + '.sww')
905
906    def test_file_function_time_with_domain(self):
907        """Test that File function interpolates correctly
908        between given times. No x,y dependency here.
909        Use domain with starttime
910        """
911
912        #Write file
913        import os, time, calendar
914        from anuga.config import time_format
915        from math import sin, pi
916        from domain import Domain
917
918        finaltime = 1200
919        filename = 'test_file_function'
920        fid = open(filename + '.txt', 'w')
921        start = time.mktime(time.strptime('2000', '%Y'))
922        dt = 60  #One minute intervals
923        t = 0.0
924        while t <= finaltime:
925            t_string = time.strftime(time_format, time.gmtime(t+start))
926            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
927            t += dt
928
929        fid.close()
930
931
932        #Convert ASCII file to NetCDF (Which is what we really like!)
933        timefile2netcdf(filename)
934
935
936
937        a = [0.0, 0.0]
938        b = [4.0, 0.0]
939        c = [0.0, 3.0]
940
941        points = [a, b, c]
942        vertices = [[0,1,2]]
943        domain = Domain(points, vertices)
944
945        #Check that domain.starttime is updated if non-existing
946        F = file_function(filename + '.tms', domain)
947       
948        assert allclose(domain.starttime, start)
949
950        #Check that domain.starttime is updated if too early
951        domain.starttime = start - 1
952        F = file_function(filename + '.tms', domain)
953        assert allclose(domain.starttime, start)
954
955        #Check that domain.starttime isn't updated if later
956        domain.starttime = start + 1
957        F = file_function(filename + '.tms', domain)       
958        assert allclose(domain.starttime, start+1)
959
960        domain.starttime = start
961        F = file_function(filename + '.tms', domain,
962                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])
963       
964
965        #print F.T
966        #print F.precomputed_values
967        #print 'F(60)', F(60)
968       
969        #Now try interpolation
970        for i in range(20):
971            t = i*10
972            q = F(t)
973
974            #Exact linear intpolation
975            assert allclose(q[0], 2*t)
976            if i%6 == 0:
977                assert allclose(q[1], t**2)
978                assert allclose(q[2], sin(t*pi/600))
979
980        #Check non-exact
981
982        t = 90 #Halfway between 60 and 120
983        q = F(t)
984        assert allclose( (120**2 + 60**2)/2, q[1] )
985        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
986
987
988        t = 100 #Two thirds of the way between between 60 and 120
989        q = F(t)
990        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
991        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
992
993        os.remove(filename + '.tms')
994        os.remove(filename + '.txt')       
995
996    def test_file_function_time_with_domain_different_start(self):
997        """Test that File function interpolates correctly
998        between given times. No x,y dependency here.
999        Use domain with a starttime later than that of file
1000
1001        ASCII version
1002        """
1003
1004        #Write file
1005        import os, time, calendar
1006        from anuga.config import time_format
1007        from math import sin, pi
1008        from domain import Domain
1009
1010        finaltime = 1200
1011        filename = 'test_file_function'
1012        fid = open(filename + '.txt', 'w')
1013        start = time.mktime(time.strptime('2000', '%Y'))
1014        dt = 60  #One minute intervals
1015        t = 0.0
1016        while t <= finaltime:
1017            t_string = time.strftime(time_format, time.gmtime(t+start))
1018            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
1019            t += dt
1020
1021        fid.close()
1022
1023        #Convert ASCII file to NetCDF (Which is what we really like!)
1024        timefile2netcdf(filename)       
1025
1026        a = [0.0, 0.0]
1027        b = [4.0, 0.0]
1028        c = [0.0, 3.0]
1029
1030        points = [a, b, c]
1031        vertices = [[0,1,2]]
1032        domain = Domain(points, vertices)
1033
1034        #Check that domain.starttime isn't updated if later than file starttime but earlier
1035        #than file end time
1036        delta = 23
1037        domain.starttime = start + delta
1038        F = file_function(filename + '.tms', domain,
1039                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])       
1040        assert allclose(domain.starttime, start+delta)
1041
1042
1043
1044
1045        #Now try interpolation with delta offset
1046        for i in range(20):
1047            t = i*10
1048            q = F(t-delta)
1049
1050            #Exact linear intpolation
1051            assert allclose(q[0], 2*t)
1052            if i%6 == 0:
1053                assert allclose(q[1], t**2)
1054                assert allclose(q[2], sin(t*pi/600))
1055
1056        #Check non-exact
1057
1058        t = 90 #Halfway between 60 and 120
1059        q = F(t-delta)
1060        assert allclose( (120**2 + 60**2)/2, q[1] )
1061        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
1062
1063
1064        t = 100 #Two thirds of the way between between 60 and 120
1065        q = F(t-delta)
1066        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
1067        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
1068
1069
1070        os.remove(filename + '.tms')
1071        os.remove(filename + '.txt')               
1072
1073
1074
1075    def test_apply_expression_to_dictionary(self):
1076
1077        #FIXME: Division is not expected to work for integers.
1078        #This must be caught.
1079        foo = array([[1,2,3],
1080                     [4,5,6]], Float)
1081
1082        bar = array([[-1,0,5],
1083                     [6,1,1]], Float)                 
1084
1085        D = {'X': foo, 'Y': bar}
1086
1087        Z = apply_expression_to_dictionary('X+Y', D)       
1088        assert allclose(Z, foo+bar)
1089
1090        Z = apply_expression_to_dictionary('X*Y', D)       
1091        assert allclose(Z, foo*bar)       
1092
1093        Z = apply_expression_to_dictionary('4*X+Y', D)       
1094        assert allclose(Z, 4*foo+bar)       
1095
1096        # test zero division is OK
1097        Z = apply_expression_to_dictionary('X/Y', D)
1098        assert allclose(1/Z, 1/(foo/bar)) # can't compare inf to inf
1099
1100        # make an error for zero on zero
1101        # this is really an error in Numeric, SciPy core can handle it
1102        # Z = apply_expression_to_dictionary('0/Y', D)
1103
1104        #Check exceptions
1105        try:
1106            #Wrong name
1107            Z = apply_expression_to_dictionary('4*X+A', D)       
1108        except NameError:
1109            pass
1110        else:
1111            msg = 'Should have raised a NameError Exception'
1112            raise msg
1113
1114
1115        try:
1116            #Wrong order
1117            Z = apply_expression_to_dictionary(D, '4*X+A')       
1118        except AssertionError:
1119            pass
1120        else:
1121            msg = 'Should have raised a AssertionError Exception'
1122            raise msg       
1123       
1124
1125    def test_multiple_replace(self):
1126        """Hard test that checks a true word-by-word simultaneous replace
1127        """
1128       
1129        D = {'x': 'xi', 'y': 'eta', 'xi':'lam'}
1130        exp = '3*x+y + xi'
1131       
1132        new = multiple_replace(exp, D)
1133       
1134        assert new == '3*xi+eta + lam'
1135                         
1136
1137
1138    def test_point_on_line_obsolete(self):
1139        """Test that obsolete call issues appropriate warning"""
1140
1141        #Turn warning into an exception
1142        import warnings
1143        warnings.filterwarnings('error')
1144
1145        try:
1146            assert point_on_line( 0, 0.5, 0,1, 0,0 )
1147        except DeprecationWarning:
1148            pass
1149        else:
1150            msg = 'point_on_line should have issued a DeprecationWarning'
1151            raise Exception(msg)   
1152
1153        warnings.resetwarnings()
1154   
1155    def test_get_revision_number(self):
1156        """test_get_revision_number(self):
1157
1158        Test that revision number can be retrieved.
1159        """
1160       
1161        n = get_revision_number()
1162        assert n>=0
1163
1164
1165       
1166    def test_add_directories(self):
1167       
1168        import tempfile
1169        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
1170        directories = ['ja','ne','ke']
1171        kens_dir = add_directories(root_dir, directories)
1172        assert kens_dir == root_dir + sep + 'ja' + sep + 'ne' + \
1173               sep + 'ke'
1174        assert access(root_dir,F_OK)
1175
1176        add_directories(root_dir, directories)
1177        assert access(root_dir,F_OK)
1178       
1179        #clean up!
1180        os.rmdir(kens_dir)
1181        os.rmdir(root_dir + sep + 'ja' + sep + 'ne')
1182        os.rmdir(root_dir + sep + 'ja')
1183        os.rmdir(root_dir)
1184
1185    def test_add_directories_bad(self):
1186       
1187        import tempfile
1188        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
1189        directories = ['/\/!@#@#$%^%&*((*:*:','ne','ke']
1190       
1191        try:
1192            kens_dir = add_directories(root_dir, directories)
1193        except OSError:
1194            pass
1195        else:
1196            msg = 'bad dir name should give OSError'
1197            raise Exception(msg)   
1198           
1199        #clean up!
1200        os.rmdir(root_dir)
1201
1202    def test_check_list(self):
1203
1204        check_list(['stage','xmomentum'])
1205
1206       
1207    def test_add_directories(self):
1208       
1209        import tempfile
1210        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
1211        directories = ['ja','ne','ke']
1212        kens_dir = add_directories(root_dir, directories)
1213        assert kens_dir == root_dir + sep + 'ja' + sep + 'ne' + \
1214               sep + 'ke'
1215        assert access(root_dir,F_OK)
1216
1217        add_directories(root_dir, directories)
1218        assert access(root_dir,F_OK)
1219       
1220        #clean up!
1221        os.rmdir(kens_dir)
1222        os.rmdir(root_dir + sep + 'ja' + sep + 'ne')
1223        os.rmdir(root_dir + sep + 'ja')
1224        os.rmdir(root_dir)
1225
1226    def test_add_directories_bad(self):
1227       
1228        import tempfile
1229        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
1230        directories = ['/\/!@#@#$%^%&*((*:*:','ne','ke']
1231       
1232        try:
1233            kens_dir = add_directories(root_dir, directories)
1234        except OSError:
1235            pass
1236        else:
1237            msg = 'bad dir name should give OSError'
1238            raise Exception(msg)   
1239           
1240        #clean up!
1241        os.rmdir(root_dir)
1242
1243    def test_check_list(self):
1244
1245        check_list(['stage','xmomentum'])
1246
1247       
1248#-------------------------------------------------------------
1249if __name__ == "__main__":
1250    suite = unittest.makeSuite(Test_Util,'test')
1251    #suite = unittest.makeSuite(Test_Util,'test_add_directories')
1252    runner = unittest.TextTestRunner()
1253    runner.run(suite)
1254
1255
1256
1257
Note: See TracBrowser for help on using the repository browser.