source: trunk/anuga_core/source/anuga_parallel/test_parallel_sw_flow.py @ 8114

Last change on this file since 8114 was 8114, checked in by jakeman, 13 years ago

jakeman: fixed parallel version so that it can be run using a file_boundary based upon an .sts file. Added the unit test - test_parallel_file_boundary.py

File size: 7.2 KB
Line 
1"""
2Simple water flow example using ANUGA
3
4Water driven up a linear slope and time varying boundary,
5similar to a beach environment
6
7This is a very simple test of the parallel algorithm using the simplified parallel API
8"""
9
10
11#------------------------------------------------------------------------------
12# Import necessary modules
13#------------------------------------------------------------------------------
14import unittest
15import os
16import sys
17#import pypar
18import numpy as num
19
20from anuga import Domain
21from anuga import Reflective_boundary
22from anuga import Dirichlet_boundary
23from anuga import Time_boundary
24from anuga import Transmissive_boundary
25
26from anuga import rectangular_cross_domain
27
28
29from anuga_parallel.interface import distribute, myid, numprocs, send, receive, barrier, finalize
30
31#--------------------------------------------------------------------------
32# Setup parameters
33#--------------------------------------------------------------------------
yieldstep = 0.25   # Interval (in model seconds) at which evolve() yields
finaltime = 6.0    # Total simulated time (model seconds)
nprocs = 4         # Number of MPI processes launched by the unittest wrapper
N = 25             # Mesh cells in one direction -- presumably rows; confirm against rectangular_cross_domain(M, N)
M = 25             # Mesh cells in the other direction
verbose = True 
40
41#---------------------------------
42# Setup Functions
43#---------------------------------
def topography(x,y): 
    """Bed elevation: a plane sloping down in the x-direction (y unused)."""
    # Note: kept as -x/2 (not -0.5*x) so Python 2 integer inputs behave
    # exactly as in the original expression.
    slope = -x/2
    return slope
