source: inundation-numpy-branch/pyvolution/test_util.py @ 3031

Last change on this file since 3031 was 2526, checked in by ole, 19 years ago

Moved more general numerical functionality into utilities/numerical_tools.py

File size: 29.8 KB
Line 
1#!/usr/bin/env python
2
3
4import unittest
5from Numeric import zeros, array, allclose, Float
6from math import sqrt, pi
7
8from util import *
9from config import epsilon
10from data_manager import timefile2netcdf
11
12
13def test_function(x, y):
14    return x+y
15
16class Test_Util(unittest.TestCase):
17    def setUp(self):
18        pass
19
20    def tearDown(self):
21        pass
22
23
24
25
26    #Geometric
27    #def test_distance(self):
28    #    from util import distance#
29    #
30    #    self.failUnless( distance([4,2],[7,6]) == 5.0,
31    #                     'Distance is wrong!')
32    #    self.failUnless( allclose(distance([7,6],[9,8]), 2.82842712475),
33    #                    'distance is wrong!')
34    #    self.failUnless( allclose(distance([9,8],[4,2]), 7.81024967591),
35    #                    'distance is wrong!')
36    #
37    #    self.failUnless( distance([9,8],[4,2]) == distance([4,2],[9,8]),
38    #                    'distance is wrong!')
39
40
41    def test_file_function_time1(self):
42        """Test that File function interpolates correctly
43        between given times. No x,y dependency here.
44        """
45
46        #Write file
47        import os, time
48        from config import time_format
49        from math import sin, pi
50
51        #Typical ASCII file
52        finaltime = 1200
53        filename = 'test_file_function'
54        fid = open(filename + '.txt', 'w')
55        start = time.mktime(time.strptime('2000', '%Y'))
56        dt = 60  #One minute intervals
57        t = 0.0
58        while t <= finaltime:
59            t_string = time.strftime(time_format, time.gmtime(t+start))
60            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
61            t += dt
62
63        fid.close()
64
65        #Convert ASCII file to NetCDF (Which is what we really like!)
66        timefile2netcdf(filename)
67
68
69        #Create file function from time series
70        F = file_function(filename + '.tms',
71                          quantities = ['Attribute0',
72                                        'Attribute1',
73                                        'Attribute2'])
74       
75        #Now try interpolation
76        for i in range(20):
77            t = i*10
78            q = F(t)
79
80            #Exact linear intpolation
81            assert allclose(q[0], 2*t)
82            if i%6 == 0:
83                assert allclose(q[1], t**2)
84                assert allclose(q[2], sin(t*pi/600))
85
86        #Check non-exact
87
88        t = 90 #Halfway between 60 and 120
89        q = F(t)
90        assert allclose( (120**2 + 60**2)/2, q[1] )
91        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
92
93
94        t = 100 #Two thirds of the way between between 60 and 120
95        q = F(t)
96        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
97        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
98
99        os.remove(filename + '.txt')
100        os.remove(filename + '.tms')       
101
102
103       
104    def test_spatio_temporal_file_function(self):
105        """Test that spatio temporal file function performs the correct
106        interpolations in both time and space
107        NetCDF version (x,y,t dependency)       
108        """
109        import time
110
111        #Create sww file of simple propagation from left to right
112        #through rectangular domain
113        from shallow_water import Domain, Dirichlet_boundary
114        from mesh_factory import rectangular
115        from Numeric import take, concatenate, reshape
116
117        #Create basic mesh and shallow water domain
118        points, vertices, boundary = rectangular(3, 3)
119        domain1 = Domain(points, vertices, boundary)
120
121        from utilities.numerical_tools import mean
122        domain1.reduction = mean
123        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
124                              # only one value.
125
126        domain1.default_order = 2
127        domain1.store = True
128        domain1.set_datadir('.')
129        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
130        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
131
132        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
133        domain1.set_quantity('elevation', 0)
134        domain1.set_quantity('friction', 0)
135        domain1.set_quantity('stage', 0)
136
137        # Boundary conditions
138        B0 = Dirichlet_boundary([0,0,0])
139        B6 = Dirichlet_boundary([0.6,0,0])
140        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
141        domain1.check_integrity()
142
143        finaltime = 8
144        #Evolution
145        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
146            pass
147            #domain1.write_time()
148
149
150        #Now read data from sww and check
151        from Scientific.IO.NetCDF import NetCDFFile
152        filename = domain1.get_name() + '.' + domain1.format
153        fid = NetCDFFile(filename)
154
155        x = fid.variables['x'][:]
156        y = fid.variables['y'][:]
157        stage = fid.variables['stage'][:]
158        xmomentum = fid.variables['xmomentum'][:]
159        ymomentum = fid.variables['ymomentum'][:]
160        time = fid.variables['time'][:]
161
162        #Take stage vertex values at last timestep on diagonal
163        #Diagonal is identified by vertices: 0, 5, 10, 15
164
165        timestep = len(time)-1 #Last timestep
166        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
167        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
168        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
169        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
170
171        #Reference interpolated values at midpoints on diagonal at
172        #this timestep are
173        r0 = (D[0] + D[1])/2
174        r1 = (D[1] + D[2])/2
175        r2 = (D[2] + D[3])/2
176
177        #And the midpoints are found now
178        Dx = take(reshape(x, (16,1)), [0,5,10,15])
179        Dy = take(reshape(y, (16,1)), [0,5,10,15])
180
181        diag = concatenate( (Dx, Dy), axis=1)
182        d_midpoints = (diag[1:] + diag[:-1])/2
183
184        #Let us see if the file function can find the correct
185        #values at the midpoints at the last timestep:
186        f = file_function(filename, domain1,
187                          interpolation_points = d_midpoints)
188
189        q = f(timestep/10., point_id=0); assert allclose(r0, q)
190        q = f(timestep/10., point_id=1); assert allclose(r1, q)
191        q = f(timestep/10., point_id=2); assert allclose(r2, q)
192
193
194        ##################
195        #Now do the same for the first timestep
196
197        timestep = 0 #First timestep
198        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
199        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
200        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
201        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
202
203        #Reference interpolated values at midpoints on diagonal at
204        #this timestep are
205        r0 = (D[0] + D[1])/2
206        r1 = (D[1] + D[2])/2
207        r2 = (D[2] + D[3])/2
208
209        #Let us see if the file function can find the correct
210        #values
211        q = f(0, point_id=0); assert allclose(r0, q)
212        q = f(0, point_id=1); assert allclose(r1, q)
213        q = f(0, point_id=2); assert allclose(r2, q)
214
215
216        ##################
217        #Now do it again for a timestep in the middle
218
219        timestep = 33
220        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
221        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
222        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
223        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
224
225        #Reference interpolated values at midpoints on diagonal at
226        #this timestep are
227        r0 = (D[0] + D[1])/2
228        r1 = (D[1] + D[2])/2
229        r2 = (D[2] + D[3])/2
230
231        q = f(timestep/10., point_id=0); assert allclose(r0, q)
232        q = f(timestep/10., point_id=1); assert allclose(r1, q)
233        q = f(timestep/10., point_id=2); assert allclose(r2, q)
234
235
236        ##################
237        #Now check temporal interpolation
238        #Halfway between timestep 15 and 16
239
240        timestep = 15
241        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
242        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
243        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
244        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
245
246        #Reference interpolated values at midpoints on diagonal at
247        #this timestep are
248        r0_0 = (D[0] + D[1])/2
249        r1_0 = (D[1] + D[2])/2
250        r2_0 = (D[2] + D[3])/2
251
252        #
253        timestep = 16
254        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
255        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
256        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
257        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
258
259        #Reference interpolated values at midpoints on diagonal at
260        #this timestep are
261        r0_1 = (D[0] + D[1])/2
262        r1_1 = (D[1] + D[2])/2
263        r2_1 = (D[2] + D[3])/2
264
265        # The reference values are
266        r0 = (r0_0 + r0_1)/2
267        r1 = (r1_0 + r1_1)/2
268        r2 = (r2_0 + r2_1)/2
269
270        q = f((timestep - 0.5)/10., point_id=0); assert allclose(r0, q)
271        q = f((timestep - 0.5)/10., point_id=1); assert allclose(r1, q)
272        q = f((timestep - 0.5)/10., point_id=2); assert allclose(r2, q)
273
274        ##################
275        #Finally check interpolation 2 thirds of the way
276        #between timestep 15 and 16
277
278        # The reference values are
279        r0 = (r0_0 + 2*r0_1)/3
280        r1 = (r1_0 + 2*r1_1)/3
281        r2 = (r2_0 + 2*r2_1)/3
282
283        #And the file function gives
284        q = f((timestep - 1.0/3)/10., point_id=0); assert allclose(r0, q)
285        q = f((timestep - 1.0/3)/10., point_id=1); assert allclose(r1, q)
286        q = f((timestep - 1.0/3)/10., point_id=2); assert allclose(r2, q)
287
288        fid.close()
289        import os
290        os.remove(filename)
291
292
293
294    def test_spatio_temporal_file_function_different_origin(self):
295        """Test that spatio temporal file function performs the correct
296        interpolations in both time and space where space is offset by
297        xllcorner and yllcorner
298        NetCDF version (x,y,t dependency)       
299        """
300        import time
301
302        #Create sww file of simple propagation from left to right
303        #through rectangular domain
304        from shallow_water import Domain, Dirichlet_boundary
305        from mesh_factory import rectangular
306        from Numeric import take, concatenate, reshape
307
308
309        from coordinate_transforms.geo_reference import Geo_reference
310        xllcorner = 2048
311        yllcorner = 11000
312        zone = 2
313
314        #Create basic mesh and shallow water domain
315        points, vertices, boundary = rectangular(3, 3)
316        domain1 = Domain(points, vertices, boundary,
317                         geo_reference = Geo_reference(xllcorner = xllcorner,
318                                                       yllcorner = yllcorner))
319       
320
321        from utilities.numerical_tools import mean     
322        domain1.reduction = mean
323        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
324                              # only one value.
325
326        domain1.default_order = 2
327        domain1.store = True
328        domain1.set_datadir('.')
329        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
330        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
331
332        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
333        domain1.set_quantity('elevation', 0)
334        domain1.set_quantity('friction', 0)
335        domain1.set_quantity('stage', 0)
336
337        # Boundary conditions
338        B0 = Dirichlet_boundary([0,0,0])
339        B6 = Dirichlet_boundary([0.6,0,0])
340        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
341        domain1.check_integrity()
342
343        finaltime = 8
344        #Evolution
345        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
346            pass
347            #domain1.write_time()
348
349
350        #Now read data from sww and check
351        from Scientific.IO.NetCDF import NetCDFFile
352        filename = domain1.get_name() + '.' + domain1.format
353        fid = NetCDFFile(filename)
354
355        x = fid.variables['x'][:]
356        y = fid.variables['y'][:]
357        stage = fid.variables['stage'][:]
358        xmomentum = fid.variables['xmomentum'][:]
359        ymomentum = fid.variables['ymomentum'][:]
360        time = fid.variables['time'][:]
361
362        #Take stage vertex values at last timestep on diagonal
363        #Diagonal is identified by vertices: 0, 5, 10, 15
364
365        timestep = len(time)-1 #Last timestep
366        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
367        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
368        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
369        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
370
371        #Reference interpolated values at midpoints on diagonal at
372        #this timestep are
373        r0 = (D[0] + D[1])/2
374        r1 = (D[1] + D[2])/2
375        r2 = (D[2] + D[3])/2
376
377        #And the midpoints are found now
378        Dx = take(reshape(x, (16,1)), [0,5,10,15])
379        Dy = take(reshape(y, (16,1)), [0,5,10,15])
380
381        diag = concatenate( (Dx, Dy), axis=1)
382        d_midpoints = (diag[1:] + diag[:-1])/2
383
384
385        #Adjust for georef - make interpolation points absolute
386        d_midpoints[:,0] += xllcorner
387        d_midpoints[:,1] += yllcorner               
388
389        #Let us see if the file function can find the correct
390        #values at the midpoints at the last timestep:
391        f = file_function(filename, domain1,
392                          interpolation_points = d_midpoints)
393
394        q = f(timestep/10., point_id=0); assert allclose(r0, q)
395        q = f(timestep/10., point_id=1); assert allclose(r1, q)
396        q = f(timestep/10., point_id=2); assert allclose(r2, q)
397
398
399        ##################
400        #Now do the same for the first timestep
401
402        timestep = 0 #First timestep
403        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
404        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
405        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
406        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
407
408        #Reference interpolated values at midpoints on diagonal at
409        #this timestep are
410        r0 = (D[0] + D[1])/2
411        r1 = (D[1] + D[2])/2
412        r2 = (D[2] + D[3])/2
413
414        #Let us see if the file function can find the correct
415        #values
416        q = f(0, point_id=0); assert allclose(r0, q)
417        q = f(0, point_id=1); assert allclose(r1, q)
418        q = f(0, point_id=2); assert allclose(r2, q)
419
420
421        ##################
422        #Now do it again for a timestep in the middle
423
424        timestep = 33
425        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
426        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
427        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
428        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
429
430        #Reference interpolated values at midpoints on diagonal at
431        #this timestep are
432        r0 = (D[0] + D[1])/2
433        r1 = (D[1] + D[2])/2
434        r2 = (D[2] + D[3])/2
435
436        q = f(timestep/10., point_id=0); assert allclose(r0, q)
437        q = f(timestep/10., point_id=1); assert allclose(r1, q)
438        q = f(timestep/10., point_id=2); assert allclose(r2, q)
439
440
441        ##################
442        #Now check temporal interpolation
443        #Halfway between timestep 15 and 16
444
445        timestep = 15
446        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
447        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
448        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
449        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
450
451        #Reference interpolated values at midpoints on diagonal at
452        #this timestep are
453        r0_0 = (D[0] + D[1])/2
454        r1_0 = (D[1] + D[2])/2
455        r2_0 = (D[2] + D[3])/2
456
457        #
458        timestep = 16
459        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
460        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
461        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
462        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
463
464        #Reference interpolated values at midpoints on diagonal at
465        #this timestep are
466        r0_1 = (D[0] + D[1])/2
467        r1_1 = (D[1] + D[2])/2
468        r2_1 = (D[2] + D[3])/2
469
470        # The reference values are
471        r0 = (r0_0 + r0_1)/2
472        r1 = (r1_0 + r1_1)/2
473        r2 = (r2_0 + r2_1)/2
474
475        q = f((timestep - 0.5)/10., point_id=0); assert allclose(r0, q)
476        q = f((timestep - 0.5)/10., point_id=1); assert allclose(r1, q)
477        q = f((timestep - 0.5)/10., point_id=2); assert allclose(r2, q)
478
479        ##################
480        #Finally check interpolation 2 thirds of the way
481        #between timestep 15 and 16
482
483        # The reference values are
484        r0 = (r0_0 + 2*r0_1)/3
485        r1 = (r1_0 + 2*r1_1)/3
486        r2 = (r2_0 + 2*r2_1)/3
487
488        #And the file function gives
489        q = f((timestep - 1.0/3)/10., point_id=0); assert allclose(r0, q)
490        q = f((timestep - 1.0/3)/10., point_id=1); assert allclose(r1, q)
491        q = f((timestep - 1.0/3)/10., point_id=2); assert allclose(r2, q)
492
493        fid.close()
494        import os
495        os.remove(filename)
496
497       
498
499
500    def test_spatio_temporal_file_function_time(self):
501        """Test that File function interpolates correctly
502        between given times.
503        NetCDF version (x,y,t dependency)
504        """
505
506        #Create NetCDF (sww) file to be read
507        # x: 0, 5, 10, 15
508        # y: -20, -10, 0, 10
509        # t: 0, 60, 120, ...., 1200
510        #
511        # test quantities (arbitrary but non-trivial expressions):
512        #
513        #   stage     = 3*x - y**2 + 2*t
514        #   xmomentum = exp( -((x-7)**2 + (y+5)**2)/20 ) * t**2
515        #   ymomentum = x**2 + y**2 * sin(t*pi/600)
516
517        #NOTE: Nice test that may render some of the others redundant.
518
519        import os, time
520        from config import time_format
521        from Numeric import sin, pi, exp
522        from mesh_factory import rectangular
523        from shallow_water import Domain
524        import data_manager
525
526        finaltime = 1200
527        filename = 'test_file_function'
528
529        #Create a domain to hold test grid
530        #(0:15, -20:10)
531        points, vertices, boundary =\
532                rectangular(4, 4, 15, 30, origin = (0, -20))
533
534
535        #print 'Number of elements', len(vertices)
536        domain = Domain(points, vertices, boundary)
537        domain.smooth = False
538        domain.default_order = 2
539        domain.set_datadir('.')
540        domain.set_name(filename)
541        domain.store = True
542        domain.format = 'sww'   #Native netcdf visualisation format
543
544        #print points
545        start = time.mktime(time.strptime('2000', '%Y'))
546        domain.starttime = start
547
548
549        #Store structure
550        domain.initialise_storage()
551
552        #Compute artificial time steps and store
553        dt = 60  #One minute intervals
554        t = 0.0
555        while t <= finaltime:
556            #Compute quantities
557            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
558            domain.set_quantity('stage', f1)
559
560            f2 = lambda x,y: x+y+t**2
561            domain.set_quantity('xmomentum', f2)
562
563            f3 = lambda x,y: x**2 + y**2 * sin(t*pi/600)
564            domain.set_quantity('ymomentum', f3)
565
566            #Store and advance time
567            domain.time = t
568            domain.store_timestep(domain.conserved_quantities)
569            t += dt
570
571
572        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5]]
573
574
575
576        #Deliberately set domain.starttime to too early
577        domain.starttime = start - 1
578
579        #Create file function
580        F = file_function(filename + '.sww', domain,
581                          quantities = domain.conserved_quantities,
582                          interpolation_points = interpolation_points)
583
584        #Check that FF updates fixes domain starttime
585        assert allclose(domain.starttime, start)
586
587        #Check that domain.starttime isn't updated if later
588        domain.starttime = start + 1
589        F = file_function(filename + '.sww', domain,
590                          quantities = domain.conserved_quantities,
591                          interpolation_points = interpolation_points)
592        assert allclose(domain.starttime, start+1)
593        domain.starttime = start
594
595
596        #Check linear interpolation in time
597        F = file_function(filename + '.sww', domain,
598                          quantities = domain.conserved_quantities,
599                          interpolation_points = interpolation_points)               
600        for id in range(len(interpolation_points)):
601            x = interpolation_points[id][0]
602            y = interpolation_points[id][1]
603
604            for i in range(20):
605                t = i*10
606                k = i%6
607
608                if k == 0:
609                    q0 = F(t, point_id=id)
610                    q1 = F(t+60, point_id=id)
611
612
613                q = F(t, point_id=id)
614                #print i, k, t, q
615                #print ' ', q0
616                #print ' ', q1
617                #print 's', (k*q1 + (6-k)*q0)/6
618                #print
619                assert allclose(q, (k*q1 + (6-k)*q0)/6)
620
621
622        #Another check of linear interpolation in time
623        for id in range(len(interpolation_points)):
624            q60 = F(60, point_id=id)
625            q120 = F(120, point_id=id)
626
627            t = 90 #Halfway between 60 and 120
628            q = F(t, point_id=id)
629            assert allclose( (q120+q60)/2, q )
630
631            t = 100 #Two thirds of the way between between 60 and 120
632            q = F(t, point_id=id)
633            assert allclose(q60/3 + 2*q120/3, q)
634
635
636
637        #Check that domain.starttime isn't updated if later than file starttime but earlier
638        #than file end time
639        delta = 23
640        domain.starttime = start + delta
641        F = file_function(filename + '.sww', domain,
642                          quantities = domain.conserved_quantities,
643                          interpolation_points = interpolation_points)
644        assert allclose(domain.starttime, start+delta)
645
646
647
648
649        #Now try interpolation with delta offset
650        for id in range(len(interpolation_points)):           
651            x = interpolation_points[id][0]
652            y = interpolation_points[id][1]
653
654            for i in range(20):
655                t = i*10
656                k = i%6
657
658                if k == 0:
659                    q0 = F(t-delta, point_id=id)
660                    q1 = F(t+60-delta, point_id=id)
661
662                q = F(t-delta, point_id=id)
663                assert allclose(q, (k*q1 + (6-k)*q0)/6)
664
665
666        os.remove(filename + '.sww')
667
668
669
670    def test_file_function_time_with_domain(self):
671        """Test that File function interpolates correctly
672        between given times. No x,y dependency here.
673        Use domain with starttime
674        """
675
676        #Write file
677        import os, time, calendar
678        from config import time_format
679        from math import sin, pi
680        from domain import Domain
681
682        finaltime = 1200
683        filename = 'test_file_function'
684        fid = open(filename + '.txt', 'w')
685        start = time.mktime(time.strptime('2000', '%Y'))
686        dt = 60  #One minute intervals
687        t = 0.0
688        while t <= finaltime:
689            t_string = time.strftime(time_format, time.gmtime(t+start))
690            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
691            t += dt
692
693        fid.close()
694
695
696        #Convert ASCII file to NetCDF (Which is what we really like!)
697        timefile2netcdf(filename)
698
699
700
701        a = [0.0, 0.0]
702        b = [4.0, 0.0]
703        c = [0.0, 3.0]
704
705        points = [a, b, c]
706        vertices = [[0,1,2]]
707        domain = Domain(points, vertices)
708
709        #Check that domain.starttime is updated if non-existing
710        F = file_function(filename + '.tms', domain)
711       
712        assert allclose(domain.starttime, start)
713
714        #Check that domain.starttime is updated if too early
715        domain.starttime = start - 1
716        F = file_function(filename + '.tms', domain)
717        assert allclose(domain.starttime, start)
718
719        #Check that domain.starttime isn't updated if later
720        domain.starttime = start + 1
721        F = file_function(filename + '.tms', domain)       
722        assert allclose(domain.starttime, start+1)
723
724        domain.starttime = start
725        F = file_function(filename + '.tms', domain,
726                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])
727       
728
729        #print F.T
730        #print F.precomputed_values
731        #print 'F(60)', F(60)
732       
733        #Now try interpolation
734        for i in range(20):
735            t = i*10
736            q = F(t)
737
738            #Exact linear intpolation
739            assert allclose(q[0], 2*t)
740            if i%6 == 0:
741                assert allclose(q[1], t**2)
742                assert allclose(q[2], sin(t*pi/600))
743
744        #Check non-exact
745
746        t = 90 #Halfway between 60 and 120
747        q = F(t)
748        assert allclose( (120**2 + 60**2)/2, q[1] )
749        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
750
751
752        t = 100 #Two thirds of the way between between 60 and 120
753        q = F(t)
754        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
755        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
756
757        os.remove(filename + '.tms')
758        os.remove(filename + '.txt')       
759
760    def test_file_function_time_with_domain_different_start(self):
761        """Test that File function interpolates correctly
762        between given times. No x,y dependency here.
763        Use domain with a starttime later than that of file
764
765        ASCII version
766        """
767
768        #Write file
769        import os, time, calendar
770        from config import time_format
771        from math import sin, pi
772        from domain import Domain
773
774        finaltime = 1200
775        filename = 'test_file_function'
776        fid = open(filename + '.txt', 'w')
777        start = time.mktime(time.strptime('2000', '%Y'))
778        dt = 60  #One minute intervals
779        t = 0.0
780        while t <= finaltime:
781            t_string = time.strftime(time_format, time.gmtime(t+start))
782            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
783            t += dt
784
785        fid.close()
786
787        #Convert ASCII file to NetCDF (Which is what we really like!)
788        timefile2netcdf(filename)       
789
790        a = [0.0, 0.0]
791        b = [4.0, 0.0]
792        c = [0.0, 3.0]
793
794        points = [a, b, c]
795        vertices = [[0,1,2]]
796        domain = Domain(points, vertices)
797
798        #Check that domain.starttime isn't updated if later than file starttime but earlier
799        #than file end time
800        delta = 23
801        domain.starttime = start + delta
802        F = file_function(filename + '.tms', domain,
803                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])       
804        assert allclose(domain.starttime, start+delta)
805
806
807
808
809        #Now try interpolation with delta offset
810        for i in range(20):
811            t = i*10
812            q = F(t-delta)
813
814            #Exact linear intpolation
815            assert allclose(q[0], 2*t)
816            if i%6 == 0:
817                assert allclose(q[1], t**2)
818                assert allclose(q[2], sin(t*pi/600))
819
820        #Check non-exact
821
822        t = 90 #Halfway between 60 and 120
823        q = F(t-delta)
824        assert allclose( (120**2 + 60**2)/2, q[1] )
825        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
826
827
828        t = 100 #Two thirds of the way between between 60 and 120
829        q = F(t-delta)
830        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
831        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
832
833
834        os.remove(filename + '.tms')
835        os.remove(filename + '.txt')               
836
837
838
839    def test_apply_expression_to_dictionary(self):
840
841        #FIXME: Division is not expected to work for integers.
842        #This must be caught.
843        foo = array([[1,2,3],
844                     [4,5,6]], Float)
845
846        bar = array([[-1,0,5],
847                     [6,1,1]], Float)                 
848
849        D = {'X': foo, 'Y': bar}
850
851        Z = apply_expression_to_dictionary('X+Y', D)       
852        assert allclose(Z, foo+bar)
853
854        Z = apply_expression_to_dictionary('X*Y', D)       
855        assert allclose(Z, foo*bar)       
856
857        Z = apply_expression_to_dictionary('4*X+Y', D)       
858        assert allclose(Z, 4*foo+bar)       
859
860        # test zero division is OK
861        Z = apply_expression_to_dictionary('X/Y', D)
862        assert allclose(1/Z, 1/(foo/bar)) # can't compare inf to inf
863
864        # make an error for zero on zero
865        # this is really an error in Numeric, SciPy core can handle it
866        # Z = apply_expression_to_dictionary('0/Y', D)
867
868        #Check exceptions
869        try:
870            #Wrong name
871            Z = apply_expression_to_dictionary('4*X+A', D)       
872        except NameError:
873            pass
874        else:
875            msg = 'Should have raised a NameError Exception'
876            raise msg
877
878
879        try:
880            #Wrong order
881            Z = apply_expression_to_dictionary(D, '4*X+A')       
882        except AssertionError:
883            pass
884        else:
885            msg = 'Should have raised a AssertionError Exception'
886            raise msg       
887       
888
889    def test_multiple_replace(self):
890        """Hard test that checks a true word-by-word simultaneous replace
891        """
892       
893        D = {'x': 'xi', 'y': 'eta', 'xi':'lam'}
894        exp = '3*x+y + xi'
895       
896        new = multiple_replace(exp, D)
897       
898        assert new == '3*xi+eta + lam'
899                         
900
901
902    def test_point_on_line_obsolete(self):
903        """Test that obsolete call issues appropriate warning"""
904
905        #Turn warning into an exception
906        import warnings
907        warnings.filterwarnings('error')
908
909        try:
910            assert point_on_line( 0, 0.5, 0,1, 0,0 )
911        except DeprecationWarning:
912            pass
913        else:
914            msg = 'point_on_line should have issued a DeprecationWarning'
915            raise Exception(msg)   
916
917        warnings.resetwarnings()                         
918                         
919
920#-------------------------------------------------------------
921if __name__ == "__main__":
922    suite = unittest.makeSuite(Test_Util,'test')
923    #suite = unittest.makeSuite(Test_Util,'test_apply')
924    runner = unittest.TextTestRunner()
925    runner.run(suite)
926
927
928
929
Note: See TracBrowser for help on using the repository browser.