[85] | 1 | #!/usr/bin/env python |
---|
| 2 | ################################################################# |
---|
| 3 | # Master/Slave Parallel decomposition sample |
---|
| 4 | # |
---|
| 5 | # Run as |
---|
| 6 | # python demo3.py |
---|
| 7 | # or |
---|
| 8 | # mpirun -np 2 demo3.py |
---|
| 9 | # (perhaps try number of processors more than 2) |
---|
| 10 | ################################################################# |
---|
| 11 | # |
---|
| 12 | # To verify bandwidth of your architexture please |
---|
| 13 | # run pytiming (and ctiming) |
---|
| 14 | # |
---|
| 15 | # OMN, GPC FEB 2002 |
---|
| 16 | # |
---|
| 17 | # |
---|
| 18 | |
---|
| 19 | import sys |
---|
| 20 | |
---|
| 21 | try: |
---|
| 22 | import Numeric |
---|
| 23 | except: |
---|
| 24 | raise 'Module Numeric must be present to run pypar' |
---|
| 25 | |
---|
| 26 | try: |
---|
| 27 | import pypar |
---|
| 28 | except: |
---|
| 29 | raise 'Module pypar must be present to run parallel' |
---|
| 30 | |
---|
| 31 | sys.stderr.write("Modules Numeric, pypar imported OK\n") |
---|
| 32 | |
---|
| 33 | WORKTAG = 1 |
---|
| 34 | DIETAG = 2 |
---|
| 35 | |
---|
| 36 | |
---|
| 37 | def master(): |
---|
| 38 | numCompleted = 0 |
---|
| 39 | |
---|
| 40 | sys.stderr.write("[MASTER]: I am processor %d of %d on node %s\n" %(MPI_myid, MPI_numproc, MPI_node)) |
---|
| 41 | |
---|
| 42 | # start slaves distributing the first work slot |
---|
| 43 | for i in range(1, min(MPI_numproc, numWorks)): |
---|
| 44 | work = workList[i] |
---|
| 45 | pypar.raw_send(work, i, WORKTAG) |
---|
| 46 | sys.stderr.write("[MASTER]: sent work '%s' to node '%d'\n" %(work, i)) |
---|
| 47 | |
---|
| 48 | # dispach the remaining work slots on dynamic load-balancing policy |
---|
| 49 | # the quicker to do the job, the more jobs it takes |
---|
| 50 | for work in workList[MPI_numproc:]: |
---|
| 51 | result = ' ' |
---|
| 52 | err, status = pypar.raw_receive(result, pypar.any_source, pypar.any_tag, return_status=True) |
---|
| 53 | #sys.stderr.write( "[MASTER]: received result '%s' from node '%d'\n" %(result, err[1][0])) |
---|
| 54 | sys.stderr.write("[MASTER]: received result '%s' from node '%d'\n" %(result, status.source)) |
---|
| 55 | numCompleted += 1 |
---|
| 56 | pypar.raw_send(work, status.source, WORKTAG) |
---|
| 57 | sys.stderr.write("[MASTER]: sent work '%s' to node '%d'\n" %(work, status.source)) |
---|
| 58 | |
---|
| 59 | # all works have been dispatched out |
---|
| 60 | sys.stderr.write("[MASTER]: toDo : %d\n" %numWorks) |
---|
| 61 | sys.stderr.write("[MASTER]: done : %d\n" %numCompleted) |
---|
| 62 | |
---|
| 63 | # I've still to take into the remaining completions |
---|
| 64 | while(numCompleted < numWorks): |
---|
| 65 | result = ' ' |
---|
| 66 | err, status = pypar.raw_receive(result, pypar.any_source, pypar.any_tag, return_status=True) |
---|
| 67 | sys.stderr.write("[MASTER]: received (final) result '%s' from node '%d'\n" %(result, status.source)) |
---|
| 68 | numCompleted += 1 |
---|
| 69 | sys.stderr.write("[MASTER]: %d completed\n" %numCompleted) |
---|
| 70 | |
---|
| 71 | sys.stderr.write( "[MASTER]: about to terminate slaves\n") |
---|
| 72 | |
---|
| 73 | # say slaves to stop working |
---|
| 74 | for i in range(1, MPI_numproc): |
---|
| 75 | pypar.raw_send('#', i, DIETAG) |
---|
| 76 | sys.stderr.write("[MASTER]: sent (final) work '%s' to node '%d'\n" %(0, i)) |
---|
| 77 | |
---|
| 78 | return |
---|
| 79 | |
---|
| 80 | def slave(): |
---|
| 81 | |
---|
| 82 | sys.stderr.write( "[SLAVE %d]: I am processor %d of %d on node %s\n" %(MPI_myid, MPI_myid, MPI_numproc, MPI_node)) |
---|
| 83 | |
---|
| 84 | while 1: |
---|
| 85 | result = ' ' |
---|
| 86 | err, status = pypar.raw_receive(result, pypar.any_source, pypar.any_tag, return_status=True) |
---|
| 87 | sys.stderr.write("[SLAVE %d]: received work '%s' with tag '%d' from node '%d'\n"\ |
---|
| 88 | %(MPI_myid, result, status.tag, status.source)) |
---|
| 89 | |
---|
| 90 | if (status.tag == DIETAG): |
---|
| 91 | sys.stderr.write("[SLAVE %d]: received termination from node '%d'\n" %(MPI_myid, 0)) |
---|
| 92 | return |
---|
| 93 | else: |
---|
| 94 | result = 'X'+result |
---|
| 95 | pypar.raw_send(result, 0) |
---|
| 96 | sys.stderr.write("[SLAVE %d]: sent result '%s' to node '%d'\n" %(MPI_myid, result, 0)) |
---|
| 97 | |
---|
| 98 | |
---|
| 99 | |
---|
| 100 | if __name__ == '__main__': |
---|
| 101 | MPI_myid = pypar.rank() |
---|
| 102 | MPI_numproc = pypar.size() |
---|
| 103 | MPI_node = pypar.Get_processor_name() |
---|
| 104 | |
---|
| 105 | _workList = ('_dummy_', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j') |
---|
| 106 | workList = ('_dummy_', 'a', 'b', 'c') |
---|
| 107 | numWorks = len(workList) - 1 |
---|
| 108 | |
---|
| 109 | |
---|
| 110 | #FIXME, better control here |
---|
| 111 | if MPI_numproc > numWorks or MPI_numproc < 2: |
---|
| 112 | pypar.Finalize() |
---|
| 113 | if MPI_myid == 0: |
---|
| 114 | sys.stderr.write("ERROR: Number of processors must be in the interval [2,%d].\n" %numWorks) |
---|
| 115 | |
---|
| 116 | sys.exit(-1) |
---|
| 117 | |
---|
| 118 | if MPI_myid == 0: |
---|
| 119 | master() |
---|
| 120 | else: |
---|
| 121 | slave() |
---|
| 122 | |
---|
| 123 | pypar.Finalize() |
---|
| 124 | sys.stderr.write("MPI environment finalized.\n") |
---|
| 125 | |
---|