Ignore:
Timestamp:
May 19, 2010, 10:13:06 AM (14 years ago)
Author:
hudson
Message:

Fixed unit test failures.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • anuga_core/source/anuga/shallow_water/test_shallow_water_domain.py

    r7731 r7733  
    55from math import pi, sqrt
    66import tempfile
     7
     8from anuga.shallow_water import Domain
    79
    810from anuga.config import g, epsilon
     
    1416from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
    1517from anuga.abstract_2d_finite_volumes.quantity import Quantity
     18from anuga.shallow_water.forcing import Inflow, Cross_section
     19from anuga.geospatial_data.geospatial_data import ensure_geospatial
    1620
    1721from anuga.utilities.system_tools import get_pathname_from_package
    1822
    19 from anuga.shallow_water import Domain
    2023from anuga.abstract_2d_finite_volumes.generic_boundary_conditions \
    2124        import Dirichlet_boundary
    22 from anuga.shallow_water.shallow_water_domain import Rainfall, Wind_stress
    23 from anuga.shallow_water.shallow_water_domain import Inflow, Cross_section
     25from anuga.shallow_water.forcing import Rainfall, Wind_stress
     26from anuga.shallow_water.forcing import Inflow, Cross_section
    2427from anuga.shallow_water.data_manager import get_flow_through_cross_section
    2528
     
    171174
    172175
    173 # Variable windfield implemented using functions
    174 def speed(t, x, y):
    175     """Large speeds halfway between center and edges
    176 
    177     Low speeds at center and edges
    178     """
    179 
    180     from math import exp, cos, pi
    181 
    182     x = num.array(x)
    183     y = num.array(y)
    184 
    185     N = len(x)
    186     s = 0*x  #New array
    187 
    188     for k in range(N):
    189         r = num.sqrt(x[k]**2 + y[k]**2)
    190         factor = exp(-(r-0.15)**2)
    191         s[k] = 4000 * factor * (cos(t*2*pi/150) + 2)
    192 
    193     return s
    194176
    195177def scalar_func(t, x, y):
     
    201183    return 17.7
    202184
    203 def scalar_func_list(t, x, y):
    204     """Function that returns a scalar.
    205 
    206     Used to test error message when numeric array is expected
    207     """
    208 
    209     return [17.7]
    210 
    211 def angle(t, x, y):
    212     """Rotating field
    213     """
    214     from math import atan, pi
    215 
    216     x = num.array(x)
    217     y = num.array(y)
    218 
    219     N = len(x)
    220     a = 0 * x    # New array
    221 
    222     for k in range(N):
    223         r = num.sqrt(x[k]**2 + y[k]**2)
    224 
    225         angle = atan(y[k]/x[k])
    226 
    227         if x[k] < 0:
    228             angle += pi
    229 
    230         # Take normal direction
    231         angle -= pi/2
    232 
    233         # Ensure positive radians
    234         if angle < 0:
    235             angle += 2*pi
    236 
    237         a[k] = angle/pi*180
    238 
    239     return a
    240185
    241186
     
    22352180                            4*S)
    22362181       
    2237     def test_constant_wind_stress(self):
    2238         from anuga.config import rho_a, rho_w, eta_w
    2239         from math import pi, cos, sin
    2240 
    2241         a = [0.0, 0.0]
    2242         b = [0.0, 2.0]
    2243         c = [2.0, 0.0]
    2244         d = [0.0, 4.0]
    2245         e = [2.0, 2.0]
    2246         f = [4.0, 0.0]
    2247 
    2248         points = [a, b, c, d, e, f]
    2249         #             bac,     bce,     ecf,     dbe
    2250         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2251 
    2252         domain = Domain(points, vertices)
    2253 
    2254         #Flat surface with 1m of water
    2255         domain.set_quantity('elevation', 0)
    2256         domain.set_quantity('stage', 1.0)
    2257         domain.set_quantity('friction', 0)
    2258 
    2259         Br = Reflective_boundary(domain)
    2260         domain.set_boundary({'exterior': Br})
    2261 
    2262         #Setup only one forcing term, constant wind stress
    2263         s = 100
    2264         phi = 135
    2265         domain.forcing_terms = []
    2266         domain.forcing_terms.append(Wind_stress(s, phi))
    2267 
    2268         domain.compute_forcing_terms()
    2269 
    2270         const = eta_w*rho_a / rho_w
    2271 
    2272         #Convert to radians
    2273         phi = phi*pi / 180
    2274 
    2275         #Compute velocity vector (u, v)
    2276         u = s*cos(phi)
    2277         v = s*sin(phi)
    2278 
    2279         #Compute wind stress
    2280         S = const * num.sqrt(u**2 + v**2)
    2281 
    2282         assert num.allclose(domain.quantities['stage'].explicit_update, 0)
    2283         assert num.allclose(domain.quantities['xmomentum'].explicit_update, S*u)
    2284         assert num.allclose(domain.quantities['ymomentum'].explicit_update, S*v)
    2285 
    2286     def test_variable_wind_stress(self):
    2287         from anuga.config import rho_a, rho_w, eta_w
    2288         from math import pi, cos, sin
    2289 
    2290         a = [0.0, 0.0]
    2291         b = [0.0, 2.0]
    2292         c = [2.0, 0.0]
    2293         d = [0.0, 4.0]
    2294         e = [2.0, 2.0]
    2295         f = [4.0, 0.0]
    2296 
    2297         points = [a, b, c, d, e, f]
    2298         #             bac,     bce,     ecf,     dbe
    2299         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2300 
    2301         domain = Domain(points, vertices)
    2302 
    2303         #Flat surface with 1m of water
    2304         domain.set_quantity('elevation', 0)
    2305         domain.set_quantity('stage', 1.0)
    2306         domain.set_quantity('friction', 0)
    2307 
    2308         Br = Reflective_boundary(domain)
    2309         domain.set_boundary({'exterior': Br})
    2310 
    2311         domain.time = 5.54    # Take a random time (not zero)
    2312 
    2313         #Setup only one forcing term, constant wind stress
    2314         s = 100
    2315         phi = 135
    2316         domain.forcing_terms = []
    2317         domain.forcing_terms.append(Wind_stress(s=speed, phi=angle))
    2318 
    2319         domain.compute_forcing_terms()
    2320 
    2321         #Compute reference solution
    2322         const = eta_w*rho_a / rho_w
    2323 
    2324         N = len(domain)    # number_of_triangles
    2325 
    2326         xc = domain.get_centroid_coordinates()
    2327         t = domain.time
    2328 
    2329         x = xc[:,0]
    2330         y = xc[:,1]
    2331         s_vec = speed(t,x,y)
    2332         phi_vec = angle(t,x,y)
    2333 
    2334         for k in range(N):
    2335             # Convert to radians
    2336             phi = phi_vec[k]*pi / 180
    2337             s = s_vec[k]
    2338 
    2339             # Compute velocity vector (u, v)
    2340             u = s*cos(phi)
    2341             v = s*sin(phi)
    2342 
    2343             # Compute wind stress
    2344             S = const * num.sqrt(u**2 + v**2)
    2345 
    2346             assert num.allclose(domain.quantities['stage'].explicit_update[k],
    2347                                 0)
    2348             assert num.allclose(domain.quantities['xmomentum'].\
    2349                                      explicit_update[k],
    2350                                 S*u)
    2351             assert num.allclose(domain.quantities['ymomentum'].\
    2352                                      explicit_update[k],
    2353                                 S*v)
    2354 
    2355     def test_windfield_from_file(self):
    2356         import time
    2357         from anuga.config import rho_a, rho_w, eta_w
    2358         from math import pi, cos, sin
    2359         from anuga.config import time_format
    2360         from anuga.abstract_2d_finite_volumes.util import file_function
    2361 
    2362         a = [0.0, 0.0]
    2363         b = [0.0, 2.0]
    2364         c = [2.0, 0.0]
    2365         d = [0.0, 4.0]
    2366         e = [2.0, 2.0]
    2367         f = [4.0, 0.0]
    2368 
    2369         points = [a, b, c, d, e, f]
    2370         #             bac,     bce,     ecf,     dbe
    2371         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2372 
    2373         domain = Domain(points, vertices)
    2374 
    2375         # Flat surface with 1m of water
    2376         domain.set_quantity('elevation', 0)
    2377         domain.set_quantity('stage', 1.0)
    2378         domain.set_quantity('friction', 0)
    2379 
    2380         Br = Reflective_boundary(domain)
    2381         domain.set_boundary({'exterior': Br})
    2382 
    2383         domain.time = 7    # Take a time that is represented in file (not zero)
    2384 
    2385         # Write wind stress file (ensure that domain.time is covered)
    2386         # Take x=1 and y=0
    2387         filename = 'test_windstress_from_file'
    2388         start = time.mktime(time.strptime('2000', '%Y'))
    2389         fid = open(filename + '.txt', 'w')
    2390         dt = 1    # One second interval
    2391         t = 0.0
    2392         while t <= 10.0:
    2393             t_string = time.strftime(time_format, time.gmtime(t+start))
    2394 
    2395             fid.write('%s, %f %f\n' %
    2396                       (t_string, speed(t,[1],[0])[0], angle(t,[1],[0])[0]))
    2397             t += dt
    2398 
    2399         fid.close()
    2400 
    2401         # Convert ASCII file to NetCDF (Which is what we really like!)
    2402         from data_manager import timefile2netcdf
    2403 
    2404         timefile2netcdf(filename)
    2405         os.remove(filename + '.txt')
    2406 
    2407         # Setup wind stress
    2408         F = file_function(filename + '.tms',
    2409                           quantities=['Attribute0', 'Attribute1'])
    2410         os.remove(filename + '.tms')
    2411 
    2412         W = Wind_stress(F)
    2413 
    2414         domain.forcing_terms = []
    2415         domain.forcing_terms.append(W)
    2416 
    2417         domain.compute_forcing_terms()
    2418 
    2419         # Compute reference solution
    2420         const = eta_w*rho_a / rho_w
    2421 
    2422         N = len(domain)    # number_of_triangles
    2423 
    2424         t = domain.time
    2425 
    2426         s = speed(t, [1], [0])[0]
    2427         phi = angle(t, [1], [0])[0]
    2428 
    2429         # Convert to radians
    2430         phi = phi*pi / 180
    2431 
    2432         # Compute velocity vector (u, v)
    2433         u = s*cos(phi)
    2434         v = s*sin(phi)
    2435 
    2436         # Compute wind stress
    2437         S = const * num.sqrt(u**2 + v**2)
    2438 
    2439         for k in range(N):
    2440             assert num.allclose(domain.quantities['stage'].explicit_update[k],
    2441                                 0)
    2442             assert num.allclose(domain.quantities['xmomentum'].\
    2443                                     explicit_update[k],
    2444                                 S*u)
    2445             assert num.allclose(domain.quantities['ymomentum'].\
    2446                                     explicit_update[k],
    2447                                 S*v)
    2448 
    2449     def test_windfield_from_file_seconds(self):
    2450         import time
    2451         from anuga.config import rho_a, rho_w, eta_w
    2452         from math import pi, cos, sin
    2453         from anuga.config import time_format
    2454         from anuga.abstract_2d_finite_volumes.util import file_function
    2455 
    2456         a = [0.0, 0.0]
    2457         b = [0.0, 2.0]
    2458         c = [2.0, 0.0]
    2459         d = [0.0, 4.0]
    2460         e = [2.0, 2.0]
    2461         f = [4.0, 0.0]
    2462 
    2463         points = [a, b, c, d, e, f]
    2464         #             bac,     bce,     ecf,     dbe
    2465         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2466 
    2467         domain = Domain(points, vertices)
    2468 
    2469         # Flat surface with 1m of water
    2470         domain.set_quantity('elevation', 0)
    2471         domain.set_quantity('stage', 1.0)
    2472         domain.set_quantity('friction', 0)
    2473 
    2474         Br = Reflective_boundary(domain)
    2475         domain.set_boundary({'exterior': Br})
    2476 
    2477         domain.time = 7    # Take a time that is represented in file (not zero)
    2478 
    2479         # Write wind stress file (ensure that domain.time is covered)
    2480         # Take x=1 and y=0
    2481         filename = 'test_windstress_from_file'
    2482         start = time.mktime(time.strptime('2000', '%Y'))
    2483         fid = open(filename + '.txt', 'w')
    2484         dt = 0.5    # Half second interval
    2485         t = 0.0
    2486         while t <= 10.0:
    2487             fid.write('%s, %f %f\n'
    2488                       % (str(t), speed(t, [1], [0])[0], angle(t, [1], [0])[0]))
    2489             t += dt
    2490 
    2491         fid.close()
    2492 
    2493         # Convert ASCII file to NetCDF (Which is what we really like!)
    2494         from data_manager import timefile2netcdf
    2495 
    2496         timefile2netcdf(filename, time_as_seconds=True)
    2497         os.remove(filename + '.txt')
    2498 
    2499         # Setup wind stress
    2500         F = file_function(filename + '.tms',
    2501                           quantities=['Attribute0', 'Attribute1'])
    2502         os.remove(filename + '.tms')
    2503 
    2504         W = Wind_stress(F)
    2505 
    2506         domain.forcing_terms = []
    2507         domain.forcing_terms.append(W)
    2508 
    2509         domain.compute_forcing_terms()
    2510 
    2511         # Compute reference solution
    2512         const = eta_w*rho_a / rho_w
    2513 
    2514         N = len(domain)    # number_of_triangles
    2515 
    2516         t = domain.time
    2517 
    2518         s = speed(t, [1], [0])[0]
    2519         phi = angle(t, [1], [0])[0]
    2520 
    2521         # Convert to radians
    2522         phi = phi*pi / 180
    2523 
    2524         # Compute velocity vector (u, v)
    2525         u = s*cos(phi)
    2526         v = s*sin(phi)
    2527 
    2528         # Compute wind stress
    2529         S = const * num.sqrt(u**2 + v**2)
    2530 
    2531         for k in range(N):
    2532             assert num.allclose(domain.quantities['stage'].explicit_update[k],
    2533                                 0)
    2534             assert num.allclose(domain.quantities['xmomentum'].\
    2535                                     explicit_update[k],
    2536                                 S*u)
    2537             assert num.allclose(domain.quantities['ymomentum'].\
    2538                                     explicit_update[k],
    2539                                 S*v)
    2540 
    2541     def test_wind_stress_error_condition(self):
    2542         """Test that windstress reacts properly when forcing functions
    2543         are wrong - e.g. returns a scalar
    2544         """
    2545 
    2546         from math import pi, cos, sin
    2547         from anuga.config import rho_a, rho_w, eta_w
    2548 
    2549         a = [0.0, 0.0]
    2550         b = [0.0, 2.0]
    2551         c = [2.0, 0.0]
    2552         d = [0.0, 4.0]
    2553         e = [2.0, 2.0]
    2554         f = [4.0, 0.0]
    2555 
    2556         points = [a, b, c, d, e, f]
    2557         #             bac,     bce,     ecf,     dbe
    2558         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2559 
    2560         domain = Domain(points, vertices)
    2561 
    2562         # Flat surface with 1m of water
    2563         domain.set_quantity('elevation', 0)
    2564         domain.set_quantity('stage', 1.0)
    2565         domain.set_quantity('friction', 0)
    2566 
    2567         Br = Reflective_boundary(domain)
    2568         domain.set_boundary({'exterior': Br})
    2569 
    2570         domain.time = 5.54    # Take a random time (not zero)
    2571 
    2572         # Setup only one forcing term, bad func
    2573         domain.forcing_terms = []
    2574 
    2575         try:
    2576             domain.forcing_terms.append(Wind_stress(s=scalar_func_list,
    2577                                                     phi=angle))
    2578         except AssertionError:
    2579             pass
    2580         else:
    2581             msg = 'Should have raised exception'
    2582             raise Exception, msg
    2583 
    2584         try:
    2585             domain.forcing_terms.append(Wind_stress(s=speed, phi=scalar_func))
    2586         except Exception:
    2587             pass
    2588         else:
    2589             msg = 'Should have raised exception'
    2590             raise Exception, msg
    2591 
    2592         try:
    2593             domain.forcing_terms.append(Wind_stress(s=speed, phi='xx'))
    2594         except:
    2595             pass
    2596         else:
    2597             msg = 'Should have raised exception'
    2598             raise Exception, msg
    2599 
    2600     def test_rainfall(self):
    2601         from math import pi, cos, sin
    2602 
    2603         a = [0.0, 0.0]
    2604         b = [0.0, 2.0]
    2605         c = [2.0, 0.0]
    2606         d = [0.0, 4.0]
    2607         e = [2.0, 2.0]
    2608         f = [4.0, 0.0]
    2609 
    2610         points = [a, b, c, d, e, f]
    2611         #             bac,     bce,     ecf,     dbe
    2612         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2613 
    2614         domain = Domain(points, vertices)
    2615 
    2616         # Flat surface with 1m of water
    2617         domain.set_quantity('elevation', 0)
    2618         domain.set_quantity('stage', 1.0)
    2619         domain.set_quantity('friction', 0)
    2620 
    2621         Br = Reflective_boundary(domain)
    2622         domain.set_boundary({'exterior': Br})
    2623 
    2624         # Setup only one forcing term, constant rainfall
    2625         domain.forcing_terms = []
    2626         domain.forcing_terms.append(Rainfall(domain, rate=2.0))
    2627 
    2628         domain.compute_forcing_terms()
    2629         assert num.allclose(domain.quantities['stage'].explicit_update,
    2630                             2.0/1000)
    2631 
    2632     def test_rainfall_restricted_by_polygon(self):
    2633         from math import pi, cos, sin
    2634 
    2635         a = [0.0, 0.0]
    2636         b = [0.0, 2.0]
    2637         c = [2.0, 0.0]
    2638         d = [0.0, 4.0]
    2639         e = [2.0, 2.0]
    2640         f = [4.0, 0.0]
    2641 
    2642         points = [a, b, c, d, e, f]
    2643         #             bac,     bce,     ecf,     dbe
    2644         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2645 
    2646         domain = Domain(points, vertices)
    2647 
    2648         # Flat surface with 1m of water
    2649         domain.set_quantity('elevation', 0)
    2650         domain.set_quantity('stage', 1.0)
    2651         domain.set_quantity('friction', 0)
    2652 
    2653         Br = Reflective_boundary(domain)
    2654         domain.set_boundary({'exterior': Br})
    2655 
    2656         # Setup only one forcing term, constant rainfall
    2657         # restricted to a polygon enclosing triangle #1 (bce)
    2658         domain.forcing_terms = []
    2659         R = Rainfall(domain, rate=2.0, polygon=[[1,1], [2,1], [2,2], [1,2]])
    2660 
    2661         assert num.allclose(R.exchange_area, 2)
    2662 
    2663         domain.forcing_terms.append(R)
    2664 
    2665         domain.compute_forcing_terms()
    2666 
    2667         assert num.allclose(domain.quantities['stage'].explicit_update[1],
    2668                             2.0/1000)
    2669         assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
    2670         assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
    2671 
    2672     def test_time_dependent_rainfall_restricted_by_polygon(self):
    2673         a = [0.0, 0.0]
    2674         b = [0.0, 2.0]
    2675         c = [2.0, 0.0]
    2676         d = [0.0, 4.0]
    2677         e = [2.0, 2.0]
    2678         f = [4.0, 0.0]
    2679 
    2680         points = [a, b, c, d, e, f]
    2681         #             bac,     bce,     ecf,     dbe
    2682         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2683 
    2684         domain = Domain(points, vertices)
    2685 
    2686         # Flat surface with 1m of water
    2687         domain.set_quantity('elevation', 0)
    2688         domain.set_quantity('stage', 1.0)
    2689         domain.set_quantity('friction', 0)
    2690 
    2691         Br = Reflective_boundary(domain)
    2692         domain.set_boundary({'exterior': Br})
    2693 
    2694         # Setup only one forcing term, time dependent rainfall
    2695         # restricted to a polygon enclosing triangle #1 (bce)
    2696         domain.forcing_terms = []
    2697         R = Rainfall(domain,
    2698                      rate=lambda t: 3*t + 7,
    2699                      polygon = [[1,1], [2,1], [2,2], [1,2]])
    2700 
    2701         assert num.allclose(R.exchange_area, 2)
    2702        
    2703         domain.forcing_terms.append(R)
    2704 
    2705         domain.time = 10.
    2706 
    2707         domain.compute_forcing_terms()
    2708 
    2709         assert num.allclose(domain.quantities['stage'].explicit_update[1],
    2710                             (3*domain.time + 7)/1000)
    2711         assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
    2712         assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
    2713 
    2714     def test_time_dependent_rainfall_using_starttime(self):
    2715         rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
    2716 
    2717         a = [0.0, 0.0]
    2718         b = [0.0, 2.0]
    2719         c = [2.0, 0.0]
    2720         d = [0.0, 4.0]
    2721         e = [2.0, 2.0]
    2722         f = [4.0, 0.0]
    2723 
    2724         points = [a, b, c, d, e, f]
    2725         #             bac,     bce,     ecf,     dbe
    2726         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2727 
    2728         domain = Domain(points, vertices)
    2729 
    2730         # Flat surface with 1m of water
    2731         domain.set_quantity('elevation', 0)
    2732         domain.set_quantity('stage', 1.0)
    2733         domain.set_quantity('friction', 0)
    2734 
    2735         Br = Reflective_boundary(domain)
    2736         domain.set_boundary({'exterior': Br})
    2737 
    2738         # Setup only one forcing term, time dependent rainfall
    2739         # restricted to a polygon enclosing triangle #1 (bce)
    2740         domain.forcing_terms = []
    2741         R = Rainfall(domain,
    2742                      rate=lambda t: 3*t + 7,
    2743                      polygon=rainfall_poly)                     
    2744 
    2745         assert num.allclose(R.exchange_area, 2)
    2746        
    2747         domain.forcing_terms.append(R)
    2748 
    2749         # This will test that time used in the forcing function takes
    2750         # startime into account.
    2751         domain.starttime = 5.0
    2752 
    2753         domain.time = 7.
    2754 
    2755         domain.compute_forcing_terms()
    2756 
    2757         assert num.allclose(domain.quantities['stage'].explicit_update[1],
    2758                             (3*domain.get_time() + 7)/1000)
    2759         assert num.allclose(domain.quantities['stage'].explicit_update[1],
    2760                             (3*(domain.time + domain.starttime) + 7)/1000)
    2761 
    2762         # Using internal time her should fail
    2763         assert not num.allclose(domain.quantities['stage'].explicit_update[1],
    2764                                 (3*domain.time + 7)/1000)
    2765 
    2766         assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
    2767         assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
    2768 
    2769     def test_time_dependent_rainfall_using_georef(self):
    2770         """test_time_dependent_rainfall_using_georef
    2771 
    2772         This will also test the General forcing term using georef
    2773         """
    2774 
    2775         # Mesh in zone 56 (absolute coords)
    2776         x0 = 314036.58727982
    2777         y0 = 6224951.2960092
    2778 
    2779         rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
    2780         rainfall_poly += [x0, y0]
    2781 
    2782         a = [0.0, 0.0]
    2783         b = [0.0, 2.0]
    2784         c = [2.0, 0.0]
    2785         d = [0.0, 4.0]
    2786         e = [2.0, 2.0]
    2787         f = [4.0, 0.0]
    2788 
    2789         points = [a, b, c, d, e, f]
    2790         #             bac,     bce,     ecf,     dbe
    2791         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2792 
    2793         domain = Domain(points, vertices,
    2794                         geo_reference=Geo_reference(56, x0, y0))
    2795 
    2796         # Flat surface with 1m of water
    2797         domain.set_quantity('elevation', 0)
    2798         domain.set_quantity('stage', 1.0)
    2799         domain.set_quantity('friction', 0)
    2800 
    2801         Br = Reflective_boundary(domain)
    2802         domain.set_boundary({'exterior': Br})
    2803 
    2804         # Setup only one forcing term, time dependent rainfall
    2805         # restricted to a polygon enclosing triangle #1 (bce)
    2806         domain.forcing_terms = []
    2807         R = Rainfall(domain,
    2808                      rate=lambda t: 3*t + 7,
    2809                      polygon=rainfall_poly)
    2810 
    2811         assert num.allclose(R.exchange_area, 2)
    2812        
    2813         domain.forcing_terms.append(R)
    2814 
    2815         # This will test that time used in the forcing function takes
    2816         # startime into account.
    2817         domain.starttime = 5.0
    2818 
    2819         domain.time = 7.
    2820 
    2821         domain.compute_forcing_terms()
    2822 
    2823         assert num.allclose(domain.quantities['stage'].explicit_update[1],
    2824                             (3*domain.get_time() + 7)/1000)
    2825         assert num.allclose(domain.quantities['stage'].explicit_update[1],
    2826                             (3*(domain.time + domain.starttime) + 7)/1000)
    2827 
    2828         # Using internal time her should fail
    2829         assert not num.allclose(domain.quantities['stage'].explicit_update[1],
    2830                                 (3*domain.time + 7)/1000)
    2831 
    2832         assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
    2833         assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
    2834 
    2835     def test_time_dependent_rainfall_restricted_by_polygon_with_default(self):
    2836         """
    2837         Test that default rainfall can be used when given rate runs out of data.
    2838         """
    2839 
    2840         a = [0.0, 0.0]
    2841         b = [0.0, 2.0]
    2842         c = [2.0, 0.0]
    2843         d = [0.0, 4.0]
    2844         e = [2.0, 2.0]
    2845         f = [4.0, 0.0]
    2846 
    2847         points = [a, b, c, d, e, f]
    2848         #             bac,     bce,     ecf,     dbe
    2849         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2850 
    2851         domain = Domain(points, vertices)
    2852 
    2853         # Flat surface with 1m of water
    2854         domain.set_quantity('elevation', 0)
    2855         domain.set_quantity('stage', 1.0)
    2856         domain.set_quantity('friction', 0)
    2857 
    2858         Br = Reflective_boundary(domain)
    2859         domain.set_boundary({'exterior': Br})
    2860 
    2861         # Setup only one forcing term, time dependent rainfall
    2862         # that expires at t==20
    2863         from anuga.fit_interpolate.interpolate import Modeltime_too_late
    2864 
    2865         def main_rate(t):
    2866             if t > 20:
    2867                 msg = 'Model time exceeded.'
    2868                 raise Modeltime_too_late, msg
    2869             else:
    2870                 return 3*t + 7
    2871 
    2872         domain.forcing_terms = []
    2873         R = Rainfall(domain,
    2874                      rate=main_rate,
    2875                      polygon = [[1,1], [2,1], [2,2], [1,2]],
    2876                      default_rate=5.0)
    2877 
    2878         assert num.allclose(R.exchange_area, 2)
    2879        
    2880         domain.forcing_terms.append(R)
    2881 
    2882         domain.time = 10.
    2883 
    2884         domain.compute_forcing_terms()
    2885 
    2886         assert num.allclose(domain.quantities['stage'].explicit_update[1],
    2887                             (3*domain.time+7)/1000)
    2888         assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
    2889         assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
    2890 
    2891         domain.time = 100.
    2892         domain.quantities['stage'].explicit_update[:] = 0.0     # Reset
    2893         domain.compute_forcing_terms()
    2894 
    2895         assert num.allclose(domain.quantities['stage'].explicit_update[1],
    2896                             5.0/1000) # Default value
    2897         assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
    2898         assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
    2899 
    2900     def test_rainfall_forcing_with_evolve(self):
    2901         """test_rainfall_forcing_with_evolve
    2902 
    2903         Test how forcing terms are called within evolve
    2904         """
    2905 
    2906         # FIXME(Ole): This test is just to experiment
    2907 
    2908         a = [0.0, 0.0]
    2909         b = [0.0, 2.0]
    2910         c = [2.0, 0.0]
    2911         d = [0.0, 4.0]
    2912         e = [2.0, 2.0]
    2913         f = [4.0, 0.0]
    2914 
    2915         points = [a, b, c, d, e, f]
    2916         #             bac,     bce,     ecf,     dbe
    2917         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2918 
    2919         domain = Domain(points, vertices)
    2920 
    2921         # Flat surface with 1m of water
    2922         domain.set_quantity('elevation', 0)
    2923         domain.set_quantity('stage', 1.0)
    2924         domain.set_quantity('friction', 0)
    2925 
    2926         Br = Reflective_boundary(domain)
    2927         domain.set_boundary({'exterior': Br})
    2928 
    2929         # Setup only one forcing term, time dependent rainfall
    2930         # that expires at t==20
    2931         from anuga.fit_interpolate.interpolate import Modeltime_too_late
    2932 
    2933         def main_rate(t):
    2934             if t > 20:
    2935                 msg = 'Model time exceeded.'
    2936                 raise Modeltime_too_late, msg
    2937             else:
    2938                 return 3*t + 7
    2939 
    2940         domain.forcing_terms = []
    2941         R = Rainfall(domain,
    2942                      rate=main_rate,
    2943                      polygon=[[1,1], [2,1], [2,2], [1,2]],
    2944                      default_rate=5.0)
    2945 
    2946         assert num.allclose(R.exchange_area, 2)
    2947        
    2948         domain.forcing_terms.append(R)
    2949 
    2950         for t in domain.evolve(yieldstep=1, finaltime=25):
    2951             pass
    2952             #FIXME(Ole):  A test here is hard because explicit_update also
    2953             # receives updates from the flux calculation.
    2954 
    2955 
    2956     def test_rainfall_forcing_with_evolve_1(self):
    2957         """test_rainfall_forcing_with_evolve
    2958 
    2959         Test how forcing terms are called within evolve.
    2960         This test checks that proper exception is thrown when no default_rate is set
    2961         """
    2962 
    2963 
    2964         a = [0.0, 0.0]
    2965         b = [0.0, 2.0]
    2966         c = [2.0, 0.0]
    2967         d = [0.0, 4.0]
    2968         e = [2.0, 2.0]
    2969         f = [4.0, 0.0]
    2970 
    2971         points = [a, b, c, d, e, f]
    2972         #             bac,     bce,     ecf,     dbe
    2973         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2974 
    2975         domain = Domain(points, vertices)
    2976 
    2977         # Flat surface with 1m of water
    2978         domain.set_quantity('elevation', 0)
    2979         domain.set_quantity('stage', 1.0)
    2980         domain.set_quantity('friction', 0)
    2981 
    2982         Br = Reflective_boundary(domain)
    2983         domain.set_boundary({'exterior': Br})
    2984 
    2985         # Setup only one forcing term, time dependent rainfall
    2986         # that expires at t==20
    2987         from anuga.fit_interpolate.interpolate import Modeltime_too_late
    2988 
    2989         def main_rate(t):
    2990             if t > 20:
    2991                 msg = 'Model time exceeded.'
    2992                 raise Modeltime_too_late, msg
    2993             else:
    2994                 return 3*t + 7
    2995 
    2996         domain.forcing_terms = []
    2997         R = Rainfall(domain,
    2998                      rate=main_rate,
    2999                      polygon=[[1,1], [2,1], [2,2], [1,2]])
    3000 
    3001 
    3002         assert num.allclose(R.exchange_area, 2)
    3003        
    3004         domain.forcing_terms.append(R)
    3005         #for t in domain.evolve(yieldstep=1, finaltime=25):
    3006         #    pass
    3007                
    3008         try:
    3009             for t in domain.evolve(yieldstep=1, finaltime=25):
    3010                 pass
    3011         except Modeltime_too_late, e:
    3012             # Test that error message is as expected
    3013             assert 'can specify keyword argument default_rate in the forcing function' in str(e)
    3014         else:
    3015             raise Exception, 'Should have raised exception'
    3016 
    3017            
     2182
    30182183           
    30192184    def test_inflow_using_circle(self):
     
    66165781                import rectangular_cross
    66175782        from anuga.shallow_water import Domain
    6618         from anuga.shallow_water.shallow_water_domain import Inflow
     5783        from anuga.shallow_water.forcing import Inflow
    66195784        from anuga.shallow_water.data_manager \
    66205785                import get_flow_through_cross_section
     
    67075872        from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
    67085873        from anuga.shallow_water import Domain
    6709         from anuga.shallow_water.shallow_water_domain import Inflow
    67105874        from anuga.shallow_water.data_manager import get_flow_through_cross_section
    67115875
     
    72766440        #----------------------------------------------------------------------
    72776441        from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
    7278         from anuga.shallow_water import Domain
    7279         from anuga.shallow_water.shallow_water_domain import Reflective_boundary
    7280         from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
    7281         from anuga.shallow_water.shallow_water_domain import Inflow_boundary
    72826442        from anuga.shallow_water.data_manager import get_flow_through_cross_section
    72836443        from anuga.abstract_2d_finite_volumes.util import sww2csv_gauges, csv2timeseries_graphs
     
    74166576        from anuga.shallow_water.shallow_water_domain import Reflective_boundary
    74176577        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
    7418         from anuga.shallow_water.shallow_water_domain import Inflow
     6578        from anuga.shallow_water.forcing import Inflow
    74196579        from anuga.shallow_water.data_manager \
    74206580                import get_flow_through_cross_section
Note: See TracChangeset for help on using the changeset viewer.