source: anuga_core/source/anuga/shallow_water/test_forcing_terms.py @ 7743

Last change on this file since 7743 was 7743, checked in by hudson, 13 years ago

Removed more modules from data_handler: code to do with building destruction.

File size: 40.1 KB
Line 
1#!/usr/bin/env python
2
3import unittest, os
4import os.path
5from math import pi, sqrt
6import tempfile
7
8from anuga.config import g, epsilon, \
9                    netcdf_mode_r, netcdf_mode_w, netcdf_mode_a
10from anuga.geometry.polygon import is_inside_polygon
11from anuga.coordinate_transforms.geo_reference import Geo_reference
12from anuga.abstract_2d_finite_volumes.quantity import Quantity
13from anuga.geospatial_data.geospatial_data import Geospatial_data
14from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
15
16from anuga.utilities.system_tools import get_pathname_from_package
17from anuga.utilities.numerical_tools import ensure_numeric, mean
18
19from shallow_water_domain import Domain
20from boundaries import Reflective_boundary
21from forcing import Wind_stress, Inflow, Rainfall
22from anuga.file_conversion.file_conversion import timefile2netcdf
23
24import numpy as num
25
26# Get gateway to C implementation of flux function for direct testing
27from shallow_water_ext import flux_function_central as flux_function
28
29# Variable windfield implemented using functions
30def speed(t, x, y):
31    """Large speeds halfway between center and edges
32
33    Low speeds at center and edges
34    """
35
36    from math import exp, cos, pi
37
38    x = num.array(x)
39    y = num.array(y)
40
41    N = len(x)
42    s = 0*#New array
43
44    for k in range(N):
45        r = num.sqrt(x[k]**2 + y[k]**2)
46        factor = exp(-(r-0.15)**2)
47        s[k] = 4000 * factor * (cos(t*2*pi/150) + 2)
48
49    return s
50
51def angle(t, x, y):
52    """Rotating field
53    """
54    from math import atan, pi
55
56    x = num.array(x)
57    y = num.array(y)
58
59    N = len(x)
60    a = 0 * x    # New array
61
62    for k in range(N):
63        r = num.sqrt(x[k]**2 + y[k]**2)
64
65        angle = atan(y[k]/x[k])
66
67        if x[k] < 0:
68            angle += pi
69
70        # Take normal direction
71        angle -= pi/2
72
73        # Ensure positive radians
74        if angle < 0:
75            angle += 2*pi
76
77        a[k] = angle/pi*180
78
79    return a
80
81
82def scalar_func(t, x, y):
83    """Function that returns a scalar.
84
85    Used to test error message when numeric array is expected
86    """
87
88    return 17.7
89
90def scalar_func_list(t, x, y):
91    """Function that returns a scalar.
92
93    Used to test error message when numeric array is expected
94    """
95
96    return [17.7]
97
98class Test_forcing_terms(unittest.TestCase):
99    def setUp(self):
100        pass
101
102    def tearDown(self):
103        pass
104
105
106    def test_gravity(self):
107        #Assuming no friction
108
109        from anuga.config import g
110
111        a = [0.0, 0.0]
112        b = [0.0, 2.0]
113        c = [2.0, 0.0]
114        d = [0.0, 4.0]
115        e = [2.0, 2.0]
116        f = [4.0, 0.0]
117
118        points = [a, b, c, d, e, f]
119        #             bac,     bce,     ecf,     dbe
120        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
121
122        domain = Domain(points, vertices)
123
124        #Set up for a gradient of (3,0) at mid triangle (bce)
125        def slope(x, y):
126            return 3*x
127
128        h = 0.1
129        def stage(x, y):
130            return slope(x, y) + h
131
132        domain.set_quantity('elevation', slope)
133        domain.set_quantity('stage', stage)
134
135        for name in domain.conserved_quantities:
136            assert num.allclose(domain.quantities[name].explicit_update, 0)
137            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
138
139        domain.compute_forcing_terms()
140
141        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
142        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
143                            -g*h*3)
144        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
145
146    # FIXME: James these tests are failing - are they outdated?
147    def NOtest_manning_friction(self):
148        from anuga.config import g
149
150        a = [0.0, 0.0]
151        b = [0.0, 2.0]
152        c = [2.0, 0.0]
153        d = [0.0, 4.0]
154        e = [2.0, 2.0]
155        f = [4.0, 0.0]
156
157        points = [a, b, c, d, e, f]
158        #             bac,     bce,     ecf,     dbe
159        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
160
161        domain = Domain(points, vertices)
162
163        #Set up for a gradient of (3,0) at mid triangle (bce)
164        def slope(x, y):
165            return 3*x
166
167        h = 0.1
168        def stage(x, y):
169            return slope(x, y) + h
170
171        eta = 0.07
172        domain.set_quantity('elevation', slope)
173        domain.set_quantity('stage', stage)
174        domain.set_quantity('friction', eta)
175
176        for name in domain.conserved_quantities:
177            assert num.allclose(domain.quantities[name].explicit_update, 0)
178            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
179
180        domain.compute_forcing_terms()
181
182        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
183        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
184                            -g*h*3)
185        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
186
187        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
188        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
189                            0)
190        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
191                            0)
192
193        #Create some momentum for friction to work with
194        domain.set_quantity('xmomentum', 1)
195        S = -g*eta**2 / h**(7.0/3)
196
197        domain.compute_forcing_terms()
198        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
199        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
200                            S)
201        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
202                            0)
203
204        #A more complex example
205        domain.quantities['stage'].semi_implicit_update[:] = 0.0
206        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
207        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
208
209        domain.set_quantity('xmomentum', 3)
210        domain.set_quantity('ymomentum', 4)
211
212        S = -g*eta**2*5 / h**(7.0/3)
213
214        domain.compute_forcing_terms()
215
216        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
217        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
218                            S)
219        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
220                            S)
221
222
223
224    def NOtest_manning_friction_old(self):
225        from anuga.config import g
226
227        a = [0.0, 0.0]
228        b = [0.0, 2.0]
229        c = [2.0, 0.0]
230        d = [0.0, 4.0]
231        e = [2.0, 2.0]
232        f = [4.0, 0.0]
233
234        points = [a, b, c, d, e, f]
235        #             bac,     bce,     ecf,     dbe
236        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
237
238        domain = Domain(points, vertices)
239
240        #Set up for a gradient of (3,0) at mid triangle (bce)
241        def slope(x, y):
242            return 3*x
243
244        h = 0.1
245        def stage(x, y):
246            return slope(x, y) + h
247
248        eta = 0.07
249        domain.set_quantity('elevation', slope)
250        domain.set_quantity('stage', stage)
251        domain.set_quantity('friction', eta)
252
253        for name in domain.conserved_quantities:
254            assert num.allclose(domain.quantities[name].explicit_update, 0)
255            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
256
257        domain.compute_forcing_terms()
258
259        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
260        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
261                            -g*h*3)
262        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
263
264        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
265        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
266                            0)
267        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
268                            0)
269
270        #Create some momentum for friction to work with
271        domain.set_quantity('xmomentum', 1)
272        S = -g*eta**2 / h**(7.0/3)
273
274        domain.compute_forcing_terms()
275        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
276        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
277                            S)
278        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
279                            0)
280
281        #A more complex example
282        domain.quantities['stage'].semi_implicit_update[:] = 0.0
283        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
284        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
285
286        domain.set_quantity('xmomentum', 3)
287        domain.set_quantity('ymomentum', 4)
288        # sqrt(3^2 +4^2) = 5
289
290        S = -g*eta**2 / h**(7.0/3)  * 5
291
292        domain.compute_forcing_terms()
293
294        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
295        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
296                            S)
297        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
298                            S)
299
300
301    def NOtest_manning_friction_new(self):
302        from anuga.config import g
303
304        a = [0.0, 0.0]
305        b = [0.0, 2.0]
306        c = [2.0, 0.0]
307        d = [0.0, 4.0]
308        e = [2.0, 2.0]
309        f = [4.0, 0.0]
310
311        points = [a, b, c, d, e, f]
312        #             bac,     bce,     ecf,     dbe
313        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
314
315        domain = Domain(points, vertices)
316
317        # Use the new function which takes into account the extra
318        # wetted area due to slope of bed
319        domain.set_new_mannings_function(True)
320       
321        #Set up for a gradient of (3,0) at mid triangle (bce)
322        def slope(x, y):
323            return 3*x
324
325        h = 0.1
326        def stage(x, y):
327            return slope(x, y) + h
328
329        eta = 0.07
330        domain.set_quantity('elevation', slope)
331        domain.set_quantity('stage', stage)
332        domain.set_quantity('friction', eta)
333
334        for name in domain.conserved_quantities:
335            assert num.allclose(domain.quantities[name].explicit_update, 0)
336            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
337
338        domain.compute_forcing_terms()
339
340        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
341        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
342                            -g*h*3)
343        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
344
345        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
346        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
347                            0)
348        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
349                            0)
350
351        #Create some momentum for friction to work with
352        domain.set_quantity('xmomentum', 1)
353        S = -g*eta**2 / h**(7.0/3) * sqrt(10)
354
355        domain.compute_forcing_terms()
356        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
357        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
358                            S)
359        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
360                            0)
361
362        #A more complex example
363        domain.quantities['stage'].semi_implicit_update[:] = 0.0
364        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
365        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
366
367        domain.set_quantity('xmomentum', 3)
368        domain.set_quantity('ymomentum', 4)
369
370        S = -g*eta**2*5 / h**(7.0/3) * sqrt(10.0)
371
372        domain.compute_forcing_terms()
373
374        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
375        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
376                            S)
377        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
378                            S)
379       
380    def test_constant_wind_stress(self):
381        from anuga.config import rho_a, rho_w, eta_w
382        from math import pi, cos, sin
383
384        a = [0.0, 0.0]
385        b = [0.0, 2.0]
386        c = [2.0, 0.0]
387        d = [0.0, 4.0]
388        e = [2.0, 2.0]
389        f = [4.0, 0.0]
390
391        points = [a, b, c, d, e, f]
392        #             bac,     bce,     ecf,     dbe
393        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
394
395        domain = Domain(points, vertices)
396
397        #Flat surface with 1m of water
398        domain.set_quantity('elevation', 0)
399        domain.set_quantity('stage', 1.0)
400        domain.set_quantity('friction', 0)
401
402        Br = Reflective_boundary(domain)
403        domain.set_boundary({'exterior': Br})
404
405        #Setup only one forcing term, constant wind stress
406        s = 100
407        phi = 135
408        domain.forcing_terms = []
409        domain.forcing_terms.append(Wind_stress(s, phi))
410
411        domain.compute_forcing_terms()
412
413        const = eta_w*rho_a / rho_w
414
415        #Convert to radians
416        phi = phi*pi / 180
417
418        #Compute velocity vector (u, v)
419        u = s*cos(phi)
420        v = s*sin(phi)
421
422        #Compute wind stress
423        S = const * num.sqrt(u**2 + v**2)
424
425        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
426        assert num.allclose(domain.quantities['xmomentum'].explicit_update, S*u)
427        assert num.allclose(domain.quantities['ymomentum'].explicit_update, S*v)
428
429    def test_variable_wind_stress(self):
430        from anuga.config import rho_a, rho_w, eta_w
431        from math import pi, cos, sin
432
433        a = [0.0, 0.0]
434        b = [0.0, 2.0]
435        c = [2.0, 0.0]
436        d = [0.0, 4.0]
437        e = [2.0, 2.0]
438        f = [4.0, 0.0]
439
440        points = [a, b, c, d, e, f]
441        #             bac,     bce,     ecf,     dbe
442        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
443
444        domain = Domain(points, vertices)
445
446        #Flat surface with 1m of water
447        domain.set_quantity('elevation', 0)
448        domain.set_quantity('stage', 1.0)
449        domain.set_quantity('friction', 0)
450
451        Br = Reflective_boundary(domain)
452        domain.set_boundary({'exterior': Br})
453
454        domain.time = 5.54    # Take a random time (not zero)
455
456        #Setup only one forcing term, constant wind stress
457        s = 100
458        phi = 135
459        domain.forcing_terms = []
460        domain.forcing_terms.append(Wind_stress(s=speed, phi=angle))
461
462        domain.compute_forcing_terms()
463
464        #Compute reference solution
465        const = eta_w*rho_a / rho_w
466
467        N = len(domain)    # number_of_triangles
468
469        xc = domain.get_centroid_coordinates()
470        t = domain.time
471
472        x = xc[:,0]
473        y = xc[:,1]
474        s_vec = speed(t,x,y)
475        phi_vec = angle(t,x,y)
476
477        for k in range(N):
478            # Convert to radians
479            phi = phi_vec[k]*pi / 180
480            s = s_vec[k]
481
482            # Compute velocity vector (u, v)
483            u = s*cos(phi)
484            v = s*sin(phi)
485
486            # Compute wind stress
487            S = const * num.sqrt(u**2 + v**2)
488
489            assert num.allclose(domain.quantities['stage'].explicit_update[k],
490                                0)
491            assert num.allclose(domain.quantities['xmomentum'].\
492                                     explicit_update[k],
493                                S*u)
494            assert num.allclose(domain.quantities['ymomentum'].\
495                                     explicit_update[k],
496                                S*v)
497
498    def test_windfield_from_file(self):
499        import time
500        from anuga.config import rho_a, rho_w, eta_w
501        from math import pi, cos, sin
502        from anuga.config import time_format
503        from anuga.abstract_2d_finite_volumes.util import file_function
504
505        a = [0.0, 0.0]
506        b = [0.0, 2.0]
507        c = [2.0, 0.0]
508        d = [0.0, 4.0]
509        e = [2.0, 2.0]
510        f = [4.0, 0.0]
511
512        points = [a, b, c, d, e, f]
513        #             bac,     bce,     ecf,     dbe
514        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
515
516        domain = Domain(points, vertices)
517
518        # Flat surface with 1m of water
519        domain.set_quantity('elevation', 0)
520        domain.set_quantity('stage', 1.0)
521        domain.set_quantity('friction', 0)
522
523        Br = Reflective_boundary(domain)
524        domain.set_boundary({'exterior': Br})
525
526        domain.time = 7    # Take a time that is represented in file (not zero)
527
528        # Write wind stress file (ensure that domain.time is covered)
529        # Take x=1 and y=0
530        filename = 'test_windstress_from_file'
531        start = time.mktime(time.strptime('2000', '%Y'))
532        fid = open(filename + '.txt', 'w')
533        dt = 1    # One second interval
534        t = 0.0
535        while t <= 10.0:
536            t_string = time.strftime(time_format, time.gmtime(t+start))
537
538            fid.write('%s, %f %f\n' %
539                      (t_string, speed(t,[1],[0])[0], angle(t,[1],[0])[0]))
540            t += dt
541
542        fid.close()
543
544        # Convert ASCII file to NetCDF (Which is what we really like!)
545        timefile2netcdf(filename)
546        os.remove(filename + '.txt')
547
548        # Setup wind stress
549        F = file_function(filename + '.tms',
550                          quantities=['Attribute0', 'Attribute1'])
551        os.remove(filename + '.tms')
552
553        W = Wind_stress(F)
554
555        domain.forcing_terms = []
556        domain.forcing_terms.append(W)
557
558        domain.compute_forcing_terms()
559
560        # Compute reference solution
561        const = eta_w*rho_a / rho_w
562
563        N = len(domain)    # number_of_triangles
564
565        t = domain.time
566
567        s = speed(t, [1], [0])[0]
568        phi = angle(t, [1], [0])[0]
569
570        # Convert to radians
571        phi = phi*pi / 180
572
573        # Compute velocity vector (u, v)
574        u = s*cos(phi)
575        v = s*sin(phi)
576
577        # Compute wind stress
578        S = const * num.sqrt(u**2 + v**2)
579
580        for k in range(N):
581            assert num.allclose(domain.quantities['stage'].explicit_update[k],
582                                0)
583            assert num.allclose(domain.quantities['xmomentum'].\
584                                    explicit_update[k],
585                                S*u)
586            assert num.allclose(domain.quantities['ymomentum'].\
587                                    explicit_update[k],
588                                S*v)
589
590    def test_windfield_from_file_seconds(self):
591        import time
592        from anuga.config import rho_a, rho_w, eta_w
593        from math import pi, cos, sin
594        from anuga.config import time_format
595        from anuga.abstract_2d_finite_volumes.util import file_function
596
597        a = [0.0, 0.0]
598        b = [0.0, 2.0]
599        c = [2.0, 0.0]
600        d = [0.0, 4.0]
601        e = [2.0, 2.0]
602        f = [4.0, 0.0]
603
604        points = [a, b, c, d, e, f]
605        #             bac,     bce,     ecf,     dbe
606        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
607
608        domain = Domain(points, vertices)
609
610        # Flat surface with 1m of water
611        domain.set_quantity('elevation', 0)
612        domain.set_quantity('stage', 1.0)
613        domain.set_quantity('friction', 0)
614
615        Br = Reflective_boundary(domain)
616        domain.set_boundary({'exterior': Br})
617
618        domain.time = 7    # Take a time that is represented in file (not zero)
619
620        # Write wind stress file (ensure that domain.time is covered)
621        # Take x=1 and y=0
622        filename = 'test_windstress_from_file'
623        start = time.mktime(time.strptime('2000', '%Y'))
624        fid = open(filename + '.txt', 'w')
625        dt = 0.5    # Half second interval
626        t = 0.0
627        while t <= 10.0:
628            fid.write('%s, %f %f\n'
629                      % (str(t), speed(t, [1], [0])[0], angle(t, [1], [0])[0]))
630            t += dt
631
632        fid.close()
633
634        # Convert ASCII file to NetCDF (Which is what we really like!)
635        timefile2netcdf(filename, time_as_seconds=True)
636        os.remove(filename + '.txt')
637
638        # Setup wind stress
639        F = file_function(filename + '.tms',
640                          quantities=['Attribute0', 'Attribute1'])
641        os.remove(filename + '.tms')
642
643        W = Wind_stress(F)
644
645        domain.forcing_terms = []
646        domain.forcing_terms.append(W)
647
648        domain.compute_forcing_terms()
649
650        # Compute reference solution
651        const = eta_w*rho_a / rho_w
652
653        N = len(domain)    # number_of_triangles
654
655        t = domain.time
656
657        s = speed(t, [1], [0])[0]
658        phi = angle(t, [1], [0])[0]
659
660        # Convert to radians
661        phi = phi*pi / 180
662
663        # Compute velocity vector (u, v)
664        u = s*cos(phi)
665        v = s*sin(phi)
666
667        # Compute wind stress
668        S = const * num.sqrt(u**2 + v**2)
669
670        for k in range(N):
671            assert num.allclose(domain.quantities['stage'].explicit_update[k],
672                                0)
673            assert num.allclose(domain.quantities['xmomentum'].\
674                                    explicit_update[k],
675                                S*u)
676            assert num.allclose(domain.quantities['ymomentum'].\
677                                    explicit_update[k],
678                                S*v)
679
680    def test_wind_stress_error_condition(self):
681        """Test that windstress reacts properly when forcing functions
682        are wrong - e.g. returns a scalar
683        """
684
685        from math import pi, cos, sin
686        from anuga.config import rho_a, rho_w, eta_w
687
688        a = [0.0, 0.0]
689        b = [0.0, 2.0]
690        c = [2.0, 0.0]
691        d = [0.0, 4.0]
692        e = [2.0, 2.0]
693        f = [4.0, 0.0]
694
695        points = [a, b, c, d, e, f]
696        #             bac,     bce,     ecf,     dbe
697        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
698
699        domain = Domain(points, vertices)
700
701        # Flat surface with 1m of water
702        domain.set_quantity('elevation', 0)
703        domain.set_quantity('stage', 1.0)
704        domain.set_quantity('friction', 0)
705
706        Br = Reflective_boundary(domain)
707        domain.set_boundary({'exterior': Br})
708
709        domain.time = 5.54    # Take a random time (not zero)
710
711        # Setup only one forcing term, bad func
712        domain.forcing_terms = []
713
714        try:
715            domain.forcing_terms.append(Wind_stress(s=scalar_func_list,
716                                                    phi=angle))
717        except AssertionError:
718            pass
719        else:
720            msg = 'Should have raised exception'
721            raise Exception, msg
722
723        try:
724            domain.forcing_terms.append(Wind_stress(s=speed, phi=scalar_func))
725        except Exception:
726            pass
727        else:
728            msg = 'Should have raised exception'
729            raise Exception, msg
730
731        try:
732            domain.forcing_terms.append(Wind_stress(s=speed, phi='xx'))
733        except:
734            pass
735        else:
736            msg = 'Should have raised exception'
737            raise Exception, msg
738
739
740    def test_rainfall(self):
741        from math import pi, cos, sin
742
743        a = [0.0, 0.0]
744        b = [0.0, 2.0]
745        c = [2.0, 0.0]
746        d = [0.0, 4.0]
747        e = [2.0, 2.0]
748        f = [4.0, 0.0]
749
750        points = [a, b, c, d, e, f]
751        #             bac,     bce,     ecf,     dbe
752        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
753
754        domain = Domain(points, vertices)
755
756        # Flat surface with 1m of water
757        domain.set_quantity('elevation', 0)
758        domain.set_quantity('stage', 1.0)
759        domain.set_quantity('friction', 0)
760
761        Br = Reflective_boundary(domain)
762        domain.set_boundary({'exterior': Br})
763
764        # Setup only one forcing term, constant rainfall
765        domain.forcing_terms = []
766        domain.forcing_terms.append(Rainfall(domain, rate=2.0))
767
768        domain.compute_forcing_terms()
769        assert num.allclose(domain.quantities['stage'].explicit_update,
770                            2.0/1000)
771
772    def test_rainfall_restricted_by_polygon(self):
773        from math import pi, cos, sin
774
775        a = [0.0, 0.0]
776        b = [0.0, 2.0]
777        c = [2.0, 0.0]
778        d = [0.0, 4.0]
779        e = [2.0, 2.0]
780        f = [4.0, 0.0]
781
782        points = [a, b, c, d, e, f]
783        #             bac,     bce,     ecf,     dbe
784        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
785
786        domain = Domain(points, vertices)
787
788        # Flat surface with 1m of water
789        domain.set_quantity('elevation', 0)
790        domain.set_quantity('stage', 1.0)
791        domain.set_quantity('friction', 0)
792
793        Br = Reflective_boundary(domain)
794        domain.set_boundary({'exterior': Br})
795
796        # Setup only one forcing term, constant rainfall
797        # restricted to a polygon enclosing triangle #1 (bce)
798        domain.forcing_terms = []
799        R = Rainfall(domain, rate=2.0, polygon=[[1,1], [2,1], [2,2], [1,2]])
800
801        assert num.allclose(R.exchange_area, 2)
802
803        domain.forcing_terms.append(R)
804
805        domain.compute_forcing_terms()
806
807        assert num.allclose(domain.quantities['stage'].explicit_update[1],
808                            2.0/1000)
809        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
810        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
811
812    def test_time_dependent_rainfall_restricted_by_polygon(self):
813        a = [0.0, 0.0]
814        b = [0.0, 2.0]
815        c = [2.0, 0.0]
816        d = [0.0, 4.0]
817        e = [2.0, 2.0]
818        f = [4.0, 0.0]
819
820        points = [a, b, c, d, e, f]
821        #             bac,     bce,     ecf,     dbe
822        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
823
824        domain = Domain(points, vertices)
825
826        # Flat surface with 1m of water
827        domain.set_quantity('elevation', 0)
828        domain.set_quantity('stage', 1.0)
829        domain.set_quantity('friction', 0)
830
831        Br = Reflective_boundary(domain)
832        domain.set_boundary({'exterior': Br})
833
834        # Setup only one forcing term, time dependent rainfall
835        # restricted to a polygon enclosing triangle #1 (bce)
836        domain.forcing_terms = []
837        R = Rainfall(domain,
838                     rate=lambda t: 3*t + 7,
839                     polygon = [[1,1], [2,1], [2,2], [1,2]])
840
841        assert num.allclose(R.exchange_area, 2)
842       
843        domain.forcing_terms.append(R)
844
845        domain.time = 10.
846
847        domain.compute_forcing_terms()
848
849        assert num.allclose(domain.quantities['stage'].explicit_update[1],
850                            (3*domain.time + 7)/1000)
851        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
852        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
853
854    def test_time_dependent_rainfall_using_starttime(self):
855        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
856
857        a = [0.0, 0.0]
858        b = [0.0, 2.0]
859        c = [2.0, 0.0]
860        d = [0.0, 4.0]
861        e = [2.0, 2.0]
862        f = [4.0, 0.0]
863
864        points = [a, b, c, d, e, f]
865        #             bac,     bce,     ecf,     dbe
866        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
867
868        domain = Domain(points, vertices)
869
870        # Flat surface with 1m of water
871        domain.set_quantity('elevation', 0)
872        domain.set_quantity('stage', 1.0)
873        domain.set_quantity('friction', 0)
874
875        Br = Reflective_boundary(domain)
876        domain.set_boundary({'exterior': Br})
877
878        # Setup only one forcing term, time dependent rainfall
879        # restricted to a polygon enclosing triangle #1 (bce)
880        domain.forcing_terms = []
881        R = Rainfall(domain,
882                     rate=lambda t: 3*t + 7,
883                     polygon=rainfall_poly)                     
884
885        assert num.allclose(R.exchange_area, 2)
886       
887        domain.forcing_terms.append(R)
888
889        # This will test that time used in the forcing function takes
890        # startime into account.
891        domain.starttime = 5.0
892
893        domain.time = 7.
894
895        domain.compute_forcing_terms()
896
897        assert num.allclose(domain.quantities['stage'].explicit_update[1],
898                            (3*domain.get_time() + 7)/1000)
899        assert num.allclose(domain.quantities['stage'].explicit_update[1],
900                            (3*(domain.time + domain.starttime) + 7)/1000)
901
902        # Using internal time her should fail
903        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
904                                (3*domain.time + 7)/1000)
905
906        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
907        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
908
909    def test_time_dependent_rainfall_using_georef(self):
910        """test_time_dependent_rainfall_using_georef
911
912        This will also test the General forcing term using georef
913        """
914
915        # Mesh in zone 56 (absolute coords)
916        x0 = 314036.58727982
917        y0 = 6224951.2960092
918
919        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
920        rainfall_poly += [x0, y0]
921
922        a = [0.0, 0.0]
923        b = [0.0, 2.0]
924        c = [2.0, 0.0]
925        d = [0.0, 4.0]
926        e = [2.0, 2.0]
927        f = [4.0, 0.0]
928
929        points = [a, b, c, d, e, f]
930        #             bac,     bce,     ecf,     dbe
931        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
932
933        domain = Domain(points, vertices,
934                        geo_reference=Geo_reference(56, x0, y0))
935
936        # Flat surface with 1m of water
937        domain.set_quantity('elevation', 0)
938        domain.set_quantity('stage', 1.0)
939        domain.set_quantity('friction', 0)
940
941        Br = Reflective_boundary(domain)
942        domain.set_boundary({'exterior': Br})
943
944        # Setup only one forcing term, time dependent rainfall
945        # restricted to a polygon enclosing triangle #1 (bce)
946        domain.forcing_terms = []
947        R = Rainfall(domain,
948                     rate=lambda t: 3*t + 7,
949                     polygon=rainfall_poly)
950
951        assert num.allclose(R.exchange_area, 2)
952       
953        domain.forcing_terms.append(R)
954
955        # This will test that time used in the forcing function takes
956        # startime into account.
957        domain.starttime = 5.0
958
959        domain.time = 7.
960
961        domain.compute_forcing_terms()
962
963        assert num.allclose(domain.quantities['stage'].explicit_update[1],
964                            (3*domain.get_time() + 7)/1000)
965        assert num.allclose(domain.quantities['stage'].explicit_update[1],
966                            (3*(domain.time + domain.starttime) + 7)/1000)
967
968        # Using internal time her should fail
969        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
970                                (3*domain.time + 7)/1000)
971
972        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
973        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
974
975    def test_time_dependent_rainfall_restricted_by_polygon_with_default(self):
976        """
977        Test that default rainfall can be used when given rate runs out of data.
978        """
979
980        a = [0.0, 0.0]
981        b = [0.0, 2.0]
982        c = [2.0, 0.0]
983        d = [0.0, 4.0]
984        e = [2.0, 2.0]
985        f = [4.0, 0.0]
986
987        points = [a, b, c, d, e, f]
988        #             bac,     bce,     ecf,     dbe
989        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
990
991        domain = Domain(points, vertices)
992
993        # Flat surface with 1m of water
994        domain.set_quantity('elevation', 0)
995        domain.set_quantity('stage', 1.0)
996        domain.set_quantity('friction', 0)
997
998        Br = Reflective_boundary(domain)
999        domain.set_boundary({'exterior': Br})
1000
1001        # Setup only one forcing term, time dependent rainfall
1002        # that expires at t==20
1003        from anuga.fit_interpolate.interpolate import Modeltime_too_late
1004
1005        def main_rate(t):
1006            if t > 20:
1007                msg = 'Model time exceeded.'
1008                raise Modeltime_too_late, msg
1009            else:
1010                return 3*t + 7
1011
1012        domain.forcing_terms = []
1013        R = Rainfall(domain,
1014                     rate=main_rate,
1015                     polygon = [[1,1], [2,1], [2,2], [1,2]],
1016                     default_rate=5.0)
1017
1018        assert num.allclose(R.exchange_area, 2)
1019       
1020        domain.forcing_terms.append(R)
1021
1022        domain.time = 10.
1023
1024        domain.compute_forcing_terms()
1025
1026        assert num.allclose(domain.quantities['stage'].explicit_update[1],
1027                            (3*domain.time+7)/1000)
1028        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
1029        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
1030
1031        domain.time = 100.
1032        domain.quantities['stage'].explicit_update[:] = 0.0     # Reset
1033        domain.compute_forcing_terms()
1034
1035        assert num.allclose(domain.quantities['stage'].explicit_update[1],
1036                            5.0/1000) # Default value
1037        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
1038        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
1039
1040    def test_rainfall_forcing_with_evolve(self):
1041        """test_rainfall_forcing_with_evolve
1042
1043        Test how forcing terms are called within evolve
1044        """
1045
1046        # FIXME(Ole): This test is just to experiment
1047
1048        a = [0.0, 0.0]
1049        b = [0.0, 2.0]
1050        c = [2.0, 0.0]
1051        d = [0.0, 4.0]
1052        e = [2.0, 2.0]
1053        f = [4.0, 0.0]
1054
1055        points = [a, b, c, d, e, f]
1056        #             bac,     bce,     ecf,     dbe
1057        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1058
1059        domain = Domain(points, vertices)
1060
1061        # Flat surface with 1m of water
1062        domain.set_quantity('elevation', 0)
1063        domain.set_quantity('stage', 1.0)
1064        domain.set_quantity('friction', 0)
1065
1066        Br = Reflective_boundary(domain)
1067        domain.set_boundary({'exterior': Br})
1068
1069        # Setup only one forcing term, time dependent rainfall
1070        # that expires at t==20
1071        from anuga.fit_interpolate.interpolate import Modeltime_too_late
1072
1073        def main_rate(t):
1074            if t > 20:
1075                msg = 'Model time exceeded.'
1076                raise Modeltime_too_late, msg
1077            else:
1078                return 3*t + 7
1079
1080        domain.forcing_terms = []
1081        R = Rainfall(domain,
1082                     rate=main_rate,
1083                     polygon=[[1,1], [2,1], [2,2], [1,2]],
1084                     default_rate=5.0)
1085
1086        assert num.allclose(R.exchange_area, 2)
1087       
1088        domain.forcing_terms.append(R)
1089
1090        for t in domain.evolve(yieldstep=1, finaltime=25):
1091            pass
1092            #FIXME(Ole):  A test here is hard because explicit_update also
1093            # receives updates from the flux calculation.
1094
1095
1096    def test_rainfall_forcing_with_evolve_1(self):
1097        """test_rainfall_forcing_with_evolve
1098
1099        Test how forcing terms are called within evolve.
1100        This test checks that proper exception is thrown when no default_rate is set
1101        """
1102
1103
1104        a = [0.0, 0.0]
1105        b = [0.0, 2.0]
1106        c = [2.0, 0.0]
1107        d = [0.0, 4.0]
1108        e = [2.0, 2.0]
1109        f = [4.0, 0.0]
1110
1111        points = [a, b, c, d, e, f]
1112        #             bac,     bce,     ecf,     dbe
1113        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1114
1115        domain = Domain(points, vertices)
1116
1117        # Flat surface with 1m of water
1118        domain.set_quantity('elevation', 0)
1119        domain.set_quantity('stage', 1.0)
1120        domain.set_quantity('friction', 0)
1121
1122        Br = Reflective_boundary(domain)
1123        domain.set_boundary({'exterior': Br})
1124
1125        # Setup only one forcing term, time dependent rainfall
1126        # that expires at t==20
1127        from anuga.fit_interpolate.interpolate import Modeltime_too_late
1128
1129        def main_rate(t):
1130            if t > 20:
1131                msg = 'Model time exceeded.'
1132                raise Modeltime_too_late, msg
1133            else:
1134                return 3*t + 7
1135
1136        domain.forcing_terms = []
1137        R = Rainfall(domain,
1138                     rate=main_rate,
1139                     polygon=[[1,1], [2,1], [2,2], [1,2]])
1140
1141
1142        assert num.allclose(R.exchange_area, 2)
1143       
1144        domain.forcing_terms.append(R)
1145        #for t in domain.evolve(yieldstep=1, finaltime=25):
1146        #    pass
1147               
1148        try:
1149            for t in domain.evolve(yieldstep=1, finaltime=25):
1150                pass
1151        except Modeltime_too_late, e:
1152            # Test that error message is as expected
1153            assert 'can specify keyword argument default_rate in the forcing function' in str(e)
1154        else:
1155            raise Exception, 'Should have raised exception'
1156
1157           
1158           
1159    def test_inflow_using_circle(self):
1160        from math import pi, cos, sin
1161
1162        a = [0.0, 0.0]
1163        b = [0.0, 2.0]
1164        c = [2.0, 0.0]
1165        d = [0.0, 4.0]
1166        e = [2.0, 2.0]
1167        f = [4.0, 0.0]
1168
1169        points = [a, b, c, d, e, f]
1170        #             bac,     bce,     ecf,     dbe
1171        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1172
1173        domain = Domain(points, vertices)
1174
1175        # Flat surface with 1m of water
1176        domain.set_quantity('elevation', 0)
1177        domain.set_quantity('stage', 1.0)
1178        domain.set_quantity('friction', 0)
1179
1180        Br = Reflective_boundary(domain)
1181        domain.set_boundary({'exterior': Br})
1182
1183        # Setup only one forcing term, constant inflow of 2 m^3/s
1184        # on a circle affecting triangles #0 and #1 (bac and bce)
1185        domain.forcing_terms = []
1186       
1187        I = Inflow(domain, rate=2.0, center=(1,1), radius=1)
1188        domain.forcing_terms.append(I)       
1189        domain.compute_forcing_terms()
1190
1191       
1192        A = I.exchange_area
1193        assert num.allclose(A, 4) # Two triangles       
1194       
1195        assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A)
1196        assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A)
1197        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)       
1198
1199
1200    def test_inflow_using_circle_function(self):
1201        from math import pi, cos, sin
1202
1203        a = [0.0, 0.0]
1204        b = [0.0, 2.0]
1205        c = [2.0, 0.0]
1206        d = [0.0, 4.0]
1207        e = [2.0, 2.0]
1208        f = [4.0, 0.0]
1209
1210        points = [a, b, c, d, e, f]
1211        #             bac,     bce,     ecf,     dbe
1212        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1213
1214        domain = Domain(points, vertices)
1215
1216        # Flat surface with 1m of water
1217        domain.set_quantity('elevation', 0)
1218        domain.set_quantity('stage', 1.0)
1219        domain.set_quantity('friction', 0)
1220
1221        Br = Reflective_boundary(domain)
1222        domain.set_boundary({'exterior': Br})
1223
1224        # Setup only one forcing term, time dependent inflow of 2 m^3/s
1225        # on a circle affecting triangles #0 and #1 (bac and bce)
1226        domain.forcing_terms = []
1227        I = Inflow(domain, rate=lambda t: 2., center=(1,1), radius=1)
1228        domain.forcing_terms.append(I)
1229       
1230        domain.compute_forcing_terms()       
1231
1232        A = I.exchange_area
1233        assert num.allclose(A, 4) # Two triangles
1234       
1235        assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A)
1236        assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A)
1237        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)       
1238       
1239
1240
1241
1242    def test_inflow_catch_too_few_triangles(self):
1243        """
1244        Test that exception is thrown if no triangles are covered
1245        by the inflow area
1246        """
1247
1248        from math import pi, cos, sin
1249
1250        a = [0.0, 0.0]
1251        b = [0.0, 2.0]
1252        c = [2.0, 0.0]
1253        d = [0.0, 4.0]
1254        e = [2.0, 2.0]
1255        f = [4.0, 0.0]
1256
1257        points = [a, b, c, d, e, f]
1258        #             bac,     bce,     ecf,     dbe
1259        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1260
1261        domain = Domain(points, vertices)
1262
1263        # Flat surface with 1m of water
1264        domain.set_quantity('elevation', 0)
1265        domain.set_quantity('stage', 1.0)
1266        domain.set_quantity('friction', 0)
1267
1268        Br = Reflective_boundary(domain)
1269        domain.set_boundary({'exterior': Br})
1270
1271        # Setup only one forcing term, constant inflow of 2 m^3/s
1272        # on a circle affecting triangles #0 and #1 (bac and bce)
1273        try:
1274            Inflow(domain, rate=2.0, center=(1,1.1), radius=0.01)
1275        except:
1276            pass
1277        else:
1278            msg = 'Should have raised exception'
1279            raise Exception, msg
1280
1281       
1282
1283#################################################################################
1284
1285if __name__ == "__main__":
1286    #suite = unittest.makeSuite(Test_forcing_terms, 'test_volume_conservation_rain')
1287    #FIXME: James - these tests seem to be invalid. Please investigate
1288    suite = unittest.makeSuite(Test_forcing_terms, 'test')
1289    runner = unittest.TextTestRunner(verbosity=1)
1290    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.