source: trunk/anuga_core/source/anuga/shallow_water_balanced/test_swb_forcing_terms.py @ 7869

Last change on this file since 7869 was 7869, checked in by hudson, 14 years ago

Shallow water balanced tests fail, but no longer have errors.

File size: 54.5 KB
RevLine 
[7559]1#!/usr/bin/env python
2
3import unittest, os
4import os.path
5from math import pi, sqrt
6import tempfile
[7866]7import anuga
[7559]8
9from anuga.config import g, epsilon
10from anuga.config import netcdf_mode_r, netcdf_mode_w, netcdf_mode_a
11from anuga.utilities.numerical_tools import mean
[7866]12from anuga.geometry.polygon import is_inside_polygon
[7559]13from anuga.coordinate_transforms.geo_reference import Geo_reference
14from anuga.abstract_2d_finite_volumes.quantity import Quantity
15from anuga.geospatial_data.geospatial_data import Geospatial_data
16from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
17
18from anuga.utilities.system_tools import get_pathname_from_package
19from swb_domain import *
20
21import numpy as num
22
23# Get gateway to C implementation of flux function for direct testing
24from shallow_water_ext import flux_function_central as flux_function
25
26
[7562]27# Variable windfield implemented using functions
28def speed(t, x, y):
29    """Large speeds halfway between center and edges
[7559]30
[7562]31    Low speeds at center and edges
32    """
[7559]33
[7562]34    from math import exp, cos, pi
35
36    x = num.array(x)
37    y = num.array(y)
38
39    N = len(x)
40    s = 0*#New array
41
42    for k in range(N):
43        r = num.sqrt(x[k]**2 + y[k]**2)
44        factor = exp(-(r-0.15)**2)
45        s[k] = 4000 * factor * (cos(t*2*pi/150) + 2)
46
47    return s
48
49def scalar_func(t, x, y):
50    """Function that returns a scalar.
51
52    Used to test error message when numeric array is expected
53    """
54
55    return 17.7
56
57def scalar_func_list(t, x, y):
58    """Function that returns a scalar.
59
60    Used to test error message when numeric array is expected
61    """
62
63    return [17.7]
64
65
66def angle(t, x, y):
67    """Rotating field
68    """
69    from math import atan, pi
70
71    x = num.array(x)
72    y = num.array(y)
73
74    N = len(x)
75    a = 0 * x    # New array
76
77    for k in range(N):
78        r = num.sqrt(x[k]**2 + y[k]**2)
79
80        angle = atan(y[k]/x[k])
81
82        if x[k] < 0:
83            angle += pi
84
85        # Take normal direction
86        angle -= pi/2
87
88        # Ensure positive radians
89        if angle < 0:
90            angle += 2*pi
91
92        a[k] = angle/pi*180
93
94    return a
95
96
97###############################################################################
98
99class Test_swb_forcing_terms(unittest.TestCase):
[7559]100    def setUp(self):
101        pass
102
103    def tearDown(self):
104        pass
105
106    def test_gravity(self):
107        #Assuming no friction
108
109        from anuga.config import g
110
111        a = [0.0, 0.0]
112        b = [0.0, 2.0]
113        c = [2.0, 0.0]
114        d = [0.0, 4.0]
115        e = [2.0, 2.0]
116        f = [4.0, 0.0]
117
118        points = [a, b, c, d, e, f]
119        #             bac,     bce,     ecf,     dbe
120        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
121
122        domain = Domain(points, vertices)
123
124        #Set up for a gradient of (3,0) at mid triangle (bce)
125        def slope(x, y):
126            return 3*x
127
128        h = 0.1
129        def stage(x, y):
130            return slope(x, y) + h
131
132        domain.set_quantity('elevation', slope)
133        domain.set_quantity('stage', stage)
134
135        for name in domain.conserved_quantities:
136            assert num.allclose(domain.quantities[name].explicit_update, 0)
137            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
138
139        domain.compute_forcing_terms()
140
141        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
142        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
143                            -g*h*3)
144        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
145
146    def test_manning_friction(self):
147        from anuga.config import g
148
149        a = [0.0, 0.0]
150        b = [0.0, 2.0]
151        c = [2.0, 0.0]
152        d = [0.0, 4.0]
153        e = [2.0, 2.0]
154        f = [4.0, 0.0]
155
156        points = [a, b, c, d, e, f]
157        #             bac,     bce,     ecf,     dbe
158        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
159
160        domain = Domain(points, vertices)
161
162        #Set up for a gradient of (3,0) at mid triangle (bce)
163        def slope(x, y):
164            return 3*x
165
166        h = 0.1
167        def stage(x, y):
168            return slope(x, y) + h
169
170        eta = 0.07
171        domain.set_quantity('elevation', slope)
172        domain.set_quantity('stage', stage)
173        domain.set_quantity('friction', eta)
174
175        for name in domain.conserved_quantities:
176            assert num.allclose(domain.quantities[name].explicit_update, 0)
177            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
178
179        domain.compute_forcing_terms()
180
181        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
182        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
183                            -g*h*3)
184        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
185
186        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
187        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
188                            0)
189        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
190                            0)
191
192        #Create some momentum for friction to work with
193        domain.set_quantity('xmomentum', 1)
194        dz = sqrt(10.0)
195        S = -g*eta**2 *dz / h**(7.0/3) 
196
197        domain.compute_forcing_terms()
198        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
199        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
200                            S)
201        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
202                            0)
203
204        #A more complex example
205        domain.quantities['stage'].semi_implicit_update[:] = 0.0
206        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
207        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
208
209        domain.set_quantity('xmomentum', 3)
210        domain.set_quantity('ymomentum', 4)
211
212        S = -g*eta**2*5*dz / h**(7.0/3) 
213
214        domain.compute_forcing_terms()
215
216        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
217        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
218                            3*S)
219        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
220                            4*S)
221
222
223
224    def test_manning_friction_old(self):
225        from anuga.config import g
226
227        a = [0.0, 0.0]
228        b = [0.0, 2.0]
229        c = [2.0, 0.0]
230        d = [0.0, 4.0]
231        e = [2.0, 2.0]
232        f = [4.0, 0.0]
233
234        points = [a, b, c, d, e, f]
235        #             bac,     bce,     ecf,     dbe
236        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
237
238        domain = Domain(points, vertices)
239
240        #Turn old mannings function on
241        domain.set_new_mannings_function(False)
242
243        #Set up for a gradient of (3,0) at mid triangle (bce)
244        def slope(x, y):
245            return 3*x
246
247        h = 0.1
248        def stage(x, y):
249            return slope(x, y) + h
250
251        eta = 0.07
252        domain.set_quantity('elevation', slope)
253        domain.set_quantity('stage', stage)
254        domain.set_quantity('friction', eta)
255
256        for name in domain.conserved_quantities:
257            assert num.allclose(domain.quantities[name].explicit_update, 0)
258            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
259
260        domain.compute_forcing_terms()
261
262        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
263        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
264                            -g*h*3)
265        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
266
267        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
268        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
269                            0)
270        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
271                            0)
272
273        #Create some momentum for friction to work with
274        domain.set_quantity('xmomentum', 1)
275        S = -g*eta**2 / h**(7.0/3)
276
277        domain.compute_forcing_terms()
278        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
279        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
280                            S)
281        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
282                            0)
283
284        #A more complex example
285        domain.quantities['stage'].semi_implicit_update[:] = 0.0
286        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
287        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
288
289        domain.set_quantity('xmomentum', 3)
290        domain.set_quantity('ymomentum', 4)
291
292        S = -g*eta**2*5 / h**(7.0/3)
293
294        domain.compute_forcing_terms()
295
296        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
297        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
298                            3*S)
299        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
300                            4*S)
301
302
303    def test_manning_friction_new(self):
304        from anuga.config import g
305
306        a = [0.0, 0.0]
307        b = [0.0, 2.0]
308        c = [2.0, 0.0]
309        d = [0.0, 4.0]
310        e = [2.0, 2.0]
311        f = [4.0, 0.0]
312
313        points = [a, b, c, d, e, f]
314        #             bac,     bce,     ecf,     dbe
315        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
316
317        domain = Domain(points, vertices)
318
319        # Use the new function which takes into account the extra
320        # wetted area due to slope of bed
321        domain.set_new_mannings_function(True)
322       
323        #Set up for a gradient of (3,0) at mid triangle (bce)
324        def slope(x, y):
325            return 3*x
326
327        h = 0.1
328        def stage(x, y):
329            return slope(x, y) + h
330
331        eta = 0.07
332        domain.set_quantity('elevation', slope)
333        domain.set_quantity('stage', stage)
334        domain.set_quantity('friction', eta)
335
336        for name in domain.conserved_quantities:
337            assert num.allclose(domain.quantities[name].explicit_update, 0)
338            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
339
340        domain.compute_forcing_terms()
341
342        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
343        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
344                            -g*h*3)
345        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
346
347        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
348        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
349                            0)
350        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
351                            0)
352
353        #Create some momentum for friction to work with
354        domain.set_quantity('xmomentum', 1)
355        S = -g*eta**2 / h**(7.0/3) * sqrt(10)
356
357        domain.compute_forcing_terms()
358        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
359        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
360                            S)
361        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
362                            0)
363
364        #A more complex example
365        domain.quantities['stage'].semi_implicit_update[:] = 0.0
366        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
367        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
368
369        domain.set_quantity('xmomentum', 3)
370        domain.set_quantity('ymomentum', 4)
371
372        S = -g*eta**2*5 / h**(7.0/3) * sqrt(10.0)
373
374        domain.compute_forcing_terms()
375
376        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
377        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
378                            3*S)
379        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
380                            4*S)
381       
382    def test_constant_wind_stress(self):
383        from anuga.config import rho_a, rho_w, eta_w
384        from math import pi, cos, sin
385
386        a = [0.0, 0.0]
387        b = [0.0, 2.0]
388        c = [2.0, 0.0]
389        d = [0.0, 4.0]
390        e = [2.0, 2.0]
391        f = [4.0, 0.0]
392
393        points = [a, b, c, d, e, f]
394        #             bac,     bce,     ecf,     dbe
395        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
396
[7866]397        domain = anuga.Domain(points, vertices)
[7559]398
399        #Flat surface with 1m of water
400        domain.set_quantity('elevation', 0)
401        domain.set_quantity('stage', 1.0)
402        domain.set_quantity('friction', 0)
403
[7866]404        Br = anuga.Reflective_boundary(domain)
[7559]405        domain.set_boundary({'exterior': Br})
406
407        #Setup only one forcing term, constant wind stress
408        s = 100
409        phi = 135
410        domain.forcing_terms = []
[7866]411        domain.forcing_terms.append(anuga.Wind_stress(s, phi))
[7559]412
413        domain.compute_forcing_terms()
414
415        const = eta_w*rho_a / rho_w
416
417        #Convert to radians
418        phi = phi*pi / 180
419
420        #Compute velocity vector (u, v)
421        u = s*cos(phi)
422        v = s*sin(phi)
423
424        #Compute wind stress
425        S = const * num.sqrt(u**2 + v**2)
426
427        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
428        assert num.allclose(domain.quantities['xmomentum'].explicit_update, S*u)
429        assert num.allclose(domain.quantities['ymomentum'].explicit_update, S*v)
430
431    def test_variable_wind_stress(self):
432        from anuga.config import rho_a, rho_w, eta_w
433        from math import pi, cos, sin
434
435        a = [0.0, 0.0]
436        b = [0.0, 2.0]
437        c = [2.0, 0.0]
438        d = [0.0, 4.0]
439        e = [2.0, 2.0]
440        f = [4.0, 0.0]
441
442        points = [a, b, c, d, e, f]
443        #             bac,     bce,     ecf,     dbe
444        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
445
446        domain = Domain(points, vertices)
447
448        #Flat surface with 1m of water
449        domain.set_quantity('elevation', 0)
450        domain.set_quantity('stage', 1.0)
451        domain.set_quantity('friction', 0)
452
[7866]453        Br = anuga.Reflective_boundary(domain)
[7559]454        domain.set_boundary({'exterior': Br})
455
456        domain.time = 5.54    # Take a random time (not zero)
457
458        #Setup only one forcing term, constant wind stress
459        s = 100
460        phi = 135
461        domain.forcing_terms = []
[7866]462        domain.forcing_terms.append(anuga.Wind_stress(s=speed, phi=angle))
[7559]463
464        domain.compute_forcing_terms()
465
466        #Compute reference solution
467        const = eta_w*rho_a / rho_w
468
469        N = len(domain)    # number_of_triangles
470
471        xc = domain.get_centroid_coordinates()
472        t = domain.time
473
474        x = xc[:,0]
475        y = xc[:,1]
476        s_vec = speed(t,x,y)
477        phi_vec = angle(t,x,y)
478
479        for k in range(N):
480            # Convert to radians
481            phi = phi_vec[k]*pi / 180
482            s = s_vec[k]
483
484            # Compute velocity vector (u, v)
485            u = s*cos(phi)
486            v = s*sin(phi)
487
488            # Compute wind stress
489            S = const * num.sqrt(u**2 + v**2)
490
491            assert num.allclose(domain.quantities['stage'].explicit_update[k],
492                                0)
493            assert num.allclose(domain.quantities['xmomentum'].\
494                                     explicit_update[k],
495                                S*u)
496            assert num.allclose(domain.quantities['ymomentum'].\
497                                     explicit_update[k],
498                                S*v)
499
500    def test_windfield_from_file(self):
501        import time
502        from anuga.config import rho_a, rho_w, eta_w
503        from math import pi, cos, sin
504        from anuga.config import time_format
505        from anuga.abstract_2d_finite_volumes.util import file_function
506
507        a = [0.0, 0.0]
508        b = [0.0, 2.0]
509        c = [2.0, 0.0]
510        d = [0.0, 4.0]
511        e = [2.0, 2.0]
512        f = [4.0, 0.0]
513
514        points = [a, b, c, d, e, f]
515        #             bac,     bce,     ecf,     dbe
516        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
517
518        domain = Domain(points, vertices)
519
520        # Flat surface with 1m of water
521        domain.set_quantity('elevation', 0)
522        domain.set_quantity('stage', 1.0)
523        domain.set_quantity('friction', 0)
524
[7866]525        Br = anuga.Reflective_boundary(domain)
[7559]526        domain.set_boundary({'exterior': Br})
527
528        domain.time = 7    # Take a time that is represented in file (not zero)
529
530        # Write wind stress file (ensure that domain.time is covered)
531        # Take x=1 and y=0
532        filename = 'test_windstress_from_file'
533        start = time.mktime(time.strptime('2000', '%Y'))
534        fid = open(filename + '.txt', 'w')
535        dt = 1    # One second interval
536        t = 0.0
537        while t <= 10.0:
538            t_string = time.strftime(time_format, time.gmtime(t+start))
539
540            fid.write('%s, %f %f\n' %
541                      (t_string, speed(t,[1],[0])[0], angle(t,[1],[0])[0]))
542            t += dt
543
544        fid.close()
545
[7866]546        anuga.timefile2netcdf(filename+'.txt', filename+'.tms')
[7559]547        os.remove(filename + '.txt')
548
549        # Setup wind stress
550        F = file_function(filename + '.tms',
551                          quantities=['Attribute0', 'Attribute1'])
552        os.remove(filename + '.tms')
553
[7869]554        W = anuga.Wind_stress(F)
[7559]555
556        domain.forcing_terms = []
557        domain.forcing_terms.append(W)
558
559        domain.compute_forcing_terms()
560
561        # Compute reference solution
562        const = eta_w*rho_a / rho_w
563
564        N = len(domain)    # number_of_triangles
565
566        t = domain.time
567
568        s = speed(t, [1], [0])[0]
569        phi = angle(t, [1], [0])[0]
570
571        # Convert to radians
572        phi = phi*pi / 180
573
574        # Compute velocity vector (u, v)
575        u = s*cos(phi)
576        v = s*sin(phi)
577
578        # Compute wind stress
579        S = const * num.sqrt(u**2 + v**2)
580
581        for k in range(N):
582            assert num.allclose(domain.quantities['stage'].explicit_update[k],
583                                0)
584            assert num.allclose(domain.quantities['xmomentum'].\
585                                    explicit_update[k],
586                                S*u)
587            assert num.allclose(domain.quantities['ymomentum'].\
588                                    explicit_update[k],
589                                S*v)
590
591    def test_windfield_from_file_seconds(self):
592        import time
593        from anuga.config import rho_a, rho_w, eta_w
594        from math import pi, cos, sin
595        from anuga.config import time_format
596        from anuga.abstract_2d_finite_volumes.util import file_function
597
598        a = [0.0, 0.0]
599        b = [0.0, 2.0]
600        c = [2.0, 0.0]
601        d = [0.0, 4.0]
602        e = [2.0, 2.0]
603        f = [4.0, 0.0]
604
605        points = [a, b, c, d, e, f]
606        #             bac,     bce,     ecf,     dbe
607        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
608
609        domain = Domain(points, vertices)
610
611        # Flat surface with 1m of water
612        domain.set_quantity('elevation', 0)
613        domain.set_quantity('stage', 1.0)
614        domain.set_quantity('friction', 0)
615
[7866]616        Br = anuga.Reflective_boundary(domain)
[7559]617        domain.set_boundary({'exterior': Br})
618
619        domain.time = 7    # Take a time that is represented in file (not zero)
620
621        # Write wind stress file (ensure that domain.time is covered)
622        # Take x=1 and y=0
[7866]623        filename = 'test_windstress_from_file.txt'
624        file_out = 'test_windstress_from_file.tms'
[7559]625        start = time.mktime(time.strptime('2000', '%Y'))
[7866]626        fid = open(filename, 'w')
[7559]627        dt = 0.5    # Half second interval
628        t = 0.0
629        while t <= 10.0:
630            fid.write('%s, %f %f\n'
631                      % (str(t), speed(t, [1], [0])[0], angle(t, [1], [0])[0]))
632            t += dt
633
634        fid.close()
635
[7866]636        anuga.timefile2netcdf(filename, file_out, time_as_seconds=True)
637        os.remove(filename)
[7559]638
639        # Setup wind stress
[7866]640        F = file_function(file_out,
[7559]641                          quantities=['Attribute0', 'Attribute1'])
[7866]642        os.remove(file_out)
[7559]643
[7866]644        W = anuga.Wind_stress(F)
[7559]645
646        domain.forcing_terms = []
647        domain.forcing_terms.append(W)
648
649        domain.compute_forcing_terms()
650
651        # Compute reference solution
652        const = eta_w*rho_a / rho_w
653
654        N = len(domain)    # number_of_triangles
655
656        t = domain.time
657
658        s = speed(t, [1], [0])[0]
659        phi = angle(t, [1], [0])[0]
660
661        # Convert to radians
662        phi = phi*pi / 180
663
664        # Compute velocity vector (u, v)
665        u = s*cos(phi)
666        v = s*sin(phi)
667
668        # Compute wind stress
669        S = const * num.sqrt(u**2 + v**2)
670
671        for k in range(N):
672            assert num.allclose(domain.quantities['stage'].explicit_update[k],
673                                0)
674            assert num.allclose(domain.quantities['xmomentum'].\
675                                    explicit_update[k],
676                                S*u)
677            assert num.allclose(domain.quantities['ymomentum'].\
678                                    explicit_update[k],
679                                S*v)
680
681    def test_wind_stress_error_condition(self):
682        """Test that windstress reacts properly when forcing functions
683        are wrong - e.g. returns a scalar
684        """
685
686        from math import pi, cos, sin
687        from anuga.config import rho_a, rho_w, eta_w
688
689        a = [0.0, 0.0]
690        b = [0.0, 2.0]
691        c = [2.0, 0.0]
692        d = [0.0, 4.0]
693        e = [2.0, 2.0]
694        f = [4.0, 0.0]
695
696        points = [a, b, c, d, e, f]
697        #             bac,     bce,     ecf,     dbe
698        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
699
700        domain = Domain(points, vertices)
701
702        # Flat surface with 1m of water
703        domain.set_quantity('elevation', 0)
704        domain.set_quantity('stage', 1.0)
705        domain.set_quantity('friction', 0)
706
[7866]707        Br = anuga.Reflective_boundary(domain)
[7559]708        domain.set_boundary({'exterior': Br})
709
710        domain.time = 5.54    # Take a random time (not zero)
711
712        # Setup only one forcing term, bad func
713        domain.forcing_terms = []
714
715        try:
[7866]716            domain.forcing_terms.append(anuga.Wind_stress(s=scalar_func_list,
[7559]717                                                    phi=angle))
718        except AssertionError:
719            pass
720        else:
721            msg = 'Should have raised exception'
722            raise Exception, msg
723
724        try:
725            domain.forcing_terms.append(Wind_stress(s=speed, phi=scalar_func))
726        except Exception:
727            pass
728        else:
729            msg = 'Should have raised exception'
730            raise Exception, msg
731
732        try:
733            domain.forcing_terms.append(Wind_stress(s=speed, phi='xx'))
734        except:
735            pass
736        else:
737            msg = 'Should have raised exception'
738            raise Exception, msg
739
740    def test_rainfall(self):
741        from math import pi, cos, sin
742
743        a = [0.0, 0.0]
744        b = [0.0, 2.0]
745        c = [2.0, 0.0]
746        d = [0.0, 4.0]
747        e = [2.0, 2.0]
748        f = [4.0, 0.0]
749
750        points = [a, b, c, d, e, f]
751        #             bac,     bce,     ecf,     dbe
752        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
753
754        domain = Domain(points, vertices)
755
756        # Flat surface with 1m of water
757        domain.set_quantity('elevation', 0)
758        domain.set_quantity('stage', 1.0)
759        domain.set_quantity('friction', 0)
760
[7866]761        Br = anuga.Reflective_boundary(domain)
[7559]762        domain.set_boundary({'exterior': Br})
763
764        # Setup only one forcing term, constant rainfall
765        domain.forcing_terms = []
[7866]766        domain.forcing_terms.append(anuga.Rainfall(domain, rate=2.0))
[7559]767
768        domain.compute_forcing_terms()
769        assert num.allclose(domain.quantities['stage'].explicit_update,
770                            2.0/1000)
771
772    def test_rainfall_restricted_by_polygon(self):
773        from math import pi, cos, sin
774
775        a = [0.0, 0.0]
776        b = [0.0, 2.0]
777        c = [2.0, 0.0]
778        d = [0.0, 4.0]
779        e = [2.0, 2.0]
780        f = [4.0, 0.0]
781
782        points = [a, b, c, d, e, f]
783        #             bac,     bce,     ecf,     dbe
784        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
785
786        domain = Domain(points, vertices)
787
788        # Flat surface with 1m of water
789        domain.set_quantity('elevation', 0)
790        domain.set_quantity('stage', 1.0)
791        domain.set_quantity('friction', 0)
792
[7866]793        Br = anuga.Reflective_boundary(domain)
[7559]794        domain.set_boundary({'exterior': Br})
795
796        # Setup only one forcing term, constant rainfall
797        # restricted to a polygon enclosing triangle #1 (bce)
798        domain.forcing_terms = []
[7866]799        R = anuga.Rainfall(domain, rate=2.0, polygon=[[1,1], [2,1], [2,2], [1,2]])
[7559]800
801        assert num.allclose(R.exchange_area, 2)
802
803        domain.forcing_terms.append(R)
804
805        domain.compute_forcing_terms()
806
807        assert num.allclose(domain.quantities['stage'].explicit_update[1],
808                            2.0/1000)
809        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
810        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
811
812    def test_time_dependent_rainfall_restricted_by_polygon(self):
813        a = [0.0, 0.0]
814        b = [0.0, 2.0]
815        c = [2.0, 0.0]
816        d = [0.0, 4.0]
817        e = [2.0, 2.0]
818        f = [4.0, 0.0]
819
820        points = [a, b, c, d, e, f]
821        #             bac,     bce,     ecf,     dbe
822        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
823
[7866]824        domain = anuga.Domain(points, vertices)
[7559]825
826        # Flat surface with 1m of water
827        domain.set_quantity('elevation', 0)
828        domain.set_quantity('stage', 1.0)
829        domain.set_quantity('friction', 0)
830
[7866]831        Br = anuga.Reflective_boundary(domain)
[7559]832        domain.set_boundary({'exterior': Br})
833
834        # Setup only one forcing term, time dependent rainfall
835        # restricted to a polygon enclosing triangle #1 (bce)
836        domain.forcing_terms = []
[7866]837        R = anuga.Rainfall(domain,
[7559]838                     rate=lambda t: 3*t + 7,
839                     polygon = [[1,1], [2,1], [2,2], [1,2]])
840
841        assert num.allclose(R.exchange_area, 2)
842       
843        domain.forcing_terms.append(R)
844
845        domain.time = 10.
846
847        domain.compute_forcing_terms()
848
849        assert num.allclose(domain.quantities['stage'].explicit_update[1],
850                            (3*domain.time + 7)/1000)
851        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
852        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
853
854    def test_time_dependent_rainfall_using_starttime(self):
855        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
856
857        a = [0.0, 0.0]
858        b = [0.0, 2.0]
859        c = [2.0, 0.0]
860        d = [0.0, 4.0]
861        e = [2.0, 2.0]
862        f = [4.0, 0.0]
863
864        points = [a, b, c, d, e, f]
865        #             bac,     bce,     ecf,     dbe
866        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
867
[7866]868        domain = anuga.Domain(points, vertices)
[7559]869
870        # Flat surface with 1m of water
871        domain.set_quantity('elevation', 0)
872        domain.set_quantity('stage', 1.0)
873        domain.set_quantity('friction', 0)
874
[7866]875        Br = anuga.Reflective_boundary(domain)
[7559]876        domain.set_boundary({'exterior': Br})
877
878        # Setup only one forcing term, time dependent rainfall
879        # restricted to a polygon enclosing triangle #1 (bce)
880        domain.forcing_terms = []
[7866]881        R = anuga.Rainfall(domain,
[7559]882                     rate=lambda t: 3*t + 7,
883                     polygon=rainfall_poly)                     
884
885        assert num.allclose(R.exchange_area, 2)
886       
887        domain.forcing_terms.append(R)
888
889        # This will test that time used in the forcing function takes
890        # startime into account.
891        domain.starttime = 5.0
892
893        domain.time = 7.
894
895        domain.compute_forcing_terms()
896
897        assert num.allclose(domain.quantities['stage'].explicit_update[1],
898                            (3*domain.get_time() + 7)/1000)
899        assert num.allclose(domain.quantities['stage'].explicit_update[1],
900                            (3*(domain.time + domain.starttime) + 7)/1000)
901
902        # Using internal time her should fail
903        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
904                                (3*domain.time + 7)/1000)
905
906        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
907        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
908
909    def test_time_dependent_rainfall_using_georef(self):
910        """test_time_dependent_rainfall_using_georef
911
912        This will also test the General forcing term using georef
913        """
914
915        # Mesh in zone 56 (absolute coords)
916        x0 = 314036.58727982
917        y0 = 6224951.2960092
918
919        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
920        rainfall_poly += [x0, y0]
921
922        a = [0.0, 0.0]
923        b = [0.0, 2.0]
924        c = [2.0, 0.0]
925        d = [0.0, 4.0]
926        e = [2.0, 2.0]
927        f = [4.0, 0.0]
928
929        points = [a, b, c, d, e, f]
930        #             bac,     bce,     ecf,     dbe
931        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
932
933        domain = Domain(points, vertices,
934                        geo_reference=Geo_reference(56, x0, y0))
935
936        # Flat surface with 1m of water
937        domain.set_quantity('elevation', 0)
938        domain.set_quantity('stage', 1.0)
939        domain.set_quantity('friction', 0)
940
[7866]941        Br = anuga.Reflective_boundary(domain)
[7559]942        domain.set_boundary({'exterior': Br})
943
944        # Setup only one forcing term, time dependent rainfall
945        # restricted to a polygon enclosing triangle #1 (bce)
946        domain.forcing_terms = []
[7866]947        R = anuga.Rainfall(domain,
[7559]948                     rate=lambda t: 3*t + 7,
949                     polygon=rainfall_poly)
950
951        assert num.allclose(R.exchange_area, 2)
952       
953        domain.forcing_terms.append(R)
954
955        # This will test that time used in the forcing function takes
956        # startime into account.
957        domain.starttime = 5.0
958
959        domain.time = 7.
960
961        domain.compute_forcing_terms()
962
963        assert num.allclose(domain.quantities['stage'].explicit_update[1],
964                            (3*domain.get_time() + 7)/1000)
965        assert num.allclose(domain.quantities['stage'].explicit_update[1],
966                            (3*(domain.time + domain.starttime) + 7)/1000)
967
968        # Using internal time her should fail
969        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
970                                (3*domain.time + 7)/1000)
971
972        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
973        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
974
975    def test_time_dependent_rainfall_restricted_by_polygon_with_default(self):
976        """
977        Test that default rainfall can be used when given rate runs out of data.
978        """
979
980        a = [0.0, 0.0]
981        b = [0.0, 2.0]
982        c = [2.0, 0.0]
983        d = [0.0, 4.0]
984        e = [2.0, 2.0]
985        f = [4.0, 0.0]
986
987        points = [a, b, c, d, e, f]
988        #             bac,     bce,     ecf,     dbe
989        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
990
991        domain = Domain(points, vertices)
992
993        # Flat surface with 1m of water
994        domain.set_quantity('elevation', 0)
995        domain.set_quantity('stage', 1.0)
996        domain.set_quantity('friction', 0)
997
[7866]998        Br = anuga.Reflective_boundary(domain)
[7559]999        domain.set_boundary({'exterior': Br})
1000
1001        # Setup only one forcing term, time dependent rainfall
1002        # that expires at t==20
1003        from anuga.fit_interpolate.interpolate import Modeltime_too_late
1004
1005        def main_rate(t):
1006            if t > 20:
1007                msg = 'Model time exceeded.'
1008                raise Modeltime_too_late, msg
1009            else:
1010                return 3*t + 7
1011
1012        domain.forcing_terms = []
[7866]1013        R = anuga.Rainfall(domain,
[7559]1014                     rate=main_rate,
1015                     polygon = [[1,1], [2,1], [2,2], [1,2]],
1016                     default_rate=5.0)
1017
1018        assert num.allclose(R.exchange_area, 2)
1019       
1020        domain.forcing_terms.append(R)
1021
1022        domain.time = 10.
1023
1024        domain.compute_forcing_terms()
1025
1026        assert num.allclose(domain.quantities['stage'].explicit_update[1],
1027                            (3*domain.time+7)/1000)
1028        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
1029        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
1030
1031        domain.time = 100.
1032        domain.quantities['stage'].explicit_update[:] = 0.0     # Reset
1033        domain.compute_forcing_terms()
1034
1035        assert num.allclose(domain.quantities['stage'].explicit_update[1],
1036                            5.0/1000) # Default value
1037        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
1038        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
1039
1040    def test_rainfall_forcing_with_evolve(self):
1041        """test_rainfall_forcing_with_evolve
1042
1043        Test how forcing terms are called within evolve
1044        """
1045
1046        # FIXME(Ole): This test is just to experiment
1047
1048        a = [0.0, 0.0]
1049        b = [0.0, 2.0]
1050        c = [2.0, 0.0]
1051        d = [0.0, 4.0]
1052        e = [2.0, 2.0]
1053        f = [4.0, 0.0]
1054
1055        points = [a, b, c, d, e, f]
1056        #             bac,     bce,     ecf,     dbe
1057        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1058
1059        domain = Domain(points, vertices)
1060
1061        # Flat surface with 1m of water
1062        domain.set_quantity('elevation', 0)
1063        domain.set_quantity('stage', 1.0)
1064        domain.set_quantity('friction', 0)
1065
[7866]1066        Br = anuga.Reflective_boundary(domain)
[7559]1067        domain.set_boundary({'exterior': Br})
1068
1069        # Setup only one forcing term, time dependent rainfall
1070        # that expires at t==20
1071        from anuga.fit_interpolate.interpolate import Modeltime_too_late
1072
1073        def main_rate(t):
1074            if t > 20:
1075                msg = 'Model time exceeded.'
1076                raise Modeltime_too_late, msg
1077            else:
1078                return 3*t + 7
1079
1080        domain.forcing_terms = []
[7866]1081        R = anuga.Rainfall(domain,
[7559]1082                     rate=main_rate,
1083                     polygon=[[1,1], [2,1], [2,2], [1,2]],
1084                     default_rate=5.0)
1085
1086        assert num.allclose(R.exchange_area, 2)
1087       
1088        domain.forcing_terms.append(R)
1089
1090        for t in domain.evolve(yieldstep=1, finaltime=25):
1091            pass
1092            #FIXME(Ole):  A test here is hard because explicit_update also
1093            # receives updates from the flux calculation.
1094
1095
1096
1097    def test_inflow_using_circle(self):
1098        from math import pi, cos, sin
1099
1100        a = [0.0, 0.0]
1101        b = [0.0, 2.0]
1102        c = [2.0, 0.0]
1103        d = [0.0, 4.0]
1104        e = [2.0, 2.0]
1105        f = [4.0, 0.0]
1106
1107        points = [a, b, c, d, e, f]
1108        #             bac,     bce,     ecf,     dbe
1109        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1110
1111        domain = Domain(points, vertices)
1112
1113        # Flat surface with 1m of water
1114        domain.set_quantity('elevation', 0)
1115        domain.set_quantity('stage', 1.0)
1116        domain.set_quantity('friction', 0)
1117
[7866]1118        Br = anuga.Reflective_boundary(domain)
[7559]1119        domain.set_boundary({'exterior': Br})
1120
1121        # Setup only one forcing term, constant inflow of 2 m^3/s
1122        # on a circle affecting triangles #0 and #1 (bac and bce)
1123        domain.forcing_terms = []
1124       
[7866]1125        I = anuga.Inflow(domain, rate=2.0, center=(1,1), radius=1)
[7559]1126        domain.forcing_terms.append(I)       
1127        domain.compute_forcing_terms()
1128
1129       
1130        A = I.exchange_area
1131        assert num.allclose(A, 4) # Two triangles       
1132       
1133        assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A)
1134        assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A)
1135        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)       
1136
1137
1138    def test_inflow_using_circle_function(self):
1139        from math import pi, cos, sin
1140
1141        a = [0.0, 0.0]
1142        b = [0.0, 2.0]
1143        c = [2.0, 0.0]
1144        d = [0.0, 4.0]
1145        e = [2.0, 2.0]
1146        f = [4.0, 0.0]
1147
1148        points = [a, b, c, d, e, f]
1149        #             bac,     bce,     ecf,     dbe
1150        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1151
[7866]1152        domain = anuga.Domain(points, vertices)
[7559]1153
1154        # Flat surface with 1m of water
1155        domain.set_quantity('elevation', 0)
1156        domain.set_quantity('stage', 1.0)
1157        domain.set_quantity('friction', 0)
1158
[7866]1159        Br = anuga.Reflective_boundary(domain)
[7559]1160        domain.set_boundary({'exterior': Br})
1161
1162        # Setup only one forcing term, time dependent inflow of 2 m^3/s
1163        # on a circle affecting triangles #0 and #1 (bac and bce)
1164        domain.forcing_terms = []
[7869]1165        I = anuga.Inflow(domain, rate=lambda t: 2., center=(1,1), radius=1)
[7559]1166        domain.forcing_terms.append(I)
1167       
1168        domain.compute_forcing_terms()       
1169
1170        A = I.exchange_area
1171        assert num.allclose(A, 4) # Two triangles
1172       
1173        assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A)
1174        assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A)
1175        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)       
1176       
1177
1178
1179
1180    def test_inflow_catch_too_few_triangles(self):
1181        """
1182        Test that exception is thrown if no triangles are covered
1183        by the inflow area
1184        """
1185
1186        from math import pi, cos, sin
1187
1188        a = [0.0, 0.0]
1189        b = [0.0, 2.0]
1190        c = [2.0, 0.0]
1191        d = [0.0, 4.0]
1192        e = [2.0, 2.0]
1193        f = [4.0, 0.0]
1194
1195        points = [a, b, c, d, e, f]
1196        #             bac,     bce,     ecf,     dbe
1197        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1198
1199        domain = Domain(points, vertices)
1200
1201        # Flat surface with 1m of water
1202        domain.set_quantity('elevation', 0)
1203        domain.set_quantity('stage', 1.0)
1204        domain.set_quantity('friction', 0)
1205
[7866]1206        Br = anuga.Reflective_boundary(domain)
[7559]1207        domain.set_boundary({'exterior': Br})
1208
1209        # Setup only one forcing term, constant inflow of 2 m^3/s
1210        # on a circle affecting triangles #0 and #1 (bac and bce)
1211        try:
1212            Inflow(domain, rate=2.0, center=(1,1.1), radius=0.01)
1213        except:
1214            pass
1215        else:
1216            msg = 'Should have raised exception'
1217            raise Exception, msg
1218
1219    def Xtest_inflow_outflow_conservation(self):
1220        """
1221        Test what happens if water is abstracted from one area and
1222        injected into another - especially if there is not enough
1223        water to match the abstraction.
1224        This tests that the total volume is kept constant under a range of
1225        scenarios.
1226
1227        This test will fail as the problem was only fixed for culverts.
1228        """
1229
1230        from math import pi, cos, sin
1231
1232        length = 20.
1233        width = 10.
1234
1235        dx = dy = 2    # 1 or 2 OK
1236        points, vertices, boundary = rectangular_cross(int(length/dx),
1237                                                       int(width/dy),
1238                                                       len1=length,
1239                                                       len2=width)
1240        domain = Domain(points, vertices, boundary)
1241        domain.set_name('test_inflow_conservation')    # Output name
1242        domain.set_default_order(2)
1243
1244        # Flat surface with 1m of water
1245        stage = 1.0
1246        domain.set_quantity('elevation', 0)
1247        domain.set_quantity('stage', stage)
1248        domain.set_quantity('friction', 0)
1249
1250        Br = Reflective_boundary(domain)
1251        domain.set_boundary({'left': Br, 'right': Br, 'bottom': Br, 'top': Br})
1252
1253        # Setup one forcing term, constant inflow of 2 m^3/s on a circle
1254        domain.forcing_terms = []
1255        domain.forcing_terms.append(Inflow(domain, rate=2.0,
1256                                           center=(5,5), radius=1))
1257
1258        domain.compute_forcing_terms()
1259
1260        # Check that update values are correct
1261        for x in domain.quantities['stage'].explicit_update:
1262            assert num.allclose(x, 2.0/pi) or num.allclose(x, 0.0)
1263
1264        # Check volumes without inflow
1265        domain.forcing_terms = []
1266        initial_volume = domain.quantities['stage'].get_integral()
1267
1268        assert num.allclose(initial_volume, width*length*stage)
1269
1270        for t in domain.evolve(yieldstep = 0.05, finaltime = 5.0):
1271            volume = domain.quantities['stage'].get_integral()
1272            assert num.allclose(volume, initial_volume)
1273
1274        # Now apply the inflow and check volumes for a range of stage values
1275        for stage in [2.0, 1.0, 0.5, 0.25, 0.1, 0.0]:
1276            domain.time = 0.0
1277            domain.set_quantity('stage', stage)
1278            domain.forcing_terms = []
1279            domain.forcing_terms.append(Inflow(domain, rate=2.0,
1280                                               center=(5,5), radius=1))
1281            initial_volume = domain.quantities['stage'].get_integral()
1282            predicted_volume = initial_volume
1283            dt = 0.05
1284            for t in domain.evolve(yieldstep=dt, finaltime=5.0):
1285                volume = domain.quantities['stage'].get_integral()
1286                assert num.allclose (volume, predicted_volume)
1287                predicted_volume = predicted_volume + 2.0/pi/100/dt # Why 100?
1288
1289        # Apply equivalent outflow only and check volumes
1290        # for a range of stage values
1291        for stage in [2.0, 1.0, 0.5, 0.25, 0.1, 0.0]:
1292            domain.time = 0.0
1293            domain.set_quantity('stage', stage)
1294            domain.forcing_terms = []
1295            domain.forcing_terms.append(Inflow(domain, rate=-2.0,
1296                                               center=(15,5), radius=1))
1297            initial_volume = domain.quantities['stage'].get_integral()
1298            predicted_volume = initial_volume
1299            dt = 0.05
1300            for t in domain.evolve(yieldstep=dt, finaltime=5.0):
1301                volume = domain.quantities['stage'].get_integral()
1302                assert num.allclose (volume, predicted_volume)
1303                predicted_volume = predicted_volume - 2.0/pi/100/dt # Why 100?
1304
1305        # Apply both inflow and outflow and check volumes being constant for a
1306        # range of stage values
1307        for stage in [2.0, 1.0, 0.5, 0.25, 0.1, 0.0]:
1308            domain.time = 0.0
1309            domain.set_quantity('stage', stage)
1310            domain.forcing_terms = []
1311            domain.forcing_terms.append(Inflow(domain, rate=2.0,
1312                                               center=(5,5), radius=1))
1313            domain.forcing_terms.append(Inflow(domain, rate=-2.0,
1314                                               center=(15,5), radius=1))
1315            initial_volume = domain.quantities['stage'].get_integral()
1316
1317            dt = 0.05
1318            for t in domain.evolve(yieldstep=dt, finaltime=5.0):
1319                volume = domain.quantities['stage'].get_integral()
1320                assert num.allclose(volume, initial_volume)
1321
1322    #####################################################
1323
1324       
1325    def test_inflow_using_flowline(self):
1326        """test_inflow_using_flowline
1327
1328        Test the ability of a flowline to match inflow above the flowline by
1329        creating constant inflow onto a circle at the head of a 20m
1330        wide by 300m long plane dipping at various slopes with a
1331        perpendicular flowline and gauge downstream of the inflow and
1332        a 45 degree flowlines at 200m downstream.
1333
1334        A more substantial version of this test with finer resolution and
1335        including the depth calculation using Manning's equation is
1336        available under the validate_all suite in the directory
1337        anuga_validation/automated_validation_tests/flow_tests.
1338        """
1339
1340       
1341        verbose = False
1342       
1343        #----------------------------------------------------------------------
1344        # Setup computational domain
1345        #----------------------------------------------------------------------
1346        number_of_inflows = 2 # Number of inflows on top of each other
1347        finaltime = 500 #700.0 # If this is too short, steady state will not be achieved
1348
1349        length = 250.
1350        width  = 20.
1351        dx = dy = 5                 # Resolution: of grid on both axes
1352
1353        points, vertices, boundary = rectangular_cross(int(length/dx),
1354                                                       int(width/dy),
1355                                                       len1=length,
1356                                                       len2=width)
1357
1358        for mannings_n in [0.1, 0.01]:
1359            # Loop over a range of roughnesses             
1360           
1361            for slope in [1.0/300, 1.0/100]:
1362                # Loop over a range of bedslopes representing
1363                # sub to super critical flows
1364
1365
1366                domain = Domain(points, vertices, boundary)   
1367                domain.set_name('inflow_flowline_test')     # Output name
1368
1369                #--------------------------------------------------------------
1370                # Setup initial conditions
1371                #--------------------------------------------------------------
1372
1373                def topography(x, y):
1374                    z = -x * slope
1375                    return z
1376
1377                # Use function for elevation
1378                domain.set_quantity('elevation', topography)
1379                # Constant friction of conc surface   
1380                domain.set_quantity('friction', mannings_n)
1381                # Dry initial condition
1382                domain.set_quantity('stage', expression='elevation')
1383
1384                #--------------------------------------------------------------
1385                # Setup Inflow
1386                #--------------------------------------------------------------
1387
1388                # Fixed Flowrate onto Area
[7866]1389                fixed_inflow = anuga.Inflow(domain,
[7559]1390                                      center=(10.0, 10.0),
1391                                      radius=5.00,
1392                                      rate=10.00)   
1393
1394                # Stack this flow
1395                for i in range(number_of_inflows):
1396                    domain.forcing_terms.append(fixed_inflow)
1397               
[7869]1398                ref_flow = fixed_inflow.rate*number_of_inflows
[7559]1399
1400                # Compute normal depth on plane using Mannings equation
1401                # v=1/n*(r^2/3)*(s^0.5) or r=(Q*n/(s^0.5*W))^0.6
1402                normal_depth=(ref_flow*mannings_n/(slope**0.5*width))**0.6
1403                if verbose:
1404                    print
1405                    print 'Slope:', slope, 'Mannings n:', mannings_n
1406                   
1407                   
1408                #--------------------------------------------------------------
1409                # Setup boundary conditions
1410                #--------------------------------------------------------------
1411
[7869]1412                Br = anuga.Reflective_boundary(domain)
[7559]1413               
1414                # Define downstream boundary based on predicted depth
1415                def normal_depth_stage_downstream(t):
1416                    return (-slope*length) + normal_depth
1417               
[7869]1418                Bt = anuga.Transmissive_momentum_set_stage_boundary(
1419                        domain=domain, function=normal_depth_stage_downstream)
[7559]1420
1421                domain.set_boundary({'left': Br,
1422                                     'right': Bt,
1423                                     'top': Br,
1424                                     'bottom': Br})
1425
1426
1427                #--------------------------------------------------------------
1428                # Evolve system through time
1429                #--------------------------------------------------------------
1430
1431                for t in domain.evolve(yieldstep=100.0, finaltime=finaltime):
1432                    pass
1433                    #if verbose :
1434                    #    print domain.timestepping_statistics()
1435
1436                    #    print domain.volumetric_balance_statistics()                                   
1437
1438
1439                #--------------------------------------------------------------
1440                # Compute flow thru flowlines ds of inflow
1441                #--------------------------------------------------------------
1442                   
1443                # Square on flowline at 200m
1444                q = domain.get_flow_through_cross_section([[200.0,  0.0],
1445                                                           [200.0, 20.0]])
1446                if verbose:
1447                    print ('90 degree flowline: ANUGA = %f, Ref = %f'
1448                           % (q, ref_flow))
1449
1450                msg = ('Predicted flow was %f, should have been %f'
1451                       % (q, ref_flow))
1452                assert num.allclose(q, ref_flow, rtol=1.0e-2), msg         
1453
1454                           
1455                # 45 degree flowline at 200m
1456                q = domain.get_flow_through_cross_section([[200.0,  0.0],
1457                                                           [220.0, 20.0]])
1458                if verbose:
1459                    print ('45 degree flowline: ANUGA = %f, Ref = %f'
1460                           % (q, ref_flow))
1461                   
1462                msg = ('Predicted flow was %f, should have been %f'
1463                       % (q, ref_flow))
1464                assert num.allclose(q, ref_flow, rtol=1.0e-2), msg         
1465
1466        os.remove('inflow_flowline_test.sww')
1467
1468       
1469    def Xtest_inflow_boundary_using_flowline(self):
1470        """test_inflow_boundary_using_flowline
1471        Test the ability of a flowline to match inflow above the flowline by
1472        creating constant inflow into the boundary at the head of a 20m
1473        wide by 300m long plane dipping at various slopes with a
1474        perpendicular flowline and gauge downstream of the inflow and
1475        a 45 degree flowlines at 200m downstream
1476       
1477       
1478        """
1479
1480        # FIXME (Ole): Work in progress
1481       
1482        verbose = False
1483       
1484
1485        #----------------------------------------------------------------------
1486        # Import necessary modules
1487        #----------------------------------------------------------------------
1488        from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
1489        from anuga.shallow_water import Domain
1490        from anuga.shallow_water.shallow_water_domain import Reflective_boundary
1491        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
[7733]1492        from anuga.shallow_water.forcing import Inflow_boundary
[7559]1493        from anuga.shallow_water.data_manager import get_flow_through_cross_section
1494        from anuga.abstract_2d_finite_volumes.util import sww2csv_gauges, csv2timeseries_graphs
1495
1496
1497        #----------------------------------------------------------------------
1498        # Setup computational domain
1499        #----------------------------------------------------------------------
1500
1501        finaltime = 500 #700.0 # If this is too short, steady state will not be achieved
1502
1503        length = 250.
1504        width  = 20.
1505        dx = dy = 5          # Resolution: of grid on both axes
1506
1507        points, vertices, boundary = rectangular_cross(int(length/dx), int(width/dy),
1508                                                       len1=length, len2=width)
1509
1510        for mannings_n in [0.1, 0.01]:
1511            # Loop over a range of roughnesses             
1512           
1513            for slope in [1.0/300, 1.0/100]:
1514                # Loop over a range of bedslopes representing sub to super critical flows
1515
1516
1517                domain = Domain(points, vertices, boundary)   
1518                domain.set_name('inflow_boundary_flowline_test')
1519               
1520
1521                #-------------------------------------------------------------
1522                # Setup initial conditions
1523                #-------------------------------------------------------------
1524
1525                def topography(x, y):
1526                    z=-x * slope
1527                    return z
1528
1529                domain.set_quantity('elevation', topography)
1530                domain.set_quantity('friction', mannings_n)
1531                domain.set_quantity('stage',
1532                                    expression='elevation')
1533
1534
1535                   
1536                #--------------------------------------------------------------
1537                # Setup boundary conditions
1538                #--------------------------------------------------------------
1539               
1540
1541
1542                ref_flow = 10.00
1543
1544                # Compute normal depth on plane using Mannings equation
1545                # v=1/n*(r^2/3)*(s^0.5) or r=(Q*n/(s^0.5*W))^0.6
1546                normal_depth=(ref_flow*mannings_n/(slope**0.5*width))**0.6
1547                if verbose:
1548                    print
1549                    print 'Slope:', slope, 'Mannings n:', mannings_n
1550                   
1551               
1552               
1553                Bi = Inflow_boundary(domain, rate=ref_flow)
1554
1555                Br = Reflective_boundary(domain)
1556               
1557                # Define downstream boundary based on predicted depth
1558                def normal_depth_stage_downstream(t):
1559                    return (-slope*length) + normal_depth
1560               
1561                Bt = Transmissive_momentum_set_stage_boundary(domain=domain,
1562                                                              function=normal_depth_stage_downstream)
1563               
1564
1565               
1566
1567                domain.set_boundary({'left': Bi,
1568                                     'right': Bt,
1569                                     'top': Br,
1570                                     'bottom': Br})
1571
1572
1573
1574                #--------------------------------------------------------------
1575                # Evolve system through time
1576                #--------------------------------------------------------------
1577
1578
1579                for t in domain.evolve(yieldstep=100.0, finaltime=finaltime):
1580                    pass
1581                    #if verbose :
1582                    #    print domain.timestepping_statistics()
1583                    #    print domain.volumetric_balance_statistics()
1584                                                       
1585
1586
1587                #--------------------------------------------------------------
1588                # Compute flow thru flowlines ds of inflow
1589                #--------------------------------------------------------------
1590                   
1591                # Square on flowline at 200m
1592                q=domain.get_flow_through_cross_section([[200.0,0.0],[200.0,20.0]])
1593                msg = 'Predicted flow was %f, should have been %f' % (q, ref_flow)
1594                if verbose:
1595                    print '90 degree flowline: ANUGA = %f, Ref = %f' % (q, ref_flow)
1596                assert num.allclose(q, ref_flow, rtol=1.0e-2), msg         
1597
1598                           
1599                # 45 degree flowline at 200m
1600                q=domain.get_flow_through_cross_section([[200.0,0.0],[220.0,20.0]])
1601                msg = 'Predicted flow was %f, should have been %f' % (q, ref_flow)
1602                if verbose:
1603                    print '45 degree flowline: ANUGA = %f, Ref = %f' % (q, ref_flow)
1604                   
1605                assert num.allclose(q, ref_flow, rtol=1.0e-2), msg         
1606
1607       
[7869]1608 ################################################################################
[7559]1609
1610if __name__ == "__main__":
[7562]1611    suite = unittest.makeSuite(Test_swb_forcing_terms, 'test')
[7559]1612    runner = unittest.TextTestRunner(verbosity=1)
1613    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.