source: branches/numpy_misc/tools/acceptance_tests/mandelbrot/mandel_parallel_dynamic.py

Last change on this file was 6991, checked in by rwilson, 16 years ago

Initial commit of the cluster acceptance package.

  • Property svn:executable set to *
File size: 2.5 KB
Line 
#!/usr/bin/env python

"""Parallel program computing the Mandelbrot set using dynamic load balancing.
   Simplified version for use with exercise 15.

   Process 0 (the master) cuts the image into B blocks along the first
   dimension, hands one block to each slave and then keeps feeding the next
   unassigned block to whichever slave reports back, until the pool is empty.
   Each slave computes the blocks it is given and returns the result to the
   master, which sums the results into one array and plots it.

   To keep master process busy, run with P+1 processes
   where P is the number of physical processors.

   Ole Nielsen, SUT 2003
"""

from mandelbrot import calculate_region, balance
from mandelplot import plot
import pypar
import numpy


# User definable parameters
kmax = 2**15  # Maximal number of iterations (=number of colors)
M = N = 700   # width = height = N
B = 24        # Number of blocks (first dim)


# Region in complex plane: real part in [-2, 1], imaginary part in [-1.5, 1.5]
real_min = -2.0
real_max = 1.0
imag_min = -1.5
imag_max = 1.5

# MPI controls: message tags separating work packets from returned results
work_tag = 0
result_tag = 1


def _require(condition, message):
    """Shut down MPI and exit with `message` if `condition` is false.

    Used instead of `assert` so the checks still run under `python -O`.
    The conditions tested below depend only on P and B, which have the same
    value on every process, so all processes stop together rather than
    leaving some of them blocked in a receive.
    """
    if not condition:
        pypar.finalize()
        raise SystemExit(message)


# Initialise
t = pypar.time()   # Start time, used for the elapsed-time reports below
P = pypar.size()   # Total number of processes (master + slaves)
p = pypar.rank()   # Id of this process; 0 is the master
processor_name = pypar.get_processor_name()

print('Processor %d initialised on node %s' % (p, processor_name))

_require(P > 1, 'Must have at least one slave')
_require(B > P-1, 'Must have more work packets than slaves')


if p == 0:
    # Only the master accumulates the full image, so only it allocates it
    A = numpy.zeros((M, N), dtype='i')

    # Create work pool (B blocks) using balanced work partitioning.
    # Each entry is the (Mlo, Mhi) pair returned by balance for block i.
    workpool = []
    for i in range(B):
        Mlo, Mhi = balance(M, B, i)
        workpool.append((Mlo, Mhi))

    # Distribute initial work to slaves: slave d gets block d-1
    for d in range(1, P):
        pypar.send(workpool[d - 1], destination=d, tag=work_tag)
    w = P - 1  # Index of the next unassigned block

    # Receive computed work and distribute more
    terminated = 0
    while terminated < P-1:
        R, status = pypar.receive(pypar.any_source, tag=result_tag,
                                  return_status=True)
        A += R             # Aggregate data
        d = status.source  # Id of slave that just finished

        if w < len(workpool):
            # Send new work to slave d
            pypar.send(workpool[w], destination=d, tag=work_tag)
            w += 1
        else:
            # Pool exhausted: tell slave d to terminate
            pypar.send(None, destination=d, tag=work_tag)
            terminated += 1

    print('Computed region in %.2f seconds' % (pypar.time() - t))
    plot(A, kmax)

else:
    while True:
        # Receive work (or None, which is the master's termination signal)
        W = pypar.receive(source=0, tag=work_tag)

        if W is None:
            print('Slave p%d finished: time = %.2f' % (p, pypar.time() - t))
            break

        # Compute allocated work; W is the (Mlo, Mhi) pair for this block
        block = calculate_region(real_min, real_max, imag_min, imag_max, kmax,
                                 M, N, Mlo=W[0], Mhi=W[1])

        # Return result
        pypar.send(block, destination=0, tag=result_tag)

pypar.finalize()
Note: See TracBrowser for help on using the repository browser.