source: anuga_core/source/pypar-numeric/demo3.py @ 5779

Last change on this file since 5779 was 5779, checked in by steve, 16 years ago

Added the old version of pypar which works with Numeric. Necessary for parallel code until we move anuga to numpy (and then we can use pypar as distribute via sourceforge).

File size: 4.0 KB
Line 
1#!/usr/bin/env python
2#################################################################
3# Master/Slave Parallel decomposition sample
4#
5# Run as
6#   python demo3.py
7# or
8#   mpirun -np 2 demo3.py
9# (perhaps try number of processors more than 2)
10#################################################################
11#
12# To verify bandwidth of your architexture please
13# run pytiming (and ctiming)
14#
15# OMN, GPC FEB 2002
16#
17#
18
19import sys
20
21try:
22  import Numeric
23except:
24  raise 'Module Numeric must be present to run pypar'
25 
26try:
27  import pypar
28except:
29  raise 'Module pypar must be present to run parallel'
30
31sys.stderr.write("Modules Numeric, pypar imported OK\n")
32
33WORKTAG = 1
34DIETAG =  2
35
36
37def master():
38    numCompleted = 0
39   
40    sys.stderr.write("[MASTER]: I am processor %d of %d on node %s\n" %(MPI_myid, MPI_numproc, MPI_node))
41   
42    # start slaves distributing the first work slot
43    for i in range(1, min(MPI_numproc, numWorks)): 
44        work = workList[i]
45        pypar.raw_send(work, i, WORKTAG) 
46        sys.stderr.write("[MASTER]: sent work '%s' to node '%d'\n" %(work, i))
47
48    # dispach the remaining work slots on dynamic load-balancing policy
49    # the quicker to do the job, the more jobs it takes
50    for work in workList[MPI_numproc:]:
51        result = '  '
52        err, status = pypar.raw_receive(result, pypar.any_source, pypar.any_tag, return_status=True) 
53        #sys.stderr.write( "[MASTER]: received result '%s' from node '%d'\n" %(result, err[1][0]))
54        sys.stderr.write("[MASTER]: received result '%s' from node '%d'\n" %(result, status.source))
55        numCompleted += 1
56        pypar.raw_send(work, status.source, WORKTAG)
57        sys.stderr.write("[MASTER]: sent work '%s' to node '%d'\n" %(work, status.source))
58   
59    # all works have been dispatched out
60    sys.stderr.write("[MASTER]: toDo : %d\n" %numWorks)
61    sys.stderr.write("[MASTER]: done : %d\n" %numCompleted)
62   
63    # I've still to take into the remaining completions   
64    while(numCompleted < numWorks): 
65        result = '  '
66        err, status = pypar.raw_receive(result, pypar.any_source, pypar.any_tag, return_status=True) 
67        sys.stderr.write("[MASTER]: received (final) result '%s' from node '%d'\n" %(result, status.source))
68        numCompleted += 1
69        sys.stderr.write("[MASTER]: %d completed\n" %numCompleted)
70       
71    sys.stderr.write( "[MASTER]: about to terminate slaves\n")
72
73    # say slaves to stop working
74    for i in range(1, MPI_numproc): 
75        pypar.raw_send('#', i, DIETAG) 
76        sys.stderr.write("[MASTER]: sent (final) work '%s' to node '%d'\n" %(0, i))
77       
78    return
79   
80def slave():
81
82    sys.stderr.write( "[SLAVE %d]: I am processor %d of %d on node %s\n" %(MPI_myid, MPI_myid, MPI_numproc, MPI_node))
83
84    while 1:
85        result = ' '
86        err, status = pypar.raw_receive(result, pypar.any_source, pypar.any_tag, return_status=True) 
87        sys.stderr.write("[SLAVE %d]: received work '%s' with tag '%d' from node '%d'\n"\
88              %(MPI_myid, result, status.tag, status.source))
89       
90        if (status.tag == DIETAG):
91            sys.stderr.write("[SLAVE %d]: received termination from node '%d'\n" %(MPI_myid, 0))
92            return
93        else:
94            result = 'X'+result
95            pypar.raw_send(result, 0)
96            sys.stderr.write("[SLAVE %d]: sent result '%s' to node '%d'\n" %(MPI_myid, result, 0))
97           
98       
99
100if __name__ == '__main__':
101    MPI_myid =    pypar.rank()
102    MPI_numproc = pypar.size()
103    MPI_node =    pypar.Get_processor_name()
104
105    _workList = ('_dummy_', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j')
106    workList = ('_dummy_', 'a', 'b', 'c')
107    numWorks = len(workList) - 1
108   
109   
110    #FIXME, better control here
111    if MPI_numproc > numWorks or MPI_numproc < 2:
112        pypar.Finalize()
113        if MPI_myid == 0:
114          sys.stderr.write("ERROR: Number of processors must be in the interval [2,%d].\n" %numWorks)
115         
116        sys.exit(-1)
117
118    if MPI_myid == 0:
119        master()
120    else:
121        slave()
122
123    pypar.Finalize()
124    sys.stderr.write("MPI environment finalized.\n")
125               
Note: See TracBrowser for help on using the repository browser.