source: pypar/contrib/pypar.py @ 900

Last change on this file since 900 was 85, checked in by ole, 19 years ago

Added pypar files

File size: 19.2 KB
Line 
1# =============================================================================
2# pypar.py - Parallel Python using MPI
3# Copyright (C) 2001, 2002 Ole M. Nielsen, Gian Paolo Ciceri         
4#
5#    This program is free software; you can redistribute it and/or modify
6#    it under the terms of the GNU General Public License as published by
7#    the Free Software Foundation; either version 2 of the License, or
8#    (at your option) any later version.
9#
10#    This program is distributed in the hope that it will be useful,
11#    but WITHOUT ANY WARRANTY; without even the implied warranty of
12#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13#    GNU General Public License (http://www.gnu.org/copyleft/gpl.html)
14#    for more details.
15#
16#    You should have received a copy of the GNU General Public License
17#    along with this program; if not, write to the Free Software
18#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
19#
20#
21# Contact addresses: Ole.Nielsen@anu.edu.au, gp.ciceri@acm.org         
22#
23# version 1.2.1, 16 February 2002                                     
24#   Status block, MPI_ANY_TAG, MPI_ANY_SOURCE exported                 
25# Version 1.2, 15 February 2002                                           
26#   Scatter added by Gian Paolo Ciceri                                   
27# Version 1.1, 14 February 2002                                           
28#   Bcast added by Gian Paolo Ciceri                                   
29# Version 1.0.2, 10 February 2002                                         
30#   Modified by Gian Paulo Ciceri to allow pypar run under Python 2.2 
31# Version 1.0.1, 8 February 2002                                       
32#   Modified to install on SUN enterprise systems under Mpich         
33# Version 1.0, 7 February 2002                                         
34#   First public release for Python 2.1 (OMN)                         
35# =============================================================================
36
37"""Module pypar.py - Parallel Python using MPI
38
39Public functions:
40
41size() -- Number of processors
42rank() -- Id of current processor
43Get_processor_name() -- Return host name of current node
44
45send() -- Blocking send (all types)
46receive() -- Blocking receive (all types)
47raw_send() -- Blocking send (Numeric arrays and strings)
48raw_receive() -- Blocking receive (Numeric arrays and strings)
49bcast() -- Broadcast
50Wtime() -- MPI wall time
51Barrier() -- Synchronisation point. Makes processors wait until all processors
52             have reached this point.
53Abort() -- Terminate all processes.
54Finalize() -- Cleanup MPI. No parallelism can take place after this point.
55
56
57See doc strings of individual functions for detailed documentation.
58"""
59
60import os, sys
61
62# -----------------------------------------------------------------------------
63# Options directory with default values - to be set by user
64#
65
66options = { 
67  'vanilla_bufsize': None   # Buffersize (chars) used for vanilla sends
68}
69
70# Constants
71#
72control_tag = 32            # Tag used to identify control information
73control_data_max_size = 256 # Maximal size of string holding control data
74
75
76#------------------------------------------------------------------------
77# MPI Status block (can be queried by user after each receive
78#------------------------------------------------------------------------
79
80class Status:
81  def __init__(self):
82    pass
83 
84  def set_values(self, status_tuple):
85    self.source = status_tuple[0] 
86    self.tag = status_tuple[1] 
87    self.error = status_tuple[2] 
88    self.length = status_tuple[3]
89   
90status = Status() #Initialise status object     
91
92#---------------------------------------------------------------------------
93# Communication functions
94#--------------------------------------------------------------------------
95
96
97def raw_send(x, destination, tag=0, vanilla=0):
98  """Wrapper for raw MPI send.
99     Send x to destination with tag.
100     
101     Automatically determine appropriate protocol
102     and call corresponding send function.
103     
104     The variable x can be any (picklable) type, but
105     Numeric variables and text strings will most efficient.
106     Setting vanilla = 1 forces vanilla mode for any type.
107
108  """
109
110  protocol = get_control_info(x, vanilla)[0]
111  if protocol == 'array':
112    send_array(x, destination, tag) 
113  elif protocol == 'string':
114    send_string(x, destination, tag)           
115  else: 
116    send_vanilla(x, destination, tag)
117
118     
119def raw_receive(x, source, tag=0, vanilla=0):
120  """Wrapper for raw MPI receive.
121     Receive something of same size as x from source with tag.
122     
123     Automatically determine appropriate protocol
124     and call corresponding receive function.
125     
126     The variable x can be any (picklable) type, but
127     Numeric variables and text strings will most efficient.
128     Setting vanilla = 1 forces vanilla mode for any type.
129  """
130
131 
132  protocol = get_control_info(x, vanilla)[0]
133  if protocol == 'array':
134    err, stat = receive_array(x, source, tag) 
135    if not err:
136      status.set_values(stat)
137    else:
138      raise 'receive_array failed with error code %d' %err 
139  elif protocol == 'string':
140    err, stat = receive_string(x, source, tag)
141    if not err:
142      status.set_values(stat)
143    else:
144      raise 'receive_string failed with error code %d' %err 
145  else: 
146    x = receive_vanilla(x, source, tag)
147
148  return x
149
150
151def send(x, destination, tag=0, vanilla=0):
152  """Wrapper for easy MPI send.
153     Send x to destination with tag.
154     
155     Automatically determine appropriate protocol
156     and call corresponding send function.
157     Also passes type and size information on as preceding message to
158     simplify the receive call.
159     
160     The variable x can be any (picklable) type, but
161     Numeric variables and text strings will most efficient.
162     Setting vanilla = 1 forces vanilla mode for any type.
163
164  """
165  import string
166
167  control_info = get_control_info(x, vanilla)
168  protocol = control_info[0]
169 
170  if protocol == 'array':
171    send_control_info(control_info, destination)
172   
173    send_array(x, destination, tag)   
174  elif protocol == 'string':
175    send_control_info(control_info, destination)   
176   
177    send_string(x, destination, tag)         
178  elif protocol == 'vanilla':
179    from cPickle import dumps     
180    s = dumps(x, 1)
181    control_info[2] = str(len(s))
182
183    send_control_info(control_info, destination)   
184   
185    send_string(s, destination, tag)
186  else:
187    raise "Unknown values for protocol: %s" %protocol   
188
189     
190def receive(source, tag=0):
191  """Wrapper for easy MPI receive.
192     Receive data from source with tag.
193     
194     Assumes preceding message containing protocol, type, size.
195     Create appropriate buffer and receive data.
196  """
197
198  control_info = receive_control_info(source)
199 
200  protocol = control_info[0]
201  typecode = control_info[1]
202  size =     control_info[2] 
203 
204  if protocol == 'array':
205    import Numeric
206    x = Numeric.zeros(size,typecode)
207    err, stat = receive_array(x, source, tag)   
208    if not err:
209      status.set_values(stat)
210    else:
211      raise 'receive_array failed with error code %d' %err 
212  elif protocol == 'string':
213    x = ' '*size
214    err, stat = receive_string(x, source, tag)       
215    if not err:
216      status.set_values(stat)
217    else:
218      raise 'receive_string failed with error code %d' %err 
219  elif protocol == 'vanilla':
220    from cPickle import loads
221    s = ' '*size   
222    err, stat = receive_string(s, source, tag)
223    if not err:
224      status.set_values(stat)
225    else:
226      raise 'receive_string failed with error code %d' %err 
227   
228    x = loads(s)
229  else:
230    raise "Unknown values for protocol: %s" %protocol
231     
232  return x 
233
234def bcast(x, source, vanilla=0):
235  """Wrapper for MPI bcast.
236     Broadcast x from source.
237     
238     Automatically determine appropriate protocol
239     and call corresponding send function.
240     
241     The variable x can be any (picklable) type, but
242     Numeric variables and text strings will most efficient.
243     Setting vanilla = 1 forces vanilla mode for any type.
244
245  """
246
247  protocol = get_control_info(x, vanilla)[0]
248  if protocol == 'array':
249    bcast_array(x, source)   
250  elif protocol == 'string':
251    bcast_string(x, source)         
252  elif protocol == 'vanilla':
253    from cPickle import loads, dumps
254    s = dumps(x, 1)
255    s = s + ' '*int(0.1*len(s)) #safety
256    bcast_string(s, source)
257    x = loads(s)
258  else:
259    raise "Unknown values for protocol: %s" %protocol 
260   
261  return x         
262
263def raw_scatter(s, nums, d, source, vanilla=0):
264  """Wrapper for MPI scatter.
265     Scatter s in nums element to d (of the same nums size) from source.
266     
267     Automatically determine appropriate protocol
268     and call corresponding send function.
269     
270     The variable s can be any (picklable) type, but
271     Numeric variables and text strings will most efficient.
272     Setting vanilla = 1 forces vanilla mode for any type.
273
274  """
275
276  protocol = get_control_info(s, vanilla)[0]
277  if protocol == 'array':
278    scatter_array(s, nums, d, source)   
279  elif protocol == 'string':
280    scatter_string(s, nums, d, source)         
281  elif protocol == 'vanilla':
282    raise "Protocol: %s unsupported for scatter" %protocol
283  else:
284    raise "Unknown values for protocol: %s" %protocol 
285   
286  return d         
287
288
289def scatter(s, nums, source, vanilla=0):
290  """Wrapper for easy MPI Scatter receive.
291     Receive data from source with tag.
292     
293     Create appropriate buffer and receive data.
294  """
295
296  control_info = get_control_info(s)
297 
298  protocol = control_info[0]
299  typecode = control_info[1]
300  size =  nums   
301 
302  if protocol == 'array':
303    import Numeric
304    x = Numeric.zeros(size,typecode)
305    scatter_array(s, size, x, source)   
306  elif protocol == 'string':
307    x = ' '*size
308    scatter_string(s, size, x, source)         
309  elif protocol == 'vanilla':
310    raise "Protocol: %s unsupported for scatter" %protocol
311  else:
312    raise "Unknown values for protocol: %s" %protocol
313     
314  return x 
315
316def raw_gather(s, nums, d, source, vanilla=0):
317  """Wrapper for MPI gather.
318     Gather s in nums element to d (of the same nums size) from source.
319     
320     Automatically determine appropriate protocol
321     and call corresponding send function.
322     
323     The variable s can be any (picklable) type, but
324     Numeric variables and text strings will most efficient.
325     Setting vanilla = 1 forces vanilla mode for any type.
326
327  """
328
329  protocol = get_control_info(s, vanilla)[0]
330  if protocol == 'array':
331    gather_array(s, nums, d, source)   
332  elif protocol == 'string':
333    gather_string(s, nums, d, source)         
334  elif protocol == 'vanilla':
335    raise "Protocol: %s unsupported for gather" %protocol
336  else:
337    raise "Unknown values for protocol: %s" %protocol 
338   
339  return d         
340
341
342def gather(s, nums, source, vanilla=0):
343  """Wrapper for easy MPI Gather receive.
344     Receive data from source with tag.
345     
346     Create appropriate buffer and receive data.
347  """
348
349  control_info = get_control_info(s)
350 
351  protocol = control_info[0]
352  typecode = control_info[1]
353  s_size =  nums   
354 
355  if protocol == 'array':
356    import Numeric
357    x = Numeric.zeros(s_size * size(),typecode)
358    gather_array(s, s_size, x, source)   
359  elif protocol == 'string':
360    x = ' '*s_size*size()
361    gather_string(s, s_size, x, source)         
362  elif protocol == 'vanilla':
363    raise "Protocol: %s unsupported for gather" %protocol
364  else:
365    raise "Unknown values for protocol: %s" %protocol
366     
367  return x 
368
369
370def raw_reduce(s, d, nums, op, source, vanilla=0):
371  """Wrapper for MPI_Reduce.
372     Reduce s in nums element to d (of the same nums size) at source
373     applying operation op.
374     
375     Automatically determine appropriate protocol
376     and call corresponding send function.
377   
378  """
379
380  protocol = get_control_info(s, vanilla)[0]
381  if protocol == 'array':
382    reduce_array(s, d, nums, op, source)         
383  elif (protocol == 'vanilla' or protocol == 'string'):
384    raise "Protocol: %s unsupported for reduce" %protocol
385  else:
386    raise "Unknown values for protocol: %s" %protocol 
387   
388  return d         
389
390
391def reduce(s, nums, op, source, vanilla=0):
392  """Wrapper for easy MPI Gather receive.
393     Receive data from source with tag.
394     
395     Create appropriate buffer and receive data.
396  """
397
398  control_info = get_control_info(s)
399 
400  protocol = control_info[0]
401  typecode = control_info[1]
402  s_size =  nums   
403 
404  if protocol == 'array':
405    import Numeric
406    x = Numeric.zeros(s_size * size(),typecode)
407    reduce_array(s, x, s_size, op, source)   
408  elif (protocol == 'vanilla' or protocol == 'string'):
409    raise "Protocol: %s unsupported for reduce" %protocol
410  else:
411    raise "Unknown values for protocol: %s" %protocol
412     
413  return x 
414
415
416     
417
418#---------------------------------------------------------
419# AUXILIARY FUNCTIONS
420#---------------------------------------------------------
421def get_control_info(x, vanilla=0):
422  """Determine which protocol to use for communication:
423     (Numeric) arrays, strings, or vanilla based x's type.
424
425     There are three protocols:
426     'array':   Numeric arrays of type 'i', 'l', 'f', or 'd' can be communicated
427                with mpi.send_array and mpi.receive_array.
428     'string':  Text strings can be communicated with mpi.send_string and
429                mpi.receive_string.
430     'vanilla': All other types can be communicated using the scripts send_vanilla and
431                receive_vanilla provided that the objects can be serialised using
432                pickle (or cPickle). The latter mode is less efficient than the
433                first two but it can handle complex structures.
434
435     Rules:
436     If keyword argument vanilla == 1, vanilla is chosen regardless of
437     x's type.
438     Otherwise if x is a string, the string protocol is chosen
439     If x is an array, the 'array' protocol is chosen provided that x has one
440     of the admissible typecodes.
441  """
442
443  protocol = 'vanilla'
444  typecode = ' '
445  size = '0'
446 
447  if not vanilla:
448    #if type(x).__name__ == 'string':  # OK in Python 2.1 but not 2.2
449    if type(x).__name__[0:3] == 'str': # Fixed by Gian Paolo Ciceri 10/2/2   
450      protocol = 'string'
451      typecode = 'c'
452      size = len(x)
453    elif type(x).__name__ == 'array':
454      try:
455        import Numeric
456
457        typecode = x.typecode() 
458        if typecode in ['i', 'l', 'f', 'd']:
459          protocol = 'array'
460          size = len(x)
461        else:   
462          print "WARNING (pypar.py): Numeric object type %s is not supported."\
463                %(x.typecode())
464          print "Only types 'i', 'l', 'f', 'd' are supported,",
465          print "Reverting to vanilla mode."
466          protocol = 'vanilla'
467 
468      except:
469        print "WARNING (pypar.py): Numeric module could not be imported,",
470        print "reverting to vanilla mode"
471        protocol = 'vanilla'
472
473
474  control_info = [protocol, typecode, str(size)]
475
476  return control_info
477
478
479
480#----------------------------------------------
481
482def send_control_info(control_info, destination):
483  """Send control info to destination
484  """
485  import string
486 
487  msg = string.join(control_info,',')
488  send_string(msg, destination, control_tag)
489
490 
491def receive_control_info(source):
492  """Receive control info from source
493  """
494  import string
495 
496  msg = ' '*control_data_max_size
497  err, stat = receive_string(msg, source, control_tag)
498  if err:
499    raise Exception
500  #NB: Do not set status block here 
501
502  control_info = msg.split(',')
503  assert len(control_info) == 3
504  control_info[2] = int(control_info[2])
505
506  return control_info
507
508
509
510
511#
512# Used only by raw communication
513#
514def send_vanilla(x, destination, tag=0):
515  from cPickle import dumps
516  from mpi import send_string as send
517 
518  s=dumps(x, 1)
519  send(s, destination, tag)     
520  return len(s)
521
522
523def receive_vanilla(x, source, tag=0):
524  from cPickle import loads, dumps
525  from mpi import receive_string as receive
526
527
528  #Create buffer of the right size
529  #(assuming that x is similar to sent x).
530
531  if options['vanilla_bufsize']:
532    s = ' '*options['vanilla_bufsize']
533  else: 
534    s = dumps(x, 1)
535    s = s + ' '*int(0.1*len(s)) #safety
536                 
537  receive(s, source, tag)
538
539  return loads(s)
540   
541
542#-----------------------
543# ADM
544#-----------------------
545
546def check_C_extension(sequential_allowed):
547  """Verify existence of mpi.so.
548  """   
549 
550
551  try:
552    if sequential_allowed: 
553      import mpi
554      # This will crash on systems like the Alpha Server or the Sun
555      # if program is run sequentially     
556    else: 
557      # A more general test suitable for the Alpha Server is
558      fid = open('mpi.so', 'r')
559      fid.close()
560      # On the other hand, when using pypar as a package, we do
561      # not have access to mpi.so so we need link from parent directory to
562      # pypar/mpi.so or pypar/mpi.c
563  except:
564    try:
565      import compile
566      compile.compile('mpi.c', 'mpicc', verbose = 0)
567    except:
568      raise "ERROR: Please compile C extension mpi.c - python install.py"
569 
570
571
572#----------------------------------------------------------------------------
573# Initialise module
574#----------------------------------------------------------------------------
575
576# Determine if MPI program is allowed to run sequentially
577# Attempting to check this automatically may case some systems to hang.
578#
579if sys.platform in ['osf1V5', 'sunos5']:  #Compaq AlphaServer or Sun
580  sequential_allowed = 0
581else: 
582  sequential_allowed = 1
583
584#print sequential_allowed 
585
586# Attempt to compile mpi.so if necessary
587#
588
589check_C_extension(sequential_allowed) 
590
591
592#Check if MPI module can be initialised
593#
594# Attempt to import and initialise mpi.so
595# If this fails, define a rudimentary interface suitable for
596# sequential execution.
597#
598#   
599
600#
601
602
603if sys.platform == 'osf1V5':
604  error = os.system('python -c "import mpi" >/dev/null 2>/dev/null') 
605 
606  # On the sun we can't do this test - even in the parallel case...
607  #
608  #if sys.platform == 'osf1V5':
609  #  error = os.system('python -c "import mpi" >/dev/null 2>/dev/null')
610  #else:
611  #  error = 0 
612
613  # The check is performed in a separate shell.
614  # Reason: The Alpha server or the Sun cannot recover from a
615  # try:
616  #   import mpi
617  # if mpi.so isn't there or if the program is run as sequential.
618  # On LAM/Linux, this test may cause system to hang.
619else:
620  error = 0 
621
622
623if error:
624  print "WARNING: MPI library could not be initialised - running sequentially"
625  parallel = 0
626
627  # Define rudimentary functions to keep sequential programs happy
628 
629  def size(): return 1
630  def rank(): return 0
631
632  def Get_processor_name():
633    import os
634    try:
635      hostname = os.environ['HOST']
636    except:
637      try: 
638        hostname = os.environ['HOSTNAME'] 
639      except:
640        hostname = 'Unknown' 
641
642    return hostname
643     
644
645  def Abort():
646    import sys
647    sys.exit()
648
649  def Finalize(): pass
650 
651  def Barrier(): pass 
652
653  def Wtime():
654    import time
655    return time.time()
656else:   
657  #Import the C-extension and initialise MPI
658  #
659  parallel = 1
660  from mpi import size, rank, Barrier, Wtime, Get_processor_name, Finalize, Abort, \
661                  send_string, receive_string,\
662                  send_array, receive_array, \
663                  bcast_string, bcast_array, \
664                  scatter_string, scatter_array, \
665                  gather_string, gather_array, \
666                  reduce_array, \
667                  MPI_ANY_TAG as any_tag, MPI_ANY_SOURCE as any_source, \
668                  mpi_MAX, mpi_MIN, mpi_SUM, mpi_PROD, mpi_LAND, mpi_BAND, \
669                  mpi_LOR, mpi_BOR, mpi_LXOR, mpi_BXOR, mpi_MAXLOC, mpi_MINLOC, mpi_REPLACE
670
671                 
672  print "Proc %d: MPI initialised OK" %rank()   
673                 
674
675
676
677
678
Note: See TracBrowser for help on using the repository browser.