source: anuga_core/source/pypar-numeric/pytiming @ 7276

Last change on this file since 7276 was 5779, checked in by steve, 15 years ago

Added the old version of pypar, which works with Numeric. It is necessary for the parallel code until we move anuga to numpy (after which we can use pypar as distributed via SourceForge).

File size: 6.6 KB
RevLine 
[5779]1#!/usr/bin/env python
2
3# Timing of MPI module for Python and estimation of latency and bandwidth
4#
5# Send numerical array in a ring from processor 0 to 1 etc back to 0
6# Perform timings and compare different sending strategies
7#
8# OMN, OCT 2001
9
10import time, sys, pypar, Numeric
11
12# The send/recv routines
13#
14import pypar
15
16#from mpiext import send_array, receive_array  #most direct methods
17#Not available in final install - use bypass form for now
18
19#--------------------------------------------------------------
20
# Sending strategy used for the ring transfer:
#   0: automatically allocated buffers
#   1: user-supplied buffers
#   2: Use bypass - let pypar use direct mpi_ext call
#   3: (not there) direct call to mpiext (with buffers),
#      only fractionally better than bypass
method = 2

# Force use of vanilla format (1)
vanilla = False

# Check correctness of the data that comes back around the ring
consistency_check = False
30
31
32#--------------------------------------------------------------
33# linfit
34#
def linfit(x, y):
  """Fit a and b to the model y = ax + b. Return a, b, variance.

  x and y are equal-length sequences of numbers.

  Returns the tuple (a, b, varest) where a is the slope, b the intercept
  and varest the sum of squared residuals divided by sum(x_i^2) and by
  the N-2 remaining degrees of freedom.

  Raises ValueError if x and y differ in length, if fewer than three
  points are given (the variance estimate divides by N-2), or if all x
  values are identical (the slope is then undefined).
  """

  N = len(x)
  if len(y) != N:
    raise ValueError("x and y must have same length")
  if N < 3:
    raise ValueError("linfit needs at least three points, got %d" % N)

  # Float start values keep the means in floating point even when the
  # inputs are integers.
  Sx = sum(x, 0.0)
  Sy = sum(y, 0.0)
  SxoN = Sx/N

  # Centred sums: SSoN = sum (x_i - mean)^2, a = sum (x_i - mean)*y_i
  SSoN = 0.0
  a = 0.0
  for xi, yi in zip(x, y):
    t    = xi - SxoN
    SSoN = SSoN + t*t
    a    = a + t*yi

  if SSoN == 0.0:
    raise ValueError("all x values are identical - slope is undefined")

  a = a/SSoN            # a = (N Sxy - SxSy)/(NSxx - Sx^2)
  b = (Sy - Sx*a)/N

  # Quality - variance estimate \sum_i r_i^2 /(m-n)
  norm = 0.0
  varest = 0.0
  for xi, yi in zip(x, y):
    norm = norm + float(xi)*xi
    res = yi - a*xi - b
    varest = varest + res*res

  # norm > 0 here: the x values are not all equal, so at least one is nonzero
  varest = varest/norm/(N-2)
  return a, b, varest
68
69
70
71
72#--------------------------------------------------------------
73# Main program
74#
MAXI  = 10          # Number of blocks
MAXM  = 500000      # Largest block
BLOCK = MAXM//MAXI  # Block size (floor division: it is used to build slice bounds)

repeats = 10        # Number of times each measurement is repeated
msgid = 0           # Tag used for every message
# NOTE: this rebinds the `vanilla` flag already set in the configuration
# section near the top of the file.
vanilla = 0 #Select vanilla mode (slower but general)

numprocs = pypar.size()
myid = pypar.rank()
processor_name = pypar.Get_processor_name()

if myid == 0:
  # Main process - Create message, pass on, verify correctness and log timing
  #
  print("MAXM = %d, number of processors = %d" % (MAXM, numprocs))
  print("Measurements are repeated %d times for reliability" % repeats)

if numprocs < 2:
  print("Program needs at least two processors - aborting\n")
  pypar.Abort()

pypar.Barrier() #Synchronize all before timing
print("I am process %d on %s" % (myid, processor_name))
99
100
# Initialise data and timings
#

# Message payload: MAXM pseudo-random floats from a fixed seed, or an
# array of ones if RandomArray cannot be used.
try:
  from RandomArray import uniform, seed
  seed(17, 53)
  A = uniform(0.0,100.0,MAXM)
except Exception:
  # Best-effort fallback. Catch Exception rather than using a bare except,
  # which would also swallow SystemExit and KeyboardInterrupt.
  print('problem with RandomArray')
  from Numeric import ones, Float
  A = ones( MAXM, Float )

elsize = A.itemsize()   # Size in bytes of one array element

# One slot per block size
noelem  = [0]*MAXI             # Number of elements sent
bytes   = [0]*MAXI             # Number of bytes sent (filled in when reporting)
avgtime = [0.0]*MAXI           # Accumulated, later averaged, time
mintime = [ 1000000.0]*MAXI    # Best time seen
maxtime = [-1000000.0]*MAXI    # Worst time seen
121
122
123
124
if myid == 0:
  # Determine timer overhead: the smallest gap observed between two
  # back-to-back timer readings, starting from an upper bound of 1 second.
  cpuOH = 1.0
  for _ in range(repeats):   # Repeat to get reliable timings
    start = pypar.Wtime()
    stop = pypar.Wtime()
    cpuOH = min(cpuOH, stop - start)

  print("Timing overhead is %f seconds.\n" % cpuOH)
