source: anuga_core/source/anuga/shallow_water/test_forcing.py @ 7735

Last change on this file since 7735 was 7735, checked in by hudson, 14 years ago

Split up some of the huge modules in shallow_water, fixed most of the unit test dependencies.

File size: 25.7 KB
Line 
1"""  Test environmental forcing - rain, wind, etc.
2"""
3
4import unittest, os
5from anuga.shallow_water import Domain
6from boundaries import Reflective_boundary
7from anuga.coordinate_transforms.geo_reference import Geo_reference
8from forcing import *
9
10import numpy as num
11
12
13def scalar_func_list(t, x, y):
14    """Function that returns a scalar.
15
16    Used to test error message when numeric array is expected
17    """
18
19    return [17.7]
20
21
22def speed(t, x, y):
23    """
24    Variable windfield implemented using functions   
25    Large speeds halfway between center and edges
26
27    Low speeds at center and edges
28    """
29
30    from math import exp, cos, pi
31
32    x = num.array(x)
33    y = num.array(y)
34
35    N = len(x)
36    s = 0*#New array
37
38    for k in range(N):
39        r = num.sqrt(x[k]**2 + y[k]**2)
40        factor = exp(-(r-0.15)**2)
41        s[k] = 4000 * factor * (cos(t*2*pi/150) + 2)
42
43    return s
44
45
46def angle(t, x, y):
47    """Rotating field
48    """
49    from math import atan, pi
50
51    x = num.array(x)
52    y = num.array(y)
53
54    N = len(x)
55    a = 0 * x    # New array
56
57    for k in range(N):
58        r = num.sqrt(x[k]**2 + y[k]**2)
59
60        angle = atan(y[k]/x[k])
61
62        if x[k] < 0:
63            angle += pi
64
65        # Take normal direction
66        angle -= pi/2
67
68        # Ensure positive radians
69        if angle < 0:
70            angle += 2*pi
71
72        a[k] = angle/pi*180
73
74    return a
75
76   
77
78class Test_Forcing(unittest.TestCase):
79    def setUp(self):
80        pass
81
82    def tearDown(self):
83        pass
84       
85    def test_constant_wind_stress(self):
86        from anuga.config import rho_a, rho_w, eta_w
87        from math import pi, cos, sin
88
89        a = [0.0, 0.0]
90        b = [0.0, 2.0]
91        c = [2.0, 0.0]
92        d = [0.0, 4.0]
93        e = [2.0, 2.0]
94        f = [4.0, 0.0]
95
96        points = [a, b, c, d, e, f]
97        #             bac,     bce,     ecf,     dbe
98        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
99
100        domain = Domain(points, vertices)
101
102        #Flat surface with 1m of water
103        domain.set_quantity('elevation', 0)
104        domain.set_quantity('stage', 1.0)
105        domain.set_quantity('friction', 0)
106
107        Br = Reflective_boundary(domain)
108        domain.set_boundary({'exterior': Br})
109
110        #Setup only one forcing term, constant wind stress
111        s = 100
112        phi = 135
113        domain.forcing_terms = []
114        domain.forcing_terms.append(Wind_stress(s, phi))
115
116        domain.compute_forcing_terms()
117
118        const = eta_w*rho_a / rho_w
119
120        #Convert to radians
121        phi = phi*pi / 180
122
123        #Compute velocity vector (u, v)
124        u = s*cos(phi)
125        v = s*sin(phi)
126
127        #Compute wind stress
128        S = const * num.sqrt(u**2 + v**2)
129
130        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
131        assert num.allclose(domain.quantities['xmomentum'].explicit_update, S*u)
132        assert num.allclose(domain.quantities['ymomentum'].explicit_update, S*v)
133
134    def test_variable_wind_stress(self):
135        from anuga.config import rho_a, rho_w, eta_w
136        from math import pi, cos, sin
137
138        a = [0.0, 0.0]
139        b = [0.0, 2.0]
140        c = [2.0, 0.0]
141        d = [0.0, 4.0]
142        e = [2.0, 2.0]
143        f = [4.0, 0.0]
144
145        points = [a, b, c, d, e, f]
146        #             bac,     bce,     ecf,     dbe
147        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
148
149        domain = Domain(points, vertices)
150
151        #Flat surface with 1m of water
152        domain.set_quantity('elevation', 0)
153        domain.set_quantity('stage', 1.0)
154        domain.set_quantity('friction', 0)
155
156        Br = Reflective_boundary(domain)
157        domain.set_boundary({'exterior': Br})
158
159        domain.time = 5.54    # Take a random time (not zero)
160
161        #Setup only one forcing term, constant wind stress
162        s = 100
163        phi = 135
164        domain.forcing_terms = []
165        domain.forcing_terms.append(Wind_stress(s=speed, phi=angle))
166
167        domain.compute_forcing_terms()
168
169        #Compute reference solution
170        const = eta_w*rho_a / rho_w
171
172        N = len(domain)    # number_of_triangles
173
174        xc = domain.get_centroid_coordinates()
175        t = domain.time
176
177        x = xc[:,0]
178        y = xc[:,1]
179        s_vec = speed(t,x,y)
180        phi_vec = angle(t,x,y)
181
182        for k in range(N):
183            # Convert to radians
184            phi = phi_vec[k]*pi / 180
185            s = s_vec[k]
186
187            # Compute velocity vector (u, v)
188            u = s*cos(phi)
189            v = s*sin(phi)
190
191            # Compute wind stress
192            S = const * num.sqrt(u**2 + v**2)
193
194            assert num.allclose(domain.quantities['stage'].explicit_update[k],
195                                0)
196            assert num.allclose(domain.quantities['xmomentum'].\
197                                     explicit_update[k],
198                                S*u)
199            assert num.allclose(domain.quantities['ymomentum'].\
200                                     explicit_update[k],
201                                S*v)
202
203    def test_windfield_from_file(self):
204        import time
205        from anuga.config import rho_a, rho_w, eta_w
206        from math import pi, cos, sin
207        from anuga.config import time_format
208        from anuga.abstract_2d_finite_volumes.util import file_function
209
210        a = [0.0, 0.0]
211        b = [0.0, 2.0]
212        c = [2.0, 0.0]
213        d = [0.0, 4.0]
214        e = [2.0, 2.0]
215        f = [4.0, 0.0]
216
217        points = [a, b, c, d, e, f]
218        #             bac,     bce,     ecf,     dbe
219        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
220
221        domain = Domain(points, vertices)
222
223        # Flat surface with 1m of water
224        domain.set_quantity('elevation', 0)
225        domain.set_quantity('stage', 1.0)
226        domain.set_quantity('friction', 0)
227
228        Br = Reflective_boundary(domain)
229        domain.set_boundary({'exterior': Br})
230
231        domain.time = 7    # Take a time that is represented in file (not zero)
232
233        # Write wind stress file (ensure that domain.time is covered)
234        # Take x=1 and y=0
235        filename = 'test_windstress_from_file'
236        start = time.mktime(time.strptime('2000', '%Y'))
237        fid = open(filename + '.txt', 'w')
238        dt = 1    # One second interval
239        t = 0.0
240        while t <= 10.0:
241            t_string = time.strftime(time_format, time.gmtime(t+start))
242
243            fid.write('%s, %f %f\n' %
244                      (t_string, speed(t,[1],[0])[0], angle(t,[1],[0])[0]))
245            t += dt
246
247        fid.close()
248
249        # Convert ASCII file to NetCDF (Which is what we really like!)
250        from file_conversion import timefile2netcdf
251
252        timefile2netcdf(filename)
253        os.remove(filename + '.txt')
254
255        # Setup wind stress
256        F = file_function(filename + '.tms',
257                          quantities=['Attribute0', 'Attribute1'])
258        os.remove(filename + '.tms')
259
260        W = Wind_stress(F)
261
262        domain.forcing_terms = []
263        domain.forcing_terms.append(W)
264
265        domain.compute_forcing_terms()
266
267        # Compute reference solution
268        const = eta_w*rho_a / rho_w
269
270        N = len(domain)    # number_of_triangles
271
272        t = domain.time
273
274        s = speed(t, [1], [0])[0]
275        phi = angle(t, [1], [0])[0]
276
277        # Convert to radians
278        phi = phi*pi / 180
279
280        # Compute velocity vector (u, v)
281        u = s*cos(phi)
282        v = s*sin(phi)
283
284        # Compute wind stress
285        S = const * num.sqrt(u**2 + v**2)
286
287        for k in range(N):
288            assert num.allclose(domain.quantities['stage'].explicit_update[k],
289                                0)
290            assert num.allclose(domain.quantities['xmomentum'].\
291                                    explicit_update[k],
292                                S*u)
293            assert num.allclose(domain.quantities['ymomentum'].\
294                                    explicit_update[k],
295                                S*v)
296
297    def test_windfield_from_file_seconds(self):
298        import time
299        from anuga.config import rho_a, rho_w, eta_w
300        from math import pi, cos, sin
301        from anuga.config import time_format
302        from anuga.abstract_2d_finite_volumes.util import file_function
303
304        a = [0.0, 0.0]
305        b = [0.0, 2.0]
306        c = [2.0, 0.0]
307        d = [0.0, 4.0]
308        e = [2.0, 2.0]
309        f = [4.0, 0.0]
310
311        points = [a, b, c, d, e, f]
312        #             bac,     bce,     ecf,     dbe
313        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
314
315        domain = Domain(points, vertices)
316
317        # Flat surface with 1m of water
318        domain.set_quantity('elevation', 0)
319        domain.set_quantity('stage', 1.0)
320        domain.set_quantity('friction', 0)
321
322        Br = Reflective_boundary(domain)
323        domain.set_boundary({'exterior': Br})
324
325        domain.time = 7    # Take a time that is represented in file (not zero)
326
327        # Write wind stress file (ensure that domain.time is covered)
328        # Take x=1 and y=0
329        filename = 'test_windstress_from_file'
330        start = time.mktime(time.strptime('2000', '%Y'))
331        fid = open(filename + '.txt', 'w')
332        dt = 0.5    # Half second interval
333        t = 0.0
334        while t <= 10.0:
335            fid.write('%s, %f %f\n'
336                      % (str(t), speed(t, [1], [0])[0], angle(t, [1], [0])[0]))
337            t += dt
338
339        fid.close()
340
341        # Convert ASCII file to NetCDF (Which is what we really like!)
342        from file_conversion import timefile2netcdf
343
344        timefile2netcdf(filename, time_as_seconds=True)
345        os.remove(filename + '.txt')
346
347        # Setup wind stress
348        F = file_function(filename + '.tms',
349                          quantities=['Attribute0', 'Attribute1'])
350        os.remove(filename + '.tms')
351
352        W = Wind_stress(F)
353
354        domain.forcing_terms = []
355        domain.forcing_terms.append(W)
356
357        domain.compute_forcing_terms()
358
359        # Compute reference solution
360        const = eta_w*rho_a / rho_w
361
362        N = len(domain)    # number_of_triangles
363
364        t = domain.time
365
366        s = speed(t, [1], [0])[0]
367        phi = angle(t, [1], [0])[0]
368
369        # Convert to radians
370        phi = phi*pi / 180
371
372        # Compute velocity vector (u, v)
373        u = s*cos(phi)
374        v = s*sin(phi)
375
376        # Compute wind stress
377        S = const * num.sqrt(u**2 + v**2)
378
379        for k in range(N):
380            assert num.allclose(domain.quantities['stage'].explicit_update[k],
381                                0)
382            assert num.allclose(domain.quantities['xmomentum'].\
383                                    explicit_update[k],
384                                S*u)
385            assert num.allclose(domain.quantities['ymomentum'].\
386                                    explicit_update[k],
387                                S*v)
388
389    def test_wind_stress_error_condition(self):
390        """Test that windstress reacts properly when forcing functions
391        are wrong - e.g. returns a scalar
392        """
393
394        from math import pi, cos, sin
395        from anuga.config import rho_a, rho_w, eta_w
396
397        a = [0.0, 0.0]
398        b = [0.0, 2.0]
399        c = [2.0, 0.0]
400        d = [0.0, 4.0]
401        e = [2.0, 2.0]
402        f = [4.0, 0.0]
403
404        points = [a, b, c, d, e, f]
405        #             bac,     bce,     ecf,     dbe
406        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
407
408        domain = Domain(points, vertices)
409
410        # Flat surface with 1m of water
411        domain.set_quantity('elevation', 0)
412        domain.set_quantity('stage', 1.0)
413        domain.set_quantity('friction', 0)
414
415        Br = Reflective_boundary(domain)
416        domain.set_boundary({'exterior': Br})
417
418        domain.time = 5.54    # Take a random time (not zero)
419
420        # Setup only one forcing term, bad func
421        domain.forcing_terms = []
422
423        try:
424            domain.forcing_terms.append(Wind_stress(s=scalar_func_list,
425                                                    phi=angle))
426        except AssertionError:
427            pass
428        else:
429            msg = 'Should have raised exception'
430            raise Exception, msg
431
432        try:
433            domain.forcing_terms.append(Wind_stress(s=speed, phi=scalar_func))
434        except Exception:
435            pass
436        else:
437            msg = 'Should have raised exception'
438            raise Exception, msg
439
440        try:
441            domain.forcing_terms.append(Wind_stress(s=speed, phi='xx'))
442        except:
443            pass
444        else:
445            msg = 'Should have raised exception'
446            raise Exception, msg
447
448    def test_rainfall(self):
449        from math import pi, cos, sin
450
451        a = [0.0, 0.0]
452        b = [0.0, 2.0]
453        c = [2.0, 0.0]
454        d = [0.0, 4.0]
455        e = [2.0, 2.0]
456        f = [4.0, 0.0]
457
458        points = [a, b, c, d, e, f]
459        #             bac,     bce,     ecf,     dbe
460        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
461
462        domain = Domain(points, vertices)
463
464        # Flat surface with 1m of water
465        domain.set_quantity('elevation', 0)
466        domain.set_quantity('stage', 1.0)
467        domain.set_quantity('friction', 0)
468
469        Br = Reflective_boundary(domain)
470        domain.set_boundary({'exterior': Br})
471
472        # Setup only one forcing term, constant rainfall
473        domain.forcing_terms = []
474        domain.forcing_terms.append(Rainfall(domain, rate=2.0))
475
476        domain.compute_forcing_terms()
477        assert num.allclose(domain.quantities['stage'].explicit_update,
478                            2.0/1000)
479
480    def test_rainfall_restricted_by_polygon(self):
481        from math import pi, cos, sin
482
483        a = [0.0, 0.0]
484        b = [0.0, 2.0]
485        c = [2.0, 0.0]
486        d = [0.0, 4.0]
487        e = [2.0, 2.0]
488        f = [4.0, 0.0]
489
490        points = [a, b, c, d, e, f]
491        #             bac,     bce,     ecf,     dbe
492        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
493
494        domain = Domain(points, vertices)
495
496        # Flat surface with 1m of water
497        domain.set_quantity('elevation', 0)
498        domain.set_quantity('stage', 1.0)
499        domain.set_quantity('friction', 0)
500
501        Br = Reflective_boundary(domain)
502        domain.set_boundary({'exterior': Br})
503
504        # Setup only one forcing term, constant rainfall
505        # restricted to a polygon enclosing triangle #1 (bce)
506        domain.forcing_terms = []
507        R = Rainfall(domain, rate=2.0, polygon=[[1,1], [2,1], [2,2], [1,2]])
508
509        assert num.allclose(R.exchange_area, 2)
510
511        domain.forcing_terms.append(R)
512
513        domain.compute_forcing_terms()
514
515        assert num.allclose(domain.quantities['stage'].explicit_update[1],
516                            2.0/1000)
517        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
518        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
519
520    def test_time_dependent_rainfall_restricted_by_polygon(self):
521        a = [0.0, 0.0]
522        b = [0.0, 2.0]
523        c = [2.0, 0.0]
524        d = [0.0, 4.0]
525        e = [2.0, 2.0]
526        f = [4.0, 0.0]
527
528        points = [a, b, c, d, e, f]
529        #             bac,     bce,     ecf,     dbe
530        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
531
532        domain = Domain(points, vertices)
533
534        # Flat surface with 1m of water
535        domain.set_quantity('elevation', 0)
536        domain.set_quantity('stage', 1.0)
537        domain.set_quantity('friction', 0)
538
539        Br = Reflective_boundary(domain)
540        domain.set_boundary({'exterior': Br})
541
542        # Setup only one forcing term, time dependent rainfall
543        # restricted to a polygon enclosing triangle #1 (bce)
544        domain.forcing_terms = []
545        R = Rainfall(domain,
546                     rate=lambda t: 3*t + 7,
547                     polygon = [[1,1], [2,1], [2,2], [1,2]])
548
549        assert num.allclose(R.exchange_area, 2)
550       
551        domain.forcing_terms.append(R)
552
553        domain.time = 10.
554
555        domain.compute_forcing_terms()
556
557        assert num.allclose(domain.quantities['stage'].explicit_update[1],
558                            (3*domain.time + 7)/1000)
559        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
560        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
561
562    def test_time_dependent_rainfall_using_starttime(self):
563        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
564
565        a = [0.0, 0.0]
566        b = [0.0, 2.0]
567        c = [2.0, 0.0]
568        d = [0.0, 4.0]
569        e = [2.0, 2.0]
570        f = [4.0, 0.0]
571
572        points = [a, b, c, d, e, f]
573        #             bac,     bce,     ecf,     dbe
574        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
575
576        domain = Domain(points, vertices)
577
578        # Flat surface with 1m of water
579        domain.set_quantity('elevation', 0)
580        domain.set_quantity('stage', 1.0)
581        domain.set_quantity('friction', 0)
582
583        Br = Reflective_boundary(domain)
584        domain.set_boundary({'exterior': Br})
585
586        # Setup only one forcing term, time dependent rainfall
587        # restricted to a polygon enclosing triangle #1 (bce)
588        domain.forcing_terms = []
589        R = Rainfall(domain,
590                     rate=lambda t: 3*t + 7,
591                     polygon=rainfall_poly)                     
592
593        assert num.allclose(R.exchange_area, 2)
594       
595        domain.forcing_terms.append(R)
596
597        # This will test that time used in the forcing function takes
598        # startime into account.
599        domain.starttime = 5.0
600
601        domain.time = 7.
602
603        domain.compute_forcing_terms()
604
605        assert num.allclose(domain.quantities['stage'].explicit_update[1],
606                            (3*domain.get_time() + 7)/1000)
607        assert num.allclose(domain.quantities['stage'].explicit_update[1],
608                            (3*(domain.time + domain.starttime) + 7)/1000)
609
610        # Using internal time her should fail
611        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
612                                (3*domain.time + 7)/1000)
613
614        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
615        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
616
617    def test_time_dependent_rainfall_using_georef(self):
618        """test_time_dependent_rainfall_using_georef
619
620        This will also test the General forcing term using georef
621        """
622
623        # Mesh in zone 56 (absolute coords)
624        x0 = 314036.58727982
625        y0 = 6224951.2960092
626
627        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
628        rainfall_poly += [x0, y0]
629
630        a = [0.0, 0.0]
631        b = [0.0, 2.0]
632        c = [2.0, 0.0]
633        d = [0.0, 4.0]
634        e = [2.0, 2.0]
635        f = [4.0, 0.0]
636
637        points = [a, b, c, d, e, f]
638        #             bac,     bce,     ecf,     dbe
639        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
640
641        domain = Domain(points, vertices,
642                        geo_reference=Geo_reference(56, x0, y0))
643
644        # Flat surface with 1m of water
645        domain.set_quantity('elevation', 0)
646        domain.set_quantity('stage', 1.0)
647        domain.set_quantity('friction', 0)
648
649        Br = Reflective_boundary(domain)
650        domain.set_boundary({'exterior': Br})
651
652        # Setup only one forcing term, time dependent rainfall
653        # restricted to a polygon enclosing triangle #1 (bce)
654        domain.forcing_terms = []
655        R = Rainfall(domain,
656                     rate=lambda t: 3*t + 7,
657                     polygon=rainfall_poly)
658
659        assert num.allclose(R.exchange_area, 2)
660       
661        domain.forcing_terms.append(R)
662
663        # This will test that time used in the forcing function takes
664        # startime into account.
665        domain.starttime = 5.0
666
667        domain.time = 7.
668
669        domain.compute_forcing_terms()
670
671        assert num.allclose(domain.quantities['stage'].explicit_update[1],
672                            (3*domain.get_time() + 7)/1000)
673        assert num.allclose(domain.quantities['stage'].explicit_update[1],
674                            (3*(domain.time + domain.starttime) + 7)/1000)
675
676        # Using internal time her should fail
677        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
678                                (3*domain.time + 7)/1000)
679
680        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
681        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
682
683    def test_time_dependent_rainfall_restricted_by_polygon_with_default(self):
684        """
685        Test that default rainfall can be used when given rate runs out of data.
686        """
687
688        a = [0.0, 0.0]
689        b = [0.0, 2.0]
690        c = [2.0, 0.0]
691        d = [0.0, 4.0]
692        e = [2.0, 2.0]
693        f = [4.0, 0.0]
694
695        points = [a, b, c, d, e, f]
696        #             bac,     bce,     ecf,     dbe
697        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
698
699        domain = Domain(points, vertices)
700
701        # Flat surface with 1m of water
702        domain.set_quantity('elevation', 0)
703        domain.set_quantity('stage', 1.0)
704        domain.set_quantity('friction', 0)
705
706        Br = Reflective_boundary(domain)
707        domain.set_boundary({'exterior': Br})
708
709        # Setup only one forcing term, time dependent rainfall
710        # that expires at t==20
711        from anuga.fit_interpolate.interpolate import Modeltime_too_late
712
713        def main_rate(t):
714            if t > 20:
715                msg = 'Model time exceeded.'
716                raise Modeltime_too_late, msg
717            else:
718                return 3*t + 7
719
720        domain.forcing_terms = []
721        R = Rainfall(domain,
722                     rate=main_rate,
723                     polygon = [[1,1], [2,1], [2,2], [1,2]],
724                     default_rate=5.0)
725
726        assert num.allclose(R.exchange_area, 2)
727       
728        domain.forcing_terms.append(R)
729
730        domain.time = 10.
731
732        domain.compute_forcing_terms()
733
734        assert num.allclose(domain.quantities['stage'].explicit_update[1],
735                            (3*domain.time+7)/1000)
736        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
737        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
738
739        domain.time = 100.
740        domain.quantities['stage'].explicit_update[:] = 0.0     # Reset
741        domain.compute_forcing_terms()
742
743        assert num.allclose(domain.quantities['stage'].explicit_update[1],
744                            5.0/1000) # Default value
745        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
746        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
747
748    def test_rainfall_forcing_with_evolve(self):
749        """test_rainfall_forcing_with_evolve
750
751        Test how forcing terms are called within evolve
752        """
753
754        # FIXME(Ole): This test is just to experiment
755
756        a = [0.0, 0.0]
757        b = [0.0, 2.0]
758        c = [2.0, 0.0]
759        d = [0.0, 4.0]
760        e = [2.0, 2.0]
761        f = [4.0, 0.0]
762
763        points = [a, b, c, d, e, f]
764        #             bac,     bce,     ecf,     dbe
765        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
766
767        domain = Domain(points, vertices)
768
769        # Flat surface with 1m of water
770        domain.set_quantity('elevation', 0)
771        domain.set_quantity('stage', 1.0)
772        domain.set_quantity('friction', 0)
773
774        Br = Reflective_boundary(domain)
775        domain.set_boundary({'exterior': Br})
776
777        # Setup only one forcing term, time dependent rainfall
778        # that expires at t==20
779        from anuga.fit_interpolate.interpolate import Modeltime_too_late
780
781        def main_rate(t):
782            if t > 20:
783                msg = 'Model time exceeded.'
784                raise Modeltime_too_late, msg
785            else:
786                return 3*t + 7
787
788        domain.forcing_terms = []
789        R = Rainfall(domain,
790                     rate=main_rate,
791                     polygon=[[1,1], [2,1], [2,2], [1,2]],
792                     default_rate=5.0)
793
794        assert num.allclose(R.exchange_area, 2)
795       
796        domain.forcing_terms.append(R)
797
798        for t in domain.evolve(yieldstep=1, finaltime=25):
799            pass
800            #FIXME(Ole):  A test here is hard because explicit_update also
801            # receives updates from the flux calculation.
802
803
804    def test_rainfall_forcing_with_evolve_1(self):
805        """test_rainfall_forcing_with_evolve
806
807        Test how forcing terms are called within evolve.
808        This test checks that proper exception is thrown when no default_rate is set
809        """
810
811
812        a = [0.0, 0.0]
813        b = [0.0, 2.0]
814        c = [2.0, 0.0]
815        d = [0.0, 4.0]
816        e = [2.0, 2.0]
817        f = [4.0, 0.0]
818
819        points = [a, b, c, d, e, f]
820        #             bac,     bce,     ecf,     dbe
821        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
822
823        domain = Domain(points, vertices)
824
825        # Flat surface with 1m of water
826        domain.set_quantity('elevation', 0)
827        domain.set_quantity('stage', 1.0)
828        domain.set_quantity('friction', 0)
829
830        Br = Reflective_boundary(domain)
831        domain.set_boundary({'exterior': Br})
832
833        # Setup only one forcing term, time dependent rainfall
834        # that expires at t==20
835        from anuga.fit_interpolate.interpolate import Modeltime_too_late
836
837        def main_rate(t):
838            if t > 20:
839                msg = 'Model time exceeded.'
840                raise Modeltime_too_late, msg
841            else:
842                return 3*t + 7
843
844        domain.forcing_terms = []
845        R = Rainfall(domain,
846                     rate=main_rate,
847                     polygon=[[1,1], [2,1], [2,2], [1,2]])
848
849
850        assert num.allclose(R.exchange_area, 2)
851       
852        domain.forcing_terms.append(R)
853        #for t in domain.evolve(yieldstep=1, finaltime=25):
854        #    pass
855               
856        try:
857            for t in domain.evolve(yieldstep=1, finaltime=25):
858                pass
859        except Modeltime_too_late, e:
860            # Test that error message is as expected
861            assert 'can specify keyword argument default_rate in the forcing function' in str(e)
862        else:
863            raise Exception, 'Should have raised exception'
864
865           
866if __name__ == "__main__":
867    suite = unittest.makeSuite(Test_Forcing, 'test')
868    runner = unittest.TextTestRunner(verbosity=1)
869    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.