source: pypar/contrib/frost_pypar.py @ 659

Last change on this file since 659 was 85, checked in by ole, 19 years ago

Added pypar files

File size: 18.0 KB
Line 
1# =============================================================================
2# pypar.py - Parallel Python using MPI
3# Copyright (C) 2001, 2002 Ole M. Nielsen
4#              (Center for Mathematics and its Applications ANU and APAC)
5#              and Gian Paolo Ciceri (Milano, Italy)         
6#
7#    This program is free software; you can redistribute it and/or modify
8#    it under the terms of the GNU General Public License as published by
9#    the Free Software Foundation; either version 2 of the License, or
10#    (at your option) any later version.
11#
12#    This program is distributed in the hope that it will be useful,
13#    but WITHOUT ANY WARRANTY; without even the implied warranty of
14#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15#    GNU General Public License (http://www.gnu.org/copyleft/gpl.html)
16#    for more details.
17#
18#    You should have received a copy of the GNU General Public License
19#    along with this program; if not, write to the Free Software
20#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
21#
22#
23# Contact addresses: Ole.Nielsen@anu.edu.au, gp.ciceri@acm.org         
24#
25# version 1.6, 18 October 2002                                     
26# =============================================================================
27
28"""Module pypar.py - Parallel Python using MPI
29
30Public functions:
31
32size() -- Number of processors
33rank() -- Id of current processor
34Get_processor_name() -- Return host name of current node
35
36send() -- Blocking send (all types)
37receive() -- Blocking receive (all types)
38raw_send() -- Blocking send (Numeric arrays and strings)
39raw_receive() -- Blocking receive (Numeric arrays and strings)
40bcast() -- Broadcast
41Wtime() -- MPI wall time
42Barrier() -- Synchronisation point. Makes processors wait until all processors
43             have reached this point.
44Abort() -- Terminate all processes.
45Finalize() -- Cleanup MPI. No parallelism can take place after this point.
46
47
48See doc strings of individual functions for detailed documentation.
49"""
50
51import os, sys
52
# -----------------------------------------------------------------------------
# Options dictionary with default values - to be set by user
#

options = {
  # Size (in characters) of the buffer that receive_vanilla allocates for an
  # incoming pickled object.  A false value (None/0) makes receive_vanilla
  # size the buffer from a pickled copy of the template it is given.
  'vanilla_bufsize': None,
  'verbose': 0               # Not read by any code visible in this file
}

# Constants
#
control_tag = 32            # Tag used to identify control information
control_data_max_size = 256 # Maximal size of string holding control data
66
67
#------------------------------------------------------------------------
# MPI Status block (can be queried by user after each receive)
#------------------------------------------------------------------------

class Status:
  """Holds source, tag, error and length of the most recent receive.

     The receive wrappers in this module call set_values() on the shared
     module-level instance 'status' after a successful receive.
  """

  def __init__(self):
    # Define every attribute up front so that the block can be inspected
    # before the first receive without raising AttributeError.
    self.source = None
    self.tag = None
    self.error = None
    self.length = None

  def set_values(self, status_tuple):
    """Store items 0..3 of status_tuple as source, tag, error, length."""
    self.source = status_tuple[0]
    self.tag = status_tuple[1]
    self.error = status_tuple[2]
    self.length = status_tuple[3]

status = Status() #Initialise status object
83
84#---------------------------------------------------------------------------
85# Communication functions
86#--------------------------------------------------------------------------
87
88
def raw_send(x, destination, tag=0, vanilla=0):
  """Wrapper for raw MPI send.
     Send x to destination with tag.

     The protocol is chosen by get_control_info(x, vanilla) and the
     matching function is called: send_array for 'array', send_string
     for 'string' and send_vanilla (pickle based) for everything else.

     The variable x can be any (picklable) type, but
     Numeric variables and text strings will be most efficient.
     Setting vanilla = 1 forces vanilla mode for any type.

     No control message is sent; the receiver is expected to use
     raw_receive with a buffer of matching type and size.
  """

  protocol = get_control_info(x, vanilla)[0]
  if protocol == 'array':
    send_array(x, destination, tag)
  elif protocol == 'string':
    send_string(x, destination, tag)
  else:
    send_vanilla(x, destination, tag)
