source: anuga_core/source/anuga/abstract_2d_finite_volumes/test_util.py @ 3983

Last change on this file since 3983 was 3983, checked in by duncan, 17 years ago

new function. Create a directory structure. Handy for writing data to.

File size: 38.6 KB
Line 
1#!/usr/bin/env python
2
3
4import unittest
5from Numeric import zeros, array, allclose, Float
6from math import sqrt, pi
7import tempfile, os
8from os import access, F_OK,sep, removedirs
9
10from anuga.abstract_2d_finite_volumes.util import *
11from anuga.config import epsilon
12from anuga.shallow_water.data_manager import timefile2netcdf
13
14from anuga.utilities.numerical_tools import NAN
15
16def test_function(x, y):
17    return x+y
18
19class Test_Util(unittest.TestCase):
20    def setUp(self):
21        pass
22
23    def tearDown(self):
24        pass
25
26
27
28
29    #Geometric
30    #def test_distance(self):
31    #    from anuga.abstract_2d_finite_volumes.util import distance#
32    #
33    #    self.failUnless( distance([4,2],[7,6]) == 5.0,
34    #                     'Distance is wrong!')
35    #    self.failUnless( allclose(distance([7,6],[9,8]), 2.82842712475),
36    #                    'distance is wrong!')
37    #    self.failUnless( allclose(distance([9,8],[4,2]), 7.81024967591),
38    #                    'distance is wrong!')
39    #
40    #    self.failUnless( distance([9,8],[4,2]) == distance([4,2],[9,8]),
41    #                    'distance is wrong!')
42
43
44    def test_file_function_time1(self):
45        """Test that File function interpolates correctly
46        between given times. No x,y dependency here.
47        """
48
49        #Write file
50        import os, time
51        from anuga.config import time_format
52        from math import sin, pi
53
54        #Typical ASCII file
55        finaltime = 1200
56        filename = 'test_file_function'
57        fid = open(filename + '.txt', 'w')
58        start = time.mktime(time.strptime('2000', '%Y'))
59        dt = 60  #One minute intervals
60        t = 0.0
61        while t <= finaltime:
62            t_string = time.strftime(time_format, time.gmtime(t+start))
63            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
64            t += dt
65
66        fid.close()
67
68        #Convert ASCII file to NetCDF (Which is what we really like!)
69        timefile2netcdf(filename)
70
71
72        #Create file function from time series
73        F = file_function(filename + '.tms',
74                          quantities = ['Attribute0',
75                                        'Attribute1',
76                                        'Attribute2'])
77       
78        #Now try interpolation
79        for i in range(20):
80            t = i*10
81            q = F(t)
82
83            #Exact linear intpolation
84            assert allclose(q[0], 2*t)
85            if i%6 == 0:
86                assert allclose(q[1], t**2)
87                assert allclose(q[2], sin(t*pi/600))
88
89        #Check non-exact
90
91        t = 90 #Halfway between 60 and 120
92        q = F(t)
93        assert allclose( (120**2 + 60**2)/2, q[1] )
94        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
95
96
97        t = 100 #Two thirds of the way between between 60 and 120
98        q = F(t)
99        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
100        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
101
102        os.remove(filename + '.txt')
103        os.remove(filename + '.tms')       
104
105
106       
107    def test_spatio_temporal_file_function_basic(self):
108        """Test that spatio temporal file function performs the correct
109        interpolations in both time and space
110        NetCDF version (x,y,t dependency)       
111        """
112        import time
113
114        #Create sww file of simple propagation from left to right
115        #through rectangular domain
116        from shallow_water import Domain, Dirichlet_boundary
117        from mesh_factory import rectangular
118        from Numeric import take, concatenate, reshape
119
120        #Create basic mesh and shallow water domain
121        points, vertices, boundary = rectangular(3, 3)
122        domain1 = Domain(points, vertices, boundary)
123
124        from anuga.utilities.numerical_tools import mean
125        domain1.reduction = mean
126        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
127                              # only one value.
128
129        domain1.default_order = 2
130        domain1.store = True
131        domain1.set_datadir('.')
132        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
133        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
134
135        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
136        domain1.set_quantity('elevation', 0)
137        domain1.set_quantity('friction', 0)
138        domain1.set_quantity('stage', 0)
139
140        # Boundary conditions
141        B0 = Dirichlet_boundary([0,0,0])
142        B6 = Dirichlet_boundary([0.6,0,0])
143        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
144        domain1.check_integrity()
145
146        finaltime = 8
147        #Evolution
148        t0 = -1
149        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
150            #print 'Timesteps: %.16f, %.16f' %(t0, t)
151            #if t == t0:
152            #    msg = 'Duplicate timestep found: %f, %f' %(t0, t)
153            #   raise msg
154            t0 = t
155             
156            #domain1.write_time()
157
158
159        #Now read data from sww and check
160        from Scientific.IO.NetCDF import NetCDFFile
161        filename = domain1.get_name() + '.' + domain1.format
162        fid = NetCDFFile(filename)
163
164        x = fid.variables['x'][:]
165        y = fid.variables['y'][:]
166        stage = fid.variables['stage'][:]
167        xmomentum = fid.variables['xmomentum'][:]
168        ymomentum = fid.variables['ymomentum'][:]
169        time = fid.variables['time'][:]
170
171        #Take stage vertex values at last timestep on diagonal
172        #Diagonal is identified by vertices: 0, 5, 10, 15
173
174        last_time_index = len(time)-1 #Last last_time_index
175        d_stage = reshape(take(stage[last_time_index, :], [0,5,10,15]), (4,1))
176        d_uh = reshape(take(xmomentum[last_time_index, :], [0,5,10,15]), (4,1))
177        d_vh = reshape(take(ymomentum[last_time_index, :], [0,5,10,15]), (4,1))
178        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
179
180        #Reference interpolated values at midpoints on diagonal at
181        #this timestep are
182        r0 = (D[0] + D[1])/2
183        r1 = (D[1] + D[2])/2
184        r2 = (D[2] + D[3])/2
185
186        #And the midpoints are found now
187        Dx = take(reshape(x, (16,1)), [0,5,10,15])
188        Dy = take(reshape(y, (16,1)), [0,5,10,15])
189
190        diag = concatenate( (Dx, Dy), axis=1)
191        d_midpoints = (diag[1:] + diag[:-1])/2
192
193        #Let us see if the file function can find the correct
194        #values at the midpoints at the last timestep:
195        f = file_function(filename, domain1,
196                          interpolation_points = d_midpoints)
197
198        T = f.get_time()
199        msg = 'duplicate timesteps: %.16f and %.16f' %(T[-1], T[-2])
200        assert not T[-1] == T[-2], msg
201        t = time[last_time_index]
202        q = f(t, point_id=0); assert allclose(r0, q)
203        q = f(t, point_id=1); assert allclose(r1, q)
204        q = f(t, point_id=2); assert allclose(r2, q)
205
206
207        ##################
208        #Now do the same for the first timestep
209
210        timestep = 0 #First timestep
211        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
212        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
213        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
214        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
215
216        #Reference interpolated values at midpoints on diagonal at
217        #this timestep are
218        r0 = (D[0] + D[1])/2
219        r1 = (D[1] + D[2])/2
220        r2 = (D[2] + D[3])/2
221
222        #Let us see if the file function can find the correct
223        #values
224        q = f(0, point_id=0); assert allclose(r0, q)
225        q = f(0, point_id=1); assert allclose(r1, q)
226        q = f(0, point_id=2); assert allclose(r2, q)
227
228
229        ##################
230        #Now do it again for a timestep in the middle
231
232        timestep = 33
233        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
234        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
235        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
236        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
237
238        #Reference interpolated values at midpoints on diagonal at
239        #this timestep are
240        r0 = (D[0] + D[1])/2
241        r1 = (D[1] + D[2])/2
242        r2 = (D[2] + D[3])/2
243
244        q = f(timestep/10., point_id=0); assert allclose(r0, q)
245        q = f(timestep/10., point_id=1); assert allclose(r1, q)
246        q = f(timestep/10., point_id=2); assert allclose(r2, q)
247
248
249        ##################
250        #Now check temporal interpolation
251        #Halfway between timestep 15 and 16
252
253        timestep = 15
254        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
255        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
256        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
257        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
258
259        #Reference interpolated values at midpoints on diagonal at
260        #this timestep are
261        r0_0 = (D[0] + D[1])/2
262        r1_0 = (D[1] + D[2])/2
263        r2_0 = (D[2] + D[3])/2
264
265        #
266        timestep = 16
267        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
268        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
269        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
270        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
271
272        #Reference interpolated values at midpoints on diagonal at
273        #this timestep are
274        r0_1 = (D[0] + D[1])/2
275        r1_1 = (D[1] + D[2])/2
276        r2_1 = (D[2] + D[3])/2
277
278        # The reference values are
279        r0 = (r0_0 + r0_1)/2
280        r1 = (r1_0 + r1_1)/2
281        r2 = (r2_0 + r2_1)/2
282
283        q = f((timestep - 0.5)/10., point_id=0); assert allclose(r0, q)
284        q = f((timestep - 0.5)/10., point_id=1); assert allclose(r1, q)
285        q = f((timestep - 0.5)/10., point_id=2); assert allclose(r2, q)
286
287        ##################
288        #Finally check interpolation 2 thirds of the way
289        #between timestep 15 and 16
290
291        # The reference values are
292        r0 = (r0_0 + 2*r0_1)/3
293        r1 = (r1_0 + 2*r1_1)/3
294        r2 = (r2_0 + 2*r2_1)/3
295
296        #And the file function gives
297        q = f((timestep - 1.0/3)/10., point_id=0); assert allclose(r0, q)
298        q = f((timestep - 1.0/3)/10., point_id=1); assert allclose(r1, q)
299        q = f((timestep - 1.0/3)/10., point_id=2); assert allclose(r2, q)
300
301        fid.close()
302        import os
303        os.remove(filename)
304
305
306
307    def test_spatio_temporal_file_function_different_origin(self):
308        """Test that spatio temporal file function performs the correct
309        interpolations in both time and space where space is offset by
310        xllcorner and yllcorner
311        NetCDF version (x,y,t dependency)       
312        """
313        import time
314
315        #Create sww file of simple propagation from left to right
316        #through rectangular domain
317        from shallow_water import Domain, Dirichlet_boundary
318        from mesh_factory import rectangular
319        from Numeric import take, concatenate, reshape
320
321
322        from anuga.coordinate_transforms.geo_reference import Geo_reference
323        xllcorner = 2048
324        yllcorner = 11000
325        zone = 2
326
327        #Create basic mesh and shallow water domain
328        points, vertices, boundary = rectangular(3, 3)
329        domain1 = Domain(points, vertices, boundary,
330                         geo_reference = Geo_reference(xllcorner = xllcorner,
331                                                       yllcorner = yllcorner))
332       
333
334        from anuga.utilities.numerical_tools import mean       
335        domain1.reduction = mean
336        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
337                              # only one value.
338
339        domain1.default_order = 2
340        domain1.store = True
341        domain1.set_datadir('.')
342        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
343        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
344
345        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
346        domain1.set_quantity('elevation', 0)
347        domain1.set_quantity('friction', 0)
348        domain1.set_quantity('stage', 0)
349
350        # Boundary conditions
351        B0 = Dirichlet_boundary([0,0,0])
352        B6 = Dirichlet_boundary([0.6,0,0])
353        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
354        domain1.check_integrity()
355
356        finaltime = 8
357        #Evolution
358        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
359            pass
360            #domain1.write_time()
361
362
363        #Now read data from sww and check
364        from Scientific.IO.NetCDF import NetCDFFile
365        filename = domain1.get_name() + '.' + domain1.format
366        fid = NetCDFFile(filename)
367
368        x = fid.variables['x'][:]
369        y = fid.variables['y'][:]
370        stage = fid.variables['stage'][:]
371        xmomentum = fid.variables['xmomentum'][:]
372        ymomentum = fid.variables['ymomentum'][:]
373        time = fid.variables['time'][:]
374
375        #Take stage vertex values at last timestep on diagonal
376        #Diagonal is identified by vertices: 0, 5, 10, 15
377
378        last_time_index = len(time)-1 #Last last_time_index     
379        d_stage = reshape(take(stage[last_time_index, :], [0,5,10,15]), (4,1))
380        d_uh = reshape(take(xmomentum[last_time_index, :], [0,5,10,15]), (4,1))
381        d_vh = reshape(take(ymomentum[last_time_index, :], [0,5,10,15]), (4,1))
382        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
383
384        #Reference interpolated values at midpoints on diagonal at
385        #this timestep are
386        r0 = (D[0] + D[1])/2
387        r1 = (D[1] + D[2])/2
388        r2 = (D[2] + D[3])/2
389
390        #And the midpoints are found now
391        Dx = take(reshape(x, (16,1)), [0,5,10,15])
392        Dy = take(reshape(y, (16,1)), [0,5,10,15])
393
394        diag = concatenate( (Dx, Dy), axis=1)
395        d_midpoints = (diag[1:] + diag[:-1])/2
396
397
398        #Adjust for georef - make interpolation points absolute
399        d_midpoints[:,0] += xllcorner
400        d_midpoints[:,1] += yllcorner               
401
402        #Let us see if the file function can find the correct
403        #values at the midpoints at the last timestep:
404        f = file_function(filename, domain1,
405                          interpolation_points = d_midpoints)
406
407        t = time[last_time_index]                         
408        q = f(t, point_id=0); assert allclose(r0, q)
409        q = f(t, point_id=1); assert allclose(r1, q)
410        q = f(t, point_id=2); assert allclose(r2, q)
411
412
413        ##################
414        #Now do the same for the first timestep
415
416        timestep = 0 #First timestep
417        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
418        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
419        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
420        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
421
422        #Reference interpolated values at midpoints on diagonal at
423        #this timestep are
424        r0 = (D[0] + D[1])/2
425        r1 = (D[1] + D[2])/2
426        r2 = (D[2] + D[3])/2
427
428        #Let us see if the file function can find the correct
429        #values
430        q = f(0, point_id=0); assert allclose(r0, q)
431        q = f(0, point_id=1); assert allclose(r1, q)
432        q = f(0, point_id=2); assert allclose(r2, q)
433
434
435        ##################
436        #Now do it again for a timestep in the middle
437
438        timestep = 33
439        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
440        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
441        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
442        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
443
444        #Reference interpolated values at midpoints on diagonal at
445        #this timestep are
446        r0 = (D[0] + D[1])/2
447        r1 = (D[1] + D[2])/2
448        r2 = (D[2] + D[3])/2
449
450        q = f(timestep/10., point_id=0); assert allclose(r0, q)
451        q = f(timestep/10., point_id=1); assert allclose(r1, q)
452        q = f(timestep/10., point_id=2); assert allclose(r2, q)
453
454
455        ##################
456        #Now check temporal interpolation
457        #Halfway between timestep 15 and 16
458
459        timestep = 15
460        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
461        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
462        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
463        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
464
465        #Reference interpolated values at midpoints on diagonal at
466        #this timestep are
467        r0_0 = (D[0] + D[1])/2
468        r1_0 = (D[1] + D[2])/2
469        r2_0 = (D[2] + D[3])/2
470
471        #
472        timestep = 16
473        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
474        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
475        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
476        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
477
478        #Reference interpolated values at midpoints on diagonal at
479        #this timestep are
480        r0_1 = (D[0] + D[1])/2
481        r1_1 = (D[1] + D[2])/2
482        r2_1 = (D[2] + D[3])/2
483
484        # The reference values are
485        r0 = (r0_0 + r0_1)/2
486        r1 = (r1_0 + r1_1)/2
487        r2 = (r2_0 + r2_1)/2
488
489        q = f((timestep - 0.5)/10., point_id=0); assert allclose(r0, q)
490        q = f((timestep - 0.5)/10., point_id=1); assert allclose(r1, q)
491        q = f((timestep - 0.5)/10., point_id=2); assert allclose(r2, q)
492
493        ##################
494        #Finally check interpolation 2 thirds of the way
495        #between timestep 15 and 16
496
497        # The reference values are
498        r0 = (r0_0 + 2*r0_1)/3
499        r1 = (r1_0 + 2*r1_1)/3
500        r2 = (r2_0 + 2*r2_1)/3
501
502        #And the file function gives
503        q = f((timestep - 1.0/3)/10., point_id=0); assert allclose(r0, q)
504        q = f((timestep - 1.0/3)/10., point_id=1); assert allclose(r1, q)
505        q = f((timestep - 1.0/3)/10., point_id=2); assert allclose(r2, q)
506
507        fid.close()
508        import os
509        os.remove(filename)
510
511       
512
513
514    def qtest_spatio_temporal_file_function_time(self):
515        """Test that File function interpolates correctly
516        between given times.
517        NetCDF version (x,y,t dependency)
518        """
519
520        #Create NetCDF (sww) file to be read
521        # x: 0, 5, 10, 15
522        # y: -20, -10, 0, 10
523        # t: 0, 60, 120, ...., 1200
524        #
525        # test quantities (arbitrary but non-trivial expressions):
526        #
527        #   stage     = 3*x - y**2 + 2*t
528        #   xmomentum = exp( -((x-7)**2 + (y+5)**2)/20 ) * t**2
529        #   ymomentum = x**2 + y**2 * sin(t*pi/600)
530
531        #NOTE: Nice test that may render some of the others redundant.
532
533        import os, time
534        from anuga.config import time_format
535        from Numeric import sin, pi, exp
536        from mesh_factory import rectangular
537        from shallow_water import Domain
538        import anuga.shallow_water.data_manager
539
540        finaltime = 1200
541        filename = 'test_file_function'
542
543        #Create a domain to hold test grid
544        #(0:15, -20:10)
545        points, vertices, boundary =\
546                rectangular(4, 4, 15, 30, origin = (0, -20))
547        print "points", points
548
549        #print 'Number of elements', len(vertices)
550        domain = Domain(points, vertices, boundary)
551        domain.smooth = False
552        domain.default_order = 2
553        domain.set_datadir('.')
554        domain.set_name(filename)
555        domain.store = True
556        domain.format = 'sww'   #Native netcdf visualisation format
557
558        #print points
559        start = time.mktime(time.strptime('2000', '%Y'))
560        domain.starttime = start
561
562
563        #Store structure
564        domain.initialise_storage()
565
566        #Compute artificial time steps and store
567        dt = 60  #One minute intervals
568        t = 0.0
569        while t <= finaltime:
570            #Compute quantities
571            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
572            domain.set_quantity('stage', f1)
573
574            f2 = lambda x,y: x+y+t**2
575            domain.set_quantity('xmomentum', f2)
576
577            f3 = lambda x,y: x**2 + y**2 * sin(t*pi/600)
578            domain.set_quantity('ymomentum', f3)
579
580            #Store and advance time
581            domain.time = t
582            domain.store_timestep(domain.conserved_quantities)
583            t += dt
584
585
586        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5]]
587     
588        #Deliberately set domain.starttime to too early
589        domain.starttime = start - 1
590
591        #Create file function
592        F = file_function(filename + '.sww', domain,
593                          quantities = domain.conserved_quantities,
594                          interpolation_points = interpolation_points)
595
596        #Check that FF updates fixes domain starttime
597        assert allclose(domain.starttime, start)
598
599        #Check that domain.starttime isn't updated if later
600        domain.starttime = start + 1
601        F = file_function(filename + '.sww', domain,
602                          quantities = domain.conserved_quantities,
603                          interpolation_points = interpolation_points)
604        assert allclose(domain.starttime, start+1)
605        domain.starttime = start
606
607
608        #Check linear interpolation in time
609        F = file_function(filename + '.sww', domain,
610                          quantities = domain.conserved_quantities,
611                          interpolation_points = interpolation_points)               
612        for id in range(len(interpolation_points)):
613            x = interpolation_points[id][0]
614            y = interpolation_points[id][1]
615
616            for i in range(20):
617                t = i*10
618                k = i%6
619
620                if k == 0:
621                    q0 = F(t, point_id=id)
622                    q1 = F(t+60, point_id=id)
623
624                if q0 == NAN:
625                    actual = q0
626                else:
627                    actual = (k*q1 + (6-k)*q0)/6
628                q = F(t, point_id=id)
629                #print i, k, t, q
630                #print ' ', q0
631                #print ' ', q1
632                print "q",q
633                print "actual", actual
634                #print
635                if q0 == NAN:
636                     self.failUnless( q == actual, 'Fail!')
637                else:
638                    assert allclose(q, actual)
639
640
641        #Another check of linear interpolation in time
642        for id in range(len(interpolation_points)):
643            q60 = F(60, point_id=id)
644            q120 = F(120, point_id=id)
645
646            t = 90 #Halfway between 60 and 120
647            q = F(t, point_id=id)
648            assert allclose( (q120+q60)/2, q )
649
650            t = 100 #Two thirds of the way between between 60 and 120
651            q = F(t, point_id=id)
652            assert allclose(q60/3 + 2*q120/3, q)
653
654
655
656        #Check that domain.starttime isn't updated if later than file starttime but earlier
657        #than file end time
658        delta = 23
659        domain.starttime = start + delta
660        F = file_function(filename + '.sww', domain,
661                          quantities = domain.conserved_quantities,
662                          interpolation_points = interpolation_points)
663        assert allclose(domain.starttime, start+delta)
664
665
666
667
668        #Now try interpolation with delta offset
669        for id in range(len(interpolation_points)):           
670            x = interpolation_points[id][0]
671            y = interpolation_points[id][1]
672
673            for i in range(20):
674                t = i*10
675                k = i%6
676
677                if k == 0:
678                    q0 = F(t-delta, point_id=id)
679                    q1 = F(t+60-delta, point_id=id)
680
681                q = F(t-delta, point_id=id)
682                assert allclose(q, (k*q1 + (6-k)*q0)/6)
683
684
685        os.remove(filename + '.sww')
686
687
688
689    def xtest_spatio_temporal_file_function_time(self):
690        # Test that File function interpolates correctly
691        # When some points are outside the mesh
692
693        import os, time
694        from anuga.config import time_format
695        from Numeric import sin, pi, exp
696        from mesh_factory import rectangular
697        from shallow_water import Domain
698        import anuga.shallow_water.data_manager 
699        from anuga.pmesh.mesh_interface import create_mesh_from_regions
700        finaltime = 1200
701       
702        filename = tempfile.mktemp()
703        #print "filename",filename
704        filename = 'test_file_function'
705
706        meshfilename = tempfile.mktemp(".tsh")
707
708        boundary_tags = {'walls':[0,1],'bom':[2]}
709       
710        polygon_absolute = [[0,-20],[10,-20],[10,15],[-20,15]]
711       
712        create_mesh_from_regions(polygon_absolute,
713                                 boundary_tags,
714                                 10000000,
715                                 filename=meshfilename)
716        domain = Domain(mesh_filename=meshfilename)
717        domain.smooth = False
718        domain.default_order = 2
719        domain.set_datadir('.')
720        domain.set_name(filename)
721        domain.store = True
722        domain.format = 'sww'   #Native netcdf visualisation format
723
724        #print points
725        start = time.mktime(time.strptime('2000', '%Y'))
726        domain.starttime = start
727       
728
729        #Store structure
730        domain.initialise_storage()
731
732        #Compute artificial time steps and store
733        dt = 60  #One minute intervals
734        t = 0.0
735        while t <= finaltime:
736            #Compute quantities
737            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
738            domain.set_quantity('stage', f1)
739
740            f2 = lambda x,y: x+y+t**2
741            domain.set_quantity('xmomentum', f2)
742
743            f3 = lambda x,y: x**2 + y**2 * sin(t*pi/600)
744            domain.set_quantity('ymomentum', f3)
745
746            #Store and advance time
747            domain.time = t
748            domain.store_timestep(domain.conserved_quantities)
749            t += dt
750
751        interpolation_points = [[1,0]]
752        interpolation_points = [[100,1000]]
753       
754        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5],
755                                [78787,78787],[7878,3432]]
756           
757        #Deliberately set domain.starttime to too early
758        domain.starttime = start - 1
759
760        #Create file function
761        F = file_function(filename + '.sww', domain,
762                          quantities = domain.conserved_quantities,
763                          interpolation_points = interpolation_points)
764
765        #Check that FF updates fixes domain starttime
766        assert allclose(domain.starttime, start)
767
768        #Check that domain.starttime isn't updated if later
769        domain.starttime = start + 1
770        F = file_function(filename + '.sww', domain,
771                          quantities = domain.conserved_quantities,
772                          interpolation_points = interpolation_points)
773        assert allclose(domain.starttime, start+1)
774        domain.starttime = start
775
776
777        #Check linear interpolation in time
778        # checking points inside and outside the mesh
779        F = file_function(filename + '.sww', domain,
780                          quantities = domain.conserved_quantities,
781                          interpolation_points = interpolation_points)
782       
783        for id in range(len(interpolation_points)):
784            x = interpolation_points[id][0]
785            y = interpolation_points[id][1]
786
787            for i in range(20):
788                t = i*10
789                k = i%6
790
791                if k == 0:
792                    q0 = F(t, point_id=id)
793                    q1 = F(t+60, point_id=id)
794
795                if q0 == NAN:
796                    actual = q0
797                else:
798                    actual = (k*q1 + (6-k)*q0)/6
799                q = F(t, point_id=id)
800                #print i, k, t, q
801                #print ' ', q0
802                #print ' ', q1
803                #print "q",q
804                #print "actual", actual
805                #print
806                if q0 == NAN:
807                     self.failUnless( q == actual, 'Fail!')
808                else:
809                    assert allclose(q, actual)
810
811        # now lets check points inside the mesh
812        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14]] #, [10,-12.5]] - this point doesn't work WHY?
813        interpolation_points = [[10,-12.5]]
814           
815        print "len(interpolation_points)",len(interpolation_points) 
816        F = file_function(filename + '.sww', domain,
817                          quantities = domain.conserved_quantities,
818                          interpolation_points = interpolation_points)
819
820        domain.starttime = start
821
822
823        #Check linear interpolation in time
824        F = file_function(filename + '.sww', domain,
825                          quantities = domain.conserved_quantities,
826                          interpolation_points = interpolation_points)               
827        for id in range(len(interpolation_points)):
828            x = interpolation_points[id][0]
829            y = interpolation_points[id][1]
830
831            for i in range(20):
832                t = i*10
833                k = i%6
834
835                if k == 0:
836                    q0 = F(t, point_id=id)
837                    q1 = F(t+60, point_id=id)
838
839                if q0 == NAN:
840                    actual = q0
841                else:
842                    actual = (k*q1 + (6-k)*q0)/6
843                q = F(t, point_id=id)
844                print "############"
845                print "id, x, y ", id, x, y #k, t, q
846                print "t", t
847                #print ' ', q0
848                #print ' ', q1
849                print "q",q
850                print "actual", actual
851                #print
852                if q0 == NAN:
853                     self.failUnless( q == actual, 'Fail!')
854                else:
855                    assert allclose(q, actual)
856
857
858        #Another check of linear interpolation in time
859        for id in range(len(interpolation_points)):
860            q60 = F(60, point_id=id)
861            q120 = F(120, point_id=id)
862
863            t = 90 #Halfway between 60 and 120
864            q = F(t, point_id=id)
865            assert allclose( (q120+q60)/2, q )
866
867            t = 100 #Two thirds of the way between between 60 and 120
868            q = F(t, point_id=id)
869            assert allclose(q60/3 + 2*q120/3, q)
870
871
872
873        #Check that domain.starttime isn't updated if later than file starttime but earlier
874        #than file end time
875        delta = 23
876        domain.starttime = start + delta
877        F = file_function(filename + '.sww', domain,
878                          quantities = domain.conserved_quantities,
879                          interpolation_points = interpolation_points)
880        assert allclose(domain.starttime, start+delta)
881
882
883
884
885        #Now try interpolation with delta offset
886        for id in range(len(interpolation_points)):           
887            x = interpolation_points[id][0]
888            y = interpolation_points[id][1]
889
890            for i in range(20):
891                t = i*10
892                k = i%6
893
894                if k == 0:
895                    q0 = F(t-delta, point_id=id)
896                    q1 = F(t+60-delta, point_id=id)
897
898                q = F(t-delta, point_id=id)
899                assert allclose(q, (k*q1 + (6-k)*q0)/6)
900
901
902        os.remove(filename + '.sww')
903
904    def test_file_function_time_with_domain(self):
905        """Test that File function interpolates correctly
906        between given times. No x,y dependency here.
907        Use domain with starttime
908        """
909
910        #Write file
911        import os, time, calendar
912        from anuga.config import time_format
913        from math import sin, pi
914        from domain import Domain
915
916        finaltime = 1200
917        filename = 'test_file_function'
918        fid = open(filename + '.txt', 'w')
919        start = time.mktime(time.strptime('2000', '%Y'))
920        dt = 60  #One minute intervals
921        t = 0.0
922        while t <= finaltime:
923            t_string = time.strftime(time_format, time.gmtime(t+start))
924            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
925            t += dt
926
927        fid.close()
928
929
930        #Convert ASCII file to NetCDF (Which is what we really like!)
931        timefile2netcdf(filename)
932
933
934
935        a = [0.0, 0.0]
936        b = [4.0, 0.0]
937        c = [0.0, 3.0]
938
939        points = [a, b, c]
940        vertices = [[0,1,2]]
941        domain = Domain(points, vertices)
942
943        #Check that domain.starttime is updated if non-existing
944        F = file_function(filename + '.tms', domain)
945       
946        assert allclose(domain.starttime, start)
947
948        #Check that domain.starttime is updated if too early
949        domain.starttime = start - 1
950        F = file_function(filename + '.tms', domain)
951        assert allclose(domain.starttime, start)
952
953        #Check that domain.starttime isn't updated if later
954        domain.starttime = start + 1
955        F = file_function(filename + '.tms', domain)       
956        assert allclose(domain.starttime, start+1)
957
958        domain.starttime = start
959        F = file_function(filename + '.tms', domain,
960                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])
961       
962
963        #print F.T
964        #print F.precomputed_values
965        #print 'F(60)', F(60)
966       
967        #Now try interpolation
968        for i in range(20):
969            t = i*10
970            q = F(t)
971
972            #Exact linear intpolation
973            assert allclose(q[0], 2*t)
974            if i%6 == 0:
975                assert allclose(q[1], t**2)
976                assert allclose(q[2], sin(t*pi/600))
977
978        #Check non-exact
979
980        t = 90 #Halfway between 60 and 120
981        q = F(t)
982        assert allclose( (120**2 + 60**2)/2, q[1] )
983        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
984
985
986        t = 100 #Two thirds of the way between between 60 and 120
987        q = F(t)
988        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
989        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
990
991        os.remove(filename + '.tms')
992        os.remove(filename + '.txt')       
993
994    def test_file_function_time_with_domain_different_start(self):
995        """Test that File function interpolates correctly
996        between given times. No x,y dependency here.
997        Use domain with a starttime later than that of file
998
999        ASCII version
1000        """
1001
1002        #Write file
1003        import os, time, calendar
1004        from anuga.config import time_format
1005        from math import sin, pi
1006        from domain import Domain
1007
1008        finaltime = 1200
1009        filename = 'test_file_function'
1010        fid = open(filename + '.txt', 'w')
1011        start = time.mktime(time.strptime('2000', '%Y'))
1012        dt = 60  #One minute intervals
1013        t = 0.0
1014        while t <= finaltime:
1015            t_string = time.strftime(time_format, time.gmtime(t+start))
1016            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
1017            t += dt
1018
1019        fid.close()
1020
1021        #Convert ASCII file to NetCDF (Which is what we really like!)
1022        timefile2netcdf(filename)       
1023
1024        a = [0.0, 0.0]
1025        b = [4.0, 0.0]
1026        c = [0.0, 3.0]
1027
1028        points = [a, b, c]
1029        vertices = [[0,1,2]]
1030        domain = Domain(points, vertices)
1031
1032        #Check that domain.starttime isn't updated if later than file starttime but earlier
1033        #than file end time
1034        delta = 23
1035        domain.starttime = start + delta
1036        F = file_function(filename + '.tms', domain,
1037                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])       
1038        assert allclose(domain.starttime, start+delta)
1039
1040
1041
1042
1043        #Now try interpolation with delta offset
1044        for i in range(20):
1045            t = i*10
1046            q = F(t-delta)
1047
1048            #Exact linear intpolation
1049            assert allclose(q[0], 2*t)
1050            if i%6 == 0:
1051                assert allclose(q[1], t**2)
1052                assert allclose(q[2], sin(t*pi/600))
1053
1054        #Check non-exact
1055
1056        t = 90 #Halfway between 60 and 120
1057        q = F(t-delta)
1058        assert allclose( (120**2 + 60**2)/2, q[1] )
1059        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
1060
1061
1062        t = 100 #Two thirds of the way between between 60 and 120
1063        q = F(t-delta)
1064        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
1065        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
1066
1067
1068        os.remove(filename + '.tms')
1069        os.remove(filename + '.txt')               
1070
1071
1072
1073    def test_apply_expression_to_dictionary(self):
1074
1075        #FIXME: Division is not expected to work for integers.
1076        #This must be caught.
1077        foo = array([[1,2,3],
1078                     [4,5,6]], Float)
1079
1080        bar = array([[-1,0,5],
1081                     [6,1,1]], Float)                 
1082
1083        D = {'X': foo, 'Y': bar}
1084
1085        Z = apply_expression_to_dictionary('X+Y', D)       
1086        assert allclose(Z, foo+bar)
1087
1088        Z = apply_expression_to_dictionary('X*Y', D)       
1089        assert allclose(Z, foo*bar)       
1090
1091        Z = apply_expression_to_dictionary('4*X+Y', D)       
1092        assert allclose(Z, 4*foo+bar)       
1093
1094        # test zero division is OK
1095        Z = apply_expression_to_dictionary('X/Y', D)
1096        assert allclose(1/Z, 1/(foo/bar)) # can't compare inf to inf
1097
1098        # make an error for zero on zero
1099        # this is really an error in Numeric, SciPy core can handle it
1100        # Z = apply_expression_to_dictionary('0/Y', D)
1101
1102        #Check exceptions
1103        try:
1104            #Wrong name
1105            Z = apply_expression_to_dictionary('4*X+A', D)       
1106        except NameError:
1107            pass
1108        else:
1109            msg = 'Should have raised a NameError Exception'
1110            raise msg
1111
1112
1113        try:
1114            #Wrong order
1115            Z = apply_expression_to_dictionary(D, '4*X+A')       
1116        except AssertionError:
1117            pass
1118        else:
1119            msg = 'Should have raised a AssertionError Exception'
1120            raise msg       
1121       
1122
1123    def test_multiple_replace(self):
1124        """Hard test that checks a true word-by-word simultaneous replace
1125        """
1126       
1127        D = {'x': 'xi', 'y': 'eta', 'xi':'lam'}
1128        exp = '3*x+y + xi'
1129       
1130        new = multiple_replace(exp, D)
1131       
1132        assert new == '3*xi+eta + lam'
1133                         
1134
1135
1136    def test_point_on_line_obsolete(self):
1137        """Test that obsolete call issues appropriate warning"""
1138
1139        #Turn warning into an exception
1140        import warnings
1141        warnings.filterwarnings('error')
1142
1143        try:
1144            assert point_on_line( 0, 0.5, 0,1, 0,0 )
1145        except DeprecationWarning:
1146            pass
1147        else:
1148            msg = 'point_on_line should have issued a DeprecationWarning'
1149            raise Exception(msg)   
1150
1151        warnings.resetwarnings()
1152   
1153    def test_get_version_info(self):
1154        info = get_version_info()
1155        assert info.startswith('Revision')
1156                         
1157       
1158    def test_add_directories(self):
1159       
1160        import tempfile
1161        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
1162        directories = ['ja','ne','ke']
1163        kens_dir = add_directories(root_dir, directories)
1164        assert kens_dir == root_dir + sep + 'ja' + sep + 'ne' + \
1165               sep + 'ke'
1166        assert access(root_dir,F_OK)
1167
1168        #clean up!
1169        os.rmdir(kens_dir)
1170        os.rmdir(root_dir + sep + 'ja' + sep + 'ne')
1171        os.rmdir(root_dir + sep + 'ja')
1172        os.rmdir(root_dir)
1173
1174#-------------------------------------------------------------
1175if __name__ == "__main__":
1176    suite = unittest.makeSuite(Test_Util,'test')
1177    #suite = unittest.makeSuite(Test_Util,'test_add_directories')
1178    runner = unittest.TextTestRunner()
1179    runner.run(suite)
1180
1181
1182
1183
Note: See TracBrowser for help on using the repository browser.