source: anuga_core/source_numpy_conversion/anuga/abstract_2d_finite_volumes/test_util.py @ 5903

Last change on this file since 5903 was 5903, checked in by rwilson, 15 years ago

NumPy? conversion.

File size: 63.1 KB
Line 
1#!/usr/bin/env python
2
3import unittest
4import numpy
5
6from math import sqrt, pi
7import tempfile, os
8from os import access, F_OK,sep, removedirs,remove,mkdir,getcwd
9
10from anuga.abstract_2d_finite_volumes.util import *
11from anuga.config import epsilon
12from anuga.shallow_water.data_manager import timefile2netcdf,del_dir
13
14from anuga.utilities.numerical_tools import NAN
15
16from sys import platform
17
18from anuga.pmesh.mesh import Mesh
19from anuga.shallow_water import Domain, Transmissive_boundary
20from anuga.shallow_water.data_manager import get_dataobject
21from csv import reader,writer
22import time
23import string
24
25def test_function(x, y):
26    return x+y
27
28class Test_Util(unittest.TestCase):
29    def setUp(self):
30        pass
31
32    def tearDown(self):
33        pass
34
35
36
37
38    #Geometric
39    #def test_distance(self):
40    #    from anuga.abstract_2d_finite_volumes.util import distance#
41    #
42    #    self.failUnless( distance([4,2],[7,6]) == 5.0,
43    #                     'Distance is wrong!')
44    #    self.failUnless( allclose(distance([7,6],[9,8]), 2.82842712475),
45    #                    'distance is wrong!')
46    #    self.failUnless( allclose(distance([9,8],[4,2]), 7.81024967591),
47    #                    'distance is wrong!')
48    #
49    #    self.failUnless( distance([9,8],[4,2]) == distance([4,2],[9,8]),
50    #                    'distance is wrong!')
51
52
53    def test_file_function_time1(self):
54        """Test that File function interpolates correctly
55        between given times. No x,y dependency here.
56        """
57
58        #Write file
59        import os, time
60        from anuga.config import time_format
61        from math import sin, pi
62
63        #Typical ASCII file
64        finaltime = 1200
65        filename = 'test_file_function'
66        fid = open(filename + '.txt', 'w')
67        start = time.mktime(time.strptime('2000', '%Y'))
68        dt = 60  #One minute intervals
69        t = 0.0
70        while t <= finaltime:
71            t_string = time.strftime(time_format, time.gmtime(t+start))
72            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
73            t += dt
74
75        fid.close()
76
77        #Convert ASCII file to NetCDF (Which is what we really like!)
78        timefile2netcdf(filename)
79
80
81        #Create file function from time series
82        F = file_function(filename + '.tms',
83                          quantities = ['Attribute0',
84                                        'Attribute1',
85                                        'Attribute2'])
86       
87        #Now try interpolation
88        for i in range(20):
89            t = i*10
90            q = F(t)
91
92            #Exact linear intpolation
93            assert numpy.allclose(q[0], 2*t)
94            if i%6 == 0:
95                assert numpy.allclose(q[1], t**2)
96                assert numpy.allclose(q[2], sin(t*pi/600))
97
98        #Check non-exact
99
100        t = 90 #Halfway between 60 and 120
101        q = F(t)
102        assert numpy.allclose( (120**2 + 60**2)/2, q[1] )
103        assert numpy.allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
104
105
106        t = 100 #Two thirds of the way between between 60 and 120
107        q = F(t)
108        assert numpy.allclose( 2*120**2/3 + 60**2/3, q[1] )
109        assert numpy.allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
110
111        os.remove(filename + '.txt')
112        os.remove(filename + '.tms')       
113
114
115       
116    def test_spatio_temporal_file_function_basic(self):
117        """Test that spatio temporal file function performs the correct
118        interpolations in both time and space
119        NetCDF version (x,y,t dependency)       
120        """
121        import time
122
123        #Create sww file of simple propagation from left to right
124        #through rectangular domain
125        from shallow_water import Domain, Dirichlet_boundary
126        from mesh_factory import rectangular
127
128        #Create basic mesh and shallow water domain
129        points, vertices, boundary = rectangular(3, 3)
130        domain1 = Domain(points, vertices, boundary)
131
132        from anuga.utilities.numerical_tools import mean
133        domain1.reduction = mean
134        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
135                              # only one value.
136
137        domain1.default_order = 2
138        domain1.store = True
139        domain1.set_datadir('.')
140        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
141        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
142
143        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
144        domain1.set_quantity('elevation', 0)
145        domain1.set_quantity('friction', 0)
146        domain1.set_quantity('stage', 0)
147
148        # Boundary conditions
149        B0 = Dirichlet_boundary([0,0,0])
150        B6 = Dirichlet_boundary([0.6,0,0])
151        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
152        domain1.check_integrity()
153
154        finaltime = 8
155        #Evolution
156        t0 = -1
157# crash at following line
158        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
159            #print 'Timesteps: %.16f, %.16f' %(t0, t)
160            #if t == t0:
161            #    msg = 'Duplicate timestep found: %f, %f' %(t0, t)
162            #   raise msg
163            t0 = t
164             
165            #domain1.write_time()
166
167
168        #Now read data from sww and check
169        from Scientific.IO.NetCDF import NetCDFFile
170        filename = domain1.get_name() + '.' + domain1.format
171        fid = NetCDFFile(filename)
172
173        x = fid.variables['x'][:]
174        y = fid.variables['y'][:]
175        stage = fid.variables['stage'][:]
176        xmomentum = fid.variables['xmomentum'][:]
177        ymomentum = fid.variables['ymomentum'][:]
178        time = fid.variables['time'][:]
179
180        #Take stage vertex values at last timestep on diagonal
181        #Diagonal is identified by vertices: 0, 5, 10, 15
182
183        last_time_index = len(time)-1 #Last last_time_index
184        d_stage = numpy.reshape(numpy.take(stage[last_time_index, :], [0,5,10,15], axis=0), (4,1))
185        d_uh = numpy.reshape(numpy.take(xmomentum[last_time_index, :], [0,5,10,15], axis=0), (4,1))
186        d_vh = numpy.reshape(numpy.take(ymomentum[last_time_index, :], [0,5,10,15], axis=0), (4,1))
187        D = numpy.concatenate( (d_stage, d_uh, d_vh), axis=1)
188
189        #Reference interpolated values at midpoints on diagonal at
190        #this timestep are
191        r0 = (D[0] + D[1])/2
192        r1 = (D[1] + D[2])/2
193        r2 = (D[2] + D[3])/2
194
195        #And the midpoints are found now
196        Dx = numpy.take(numpy.reshape(x, (16,1)), [0,5,10,15], axis=0)
197        Dy = numpy.take(numpy.reshape(y, (16,1)), [0,5,10,15], axis=0)
198
199        diag = numpy.concatenate( (Dx, Dy), axis=1)
200        d_midpoints = (diag[1:] + diag[:-1])/2
201
202        #Let us see if the file function can find the correct
203        #values at the midpoints at the last timestep:
204        f = file_function(filename, domain1,
205                          interpolation_points = d_midpoints)
206
207        T = f.get_time()
208        msg = 'duplicate timesteps: %.16f and %.16f' %(T[-1], T[-2])
209        assert not T[-1] == T[-2], msg
210        t = time[last_time_index]
211        q = f(t, point_id=0); assert numpy.allclose(r0, q)
212        q = f(t, point_id=1); assert numpy.allclose(r1, q)
213        q = f(t, point_id=2); assert numpy.allclose(r2, q)
214
215
216        ##################
217        #Now do the same for the first timestep
218
219        timestep = 0 #First timestep
220        d_stage = numpy.reshape(numpy.take(stage[timestep, :], [0,5,10,15], axis=0), (4,1))
221        d_uh = numpy.reshape(numpy.take(xmomentum[timestep, :], [0,5,10,15], axis=0), (4,1))
222        d_vh = numpy.reshape(numpy.take(ymomentum[timestep, :], [0,5,10,15], axis=0), (4,1))
223        D = numpy.concatenate( (d_stage, d_uh, d_vh), axis=1)
224
225        #Reference interpolated values at midpoints on diagonal at
226        #this timestep are
227        r0 = (D[0] + D[1])/2
228        r1 = (D[1] + D[2])/2
229        r2 = (D[2] + D[3])/2
230
231        #Let us see if the file function can find the correct
232        #values
233        q = f(0, point_id=0); assert numpy.allclose(r0, q)
234        q = f(0, point_id=1); assert numpy.allclose(r1, q)
235        q = f(0, point_id=2); assert numpy.allclose(r2, q)
236
237
238        ##################
239        #Now do it again for a timestep in the middle
240
241        timestep = 33
242        d_stage = numpy.reshape(numpy.take(stage[timestep, :], [0,5,10,15], axis=0), (4,1))
243        d_uh = numpy.reshape(numpy.take(xmomentum[timestep, :], [0,5,10,15], axis=0), (4,1))
244        d_vh = numpy.reshape(numpy.take(ymomentum[timestep, :], [0,5,10,15], axis=0), (4,1))
245        D = numpy.concatenate( (d_stage, d_uh, d_vh), axis=1)
246
247        #Reference interpolated values at midpoints on diagonal at
248        #this timestep are
249        r0 = (D[0] + D[1])/2
250        r1 = (D[1] + D[2])/2
251        r2 = (D[2] + D[3])/2
252
253        q = f(timestep/10., point_id=0); assert numpy.allclose(r0, q)
254        q = f(timestep/10., point_id=1); assert numpy.allclose(r1, q)
255        q = f(timestep/10., point_id=2); assert numpy.allclose(r2, q)
256
257
258        ##################
259        #Now check temporal interpolation
260        #Halfway between timestep 15 and 16
261
262        timestep = 15
263        d_stage = numpy.reshape(numpy.take(stage[timestep, :], [0,5,10,15], axis=0), (4,1))
264        d_uh = numpy.reshape(numpy.take(xmomentum[timestep, :], [0,5,10,15], axis=0), (4,1))
265        d_vh = numpy.reshape(numpy.take(ymomentum[timestep, :], [0,5,10,15], axis=0), (4,1))
266        D = numpy.concatenate( (d_stage, d_uh, d_vh), axis=1)
267
268        #Reference interpolated values at midpoints on diagonal at
269        #this timestep are
270        r0_0 = (D[0] + D[1])/2
271        r1_0 = (D[1] + D[2])/2
272        r2_0 = (D[2] + D[3])/2
273
274        #
275        timestep = 16
276        d_stage = numpy.reshape(numpy.take(stage[timestep, :], [0,5,10,15], axis=0), (4,1))
277        d_uh = numpy.reshape(numpy.take(xmomentum[timestep, :], [0,5,10,15], axis=0), (4,1))
278        d_vh = numpy.reshape(numpy.take(ymomentum[timestep, :], [0,5,10,15], axis=0), (4,1))
279        D = numpy.concatenate( (d_stage, d_uh, d_vh), axis=1)
280
281        #Reference interpolated values at midpoints on diagonal at
282        #this timestep are
283        r0_1 = (D[0] + D[1])/2
284        r1_1 = (D[1] + D[2])/2
285        r2_1 = (D[2] + D[3])/2
286
287        # The reference values are
288        r0 = (r0_0 + r0_1)/2
289        r1 = (r1_0 + r1_1)/2
290        r2 = (r2_0 + r2_1)/2
291
292        q = f((timestep - 0.5)/10., point_id=0); assert numpy.allclose(r0, q)
293        q = f((timestep - 0.5)/10., point_id=1); assert numpy.allclose(r1, q)
294        q = f((timestep - 0.5)/10., point_id=2); assert numpy.allclose(r2, q)
295
296        ##################
297        #Finally check interpolation 2 thirds of the way
298        #between timestep 15 and 16
299
300        # The reference values are
301        r0 = (r0_0 + 2*r0_1)/3
302        r1 = (r1_0 + 2*r1_1)/3
303        r2 = (r2_0 + 2*r2_1)/3
304
305        #And the file function gives
306        q = f((timestep - 1.0/3)/10., point_id=0); assert numpy.allclose(r0, q)
307        q = f((timestep - 1.0/3)/10., point_id=1); assert numpy.allclose(r1, q)
308        q = f((timestep - 1.0/3)/10., point_id=2); assert numpy.allclose(r2, q)
309
310        fid.close()
311        import os
312        os.remove(filename)
313
314
315
316    def test_spatio_temporal_file_function_different_origin(self):
317        """Test that spatio temporal file function performs the correct
318        interpolations in both time and space where space is offset by
319        xllcorner and yllcorner
320        NetCDF version (x,y,t dependency)       
321        """
322        import time
323
324        #Create sww file of simple propagation from left to right
325        #through rectangular domain
326        from shallow_water import Domain, Dirichlet_boundary
327        from mesh_factory import rectangular
328
329
330        from anuga.coordinate_transforms.geo_reference import Geo_reference
331        xllcorner = 2048
332        yllcorner = 11000
333        zone = 2
334
335        #Create basic mesh and shallow water domain
336        points, vertices, boundary = rectangular(3, 3)
337        domain1 = Domain(points, vertices, boundary,
338                         geo_reference = Geo_reference(xllcorner = xllcorner,
339                                                       yllcorner = yllcorner))
340       
341
342        from anuga.utilities.numerical_tools import mean       
343        domain1.reduction = mean
344        domain1.smooth = True #NOTE: Mimic sww output where each vertex has
345                              # only one value.
346
347        domain1.default_order = 2
348        domain1.store = True
349        domain1.set_datadir('.')
350        domain1.set_name('spatio_temporal_boundary_source_%d' %(id(self)))
351        domain1.quantities_to_be_stored = ['stage', 'xmomentum', 'ymomentum']
352
353        #Bed-slope, friction and IC at vertices (and interpolated elsewhere)
354        domain1.set_quantity('elevation', 0)
355        domain1.set_quantity('friction', 0)
356        domain1.set_quantity('stage', 0)
357
358        # Boundary conditions
359        B0 = Dirichlet_boundary([0,0,0])
360        B6 = Dirichlet_boundary([0.6,0,0])
361        domain1.set_boundary({'left': B6, 'top': B6, 'right': B0, 'bottom': B0})
362        domain1.check_integrity()
363
364        finaltime = 8
365        #Evolution
366        for t in domain1.evolve(yieldstep = 0.1, finaltime = finaltime):
367            pass
368            #domain1.write_time()
369
370
371        #Now read data from sww and check
372        from Scientific.IO.NetCDF import NetCDFFile
373        filename = domain1.get_name() + '.' + domain1.format
374        fid = NetCDFFile(filename)
375
376        x = fid.variables['x'][:]
377        y = fid.variables['y'][:]
378        stage = fid.variables['stage'][:]
379        xmomentum = fid.variables['xmomentum'][:]
380        ymomentum = fid.variables['ymomentum'][:]
381        time = fid.variables['time'][:]
382
383        #Take stage vertex values at last timestep on diagonal
384        #Diagonal is identified by vertices: 0, 5, 10, 15
385
386        last_time_index = len(time)-1 #Last last_time_index     
387        d_stage = numpy.reshape(take(stage[last_time_index, :], [0,5,10,15]), (4,1))
388        d_uh = numpy.reshape(take(xmomentum[last_time_index, :], [0,5,10,15]), (4,1))
389        d_vh = numpy.reshape(take(ymomentum[last_time_index, :], [0,5,10,15]), (4,1))
390        D = numpy.concatenate( (d_stage, d_uh, d_vh), axis=1)
391
392        #Reference interpolated values at midpoints on diagonal at
393        #this timestep are
394        r0 = (D[0] + D[1])/2
395        r1 = (D[1] + D[2])/2
396        r2 = (D[2] + D[3])/2
397
398        #And the midpoints are found now
399        Dx = take(numpy.reshape(x, (16,1)), [0,5,10,15])
400        Dy = take(numpy.reshape(y, (16,1)), [0,5,10,15])
401
402        diag = numpy.concatenate( (Dx, Dy), axis=1)
403        d_midpoints = (diag[1:] + diag[:-1])/2
404
405
406        #Adjust for georef - make interpolation points absolute
407        d_midpoints[:,0] += xllcorner
408        d_midpoints[:,1] += yllcorner               
409
410        #Let us see if the file function can find the correct
411        #values at the midpoints at the last timestep:
412        f = file_function(filename, domain1,
413                          interpolation_points = d_midpoints)
414
415        t = time[last_time_index]                         
416        q = f(t, point_id=0); assert numpy.allclose(r0, q)
417        q = f(t, point_id=1); assert numpy.allclose(r1, q)
418        q = f(t, point_id=2); assert numpy.allclose(r2, q)
419
420
421        ##################
422        #Now do the same for the first timestep
423
424        timestep = 0 #First timestep
425        d_stage = numpy.reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
426        d_uh = numpy.reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
427        d_vh = numpy.reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
428        D = numpy.concatenate( (d_stage, d_uh, d_vh), axis=1)
429
430        #Reference interpolated values at midpoints on diagonal at
431        #this timestep are
432        r0 = (D[0] + D[1])/2
433        r1 = (D[1] + D[2])/2
434        r2 = (D[2] + D[3])/2
435
436        #Let us see if the file function can find the correct
437        #values
438        q = f(0, point_id=0); assert numpy.allclose(r0, q)
439        q = f(0, point_id=1); assert numpy.allclose(r1, q)
440        q = f(0, point_id=2); assert numpy.allclose(r2, q)
441
442
443        ##################
444        #Now do it again for a timestep in the middle
445
446        timestep = 33
447        d_stage = numpy.reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
448        d_uh = numpy.reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
449        d_vh = numpy.reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
450        D = numpy.concatenate( (d_stage, d_uh, d_vh), axis=1)
451
452        #Reference interpolated values at midpoints on diagonal at
453        #this timestep are
454        r0 = (D[0] + D[1])/2
455        r1 = (D[1] + D[2])/2
456        r2 = (D[2] + D[3])/2
457
458        q = f(timestep/10., point_id=0); assert numpy.allclose(r0, q)
459        q = f(timestep/10., point_id=1); assert numpy.allclose(r1, q)
460        q = f(timestep/10., point_id=2); assert numpy.allclose(r2, q)
461
462
463        ##################
464        #Now check temporal interpolation
465        #Halfway between timestep 15 and 16
466
467        timestep = 15
468        d_stage = numpy.reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
469        d_uh = numpy.reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
470        d_vh = numpy.reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
471        D = numpy.concatenate( (d_stage, d_uh, d_vh), axis=1)
472
473        #Reference interpolated values at midpoints on diagonal at
474        #this timestep are
475        r0_0 = (D[0] + D[1])/2
476        r1_0 = (D[1] + D[2])/2
477        r2_0 = (D[2] + D[3])/2
478
479        #
480        timestep = 16
481        d_stage = numpy.reshape(take(stage[timestep, :], [0,5,10,15]), (4,1))
482        d_uh = numpy.reshape(take(xmomentum[timestep, :], [0,5,10,15]), (4,1))
483        d_vh = numpy.reshape(take(ymomentum[timestep, :], [0,5,10,15]), (4,1))
484        D = numpy.concatenate( (d_stage, d_uh, d_vh), axis=1)
485
486        #Reference interpolated values at midpoints on diagonal at
487        #this timestep are
488        r0_1 = (D[0] + D[1])/2
489        r1_1 = (D[1] + D[2])/2
490        r2_1 = (D[2] + D[3])/2
491
492        # The reference values are
493        r0 = (r0_0 + r0_1)/2
494        r1 = (r1_0 + r1_1)/2
495        r2 = (r2_0 + r2_1)/2
496
497        q = f((timestep - 0.5)/10., point_id=0); assert numpy.allclose(r0, q)
498        q = f((timestep - 0.5)/10., point_id=1); assert numpy.allclose(r1, q)
499        q = f((timestep - 0.5)/10., point_id=2); assert numpy.allclose(r2, q)
500
501        ##################
502        #Finally check interpolation 2 thirds of the way
503        #between timestep 15 and 16
504
505        # The reference values are
506        r0 = (r0_0 + 2*r0_1)/3
507        r1 = (r1_0 + 2*r1_1)/3
508        r2 = (r2_0 + 2*r2_1)/3
509
510        #And the file function gives
511        q = f((timestep - 1.0/3)/10., point_id=0); assert numpy.allclose(r0, q)
512        q = f((timestep - 1.0/3)/10., point_id=1); assert numpy.allclose(r1, q)
513        q = f((timestep - 1.0/3)/10., point_id=2); assert numpy.allclose(r2, q)
514
515        fid.close()
516        import os
517        os.remove(filename)
518
519       
520
521
522    def test_spatio_temporal_file_function_time(self):
523        """Test that File function interpolates correctly
524        between given times.
525        NetCDF version (x,y,t dependency)
526        """
527
528        #Create NetCDF (sww) file to be read
529        # x: 0, 5, 10, 15
530        # y: -20, -10, 0, 10
531        # t: 0, 60, 120, ...., 1200
532        #
533        # test quantities (arbitrary but non-trivial expressions):
534        #
535        #   stage     = 3*x - y**2 + 2*t
536        #   xmomentum = exp( -((x-7)**2 + (y+5)**2)/20 ) * t**2
537        #   ymomentum = x**2 + y**2 * sin(t*pi/600)
538
539        #NOTE: Nice test that may render some of the others redundant.
540
541        import os, time
542        from anuga.config import time_format
543        from mesh_factory import rectangular
544        from shallow_water import Domain
545        import anuga.shallow_water.data_manager
546
547        finaltime = 1200
548        filename = 'test_file_function'
549
550        #Create a domain to hold test grid
551        #(0:15, -20:10)
552        points, vertices, boundary =\
553                rectangular(4, 4, 15, 30, origin = (0, -20))
554        #print "points", points
555
556        #print 'Number of elements', len(vertices)
557        domain = Domain(points, vertices, boundary)
558        domain.smooth = False
559        domain.default_order = 2
560        domain.set_datadir('.')
561        domain.set_name(filename)
562        domain.store = True
563        domain.format = 'sww'   #Native netcdf visualisation format
564
565        #print points
566        start = time.mktime(time.strptime('2000', '%Y'))
567        domain.starttime = start
568
569
570        #Store structure
571        domain.initialise_storage()
572
573        #Compute artificial time steps and store
574        dt = 60  #One minute intervals
575        t = 0.0
576        while t <= finaltime:
577            #Compute quantities
578            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
579            domain.set_quantity('stage', f1)
580
581            f2 = lambda x,y: x+y+t**2
582            domain.set_quantity('xmomentum', f2)
583
584            f3 = lambda x,y: x**2 + y**2 * numpy.sin(t*numpy.pi/600)
585            domain.set_quantity('ymomentum', f3)
586
587            #Store and advance time
588            domain.time = t
589            domain.store_timestep(domain.conserved_quantities)
590            t += dt
591
592
593        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5]]
594     
595        #Deliberately set domain.starttime to too early
596        domain.starttime = start - 1
597
598        #Create file function
599        F = file_function(filename + '.sww', domain,
600                          quantities = domain.conserved_quantities,
601                          interpolation_points = interpolation_points)
602
603        #Check that FF updates fixes domain starttime
604        assert numpy.allclose(domain.starttime, start)
605
606        #Check that domain.starttime isn't updated if later
607        domain.starttime = start + 1
608        F = file_function(filename + '.sww', domain,
609                          quantities = domain.conserved_quantities,
610                          interpolation_points = interpolation_points)
611        assert numpy.allclose(domain.starttime, start+1)
612        domain.starttime = start
613
614
615        #Check linear interpolation in time
616        F = file_function(filename + '.sww', domain,
617                          quantities = domain.conserved_quantities,
618                          interpolation_points = interpolation_points)               
619        for id in range(len(interpolation_points)):
620            x = interpolation_points[id][0]
621            y = interpolation_points[id][1]
622
623            for i in range(20):
624                t = i*10
625                k = i%6
626
627                if k == 0:
628                    q0 = F(t, point_id=id)
629                    q1 = F(t+60, point_id=id)
630
631                if q0 == NAN:
632                    actual = q0
633                else:
634                    actual = (k*q1 + (6-k)*q0)/6
635                q = F(t, point_id=id)
636                #print i, k, t, q
637                #print ' ', q0
638                #print ' ', q1
639                #print "q",q
640                #print "actual", actual
641                #print
642                if q0 == NAN:
643                     self.failUnless( q == actual, 'Fail!')
644                else:
645                    assert numpy.allclose(q, actual)
646
647
648        #Another check of linear interpolation in time
649        for id in range(len(interpolation_points)):
650            q60 = F(60, point_id=id)
651            q120 = F(120, point_id=id)
652
653            t = 90 #Halfway between 60 and 120
654            q = F(t, point_id=id)
655            assert numpy.allclose( (q120+q60)/2, q )
656
657            t = 100 #Two thirds of the way between between 60 and 120
658            q = F(t, point_id=id)
659            assert numpy.allclose(q60/3 + 2*q120/3, q)
660
661
662
663        #Check that domain.starttime isn't updated if later than file starttime but earlier
664        #than file end time
665        delta = 23
666        domain.starttime = start + delta
667        F = file_function(filename + '.sww', domain,
668                          quantities = domain.conserved_quantities,
669                          interpolation_points = interpolation_points)
670        assert numpy.allclose(domain.starttime, start+delta)
671
672
673
674
675        #Now try interpolation with delta offset
676        for id in range(len(interpolation_points)):           
677            x = interpolation_points[id][0]
678            y = interpolation_points[id][1]
679
680            for i in range(20):
681                t = i*10
682                k = i%6
683
684                if k == 0:
685                    q0 = F(t-delta, point_id=id)
686                    q1 = F(t+60-delta, point_id=id)
687
688                q = F(t-delta, point_id=id)
689                assert numpy.allclose(q, (k*q1 + (6-k)*q0)/6)
690
691
692        os.remove(filename + '.sww')
693
694
695
696    def Xtest_spatio_temporal_file_function_time(self):
697        # FIXME: This passes but needs some TLC
698        # Test that File function interpolates correctly
699        # When some points are outside the mesh
700
701        import os, time
702        from anuga.config import time_format
703        from mesh_factory import rectangular
704        from shallow_water import Domain
705        import anuga.shallow_water.data_manager 
706        from anuga.pmesh.mesh_interface import create_mesh_from_regions
707        finaltime = 1200
708       
709        filename = tempfile.mktemp()
710        #print "filename",filename
711        filename = 'test_file_function'
712
713        meshfilename = tempfile.mktemp(".tsh")
714
715        boundary_tags = {'walls':[0,1],'bom':[2]}
716       
717        polygon_absolute = [[0,-20],[10,-20],[10,15],[-20,15]]
718       
719        create_mesh_from_regions(polygon_absolute,
720                                 boundary_tags,
721                                 10000000,
722                                 filename=meshfilename)
723        domain = Domain(mesh_filename=meshfilename)
724        domain.smooth = False
725        domain.default_order = 2
726        domain.set_datadir('.')
727        domain.set_name(filename)
728        domain.store = True
729        domain.format = 'sww'   #Native netcdf visualisation format
730
731        #print points
732        start = time.mktime(time.strptime('2000', '%Y'))
733        domain.starttime = start
734       
735
736        #Store structure
737        domain.initialise_storage()
738
739        #Compute artificial time steps and store
740        dt = 60  #One minute intervals
741        t = 0.0
742        while t <= finaltime:
743            #Compute quantities
744            f1 = lambda x,y: 3*x - y**2 + 2*t + 4
745            domain.set_quantity('stage', f1)
746
747            f2 = lambda x,y: x+y+t**2
748            domain.set_quantity('xmomentum', f2)
749
750            f3 = lambda x,y: x**2 + y**2 * numpy.sin(t*numpy.pi/600)
751            domain.set_quantity('ymomentum', f3)
752
753            #Store and advance time
754            domain.time = t
755            domain.store_timestep(domain.conserved_quantities)
756            t += dt
757
758        interpolation_points = [[1,0]]
759        interpolation_points = [[100,1000]]
760       
761        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14], [10,-12.5],
762                                [78787,78787],[7878,3432]]
763           
764        #Deliberately set domain.starttime to too early
765        domain.starttime = start - 1
766
767        #Create file function
768        F = file_function(filename + '.sww', domain,
769                          quantities = domain.conserved_quantities,
770                          interpolation_points = interpolation_points)
771
772        #Check that FF updates fixes domain starttime
773        assert numpy.allclose(domain.starttime, start)
774
775        #Check that domain.starttime isn't updated if later
776        domain.starttime = start + 1
777        F = file_function(filename + '.sww', domain,
778                          quantities = domain.conserved_quantities,
779                          interpolation_points = interpolation_points)
780        assert numpy.allclose(domain.starttime, start+1)
781        domain.starttime = start
782
783
784        #Check linear interpolation in time
785        # checking points inside and outside the mesh
786        F = file_function(filename + '.sww', domain,
787                          quantities = domain.conserved_quantities,
788                          interpolation_points = interpolation_points)
789       
790        for id in range(len(interpolation_points)):
791            x = interpolation_points[id][0]
792            y = interpolation_points[id][1]
793
794            for i in range(20):
795                t = i*10
796                k = i%6
797
798                if k == 0:
799                    q0 = F(t, point_id=id)
800                    q1 = F(t+60, point_id=id)
801
802                if q0 == NAN:
803                    actual = q0
804                else:
805                    actual = (k*q1 + (6-k)*q0)/6
806                q = F(t, point_id=id)
807                #print i, k, t, q
808                #print ' ', q0
809                #print ' ', q1
810                #print "q",q
811                #print "actual", actual
812                #print
813                if q0 == NAN:
814                     self.failUnless( q == actual, 'Fail!')
815                else:
816                    assert numpy.allclose(q, actual)
817
818        # now lets check points inside the mesh
819        interpolation_points = [[0,-20], [1,0], [0,1], [1.1, 3.14]] #, [10,-12.5]] - this point doesn't work WHY?
820        interpolation_points = [[10,-12.5]]
821           
822        print "len(interpolation_points)",len(interpolation_points) 
823        F = file_function(filename + '.sww', domain,
824                          quantities = domain.conserved_quantities,
825                          interpolation_points = interpolation_points)
826
827        domain.starttime = start
828
829
830        #Check linear interpolation in time
831        F = file_function(filename + '.sww', domain,
832                          quantities = domain.conserved_quantities,
833                          interpolation_points = interpolation_points)               
834        for id in range(len(interpolation_points)):
835            x = interpolation_points[id][0]
836            y = interpolation_points[id][1]
837
838            for i in range(20):
839                t = i*10
840                k = i%6
841
842                if k == 0:
843                    q0 = F(t, point_id=id)
844                    q1 = F(t+60, point_id=id)
845
846                if q0 == NAN:
847                    actual = q0
848                else:
849                    actual = (k*q1 + (6-k)*q0)/6
850                q = F(t, point_id=id)
851                print "############"
852                print "id, x, y ", id, x, y #k, t, q
853                print "t", t
854                #print ' ', q0
855                #print ' ', q1
856                print "q",q
857                print "actual", actual
858                #print
859                if q0 == NAN:
860                     self.failUnless( q == actual, 'Fail!')
861                else:
862                    assert numpy.allclose(q, actual)
863
864
865        #Another check of linear interpolation in time
866        for id in range(len(interpolation_points)):
867            q60 = F(60, point_id=id)
868            q120 = F(120, point_id=id)
869
870            t = 90 #Halfway between 60 and 120
871            q = F(t, point_id=id)
872            assert numpy.allclose( (q120+q60)/2, q )
873
874            t = 100 #Two thirds of the way between between 60 and 120
875            q = F(t, point_id=id)
876            assert numpy.allclose(q60/3 + 2*q120/3, q)
877
878
879
880        #Check that domain.starttime isn't updated if later than file starttime but earlier
881        #than file end time
882        delta = 23
883        domain.starttime = start + delta
884        F = file_function(filename + '.sww', domain,
885                          quantities = domain.conserved_quantities,
886                          interpolation_points = interpolation_points)
887        assert numpy.allclose(domain.starttime, start+delta)
888
889
890
891
892        #Now try interpolation with delta offset
893        for id in range(len(interpolation_points)):           
894            x = interpolation_points[id][0]
895            y = interpolation_points[id][1]
896
897            for i in range(20):
898                t = i*10
899                k = i%6
900
901                if k == 0:
902                    q0 = F(t-delta, point_id=id)
903                    q1 = F(t+60-delta, point_id=id)
904
905                q = F(t-delta, point_id=id)
906                assert numpy.allclose(q, (k*q1 + (6-k)*q0)/6)
907
908
909        os.remove(filename + '.sww')
910
911    def test_file_function_time_with_domain(self):
912        """Test that File function interpolates correctly
913        between given times. No x,y dependency here.
914        Use domain with starttime
915        """
916
917        #Write file
918        import os, time, calendar
919        from anuga.config import time_format
920        from math import sin, pi
921        from domain import Domain
922
923        finaltime = 1200
924        filename = 'test_file_function'
925        fid = open(filename + '.txt', 'w')
926        start = time.mktime(time.strptime('2000', '%Y'))
927        dt = 60  #One minute intervals
928        t = 0.0
929        while t <= finaltime:
930            t_string = time.strftime(time_format, time.gmtime(t+start))
931            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
932            t += dt
933
934        fid.close()
935
936
937        #Convert ASCII file to NetCDF (Which is what we really like!)
938        timefile2netcdf(filename)
939
940
941
942        a = [0.0, 0.0]
943        b = [4.0, 0.0]
944        c = [0.0, 3.0]
945
946        points = [a, b, c]
947        vertices = [[0,1,2]]
948        domain = Domain(points, vertices)
949
950        # Check that domain.starttime is updated if non-existing
951        F = file_function(filename + '.tms',
952                          domain,
953                          quantities = ['Attribute0', 'Attribute1', 'Attribute2']) 
954        assert numpy.allclose(domain.starttime, start)
955
956        # Check that domain.starttime is updated if too early
957        domain.starttime = start - 1
958        F = file_function(filename + '.tms',
959                          domain,
960                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])
961        assert numpy.allclose(domain.starttime, start)
962
963        # Check that domain.starttime isn't updated if later
964        domain.starttime = start + 1
965        F = file_function(filename + '.tms',
966                          domain,
967                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])
968        assert numpy.allclose(domain.starttime, start+1)
969
970        domain.starttime = start
971        F = file_function(filename + '.tms',
972                          domain,
973                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'],
974                          use_cache=True)
975       
976
977        #print F.precomputed_values
978        #print 'F(60)', F(60)
979       
980        #Now try interpolation
981        for i in range(20):
982            t = i*10
983            q = F(t)
984
985            #Exact linear intpolation
986            assert numpy.allclose(q[0], 2*t)
987            if i%6 == 0:
988                assert numpy.allclose(q[1], t**2)
989                assert numpy.allclose(q[2], sin(t*pi/600))
990
991        #Check non-exact
992
993        t = 90 #Halfway between 60 and 120
994        q = F(t)
995        assert numpy.allclose( (120**2 + 60**2)/2, q[1] )
996        assert numpy.allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
997
998
999        t = 100 #Two thirds of the way between between 60 and 120
1000        q = F(t)
1001        assert numpy.allclose( 2*120**2/3 + 60**2/3, q[1] )
1002        assert numpy.allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
1003
1004        os.remove(filename + '.tms')
1005        os.remove(filename + '.txt')       
1006
1007    def test_file_function_time_with_domain_different_start(self):
1008        """Test that File function interpolates correctly
1009        between given times. No x,y dependency here.
1010        Use domain with a starttime later than that of file
1011
1012        ASCII version
1013        """
1014
1015        #Write file
1016        import os, time, calendar
1017        from anuga.config import time_format
1018        from math import sin, pi
1019        from domain import Domain
1020
1021        finaltime = 1200
1022        filename = 'test_file_function'
1023        fid = open(filename + '.txt', 'w')
1024        start = time.mktime(time.strptime('2000', '%Y'))
1025        dt = 60  #One minute intervals
1026        t = 0.0
1027        while t <= finaltime:
1028            t_string = time.strftime(time_format, time.gmtime(t+start))
1029            fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600)))
1030            t += dt
1031
1032        fid.close()
1033
1034        #Convert ASCII file to NetCDF (Which is what we really like!)
1035        timefile2netcdf(filename)       
1036
1037        a = [0.0, 0.0]
1038        b = [4.0, 0.0]
1039        c = [0.0, 3.0]
1040
1041        points = [a, b, c]
1042        vertices = [[0,1,2]]
1043        domain = Domain(points, vertices)
1044
1045        #Check that domain.starttime isn't updated if later than file starttime but earlier
1046        #than file end time
1047        delta = 23
1048        domain.starttime = start + delta
1049        F = file_function(filename + '.tms', domain,
1050                          quantities = ['Attribute0', 'Attribute1', 'Attribute2'])       
1051        assert numpy.allclose(domain.starttime, start+delta)
1052
1053
1054
1055
1056        #Now try interpolation with delta offset
1057        for i in range(20):
1058            t = i*10
1059            q = F(t-delta)
1060
1061            #Exact linear intpolation
1062            assert numpy.allclose(q[0], 2*t)
1063            if i%6 == 0:
1064                assert numpy.allclose(q[1], t**2)
1065                assert numpy.allclose(q[2], sin(t*pi/600))
1066
1067        #Check non-exact
1068
1069        t = 90 #Halfway between 60 and 120
1070        q = F(t-delta)
1071        assert numpy.allclose( (120**2 + 60**2)/2, q[1] )
1072        assert numpy.allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] )
1073
1074
1075        t = 100 #Two thirds of the way between between 60 and 120
1076        q = F(t-delta)
1077        assert numpy.allclose( 2*120**2/3 + 60**2/3, q[1] )
1078        assert numpy.allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] )
1079
1080
1081        os.remove(filename + '.tms')
1082        os.remove(filename + '.txt')               
1083
1084
1085
1086    def test_apply_expression_to_dictionary(self):
1087
1088        #FIXME: Division is not expected to work for integers.
1089        #This must be caught.
1090        foo = numpy.array([[1,2,3], [4,5,6]], numpy.float)
1091
1092        bar = numpy.array([[-1,0,5], [6,1,1]], numpy.float)                 
1093
1094        D = {'X': foo, 'Y': bar}
1095
1096        Z = apply_expression_to_dictionary('X+Y', D)       
1097        assert numpy.allclose(Z, foo+bar)
1098
1099        Z = apply_expression_to_dictionary('X*Y', D)       
1100        assert numpy.allclose(Z, foo*bar)       
1101
1102        Z = apply_expression_to_dictionary('4*X+Y', D)       
1103        assert numpy.allclose(Z, 4*foo+bar)       
1104
1105        # test zero division is OK
1106        Z = apply_expression_to_dictionary('X/Y', D)
1107        assert numpy.allclose(1/Z, 1/(foo/bar)) # can't compare inf to inf
1108
1109        # make an error for zero on zero
1110        # this is really an error in Numeric, SciPy core can handle it
1111        # Z = apply_expression_to_dictionary('0/Y', D)
1112
1113        #Check exceptions
1114        try:
1115            #Wrong name
1116            Z = apply_expression_to_dictionary('4*X+A', D)       
1117        except NameError:
1118            pass
1119        else:
1120            msg = 'Should have raised a NameError Exception'
1121            raise msg
1122
1123
1124        try:
1125            #Wrong order
1126            Z = apply_expression_to_dictionary(D, '4*X+A')       
1127        except AssertionError:
1128            pass
1129        else:
1130            msg = 'Should have raised a AssertionError Exception'
1131            raise msg       
1132       
1133
1134    def test_multiple_replace(self):
1135        """Hard test that checks a true word-by-word simultaneous replace
1136        """
1137       
1138        D = {'x': 'xi', 'y': 'eta', 'xi':'lam'}
1139        exp = '3*x+y + xi'
1140       
1141        new = multiple_replace(exp, D)
1142       
1143        assert new == '3*xi+eta + lam'
1144                         
1145
1146
1147    def test_point_on_line_obsolete(self):
1148        """Test that obsolete call issues appropriate warning"""
1149
1150        #Turn warning into an exception
1151        import warnings
1152        warnings.filterwarnings('error')
1153
1154        try:
1155            assert point_on_line( 0, 0.5, 0,1, 0,0 )
1156        except DeprecationWarning:
1157            pass
1158        else:
1159            msg = 'point_on_line should have issued a DeprecationWarning'
1160            raise Exception(msg)   
1161
1162        warnings.resetwarnings()
1163   
1164    def test_get_revision_number(self):
1165        """test_get_revision_number(self):
1166
1167        Test that revision number can be retrieved.
1168        """
1169        if os.environ.has_key('USER') and os.environ['USER'] == 'dgray':
1170            # I have a known snv incompatability issue,
1171            # so I'm skipping this test.
1172            # FIXME when SVN is upgraded on our clusters
1173            pass
1174        else:   
1175            n = get_revision_number()
1176            assert n>=0
1177
1178
1179       
1180    def test_add_directories(self):
1181       
1182        import tempfile
1183        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
1184        directories = ['ja','ne','ke']
1185        kens_dir = add_directories(root_dir, directories)
1186        assert kens_dir == root_dir + sep + 'ja' + sep + 'ne' + \
1187               sep + 'ke'
1188        assert access(root_dir,F_OK)
1189
1190        add_directories(root_dir, directories)
1191        assert access(root_dir,F_OK)
1192       
1193        #clean up!
1194        os.rmdir(kens_dir)
1195        os.rmdir(root_dir + sep + 'ja' + sep + 'ne')
1196        os.rmdir(root_dir + sep + 'ja')
1197        os.rmdir(root_dir)
1198
1199    def test_add_directories_bad(self):
1200       
1201        import tempfile
1202        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
1203        directories = ['/\/!@#@#$%^%&*((*:*:','ne','ke']
1204       
1205        try:
1206            kens_dir = add_directories(root_dir, directories)
1207        except OSError:
1208            pass
1209        else:
1210            msg = 'bad dir name should give OSError'
1211            raise Exception(msg)   
1212           
1213        #clean up!
1214        os.rmdir(root_dir)
1215
1216    def test_check_list(self):
1217
1218        check_list(['stage','xmomentum'])
1219
1220       
1221    def test_add_directories(self):
1222       
1223        import tempfile
1224        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
1225        directories = ['ja','ne','ke']
1226        kens_dir = add_directories(root_dir, directories)
1227        assert kens_dir == root_dir + sep + 'ja' + sep + 'ne' + \
1228               sep + 'ke'
1229        assert access(root_dir,F_OK)
1230
1231        add_directories(root_dir, directories)
1232        assert access(root_dir,F_OK)
1233       
1234        #clean up!
1235        os.rmdir(kens_dir)
1236        os.rmdir(root_dir + sep + 'ja' + sep + 'ne')
1237        os.rmdir(root_dir + sep + 'ja')
1238        os.rmdir(root_dir)
1239
1240    def test_add_directories_bad(self):
1241       
1242        import tempfile
1243        root_dir = tempfile.mkdtemp('_test_util', 'test_util_')
1244        directories = ['/\/!@#@#$%^%&*((*:*:','ne','ke']
1245       
1246        try:
1247            kens_dir = add_directories(root_dir, directories)
1248        except OSError:
1249            pass
1250        else:
1251            msg = 'bad dir name should give OSError'
1252            raise Exception(msg)   
1253           
1254        #clean up!
1255        os.rmdir(root_dir)
1256
1257    def test_check_list(self):
1258
1259        check_list(['stage','xmomentum'])
1260       
1261    def test_remove_lone_verts_d(self):
1262        verts = [[0,0],[1,0],[0,1]]
1263        tris = [[0,1,2]]
1264        new_verts, new_tris = remove_lone_verts(verts, tris)
1265        assert numpy.alltrue(new_verts == verts)
1266        assert numpy.alltrue(new_tris == tris)
1267     
1268
1269    def test_remove_lone_verts_e(self):
1270        verts = [[0,0],[1,0],[0,1],[99,99]]
1271        tris = [[0,1,2]]
1272        new_verts, new_tris = remove_lone_verts(verts, tris)
1273        assert new_verts == verts[0:3]
1274        assert new_tris == tris
1275       
1276    def test_remove_lone_verts_a(self):
1277        verts = [[99,99],[0,0],[1,0],[99,99],[0,1],[99,99]]
1278        tris = [[1,2,4]]
1279        new_verts, new_tris = remove_lone_verts(verts, tris)
1280        #print "new_verts", new_verts
1281        assert numpy.alltrue(new_verts == [[0,0],[1,0],[0,1]])
1282        assert numpy.alltrue(new_tris == [[0,1,2]])
1283     
1284    def test_remove_lone_verts_c(self):
1285        verts = [[0,0],[1,0],[99,99],[0,1]]
1286        tris = [[0,1,3]]
1287        new_verts, new_tris = remove_lone_verts(verts, tris)
1288        assert numpy.alltrue(new_verts == [[0,0],[1,0],[0,1]])
1289        assert numpy.alltrue(new_tris == [[0,1,2]])
1290       
1291    def test_remove_lone_verts_b(self):
1292        verts = [[0,0],[1,0],[0,1],[99,99],[99,99],[99,99]]
1293        tris = [[0,1,2]]
1294        new_verts, new_tris = remove_lone_verts(verts, tris)
1295        assert numpy.alltrue(new_verts == verts[0:3])
1296        assert numpy.alltrue(new_tris == tris)
1297     
1298
1299    def test_remove_lone_verts_e(self):
1300        verts = [[0,0],[1,0],[0,1],[99,99]]
1301        tris = [[0,1,2]]
1302        new_verts, new_tris = remove_lone_verts(verts, tris)
1303        assert numpy.alltrue(new_verts == verts[0:3])
1304        assert numpy.alltrue(new_tris == tris)
1305       
1306    def test_get_min_max_values(self):
1307       
1308        list=[8,9,6,1,4]
1309        min1, max1 = get_min_max_values(list)
1310       
1311        assert min1==1 
1312        assert max1==9
1313       
1314    def test_get_min_max_values1(self):
1315       
1316        list=[-8,-9,-6,-1,-4]
1317        min1, max1 = get_min_max_values(list)
1318       
1319#        print 'min1,max1',min1,max1
1320        assert min1==-9 
1321        assert max1==-1
1322
1323#    def test_get_min_max_values2(self):
1324#        '''
1325#        The min and max supplied are greater than the ones in the
1326#        list and therefore are the ones returned
1327#        '''
1328#        list=[-8,-9,-6,-1,-4]
1329#        min1, max1 = get_min_max_values(list,-10,10)
1330#       
1331##        print 'min1,max1',min1,max1
1332#        assert min1==-10
1333#        assert max1==10
1334       
1335    def test_make_plots_from_csv_files(self):
1336       
1337        #if sys.platform == 'win32':  #Windows
1338            try: 
1339                import pylab
1340            except ImportError:
1341                #ANUGA don't need pylab to work so the system doesn't
1342                #rely on pylab being installed
1343                return
1344           
1345       
1346            current_dir=getcwd()+sep+'abstract_2d_finite_volumes'
1347            temp_dir = tempfile.mkdtemp('','figures')
1348    #        print 'temp_dir',temp_dir
1349            fileName = temp_dir+sep+'time_series_3.csv'
1350            file = open(fileName,"w")
1351            file.write("time,stage,speed,momentum,elevation\n\
13521.0, 0, 0, 0, 10 \n\
13532.0, 5, 2, 4, 10 \n\
13543.0, 3, 3, 5, 10 \n")
1355            file.close()
1356   
1357            fileName1 = temp_dir+sep+'time_series_4.csv'
1358            file1 = open(fileName1,"w")
1359            file1.write("time,stage,speed,momentum,elevation\n\
13601.0, 0, 0, 0, 5 \n\
13612.0, -5, -2, -4, 5 \n\
13623.0, -4, -3, -5, 5 \n")
1363            file1.close()
1364   
1365            fileName2 = temp_dir+sep+'time_series_5.csv'
1366            file2 = open(fileName2,"w")
1367            file2.write("time,stage,speed,momentum,elevation\n\
13681.0, 0, 0, 0, 7 \n\
13692.0, 4, -0.45, 57, 7 \n\
13703.0, 6, -0.5, 56, 7 \n")
1371            file2.close()
1372           
1373            dir, name=os.path.split(fileName)
1374            csv2timeseries_graphs(directories_dic={dir:['gauge', 0, 0]},
1375                                  output_dir=temp_dir,
1376                                  base_name='time_series_',
1377                                  plot_numbers=['3-5'],
1378                                  quantities=['speed','stage','momentum'],
1379                                  assess_all_csv_files=True,
1380                                  extra_plot_name='test')
1381           
1382            #print dir+sep+name[:-4]+'_stage_test.png'
1383            assert(access(dir+sep+name[:-4]+'_stage_test.png',F_OK)==True)
1384            assert(access(dir+sep+name[:-4]+'_speed_test.png',F_OK)==True)
1385            assert(access(dir+sep+name[:-4]+'_momentum_test.png',F_OK)==True)
1386   
1387            dir1, name1=os.path.split(fileName1)
1388            assert(access(dir+sep+name1[:-4]+'_stage_test.png',F_OK)==True)
1389            assert(access(dir+sep+name1[:-4]+'_speed_test.png',F_OK)==True)
1390            assert(access(dir+sep+name1[:-4]+'_momentum_test.png',F_OK)==True)
1391   
1392   
1393            dir2, name2=os.path.split(fileName2)
1394            assert(access(dir+sep+name2[:-4]+'_stage_test.png',F_OK)==True)
1395            assert(access(dir+sep+name2[:-4]+'_speed_test.png',F_OK)==True)
1396            assert(access(dir+sep+name2[:-4]+'_momentum_test.png',F_OK)==True)
1397   
1398            del_dir(temp_dir)
1399       
1400
1401    def test_sww2csv_gauges(self):
1402
1403        def elevation_function(x, y):
1404            return -x
1405       
1406        """Most of this test was copied from test_interpolate
1407        test_interpole_sww2csv
1408       
1409        This is testing the gauge_sww2csv function, by creating a sww file and
1410        then exporting the gauges and checking the results.
1411        """
1412       
1413        # Create mesh
1414        mesh_file = tempfile.mktemp(".tsh")   
1415        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
1416        m = Mesh()
1417        m.add_vertices(points)
1418        m.auto_segment()
1419        m.generate_mesh(verbose=False)
1420        m.export_mesh_file(mesh_file)
1421       
1422        # Create shallow water domain
1423        domain = Domain(mesh_file)
1424        os.remove(mesh_file)
1425       
1426        domain.default_order=2
1427       
1428        # This test was made before tight_slope_limiters were introduced
1429        # Since were are testing interpolation values this is OK
1430        domain.tight_slope_limiters = 0 
1431       
1432
1433        # Set some field values
1434        domain.set_quantity('elevation', elevation_function)
1435        domain.set_quantity('friction', 0.03)
1436        domain.set_quantity('xmomentum', 3.0)
1437        domain.set_quantity('ymomentum', 4.0)
1438
1439        ######################
1440        # Boundary conditions
1441        B = Transmissive_boundary(domain)
1442        domain.set_boundary( {'exterior': B})
1443
1444        # This call mangles the stage values.
1445        domain.distribute_to_vertices_and_edges()
1446        domain.set_quantity('stage', 1.0)
1447
1448
1449        domain.set_name('datatest' + str(time.time()))
1450        domain.format = 'sww'
1451        domain.smooth = True
1452        domain.reduction = mean
1453
1454
1455        sww = get_dataobject(domain)
1456        sww.store_connectivity()
1457        sww.store_timestep(['stage', 'xmomentum', 'ymomentum','elevation'])
1458        domain.set_quantity('stage', 10.0) # This is automatically limited
1459        # so it will not be less than the elevation
1460        domain.time = 2.
1461        sww.store_timestep(['stage','elevation', 'xmomentum', 'ymomentum'])
1462
1463        # test the function
1464        points = [[5.0,1.],[0.5,2.]]
1465
1466        points_file = tempfile.mktemp(".csv")
1467#        points_file = 'test_point.csv'
1468        file_id = open(points_file,"w")
1469        file_id.write("name, easting, northing, elevation \n\
1470point1, 5.0, 1.0, 3.0\n\
1471point2, 0.5, 2.0, 9.0\n")
1472        file_id.close()
1473
1474       
1475        sww2csv_gauges(sww.filename, 
1476                       points_file,
1477                       verbose=False,
1478                       use_cache=False)
1479
1480#        point1_answers_array = [[0.0,1.0,-5.0,3.0,4.0], [2.0,10.0,-5.0,3.0,4.0]]
1481        point1_answers_array = [[0.0,1.0,6.0,-5.0,3.0,4.0], [2.0,10.0,15.0,-5.0,3.0,4.0]]
1482        point1_filename = 'gauge_point1.csv'
1483        point1_handle = file(point1_filename)
1484        point1_reader = reader(point1_handle)
1485        point1_reader.next()
1486
1487        line=[]
1488        for i,row in enumerate(point1_reader):
1489            #print 'i',i,'row',row
1490            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),float(row[4]),float(row[5])])
1491            #print 'assert line',line[i],'point1',point1_answers_array[i]
1492            assert numpy.allclose(line[i], point1_answers_array[i])
1493
1494        point2_answers_array = [[0.0,1.0,1.5,-0.5,3.0,4.0], [2.0,10.0,10.5,-0.5,3.0,4.0]]
1495        point2_filename = 'gauge_point2.csv' 
1496        point2_handle = file(point2_filename)
1497        point2_reader = reader(point2_handle)
1498        point2_reader.next()
1499                       
1500        line=[]
1501        for i,row in enumerate(point2_reader):
1502            #print 'i',i,'row',row
1503            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),float(row[4]),float(row[5])])
1504            #print 'assert line',line[i],'point1',point1_answers_array[i]
1505            assert numpy.allclose(line[i], point2_answers_array[i])
1506                         
1507        # clean up
1508        point1_handle.close()
1509        point2_handle.close()
1510        #print "sww.filename",sww.filename
1511        os.remove(sww.filename)
1512        os.remove(points_file)
1513        os.remove(point1_filename)
1514        os.remove(point2_filename)
1515
1516
1517
1518    def test_sww2csv_gauges1(self):
1519        from anuga.pmesh.mesh import Mesh
1520        from anuga.shallow_water import Domain, Transmissive_boundary
1521        from anuga.shallow_water.data_manager import get_dataobject
1522        from csv import reader,writer
1523        import time
1524        import string
1525
1526        def elevation_function(x, y):
1527            return -x
1528       
1529        """Most of this test was copied from test_interpolate
1530        test_interpole_sww2csv
1531       
1532        This is testing the gauge_sww2csv function, by creating a sww file and
1533        then exporting the gauges and checking the results.
1534       
1535        This tests the ablity not to have elevation in the points file and
1536        not store xmomentum and ymomentum
1537        """
1538       
1539        # Create mesh
1540        mesh_file = tempfile.mktemp(".tsh")   
1541        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
1542        m = Mesh()
1543        m.add_vertices(points)
1544        m.auto_segment()
1545        m.generate_mesh(verbose=False)
1546        m.export_mesh_file(mesh_file)
1547       
1548        # Create shallow water domain
1549        domain = Domain(mesh_file)
1550        os.remove(mesh_file)
1551       
1552        domain.default_order=2
1553
1554        # Set some field values
1555        domain.set_quantity('elevation', elevation_function)
1556        domain.set_quantity('friction', 0.03)
1557        domain.set_quantity('xmomentum', 3.0)
1558        domain.set_quantity('ymomentum', 4.0)
1559
1560        ######################
1561        # Boundary conditions
1562        B = Transmissive_boundary(domain)
1563        domain.set_boundary( {'exterior': B})
1564
1565        # This call mangles the stage values.
1566        domain.distribute_to_vertices_and_edges()
1567        domain.set_quantity('stage', 1.0)
1568
1569
1570        domain.set_name('datatest' + str(time.time()))
1571        domain.format = 'sww'
1572        domain.smooth = True
1573        domain.reduction = mean
1574
1575        sww = get_dataobject(domain)
1576        sww.store_connectivity()
1577        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
1578        domain.set_quantity('stage', 10.0) # This is automatically limited
1579        # so it will not be less than the elevation
1580        domain.time = 2.
1581        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
1582
1583        # test the function
1584        points = [[5.0,1.],[0.5,2.]]
1585
1586        points_file = tempfile.mktemp(".csv")
1587#        points_file = 'test_point.csv'
1588        file_id = open(points_file,"w")
1589        file_id.write("name,easting,northing \n\
1590point1, 5.0, 1.0\n\
1591point2, 0.5, 2.0\n")
1592        file_id.close()
1593
1594        sww2csv_gauges(sww.filename, 
1595                            points_file,
1596                            quantities=['stage', 'elevation'],
1597                            use_cache=False,
1598                            verbose=False)
1599
1600        point1_answers_array = [[0.0,1.0,-5.0], [2.0,10.0,-5.0]]
1601        point1_filename = 'gauge_point1.csv'
1602        point1_handle = file(point1_filename)
1603        point1_reader = reader(point1_handle)
1604        point1_reader.next()
1605
1606        line=[]
1607        for i,row in enumerate(point1_reader):
1608#            print 'i',i,'row',row
1609            line.append([float(row[0]),float(row[1]),float(row[2])])
1610            #print 'line',line[i],'point1',point1_answers_array[i]
1611            assert numpy.allclose(line[i], point1_answers_array[i])
1612
1613        point2_answers_array = [[0.0,1.0,-0.5], [2.0,10.0,-0.5]]
1614        point2_filename = 'gauge_point2.csv' 
1615        point2_handle = file(point2_filename)
1616        point2_reader = reader(point2_handle)
1617        point2_reader.next()
1618                       
1619        line=[]
1620        for i,row in enumerate(point2_reader):
1621#            print 'i',i,'row',row
1622            line.append([float(row[0]),float(row[1]),float(row[2])])
1623#            print 'line',line[i],'point1',point1_answers_array[i]
1624            assert numpy.allclose(line[i], point2_answers_array[i])
1625                         
1626        # clean up
1627        point1_handle.close()
1628        point2_handle.close()
1629        #print "sww.filename",sww.filename
1630        os.remove(sww.filename)
1631        os.remove(points_file)
1632        os.remove(point1_filename)
1633        os.remove(point2_filename)
1634
1635
1636    def test_sww2csv_gauges2(self):
1637
1638        def elevation_function(x, y):
1639            return -x
1640       
1641        """Most of this test was copied from test_interpolate
1642        test_interpole_sww2csv
1643       
1644        This is testing the gauge_sww2csv function, by creating a sww file and
1645        then exporting the gauges and checking the results.
1646       
1647        This is the same as sww2csv_gauges except set domain.set_starttime to 5.
1648        Therefore testing the storing of the absolute time in the csv files
1649        """
1650       
1651        # Create mesh
1652        mesh_file = tempfile.mktemp(".tsh")   
1653        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
1654        m = Mesh()
1655        m.add_vertices(points)
1656        m.auto_segment()
1657        m.generate_mesh(verbose=False)
1658        m.export_mesh_file(mesh_file)
1659       
1660        # Create shallow water domain
1661        domain = Domain(mesh_file)
1662        os.remove(mesh_file)
1663       
1664        domain.default_order=2
1665
1666        # This test was made before tight_slope_limiters were introduced
1667        # Since were are testing interpolation values this is OK
1668        domain.tight_slope_limiters = 0         
1669
1670        # Set some field values
1671        domain.set_quantity('elevation', elevation_function)
1672        domain.set_quantity('friction', 0.03)
1673        domain.set_quantity('xmomentum', 3.0)
1674        domain.set_quantity('ymomentum', 4.0)
1675        domain.set_starttime(5)
1676
1677        ######################
1678        # Boundary conditions
1679        B = Transmissive_boundary(domain)
1680        domain.set_boundary( {'exterior': B})
1681
1682        # This call mangles the stage values.
1683        domain.distribute_to_vertices_and_edges()
1684        domain.set_quantity('stage', 1.0)
1685       
1686
1687
1688        domain.set_name('datatest' + str(time.time()))
1689        domain.format = 'sww'
1690        domain.smooth = True
1691        domain.reduction = mean
1692
1693        sww = get_dataobject(domain)
1694        sww.store_connectivity()
1695        sww.store_timestep(['stage', 'xmomentum', 'ymomentum','elevation'])
1696        domain.set_quantity('stage', 10.0) # This is automatically limited
1697        # so it will not be less than the elevation
1698        domain.time = 2.
1699        sww.store_timestep(['stage','elevation', 'xmomentum', 'ymomentum'])
1700
1701        # test the function
1702        points = [[5.0,1.],[0.5,2.]]
1703
1704        points_file = tempfile.mktemp(".csv")
1705#        points_file = 'test_point.csv'
1706        file_id = open(points_file,"w")
1707        file_id.write("name, easting, northing, elevation \n\
1708point1, 5.0, 1.0, 3.0\n\
1709point2, 0.5, 2.0, 9.0\n")
1710        file_id.close()
1711
1712       
1713        sww2csv_gauges(sww.filename, 
1714                            points_file,
1715                            verbose=False,
1716                            use_cache=False)
1717
1718#        point1_answers_array = [[0.0,1.0,-5.0,3.0,4.0], [2.0,10.0,-5.0,3.0,4.0]]
1719        point1_answers_array = [[5.0,1.0,6.0,-5.0,3.0,4.0], [7.0,10.0,15.0,-5.0,3.0,4.0]]
1720        point1_filename = 'gauge_point1.csv'
1721        point1_handle = file(point1_filename)
1722        point1_reader = reader(point1_handle)
1723        point1_reader.next()
1724
1725        line=[]
1726        for i,row in enumerate(point1_reader):
1727            #print 'i',i,'row',row
1728            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),float(row[4]),float(row[5])])
1729            #print 'assert line',line[i],'point1',point1_answers_array[i]
1730            assert numpy.allclose(line[i], point1_answers_array[i])
1731
1732        point2_answers_array = [[5.0,1.0,1.5,-0.5,3.0,4.0], [7.0,10.0,10.5,-0.5,3.0,4.0]]
1733        point2_filename = 'gauge_point2.csv' 
1734        point2_handle = file(point2_filename)
1735        point2_reader = reader(point2_handle)
1736        point2_reader.next()
1737                       
1738        line=[]
1739        for i,row in enumerate(point2_reader):
1740            #print 'i',i,'row',row
1741            line.append([float(row[0]),float(row[1]),float(row[2]),float(row[3]),float(row[4]),float(row[5])])
1742            #print 'assert line',line[i],'point1',point1_answers_array[i]
1743            assert numpy.allclose(line[i], point2_answers_array[i])
1744                         
1745        # clean up
1746        point1_handle.close()
1747        point2_handle.close()
1748        #print "sww.filename",sww.filename
1749        os.remove(sww.filename)
1750        os.remove(points_file)
1751        os.remove(point1_filename)
1752        os.remove(point2_filename)
1753
1754
1755    def test_greens_law(self):
1756
1757        from math import sqrt
1758       
1759        d1 = 80.0
1760        d2 = 20.0
1761        h1 = 1.0
1762        h2 = greens_law(d1,d2,h1)
1763
1764        assert h2==sqrt(2.0)
1765       
1766    def test_calc_bearings(self):
1767 
1768        from math import atan, degrees
1769        #Test East
1770        uh = 1
1771        vh = 1.e-15
1772        angle = calc_bearing(uh, vh)
1773        if 89 < angle < 91: v=1
1774        assert v==1
1775        #Test West
1776        uh = -1
1777        vh = 1.e-15
1778        angle = calc_bearing(uh, vh)
1779        if 269 < angle < 271: v=1
1780        assert v==1
1781        #Test North
1782        uh = 1.e-15
1783        vh = 1
1784        angle = calc_bearing(uh, vh)
1785        if -1 < angle < 1: v=1
1786        assert v==1
1787        #Test South
1788        uh = 1.e-15
1789        vh = -1
1790        angle = calc_bearing(uh, vh)
1791        if 179 < angle < 181: v=1
1792        assert v==1
1793        #Test South-East
1794        uh = 1
1795        vh = -1
1796        angle = calc_bearing(uh, vh)
1797        if 134 < angle < 136: v=1
1798        assert v==1
1799        #Test North-East
1800        uh = 1
1801        vh = 1
1802        angle = calc_bearing(uh, vh)
1803        if 44 < angle < 46: v=1
1804        assert v==1
1805        #Test South-West
1806        uh = -1
1807        vh = -1
1808        angle = calc_bearing(uh, vh)
1809        if 224 < angle < 226: v=1
1810        assert v==1
1811        #Test North-West
1812        uh = -1
1813        vh = 1
1814        angle = calc_bearing(uh, vh)
1815        if 314 < angle < 316: v=1
1816        assert v==1
1817
1818 
1819
1820
1821       
1822
1823#-------------------------------------------------------------
1824if __name__ == "__main__":
1825    suite = unittest.makeSuite(Test_Util,'test')
1826#    suite = unittest.makeSuite(Test_Util,'test_spatio_temporal_file_function_basic')
1827#    runner = unittest.TextTestRunner(verbosity=2)
1828    runner = unittest.TextTestRunner(verbosity=1)
1829    runner.run(suite)
1830
1831
1832
1833
Note: See TracBrowser for help on using the repository browser.