source: anuga_core/source/anuga_parallel/test_parallel_sw_flow.py @ 7452

Last change on this file since 7452 was 7452, checked in by steve, 15 years ago

Committing latest parallel code

File size: 7.3 KB
Line 
1
2"""Simple water flow example using ANUGA
3
4Water driven up a linear slope and time varying boundary,
5similar to a beach environment
6
7This is a very simple test of the parallel algorithm using the simplified parallel API
8"""
9
10
11#------------------------------------------------------------------------------
12# Import necessary modules
13#------------------------------------------------------------------------------
14import unittest
15import os
16import sys
17#import pypar
18import numpy as num
19
20from anuga.interface import Domain
21from anuga.interface import Reflective_boundary
22from anuga.interface import Dirichlet_boundary
23from anuga.interface import Time_boundary
24from anuga.interface import Transmissive_boundary
25
26from anuga.interface import rectangular_cross_domain
27
28
29from anuga_parallel.interface import distribute, myid, numprocs, send, receive, barrier
30
31#--------------------------------------------------------------------------
32# Setup parameters
33#--------------------------------------------------------------------------
34yieldstep = 0.25
35finaltime = 6.0
36nprocs = 4
37N = 25
38M = 25
39verbose =  True
40
41#---------------------------------
42# Setup Functions
43#---------------------------------
44def topography(x,y): 
45    return -x/2   
46
47###########################################################################
48# Setup Test
49##########################################################################
50def evolution_test(parallel=False, G = None, seq_interpolation_points=None):
51
52    #--------------------------------------------------------------------------
53    # Setup computational domain and quantities
54    #--------------------------------------------------------------------------
55    domain = rectangular_cross_domain(N, M) 
56    domain.set_quantity('elevation', topography) # Use function for elevation
57    domain.set_quantity('friction', 0.0)         # Constant friction
58    domain.set_quantity('stage', expression='elevation') # Dry initial stage
59
60    #--------------------------------------------------------------------------
61    # Create the parallel domain
62    #--------------------------------------------------------------------------
63    if parallel:
64        if myid == 0 and verbose : print 'DISTRIBUTING PARALLEL DOMAIN'
65        domain = distribute(domain, verbose=False)
66
67    #--------------------------------------------------------------------------
68    # Setup domain parameters
69    #--------------------------------------------------------------------------
70    domain.set_name('runup')                    # Set sww filename
71    domain.set_datadir('.')                     # Set output dir
72
73    domain.set_default_order(1)       
74    domain.set_quantities_to_be_stored(None)
75
76
77    #------------------------------------------------------------------------------
78    # Setup boundary conditions
79    # This must currently happen *AFTER* domain has been distributed
80    #------------------------------------------------------------------------------
81
82    Br = Reflective_boundary(domain)      # Solid reflective wall
83    Bd = Dirichlet_boundary([-0.2,0.,0.]) # Constant boundary values
84
85    # Associate boundary tags with boundary objects
86    domain.set_boundary({'left': Br, 'right': Bd, 'top': Br, 'bottom': Br})
87
88    #------------------------------------------------------------------------------
89    # Find which sub_domain in which the interpolation points are located
90    #
91    # Sometimes the interpolation points sit exactly
92    # between to centroids, so in the parallel run we
93    # reset the interpolation points to the centroids
94    # found in the sequential run
95    #------------------------------------------------------------------------------
96    interpolation_points = [[0.4,0.5], [0.6,0.5], [0.8,0.5], [0.9,0.5]]
97
98    if parallel:
99        interpolation_points = seq_interpolation_points
100
101
102    gauge_values = []
103    tri_ids = []
104    for i, point in enumerate(interpolation_points):
105        gauge_values.append([]) # Empty list for timeseries
106
107        #if is_inside_polygon(point, domain.get_boundary_polygon()):
108        try:
109            k = domain.get_triangle_containing_point(point)
110            if domain.tri_full_flag[k] == 1:
111                tri_ids.append(k)
112            else:
113                tri_ids.append(-1)           
114        except:
115            tri_ids.append(-2)
116
117
118    if verbose: print 'P%d has points = %s' %(myid, tri_ids)
119
120    if not parallel:
121        c_coord = domain.get_centroid_coordinates()
122        interpolation_points = []
123        for id in tri_ids:
124            if id<1:
125                print 'ERROR: All interpolation points be within the sequential domain!'
126            interpolation_points.append(c_coord[id,:])
127           
128    #------------------------------------------------------------------------------
129    # Evolve system through time
130    #------------------------------------------------------------------------------
131    time = []
132
133    if parallel:
134        if myid == 0 and verbose: print 'PARALLEL EVOLVE'
135    else:
136        if myid == 0 and verbose: print 'SEQUENTIAL EVOLVE'
137   
138
139    for t in domain.evolve(yieldstep = yieldstep, finaltime = finaltime):
140        if myid == 0 and verbose : domain.write_time()
141
142        # Record time series at known points
143        time.append(domain.get_time())
144
145        stage = domain.get_quantity('stage')
146
147        for i in range(4):
148            if tri_ids[i] > -1:
149                gauge_values[i].append(stage.centroid_values[tri_ids[i]])
150
151
152    #----------------------------------------
153    # Setup test arrays during sequential run
154    #----------------------------------------
155    if not parallel:
156        G = []
157        for i in range(4):
158            G.append(gauge_values[i])
159
160    success = True
161
162    for i in range(4):
163        if tri_ids[i] > -1:
164            success = success and num.allclose(gauge_values[i], G[i])
165
166    assert_(success)
167   
168    return G, interpolation_points
169
170# Test an nprocs-way run of the shallow water equations
171# against the sequential code.
172
173class Test_parallel_sw_flow(unittest.TestCase):
174    def test_parallel_sw_flow(self):
175        print "Expect this test to fail if not run from the parallel directory."
176        result = os.system("mpirun -np %d python test_parallel_sw_flow.py" % nprocs)
177        assert_(result == 0)
178
179# Because we are doing assertions outside of the TestCase class
180# the PyUnit defined assert_ function can't be used.
181def assert_(condition, msg="Assertion Failed"):
182    if condition == False:
183        #pypar.finalize()
184        raise AssertionError, msg
185
186if __name__=="__main__":
187    if numprocs == 1: 
188        runner = unittest.TextTestRunner()
189        suite = unittest.makeSuite(Test_parallel_sw_flow, 'test')
190        runner.run(suite)
191    else:
192
193        #------------------------------------------
194        # Run the sequential code on each processor
195        # and save results at 4 gauge stations to
196        # array G
197        #------------------------------------------
198        barrier()
199        if myid == 0 and verbose: print 'SEQUENTIAL START'
200
201        G , interpolation_points = evolution_test(parallel=False)
202        G = num.array(G,num.float)
203
204        barrier()
205       
206        #------------------------------------------
207        # Run the code code and compare sequential
208        # results at 4 gauge stations
209        #------------------------------------------
210        if myid ==0 and verbose: print 'PARALLEL START'
211
212        evolution_test(parallel=True, G=G, seq_interpolation_points = interpolation_points)
213       
214
215
216
Note: See TracBrowser for help on using the repository browser.