source: pypar/demo3.py @ 820

Last change on this file since 820 was 85, checked in by ole, 19 years ago

Added pypar files

File size: 4.0 KB
Line 
1#!/usr/bin/env python
2#################################################################
3# Master/Slave Parallel decomposition sample
4#
5# Run as
6#   python demo3.py
7# or
8#   mpirun -np 2 demo3.py
9# (perhaps try number of processors more than 2)
10#################################################################
11#
12# To verify bandwidth of your architexture please
13# run pytiming (and ctiming)
14#
15# OMN, GPC FEB 2002
16#
17#
18
19import sys
20
21try:
22  import Numeric
23except:
24  raise 'Module Numeric must be present to run pypar'
25 
26try:
27  import pypar
28except:
29  raise 'Module pypar must be present to run parallel'
30
31sys.stderr.write("Modules Numeric, pypar imported OK\n")
32
33WORKTAG = 1
34DIETAG =  2
35
36
37def master():
38    numCompleted = 0
39   
40    sys.stderr.write("[MASTER]: I am processor %d of %d on node %s\n" %(MPI_myid, MPI_numproc, MPI_node))
41   
42    # start slaves distributing the first work slot
43    for i in range(1, min(MPI_numproc, numWorks)): 
44        work = workList[i]
45        pypar.raw_send(work, i, WORKTAG) 
46        sys.stderr.write("[MASTER]: sent work '%s' to node '%d'\n" %(work, i))
47
48    # dispach the remaining work slots on dynamic load-balancing policy
49    # the quicker to do the job, the more jobs it takes
50    for work in workList[MPI_numproc:]:
51        result = '  '
52        err, status = pypar.raw_receive(result, pypar.any_source, pypar.any_tag, return_status=True) 
53        #sys.stderr.write( "[MASTER]: received result '%s' from node '%d'\n" %(result, err[1][0]))
54        sys.stderr.write("[MASTER]: received result '%s' from node '%d'\n" %(result, status.source))
55        numCompleted += 1
56        pypar.raw_send(work, status.source, WORKTAG)
57        sys.stderr.write("[MASTER]: sent work '%s' to node '%d'\n" %(work, status.source))
58   
59    # all works have been dispatched out
60    sys.stderr.write("[MASTER]: toDo : %d\n" %numWorks)
61    sys.stderr.write("[MASTER]: done : %d\n" %numCompleted)
62   
63    # I've still to take into the remaining completions   
64    while(numCompleted < numWorks): 
65        result = '  '
66        err, status = pypar.raw_receive(result, pypar.any_source, pypar.any_tag, return_status=True) 
67        sys.stderr.write("[MASTER]: received (final) result '%s' from node '%d'\n" %(result, status.source))
68        numCompleted += 1
69        sys.stderr.write("[MASTER]: %d completed\n" %numCompleted)
70       
71    sys.stderr.write( "[MASTER]: about to terminate slaves\n")
72
73    # say slaves to stop working
74    for i in range(1, MPI_numproc): 
75        pypar.raw_send('#', i, DIETAG) 
76        sys.stderr.write("[MASTER]: sent (final) work '%s' to node '%d'\n" %(0, i))
77       
78    return
79   
80def slave():
81
82    sys.stderr.write( "[SLAVE %d]: I am processor %d of %d on node %s\n" %(MPI_myid, MPI_myid, MPI_numproc, MPI_node))
83
84    while 1:
85        result = ' '
86        err, status = pypar.raw_receive(result, pypar.any_source, pypar.any_tag, return_status=True) 
87        sys.stderr.write("[SLAVE %d]: received work '%s' with tag '%d' from node '%d'\n"\
88              %(MPI_myid, result, status.tag, status.source))
89       
90        if (status.tag == DIETAG):
91            sys.stderr.write("[SLAVE %d]: received termination from node '%d'\n" %(MPI_myid, 0))
92            return
93        else:
94            result = 'X'+result
95            pypar.raw_send(result, 0)
96            sys.stderr.write("[SLAVE %d]: sent result '%s' to node '%d'\n" %(MPI_myid, result, 0))
97           
98       
99
100if __name__ == '__main__':
101    MPI_myid =    pypar.rank()
102    MPI_numproc = pypar.size()
103    MPI_node =    pypar.Get_processor_name()
104
105    _workList = ('_dummy_', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j')
106    workList = ('_dummy_', 'a', 'b', 'c')
107    numWorks = len(workList) - 1
108   
109   
110    #FIXME, better control here
111    if MPI_numproc > numWorks or MPI_numproc < 2:
112        pypar.Finalize()
113        if MPI_myid == 0:
114          sys.stderr.write("ERROR: Number of processors must be in the interval [2,%d].\n" %numWorks)
115         
116        sys.exit(-1)
117
118    if MPI_myid == 0:
119        master()
120    else:
121        slave()
122
123    pypar.Finalize()
124    sys.stderr.write("MPI environment finalized.\n")
125               
Note: See TracBrowser for help on using the repository browser.