source: trunk/anuga_core/source/pypar_dist/demos/demo3.py @ 8494

Last change on this file was r8494, checked in by steve, 13 years ago

We are working on improvements to the parallel code which means we need to
add to the standard pypar distribution.

File size: 3.6 KB
1"""
2Master/Slave Parallel decomposition sample
3
4Run as
5   python demo3.py
6or
7   mpirun -np 2 demo3.py
8  (perhaps try number of processors more than 2)
9 
10
11OMN, GPC FEB 2002
12"""

import sys

try:
    import numpy
except ImportError:
    raise Exception('Module numpy must be present to run pypar')

try:
    import pypar
except ImportError:
    raise Exception('Module pypar must be present to run parallel')

print 'Modules numpy, pypar imported OK'

WORKTAG = 1
DIETAG = 2
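# Message tags: work items and results are sent with WORKTAG; a message
# tagged DIETAG tells a slave to terminate.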


def master():
    numCompleted = 0

    print '[MASTER]: I am processor %d of %d on node %s'\
          %(MPI_myid, MPI_numproc, MPI_node)

    # Start the slaves by distributing the first round of work slots
    for i in range(1, min(MPI_numproc, numWorks)):
        work = workList[i]
        pypar.send(work, destination=i, tag=WORKTAG)
        print '[MASTER]: sent work "%s" to node %d' %(work, i)

    # Dispatch the remaining work slots with a dynamic load-balancing
    # policy: the quicker a slave finishes a job, the more jobs it gets
    for work in workList[MPI_numproc:]:
        result, status = pypar.receive(source=pypar.any_source, tag=WORKTAG,
                                       return_status=True)

        print '[MASTER]: received result "%s" from node %d'\
              %(result, status.source)

        numCompleted += 1
        pypar.send(work, destination=status.source, tag=WORKTAG)
        print '[MASTER]: sent work "%s" to node %d' %(work, status.source)

    # All work has been dispatched
    print '[MASTER]: toDo : %d' %numWorks
    print '[MASTER]: done : %d' %numCompleted

    # Collect the results that are still outstanding
    while (numCompleted < numWorks):
        result, status = pypar.receive(source=pypar.any_source,
                                       tag=WORKTAG,
                                       return_status=True)
        print '[MASTER]: received (final) result "%s" from node %d'\
              %(result, status.source)
        numCompleted += 1
        print '[MASTER]: %d completed' %numCompleted

    print '[MASTER]: about to terminate slaves'

    # Tell slaves to stop working
    for i in range(1, MPI_numproc):
        pypar.send('#', destination=i, tag=DIETAG)
        print '[MASTER]: sent termination signal to node %d' %(i,)

    return

def slave():

    print '[SLAVE %d]: I am processor %d of %d on node %s'\
          %(MPI_myid, MPI_myid, MPI_numproc, MPI_node)

    while True:
        result, status = pypar.receive(source=0,
                                       tag=pypar.any_tag,
                                       return_status=True)
        print '[SLAVE %d]: received work "%s" with tag %d from node %d'\
              %(MPI_myid, result, status.tag, status.source)

        if (status.tag == DIETAG):
            print '[SLAVE %d]: received termination from node %d'\
                  %(MPI_myid, 0)
            return
        else:
            # Do the "work": prefix the item and return it to the master
            result = 'X' + result
            pypar.send(result, destination=0, tag=WORKTAG)
            print '[SLAVE %d]: sent result "%s" to node %d'\
                  %(MPI_myid, result, 0)

if __name__ == '__main__':
    MPI_myid = pypar.rank()
    MPI_numproc = pypar.size()
    MPI_node = pypar.get_processor_name()

    workList = ('_dummy_', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j')
    numWorks = len(workList) - 1
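    # The '_dummy_' entry at index 0 keeps workList indices aligned with
    # processor ranks, so slave i initially receives workList[i].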

    #FIXME: better control here
    if MPI_numproc > numWorks or MPI_numproc < 2:
        if MPI_myid == 0:
            print 'ERROR: Number of processors must be in the interval [2,%d].' %numWorks
        pypar.finalize()
        sys.exit(-1)

    if MPI_myid == 0:
        master()
    else:
        slave()

    pypar.finalize()
    print 'MPI environment finalized.'