46
47###########################################################################
48# Setup Test
49##########################################################################
50def evolution_test(parallel=False, G = None, seq_interpolation_points=None):
51
52    #--------------------------------------------------------------------------
53    # Setup computational domain and quantities
54    #--------------------------------------------------------------------------
55    domain = rectangular_cross_domain(M, N)
56    domain.set_quantity('elevation', topography) # Use function for elevation
57    domain.set_quantity('friction', 0.0)         # Constant friction
58    domain.set_quantity('stage', expression='elevation') # Dry initial stage
59
60    #--------------------------------------------------------------------------
61    # Create the parallel domain
62    #--------------------------------------------------------------------------
63    if parallel:
64        if myid == 0 and verbose : print 'DISTRIBUTING PARALLEL DOMAIN'
65        domain = distribute(domain, verbose=False)
66
67    #--------------------------------------------------------------------------
68    # Setup domain parameters
69    #--------------------------------------------------------------------------
70    domain.set_name('runup')                    # Set sww filename
71    domain.set_datadir('.')                     # Set output dir
72
73    domain.set_default_order(1)       
74    domain.set_quantities_to_be_stored(None)
75
76
77    #------------------------------------------------------------------------------
78    # Setup boundary conditions
79    # This must currently happen *AFTER* domain has been distributed
80    #------------------------------------------------------------------------------
81
82    Br = Reflective_boundary(domain)      # Solid reflective wall
83    Bd = Dirichlet_boundary([-0.2,0.,0.]) # Constant boundary values
84
85    # Associate boundary tags with boundary objects
86    domain.set_boundary({'left': Br, 'right': Bd, 'top': Br, 'bottom': Br})
87
88    #------------------------------------------------------------------------------
89    # Find which sub_domain in which the interpolation points are located
90    #
91    # Sometimes the interpolation points sit exactly
92    # between two centroids, so in the parallel run we
93    # reset the interpolation points to the centroids
94    # found in the sequential run
95    #------------------------------------------------------------------------------
96    interpolation_points = [[0.4,0.5], [0.6,0.5], [0.8,0.5], [0.9,0.5]]
97
98
99    gauge_values = []
100    tri_ids = []
101    for i, point in enumerate(interpolation_points):
102        gauge_values.append([]) # Empty list for timeseries
103
104        #if is_inside_polygon(point, domain.get_boundary_polygon()):
105        try:
106            k = domain.get_triangle_containing_point(point)
107            if domain.tri_full_flag[k] == 1:
108                tri_ids.append(k)
109            else:
110                tri_ids.append(-1)           
111        except:
112            tri_ids.append(-2)
113
114
115    if verbose: print 'P%d has points = %s' %(myid, tri_ids)
116
117
118    c_coord = domain.get_centroid_coordinates()
119    interpolation_points = []
120    for id in tri_ids:
121        if id<1:
122            print 'ERROR: All interpolation points be within the sequential domain!'
123        interpolation_points.append(c_coord[id,:])
124           
125    #------------------------------------------------------------------------------
126    # Evolve system through time
127    #------------------------------------------------------------------------------
128    time = []
129
130    if parallel:
131        if myid == 0 and verbose: print 'PARALLEL EVOLVE'
132    else:
133        if myid == 0 and verbose: print 'SEQUENTIAL EVOLVE'
134   
135
136    for t in domain.evolve(yieldstep = yieldstep, finaltime = finaltime):
137        if myid == 0 and verbose : domain.write_time()
138
139        # Record time series at known points
140        time.append(domain.get_time())
141
142        stage = domain.get_quantity('stage')
143
144        for i in range(4):
145            if tri_ids[i] > -1:
146                gauge_values[i].append(stage.centroid_values[tri_ids[i]])
147
148
149    #----------------------------------------
150    # Setup test arrays during sequential run
151    #----------------------------------------
152    if not parallel:
153        G = []
154        for i in range(4):
155            G.append(gauge_values[i])
156
157    success = True
158
159    for i in range(4):
160        if tri_ids[i] > -1:
161            success = success and num.allclose(gauge_values[i], G[i])
162
163    assert_(success)
164   
165    return G, interpolation_points
166
167# Test an nprocs-way run of the shallow water equations
168# against the sequential code.
169
170class Test_parallel_sw_flow(unittest.TestCase):
171    def test_parallel_sw_flow(self):
172        print "Expect this test to fail if not run from the parallel directory."
173        result = os.system("mpirun -np %d python test_parallel_sw_flow.py" % nprocs)
174        assert_(result == 0)
175
176# Because we are doing assertions outside of the TestCase class
177# the PyUnit defined assert_ function can't be used.
# Because we are doing assertions outside of the TestCase class
# the PyUnit defined assert_ function can't be used.
def assert_(condition, msg="Assertion Failed"):
    """Raise AssertionError(msg) if condition is falsy.

    Used instead of a bare 'assert' so the checks also run under
    python -O, and instead of TestCase methods because the assertions
    happen outside any TestCase (during the MPI-launched run).
    """
    if not condition:
        #pypar.finalize()
        # Parenthesised raise form works on both Python 2 and 3
        # (the original 'raise AssertionError, msg' is Python-2-only).
        raise AssertionError(msg)
182
if __name__=="__main__":
    if numprocs == 1: 
        # Serial invocation: run the unittest wrapper, which re-launches
        # this same script under mpirun with nprocs processes.
        runner = unittest.TextTestRunner()
        suite = unittest.makeSuite(Test_parallel_sw_flow, 'test')
        runner.run(suite)
    else:

        #------------------------------------------
        # Run the sequential code on each processor
        # and save results at 4 gauge stations to
        # array G
        #------------------------------------------
        barrier()
        if myid == 0 and verbose: print 'SEQUENTIAL START'

        G , interpolation_points = evolution_test(parallel=False)
        G = num.array(G,num.float)

        barrier()
       
        #------------------------------------------
        # Run the parallel code and compare against
        # the sequential results at 4 gauge stations
        #------------------------------------------
        if myid ==0 and verbose: print 'PARALLEL START'

        evolution_test(parallel=True, G=G, seq_interpolation_points = interpolation_points)
       
        finalize()
212
213
Note: See TracBrowser for help on using the repository browser.