134
135     
# Pass msg circularly: process 0 sends the first m elements of A to
# process 1, every other process receives from its predecessor and sends
# to its successor, and the last process sends back to process 0.
for k in range(repeats):
  if myid == 0:
    print("Run %d of %d" % (k+1, repeats))

  for i in range(MAXI):
    m = BLOCK*i+1       # Number of elements sent in this round

    noelem[i] = m

    pypar.Barrier() # Synchronize

    if myid == 0:
      #
      # Main process - start the ring and time the full round trip
      #
      t1 = pypar.Wtime()

      if method == 0:
        pypar.send(A[:m], destination=1, tag=msgid, vanilla=vanilla)
        C = pypar.receive(numprocs-1, tag=msgid, vanilla=vanilla)
      elif method == 1:
        pypar.send(A[:m], use_buffer=True, destination=1,
                   tag=msgid, vanilla=vanilla)
        C = pypar.receive(numprocs-1, buffer=A[:m], tag=msgid, vanilla=vanilla)
      elif method == 2:
        pypar.send(A[:m], use_buffer=True, destination=1,
                   tag=msgid, vanilla=vanilla, bypass=True)
        C = pypar.receive(numprocs-1, buffer=A[:m], tag=msgid,
                          vanilla=vanilla, bypass=True)
      else:
        # Method 3 (direct mpiext send_array/receive_array) is not available.
        # String exceptions are not valid; raise a real exception instead.
        raise ValueError('Unknown method')

      # Round-trip time less the timer overhead, divided by the number of
      # hops in the ring to give the time of a single transfer.
      t2 = pypar.Wtime() - t1 - cpuOH
      t2 = t2/numprocs
      avgtime[i] = avgtime[i] + t2
      if t2 < mintime[i]: mintime[i] = t2
      if t2 > maxtime[i]: maxtime[i] = t2

      # Verify integrity of data if requested.
      # However, this may affect accuracy of timings for some reason.
      # NOTE(review): with methods 1 and 2 the data is received into the
      # A[:m] buffer itself, so this comparison may be trivially true if
      # slices share storage with A - confirm.
      if consistency_check:
        assert Numeric.alltrue(C == A[:m])
    else:
      #
      # Parallel process - get msg and pass it on
      #

      if method == 0:
        C = pypar.receive(myid-1, tag=msgid, vanilla=vanilla)
        pypar.send(A[:m], destination=(myid+1)%numprocs,
                   tag=msgid, vanilla=vanilla)
      elif method == 1:
        C = pypar.receive(myid-1, buffer=A[:m], tag=msgid, vanilla=vanilla)
        pypar.send(A[:m], use_buffer=True, destination=(myid+1)%numprocs,
                   tag=msgid, vanilla=vanilla)
      elif method == 2:
        # Use pypar bypass
        C = pypar.receive(myid-1, buffer=A[:m], tag=msgid,
                          vanilla=vanilla, bypass=True)
        pypar.send(A[:m], use_buffer=True, destination=(myid+1)%numprocs,
                   tag=msgid, vanilla=vanilla, bypass=True)
      else:
        # Method 3 (direct mpiext call) is not available.
        raise ValueError('Unknown method')
207
208
# Output stats
#
# Only process 0 holds timings. Convert them to micro seconds, print one
# row per block size and fit a straight line through the best timings.
if myid == 0:
  print("Bytes transferred   time (micro seconds)")
  print("                    min        avg        max ")
  print("----------------------------------------------")

  for i in range(MAXI):
    avgtime[i] = avgtime[i]/repeats*1.0e6 #Average micro seconds
    mintime[i] = mintime[i]*1.0e6         #Min micro seconds
    maxtime[i] = maxtime[i]*1.0e6         #Max micro seconds

    bytes[i] = elsize*noelem[i]

    row = (bytes[i], mintime[i], avgtime[i], maxtime[i])
    print("%10d    %10d %10d %10d" % row)


  # Slope is the time per byte, intercept the fitted latency
  Tbw, Tlat, varest = linfit(bytes, mintime)
  # Written without a trailing newline of its own so the fit results
  # follow directly on the next line.
  sys.stdout.write("\nLinear regression on best timings (t = t_l + t_b * bytes):\n")
  print("  t_b = %f\n  t_l = %f" % (Tbw, Tlat))
  print("  Estimated relative variance = %.9f\n" % varest)

  print("Estimated bandwith (1/t_b):  %.3f Mb/s" % (1.0/Tbw))
  # Latency is taken from the smallest message less its transfer time
  latency = int(mintime[0]-bytes[0]*Tbw)
  print("Estimated latency:           %d micro s" % latency)


pypar.Finalize()
237
238
239
240
241 
242 
Note: See TracBrowser for help on using the repository browser.