source: anuga_core/source/anuga/abstract_2d_finite_volumes/test_util.py @ 3563

Last change on this file since 3563 was 3563, checked in by ole, 18 years ago

Moved shallow water out from the old pyvolution directory.
All tests pass, most examples and okushiri works again.

File size: 38.0 KB
Line 
1#!/usr/bin/env python
2
3
4import unittest
5from Numeric import zeros, array, allclose, Float
6from math import sqrt, pi
7import tempfile
8
9from anuga.abstract_2d_finite_volumes.util import *
10from anuga.config import epsilon
11from anuga.shallow_water.data_manager import timefile2netcdf
12
13from anuga.utilities.numerical_tools import NAN
14
15def test_function(x, y):
16    return x+y
17
18class Test_Util(unittest.TestCase):
19    def setUp(self):
20        pass
21
22    def tearDown(self):
23        pass
24
25
26
27
28    #Geometric
29    #def test_distance(self):
30    #    from anuga.abstract_2d_finite_volumes.util import distance#
31    #
32    #    self.failUnless( distance([4,2],[7,6]) == 5.0,
33    #                     'Distance is wrong!')
34    #    self.failUnless( allclose(distance([7,6],[9,8]), 2.82842712475),
35    #                    'distance is wrong!')
36    #    self.failUnless( allclose(distance([9,8],[4,2]), 7.81024967591),
37    #                    'distance is wrong!')
38    #
39    #    self.failUnless( distance([9,8],[4,2]) == distance([4,2],[9,8]),
40    #                    'distance is wrong!')
41
42
43    def test_file_function_time1(self):
44        """Test that File function interpolates correctly
45        between given times. No x,y dependency here.
46        """
47
48        #Write file
49        import os, time
50        from anuga.config import time_format
51        from math import sin, pi
52
53        #Typical ASCII file
54        finaltime = 1200
55        filename = 'test_file_function'
56        fid = open(filename + '.txt', 'w')
57        start = time.mktime(time.strptime('2000', '%Y'))
58        dt = 60  #One minute intervals
59        t = 0.0
60        while t <= finaltime:
61            t_string = time.strftime(time_format, time.gmtime(t+start))
62            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
63            t += dt
64
65        fid.close()
66
67        #Convert ASCII file to NetCDF (Which is what we really like!)
68        timefile2netcdf(filename)
69
70
71        #Create file function from time series
72        F = file_function(filename + '.tms',
73                          quantities = ['Attribute0',
74                                        'Attribute1',
75                                        'Attribute2'])
76       
77        #Now try interpolation
78        for i in range(20):
79            t = i*10
80            q = F(t)
81
82            #Exact linear intpolation
83            assert allclose(q[0], 2*t)
84            if i%6 == 0:
85                assert allclose(q[1], t**2)
86                assert allclose(q[2], sin(t*pi/600))
87
88        #Check non-exact
89
90        t = 90 #Halfway between 60 and 120
91        q = F(t)
92        assert allclose( (120**2 + 60**2)/2, q[1] )
93        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
94
95
96        t = 100 #Two thirds of the way between between 60 and 120
97        q = F(t)
98        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
99        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
100
101        os.remove(filename + '.txt')
102        os.remove(filename + '.tms')       
103
104
105       
106    def test_spatio_temporal_file_function_basic(self):
107        """Test that spatio temporal file function performs the correct
108        interpolations in both time and space
109        NetCDF version (x,y,t dependency)       
110        """
111        import time
112
113        #Create sww file of simple propagation from left to right
114        #through rectangular domain
115        from shallow_water import Domain, Dirichlet_boundary
116        from mesh_factory import rectangular
117        from Numeric import take, concatenate, reshape
118
119        #Create basic mesh and shallow water domain
120        points, vertices, boundary = rectangular(3, 3)
121        domain1 = Domain(points, vertices, boundary)
122
123        from anuga.utilities.numerical_tools import mean
124        domain1.reduction = mean
125        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
126                              # only one value.
127
128        domain1.default_order = 2
129        domain1.store = True
130        domain1.set_datadir('.')
131        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
132        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
133
134        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
135        domain1.set_quantity('elevation', 0)
136        domain1.set_quantity('friction', 0)
137        domain1.set_quantity('stage', 0)
138
139        # Boundary conditions
140        B0 = Dirichlet_boundary([0,0,0])
141        B6 = Dirichlet_boundary([0.6,0,0])
142        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
143        domain1.check_integrity()
144
145        finaltime = 8
146        #Evolution
147        t0 = -1
148        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
149            #print 'Timesteps: %.16f, %.16f' %(t0, t)
150            #if t == t0:
151            #    msg = 'Duplicate timestep found: %f, %f' %(t0, t)
152            #   raise msg
153            t0 = t
154             
155            #domain1.write_time()
156
157
158        #Now read data from sww and check
159        from Scientific.IO.NetCDF import NetCDFFile
160        filename = domain1.get_name() + '.' + domain1.format
161        fid = NetCDFFile(filename)
162
163        x = fid.variables['x'][:]
164        y = fid.variables['y'][:]
165        stage = fid.variables['stage'][:]
166        xmomentum = fid.variables['xmomentum'][:]
167        ymomentum = fid.variables['ymomentum'][:]
168        time = fid.variables['time'][:]
169
170        #Take stage vertex values at last timestep on diagonal
171        #Diagonal is identified by vertices: 0, 5, 10, 15
172
173        last_time_index = len(time)-1 #Last last_time_index
174        d_stage = reshape(take(stage[last_time_index, :], [0,5,10,15]), (4,1))
175        d_uh = reshape(take(xmomentum[last_time_index, :], [0,5,10,15]), (4,1))
176        d_vh = reshape(take(ymomentum[last_time_index, :], [0,5,10,15]), (4,1))
177        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
178
179        #Reference interpolated values at midpoints on diagonal at
180        #this timestep are
181        r0 = (D[0] + D[1])/2
182        r1 = (D[1] + D[2])/2
183        r2 = (D[2] + D[3])/2
184
185        #And the midpoints are found now
186        Dx = take(reshape(x, (16,1)), [0,5,10,15])
187        Dy = take(reshape(y, (16,1)), [0,5,10,15])
188
189        diag = concatenate( (Dx, Dy), axis=1)
190        d_midpoints = (diag[1:] + diag[:-1])/2
191
192        #Let us see if the file function can find the correct
193        #values at the midpoints at the last timestep:
194        f = file_function(filename, domain1,
195                          interpolation_points = d_midpoints)
196
197        T = f.get_time()
198        msg = 'duplicate timesteps: %.16f and %.16f' %(T[-1], T[-2])
199        assert not T[-1] == T[-2], msg
200        t = time[last_time_index]
201        q = f(t, point_id=0); assert allclose(r0, q)
202        q = f(t, point_id=1); assert allclose(r1, q)
203        q = f(t, point_id=2); assert allclose(r2, q)
204
205
206        ##################
207        #Now do the same for the first timestep
208
209        timestep = 0 #First timestep
210        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
211        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
212        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
213        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
214
215        #Reference interpolated values at midpoints on diagonal at
216        #this timestep are
217        r0 = (D[0] + D[1])/2
218        r1 = (D[1] + D[2])/2
219        r2 = (D[2] + D[3])/2
220
221        #Let us see if the file function can find the correct
222        #values
223        q = f(0, point_id=0); assert allclose(r0, q)
224        q = f(0, point_id=1); assert allclose(r1, q)
225        q = f(0, point_id=2); assert allclose(r2, q)
226
227
228        ##################
229        #Now do it again for a timestep in the middle
230
231        timestep = 33
232        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
233        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
234        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
235        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
236
237        #Reference interpolated values at midpoints on diagonal at
238        #this timestep are
239        r0 = (D[0] + D[1])/2
240        r1 = (D[1] + D[2])/2
241        r2 = (D[2] + D[3])/2
242
243        q = f(timestep/10., point_id=0); assert allclose(r0, q)
244        q = f(timestep/10., point_id=1); assert allclose(r1, q)
245        q = f(timestep/10., point_id=2); assert allclose(r2, q)
246
247
248        ##################
249        #Now check temporal interpolation
250        #Halfway between timestep 15 and 16
251
252        timestep = 15
253        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
254        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
255        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
256        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
257
258        #Reference interpolated values at midpoints on diagonal at
259        #this timestep are
260        r0_0 = (D[0] + D[1])/2
261        r1_0 = (D[1] + D[2])/2
262        r2_0 = (D[2] + D[3])/2
263
264        #
265        timestep = 16
266        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
267        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
268        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
269        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
270
271        #Reference interpolated values at midpoints on diagonal at
272        #this timestep are
273        r0_1 = (D[0] + D[1])/2
274        r1_1 = (D[1] + D[2])/2
275        r2_1 = (D[2] + D[3])/2
276
277        # The reference values are
278        r0 = (r0_0 + r0_1)/2
279        r1 = (r1_0 + r1_1)/2
280        r2 = (r2_0 + r2_1)/2
281
282        q = f((timestep - 0.5)/10., point_id=0); assert allclose(r0, q)
283        q = f((timestep - 0.5)/10., point_id=1); assert allclose(r1, q)
284        q = f((timestep - 0.5)/10., point_id=2); assert allclose(r2, q)
285
286        ##################
287        #Finally check interpolation 2 thirds of the way
288        #between timestep 15 and 16
289
290        # The reference values are
291        r0 = (r0_0 + 2*r0_1)/3
292        r1 = (r1_0 + 2*r1_1)/3
293        r2 = (r2_0 + 2*r2_1)/3
294
295        #And the file function gives
296        q = f((timestep - 1.0/3)/10., point_id=0); assert allclose(r0, q)
297        q = f((timestep - 1.0/3)/10., point_id=1); assert allclose(r1, q)
298        q = f((timestep - 1.0/3)/10., point_id=2); assert allclose(r2, q)
299
300        fid.close()
301        import os
302        os.remove(filename)
303
304
305
306    def test_spatio_temporal_file_function_different_origin(self):
307        """Test that spatio temporal file function performs the correct
308        interpolations in both time and space where space is offset by
309        xllcorner and yllcorner
310        NetCDF version (x,y,t dependency)       
311        """
312        import time
313
314        #Create sww file of simple propagation from left to right
315        #through rectangular domain
316        from shallow_water import Domain, Dirichlet_boundary
317        from mesh_factory import rectangular
318        from Numeric import take, concatenate, reshape
319
320
321        from anuga.coordinate_transforms.geo_reference import Geo_reference
322        xllcorner = 2048
323        yllcorner = 11000
324        zone = 2
325
326        #Create basic mesh and shallow water domain
327        points, vertices, boundary = rectangular(3, 3)
328        domain1 = Domain(points, vertices, boundary,
329                         geo_reference = Geo_reference(xllcorner = xllcorner,
330                                                       yllcorner = yllcorner))
331       
332
333        from anuga.utilities.numerical_tools import mean       
334        domain1.reduction = mean
335        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
336                              # only one value.
337
338        domain1.default_order = 2
339        domain1.store = True
340        domain1.set_datadir('.')
341        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
342        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
343
344        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
345        domain1.set_quantity('elevation', 0)
346        domain1.set_quantity('friction', 0)
347        domain1.set_quantity('stage', 0)
348
349        # Boundary conditions
350        B0 = Dirichlet_boundary([0,0,0])
351        B6 = Dirichlet_boundary([0.6,0,0])
352        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
353        domain1.check_integrity()
354
355        finaltime = 8
356        #Evolution
357        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
358            pass
359            #domain1.write_time()
360
361
362        #Now read data from sww and check
363        from Scientific.IO.NetCDF import NetCDFFile
364        filename = domain1.get_name() + '.' + domain1.format
365        fid = NetCDFFile(filename)
366
367        x = fid.variables['x'][:]
368        y = fid.variables['y'][:]
369        stage = fid.variables['stage'][:]
370        xmomentum = fid.variables['xmomentum'][:]
371        ymomentum = fid.variables['ymomentum'][:]
372        time = fid.variables['time'][:]
373
374        #Take stage vertex values at last timestep on diagonal
375        #Diagonal is identified by vertices: 0, 5, 10, 15
376
377        last_time_index = len(time)-1 #Last last_time_index     
378        d_stage = reshape(take(stage[last_time_index, :], [0,5,10,15]), (4,1))
379        d_uh = reshape(take(xmomentum[last_time_index, :], [0,5,10,15]), (4,1))
380        d_vh = reshape(take(ymomentum[last_time_index, :], [0,5,10,15]), (4,1))
381        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
382
383        #Reference interpolated values at midpoints on diagonal at
384        #this timestep are
385        r0 = (D[0] + D[1])/2
386        r1 = (D[1] + D[2])/2
387        r2 = (D[2] + D[3])/2
388
389        #And the midpoints are found now
390        Dx = take(reshape(x, (16,1)), [0,5,10,15])
391        Dy = take(reshape(y, (16,1)), [0,5,10,15])
392
393        diag = concatenate( (Dx, Dy), axis=1)
394        d_midpoints = (diag[1:] + diag[:-1])/2
395
396
397        #Adjust for georef - make interpolation points absolute
398        d_midpoints[:,0] += xllcorner
399        d_midpoints[:,1] += yllcorner               
400
401        #Let us see if the file function can find the correct
402        #values at the midpoints at the last timestep:
403        f = file_function(filename, domain1,
404                          interpolation_points = d_midpoints)
405
406        t = time[last_time_index]                         
407        q = f(t, point_id=0); assert allclose(r0, q)
408        q = f(t, point_id=1); assert allclose(r1, q)
409        q = f(t, point_id=2); assert allclose(r2, q)
410
411
412        ##################
413        #Now do the same for the first timestep
414
415        timestep = 0 #First timestep
416        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
417        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
418        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
419        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
420
421        #Reference interpolated values at midpoints on diagonal at
422        #this timestep are
423        r0 = (D[0] + D[1])/2
424        r1 = (D[1] + D[2])/2
425        r2 = (D[2] + D[3])/2
426
427        #Let us see if the file function can find the correct
428        #values
429        q = f(0, point_id=0); assert allclose(r0, q)
430        q = f(0, point_id=1); assert allclose(r1, q)
431        q = f(0, point_id=2); assert allclose(r2, q)
432
433
434        ##################
435        #Now do it again for a timestep in the middle
436
437        timestep = 33
438        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
439        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
440        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
441        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
442
443        #Reference interpolated values at midpoints on diagonal at
444        #this timestep are
445        r0 = (D[0] + D[1])/2
446        r1 = (D[1] + D[2])/2
447        r2 = (D[2] + D[3])/2
448
449        q = f(timestep/10., point_id=0); assert allclose(r0, q)
450        q = f(timestep/10., point_id=1); assert allclose(r1, q)
451        q = f(timestep/10., point_id=2); assert allclose(r2, q)
452
453
454        ##################
455        #Now check temporal interpolation
456        #Halfway between timestep 15 and 16
457
458        timestep = 15
459        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
460        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
461        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
462        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
463
464        #Reference interpolated values at midpoints on diagonal at
465        #this timestep are
466        r0_0 = (D[0] + D[1])/2
467        r1_0 = (D[1] + D[2])/2
468        r2_0 = (D[2] + D[3])/2
469
470        #
471        timestep = 16
472        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
473        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
474        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
475        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
476
477        #Reference interpolated values at midpoints on diagonal at
478        #this timestep are
479        r0_1 = (D[0] + D[1])/2
480        r1_1 = (D[1] + D[2])/2
481        r2_1 = (D[2] + D[3])/2
482
483        # The reference values are
484        r0 = (r0_0 + r0_1)/2
485        r1 = (r1_0 + r1_1)/2
486        r2 = (r2_0 + r2_1)/2
487
488        q = f((timestep - 0.5)/10., point_id=0); assert allclose(r0, q)
489        q = f((timestep - 0.5)/10., point_id=1); assert allclose(r1, q)
490        q = f((timestep - 0.5)/10., point_id=2); assert allclose(r2, q)
491
492        ##################
493        #Finally check interpolation 2 thirds of the way
494        #between timestep 15 and 16
495
496        # The reference values are
497        r0 = (r0_0 + 2*r0_1)/3
498        r1 = (r1_0 + 2*r1_1)/3
499        r2 = (r2_0 + 2*r2_1)/3
500
501        #And the file function gives
502        q = f((timestep - 1.0/3)/10., point_id=0); assert allclose(r0, q)
503        q = f((timestep - 1.0/3)/10., point_id=1); assert allclose(r1, q)
504        q = f((timestep - 1.0/3)/10., point_id=2); assert allclose(r2, q)
505
506        fid.close()
507        import os
508        os.remove(filename)
509
510       
511
512
513    def qtest_spatio_temporal_file_function_time(self):
514        """Test that File function interpolates correctly
515        between given times.
516        NetCDF version (x,y,t dependency)
517        """
518
519        #Create NetCDF (sww) file to be read
520        # x: 0, 5, 10, 15
521        # y: -20, -10, 0, 10
522        # t: 0, 60, 120, ...., 1200
523        #
524        # test quantities (arbitrary but non-trivial expressions):
525        #
526        #   stage     = 3*x - y**2 + 2*t
527        #   xmomentum = exp( -((x-7)**2 + (y+5)**2)/20 ) * t**2
528        #   ymomentum = x**2 + y**2 * sin(t*pi/600)
529
530        #NOTE: Nice test that may render some of the others redundant.
531
532        import os, time
533        from anuga.config import time_format
534        from Numeric import sin, pi, exp
535        from mesh_factory import rectangular
536        from shallow_water import Domain
537        import anuga.shallow_water.data_manager
538
539        finaltime = 1200
540        filename = 'test_file_function'
541
542        #Create a domain to hold test grid
543        #(0:15, -20:10)
544        points, vertices, boundary =\
545                rectangular(4, 4, 15, 30, origin = (0, -20))
546        print "points", points
547
548        #print 'Number of elements', len(vertices)
549        domain = Domain(points, vertices, boundary)
550        domain.smooth = False
551        domain.default_order = 2
552        domain.set_datadir('.')
553        domain.set_name(filename)
554        domain.store = True
555        domain.format = 'sww'   #Native netcdf visualisation format
556
557        #print points
558        start = time.mktime(time.strptime('2000', '%Y'))
559        domain.starttime = start
560
561
562        #Store structure
563        domain.initialise_storage()
564
565        #Compute artificial time steps and store
566        dt = 60  #One minute intervals
567        t = 0.0
568        while t <= finaltime:
569            #Compute quantities
570            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
571            domain.set_quantity('stage', f1)
572
573            f2 = lambda x,y: x+y+t**2
574            domain.set_quantity('xmomentum', f2)
575
576            f3 = lambda x,y: x**2 + y**2 * sin(t*pi/600)
577            domain.set_quantity('ymomentum', f3)
578
579            #Store and advance time
580            domain.time = t
581            domain.store_timestep(domain.conserved_quantities)
582            t += dt
583
584
585        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5]]
586     
587        #Deliberately set domain.starttime to too early
588        domain.starttime = start - 1
589
590        #Create file function
591        F = file_function(filename + '.sww', domain,
592                          quantities = domain.conserved_quantities,
593                          interpolation_points = interpolation_points)
594
595        #Check that FF updates fixes domain starttime
596        assert allclose(domain.starttime, start)
597
598        #Check that domain.starttime isn't updated if later
599        domain.starttime = start + 1
600        F = file_function(filename + '.sww', domain,
601                          quantities = domain.conserved_quantities,
602                          interpolation_points = interpolation_points)
603        assert allclose(domain.starttime, start+1)
604        domain.starttime = start
605
606
607        #Check linear interpolation in time
608        F = file_function(filename + '.sww', domain,
609                          quantities = domain.conserved_quantities,
610                          interpolation_points = interpolation_points)               
611        for id in range(len(interpolation_points)):
612            x = interpolation_points[id][0]
613            y = interpolation_points[id][1]
614
615            for i in range(20):
616                t = i*10
617                k = i%6
618
619                if k == 0:
620                    q0 = F(t, point_id=id)
621                    q1 = F(t+60, point_id=id)
622
623                if q0 == NAN:
624                    actual = q0
625                else:
626                    actual = (k*q1 + (6-k)*q0)/6
627                q = F(t, point_id=id)
628                #print i, k, t, q
629                #print ' ', q0
630                #print ' ', q1
631                print "q",q
632                print "actual", actual
633                #print
634                if q0 == NAN:
635                     self.failUnless( q == actual, 'Fail!')
636                else:
637                    assert allclose(q, actual)
638
639
640        #Another check of linear interpolation in time
641        for id in range(len(interpolation_points)):
642            q60 = F(60, point_id=id)
643            q120 = F(120, point_id=id)
644
645            t = 90 #Halfway between 60 and 120
646            q = F(t, point_id=id)
647            assert allclose( (q120+q60)/2, q )
648
649            t = 100 #Two thirds of the way between between 60 and 120
650            q = F(t, point_id=id)
651            assert allclose(q60/3 + 2*q120/3, q)
652
653
654
655        #Check that domain.starttime isn't updated if later than file starttime but earlier
656        #than file end time
657        delta = 23
658        domain.starttime = start + delta
659        F = file_function(filename + '.sww', domain,
660                          quantities = domain.conserved_quantities,
661                          interpolation_points = interpolation_points)
662        assert allclose(domain.starttime, start+delta)
663
664
665
666
667        #Now try interpolation with delta offset
668        for id in range(len(interpolation_points)):           
669            x = interpolation_points[id][0]
670            y = interpolation_points[id][1]
671
672            for i in range(20):
673                t = i*10
674                k = i%6
675
676                if k == 0:
677                    q0 = F(t-delta, point_id=id)
678                    q1 = F(t+60-delta, point_id=id)
679
680                q = F(t-delta, point_id=id)
681                assert allclose(q, (k*q1 + (6-k)*q0)/6)
682
683
684        os.remove(filename + '.sww')
685
686
687
688    def xtest_spatio_temporal_file_function_time(self):
689        # Test that File function interpolates correctly
690        # When some points are outside the mesh
691
692        import os, time
693        from anuga.config import time_format
694        from Numeric import sin, pi, exp
695        from mesh_factory import rectangular
696        from shallow_water import Domain
697        import anuga.shallow_water.data_manager 
698        from anuga.pmesh.mesh_interface import create_mesh_from_regions
699        finaltime = 1200
700       
701        filename = tempfile.mktemp()
702        #print "filename",filename
703        filename = 'test_file_function'
704
705        meshfilename = tempfile.mktemp(".tsh")
706
707        boundary_tags = {'walls':[0,1],'bom':[2]}
708       
709        polygon_absolute = [[0,-20],[10,-20],[10,15],[-20,15]]
710       
711        create_mesh_from_regions(polygon_absolute,
712                                 boundary_tags,
713                                 10000000,
714                                 filename=meshfilename)
715        domain = Domain(mesh_filename=meshfilename)
716        domain.smooth = False
717        domain.default_order = 2
718        domain.set_datadir('.')
719        domain.set_name(filename)
720        domain.store = True
721        domain.format = 'sww'   #Native netcdf visualisation format
722
723        #print points
724        start = time.mktime(time.strptime('2000', '%Y'))
725        domain.starttime = start
726       
727
728        #Store structure
729        domain.initialise_storage()
730
731        #Compute artificial time steps and store
732        dt = 60  #One minute intervals
733        t = 0.0
734        while t <= finaltime:
735            #Compute quantities
736            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
737            domain.set_quantity('stage', f1)
738
739            f2 = lambda x,y: x+y+t**2
740            domain.set_quantity('xmomentum', f2)
741
742            f3 = lambda x,y: x**2 + y**2 * sin(t*pi/600)
743            domain.set_quantity('ymomentum', f3)
744
745            #Store and advance time
746            domain.time = t
747            domain.store_timestep(domain.conserved_quantities)
748            t += dt
749
750        interpolation_points = [[1,0]]
751        interpolation_points = [[100,1000]]
752       
753        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5],
754                                [78787,78787],[7878,3432]]
755           
756        #Deliberately set domain.starttime to too early
757        domain.starttime = start - 1
758
759        #Create file function
760        F = file_function(filename + '.sww', domain,
761                          quantities = domain.conserved_quantities,
762                          interpolation_points = interpolation_points)
763
764        #Check that FF updates fixes domain starttime
765        assert allclose(domain.starttime, start)
766
767        #Check that domain.starttime isn't updated if later
768        domain.starttime = start + 1
769        F = file_function(filename + '.sww', domain,
770                          quantities = domain.conserved_quantities,
771                          interpolation_points = interpolation_points)
772        assert allclose(domain.starttime, start+1)
773        domain.starttime = start
774
775
776        #Check linear interpolation in time
777        # checking points inside and outside the mesh
778        F = file_function(filename + '.sww', domain,
779                          quantities = domain.conserved_quantities,
780                          interpolation_points = interpolation_points)
781       
782        for id in range(len(interpolation_points)):
783            x = interpolation_points[id][0]
784            y = interpolation_points[id][1]
785
786            for i in range(20):
787                t = i*10
788                k = i%6
789
790                if k == 0:
791                    q0 = F(t, point_id=id)
792                    q1 = F(t+60, point_id=id)
793
794                if q0 == NAN:
795                    actual = q0
796                else:
797                    actual = (k*q1 + (6-k)*q0)/6
798                q = F(t, point_id=id)
799                #print i, k, t, q
800                #print ' ', q0
801                #print ' ', q1
802                #print "q",q
803                #print "actual", actual
804                #print
805                if q0 == NAN:
806                     self.failUnless( q == actual, 'Fail!')
807                else:
808                    assert allclose(q, actual)
809
810        # now lets check points inside the mesh
811        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14]] #, [10,-12.5]] - this point doesn't work WHY?
812        interpolation_points = [[10,-12.5]]
813           
814        print "len(interpolation_points)",len(interpolation_points) 
815        F = file_function(filename + '.sww', domain,
816                          quantities = domain.conserved_quantities,
817                          interpolation_points = interpolation_points)
818
819        domain.starttime = start
820
821
822        #Check linear interpolation in time
823        F = file_function(filename + '.sww', domain,
824                          quantities = domain.conserved_quantities,
825                          interpolation_points = interpolation_points)               
826        for id in range(len(interpolation_points)):
827            x = interpolation_points[id][0]
828            y = interpolation_points[id][1]
829
830            for i in range(20):
831                t = i*10
832                k = i%6
833
834                if k == 0:
835                    q0 = F(t, point_id=id)
836                    q1 = F(t+60, point_id=id)
837
838                if q0 == NAN:
839                    actual = q0
840                else:
841                    actual = (k*q1 + (6-k)*q0)/6
842                q = F(t, point_id=id)
843                print "############"
844                print "id, x, y ", id, x, y #k, t, q
845                print "t", t
846                #print ' ', q0
847                #print ' ', q1
848                print "q",q
849                print "actual", actual
850                #print
851                if q0 == NAN:
852                     self.failUnless( q == actual, 'Fail!')
853                else:
854                    assert allclose(q, actual)
855
856
857        #Another check of linear interpolation in time
858        for id in range(len(interpolation_points)):
859            q60 = F(60, point_id=id)
860            q120 = F(120, point_id=id)
861
862            t = 90 #Halfway between 60 and 120
863            q = F(t, point_id=id)
864            assert allclose( (q120+q60)/2, q )
865
866            t = 100 #Two thirds of the way between between 60 and 120
867            q = F(t, point_id=id)
868            assert allclose(q60/3 + 2*q120/3, q)
869
870
871
872        #Check that domain.starttime isn't updated if later than file starttime but earlier
873        #than file end time
874        delta = 23
875        domain.starttime = start + delta
876        F = file_function(filename + '.sww', domain,
877                          quantities = domain.conserved_quantities,
878                          interpolation_points = interpolation_points)
879        assert allclose(domain.starttime, start+delta)
880
881
882
883
884        #Now try interpolation with delta offset
885        for id in range(len(interpolation_points)):           
886            x = interpolation_points[id][0]
887            y = interpolation_points[id][1]
888
889            for i in range(20):
890                t = i*10
891                k = i%6
892
893                if k == 0:
894                    q0 = F(t-delta, point_id=id)
895                    q1 = F(t+60-delta, point_id=id)
896
897                q = F(t-delta, point_id=id)
898                assert allclose(q, (k*q1 + (6-k)*q0)/6)
899
900
901        os.remove(filename + '.sww')
902
903    def test_file_function_time_with_domain(self):
904        """Test that File function interpolates correctly
905        between given times. No x,y dependency here.
906        Use domain with starttime
907        """
908
909        #Write file
910        import os, time, calendar
911        from anuga.config import time_format
912        from math import sin, pi
913        from domain import Domain
914
915        finaltime = 1200
916        filename = 'test_file_function'
917        fid = open(filename + '.txt', 'w')
918        start = time.mktime(time.strptime('2000', '%Y'))
919        dt = 60  #One minute intervals
920        t = 0.0
921        while t <= finaltime:
922            t_string = time.strftime(time_format, time.gmtime(t+start))
923            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
924            t += dt
925
926        fid.close()
927
928
929        #Convert ASCII file to NetCDF (Which is what we really like!)
930        timefile2netcdf(filename)
931
932
933
934        a = [0.0, 0.0]
935        b = [4.0, 0.0]
936        c = [0.0, 3.0]
937
938        points = [a, b, c]
939        vertices = [[0,1,2]]
940        domain = Domain(points, vertices)
941
942        #Check that domain.starttime is updated if non-existing
943        F = file_function(filename + '.tms', domain)
944       
945        assert allclose(domain.starttime, start)
946
947        #Check that domain.starttime is updated if too early
948        domain.starttime = start - 1
949        F = file_function(filename + '.tms', domain)
950        assert allclose(domain.starttime, start)
951
952        #Check that domain.starttime isn't updated if later
953        domain.starttime = start + 1
954        F = file_function(filename + '.tms', domain)       
955        assert allclose(domain.starttime, start+1)
956
957        domain.starttime = start
958        F = file_function(filename + '.tms', domain,
959                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])
960       
961
962        #print F.T
963        #print F.precomputed_values
964        #print 'F(60)', F(60)
965       
966        #Now try interpolation
967        for i in range(20):
968            t = i*10
969            q = F(t)
970
971            #Exact linear intpolation
972            assert allclose(q[0], 2*t)
973            if i%6 == 0:
974                assert allclose(q[1], t**2)
975                assert allclose(q[2], sin(t*pi/600))
976
977        #Check non-exact
978
979        t = 90 #Halfway between 60 and 120
980        q = F(t)
981        assert allclose( (120**2 + 60**2)/2, q[1] )
982        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
983
984
985        t = 100 #Two thirds of the way between between 60 and 120
986        q = F(t)
987        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
988        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
989
990        os.remove(filename + '.tms')
991        os.remove(filename + '.txt')       
992
993    def test_file_function_time_with_domain_different_start(self):
994        """Test that File function interpolates correctly
995        between given times. No x,y dependency here.
996        Use domain with a starttime later than that of file
997
998        ASCII version
999        """
1000
1001        #Write file
1002        import os, time, calendar
1003        from anuga.config import time_format
1004        from math import sin, pi
1005        from domain import Domain
1006
1007        finaltime = 1200
1008        filename = 'test_file_function'
1009        fid = open(filename + '.txt', 'w')
1010        start = time.mktime(time.strptime('2000', '%Y'))
1011        dt = 60  #One minute intervals
1012        t = 0.0
1013        while t <= finaltime:
1014            t_string = time.strftime(time_format, time.gmtime(t+start))
1015            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
1016            t += dt
1017
1018        fid.close()
1019
1020        #Convert ASCII file to NetCDF (Which is what we really like!)
1021        timefile2netcdf(filename)       
1022
1023        a = [0.0, 0.0]
1024        b = [4.0, 0.0]
1025        c = [0.0, 3.0]
1026
1027        points = [a, b, c]
1028        vertices = [[0,1,2]]
1029        domain = Domain(points, vertices)
1030
1031        #Check that domain.starttime isn't updated if later than file starttime but earlier
1032        #than file end time
1033        delta = 23
1034        domain.starttime = start + delta
1035        F = file_function(filename + '.tms', domain,
1036                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])       
1037        assert allclose(domain.starttime, start+delta)
1038
1039
1040
1041
1042        #Now try interpolation with delta offset
1043        for i in range(20):
1044            t = i*10
1045            q = F(t-delta)
1046
1047            #Exact linear intpolation
1048            assert allclose(q[0], 2*t)
1049            if i%6 == 0:
1050                assert allclose(q[1], t**2)
1051                assert allclose(q[2], sin(t*pi/600))
1052
1053        #Check non-exact
1054
1055        t = 90 #Halfway between 60 and 120
1056        q = F(t-delta)
1057        assert allclose( (120**2 + 60**2)/2, q[1] )
1058        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
1059
1060
1061        t = 100 #Two thirds of the way between between 60 and 120
1062        q = F(t-delta)
1063        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
1064        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
1065
1066
1067        os.remove(filename + '.tms')
1068        os.remove(filename + '.txt')               
1069
1070
1071
1072    def test_apply_expression_to_dictionary(self):
1073
1074        #FIXME: Division is not expected to work for integers.
1075        #This must be caught.
1076        foo = array([[1,2,3],
1077                     [4,5,6]], Float)
1078
1079        bar = array([[-1,0,5],
1080                     [6,1,1]], Float)                 
1081
1082        D = {'X': foo, 'Y': bar}
1083
1084        Z = apply_expression_to_dictionary('X+Y', D)       
1085        assert allclose(Z, foo+bar)
1086
1087        Z = apply_expression_to_dictionary('X*Y', D)       
1088        assert allclose(Z, foo*bar)       
1089
1090        Z = apply_expression_to_dictionary('4*X+Y', D)       
1091        assert allclose(Z, 4*foo+bar)       
1092
1093        # test zero division is OK
1094        Z = apply_expression_to_dictionary('X/Y', D)
1095        assert allclose(1/Z, 1/(foo/bar)) # can't compare inf to inf
1096
1097        # make an error for zero on zero
1098        # this is really an error in Numeric, SciPy core can handle it
1099        # Z = apply_expression_to_dictionary('0/Y', D)
1100
1101        #Check exceptions
1102        try:
1103            #Wrong name
1104            Z = apply_expression_to_dictionary('4*X+A', D)       
1105        except NameError:
1106            pass
1107        else:
1108            msg = 'Should have raised a NameError Exception'
1109            raise msg
1110
1111
1112        try:
1113            #Wrong order
1114            Z = apply_expression_to_dictionary(D, '4*X+A')       
1115        except AssertionError:
1116            pass
1117        else:
1118            msg = 'Should have raised a AssertionError Exception'
1119            raise msg       
1120       
1121
1122    def test_multiple_replace(self):
1123        """Hard test that checks a true word-by-word simultaneous replace
1124        """
1125       
1126        D = {'x': 'xi', 'y': 'eta', 'xi':'lam'}
1127        exp = '3*x+y + xi'
1128       
1129        new = multiple_replace(exp, D)
1130       
1131        assert new == '3*xi+eta + lam'
1132                         
1133
1134
1135    def test_point_on_line_obsolete(self):
1136        """Test that obsolete call issues appropriate warning"""
1137
1138        #Turn warning into an exception
1139        import warnings
1140        warnings.filterwarnings('error')
1141
1142        try:
1143            assert point_on_line( 0, 0.5, 0,1, 0,0 )
1144        except DeprecationWarning:
1145            pass
1146        else:
1147            msg = 'point_on_line should have issued a DeprecationWarning'
1148            raise Exception(msg)   
1149
1150        warnings.resetwarnings()
1151   
1152    def test_get_version_info(self):
1153        info = get_version_info()
1154        assert info.startswith('Revision')
1155                         
1156                         
1157
1158#-------------------------------------------------------------
1159if __name__ == "__main__":
1160    suite = unittest.makeSuite(Test_Util,'test')
1161    #suite = unittest.makeSuite(Test_Util,'test_apply')
1162    runner = unittest.TextTestRunner()
1163    runner.run(suite)
1164
1165
1166
1167
Note: See TracBrowser for help on using the repository browser.