109
110     
def raw_receive(x, source, tag=0, vanilla=0):
  """Wrapper for raw MPI receive.
     Receive something of same size as x from source with tag.

     The protocol is chosen by get_control_info(x, vanilla) and the
     matching receive function is called.

     The variable x can be any (picklable) type, but
     Numeric variables and text strings will be most efficient.
     Setting vanilla = 1 forces vanilla mode for any type.

     Returns x for the 'array' and 'string' protocols (the buffer passed
     in is presumably filled in place by the extension) and the object
     returned by receive_vanilla otherwise.

     Raises RuntimeError if receive_array or receive_string reports a
     non-zero error code.  On success the module-level status block is
     updated for the 'array' and 'string' protocols.
  """

  protocol = get_control_info(x, vanilla)[0]
  if protocol == 'array':
    err, stat = receive_array(x, source, tag)
    if err:
      # String exceptions are not raisable in Python >= 2.6; use a class.
      raise RuntimeError('receive_array failed with error code %d' % err)
    status.set_values(stat)
  elif protocol == 'string':
    err, stat = receive_string(x, source, tag)
    if err:
      raise RuntimeError('receive_string failed with error code %d' % err)
    status.set_values(stat)
  else:
    x = receive_vanilla(x, source, tag)

  return x
141
142
def send(x, destination, tag=0, vanilla=0):
  """Wrapper for easy MPI send.
     Send x to destination with tag.

     The protocol is chosen by get_control_info(x, vanilla).  A control
     message holding protocol, typecode and size is sent first (via
     send_control_info) so that receive() can allocate a suitable buffer;
     the data itself follows with the given tag.

     The variable x can be any (picklable) type, but
     Numeric variables and text strings will be most efficient.
     Setting vanilla = 1 forces vanilla mode for any type.

     Raises ValueError if get_control_info yields an unknown protocol.
  """

  control_info = get_control_info(x, vanilla)
  protocol = control_info[0]

  if protocol == 'array':
    send_control_info(control_info, destination)
    send_array(x, destination, tag)
  elif protocol == 'string':
    send_control_info(control_info, destination)
    send_string(x, destination, tag)
  elif protocol == 'vanilla':
    from cPickle import dumps

    pickled = dumps(x, 1)
    # The size advertised to the receiver is the length of the pickle,
    # not of x itself.
    control_info[2] = str(len(pickled))

    send_control_info(control_info, destination)
    send_string(pickled, destination, tag)
  else:
    # String exceptions are not raisable in Python >= 2.6; use a class.
    raise ValueError("Unknown values for protocol: %s" % protocol)
180
181     
def receive(source, tag=0):
  """Wrapper for easy MPI receive.
     Receive data from source with tag.

     Assumes a preceding control message containing protocol, typecode
     and size (as sent by send()).  A buffer of that size is created,
     the data is received into it and the result is returned:
       'array'   -- a Numeric array of the announced size and typecode
       'string'  -- a string of the announced length
       'vanilla' -- the object unpickled from the received string

     On success the module-level status block is updated.
     Raises RuntimeError if the underlying receive reports a non-zero
     error code and ValueError for an unknown protocol.
  """

  control_info = receive_control_info(source)

  protocol = control_info[0]
  typecode = control_info[1]
  count = control_info[2]    # Not called 'size': that is a module function

  if protocol == 'array':
    import Numeric
    x = Numeric.zeros(count, typecode)
    err, stat = receive_array(x, source, tag)
    if err:
      raise RuntimeError('receive_array failed with error code %d' % err)
    status.set_values(stat)
  elif protocol == 'string':
    x = ' '*count
    err, stat = receive_string(x, source, tag)
    if err:
      raise RuntimeError('receive_string failed with error code %d' % err)
    status.set_values(stat)
  elif protocol == 'vanilla':
    from cPickle import loads
    s = ' '*count
    err, stat = receive_string(s, source, tag)
    if err:
      raise RuntimeError('receive_string failed with error code %d' % err)
    status.set_values(stat)

    # NOTE(review): unpickling executes whatever the sending process put
    # in the stream - only use between mutually trusted processes.
    x = loads(s)
  else:
    raise ValueError("Unknown values for protocol: %s" % protocol)

  return x
