1 | """ |
---|
2 | Master/Slave Parallel decomposition sample |
---|
3 | |
---|
4 | Run as |
---|
5 | python demo3.py |
---|
6 | or |
---|
7 | mpirun -np 2 demo3.py |
---|
8 | (perhaps try number of processors more than 2) |
---|
9 | |
---|
10 | |
---|
11 | OMN, GPC FEB 2002 |
---|
12 | """ |
---|
13 | |
---|
14 | import sys |
---|
15 | |
---|
16 | try: |
---|
17 | import numpy |
---|
18 | except: |
---|
19 | raise Exception, 'Module numpy must be present to run pypar' |
---|
20 | |
---|
21 | try: |
---|
22 | import pypar |
---|
23 | except: |
---|
24 | raise Exception, 'Module pypar must be present to run parallel' |
---|
25 | |
---|
26 | |
---|
27 | print 'Modules numpy, pypar imported OK' |
---|
28 | |
---|
29 | WORKTAG = 1 |
---|
30 | DIETAG = 2 |
---|
31 | |
---|
32 | |
---|
33 | def master(): |
---|
34 | numCompleted = 0 |
---|
35 | |
---|
36 | print '[MASTER]: I am processor %d of %d on node %s'\ |
---|
37 | %(MPI_myid, MPI_numproc, MPI_node) |
---|
38 | |
---|
39 | # start slaves distributing the first work slot |
---|
40 | for i in range(1, min(MPI_numproc, numWorks)): |
---|
41 | work = workList[i] |
---|
42 | pypar.send(work, destination=i, tag=WORKTAG) |
---|
43 | print '[MASTER]: sent work "%s" to node %d' %(work, i) |
---|
44 | |
---|
45 | # dispatch the remaining work slots on dynamic load-balancing policy |
---|
46 | # the quicker to do the job, the more jobs it takes |
---|
47 | for work in workList[MPI_numproc:]: |
---|
48 | result, status = pypar.receive(source=pypar.any_source, tag=WORKTAG, |
---|
49 | return_status=True) |
---|
50 | |
---|
51 | print '[MASTER]: received result "%s" from node %d'\ |
---|
52 | %(result, status.source) |
---|
53 | |
---|
54 | numCompleted += 1 |
---|
55 | pypar.send(work, destination=status.source, tag=WORKTAG) |
---|
56 | print '[MASTER]: sent work "%s" to node %d' %(work, status.source) |
---|
57 | |
---|
58 | # all works have been dispatched out |
---|
59 | print '[MASTER]: toDo : %d' %numWorks |
---|
60 | print '[MASTER]: done : %d' %numCompleted |
---|
61 | |
---|
62 | # I've still to take into the remaining completions |
---|
63 | while (numCompleted < numWorks): |
---|
64 | result, status = pypar.receive(source=pypar.any_source, |
---|
65 | tag=WORKTAG, |
---|
66 | return_status=True) |
---|
67 | print '[MASTER]: received (final) result "%s" from node %d'\ |
---|
68 | %(result, status.source) |
---|
69 | numCompleted += 1 |
---|
70 | print '[MASTER]: %d completed' %numCompleted |
---|
71 | |
---|
72 | print '[MASTER]: about to terminate slaves' |
---|
73 | |
---|
74 | # Tell slaves to stop working |
---|
75 | for i in range(1, MPI_numproc): |
---|
76 | pypar.send('#', destination=i, tag=DIETAG) |
---|
77 | print '[MASTER]: sent termination signal to node %d' %(i, ) |
---|
78 | |
---|
79 | return |
---|
80 | |
---|
81 | def slave(): |
---|
82 | |
---|
83 | print '[SLAVE %d]: I am processor %d of %d on node %s'\ |
---|
84 | %(MPI_myid, MPI_myid, MPI_numproc, MPI_node) |
---|
85 | |
---|
86 | while True: |
---|
87 | result, status = pypar.receive(source=0, |
---|
88 | tag=pypar.any_tag, |
---|
89 | return_status=True) |
---|
90 | print '[SLAVE %d]: received work "%s" with tag %d from node %d'\ |
---|
91 | %(MPI_myid, result, status.tag, status.source) |
---|
92 | |
---|
93 | if (status.tag == DIETAG): |
---|
94 | print '[SLAVE %d]: received termination from node %d'\ |
---|
95 | %(MPI_myid, 0) |
---|
96 | return |
---|
97 | else: |
---|
98 | result = 'X'+result |
---|
99 | pypar.send(result, destination=0, tag=WORKTAG) |
---|
100 | print '[SLAVE %d]: sent result "%s" to node %d'\ |
---|
101 | %(MPI_myid, result, 0) |
---|
102 | |
---|
103 | |
---|
104 | |
---|
105 | if __name__ == '__main__': |
---|
106 | MPI_myid = pypar.rank() |
---|
107 | MPI_numproc = pypar.size() |
---|
108 | MPI_node = pypar.get_processor_name() |
---|
109 | |
---|
110 | workList = ('_dummy_', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j') |
---|
111 | numWorks = len(workList) - 1 |
---|
112 | |
---|
113 | #FIXME, better control here |
---|
114 | if MPI_numproc > numWorks or MPI_numproc < 2: |
---|
115 | pypar.finalize() |
---|
116 | if MPI_myid == 0: |
---|
117 | print 'ERROR: Number of processors must be in the interval [2,%d].'%numWorks |
---|
118 | sys.exit(-1) |
---|
119 | |
---|
120 | if MPI_myid == 0: |
---|
121 | master() |
---|
122 | else: |
---|
123 | slave() |
---|
124 | |
---|
125 | pypar.finalize() |
---|
126 | print 'MPI environment finalized.' |
---|
127 | |
---|