source: trunk/anuga_core/source/anuga/shallow_water/test_forcing_terms.py @ 8068

Last change on this file since 8068 was 8068, checked in by habili, 13 years ago

Fixed bug where starttime and duration in the evolve function did not produce correct results. Starttime is now set in the constructor of Domain and is 0 by default. time is is to starttime. Also a bug was fixed where duration did not correctly calculate the finaltime. Associated unit tests have also been fixed to reflect the change.

File size: 39.6 KB
Line 
1#!/usr/bin/env python
2
3import unittest, os
4import os.path
5from math import pi, sqrt
6import tempfile
7
8from anuga.config import g, epsilon, \
9                    netcdf_mode_r, netcdf_mode_w, netcdf_mode_a
10from anuga.geometry.polygon import is_inside_polygon
11from anuga.coordinate_transforms.geo_reference import Geo_reference
12from anuga.abstract_2d_finite_volumes.quantity import Quantity
13from anuga.geospatial_data.geospatial_data import Geospatial_data
14from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
15
16from anuga.utilities.system_tools import get_pathname_from_package
17from anuga.utilities.numerical_tools import ensure_numeric, mean
18
19from shallow_water_domain import Domain
20from boundaries import Reflective_boundary
21from forcing import Wind_stress, Inflow, Rainfall
22from anuga.file_conversion.file_conversion import timefile2netcdf
23
24import numpy as num
25
26# Get gateway to C implementation of flux function for direct testing
27from shallow_water_ext import flux_function_central as flux_function
28
29# Variable windfield implemented using functions
30def speed(t, x, y):
31    """Large speeds halfway between center and edges
32
33    Low speeds at center and edges
34    """
35
36    from math import exp, cos, pi
37
38    x = num.array(x)
39    y = num.array(y)
40
41    N = len(x)
42    s = 0*#New array
43
44    for k in range(N):
45        r = num.sqrt(x[k]**2 + y[k]**2)
46        factor = exp(-(r-0.15)**2)
47        s[k] = 4000 * factor * (cos(t*2*pi/150) + 2)
48
49    return s
50
51def angle(t, x, y):
52    """Rotating field
53    """
54    from math import atan, pi
55
56    x = num.array(x)
57    y = num.array(y)
58
59    N = len(x)
60    a = 0 * x    # New array
61
62    for k in range(N):
63        r = num.sqrt(x[k]**2 + y[k]**2)
64
65        angle = atan(y[k]/x[k])
66
67        if x[k] < 0:
68            angle += pi
69
70        # Take normal direction
71        angle -= pi/2
72
73        # Ensure positive radians
74        if angle < 0:
75            angle += 2*pi
76
77        a[k] = angle/pi*180
78
79    return a
80
81
82def scalar_func(t, x, y):
83    """Function that returns a scalar.
84
85    Used to test error message when numeric array is expected
86    """
87
88    return 17.7
89
90def scalar_func_list(t, x, y):
91    """Function that returns a scalar.
92
93    Used to test error message when numeric array is expected
94    """
95
96    return [17.7]
97
98class Test_forcing_terms(unittest.TestCase):
99    def setUp(self):
100        pass
101
102    def tearDown(self):
103        pass
104
105
106    def test_gravity(self):
107        #Assuming no friction
108
109        from anuga.config import g
110
111        a = [0.0, 0.0]
112        b = [0.0, 2.0]
113        c = [2.0, 0.0]
114        d = [0.0, 4.0]
115        e = [2.0, 2.0]
116        f = [4.0, 0.0]
117
118        points = [a, b, c, d, e, f]
119        #             bac,     bce,     ecf,     dbe
120        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
121
122        domain = Domain(points, vertices)
123
124        #Set up for a gradient of (3,0) at mid triangle (bce)
125        def slope(x, y):
126            return 3*x
127
128        h = 0.1
129        def stage(x, y):
130            return slope(x, y) + h
131
132        domain.set_quantity('elevation', slope)
133        domain.set_quantity('stage', stage)
134
135        for name in domain.conserved_quantities:
136            assert num.allclose(domain.quantities[name].explicit_update, 0)
137            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
138
139        domain.compute_forcing_terms()
140
141        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
142        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
143                            -g*h*3)
144        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
145
146    # FIXME: James these tests are failing - are they outdated?
147    def NOtest_manning_friction(self):
148        from anuga.config import g
149
150        a = [0.0, 0.0]
151        b = [0.0, 2.0]
152        c = [2.0, 0.0]
153        d = [0.0, 4.0]
154        e = [2.0, 2.0]
155        f = [4.0, 0.0]
156
157        points = [a, b, c, d, e, f]
158        #             bac,     bce,     ecf,     dbe
159        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
160
161        domain = Domain(points, vertices)
162
163        #Set up for a gradient of (3,0) at mid triangle (bce)
164        def slope(x, y):
165            return 3*x
166
167        h = 0.1
168        def stage(x, y):
169            return slope(x, y) + h
170
171        eta = 0.07
172        domain.set_quantity('elevation', slope)
173        domain.set_quantity('stage', stage)
174        domain.set_quantity('friction', eta)
175
176        for name in domain.conserved_quantities:
177            assert num.allclose(domain.quantities[name].explicit_update, 0)
178            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
179
180        domain.compute_forcing_terms()
181
182        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
183        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
184                            -g*h*3)
185        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
186
187        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
188        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
189                            0)
190        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
191                            0)
192
193        #Create some momentum for friction to work with
194        domain.set_quantity('xmomentum', 1)
195        S = -g*eta**2 / h**(7.0/3)
196
197        domain.compute_forcing_terms()
198        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
199        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
200                            S)
201        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
202                            0)
203
204        #A more complex example
205        domain.quantities['stage'].semi_implicit_update[:] = 0.0
206        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
207        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
208
209        domain.set_quantity('xmomentum', 3)
210        domain.set_quantity('ymomentum', 4)
211
212        S = -g*eta**2*5 / h**(7.0/3)
213
214        domain.compute_forcing_terms()
215
216        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
217        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
218                            S)
219        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
220                            S)
221
222
223
224    def NOtest_manning_friction_old(self):
225        from anuga.config import g
226
227        a = [0.0, 0.0]
228        b = [0.0, 2.0]
229        c = [2.0, 0.0]
230        d = [0.0, 4.0]
231        e = [2.0, 2.0]
232        f = [4.0, 0.0]
233
234        points = [a, b, c, d, e, f]
235        #             bac,     bce,     ecf,     dbe
236        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
237
238        domain = Domain(points, vertices)
239
240        #Set up for a gradient of (3,0) at mid triangle (bce)
241        def slope(x, y):
242            return 3*x
243
244        h = 0.1
245        def stage(x, y):
246            return slope(x, y) + h
247
248        eta = 0.07
249        domain.set_quantity('elevation', slope)
250        domain.set_quantity('stage', stage)
251        domain.set_quantity('friction', eta)
252
253        for name in domain.conserved_quantities:
254            assert num.allclose(domain.quantities[name].explicit_update, 0)
255            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
256
257        domain.compute_forcing_terms()
258
259        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
260        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
261                            -g*h*3)
262        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
263
264        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
265        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
266                            0)
267        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
268                            0)
269
270        #Create some momentum for friction to work with
271        domain.set_quantity('xmomentum', 1)
272        S = -g*eta**2 / h**(7.0/3)
273
274        domain.compute_forcing_terms()
275        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
276        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
277                            S)
278        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
279                            0)
280
281        #A more complex example
282        domain.quantities['stage'].semi_implicit_update[:] = 0.0
283        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
284        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
285
286        domain.set_quantity('xmomentum', 3)
287        domain.set_quantity('ymomentum', 4)
288        # sqrt(3^2 +4^2) = 5
289
290        S = -g*eta**2 / h**(7.0/3)  * 5
291
292        domain.compute_forcing_terms()
293
294        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
295        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
296                            S)
297        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
298                            S)
299
300
301    def NOtest_manning_friction_new(self):
302        from anuga.config import g
303
304        a = [0.0, 0.0]
305        b = [0.0, 2.0]
306        c = [2.0, 0.0]
307        d = [0.0, 4.0]
308        e = [2.0, 2.0]
309        f = [4.0, 0.0]
310
311        points = [a, b, c, d, e, f]
312        #             bac,     bce,     ecf,     dbe
313        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
314
315        domain = Domain(points, vertices)
316
317        # Use the new function which takes into account the extra
318        # wetted area due to slope of bed
319        domain.set_new_mannings_function(True)
320       
321        #Set up for a gradient of (3,0) at mid triangle (bce)
322        def slope(x, y):
323            return 3*x
324
325        h = 0.1
326        def stage(x, y):
327            return slope(x, y) + h
328
329        eta = 0.07
330        domain.set_quantity('elevation', slope)
331        domain.set_quantity('stage', stage)
332        domain.set_quantity('friction', eta)
333
334        for name in domain.conserved_quantities:
335            assert num.allclose(domain.quantities[name].explicit_update, 0)
336            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
337
338        domain.compute_forcing_terms()
339
340        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
341        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
342                            -g*h*3)
343        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
344
345        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
346        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
347                            0)
348        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
349                            0)
350
351        #Create some momentum for friction to work with
352        domain.set_quantity('xmomentum', 1)
353        S = -g*eta**2 / h**(7.0/3) * sqrt(10)
354
355        domain.compute_forcing_terms()
356        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
357        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
358                            S)
359        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
360                            0)
361
362        #A more complex example
363        domain.quantities['stage'].semi_implicit_update[:] = 0.0
364        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
365        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
366
367        domain.set_quantity('xmomentum', 3)
368        domain.set_quantity('ymomentum', 4)
369
370        S = -g*eta**2*5 / h**(7.0/3) * sqrt(10.0)
371
372        domain.compute_forcing_terms()
373
374        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
375        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
376                            S)
377        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
378                            S)
379       
380    def test_constant_wind_stress(self):
381        from anuga.config import rho_a, rho_w, eta_w
382        from math import pi, cos, sin
383
384        a = [0.0, 0.0]
385        b = [0.0, 2.0]
386        c = [2.0, 0.0]
387        d = [0.0, 4.0]
388        e = [2.0, 2.0]
389        f = [4.0, 0.0]
390
391        points = [a, b, c, d, e, f]
392        #             bac,     bce,     ecf,     dbe
393        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
394
395        domain = Domain(points, vertices)
396
397        #Flat surface with 1m of water
398        domain.set_quantity('elevation', 0)
399        domain.set_quantity('stage', 1.0)
400        domain.set_quantity('friction', 0)
401
402        Br = Reflective_boundary(domain)
403        domain.set_boundary({'exterior': Br})
404
405        #Setup only one forcing term, constant wind stress
406        s = 100
407        phi = 135
408        domain.forcing_terms = []
409        domain.forcing_terms.append(Wind_stress(s, phi))
410
411        domain.compute_forcing_terms()
412
413        const = eta_w*rho_a / rho_w
414
415        #Convert to radians
416        phi = phi*pi / 180
417
418        #Compute velocity vector (u, v)
419        u = s*cos(phi)
420        v = s*sin(phi)
421
422        #Compute wind stress
423        S = const * num.sqrt(u**2 + v**2)
424
425        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
426        assert num.allclose(domain.quantities['xmomentum'].explicit_update, S*u)
427        assert num.allclose(domain.quantities['ymomentum'].explicit_update, S*v)
428
429    def test_variable_wind_stress(self):
430        from anuga.config import rho_a, rho_w, eta_w
431        from math import pi, cos, sin
432
433        a = [0.0, 0.0]
434        b = [0.0, 2.0]
435        c = [2.0, 0.0]
436        d = [0.0, 4.0]
437        e = [2.0, 2.0]
438        f = [4.0, 0.0]
439
440        points = [a, b, c, d, e, f]
441        #             bac,     bce,     ecf,     dbe
442        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
443
444        domain = Domain(points, vertices)
445
446        #Flat surface with 1m of water
447        domain.set_quantity('elevation', 0)
448        domain.set_quantity('stage', 1.0)
449        domain.set_quantity('friction', 0)
450
451        Br = Reflective_boundary(domain)
452        domain.set_boundary({'exterior': Br})
453
454        domain.time = 5.54    # Take a random time (not zero)
455
456        #Setup only one forcing term, constant wind stress
457        s = 100
458        phi = 135
459        domain.forcing_terms = []
460        domain.forcing_terms.append(Wind_stress(s=speed, phi=angle))
461
462        domain.compute_forcing_terms()
463
464        #Compute reference solution
465        const = eta_w*rho_a / rho_w
466
467        N = len(domain)    # number_of_triangles
468
469        xc = domain.get_centroid_coordinates()
470        t = domain.time
471
472        x = xc[:,0]
473        y = xc[:,1]
474        s_vec = speed(t,x,y)
475        phi_vec = angle(t,x,y)
476
477        for k in range(N):
478            # Convert to radians
479            phi = phi_vec[k]*pi / 180
480            s = s_vec[k]
481
482            # Compute velocity vector (u, v)
483            u = s*cos(phi)
484            v = s*sin(phi)
485
486            # Compute wind stress
487            S = const * num.sqrt(u**2 + v**2)
488
489            assert num.allclose(domain.quantities['stage'].explicit_update[k],
490                                0)
491            assert num.allclose(domain.quantities['xmomentum'].\
492                                     explicit_update[k],
493                                S*u)
494            assert num.allclose(domain.quantities['ymomentum'].\
495                                     explicit_update[k],
496                                S*v)
497
498    def test_windfield_from_file(self):
499        import time
500        from anuga.config import rho_a, rho_w, eta_w
501        from math import pi, cos, sin
502        from anuga.config import time_format
503        from anuga.abstract_2d_finite_volumes.util import file_function
504
505        a = [0.0, 0.0]
506        b = [0.0, 2.0]
507        c = [2.0, 0.0]
508        d = [0.0, 4.0]
509        e = [2.0, 2.0]
510        f = [4.0, 0.0]
511
512        points = [a, b, c, d, e, f]
513        #             bac,     bce,     ecf,     dbe
514        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
515
516        domain = Domain(points, vertices)
517
518        # Flat surface with 1m of water
519        domain.set_quantity('elevation', 0)
520        domain.set_quantity('stage', 1.0)
521        domain.set_quantity('friction', 0)
522
523        Br = Reflective_boundary(domain)
524        domain.set_boundary({'exterior': Br})
525
526        domain.time = 7    # Take a time that is represented in file (not zero)
527
528        # Write wind stress file (ensure that domain.time is covered)
529        # Take x=1 and y=0
530        filename = 'test_windstress_from_file'
531        start = time.mktime(time.strptime('2000', '%Y'))
532        fid = open(filename + '.txt', 'w')
533        dt = 1    # One second interval
534        t = 0.0
535        while t <= 10.0:
536            t_string = time.strftime(time_format, time.gmtime(t+start))
537
538            fid.write('%s, %f %f\n' %
539                      (t_string, speed(t,[1],[0])[0], angle(t,[1],[0])[0]))
540            t += dt
541
542        fid.close()
543
544        # Convert ASCII file to NetCDF (Which is what we really like!)
545        timefile2netcdf(filename + '.txt')
546        os.remove(filename + '.txt')
547
548        # Setup wind stress
549        F = file_function(filename + '.tms',
550                          quantities=['Attribute0', 'Attribute1'])
551        os.remove(filename + '.tms')
552
553        W = Wind_stress(F)
554
555        domain.forcing_terms = []
556        domain.forcing_terms.append(W)
557
558        domain.compute_forcing_terms()
559
560        # Compute reference solution
561        const = eta_w*rho_a / rho_w
562
563        N = len(domain)    # number_of_triangles
564
565        t = domain.time
566
567        s = speed(t, [1], [0])[0]
568        phi = angle(t, [1], [0])[0]
569
570        # Convert to radians
571        phi = phi*pi / 180
572
573        # Compute velocity vector (u, v)
574        u = s*cos(phi)
575        v = s*sin(phi)
576
577        # Compute wind stress
578        S = const * num.sqrt(u**2 + v**2)
579
580        for k in range(N):
581            assert num.allclose(domain.quantities['stage'].explicit_update[k],
582                                0)
583            assert num.allclose(domain.quantities['xmomentum'].\
584                                    explicit_update[k],
585                                S*u)
586            assert num.allclose(domain.quantities['ymomentum'].\
587                                    explicit_update[k],
588                                S*v)
589
590    def test_windfield_from_file_seconds(self):
591        import time
592        from anuga.config import rho_a, rho_w, eta_w
593        from math import pi, cos, sin
594        from anuga.config import time_format
595        from anuga.abstract_2d_finite_volumes.util import file_function
596
597        a = [0.0, 0.0]
598        b = [0.0, 2.0]
599        c = [2.0, 0.0]
600        d = [0.0, 4.0]
601        e = [2.0, 2.0]
602        f = [4.0, 0.0]
603
604        points = [a, b, c, d, e, f]
605        #             bac,     bce,     ecf,     dbe
606        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
607
608        domain = Domain(points, vertices)
609
610        # Flat surface with 1m of water
611        domain.set_quantity('elevation', 0)
612        domain.set_quantity('stage', 1.0)
613        domain.set_quantity('friction', 0)
614
615        Br = Reflective_boundary(domain)
616        domain.set_boundary({'exterior': Br})
617
618        domain.time = 7    # Take a time that is represented in file (not zero)
619
620        # Write wind stress file (ensure that domain.time is covered)
621        # Take x=1 and y=0
622        filename = 'test_windstress_from_file'
623        start = time.mktime(time.strptime('2000', '%Y'))
624        fid = open(filename + '.txt', 'w')
625        dt = 0.5    # Half second interval
626        t = 0.0
627        while t <= 10.0:
628            fid.write('%s, %f %f\n'
629                      % (str(t), speed(t, [1], [0])[0], angle(t, [1], [0])[0]))
630            t += dt
631
632        fid.close()
633
634        # Convert ASCII file to NetCDF (Which is what we really like!)
635        timefile2netcdf(filename + '.txt', time_as_seconds=True)
636        os.remove(filename + '.txt')
637
638        # Setup wind stress
639        F = file_function(filename + '.tms',
640                          quantities=['Attribute0', 'Attribute1'])
641        os.remove(filename + '.tms')
642
643        W = Wind_stress(F)
644
645        domain.forcing_terms = []
646        domain.forcing_terms.append(W)
647
648        domain.compute_forcing_terms()
649
650        # Compute reference solution
651        const = eta_w*rho_a / rho_w
652
653        N = len(domain)    # number_of_triangles
654
655        t = domain.time
656
657        s = speed(t, [1], [0])[0]
658        phi = angle(t, [1], [0])[0]
659
660        # Convert to radians
661        phi = phi*pi / 180
662
663        # Compute velocity vector (u, v)
664        u = s*cos(phi)
665        v = s*sin(phi)
666
667        # Compute wind stress
668        S = const * num.sqrt(u**2 + v**2)
669
670        for k in range(N):
671            assert num.allclose(domain.quantities['stage'].explicit_update[k],
672                                0)
673            assert num.allclose(domain.quantities['xmomentum'].\
674                                    explicit_update[k],
675                                S*u)
676            assert num.allclose(domain.quantities['ymomentum'].\
677                                    explicit_update[k],
678                                S*v)
679
680    def test_wind_stress_error_condition(self):
681        """Test that windstress reacts properly when forcing functions
682        are wrong - e.g. returns a scalar
683        """
684
685        from math import pi, cos, sin
686        from anuga.config import rho_a, rho_w, eta_w
687
688        a = [0.0, 0.0]
689        b = [0.0, 2.0]
690        c = [2.0, 0.0]
691        d = [0.0, 4.0]
692        e = [2.0, 2.0]
693        f = [4.0, 0.0]
694
695        points = [a, b, c, d, e, f]
696        #             bac,     bce,     ecf,     dbe
697        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
698
699        domain = Domain(points, vertices)
700
701        # Flat surface with 1m of water
702        domain.set_quantity('elevation', 0)
703        domain.set_quantity('stage', 1.0)
704        domain.set_quantity('friction', 0)
705
706        Br = Reflective_boundary(domain)
707        domain.set_boundary({'exterior': Br})
708
709        domain.time = 5.54    # Take a random time (not zero)
710
711        # Setup only one forcing term, bad func
712        domain.forcing_terms = []
713
714        try:
715            domain.forcing_terms.append(Wind_stress(s=scalar_func_list,
716                                                    phi=angle))
717        except AssertionError:
718            pass
719        else:
720            msg = 'Should have raised exception'
721            raise Exception, msg
722
723        try:
724            domain.forcing_terms.append(Wind_stress(s=speed, phi=scalar_func))
725        except Exception:
726            pass
727        else:
728            msg = 'Should have raised exception'
729            raise Exception, msg
730
731        try:
732            domain.forcing_terms.append(Wind_stress(s=speed, phi='xx'))
733        except:
734            pass
735        else:
736            msg = 'Should have raised exception'
737            raise Exception, msg
738
739
740    def test_rainfall(self):
741        from math import pi, cos, sin
742
743        a = [0.0, 0.0]
744        b = [0.0, 2.0]
745        c = [2.0, 0.0]
746        d = [0.0, 4.0]
747        e = [2.0, 2.0]
748        f = [4.0, 0.0]
749
750        points = [a, b, c, d, e, f]
751        #             bac,     bce,     ecf,     dbe
752        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
753
754        domain = Domain(points, vertices)
755
756        # Flat surface with 1m of water
757        domain.set_quantity('elevation', 0)
758        domain.set_quantity('stage', 1.0)
759        domain.set_quantity('friction', 0)
760
761        Br = Reflective_boundary(domain)
762        domain.set_boundary({'exterior': Br})
763
764        # Setup only one forcing term, constant rainfall
765        domain.forcing_terms = []
766        domain.forcing_terms.append(Rainfall(domain, rate=2.0))
767
768        domain.compute_forcing_terms()
769        assert num.allclose(domain.quantities['stage'].explicit_update,
770                            2.0/1000)
771
772    def test_rainfall_restricted_by_polygon(self):
773        from math import pi, cos, sin
774
775        a = [0.0, 0.0]
776        b = [0.0, 2.0]
777        c = [2.0, 0.0]
778        d = [0.0, 4.0]
779        e = [2.0, 2.0]
780        f = [4.0, 0.0]
781
782        points = [a, b, c, d, e, f]
783        #             bac,     bce,     ecf,     dbe
784        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
785
786        domain = Domain(points, vertices)
787
788        # Flat surface with 1m of water
789        domain.set_quantity('elevation', 0)
790        domain.set_quantity('stage', 1.0)
791        domain.set_quantity('friction', 0)
792
793        Br = Reflective_boundary(domain)
794        domain.set_boundary({'exterior': Br})
795
796        # Setup only one forcing term, constant rainfall
797        # restricted to a polygon enclosing triangle #1 (bce)
798        domain.forcing_terms = []
799        R = Rainfall(domain, rate=2.0, polygon=[[1,1], [2,1], [2,2], [1,2]])
800
801        assert num.allclose(R.exchange_area, 2)
802
803        domain.forcing_terms.append(R)
804
805        domain.compute_forcing_terms()
806
807        assert num.allclose(domain.quantities['stage'].explicit_update[1],
808                            2.0/1000)
809        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
810        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
811
812    def test_time_dependent_rainfall_restricted_by_polygon(self):
813        a = [0.0, 0.0]
814        b = [0.0, 2.0]
815        c = [2.0, 0.0]
816        d = [0.0, 4.0]
817        e = [2.0, 2.0]
818        f = [4.0, 0.0]
819
820        points = [a, b, c, d, e, f]
821        #             bac,     bce,     ecf,     dbe
822        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
823
824        domain = Domain(points, vertices)
825
826        # Flat surface with 1m of water
827        domain.set_quantity('elevation', 0)
828        domain.set_quantity('stage', 1.0)
829        domain.set_quantity('friction', 0)
830
831        Br = Reflective_boundary(domain)
832        domain.set_boundary({'exterior': Br})
833
834        # Setup only one forcing term, time dependent rainfall
835        # restricted to a polygon enclosing triangle #1 (bce)
836        domain.forcing_terms = []
837        R = Rainfall(domain,
838                     rate=lambda t: 3*t + 7,
839                     polygon = [[1,1], [2,1], [2,2], [1,2]])
840
841        assert num.allclose(R.exchange_area, 2)
842       
843        domain.forcing_terms.append(R)
844
845        domain.time = 10.
846
847        domain.compute_forcing_terms()
848
849        assert num.allclose(domain.quantities['stage'].explicit_update[1],
850                            (3*domain.time + 7)/1000)
851        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
852        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
853
854    def test_time_dependent_rainfall_using_starttime(self):
855        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
856
857        a = [0.0, 0.0]
858        b = [0.0, 2.0]
859        c = [2.0, 0.0]
860        d = [0.0, 4.0]
861        e = [2.0, 2.0]
862        f = [4.0, 0.0]
863
864        points = [a, b, c, d, e, f]
865        #             bac,     bce,     ecf,     dbe
866        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
867
868        domain = Domain(points, vertices)
869
870        # Flat surface with 1m of water
871        domain.set_quantity('elevation', 0)
872        domain.set_quantity('stage', 1.0)
873        domain.set_quantity('friction', 0)
874
875        Br = Reflective_boundary(domain)
876        domain.set_boundary({'exterior': Br})
877
878        # Setup only one forcing term, time dependent rainfall
879        # restricted to a polygon enclosing triangle #1 (bce)
880        domain.forcing_terms = []
881        R = Rainfall(domain,
882                     rate=lambda t: 3*t + 7,
883                     polygon=rainfall_poly)                     
884
885        assert num.allclose(R.exchange_area, 2)
886       
887        domain.forcing_terms.append(R)
888
889        # This will test that time is set to starttime in set_starttime
890        domain.set_starttime(5.0)
891
892        domain.compute_forcing_terms()
893
894        assert num.allclose(domain.quantities['stage'].explicit_update[1],
895                            (3*domain.get_time() + 7)/1000)
896        assert num.allclose(domain.quantities['stage'].explicit_update[1],
897                            (3*domain.get_starttime() + 7)/1000)
898
899        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
900        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
901
902    def test_time_dependent_rainfall_using_georef(self):
903        """test_time_dependent_rainfall_using_georef
904
905        This will also test the General forcing term using georef
906        """
907
908        # Mesh in zone 56 (absolute coords)
909        x0 = 314036.58727982
910        y0 = 6224951.2960092
911
912        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
913        rainfall_poly += [x0, y0]
914
915        a = [0.0, 0.0]
916        b = [0.0, 2.0]
917        c = [2.0, 0.0]
918        d = [0.0, 4.0]
919        e = [2.0, 2.0]
920        f = [4.0, 0.0]
921
922        points = [a, b, c, d, e, f]
923        #             bac,     bce,     ecf,     dbe
924        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
925
926        domain = Domain(points, vertices,
927                        geo_reference=Geo_reference(56, x0, y0))
928
929        # Flat surface with 1m of water
930        domain.set_quantity('elevation', 0)
931        domain.set_quantity('stage', 1.0)
932        domain.set_quantity('friction', 0)
933
934        Br = Reflective_boundary(domain)
935        domain.set_boundary({'exterior': Br})
936
937        # Setup only one forcing term, time dependent rainfall
938        # restricted to a polygon enclosing triangle #1 (bce)
939        domain.forcing_terms = []
940        R = Rainfall(domain,
941                     rate=lambda t: 3*t + 7,
942                     polygon=rainfall_poly)
943
944        assert num.allclose(R.exchange_area, 2)
945       
946        domain.forcing_terms.append(R)
947
948        # This will test that time is set to starttime in set_starttime
949        domain.set_starttime(5.0)
950
951        domain.compute_forcing_terms()
952
953        assert num.allclose(domain.quantities['stage'].explicit_update[1],
954                            (3*domain.get_time() + 7)/1000)
955        assert num.allclose(domain.quantities['stage'].explicit_update[1],
956                            (3*domain.get_starttime() + 7)/1000)
957
958
959        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
960        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
961
962    def test_time_dependent_rainfall_restricted_by_polygon_with_default(self):
963        """
964        Test that default rainfall can be used when given rate runs out of data.
965        """
966
967        a = [0.0, 0.0]
968        b = [0.0, 2.0]
969        c = [2.0, 0.0]
970        d = [0.0, 4.0]
971        e = [2.0, 2.0]
972        f = [4.0, 0.0]
973
974        points = [a, b, c, d, e, f]
975        #             bac,     bce,     ecf,     dbe
976        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
977
978        domain = Domain(points, vertices)
979
980        # Flat surface with 1m of water
981        domain.set_quantity('elevation', 0)
982        domain.set_quantity('stage', 1.0)
983        domain.set_quantity('friction', 0)
984
985        Br = Reflective_boundary(domain)
986        domain.set_boundary({'exterior': Br})
987
988        # Setup only one forcing term, time dependent rainfall
989        # that expires at t==20
990        from anuga.fit_interpolate.interpolate import Modeltime_too_late
991
992        def main_rate(t):
993            if t > 20:
994                msg = 'Model time exceeded.'
995                raise Modeltime_too_late, msg
996            else:
997                return 3*t + 7
998
999        domain.forcing_terms = []
1000        R = Rainfall(domain,
1001                     rate=main_rate,
1002                     polygon = [[1,1], [2,1], [2,2], [1,2]],
1003                     default_rate=5.0)
1004
1005        assert num.allclose(R.exchange_area, 2)
1006       
1007        domain.forcing_terms.append(R)
1008
1009        domain.time = 10.
1010
1011        domain.compute_forcing_terms()
1012
1013        assert num.allclose(domain.quantities['stage'].explicit_update[1],
1014                            (3*domain.time+7)/1000)
1015        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
1016        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
1017
1018        domain.time = 100.
1019        domain.quantities['stage'].explicit_update[:] = 0.0     # Reset
1020        domain.compute_forcing_terms()
1021
1022        assert num.allclose(domain.quantities['stage'].explicit_update[1],
1023                            5.0/1000) # Default value
1024        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
1025        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
1026
1027    def test_rainfall_forcing_with_evolve(self):
1028        """test_rainfall_forcing_with_evolve
1029
1030        Test how forcing terms are called within evolve
1031        """
1032
1033        # FIXME(Ole): This test is just to experiment
1034
1035        a = [0.0, 0.0]
1036        b = [0.0, 2.0]
1037        c = [2.0, 0.0]
1038        d = [0.0, 4.0]
1039        e = [2.0, 2.0]
1040        f = [4.0, 0.0]
1041
1042        points = [a, b, c, d, e, f]
1043        #             bac,     bce,     ecf,     dbe
1044        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1045
1046        domain = Domain(points, vertices)
1047
1048        # Flat surface with 1m of water
1049        domain.set_quantity('elevation', 0)
1050        domain.set_quantity('stage', 1.0)
1051        domain.set_quantity('friction', 0)
1052
1053        Br = Reflective_boundary(domain)
1054        domain.set_boundary({'exterior': Br})
1055
1056        # Setup only one forcing term, time dependent rainfall
1057        # that expires at t==20
1058        from anuga.fit_interpolate.interpolate import Modeltime_too_late
1059
1060        def main_rate(t):
1061            if t > 20:
1062                msg = 'Model time exceeded.'
1063                raise Modeltime_too_late, msg
1064            else:
1065                return 3*t + 7
1066
1067        domain.forcing_terms = []
1068        R = Rainfall(domain,
1069                     rate=main_rate,
1070                     polygon=[[1,1], [2,1], [2,2], [1,2]],
1071                     default_rate=5.0)
1072
1073        assert num.allclose(R.exchange_area, 2)
1074       
1075        domain.forcing_terms.append(R)
1076
1077        for t in domain.evolve(yieldstep=1, finaltime=25):
1078            pass
1079            #FIXME(Ole):  A test here is hard because explicit_update also
1080            # receives updates from the flux calculation.
1081
1082
1083    def test_rainfall_forcing_with_evolve_1(self):
1084        """test_rainfall_forcing_with_evolve
1085
1086        Test how forcing terms are called within evolve.
1087        This test checks that proper exception is thrown when no default_rate is set
1088        """
1089
1090
1091        a = [0.0, 0.0]
1092        b = [0.0, 2.0]
1093        c = [2.0, 0.0]
1094        d = [0.0, 4.0]
1095        e = [2.0, 2.0]
1096        f = [4.0, 0.0]
1097
1098        points = [a, b, c, d, e, f]
1099        #             bac,     bce,     ecf,     dbe
1100        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1101
1102        domain = Domain(points, vertices)
1103
1104        # Flat surface with 1m of water
1105        domain.set_quantity('elevation', 0)
1106        domain.set_quantity('stage', 1.0)
1107        domain.set_quantity('friction', 0)
1108
1109        Br = Reflective_boundary(domain)
1110        domain.set_boundary({'exterior': Br})
1111
1112        # Setup only one forcing term, time dependent rainfall
1113        # that expires at t==20
1114        from anuga.fit_interpolate.interpolate import Modeltime_too_late
1115
1116        def main_rate(t):
1117            if t > 20:
1118                msg = 'Model time exceeded.'
1119                raise Modeltime_too_late, msg
1120            else:
1121                return 3*t + 7
1122
1123        domain.forcing_terms = []
1124        R = Rainfall(domain,
1125                     rate=main_rate,
1126                     polygon=[[1,1], [2,1], [2,2], [1,2]])
1127
1128
1129        assert num.allclose(R.exchange_area, 2)
1130       
1131        domain.forcing_terms.append(R)
1132        #for t in domain.evolve(yieldstep=1, finaltime=25):
1133        #    pass
1134               
1135        try:
1136            for t in domain.evolve(yieldstep=1, finaltime=25):
1137                pass
1138        except Modeltime_too_late, e:
1139            # Test that error message is as expected
1140            assert 'can specify keyword argument default_rate in the forcing function' in str(e)
1141        else:
1142            raise Exception, 'Should have raised exception'
1143
1144           
1145           
1146    def test_inflow_using_circle(self):
1147        from math import pi, cos, sin
1148
1149        a = [0.0, 0.0]
1150        b = [0.0, 2.0]
1151        c = [2.0, 0.0]
1152        d = [0.0, 4.0]
1153        e = [2.0, 2.0]
1154        f = [4.0, 0.0]
1155
1156        points = [a, b, c, d, e, f]
1157        #             bac,     bce,     ecf,     dbe
1158        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1159
1160        domain = Domain(points, vertices)
1161
1162        # Flat surface with 1m of water
1163        domain.set_quantity('elevation', 0)
1164        domain.set_quantity('stage', 1.0)
1165        domain.set_quantity('friction', 0)
1166
1167        Br = Reflective_boundary(domain)
1168        domain.set_boundary({'exterior': Br})
1169
1170        # Setup only one forcing term, constant inflow of 2 m^3/s
1171        # on a circle affecting triangles #0 and #1 (bac and bce)
1172        domain.forcing_terms = []
1173       
1174        I = Inflow(domain, rate=2.0, center=(1,1), radius=1)
1175        domain.forcing_terms.append(I)       
1176        domain.compute_forcing_terms()
1177
1178       
1179        A = I.exchange_area
1180        assert num.allclose(A, 4) # Two triangles       
1181       
1182        assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A)
1183        assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A)
1184        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)       
1185
1186
1187    def test_inflow_using_circle_function(self):
1188        from math import pi, cos, sin
1189
1190        a = [0.0, 0.0]
1191        b = [0.0, 2.0]
1192        c = [2.0, 0.0]
1193        d = [0.0, 4.0]
1194        e = [2.0, 2.0]
1195        f = [4.0, 0.0]
1196
1197        points = [a, b, c, d, e, f]
1198        #             bac,     bce,     ecf,     dbe
1199        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1200
1201        domain = Domain(points, vertices)
1202
1203        # Flat surface with 1m of water
1204        domain.set_quantity('elevation', 0)
1205        domain.set_quantity('stage', 1.0)
1206        domain.set_quantity('friction', 0)
1207
1208        Br = Reflective_boundary(domain)
1209        domain.set_boundary({'exterior': Br})
1210
1211        # Setup only one forcing term, time dependent inflow of 2 m^3/s
1212        # on a circle affecting triangles #0 and #1 (bac and bce)
1213        domain.forcing_terms = []
1214        I = Inflow(domain, rate=lambda t: 2., center=(1,1), radius=1)
1215        domain.forcing_terms.append(I)
1216       
1217        domain.compute_forcing_terms()       
1218
1219        A = I.exchange_area
1220        assert num.allclose(A, 4) # Two triangles
1221       
1222        assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A)
1223        assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A)
1224        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)       
1225       
1226
1227
1228
1229    def test_inflow_catch_too_few_triangles(self):
1230        """
1231        Test that exception is thrown if no triangles are covered
1232        by the inflow area
1233        """
1234
1235        from math import pi, cos, sin
1236
1237        a = [0.0, 0.0]
1238        b = [0.0, 2.0]
1239        c = [2.0, 0.0]
1240        d = [0.0, 4.0]
1241        e = [2.0, 2.0]
1242        f = [4.0, 0.0]
1243
1244        points = [a, b, c, d, e, f]
1245        #             bac,     bce,     ecf,     dbe
1246        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1247
1248        domain = Domain(points, vertices)
1249
1250        # Flat surface with 1m of water
1251        domain.set_quantity('elevation', 0)
1252        domain.set_quantity('stage', 1.0)
1253        domain.set_quantity('friction', 0)
1254
1255        Br = Reflective_boundary(domain)
1256        domain.set_boundary({'exterior': Br})
1257
1258        # Setup only one forcing term, constant inflow of 2 m^3/s
1259        # on a circle affecting triangles #0 and #1 (bac and bce)
1260        try:
1261            Inflow(domain, rate=2.0, center=(1,1.1), radius=0.01)
1262        except:
1263            pass
1264        else:
1265            msg = 'Should have raised exception'
1266            raise Exception, msg
1267
1268       
1269
1270#################################################################################
1271
1272if __name__ == "__main__":
1273    #suite = unittest.makeSuite(Test_forcing_terms, 'test_volume_conservation_rain')
1274    #FIXME: James - these tests seem to be invalid. Please investigate
1275    suite = unittest.makeSuite(Test_forcing_terms, 'test')
1276    runner = unittest.TextTestRunner(verbosity=1)
1277    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.