source: trunk/anuga_core/source/anuga/shallow_water/test_forcing_terms.py @ 7814

Last change on this file since 7814 was 7814, checked in by hudson, 14 years ago

New filename conventions for file conversion. Filenames must always be passed in with the correct extension.

File size: 40.1 KB
RevLine 
[7726]1#!/usr/bin/env python
2
3import unittest, os
4import os.path
5from math import pi, sqrt
6import tempfile
7
[7736]8from anuga.config import g, epsilon, \
9                    netcdf_mode_r, netcdf_mode_w, netcdf_mode_a
[7726]10from anuga.geometry.polygon import is_inside_polygon
11from anuga.coordinate_transforms.geo_reference import Geo_reference
12from anuga.abstract_2d_finite_volumes.quantity import Quantity
13from anuga.geospatial_data.geospatial_data import Geospatial_data
14from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
15
16from anuga.utilities.system_tools import get_pathname_from_package
[7736]17from anuga.utilities.numerical_tools import ensure_numeric, mean
[7726]18
[7736]19from shallow_water_domain import Domain
20from boundaries import Reflective_boundary
21from forcing import Wind_stress, Inflow, Rainfall
[7743]22from anuga.file_conversion.file_conversion import timefile2netcdf
[7736]23
[7726]24import numpy as num
25
26# Get gateway to C implementation of flux function for direct testing
27from shallow_water_ext import flux_function_central as flux_function
28
29# Variable windfield implemented using functions
30def speed(t, x, y):
31    """Large speeds halfway between center and edges
32
33    Low speeds at center and edges
34    """
35
36    from math import exp, cos, pi
37
38    x = num.array(x)
39    y = num.array(y)
40
41    N = len(x)
42    s = 0*#New array
43
44    for k in range(N):
45        r = num.sqrt(x[k]**2 + y[k]**2)
46        factor = exp(-(r-0.15)**2)
47        s[k] = 4000 * factor * (cos(t*2*pi/150) + 2)
48
49    return s
50
51def angle(t, x, y):
52    """Rotating field
53    """
54    from math import atan, pi
55
56    x = num.array(x)
57    y = num.array(y)
58
59    N = len(x)
60    a = 0 * x    # New array
61
62    for k in range(N):
63        r = num.sqrt(x[k]**2 + y[k]**2)
64
65        angle = atan(y[k]/x[k])
66
67        if x[k] < 0:
68            angle += pi
69
70        # Take normal direction
71        angle -= pi/2
72
73        # Ensure positive radians
74        if angle < 0:
75            angle += 2*pi
76
77        a[k] = angle/pi*180
78
79    return a
80
81
82def scalar_func(t, x, y):
83    """Function that returns a scalar.
84
85    Used to test error message when numeric array is expected
86    """
87
88    return 17.7
89
90def scalar_func_list(t, x, y):
91    """Function that returns a scalar.
92
93    Used to test error message when numeric array is expected
94    """
95
96    return [17.7]
97
98class Test_forcing_terms(unittest.TestCase):
99    def setUp(self):
100        pass
101
102    def tearDown(self):
103        pass
104
105
106    def test_gravity(self):
107        #Assuming no friction
108
109        from anuga.config import g
110
111        a = [0.0, 0.0]
112        b = [0.0, 2.0]
113        c = [2.0, 0.0]
114        d = [0.0, 4.0]
115        e = [2.0, 2.0]
116        f = [4.0, 0.0]
117
118        points = [a, b, c, d, e, f]
119        #             bac,     bce,     ecf,     dbe
120        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
121
122        domain = Domain(points, vertices)
123
124        #Set up for a gradient of (3,0) at mid triangle (bce)
125        def slope(x, y):
126            return 3*x
127
128        h = 0.1
129        def stage(x, y):
130            return slope(x, y) + h
131
132        domain.set_quantity('elevation', slope)
133        domain.set_quantity('stage', stage)
134
135        for name in domain.conserved_quantities:
136            assert num.allclose(domain.quantities[name].explicit_update, 0)
137            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
138
139        domain.compute_forcing_terms()
140
141        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
142        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
143                            -g*h*3)
144        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
145
[7736]146    # FIXME: James these tests are failing - are they outdated?
147    def NOtest_manning_friction(self):
[7726]148        from anuga.config import g
149
150        a = [0.0, 0.0]
151        b = [0.0, 2.0]
152        c = [2.0, 0.0]
153        d = [0.0, 4.0]
154        e = [2.0, 2.0]
155        f = [4.0, 0.0]
156
157        points = [a, b, c, d, e, f]
158        #             bac,     bce,     ecf,     dbe
159        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
160
161        domain = Domain(points, vertices)
162
163        #Set up for a gradient of (3,0) at mid triangle (bce)
164        def slope(x, y):
165            return 3*x
166
167        h = 0.1
168        def stage(x, y):
169            return slope(x, y) + h
170
171        eta = 0.07
172        domain.set_quantity('elevation', slope)
173        domain.set_quantity('stage', stage)
174        domain.set_quantity('friction', eta)
175
176        for name in domain.conserved_quantities:
177            assert num.allclose(domain.quantities[name].explicit_update, 0)
178            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
179
180        domain.compute_forcing_terms()
181
182        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
183        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
184                            -g*h*3)
185        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
186
187        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
188        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
189                            0)
190        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
191                            0)
192
193        #Create some momentum for friction to work with
194        domain.set_quantity('xmomentum', 1)
195        S = -g*eta**2 / h**(7.0/3)
196
197        domain.compute_forcing_terms()
198        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
199        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
200                            S)
201        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
202                            0)
203
204        #A more complex example
205        domain.quantities['stage'].semi_implicit_update[:] = 0.0
206        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
207        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
208
209        domain.set_quantity('xmomentum', 3)
210        domain.set_quantity('ymomentum', 4)
211
212        S = -g*eta**2*5 / h**(7.0/3)
213
214        domain.compute_forcing_terms()
215
216        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
217        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
218                            S)
219        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
220                            S)
221
222
223
[7736]224    def NOtest_manning_friction_old(self):
[7726]225        from anuga.config import g
226
227        a = [0.0, 0.0]
228        b = [0.0, 2.0]
229        c = [2.0, 0.0]
230        d = [0.0, 4.0]
231        e = [2.0, 2.0]
232        f = [4.0, 0.0]
233
234        points = [a, b, c, d, e, f]
235        #             bac,     bce,     ecf,     dbe
236        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
237
238        domain = Domain(points, vertices)
239
240        #Set up for a gradient of (3,0) at mid triangle (bce)
241        def slope(x, y):
242            return 3*x
243
244        h = 0.1
245        def stage(x, y):
246            return slope(x, y) + h
247
248        eta = 0.07
249        domain.set_quantity('elevation', slope)
250        domain.set_quantity('stage', stage)
251        domain.set_quantity('friction', eta)
252
253        for name in domain.conserved_quantities:
254            assert num.allclose(domain.quantities[name].explicit_update, 0)
255            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
256
257        domain.compute_forcing_terms()
258
259        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
260        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
261                            -g*h*3)
262        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
263
264        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
265        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
266                            0)
267        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
268                            0)
269
270        #Create some momentum for friction to work with
271        domain.set_quantity('xmomentum', 1)
272        S = -g*eta**2 / h**(7.0/3)
273
274        domain.compute_forcing_terms()
275        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
276        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
277                            S)
278        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
279                            0)
280
281        #A more complex example
282        domain.quantities['stage'].semi_implicit_update[:] = 0.0
283        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
284        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
285
286        domain.set_quantity('xmomentum', 3)
287        domain.set_quantity('ymomentum', 4)
288        # sqrt(3^2 +4^2) = 5
289
290        S = -g*eta**2 / h**(7.0/3)  * 5
291
292        domain.compute_forcing_terms()
293
294        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
295        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
296                            S)
297        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
298                            S)
299
300
[7736]301    def NOtest_manning_friction_new(self):
[7726]302        from anuga.config import g
303
304        a = [0.0, 0.0]
305        b = [0.0, 2.0]
306        c = [2.0, 0.0]
307        d = [0.0, 4.0]
308        e = [2.0, 2.0]
309        f = [4.0, 0.0]
310
311        points = [a, b, c, d, e, f]
312        #             bac,     bce,     ecf,     dbe
313        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
314
315        domain = Domain(points, vertices)
316
317        # Use the new function which takes into account the extra
318        # wetted area due to slope of bed
319        domain.set_new_mannings_function(True)
320       
321        #Set up for a gradient of (3,0) at mid triangle (bce)
322        def slope(x, y):
323            return 3*x
324
325        h = 0.1
326        def stage(x, y):
327            return slope(x, y) + h
328
329        eta = 0.07
330        domain.set_quantity('elevation', slope)
331        domain.set_quantity('stage', stage)
332        domain.set_quantity('friction', eta)
333
334        for name in domain.conserved_quantities:
335            assert num.allclose(domain.quantities[name].explicit_update, 0)
336            assert num.allclose(domain.quantities[name].semi_implicit_update, 0)
337
338        domain.compute_forcing_terms()
339
340        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
341        assert num.allclose(domain.quantities['xmomentum'].explicit_update,
342                            -g*h*3)
343        assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0)
344
345        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
346        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
347                            0)
348        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
349                            0)
350
351        #Create some momentum for friction to work with
352        domain.set_quantity('xmomentum', 1)
353        S = -g*eta**2 / h**(7.0/3) * sqrt(10)
354
355        domain.compute_forcing_terms()
356        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
357        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
358                            S)
359        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
360                            0)
361
362        #A more complex example
363        domain.quantities['stage'].semi_implicit_update[:] = 0.0
364        domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0
365        domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0
366
367        domain.set_quantity('xmomentum', 3)
368        domain.set_quantity('ymomentum', 4)
369
370        S = -g*eta**2*5 / h**(7.0/3) * sqrt(10.0)
371
372        domain.compute_forcing_terms()
373
374        assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0)
375        assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,
376                            S)
377        assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,
378                            S)
379       
380    def test_constant_wind_stress(self):
381        from anuga.config import rho_a, rho_w, eta_w
382        from math import pi, cos, sin
383
384        a = [0.0, 0.0]
385        b = [0.0, 2.0]
386        c = [2.0, 0.0]
387        d = [0.0, 4.0]
388        e = [2.0, 2.0]
389        f = [4.0, 0.0]
390
391        points = [a, b, c, d, e, f]
392        #             bac,     bce,     ecf,     dbe
393        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
394
395        domain = Domain(points, vertices)
396
397        #Flat surface with 1m of water
398        domain.set_quantity('elevation', 0)
399        domain.set_quantity('stage', 1.0)
400        domain.set_quantity('friction', 0)
401
402        Br = Reflective_boundary(domain)
403        domain.set_boundary({'exterior': Br})
404
405        #Setup only one forcing term, constant wind stress
406        s = 100
407        phi = 135
408        domain.forcing_terms = []
409        domain.forcing_terms.append(Wind_stress(s, phi))
410
411        domain.compute_forcing_terms()
412
413        const = eta_w*rho_a / rho_w
414
415        #Convert to radians
416        phi = phi*pi / 180
417
418        #Compute velocity vector (u, v)
419        u = s*cos(phi)
420        v = s*sin(phi)
421
422        #Compute wind stress
423        S = const * num.sqrt(u**2 + v**2)
424
425        assert num.allclose(domain.quantities['stage'].explicit_update, 0)
426        assert num.allclose(domain.quantities['xmomentum'].explicit_update, S*u)
427        assert num.allclose(domain.quantities['ymomentum'].explicit_update, S*v)
428
429    def test_variable_wind_stress(self):
430        from anuga.config import rho_a, rho_w, eta_w
431        from math import pi, cos, sin
432
433        a = [0.0, 0.0]
434        b = [0.0, 2.0]
435        c = [2.0, 0.0]
436        d = [0.0, 4.0]
437        e = [2.0, 2.0]
438        f = [4.0, 0.0]
439
440        points = [a, b, c, d, e, f]
441        #             bac,     bce,     ecf,     dbe
442        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
443
444        domain = Domain(points, vertices)
445
446        #Flat surface with 1m of water
447        domain.set_quantity('elevation', 0)
448        domain.set_quantity('stage', 1.0)
449        domain.set_quantity('friction', 0)
450
451        Br = Reflective_boundary(domain)
452        domain.set_boundary({'exterior': Br})
453
454        domain.time = 5.54    # Take a random time (not zero)
455
456        #Setup only one forcing term, constant wind stress
457        s = 100
458        phi = 135
459        domain.forcing_terms = []
460        domain.forcing_terms.append(Wind_stress(s=speed, phi=angle))
461
462        domain.compute_forcing_terms()
463
464        #Compute reference solution
465        const = eta_w*rho_a / rho_w
466
467        N = len(domain)    # number_of_triangles
468
469        xc = domain.get_centroid_coordinates()
470        t = domain.time
471
472        x = xc[:,0]
473        y = xc[:,1]
474        s_vec = speed(t,x,y)
475        phi_vec = angle(t,x,y)
476
477        for k in range(N):
478            # Convert to radians
479            phi = phi_vec[k]*pi / 180
480            s = s_vec[k]
481
482            # Compute velocity vector (u, v)
483            u = s*cos(phi)
484            v = s*sin(phi)
485
486            # Compute wind stress
487            S = const * num.sqrt(u**2 + v**2)
488
489            assert num.allclose(domain.quantities['stage'].explicit_update[k],
490                                0)
491            assert num.allclose(domain.quantities['xmomentum'].\
492                                     explicit_update[k],
493                                S*u)
494            assert num.allclose(domain.quantities['ymomentum'].\
495                                     explicit_update[k],
496                                S*v)
497
498    def test_windfield_from_file(self):
499        import time
500        from anuga.config import rho_a, rho_w, eta_w
501        from math import pi, cos, sin
502        from anuga.config import time_format
503        from anuga.abstract_2d_finite_volumes.util import file_function
504
505        a = [0.0, 0.0]
506        b = [0.0, 2.0]
507        c = [2.0, 0.0]
508        d = [0.0, 4.0]
509        e = [2.0, 2.0]
510        f = [4.0, 0.0]
511
512        points = [a, b, c, d, e, f]
513        #             bac,     bce,     ecf,     dbe
514        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
515
516        domain = Domain(points, vertices)
517
518        # Flat surface with 1m of water
519        domain.set_quantity('elevation', 0)
520        domain.set_quantity('stage', 1.0)
521        domain.set_quantity('friction', 0)
522
523        Br = Reflective_boundary(domain)
524        domain.set_boundary({'exterior': Br})
525
526        domain.time = 7    # Take a time that is represented in file (not zero)
527
528        # Write wind stress file (ensure that domain.time is covered)
529        # Take x=1 and y=0
530        filename = 'test_windstress_from_file'
531        start = time.mktime(time.strptime('2000', '%Y'))
532        fid = open(filename + '.txt', 'w')
533        dt = 1    # One second interval
534        t = 0.0
535        while t <= 10.0:
536            t_string = time.strftime(time_format, time.gmtime(t+start))
537
538            fid.write('%s, %f %f\n' %
539                      (t_string, speed(t,[1],[0])[0], angle(t,[1],[0])[0]))
540            t += dt
541
542        fid.close()
543
544        # Convert ASCII file to NetCDF (Which is what we really like!)
[7814]545        timefile2netcdf(filename + '.txt')
[7726]546        os.remove(filename + '.txt')
547
548        # Setup wind stress
549        F = file_function(filename + '.tms',
550                          quantities=['Attribute0', 'Attribute1'])
551        os.remove(filename + '.tms')
552
553        W = Wind_stress(F)
554
555        domain.forcing_terms = []
556        domain.forcing_terms.append(W)
557
558        domain.compute_forcing_terms()
559
560        # Compute reference solution
561        const = eta_w*rho_a / rho_w
562
563        N = len(domain)    # number_of_triangles
564
565        t = domain.time
566
567        s = speed(t, [1], [0])[0]
568        phi = angle(t, [1], [0])[0]
569
570        # Convert to radians
571        phi = phi*pi / 180
572
573        # Compute velocity vector (u, v)
574        u = s*cos(phi)
575        v = s*sin(phi)
576
577        # Compute wind stress
578        S = const * num.sqrt(u**2 + v**2)
579
580        for k in range(N):
581            assert num.allclose(domain.quantities['stage'].explicit_update[k],
582                                0)
583            assert num.allclose(domain.quantities['xmomentum'].\
584                                    explicit_update[k],
585                                S*u)
586            assert num.allclose(domain.quantities['ymomentum'].\
587                                    explicit_update[k],
588                                S*v)
589
590    def test_windfield_from_file_seconds(self):
591        import time
592        from anuga.config import rho_a, rho_w, eta_w
593        from math import pi, cos, sin
594        from anuga.config import time_format
595        from anuga.abstract_2d_finite_volumes.util import file_function
596
597        a = [0.0, 0.0]
598        b = [0.0, 2.0]
599        c = [2.0, 0.0]
600        d = [0.0, 4.0]
601        e = [2.0, 2.0]
602        f = [4.0, 0.0]
603
604        points = [a, b, c, d, e, f]
605        #             bac,     bce,     ecf,     dbe
606        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
607
608        domain = Domain(points, vertices)
609
610        # Flat surface with 1m of water
611        domain.set_quantity('elevation', 0)
612        domain.set_quantity('stage', 1.0)
613        domain.set_quantity('friction', 0)
614
615        Br = Reflective_boundary(domain)
616        domain.set_boundary({'exterior': Br})
617
618        domain.time = 7    # Take a time that is represented in file (not zero)
619
620        # Write wind stress file (ensure that domain.time is covered)
621        # Take x=1 and y=0
622        filename = 'test_windstress_from_file'
623        start = time.mktime(time.strptime('2000', '%Y'))
624        fid = open(filename + '.txt', 'w')
625        dt = 0.5    # Half second interval
626        t = 0.0
627        while t <= 10.0:
628            fid.write('%s, %f %f\n'
629                      % (str(t), speed(t, [1], [0])[0], angle(t, [1], [0])[0]))
630            t += dt
631
632        fid.close()
633
634        # Convert ASCII file to NetCDF (Which is what we really like!)
[7814]635        timefile2netcdf(filename + '.txt', time_as_seconds=True)
[7726]636        os.remove(filename + '.txt')
637
638        # Setup wind stress
639        F = file_function(filename + '.tms',
640                          quantities=['Attribute0', 'Attribute1'])
641        os.remove(filename + '.tms')
642
643        W = Wind_stress(F)
644
645        domain.forcing_terms = []
646        domain.forcing_terms.append(W)
647
648        domain.compute_forcing_terms()
649
650        # Compute reference solution
651        const = eta_w*rho_a / rho_w
652
653        N = len(domain)    # number_of_triangles
654
655        t = domain.time
656
657        s = speed(t, [1], [0])[0]
658        phi = angle(t, [1], [0])[0]
659
660        # Convert to radians
661        phi = phi*pi / 180
662
663        # Compute velocity vector (u, v)
664        u = s*cos(phi)
665        v = s*sin(phi)
666
667        # Compute wind stress
668        S = const * num.sqrt(u**2 + v**2)
669
670        for k in range(N):
671            assert num.allclose(domain.quantities['stage'].explicit_update[k],
672                                0)
673            assert num.allclose(domain.quantities['xmomentum'].\
674                                    explicit_update[k],
675                                S*u)
676            assert num.allclose(domain.quantities['ymomentum'].\
677                                    explicit_update[k],
678                                S*v)
679
680    def test_wind_stress_error_condition(self):
681        """Test that windstress reacts properly when forcing functions
682        are wrong - e.g. returns a scalar
683        """
684
685        from math import pi, cos, sin
686        from anuga.config import rho_a, rho_w, eta_w
687
688        a = [0.0, 0.0]
689        b = [0.0, 2.0]
690        c = [2.0, 0.0]
691        d = [0.0, 4.0]
692        e = [2.0, 2.0]
693        f = [4.0, 0.0]
694
695        points = [a, b, c, d, e, f]
696        #             bac,     bce,     ecf,     dbe
697        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
698
699        domain = Domain(points, vertices)
700
701        # Flat surface with 1m of water
702        domain.set_quantity('elevation', 0)
703        domain.set_quantity('stage', 1.0)
704        domain.set_quantity('friction', 0)
705
706        Br = Reflective_boundary(domain)
707        domain.set_boundary({'exterior': Br})
708
709        domain.time = 5.54    # Take a random time (not zero)
710
711        # Setup only one forcing term, bad func
712        domain.forcing_terms = []
713
714        try:
715            domain.forcing_terms.append(Wind_stress(s=scalar_func_list,
716                                                    phi=angle))
717        except AssertionError:
718            pass
719        else:
720            msg = 'Should have raised exception'
721            raise Exception, msg
722
723        try:
724            domain.forcing_terms.append(Wind_stress(s=speed, phi=scalar_func))
725        except Exception:
726            pass
727        else:
728            msg = 'Should have raised exception'
729            raise Exception, msg
730
731        try:
732            domain.forcing_terms.append(Wind_stress(s=speed, phi='xx'))
733        except:
734            pass
735        else:
736            msg = 'Should have raised exception'
737            raise Exception, msg
738
739
740    def test_rainfall(self):
741        from math import pi, cos, sin
742
743        a = [0.0, 0.0]
744        b = [0.0, 2.0]
745        c = [2.0, 0.0]
746        d = [0.0, 4.0]
747        e = [2.0, 2.0]
748        f = [4.0, 0.0]
749
750        points = [a, b, c, d, e, f]
751        #             bac,     bce,     ecf,     dbe
752        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
753
754        domain = Domain(points, vertices)
755
756        # Flat surface with 1m of water
757        domain.set_quantity('elevation', 0)
758        domain.set_quantity('stage', 1.0)
759        domain.set_quantity('friction', 0)
760
761        Br = Reflective_boundary(domain)
762        domain.set_boundary({'exterior': Br})
763
764        # Setup only one forcing term, constant rainfall
765        domain.forcing_terms = []
766        domain.forcing_terms.append(Rainfall(domain, rate=2.0))
767
768        domain.compute_forcing_terms()
769        assert num.allclose(domain.quantities['stage'].explicit_update,
770                            2.0/1000)
771
772    def test_rainfall_restricted_by_polygon(self):
773        from math import pi, cos, sin
774
775        a = [0.0, 0.0]
776        b = [0.0, 2.0]
777        c = [2.0, 0.0]
778        d = [0.0, 4.0]
779        e = [2.0, 2.0]
780        f = [4.0, 0.0]
781
782        points = [a, b, c, d, e, f]
783        #             bac,     bce,     ecf,     dbe
784        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
785
786        domain = Domain(points, vertices)
787
788        # Flat surface with 1m of water
789        domain.set_quantity('elevation', 0)
790        domain.set_quantity('stage', 1.0)
791        domain.set_quantity('friction', 0)
792
793        Br = Reflective_boundary(domain)
794        domain.set_boundary({'exterior': Br})
795
796        # Setup only one forcing term, constant rainfall
797        # restricted to a polygon enclosing triangle #1 (bce)
798        domain.forcing_terms = []
799        R = Rainfall(domain, rate=2.0, polygon=[[1,1], [2,1], [2,2], [1,2]])
800
801        assert num.allclose(R.exchange_area, 2)
802
803        domain.forcing_terms.append(R)
804
805        domain.compute_forcing_terms()
806
807        assert num.allclose(domain.quantities['stage'].explicit_update[1],
808                            2.0/1000)
809        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
810        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
811
812    def test_time_dependent_rainfall_restricted_by_polygon(self):
813        a = [0.0, 0.0]
814        b = [0.0, 2.0]
815        c = [2.0, 0.0]
816        d = [0.0, 4.0]
817        e = [2.0, 2.0]
818        f = [4.0, 0.0]
819
820        points = [a, b, c, d, e, f]
821        #             bac,     bce,     ecf,     dbe
822        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
823
824        domain = Domain(points, vertices)
825
826        # Flat surface with 1m of water
827        domain.set_quantity('elevation', 0)
828        domain.set_quantity('stage', 1.0)
829        domain.set_quantity('friction', 0)
830
831        Br = Reflective_boundary(domain)
832        domain.set_boundary({'exterior': Br})
833
834        # Setup only one forcing term, time dependent rainfall
835        # restricted to a polygon enclosing triangle #1 (bce)
836        domain.forcing_terms = []
837        R = Rainfall(domain,
838                     rate=lambda t: 3*t + 7,
839                     polygon = [[1,1], [2,1], [2,2], [1,2]])
840
841        assert num.allclose(R.exchange_area, 2)
842       
843        domain.forcing_terms.append(R)
844
845        domain.time = 10.
846
847        domain.compute_forcing_terms()
848
849        assert num.allclose(domain.quantities['stage'].explicit_update[1],
850                            (3*domain.time + 7)/1000)
851        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
852        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
853
854    def test_time_dependent_rainfall_using_starttime(self):
855        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
856
857        a = [0.0, 0.0]
858        b = [0.0, 2.0]
859        c = [2.0, 0.0]
860        d = [0.0, 4.0]
861        e = [2.0, 2.0]
862        f = [4.0, 0.0]
863
864        points = [a, b, c, d, e, f]
865        #             bac,     bce,     ecf,     dbe
866        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
867
868        domain = Domain(points, vertices)
869
870        # Flat surface with 1m of water
871        domain.set_quantity('elevation', 0)
872        domain.set_quantity('stage', 1.0)
873        domain.set_quantity('friction', 0)
874
875        Br = Reflective_boundary(domain)
876        domain.set_boundary({'exterior': Br})
877
878        # Setup only one forcing term, time dependent rainfall
879        # restricted to a polygon enclosing triangle #1 (bce)
880        domain.forcing_terms = []
881        R = Rainfall(domain,
882                     rate=lambda t: 3*t + 7,
883                     polygon=rainfall_poly)                     
884
885        assert num.allclose(R.exchange_area, 2)
886       
887        domain.forcing_terms.append(R)
888
889        # This will test that time used in the forcing function takes
890        # startime into account.
891        domain.starttime = 5.0
892
893        domain.time = 7.
894
895        domain.compute_forcing_terms()
896
897        assert num.allclose(domain.quantities['stage'].explicit_update[1],
898                            (3*domain.get_time() + 7)/1000)
899        assert num.allclose(domain.quantities['stage'].explicit_update[1],
900                            (3*(domain.time + domain.starttime) + 7)/1000)
901
902        # Using internal time her should fail
903        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
904                                (3*domain.time + 7)/1000)
905
906        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
907        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
908
909    def test_time_dependent_rainfall_using_georef(self):
910        """test_time_dependent_rainfall_using_georef
911
912        This will also test the General forcing term using georef
913        """
914
915        # Mesh in zone 56 (absolute coords)
916        x0 = 314036.58727982
917        y0 = 6224951.2960092
918
919        rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
920        rainfall_poly += [x0, y0]
921
922        a = [0.0, 0.0]
923        b = [0.0, 2.0]
924        c = [2.0, 0.0]
925        d = [0.0, 4.0]
926        e = [2.0, 2.0]
927        f = [4.0, 0.0]
928
929        points = [a, b, c, d, e, f]
930        #             bac,     bce,     ecf,     dbe
931        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
932
933        domain = Domain(points, vertices,
934                        geo_reference=Geo_reference(56, x0, y0))
935
936        # Flat surface with 1m of water
937        domain.set_quantity('elevation', 0)
938        domain.set_quantity('stage', 1.0)
939        domain.set_quantity('friction', 0)
940
941        Br = Reflective_boundary(domain)
942        domain.set_boundary({'exterior': Br})
943
944        # Setup only one forcing term, time dependent rainfall
945        # restricted to a polygon enclosing triangle #1 (bce)
946        domain.forcing_terms = []
947        R = Rainfall(domain,
948                     rate=lambda t: 3*t + 7,
949                     polygon=rainfall_poly)
950
951        assert num.allclose(R.exchange_area, 2)
952       
953        domain.forcing_terms.append(R)
954
955        # This will test that time used in the forcing function takes
956        # startime into account.
957        domain.starttime = 5.0
958
959        domain.time = 7.
960
961        domain.compute_forcing_terms()
962
963        assert num.allclose(domain.quantities['stage'].explicit_update[1],
964                            (3*domain.get_time() + 7)/1000)
965        assert num.allclose(domain.quantities['stage'].explicit_update[1],
966                            (3*(domain.time + domain.starttime) + 7)/1000)
967
968        # Using internal time her should fail
969        assert not num.allclose(domain.quantities['stage'].explicit_update[1],
970                                (3*domain.time + 7)/1000)
971
972        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
973        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
974
975    def test_time_dependent_rainfall_restricted_by_polygon_with_default(self):
976        """
977        Test that default rainfall can be used when given rate runs out of data.
978        """
979
980        a = [0.0, 0.0]
981        b = [0.0, 2.0]
982        c = [2.0, 0.0]
983        d = [0.0, 4.0]
984        e = [2.0, 2.0]
985        f = [4.0, 0.0]
986
987        points = [a, b, c, d, e, f]
988        #             bac,     bce,     ecf,     dbe
989        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
990
991        domain = Domain(points, vertices)
992
993        # Flat surface with 1m of water
994        domain.set_quantity('elevation', 0)
995        domain.set_quantity('stage', 1.0)
996        domain.set_quantity('friction', 0)
997
998        Br = Reflective_boundary(domain)
999        domain.set_boundary({'exterior': Br})
1000
1001        # Setup only one forcing term, time dependent rainfall
1002        # that expires at t==20
1003        from anuga.fit_interpolate.interpolate import Modeltime_too_late
1004
1005        def main_rate(t):
1006            if t > 20:
1007                msg = 'Model time exceeded.'
1008                raise Modeltime_too_late, msg
1009            else:
1010                return 3*t + 7
1011
1012        domain.forcing_terms = []
1013        R = Rainfall(domain,
1014                     rate=main_rate,
1015                     polygon = [[1,1], [2,1], [2,2], [1,2]],
1016                     default_rate=5.0)
1017
1018        assert num.allclose(R.exchange_area, 2)
1019       
1020        domain.forcing_terms.append(R)
1021
1022        domain.time = 10.
1023
1024        domain.compute_forcing_terms()
1025
1026        assert num.allclose(domain.quantities['stage'].explicit_update[1],
1027                            (3*domain.time+7)/1000)
1028        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
1029        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
1030
1031        domain.time = 100.
1032        domain.quantities['stage'].explicit_update[:] = 0.0     # Reset
1033        domain.compute_forcing_terms()
1034
1035        assert num.allclose(domain.quantities['stage'].explicit_update[1],
1036                            5.0/1000) # Default value
1037        assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
1038        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
1039
1040    def test_rainfall_forcing_with_evolve(self):
1041        """test_rainfall_forcing_with_evolve
1042
1043        Test how forcing terms are called within evolve
1044        """
1045
1046        # FIXME(Ole): This test is just to experiment
1047
1048        a = [0.0, 0.0]
1049        b = [0.0, 2.0]
1050        c = [2.0, 0.0]
1051        d = [0.0, 4.0]
1052        e = [2.0, 2.0]
1053        f = [4.0, 0.0]
1054
1055        points = [a, b, c, d, e, f]
1056        #             bac,     bce,     ecf,     dbe
1057        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1058
1059        domain = Domain(points, vertices)
1060
1061        # Flat surface with 1m of water
1062        domain.set_quantity('elevation', 0)
1063        domain.set_quantity('stage', 1.0)
1064        domain.set_quantity('friction', 0)
1065
1066        Br = Reflective_boundary(domain)
1067        domain.set_boundary({'exterior': Br})
1068
1069        # Setup only one forcing term, time dependent rainfall
1070        # that expires at t==20
1071        from anuga.fit_interpolate.interpolate import Modeltime_too_late
1072
1073        def main_rate(t):
1074            if t > 20:
1075                msg = 'Model time exceeded.'
1076                raise Modeltime_too_late, msg
1077            else:
1078                return 3*t + 7
1079
1080        domain.forcing_terms = []
1081        R = Rainfall(domain,
1082                     rate=main_rate,
1083                     polygon=[[1,1], [2,1], [2,2], [1,2]],
1084                     default_rate=5.0)
1085
1086        assert num.allclose(R.exchange_area, 2)
1087       
1088        domain.forcing_terms.append(R)
1089
1090        for t in domain.evolve(yieldstep=1, finaltime=25):
1091            pass
1092            #FIXME(Ole):  A test here is hard because explicit_update also
1093            # receives updates from the flux calculation.
1094
1095
1096    def test_rainfall_forcing_with_evolve_1(self):
1097        """test_rainfall_forcing_with_evolve
1098
1099        Test how forcing terms are called within evolve.
1100        This test checks that proper exception is thrown when no default_rate is set
1101        """
1102
1103
1104        a = [0.0, 0.0]
1105        b = [0.0, 2.0]
1106        c = [2.0, 0.0]
1107        d = [0.0, 4.0]
1108        e = [2.0, 2.0]
1109        f = [4.0, 0.0]
1110
1111        points = [a, b, c, d, e, f]
1112        #             bac,     bce,     ecf,     dbe
1113        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1114
1115        domain = Domain(points, vertices)
1116
1117        # Flat surface with 1m of water
1118        domain.set_quantity('elevation', 0)
1119        domain.set_quantity('stage', 1.0)
1120        domain.set_quantity('friction', 0)
1121
1122        Br = Reflective_boundary(domain)
1123        domain.set_boundary({'exterior': Br})
1124
1125        # Setup only one forcing term, time dependent rainfall
1126        # that expires at t==20
1127        from anuga.fit_interpolate.interpolate import Modeltime_too_late
1128
1129        def main_rate(t):
1130            if t > 20:
1131                msg = 'Model time exceeded.'
1132                raise Modeltime_too_late, msg
1133            else:
1134                return 3*t + 7
1135
1136        domain.forcing_terms = []
1137        R = Rainfall(domain,
1138                     rate=main_rate,
1139                     polygon=[[1,1], [2,1], [2,2], [1,2]])
1140
1141
1142        assert num.allclose(R.exchange_area, 2)
1143       
1144        domain.forcing_terms.append(R)
1145        #for t in domain.evolve(yieldstep=1, finaltime=25):
1146        #    pass
1147               
1148        try:
1149            for t in domain.evolve(yieldstep=1, finaltime=25):
1150                pass
1151        except Modeltime_too_late, e:
1152            # Test that error message is as expected
1153            assert 'can specify keyword argument default_rate in the forcing function' in str(e)
1154        else:
1155            raise Exception, 'Should have raised exception'
1156
1157           
1158           
1159    def test_inflow_using_circle(self):
1160        from math import pi, cos, sin
1161
1162        a = [0.0, 0.0]
1163        b = [0.0, 2.0]
1164        c = [2.0, 0.0]
1165        d = [0.0, 4.0]
1166        e = [2.0, 2.0]
1167        f = [4.0, 0.0]
1168
1169        points = [a, b, c, d, e, f]
1170        #             bac,     bce,     ecf,     dbe
1171        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1172
1173        domain = Domain(points, vertices)
1174
1175        # Flat surface with 1m of water
1176        domain.set_quantity('elevation', 0)
1177        domain.set_quantity('stage', 1.0)
1178        domain.set_quantity('friction', 0)
1179
1180        Br = Reflective_boundary(domain)
1181        domain.set_boundary({'exterior': Br})
1182
1183        # Setup only one forcing term, constant inflow of 2 m^3/s
1184        # on a circle affecting triangles #0 and #1 (bac and bce)
1185        domain.forcing_terms = []
1186       
1187        I = Inflow(domain, rate=2.0, center=(1,1), radius=1)
1188        domain.forcing_terms.append(I)       
1189        domain.compute_forcing_terms()
1190
1191       
1192        A = I.exchange_area
1193        assert num.allclose(A, 4) # Two triangles       
1194       
1195        assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A)
1196        assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A)
1197        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)       
1198
1199
1200    def test_inflow_using_circle_function(self):
1201        from math import pi, cos, sin
1202
1203        a = [0.0, 0.0]
1204        b = [0.0, 2.0]
1205        c = [2.0, 0.0]
1206        d = [0.0, 4.0]
1207        e = [2.0, 2.0]
1208        f = [4.0, 0.0]
1209
1210        points = [a, b, c, d, e, f]
1211        #             bac,     bce,     ecf,     dbe
1212        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1213
1214        domain = Domain(points, vertices)
1215
1216        # Flat surface with 1m of water
1217        domain.set_quantity('elevation', 0)
1218        domain.set_quantity('stage', 1.0)
1219        domain.set_quantity('friction', 0)
1220
1221        Br = Reflective_boundary(domain)
1222        domain.set_boundary({'exterior': Br})
1223
1224        # Setup only one forcing term, time dependent inflow of 2 m^3/s
1225        # on a circle affecting triangles #0 and #1 (bac and bce)
1226        domain.forcing_terms = []
1227        I = Inflow(domain, rate=lambda t: 2., center=(1,1), radius=1)
1228        domain.forcing_terms.append(I)
1229       
1230        domain.compute_forcing_terms()       
1231
1232        A = I.exchange_area
1233        assert num.allclose(A, 4) # Two triangles
1234       
1235        assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A)
1236        assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A)
1237        assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)       
1238       
1239
1240
1241
1242    def test_inflow_catch_too_few_triangles(self):
1243        """
1244        Test that exception is thrown if no triangles are covered
1245        by the inflow area
1246        """
1247
1248        from math import pi, cos, sin
1249
1250        a = [0.0, 0.0]
1251        b = [0.0, 2.0]
1252        c = [2.0, 0.0]
1253        d = [0.0, 4.0]
1254        e = [2.0, 2.0]
1255        f = [4.0, 0.0]
1256
1257        points = [a, b, c, d, e, f]
1258        #             bac,     bce,     ecf,     dbe
1259        vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
1260
1261        domain = Domain(points, vertices)
1262
1263        # Flat surface with 1m of water
1264        domain.set_quantity('elevation', 0)
1265        domain.set_quantity('stage', 1.0)
1266        domain.set_quantity('friction', 0)
1267
1268        Br = Reflective_boundary(domain)
1269        domain.set_boundary({'exterior': Br})
1270
1271        # Setup only one forcing term, constant inflow of 2 m^3/s
1272        # on a circle affecting triangles #0 and #1 (bac and bce)
1273        try:
1274            Inflow(domain, rate=2.0, center=(1,1.1), radius=0.01)
1275        except:
1276            pass
1277        else:
1278            msg = 'Should have raised exception'
1279            raise Exception, msg
1280
1281       
1282
1283#################################################################################
1284
1285if __name__ == "__main__":
1286    #suite = unittest.makeSuite(Test_forcing_terms, 'test_volume_conservation_rain')
[7736]1287    #FIXME: James - these tests seem to be invalid. Please investigate
[7726]1288    suite = unittest.makeSuite(Test_forcing_terms, 'test')
1289    runner = unittest.TextTestRunner(verbosity=1)
1290    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.