source: anuga_core/source/anuga_parallel/test_parallel_distribute_domain.py @ 7459

Last change on this file since 7459 was 7459, checked in by ole, 14 years ago

Work on parallel testing.
First cut at an Exception that will call pypar.abort to
terminate all parallel processes if test fails.

  • Property svn:executable set to *
File size: 8.3 KB
Line 
1
2"""Test a run of the sequential shallow water domain against
3a run of the parallel shallow water domain.
4
5WARNING: This assumes that the command to run jobs is mpirun.
6Tested with MPICH and LAM (Ole)
7"""
8
9#------------------------------------------------------------------------------
10# Import necessary modules
11#------------------------------------------------------------------------------
12
13import unittest
14import os
15import sys
16import pypar
17
18import numpy as num
19
20
21
22
23from anuga.utilities.numerical_tools import ensure_numeric
24from anuga.utilities.util_ext        import double_precision
25from anuga.utilities.norms           import l1_norm, l2_norm, linf_norm
26
27from anuga.interface import Domain
28from anuga.interface import Reflective_boundary
29from anuga.interface import Dirichlet_boundary
30from anuga.interface import Time_boundary
31from anuga.interface import Transmissive_boundary
32
33from anuga.interface import rectangular_cross
34from anuga.interface import create_domain_from_file
35
36
37from anuga_parallel.interface import distribute, myid, numprocs
38
39
40#--------------------------------------------------------------------------
41# Setup parameters
42#--------------------------------------------------------------------------
43
44mesh_filename = "merimbula_10785_1.tsh"
45#mesh_filename = "test-100.tsh"
46yieldstep = 1
47finaltime = 20
48quantity = 'stage'
49nprocs = 4
50verbose = True
51
52#--------------------------------------------------------------------------
53# Setup procedures
54#--------------------------------------------------------------------------
55class Set_Stage:
56    """Set an initial condition with constant water height, for x<x0
57    """
58
59    def __init__(self, x0=0.25, x1=0.5, h=1.0):
60        self.x0 = x0
61        self.x1 = x1
62        self.= h
63
64    def __call__(self, x, y):
65        return self.h*((x>self.x0)&(x<self.x1))
66
67#--------------------------------------------------------------------------
68# Setup test
69#--------------------------------------------------------------------------
70def evolution_test(parallel=False):
71
72
73    domain = create_domain_from_file(mesh_filename)
74    domain.set_quantity('stage', Set_Stage(756000.0, 756500.0, 2.0))
75
76    #--------------------------------------------------------------------------
77    # Create parallel domain if requested
78    #--------------------------------------------------------------------------
79
80    if parallel:
81        if myid == 0 and verbose: print 'DISTRIBUTING PARALLEL DOMAIN'
82        domain = distribute(domain)
83
84    #------------------------------------------------------------------------------
85    # Setup boundary conditions
86    # This must currently happen *after* domain has been distributed
87    #------------------------------------------------------------------------------
88    domain.store = False
89    Br = Reflective_boundary(domain)      # Solid reflective wall
90
91    domain.set_boundary({'outflow' :Br, 'inflow' :Br, 'inner' :Br, 'exterior' :Br, 'open' :Br})
92
93    #------------------------------------------------------------------------------
94    # Setup diagnostic arrays
95    #------------------------------------------------------------------------------
96    l1list = []
97    l2list = []
98    linflist = []
99    l1norm = num.zeros(3, num.float)
100    l2norm = num.zeros(3, num.float)
101    linfnorm = num.zeros(3, num.float)
102    recv_norm = num.zeros(3, num.float)
103
104    #------------------------------------------------------------------------------
105    # Evolution
106    #------------------------------------------------------------------------------
107    if parallel:
108        if myid == 0 and verbose: print 'PARALLEL EVOLVE'
109    else:
110        if verbose: print 'SEQUENTIAL EVOLVE'
111       
112    for t in domain.evolve(yieldstep = yieldstep, finaltime = finaltime):
113        edges = domain.quantities[quantity].edge_values.take(num.flatnonzero(domain.tri_full_flag),axis=0)
114        l1norm[0] = l1_norm(edges[:,0])
115        l1norm[1] = l1_norm(edges[:,1])
116        l1norm[2] = l1_norm(edges[:,2])
117        l2norm[0] = l2_norm(edges[:,0])
118        l2norm[1] = l2_norm(edges[:,1])
119        l2norm[2] = l2_norm(edges[:,2])
120        linfnorm[0] = linf_norm(edges[:,0])
121        linfnorm[1] = linf_norm(edges[:,1])
122        linfnorm[2] = linf_norm(edges[:,2])
123        if parallel:
124            l2norm[0] = pow(l2norm[0], 2)
125            l2norm[1] = pow(l2norm[1], 2)
126            l2norm[2] = pow(l2norm[2], 2)
127            if myid == 0:
128                #domain.write_time()
129
130                #print edges[:,1]           
131                for p in range(1, numprocs):
132                    recv_norm = pypar.receive(p)
133                    l1norm += recv_norm
134                    recv_norm = pypar.receive(p)
135                    l2norm += recv_norm
136                    recv_norm = pypar.receive(p)
137                    linfnorm[0] = max(linfnorm[0], recv_norm[0])
138                    linfnorm[1] = max(linfnorm[1], recv_norm[1])
139                    linfnorm[2] = max(linfnorm[2], recv_norm[2])
140
141                l2norm[0] = pow(l2norm[0], 0.5)
142                l2norm[1] = pow(l2norm[1], 0.5)
143                l2norm[2] = pow(l2norm[2], 0.5)
144
145                l1list.append(l1norm)               
146                l2list.append(l2norm)
147                linflist.append(linfnorm)               
148            else:
149                pypar.send(l1norm, 0)
150                pypar.send(l2norm, 0)
151                pypar.send(linfnorm, 0)
152        else:
153            #domain.write_time()
154            l1list.append(l1norm)               
155            l2list.append(l2norm)
156            linflist.append(linfnorm)
157           
158
159    return (l1list, l2list, linflist)
160
161# Test an nprocs-way run of the shallow water equations
162# against the sequential code.
163
164class Test_parallel_distribute_domain(unittest.TestCase):
165    def test_parallel_distribute_domain(self):
166        print "Expect this test to fail if not run from the parallel directory."
167        result = os.system("mpirun -np %d python test_parallel_distribute_domain.py" % nprocs)
168        assert_(result == 0)
169
170
171# Because we are doing assertions outside of the TestCase class
172# the PyUnit defined assert_ function can't be used.
173def assert_(condition, msg="Assertion Failed"):
174    if condition == False:
175        #pypar.finalize()
176        raise AssertionError, msg
177
178if __name__=="__main__":
179    if numprocs == 1: 
180        runner = unittest.TextTestRunner()
181        suite = unittest.makeSuite(Test_parallel_distribute_domain, 'test')
182        runner.run(suite)
183    else:
184
185        pypar.barrier()
186        if myid == 0:
187            if verbose: print 'SEQUENTIAL START'
188            l1norm_seq, l2norm_seq, linfnorm_seq = evolution_test(parallel=False)
189
190        pypar.barrier()
191        if myid ==0:
192            if verbose: print 'PARALLEL START'
193       
194        l1norm_par, l2norm_par, linfnorm_par = evolution_test(parallel=True)
195       
196        if myid == 0:
197            assert_(len(l1norm_seq) == len(l1norm_par))
198            assert_(len(l2norm_seq) == len(l2norm_par))
199            assert_(len(linfnorm_seq) == len(linfnorm_par))
200            assert_(len(l1norm_seq) == len(l2norm_seq))
201            assert_(len(l2norm_seq) == len(linfnorm_seq))
202            # Anything smaller than tol we consider to be 0.
203            # This usualy comes out to be 10^-14 (DBL_DIG = 10^-15)
204            # * 10 to cater for rounding error in computation.
205            tol = pow(10, -1 * (double_precision() - 1))
206            for x in range(len(l1norm_seq)):
207                for y in range(3):
208                    # Calculate relative difference in the norms
209                    assert_(abs(l1norm_seq[x][y] - l1norm_par[x][y])/l1norm_seq[x][y] < tol)
210                    assert_(abs(l2norm_seq[x][y] - l2norm_par[x][y])/l2norm_seq[x][y] < tol)
211                    assert_(abs(linfnorm_seq[x][y] - linfnorm_par[x][y])/linfnorm_seq[x][y] < tol)
212                    if x > 0:
213                        # Verify that the quantity is being conserved across iterations.
214                        assert_(abs(l1norm_seq[x][y] - l1norm_seq[x-1][y]) < tol)
215                        assert_(abs(l2norm_seq[x][y] - l2norm_seq[x-1][y]) < tol)
216                        assert_(abs(linfnorm_seq[x][y] - linfnorm_seq[x-1][y]) < tol)
217                        assert_(abs(l1norm_par[x][y] - l1norm_par[x-1][y]) < tol)
218                        assert_(abs(l2norm_par[x][y] - l2norm_par[x-1][y]) < tol)
219                        assert_(abs(linfnorm_par[x][y] - linfnorm_par[x-1][y]) < tol)
220               
221            if verbose: print 'Parallel test OK'
222
223
Note: See TracBrowser for help on using the repository browser.