225
def bcast(x, source, vanilla=0):
  """Wrapper for MPI bcast.
     Broadcast x from source.

     The protocol is chosen by get_control_info(x, vanilla) and the
     matching broadcast function (bcast_array or bcast_string) is called
     with x as the buffer.

     In vanilla mode x is pickled, the pickle is padded by 10% of its
     length, broadcast as a string, and the return value is unpickled
     from that same buffer.

     The variable x can be any (picklable) type, but
     Numeric variables and text strings will be most efficient.
     Setting vanilla = 1 forces vanilla mode for any type.

     Returns x (or the unpickled object in vanilla mode).
     Raises ValueError for an unknown protocol.
  """

  protocol = get_control_info(x, vanilla)[0]
  if protocol == 'array':
    bcast_array(x, source)
  elif protocol == 'string':
    bcast_string(x, source)
  elif protocol == 'vanilla':
    from cPickle import loads, dumps
    s = dumps(x, 1)
    s = s + ' '*int(0.1*len(s)) #safety
    bcast_string(s, source)
    x = loads(s)
  else:
    # String exceptions are not raisable in Python >= 2.6; use a class.
    raise ValueError("Unknown values for protocol: %s" % protocol)

  return x
254
def raw_scatter(s, nums, d, source, vanilla=0):
  """Wrapper for MPI scatter.
     Scatter the first nums elements in s to processor d
     (of the same nums size) from source.

     The protocol is chosen by get_control_info(s, vanilla);
     scatter_array or scatter_string is then called with s, nums, d
     and source.

     Only Numeric arrays and text strings are supported.  Vanilla mode
     (including vanilla = 1) raises ValueError.

     Returns d.
  """

  protocol = get_control_info(s, vanilla)[0]
  if protocol == 'array':
    scatter_array(s, nums, d, source)
  elif protocol == 'string':
    scatter_string(s, nums, d, source)
  elif protocol == 'vanilla':
    # String exceptions are not raisable in Python >= 2.6; use a class.
    raise ValueError("Protocol: %s unsupported for scatter" % protocol)
  else:
    raise ValueError("Unknown values for protocol: %s" % protocol)

  return d
280
281
def scatter(s, nums, source, vanilla=0):
  """Wrapper for easy MPI scatter.

     Allocates a receive buffer of nums elements (a Numeric array with
     the typecode of s, or a string of nums blanks), passes it together
     with s, nums and source to scatter_array / scatter_string and
     returns the buffer.

     The keyword argument vanilla is accepted but not consulted: the
     protocol is always derived from the type of s.

     Raises ValueError if s is neither a supported Numeric array nor a
     string.
  """

  control_info = get_control_info(s)

  protocol = control_info[0]
  typecode = control_info[1]
  count = nums

  if protocol == 'array':
    import Numeric
    x = Numeric.zeros(count, typecode)
    scatter_array(s, count, x, source)
  elif protocol == 'string':
    x = ' '*count
    scatter_string(s, count, x, source)
  elif protocol == 'vanilla':
    # String exceptions are not raisable in Python >= 2.6; use a class.
    raise ValueError("Protocol: %s unsupported for scatter" % protocol)
  else:
    raise ValueError("Unknown values for protocol: %s" % protocol)

  return x
308
def raw_gather(s, nums, d, source, vanilla=0):
  """Wrapper for MPI gather.
     Gather first nums elements in s to d (of the same size) from source.

     The protocol is chosen by get_control_info(s, vanilla);
     gather_array or gather_string is then called with s, nums, d
     and source.

     Only Numeric arrays and text strings are supported.  Vanilla mode
     (including vanilla = 1) raises ValueError.

     Returns d.
  """

  protocol = get_control_info(s, vanilla)[0]
  if protocol == 'array':
    gather_array(s, nums, d, source)
  elif protocol == 'string':
    gather_string(s, nums, d, source)
  elif protocol == 'vanilla':
    # String exceptions are not raisable in Python >= 2.6; use a class.
    raise ValueError("Protocol: %s unsupported for gather" % protocol)
  else:
    raise ValueError("Unknown values for protocol: %s" % protocol)

  return d
