Changeset 6047 for anuga_core/source/anuga_parallel/test_parallel_sw.py
- Timestamp:
- Dec 7, 2008, 9:51:30 AM (15 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
anuga_core/source/anuga_parallel/test_parallel_sw.py
r3586 r6047 1 1 #!/usr/bin/env python 2 # Test a run of the sequential shallow water domain against 3 # a run of the parallel shallow water domain. 4 # WARNING: This assumes that the command to run jobs is mpirun.5 # Tested with MPICH and LAM (Ole) 6 7 #mesh_filename = "test-100.tsh" 8 mesh_filename= "merimbula_10785_1.tsh"9 yieldstep = 1 10 finaltime = 90 11 quantity = 'stage' 12 nprocs = 8 2 3 """Test a run of the sequential shallow water domain against 4 a run of the parallel shallow water domain. 5 6 WARNING: This assumes that the command to run jobs is mpirun. 7 Tested with MPICH and LAM (Ole) 8 """ 9 10 #------------------------------------------------------------------------------ 11 # Import necessary modules 12 #------------------------------------------------------------------------------ 13 13 14 14 import unittest … … 16 16 import sys 17 17 import print_stats 18 19 from Numeric import allclose, array, zeros, Float, take, nonzero 20 21 from anuga.pmesh.mesh_interface import create_mesh_from_regions 22 from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross 23 from anuga.utilities.numerical_tools import ensure_numeric 24 from anuga.utilities.polygon import is_inside_polygon 25 from anuga.abstract_2d_finite_volumes.pmesh2domain import pmesh_to_domain_instance 26 from anuga.utilities.util_ext import double_precision 27 from anuga.utilities.norms import l1_norm, l2_norm, linf_norm 28 29 from anuga.shallow_water import Domain 30 from anuga.shallow_water import Reflective_boundary 31 from anuga.shallow_water import Dirichlet_boundary 32 from anuga.shallow_water import Time_boundary 33 from anuga.shallow_water import Transmissive_boundary 34 18 35 import pypar 19 36 20 from Numeric import array, zeros, Float, take, nonzero 21 from anuga.shallow_water import Domain 22 from anuga.shallow_water import Reflective_boundary as sw_reflective_boundary 23 from anuga.shallow_water import Transmissive_boundary as sw_transmissive_boundary 24 from parallel_shallow_water import Parallel_Domain 25 from parallel_shallow_water import Reflective_boundary as par_reflective_boundary 26 from parallel_shallow_water import Transmissive_boundary as par_transmissive_boundary 27 from anuga.abstract_2d_finite_volumes.pmesh2domain\ 28 import pmesh_to_domain_instance 29 30 from anuga.utilities.norms import * 31 from anuga.utilities.util_ext import double_precision 32 from print_stats import print_test_stats, build_full_flag 33 from pmesh_divide import pmesh_divide_metis 34 from build_submesh import build_submesh 35 from build_local import build_local_mesh 36 from build_commun import send_submesh, rec_submesh, extract_hostmesh 37 37 from parallel_api import distribute, myid, numprocs 38 39 40 #-------------------------------------------------------------------------- 41 # Setup parameters 42 #-------------------------------------------------------------------------- 43 44 mesh_filename = "merimbula_10785_1.tsh" 45 #mesh_filename = "test-100.tsh" 46 yieldstep = 1 47 finaltime = 20 48 quantity = 'stage' 49 nprocs = 2 50 51 #-------------------------------------------------------------------------- 52 # Setup procedures 53 #-------------------------------------------------------------------------- 38 54 class Set_Stage: 39 55 """Set an initial condition with constant water height, for x<x0 … … 48 64 return self.h*((x>self.x0)&(x<self.x1)) 49 65 50 51 def parallel_test(): 52 myid = pypar.rank() 53 numprocs = pypar.size() 54 proc_name = pypar.Get_processor_name() 55 56 rect = zeros(4,Float) # Buffer for results 57 if myid == 0: 58 # Partition 59 domain_full = pmesh_to_domain_instance(mesh_filename, Domain) 60 rect = array(domain_full.xy_extent, Float) 61 62 domain_full.set_quantity('stage', Set_Stage(756000.0, 756500.0, 2.0)) 63 #domain_full.set_quantity('stage', Set_Stage(200.0,300.0,1.0)) 64 domain_full.check_integrity() 65 domain_full.smooth = False 66 domain_full.reduction = min 67 domain_full.checkpoint = False 68 domain_full.visualise = False 69 70 nodes, triangles, boundary, triangles_per_proc, quantities = \ 71 pmesh_divide_metis(domain_full, numprocs) 72 73 submesh = build_submesh(nodes, triangles, boundary,\ 74 quantities, triangles_per_proc) 75 76 for p in range(1, numprocs): 77 send_submesh(submesh, triangles_per_proc, p) 78 79 points, vertices, boundary, quantities, ghost_recv_dict, full_send_dict = \ 80 extract_hostmesh(submesh, triangles_per_proc) 81 82 else: 83 points, vertices, boundary, quantities, ghost_recv_dict, full_send_dict = \ 84 rec_submesh(0) 85 86 pypar.broadcast(rect, 0) 87 domain = Parallel_Domain(points, vertices, boundary, 88 full_send_dict = full_send_dict, 89 ghost_recv_dict = ghost_recv_dict) 90 tri_full_flag = build_full_flag(domain, ghost_recv_dict) 91 92 domain.default_order = 1 93 R = par_reflective_boundary(domain) 94 domain.set_boundary({'outflow' : R, 'inflow' : R, 'inner' : R, 'exterior' : R, 'open' : R, 'ghost' : None}) 95 domain.set_quantity('stage', quantities['stage']) 96 domain.set_quantity('elevation', quantities['elevation']) 66 #-------------------------------------------------------------------------- 67 # Setup test 68 #-------------------------------------------------------------------------- 69 def evolution_test(parallel=False): 70 71 domain = pmesh_to_domain_instance(mesh_filename, Domain) 72 domain.set_quantity('stage', Set_Stage(756000.0, 756500.0, 2.0)) 73 74 #-------------------------------------------------------------------------- 75 # Create parallel domain if requested 76 #-------------------------------------------------------------------------- 77 78 if parallel: 79 if myid == 0: print 'DISTRIBUTING PARALLEL DOMAIN' 80 domain = distribute(domain, verbose=False) 81 82 #------------------------------------------------------------------------------ 83 # Setup boundary conditions 84 # This must currently happen *after* domain has been distributed 85 #------------------------------------------------------------------------------ 97 86 domain.store = False 98 87 Br = Reflective_boundary(domain) # Solid reflective wall 88 89 domain.set_boundary({'outflow' :Br, 'inflow' :Br, 'inner' :Br, 'exterior' :Br, 'open' :Br}) 90 91 #------------------------------------------------------------------------------ 92 # Setup diagnostic arrays 93 #------------------------------------------------------------------------------ 99 94 l1list = [] 100 95 l2list = [] … … 104 99 linfnorm = zeros(3, Float) 105 100 recv_norm = zeros(3, Float) 101 102 #------------------------------------------------------------------------------ 106 103 # Evolution 104 #------------------------------------------------------------------------------ 105 if parallel: 106 if myid == 0: print 'PARALLEL EVOLVE' 107 else: 108 print 'SEQUENTIAL EVOLVE' 109 107 110 for t in domain.evolve(yieldstep = yieldstep, finaltime = finaltime): 108 edges = take(domain.quantities[quantity].edge_values, nonzero( tri_full_flag))109 l1norm[0] = l1_norm(edges[:, 110 l1norm[1] = l1_norm(edges[:, 111 l1norm[2] = l1_norm(edges[:, 112 l2norm[0] = pow(l2_norm(edges[:,0]), 2)113 l2norm[1] = pow(l2_norm(edges[:,1]), 2)114 l2norm[2] = pow(l2_norm(edges[:,2]), 2)111 edges = take(domain.quantities[quantity].edge_values, nonzero(domain.tri_full_flag)) 112 l1norm[0] = l1_norm(edges[:,0]) 113 l1norm[1] = l1_norm(edges[:,1]) 114 l1norm[2] = l1_norm(edges[:,2]) 115 l2norm[0] = l2_norm(edges[:,0]) 116 l2norm[1] = l2_norm(edges[:,1]) 117 l2norm[2] = l2_norm(edges[:,2]) 115 118 linfnorm[0] = linf_norm(edges[:,0]) 116 119 linfnorm[1] = linf_norm(edges[:,1]) 117 120 linfnorm[2] = linf_norm(edges[:,2]) 118 if myid == 0: 121 if parallel: 122 l2norm[0] = pow(l2norm[0], 2) 123 l2norm[1] = pow(l2norm[1], 2) 124 l2norm[2] = pow(l2norm[2], 2) 125 if myid == 0: 126 domain.write_time() 127 128 #print edges[:,1] 129 for p in range(1, numprocs): 130 pypar.receive(p, recv_norm) 131 l1norm += recv_norm 132 pypar.receive(p, recv_norm) 133 l2norm += recv_norm 134 pypar.receive(p, recv_norm) 135 linfnorm[0] = max(linfnorm[0], recv_norm[0]) 136 linfnorm[1] = max(linfnorm[1], recv_norm[1]) 137 linfnorm[2] = max(linfnorm[2], recv_norm[2]) 138 139 l2norm[0] = pow(l2norm[0], 0.5) 140 l2norm[1] = pow(l2norm[1], 0.5) 141 l2norm[2] = pow(l2norm[2], 0.5) 142 143 l1list.append(l1norm) 144 l2list.append(l2norm) 145 linflist.append(linfnorm) 146 else: 147 pypar.send(l1norm, 0) 148 pypar.send(l2norm, 0) 149 pypar.send(linfnorm, 0) 150 else: 119 151 domain.write_time() 120 #print edges[:,1] 121 for p in range(1, numprocs): 122 pypar.receive(p, recv_norm) 123 l1norm += recv_norm 124 pypar.receive(p, recv_norm) 125 l2norm += recv_norm 126 pypar.receive(p, recv_norm) 127 linfnorm[0] = max(linfnorm[0], recv_norm[0]) 128 linfnorm[1] = max(linfnorm[1], recv_norm[1]) 129 linfnorm[2] = max(linfnorm[2], recv_norm[2]) 130 l1list.append(l1norm) 131 l2norm[0] = pow(l2norm[0], 0.5) 132 l2norm[1] = pow(l2norm[1], 0.5) 133 l2norm[2] = pow(l2norm[2], 0.5) 152 l1list.append(l1norm) 134 153 l2list.append(l2norm) 135 154 linflist.append(linfnorm) 136 else: 137 pypar.send(l1norm, 0) 138 pypar.send(l2norm, 0) 139 pypar.send(linfnorm, 0) 140 return (l1list, l2list, linflist) 141 142 def sequential_test(): 143 domain_full = pmesh_to_domain_instance(mesh_filename, Domain) 144 145 domain_full.set_quantity('stage', Set_Stage(756000.0, 756500.0, 2.0)) 146 domain_full.check_integrity() 147 domain_full.default_order = 1 148 domain_full.smooth = False 149 domain_full.reduction = min 150 domain_full.store = False 151 domain_full.checkpoint = False 152 domain_full.visualise = False 153 R = sw_reflective_boundary(domain_full) 154 domain_full.set_boundary({'outflow' : R, 'inflow' : R, 'inner' : R, 'exterior' : R, 'open' : R}) 155 l1list = [] 156 l2list = [] 157 linflist = [] 158 l1norm = zeros(3, Float) 159 l2norm = zeros(3, Float) 160 linfnorm = zeros(3, Float) 161 for t in domain_full.evolve(yieldstep = yieldstep, finaltime = finaltime): 162 domain_full.write_time() 163 edge = domain_full.quantities[quantity].edge_values 164 #print edge[:,1] 165 l1norm[0] = l1_norm(edge[:,0]) 166 l1norm[1] = l1_norm(edge[:,1]) 167 l1norm[2] = l1_norm(edge[:,2]) 168 l2norm[0] = l2_norm(edge[:,0]) 169 l2norm[1] = l2_norm(edge[:,1]) 170 l2norm[2] = l2_norm(edge[:,2]) 171 linfnorm[0] = linf_norm(edge[:,0]) 172 linfnorm[1] = linf_norm(edge[:,1]) 173 linfnorm[2] = linf_norm(edge[:,2]) 174 l1list.append(l1norm) 175 l2list.append(l2norm) 176 linflist.append(linfnorm) 155 156 177 157 return (l1list, l2list, linflist) 178 158 … … 184 164 print "Expect this test to fail if not run from the parallel directory." 185 165 result = os.system("mpirun -np %d python test_parallel_sw.py" % nprocs) 166 print 'result ',result 186 167 assert_(result == 0) 187 168 … … 194 175 195 176 if __name__=="__main__": 196 if pypar.size()== 1:177 if numprocs == 1: 197 178 runner = unittest.TextTestRunner() 198 179 suite = unittest.makeSuite(Test_Parallel_Sw, 'test') 199 180 runner.run(suite) 200 181 else: 201 if pypar.rank() == 0: 202 l1norm_seq, l2norm_seq, linfnorm_seq = sequential_test() 203 204 l1norm_par, l2norm_par, linfnorm_par = parallel_test() 182 183 pypar.barrier() 184 if myid == 0: 185 print 'SEQUENTIAL START' 186 l1norm_seq, l2norm_seq, linfnorm_seq = evolution_test(parallel=False) 187 188 pypar.barrier() 189 if myid ==0: 190 print 'PARALLEL START' 205 191 206 if pypar.rank() == 0:207 #print l2norm_seq, l2norm_par208 192 l1norm_par, l2norm_par, linfnorm_par = evolution_test(parallel=True) 193 194 if myid == 0: 209 195 assert_(len(l1norm_seq) == len(l1norm_par)) 210 196 assert_(len(l2norm_seq) == len(l2norm_par)) … … 231 217 assert_(abs(linfnorm_par[x][y] - linfnorm_par[x-1][y]) < tol) 232 218 233 if pypar.rank()== 0:219 if myid == 0: 234 220 print 'Parallel test OK' 235 221 236 pypar.finalize() 222
Note: See TracChangeset
for help on using the changeset viewer.