source: inundation/pyvolution/test_util.py @ 2852

Last change on this file since 2852 was 2852, checked in by ole, 19 years ago

Nailed the problem with tests failing when fixing duplicate timestep issue.

File size: 37.7 KB
Line 
1#!/usr/bin/env python
2
3
4import unittest
5from Numeric import zeros, array, allclose, Float
6from math import sqrt, pi
7import tempfile
8
9from util import *
10from config import epsilon
11from data_manager import timefile2netcdf
12
13from utilities.numerical_tools import INF
14
15def test_function(x, y):
16    return x+y
17
18class Test_Util(unittest.TestCase):
19    def setUp(self):
20        pass
21
22    def tearDown(self):
23        pass
24
25
26
27
28    #Geometric
29    #def test_distance(self):
30    #    from util import distance#
31    #
32    #    self.failUnless( distance([4,2],[7,6]) == 5.0,
33    #                     'Distance is wrong!')
34    #    self.failUnless( allclose(distance([7,6],[9,8]), 2.82842712475),
35    #                    'distance is wrong!')
36    #    self.failUnless( allclose(distance([9,8],[4,2]), 7.81024967591),
37    #                    'distance is wrong!')
38    #
39    #    self.failUnless( distance([9,8],[4,2]) == distance([4,2],[9,8]),
40    #                    'distance is wrong!')
41
42
43    def test_file_function_time1(self):
44        """Test that File function interpolates correctly
45        between given times. No x,y dependency here.
46        """
47
48        #Write file
49        import os, time
50        from config import time_format
51        from math import sin, pi
52
53        #Typical ASCII file
54        finaltime = 1200
55        filename = 'test_file_function'
56        fid = open(filename + '.txt', 'w')
57        start = time.mktime(time.strptime('2000', '%Y'))
58        dt = 60  #One minute intervals
59        t = 0.0
60        while t <= finaltime:
61            t_string = time.strftime(time_format, time.gmtime(t+start))
62            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
63            t += dt
64
65        fid.close()
66
67        #Convert ASCII file to NetCDF (Which is what we really like!)
68        timefile2netcdf(filename)
69
70
71        #Create file function from time series
72        F = file_function(filename + '.tms',
73                          quantities = ['Attribute0',
74                                        'Attribute1',
75                                        'Attribute2'])
76       
77        #Now try interpolation
78        for i in range(20):
79            t = i*10
80            q = F(t)
81
82            #Exact linear intpolation
83            assert allclose(q[0], 2*t)
84            if i%6 == 0:
85                assert allclose(q[1], t**2)
86                assert allclose(q[2], sin(t*pi/600))
87
88        #Check non-exact
89
90        t = 90 #Halfway between 60 and 120
91        q = F(t)
92        assert allclose( (120**2 + 60**2)/2, q[1] )
93        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
94
95
96        t = 100 #Two thirds of the way between between 60 and 120
97        q = F(t)
98        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
99        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
100
101        os.remove(filename + '.txt')
102        os.remove(filename + '.tms')       
103
104
105       
106    def test_spatio_temporal_file_function_basic(self):
107        """Test that spatio temporal file function performs the correct
108        interpolations in both time and space
109        NetCDF version (x,y,t dependency)       
110        """
111        import time
112
113        #Create sww file of simple propagation from left to right
114        #through rectangular domain
115        from shallow_water import Domain, Dirichlet_boundary
116        from mesh_factory import rectangular
117        from Numeric import take, concatenate, reshape
118
119        #Create basic mesh and shallow water domain
120        points, vertices, boundary = rectangular(3, 3)
121        domain1 = Domain(points, vertices, boundary)
122
123        from utilities.numerical_tools import mean
124        domain1.reduction = mean
125        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
126                              # only one value.
127
128        domain1.default_order = 2
129        domain1.store = True
130        domain1.set_datadir('.')
131        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
132        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
133
134        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
135        domain1.set_quantity('elevation', 0)
136        domain1.set_quantity('friction', 0)
137        domain1.set_quantity('stage', 0)
138
139        # Boundary conditions
140        B0 = Dirichlet_boundary([0,0,0])
141        B6 = Dirichlet_boundary([0.6,0,0])
142        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
143        domain1.check_integrity()
144
145        finaltime = 8
146        #Evolution
147        t0 = -1
148        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
149            #print 'Timesteps: %.16f, %.16f' %(t0, t)
150            #if t == t0:
151            #    msg = 'Duplicate timestep found: %f, %f' %(t0, t)
152            #   raise msg
153            t0 = t
154             
155            #domain1.write_time()
156
157
158        #Now read data from sww and check
159        from Scientific.IO.NetCDF import NetCDFFile
160        filename = domain1.get_name() + '.' + domain1.format
161        fid = NetCDFFile(filename)
162
163        x = fid.variables['x'][:]
164        y = fid.variables['y'][:]
165        stage = fid.variables['stage'][:]
166        xmomentum = fid.variables['xmomentum'][:]
167        ymomentum = fid.variables['ymomentum'][:]
168        time = fid.variables['time'][:]
169
170        #Take stage vertex values at last timestep on diagonal
171        #Diagonal is identified by vertices: 0, 5, 10, 15
172
173        last_time_index = len(time)-1 #Last last_time_index
174        d_stage = reshape(take(stage[last_time_index, :], [0,5,10,15]), (4,1))
175        d_uh = reshape(take(xmomentum[last_time_index, :], [0,5,10,15]), (4,1))
176        d_vh = reshape(take(ymomentum[last_time_index, :], [0,5,10,15]), (4,1))
177        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
178
179        #Reference interpolated values at midpoints on diagonal at
180        #this timestep are
181        r0 = (D[0] + D[1])/2
182        r1 = (D[1] + D[2])/2
183        r2 = (D[2] + D[3])/2
184
185        #And the midpoints are found now
186        Dx = take(reshape(x, (16,1)), [0,5,10,15])
187        Dy = take(reshape(y, (16,1)), [0,5,10,15])
188
189        diag = concatenate( (Dx, Dy), axis=1)
190        d_midpoints = (diag[1:] + diag[:-1])/2
191
192        #Let us see if the file function can find the correct
193        #values at the midpoints at the last timestep:
194        f = file_function(filename, domain1,
195                          interpolation_points = d_midpoints)
196
197        msg = 'duplicate timesteps: %.16f and %.16f' %(f.T[-1], f.T[-2])
198        assert not f.T[-1] == f.T[-2], msg
199        t = time[last_time_index]
200        q = f(t, point_id=0); assert allclose(r0, q)
201        q = f(t, point_id=1); assert allclose(r1, q)
202        q = f(t, point_id=2); assert allclose(r2, q)
203
204
205        ##################
206        #Now do the same for the first timestep
207
208        timestep = 0 #First timestep
209        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
210        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
211        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
212        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
213
214        #Reference interpolated values at midpoints on diagonal at
215        #this timestep are
216        r0 = (D[0] + D[1])/2
217        r1 = (D[1] + D[2])/2
218        r2 = (D[2] + D[3])/2
219
220        #Let us see if the file function can find the correct
221        #values
222        q = f(0, point_id=0); assert allclose(r0, q)
223        q = f(0, point_id=1); assert allclose(r1, q)
224        q = f(0, point_id=2); assert allclose(r2, q)
225
226
227        ##################
228        #Now do it again for a timestep in the middle
229
230        timestep = 33
231        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
232        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
233        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
234        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
235
236        #Reference interpolated values at midpoints on diagonal at
237        #this timestep are
238        r0 = (D[0] + D[1])/2
239        r1 = (D[1] + D[2])/2
240        r2 = (D[2] + D[3])/2
241
242        q = f(timestep/10., point_id=0); assert allclose(r0, q)
243        q = f(timestep/10., point_id=1); assert allclose(r1, q)
244        q = f(timestep/10., point_id=2); assert allclose(r2, q)
245
246
247        ##################
248        #Now check temporal interpolation
249        #Halfway between timestep 15 and 16
250
251        timestep = 15
252        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
253        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
254        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
255        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
256
257        #Reference interpolated values at midpoints on diagonal at
258        #this timestep are
259        r0_0 = (D[0] + D[1])/2
260        r1_0 = (D[1] + D[2])/2
261        r2_0 = (D[2] + D[3])/2
262
263        #
264        timestep = 16
265        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
266        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
267        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
268        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
269
270        #Reference interpolated values at midpoints on diagonal at
271        #this timestep are
272        r0_1 = (D[0] + D[1])/2
273        r1_1 = (D[1] + D[2])/2
274        r2_1 = (D[2] + D[3])/2
275
276        # The reference values are
277        r0 = (r0_0 + r0_1)/2
278        r1 = (r1_0 + r1_1)/2
279        r2 = (r2_0 + r2_1)/2
280
281        q = f((timestep - 0.5)/10., point_id=0); assert allclose(r0, q)
282        q = f((timestep - 0.5)/10., point_id=1); assert allclose(r1, q)
283        q = f((timestep - 0.5)/10., point_id=2); assert allclose(r2, q)
284
285        ##################
286        #Finally check interpolation 2 thirds of the way
287        #between timestep 15 and 16
288
289        # The reference values are
290        r0 = (r0_0 + 2*r0_1)/3
291        r1 = (r1_0 + 2*r1_1)/3
292        r2 = (r2_0 + 2*r2_1)/3
293
294        #And the file function gives
295        q = f((timestep - 1.0/3)/10., point_id=0); assert allclose(r0, q)
296        q = f((timestep - 1.0/3)/10., point_id=1); assert allclose(r1, q)
297        q = f((timestep - 1.0/3)/10., point_id=2); assert allclose(r2, q)
298
299        fid.close()
300        import os
301        os.remove(filename)
302
303
304
305    def test_spatio_temporal_file_function_different_origin(self):
306        """Test that spatio temporal file function performs the correct
307        interpolations in both time and space where space is offset by
308        xllcorner and yllcorner
309        NetCDF version (x,y,t dependency)       
310        """
311        import time
312
313        #Create sww file of simple propagation from left to right
314        #through rectangular domain
315        from shallow_water import Domain, Dirichlet_boundary
316        from mesh_factory import rectangular
317        from Numeric import take, concatenate, reshape
318
319
320        from coordinate_transforms.geo_reference import Geo_reference
321        xllcorner = 2048
322        yllcorner = 11000
323        zone = 2
324
325        #Create basic mesh and shallow water domain
326        points, vertices, boundary = rectangular(3, 3)
327        domain1 = Domain(points, vertices, boundary,
328                         geo_reference = Geo_reference(xllcorner = xllcorner,
329                                                       yllcorner = yllcorner))
330       
331
332        from utilities.numerical_tools import mean     
333        domain1.reduction = mean
334        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
335                              # only one value.
336
337        domain1.default_order = 2
338        domain1.store = True
339        domain1.set_datadir('.')
340        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
341        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
342
343        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
344        domain1.set_quantity('elevation', 0)
345        domain1.set_quantity('friction', 0)
346        domain1.set_quantity('stage', 0)
347
348        # Boundary conditions
349        B0 = Dirichlet_boundary([0,0,0])
350        B6 = Dirichlet_boundary([0.6,0,0])
351        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
352        domain1.check_integrity()
353
354        finaltime = 8
355        #Evolution
356        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
357            pass
358            #domain1.write_time()
359
360
361        #Now read data from sww and check
362        from Scientific.IO.NetCDF import NetCDFFile
363        filename = domain1.get_name() + '.' + domain1.format
364        fid = NetCDFFile(filename)
365
366        x = fid.variables['x'][:]
367        y = fid.variables['y'][:]
368        stage = fid.variables['stage'][:]
369        xmomentum = fid.variables['xmomentum'][:]
370        ymomentum = fid.variables['ymomentum'][:]
371        time = fid.variables['time'][:]
372
373        #Take stage vertex values at last timestep on diagonal
374        #Diagonal is identified by vertices: 0, 5, 10, 15
375
376        last_time_index = len(time)-1 #Last last_time_index     
377        d_stage = reshape(take(stage[last_time_index, :], [0,5,10,15]), (4,1))
378        d_uh = reshape(take(xmomentum[last_time_index, :], [0,5,10,15]), (4,1))
379        d_vh = reshape(take(ymomentum[last_time_index, :], [0,5,10,15]), (4,1))
380        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
381
382        #Reference interpolated values at midpoints on diagonal at
383        #this timestep are
384        r0 = (D[0] + D[1])/2
385        r1 = (D[1] + D[2])/2
386        r2 = (D[2] + D[3])/2
387
388        #And the midpoints are found now
389        Dx = take(reshape(x, (16,1)), [0,5,10,15])
390        Dy = take(reshape(y, (16,1)), [0,5,10,15])
391
392        diag = concatenate( (Dx, Dy), axis=1)
393        d_midpoints = (diag[1:] + diag[:-1])/2
394
395
396        #Adjust for georef - make interpolation points absolute
397        d_midpoints[:,0] += xllcorner
398        d_midpoints[:,1] += yllcorner               
399
400        #Let us see if the file function can find the correct
401        #values at the midpoints at the last timestep:
402        f = file_function(filename, domain1,
403                          interpolation_points = d_midpoints)
404
405        t = time[last_time_index]                         
406        q = f(t, point_id=0); assert allclose(r0, q)
407        q = f(t, point_id=1); assert allclose(r1, q)
408        q = f(t, point_id=2); assert allclose(r2, q)
409
410
411        ##################
412        #Now do the same for the first timestep
413
414        timestep = 0 #First timestep
415        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
416        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
417        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
418        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
419
420        #Reference interpolated values at midpoints on diagonal at
421        #this timestep are
422        r0 = (D[0] + D[1])/2
423        r1 = (D[1] + D[2])/2
424        r2 = (D[2] + D[3])/2
425
426        #Let us see if the file function can find the correct
427        #values
428        q = f(0, point_id=0); assert allclose(r0, q)
429        q = f(0, point_id=1); assert allclose(r1, q)
430        q = f(0, point_id=2); assert allclose(r2, q)
431
432
433        ##################
434        #Now do it again for a timestep in the middle
435
436        timestep = 33
437        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
438        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
439        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
440        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
441
442        #Reference interpolated values at midpoints on diagonal at
443        #this timestep are
444        r0 = (D[0] + D[1])/2
445        r1 = (D[1] + D[2])/2
446        r2 = (D[2] + D[3])/2
447
448        q = f(timestep/10., point_id=0); assert allclose(r0, q)
449        q = f(timestep/10., point_id=1); assert allclose(r1, q)
450        q = f(timestep/10., point_id=2); assert allclose(r2, q)
451
452
453        ##################
454        #Now check temporal interpolation
455        #Halfway between timestep 15 and 16
456
457        timestep = 15
458        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
459        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
460        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
461        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
462
463        #Reference interpolated values at midpoints on diagonal at
464        #this timestep are
465        r0_0 = (D[0] + D[1])/2
466        r1_0 = (D[1] + D[2])/2
467        r2_0 = (D[2] + D[3])/2
468
469        #
470        timestep = 16
471        d_stage = reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
472        d_uh = reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
473        d_vh = reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
474        D = concatenate( (d_stage, d_uh, d_vh), axis=1)
475
476        #Reference interpolated values at midpoints on diagonal at
477        #this timestep are
478        r0_1 = (D[0] + D[1])/2
479        r1_1 = (D[1] + D[2])/2
480        r2_1 = (D[2] + D[3])/2
481
482        # The reference values are
483        r0 = (r0_0 + r0_1)/2
484        r1 = (r1_0 + r1_1)/2
485        r2 = (r2_0 + r2_1)/2
486
487        q = f((timestep - 0.5)/10., point_id=0); assert allclose(r0, q)
488        q = f((timestep - 0.5)/10., point_id=1); assert allclose(r1, q)
489        q = f((timestep - 0.5)/10., point_id=2); assert allclose(r2, q)
490
491        ##################
492        #Finally check interpolation 2 thirds of the way
493        #between timestep 15 and 16
494
495        # The reference values are
496        r0 = (r0_0 + 2*r0_1)/3
497        r1 = (r1_0 + 2*r1_1)/3
498        r2 = (r2_0 + 2*r2_1)/3
499
500        #And the file function gives
501        q = f((timestep - 1.0/3)/10., point_id=0); assert allclose(r0, q)
502        q = f((timestep - 1.0/3)/10., point_id=1); assert allclose(r1, q)
503        q = f((timestep - 1.0/3)/10., point_id=2); assert allclose(r2, q)
504
505        fid.close()
506        import os
507        os.remove(filename)
508
509       
510
511
512    def qtest_spatio_temporal_file_function_time(self):
513        """Test that File function interpolates correctly
514        between given times.
515        NetCDF version (x,y,t dependency)
516        """
517
518        #Create NetCDF (sww) file to be read
519        # x: 0, 5, 10, 15
520        # y: -20, -10, 0, 10
521        # t: 0, 60, 120, ...., 1200
522        #
523        # test quantities (arbitrary but non-trivial expressions):
524        #
525        #   stage     = 3*x - y**2 + 2*t
526        #   xmomentum = exp( -((x-7)**2 + (y+5)**2)/20 ) * t**2
527        #   ymomentum = x**2 + y**2 * sin(t*pi/600)
528
529        #NOTE: Nice test that may render some of the others redundant.
530
531        import os, time
532        from config import time_format
533        from Numeric import sin, pi, exp
534        from mesh_factory import rectangular
535        from shallow_water import Domain
536        import data_manager
537
538        finaltime = 1200
539        filename = 'test_file_function'
540
541        #Create a domain to hold test grid
542        #(0:15, -20:10)
543        points, vertices, boundary =\
544                rectangular(4, 4, 15, 30, origin = (0, -20))
545        print "points", points
546
547        #print 'Number of elements', len(vertices)
548        domain = Domain(points, vertices, boundary)
549        domain.smooth = False
550        domain.default_order = 2
551        domain.set_datadir('.')
552        domain.set_name(filename)
553        domain.store = True
554        domain.format = 'sww'   #Native netcdf visualisation format
555
556        #print points
557        start = time.mktime(time.strptime('2000', '%Y'))
558        domain.starttime = start
559
560
561        #Store structure
562        domain.initialise_storage()
563
564        #Compute artificial time steps and store
565        dt = 60  #One minute intervals
566        t = 0.0
567        while t <= finaltime:
568            #Compute quantities
569            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
570            domain.set_quantity('stage', f1)
571
572            f2 = lambda x,y: x+y+t**2
573            domain.set_quantity('xmomentum', f2)
574
575            f3 = lambda x,y: x**2 + y**2 * sin(t*pi/600)
576            domain.set_quantity('ymomentum', f3)
577
578            #Store and advance time
579            domain.time = t
580            domain.store_timestep(domain.conserved_quantities)
581            t += dt
582
583
584        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5]]
585     
586        #Deliberately set domain.starttime to too early
587        domain.starttime = start - 1
588
589        #Create file function
590        F = file_function(filename + '.sww', domain,
591                          quantities = domain.conserved_quantities,
592                          interpolation_points = interpolation_points)
593
594        #Check that FF updates fixes domain starttime
595        assert allclose(domain.starttime, start)
596
597        #Check that domain.starttime isn't updated if later
598        domain.starttime = start + 1
599        F = file_function(filename + '.sww', domain,
600                          quantities = domain.conserved_quantities,
601                          interpolation_points = interpolation_points)
602        assert allclose(domain.starttime, start+1)
603        domain.starttime = start
604
605
606        #Check linear interpolation in time
607        F = file_function(filename + '.sww', domain,
608                          quantities = domain.conserved_quantities,
609                          interpolation_points = interpolation_points)               
610        for id in range(len(interpolation_points)):
611            x = interpolation_points[id][0]
612            y = interpolation_points[id][1]
613
614            for i in range(20):
615                t = i*10
616                k = i%6
617
618                if k == 0:
619                    q0 = F(t, point_id=id)
620                    q1 = F(t+60, point_id=id)
621
622                if q0 == INF:
623                    actual = q0
624                else:
625                    actual = (k*q1 + (6-k)*q0)/6
626                q = F(t, point_id=id)
627                #print i, k, t, q
628                #print ' ', q0
629                #print ' ', q1
630                print "q",q
631                print "actual", actual
632                #print
633                if q0 == INF:
634                     self.failUnless( q == actual, 'Fail!')
635                else:
636                    assert allclose(q, actual)
637
638
639        #Another check of linear interpolation in time
640        for id in range(len(interpolation_points)):
641            q60 = F(60, point_id=id)
642            q120 = F(120, point_id=id)
643
644            t = 90 #Halfway between 60 and 120
645            q = F(t, point_id=id)
646            assert allclose( (q120+q60)/2, q )
647
648            t = 100 #Two thirds of the way between between 60 and 120
649            q = F(t, point_id=id)
650            assert allclose(q60/3 + 2*q120/3, q)
651
652
653
654        #Check that domain.starttime isn't updated if later than file starttime but earlier
655        #than file end time
656        delta = 23
657        domain.starttime = start + delta
658        F = file_function(filename + '.sww', domain,
659                          quantities = domain.conserved_quantities,
660                          interpolation_points = interpolation_points)
661        assert allclose(domain.starttime, start+delta)
662
663
664
665
666        #Now try interpolation with delta offset
667        for id in range(len(interpolation_points)):           
668            x = interpolation_points[id][0]
669            y = interpolation_points[id][1]
670
671            for i in range(20):
672                t = i*10
673                k = i%6
674
675                if k == 0:
676                    q0 = F(t-delta, point_id=id)
677                    q1 = F(t+60-delta, point_id=id)
678
679                q = F(t-delta, point_id=id)
680                assert allclose(q, (k*q1 + (6-k)*q0)/6)
681
682
683        os.remove(filename + '.sww')
684
685
686
687    def xtest_spatio_temporal_file_function_time(self):
688        # Test that File function interpolates correctly
689        # When some points are outside the mesh
690
691        import os, time
692        from config import time_format
693        from Numeric import sin, pi, exp
694        from mesh_factory import rectangular
695        from shallow_water import Domain
696        import data_manager 
697        from pmesh.mesh_interface import create_mesh_from_regions
698        finaltime = 1200
699       
700        filename = tempfile.mktemp()
701        #print "filename",filename
702        filename = 'test_file_function'
703
704        meshfilename = tempfile.mktemp(".tsh")
705
706        boundary_tags = {'walls':[0,1],'bom':[2]}
707       
708        polygon_absolute = [[0,-20],[10,-20],[10,15],[-20,15]]
709       
710        create_mesh_from_regions(polygon_absolute,
711                                 boundary_tags,
712                                 10000000,
713                                 filename=meshfilename)
714        domain = Domain(mesh_filename=meshfilename)
715        domain.smooth = False
716        domain.default_order = 2
717        domain.set_datadir('.')
718        domain.set_name(filename)
719        domain.store = True
720        domain.format = 'sww'   #Native netcdf visualisation format
721
722        #print points
723        start = time.mktime(time.strptime('2000', '%Y'))
724        domain.starttime = start
725       
726
727        #Store structure
728        domain.initialise_storage()
729
730        #Compute artificial time steps and store
731        dt = 60  #One minute intervals
732        t = 0.0
733        while t <= finaltime:
734            #Compute quantities
735            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
736            domain.set_quantity('stage', f1)
737
738            f2 = lambda x,y: x+y+t**2
739            domain.set_quantity('xmomentum', f2)
740
741            f3 = lambda x,y: x**2 + y**2 * sin(t*pi/600)
742            domain.set_quantity('ymomentum', f3)
743
744            #Store and advance time
745            domain.time = t
746            domain.store_timestep(domain.conserved_quantities)
747            t += dt
748
749        interpolation_points = [[1,0]]
750        interpolation_points = [[100,1000]]
751       
752        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5],
753                                [78787,78787],[7878,3432]]
754           
755        #Deliberately set domain.starttime to too early
756        domain.starttime = start - 1
757
758        #Create file function
759        F = file_function(filename + '.sww', domain,
760                          quantities = domain.conserved_quantities,
761                          interpolation_points = interpolation_points)
762
763        #Check that FF updates fixes domain starttime
764        assert allclose(domain.starttime, start)
765
766        #Check that domain.starttime isn't updated if later
767        domain.starttime = start + 1
768        F = file_function(filename + '.sww', domain,
769                          quantities = domain.conserved_quantities,
770                          interpolation_points = interpolation_points)
771        assert allclose(domain.starttime, start+1)
772        domain.starttime = start
773
774
775        #Check linear interpolation in time
776        # checking points inside and outside the mesh
777        F = file_function(filename + '.sww', domain,
778                          quantities = domain.conserved_quantities,
779                          interpolation_points = interpolation_points)
780       
781        for id in range(len(interpolation_points)):
782            x = interpolation_points[id][0]
783            y = interpolation_points[id][1]
784
785            for i in range(20):
786                t = i*10
787                k = i%6
788
789                if k == 0:
790                    q0 = F(t, point_id=id)
791                    q1 = F(t+60, point_id=id)
792
793                if q0 == INF:
794                    actual = q0
795                else:
796                    actual = (k*q1 + (6-k)*q0)/6
797                q = F(t, point_id=id)
798                #print i, k, t, q
799                #print ' ', q0
800                #print ' ', q1
801                #print "q",q
802                #print "actual", actual
803                #print
804                if q0 == INF:
805                     self.failUnless( q == actual, 'Fail!')
806                else:
807                    assert allclose(q, actual)
808
809        # now lets check points inside the mesh
810        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14]] #, [10,-12.5]] - this point doesn't work WHY?
811        interpolation_points = [[10,-12.5]]
812           
813        print "len(interpolation_points)",len(interpolation_points) 
814        F = file_function(filename + '.sww', domain,
815                          quantities = domain.conserved_quantities,
816                          interpolation_points = interpolation_points)
817
818        domain.starttime = start
819
820
821        #Check linear interpolation in time
822        F = file_function(filename + '.sww', domain,
823                          quantities = domain.conserved_quantities,
824                          interpolation_points = interpolation_points)               
825        for id in range(len(interpolation_points)):
826            x = interpolation_points[id][0]
827            y = interpolation_points[id][1]
828
829            for i in range(20):
830                t = i*10
831                k = i%6
832
833                if k == 0:
834                    q0 = F(t, point_id=id)
835                    q1 = F(t+60, point_id=id)
836
837                if q0 == INF:
838                    actual = q0
839                else:
840                    actual = (k*q1 + (6-k)*q0)/6
841                q = F(t, point_id=id)
842                print "############"
843                print "id, x, y ", id, x, y #k, t, q
844                print "t", t
845                #print ' ', q0
846                #print ' ', q1
847                print "q",q
848                print "actual", actual
849                #print
850                if q0 == INF:
851                     self.failUnless( q == actual, 'Fail!')
852                else:
853                    assert allclose(q, actual)
854
855
856        #Another check of linear interpolation in time
857        for id in range(len(interpolation_points)):
858            q60 = F(60, point_id=id)
859            q120 = F(120, point_id=id)
860
861            t = 90 #Halfway between 60 and 120
862            q = F(t, point_id=id)
863            assert allclose( (q120+q60)/2, q )
864
865            t = 100 #Two thirds of the way between between 60 and 120
866            q = F(t, point_id=id)
867            assert allclose(q60/3 + 2*q120/3, q)
868
869
870
871        #Check that domain.starttime isn't updated if later than file starttime but earlier
872        #than file end time
873        delta = 23
874        domain.starttime = start + delta
875        F = file_function(filename + '.sww', domain,
876                          quantities = domain.conserved_quantities,
877                          interpolation_points = interpolation_points)
878        assert allclose(domain.starttime, start+delta)
879
880
881
882
883        #Now try interpolation with delta offset
884        for id in range(len(interpolation_points)):           
885            x = interpolation_points[id][0]
886            y = interpolation_points[id][1]
887
888            for i in range(20):
889                t = i*10
890                k = i%6
891
892                if k == 0:
893                    q0 = F(t-delta, point_id=id)
894                    q1 = F(t+60-delta, point_id=id)
895
896                q = F(t-delta, point_id=id)
897                assert allclose(q, (k*q1 + (6-k)*q0)/6)
898
899
900        os.remove(filename + '.sww')
901
902    def test_file_function_time_with_domain(self):
903        """Test that File function interpolates correctly
904        between given times. No x,y dependency here.
905        Use domain with starttime
906        """
907
908        #Write file
909        import os, time, calendar
910        from config import time_format
911        from math import sin, pi
912        from domain import Domain
913
914        finaltime = 1200
915        filename = 'test_file_function'
916        fid = open(filename + '.txt', 'w')
917        start = time.mktime(time.strptime('2000', '%Y'))
918        dt = 60  #One minute intervals
919        t = 0.0
920        while t <= finaltime:
921            t_string = time.strftime(time_format, time.gmtime(t+start))
922            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
923            t += dt
924
925        fid.close()
926
927
928        #Convert ASCII file to NetCDF (Which is what we really like!)
929        timefile2netcdf(filename)
930
931
932
933        a = [0.0, 0.0]
934        b = [4.0, 0.0]
935        c = [0.0, 3.0]
936
937        points = [a, b, c]
938        vertices = [[0,1,2]]
939        domain = Domain(points, vertices)
940
941        #Check that domain.starttime is updated if non-existing
942        F = file_function(filename + '.tms', domain)
943       
944        assert allclose(domain.starttime, start)
945
946        #Check that domain.starttime is updated if too early
947        domain.starttime = start - 1
948        F = file_function(filename + '.tms', domain)
949        assert allclose(domain.starttime, start)
950
951        #Check that domain.starttime isn't updated if later
952        domain.starttime = start + 1
953        F = file_function(filename + '.tms', domain)       
954        assert allclose(domain.starttime, start+1)
955
956        domain.starttime = start
957        F = file_function(filename + '.tms', domain,
958                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])
959       
960
961        #print F.T
962        #print F.precomputed_values
963        #print 'F(60)', F(60)
964       
965        #Now try interpolation
966        for i in range(20):
967            t = i*10
968            q = F(t)
969
970            #Exact linear intpolation
971            assert allclose(q[0], 2*t)
972            if i%6 == 0:
973                assert allclose(q[1], t**2)
974                assert allclose(q[2], sin(t*pi/600))
975
976        #Check non-exact
977
978        t = 90 #Halfway between 60 and 120
979        q = F(t)
980        assert allclose( (120**2 + 60**2)/2, q[1] )
981        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
982
983
984        t = 100 #Two thirds of the way between between 60 and 120
985        q = F(t)
986        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
987        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
988
989        os.remove(filename + '.tms')
990        os.remove(filename + '.txt')       
991
992    def test_file_function_time_with_domain_different_start(self):
993        """Test that File function interpolates correctly
994        between given times. No x,y dependency here.
995        Use domain with a starttime later than that of file
996
997        ASCII version
998        """
999
1000        #Write file
1001        import os, time, calendar
1002        from config import time_format
1003        from math import sin, pi
1004        from domain import Domain
1005
1006        finaltime = 1200
1007        filename = 'test_file_function'
1008        fid = open(filename + '.txt', 'w')
1009        start = time.mktime(time.strptime('2000', '%Y'))
1010        dt = 60  #One minute intervals
1011        t = 0.0
1012        while t <= finaltime:
1013            t_string = time.strftime(time_format, time.gmtime(t+start))
1014            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
1015            t += dt
1016
1017        fid.close()
1018
1019        #Convert ASCII file to NetCDF (Which is what we really like!)
1020        timefile2netcdf(filename)       
1021
1022        a = [0.0, 0.0]
1023        b = [4.0, 0.0]
1024        c = [0.0, 3.0]
1025
1026        points = [a, b, c]
1027        vertices = [[0,1,2]]
1028        domain = Domain(points, vertices)
1029
1030        #Check that domain.starttime isn't updated if later than file starttime but earlier
1031        #than file end time
1032        delta = 23
1033        domain.starttime = start + delta
1034        F = file_function(filename + '.tms', domain,
1035                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])       
1036        assert allclose(domain.starttime, start+delta)
1037
1038
1039
1040
1041        #Now try interpolation with delta offset
1042        for i in range(20):
1043            t = i*10
1044            q = F(t-delta)
1045
1046            #Exact linear intpolation
1047            assert allclose(q[0], 2*t)
1048            if i%6 == 0:
1049                assert allclose(q[1], t**2)
1050                assert allclose(q[2], sin(t*pi/600))
1051
1052        #Check non-exact
1053
1054        t = 90 #Halfway between 60 and 120
1055        q = F(t-delta)
1056        assert allclose( (120**2 + 60**2)/2, q[1] )
1057        assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
1058
1059
1060        t = 100 #Two thirds of the way between between 60 and 120
1061        q = F(t-delta)
1062        assert allclose( 2*120**2/3 + 60**2/3, q[1] )
1063        assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
1064
1065
1066        os.remove(filename + '.tms')
1067        os.remove(filename + '.txt')               
1068
1069
1070
1071    def test_apply_expression_to_dictionary(self):
1072
1073        #FIXME: Division is not expected to work for integers.
1074        #This must be caught.
1075        foo = array([[1,2,3],
1076                     [4,5,6]], Float)
1077
1078        bar = array([[-1,0,5],
1079                     [6,1,1]], Float)                 
1080
1081        D = {'X': foo, 'Y': bar}
1082
1083        Z = apply_expression_to_dictionary('X+Y', D)       
1084        assert allclose(Z, foo+bar)
1085
1086        Z = apply_expression_to_dictionary('X*Y', D)       
1087        assert allclose(Z, foo*bar)       
1088
1089        Z = apply_expression_to_dictionary('4*X+Y', D)       
1090        assert allclose(Z, 4*foo+bar)       
1091
1092        # test zero division is OK
1093        Z = apply_expression_to_dictionary('X/Y', D)
1094        assert allclose(1/Z, 1/(foo/bar)) # can't compare inf to inf
1095
1096        # make an error for zero on zero
1097        # this is really an error in Numeric, SciPy core can handle it
1098        # Z = apply_expression_to_dictionary('0/Y', D)
1099
1100        #Check exceptions
1101        try:
1102            #Wrong name
1103            Z = apply_expression_to_dictionary('4*X+A', D)       
1104        except NameError:
1105            pass
1106        else:
1107            msg = 'Should have raised a NameError Exception'
1108            raise msg
1109
1110
1111        try:
1112            #Wrong order
1113            Z = apply_expression_to_dictionary(D, '4*X+A')       
1114        except AssertionError:
1115            pass
1116        else:
1117            msg = 'Should have raised a AssertionError Exception'
1118            raise msg       
1119       
1120
1121    def test_multiple_replace(self):
1122        """Hard test that checks a true word-by-word simultaneous replace
1123        """
1124       
1125        D = {'x': 'xi', 'y': 'eta', 'xi':'lam'}
1126        exp = '3*x+y + xi'
1127       
1128        new = multiple_replace(exp, D)
1129       
1130        assert new == '3*xi+eta + lam'
1131                         
1132
1133
1134    def test_point_on_line_obsolete(self):
1135        """Test that obsolete call issues appropriate warning"""
1136
1137        #Turn warning into an exception
1138        import warnings
1139        warnings.filterwarnings('error')
1140
1141        try:
1142            assert point_on_line( 0, 0.5, 0,1, 0,0 )
1143        except DeprecationWarning:
1144            pass
1145        else:
1146            msg = 'point_on_line should have issued a DeprecationWarning'
1147            raise Exception(msg)   
1148
1149        warnings.resetwarnings()                         
1150                         
1151
1152#-------------------------------------------------------------
1153if __name__ == "__main__":
1154    suite = unittest.makeSuite(Test_Util,'test')
1155    #suite = unittest.makeSuite(Test_Util,'test_apply')
1156    runner = unittest.TextTestRunner()
1157    runner.run(suite)
1158
1159
1160
1161
Note: See TracBrowser for help on using the repository browser.