333
334
def gather(s, nums, source, vanilla=0):
  """Wrapper for easy MPI gather.

     Allocates a result buffer of nums*size() elements (a Numeric array
     with the typecode of s, or a string of blanks), passes it together
     with s, nums and source to gather_array / gather_string and returns
     the buffer.

     The keyword argument vanilla is accepted but not consulted: the
     protocol is always derived from the type of s.

     Raises ValueError if s is neither a supported Numeric array nor a
     string.
  """

  control_info = get_control_info(s)

  protocol = control_info[0]
  typecode = control_info[1]
  s_size = nums

  if protocol == 'array':
    import Numeric
    x = Numeric.zeros(s_size * size(), typecode)
    gather_array(s, s_size, x, source)
  elif protocol == 'string':
    x = ' '*s_size*size()
    gather_string(s, s_size, x, source)
  elif protocol == 'vanilla':
    # String exceptions are not raisable in Python >= 2.6; use a class.
    raise ValueError("Protocol: %s unsupported for gather" % protocol)
  else:
    raise ValueError("Unknown values for protocol: %s" % protocol)

  return x
361
362
363
def raw_reduce(s, d, nums, op, source, vanilla=0):
  """Wrapper for MPI_Reduce.
     Reduce nums elements in s to d (of the same size) at source
     applying operation op.

     Only the 'array' protocol is supported and s and d must have the
     same typecode.

     Returns d.
     Raises ValueError if the typecodes differ, if s is a string or
     needs vanilla mode, or if the protocol is unknown.
  """

  protocol = get_control_info(s, vanilla)[0]
  if protocol == 'array':
    if s.typecode() != d.typecode():
      # String exceptions are not raisable in Python >= 2.6; use a class.
      raise ValueError("Input array and buffer must have the same typecode")
    reduce_array(s, d, nums, op, source)
  elif protocol in ('vanilla', 'string'):
    raise ValueError("Protocol: %s unsupported for reduce" % protocol)
  else:
    raise ValueError("Unknown values for protocol: %s" % protocol)

  return d
385
386
def reduce(s, nums, op, source, vanilla=0):
  """Wrapper for easy MPI reduce.

     Allocates a Numeric result buffer with the typecode of s, passes it
     together with s, nums, op and source to reduce_array and returns
     the buffer.

     The keyword argument vanilla is accepted but not consulted: the
     protocol is always derived from the type of s.

     Raises ValueError if s is not a supported Numeric array.

     Note: inside this module the name shadows the Python 2 builtin
     reduce().
  """

  control_info = get_control_info(s)

  protocol = control_info[0]
  typecode = control_info[1]
  s_size = nums

  if protocol == 'array':
    import Numeric
    # NOTE(review): the buffer holds nums*size() elements exactly as in
    # gather(); a reduction presumably only needs nums - confirm before
    # changing, since callers may rely on the returned length.
    x = Numeric.zeros(s_size * size(), typecode)
    reduce_array(s, x, s_size, op, source)
  elif protocol in ('vanilla', 'string'):
    # String exceptions are not raisable in Python >= 2.6; use a class.
    raise ValueError("Protocol: %s unsupported for reduce" % protocol)
  else:
    raise ValueError("Unknown values for protocol: %s" % protocol)

  return x
