source: anuga_core/source/anuga/shallow_water_balanced/test_swb_forcing_terms.py @ 7562

Last change on this file since 7562 was 7562, checked in by steve, 15 years ago

Updating the balanced and parallel code

File size: 62.2 KB
Line 
1#!/usr/bin/env python
2
3import unittest, os
4import os.path
5from math import pi, sqrt
6import tempfile
7
8from anuga.config import g, epsilon
9from anuga.config import netcdf_mode_r, netcdf_mode_w, netcdf_mode_a
10from anuga.utilities.numerical_tools import mean
11from anuga.utilities.polygon import is_inside_polygon
12from anuga.coordinate_transforms.geo_reference import Geo_reference
13from anuga.abstract_2d_finite_volumes.quantity import Quantity
14from anuga.geospatial_data.geospatial_data import Geospatial_data
15from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
16
17from anuga.utilities.system_tools import get_pathname_from_package
18from swb_domain import *
19
20import numpy as num
21
22# Get gateway to C implementation of flux function for direct testing
23from shallow_water_ext import flux_function_central as flux_function
24
25
26# Variable windfield implemented using functions
27def speed(t, x, y):
28    """Large speeds halfway between center and edges
29
30    Low speeds at center and edges
31    """
32
33    from math import exp, cos, pi
34
35    x = num.array(x)
36    y = num.array(y)
37
38    N = len(x)
39    s = 0*#New array
40
41    for k in range(N):
42        r = num.sqrt(x[k]**2 + y[k]**2)
43        factor = exp(-(r-0.15)**2)
44        s[k] = 4000 * factor * (cos(t*2*pi/150) + 2)
45
46    return s
47
48def scalar_func(t, x, y):
49    """Function that returns a scalar.
50
51    Used to test error message when numeric array is expected
52    """
53
54    return 17.7
55
56def scalar_func_list(t, x, y):
57    """Function that returns a scalar.
58
59    Used to test error message when numeric array is expected
60    """
61
62    return [17.7]
63
64
65def angle(t, x, y):
66    """Rotating field
67    """
68    from math import atan, pi
69
70    x = num.array(x)
71    y = num.array(y)
72
73    N = len(x)
74    a = 0 * x    # New array
75
76    for k in range(N):
77        r = num.sqrt(x[k]**2 + y[k]**2)
78
79        angle = atan(y[k]/x[k])
80
81        if x[k] < 0:
82            angle += pi
83
84        # Take normal direction
85        angle -= pi/2
86
87        # Ensure positive radians
88        if angle < 0:
89            angle += 2*pi
90
91        a[k] = angle/pi*180
92
93    return a
94
95
96###############################################################################
97
98class Test_swb_forcing_terms(unittest.TestCase):
99    def setUp(self):
100        pass
101
102    def tearDown(self):
103        pass
104
105    def test_gravity(self):
106        #Assuming no friction
107
108        from anuga.config import g
109
110        a = [0.0, 0.0]
111        b = [0.0, 2.0]
112        c = [2.0, 0.0]
113        d = [0.0, 4.0]
114        e = [2.0, 2.0]
115        f = [4.0, 0.0]
116
117        points = [a, b, c, d, e, f]
118        #             bac,     bce,     ecf,     dbe
119        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
120
121        domain = Domain(points, vertices)
122
123        #Set up for a gradient of (3,0) at mid triangle (bce)
124        def slope(x, y):
125            return 3*x
126
127        h = 0.1
128        def stage(x, y):
129            return slope(x, y) + h
130
131        domain.set_quantity('elevation', slope)
132        domain.set_quantity('stage', stage)
133
134        for name in domain.conserved_quantities:
135            assert num.allclose(domain.quantities[name].explicit_update, 0)
136            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
137
138        domain.compute_forcing_terms()
139
140        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
141        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
142                            -g*h*3)
143        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
144
145    def test_manning_friction(self):
146        from anuga.config import g
147
148        a = [0.0, 0.0]
149        b = [0.0, 2.0]
150        c = [2.0, 0.0]
151        d = [0.0, 4.0]
152        e = [2.0, 2.0]
153        f = [4.0, 0.0]
154
155        points = [a, b, c, d, e, f]
156        #             bac,     bce,     ecf,     dbe
157        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
158
159        domain = Domain(points, vertices)
160
161        #Set up for a gradient of (3,0) at mid triangle (bce)
162        def slope(x, y):
163            return 3*x
164
165        h = 0.1
166        def stage(x, y):
167            return slope(x, y) + h
168
169        eta = 0.07
170        domain.set_quantity('elevation', slope)
171        domain.set_quantity('stage', stage)
172        domain.set_quantity('friction', eta)
173
174        for name in domain.conserved_quantities:
175            assert num.allclose(domain.quantities[name].explicit_update, 0)
176            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
177
178        domain.compute_forcing_terms()
179
180        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
181        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
182                            -g*h*3)
183        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
184
185        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
186        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
187                            0)
188        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
189                            0)
190
191        #Create some momentum for friction to work with
192        domain.set_quantity('xmomentum', 1)
193        dz = sqrt(10.0)
194        S = -g*eta**2 *dz / h**(7.0/3) 
195
196        domain.compute_forcing_terms()
197        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
198        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
199                            S)
200        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
201                            0)
202
203        #A more complex example
204        domain.quantities['stage'].semi_implicit_update[:] = 0.0
205        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
206        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
207
208        domain.set_quantity('xmomentum', 3)
209        domain.set_quantity('ymomentum', 4)
210
211        S = -g*eta**2*5*dz / h**(7.0/3) 
212
213        domain.compute_forcing_terms()
214
215        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
216        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
217                            3*S)
218        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
219                            4*S)
220
221
222
223    def test_manning_friction_old(self):
224        from anuga.config import g
225
226        a = [0.0, 0.0]
227        b = [0.0, 2.0]
228        c = [2.0, 0.0]
229        d = [0.0, 4.0]
230        e = [2.0, 2.0]
231        f = [4.0, 0.0]
232
233        points = [a, b, c, d, e, f]
234        #             bac,     bce,     ecf,     dbe
235        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
236
237        domain = Domain(points, vertices)
238
239        #Turn old mannings function on
240        domain.set_new_mannings_function(False)
241
242        #Set up for a gradient of (3,0) at mid triangle (bce)
243        def slope(x, y):
244            return 3*x
245
246        h = 0.1
247        def stage(x, y):
248            return slope(x, y) + h
249
250        eta = 0.07
251        domain.set_quantity('elevation', slope)
252        domain.set_quantity('stage', stage)
253        domain.set_quantity('friction', eta)
254
255        for name in domain.conserved_quantities:
256            assert num.allclose(domain.quantities[name].explicit_update, 0)
257            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
258
259        domain.compute_forcing_terms()
260
261        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
262        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
263                            -g*h*3)
264        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
265
266        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
267        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
268                            0)
269        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
270                            0)
271
272        #Create some momentum for friction to work with
273        domain.set_quantity('xmomentum', 1)
274        S = -g*eta**2 / h**(7.0/3)
275
276        domain.compute_forcing_terms()
277        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
278        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
279                            S)
280        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
281                            0)
282
283        #A more complex example
284        domain.quantities['stage'].semi_implicit_update[:] = 0.0
285        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
286        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
287
288        domain.set_quantity('xmomentum', 3)
289        domain.set_quantity('ymomentum', 4)
290
291        S = -g*eta**2*5 / h**(7.0/3)
292
293        domain.compute_forcing_terms()
294
295        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
296        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
297                            3*S)
298        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
299                            4*S)
300
301
302    def test_manning_friction_new(self):
303        from anuga.config import g
304
305        a = [0.0, 0.0]
306        b = [0.0, 2.0]
307        c = [2.0, 0.0]
308        d = [0.0, 4.0]
309        e = [2.0, 2.0]
310        f = [4.0, 0.0]
311
312        points = [a, b, c, d, e, f]
313        #             bac,     bce,     ecf,     dbe
314        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
315
316        domain = Domain(points, vertices)
317
318        # Use the new function which takes into account the extra
319        # wetted area due to slope of bed
320        domain.set_new_mannings_function(True)
321       
322        #Set up for a gradient of (3,0) at mid triangle (bce)
323        def slope(x, y):
324            return 3*x
325
326        h = 0.1
327        def stage(x, y):
328            return slope(x, y) + h
329
330        eta = 0.07
331        domain.set_quantity('elevation', slope)
332        domain.set_quantity('stage', stage)
333        domain.set_quantity('friction', eta)
334
335        for name in domain.conserved_quantities:
336            assert num.allclose(domain.quantities[name].explicit_update, 0)
337            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
338
339        domain.compute_forcing_terms()
340
341        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
342        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
343                            -g*h*3)
344        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
345
346        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
347        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
348                            0)
349        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
350                            0)
351
352        #Create some momentum for friction to work with
353        domain.set_quantity('xmomentum', 1)
354        S = -g*eta**2 / h**(7.0/3) * sqrt(10)
355
356        domain.compute_forcing_terms()
357        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
358        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
359                            S)
360        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
361                            0)
362
363        #A more complex example
364        domain.quantities['stage'].semi_implicit_update[:] = 0.0
365        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
366        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
367
368        domain.set_quantity('xmomentum', 3)
369        domain.set_quantity('ymomentum', 4)
370
371        S = -g*eta**2*5 / h**(7.0/3) * sqrt(10.0)
372
373        domain.compute_forcing_terms()
374
375        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
376        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
377                            3*S)
378        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
379                            4*S)
380       
381    def test_constant_wind_stress(self):
382        from anuga.config import rho_a, rho_w, eta_w
383        from math import pi, cos, sin
384
385        a = [0.0, 0.0]
386        b = [0.0, 2.0]
387        c = [2.0, 0.0]
388        d = [0.0, 4.0]
389        e = [2.0, 2.0]
390        f = [4.0, 0.0]
391
392        points = [a, b, c, d, e, f]
393        #             bac,     bce,     ecf,     dbe
394        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
395
396        domain = Domain(points, vertices)
397
398        #Flat surface with 1m of water
399        domain.set_quantity('elevation', 0)
400        domain.set_quantity('stage', 1.0)
401        domain.set_quantity('friction', 0)
402
403        Br = Reflective_boundary(domain)
404        domain.set_boundary({'exterior': Br})
405
406        #Setup only one forcing term, constant wind stress
407        s = 100
408        phi = 135
409        domain.forcing_terms = []
410        domain.forcing_terms.append(Wind_stress(s, phi))
411
412        domain.compute_forcing_terms()
413
414        const = eta_w*rho_a / rho_w
415
416        #Convert to radians
417        phi = phi*pi / 180
418
419        #Compute velocity vector (u, v)
420        u = s*cos(phi)
421        v = s*sin(phi)
422
423        #Compute wind stress
424        S = const * num.sqrt(u**2 + v**2)
425
426        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
427        assert num.allclose(domain.quantities['xmomentum'].explicit_update, S*u)
428        assert num.allclose(domain.quantities['ymomentum'].explicit_update, S*v)
429
430    def test_variable_wind_stress(self):
431        from anuga.config import rho_a, rho_w, eta_w
432        from math import pi, cos, sin
433
434        a = [0.0, 0.0]
435        b = [0.0, 2.0]
436        c = [2.0, 0.0]
437        d = [0.0, 4.0]
438        e = [2.0, 2.0]
439        f = [4.0, 0.0]
440
441        points = [a, b, c, d, e, f]
442        #             bac,     bce,     ecf,     dbe
443        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
444
445        domain = Domain(points, vertices)
446
447        #Flat surface with 1m of water
448        domain.set_quantity('elevation', 0)
449        domain.set_quantity('stage', 1.0)
450        domain.set_quantity('friction', 0)
451
452        Br = Reflective_boundary(domain)
453        domain.set_boundary({'exterior': Br})
454
455        domain.time = 5.54    # Take a random time (not zero)
456
457        #Setup only one forcing term, constant wind stress
458        s = 100
459        phi = 135
460        domain.forcing_terms = []
461        domain.forcing_terms.append(Wind_stress(s=speed, phi=angle))
462
463        domain.compute_forcing_terms()
464
465        #Compute reference solution
466        const = eta_w*rho_a / rho_w
467
468        N = len(domain)    # number_of_triangles
469
470        xc = domain.get_centroid_coordinates()
471        t = domain.time
472
473        x = xc[:,0]
474        y = xc[:,1]
475        s_vec = speed(t,x,y)
476        phi_vec = angle(t,x,y)
477
478        for k in range(N):
479            # Convert to radians
480            phi = phi_vec[k]*pi / 180
481            s = s_vec[k]
482
483            # Compute velocity vector (u, v)
484            u = s*cos(phi)
485            v = s*sin(phi)
486
487            # Compute wind stress
488            S = const * num.sqrt(u**2 + v**2)
489
490            assert num.allclose(domain.quantities['stage'].explicit_update[k],
491                                0)
492            assert num.allclose(domain.quantities['xmomentum'].\
493                                     explicit_update[k],
494                                S*u)
495            assert num.allclose(domain.quantities['ymomentum'].\
496                                     explicit_update[k],
497                                S*v)
498
499    def test_windfield_from_file(self):
500        import time
501        from anuga.config import rho_a, rho_w, eta_w
502        from math import pi, cos, sin
503        from anuga.config import time_format
504        from anuga.abstract_2d_finite_volumes.util import file_function
505
506        a = [0.0, 0.0]
507        b = [0.0, 2.0]
508        c = [2.0, 0.0]
509        d = [0.0, 4.0]
510        e = [2.0, 2.0]
511        f = [4.0, 0.0]
512
513        points = [a, b, c, d, e, f]
514        #             bac,     bce,     ecf,     dbe
515        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
516
517        domain = Domain(points, vertices)
518
519        # Flat surface with 1m of water
520        domain.set_quantity('elevation', 0)
521        domain.set_quantity('stage', 1.0)
522        domain.set_quantity('friction', 0)
523
524        Br = Reflective_boundary(domain)
525        domain.set_boundary({'exterior': Br})
526
527        domain.time = 7    # Take a time that is represented in file (not zero)
528
529        # Write wind stress file (ensure that domain.time is covered)
530        # Take x=1 and y=0
531        filename = 'test_windstress_from_file'
532        start = time.mktime(time.strptime('2000', '%Y'))
533        fid = open(filename + '.txt', 'w')
534        dt = 1    # One second interval
535        t = 0.0
536        while t <= 10.0:
537            t_string = time.strftime(time_format, time.gmtime(t+start))
538
539            fid.write('%s, %f %f\n' %
540                      (t_string, speed(t,[1],[0])[0], angle(t,[1],[0])[0]))
541            t += dt
542
543        fid.close()
544
545        # Convert ASCII file to NetCDF (Which is what we really like!)
546        from data_manager import timefile2netcdf
547
548        timefile2netcdf(filename)
549        os.remove(filename + '.txt')
550
551        # Setup wind stress
552        F = file_function(filename + '.tms',
553                          quantities=['Attribute0', 'Attribute1'])
554        os.remove(filename + '.tms')
555
556        W = Wind_stress(F)
557
558        domain.forcing_terms = []
559        domain.forcing_terms.append(W)
560
561        domain.compute_forcing_terms()
562
563        # Compute reference solution
564        const = eta_w*rho_a / rho_w
565
566        N = len(domain)    # number_of_triangles
567
568        t = domain.time
569
570        s = speed(t, [1], [0])[0]
571        phi = angle(t, [1], [0])[0]
572
573        # Convert to radians
574        phi = phi*pi / 180
575
576        # Compute velocity vector (u, v)
577        u = s*cos(phi)
578        v = s*sin(phi)
579
580        # Compute wind stress
581        S = const * num.sqrt(u**2 + v**2)
582
583        for k in range(N):
584            assert num.allclose(domain.quantities['stage'].explicit_update[k],
585                                0)
586            assert num.allclose(domain.quantities['xmomentum'].\
587                                    explicit_update[k],
588                                S*u)
589            assert num.allclose(domain.quantities['ymomentum'].\
590                                    explicit_update[k],
591                                S*v)
592
593    def test_windfield_from_file_seconds(self):
594        import time
595        from anuga.config import rho_a, rho_w, eta_w
596        from math import pi, cos, sin
597        from anuga.config import time_format
598        from anuga.abstract_2d_finite_volumes.util import file_function
599
600        a = [0.0, 0.0]
601        b = [0.0, 2.0]
602        c = [2.0, 0.0]
603        d = [0.0, 4.0]
604        e = [2.0, 2.0]
605        f = [4.0, 0.0]
606
607        points = [a, b, c, d, e, f]
608        #             bac,     bce,     ecf,     dbe
609        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
610
611        domain = Domain(points, vertices)
612
613        # Flat surface with 1m of water
614        domain.set_quantity('elevation', 0)
615        domain.set_quantity('stage', 1.0)
616        domain.set_quantity('friction', 0)
617
618        Br = Reflective_boundary(domain)
619        domain.set_boundary({'exterior': Br})
620
621        domain.time = 7    # Take a time that is represented in file (not zero)
622
623        # Write wind stress file (ensure that domain.time is covered)
624        # Take x=1 and y=0
625        filename = 'test_windstress_from_file'
626        start = time.mktime(time.strptime('2000', '%Y'))
627        fid = open(filename + '.txt', 'w')
628        dt = 0.5    # Half second interval
629        t = 0.0
630        while t <= 10.0:
631            fid.write('%s, %f %f\n'
632                      % (str(t), speed(t, [1], [0])[0], angle(t, [1], [0])[0]))
633            t += dt
634
635        fid.close()
636
637        # Convert ASCII file to NetCDF (Which is what we really like!)
638        from data_manager import timefile2netcdf
639
640        timefile2netcdf(filename, time_as_seconds=True)
641        os.remove(filename + '.txt')
642
643        # Setup wind stress
644        F = file_function(filename + '.tms',
645                          quantities=['Attribute0', 'Attribute1'])
646        os.remove(filename + '.tms')
647
648        W = Wind_stress(F)
649
650        domain.forcing_terms = []
651        domain.forcing_terms.append(W)
652
653        domain.compute_forcing_terms()
654
655        # Compute reference solution
656        const = eta_w*rho_a / rho_w
657
658        N = len(domain)    # number_of_triangles
659
660        t = domain.time
661
662        s = speed(t, [1], [0])[0]
663        phi = angle(t, [1], [0])[0]
664
665        # Convert to radians
666        phi = phi*pi / 180
667
668        # Compute velocity vector (u, v)
669        u = s*cos(phi)
670        v = s*sin(phi)
671
672        # Compute wind stress
673        S = const * num.sqrt(u**2 + v**2)
674
675        for k in range(N):
676            assert num.allclose(domain.quantities['stage'].explicit_update[k],
677                                0)
678            assert num.allclose(domain.quantities['xmomentum'].\
679                                    explicit_update[k],
680                                S*u)
681            assert num.allclose(domain.quantities['ymomentum'].\
682                                    explicit_update[k],
683                                S*v)
684
685    def test_wind_stress_error_condition(self):
686        """Test that windstress reacts properly when forcing functions
687        are wrong - e.g. returns a scalar
688        """
689
690        from math import pi, cos, sin
691        from anuga.config import rho_a, rho_w, eta_w
692
693        a = [0.0, 0.0]
694        b = [0.0, 2.0]
695        c = [2.0, 0.0]
696        d = [0.0, 4.0]
697        e = [2.0, 2.0]
698        f = [4.0, 0.0]
699
700        points = [a, b, c, d, e, f]
701        #             bac,     bce,     ecf,     dbe
702        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
703
704        domain = Domain(points, vertices)
705
706        # Flat surface with 1m of water
707        domain.set_quantity('elevation', 0)
708        domain.set_quantity('stage', 1.0)
709        domain.set_quantity('friction', 0)
710
711        Br = Reflective_boundary(domain)
712        domain.set_boundary({'exterior': Br})
713
714        domain.time = 5.54    # Take a random time (not zero)
715
716        # Setup only one forcing term, bad func
717        domain.forcing_terms = []
718
719        try:
720            domain.forcing_terms.append(Wind_stress(s=scalar_func_list,
721                                                    phi=angle))
722        except AssertionError:
723            pass
724        else:
725            msg = 'Should have raised exception'
726            raise Exception, msg
727
728        try:
729            domain.forcing_terms.append(Wind_stress(s=speed, phi=scalar_func))
730        except Exception:
731            pass
732        else:
733            msg = 'Should have raised exception'
734            raise Exception, msg
735
736        try:
737            domain.forcing_terms.append(Wind_stress(s=speed, phi='xx'))
738        except:
739            pass
740        else:
741            msg = 'Should have raised exception'
742            raise Exception, msg
743
744    def test_rainfall(self):
745        from math import pi, cos, sin
746
747        a = [0.0, 0.0]
748        b = [0.0, 2.0]
749        c = [2.0, 0.0]
750        d = [0.0, 4.0]
751        e = [2.0, 2.0]
752        f = [4.0, 0.0]
753
754        points = [a, b, c, d, e, f]
755        #             bac,     bce,     ecf,     dbe
756        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
757
758        domain = Domain(points, vertices)
759
760        # Flat surface with 1m of water
761        domain.set_quantity('elevation', 0)
762        domain.set_quantity('stage', 1.0)
763        domain.set_quantity('friction', 0)
764
765        Br = Reflective_boundary(domain)
766        domain.set_boundary({'exterior': Br})
767
768        # Setup only one forcing term, constant rainfall
769        domain.forcing_terms = []
770        domain.forcing_terms.append(Rainfall(domain, rate=2.0))
771
772        domain.compute_forcing_terms()
773        assert num.allclose(domain.quantities['stage'].explicit_update,
774                            2.0/1000)
775
776    def test_rainfall_restricted_by_polygon(self):
777        from math import pi, cos, sin
778
779        a = [0.0, 0.0]
780        b = [0.0, 2.0]
781        c = [2.0, 0.0]
782        d = [0.0, 4.0]
783        e = [2.0, 2.0]
784        f = [4.0, 0.0]
785
786        points = [a, b, c, d, e, f]
787        #             bac,     bce,     ecf,     dbe
788        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
789
790        domain = Domain(points, vertices)
791
792        # Flat surface with 1m of water
793        domain.set_quantity('elevation', 0)
794        domain.set_quantity('stage', 1.0)
795        domain.set_quantity('friction', 0)
796
797        Br = Reflective_boundary(domain)
798        domain.set_boundary({'exterior': Br})
799
800        # Setup only one forcing term, constant rainfall
801        # restricted to a polygon enclosing triangle #1 (bce)
802        domain.forcing_terms = []
803        R = Rainfall(domain, rate=2.0, polygon=[[1,1], [2,1], [2,2], [1,2]])
804
805        assert num.allclose(R.exchange_area, 2)
806
807        domain.forcing_terms.append(R)
808
809        domain.compute_forcing_terms()
810
811        assert num.allclose(domain.quantities['stage'].explicit_update[1],
812                            2.0/1000)
813        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
814        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
815
816    def test_time_dependent_rainfall_restricted_by_polygon(self):
817        a = [0.0, 0.0]
818        b = [0.0, 2.0]
819        c = [2.0, 0.0]
820        d = [0.0, 4.0]
821        e = [2.0, 2.0]
822        f = [4.0, 0.0]
823
824        points = [a, b, c, d, e, f]
825        #             bac,     bce,     ecf,     dbe
826        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
827
828        domain = Domain(points, vertices)
829
830        # Flat surface with 1m of water
831        domain.set_quantity('elevation', 0)
832        domain.set_quantity('stage', 1.0)
833        domain.set_quantity('friction', 0)
834
835        Br = Reflective_boundary(domain)
836        domain.set_boundary({'exterior': Br})
837
838        # Setup only one forcing term, time dependent rainfall
839        # restricted to a polygon enclosing triangle #1 (bce)
840        domain.forcing_terms = []
841        R = Rainfall(domain,
842                     rate=lambda t: 3*t + 7,
843                     polygon = [[1,1], [2,1], [2,2], [1,2]])
844
845        assert num.allclose(R.exchange_area, 2)
846       
847        domain.forcing_terms.append(R)
848
849        domain.time = 10.
850
851        domain.compute_forcing_terms()
852
853        assert num.allclose(domain.quantities['stage'].explicit_update[1],
854                            (3*domain.time + 7)/1000)
855        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
856        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
857
858    def test_time_dependent_rainfall_using_starttime(self):
859        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
860
861        a = [0.0, 0.0]
862        b = [0.0, 2.0]
863        c = [2.0, 0.0]
864        d = [0.0, 4.0]
865        e = [2.0, 2.0]
866        f = [4.0, 0.0]
867
868        points = [a, b, c, d, e, f]
869        #             bac,     bce,     ecf,     dbe
870        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
871
872        domain = Domain(points, vertices)
873
874        # Flat surface with 1m of water
875        domain.set_quantity('elevation', 0)
876        domain.set_quantity('stage', 1.0)
877        domain.set_quantity('friction', 0)
878
879        Br = Reflective_boundary(domain)
880        domain.set_boundary({'exterior': Br})
881
882        # Setup only one forcing term, time dependent rainfall
883        # restricted to a polygon enclosing triangle #1 (bce)
884        domain.forcing_terms = []
885        R = Rainfall(domain,
886                     rate=lambda t: 3*t + 7,
887                     polygon=rainfall_poly)                     
888
889        assert num.allclose(R.exchange_area, 2)
890       
891        domain.forcing_terms.append(R)
892
893        # This will test that time used in the forcing function takes
894        # startime into account.
895        domain.starttime = 5.0
896
897        domain.time = 7.
898
899        domain.compute_forcing_terms()
900
901        assert num.allclose(domain.quantities['stage'].explicit_update[1],
902                            (3*domain.get_time() + 7)/1000)
903        assert num.allclose(domain.quantities['stage'].explicit_update[1],
904                            (3*(domain.time + domain.starttime) + 7)/1000)
905
906        # Using internal time her should fail
907        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
908                                (3*domain.time + 7)/1000)
909
910        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
911        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
912
913    def test_time_dependent_rainfall_using_georef(self):
914        """test_time_dependent_rainfall_using_georef
915
916        This will also test the General forcing term using georef
917        """
918
919        # Mesh in zone 56 (absolute coords)
920        x0 = 314036.58727982
921        y0 = 6224951.2960092
922
923        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
924        rainfall_poly += [x0, y0]
925
926        a = [0.0, 0.0]
927        b = [0.0, 2.0]
928        c = [2.0, 0.0]
929        d = [0.0, 4.0]
930        e = [2.0, 2.0]
931        f = [4.0, 0.0]
932
933        points = [a, b, c, d, e, f]
934        #             bac,     bce,     ecf,     dbe
935        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
936
937        domain = Domain(points, vertices,
938                        geo_reference=Geo_reference(56, x0, y0))
939
940        # Flat surface with 1m of water
941        domain.set_quantity('elevation', 0)
942        domain.set_quantity('stage', 1.0)
943        domain.set_quantity('friction', 0)
944
945        Br = Reflective_boundary(domain)
946        domain.set_boundary({'exterior': Br})
947
948        # Setup only one forcing term, time dependent rainfall
949        # restricted to a polygon enclosing triangle #1 (bce)
950        domain.forcing_terms = []
951        R = Rainfall(domain,
952                     rate=lambda t: 3*t + 7,
953                     polygon=rainfall_poly)
954
955        assert num.allclose(R.exchange_area, 2)
956       
957        domain.forcing_terms.append(R)
958
959        # This will test that time used in the forcing function takes
960        # startime into account.
961        domain.starttime = 5.0
962
963        domain.time = 7.
964
965        domain.compute_forcing_terms()
966
967        assert num.allclose(domain.quantities['stage'].explicit_update[1],
968                            (3*domain.get_time() + 7)/1000)
969        assert num.allclose(domain.quantities['stage'].explicit_update[1],
970                            (3*(domain.time + domain.starttime) + 7)/1000)
971
972        # Using internal time her should fail
973        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
974                                (3*domain.time + 7)/1000)
975
976        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
977        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
978
979    def test_time_dependent_rainfall_restricted_by_polygon_with_default(self):
980        """
981        Test that default rainfall can be used when given rate runs out of data.
982        """
983
984        a = [0.0, 0.0]
985        b = [0.0, 2.0]
986        c = [2.0, 0.0]
987        d = [0.0, 4.0]
988        e = [2.0, 2.0]
989        f = [4.0, 0.0]
990
991        points = [a, b, c, d, e, f]
992        #             bac,     bce,     ecf,     dbe
993        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
994
995        domain = Domain(points, vertices)
996
997        # Flat surface with 1m of water
998        domain.set_quantity('elevation', 0)
999        domain.set_quantity('stage', 1.0)
1000        domain.set_quantity('friction', 0)
1001
1002        Br = Reflective_boundary(domain)
1003        domain.set_boundary({'exterior': Br})
1004
1005        # Setup only one forcing term, time dependent rainfall
1006        # that expires at t==20
1007        from anuga.fit_interpolate.interpolate import Modeltime_too_late
1008
1009        def main_rate(t):
1010            if t > 20:
1011                msg = 'Model time exceeded.'
1012                raise Modeltime_too_late, msg
1013            else:
1014                return 3*t + 7
1015
1016        domain.forcing_terms = []
1017        R = Rainfall(domain,
1018                     rate=main_rate,
1019                     polygon = [[1,1], [2,1], [2,2], [1,2]],
1020                     default_rate=5.0)
1021
1022        assert num.allclose(R.exchange_area, 2)
1023       
1024        domain.forcing_terms.append(R)
1025
1026        domain.time = 10.
1027
1028        domain.compute_forcing_terms()
1029
1030        assert num.allclose(domain.quantities['stage'].explicit_update[1],
1031                            (3*domain.time+7)/1000)
1032        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
1033        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
1034
1035        domain.time = 100.
1036        domain.quantities['stage'].explicit_update[:] = 0.0     # Reset
1037        domain.compute_forcing_terms()
1038
1039        assert num.allclose(domain.quantities['stage'].explicit_update[1],
1040                            5.0/1000) # Default value
1041        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
1042        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
1043
1044    def test_rainfall_forcing_with_evolve(self):
1045        """test_rainfall_forcing_with_evolve
1046
1047        Test how forcing terms are called within evolve
1048        """
1049
1050        # FIXME(Ole): This test is just to experiment
1051
1052        a = [0.0, 0.0]
1053        b = [0.0, 2.0]
1054        c = [2.0, 0.0]
1055        d = [0.0, 4.0]
1056        e = [2.0, 2.0]
1057        f = [4.0, 0.0]
1058
1059        points = [a, b, c, d, e, f]
1060        #             bac,     bce,     ecf,     dbe
1061        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1062
1063        domain = Domain(points, vertices)
1064
1065        # Flat surface with 1m of water
1066        domain.set_quantity('elevation', 0)
1067        domain.set_quantity('stage', 1.0)
1068        domain.set_quantity('friction', 0)
1069
1070        Br = Reflective_boundary(domain)
1071        domain.set_boundary({'exterior': Br})
1072
1073        # Setup only one forcing term, time dependent rainfall
1074        # that expires at t==20
1075        from anuga.fit_interpolate.interpolate import Modeltime_too_late
1076
1077        def main_rate(t):
1078            if t > 20:
1079                msg = 'Model time exceeded.'
1080                raise Modeltime_too_late, msg
1081            else:
1082                return 3*t + 7
1083
1084        domain.forcing_terms = []
1085        R = Rainfall(domain,
1086                     rate=main_rate,
1087                     polygon=[[1,1], [2,1], [2,2], [1,2]],
1088                     default_rate=5.0)
1089
1090        assert num.allclose(R.exchange_area, 2)
1091       
1092        domain.forcing_terms.append(R)
1093
1094        for t in domain.evolve(yieldstep=1, finaltime=25):
1095            pass
1096            #FIXME(Ole):  A test here is hard because explicit_update also
1097            # receives updates from the flux calculation.
1098
1099
1100
1101    def test_inflow_using_circle(self):
1102        from math import pi, cos, sin
1103
1104        a = [0.0, 0.0]
1105        b = [0.0, 2.0]
1106        c = [2.0, 0.0]
1107        d = [0.0, 4.0]
1108        e = [2.0, 2.0]
1109        f = [4.0, 0.0]
1110
1111        points = [a, b, c, d, e, f]
1112        #             bac,     bce,     ecf,     dbe
1113        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1114
1115        domain = Domain(points, vertices)
1116
1117        # Flat surface with 1m of water
1118        domain.set_quantity('elevation', 0)
1119        domain.set_quantity('stage', 1.0)
1120        domain.set_quantity('friction', 0)
1121
1122        Br = Reflective_boundary(domain)
1123        domain.set_boundary({'exterior': Br})
1124
1125        # Setup only one forcing term, constant inflow of 2 m^3/s
1126        # on a circle affecting triangles #0 and #1 (bac and bce)
1127        domain.forcing_terms = []
1128       
1129        I = Inflow(domain, rate=2.0, center=(1,1), radius=1)
1130        domain.forcing_terms.append(I)       
1131        domain.compute_forcing_terms()
1132
1133       
1134        A = I.exchange_area
1135        assert num.allclose(A, 4) # Two triangles       
1136       
1137        assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A)
1138        assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A)
1139        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)       
1140
1141
1142    def test_inflow_using_circle_function(self):
1143        from math import pi, cos, sin
1144
1145        a = [0.0, 0.0]
1146        b = [0.0, 2.0]
1147        c = [2.0, 0.0]
1148        d = [0.0, 4.0]
1149        e = [2.0, 2.0]
1150        f = [4.0, 0.0]
1151
1152        points = [a, b, c, d, e, f]
1153        #             bac,     bce,     ecf,     dbe
1154        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1155
1156        domain = Domain(points, vertices)
1157
1158        # Flat surface with 1m of water
1159        domain.set_quantity('elevation', 0)
1160        domain.set_quantity('stage', 1.0)
1161        domain.set_quantity('friction', 0)
1162
1163        Br = Reflective_boundary(domain)
1164        domain.set_boundary({'exterior': Br})
1165
1166        # Setup only one forcing term, time dependent inflow of 2 m^3/s
1167        # on a circle affecting triangles #0 and #1 (bac and bce)
1168        domain.forcing_terms = []
1169        I = Inflow(domain, rate=lambda t: 2., center=(1,1), radius=1)
1170        domain.forcing_terms.append(I)
1171       
1172        domain.compute_forcing_terms()       
1173
1174        A = I.exchange_area
1175        assert num.allclose(A, 4) # Two triangles
1176       
1177        assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A)
1178        assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A)
1179        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)       
1180       
1181
1182
1183
1184    def test_inflow_catch_too_few_triangles(self):
1185        """
1186        Test that exception is thrown if no triangles are covered
1187        by the inflow area
1188        """
1189
1190        from math import pi, cos, sin
1191
1192        a = [0.0, 0.0]
1193        b = [0.0, 2.0]
1194        c = [2.0, 0.0]
1195        d = [0.0, 4.0]
1196        e = [2.0, 2.0]
1197        f = [4.0, 0.0]
1198
1199        points = [a, b, c, d, e, f]
1200        #             bac,     bce,     ecf,     dbe
1201        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1202
1203        domain = Domain(points, vertices)
1204
1205        # Flat surface with 1m of water
1206        domain.set_quantity('elevation', 0)
1207        domain.set_quantity('stage', 1.0)
1208        domain.set_quantity('friction', 0)
1209
1210        Br = Reflective_boundary(domain)
1211        domain.set_boundary({'exterior': Br})
1212
1213        # Setup only one forcing term, constant inflow of 2 m^3/s
1214        # on a circle affecting triangles #0 and #1 (bac and bce)
1215        try:
1216            Inflow(domain, rate=2.0, center=(1,1.1), radius=0.01)
1217        except:
1218            pass
1219        else:
1220            msg = 'Should have raised exception'
1221            raise Exception, msg
1222
1223    def Xtest_inflow_outflow_conservation(self):
1224        """
1225        Test what happens if water is abstracted from one area and
1226        injected into another - especially if there is not enough
1227        water to match the abstraction.
1228        This tests that the total volume is kept constant under a range of
1229        scenarios.
1230
1231        This test will fail as the problem was only fixed for culverts.
1232        """
1233
1234        from math import pi, cos, sin
1235
1236        length = 20.
1237        width = 10.
1238
1239        dx = dy = 2    # 1 or 2 OK
1240        points, vertices, boundary = rectangular_cross(int(length/dx),
1241                                                       int(width/dy),
1242                                                       len1=length,
1243                                                       len2=width)
1244        domain = Domain(points, vertices, boundary)
1245        domain.set_name('test_inflow_conservation')    # Output name
1246        domain.set_default_order(2)
1247
1248        # Flat surface with 1m of water
1249        stage = 1.0
1250        domain.set_quantity('elevation', 0)
1251        domain.set_quantity('stage', stage)
1252        domain.set_quantity('friction', 0)
1253
1254        Br = Reflective_boundary(domain)
1255        domain.set_boundary({'left': Br, 'right': Br, 'bottom': Br, 'top': Br})
1256
1257        # Setup one forcing term, constant inflow of 2 m^3/s on a circle
1258        domain.forcing_terms = []
1259        domain.forcing_terms.append(Inflow(domain, rate=2.0,
1260                                           center=(5,5), radius=1))
1261
1262        domain.compute_forcing_terms()
1263
1264        # Check that update values are correct
1265        for x in domain.quantities['stage'].explicit_update:
1266            assert num.allclose(x, 2.0/pi) or num.allclose(x, 0.0)
1267
1268        # Check volumes without inflow
1269        domain.forcing_terms = []
1270        initial_volume = domain.quantities['stage'].get_integral()
1271
1272        assert num.allclose(initial_volume, width*length*stage)
1273
1274        for t in domain.evolve(yieldstep = 0.05, finaltime = 5.0):
1275            volume = domain.quantities['stage'].get_integral()
1276            assert num.allclose(volume, initial_volume)
1277
1278        # Now apply the inflow and check volumes for a range of stage values
1279        for stage in [2.0, 1.0, 0.5, 0.25, 0.1, 0.0]:
1280            domain.time = 0.0
1281            domain.set_quantity('stage', stage)
1282            domain.forcing_terms = []
1283            domain.forcing_terms.append(Inflow(domain, rate=2.0,
1284                                               center=(5,5), radius=1))
1285            initial_volume = domain.quantities['stage'].get_integral()
1286            predicted_volume = initial_volume
1287            dt = 0.05
1288            for t in domain.evolve(yieldstep=dt, finaltime=5.0):
1289                volume = domain.quantities['stage'].get_integral()
1290                assert num.allclose (volume, predicted_volume)
1291                predicted_volume = predicted_volume + 2.0/pi/100/dt # Why 100?
1292
1293        # Apply equivalent outflow only and check volumes
1294        # for a range of stage values
1295        for stage in [2.0, 1.0, 0.5, 0.25, 0.1, 0.0]:
1296            print stage
1297
1298            domain.time = 0.0
1299            domain.set_quantity('stage', stage)
1300            domain.forcing_terms = []
1301            domain.forcing_terms.append(Inflow(domain, rate=-2.0,
1302                                               center=(15,5), radius=1))
1303            initial_volume = domain.quantities['stage'].get_integral()
1304            predicted_volume = initial_volume
1305            dt = 0.05
1306            for t in domain.evolve(yieldstep=dt, finaltime=5.0):
1307                volume = domain.quantities['stage'].get_integral()
1308                print t, volume, predicted_volume
1309                assert num.allclose (volume, predicted_volume)
1310                predicted_volume = predicted_volume - 2.0/pi/100/dt # Why 100?
1311
1312        # Apply both inflow and outflow and check volumes being constant for a
1313        # range of stage values
1314        for stage in [2.0, 1.0, 0.5, 0.25, 0.1, 0.0]:
1315            print stage
1316
1317            domain.time = 0.0
1318            domain.set_quantity('stage', stage)
1319            domain.forcing_terms = []
1320            domain.forcing_terms.append(Inflow(domain, rate=2.0,
1321                                               center=(5,5), radius=1))
1322            domain.forcing_terms.append(Inflow(domain, rate=-2.0,
1323                                               center=(15,5), radius=1))
1324            initial_volume = domain.quantities['stage'].get_integral()
1325
1326            dt = 0.05
1327            for t in domain.evolve(yieldstep=dt, finaltime=5.0):
1328                volume = domain.quantities['stage'].get_integral()
1329
1330                print t, volume
1331                assert num.allclose(volume, initial_volume)
1332
1333    #####################################################
1334
1335       
1336    def test_inflow_using_flowline(self):
1337        """test_inflow_using_flowline
1338
1339        Test the ability of a flowline to match inflow above the flowline by
1340        creating constant inflow onto a circle at the head of a 20m
1341        wide by 300m long plane dipping at various slopes with a
1342        perpendicular flowline and gauge downstream of the inflow and
1343        a 45 degree flowlines at 200m downstream.
1344
1345        A more substantial version of this test with finer resolution and
1346        including the depth calculation using Manning's equation is
1347        available under the validate_all suite in the directory
1348        anuga_validation/automated_validation_tests/flow_tests.
1349        """
1350
1351       
1352        verbose = False
1353       
1354
1355        #----------------------------------------------------------------------
1356        # Import necessary modules
1357        #----------------------------------------------------------------------
1358
1359        from anuga.abstract_2d_finite_volumes.mesh_factory \
1360                import rectangular_cross
1361        from anuga.shallow_water import Domain
1362        from anuga.shallow_water.shallow_water_domain import Reflective_boundary
1363        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
1364        from anuga.shallow_water.shallow_water_domain import Inflow
1365        from anuga.shallow_water.data_manager \
1366                import get_flow_through_cross_section
1367        from anuga.abstract_2d_finite_volumes.util \
1368                import sww2csv_gauges, csv2timeseries_graphs
1369
1370        #----------------------------------------------------------------------
1371        # Setup computational domain
1372        #----------------------------------------------------------------------
1373        number_of_inflows = 2 # Number of inflows on top of each other
1374        finaltime = 500 #700.0 # If this is too short, steady state will not be achieved
1375
1376        length = 250.
1377        width  = 20.
1378        dx = dy = 5                 # Resolution: of grid on both axes
1379
1380        points, vertices, boundary = rectangular_cross(int(length/dx),
1381                                                       int(width/dy),
1382                                                       len1=length,
1383                                                       len2=width)
1384
1385        for mannings_n in [0.1, 0.01]:
1386            # Loop over a range of roughnesses             
1387           
1388            for slope in [1.0/300, 1.0/100]:
1389                # Loop over a range of bedslopes representing
1390                # sub to super critical flows
1391
1392
1393                domain = Domain(points, vertices, boundary)   
1394                domain.set_name('inflow_flowline_test')     # Output name
1395
1396                #--------------------------------------------------------------
1397                # Setup initial conditions
1398                #--------------------------------------------------------------
1399
1400                def topography(x, y):
1401                    z = -x * slope
1402                    return z
1403
1404                # Use function for elevation
1405                domain.set_quantity('elevation', topography)
1406                # Constant friction of conc surface   
1407                domain.set_quantity('friction', mannings_n)
1408                # Dry initial condition
1409                domain.set_quantity('stage', expression='elevation')
1410
1411                #--------------------------------------------------------------
1412                # Setup Inflow
1413                #--------------------------------------------------------------
1414
1415                # Fixed Flowrate onto Area
1416                fixed_inflow = Inflow(domain,
1417                                      center=(10.0, 10.0),
1418                                      radius=5.00,
1419                                      rate=10.00)   
1420
1421                # Stack this flow
1422                for i in range(number_of_inflows):
1423                    domain.forcing_terms.append(fixed_inflow)
1424               
1425                ref_flow = fixed_inflow.rate*number_of_inflows
1426
1427                # Compute normal depth on plane using Mannings equation
1428                # v=1/n*(r^2/3)*(s^0.5) or r=(Q*n/(s^0.5*W))^0.6
1429                normal_depth=(ref_flow*mannings_n/(slope**0.5*width))**0.6
1430                if verbose:
1431                    print
1432                    print 'Slope:', slope, 'Mannings n:', mannings_n
1433                   
1434                   
1435                #--------------------------------------------------------------
1436                # Setup boundary conditions
1437                #--------------------------------------------------------------
1438
1439                Br = Reflective_boundary(domain)
1440               
1441                # Define downstream boundary based on predicted depth
1442                def normal_depth_stage_downstream(t):
1443                    return (-slope*length) + normal_depth
1444               
1445                Bt = Transmissive_momentum_set_stage_boundary(domain=domain,
1446                                                              function=normal_depth_stage_downstream)
1447               
1448
1449               
1450
1451                domain.set_boundary({'left': Br,
1452                                     'right': Bt,
1453                                     'top': Br,
1454                                     'bottom': Br})
1455
1456
1457
1458                #--------------------------------------------------------------
1459                # Evolve system through time
1460                #--------------------------------------------------------------
1461
1462                for t in domain.evolve(yieldstep=100.0, finaltime=finaltime):
1463                    pass
1464                    #if verbose :
1465                    #    print domain.timestepping_statistics()
1466
1467                    #    print domain.volumetric_balance_statistics()                                   
1468
1469
1470                #--------------------------------------------------------------
1471                # Compute flow thru flowlines ds of inflow
1472                #--------------------------------------------------------------
1473                   
1474                # Square on flowline at 200m
1475                q = domain.get_flow_through_cross_section([[200.0,  0.0],
1476                                                           [200.0, 20.0]])
1477                if verbose:
1478                    print ('90 degree flowline: ANUGA = %f, Ref = %f'
1479                           % (q, ref_flow))
1480
1481                msg = ('Predicted flow was %f, should have been %f'
1482                       % (q, ref_flow))
1483                assert num.allclose(q, ref_flow, rtol=1.0e-2), msg         
1484
1485                           
1486                # 45 degree flowline at 200m
1487                q = domain.get_flow_through_cross_section([[200.0,  0.0],
1488                                                           [220.0, 20.0]])
1489                if verbose:
1490                    print ('45 degree flowline: ANUGA = %f, Ref = %f'
1491                           % (q, ref_flow))
1492                   
1493                msg = ('Predicted flow was %f, should have been %f'
1494                       % (q, ref_flow))
1495                assert num.allclose(q, ref_flow, rtol=1.0e-2), msg         
1496
1497        os.remove('inflow_flowline_test.sww')
1498
1499       
1500    def Xtest_inflow_boundary_using_flowline(self):
1501        """test_inflow_boundary_using_flowline
1502        Test the ability of a flowline to match inflow above the flowline by
1503        creating constant inflow into the boundary at the head of a 20m
1504        wide by 300m long plane dipping at various slopes with a
1505        perpendicular flowline and gauge downstream of the inflow and
1506        a 45 degree flowlines at 200m downstream
1507       
1508       
1509        """
1510
1511        # FIXME (Ole): Work in progress
1512       
1513        verbose = False
1514       
1515
1516        #----------------------------------------------------------------------
1517        # Import necessary modules
1518        #----------------------------------------------------------------------
1519        from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
1520        from anuga.shallow_water import Domain
1521        from anuga.shallow_water.shallow_water_domain import Reflective_boundary
1522        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
1523        from anuga.shallow_water.shallow_water_domain import Inflow_boundary
1524        from anuga.shallow_water.data_manager import get_flow_through_cross_section
1525        from anuga.abstract_2d_finite_volumes.util import sww2csv_gauges, csv2timeseries_graphs
1526
1527
1528        #----------------------------------------------------------------------
1529        # Setup computational domain
1530        #----------------------------------------------------------------------
1531
1532        finaltime = 500 #700.0 # If this is too short, steady state will not be achieved
1533
1534        length = 250.
1535        width  = 20.
1536        dx = dy = 5          # Resolution: of grid on both axes
1537
1538        points, vertices, boundary = rectangular_cross(int(length/dx), int(width/dy),
1539                                                       len1=length, len2=width)
1540
1541        for mannings_n in [0.1, 0.01]:
1542            # Loop over a range of roughnesses             
1543           
1544            for slope in [1.0/300, 1.0/100]:
1545                # Loop over a range of bedslopes representing sub to super critical flows
1546
1547
1548                domain = Domain(points, vertices, boundary)   
1549                domain.set_name('inflow_boundary_flowline_test')
1550               
1551
1552                #-------------------------------------------------------------
1553                # Setup initial conditions
1554                #-------------------------------------------------------------
1555
1556                def topography(x, y):
1557                    z=-x * slope
1558                    return z
1559
1560                domain.set_quantity('elevation', topography)
1561                domain.set_quantity('friction', mannings_n)
1562                domain.set_quantity('stage',
1563                                    expression='elevation')
1564
1565
1566                   
1567                #--------------------------------------------------------------
1568                # Setup boundary conditions
1569                #--------------------------------------------------------------
1570               
1571
1572
1573                ref_flow = 10.00
1574
1575                # Compute normal depth on plane using Mannings equation
1576                # v=1/n*(r^2/3)*(s^0.5) or r=(Q*n/(s^0.5*W))^0.6
1577                normal_depth=(ref_flow*mannings_n/(slope**0.5*width))**0.6
1578                if verbose:
1579                    print
1580                    print 'Slope:', slope, 'Mannings n:', mannings_n
1581                   
1582               
1583               
1584                Bi = Inflow_boundary(domain, rate=ref_flow)
1585
1586                Br = Reflective_boundary(domain)
1587               
1588                # Define downstream boundary based on predicted depth
1589                def normal_depth_stage_downstream(t):
1590                    return (-slope*length) + normal_depth
1591               
1592                Bt = Transmissive_momentum_set_stage_boundary(domain=domain,
1593                                                              function=normal_depth_stage_downstream)
1594               
1595
1596               
1597
1598                domain.set_boundary({'left': Bi,
1599                                     'right': Bt,
1600                                     'top': Br,
1601                                     'bottom': Br})
1602
1603
1604
1605                #--------------------------------------------------------------
1606                # Evolve system through time
1607                #--------------------------------------------------------------
1608
1609
1610                for t in domain.evolve(yieldstep=100.0, finaltime=finaltime):
1611                    pass
1612                    #if verbose :
1613                    #    print domain.timestepping_statistics()
1614                    #    print domain.volumetric_balance_statistics()
1615                                                       
1616
1617
1618                #--------------------------------------------------------------
1619                # Compute flow thru flowlines ds of inflow
1620                #--------------------------------------------------------------
1621                   
1622                # Square on flowline at 200m
1623                q=domain.get_flow_through_cross_section([[200.0,0.0],[200.0,20.0]])
1624                msg = 'Predicted flow was %f, should have been %f' % (q, ref_flow)
1625                if verbose:
1626                    print '90 degree flowline: ANUGA = %f, Ref = %f' % (q, ref_flow)
1627                assert num.allclose(q, ref_flow, rtol=1.0e-2), msg         
1628
1629                           
1630                # 45 degree flowline at 200m
1631                q=domain.get_flow_through_cross_section([[200.0,0.0],[220.0,20.0]])
1632                msg = 'Predicted flow was %f, should have been %f' % (q, ref_flow)
1633                if verbose:
1634                    print '45 degree flowline: ANUGA = %f, Ref = %f' % (q, ref_flow)
1635                   
1636                assert num.allclose(q, ref_flow, rtol=1.0e-2), msg         
1637
1638       
1639       
1640    def Xtest_friction_dependent_flow_using_flowline(self):
1641        """test_friction_dependent_flow_using_flowline
1642       
1643        Test the internal flow (using flowline) as a function of
1644        different values of Mannings n and different slopes.
1645       
1646        Flow is applied in the form of boundary conditions with fixed momentum.
1647        """
1648
1649        verbose = True
1650
1651        #----------------------------------------------------------------------
1652        # Import necessary modules
1653        #----------------------------------------------------------------------
1654
1655        from anuga.abstract_2d_finite_volumes.mesh_factory \
1656                import rectangular_cross
1657        from anuga.shallow_water import Domain
1658        from anuga.shallow_water.shallow_water_domain import Reflective_boundary
1659        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
1660        from anuga.shallow_water.shallow_water_domain import Inflow
1661        from anuga.shallow_water.data_manager \
1662                import get_flow_through_cross_section
1663        from anuga.abstract_2d_finite_volumes.util \
1664                import sww2csv_gauges, csv2timeseries_graphs
1665
1666
1667        #----------------------------------------------------------------------
1668        # Setup computational domain
1669        #----------------------------------------------------------------------
1670
1671        finaltime = 1000.0
1672
1673        length = 300.
1674        width  = 20.
1675        dx = dy = 5       # Resolution: of grid on both axes
1676       
1677        # Input parameters
1678        uh = 1.0
1679        vh = 0.0
1680        d = 1.0
1681       
1682        ref_flow = uh*d*width # 20 m^3/s in the x direction across entire domain
1683
1684        points, vertices, boundary = rectangular_cross(int(length/dx),
1685                                                       int(width/dy),
1686                                                       len1=length,
1687                                                       len2=width)
1688
1689        for mannings_n in [0.035]:          #[0.0, 0.012, 0.035]:
1690            for slope in [1.0/300]:         #[0.0, 1.0/300, 1.0/150]:
1691                # Loop over a range of bedslopes representing
1692                # sub to super critical flows
1693                if verbose:
1694                    print
1695                    print 'Slope:', slope, 'Mannings n:', mannings_n
1696                domain = Domain(points, vertices, boundary)   
1697                domain.set_name('Inflow_flowline_test')     # Output name
1698
1699                #--------------------------------------------------------------
1700                # Setup initial conditions
1701                #--------------------------------------------------------------
1702
1703                def topography(x, y):
1704                    z = -x * slope
1705                    return z
1706
1707                # Use function for elevation
1708                domain.set_quantity('elevation', topography)
1709                # Constant friction
1710                domain.set_quantity('friction', mannings_n)
1711               
1712                #domain.set_quantity('stage', expression='elevation')
1713                     
1714                # Set initial flow as depth=1m, uh=1.0 m/s, vh = 0.0
1715                # making it 20 m^3/s across entire domain
1716                domain.set_quantity('stage', expression='elevation + %f' % d)
1717                domain.set_quantity('xmomentum', uh)
1718                domain.set_quantity('ymomentum', vh)               
1719
1720                #--------------------------------------------------------------
1721                # Setup boundary conditions
1722                #--------------------------------------------------------------
1723
1724                Br = Reflective_boundary(domain)      # Solid reflective wall
1725               
1726                # Constant flow in and out of domain
1727                # Depth = 1m, uh=1 m/s, i.e. a flow of 20 m^3/s
1728                # across boundaries
1729                Bi = Dirichlet_boundary([d, uh, vh]) 
1730                Bo = Dirichlet_boundary([-length*slope+d, uh, vh])
1731                #Bo = Dirichlet_boundary([-100, 0, 0])
1732
1733                domain.set_boundary({'left': Bi, 'right': Bo,
1734                                     'top': Br,  'bottom': Br})
1735
1736                #--------------------------------------------------------------
1737                # Evolve system through time
1738                #--------------------------------------------------------------
1739
1740                for t in domain.evolve(yieldstep=100.0, finaltime=finaltime):
1741                    if verbose :
1742                        print domain.timestepping_statistics()
1743                        print domain.volumetric_balance_statistics()
1744
1745                # 90 degree flowline at 200m
1746                q = domain.get_flow_through_cross_section([[200.0,  0.0],
1747                                                           [200.0, 20.0]])
1748                msg = ('Predicted flow was %f, should have been %f'
1749                       % (q, ref_flow))
1750                if verbose:
1751                    print ('90 degree flowline: ANUGA = %f, Ref = %f'
1752                           % (q, ref_flow))
1753
1754                # 45 degree flowline at 200m
1755                q = domain.get_flow_through_cross_section([[200.0,  0.0],
1756                                                           [220.0, 20.0]])
1757                msg = ('Predicted flow was %f, should have been %f'
1758                       % (q, ref_flow))
1759                if verbose:
1760                    print ('45 degree flowline: ANUGA = %f, Ref = %f'
1761                           % (q, ref_flow))
1762
1763                # Stage recorder (gauge) in middle of plane at 200m
1764                x = 200.0
1765                y = 10.00               
1766                w = domain.get_quantity('stage').\
1767                        get_values(interpolation_points=[[x, y]])[0]
1768                z = domain.get_quantity('elevation').\
1769                        get_values(interpolation_points=[[x, y]])[0]
1770                domain_depth = w-z
1771
1772                xmom = domain.get_quantity('xmomentum').\
1773                        get_values(interpolation_points=[[x, y]])[0]
1774                ymom = domain.get_quantity('ymomentum').\
1775                        get_values(interpolation_points=[[x, y]])[0]           
1776                if verbose:                   
1777                    print ('At interpolation point (h, uh, vh): ',
1778                           domain_depth, xmom, ymom)
1779                    print 'uh * d * width = ', xmom*domain_depth*width
1780                               
1781                if slope > 0.0:
1782                    # Compute normal depth at gauge location using Manning eqn
1783                    # v=1/n*(r^2/3)*(s^0.5) or r=(Q*n/(s^0.5*W))^0.6
1784                    normal_depth = (ref_flow*mannings_n/(slope**0.5*width))**0.6
1785                    if verbose:
1786                        print ('Depth: ANUGA = %f, Mannings = %f'
1787                               % (domain_depth, normal_depth))
1788
1789        os.remove('Inflow_flowline_test.sww')
1790
1791
1792#################################################################################
1793
1794if __name__ == "__main__":
1795    suite = unittest.makeSuite(Test_swb_forcing_terms, 'test')
1796    runner = unittest.TextTestRunner(verbosity=1)
1797    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.