1 | #!/usr/bin/env python |
---|
2 | # Test of MPI module 'pypar' for Python |
---|
3 | # Demonstrates a 'master-slave' parallel program |
---|
4 | # |
---|
5 | # Run as |
---|
6 | # python demo2.py |
---|
7 | # or |
---|
8 | # mpirun -np 4 demo2.py |
---|
9 | # |
---|
10 | # GPC, FEB 2002 |
---|
11 | |
---|
# Numeric is imported only to verify that it is installed; nothing in this
# script references it directly.
try:
    import Numeric
except ImportError:
    # Raise a real exception object: string exceptions are rejected by the
    # interpreter, which would hide the message below behind a TypeError.
    raise ImportError('Module Numeric must be present to run pypar')

try:
    import pypar
except ImportError:
    raise ImportError('Module pypar must be present to run parallel')

print("Modules Numeric, pypar imported OK")

# Message tags: WORKTAG marks a work item sent by the master, DIETAG tells a
# slave to leave its receive loop.
WORKTAG = 1
DIETAG = 2
---|
26 | |
---|
27 | |
---|
def master():
    """Hand out the entries of ``workList`` to the slaves and collect results.

    Reads the module-level names ``MPI_myid``, ``MPI_numproc``, ``MPI_node``,
    ``workList`` and ``numWorks``.  Entry 0 of ``workList`` is never sent;
    entries 1..numWorks are the work items.

    Every slave that can be given work first receives one item.  Each time a
    result comes back, the next pending item goes to the slave that just
    answered, so quicker slaves end up processing more items.  Once all
    results have been collected, every slave is sent a message tagged
    ``DIETAG``.

    If there are no slaves at all (single-process run) nothing is dispatched
    and the function returns immediately.
    """
    numCompleted = 0

    print("[MASTER]: I am processor %d of %d on node %s" % (MPI_myid, MPI_numproc, MPI_node))

    numSlaves = MPI_numproc - 1
    if numSlaves < 1:
        # With no slaves, the receives below would wait for results that no
        # process can ever send.
        print("[MASTER]: no slaves available, no work dispatched")
        return

    # Never hand out more initial items than exist; surplus slaves simply
    # stay idle until they receive the termination message.
    numInitial = min(numSlaves, numWorks)

    # start slaves by distributing the first work slot
    for i in range(1, numInitial + 1):
        work = workList[i]
        pypar.raw_send(work, i, WORKTAG)
        print("[MASTER]: sent work '%s' to node '%d'" % (work, i))

    # dispatch the remaining work slots with a dynamic load-balancing policy:
    # the quicker a slave finishes a job, the more jobs it takes
    for work in workList[numInitial + 1:]:
        # NOTE(review): the string passed to raw_receive acts as the receive
        # buffer; presumably it must be at least as long as the incoming
        # message -- confirm against the pypar version in use.
        result = ' '
        err = pypar.raw_receive(result, pypar.MPI_ANY_SOURCE, pypar.MPI_ANY_TAG)
        source = err[1][0]  # used as the rank of the slave that answered
        print("[MASTER]: received result '%s' from node '%d'" % (result, source))
        numCompleted += 1
        pypar.raw_send(work, source, WORKTAG)
        print("[MASTER]: sent work '%s' to node '%d'" % (work, source))

    # all work items have been dispatched
    print("[MASTER]: toDo : %d" % numWorks)
    print("[MASTER]: done : %d" % numCompleted)

    # collect the results that are still outstanding
    while numCompleted < numWorks:
        result = ' '
        err = pypar.raw_receive(result, pypar.MPI_ANY_SOURCE, pypar.MPI_ANY_TAG)
        print("[MASTER]: received (final) result '%s' from node '%d'" % (result, err[1][0]))
        numCompleted += 1
        print("[MASTER]: %d completed" % numCompleted)

    print("[MASTER]: about to terminate slaves")

    # tell every slave to stop working
    for i in range(1, MPI_numproc):
        pypar.raw_send('#', i, DIETAG)
        # NOTE(review): logs 0 rather than the '#' payload that was sent;
        # output kept as in the original.
        print("[MASTER]: sent (final) work '%s' to node '%d'" % (0, i))
---|
69 | |
---|
def slave():
    """Answer work messages until a message tagged ``DIETAG`` arrives.

    Reads the module-level names ``MPI_myid``, ``MPI_numproc`` and
    ``MPI_node``.  For every other message the reply ``'X' + payload`` is
    sent to rank 0.
    """
    me = MPI_myid

    print("[SLAVE %d]: I am processor %d of %d on node %s" % (me, me, MPI_numproc, MPI_node))

    while True:
        # The placeholder string is handed to raw_receive as receive buffer.
        payload = ' '
        status = pypar.raw_receive(payload, pypar.MPI_ANY_SOURCE, pypar.MPI_ANY_TAG)[1]
        sender, tag = status[0], status[1]
        print("[SLAVE %d]: received work '%s' with tag '%d' from node '%d'" % (me, payload, tag, sender))

        # Termination request: leave the loop without replying.
        if tag == DIETAG:
            print("[SLAVE %d]: received termination from node '%d'" % (me, 0))
            return

        reply = 'X' + payload
        pypar.raw_send(reply, 0)
        print("[SLAVE %d]: sent result '%s' to node '%d'" % (me, reply, 0))
---|
86 | |
---|
87 | |
---|
88 | |
---|
if __name__ == '__main__':
    # Work items.  Slot 0 is a placeholder that is never dispatched, which is
    # why numWorks is one less than the length of the tuple.
    workList = ('_dummy_', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j')
    # Shorter alternative list; not referenced by the code in this file.
    _workList = ('_dummy_', 'a', 'b', 'c', 'd', 'e', 'f')
    numWorks = len(workList) - 1

    # Identity of this process, read by master() and slave() as globals.
    MPI_myid = pypar.rank()
    MPI_numproc = pypar.size()
    MPI_node = pypar.Get_processor_name()

    # Rank 0 coordinates; every other rank serves work requests.
    (master if MPI_myid == 0 else slave)()

    pypar.Finalize()
    print("MPI environment finalized.")
---|
105 | |
---|