source: trunk/anuga_core/source/anuga/shallow_water/test_forcing.py @ 7844

Last change on this file since 7844 was 7844, checked in by steve, 12 years ago

Getting rid of warning messages from test_all.py

File size: 25.8 KB
Line 
1"""  Test environmental forcing - rain, wind, etc.
2"""
3
4import unittest, os
5from anuga.shallow_water.shallow_water_domain import Domain
6from boundaries import Reflective_boundary
7from anuga.coordinate_transforms.geo_reference import Geo_reference
8from anuga.file_conversion.file_conversion import timefile2netcdf
9from forcing import *
10
11import numpy as num
12import warnings
13
14
15def scalar_func_list(t, x, y):
16    """Function that returns a scalar.
17
18    Used to test error message when numeric array is expected
19    """
20
21    return [17.7]
22
23
24def speed(t, x, y):
25    """
26    Variable windfield implemented using functions   
27    Large speeds halfway between center and edges
28
29    Low speeds at center and edges
30    """
31
32    from math import exp, cos, pi
33
34    x = num.array(x)
35    y = num.array(y)
36
37    N = len(x)
38    s = 0*#New array
39
40    for k in range(N):
41        r = num.sqrt(x[k]**2 + y[k]**2)
42        factor = exp(-(r-0.15)**2)
43        s[k] = 4000 * factor * (cos(t*2*pi/150) + 2)
44
45    return s
46
47
48def angle(t, x, y):
49    """Rotating field
50    """
51    from math import atan, pi
52
53    x = num.array(x)
54    y = num.array(y)
55
56    N = len(x)
57    a = 0 * x    # New array
58
59    for k in range(N):
60        r = num.sqrt(x[k]**2 + y[k]**2)
61
62        angle = atan(y[k]/x[k])
63
64        if x[k] < 0:
65            angle += pi
66
67        # Take normal direction
68        angle -= pi/2
69
70        # Ensure positive radians
71        if angle < 0:
72            angle += 2*pi
73
74        a[k] = angle/pi*180
75
76    return a
77
78   
79
80class Test_Forcing(unittest.TestCase):
81    def setUp(self):
82        pass
83
84    def tearDown(self):
85        pass
86       
87    def test_constant_wind_stress(self):
88        from anuga.config import rho_a, rho_w, eta_w
89        from math import pi, cos, sin
90
91        a = [0.0, 0.0]
92        b = [0.0, 2.0]
93        c = [2.0, 0.0]
94        d = [0.0, 4.0]
95        e = [2.0, 2.0]
96        f = [4.0, 0.0]
97
98        points = [a, b, c, d, e, f]
99        #             bac,     bce,     ecf,     dbe
100        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
101
102        domain = Domain(points, vertices)
103
104        #Flat surface with 1m of water
105        domain.set_quantity('elevation', 0)
106        domain.set_quantity('stage', 1.0)
107        domain.set_quantity('friction', 0)
108
109        Br = Reflective_boundary(domain)
110        domain.set_boundary({'exterior': Br})
111
112        #Setup only one forcing term, constant wind stress
113        s = 100
114        phi = 135
115        domain.forcing_terms = []
116        domain.forcing_terms.append(Wind_stress(s, phi))
117
118        domain.compute_forcing_terms()
119
120        const = eta_w*rho_a / rho_w
121
122        #Convert to radians
123        phi = phi*pi / 180
124
125        #Compute velocity vector (u, v)
126        u = s*cos(phi)
127        v = s*sin(phi)
128
129        #Compute wind stress
130        S = const * num.sqrt(u**2 + v**2)
131
132        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
133        assert num.allclose(domain.quantities['xmomentum'].explicit_update, S*u)
134        assert num.allclose(domain.quantities['ymomentum'].explicit_update, S*v)
135
136    def test_variable_wind_stress(self):
137        from anuga.config import rho_a, rho_w, eta_w
138        from math import pi, cos, sin
139
140        a = [0.0, 0.0]
141        b = [0.0, 2.0]
142        c = [2.0, 0.0]
143        d = [0.0, 4.0]
144        e = [2.0, 2.0]
145        f = [4.0, 0.0]
146
147        points = [a, b, c, d, e, f]
148        #             bac,     bce,     ecf,     dbe
149        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
150
151        domain = Domain(points, vertices)
152
153        #Flat surface with 1m of water
154        domain.set_quantity('elevation', 0)
155        domain.set_quantity('stage', 1.0)
156        domain.set_quantity('friction', 0)
157
158        Br = Reflective_boundary(domain)
159        domain.set_boundary({'exterior': Br})
160
161        domain.time = 5.54    # Take a random time (not zero)
162
163        #Setup only one forcing term, constant wind stress
164        s = 100
165        phi = 135
166        domain.forcing_terms = []
167        domain.forcing_terms.append(Wind_stress(s=speed, phi=angle))
168
169        domain.compute_forcing_terms()
170
171        #Compute reference solution
172        const = eta_w*rho_a / rho_w
173
174        N = len(domain)    # number_of_triangles
175
176        xc = domain.get_centroid_coordinates()
177        t = domain.time
178
179        x = xc[:,0]
180        y = xc[:,1]
181        s_vec = speed(t,x,y)
182        phi_vec = angle(t,x,y)
183
184        for k in range(N):
185            # Convert to radians
186            phi = phi_vec[k]*pi / 180
187            s = s_vec[k]
188
189            # Compute velocity vector (u, v)
190            u = s*cos(phi)
191            v = s*sin(phi)
192
193            # Compute wind stress
194            S = const * num.sqrt(u**2 + v**2)
195
196            assert num.allclose(domain.quantities['stage'].explicit_update[k],
197                                0)
198            assert num.allclose(domain.quantities['xmomentum'].\
199                                     explicit_update[k],
200                                S*u)
201            assert num.allclose(domain.quantities['ymomentum'].\
202                                     explicit_update[k],
203                                S*v)
204
205    def test_windfield_from_file(self):
206        import time
207        from anuga.config import rho_a, rho_w, eta_w
208        from math import pi, cos, sin
209        from anuga.config import time_format
210        from anuga.abstract_2d_finite_volumes.util import file_function
211
212        a = [0.0, 0.0]
213        b = [0.0, 2.0]
214        c = [2.0, 0.0]
215        d = [0.0, 4.0]
216        e = [2.0, 2.0]
217        f = [4.0, 0.0]
218
219        points = [a, b, c, d, e, f]
220        #             bac,     bce,     ecf,     dbe
221        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
222
223        domain = Domain(points, vertices)
224
225        # Flat surface with 1m of water
226        domain.set_quantity('elevation', 0)
227        domain.set_quantity('stage', 1.0)
228        domain.set_quantity('friction', 0)
229
230        Br = Reflective_boundary(domain)
231        domain.set_boundary({'exterior': Br})
232
233        domain.time = 7    # Take a time that is represented in file (not zero)
234
235        # Write wind stress file (ensure that domain.time is covered)
236        # Take x=1 and y=0
237        filename = 'test_windstress_from_file'
238        start = time.mktime(time.strptime('2000', '%Y'))
239        fid = open(filename + '.txt', 'w')
240        dt = 1    # One second interval
241        t = 0.0
242        while t <= 10.0:
243            t_string = time.strftime(time_format, time.gmtime(t+start))
244
245            fid.write('%s, %f %f\n' %
246                      (t_string, speed(t,[1],[0])[0], angle(t,[1],[0])[0]))
247            t += dt
248
249        fid.close()
250
251        timefile2netcdf(filename + '.txt')
252        os.remove(filename + '.txt')
253
254        # Setup wind stress
255        F = file_function(filename + '.tms',
256                          quantities=['Attribute0', 'Attribute1'])
257        os.remove(filename + '.tms')
258
259        W = Wind_stress(F)
260
261        domain.forcing_terms = []
262        domain.forcing_terms.append(W)
263
264        domain.compute_forcing_terms()
265
266        # Compute reference solution
267        const = eta_w*rho_a / rho_w
268
269        N = len(domain)    # number_of_triangles
270
271        t = domain.time
272
273        s = speed(t, [1], [0])[0]
274        phi = angle(t, [1], [0])[0]
275
276        # Convert to radians
277        phi = phi*pi / 180
278
279        # Compute velocity vector (u, v)
280        u = s*cos(phi)
281        v = s*sin(phi)
282
283        # Compute wind stress
284        S = const * num.sqrt(u**2 + v**2)
285
286        for k in range(N):
287            assert num.allclose(domain.quantities['stage'].explicit_update[k],
288                                0)
289            assert num.allclose(domain.quantities['xmomentum'].\
290                                    explicit_update[k],
291                                S*u)
292            assert num.allclose(domain.quantities['ymomentum'].\
293                                    explicit_update[k],
294                                S*v)
295
296    def test_windfield_from_file_seconds(self):
297        import time
298        from anuga.config import rho_a, rho_w, eta_w
299        from math import pi, cos, sin
300        from anuga.config import time_format
301        from anuga.abstract_2d_finite_volumes.util import file_function
302
303        a = [0.0, 0.0]
304        b = [0.0, 2.0]
305        c = [2.0, 0.0]
306        d = [0.0, 4.0]
307        e = [2.0, 2.0]
308        f = [4.0, 0.0]
309
310        points = [a, b, c, d, e, f]
311        #             bac,     bce,     ecf,     dbe
312        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
313
314        domain = Domain(points, vertices)
315
316        # Flat surface with 1m of water
317        domain.set_quantity('elevation', 0)
318        domain.set_quantity('stage', 1.0)
319        domain.set_quantity('friction', 0)
320
321        Br = Reflective_boundary(domain)
322        domain.set_boundary({'exterior': Br})
323
324        domain.time = 7    # Take a time that is represented in file (not zero)
325
326        # Write wind stress file (ensure that domain.time is covered)
327        # Take x=1 and y=0
328        filename = 'test_windstress_from_file'
329        start = time.mktime(time.strptime('2000', '%Y'))
330        fid = open(filename + '.txt', 'w')
331        dt = 0.5    # Half second interval
332        t = 0.0
333        while t <= 10.0:
334            fid.write('%s, %f %f\n'
335                      % (str(t), speed(t, [1], [0])[0], angle(t, [1], [0])[0]))
336            t += dt
337
338        fid.close()
339
340        timefile2netcdf(filename + '.txt', time_as_seconds=True)
341        os.remove(filename + '.txt')
342
343        # Setup wind stress
344        F = file_function(filename + '.tms',
345                          quantities=['Attribute0', 'Attribute1'])
346        os.remove(filename + '.tms')
347
348        W = Wind_stress(F)
349
350        domain.forcing_terms = []
351        domain.forcing_terms.append(W)
352
353        domain.compute_forcing_terms()
354
355        # Compute reference solution
356        const = eta_w*rho_a / rho_w
357
358        N = len(domain)    # number_of_triangles
359
360        t = domain.time
361
362        s = speed(t, [1], [0])[0]
363        phi = angle(t, [1], [0])[0]
364
365        # Convert to radians
366        phi = phi*pi / 180
367
368        # Compute velocity vector (u, v)
369        u = s*cos(phi)
370        v = s*sin(phi)
371
372        # Compute wind stress
373        S = const * num.sqrt(u**2 + v**2)
374
375        for k in range(N):
376            assert num.allclose(domain.quantities['stage'].explicit_update[k],
377                                0)
378            assert num.allclose(domain.quantities['xmomentum'].\
379                                    explicit_update[k],
380                                S*u)
381            assert num.allclose(domain.quantities['ymomentum'].\
382                                    explicit_update[k],
383                                S*v)
384
385    def test_wind_stress_error_condition(self):
386        """Test that windstress reacts properly when forcing functions
387        are wrong - e.g. returns a scalar
388        """
389
390        from math import pi, cos, sin
391        from anuga.config import rho_a, rho_w, eta_w
392
393        a = [0.0, 0.0]
394        b = [0.0, 2.0]
395        c = [2.0, 0.0]
396        d = [0.0, 4.0]
397        e = [2.0, 2.0]
398        f = [4.0, 0.0]
399
400        points = [a, b, c, d, e, f]
401        #             bac,     bce,     ecf,     dbe
402        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
403
404        domain = Domain(points, vertices)
405
406        # Flat surface with 1m of water
407        domain.set_quantity('elevation', 0)
408        domain.set_quantity('stage', 1.0)
409        domain.set_quantity('friction', 0)
410
411        Br = Reflective_boundary(domain)
412        domain.set_boundary({'exterior': Br})
413
414        domain.time = 5.54    # Take a random time (not zero)
415
416        # Setup only one forcing term, bad func
417        domain.forcing_terms = []
418
419        try:
420            domain.forcing_terms.append(Wind_stress(s=scalar_func_list,
421                                                    phi=angle))
422        except AssertionError:
423            pass
424        else:
425            msg = 'Should have raised exception'
426            raise Exception, msg
427
428        try:
429            domain.forcing_terms.append(Wind_stress(s=speed, phi=scalar_func))
430        except Exception:
431            pass
432        else:
433            msg = 'Should have raised exception'
434            raise Exception, msg
435
436        try:
437            domain.forcing_terms.append(Wind_stress(s=speed, phi='xx'))
438        except:
439            pass
440        else:
441            msg = 'Should have raised exception'
442            raise Exception, msg
443
444    def test_rainfall(self):
445        from math import pi, cos, sin
446
447        a = [0.0, 0.0]
448        b = [0.0, 2.0]
449        c = [2.0, 0.0]
450        d = [0.0, 4.0]
451        e = [2.0, 2.0]
452        f = [4.0, 0.0]
453
454        points = [a, b, c, d, e, f]
455        #             bac,     bce,     ecf,     dbe
456        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
457
458        domain = Domain(points, vertices)
459
460        # Flat surface with 1m of water
461        domain.set_quantity('elevation', 0)
462        domain.set_quantity('stage', 1.0)
463        domain.set_quantity('friction', 0)
464
465        Br = Reflective_boundary(domain)
466        domain.set_boundary({'exterior': Br})
467
468        # Setup only one forcing term, constant rainfall
469        domain.forcing_terms = []
470        domain.forcing_terms.append(Rainfall(domain, rate=2.0))
471
472        domain.compute_forcing_terms()
473        assert num.allclose(domain.quantities['stage'].explicit_update,
474                            2.0/1000)
475
476    def test_rainfall_restricted_by_polygon(self):
477        from math import pi, cos, sin
478
479        a = [0.0, 0.0]
480        b = [0.0, 2.0]
481        c = [2.0, 0.0]
482        d = [0.0, 4.0]
483        e = [2.0, 2.0]
484        f = [4.0, 0.0]
485
486        points = [a, b, c, d, e, f]
487        #             bac,     bce,     ecf,     dbe
488        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
489
490        domain = Domain(points, vertices)
491
492        # Flat surface with 1m of water
493        domain.set_quantity('elevation', 0)
494        domain.set_quantity('stage', 1.0)
495        domain.set_quantity('friction', 0)
496
497        Br = Reflective_boundary(domain)
498        domain.set_boundary({'exterior': Br})
499
500        # Setup only one forcing term, constant rainfall
501        # restricted to a polygon enclosing triangle #1 (bce)
502        domain.forcing_terms = []
503        R = Rainfall(domain, rate=2.0, polygon=[[1,1], [2,1], [2,2], [1,2]])
504
505        assert num.allclose(R.exchange_area, 2)
506
507        domain.forcing_terms.append(R)
508
509        domain.compute_forcing_terms()
510
511        assert num.allclose(domain.quantities['stage'].explicit_update[1],
512                            2.0/1000)
513        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
514        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
515
516    def test_time_dependent_rainfall_restricted_by_polygon(self):
517        a = [0.0, 0.0]
518        b = [0.0, 2.0]
519        c = [2.0, 0.0]
520        d = [0.0, 4.0]
521        e = [2.0, 2.0]
522        f = [4.0, 0.0]
523
524        points = [a, b, c, d, e, f]
525        #             bac,     bce,     ecf,     dbe
526        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
527
528        domain = Domain(points, vertices)
529
530        # Flat surface with 1m of water
531        domain.set_quantity('elevation', 0)
532        domain.set_quantity('stage', 1.0)
533        domain.set_quantity('friction', 0)
534
535        Br = Reflective_boundary(domain)
536        domain.set_boundary({'exterior': Br})
537
538        # Setup only one forcing term, time dependent rainfall
539        # restricted to a polygon enclosing triangle #1 (bce)
540        domain.forcing_terms = []
541        R = Rainfall(domain,
542                     rate=lambda t: 3*t + 7,
543                     polygon = [[1,1], [2,1], [2,2], [1,2]])
544
545        assert num.allclose(R.exchange_area, 2)
546       
547        domain.forcing_terms.append(R)
548
549        domain.time = 10.
550
551        domain.compute_forcing_terms()
552
553        assert num.allclose(domain.quantities['stage'].explicit_update[1],
554                            (3*domain.time + 7)/1000)
555        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
556        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
557
558    def test_time_dependent_rainfall_using_starttime(self):
559        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
560
561        a = [0.0, 0.0]
562        b = [0.0, 2.0]
563        c = [2.0, 0.0]
564        d = [0.0, 4.0]
565        e = [2.0, 2.0]
566        f = [4.0, 0.0]
567
568        points = [a, b, c, d, e, f]
569        #             bac,     bce,     ecf,     dbe
570        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
571
572        domain = Domain(points, vertices)
573
574        # Flat surface with 1m of water
575        domain.set_quantity('elevation', 0)
576        domain.set_quantity('stage', 1.0)
577        domain.set_quantity('friction', 0)
578
579        Br = Reflective_boundary(domain)
580        domain.set_boundary({'exterior': Br})
581
582        # Setup only one forcing term, time dependent rainfall
583        # restricted to a polygon enclosing triangle #1 (bce)
584        domain.forcing_terms = []
585        R = Rainfall(domain,
586                     rate=lambda t: 3*t + 7,
587                     polygon=rainfall_poly)                     
588
589        assert num.allclose(R.exchange_area, 2)
590       
591        domain.forcing_terms.append(R)
592
593        # This will test that time used in the forcing function takes
594        # startime into account.
595        domain.starttime = 5.0
596
597        domain.time = 7.
598
599        domain.compute_forcing_terms()
600
601        assert num.allclose(domain.quantities['stage'].explicit_update[1],
602                            (3*domain.get_time() + 7)/1000)
603        assert num.allclose(domain.quantities['stage'].explicit_update[1],
604                            (3*(domain.time + domain.starttime) + 7)/1000)
605
606        # Using internal time her should fail
607        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
608                                (3*domain.time + 7)/1000)
609
610        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
611        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
612
613    def test_time_dependent_rainfall_using_georef(self):
614        """test_time_dependent_rainfall_using_georef
615
616        This will also test the General forcing term using georef
617        """
618
619        # Mesh in zone 56 (absolute coords)
620        x0 = 314036.58727982
621        y0 = 6224951.2960092
622
623        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
624        rainfall_poly += [x0, y0]
625
626        a = [0.0, 0.0]
627        b = [0.0, 2.0]
628        c = [2.0, 0.0]
629        d = [0.0, 4.0]
630        e = [2.0, 2.0]
631        f = [4.0, 0.0]
632
633        points = [a, b, c, d, e, f]
634        #             bac,     bce,     ecf,     dbe
635        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
636
637        domain = Domain(points, vertices,
638                        geo_reference=Geo_reference(56, x0, y0))
639
640        # Flat surface with 1m of water
641        domain.set_quantity('elevation', 0)
642        domain.set_quantity('stage', 1.0)
643        domain.set_quantity('friction', 0)
644
645        Br = Reflective_boundary(domain)
646        domain.set_boundary({'exterior': Br})
647
648        # Setup only one forcing term, time dependent rainfall
649        # restricted to a polygon enclosing triangle #1 (bce)
650        domain.forcing_terms = []
651        R = Rainfall(domain,
652                     rate=lambda t: 3*t + 7,
653                     polygon=rainfall_poly)
654
655        assert num.allclose(R.exchange_area, 2)
656       
657        domain.forcing_terms.append(R)
658
659        # This will test that time used in the forcing function takes
660        # startime into account.
661        domain.starttime = 5.0
662
663        domain.time = 7.
664
665        domain.compute_forcing_terms()
666
667        assert num.allclose(domain.quantities['stage'].explicit_update[1],
668                            (3*domain.get_time() + 7)/1000)
669        assert num.allclose(domain.quantities['stage'].explicit_update[1],
670                            (3*(domain.time + domain.starttime) + 7)/1000)
671
672        # Using internal time her should fail
673        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
674                                (3*domain.time + 7)/1000)
675
676        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
677        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
678
679    def test_time_dependent_rainfall_restricted_by_polygon_with_default(self):
680        """
681        Test that default rainfall can be used when given rate runs out of data.
682        """
683
684        import warnings
685        warnings.simplefilter('ignore', UserWarning)
686
687
688        a = [0.0, 0.0]
689        b = [0.0, 2.0]
690        c = [2.0, 0.0]
691        d = [0.0, 4.0]
692        e = [2.0, 2.0]
693        f = [4.0, 0.0]
694
695        points = [a, b, c, d, e, f]
696        #             bac,     bce,     ecf,     dbe
697        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
698
699        domain = Domain(points, vertices)
700
701        # Flat surface with 1m of water
702        domain.set_quantity('elevation', 0)
703        domain.set_quantity('stage', 1.0)
704        domain.set_quantity('friction', 0)
705
706        Br = Reflective_boundary(domain)
707        domain.set_boundary({'exterior': Br})
708
709        # Setup only one forcing term, time dependent rainfall
710        # that expires at t==20
711        from anuga.fit_interpolate.interpolate import Modeltime_too_late
712
713        def main_rate(t):
714            if t > 20:
715                msg = 'Model time exceeded.'
716                raise Modeltime_too_late, msg
717            else:
718                return 3*t + 7
719
720        domain.forcing_terms = []
721        R = Rainfall(domain,
722                     rate=main_rate,
723                     polygon = [[1,1], [2,1], [2,2], [1,2]],
724                     default_rate=5.0)
725
726        assert num.allclose(R.exchange_area, 2)
727       
728        domain.forcing_terms.append(R)
729
730        domain.time = 10.
731
732        domain.compute_forcing_terms()
733
734        assert num.allclose(domain.quantities['stage'].explicit_update[1],
735                            (3*domain.time+7)/1000)
736        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
737        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
738
739        domain.time = 100.
740        domain.quantities['stage'].explicit_update[:] = 0.0     # Reset
741        domain.compute_forcing_terms()
742
743        assert num.allclose(domain.quantities['stage'].explicit_update[1],
744                            5.0/1000) # Default value
745        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
746        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
747
748    def test_rainfall_forcing_with_evolve(self):
749        """test_rainfall_forcing_with_evolve
750
751        Test how forcing terms are called within evolve
752        """
753
754        # FIXME(Ole): This test is just to experiment
755        import warnings
756        warnings.simplefilter('ignore', UserWarning)
757
758
759        a = [0.0, 0.0]
760        b = [0.0, 2.0]
761        c = [2.0, 0.0]
762        d = [0.0, 4.0]
763        e = [2.0, 2.0]
764        f = [4.0, 0.0]
765
766        points = [a, b, c, d, e, f]
767        #             bac,     bce,     ecf,     dbe
768        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
769
770        domain = Domain(points, vertices)
771
772        # Flat surface with 1m of water
773        domain.set_quantity('elevation', 0)
774        domain.set_quantity('stage', 1.0)
775        domain.set_quantity('friction', 0)
776
777        Br = Reflective_boundary(domain)
778        domain.set_boundary({'exterior': Br})
779
780        # Setup only one forcing term, time dependent rainfall
781        # that expires at t==20
782        from anuga.fit_interpolate.interpolate import Modeltime_too_late
783
784        def main_rate(t):
785            if t > 20:
786                msg = 'Model time exceeded.'
787                raise Modeltime_too_late, msg
788            else:
789                return 3*t + 7
790
791        domain.forcing_terms = []
792        R = Rainfall(domain,
793                     rate=main_rate,
794                     polygon=[[1,1], [2,1], [2,2], [1,2]],
795                     default_rate=5.0)
796
797        assert num.allclose(R.exchange_area, 2)
798       
799        domain.forcing_terms.append(R)
800
801        for t in domain.evolve(yieldstep=1, finaltime=25):
802            pass
803            #FIXME(Ole):  A test here is hard because explicit_update also
804            # receives updates from the flux calculation.
805
806
807    def test_rainfall_forcing_with_evolve_1(self):
808        """test_rainfall_forcing_with_evolve
809
810        Test how forcing terms are called within evolve.
811        This test checks that proper exception is thrown when no default_rate is set
812        """
813
814        import warnings
815        warnings.simplefilter('ignore', UserWarning)
816
817
818        a = [0.0, 0.0]
819        b = [0.0, 2.0]
820        c = [2.0, 0.0]
821        d = [0.0, 4.0]
822        e = [2.0, 2.0]
823        f = [4.0, 0.0]
824
825        points = [a, b, c, d, e, f]
826        #             bac,     bce,     ecf,     dbe
827        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
828
829        domain = Domain(points, vertices)
830
831        # Flat surface with 1m of water
832        domain.set_quantity('elevation', 0)
833        domain.set_quantity('stage', 1.0)
834        domain.set_quantity('friction', 0)
835
836        Br = Reflective_boundary(domain)
837        domain.set_boundary({'exterior': Br})
838
839        # Setup only one forcing term, time dependent rainfall
840        # that expires at t==20
841        from anuga.fit_interpolate.interpolate import Modeltime_too_late
842
843        def main_rate(t):
844            if t > 20:
845                msg = 'Model time exceeded.'
846                raise Modeltime_too_late, msg
847            else:
848                return 3*t + 7
849
850        domain.forcing_terms = []
851        R = Rainfall(domain,
852                     rate=main_rate,
853                     polygon=[[1,1], [2,1], [2,2], [1,2]])
854
855
856        assert num.allclose(R.exchange_area, 2)
857       
858        domain.forcing_terms.append(R)
859        #for t in domain.evolve(yieldstep=1, finaltime=25):
860        #    pass
861               
862        try:
863            for t in domain.evolve(yieldstep=1, finaltime=25):
864                pass
865        except Modeltime_too_late, e:
866            # Test that error message is as expected
867            assert 'can specify keyword argument default_rate in the forcing function' in str(e)
868        else:
869            raise Exception, 'Should have raised exception'
870
871           
872if __name__ == "__main__":
873    suite = unittest.makeSuite(Test_Forcing, 'test')
874    runner = unittest.TextTestRunner(verbosity=1)
875    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.