410
411
412     
413
414#---------------------------------------------------------
415# AUXILIARY FUNCTIONS
416#---------------------------------------------------------
def get_control_info(x, vanilla=0):
  """Determine which protocol to use for communication:
     (Numeric) arrays, strings, or vanilla based on x's type.

     There are three protocols:
     'array':   Numeric arrays of type 'i', 'l', 'f', or 'd' can be communicated
                with mpiext.send_array and mpiext.receive_array.
     'string':  Text strings can be communicated with mpiext.send_string and
                mpiext.receive_string.
     'vanilla': All other types can be communicated using the scripts send_vanilla and
                receive_vanilla provided that the objects can be serialised using
                pickle (or cPickle). The latter mode is less efficient than the
                first two but it can handle complex structures.

     Rules:
     If keyword argument vanilla == 1, vanilla is chosen regardless of
     x's type.
     Otherwise if x is a string, the string protocol is chosen
     If x is an array, the 'array' protocol is chosen provided that x has one
     of the admissible typecodes.

     Returns the list [protocol, typecode, size] where all three items
     are strings; size is str(len(x)) for the 'string' and 'array'
     protocols and '0' for 'vanilla'.
  """

  protocol = 'vanilla'
  typecode = ' '
  size = '0'

  if not vanilla:
    # A real type test: comparing the first three letters of the type
    # name would also accept unrelated types whose name begins with 'str'.
    if isinstance(x, str):
      protocol = 'string'
      typecode = 'c'
      size = len(x)
    elif type(x).__name__ == 'array':
      try:
        import Numeric
      except ImportError:
        print("WARNING (pypar.py): Numeric module could not be imported, "
              "reverting to vanilla mode")
      else:
        try:
          code = x.typecode()
        except (AttributeError, TypeError):
          # An 'array' that is not a Numeric array (no callable typecode).
          print("WARNING (pypar.py): array object has no typecode() method, "
                "reverting to vanilla mode")
        else:
          typecode = code
          if code in ('i', 'l', 'f', 'd'):
            protocol = 'array'
            size = len(x)
          else:
            print("WARNING (pypar.py): Numeric object type %s is not supported."
                  % code)
            print("Only types 'i', 'l', 'f', 'd' are supported, "
                  "Reverting to vanilla mode.")

  control_info = [protocol, typecode, str(size)]

  return control_info
473
474
475
476#----------------------------------------------
477
def send_control_info(control_info, destination):
  """Send control info to destination.

     control_info -- sequence of strings (protocol, typecode, size); the
                     items are joined with ',' and sent as one string
                     with tag control_tag.
  """

  # str.join replaces string.join, which no longer exists in Python 3.
  msg = ','.join(control_info)
  send_string(msg, destination, control_tag)
485
486 
def receive_control_info(source):
  """Receive control info from source.

     Receives a string of at most control_data_max_size characters with
     tag control_tag and splits it at ','.

     Returns the list [protocol, typecode, size] with size converted to
     an integer.

     Raises Exception if receive_string reports a non-zero error code
     and AssertionError if the message does not have three fields.
  """

  msg = ' '*control_data_max_size
  err, stat = receive_string(msg, source, control_tag)
  if err:
    raise Exception('receive_string failed with error code %d' % err)
  #NB: Do not set status block here

  control_info = msg.split(',')
  # Explicit raise rather than an assert statement so that the check
  # survives python -O; the exception type is unchanged.
  if len(control_info) != 3:
    raise AssertionError('Malformed control message: expected 3 fields, got %d'
                         % len(control_info))
  control_info[2] = int(control_info[2])

  return control_info
503
504
505
506
507#
508# Used only by raw communication
509#
def send_vanilla(x, destination, tag=0):
  """Pickle x and send the pickle as a string to destination with tag.

     Returns the length of the pickled string.
  """
  from cPickle import dumps
  # Keep the extension's own name: aliasing it to 'send' would shadow this
  # module's public send() inside the function.
  from mpiext import send_string

  pickled = dumps(x, 1)
  send_string(pickled, destination, tag)
  return len(pickled)
517
518
def receive_vanilla(x, source, tag=0):
  """Receive a pickled object from source with tag and return it.

     x is only a template used to size the receive buffer: unless
     options['vanilla_bufsize'] is set, the buffer is as long as the
     pickle of x plus 10%, so x must be similar to the object sent.

     On success the module-level status block is updated.
     Raises RuntimeError if receive_string reports a non-zero error code.
  """
  from cPickle import loads, dumps
  from mpiext import receive_string

  #Create buffer of the right size
  #(assuming that x is similar to sent x).

  if options['vanilla_bufsize']:
    buf = ' '*options['vanilla_bufsize']
  else:
    buf = dumps(x, 1)
    buf = buf + ' '*int(0.1*len(buf)) #safety

  # Check the error code and record the status, as the array and string
  # receive paths do, instead of unpickling a buffer that may not have
  # been filled.
  err, stat = receive_string(buf, source, tag)
  if err:
    raise RuntimeError('receive_string failed with error code %d' % err)
  status.set_values(stat)

  # NOTE(review): unpickling executes whatever the sending process put in
  # the stream - only use between mutually trusted processes.
  return loads(buf)
