source: trunk/anuga_core/source/anuga/shallow_water/test_forcing.py @ 7778

Last change on this file since 7778 was 7778, checked in by hudson, 14 years ago

Cleaned up unit tests to use API.

File size: 25.6 KB
Line 
1"""  Test environmental forcing - rain, wind, etc.
2"""
3
4import unittest, os
5from anuga.shallow_water.shallow_water_domain import Domain
6from boundaries import Reflective_boundary
7from anuga.coordinate_transforms.geo_reference import Geo_reference
8from anuga.file_conversion.file_conversion import timefile2netcdf
9from forcing import *
10
11import numpy as num
12
13
14def scalar_func_list(t, x, y):
15    """Function that returns a scalar.
16
17    Used to test error message when numeric array is expected
18    """
19
20    return [17.7]
21
22
23def speed(t, x, y):
24    """
25    Variable windfield implemented using functions   
26    Large speeds halfway between center and edges
27
28    Low speeds at center and edges
29    """
30
31    from math import exp, cos, pi
32
33    x = num.array(x)
34    y = num.array(y)
35
36    N = len(x)
37    s = 0*#New array
38
39    for k in range(N):
40        r = num.sqrt(x[k]**2 + y[k]**2)
41        factor = exp(-(r-0.15)**2)
42        s[k] = 4000 * factor * (cos(t*2*pi/150) + 2)
43
44    return s
45
46
47def angle(t, x, y):
48    """Rotating field
49    """
50    from math import atan, pi
51
52    x = num.array(x)
53    y = num.array(y)
54
55    N = len(x)
56    a = 0 * x    # New array
57
58    for k in range(N):
59        r = num.sqrt(x[k]**2 + y[k]**2)
60
61        angle = atan(y[k]/x[k])
62
63        if x[k] < 0:
64            angle += pi
65
66        # Take normal direction
67        angle -= pi/2
68
69        # Ensure positive radians
70        if angle < 0:
71            angle += 2*pi
72
73        a[k] = angle/pi*180
74
75    return a
76
77   
78
79class Test_Forcing(unittest.TestCase):
80    def setUp(self):
81        pass
82
83    def tearDown(self):
84        pass
85       
86    def test_constant_wind_stress(self):
87        from anuga.config import rho_a, rho_w, eta_w
88        from math import pi, cos, sin
89
90        a = [0.0, 0.0]
91        b = [0.0, 2.0]
92        c = [2.0, 0.0]
93        d = [0.0, 4.0]
94        e = [2.0, 2.0]
95        f = [4.0, 0.0]
96
97        points = [a, b, c, d, e, f]
98        #             bac,     bce,     ecf,     dbe
99        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
100
101        domain = Domain(points, vertices)
102
103        #Flat surface with 1m of water
104        domain.set_quantity('elevation', 0)
105        domain.set_quantity('stage', 1.0)
106        domain.set_quantity('friction', 0)
107
108        Br = Reflective_boundary(domain)
109        domain.set_boundary({'exterior': Br})
110
111        #Setup only one forcing term, constant wind stress
112        s = 100
113        phi = 135
114        domain.forcing_terms = []
115        domain.forcing_terms.append(Wind_stress(s, phi))
116
117        domain.compute_forcing_terms()
118
119        const = eta_w*rho_a / rho_w
120
121        #Convert to radians
122        phi = phi*pi / 180
123
124        #Compute velocity vector (u, v)
125        u = s*cos(phi)
126        v = s*sin(phi)
127
128        #Compute wind stress
129        S = const * num.sqrt(u**2 + v**2)
130
131        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
132        assert num.allclose(domain.quantities['xmomentum'].explicit_update, S*u)
133        assert num.allclose(domain.quantities['ymomentum'].explicit_update, S*v)
134
135    def test_variable_wind_stress(self):
136        from anuga.config import rho_a, rho_w, eta_w
137        from math import pi, cos, sin
138
139        a = [0.0, 0.0]
140        b = [0.0, 2.0]
141        c = [2.0, 0.0]
142        d = [0.0, 4.0]
143        e = [2.0, 2.0]
144        f = [4.0, 0.0]
145
146        points = [a, b, c, d, e, f]
147        #             bac,     bce,     ecf,     dbe
148        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
149
150        domain = Domain(points, vertices)
151
152        #Flat surface with 1m of water
153        domain.set_quantity('elevation', 0)
154        domain.set_quantity('stage', 1.0)
155        domain.set_quantity('friction', 0)
156
157        Br = Reflective_boundary(domain)
158        domain.set_boundary({'exterior': Br})
159
160        domain.time = 5.54    # Take a random time (not zero)
161
162        #Setup only one forcing term, constant wind stress
163        s = 100
164        phi = 135
165        domain.forcing_terms = []
166        domain.forcing_terms.append(Wind_stress(s=speed, phi=angle))
167
168        domain.compute_forcing_terms()
169
170        #Compute reference solution
171        const = eta_w*rho_a / rho_w
172
173        N = len(domain)    # number_of_triangles
174
175        xc = domain.get_centroid_coordinates()
176        t = domain.time
177
178        x = xc[:,0]
179        y = xc[:,1]
180        s_vec = speed(t,x,y)
181        phi_vec = angle(t,x,y)
182
183        for k in range(N):
184            # Convert to radians
185            phi = phi_vec[k]*pi / 180
186            s = s_vec[k]
187
188            # Compute velocity vector (u, v)
189            u = s*cos(phi)
190            v = s*sin(phi)
191
192            # Compute wind stress
193            S = const * num.sqrt(u**2 + v**2)
194
195            assert num.allclose(domain.quantities['stage'].explicit_update[k],
196                                0)
197            assert num.allclose(domain.quantities['xmomentum'].\
198                                     explicit_update[k],
199                                S*u)
200            assert num.allclose(domain.quantities['ymomentum'].\
201                                     explicit_update[k],
202                                S*v)
203
204    def test_windfield_from_file(self):
205        import time
206        from anuga.config import rho_a, rho_w, eta_w
207        from math import pi, cos, sin
208        from anuga.config import time_format
209        from anuga.abstract_2d_finite_volumes.util import file_function
210
211        a = [0.0, 0.0]
212        b = [0.0, 2.0]
213        c = [2.0, 0.0]
214        d = [0.0, 4.0]
215        e = [2.0, 2.0]
216        f = [4.0, 0.0]
217
218        points = [a, b, c, d, e, f]
219        #             bac,     bce,     ecf,     dbe
220        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
221
222        domain = Domain(points, vertices)
223
224        # Flat surface with 1m of water
225        domain.set_quantity('elevation', 0)
226        domain.set_quantity('stage', 1.0)
227        domain.set_quantity('friction', 0)
228
229        Br = Reflective_boundary(domain)
230        domain.set_boundary({'exterior': Br})
231
232        domain.time = 7    # Take a time that is represented in file (not zero)
233
234        # Write wind stress file (ensure that domain.time is covered)
235        # Take x=1 and y=0
236        filename = 'test_windstress_from_file'
237        start = time.mktime(time.strptime('2000', '%Y'))
238        fid = open(filename + '.txt', 'w')
239        dt = 1    # One second interval
240        t = 0.0
241        while t <= 10.0:
242            t_string = time.strftime(time_format, time.gmtime(t+start))
243
244            fid.write('%s, %f %f\n' %
245                      (t_string, speed(t,[1],[0])[0], angle(t,[1],[0])[0]))
246            t += dt
247
248        fid.close()
249
250        timefile2netcdf(filename)
251        os.remove(filename + '.txt')
252
253        # Setup wind stress
254        F = file_function(filename + '.tms',
255                          quantities=['Attribute0', 'Attribute1'])
256        os.remove(filename + '.tms')
257
258        W = Wind_stress(F)
259
260        domain.forcing_terms = []
261        domain.forcing_terms.append(W)
262
263        domain.compute_forcing_terms()
264
265        # Compute reference solution
266        const = eta_w*rho_a / rho_w
267
268        N = len(domain)    # number_of_triangles
269
270        t = domain.time
271
272        s = speed(t, [1], [0])[0]
273        phi = angle(t, [1], [0])[0]
274
275        # Convert to radians
276        phi = phi*pi / 180
277
278        # Compute velocity vector (u, v)
279        u = s*cos(phi)
280        v = s*sin(phi)
281
282        # Compute wind stress
283        S = const * num.sqrt(u**2 + v**2)
284
285        for k in range(N):
286            assert num.allclose(domain.quantities['stage'].explicit_update[k],
287                                0)
288            assert num.allclose(domain.quantities['xmomentum'].\
289                                    explicit_update[k],
290                                S*u)
291            assert num.allclose(domain.quantities['ymomentum'].\
292                                    explicit_update[k],
293                                S*v)
294
295    def test_windfield_from_file_seconds(self):
296        import time
297        from anuga.config import rho_a, rho_w, eta_w
298        from math import pi, cos, sin
299        from anuga.config import time_format
300        from anuga.abstract_2d_finite_volumes.util import file_function
301
302        a = [0.0, 0.0]
303        b = [0.0, 2.0]
304        c = [2.0, 0.0]
305        d = [0.0, 4.0]
306        e = [2.0, 2.0]
307        f = [4.0, 0.0]
308
309        points = [a, b, c, d, e, f]
310        #             bac,     bce,     ecf,     dbe
311        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
312
313        domain = Domain(points, vertices)
314
315        # Flat surface with 1m of water
316        domain.set_quantity('elevation', 0)
317        domain.set_quantity('stage', 1.0)
318        domain.set_quantity('friction', 0)
319
320        Br = Reflective_boundary(domain)
321        domain.set_boundary({'exterior': Br})
322
323        domain.time = 7    # Take a time that is represented in file (not zero)
324
325        # Write wind stress file (ensure that domain.time is covered)
326        # Take x=1 and y=0
327        filename = 'test_windstress_from_file'
328        start = time.mktime(time.strptime('2000', '%Y'))
329        fid = open(filename + '.txt', 'w')
330        dt = 0.5    # Half second interval
331        t = 0.0
332        while t <= 10.0:
333            fid.write('%s, %f %f\n'
334                      % (str(t), speed(t, [1], [0])[0], angle(t, [1], [0])[0]))
335            t += dt
336
337        fid.close()
338
339        timefile2netcdf(filename, time_as_seconds=True)
340        os.remove(filename + '.txt')
341
342        # Setup wind stress
343        F = file_function(filename + '.tms',
344                          quantities=['Attribute0', 'Attribute1'])
345        os.remove(filename + '.tms')
346
347        W = Wind_stress(F)
348
349        domain.forcing_terms = []
350        domain.forcing_terms.append(W)
351
352        domain.compute_forcing_terms()
353
354        # Compute reference solution
355        const = eta_w*rho_a / rho_w
356
357        N = len(domain)    # number_of_triangles
358
359        t = domain.time
360
361        s = speed(t, [1], [0])[0]
362        phi = angle(t, [1], [0])[0]
363
364        # Convert to radians
365        phi = phi*pi / 180
366
367        # Compute velocity vector (u, v)
368        u = s*cos(phi)
369        v = s*sin(phi)
370
371        # Compute wind stress
372        S = const * num.sqrt(u**2 + v**2)
373
374        for k in range(N):
375            assert num.allclose(domain.quantities['stage'].explicit_update[k],
376                                0)
377            assert num.allclose(domain.quantities['xmomentum'].\
378                                    explicit_update[k],
379                                S*u)
380            assert num.allclose(domain.quantities['ymomentum'].\
381                                    explicit_update[k],
382                                S*v)
383
384    def test_wind_stress_error_condition(self):
385        """Test that windstress reacts properly when forcing functions
386        are wrong - e.g. returns a scalar
387        """
388
389        from math import pi, cos, sin
390        from anuga.config import rho_a, rho_w, eta_w
391
392        a = [0.0, 0.0]
393        b = [0.0, 2.0]
394        c = [2.0, 0.0]
395        d = [0.0, 4.0]
396        e = [2.0, 2.0]
397        f = [4.0, 0.0]
398
399        points = [a, b, c, d, e, f]
400        #             bac,     bce,     ecf,     dbe
401        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
402
403        domain = Domain(points, vertices)
404
405        # Flat surface with 1m of water
406        domain.set_quantity('elevation', 0)
407        domain.set_quantity('stage', 1.0)
408        domain.set_quantity('friction', 0)
409
410        Br = Reflective_boundary(domain)
411        domain.set_boundary({'exterior': Br})
412
413        domain.time = 5.54    # Take a random time (not zero)
414
415        # Setup only one forcing term, bad func
416        domain.forcing_terms = []
417
418        try:
419            domain.forcing_terms.append(Wind_stress(s=scalar_func_list,
420                                                    phi=angle))
421        except AssertionError:
422            pass
423        else:
424            msg = 'Should have raised exception'
425            raise Exception, msg
426
427        try:
428            domain.forcing_terms.append(Wind_stress(s=speed, phi=scalar_func))
429        except Exception:
430            pass
431        else:
432            msg = 'Should have raised exception'
433            raise Exception, msg
434
435        try:
436            domain.forcing_terms.append(Wind_stress(s=speed, phi='xx'))
437        except:
438            pass
439        else:
440            msg = 'Should have raised exception'
441            raise Exception, msg
442
443    def test_rainfall(self):
444        from math import pi, cos, sin
445
446        a = [0.0, 0.0]
447        b = [0.0, 2.0]
448        c = [2.0, 0.0]
449        d = [0.0, 4.0]
450        e = [2.0, 2.0]
451        f = [4.0, 0.0]
452
453        points = [a, b, c, d, e, f]
454        #             bac,     bce,     ecf,     dbe
455        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
456
457        domain = Domain(points, vertices)
458
459        # Flat surface with 1m of water
460        domain.set_quantity('elevation', 0)
461        domain.set_quantity('stage', 1.0)
462        domain.set_quantity('friction', 0)
463
464        Br = Reflective_boundary(domain)
465        domain.set_boundary({'exterior': Br})
466
467        # Setup only one forcing term, constant rainfall
468        domain.forcing_terms = []
469        domain.forcing_terms.append(Rainfall(domain, rate=2.0))
470
471        domain.compute_forcing_terms()
472        assert num.allclose(domain.quantities['stage'].explicit_update,
473                            2.0/1000)
474
475    def test_rainfall_restricted_by_polygon(self):
476        from math import pi, cos, sin
477
478        a = [0.0, 0.0]
479        b = [0.0, 2.0]
480        c = [2.0, 0.0]
481        d = [0.0, 4.0]
482        e = [2.0, 2.0]
483        f = [4.0, 0.0]
484
485        points = [a, b, c, d, e, f]
486        #             bac,     bce,     ecf,     dbe
487        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
488
489        domain = Domain(points, vertices)
490
491        # Flat surface with 1m of water
492        domain.set_quantity('elevation', 0)
493        domain.set_quantity('stage', 1.0)
494        domain.set_quantity('friction', 0)
495
496        Br = Reflective_boundary(domain)
497        domain.set_boundary({'exterior': Br})
498
499        # Setup only one forcing term, constant rainfall
500        # restricted to a polygon enclosing triangle #1 (bce)
501        domain.forcing_terms = []
502        R = Rainfall(domain, rate=2.0, polygon=[[1,1], [2,1], [2,2], [1,2]])
503
504        assert num.allclose(R.exchange_area, 2)
505
506        domain.forcing_terms.append(R)
507
508        domain.compute_forcing_terms()
509
510        assert num.allclose(domain.quantities['stage'].explicit_update[1],
511                            2.0/1000)
512        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
513        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
514
515    def test_time_dependent_rainfall_restricted_by_polygon(self):
516        a = [0.0, 0.0]
517        b = [0.0, 2.0]
518        c = [2.0, 0.0]
519        d = [0.0, 4.0]
520        e = [2.0, 2.0]
521        f = [4.0, 0.0]
522
523        points = [a, b, c, d, e, f]
524        #             bac,     bce,     ecf,     dbe
525        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
526
527        domain = Domain(points, vertices)
528
529        # Flat surface with 1m of water
530        domain.set_quantity('elevation', 0)
531        domain.set_quantity('stage', 1.0)
532        domain.set_quantity('friction', 0)
533
534        Br = Reflective_boundary(domain)
535        domain.set_boundary({'exterior': Br})
536
537        # Setup only one forcing term, time dependent rainfall
538        # restricted to a polygon enclosing triangle #1 (bce)
539        domain.forcing_terms = []
540        R = Rainfall(domain,
541                     rate=lambda t: 3*t + 7,
542                     polygon = [[1,1], [2,1], [2,2], [1,2]])
543
544        assert num.allclose(R.exchange_area, 2)
545       
546        domain.forcing_terms.append(R)
547
548        domain.time = 10.
549
550        domain.compute_forcing_terms()
551
552        assert num.allclose(domain.quantities['stage'].explicit_update[1],
553                            (3*domain.time + 7)/1000)
554        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
555        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
556
557    def test_time_dependent_rainfall_using_starttime(self):
558        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
559
560        a = [0.0, 0.0]
561        b = [0.0, 2.0]
562        c = [2.0, 0.0]
563        d = [0.0, 4.0]
564        e = [2.0, 2.0]
565        f = [4.0, 0.0]
566
567        points = [a, b, c, d, e, f]
568        #             bac,     bce,     ecf,     dbe
569        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
570
571        domain = Domain(points, vertices)
572
573        # Flat surface with 1m of water
574        domain.set_quantity('elevation', 0)
575        domain.set_quantity('stage', 1.0)
576        domain.set_quantity('friction', 0)
577
578        Br = Reflective_boundary(domain)
579        domain.set_boundary({'exterior': Br})
580
581        # Setup only one forcing term, time dependent rainfall
582        # restricted to a polygon enclosing triangle #1 (bce)
583        domain.forcing_terms = []
584        R = Rainfall(domain,
585                     rate=lambda t: 3*t + 7,
586                     polygon=rainfall_poly)                     
587
588        assert num.allclose(R.exchange_area, 2)
589       
590        domain.forcing_terms.append(R)
591
592        # This will test that time used in the forcing function takes
593        # startime into account.
594        domain.starttime = 5.0
595
596        domain.time = 7.
597
598        domain.compute_forcing_terms()
599
600        assert num.allclose(domain.quantities['stage'].explicit_update[1],
601                            (3*domain.get_time() + 7)/1000)
602        assert num.allclose(domain.quantities['stage'].explicit_update[1],
603                            (3*(domain.time + domain.starttime) + 7)/1000)
604
605        # Using internal time her should fail
606        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
607                                (3*domain.time + 7)/1000)
608
609        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
610        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
611
612    def test_time_dependent_rainfall_using_georef(self):
613        """test_time_dependent_rainfall_using_georef
614
615        This will also test the General forcing term using georef
616        """
617
618        # Mesh in zone 56 (absolute coords)
619        x0 = 314036.58727982
620        y0 = 6224951.2960092
621
622        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
623        rainfall_poly += [x0, y0]
624
625        a = [0.0, 0.0]
626        b = [0.0, 2.0]
627        c = [2.0, 0.0]
628        d = [0.0, 4.0]
629        e = [2.0, 2.0]
630        f = [4.0, 0.0]
631
632        points = [a, b, c, d, e, f]
633        #             bac,     bce,     ecf,     dbe
634        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
635
636        domain = Domain(points, vertices,
637                        geo_reference=Geo_reference(56, x0, y0))
638
639        # Flat surface with 1m of water
640        domain.set_quantity('elevation', 0)
641        domain.set_quantity('stage', 1.0)
642        domain.set_quantity('friction', 0)
643
644        Br = Reflective_boundary(domain)
645        domain.set_boundary({'exterior': Br})
646
647        # Setup only one forcing term, time dependent rainfall
648        # restricted to a polygon enclosing triangle #1 (bce)
649        domain.forcing_terms = []
650        R = Rainfall(domain,
651                     rate=lambda t: 3*t + 7,
652                     polygon=rainfall_poly)
653
654        assert num.allclose(R.exchange_area, 2)
655       
656        domain.forcing_terms.append(R)
657
658        # This will test that time used in the forcing function takes
659        # startime into account.
660        domain.starttime = 5.0
661
662        domain.time = 7.
663
664        domain.compute_forcing_terms()
665
666        assert num.allclose(domain.quantities['stage'].explicit_update[1],
667                            (3*domain.get_time() + 7)/1000)
668        assert num.allclose(domain.quantities['stage'].explicit_update[1],
669                            (3*(domain.time + domain.starttime) + 7)/1000)
670
671        # Using internal time her should fail
672        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
673                                (3*domain.time + 7)/1000)
674
675        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
676        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
677
678    def test_time_dependent_rainfall_restricted_by_polygon_with_default(self):
679        """
680        Test that default rainfall can be used when given rate runs out of data.
681        """
682
683        a = [0.0, 0.0]
684        b = [0.0, 2.0]
685        c = [2.0, 0.0]
686        d = [0.0, 4.0]
687        e = [2.0, 2.0]
688        f = [4.0, 0.0]
689
690        points = [a, b, c, d, e, f]
691        #             bac,     bce,     ecf,     dbe
692        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
693
694        domain = Domain(points, vertices)
695
696        # Flat surface with 1m of water
697        domain.set_quantity('elevation', 0)
698        domain.set_quantity('stage', 1.0)
699        domain.set_quantity('friction', 0)
700
701        Br = Reflective_boundary(domain)
702        domain.set_boundary({'exterior': Br})
703
704        # Setup only one forcing term, time dependent rainfall
705        # that expires at t==20
706        from anuga.fit_interpolate.interpolate import Modeltime_too_late
707
708        def main_rate(t):
709            if t > 20:
710                msg = 'Model time exceeded.'
711                raise Modeltime_too_late, msg
712            else:
713                return 3*t + 7
714
715        domain.forcing_terms = []
716        R = Rainfall(domain,
717                     rate=main_rate,
718                     polygon = [[1,1], [2,1], [2,2], [1,2]],
719                     default_rate=5.0)
720
721        assert num.allclose(R.exchange_area, 2)
722       
723        domain.forcing_terms.append(R)
724
725        domain.time = 10.
726
727        domain.compute_forcing_terms()
728
729        assert num.allclose(domain.quantities['stage'].explicit_update[1],
730                            (3*domain.time+7)/1000)
731        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
732        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
733
734        domain.time = 100.
735        domain.quantities['stage'].explicit_update[:] = 0.0     # Reset
736        domain.compute_forcing_terms()
737
738        assert num.allclose(domain.quantities['stage'].explicit_update[1],
739                            5.0/1000) # Default value
740        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
741        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
742
743    def test_rainfall_forcing_with_evolve(self):
744        """test_rainfall_forcing_with_evolve
745
746        Test how forcing terms are called within evolve
747        """
748
749        # FIXME(Ole): This test is just to experiment
750
751        a = [0.0, 0.0]
752        b = [0.0, 2.0]
753        c = [2.0, 0.0]
754        d = [0.0, 4.0]
755        e = [2.0, 2.0]
756        f = [4.0, 0.0]
757
758        points = [a, b, c, d, e, f]
759        #             bac,     bce,     ecf,     dbe
760        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
761
762        domain = Domain(points, vertices)
763
764        # Flat surface with 1m of water
765        domain.set_quantity('elevation', 0)
766        domain.set_quantity('stage', 1.0)
767        domain.set_quantity('friction', 0)
768
769        Br = Reflective_boundary(domain)
770        domain.set_boundary({'exterior': Br})
771
772        # Setup only one forcing term, time dependent rainfall
773        # that expires at t==20
774        from anuga.fit_interpolate.interpolate import Modeltime_too_late
775
776        def main_rate(t):
777            if t > 20:
778                msg = 'Model time exceeded.'
779                raise Modeltime_too_late, msg
780            else:
781                return 3*t + 7
782
783        domain.forcing_terms = []
784        R = Rainfall(domain,
785                     rate=main_rate,
786                     polygon=[[1,1], [2,1], [2,2], [1,2]],
787                     default_rate=5.0)
788
789        assert num.allclose(R.exchange_area, 2)
790       
791        domain.forcing_terms.append(R)
792
793        for t in domain.evolve(yieldstep=1, finaltime=25):
794            pass
795            #FIXME(Ole):  A test here is hard because explicit_update also
796            # receives updates from the flux calculation.
797
798
799    def test_rainfall_forcing_with_evolve_1(self):
800        """test_rainfall_forcing_with_evolve
801
802        Test how forcing terms are called within evolve.
803        This test checks that proper exception is thrown when no default_rate is set
804        """
805
806
807        a = [0.0, 0.0]
808        b = [0.0, 2.0]
809        c = [2.0, 0.0]
810        d = [0.0, 4.0]
811        e = [2.0, 2.0]
812        f = [4.0, 0.0]
813
814        points = [a, b, c, d, e, f]
815        #             bac,     bce,     ecf,     dbe
816        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
817
818        domain = Domain(points, vertices)
819
820        # Flat surface with 1m of water
821        domain.set_quantity('elevation', 0)
822        domain.set_quantity('stage', 1.0)
823        domain.set_quantity('friction', 0)
824
825        Br = Reflective_boundary(domain)
826        domain.set_boundary({'exterior': Br})
827
828        # Setup only one forcing term, time dependent rainfall
829        # that expires at t==20
830        from anuga.fit_interpolate.interpolate import Modeltime_too_late
831
832        def main_rate(t):
833            if t > 20:
834                msg = 'Model time exceeded.'
835                raise Modeltime_too_late, msg
836            else:
837                return 3*t + 7
838
839        domain.forcing_terms = []
840        R = Rainfall(domain,
841                     rate=main_rate,
842                     polygon=[[1,1], [2,1], [2,2], [1,2]])
843
844
845        assert num.allclose(R.exchange_area, 2)
846       
847        domain.forcing_terms.append(R)
848        #for t in domain.evolve(yieldstep=1, finaltime=25):
849        #    pass
850               
851        try:
852            for t in domain.evolve(yieldstep=1, finaltime=25):
853                pass
854        except Modeltime_too_late, e:
855            # Test that error message is as expected
856            assert 'can specify keyword argument default_rate in the forcing function' in str(e)
857        else:
858            raise Exception, 'Should have raised exception'
859
860           
861if __name__ == "__main__":
862    suite = unittest.makeSuite(Test_Forcing, 'test')
863    runner = unittest.TextTestRunner(verbosity=1)
864    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.