source: trunk/anuga_core/source/anuga/shallow_water_balanced/test_swb_forcing_terms.py @ 7866

Last change on this file since 7866 was 7866, checked in by hudson, 12 years ago

More swb tests passing. Cleaned up some pylint errors.

File size: 61.4 KB
Line 
1#!/usr/bin/env python
2
3import unittest, os
4import os.path
5from math import pi, sqrt
6import tempfile
7import anuga
8
9from anuga.config import g, epsilon
10from anuga.config import netcdf_mode_r, netcdf_mode_w, netcdf_mode_a
11from anuga.utilities.numerical_tools import mean
12from anuga.geometry.polygon import is_inside_polygon
13from anuga.coordinate_transforms.geo_reference import Geo_reference
14from anuga.abstract_2d_finite_volumes.quantity import Quantity
15from anuga.geospatial_data.geospatial_data import Geospatial_data
16from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
17
18from anuga.utilities.system_tools import get_pathname_from_package
19from swb_domain import *
20
21import numpy as num
22
23# Get gateway to C implementation of flux function for direct testing
24from shallow_water_ext import flux_function_central as flux_function
25
26
27# Variable windfield implemented using functions
28def speed(t, x, y):
29    """Large speeds halfway between center and edges
30
31    Low speeds at center and edges
32    """
33
34    from math import exp, cos, pi
35
36    x = num.array(x)
37    y = num.array(y)
38
39    N = len(x)
40    s = 0*#New array
41
42    for k in range(N):
43        r = num.sqrt(x[k]**2 + y[k]**2)
44        factor = exp(-(r-0.15)**2)
45        s[k] = 4000 * factor * (cos(t*2*pi/150) + 2)
46
47    return s
48
49def scalar_func(t, x, y):
50    """Function that returns a scalar.
51
52    Used to test error message when numeric array is expected
53    """
54
55    return 17.7
56
57def scalar_func_list(t, x, y):
58    """Function that returns a scalar.
59
60    Used to test error message when numeric array is expected
61    """
62
63    return [17.7]
64
65
66def angle(t, x, y):
67    """Rotating field
68    """
69    from math import atan, pi
70
71    x = num.array(x)
72    y = num.array(y)
73
74    N = len(x)
75    a = 0 * x    # New array
76
77    for k in range(N):
78        r = num.sqrt(x[k]**2 + y[k]**2)
79
80        angle = atan(y[k]/x[k])
81
82        if x[k] < 0:
83            angle += pi
84
85        # Take normal direction
86        angle -= pi/2
87
88        # Ensure positive radians
89        if angle < 0:
90            angle += 2*pi
91
92        a[k] = angle/pi*180
93
94    return a
95
96
97###############################################################################
98
99class Test_swb_forcing_terms(unittest.TestCase):
100    def setUp(self):
101        pass
102
103    def tearDown(self):
104        pass
105
106    def test_gravity(self):
107        #Assuming no friction
108
109        from anuga.config import g
110
111        a = [0.0, 0.0]
112        b = [0.0, 2.0]
113        c = [2.0, 0.0]
114        d = [0.0, 4.0]
115        e = [2.0, 2.0]
116        f = [4.0, 0.0]
117
118        points = [a, b, c, d, e, f]
119        #             bac,     bce,     ecf,     dbe
120        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
121
122        domain = Domain(points, vertices)
123
124        #Set up for a gradient of (3,0) at mid triangle (bce)
125        def slope(x, y):
126            return 3*x
127
128        h = 0.1
129        def stage(x, y):
130            return slope(x, y) + h
131
132        domain.set_quantity('elevation', slope)
133        domain.set_quantity('stage', stage)
134
135        for name in domain.conserved_quantities:
136            assert num.allclose(domain.quantities[name].explicit_update, 0)
137            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
138
139        domain.compute_forcing_terms()
140
141        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
142        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
143                            -g*h*3)
144        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
145
146    def test_manning_friction(self):
147        from anuga.config import g
148
149        a = [0.0, 0.0]
150        b = [0.0, 2.0]
151        c = [2.0, 0.0]
152        d = [0.0, 4.0]
153        e = [2.0, 2.0]
154        f = [4.0, 0.0]
155
156        points = [a, b, c, d, e, f]
157        #             bac,     bce,     ecf,     dbe
158        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
159
160        domain = Domain(points, vertices)
161
162        #Set up for a gradient of (3,0) at mid triangle (bce)
163        def slope(x, y):
164            return 3*x
165
166        h = 0.1
167        def stage(x, y):
168            return slope(x, y) + h
169
170        eta = 0.07
171        domain.set_quantity('elevation', slope)
172        domain.set_quantity('stage', stage)
173        domain.set_quantity('friction', eta)
174
175        for name in domain.conserved_quantities:
176            assert num.allclose(domain.quantities[name].explicit_update, 0)
177            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
178
179        domain.compute_forcing_terms()
180
181        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
182        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
183                            -g*h*3)
184        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
185
186        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
187        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
188                            0)
189        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
190                            0)
191
192        #Create some momentum for friction to work with
193        domain.set_quantity('xmomentum', 1)
194        dz = sqrt(10.0)
195        S = -g*eta**2 *dz / h**(7.0/3) 
196
197        domain.compute_forcing_terms()
198        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
199        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
200                            S)
201        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
202                            0)
203
204        #A more complex example
205        domain.quantities['stage'].semi_implicit_update[:] = 0.0
206        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
207        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
208
209        domain.set_quantity('xmomentum', 3)
210        domain.set_quantity('ymomentum', 4)
211
212        S = -g*eta**2*5*dz / h**(7.0/3) 
213
214        domain.compute_forcing_terms()
215
216        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
217        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
218                            3*S)
219        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
220                            4*S)
221
222
223
224    def test_manning_friction_old(self):
225        from anuga.config import g
226
227        a = [0.0, 0.0]
228        b = [0.0, 2.0]
229        c = [2.0, 0.0]
230        d = [0.0, 4.0]
231        e = [2.0, 2.0]
232        f = [4.0, 0.0]
233
234        points = [a, b, c, d, e, f]
235        #             bac,     bce,     ecf,     dbe
236        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
237
238        domain = Domain(points, vertices)
239
240        #Turn old mannings function on
241        domain.set_new_mannings_function(False)
242
243        #Set up for a gradient of (3,0) at mid triangle (bce)
244        def slope(x, y):
245            return 3*x
246
247        h = 0.1
248        def stage(x, y):
249            return slope(x, y) + h
250
251        eta = 0.07
252        domain.set_quantity('elevation', slope)
253        domain.set_quantity('stage', stage)
254        domain.set_quantity('friction', eta)
255
256        for name in domain.conserved_quantities:
257            assert num.allclose(domain.quantities[name].explicit_update, 0)
258            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
259
260        domain.compute_forcing_terms()
261
262        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
263        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
264                            -g*h*3)
265        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
266
267        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
268        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
269                            0)
270        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
271                            0)
272
273        #Create some momentum for friction to work with
274        domain.set_quantity('xmomentum', 1)
275        S = -g*eta**2 / h**(7.0/3)
276
277        domain.compute_forcing_terms()
278        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
279        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
280                            S)
281        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
282                            0)
283
284        #A more complex example
285        domain.quantities['stage'].semi_implicit_update[:] = 0.0
286        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
287        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
288
289        domain.set_quantity('xmomentum', 3)
290        domain.set_quantity('ymomentum', 4)
291
292        S = -g*eta**2*5 / h**(7.0/3)
293
294        domain.compute_forcing_terms()
295
296        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
297        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
298                            3*S)
299        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
300                            4*S)
301
302
303    def test_manning_friction_new(self):
304        from anuga.config import g
305
306        a = [0.0, 0.0]
307        b = [0.0, 2.0]
308        c = [2.0, 0.0]
309        d = [0.0, 4.0]
310        e = [2.0, 2.0]
311        f = [4.0, 0.0]
312
313        points = [a, b, c, d, e, f]
314        #             bac,     bce,     ecf,     dbe
315        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
316
317        domain = Domain(points, vertices)
318
319        # Use the new function which takes into account the extra
320        # wetted area due to slope of bed
321        domain.set_new_mannings_function(True)
322       
323        #Set up for a gradient of (3,0) at mid triangle (bce)
324        def slope(x, y):
325            return 3*x
326
327        h = 0.1
328        def stage(x, y):
329            return slope(x, y) + h
330
331        eta = 0.07
332        domain.set_quantity('elevation', slope)
333        domain.set_quantity('stage', stage)
334        domain.set_quantity('friction', eta)
335
336        for name in domain.conserved_quantities:
337            assert num.allclose(domain.quantities[name].explicit_update, 0)
338            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
339
340        domain.compute_forcing_terms()
341
342        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
343        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
344                            -g*h*3)
345        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
346
347        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
348        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
349                            0)
350        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
351                            0)
352
353        #Create some momentum for friction to work with
354        domain.set_quantity('xmomentum', 1)
355        S = -g*eta**2 / h**(7.0/3) * sqrt(10)
356
357        domain.compute_forcing_terms()
358        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
359        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
360                            S)
361        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
362                            0)
363
364        #A more complex example
365        domain.quantities['stage'].semi_implicit_update[:] = 0.0
366        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
367        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
368
369        domain.set_quantity('xmomentum', 3)
370        domain.set_quantity('ymomentum', 4)
371
372        S = -g*eta**2*5 / h**(7.0/3) * sqrt(10.0)
373
374        domain.compute_forcing_terms()
375
376        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
377        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
378                            3*S)
379        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
380                            4*S)
381       
382    def test_constant_wind_stress(self):
383        from anuga.config import rho_a, rho_w, eta_w
384        from math import pi, cos, sin
385
386        a = [0.0, 0.0]
387        b = [0.0, 2.0]
388        c = [2.0, 0.0]
389        d = [0.0, 4.0]
390        e = [2.0, 2.0]
391        f = [4.0, 0.0]
392
393        points = [a, b, c, d, e, f]
394        #             bac,     bce,     ecf,     dbe
395        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
396
397        domain = anuga.Domain(points, vertices)
398
399        #Flat surface with 1m of water
400        domain.set_quantity('elevation', 0)
401        domain.set_quantity('stage', 1.0)
402        domain.set_quantity('friction', 0)
403
404        Br = anuga.Reflective_boundary(domain)
405        domain.set_boundary({'exterior': Br})
406
407        #Setup only one forcing term, constant wind stress
408        s = 100
409        phi = 135
410        domain.forcing_terms = []
411        domain.forcing_terms.append(anuga.Wind_stress(s, phi))
412
413        domain.compute_forcing_terms()
414
415        const = eta_w*rho_a / rho_w
416
417        #Convert to radians
418        phi = phi*pi / 180
419
420        #Compute velocity vector (u, v)
421        u = s*cos(phi)
422        v = s*sin(phi)
423
424        #Compute wind stress
425        S = const * num.sqrt(u**2 + v**2)
426
427        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
428        assert num.allclose(domain.quantities['xmomentum'].explicit_update, S*u)
429        assert num.allclose(domain.quantities['ymomentum'].explicit_update, S*v)
430
431    def test_variable_wind_stress(self):
432        from anuga.config import rho_a, rho_w, eta_w
433        from math import pi, cos, sin
434
435        a = [0.0, 0.0]
436        b = [0.0, 2.0]
437        c = [2.0, 0.0]
438        d = [0.0, 4.0]
439        e = [2.0, 2.0]
440        f = [4.0, 0.0]
441
442        points = [a, b, c, d, e, f]
443        #             bac,     bce,     ecf,     dbe
444        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
445
446        domain = Domain(points, vertices)
447
448        #Flat surface with 1m of water
449        domain.set_quantity('elevation', 0)
450        domain.set_quantity('stage', 1.0)
451        domain.set_quantity('friction', 0)
452
453        Br = anuga.Reflective_boundary(domain)
454        domain.set_boundary({'exterior': Br})
455
456        domain.time = 5.54    # Take a random time (not zero)
457
458        #Setup only one forcing term, constant wind stress
459        s = 100
460        phi = 135
461        domain.forcing_terms = []
462        domain.forcing_terms.append(anuga.Wind_stress(s=speed, phi=angle))
463
464        domain.compute_forcing_terms()
465
466        #Compute reference solution
467        const = eta_w*rho_a / rho_w
468
469        N = len(domain)    # number_of_triangles
470
471        xc = domain.get_centroid_coordinates()
472        t = domain.time
473
474        x = xc[:,0]
475        y = xc[:,1]
476        s_vec = speed(t,x,y)
477        phi_vec = angle(t,x,y)
478
479        for k in range(N):
480            # Convert to radians
481            phi = phi_vec[k]*pi / 180
482            s = s_vec[k]
483
484            # Compute velocity vector (u, v)
485            u = s*cos(phi)
486            v = s*sin(phi)
487
488            # Compute wind stress
489            S = const * num.sqrt(u**2 + v**2)
490
491            assert num.allclose(domain.quantities['stage'].explicit_update[k],
492                                0)
493            assert num.allclose(domain.quantities['xmomentum'].\
494                                     explicit_update[k],
495                                S*u)
496            assert num.allclose(domain.quantities['ymomentum'].\
497                                     explicit_update[k],
498                                S*v)
499
500    def test_windfield_from_file(self):
501        import time
502        from anuga.config import rho_a, rho_w, eta_w
503        from math import pi, cos, sin
504        from anuga.config import time_format
505        from anuga.abstract_2d_finite_volumes.util import file_function
506
507        a = [0.0, 0.0]
508        b = [0.0, 2.0]
509        c = [2.0, 0.0]
510        d = [0.0, 4.0]
511        e = [2.0, 2.0]
512        f = [4.0, 0.0]
513
514        points = [a, b, c, d, e, f]
515        #             bac,     bce,     ecf,     dbe
516        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
517
518        domain = Domain(points, vertices)
519
520        # Flat surface with 1m of water
521        domain.set_quantity('elevation', 0)
522        domain.set_quantity('stage', 1.0)
523        domain.set_quantity('friction', 0)
524
525        Br = anuga.Reflective_boundary(domain)
526        domain.set_boundary({'exterior': Br})
527
528        domain.time = 7    # Take a time that is represented in file (not zero)
529
530        # Write wind stress file (ensure that domain.time is covered)
531        # Take x=1 and y=0
532        filename = 'test_windstress_from_file'
533        start = time.mktime(time.strptime('2000', '%Y'))
534        fid = open(filename + '.txt', 'w')
535        dt = 1    # One second interval
536        t = 0.0
537        while t <= 10.0:
538            t_string = time.strftime(time_format, time.gmtime(t+start))
539
540            fid.write('%s, %f %f\n' %
541                      (t_string, speed(t,[1],[0])[0], angle(t,[1],[0])[0]))
542            t += dt
543
544        fid.close()
545
546        anuga.timefile2netcdf(filename+'.txt', filename+'.tms')
547        os.remove(filename + '.txt')
548
549        # Setup wind stress
550        F = file_function(filename + '.tms',
551                          quantities=['Attribute0', 'Attribute1'])
552        os.remove(filename + '.tms')
553
554        W = Wind_stress(F)
555
556        domain.forcing_terms = []
557        domain.forcing_terms.append(W)
558
559        domain.compute_forcing_terms()
560
561        # Compute reference solution
562        const = eta_w*rho_a / rho_w
563
564        N = len(domain)    # number_of_triangles
565
566        t = domain.time
567
568        s = speed(t, [1], [0])[0]
569        phi = angle(t, [1], [0])[0]
570
571        # Convert to radians
572        phi = phi*pi / 180
573
574        # Compute velocity vector (u, v)
575        u = s*cos(phi)
576        v = s*sin(phi)
577
578        # Compute wind stress
579        S = const * num.sqrt(u**2 + v**2)
580
581        for k in range(N):
582            assert num.allclose(domain.quantities['stage'].explicit_update[k],
583                                0)
584            assert num.allclose(domain.quantities['xmomentum'].\
585                                    explicit_update[k],
586                                S*u)
587            assert num.allclose(domain.quantities['ymomentum'].\
588                                    explicit_update[k],
589                                S*v)
590
591    def test_windfield_from_file_seconds(self):
592        import time
593        from anuga.config import rho_a, rho_w, eta_w
594        from math import pi, cos, sin
595        from anuga.config import time_format
596        from anuga.abstract_2d_finite_volumes.util import file_function
597
598        a = [0.0, 0.0]
599        b = [0.0, 2.0]
600        c = [2.0, 0.0]
601        d = [0.0, 4.0]
602        e = [2.0, 2.0]
603        f = [4.0, 0.0]
604
605        points = [a, b, c, d, e, f]
606        #             bac,     bce,     ecf,     dbe
607        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
608
609        domain = Domain(points, vertices)
610
611        # Flat surface with 1m of water
612        domain.set_quantity('elevation', 0)
613        domain.set_quantity('stage', 1.0)
614        domain.set_quantity('friction', 0)
615
616        Br = anuga.Reflective_boundary(domain)
617        domain.set_boundary({'exterior': Br})
618
619        domain.time = 7    # Take a time that is represented in file (not zero)
620
621        # Write wind stress file (ensure that domain.time is covered)
622        # Take x=1 and y=0
623        filename = 'test_windstress_from_file.txt'
624        file_out = 'test_windstress_from_file.tms'
625        start = time.mktime(time.strptime('2000', '%Y'))
626        fid = open(filename, 'w')
627        dt = 0.5    # Half second interval
628        t = 0.0
629        while t <= 10.0:
630            fid.write('%s, %f %f\n'
631                      % (str(t), speed(t, [1], [0])[0], angle(t, [1], [0])[0]))
632            t += dt
633
634        fid.close()
635
636        anuga.timefile2netcdf(filename, file_out, time_as_seconds=True)
637        os.remove(filename)
638
639        # Setup wind stress
640        F = file_function(file_out,
641                          quantities=['Attribute0', 'Attribute1'])
642        os.remove(file_out)
643
644        W = anuga.Wind_stress(F)
645
646        domain.forcing_terms = []
647        domain.forcing_terms.append(W)
648
649        domain.compute_forcing_terms()
650
651        # Compute reference solution
652        const = eta_w*rho_a / rho_w
653
654        N = len(domain)    # number_of_triangles
655
656        t = domain.time
657
658        s = speed(t, [1], [0])[0]
659        phi = angle(t, [1], [0])[0]
660
661        # Convert to radians
662        phi = phi*pi / 180
663
664        # Compute velocity vector (u, v)
665        u = s*cos(phi)
666        v = s*sin(phi)
667
668        # Compute wind stress
669        S = const * num.sqrt(u**2 + v**2)
670
671        for k in range(N):
672            assert num.allclose(domain.quantities['stage'].explicit_update[k],
673                                0)
674            assert num.allclose(domain.quantities['xmomentum'].\
675                                    explicit_update[k],
676                                S*u)
677            assert num.allclose(domain.quantities['ymomentum'].\
678                                    explicit_update[k],
679                                S*v)
680
681    def test_wind_stress_error_condition(self):
682        """Test that windstress reacts properly when forcing functions
683        are wrong - e.g. returns a scalar
684        """
685
686        from math import pi, cos, sin
687        from anuga.config import rho_a, rho_w, eta_w
688
689        a = [0.0, 0.0]
690        b = [0.0, 2.0]
691        c = [2.0, 0.0]
692        d = [0.0, 4.0]
693        e = [2.0, 2.0]
694        f = [4.0, 0.0]
695
696        points = [a, b, c, d, e, f]
697        #             bac,     bce,     ecf,     dbe
698        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
699
700        domain = Domain(points, vertices)
701
702        # Flat surface with 1m of water
703        domain.set_quantity('elevation', 0)
704        domain.set_quantity('stage', 1.0)
705        domain.set_quantity('friction', 0)
706
707        Br = anuga.Reflective_boundary(domain)
708        domain.set_boundary({'exterior': Br})
709
710        domain.time = 5.54    # Take a random time (not zero)
711
712        # Setup only one forcing term, bad func
713        domain.forcing_terms = []
714
715        try:
716            domain.forcing_terms.append(anuga.Wind_stress(s=scalar_func_list,
717                                                    phi=angle))
718        except AssertionError:
719            pass
720        else:
721            msg = 'Should have raised exception'
722            raise Exception, msg
723
724        try:
725            domain.forcing_terms.append(Wind_stress(s=speed, phi=scalar_func))
726        except Exception:
727            pass
728        else:
729            msg = 'Should have raised exception'
730            raise Exception, msg
731
732        try:
733            domain.forcing_terms.append(Wind_stress(s=speed, phi='xx'))
734        except:
735            pass
736        else:
737            msg = 'Should have raised exception'
738            raise Exception, msg
739
740    def test_rainfall(self):
741        from math import pi, cos, sin
742
743        a = [0.0, 0.0]
744        b = [0.0, 2.0]
745        c = [2.0, 0.0]
746        d = [0.0, 4.0]
747        e = [2.0, 2.0]
748        f = [4.0, 0.0]
749
750        points = [a, b, c, d, e, f]
751        #             bac,     bce,     ecf,     dbe
752        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
753
754        domain = Domain(points, vertices)
755
756        # Flat surface with 1m of water
757        domain.set_quantity('elevation', 0)
758        domain.set_quantity('stage', 1.0)
759        domain.set_quantity('friction', 0)
760
761        Br = anuga.Reflective_boundary(domain)
762        domain.set_boundary({'exterior': Br})
763
764        # Setup only one forcing term, constant rainfall
765        domain.forcing_terms = []
766        domain.forcing_terms.append(anuga.Rainfall(domain, rate=2.0))
767
768        domain.compute_forcing_terms()
769        assert num.allclose(domain.quantities['stage'].explicit_update,
770                            2.0/1000)
771
772    def test_rainfall_restricted_by_polygon(self):
773        from math import pi, cos, sin
774
775        a = [0.0, 0.0]
776        b = [0.0, 2.0]
777        c = [2.0, 0.0]
778        d = [0.0, 4.0]
779        e = [2.0, 2.0]
780        f = [4.0, 0.0]
781
782        points = [a, b, c, d, e, f]
783        #             bac,     bce,     ecf,     dbe
784        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
785
786        domain = Domain(points, vertices)
787
788        # Flat surface with 1m of water
789        domain.set_quantity('elevation', 0)
790        domain.set_quantity('stage', 1.0)
791        domain.set_quantity('friction', 0)
792
793        Br = anuga.Reflective_boundary(domain)
794        domain.set_boundary({'exterior': Br})
795
796        # Setup only one forcing term, constant rainfall
797        # restricted to a polygon enclosing triangle #1 (bce)
798        domain.forcing_terms = []
799        R = anuga.Rainfall(domain, rate=2.0, polygon=[[1,1], [2,1], [2,2], [1,2]])
800
801        assert num.allclose(R.exchange_area, 2)
802
803        domain.forcing_terms.append(R)
804
805        domain.compute_forcing_terms()
806
807        assert num.allclose(domain.quantities['stage'].explicit_update[1],
808                            2.0/1000)
809        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
810        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
811
812    def test_time_dependent_rainfall_restricted_by_polygon(self):
813        a = [0.0, 0.0]
814        b = [0.0, 2.0]
815        c = [2.0, 0.0]
816        d = [0.0, 4.0]
817        e = [2.0, 2.0]
818        f = [4.0, 0.0]
819
820        points = [a, b, c, d, e, f]
821        #             bac,     bce,     ecf,     dbe
822        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
823
824        domain = anuga.Domain(points, vertices)
825
826        # Flat surface with 1m of water
827        domain.set_quantity('elevation', 0)
828        domain.set_quantity('stage', 1.0)
829        domain.set_quantity('friction', 0)
830
831        Br = anuga.Reflective_boundary(domain)
832        domain.set_boundary({'exterior': Br})
833
834        # Setup only one forcing term, time dependent rainfall
835        # restricted to a polygon enclosing triangle #1 (bce)
836        domain.forcing_terms = []
837        R = anuga.Rainfall(domain,
838                     rate=lambda t: 3*t + 7,
839                     polygon = [[1,1], [2,1], [2,2], [1,2]])
840
841        assert num.allclose(R.exchange_area, 2)
842       
843        domain.forcing_terms.append(R)
844
845        domain.time = 10.
846
847        domain.compute_forcing_terms()
848
849        assert num.allclose(domain.quantities['stage'].explicit_update[1],
850                            (3*domain.time + 7)/1000)
851        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
852        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
853
854    def test_time_dependent_rainfall_using_starttime(self):
855        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
856
857        a = [0.0, 0.0]
858        b = [0.0, 2.0]
859        c = [2.0, 0.0]
860        d = [0.0, 4.0]
861        e = [2.0, 2.0]
862        f = [4.0, 0.0]
863
864        points = [a, b, c, d, e, f]
865        #             bac,     bce,     ecf,     dbe
866        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
867
868        domain = anuga.Domain(points, vertices)
869
870        # Flat surface with 1m of water
871        domain.set_quantity('elevation', 0)
872        domain.set_quantity('stage', 1.0)
873        domain.set_quantity('friction', 0)
874
875        Br = anuga.Reflective_boundary(domain)
876        domain.set_boundary({'exterior': Br})
877
878        # Setup only one forcing term, time dependent rainfall
879        # restricted to a polygon enclosing triangle #1 (bce)
880        domain.forcing_terms = []
881        R = anuga.Rainfall(domain,
882                     rate=lambda t: 3*t + 7,
883                     polygon=rainfall_poly)                     
884
885        assert num.allclose(R.exchange_area, 2)
886       
887        domain.forcing_terms.append(R)
888
889        # This will test that time used in the forcing function takes
890        # startime into account.
891        domain.starttime = 5.0
892
893        domain.time = 7.
894
895        domain.compute_forcing_terms()
896
897        assert num.allclose(domain.quantities['stage'].explicit_update[1],
898                            (3*domain.get_time() + 7)/1000)
899        assert num.allclose(domain.quantities['stage'].explicit_update[1],
900                            (3*(domain.time + domain.starttime) + 7)/1000)
901
902        # Using internal time her should fail
903        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
904                                (3*domain.time + 7)/1000)
905
906        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
907        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
908
909    def test_time_dependent_rainfall_using_georef(self):
910        """test_time_dependent_rainfall_using_georef
911
912        This will also test the General forcing term using georef
913        """
914
915        # Mesh in zone 56 (absolute coords)
916        x0 = 314036.58727982
917        y0 = 6224951.2960092
918
919        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
920        rainfall_poly += [x0, y0]
921
922        a = [0.0, 0.0]
923        b = [0.0, 2.0]
924        c = [2.0, 0.0]
925        d = [0.0, 4.0]
926        e = [2.0, 2.0]
927        f = [4.0, 0.0]
928
929        points = [a, b, c, d, e, f]
930        #             bac,     bce,     ecf,     dbe
931        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
932
933        domain = Domain(points, vertices,
934                        geo_reference=Geo_reference(56, x0, y0))
935
936        # Flat surface with 1m of water
937        domain.set_quantity('elevation', 0)
938        domain.set_quantity('stage', 1.0)
939        domain.set_quantity('friction', 0)
940
941        Br = anuga.Reflective_boundary(domain)
942        domain.set_boundary({'exterior': Br})
943
944        # Setup only one forcing term, time dependent rainfall
945        # restricted to a polygon enclosing triangle #1 (bce)
946        domain.forcing_terms = []
947        R = anuga.Rainfall(domain,
948                     rate=lambda t: 3*t + 7,
949                     polygon=rainfall_poly)
950
951        assert num.allclose(R.exchange_area, 2)
952       
953        domain.forcing_terms.append(R)
954
955        # This will test that time used in the forcing function takes
956        # startime into account.
957        domain.starttime = 5.0
958
959        domain.time = 7.
960
961        domain.compute_forcing_terms()
962
963        assert num.allclose(domain.quantities['stage'].explicit_update[1],
964                            (3*domain.get_time() + 7)/1000)
965        assert num.allclose(domain.quantities['stage'].explicit_update[1],
966                            (3*(domain.time + domain.starttime) + 7)/1000)
967
968        # Using internal time her should fail
969        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
970                                (3*domain.time + 7)/1000)
971
972        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
973        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
974
975    def test_time_dependent_rainfall_restricted_by_polygon_with_default(self):
976        """
977        Test that default rainfall can be used when given rate runs out of data.
978        """
979
980        a = [0.0, 0.0]
981        b = [0.0, 2.0]
982        c = [2.0, 0.0]
983        d = [0.0, 4.0]
984        e = [2.0, 2.0]
985        f = [4.0, 0.0]
986
987        points = [a, b, c, d, e, f]
988        #             bac,     bce,     ecf,     dbe
989        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
990
991        domain = Domain(points, vertices)
992
993        # Flat surface with 1m of water
994        domain.set_quantity('elevation', 0)
995        domain.set_quantity('stage', 1.0)
996        domain.set_quantity('friction', 0)
997
998        Br = anuga.Reflective_boundary(domain)
999        domain.set_boundary({'exterior': Br})
1000
1001        # Setup only one forcing term, time dependent rainfall
1002        # that expires at t==20
1003        from anuga.fit_interpolate.interpolate import Modeltime_too_late
1004
1005        def main_rate(t):
1006            if t > 20:
1007                msg = 'Model time exceeded.'
1008                raise Modeltime_too_late, msg
1009            else:
1010                return 3*t + 7
1011
1012        domain.forcing_terms = []
1013        R = anuga.Rainfall(domain,
1014                     rate=main_rate,
1015                     polygon = [[1,1], [2,1], [2,2], [1,2]],
1016                     default_rate=5.0)
1017
1018        assert num.allclose(R.exchange_area, 2)
1019       
1020        domain.forcing_terms.append(R)
1021
1022        domain.time = 10.
1023
1024        domain.compute_forcing_terms()
1025
1026        assert num.allclose(domain.quantities['stage'].explicit_update[1],
1027                            (3*domain.time+7)/1000)
1028        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
1029        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
1030
1031        domain.time = 100.
1032        domain.quantities['stage'].explicit_update[:] = 0.0     # Reset
1033        domain.compute_forcing_terms()
1034
1035        assert num.allclose(domain.quantities['stage'].explicit_update[1],
1036                            5.0/1000) # Default value
1037        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
1038        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
1039
1040    def test_rainfall_forcing_with_evolve(self):
1041        """test_rainfall_forcing_with_evolve
1042
1043        Test how forcing terms are called within evolve
1044        """
1045
1046        # FIXME(Ole): This test is just to experiment
1047
1048        a = [0.0, 0.0]
1049        b = [0.0, 2.0]
1050        c = [2.0, 0.0]
1051        d = [0.0, 4.0]
1052        e = [2.0, 2.0]
1053        f = [4.0, 0.0]
1054
1055        points = [a, b, c, d, e, f]
1056        #             bac,     bce,     ecf,     dbe
1057        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1058
1059        domain = Domain(points, vertices)
1060
1061        # Flat surface with 1m of water
1062        domain.set_quantity('elevation', 0)
1063        domain.set_quantity('stage', 1.0)
1064        domain.set_quantity('friction', 0)
1065
1066        Br = anuga.Reflective_boundary(domain)
1067        domain.set_boundary({'exterior': Br})
1068
1069        # Setup only one forcing term, time dependent rainfall
1070        # that expires at t==20
1071        from anuga.fit_interpolate.interpolate import Modeltime_too_late
1072
1073        def main_rate(t):
1074            if t > 20:
1075                msg = 'Model time exceeded.'
1076                raise Modeltime_too_late, msg
1077            else:
1078                return 3*t + 7
1079
1080        domain.forcing_terms = []
1081        R = anuga.Rainfall(domain,
1082                     rate=main_rate,
1083                     polygon=[[1,1], [2,1], [2,2], [1,2]],
1084                     default_rate=5.0)
1085
1086        assert num.allclose(R.exchange_area, 2)
1087       
1088        domain.forcing_terms.append(R)
1089
1090        for t in domain.evolve(yieldstep=1, finaltime=25):
1091            pass
1092            #FIXME(Ole):  A test here is hard because explicit_update also
1093            # receives updates from the flux calculation.
1094
1095
1096
1097    def test_inflow_using_circle(self):
1098        from math import pi, cos, sin
1099
1100        a = [0.0, 0.0]
1101        b = [0.0, 2.0]
1102        c = [2.0, 0.0]
1103        d = [0.0, 4.0]
1104        e = [2.0, 2.0]
1105        f = [4.0, 0.0]
1106
1107        points = [a, b, c, d, e, f]
1108        #             bac,     bce,     ecf,     dbe
1109        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1110
1111        domain = Domain(points, vertices)
1112
1113        # Flat surface with 1m of water
1114        domain.set_quantity('elevation', 0)
1115        domain.set_quantity('stage', 1.0)
1116        domain.set_quantity('friction', 0)
1117
1118        Br = anuga.Reflective_boundary(domain)
1119        domain.set_boundary({'exterior': Br})
1120
1121        # Setup only one forcing term, constant inflow of 2 m^3/s
1122        # on a circle affecting triangles #0 and #1 (bac and bce)
1123        domain.forcing_terms = []
1124       
1125        I = anuga.Inflow(domain, rate=2.0, center=(1,1), radius=1)
1126        domain.forcing_terms.append(I)       
1127        domain.compute_forcing_terms()
1128
1129       
1130        A = I.exchange_area
1131        assert num.allclose(A, 4) # Two triangles       
1132       
1133        assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A)
1134        assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A)
1135        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)       
1136
1137
1138    def test_inflow_using_circle_function(self):
1139        from math import pi, cos, sin
1140
1141        a = [0.0, 0.0]
1142        b = [0.0, 2.0]
1143        c = [2.0, 0.0]
1144        d = [0.0, 4.0]
1145        e = [2.0, 2.0]
1146        f = [4.0, 0.0]
1147
1148        points = [a, b, c, d, e, f]
1149        #             bac,     bce,     ecf,     dbe
1150        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1151
1152        domain = anuga.Domain(points, vertices)
1153
1154        # Flat surface with 1m of water
1155        domain.set_quantity('elevation', 0)
1156        domain.set_quantity('stage', 1.0)
1157        domain.set_quantity('friction', 0)
1158
1159        Br = anuga.Reflective_boundary(domain)
1160        domain.set_boundary({'exterior': Br})
1161
1162        # Setup only one forcing term, time dependent inflow of 2 m^3/s
1163        # on a circle affecting triangles #0 and #1 (bac and bce)
1164        domain.forcing_terms = []
1165        I = Inflow(domain, rate=lambda t: 2., center=(1,1), radius=1)
1166        domain.forcing_terms.append(I)
1167       
1168        domain.compute_forcing_terms()       
1169
1170        A = I.exchange_area
1171        assert num.allclose(A, 4) # Two triangles
1172       
1173        assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A)
1174        assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A)
1175        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)       
1176       
1177
1178
1179
1180    def test_inflow_catch_too_few_triangles(self):
1181        """
1182        Test that exception is thrown if no triangles are covered
1183        by the inflow area
1184        """
1185
1186        from math import pi, cos, sin
1187
1188        a = [0.0, 0.0]
1189        b = [0.0, 2.0]
1190        c = [2.0, 0.0]
1191        d = [0.0, 4.0]
1192        e = [2.0, 2.0]
1193        f = [4.0, 0.0]
1194
1195        points = [a, b, c, d, e, f]
1196        #             bac,     bce,     ecf,     dbe
1197        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1198
1199        domain = Domain(points, vertices)
1200
1201        # Flat surface with 1m of water
1202        domain.set_quantity('elevation', 0)
1203        domain.set_quantity('stage', 1.0)
1204        domain.set_quantity('friction', 0)
1205
1206        Br = anuga.Reflective_boundary(domain)
1207        domain.set_boundary({'exterior': Br})
1208
1209        # Setup only one forcing term, constant inflow of 2 m^3/s
1210        # on a circle affecting triangles #0 and #1 (bac and bce)
1211        try:
1212            Inflow(domain, rate=2.0, center=(1,1.1), radius=0.01)
1213        except:
1214            pass
1215        else:
1216            msg = 'Should have raised exception'
1217            raise Exception, msg
1218
1219    def Xtest_inflow_outflow_conservation(self):
1220        """
1221        Test what happens if water is abstracted from one area and
1222        injected into another - especially if there is not enough
1223        water to match the abstraction.
1224        This tests that the total volume is kept constant under a range of
1225        scenarios.
1226
1227        This test will fail as the problem was only fixed for culverts.
1228        """
1229
1230        from math import pi, cos, sin
1231
1232        length = 20.
1233        width = 10.
1234
1235        dx = dy = 2    # 1 or 2 OK
1236        points, vertices, boundary = rectangular_cross(int(length/dx),
1237                                                       int(width/dy),
1238                                                       len1=length,
1239                                                       len2=width)
1240        domain = Domain(points, vertices, boundary)
1241        domain.set_name('test_inflow_conservation')    # Output name
1242        domain.set_default_order(2)
1243
1244        # Flat surface with 1m of water
1245        stage = 1.0
1246        domain.set_quantity('elevation', 0)
1247        domain.set_quantity('stage', stage)
1248        domain.set_quantity('friction', 0)
1249
1250        Br = Reflective_boundary(domain)
1251        domain.set_boundary({'left': Br, 'right': Br, 'bottom': Br, 'top': Br})
1252
1253        # Setup one forcing term, constant inflow of 2 m^3/s on a circle
1254        domain.forcing_terms = []
1255        domain.forcing_terms.append(Inflow(domain, rate=2.0,
1256                                           center=(5,5), radius=1))
1257
1258        domain.compute_forcing_terms()
1259
1260        # Check that update values are correct
1261        for x in domain.quantities['stage'].explicit_update:
1262            assert num.allclose(x, 2.0/pi) or num.allclose(x, 0.0)
1263
1264        # Check volumes without inflow
1265        domain.forcing_terms = []
1266        initial_volume = domain.quantities['stage'].get_integral()
1267
1268        assert num.allclose(initial_volume, width*length*stage)
1269
1270        for t in domain.evolve(yieldstep = 0.05, finaltime = 5.0):
1271            volume = domain.quantities['stage'].get_integral()
1272            assert num.allclose(volume, initial_volume)
1273
1274        # Now apply the inflow and check volumes for a range of stage values
1275        for stage in [2.0, 1.0, 0.5, 0.25, 0.1, 0.0]:
1276            domain.time = 0.0
1277            domain.set_quantity('stage', stage)
1278            domain.forcing_terms = []
1279            domain.forcing_terms.append(Inflow(domain, rate=2.0,
1280                                               center=(5,5), radius=1))
1281            initial_volume = domain.quantities['stage'].get_integral()
1282            predicted_volume = initial_volume
1283            dt = 0.05
1284            for t in domain.evolve(yieldstep=dt, finaltime=5.0):
1285                volume = domain.quantities['stage'].get_integral()
1286                assert num.allclose (volume, predicted_volume)
1287                predicted_volume = predicted_volume + 2.0/pi/100/dt # Why 100?
1288
1289        # Apply equivalent outflow only and check volumes
1290        # for a range of stage values
1291        for stage in [2.0, 1.0, 0.5, 0.25, 0.1, 0.0]:
1292            print stage
1293
1294            domain.time = 0.0
1295            domain.set_quantity('stage', stage)
1296            domain.forcing_terms = []
1297            domain.forcing_terms.append(Inflow(domain, rate=-2.0,
1298                                               center=(15,5), radius=1))
1299            initial_volume = domain.quantities['stage'].get_integral()
1300            predicted_volume = initial_volume
1301            dt = 0.05
1302            for t in domain.evolve(yieldstep=dt, finaltime=5.0):
1303                volume = domain.quantities['stage'].get_integral()
1304                print t, volume, predicted_volume
1305                assert num.allclose (volume, predicted_volume)
1306                predicted_volume = predicted_volume - 2.0/pi/100/dt # Why 100?
1307
1308        # Apply both inflow and outflow and check volumes being constant for a
1309        # range of stage values
1310        for stage in [2.0, 1.0, 0.5, 0.25, 0.1, 0.0]:
1311            print stage
1312
1313            domain.time = 0.0
1314            domain.set_quantity('stage', stage)
1315            domain.forcing_terms = []
1316            domain.forcing_terms.append(Inflow(domain, rate=2.0,
1317                                               center=(5,5), radius=1))
1318            domain.forcing_terms.append(Inflow(domain, rate=-2.0,
1319                                               center=(15,5), radius=1))
1320            initial_volume = domain.quantities['stage'].get_integral()
1321
1322            dt = 0.05
1323            for t in domain.evolve(yieldstep=dt, finaltime=5.0):
1324                volume = domain.quantities['stage'].get_integral()
1325
1326                print t, volume
1327                assert num.allclose(volume, initial_volume)
1328
1329    #####################################################
1330
1331       
1332    def test_inflow_using_flowline(self):
1333        """test_inflow_using_flowline
1334
1335        Test the ability of a flowline to match inflow above the flowline by
1336        creating constant inflow onto a circle at the head of a 20m
1337        wide by 300m long plane dipping at various slopes with a
1338        perpendicular flowline and gauge downstream of the inflow and
1339        a 45 degree flowlines at 200m downstream.
1340
1341        A more substantial version of this test with finer resolution and
1342        including the depth calculation using Manning's equation is
1343        available under the validate_all suite in the directory
1344        anuga_validation/automated_validation_tests/flow_tests.
1345        """
1346
1347       
1348        verbose = False
1349       
1350        #----------------------------------------------------------------------
1351        # Setup computational domain
1352        #----------------------------------------------------------------------
1353        number_of_inflows = 2 # Number of inflows on top of each other
1354        finaltime = 500 #700.0 # If this is too short, steady state will not be achieved
1355
1356        length = 250.
1357        width  = 20.
1358        dx = dy = 5                 # Resolution: of grid on both axes
1359
1360        points, vertices, boundary = rectangular_cross(int(length/dx),
1361                                                       int(width/dy),
1362                                                       len1=length,
1363                                                       len2=width)
1364
1365        for mannings_n in [0.1, 0.01]:
1366            # Loop over a range of roughnesses             
1367           
1368            for slope in [1.0/300, 1.0/100]:
1369                # Loop over a range of bedslopes representing
1370                # sub to super critical flows
1371
1372
1373                domain = Domain(points, vertices, boundary)   
1374                domain.set_name('inflow_flowline_test')     # Output name
1375
1376                #--------------------------------------------------------------
1377                # Setup initial conditions
1378                #--------------------------------------------------------------
1379
1380                def topography(x, y):
1381                    z = -x * slope
1382                    return z
1383
1384                # Use function for elevation
1385                domain.set_quantity('elevation', topography)
1386                # Constant friction of conc surface   
1387                domain.set_quantity('friction', mannings_n)
1388                # Dry initial condition
1389                domain.set_quantity('stage', expression='elevation')
1390
1391                #--------------------------------------------------------------
1392                # Setup Inflow
1393                #--------------------------------------------------------------
1394
1395                # Fixed Flowrate onto Area
1396                fixed_inflow = anuga.Inflow(domain,
1397                                      center=(10.0, 10.0),
1398                                      radius=5.00,
1399                                      rate=10.00)   
1400
1401                # Stack this flow
1402                for i in range(number_of_inflows):
1403                    domain.forcing_terms.append(fixed_inflow)
1404               
1405                ref_flow = fixed_inflow.rate*number_of_inflowsg
1406
1407                # Compute normal depth on plane using Mannings equation
1408                # v=1/n*(r^2/3)*(s^0.5) or r=(Q*n/(s^0.5*W))^0.6
1409                normal_depth=(ref_flow*mannings_n/(slope**0.5*width))**0.6
1410                if verbose:
1411                    print
1412                    print 'Slope:', slope, 'Mannings n:', mannings_n
1413                   
1414                   
1415                #--------------------------------------------------------------
1416                # Setup boundary conditions
1417                #--------------------------------------------------------------
1418
1419                Br = Reflective_boundary(domain)
1420               
1421                # Define downstream boundary based on predicted depth
1422                def normal_depth_stage_downstream(t):
1423                    return (-slope*length) + normal_depth
1424               
1425                Bt = Transmissive_momentum_set_stage_boundary(domain=domain,
1426                                                              function=normal_depth_stage_downstream)
1427               
1428
1429               
1430
1431                domain.set_boundary({'left': Br,
1432                                     'right': Bt,
1433                                     'top': Br,
1434                                     'bottom': Br})
1435
1436
1437
1438                #--------------------------------------------------------------
1439                # Evolve system through time
1440                #--------------------------------------------------------------
1441
1442                for t in domain.evolve(yieldstep=100.0, finaltime=finaltime):
1443                    pass
1444                    #if verbose :
1445                    #    print domain.timestepping_statistics()
1446
1447                    #    print domain.volumetric_balance_statistics()                                   
1448
1449
1450                #--------------------------------------------------------------
1451                # Compute flow thru flowlines ds of inflow
1452                #--------------------------------------------------------------
1453                   
1454                # Square on flowline at 200m
1455                q = domain.get_flow_through_cross_section([[200.0,  0.0],
1456                                                           [200.0, 20.0]])
1457                if verbose:
1458                    print ('90 degree flowline: ANUGA = %f, Ref = %f'
1459                           % (q, ref_flow))
1460
1461                msg = ('Predicted flow was %f, should have been %f'
1462                       % (q, ref_flow))
1463                assert num.allclose(q, ref_flow, rtol=1.0e-2), msg         
1464
1465                           
1466                # 45 degree flowline at 200m
1467                q = domain.get_flow_through_cross_section([[200.0,  0.0],
1468                                                           [220.0, 20.0]])
1469                if verbose:
1470                    print ('45 degree flowline: ANUGA = %f, Ref = %f'
1471                           % (q, ref_flow))
1472                   
1473                msg = ('Predicted flow was %f, should have been %f'
1474                       % (q, ref_flow))
1475                assert num.allclose(q, ref_flow, rtol=1.0e-2), msg         
1476
1477        os.remove('inflow_flowline_test.sww')
1478
1479       
1480    def Xtest_inflow_boundary_using_flowline(self):
1481        """test_inflow_boundary_using_flowline
1482        Test the ability of a flowline to match inflow above the flowline by
1483        creating constant inflow into the boundary at the head of a 20m
1484        wide by 300m long plane dipping at various slopes with a
1485        perpendicular flowline and gauge downstream of the inflow and
1486        a 45 degree flowlines at 200m downstream
1487       
1488       
1489        """
1490
1491        # FIXME (Ole): Work in progress
1492       
1493        verbose = False
1494       
1495
1496        #----------------------------------------------------------------------
1497        # Import necessary modules
1498        #----------------------------------------------------------------------
1499        from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
1500        from anuga.shallow_water import Domain
1501        from anuga.shallow_water.shallow_water_domain import Reflective_boundary
1502        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
1503        from anuga.shallow_water.forcing import Inflow_boundary
1504        from anuga.shallow_water.data_manager import get_flow_through_cross_section
1505        from anuga.abstract_2d_finite_volumes.util import sww2csv_gauges, csv2timeseries_graphs
1506
1507
1508        #----------------------------------------------------------------------
1509        # Setup computational domain
1510        #----------------------------------------------------------------------
1511
1512        finaltime = 500 #700.0 # If this is too short, steady state will not be achieved
1513
1514        length = 250.
1515        width  = 20.
1516        dx = dy = 5          # Resolution: of grid on both axes
1517
1518        points, vertices, boundary = rectangular_cross(int(length/dx), int(width/dy),
1519                                                       len1=length, len2=width)
1520
1521        for mannings_n in [0.1, 0.01]:
1522            # Loop over a range of roughnesses             
1523           
1524            for slope in [1.0/300, 1.0/100]:
1525                # Loop over a range of bedslopes representing sub to super critical flows
1526
1527
1528                domain = Domain(points, vertices, boundary)   
1529                domain.set_name('inflow_boundary_flowline_test')
1530               
1531
1532                #-------------------------------------------------------------
1533                # Setup initial conditions
1534                #-------------------------------------------------------------
1535
1536                def topography(x, y):
1537                    z=-x * slope
1538                    return z
1539
1540                domain.set_quantity('elevation', topography)
1541                domain.set_quantity('friction', mannings_n)
1542                domain.set_quantity('stage',
1543                                    expression='elevation')
1544
1545
1546                   
1547                #--------------------------------------------------------------
1548                # Setup boundary conditions
1549                #--------------------------------------------------------------
1550               
1551
1552
1553                ref_flow = 10.00
1554
1555                # Compute normal depth on plane using Mannings equation
1556                # v=1/n*(r^2/3)*(s^0.5) or r=(Q*n/(s^0.5*W))^0.6
1557                normal_depth=(ref_flow*mannings_n/(slope**0.5*width))**0.6
1558                if verbose:
1559                    print
1560                    print 'Slope:', slope, 'Mannings n:', mannings_n
1561                   
1562               
1563               
1564                Bi = Inflow_boundary(domain, rate=ref_flow)
1565
1566                Br = Reflective_boundary(domain)
1567               
1568                # Define downstream boundary based on predicted depth
1569                def normal_depth_stage_downstream(t):
1570                    return (-slope*length) + normal_depth
1571               
1572                Bt = Transmissive_momentum_set_stage_boundary(domain=domain,
1573                                                              function=normal_depth_stage_downstream)
1574               
1575
1576               
1577
1578                domain.set_boundary({'left': Bi,
1579                                     'right': Bt,
1580                                     'top': Br,
1581                                     'bottom': Br})
1582
1583
1584
1585                #--------------------------------------------------------------
1586                # Evolve system through time
1587                #--------------------------------------------------------------
1588
1589
1590                for t in domain.evolve(yieldstep=100.0, finaltime=finaltime):
1591                    pass
1592                    #if verbose :
1593                    #    print domain.timestepping_statistics()
1594                    #    print domain.volumetric_balance_statistics()
1595                                                       
1596
1597
1598                #--------------------------------------------------------------
1599                # Compute flow thru flowlines ds of inflow
1600                #--------------------------------------------------------------
1601                   
1602                # Square on flowline at 200m
1603                q=domain.get_flow_through_cross_section([[200.0,0.0],[200.0,20.0]])
1604                msg = 'Predicted flow was %f, should have been %f' % (q, ref_flow)
1605                if verbose:
1606                    print '90 degree flowline: ANUGA = %f, Ref = %f' % (q, ref_flow)
1607                assert num.allclose(q, ref_flow, rtol=1.0e-2), msg         
1608
1609                           
1610                # 45 degree flowline at 200m
1611                q=domain.get_flow_through_cross_section([[200.0,0.0],[220.0,20.0]])
1612                msg = 'Predicted flow was %f, should have been %f' % (q, ref_flow)
1613                if verbose:
1614                    print '45 degree flowline: ANUGA = %f, Ref = %f' % (q, ref_flow)
1615                   
1616                assert num.allclose(q, ref_flow, rtol=1.0e-2), msg         
1617
1618       
1619       
1620    def Xtest_friction_dependent_flow_using_flowline(self):
1621        """test_friction_dependent_flow_using_flowline
1622       
1623        Test the internal flow (using flowline) as a function of
1624        different values of Mannings n and different slopes.
1625       
1626        Flow is applied in the form of boundary conditions with fixed momentum.
1627        """
1628
1629        verbose = True
1630
1631        #----------------------------------------------------------------------
1632        # Import necessary modules
1633        #----------------------------------------------------------------------
1634
1635        from anuga.abstract_2d_finite_volumes.mesh_factory \
1636                import rectangular_cross
1637        from anuga.shallow_water import Domain
1638        from anuga.shallow_water.shallow_water_domain import Reflective_boundary
1639        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
1640        from anuga.shallow_water.forcing import Inflow
1641        from anuga.shallow_water.data_manager \
1642                import get_flow_through_cross_section
1643        from anuga.abstract_2d_finite_volumes.util \
1644                import sww2csv_gauges, csv2timeseries_graphs
1645
1646
1647        #----------------------------------------------------------------------
1648        # Setup computational domain
1649        #----------------------------------------------------------------------
1650
1651        finaltime = 1000.0
1652
1653        length = 300.
1654        width  = 20.
1655        dx = dy = 5       # Resolution: of grid on both axes
1656       
1657        # Input parameters
1658        uh = 1.0
1659        vh = 0.0
1660        d = 1.0
1661       
1662        ref_flow = uh*d*width # 20 m^3/s in the x direction across entire domain
1663
1664        points, vertices, boundary = rectangular_cross(int(length/dx),
1665                                                       int(width/dy),
1666                                                       len1=length,
1667                                                       len2=width)
1668
1669        for mannings_n in [0.035]:          #[0.0, 0.012, 0.035]:
1670            for slope in [1.0/300]:         #[0.0, 1.0/300, 1.0/150]:
1671                # Loop over a range of bedslopes representing
1672                # sub to super critical flows
1673                if verbose:
1674                    print
1675                    print 'Slope:', slope, 'Mannings n:', mannings_n
1676                domain = Domain(points, vertices, boundary)   
1677                domain.set_name('Inflow_flowline_test')     # Output name
1678
1679                #--------------------------------------------------------------
1680                # Setup initial conditions
1681                #--------------------------------------------------------------
1682
1683                def topography(x, y):
1684                    z = -x * slope
1685                    return z
1686
1687                # Use function for elevation
1688                domain.set_quantity('elevation', topography)
1689                # Constant friction
1690                domain.set_quantity('friction', mannings_n)
1691               
1692                #domain.set_quantity('stage', expression='elevation')
1693                     
1694                # Set initial flow as depth=1m, uh=1.0 m/s, vh = 0.0
1695                # making it 20 m^3/s across entire domain
1696                domain.set_quantity('stage', expression='elevation + %f' % d)
1697                domain.set_quantity('xmomentum', uh)
1698                domain.set_quantity('ymomentum', vh)               
1699
1700                #--------------------------------------------------------------
1701                # Setup boundary conditions
1702                #--------------------------------------------------------------
1703
1704                Br = Reflective_boundary(domain)      # Solid reflective wall
1705               
1706                # Constant flow in and out of domain
1707                # Depth = 1m, uh=1 m/s, i.e. a flow of 20 m^3/s
1708                # across boundaries
1709                Bi = Dirichlet_boundary([d, uh, vh]) 
1710                Bo = Dirichlet_boundary([-length*slope+d, uh, vh])
1711                #Bo = Dirichlet_boundary([-100, 0, 0])
1712
1713                domain.set_boundary({'left': Bi, 'right': Bo,
1714                                     'top': Br,  'bottom': Br})
1715
1716                #--------------------------------------------------------------
1717                # Evolve system through time
1718                #--------------------------------------------------------------
1719
1720                for t in domain.evolve(yieldstep=100.0, finaltime=finaltime):
1721                    if verbose :
1722                        print domain.timestepping_statistics()
1723                        print domain.volumetric_balance_statistics()
1724
1725                # 90 degree flowline at 200m
1726                q = domain.get_flow_through_cross_section([[200.0,  0.0],
1727                                                           [200.0, 20.0]])
1728                msg = ('Predicted flow was %f, should have been %f'
1729                       % (q, ref_flow))
1730                if verbose:
1731                    print ('90 degree flowline: ANUGA = %f, Ref = %f'
1732                           % (q, ref_flow))
1733
1734                # 45 degree flowline at 200m
1735                q = domain.get_flow_through_cross_section([[200.0,  0.0],
1736                                                           [220.0, 20.0]])
1737                msg = ('Predicted flow was %f, should have been %f'
1738                       % (q, ref_flow))
1739                if verbose:
1740                    print ('45 degree flowline: ANUGA = %f, Ref = %f'
1741                           % (q, ref_flow))
1742
1743                # Stage recorder (gauge) in middle of plane at 200m
1744                x = 200.0
1745                y = 10.00               
1746                w = domain.get_quantity('stage').\
1747                        get_values(interpolation_points=[[x, y]])[0]
1748                z = domain.get_quantity('elevation').\
1749                        get_values(interpolation_points=[[x, y]])[0]
1750                domain_depth = w-z
1751
1752                xmom = domain.get_quantity('xmomentum').\
1753                        get_values(interpolation_points=[[x, y]])[0]
1754                ymom = domain.get_quantity('ymomentum').\
1755                        get_values(interpolation_points=[[x, y]])[0]           
1756                if verbose:                   
1757                    print ('At interpolation point (h, uh, vh): ',
1758                           domain_depth, xmom, ymom)
1759                    print 'uh * d * width = ', xmom*domain_depth*width
1760                               
1761                if slope > 0.0:
1762                    # Compute normal depth at gauge location using Manning eqn
1763                    # v=1/n*(r^2/3)*(s^0.5) or r=(Q*n/(s^0.5*W))^0.6
1764                    normal_depth = (ref_flow*mannings_n/(slope**0.5*width))**0.6
1765                    if verbose:
1766                        print ('Depth: ANUGA = %f, Mannings = %f'
1767                               % (domain_depth, normal_depth))
1768
1769        os.remove('Inflow_flowline_test.sww')
1770
1771
1772#################################################################################
1773
1774if __name__ == "__main__":
1775    suite = unittest.makeSuite(Test_swb_forcing_terms, 'test')
1776    runner = unittest.TextTestRunner(verbosity=1)
1777    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.