source: inundation/pyvolution/test_util.py @ 2689

Last change on this file since 2689 was 2679, checked in by duncan, 18 years ago

checking in so I can run some tests

File size: 37.4 KB
Line 
1#!/usr/bin/env python
2
3
4import unittest
5from Numeric import zeros, array, allclose, Float
6from math import sqrt, pi
7import tempfile
8
9from util import *
10from config import epsilon
11from data_manager import timefile2netcdf
12
13from utilities.numerical_tools import INF
14
15def test_function(x, y):
16    return x+y
17
18class Test_Util(unittest.TestCase):
19    def setUp(self):
20        pass
21
22    def tearDown(self):
23        pass
24
25
26
27
28    #Geometric
29    #def test_distance(self):
30    #    from util import distance#
31    #
32    #    self.failUnless( distance([4,2],[7,6]) == 5.0,
33    #                     'Distance is wrong!')
34    #    self.failUnless( allclose(distance([7,6],[9,8]), 2.82842712475),
35    #                    'distance is wrong!')
36    #    self.failUnless( allclose(distance([9,8],[4,2]), 7.81024967591),
37    #                    'distance is wrong!')
38    #
39    #    self.failUnless( distance([9,8],[4,2]) == distance([4,2],[9,8]),
40    #                    'distance is wrong!')
41
42
43    def test_file_function_time1(self):
44        """Test that File function interpolates correctly
45        between given times. No x,y dependency here.
46        """
47
48        #Write file
49        import os, time
50        from config import time_format
51        from math import sin, pi
52
53        #Typical ASCII file
54        finaltime = 1200
55        filename = 'test_file_function'
56        fid = open(filename + '.txt', 'w')
57        start = time.mktime(time.strptime('2000', '%Y'))
58        dt = 60  #One minute intervals
59        t = 0.0
60        while t <= finaltime:
61            t_string = time.strftime(time_format, time.gmtime(t+start))
62            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
63            t += dt
64
65        fid.close()
66
67        #Convert ASCII file to NetCDF (Which is what we really like!)
68        timefile2netcdf(filename)
69
70
71        #Create file function from time series
72        F = file_function(filename + '.tms',
73                          quantities = ['Attribute0',
74                                        'Attribute1',
75                                        'Attribute2'])
76       
77        #Now try interpolation
78        for i in range(20):
79            t = i*10
80            q = F(t)
81
82            #Exact linear intpolation
83            assert allclose(q[0], 2*t)
84            if i%6 == 0:
85                assert allclose(q[1], t**2)
86                assert allclose(q[2], sin(t*pi/600))
87
88        #Check non-exact
89
90        t = 90 #Halfway between 60 and 120
91        q = F(t)
92        assert allclose( (120**2 + 60**2)/2, q[1] )
93        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
94
95
96        t = 100 #Two thirds of the way between between 60 and 120
97        q = F(t)
98        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
99        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
100
101        os.remove(filename + '.txt')
102        os.remove(filename + '.tms')       
103
104
105       
106    def test_spatio_temporal_file_function(self):
107        """Test that spatio temporal file function performs the correct
108        interpolations in both time and space
109        NetCDF version (x,y,t dependency)       
110        """
111        import time
112
113        #Create sww file of simple propagation from left to right
114        #through rectangular domain
115        from shallow_water import Domain, Dirichlet_boundary
116        from mesh_factory import rectangular
117        from Numeric import take, concatenate, reshape
118
119        #Create basic mesh and shallow water domain
120        points, vertices, boundary = rectangular(3, 3)
121        domain1 = Domain(points, vertices, boundary)
122
123        from utilities.numerical_tools import mean
124        domain1.reduction = mean
125        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
126                              # only one value.
127
128        domain1.default_order = 2
129        domain1.store = True
130        domain1.set_datadir('.')
131        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
132        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
133
134        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
135        domain1.set_quantity('elevation', 0)
136        domain1.set_quantity('friction', 0)
137        domain1.set_quantity('stage', 0)
138
139        # Boundary conditions
140        B0 = Dirichlet_boundary([0,0,0])
141        B6 = Dirichlet_boundary([0.6,0,0])
142        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
143        domain1.check_integrity()
144
145        finaltime = 8
146        #Evolution
147        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
148            pass
149            #domain1.write_time()
150
151
152        #Now read data from sww and check
153        from Scientific.IO.NetCDF import NetCDFFile
154        filename = domain1.get_name() + '.' + domain1.format
155        fid = NetCDFFile(filename)
156
157        x = fid.variables['x'][:]
158        y = fid.variables['y'][:]
159        stage = fid.variables['stage'][:]
160        xmomentum = fid.variables['xmomentum'][:]
161        ymomentum = fid.variables['ymomentum'][:]
162        time = fid.variables['time'][:]
163
164        #Take stage vertex values at last timestep on diagonal
165        #Diagonal is identified by vertices: 0, 5, 10, 15
166
167        timestep = len(time)-1 #Last timestep
168        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
169        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
170        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
171        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
172
173        #Reference interpolated values at midpoints on diagonal at
174        #this timestep are
175        r0 = (D[0] + D[1])/2
176        r1 = (D[1] + D[2])/2
177        r2 = (D[2] + D[3])/2
178
179        #And the midpoints are found now
180        Dx = take(reshape(x, (16,1)), [0,5,10,15])
181        Dy = take(reshape(y, (16,1)), [0,5,10,15])
182
183        diag = concatenate( (Dx, Dy), axis=1)
184        d_midpoints = (diag[1:] + diag[:-1])/2
185
186        #Let us see if the file function can find the correct
187        #values at the midpoints at the last timestep:
188        f = file_function(filename, domain1,
189                          interpolation_points = d_midpoints)
190
191        q = f(timestep/10., point_id=0); assert allclose(r0, q)
192        q = f(timestep/10., point_id=1); assert allclose(r1, q)
193        q = f(timestep/10., point_id=2); assert allclose(r2, q)
194
195
196        ##################
197        #Now do the same for the first timestep
198
199        timestep = 0 #First timestep
200        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
201        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
202        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
203        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
204
205        #Reference interpolated values at midpoints on diagonal at
206        #this timestep are
207        r0 = (D[0] + D[1])/2
208        r1 = (D[1] + D[2])/2
209        r2 = (D[2] + D[3])/2
210
211        #Let us see if the file function can find the correct
212        #values
213        q = f(0, point_id=0); assert allclose(r0, q)
214        q = f(0, point_id=1); assert allclose(r1, q)
215        q = f(0, point_id=2); assert allclose(r2, q)
216
217
218        ##################
219        #Now do it again for a timestep in the middle
220
221        timestep = 33
222        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
223        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
224        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
225        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
226
227        #Reference interpolated values at midpoints on diagonal at
228        #this timestep are
229        r0 = (D[0] + D[1])/2
230        r1 = (D[1] + D[2])/2
231        r2 = (D[2] + D[3])/2
232
233        q = f(timestep/10., point_id=0); assert allclose(r0, q)
234        q = f(timestep/10., point_id=1); assert allclose(r1, q)
235        q = f(timestep/10., point_id=2); assert allclose(r2, q)
236
237
238        ##################
239        #Now check temporal interpolation
240        #Halfway between timestep 15 and 16
241
242        timestep = 15
243        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
244        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
245        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
246        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
247
248        #Reference interpolated values at midpoints on diagonal at
249        #this timestep are
250        r0_0 = (D[0] + D[1])/2
251        r1_0 = (D[1] + D[2])/2
252        r2_0 = (D[2] + D[3])/2
253
254        #
255        timestep = 16
256        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
257        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
258        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
259        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
260
261        #Reference interpolated values at midpoints on diagonal at
262        #this timestep are
263        r0_1 = (D[0] + D[1])/2
264        r1_1 = (D[1] + D[2])/2
265        r2_1 = (D[2] + D[3])/2
266
267        # The reference values are
268        r0 = (r0_0 + r0_1)/2
269        r1 = (r1_0 + r1_1)/2
270        r2 = (r2_0 + r2_1)/2
271
272        q = f((timestep - 0.5)/10., point_id=0); assert allclose(r0, q)
273        q = f((timestep - 0.5)/10., point_id=1); assert allclose(r1, q)
274        q = f((timestep - 0.5)/10., point_id=2); assert allclose(r2, q)
275
276        ##################
277        #Finally check interpolation 2 thirds of the way
278        #between timestep 15 and 16
279
280        # The reference values are
281        r0 = (r0_0 + 2*r0_1)/3
282        r1 = (r1_0 + 2*r1_1)/3
283        r2 = (r2_0 + 2*r2_1)/3
284
285        #And the file function gives
286        q = f((timestep - 1.0/3)/10., point_id=0); assert allclose(r0, q)
287        q = f((timestep - 1.0/3)/10., point_id=1); assert allclose(r1, q)
288        q = f((timestep - 1.0/3)/10., point_id=2); assert allclose(r2, q)
289
290        fid.close()
291        import os
292        os.remove(filename)
293
294
295
296    def test_spatio_temporal_file_function_different_origin(self):
297        """Test that spatio temporal file function performs the correct
298        interpolations in both time and space where space is offset by
299        xllcorner and yllcorner
300        NetCDF version (x,y,t dependency)       
301        """
302        import time
303
304        #Create sww file of simple propagation from left to right
305        #through rectangular domain
306        from shallow_water import Domain, Dirichlet_boundary
307        from mesh_factory import rectangular
308        from Numeric import take, concatenate, reshape
309
310
311        from coordinate_transforms.geo_reference import Geo_reference
312        xllcorner = 2048
313        yllcorner = 11000
314        zone = 2
315
316        #Create basic mesh and shallow water domain
317        points, vertices, boundary = rectangular(3, 3)
318        domain1 = Domain(points, vertices, boundary,
319                         geo_reference = Geo_reference(xllcorner = xllcorner,
320                                                       yllcorner = yllcorner))
321       
322
323        from utilities.numerical_tools import mean     
324        domain1.reduction = mean
325        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
326                              # only one value.
327
328        domain1.default_order = 2
329        domain1.store = True
330        domain1.set_datadir('.')
331        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
332        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
333
334        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
335        domain1.set_quantity('elevation', 0)
336        domain1.set_quantity('friction', 0)
337        domain1.set_quantity('stage', 0)
338
339        # Boundary conditions
340        B0 = Dirichlet_boundary([0,0,0])
341        B6 = Dirichlet_boundary([0.6,0,0])
342        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
343        domain1.check_integrity()
344
345        finaltime = 8
346        #Evolution
347        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
348            pass
349            #domain1.write_time()
350
351
352        #Now read data from sww and check
353        from Scientific.IO.NetCDF import NetCDFFile
354        filename = domain1.get_name() + '.' + domain1.format
355        fid = NetCDFFile(filename)
356
357        x = fid.variables['x'][:]
358        y = fid.variables['y'][:]
359        stage = fid.variables['stage'][:]
360        xmomentum = fid.variables['xmomentum'][:]
361        ymomentum = fid.variables['ymomentum'][:]
362        time = fid.variables['time'][:]
363
364        #Take stage vertex values at last timestep on diagonal
365        #Diagonal is identified by vertices: 0, 5, 10, 15
366
367        timestep = len(time)-1 #Last timestep
368        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
369        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
370        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
371        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
372
373        #Reference interpolated values at midpoints on diagonal at
374        #this timestep are
375        r0 = (D[0] + D[1])/2
376        r1 = (D[1] + D[2])/2
377        r2 = (D[2] + D[3])/2
378
379        #And the midpoints are found now
380        Dx = take(reshape(x, (16,1)), [0,5,10,15])
381        Dy = take(reshape(y, (16,1)), [0,5,10,15])
382
383        diag = concatenate( (Dx, Dy), axis=1)
384        d_midpoints = (diag[1:] + diag[:-1])/2
385
386
387        #Adjust for georef - make interpolation points absolute
388        d_midpoints[:,0] += xllcorner
389        d_midpoints[:,1] += yllcorner               
390
391        #Let us see if the file function can find the correct
392        #values at the midpoints at the last timestep:
393        f = file_function(filename, domain1,
394                          interpolation_points = d_midpoints)
395
396        q = f(timestep/10., point_id=0); assert allclose(r0, q)
397        q = f(timestep/10., point_id=1); assert allclose(r1, q)
398        q = f(timestep/10., point_id=2); assert allclose(r2, q)
399
400
401        ##################
402        #Now do the same for the first timestep
403
404        timestep = 0 #First timestep
405        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
406        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
407        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
408        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
409
410        #Reference interpolated values at midpoints on diagonal at
411        #this timestep are
412        r0 = (D[0] + D[1])/2
413        r1 = (D[1] + D[2])/2
414        r2 = (D[2] + D[3])/2
415
416        #Let us see if the file function can find the correct
417        #values
418        q = f(0, point_id=0); assert allclose(r0, q)
419        q = f(0, point_id=1); assert allclose(r1, q)
420        q = f(0, point_id=2); assert allclose(r2, q)
421
422
423        ##################
424        #Now do it again for a timestep in the middle
425
426        timestep = 33
427        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
428        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
429        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
430        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
431
432        #Reference interpolated values at midpoints on diagonal at
433        #this timestep are
434        r0 = (D[0] + D[1])/2
435        r1 = (D[1] + D[2])/2
436        r2 = (D[2] + D[3])/2
437
438        q = f(timestep/10., point_id=0); assert allclose(r0, q)
439        q = f(timestep/10., point_id=1); assert allclose(r1, q)
440        q = f(timestep/10., point_id=2); assert allclose(r2, q)
441
442
443        ##################
444        #Now check temporal interpolation
445        #Halfway between timestep 15 and 16
446
447        timestep = 15
448        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
449        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
450        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
451        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
452
453        #Reference interpolated values at midpoints on diagonal at
454        #this timestep are
455        r0_0 = (D[0] + D[1])/2
456        r1_0 = (D[1] + D[2])/2
457        r2_0 = (D[2] + D[3])/2
458
459        #
460        timestep = 16
461        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
462        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
463        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
464        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
465
466        #Reference interpolated values at midpoints on diagonal at
467        #this timestep are
468        r0_1 = (D[0] + D[1])/2
469        r1_1 = (D[1] + D[2])/2
470        r2_1 = (D[2] + D[3])/2
471
472        # The reference values are
473        r0 = (r0_0 + r0_1)/2
474        r1 = (r1_0 + r1_1)/2
475        r2 = (r2_0 + r2_1)/2
476
477        q = f((timestep - 0.5)/10., point_id=0); assert allclose(r0, q)
478        q = f((timestep - 0.5)/10., point_id=1); assert allclose(r1, q)
479        q = f((timestep - 0.5)/10., point_id=2); assert allclose(r2, q)
480
481        ##################
482        #Finally check interpolation 2 thirds of the way
483        #between timestep 15 and 16
484
485        # The reference values are
486        r0 = (r0_0 + 2*r0_1)/3
487        r1 = (r1_0 + 2*r1_1)/3
488        r2 = (r2_0 + 2*r2_1)/3
489
490        #And the file function gives
491        q = f((timestep - 1.0/3)/10., point_id=0); assert allclose(r0, q)
492        q = f((timestep - 1.0/3)/10., point_id=1); assert allclose(r1, q)
493        q = f((timestep - 1.0/3)/10., point_id=2); assert allclose(r2, q)
494
495        fid.close()
496        import os
497        os.remove(filename)
498
499       
500
501
502    def qtest_spatio_temporal_file_function_time(self):
503        """Test that File function interpolates correctly
504        between given times.
505        NetCDF version (x,y,t dependency)
506        """
507
508        #Create NetCDF (sww) file to be read
509        # x: 0, 5, 10, 15
510        # y: -20, -10, 0, 10
511        # t: 0, 60, 120, ...., 1200
512        #
513        # test quantities (arbitrary but non-trivial expressions):
514        #
515        #   stage     = 3*x - y**2 + 2*t
516        #   xmomentum = exp( -((x-7)**2 + (y+5)**2)/20 ) * t**2
517        #   ymomentum = x**2 + y**2 * sin(t*pi/600)
518
519        #NOTE: Nice test that may render some of the others redundant.
520
521        import os, time
522        from config import time_format
523        from Numeric import sin, pi, exp
524        from mesh_factory import rectangular
525        from shallow_water import Domain
526        import data_manager
527
528        finaltime = 1200
529        filename = 'test_file_function'
530
531        #Create a domain to hold test grid
532        #(0:15, -20:10)
533        points, vertices, boundary =\
534                rectangular(4, 4, 15, 30, origin = (0, -20))
535        print "points", points
536
537        #print 'Number of elements', len(vertices)
538        domain = Domain(points, vertices, boundary)
539        domain.smooth = False
540        domain.default_order = 2
541        domain.set_datadir('.')
542        domain.set_name(filename)
543        domain.store = True
544        domain.format = 'sww'   #Native netcdf visualisation format
545
546        #print points
547        start = time.mktime(time.strptime('2000', '%Y'))
548        domain.starttime = start
549
550
551        #Store structure
552        domain.initialise_storage()
553
554        #Compute artificial time steps and store
555        dt = 60  #One minute intervals
556        t = 0.0
557        while t <= finaltime:
558            #Compute quantities
559            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
560            domain.set_quantity('stage', f1)
561
562            f2 = lambda x,y: x+y+t**2
563            domain.set_quantity('xmomentum', f2)
564
565            f3 = lambda x,y: x**2 + y**2 * sin(t*pi/600)
566            domain.set_quantity('ymomentum', f3)
567
568            #Store and advance time
569            domain.time = t
570            domain.store_timestep(domain.conserved_quantities)
571            t += dt
572
573
574        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5]]
575     
576        #Deliberately set domain.starttime to too early
577        domain.starttime = start - 1
578
579        #Create file function
580        F = file_function(filename + '.sww', domain,
581                          quantities = domain.conserved_quantities,
582                          interpolation_points = interpolation_points)
583
584        #Check that FF updates fixes domain starttime
585        assert allclose(domain.starttime, start)
586
587        #Check that domain.starttime isn't updated if later
588        domain.starttime = start + 1
589        F = file_function(filename + '.sww', domain,
590                          quantities = domain.conserved_quantities,
591                          interpolation_points = interpolation_points)
592        assert allclose(domain.starttime, start+1)
593        domain.starttime = start
594
595
596        #Check linear interpolation in time
597        F = file_function(filename + '.sww', domain,
598                          quantities = domain.conserved_quantities,
599                          interpolation_points = interpolation_points)               
600        for id in range(len(interpolation_points)):
601            x = interpolation_points[id][0]
602            y = interpolation_points[id][1]
603
604            for i in range(20):
605                t = i*10
606                k = i%6
607
608                if k == 0:
609                    q0 = F(t, point_id=id)
610                    q1 = F(t+60, point_id=id)
611
612                if q0 == INF:
613                    actual = q0
614                else:
615                    actual = (k*q1 + (6-k)*q0)/6
616                q = F(t, point_id=id)
617                #print i, k, t, q
618                #print ' ', q0
619                #print ' ', q1
620                print "q",q
621                print "actual", actual
622                #print
623                if q0 == INF:
624                     self.failUnless( q == actual, 'Fail!')
625                else:
626                    assert allclose(q, actual)
627
628
629        #Another check of linear interpolation in time
630        for id in range(len(interpolation_points)):
631            q60 = F(60, point_id=id)
632            q120 = F(120, point_id=id)
633
634            t = 90 #Halfway between 60 and 120
635            q = F(t, point_id=id)
636            assert allclose( (q120+q60)/2, q )
637
638            t = 100 #Two thirds of the way between between 60 and 120
639            q = F(t, point_id=id)
640            assert allclose(q60/3 + 2*q120/3, q)
641
642
643
644        #Check that domain.starttime isn't updated if later than file starttime but earlier
645        #than file end time
646        delta = 23
647        domain.starttime = start + delta
648        F = file_function(filename + '.sww', domain,
649                          quantities = domain.conserved_quantities,
650                          interpolation_points = interpolation_points)
651        assert allclose(domain.starttime, start+delta)
652
653
654
655
656        #Now try interpolation with delta offset
657        for id in range(len(interpolation_points)):           
658            x = interpolation_points[id][0]
659            y = interpolation_points[id][1]
660
661            for i in range(20):
662                t = i*10
663                k = i%6
664
665                if k == 0:
666                    q0 = F(t-delta, point_id=id)
667                    q1 = F(t+60-delta, point_id=id)
668
669                q = F(t-delta, point_id=id)
670                assert allclose(q, (k*q1 + (6-k)*q0)/6)
671
672
673        os.remove(filename + '.sww')
674
675
676
677    def xtest_spatio_temporal_file_function_time(self):
678        # Test that File function interpolates correctly
679        # When some points are outside the mesh
680
681        import os, time
682        from config import time_format
683        from Numeric import sin, pi, exp
684        from mesh_factory import rectangular
685        from shallow_water import Domain
686        import data_manager 
687        from pmesh.mesh_interface import create_mesh_from_regions
688        finaltime = 1200
689       
690        filename = tempfile.mktemp()
691        #print "filename",filename
692        filename = 'test_file_function'
693
694        meshfilename = tempfile.mktemp(".tsh")
695
696        boundary_tags = {'walls':[0,1],'bom':[2]}
697       
698        polygon_absolute = [[0,-20],[10,-20],[10,15],[-20,15]]
699       
700        create_mesh_from_regions(polygon_absolute,
701                                 boundary_tags,
702                                 10000000,
703                                 filename=meshfilename)
704        domain = Domain(mesh_filename=meshfilename)
705        domain.smooth = False
706        domain.default_order = 2
707        domain.set_datadir('.')
708        domain.set_name(filename)
709        domain.store = True
710        domain.format = 'sww'   #Native netcdf visualisation format
711
712        #print points
713        start = time.mktime(time.strptime('2000', '%Y'))
714        domain.starttime = start
715       
716
717        #Store structure
718        domain.initialise_storage()
719
720        #Compute artificial time steps and store
721        dt = 60  #One minute intervals
722        t = 0.0
723        while t <= finaltime:
724            #Compute quantities
725            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
726            domain.set_quantity('stage', f1)
727
728            f2 = lambda x,y: x+y+t**2
729            domain.set_quantity('xmomentum', f2)
730
731            f3 = lambda x,y: x**2 + y**2 * sin(t*pi/600)
732            domain.set_quantity('ymomentum', f3)
733
734            #Store and advance time
735            domain.time = t
736            domain.store_timestep(domain.conserved_quantities)
737            t += dt
738
739        interpolation_points = [[1,0]]
740        interpolation_points = [[100,1000]]
741       
742        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5],
743                                [78787,78787],[7878,3432]]
744           
745        #Deliberately set domain.starttime to too early
746        domain.starttime = start - 1
747
748        #Create file function
749        F = file_function(filename + '.sww', domain,
750                          quantities = domain.conserved_quantities,
751                          interpolation_points = interpolation_points)
752
753        #Check that FF updates fixes domain starttime
754        assert allclose(domain.starttime, start)
755
756        #Check that domain.starttime isn't updated if later
757        domain.starttime = start + 1
758        F = file_function(filename + '.sww', domain,
759                          quantities = domain.conserved_quantities,
760                          interpolation_points = interpolation_points)
761        assert allclose(domain.starttime, start+1)
762        domain.starttime = start
763
764
765        #Check linear interpolation in time
766        # checking points inside and outside the mesh
767        F = file_function(filename + '.sww', domain,
768                          quantities = domain.conserved_quantities,
769                          interpolation_points = interpolation_points)
770       
771        for id in range(len(interpolation_points)):
772            x = interpolation_points[id][0]
773            y = interpolation_points[id][1]
774
775            for i in range(20):
776                t = i*10
777                k = i%6
778
779                if k == 0:
780                    q0 = F(t, point_id=id)
781                    q1 = F(t+60, point_id=id)
782
783                if q0 == INF:
784                    actual = q0
785                else:
786                    actual = (k*q1 + (6-k)*q0)/6
787                q = F(t, point_id=id)
788                #print i, k, t, q
789                #print ' ', q0
790                #print ' ', q1
791                #print "q",q
792                #print "actual", actual
793                #print
794                if q0 == INF:
795                     self.failUnless( q == actual, 'Fail!')
796                else:
797                    assert allclose(q, actual)
798
799        # now lets check points inside the mesh
800        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14]] #, [10,-12.5]] - this point doesn't work WHY?
801        interpolation_points = [[10,-12.5]]
802           
803        print "len(interpolation_points)",len(interpolation_points) 
804        F = file_function(filename + '.sww', domain,
805                          quantities = domain.conserved_quantities,
806                          interpolation_points = interpolation_points)
807
808        domain.starttime = start
809
810
811        #Check linear interpolation in time
812        F = file_function(filename + '.sww', domain,
813                          quantities = domain.conserved_quantities,
814                          interpolation_points = interpolation_points)               
815        for id in range(len(interpolation_points)):
816            x = interpolation_points[id][0]
817            y = interpolation_points[id][1]
818
819            for i in range(20):
820                t = i*10
821                k = i%6
822
823                if k == 0:
824                    q0 = F(t, point_id=id)
825                    q1 = F(t+60, point_id=id)
826
827                if q0 == INF:
828                    actual = q0
829                else:
830                    actual = (k*q1 + (6-k)*q0)/6
831                q = F(t, point_id=id)
832                print "############"
833                print "id, x, y ", id, x, y #k, t, q
834                print "t", t
835                #print ' ', q0
836                #print ' ', q1
837                print "q",q
838                print "actual", actual
839                #print
840                if q0 == INF:
841                     self.failUnless( q == actual, 'Fail!')
842                else:
843                    assert allclose(q, actual)
844
845
846        #Another check of linear interpolation in time
847        for id in range(len(interpolation_points)):
848            q60 = F(60, point_id=id)
849            q120 = F(120, point_id=id)
850
851            t = 90 #Halfway between 60 and 120
852            q = F(t, point_id=id)
853            assert allclose( (q120+q60)/2, q )
854
855            t = 100 #Two thirds of the way between between 60 and 120
856            q = F(t, point_id=id)
857            assert allclose(q60/3 + 2*q120/3, q)
858
859
860
861        #Check that domain.starttime isn't updated if later than file starttime but earlier
862        #than file end time
863        delta = 23
864        domain.starttime = start + delta
865        F = file_function(filename + '.sww', domain,
866                          quantities = domain.conserved_quantities,
867                          interpolation_points = interpolation_points)
868        assert allclose(domain.starttime, start+delta)
869
870
871
872
873        #Now try interpolation with delta offset
874        for id in range(len(interpolation_points)):           
875            x = interpolation_points[id][0]
876            y = interpolation_points[id][1]
877
878            for i in range(20):
879                t = i*10
880                k = i%6
881
882                if k == 0:
883                    q0 = F(t-delta, point_id=id)
884                    q1 = F(t+60-delta, point_id=id)
885
886                q = F(t-delta, point_id=id)
887                assert allclose(q, (k*q1 + (6-k)*q0)/6)
888
889
890        os.remove(filename + '.sww')
891
892    def test_file_function_time_with_domain(self):
893        """Test that File function interpolates correctly
894        between given times. No x,y dependency here.
895        Use domain with starttime
896        """
897
898        #Write file
899        import os, time, calendar
900        from config import time_format
901        from math import sin, pi
902        from domain import Domain
903
904        finaltime = 1200
905        filename = 'test_file_function'
906        fid = open(filename + '.txt', 'w')
907        start = time.mktime(time.strptime('2000', '%Y'))
908        dt = 60  #One minute intervals
909        t = 0.0
910        while t <= finaltime:
911            t_string = time.strftime(time_format, time.gmtime(t+start))
912            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
913            t += dt
914
915        fid.close()
916
917
918        #Convert ASCII file to NetCDF (Which is what we really like!)
919        timefile2netcdf(filename)
920
921
922
923        a = [0.0, 0.0]
924        b = [4.0, 0.0]
925        c = [0.0, 3.0]
926
927        points = [a, b, c]
928        vertices = [[0,1,2]]
929        domain = Domain(points, vertices)
930
931        #Check that domain.starttime is updated if non-existing
932        F = file_function(filename + '.tms', domain)
933       
934        assert allclose(domain.starttime, start)
935
936        #Check that domain.starttime is updated if too early
937        domain.starttime = start - 1
938        F = file_function(filename + '.tms', domain)
939        assert allclose(domain.starttime, start)
940
941        #Check that domain.starttime isn't updated if later
942        domain.starttime = start + 1
943        F = file_function(filename + '.tms', domain)       
944        assert allclose(domain.starttime, start+1)
945
946        domain.starttime = start
947        F = file_function(filename + '.tms', domain,
948                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])
949       
950
951        #print F.T
952        #print F.precomputed_values
953        #print 'F(60)', F(60)
954       
955        #Now try interpolation
956        for i in range(20):
957            t = i*10
958            q = F(t)
959
960            #Exact linear intpolation
961            assert allclose(q[0], 2*t)
962            if i%6 == 0:
963                assert allclose(q[1], t**2)
964                assert allclose(q[2], sin(t*pi/600))
965
966        #Check non-exact
967
968        t = 90 #Halfway between 60 and 120
969        q = F(t)
970        assert allclose( (120**2 + 60**2)/2, q[1] )
971        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
972
973
974        t = 100 #Two thirds of the way between between 60 and 120
975        q = F(t)
976        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
977        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
978
979        os.remove(filename + '.tms')
980        os.remove(filename + '.txt')       
981
982    def test_file_function_time_with_domain_different_start(self):
983        """Test that File function interpolates correctly
984        between given times. No x,y dependency here.
985        Use domain with a starttime later than that of file
986
987        ASCII version
988        """
989
990        #Write file
991        import os, time, calendar
992        from config import time_format
993        from math import sin, pi
994        from domain import Domain
995
996        finaltime = 1200
997        filename = 'test_file_function'
998        fid = open(filename + '.txt', 'w')
999        start = time.mktime(time.strptime('2000', '%Y'))
1000        dt = 60  #One minute intervals
1001        t = 0.0
1002        while t <= finaltime:
1003            t_string = time.strftime(time_format, time.gmtime(t+start))
1004            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
1005            t += dt
1006
1007        fid.close()
1008
1009        #Convert ASCII file to NetCDF (Which is what we really like!)
1010        timefile2netcdf(filename)       
1011
1012        a = [0.0, 0.0]
1013        b = [4.0, 0.0]
1014        c = [0.0, 3.0]
1015
1016        points = [a, b, c]
1017        vertices = [[0,1,2]]
1018        domain = Domain(points, vertices)
1019
1020        #Check that domain.starttime isn't updated if later than file starttime but earlier
1021        #than file end time
1022        delta = 23
1023        domain.starttime = start + delta
1024        F = file_function(filename + '.tms', domain,
1025                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])       
1026        assert allclose(domain.starttime, start+delta)
1027
1028
1029
1030
1031        #Now try interpolation with delta offset
1032        for i in range(20):
1033            t = i*10
1034            q = F(t-delta)
1035
1036            #Exact linear intpolation
1037            assert allclose(q[0], 2*t)
1038            if i%6 == 0:
1039                assert allclose(q[1], t**2)
1040                assert allclose(q[2], sin(t*pi/600))
1041
1042        #Check non-exact
1043
1044        t = 90 #Halfway between 60 and 120
1045        q = F(t-delta)
1046        assert allclose( (120**2 + 60**2)/2, q[1] )
1047        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
1048
1049
1050        t = 100 #Two thirds of the way between between 60 and 120
1051        q = F(t-delta)
1052        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
1053        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
1054
1055
1056        os.remove(filename + '.tms')
1057        os.remove(filename + '.txt')               
1058
1059
1060
1061    def test_apply_expression_to_dictionary(self):
1062
1063        #FIXME: Division is not expected to work for integers.
1064        #This must be caught.
1065        foo = array([[1,2,3],
1066                     [4,5,6]], Float)
1067
1068        bar = array([[-1,0,5],
1069                     [6,1,1]], Float)                 
1070
1071        D = {'X': foo, 'Y': bar}
1072
1073        Z = apply_expression_to_dictionary('X+Y', D)       
1074        assert allclose(Z, foo+bar)
1075
1076        Z = apply_expression_to_dictionary('X*Y', D)       
1077        assert allclose(Z, foo*bar)       
1078
1079        Z = apply_expression_to_dictionary('4*X+Y', D)       
1080        assert allclose(Z, 4*foo+bar)       
1081
1082        # test zero division is OK
1083        Z = apply_expression_to_dictionary('X/Y', D)
1084        assert allclose(1/Z, 1/(foo/bar)) # can't compare inf to inf
1085
1086        # make an error for zero on zero
1087        # this is really an error in Numeric, SciPy core can handle it
1088        # Z = apply_expression_to_dictionary('0/Y', D)
1089
1090        #Check exceptions
1091        try:
1092            #Wrong name
1093            Z = apply_expression_to_dictionary('4*X+A', D)       
1094        except NameError:
1095            pass
1096        else:
1097            msg = 'Should have raised a NameError Exception'
1098            raise msg
1099
1100
1101        try:
1102            #Wrong order
1103            Z = apply_expression_to_dictionary(D, '4*X+A')       
1104        except AssertionError:
1105            pass
1106        else:
1107            msg = 'Should have raised a AssertionError Exception'
1108            raise msg       
1109       
1110
1111    def test_multiple_replace(self):
1112        """Hard test that checks a true word-by-word simultaneous replace
1113        """
1114       
1115        D = {'x': 'xi', 'y': 'eta', 'xi':'lam'}
1116        exp = '3*x+y + xi'
1117       
1118        new = multiple_replace(exp, D)
1119       
1120        assert new == '3*xi+eta + lam'
1121                         
1122
1123
1124    def test_point_on_line_obsolete(self):
1125        """Test that obsolete call issues appropriate warning"""
1126
1127        #Turn warning into an exception
1128        import warnings
1129        warnings.filterwarnings('error')
1130
1131        try:
1132            assert point_on_line( 0, 0.5, 0,1, 0,0 )
1133        except DeprecationWarning:
1134            pass
1135        else:
1136            msg = 'point_on_line should have issued a DeprecationWarning'
1137            raise Exception(msg)   
1138
1139        warnings.resetwarnings()                         
1140                         
1141
1142#-------------------------------------------------------------
1143if __name__ == "__main__":
1144    suite = unittest.makeSuite(Test_Util,'test')
1145    #suite = unittest.makeSuite(Test_Util,'test_apply')
1146    runner = unittest.TextTestRunner()
1147    runner.run(suite)
1148
1149
1150
1151
Note: See TracBrowser for help on using the repository browser.