Ignore:
Timestamp:
Dec 7, 2008, 9:51:30 AM (15 years ago)
Author:
steve
Message:

Changed over test_parallel_sw.py to use Ole parallel api. There is a problem on bogong, that if there is an error, the unit test does not pick it up!

File:
1 edited

Legend:

Unmodified
Added
Removed
  • anuga_core/source/anuga_parallel/test_parallel_sw.py

    r3586 r6047  
    11#!/usr/bin/env python
    2 # Test a run of the sequential shallow water domain against
    3 # a run of the parallel shallow water domain.
    4 # WARNING: This assumes that the command to run jobs is mpirun.
    5 # Tested with MPICH and LAM (Ole)
    6 
    7 #mesh_filename = "test-100.tsh"
    8 mesh_filename= "merimbula_10785_1.tsh"
    9 yieldstep = 1
    10 finaltime = 90
    11 quantity = 'stage'
    12 nprocs = 8
     2
     3"""Test a run of the sequential shallow water domain against
     4a run of the parallel shallow water domain.
     5
     6WARNING: This assumes that the command to run jobs is mpirun.
     7Tested with MPICH and LAM (Ole)
     8"""
     9
     10#------------------------------------------------------------------------------
     11# Import necessary modules
     12#------------------------------------------------------------------------------
    1313
    1414import unittest
     
    1616import sys
    1717import print_stats
     18
     19from Numeric import allclose, array, zeros, Float, take, nonzero
     20
     21from anuga.pmesh.mesh_interface import create_mesh_from_regions
     22from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
     23from anuga.utilities.numerical_tools import ensure_numeric
     24from anuga.utilities.polygon import is_inside_polygon
     25from anuga.abstract_2d_finite_volumes.pmesh2domain import pmesh_to_domain_instance
     26from anuga.utilities.util_ext import double_precision
     27from anuga.utilities.norms import l1_norm, l2_norm, linf_norm
     28
     29from anuga.shallow_water import Domain
     30from anuga.shallow_water import Reflective_boundary
     31from anuga.shallow_water import Dirichlet_boundary
     32from anuga.shallow_water import Time_boundary
     33from anuga.shallow_water import Transmissive_boundary
     34
    1835import pypar
    1936
    20 from Numeric import array, zeros, Float, take, nonzero
    21 from anuga.shallow_water import Domain
    22 from anuga.shallow_water import Reflective_boundary as sw_reflective_boundary
    23 from anuga.shallow_water import Transmissive_boundary as sw_transmissive_boundary
    24 from parallel_shallow_water import Parallel_Domain
    25 from parallel_shallow_water import Reflective_boundary as par_reflective_boundary
    26 from parallel_shallow_water import Transmissive_boundary as par_transmissive_boundary
    27 from anuga.abstract_2d_finite_volumes.pmesh2domain\
    28      import pmesh_to_domain_instance
    29 
    30 from anuga.utilities.norms import *
    31 from anuga.utilities.util_ext import double_precision
    32 from print_stats import print_test_stats, build_full_flag
    33 from pmesh_divide import pmesh_divide_metis
    34 from build_submesh import build_submesh
    35 from build_local import build_local_mesh
    36 from build_commun import send_submesh, rec_submesh, extract_hostmesh
    37 
     37from parallel_api import distribute, myid, numprocs
     38
     39
     40#--------------------------------------------------------------------------
     41# Setup parameters
     42#--------------------------------------------------------------------------
     43
     44mesh_filename = "merimbula_10785_1.tsh"
     45#mesh_filename = "test-100.tsh"
     46yieldstep = 1
     47finaltime = 20
     48quantity = 'stage'
     49nprocs = 2
     50
     51#--------------------------------------------------------------------------
     52# Setup procedures
     53#--------------------------------------------------------------------------
    3854class Set_Stage:
    3955    """Set an initial condition with constant water height, for x<x0
     
    4864        return self.h*((x>self.x0)&(x<self.x1))
    4965
    50 
    51 def parallel_test():
    52     myid = pypar.rank()
    53     numprocs = pypar.size()
    54     proc_name = pypar.Get_processor_name()
    55 
    56     rect = zeros(4,Float) # Buffer for results
    57     if myid == 0:
    58         # Partition
    59         domain_full = pmesh_to_domain_instance(mesh_filename, Domain)
    60         rect = array(domain_full.xy_extent, Float)
    61        
    62         domain_full.set_quantity('stage', Set_Stage(756000.0, 756500.0, 2.0))
    63         #domain_full.set_quantity('stage', Set_Stage(200.0,300.0,1.0))
    64         domain_full.check_integrity()
    65         domain_full.smooth = False
    66         domain_full.reduction = min
    67         domain_full.checkpoint = False
    68         domain_full.visualise = False
    69 
    70         nodes, triangles, boundary, triangles_per_proc, quantities = \
    71                pmesh_divide_metis(domain_full, numprocs)
    72 
    73         submesh = build_submesh(nodes, triangles, boundary,\
    74                                 quantities, triangles_per_proc)
    75 
    76         for p in range(1, numprocs):
    77             send_submesh(submesh, triangles_per_proc, p)
    78 
    79         points, vertices, boundary, quantities, ghost_recv_dict, full_send_dict = \
    80                 extract_hostmesh(submesh, triangles_per_proc)
    81 
    82     else:
    83         points, vertices, boundary, quantities, ghost_recv_dict, full_send_dict = \
    84                 rec_submesh(0)
    85 
    86     pypar.broadcast(rect, 0)
    87     domain = Parallel_Domain(points, vertices, boundary,
    88                              full_send_dict = full_send_dict,
    89                              ghost_recv_dict = ghost_recv_dict)
    90     tri_full_flag = build_full_flag(domain, ghost_recv_dict)
    91 
    92     domain.default_order = 1
    93     R = par_reflective_boundary(domain)
    94     domain.set_boundary({'outflow' : R, 'inflow' : R, 'inner' : R, 'exterior' : R, 'open' : R, 'ghost' : None})
    95     domain.set_quantity('stage', quantities['stage'])
    96     domain.set_quantity('elevation', quantities['elevation'])
     66#--------------------------------------------------------------------------
     67# Setup test
     68#--------------------------------------------------------------------------
     69def evolution_test(parallel=False):
     70
     71    domain = pmesh_to_domain_instance(mesh_filename, Domain)
     72    domain.set_quantity('stage', Set_Stage(756000.0, 756500.0, 2.0))
     73
     74    #--------------------------------------------------------------------------
     75    # Create parallel domain if requested
     76    #--------------------------------------------------------------------------
     77
     78    if parallel:
     79        if myid == 0: print 'DISTRIBUTING PARALLEL DOMAIN'
     80        domain = distribute(domain, verbose=False)
     81
     82    #------------------------------------------------------------------------------
     83    # Setup boundary conditions
     84    # This must currently happen *after* domain has been distributed
     85    #------------------------------------------------------------------------------
    9786    domain.store = False
    98 
     87    Br = Reflective_boundary(domain)      # Solid reflective wall
     88
     89    domain.set_boundary({'outflow' :Br, 'inflow' :Br, 'inner' :Br, 'exterior' :Br, 'open' :Br})
     90
     91    #------------------------------------------------------------------------------
     92    # Setup diagnostic arrays
     93    #------------------------------------------------------------------------------
    9994    l1list = []
    10095    l2list = []
     
    10499    linfnorm = zeros(3, Float)
    105100    recv_norm = zeros(3, Float)
     101
     102    #------------------------------------------------------------------------------
    106103    # Evolution
     104    #------------------------------------------------------------------------------
     105    if parallel:
     106        if myid == 0: print 'PARALLEL EVOLVE'
     107    else:
     108        print 'SEQUENTIAL EVOLVE'
     109       
    107110    for t in domain.evolve(yieldstep = yieldstep, finaltime = finaltime):
    108         edges = take(domain.quantities[quantity].edge_values, nonzero(tri_full_flag))
    109         l1norm[0] = l1_norm(edges[:, 0])
    110         l1norm[1] = l1_norm(edges[:, 1])
    111         l1norm[2] = l1_norm(edges[:, 2])
    112         l2norm[0] = pow(l2_norm(edges[:,0]), 2)
    113         l2norm[1] = pow(l2_norm(edges[:,1]), 2)
    114         l2norm[2] = pow(l2_norm(edges[:,2]), 2)
     111        edges = take(domain.quantities[quantity].edge_values, nonzero(domain.tri_full_flag))
     112        l1norm[0] = l1_norm(edges[:,0])
     113        l1norm[1] = l1_norm(edges[:,1])
     114        l1norm[2] = l1_norm(edges[:,2])
     115        l2norm[0] = l2_norm(edges[:,0])
     116        l2norm[1] = l2_norm(edges[:,1])
     117        l2norm[2] = l2_norm(edges[:,2])
    115118        linfnorm[0] = linf_norm(edges[:,0])
    116119        linfnorm[1] = linf_norm(edges[:,1])
    117120        linfnorm[2] = linf_norm(edges[:,2])
    118         if myid == 0:
     121        if parallel:
     122            l2norm[0] = pow(l2norm[0], 2)
     123            l2norm[1] = pow(l2norm[1], 2)
     124            l2norm[2] = pow(l2norm[2], 2)
     125            if myid == 0:
     126                domain.write_time()
     127
     128                #print edges[:,1]           
     129                for p in range(1, numprocs):
     130                    pypar.receive(p, recv_norm)
     131                    l1norm += recv_norm
     132                    pypar.receive(p, recv_norm)
     133                    l2norm += recv_norm
     134                    pypar.receive(p, recv_norm)
     135                    linfnorm[0] = max(linfnorm[0], recv_norm[0])
     136                    linfnorm[1] = max(linfnorm[1], recv_norm[1])
     137                    linfnorm[2] = max(linfnorm[2], recv_norm[2])
     138
     139                l2norm[0] = pow(l2norm[0], 0.5)
     140                l2norm[1] = pow(l2norm[1], 0.5)
     141                l2norm[2] = pow(l2norm[2], 0.5)
     142
     143                l1list.append(l1norm)               
     144                l2list.append(l2norm)
     145                linflist.append(linfnorm)               
     146            else:
     147                pypar.send(l1norm, 0)
     148                pypar.send(l2norm, 0)
     149                pypar.send(linfnorm, 0)
     150        else:
    119151            domain.write_time()
    120             #print edges[:,1]           
    121             for p in range(1, numprocs):
    122                 pypar.receive(p, recv_norm)
    123                 l1norm += recv_norm
    124                 pypar.receive(p, recv_norm)
    125                 l2norm += recv_norm
    126                 pypar.receive(p, recv_norm)
    127                 linfnorm[0] = max(linfnorm[0], recv_norm[0])
    128                 linfnorm[1] = max(linfnorm[1], recv_norm[1])
    129                 linfnorm[2] = max(linfnorm[2], recv_norm[2])
    130             l1list.append(l1norm)
    131             l2norm[0] = pow(l2norm[0], 0.5)
    132             l2norm[1] = pow(l2norm[1], 0.5)
    133             l2norm[2] = pow(l2norm[2], 0.5)
     152            l1list.append(l1norm)               
    134153            l2list.append(l2norm)
    135154            linflist.append(linfnorm)
    136         else:
    137             pypar.send(l1norm, 0)
    138             pypar.send(l2norm, 0)
    139             pypar.send(linfnorm, 0)
    140     return (l1list, l2list, linflist)
    141 
    142 def sequential_test():
    143     domain_full = pmesh_to_domain_instance(mesh_filename, Domain)
    144 
    145     domain_full.set_quantity('stage', Set_Stage(756000.0, 756500.0, 2.0))
    146     domain_full.check_integrity()
    147     domain_full.default_order = 1
    148     domain_full.smooth = False
    149     domain_full.reduction = min
    150     domain_full.store = False
    151     domain_full.checkpoint = False
    152     domain_full.visualise = False
    153     R = sw_reflective_boundary(domain_full)
    154     domain_full.set_boundary({'outflow' : R, 'inflow' : R, 'inner' : R, 'exterior' : R, 'open' : R})
    155     l1list = []
    156     l2list = []
    157     linflist = []
    158     l1norm = zeros(3, Float)
    159     l2norm = zeros(3, Float)
    160     linfnorm = zeros(3, Float)
    161     for t in domain_full.evolve(yieldstep = yieldstep, finaltime = finaltime):
    162         domain_full.write_time()
    163         edge = domain_full.quantities[quantity].edge_values
    164         #print edge[:,1]
    165         l1norm[0] = l1_norm(edge[:,0])
    166         l1norm[1] = l1_norm(edge[:,1])
    167         l1norm[2] = l1_norm(edge[:,2])
    168         l2norm[0] = l2_norm(edge[:,0])
    169         l2norm[1] = l2_norm(edge[:,1])
    170         l2norm[2] = l2_norm(edge[:,2])
    171         linfnorm[0] = linf_norm(edge[:,0])
    172         linfnorm[1] = linf_norm(edge[:,1])
    173         linfnorm[2] = linf_norm(edge[:,2])
    174         l1list.append(l1norm)
    175         l2list.append(l2norm)
    176         linflist.append(linfnorm)
     155           
     156
    177157    return (l1list, l2list, linflist)
    178158
     
    184164        print "Expect this test to fail if not run from the parallel directory."
    185165        result = os.system("mpirun -np %d python test_parallel_sw.py" % nprocs)
     166        print 'result ',result
    186167        assert_(result == 0)
    187168
     
    194175
    195176if __name__=="__main__":
    196     if pypar.size() == 1:
     177    if numprocs == 1:
    197178        runner = unittest.TextTestRunner()
    198179        suite = unittest.makeSuite(Test_Parallel_Sw, 'test')
    199180        runner.run(suite)
    200181    else:
    201         if pypar.rank() == 0:
    202             l1norm_seq, l2norm_seq, linfnorm_seq = sequential_test()
    203            
    204         l1norm_par, l2norm_par, linfnorm_par = parallel_test()
     182
     183        pypar.barrier()
     184        if myid == 0:
     185            print 'SEQUENTIAL START'
     186            l1norm_seq, l2norm_seq, linfnorm_seq = evolution_test(parallel=False)
     187
     188        pypar.barrier()
     189        if myid ==0:
     190            print 'PARALLEL START'
    205191       
    206         if pypar.rank() == 0:
    207             #print l2norm_seq, l2norm_par
    208            
     192        l1norm_par, l2norm_par, linfnorm_par = evolution_test(parallel=True)
     193       
     194        if myid == 0:
    209195            assert_(len(l1norm_seq) == len(l1norm_par))
    210196            assert_(len(l2norm_seq) == len(l2norm_par))
     
    231217                        assert_(abs(linfnorm_par[x][y] - linfnorm_par[x-1][y]) < tol)
    232218
    233         if pypar.rank() == 0:
     219        if myid == 0:
    234220            print 'Parallel test OK'
    235221
    236         pypar.finalize()
     222
Note: See TracChangeset for help on using the changeset viewer.