"""Test a run of the sequential shallow water domain against
a run of the parallel shallow water domain.

WARNING: This assumes that the command to run jobs is mpirun.
Tested with MPICH and LAM (Ole)
"""

#------------------------------------------------------------------------------
# Import necessary modules
#------------------------------------------------------------------------------

import unittest
import os
import sys
import pypar

import numpy as num




from anuga.utilities.numerical_tools import ensure_numeric
from anuga.utilities.util_ext import double_precision
from anuga.utilities.norms import l1_norm, l2_norm, linf_norm

from anuga import Domain
from anuga import Reflective_boundary
from anuga import Dirichlet_boundary
from anuga import Time_boundary
from anuga import Transmissive_boundary

from anuga import rectangular_cross
from anuga import create_domain_from_file


from anuga_parallel.interface import distribute, myid, numprocs, finalize


#--------------------------------------------------------------------------
# Setup parameters
#--------------------------------------------------------------------------

mesh_filename = "merimbula_10785_1.tsh"
#mesh_filename = "test-100.tsh"
yieldstep = 1
finaltime = 20
quantity = 'stage'
nprocs = 4
verbose = False

#--------------------------------------------------------------------------
# Setup procedures
#--------------------------------------------------------------------------
class Set_Stage:
"""Set an initial condition with constant water height, for x<x0
"""

def __init__(self, x0=0.25, x1=0.5, h=1.0):
self.x0 = x0
self.x1 = x1
self.h = h

def __call__(self, x, y):
return self.h*((x>self.x0)&(x<self.x1))

#--------------------------------------------------------------------------
# Setup test
#--------------------------------------------------------------------------
def evolution_test(parallel=False):


domain = create_domain_from_file(mesh_filename)
domain.set_quantity('stage', Set_Stage(756000.0, 756500.0, 2.0))

#--------------------------------------------------------------------------
# Create parallel domain if requested
#--------------------------------------------------------------------------

if parallel:
if myid == 0 and verbose: print 'DISTRIBUTING PARALLEL DOMAIN'
domain = distribute(domain)

#------------------------------------------------------------------------------
# Setup boundary conditions
# This must currently happen *after* domain has been distributed
#------------------------------------------------------------------------------
domain.store = False
Br = Reflective_boundary(domain) # Solid reflective wall

domain.set_boundary({'outflow' :Br, 'inflow' :Br, 'inner' :Br, 'exterior' :Br, 'open' :Br})

#------------------------------------------------------------------------------
# Setup diagnostic arrays
#------------------------------------------------------------------------------
l1list = []
l2list = []
linflist = []
l1norm = num.zeros(3, num.float)
l2norm = num.zeros(3, num.float)
linfnorm = num.zeros(3, num.float)
recv_norm = num.zeros(3, num.float)

#------------------------------------------------------------------------------
# Evolution
#------------------------------------------------------------------------------
if parallel:
if myid == 0 and verbose: print 'PARALLEL EVOLVE'
else:
if verbose: print 'SEQUENTIAL EVOLVE'

for t in domain.evolve(yieldstep = yieldstep, finaltime = finaltime):
edges = domain.quantities[quantity].edge_values.take(num.flatnonzero(domain.tri_full_flag),axis=0)
l1norm[0] = l1_norm(edges[:,0])
l1norm[1] = l1_norm(edges[:,1])
l1norm[2] = l1_norm(edges[:,2])
l2norm[0] = l2_norm(edges[:,0])
l2norm[1] = l2_norm(edges[:,1])
l2norm[2] = l2_norm(edges[:,2])
linfnorm[0] = linf_norm(edges[:,0])
linfnorm[1] = linf_norm(edges[:,1])
linfnorm[2] = linf_norm(edges[:,2])
if parallel:
l2norm[0] = pow(l2norm[0], 2)
l2norm[1] = pow(l2norm[1], 2)
l2norm[2] = pow(l2norm[2], 2)
if myid == 0:
#domain.write_time()

#print edges[:,1]
for p in range(1, numprocs):
recv_norm = pypar.receive(p)
l1norm += recv_norm
recv_norm = pypar.receive(p)
l2norm += recv_norm
recv_norm = pypar.receive(p)
linfnorm[0] = max(linfnorm[0], recv_norm[0])
linfnorm[1] = max(linfnorm[1], recv_norm[1])
linfnorm[2] = max(linfnorm[2], recv_norm[2])

