source: anuga_core/source/anuga/shallow_water_balanced/test_swb_forcing_terms.py @ 7559

Last change on this file since 7559 was 7559, checked in by steve, 10 years ago

Broke up test_swb_domain so that we can concentrate on new functionality

File size: 60.9 KB
Line 
1#!/usr/bin/env python
2
3import unittest, os
4import os.path
5from math import pi, sqrt
6import tempfile
7
8from anuga.config import g, epsilon
9from anuga.config import netcdf_mode_r, netcdf_mode_w, netcdf_mode_a
10from anuga.utilities.numerical_tools import mean
11from anuga.utilities.polygon import is_inside_polygon
12from anuga.coordinate_transforms.geo_reference import Geo_reference
13from anuga.abstract_2d_finite_volumes.quantity import Quantity
14from anuga.geospatial_data.geospatial_data import Geospatial_data
15from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
16
17from anuga.utilities.system_tools import get_pathname_from_package
18from swb_domain import *
19
20import numpy as num
21
22# Get gateway to C implementation of flux function for direct testing
23from shallow_water_ext import flux_function_central as flux_function
24
25
26
27
28class Test_swb_clean(unittest.TestCase):
29    def setUp(self):
30        pass
31
32    def tearDown(self):
33        pass
34
35    def test_gravity(self):
36        #Assuming no friction
37
38        from anuga.config import g
39
40        a = [0.0, 0.0]
41        b = [0.0, 2.0]
42        c = [2.0, 0.0]
43        d = [0.0, 4.0]
44        e = [2.0, 2.0]
45        f = [4.0, 0.0]
46
47        points = [a, b, c, d, e, f]
48        #             bac,     bce,     ecf,     dbe
49        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
50
51        domain = Domain(points, vertices)
52
53        #Set up for a gradient of (3,0) at mid triangle (bce)
54        def slope(x, y):
55            return 3*x
56
57        h = 0.1
58        def stage(x, y):
59            return slope(x, y) + h
60
61        domain.set_quantity('elevation', slope)
62        domain.set_quantity('stage', stage)
63
64        for name in domain.conserved_quantities:
65            assert num.allclose(domain.quantities[name].explicit_update, 0)
66            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
67
68        domain.compute_forcing_terms()
69
70        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
71        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
72                            -g*h*3)
73        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
74
75    def test_manning_friction(self):
76        from anuga.config import g
77
78        a = [0.0, 0.0]
79        b = [0.0, 2.0]
80        c = [2.0, 0.0]
81        d = [0.0, 4.0]
82        e = [2.0, 2.0]
83        f = [4.0, 0.0]
84
85        points = [a, b, c, d, e, f]
86        #             bac,     bce,     ecf,     dbe
87        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
88
89        domain = Domain(points, vertices)
90
91        #Set up for a gradient of (3,0) at mid triangle (bce)
92        def slope(x, y):
93            return 3*x
94
95        h = 0.1
96        def stage(x, y):
97            return slope(x, y) + h
98
99        eta = 0.07
100        domain.set_quantity('elevation', slope)
101        domain.set_quantity('stage', stage)
102        domain.set_quantity('friction', eta)
103
104        for name in domain.conserved_quantities:
105            assert num.allclose(domain.quantities[name].explicit_update, 0)
106            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
107
108        domain.compute_forcing_terms()
109
110        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
111        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
112                            -g*h*3)
113        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
114
115        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
116        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
117                            0)
118        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
119                            0)
120
121        #Create some momentum for friction to work with
122        domain.set_quantity('xmomentum', 1)
123        dz = sqrt(10.0)
124        S = -g*eta**2 *dz / h**(7.0/3) 
125
126        domain.compute_forcing_terms()
127        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
128        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
129                            S)
130        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
131                            0)
132
133        #A more complex example
134        domain.quantities['stage'].semi_implicit_update[:] = 0.0
135        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
136        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
137
138        domain.set_quantity('xmomentum', 3)
139        domain.set_quantity('ymomentum', 4)
140
141        S = -g*eta**2*5*dz / h**(7.0/3) 
142
143        domain.compute_forcing_terms()
144
145        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
146        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
147                            3*S)
148        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
149                            4*S)
150
151
152
153    def test_manning_friction_old(self):
154        from anuga.config import g
155
156        a = [0.0, 0.0]
157        b = [0.0, 2.0]
158        c = [2.0, 0.0]
159        d = [0.0, 4.0]
160        e = [2.0, 2.0]
161        f = [4.0, 0.0]
162
163        points = [a, b, c, d, e, f]
164        #             bac,     bce,     ecf,     dbe
165        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
166
167        domain = Domain(points, vertices)
168
169        #Turn old mannings function on
170        domain.set_new_mannings_function(False)
171
172        #Set up for a gradient of (3,0) at mid triangle (bce)
173        def slope(x, y):
174            return 3*x
175
176        h = 0.1
177        def stage(x, y):
178            return slope(x, y) + h
179
180        eta = 0.07
181        domain.set_quantity('elevation', slope)
182        domain.set_quantity('stage', stage)
183        domain.set_quantity('friction', eta)
184
185        for name in domain.conserved_quantities:
186            assert num.allclose(domain.quantities[name].explicit_update, 0)
187            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
188
189        domain.compute_forcing_terms()
190
191        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
192        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
193                            -g*h*3)
194        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
195
196        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
197        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
198                            0)
199        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
200                            0)
201
202        #Create some momentum for friction to work with
203        domain.set_quantity('xmomentum', 1)
204        S = -g*eta**2 / h**(7.0/3)
205
206        domain.compute_forcing_terms()
207        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
208        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
209                            S)
210        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
211                            0)
212
213        #A more complex example
214        domain.quantities['stage'].semi_implicit_update[:] = 0.0
215        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
216        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
217
218        domain.set_quantity('xmomentum', 3)
219        domain.set_quantity('ymomentum', 4)
220
221        S = -g*eta**2*5 / h**(7.0/3)
222
223        domain.compute_forcing_terms()
224
225        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
226        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
227                            3*S)
228        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
229                            4*S)
230
231
232    def test_manning_friction_new(self):
233        from anuga.config import g
234
235        a = [0.0, 0.0]
236        b = [0.0, 2.0]
237        c = [2.0, 0.0]
238        d = [0.0, 4.0]
239        e = [2.0, 2.0]
240        f = [4.0, 0.0]
241
242        points = [a, b, c, d, e, f]
243        #             bac,     bce,     ecf,     dbe
244        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
245
246        domain = Domain(points, vertices)
247
248        # Use the new function which takes into account the extra
249        # wetted area due to slope of bed
250        domain.set_new_mannings_function(True)
251       
252        #Set up for a gradient of (3,0) at mid triangle (bce)
253        def slope(x, y):
254            return 3*x
255
256        h = 0.1
257        def stage(x, y):
258            return slope(x, y) + h
259
260        eta = 0.07
261        domain.set_quantity('elevation', slope)
262        domain.set_quantity('stage', stage)
263        domain.set_quantity('friction', eta)
264
265        for name in domain.conserved_quantities:
266            assert num.allclose(domain.quantities[name].explicit_update, 0)
267            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
268
269        domain.compute_forcing_terms()
270
271        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
272        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
273                            -g*h*3)
274        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
275
276        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
277        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
278                            0)
279        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
280                            0)
281
282        #Create some momentum for friction to work with
283        domain.set_quantity('xmomentum', 1)
284        S = -g*eta**2 / h**(7.0/3) * sqrt(10)
285
286        domain.compute_forcing_terms()
287        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
288        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
289                            S)
290        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
291                            0)
292
293        #A more complex example
294        domain.quantities['stage'].semi_implicit_update[:] = 0.0
295        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
296        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
297
298        domain.set_quantity('xmomentum', 3)
299        domain.set_quantity('ymomentum', 4)
300
301        S = -g*eta**2*5 / h**(7.0/3) * sqrt(10.0)
302
303        domain.compute_forcing_terms()
304
305        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
306        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
307                            3*S)
308        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
309                            4*S)
310       
311    def test_constant_wind_stress(self):
312        from anuga.config import rho_a, rho_w, eta_w
313        from math import pi, cos, sin
314
315        a = [0.0, 0.0]
316        b = [0.0, 2.0]
317        c = [2.0, 0.0]
318        d = [0.0, 4.0]
319        e = [2.0, 2.0]
320        f = [4.0, 0.0]
321
322        points = [a, b, c, d, e, f]
323        #             bac,     bce,     ecf,     dbe
324        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
325
326        domain = Domain(points, vertices)
327
328        #Flat surface with 1m of water
329        domain.set_quantity('elevation', 0)
330        domain.set_quantity('stage', 1.0)
331        domain.set_quantity('friction', 0)
332
333        Br = Reflective_boundary(domain)
334        domain.set_boundary({'exterior': Br})
335
336        #Setup only one forcing term, constant wind stress
337        s = 100
338        phi = 135
339        domain.forcing_terms = []
340        domain.forcing_terms.append(Wind_stress(s, phi))
341
342        domain.compute_forcing_terms()
343
344        const = eta_w*rho_a / rho_w
345
346        #Convert to radians
347        phi = phi*pi / 180
348
349        #Compute velocity vector (u, v)
350        u = s*cos(phi)
351        v = s*sin(phi)
352
353        #Compute wind stress
354        S = const * num.sqrt(u**2 + v**2)
355
356        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
357        assert num.allclose(domain.quantities['xmomentum'].explicit_update, S*u)
358        assert num.allclose(domain.quantities['ymomentum'].explicit_update, S*v)
359
360    def test_variable_wind_stress(self):
361        from anuga.config import rho_a, rho_w, eta_w
362        from math import pi, cos, sin
363
364        a = [0.0, 0.0]
365        b = [0.0, 2.0]
366        c = [2.0, 0.0]
367        d = [0.0, 4.0]
368        e = [2.0, 2.0]
369        f = [4.0, 0.0]
370
371        points = [a, b, c, d, e, f]
372        #             bac,     bce,     ecf,     dbe
373        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
374
375        domain = Domain(points, vertices)
376
377        #Flat surface with 1m of water
378        domain.set_quantity('elevation', 0)
379        domain.set_quantity('stage', 1.0)
380        domain.set_quantity('friction', 0)
381
382        Br = Reflective_boundary(domain)
383        domain.set_boundary({'exterior': Br})
384
385        domain.time = 5.54    # Take a random time (not zero)
386
387        #Setup only one forcing term, constant wind stress
388        s = 100
389        phi = 135
390        domain.forcing_terms = []
391        domain.forcing_terms.append(Wind_stress(s=speed, phi=angle))
392
393        domain.compute_forcing_terms()
394
395        #Compute reference solution
396        const = eta_w*rho_a / rho_w
397
398        N = len(domain)    # number_of_triangles
399
400        xc = domain.get_centroid_coordinates()
401        t = domain.time
402
403        x = xc[:,0]
404        y = xc[:,1]
405        s_vec = speed(t,x,y)
406        phi_vec = angle(t,x,y)
407
408        for k in range(N):
409            # Convert to radians
410            phi = phi_vec[k]*pi / 180
411            s = s_vec[k]
412
413            # Compute velocity vector (u, v)
414            u = s*cos(phi)
415            v = s*sin(phi)
416
417            # Compute wind stress
418            S = const * num.sqrt(u**2 + v**2)
419
420            assert num.allclose(domain.quantities['stage'].explicit_update[k],
421                                0)
422            assert num.allclose(domain.quantities['xmomentum'].\
423                                     explicit_update[k],
424                                S*u)
425            assert num.allclose(domain.quantities['ymomentum'].\
426                                     explicit_update[k],
427                                S*v)
428
429    def test_windfield_from_file(self):
430        import time
431        from anuga.config import rho_a, rho_w, eta_w
432        from math import pi, cos, sin
433        from anuga.config import time_format
434        from anuga.abstract_2d_finite_volumes.util import file_function
435
436        a = [0.0, 0.0]
437        b = [0.0, 2.0]
438        c = [2.0, 0.0]
439        d = [0.0, 4.0]
440        e = [2.0, 2.0]
441        f = [4.0, 0.0]
442
443        points = [a, b, c, d, e, f]
444        #             bac,     bce,     ecf,     dbe
445        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
446
447        domain = Domain(points, vertices)
448
449        # Flat surface with 1m of water
450        domain.set_quantity('elevation', 0)
451        domain.set_quantity('stage', 1.0)
452        domain.set_quantity('friction', 0)
453
454        Br = Reflective_boundary(domain)
455        domain.set_boundary({'exterior': Br})
456
457        domain.time = 7    # Take a time that is represented in file (not zero)
458
459        # Write wind stress file (ensure that domain.time is covered)
460        # Take x=1 and y=0
461        filename = 'test_windstress_from_file'
462        start = time.mktime(time.strptime('2000', '%Y'))
463        fid = open(filename + '.txt', 'w')
464        dt = 1    # One second interval
465        t = 0.0
466        while t <= 10.0:
467            t_string = time.strftime(time_format, time.gmtime(t+start))
468
469            fid.write('%s, %f %f\n' %
470                      (t_string, speed(t,[1],[0])[0], angle(t,[1],[0])[0]))
471            t += dt
472
473        fid.close()
474
475        # Convert ASCII file to NetCDF (Which is what we really like!)
476        from data_manager import timefile2netcdf
477
478        timefile2netcdf(filename)
479        os.remove(filename + '.txt')
480
481        # Setup wind stress
482        F = file_function(filename + '.tms',
483                          quantities=['Attribute0', 'Attribute1'])
484        os.remove(filename + '.tms')
485
486        W = Wind_stress(F)
487
488        domain.forcing_terms = []
489        domain.forcing_terms.append(W)
490
491        domain.compute_forcing_terms()
492
493        # Compute reference solution
494        const = eta_w*rho_a / rho_w
495
496        N = len(domain)    # number_of_triangles
497
498        t = domain.time
499
500        s = speed(t, [1], [0])[0]
501        phi = angle(t, [1], [0])[0]
502
503        # Convert to radians
504        phi = phi*pi / 180
505
506        # Compute velocity vector (u, v)
507        u = s*cos(phi)
508        v = s*sin(phi)
509
510        # Compute wind stress
511        S = const * num.sqrt(u**2 + v**2)
512
513        for k in range(N):
514            assert num.allclose(domain.quantities['stage'].explicit_update[k],
515                                0)
516            assert num.allclose(domain.quantities['xmomentum'].\
517                                    explicit_update[k],
518                                S*u)
519            assert num.allclose(domain.quantities['ymomentum'].\
520                                    explicit_update[k],
521                                S*v)
522
523    def test_windfield_from_file_seconds(self):
524        import time
525        from anuga.config import rho_a, rho_w, eta_w
526        from math import pi, cos, sin
527        from anuga.config import time_format
528        from anuga.abstract_2d_finite_volumes.util import file_function
529
530        a = [0.0, 0.0]
531        b = [0.0, 2.0]
532        c = [2.0, 0.0]
533        d = [0.0, 4.0]
534        e = [2.0, 2.0]
535        f = [4.0, 0.0]
536
537        points = [a, b, c, d, e, f]
538        #             bac,     bce,     ecf,     dbe
539        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
540
541        domain = Domain(points, vertices)
542
543        # Flat surface with 1m of water
544        domain.set_quantity('elevation', 0)
545        domain.set_quantity('stage', 1.0)
546        domain.set_quantity('friction', 0)
547
548        Br = Reflective_boundary(domain)
549        domain.set_boundary({'exterior': Br})
550
551        domain.time = 7    # Take a time that is represented in file (not zero)
552
553        # Write wind stress file (ensure that domain.time is covered)
554        # Take x=1 and y=0
555        filename = 'test_windstress_from_file'
556        start = time.mktime(time.strptime('2000', '%Y'))
557        fid = open(filename + '.txt', 'w')
558        dt = 0.5    # Half second interval
559        t = 0.0
560        while t <= 10.0:
561            fid.write('%s, %f %f\n'
562                      % (str(t), speed(t, [1], [0])[0], angle(t, [1], [0])[0]))
563            t += dt
564
565        fid.close()
566
567        # Convert ASCII file to NetCDF (Which is what we really like!)
568        from data_manager import timefile2netcdf
569
570        timefile2netcdf(filename, time_as_seconds=True)
571        os.remove(filename + '.txt')
572
573        # Setup wind stress
574        F = file_function(filename + '.tms',
575                          quantities=['Attribute0', 'Attribute1'])
576        os.remove(filename + '.tms')
577
578        W = Wind_stress(F)
579
580        domain.forcing_terms = []
581        domain.forcing_terms.append(W)
582
583        domain.compute_forcing_terms()
584
585        # Compute reference solution
586        const = eta_w*rho_a / rho_w
587
588        N = len(domain)    # number_of_triangles
589
590        t = domain.time
591
592        s = speed(t, [1], [0])[0]
593        phi = angle(t, [1], [0])[0]
594
595        # Convert to radians
596        phi = phi*pi / 180
597
598        # Compute velocity vector (u, v)
599        u = s*cos(phi)
600        v = s*sin(phi)
601
602        # Compute wind stress
603        S = const * num.sqrt(u**2 + v**2)
604
605        for k in range(N):
606            assert num.allclose(domain.quantities['stage'].explicit_update[k],
607                                0)
608            assert num.allclose(domain.quantities['xmomentum'].\
609                                    explicit_update[k],
610                                S*u)
611            assert num.allclose(domain.quantities['ymomentum'].\
612                                    explicit_update[k],
613                                S*v)
614
615    def test_wind_stress_error_condition(self):
616        """Test that windstress reacts properly when forcing functions
617        are wrong - e.g. returns a scalar
618        """
619
620        from math import pi, cos, sin
621        from anuga.config import rho_a, rho_w, eta_w
622
623        a = [0.0, 0.0]
624        b = [0.0, 2.0]
625        c = [2.0, 0.0]
626        d = [0.0, 4.0]
627        e = [2.0, 2.0]
628        f = [4.0, 0.0]
629
630        points = [a, b, c, d, e, f]
631        #             bac,     bce,     ecf,     dbe
632        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
633
634        domain = Domain(points, vertices)
635
636        # Flat surface with 1m of water
637        domain.set_quantity('elevation', 0)
638        domain.set_quantity('stage', 1.0)
639        domain.set_quantity('friction', 0)
640
641        Br = Reflective_boundary(domain)
642        domain.set_boundary({'exterior': Br})
643
644        domain.time = 5.54    # Take a random time (not zero)
645
646        # Setup only one forcing term, bad func
647        domain.forcing_terms = []
648
649        try:
650            domain.forcing_terms.append(Wind_stress(s=scalar_func_list,
651                                                    phi=angle))
652        except AssertionError:
653            pass
654        else:
655            msg = 'Should have raised exception'
656            raise Exception, msg
657
658        try:
659            domain.forcing_terms.append(Wind_stress(s=speed, phi=scalar_func))
660        except Exception:
661            pass
662        else:
663            msg = 'Should have raised exception'
664            raise Exception, msg
665
666        try:
667            domain.forcing_terms.append(Wind_stress(s=speed, phi='xx'))
668        except:
669            pass
670        else:
671            msg = 'Should have raised exception'
672            raise Exception, msg
673
674    def test_rainfall(self):
675        from math import pi, cos, sin
676
677        a = [0.0, 0.0]
678        b = [0.0, 2.0]
679        c = [2.0, 0.0]
680        d = [0.0, 4.0]
681        e = [2.0, 2.0]
682        f = [4.0, 0.0]
683
684        points = [a, b, c, d, e, f]
685        #             bac,     bce,     ecf,     dbe
686        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
687
688        domain = Domain(points, vertices)
689
690        # Flat surface with 1m of water
691        domain.set_quantity('elevation', 0)
692        domain.set_quantity('stage', 1.0)
693        domain.set_quantity('friction', 0)
694
695        Br = Reflective_boundary(domain)
696        domain.set_boundary({'exterior': Br})
697
698        # Setup only one forcing term, constant rainfall
699        domain.forcing_terms = []
700        domain.forcing_terms.append(Rainfall(domain, rate=2.0))
701
702        domain.compute_forcing_terms()
703        assert num.allclose(domain.quantities['stage'].explicit_update,
704                            2.0/1000)
705
706    def test_rainfall_restricted_by_polygon(self):
707        from math import pi, cos, sin
708
709        a = [0.0, 0.0]
710        b = [0.0, 2.0]
711        c = [2.0, 0.0]
712        d = [0.0, 4.0]
713        e = [2.0, 2.0]
714        f = [4.0, 0.0]
715
716        points = [a, b, c, d, e, f]
717        #             bac,     bce,     ecf,     dbe
718        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
719
720        domain = Domain(points, vertices)
721
722        # Flat surface with 1m of water
723        domain.set_quantity('elevation', 0)
724        domain.set_quantity('stage', 1.0)
725        domain.set_quantity('friction', 0)
726
727        Br = Reflective_boundary(domain)
728        domain.set_boundary({'exterior': Br})
729
730        # Setup only one forcing term, constant rainfall
731        # restricted to a polygon enclosing triangle #1 (bce)
732        domain.forcing_terms = []
733        R = Rainfall(domain, rate=2.0, polygon=[[1,1], [2,1], [2,2], [1,2]])
734
735        assert num.allclose(R.exchange_area, 2)
736
737        domain.forcing_terms.append(R)
738
739        domain.compute_forcing_terms()
740
741        assert num.allclose(domain.quantities['stage'].explicit_update[1],
742                            2.0/1000)
743        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
744        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
745
746    def test_time_dependent_rainfall_restricted_by_polygon(self):
747        a = [0.0, 0.0]
748        b = [0.0, 2.0]
749        c = [2.0, 0.0]
750        d = [0.0, 4.0]
751        e = [2.0, 2.0]
752        f = [4.0, 0.0]
753
754        points = [a, b, c, d, e, f]
755        #             bac,     bce,     ecf,     dbe
756        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
757
758        domain = Domain(points, vertices)
759
760        # Flat surface with 1m of water
761        domain.set_quantity('elevation', 0)
762        domain.set_quantity('stage', 1.0)
763        domain.set_quantity('friction', 0)
764
765        Br = Reflective_boundary(domain)
766        domain.set_boundary({'exterior': Br})
767
768        # Setup only one forcing term, time dependent rainfall
769        # restricted to a polygon enclosing triangle #1 (bce)
770        domain.forcing_terms = []
771        R = Rainfall(domain,
772                     rate=lambda t: 3*t + 7,
773                     polygon = [[1,1], [2,1], [2,2], [1,2]])
774
775        assert num.allclose(R.exchange_area, 2)
776       
777        domain.forcing_terms.append(R)
778
779        domain.time = 10.
780
781        domain.compute_forcing_terms()
782
783        assert num.allclose(domain.quantities['stage'].explicit_update[1],
784                            (3*domain.time + 7)/1000)
785        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
786        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
787
788    def test_time_dependent_rainfall_using_starttime(self):
789        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
790
791        a = [0.0, 0.0]
792        b = [0.0, 2.0]
793        c = [2.0, 0.0]
794        d = [0.0, 4.0]
795        e = [2.0, 2.0]
796        f = [4.0, 0.0]
797
798        points = [a, b, c, d, e, f]
799        #             bac,     bce,     ecf,     dbe
800        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
801
802        domain = Domain(points, vertices)
803
804        # Flat surface with 1m of water
805        domain.set_quantity('elevation', 0)
806        domain.set_quantity('stage', 1.0)
807        domain.set_quantity('friction', 0)
808
809        Br = Reflective_boundary(domain)
810        domain.set_boundary({'exterior': Br})
811
812        # Setup only one forcing term, time dependent rainfall
813        # restricted to a polygon enclosing triangle #1 (bce)
814        domain.forcing_terms = []
815        R = Rainfall(domain,
816                     rate=lambda t: 3*t + 7,
817                     polygon=rainfall_poly)                     
818
819        assert num.allclose(R.exchange_area, 2)
820       
821        domain.forcing_terms.append(R)
822
823        # This will test that time used in the forcing function takes
824        # startime into account.
825        domain.starttime = 5.0
826
827        domain.time = 7.
828
829        domain.compute_forcing_terms()
830
831        assert num.allclose(domain.quantities['stage'].explicit_update[1],
832                            (3*domain.get_time() + 7)/1000)
833        assert num.allclose(domain.quantities['stage'].explicit_update[1],
834                            (3*(domain.time + domain.starttime) + 7)/1000)
835
836        # Using internal time her should fail
837        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
838                                (3*domain.time + 7)/1000)
839
840        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
841        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
842
843    def test_time_dependent_rainfall_using_georef(self):
844        """test_time_dependent_rainfall_using_georef
845
846        This will also test the General forcing term using georef
847        """
848
849        # Mesh in zone 56 (absolute coords)
850        x0 = 314036.58727982
851        y0 = 6224951.2960092
852
853        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
854        rainfall_poly += [x0, y0]
855
856        a = [0.0, 0.0]
857        b = [0.0, 2.0]
858        c = [2.0, 0.0]
859        d = [0.0, 4.0]
860        e = [2.0, 2.0]
861        f = [4.0, 0.0]
862
863        points = [a, b, c, d, e, f]
864        #             bac,     bce,     ecf,     dbe
865        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
866
867        domain = Domain(points, vertices,
868                        geo_reference=Geo_reference(56, x0, y0))
869
870        # Flat surface with 1m of water
871        domain.set_quantity('elevation', 0)
872        domain.set_quantity('stage', 1.0)
873        domain.set_quantity('friction', 0)
874
875        Br = Reflective_boundary(domain)
876        domain.set_boundary({'exterior': Br})
877
878        # Setup only one forcing term, time dependent rainfall
879        # restricted to a polygon enclosing triangle #1 (bce)
880        domain.forcing_terms = []
881        R = Rainfall(domain,
882                     rate=lambda t: 3*t + 7,
883                     polygon=rainfall_poly)
884
885        assert num.allclose(R.exchange_area, 2)
886       
887        domain.forcing_terms.append(R)
888
889        # This will test that time used in the forcing function takes
890        # startime into account.
891        domain.starttime = 5.0
892
893        domain.time = 7.
894
895        domain.compute_forcing_terms()
896
897        assert num.allclose(domain.quantities['stage'].explicit_update[1],
898                            (3*domain.get_time() + 7)/1000)
899        assert num.allclose(domain.quantities['stage'].explicit_update[1],
900                            (3*(domain.time + domain.starttime) + 7)/1000)
901
902        # Using internal time her should fail
903        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
904                                (3*domain.time + 7)/1000)
905
906        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
907        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
908
909    def test_time_dependent_rainfall_restricted_by_polygon_with_default(self):
910        """
911        Test that default rainfall can be used when given rate runs out of data.
912        """
913
914        a = [0.0, 0.0]
915        b = [0.0, 2.0]
916        c = [2.0, 0.0]
917        d = [0.0, 4.0]
918        e = [2.0, 2.0]
919        f = [4.0, 0.0]
920
921        points = [a, b, c, d, e, f]
922        #             bac,     bce,     ecf,     dbe
923        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
924
925        domain = Domain(points, vertices)
926
927        # Flat surface with 1m of water
928        domain.set_quantity('elevation', 0)
929        domain.set_quantity('stage', 1.0)
930        domain.set_quantity('friction', 0)
931
932        Br = Reflective_boundary(domain)
933        domain.set_boundary({'exterior': Br})
934
935        # Setup only one forcing term, time dependent rainfall
936        # that expires at t==20
937        from anuga.fit_interpolate.interpolate import Modeltime_too_late
938
939        def main_rate(t):
940            if t > 20:
941                msg = 'Model time exceeded.'
942                raise Modeltime_too_late, msg
943            else:
944                return 3*t + 7
945
946        domain.forcing_terms = []
947        R = Rainfall(domain,
948                     rate=main_rate,
949                     polygon = [[1,1], [2,1], [2,2], [1,2]],
950                     default_rate=5.0)
951
952        assert num.allclose(R.exchange_area, 2)
953       
954        domain.forcing_terms.append(R)
955
956        domain.time = 10.
957
958        domain.compute_forcing_terms()
959
960        assert num.allclose(domain.quantities['stage'].explicit_update[1],
961                            (3*domain.time+7)/1000)
962        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
963        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
964
965        domain.time = 100.
966        domain.quantities['stage'].explicit_update[:] = 0.0     # Reset
967        domain.compute_forcing_terms()
968
969        assert num.allclose(domain.quantities['stage'].explicit_update[1],
970                            5.0/1000) # Default value
971        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
972        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
973
974    def test_rainfall_forcing_with_evolve(self):
975        """test_rainfall_forcing_with_evolve
976
977        Test how forcing terms are called within evolve
978        """
979
980        # FIXME(Ole): This test is just to experiment
981
982        a = [0.0, 0.0]
983        b = [0.0, 2.0]
984        c = [2.0, 0.0]
985        d = [0.0, 4.0]
986        e = [2.0, 2.0]
987        f = [4.0, 0.0]
988
989        points = [a, b, c, d, e, f]
990        #             bac,     bce,     ecf,     dbe
991        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
992
993        domain = Domain(points, vertices)
994
995        # Flat surface with 1m of water
996        domain.set_quantity('elevation', 0)
997        domain.set_quantity('stage', 1.0)
998        domain.set_quantity('friction', 0)
999
1000        Br = Reflective_boundary(domain)
1001        domain.set_boundary({'exterior': Br})
1002
1003        # Setup only one forcing term, time dependent rainfall
1004        # that expires at t==20
1005        from anuga.fit_interpolate.interpolate import Modeltime_too_late
1006
1007        def main_rate(t):
1008            if t > 20:
1009                msg = 'Model time exceeded.'
1010                raise Modeltime_too_late, msg
1011            else:
1012                return 3*t + 7
1013
1014        domain.forcing_terms = []
1015        R = Rainfall(domain,
1016                     rate=main_rate,
1017                     polygon=[[1,1], [2,1], [2,2], [1,2]],
1018                     default_rate=5.0)
1019
1020        assert num.allclose(R.exchange_area, 2)
1021       
1022        domain.forcing_terms.append(R)
1023
1024        for t in domain.evolve(yieldstep=1, finaltime=25):
1025            pass
1026            #FIXME(Ole):  A test here is hard because explicit_update also
1027            # receives updates from the flux calculation.
1028
1029
1030
1031    def test_inflow_using_circle(self):
1032        from math import pi, cos, sin
1033
1034        a = [0.0, 0.0]
1035        b = [0.0, 2.0]
1036        c = [2.0, 0.0]
1037        d = [0.0, 4.0]
1038        e = [2.0, 2.0]
1039        f = [4.0, 0.0]
1040
1041        points = [a, b, c, d, e, f]
1042        #             bac,     bce,     ecf,     dbe
1043        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1044
1045        domain = Domain(points, vertices)
1046
1047        # Flat surface with 1m of water
1048        domain.set_quantity('elevation', 0)
1049        domain.set_quantity('stage', 1.0)
1050        domain.set_quantity('friction', 0)
1051
1052        Br = Reflective_boundary(domain)
1053        domain.set_boundary({'exterior': Br})
1054
1055        # Setup only one forcing term, constant inflow of 2 m^3/s
1056        # on a circle affecting triangles #0 and #1 (bac and bce)
1057        domain.forcing_terms = []
1058       
1059        I = Inflow(domain, rate=2.0, center=(1,1), radius=1)
1060        domain.forcing_terms.append(I)       
1061        domain.compute_forcing_terms()
1062
1063       
1064        A = I.exchange_area
1065        assert num.allclose(A, 4) # Two triangles       
1066       
1067        assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A)
1068        assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A)
1069        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)       
1070
1071
1072    def test_inflow_using_circle_function(self):
1073        from math import pi, cos, sin
1074
1075        a = [0.0, 0.0]
1076        b = [0.0, 2.0]
1077        c = [2.0, 0.0]
1078        d = [0.0, 4.0]
1079        e = [2.0, 2.0]
1080        f = [4.0, 0.0]
1081
1082        points = [a, b, c, d, e, f]
1083        #             bac,     bce,     ecf,     dbe
1084        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1085
1086        domain = Domain(points, vertices)
1087
1088        # Flat surface with 1m of water
1089        domain.set_quantity('elevation', 0)
1090        domain.set_quantity('stage', 1.0)
1091        domain.set_quantity('friction', 0)
1092
1093        Br = Reflective_boundary(domain)
1094        domain.set_boundary({'exterior': Br})
1095
1096        # Setup only one forcing term, time dependent inflow of 2 m^3/s
1097        # on a circle affecting triangles #0 and #1 (bac and bce)
1098        domain.forcing_terms = []
1099        I = Inflow(domain, rate=lambda t: 2., center=(1,1), radius=1)
1100        domain.forcing_terms.append(I)
1101       
1102        domain.compute_forcing_terms()       
1103
1104        A = I.exchange_area
1105        assert num.allclose(A, 4) # Two triangles
1106       
1107        assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A)
1108        assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A)
1109        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)       
1110       
1111
1112
1113
1114    def test_inflow_catch_too_few_triangles(self):
1115        """
1116        Test that exception is thrown if no triangles are covered
1117        by the inflow area
1118        """
1119
1120        from math import pi, cos, sin
1121
1122        a = [0.0, 0.0]
1123        b = [0.0, 2.0]
1124        c = [2.0, 0.0]
1125        d = [0.0, 4.0]
1126        e = [2.0, 2.0]
1127        f = [4.0, 0.0]
1128
1129        points = [a, b, c, d, e, f]
1130        #             bac,     bce,     ecf,     dbe
1131        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1132
1133        domain = Domain(points, vertices)
1134
1135        # Flat surface with 1m of water
1136        domain.set_quantity('elevation', 0)
1137        domain.set_quantity('stage', 1.0)
1138        domain.set_quantity('friction', 0)
1139
1140        Br = Reflective_boundary(domain)
1141        domain.set_boundary({'exterior': Br})
1142
1143        # Setup only one forcing term, constant inflow of 2 m^3/s
1144        # on a circle affecting triangles #0 and #1 (bac and bce)
1145        try:
1146            Inflow(domain, rate=2.0, center=(1,1.1), radius=0.01)
1147        except:
1148            pass
1149        else:
1150            msg = 'Should have raised exception'
1151            raise Exception, msg
1152
1153    def Xtest_inflow_outflow_conservation(self):
1154        """
1155        Test what happens if water is abstracted from one area and
1156        injected into another - especially if there is not enough
1157        water to match the abstraction.
1158        This tests that the total volume is kept constant under a range of
1159        scenarios.
1160
1161        This test will fail as the problem was only fixed for culverts.
1162        """
1163
1164        from math import pi, cos, sin
1165
1166        length = 20.
1167        width = 10.
1168
1169        dx = dy = 2    # 1 or 2 OK
1170        points, vertices, boundary = rectangular_cross(int(length/dx),
1171                                                       int(width/dy),
1172                                                       len1=length,
1173                                                       len2=width)
1174        domain = Domain(points, vertices, boundary)
1175        domain.set_name('test_inflow_conservation')    # Output name
1176        domain.set_default_order(2)
1177
1178        # Flat surface with 1m of water
1179        stage = 1.0
1180        domain.set_quantity('elevation', 0)
1181        domain.set_quantity('stage', stage)
1182        domain.set_quantity('friction', 0)
1183
1184        Br = Reflective_boundary(domain)
1185        domain.set_boundary({'left': Br, 'right': Br, 'bottom': Br, 'top': Br})
1186
1187        # Setup one forcing term, constant inflow of 2 m^3/s on a circle
1188        domain.forcing_terms = []
1189        domain.forcing_terms.append(Inflow(domain, rate=2.0,
1190                                           center=(5,5), radius=1))
1191
1192        domain.compute_forcing_terms()
1193
1194        # Check that update values are correct
1195        for x in domain.quantities['stage'].explicit_update:
1196            assert num.allclose(x, 2.0/pi) or num.allclose(x, 0.0)
1197
1198        # Check volumes without inflow
1199        domain.forcing_terms = []
1200        initial_volume = domain.quantities['stage'].get_integral()
1201
1202        assert num.allclose(initial_volume, width*length*stage)
1203
1204        for t in domain.evolve(yieldstep = 0.05, finaltime = 5.0):
1205            volume = domain.quantities['stage'].get_integral()
1206            assert num.allclose(volume, initial_volume)
1207
1208        # Now apply the inflow and check volumes for a range of stage values
1209        for stage in [2.0, 1.0, 0.5, 0.25, 0.1, 0.0]:
1210            domain.time = 0.0
1211            domain.set_quantity('stage', stage)
1212            domain.forcing_terms = []
1213            domain.forcing_terms.append(Inflow(domain, rate=2.0,
1214                                               center=(5,5), radius=1))
1215            initial_volume = domain.quantities['stage'].get_integral()
1216            predicted_volume = initial_volume
1217            dt = 0.05
1218            for t in domain.evolve(yieldstep=dt, finaltime=5.0):
1219                volume = domain.quantities['stage'].get_integral()
1220                assert num.allclose (volume, predicted_volume)
1221                predicted_volume = predicted_volume + 2.0/pi/100/dt # Why 100?
1222
1223        # Apply equivalent outflow only and check volumes
1224        # for a range of stage values
1225        for stage in [2.0, 1.0, 0.5, 0.25, 0.1, 0.0]:
1226            print stage
1227
1228            domain.time = 0.0
1229            domain.set_quantity('stage', stage)
1230            domain.forcing_terms = []
1231            domain.forcing_terms.append(Inflow(domain, rate=-2.0,
1232                                               center=(15,5), radius=1))
1233            initial_volume = domain.quantities['stage'].get_integral()
1234            predicted_volume = initial_volume
1235            dt = 0.05
1236            for t in domain.evolve(yieldstep=dt, finaltime=5.0):
1237                volume = domain.quantities['stage'].get_integral()
1238                print t, volume, predicted_volume
1239                assert num.allclose (volume, predicted_volume)
1240                predicted_volume = predicted_volume - 2.0/pi/100/dt # Why 100?
1241
1242        # Apply both inflow and outflow and check volumes being constant for a
1243        # range of stage values
1244        for stage in [2.0, 1.0, 0.5, 0.25, 0.1, 0.0]:
1245            print stage
1246
1247            domain.time = 0.0
1248            domain.set_quantity('stage', stage)
1249            domain.forcing_terms = []
1250            domain.forcing_terms.append(Inflow(domain, rate=2.0,
1251                                               center=(5,5), radius=1))
1252            domain.forcing_terms.append(Inflow(domain, rate=-2.0,
1253                                               center=(15,5), radius=1))
1254            initial_volume = domain.quantities['stage'].get_integral()
1255
1256            dt = 0.05
1257            for t in domain.evolve(yieldstep=dt, finaltime=5.0):
1258                volume = domain.quantities['stage'].get_integral()
1259
1260                print t, volume
1261                assert num.allclose(volume, initial_volume)
1262
1263    #####################################################
1264
1265       
1266    def test_inflow_using_flowline(self):
1267        """test_inflow_using_flowline
1268
1269        Test the ability of a flowline to match inflow above the flowline by
1270        creating constant inflow onto a circle at the head of a 20m
1271        wide by 300m long plane dipping at various slopes with a
1272        perpendicular flowline and gauge downstream of the inflow and
1273        a 45 degree flowlines at 200m downstream.
1274
1275        A more substantial version of this test with finer resolution and
1276        including the depth calculation using Manning's equation is
1277        available under the validate_all suite in the directory
1278        anuga_validation/automated_validation_tests/flow_tests.
1279        """
1280
1281       
1282        verbose = False
1283       
1284
1285        #----------------------------------------------------------------------
1286        # Import necessary modules
1287        #----------------------------------------------------------------------
1288
1289        from anuga.abstract_2d_finite_volumes.mesh_factory \
1290                import rectangular_cross
1291        from anuga.shallow_water import Domain
1292        from anuga.shallow_water.shallow_water_domain import Reflective_boundary
1293        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
1294        from anuga.shallow_water.shallow_water_domain import Inflow
1295        from anuga.shallow_water.data_manager \
1296                import get_flow_through_cross_section
1297        from anuga.abstract_2d_finite_volumes.util \
1298                import sww2csv_gauges, csv2timeseries_graphs
1299
1300        #----------------------------------------------------------------------
1301        # Setup computational domain
1302        #----------------------------------------------------------------------
1303        number_of_inflows = 2 # Number of inflows on top of each other
1304        finaltime = 500 #700.0 # If this is too short, steady state will not be achieved
1305
1306        length = 250.
1307        width  = 20.
1308        dx = dy = 5                 # Resolution: of grid on both axes
1309
1310        points, vertices, boundary = rectangular_cross(int(length/dx),
1311                                                       int(width/dy),
1312                                                       len1=length,
1313                                                       len2=width)
1314
1315        for mannings_n in [0.1, 0.01]:
1316            # Loop over a range of roughnesses             
1317           
1318            for slope in [1.0/300, 1.0/100]:
1319                # Loop over a range of bedslopes representing
1320                # sub to super critical flows
1321
1322
1323                domain = Domain(points, vertices, boundary)   
1324                domain.set_name('inflow_flowline_test')     # Output name
1325
1326                #--------------------------------------------------------------
1327                # Setup initial conditions
1328                #--------------------------------------------------------------
1329
1330                def topography(x, y):
1331                    z = -x * slope
1332                    return z
1333
1334                # Use function for elevation
1335                domain.set_quantity('elevation', topography)
1336                # Constant friction of conc surface   
1337                domain.set_quantity('friction', mannings_n)
1338                # Dry initial condition
1339                domain.set_quantity('stage', expression='elevation')
1340
1341                #--------------------------------------------------------------
1342                # Setup Inflow
1343                #--------------------------------------------------------------
1344
1345                # Fixed Flowrate onto Area
1346                fixed_inflow = Inflow(domain,
1347                                      center=(10.0, 10.0),
1348                                      radius=5.00,
1349                                      rate=10.00)   
1350
1351                # Stack this flow
1352                for i in range(number_of_inflows):
1353                    domain.forcing_terms.append(fixed_inflow)
1354               
1355                ref_flow = fixed_inflow.rate*number_of_inflows
1356
1357                # Compute normal depth on plane using Mannings equation
1358                # v=1/n*(r^2/3)*(s^0.5) or r=(Q*n/(s^0.5*W))^0.6
1359                normal_depth=(ref_flow*mannings_n/(slope**0.5*width))**0.6
1360                if verbose:
1361                    print
1362                    print 'Slope:', slope, 'Mannings n:', mannings_n
1363                   
1364                   
1365                #--------------------------------------------------------------
1366                # Setup boundary conditions
1367                #--------------------------------------------------------------
1368
1369                Br = Reflective_boundary(domain)
1370               
1371                # Define downstream boundary based on predicted depth
1372                def normal_depth_stage_downstream(t):
1373                    return (-slope*length) + normal_depth
1374               
1375                Bt = Transmissive_momentum_set_stage_boundary(domain=domain,
1376                                                              function=normal_depth_stage_downstream)
1377               
1378
1379               
1380
1381                domain.set_boundary({'left': Br,
1382                                     'right': Bt,
1383                                     'top': Br,
1384                                     'bottom': Br})
1385
1386
1387
1388                #--------------------------------------------------------------
1389                # Evolve system through time
1390                #--------------------------------------------------------------
1391
1392                for t in domain.evolve(yieldstep=100.0, finaltime=finaltime):
1393                    pass
1394                    #if verbose :
1395                    #    print domain.timestepping_statistics()
1396
1397                    #    print domain.volumetric_balance_statistics()                                   
1398
1399
1400                #--------------------------------------------------------------
1401                # Compute flow thru flowlines ds of inflow
1402                #--------------------------------------------------------------
1403                   
1404                # Square on flowline at 200m
1405                q = domain.get_flow_through_cross_section([[200.0,  0.0],
1406                                                           [200.0, 20.0]])
1407                if verbose:
1408                    print ('90 degree flowline: ANUGA = %f, Ref = %f'
1409                           % (q, ref_flow))
1410
1411                msg = ('Predicted flow was %f, should have been %f'
1412                       % (q, ref_flow))
1413                assert num.allclose(q, ref_flow, rtol=1.0e-2), msg         
1414
1415                           
1416                # 45 degree flowline at 200m
1417                q = domain.get_flow_through_cross_section([[200.0,  0.0],
1418                                                           [220.0, 20.0]])
1419                if verbose:
1420                    print ('45 degree flowline: ANUGA = %f, Ref = %f'
1421                           % (q, ref_flow))
1422                   
1423                msg = ('Predicted flow was %f, should have been %f'
1424                       % (q, ref_flow))
1425                assert num.allclose(q, ref_flow, rtol=1.0e-2), msg         
1426
1427        os.remove('inflow_flowline_test.sww')
1428
1429       
1430    def Xtest_inflow_boundary_using_flowline(self):
1431        """test_inflow_boundary_using_flowline
1432        Test the ability of a flowline to match inflow above the flowline by
1433        creating constant inflow into the boundary at the head of a 20m
1434        wide by 300m long plane dipping at various slopes with a
1435        perpendicular flowline and gauge downstream of the inflow and
1436        a 45 degree flowlines at 200m downstream
1437       
1438       
1439        """
1440
1441        # FIXME (Ole): Work in progress
1442       
1443        verbose = False
1444       
1445
1446        #----------------------------------------------------------------------
1447        # Import necessary modules
1448        #----------------------------------------------------------------------
1449        from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
1450        from anuga.shallow_water import Domain
1451        from anuga.shallow_water.shallow_water_domain import Reflective_boundary
1452        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
1453        from anuga.shallow_water.shallow_water_domain import Inflow_boundary
1454        from anuga.shallow_water.data_manager import get_flow_through_cross_section
1455        from anuga.abstract_2d_finite_volumes.util import sww2csv_gauges, csv2timeseries_graphs
1456
1457
1458        #----------------------------------------------------------------------
1459        # Setup computational domain
1460        #----------------------------------------------------------------------
1461
1462        finaltime = 500 #700.0 # If this is too short, steady state will not be achieved
1463
1464        length = 250.
1465        width  = 20.
1466        dx = dy = 5          # Resolution: of grid on both axes
1467
1468        points, vertices, boundary = rectangular_cross(int(length/dx), int(width/dy),
1469                                                       len1=length, len2=width)
1470
1471        for mannings_n in [0.1, 0.01]:
1472            # Loop over a range of roughnesses             
1473           
1474            for slope in [1.0/300, 1.0/100]:
1475                # Loop over a range of bedslopes representing sub to super critical flows
1476
1477
1478                domain = Domain(points, vertices, boundary)   
1479                domain.set_name('inflow_boundary_flowline_test')
1480               
1481
1482                #-------------------------------------------------------------
1483                # Setup initial conditions
1484                #-------------------------------------------------------------
1485
1486                def topography(x, y):
1487                    z=-x * slope
1488                    return z
1489
1490                domain.set_quantity('elevation', topography)
1491                domain.set_quantity('friction', mannings_n)
1492                domain.set_quantity('stage',
1493                                    expression='elevation')
1494
1495
1496                   
1497                #--------------------------------------------------------------
1498                # Setup boundary conditions
1499                #--------------------------------------------------------------
1500               
1501
1502
1503                ref_flow = 10.00
1504
1505                # Compute normal depth on plane using Mannings equation
1506                # v=1/n*(r^2/3)*(s^0.5) or r=(Q*n/(s^0.5*W))^0.6
1507                normal_depth=(ref_flow*mannings_n/(slope**0.5*width))**0.6
1508                if verbose:
1509                    print
1510                    print 'Slope:', slope, 'Mannings n:', mannings_n
1511                   
1512               
1513               
1514                Bi = Inflow_boundary(domain, rate=ref_flow)
1515
1516                Br = Reflective_boundary(domain)
1517               
1518                # Define downstream boundary based on predicted depth
1519                def normal_depth_stage_downstream(t):
1520                    return (-slope*length) + normal_depth
1521               
1522                Bt = Transmissive_momentum_set_stage_boundary(domain=domain,
1523                                                              function=normal_depth_stage_downstream)
1524               
1525
1526               
1527
1528                domain.set_boundary({'left': Bi,
1529                                     'right': Bt,
1530                                     'top': Br,
1531                                     'bottom': Br})
1532
1533
1534
1535                #--------------------------------------------------------------
1536                # Evolve system through time
1537                #--------------------------------------------------------------
1538
1539
1540                for t in domain.evolve(yieldstep=100.0, finaltime=finaltime):
1541                    pass
1542                    #if verbose :
1543                    #    print domain.timestepping_statistics()
1544                    #    print domain.volumetric_balance_statistics()
1545                                                       
1546
1547
1548                #--------------------------------------------------------------
1549                # Compute flow thru flowlines ds of inflow
1550                #--------------------------------------------------------------
1551                   
1552                # Square on flowline at 200m
1553                q=domain.get_flow_through_cross_section([[200.0,0.0],[200.0,20.0]])
1554                msg = 'Predicted flow was %f, should have been %f' % (q, ref_flow)
1555                if verbose:
1556                    print '90 degree flowline: ANUGA = %f, Ref = %f' % (q, ref_flow)
1557                assert num.allclose(q, ref_flow, rtol=1.0e-2), msg         
1558
1559                           
1560                # 45 degree flowline at 200m
1561                q=domain.get_flow_through_cross_section([[200.0,0.0],[220.0,20.0]])
1562                msg = 'Predicted flow was %f, should have been %f' % (q, ref_flow)
1563                if verbose:
1564                    print '45 degree flowline: ANUGA = %f, Ref = %f' % (q, ref_flow)
1565                   
1566                assert num.allclose(q, ref_flow, rtol=1.0e-2), msg         
1567
1568       
1569       
1570    def Xtest_friction_dependent_flow_using_flowline(self):
1571        """test_friction_dependent_flow_using_flowline
1572       
1573        Test the internal flow (using flowline) as a function of
1574        different values of Mannings n and different slopes.
1575       
1576        Flow is applied in the form of boundary conditions with fixed momentum.
1577        """
1578
1579        verbose = True
1580
1581        #----------------------------------------------------------------------
1582        # Import necessary modules
1583        #----------------------------------------------------------------------
1584
1585        from anuga.abstract_2d_finite_volumes.mesh_factory \
1586                import rectangular_cross
1587        from anuga.shallow_water import Domain
1588        from anuga.shallow_water.shallow_water_domain import Reflective_boundary
1589        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
1590        from anuga.shallow_water.shallow_water_domain import Inflow
1591        from anuga.shallow_water.data_manager \
1592                import get_flow_through_cross_section
1593        from anuga.abstract_2d_finite_volumes.util \
1594                import sww2csv_gauges, csv2timeseries_graphs
1595
1596
1597        #----------------------------------------------------------------------
1598        # Setup computational domain
1599        #----------------------------------------------------------------------
1600
1601        finaltime = 1000.0
1602
1603        length = 300.
1604        width  = 20.
1605        dx = dy = 5       # Resolution: of grid on both axes
1606       
1607        # Input parameters
1608        uh = 1.0
1609        vh = 0.0
1610        d = 1.0
1611       
1612        ref_flow = uh*d*width # 20 m^3/s in the x direction across entire domain
1613
1614        points, vertices, boundary = rectangular_cross(int(length/dx),
1615                                                       int(width/dy),
1616                                                       len1=length,
1617                                                       len2=width)
1618
1619        for mannings_n in [0.035]:          #[0.0, 0.012, 0.035]:
1620            for slope in [1.0/300]:         #[0.0, 1.0/300, 1.0/150]:
1621                # Loop over a range of bedslopes representing
1622                # sub to super critical flows
1623                if verbose:
1624                    print
1625                    print 'Slope:', slope, 'Mannings n:', mannings_n
1626                domain = Domain(points, vertices, boundary)   
1627                domain.set_name('Inflow_flowline_test')     # Output name
1628
1629                #--------------------------------------------------------------
1630                # Setup initial conditions
1631                #--------------------------------------------------------------
1632
1633                def topography(x, y):
1634                    z = -x * slope
1635                    return z
1636
1637                # Use function for elevation
1638                domain.set_quantity('elevation', topography)
1639                # Constant friction
1640                domain.set_quantity('friction', mannings_n)
1641               
1642                #domain.set_quantity('stage', expression='elevation')
1643                     
1644                # Set initial flow as depth=1m, uh=1.0 m/s, vh = 0.0
1645                # making it 20 m^3/s across entire domain
1646                domain.set_quantity('stage', expression='elevation + %f' % d)
1647                domain.set_quantity('xmomentum', uh)
1648                domain.set_quantity('ymomentum', vh)               
1649
1650                #--------------------------------------------------------------
1651                # Setup boundary conditions
1652                #--------------------------------------------------------------
1653
1654                Br = Reflective_boundary(domain)      # Solid reflective wall
1655               
1656                # Constant flow in and out of domain
1657                # Depth = 1m, uh=1 m/s, i.e. a flow of 20 m^3/s
1658                # across boundaries
1659                Bi = Dirichlet_boundary([d, uh, vh]) 
1660                Bo = Dirichlet_boundary([-length*slope+d, uh, vh])
1661                #Bo = Dirichlet_boundary([-100, 0, 0])
1662
1663                domain.set_boundary({'left': Bi, 'right': Bo,
1664                                     'top': Br,  'bottom': Br})
1665
1666                #--------------------------------------------------------------
1667                # Evolve system through time
1668                #--------------------------------------------------------------
1669
1670                for t in domain.evolve(yieldstep=100.0, finaltime=finaltime):
1671                    if verbose :
1672                        print domain.timestepping_statistics()
1673                        print domain.volumetric_balance_statistics()
1674
1675                # 90 degree flowline at 200m
1676                q = domain.get_flow_through_cross_section([[200.0,  0.0],
1677                                                           [200.0, 20.0]])
1678                msg = ('Predicted flow was %f, should have been %f'
1679                       % (q, ref_flow))
1680                if verbose:
1681                    print ('90 degree flowline: ANUGA = %f, Ref = %f'
1682                           % (q, ref_flow))
1683
1684                # 45 degree flowline at 200m
1685                q = domain.get_flow_through_cross_section([[200.0,  0.0],
1686                                                           [220.0, 20.0]])
1687                msg = ('Predicted flow was %f, should have been %f'
1688                       % (q, ref_flow))
1689                if verbose:
1690                    print ('45 degree flowline: ANUGA = %f, Ref = %f'
1691                           % (q, ref_flow))
1692
1693                # Stage recorder (gauge) in middle of plane at 200m
1694                x = 200.0
1695                y = 10.00               
1696                w = domain.get_quantity('stage').\
1697                        get_values(interpolation_points=[[x, y]])[0]
1698                z = domain.get_quantity('elevation').\
1699                        get_values(interpolation_points=[[x, y]])[0]
1700                domain_depth = w-z
1701
1702                xmom = domain.get_quantity('xmomentum').\
1703                        get_values(interpolation_points=[[x, y]])[0]
1704                ymom = domain.get_quantity('ymomentum').\
1705                        get_values(interpolation_points=[[x, y]])[0]           
1706                if verbose:                   
1707                    print ('At interpolation point (h, uh, vh): ',
1708                           domain_depth, xmom, ymom)
1709                    print 'uh * d * width = ', xmom*domain_depth*width
1710                               
1711                if slope > 0.0:
1712                    # Compute normal depth at gauge location using Manning eqn
1713                    # v=1/n*(r^2/3)*(s^0.5) or r=(Q*n/(s^0.5*W))^0.6
1714                    normal_depth = (ref_flow*mannings_n/(slope**0.5*width))**0.6
1715                    if verbose:
1716                        print ('Depth: ANUGA = %f, Mannings = %f'
1717                               % (domain_depth, normal_depth))
1718
1719        os.remove('Inflow_flowline_test.sww')
1720
1721
1722#################################################################################
1723
1724if __name__ == "__main__":
1725    suite = unittest.makeSuite(Test_swb_clean, 'test')
1726    runner = unittest.TextTestRunner(verbosity=1)
1727    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.