536   
537
538
539#----------------------------------------------------------------------------
540# Initialise module
541#----------------------------------------------------------------------------
542
# Take care of situation where module is part of package
import os
import os.path
import string

# Turn the dotted module name into a relative path; its directory part
# (empty for a top-level module) is only used in the error message below.
dirname = os.path.dirname(__name__.replace('.', os.sep))
if dirname and dirname[-1] != os.sep:
  dirname += os.sep

extension = dirname+'mpiext'

# Import MPI extension
#
# Verify existence of mpiext.so.

try:
  import mpiext
except ImportError:
  errmsg = 'ERROR: compiled C extension %s.c not accessible:\n' %extension
  errmsg += 'Please compile it e.g. by using python setup.py install\n'
  # ImportError is a subclass of Exception, so handlers written for the
  # former 'raise Exception, errmsg' still match.
  raise ImportError(errmsg)
561 
562
# Determine if MPI program is allowed to run sequentially on current platform
# Attempting to check this automatically may cause some systems to hang.
#
# sys.platform is 'linux2' on older interpreters but plain 'linux' on
# Python >= 3.3, hence the prefix test.
if sys.platform.startswith('linux') or sys.platform in ['sunos5', 'win32']:
  # Linux (LAM,MPICH) or Sun (MPICH) or Win32 (MPICH)
  error = 0  #Sequential execution of MPI is allowed
else:
  cmdstring = '"import mpiext; import sys; mpiext.init(sys.argv)"'
  error = os.system('python -c %s >/dev/null 2>/dev/null' %cmdstring)
  # The check is performed in a separate shell.
  # Reason: The Alpha server or the Sun cannot recover from a
  # try:
  #   mpiext.init(sys.argv)

  # On LAM/Linux, this test causes system to hang.
577
578
579# Initialise MPI
580#
581# Attempt to initialise mpiext.so
582# If this fails, define a rudimentary interface suitable for
583# sequential execution.
584
if error:
  print("WARNING: MPI library could not be initialised - running sequentially")

  # Define rudimentary functions to keep sequential programs happy.
  # Only the functions below are provided; the communication primitives
  # imported from mpiext in the parallel branch are not defined here.

  def size():
    """Number of processors - always 1 when running sequentially."""
    return 1

  def rank():
    """Id of current processor - always 0 when running sequentially."""
    return 0

  def Get_processor_name():
    """Return $HOST, else $HOSTNAME, else the string 'Unknown'."""
    import os
    return os.environ.get('HOST', os.environ.get('HOSTNAME', 'Unknown'))

  def Abort():
    """Terminate the (single) process via sys.exit()."""
    import sys
    sys.exit()

  def Finalize():
    """No-op: there is no MPI state to clean up."""
    pass

  def Barrier():
    """No-op: there are no other processes to wait for."""
    pass

  def Wtime():
    """Wall clock time as returned by time.time()."""
    import time
    return time.time()

else:

  from mpiext import size, rank, Barrier, Wtime, Get_processor_name,\
                  init, Finalize, Abort, send_string, receive_string,\
                  send_array, receive_array, bcast_string, bcast_array,\
                  scatter_string, scatter_array,\
                  gather_string, gather_array,\
                  reduce_array,\
                  MPI_ANY_TAG as any_tag, MPI_ANY_SOURCE as any_source,\
                  MAX, MIN, SUM, PROD, LAND, BAND,\
                  LOR, BOR, LXOR, BXOR

  init(sys.argv) #Initialise MPI with cmd line (needed by MPICH/Linux)

  if rank() == 0:
    print("MPI initialised OK with %d processors" % size())
634
635
636
637
638
639
Note: See TracBrowser for help on using the repository browser.