#!/usr/bin/env python

"""Parallel program computing the Mandelbrot set using dynamic load balancing.
Simplified version for use with exercise 15.

To keep the master process busy, run with P+1 processes
where P is the number of physical processors.

Ole Nielsen, SUT 2003
"""
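
# Example launch (assumption: an MPI installation providing mpirun plus the
# pypar bindings; the script name below is only a placeholder):
#
#     mpirun -np 5 python mandelbrot_dynamic.py    # 1 master + 4 slaves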

from mandelbrot import calculate_region, balance
from mandelplot import plot
import pypar, numpy
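
# NOTE (assumption): mandelbrot.py and mandelplot.py are helper modules
# shipped with the exercise material; calculate_region evaluates the
# Mandelbrot iteration for a band of rows and plot renders the final array.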


# User definable parameters
kmax = 2**15    # Maximal number of iterations (= number of colors)
M = N = 700     # Width = height = N pixels
B = 24          # Number of blocks (first dimension)


# Region in the complex plane: Re(c) in [-2, 1], Im(c) in [-1.5, 1.5]
real_min = -2.0
real_max = 1.0
imag_min = -1.5
imag_max = 1.5

# MPI message tags (distinguish work packets from returned results)
work_tag = 0
result_tag = 1

# Initialise
t = pypar.time()
P = pypar.size()
p = pypar.rank()
processor_name = pypar.get_processor_name()

print 'Processor %d initialised on node %s' % (p, processor_name)

assert P > 1, 'Must have at least one slave'
assert B > P - 1, 'Must have more work packets than slaves'
| 43 | |
---|
| 44 | |
---|
| 45 | A = numpy.zeros((M,N), dtype='i') |
---|
| 46 | if p == 0: |
---|
| 47 | # Create work pool (B blocks) |
---|
| 48 | # using balanced work partitioning |
---|
| 49 | workpool = [] |
---|
| 50 | for i in range(B): |
---|
| 51 | Mlo, Mhi = balance(M, B, i) |
---|
| 52 | workpool.append( (Mlo, Mhi) ) |
---|
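
    # NOTE (assumption): balance(M, B, i) is taken to split the M rows into
    # B nearly equal (Mlo, Mhi) intervals; e.g. with M = 700 and B = 24 each
    # block covers roughly 29-30 rows.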

    # Distribute initial work to slaves
    w = 0
    for d in range(1, P):
        pypar.send(workpool[w], destination=d, tag=work_tag)
        w += 1
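
    # At this point every slave holds exactly one work packet; the loop
    # below keeps them busy by handing out a new packet as soon as a
    # result comes back (dynamic load balancing).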

    # Receive computed work and distribute more
    terminated = 0
    while terminated < P - 1:
        R, status = pypar.receive(pypar.any_source, tag=result_tag,
                                  return_status=True)
        A += R                   # Aggregate data
        d = status.source        # Id of slave that just finished

        if w < len(workpool):
            # Send new work to slave d
            pypar.send(workpool[w], destination=d, tag=work_tag)
            w += 1
        else:
            # Tell slave d to terminate
            pypar.send(None, destination=d, tag=work_tag)
            terminated += 1

    print 'Computed region in %.2f seconds' % (pypar.time() - t)
    plot(A, kmax)

else:
    # Slave: receive work, compute it, return the result, repeat
    while True:
        # Receive work (or None, which signals termination)
        W = pypar.receive(source=0, tag=work_tag)

        if W is None:
            print 'Slave p%d finished: time = %.2f' % (p, pypar.time() - t)
            break

        # Compute allocated work
        A = calculate_region(real_min, real_max, imag_min, imag_max, kmax,
                             M, N, Mlo=W[0], Mhi=W[1])
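
        # NOTE (assumption): calculate_region returns a full M x N array in
        # which only the rows Mlo:Mhi are non-zero, so the master can simply
        # aggregate partial results with 'A += R'.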

        # Return result to master
        pypar.send(A, destination=0, tag=result_tag)

pypar.finalize()