l2norm[0] = pow(l2norm[0], 0.5)
l2norm[1] = pow(l2norm[
| 143 | l2norm[2] = pow(l2norm[2], 0.5) |
| 144 | |
| 145 | l1list.append(l1norm) |
| 146 | l2list.append(l2norm) |
| 147 | linflist.append(linfnorm) |
| 148 | else: |
| 149 | pypar.send(l1norm, 0) |
| 150 | pypar.send(l2norm, 0) |
| 151 | pypar.send(linfnorm, 0) |
| 152 | else: |
[7449] | 153 | #domain.write_time() |
[6047] | 154 | l1list.append(l1norm) |
[2738] | 155 | l2list.append(l2norm) |
| 156 | linflist.append(linfnorm) |
[6047] | 157 | |
[2738] | 158 | |
| 159 | return (l1list, l2list, linflist) |
| 160 | |
[7449] | 161 | # Test an nprocs-way run of the shallow water equations |
[2738] | 162 | # against the sequential code. |
| 163 | |
[7459] | 164 | class Test_parallel_distribute_domain(unittest.TestCase): |
| 165 | def test_parallel_distribute_domain(self): |
[2739] | 166 | print "Expect this test to fail if not run from the parallel directory." |
[7459] | 167 | result = os.system("mpirun -np %d python test_parallel_distribute_domain.py" % nprocs) |
[2850] | 168 | assert_(result == 0) |
[2738] | 169 | |
[7459] | 170 | |
[2776] | 171 | # Because we are doing assertions outside of the TestCase class |
| 172 | # the PyUnit defined assert_ function can't be used. |
| 173 | def assert_(condition, msg="Assertion Failed"): |
| 174 | if condition == False: |
[6721] | 175 | #pypar.finalize() |
[2776] | 176 | raise AssertionError, msg |
| 177 | |
[2738] | 178 | if __name__=="__main__": |
[6047] | 179 | if numprocs == 1: |
[2738] | 180 | runner = unittest.TextTestRunner() |
[7459] | 181 | suite = unittest.makeSuite(Test_parallel_distribute_domain, 'test') |
[2738] | 182 | runner.run(suite) |
| 183 | else: |
[6047] | 184 | |
| 185 | pypar.barrier() |
| 186 | if myid == 0: |
[7459] | 187 | if verbose: print 'SEQUENTIAL START' |
[6047] | 188 | l1norm_seq, l2norm_seq, linfnorm_seq = evolution_test(parallel=False) |
| 189 | |
| 190 | pypar.barrier() |
| 191 | if myid ==0: |
[7459] | 192 | if verbose: print 'PARALLEL START' |
[3586] | 193 | |
[6047] | 194 | l1norm_par, l2norm_par, linfnorm_par = evolution_test(parallel=True) |
| 195 | |
| 196 | if myid == 0: |
[2776] | 197 | assert_(len(l1norm_seq) == len(l1norm_par)) |
| 198 | assert_(len(l2norm_seq) == len(l2norm_par)) |
| 199 | assert_(len(linfnorm_seq) == len(linfnorm_par)) |
| 200 | assert_(len(l1norm_seq) == len(l2norm_seq)) |
| 201 | assert_(len(l2norm_seq) == len(linfnorm_seq)) |
[2738] | 202 | # Anything smaller than tol we consider to be 0. |
[3243] | 203 | # This usualy comes out to be 10^-14 (DBL_DIG = 10^-15) |
| 204 | # * 10 to cater for rounding error in computation. |
| 205 | tol = pow(10, -1 * (double_precision() - 1)) |
[2738] | 206 | for x in range(len(l1norm_seq)): |
| 207 | for y in range(3): |
[3243] | 208 | # Calculate relative difference in the norms |
| 209 | assert_(abs(l1norm_seq[x][y] - l1norm_par[x][y])/l1norm_seq[x][y] < tol) |
| 210 | assert_(abs(l2norm_seq[x][y] - l2norm_par[x][y])/l2norm_seq[x][y] < tol) |
| 211 | assert_(abs(linfnorm_seq[x][y] - linfnorm_par[x][y])/linfnorm_seq[x][y] < tol) |
[2776] | 212 | if x > 0: |
[3243] | 213 | # Verify that the quantity is being conserved across iterations. |
| 214 | assert_(abs(l1norm_seq[x][y] - l1norm_seq[x-1][y]) < tol) |
| 215 | assert_(abs(l2norm_seq[x][y] - l2norm_seq[x-1][y]) < tol) |
| 216 | assert_(abs(linfnorm_seq[x][y] - linfnorm_seq[x-1][y]) < tol) |
| 217 | assert_(abs(l1norm_par[x][y] - l1norm_par[x-1][y]) < tol) |
| 218 | assert_(abs(l2norm_par[x][y] - l2norm_par[x-1][y]) < tol) |
| 219 | assert_(abs(linfnorm_par[x][y] - linfnorm_par[x-1][y]) < tol) |
[6721] | 220 | |
[7459] | 221 | if verbose: print 'Parallel test OK' |
[3586] | 222 | |
[6047] | 223 | |
[7562] | 224 | |
| 225 | finalize() |
