Changeset 123 for pypar/pypar.py


Ignore:
Timestamp:
Jul 11, 2005, 3:20:03 PM (19 years ago)
Author:
ole
Message:

Files for 64 bit machine + latest Cvs version

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pypar/pypar.py

    r85 r123  
    3131size() -- Number of processors
    3232rank() -- Id of current processor
    33 Get_processor_name() -- Return host name of current node
     33get_processor_name() -- Return host name of current node
    3434
    3535send() -- Blocking send (all types)
    3636receive() -- Blocking receive (all types)
    37 raw_send() -- Blocking send (Numeric arrays and strings)
    38 raw_receive() -- Blocking receive (Numeric arrays and strings)
    39 bcast() -- Broadcast
    40 Wtime() -- MPI wall time
    41 Barrier() -- Synchronisation point. Makes processors wait until all processors
     37broadcast() -- Broadcast
     38time() -- MPI wall time
     39barrier() -- Synchronisation point. Makes processors wait until all processors
    4240             have reached this point.
    43 Abort() -- Terminate all processes.
    44 Finalize() -- Cleanup MPI. No parallelism can take place after this point.
     41abort() -- Terminate all processes.
     42finalize() -- Cleanup MPI. No parallelism can take place after this point.
    4543
    4644
     
    4846"""
    4947
    50 __version__ = '1.8.2'
    51 __date__ = '16 November 2003'
     48# Meta data
     49__version__ = '1.9.1'
     50__date__ = '15 December 2003'
    5251__author__ = 'Ole M. Nielsen'
    5352
    5453
    55 
    56 # -----------------------------------------------------------------------------
    57 # Options directory with default values - to be set by user
    58 #
    59 
    60 options = {
    61   'vanilla_bufsize': None,   # Buffersize (chars) used for vanilla sends
    62   'verbose': 0
    63 }
    64 
    6554# Constants
    6655#
    67 max_tag = 32767      # Max tag value. MPI_TAG_UB returned 0 so I found this value (OK in Linux/LAM)
     56max_tag = 32767      # Max tag value (MPI_TAG_UB didn't work and returned 0)
    6857control_tag = 13849  # Reserved tag used to identify control information
    6958default_tag = 1      # Tag used as default if not specified
    7059
    71 
    72 control_sep = ':'           # Separator for fields in control info (NOT ',')
    73 control_data_max_size = 256 # Maximal size of string holding control data
    74 
    75 
    76 import os, sys
    77 
    78 #------------------------------------------------------------------------
    79 # MPI Status block (returned by receive and raw_receive if specified with
    80 # parameter return_status=True
    81 #------------------------------------------------------------------------
    82 
    83 class Status:
    84   def __init__(self, status_tuple):
    85     self.source = status_tuple[0]  #Id of sender
    86     self.tag = status_tuple[1]     #Tag of received message
    87     self.error = status_tuple[2]   #Error code
    88     self.length = status_tuple[3]  #Number of elements transmitted
    89     self.size = status_tuple[4]    #Size of one element
    90 
    91   def __repr__(self):
    92     return 'Pypar Status Object:\n  source=%d\n  tag=%d\n '+\
    93            'error=%d\n  length=%d\n  size=%d\n'\
    94            %(self.source, self.tag, self.error, self.length, self.size)
    95  
    96   def bytes(self):
    97     """Number of bytes transmitted (excl control info)
    98     """
    99     return self.length * self.size
    100  
     60control_sep = ':'          # Separator for fields in control info (NOT ',')
     61control_data_max_size = 64 # Maximal size of string holding control data
     62
    10163
    10264#---------------------------------------------------------------------------
     
    10466#--------------------------------------------------------------------------
    10567
    106 
    107 def raw_send(x, destination, tag=default_tag, vanilla=0):
    108   """Wrapper for raw MPI send.
    109      Send x to destination with tag.
    110      
    111      Automatically determine appropriate protocol
    112      and call corresponding send function.
    113      
    114      The variable x can be any (picklable) type, but
    115      Numeric variables and text strings will most efficient.
    116      Setting vanilla = 1 forces vanilla mode for any type.
    117 
    118   """
    119 
    120   assert tag != control_tag, 'Tag %d is reserved by pypar - please use another.' %control_tag
    121 
    122   protocol = get_control_info(x, vanilla)[0]
    123   if protocol == 'array':
    124     send_array(x, destination, tag) 
    125   elif protocol == 'string':
    126     send_string(x, destination, tag)           
    127   else:
    128     send_vanilla(x, destination, tag)
    129 
    130      
    131 def raw_receive(x, source, tag=default_tag, vanilla=0, return_status=0):
    132   """Wrapper for raw MPI receive.
    133      Receive something of same size as x from source with tag.
    134      
    135      Automatically determine appropriate protocol
    136      and call corresponding receive function.
    137      
    138      The variable x can be any (picklable) type, but
    139      Numeric variables and text strings will most efficient.
    140      Setting vanilla = 1 forces vanilla mode for any type.
    141   """
    142  
    143   assert tag != control_tag, 'Tag %d is reserved by pypar - please use another.' %control_tag
    144  
    145   protocol = get_control_info(x, vanilla)[0]
    146   if protocol == 'array':
    147     err, stat = receive_array(x, source, tag)
    148     if err:
    149       raise 'receive_array failed with error code %d' %err 
    150   elif protocol == 'string':
    151     err, stat = receive_string(x, source, tag)
    152     if err:
    153       raise 'receive_string failed with error code %d' %err
    154 
    155     x = x[:stat[3]] #Trim buffer to actual received length     
    156   else:
    157     x, stat = receive_vanilla(x, source, tag)
    158 
    159 
    160   if return_status:
    161     return x, Status(stat)
    162   else:
    163     return x
    164 
    165 
    166 def send(x, destination, tag=default_tag, vanilla=0):
    167   """Wrapper for easy MPI send.
    168      Send x to destination with tag.
    169      
    170      Automatically determine appropriate protocol
    171      and call corresponding send function.
    172      Also passes type and size information on as preceding message to
    173      simplify the receive call.
    174      
    175      The variable x can be any (picklable) type, but
    176      Numeric variables and text strings will most efficient.
    177      Setting vanilla = 1 forces vanilla mode for any type.
    178 
    179   """
    180   import string
    181 
    182   assert tag != control_tag, 'Tag %d is reserved by pypar - please use another.' %control_tag
    183  
    184   control_info = get_control_info(x, vanilla)
    185   protocol = control_info[0]
    186  
    187   if protocol == 'array':
    188     send_control_info(control_info, destination)
    189 
    190     send_array(x, destination, tag)   
    191   elif protocol == 'string':
    192     send_control_info(control_info, destination)   
    193    
    194     send_string(x, destination, tag)         
    195   elif protocol == 'vanilla':
    196     from cPickle import dumps     
    197     s = dumps(x, 1)
    198     control_info[2] = str(len(s))
    199 
    200     send_control_info(control_info, destination)   
    201    
    202     send_string(s, destination, tag)
    203   else:
    204     raise "Unknown values for protocol: %s" %protocol   
    205 
    206      
    207 def receive(source, tag=default_tag, return_status=0):
    208   """Wrapper for easy MPI receive.
    209      Receive data from source with tag.
    210      
    211      Assumes preceding message containing protocol, type, size.
    212      Create appropriate buffer and receive data.
    213   """
    214   import types
    215  
    216   assert type(source) == types.IntType, 'Source must be a scalar: %s' %source
    217   assert tag != control_tag, 'Tag %d is reserved by pypar - please use another.' %control_tag
    218  
    219 
    220   control_info = receive_control_info(source)
    221  
    222   protocol = control_info[0]
    223   typecode = control_info[1]
    224   size =     control_info[2]
    225   shape =    control_info[3]
    226 
    227  
    228   if protocol == 'array':
    229     import Numeric
    230     x = Numeric.zeros(size,typecode)
    231     x = Numeric.reshape(x, shape)
    232     err, stat = receive_array(x, source, tag)
    233     if err:
    234       raise 'receive_array failed with error code %d' %err 
    235   elif protocol == 'string':
    236     x = ' '*size
    237     err, stat = receive_string(x, source, tag)
    238     if err:
    239       raise 'receive_string failed with error code %d' %err
    240 
    241     x = x[:stat[3]] #Trim buffer to actual received length
    242   elif protocol == 'vanilla':
    243     from cPickle import loads
    244     s = ' '*size   
    245     err, stat = receive_string(s, source, tag)
    246     if err:
    247       raise 'receive_string failed with error code %d' %err 
    248    
    249     x = loads(s)
    250   else:
    251     raise "Unknown values for protocol: %s" %protocol
    252 
    253 
    254   if return_status:
    255     return x, Status(stat)
    256   else:
    257     return x
    258 
    259 
    260 def bcast(x, source, vanilla=0):
    261   """Wrapper for MPI bcast.
    262      Broadcast x from source.
    263      
    264      Automatically determine appropriate protocol
    265      and call corresponding send function.
    266      
    267      The variable x can be any (picklable) type, but
    268      Numeric variables and text strings will most efficient.
    269      Setting vanilla = 1 forces vanilla mode for any type.
    270 
    271   """
    272 
    273   protocol = get_control_info(x, vanilla)[0]
    274   if protocol == 'array':
    275     bcast_array(x, source)   
    276   elif protocol == 'string':
    277     bcast_string(x, source)         
    278   elif protocol == 'vanilla':
    279     from cPickle import loads, dumps
    280     s = dumps(x, 1)
    281     s = s + ' '*int(0.1*len(s)) #safety
    282     bcast_string(s, source)
    283     x = loads(s)
    284   else:
    285     raise "Unknown values for protocol: %s" %protocol 
    286    
    287   return x         
    288 
    289 def raw_scatter(x, buffer, source, vanilla=0):
    290   """Wrapper for MPI scatter.
    291      Scatter the first nums elements in x to buffer
    292      (of size given by nums) from source.
    293      
    294      Automatically determine appropriate protocol
    295      and call corresponding send function.
    296      
    297      The variable x can be any (picklable) type, but
    298      Numeric variables and text strings will most efficient.
    299      Setting vanilla = 1 forces vanilla mode for any type.
    300 
    301      Return buffer
    302   """
    303 
    304   protocol = get_control_info(x, vanilla)[0]
    305   if protocol == 'array':
    306     scatter_array(x, buffer, source)   
    307   elif protocol == 'string':
    308     scatter_string(x, buffer, source)         
    309   elif protocol == 'vanilla':
    310     raise "Protocol: %s unsupported for scatter" %protocol
    311   else:
    312     raise "Unknown values for protocol: %s" %protocol 
    313    
    314   return buffer 
    315 
    316 
    317 def scatter(x, source, vanilla=0):
    318   """Wrapper for easy MPI Scatter receive.
    319      Receive data from source with tag.
    320      
    321      Create appropriate buffer and receive data.
    322 
    323      Return scattered result (same type as x)
    324   """
    325 
    326   control_info = get_control_info(x)
    327  
    328   protocol = control_info[0]
    329   typecode = control_info[1]
    330   size =     control_info[2]
    331   shape =    control_info[3]
    332  
    333  
    334   if protocol == 'array':
    335     import Numeric
    336     buffer = Numeric.zeros(size, typecode)
    337     buffer = Numeric.reshape(buffer, shape)
    338    
    339     scatter_array(x, buffer, source)   
    340   elif protocol == 'string':
    341     buffer = ' '*size
    342     scatter_string(x, buffer, source)         
    343   elif protocol == 'vanilla':
    344     raise "Protocol: %s unsupported for scatter" %protocol
    345   else:
    346     raise "Unknown values for protocol: %s" %protocol
    347      
    348   return buffer 
    349 
    350 
    351 def raw_gather(x, buffer, source, vanilla=0):
    352   """Wrapper for MPI gather.
    353      Gather first nums elements in x to buffer (of size given by nums)
    354      from source.
    355      
    356      Automatically determine appropriate protocol
    357      and call corresponding send function.
    358      
    359      The variable x can be any (picklable) type, but
    360      Numeric variables and text strings will most efficient.
    361      Setting vanilla = 1 forces vanilla mode for any type.
    362 
    363   """
    364 
    365   protocol = get_control_info(x, vanilla)[0]
    366   if protocol == 'array':
    367     gather_array(x, buffer, source)   
    368   elif protocol == 'string':
    369     gather_string(x, buffer, source)         
    370   elif protocol == 'vanilla':
    371     raise "Protocol: %s unsupported for gather" %protocol
    372   else:
    373     raise "Unknown values for protocol: %s" %protocol 
    374    
    375   return buffer
    376 
    377 
    378 def gather(x, source, vanilla=0):
    379   """Wrapper for easy MPI Gather receive.
    380      Receive data from source with tag.
    381      
    382      Create appropriate buffer and receive data.
    383   """
    384   from mpiext import size
    385   numproc = size()
    386  
    387   control_info = get_control_info(x)
    388  
    389   protocol = control_info[0]
    390   typecode = control_info[1]
    391   size =     control_info[2]*numproc
    392   shape =    control_info[3]
    393 
    394   if protocol == 'array':
    395     import Numeric
    396     buffer = Numeric.zeros(size,typecode)
    397 
    398     # Modify shape along axis=0 to match size
    399     shape = list(shape)
    400     shape[0] *= numproc
    401    
    402     buffer = Numeric.reshape(buffer, shape)
    403    
    404     gather_array(x, buffer, source)   
    405   elif protocol == 'string':
    406     buffer = ' '*size
    407     gather_string(x, buffer, source)         
    408   elif protocol == 'vanilla':
    409     raise "Protocol: %s unsupported for gather" %protocol
    410   else:
    411     raise "Unknown values for protocol: %s" %protocol
    412      
    413   return buffer 
    414 
    415 
    416 
    417 def raw_reduce(x, buffer, op, source, vanilla=0):
    418   """Wrapper for MPI_Reduce.
    419      Reduce nums elements in x to buffer (of the same size as x)
    420      at source applying operation op elementwise.
    421      
    422      Automatically determine appropriate protocol
    423      and call corresponding send function.
    424    
    425   """
    426 
    427   protocol = get_control_info(x, vanilla)[0]
    428   if protocol == 'array':
    429     if not x.typecode() == buffer.typecode():
    430       raise "Input array and buffer must have the same typecode"
    431     reduce_array(x, buffer, op, source)         
    432   elif (protocol == 'vanilla' or protocol == 'string'):
    433     raise "Protocol: %s unsupported for reduce" %protocol
    434   else:
    435     raise "Unknown values for protocol: %s" %protocol 
    436    
    437   return buffer         
    438 
    439 
    440 def reduce(x, nums, op, source, vanilla=0):
    441   """Wrapper for easy MPI Gather receive.
    442      Receive data from source with tag.
    443      
    444      Create appropriate buffer and receive data.
    445   """
    446 
    447   control_info = get_control_info(x)
    448  
    449   protocol = control_info[0]
    450   typecode = control_info[1]
    451   s_size =  nums   
    452  
    453   if protocol == 'array':
    454     import Numeric
    455     buffer = Numeric.zeros(s_size * size(),typecode)
    456     reduce_array(x, buffer, s_size, op, source)   
    457   elif (protocol == 'vanilla' or protocol == 'string'):
    458     raise "Protocol: %s unsupported for reduce" %protocol
    459   else:
    460     raise "Unknown values for protocol: %s" %protocol
    461      
    462   return buffer 
    463 
    464 
    465      
     68def send(x, destination, use_buffer=False, vanilla=False,
     69         tag=default_tag, bypass=False):
     70    """Wrapper for easy MPI send.
     71       Send x to destination.
     72       
     73       Automatically determine appropriate protocol
     74       and call corresponding send function.
     75       Also passes type and size information on as preceding message to
     76       simplify the receive call.
     77       
     78       The variable x can be any (picklable) type, but
     79       Numeric variables and text strings will most efficient.
     80       Setting vanilla = 1 forces vanilla mode for any type.
     81
     82       If bypass is True, all admin and error checks
     83       get bypassed to reduce the latency. Should only
     84       be used for sending Numeric arrays and should be matched
     85       with a bypass in the corresponding receive command.
     86
     87    """
     88    import types, string
     89
     90    if bypass:
     91        send_array(x, destination, tag)
     92        return
     93       
     94    #Input check
     95    errmsg = 'Destination id (%s) must be an integer.' %destination
     96    assert type(destination) == types.IntType, errmsg
     97   
     98    errmsg = 'Tag %d is reserved by pypar - please use another.' %control_tag
     99    assert tag != control_tag, errmsg
     100
     101    #Create metadata about object to be sent
     102    control_info, x = create_control_info(x, vanilla, return_object=True)
     103    protocol = control_info[0]
     104
     105
     106    #Possibly transmit control data
     107    if use_buffer is False:
     108        send_control_info(control_info, destination)   
     109
     110     
     111    #Transmit payload data   
     112    if protocol == 'array':
     113        send_array(x, destination, tag)   
     114    elif protocol in ['string', 'vanilla']:
     115        send_string(x, destination, tag)         
     116    else:
     117        raise 'Unknown protocol: %s' %protocol   
     118
     119     
     120def receive(source, buffer=None, vanilla=False, tag=default_tag,
     121            return_status=False, bypass=False):           
     122    """receive - blocking MPI receive
     123   
     124       Receive data from source.
     125
     126       Optional parameters:
     127         buffer: Use specified buffer for received data (faster). Default None.
     128         vanilla: Specify to enforce vanilla protocol for any type. Default False
     129         tag: Only received messages tagged as specified. Default default_tag
     130         return_status: Return Status object along with result. Default False.
     131
     132       If no buffer is specified, receive will try to receive a
     133       preceding message containing protocol, type, size and shape and
     134       then create a suitable buffer.
     135
     136       If buffer is specified the corresponding send must specify
     137       use_buffer = True.
     138       The variable buffer can be any (picklable) type, but
     139       Numeric variables and text strings will most efficient.
     140
     141       Appropriate protocol will be automatically determined
     142       and corresponding receive function called.
     143
     144
     145       If bypass is True, all admin and error checks
     146       get bypassed to reduce the latency. Should only
     147       be used for receiving Numerical arrays and should
     148       be matched with a bypass in the corresponding send command.
     149       Also buffer must be specified.
     150    """
     151
     152    if bypass:
     153        #errmsg = 'bypass mode must be used with specified buffer'
     154        #assert buffer is not None, msg
     155        stat = receive_array(buffer, source, tag)       
     156    else:   
     157   
     158        import types
     159   
     160        #Input check
     161        errmsg = 'Source id (%s) must be an integer.' %source
     162        assert type(source) == types.IntType, errmsg
     163   
     164        errmsg = 'Tag %d is reserved by pypar - please use another.' %control_tag
     165        assert tag != control_tag, errmsg
     166   
     167   
     168        #Either receive or create metadata about objetc to receive
     169        if buffer is None:
     170            protocol, typecode, size, shape = receive_control_info(source)
     171        else: 
     172            protocol, typecode, size, shape = create_control_info(buffer, vanilla)
     173   
     174   
     175        #Receive payload data     
     176        if protocol == 'array':
     177            if buffer is None:
     178                import Numeric
     179                buffer = Numeric.zeros(size,typecode)
     180                buffer = Numeric.reshape(buffer, shape)
     181           
     182            stat = receive_array(buffer, source, tag)
     183           
     184        elif protocol == 'string':
     185            if buffer is None:
     186                buffer = ' '*size
     187           
     188            stat = receive_string(buffer, source, tag)
     189   
     190        elif protocol == 'vanilla':
     191            from cPickle import dumps, loads     
     192            if buffer is None:
     193                s = ' '*size     
     194            else:
     195                s = dumps(buffer, 1)
     196                s = s + ' '*int(0.1*len(s)) #safety
     197           
     198            stat = receive_string(s, source, tag)
     199            buffer = loads(s)  #Replace buffer with received result
     200        else:
     201            raise 'Unknown protocol: %s' %protocol
     202
     203    # Return received data and possibly the status object 
     204    if return_status:
     205        return buffer, Status(stat)
     206    else:
     207        return buffer
     208
     209
     210def broadcast(buffer, root, vanilla=False):
     211    """Wrapper for MPI bcast.
     212
     213       Broadcast buffer from the process with rank root to all other processes.
     214
     215   
     216       Automatically determine appropriate protocol
     217       and call corresponding send function.
     218       
     219       The variable buffer can be any (picklable) type, but
     220       Numeric variables and text strings will most efficient.
     221       Setting vanilla = 1 forces vanilla mode for any type.
     222
     223    """
     224
     225    import types
     226   
     227    #Input check
     228    errmsg = 'Root id (%s) must be an integer.' %root
     229    assert type(root) == types.IntType, errmsg
     230
     231
     232    #Create metadata about object to be sent
     233    protocol = create_control_info(buffer, vanilla)[0]
     234
     235
     236    #Broadcast
     237    if protocol == 'array':
     238        broadcast_array(buffer, root)   
     239    elif protocol == 'string':
     240        broadcast_string(buffer, root)         
     241    elif protocol == 'vanilla':
     242        from cPickle import loads, dumps
     243        s = dumps(buffer, 1)
     244        s = s + ' '*int(0.1*len(s)) #safety
     245       
     246        broadcast_string(s, root)
     247        buffer = loads(s)
     248    else:
     249        raise 'Unknown protocol: %s' %protocol 
     250     
     251    return buffer         
     252
     253
     254def scatter(x, root, buffer=None, vanilla=False):
     255    """Sends data x from process with rank root to all other processes.
     256   
     257       Create appropriate buffer and receive data.
     258       Return scattered result (same type as x)
     259
     260       Scatter makes only sense for arrays or strings
     261    """
     262
     263    import types
     264    from mpiext import size
     265    numproc = size()         #Needed to determine buffer size   
     266   
     267    #Input check
     268    errmsg = 'Root id (%s) must be an integer.' %root
     269    assert type(root) == types.IntType, errmsg
     270
     271   
     272    #Create metadata about object to be sent
     273    protocol, typecode, size, shape = create_control_info(x)
     274
     275    #Scatter
     276    if protocol == 'array':
     277        if buffer is None:
     278            import Numeric
     279           
     280            # Modify shape along axis=0 to match size
     281            shape = list(shape)
     282            shape[0] /= numproc
     283            count = Numeric.product(shape)           
     284           
     285            buffer = Numeric.zeros(count, typecode)
     286            buffer = Numeric.reshape(buffer, shape)
     287     
     288        scatter_array(x, buffer, root)
     289    elif protocol == 'string':
     290        if buffer is None:
     291            buffer = ' '*(size/numproc)
     292           
     293        scatter_string(x, buffer, root)
     294    elif protocol == 'vanilla':
     295        errmsg = 'Scatter is only supported for Numeric arrays and strings.\n'
     296        errmsg += 'If you wish to distribute a general sequence, '
     297        errmsg += 'please use send and receive commands or broadcast.'
     298        raise errmsg
     299    else:
     300        raise 'Unknown protocol: %s' %protocol
     301       
     302    return buffer 
     303
     304
     305def gather(x, root, buffer=None, vanilla=0):
     306    """Gather values from all processes to root
     307       
     308       Create appropriate buffer and receive data.
     309
     310       Gather only makes sens for arrays or strings
     311    """
     312
     313    import types     
     314    from mpiext import size
     315    numproc = size()         #Needed to determine buffer size
     316
     317    #Input check
     318    errmsg = 'Root id (%s) must be an integer.' %root
     319    assert type(root) == types.IntType, errmsg
     320
     321    #Create metadata about object to be gathered
     322    protocol, typecode, size, shape = create_control_info(x)
     323
     324    #Gather
     325    if protocol == 'array':
     326        if buffer is None:
     327            import Numeric
     328            buffer = Numeric.zeros(size*numproc, typecode)
     329
     330            # Modify shape along axis=0 to match size
     331            shape = list(shape)
     332            shape[0] *= numproc
     333            buffer = Numeric.reshape(buffer, shape)
     334     
     335        gather_array(x, buffer, root)   
     336    elif protocol == 'string':
     337        if buffer is None:
     338            buffer = ' '*size*numproc
     339       
     340        gather_string(x, buffer, root)         
     341    elif protocol == 'vanilla':
     342        errmsg = 'Gather is only supported for Numeric arrays and strings.\n'
     343        errmsg += 'If you wish to distribute a general sequence, '
     344        errmsg += 'please use send and receive commands or broadcast.'
     345        raise errmsg
     346    else:
     347        raise 'Unknown protocol: %s' %protocol
     348       
     349    return buffer 
     350
     351
     352def reduce(x, op, root, buffer=None, vanilla=0):
     353    """Reduce elements in x to buffer (of the same size as x)
     354       at root applying operation op elementwise.
     355    """
     356
     357    import types
     358   
     359    from mpiext import size
     360    numproc = size()         #Needed to determine buffer size
     361
     362
     363    #Input check
     364    errmsg = 'Root id (%s) must be an integer.' %root
     365    assert type(root) == types.IntType, errmsg
     366
     367    #Create metadata about object
     368    protocol, typecode, size, shape = create_control_info(x)
     369
     370
     371    #Reduce
     372    if protocol == 'array':
     373        if buffer is None:
     374            import Numeric
     375            buffer = Numeric.zeros(size*numproc, typecode)
     376     
     377            # Modify shape along axis=0 to match size
     378            shape = list(shape)
     379            shape[0] *= numproc
     380            buffer = Numeric.reshape(buffer, shape)
     381     
     382        reduce_array(x, buffer, op, root)   
     383    elif (protocol == 'vanilla' or protocol == 'string'):
     384        raise 'Protocol: %s unsupported for reduce' %protocol
     385    else:
     386        raise 'Unknown protocol: %s' %protocol
     387     
     388    return buffer 
     389
    466390
    467391#---------------------------------------------------------
    468392# AUXILIARY FUNCTIONS
    469393#---------------------------------------------------------
    470 def get_control_info(x, vanilla=0):
    471   """Determine which protocol to use for communication:
    472      (Numeric) arrays, strings, or vanilla based x's type.
    473 
    474      There are three protocols:
    475      'array':   Numeric arrays of type 'i', 'l', 'f', or 'd' can be communicated
    476                 with mpiext.send_array and mpiext.receive_array.
    477      'string':  Text strings can be communicated with mpiext.send_string and
    478                 mpiext.receive_string.
    479      'vanilla': All other types can be communicated using the scripts send_vanilla and
    480                 receive_vanilla provided that the objects can be serialised using
    481                 pickle (or cPickle). The latter mode is less efficient than the
    482                 first two but it can handle complex structures.
    483 
    484      Rules:
    485      If keyword argument vanilla == 1, vanilla is chosen regardless of
    486      x's type.
    487      Otherwise if x is a string, the string protocol is chosen
    488      If x is an array, the 'array' protocol is chosen provided that x has one
    489      of the admissible typecodes.
    490   """
    491 
    492   import types
    493 
    494 
    495   protocol = 'vanilla'
    496   typecode = ' '
    497   size = '0'
    498   shape = ()
    499  
    500   if not vanilla:
    501     if type(x) == types.StringType:
    502       protocol = 'string'
    503       typecode = 'c'
    504       size = len(x)
    505     elif type(x).__name__ == 'array': #Don't use Numeric as it isn't imported yet
    506       try:
    507         import Numeric
    508       except:
    509         print "WARNING (pypar.py): Numeric module could not be imported,",
    510         print "reverting to vanilla mode"
    511         protocol = 'vanilla'
    512       else: 
    513         typecode = x.typecode()
    514         if typecode in ['i', 'l', 'f', 'd', 'F', 'D']:
    515           protocol = 'array'
    516           shape = x.shape
    517           size = Numeric.product(shape)
    518         else:   
    519           print "WARNING (pypar.py): Numeric object type %s is not supported."\
    520                 %(x.typecode())
    521           print "Only types 'i', 'l', 'f', 'd', 'F', 'D' are supported,",
    522           print "Reverting to vanilla mode."
    523           protocol = 'vanilla'
    524 
    525   control_info = [protocol, typecode, size, shape]
    526 
    527   return control_info
     394def balance(N, P, p):
     395    """Compute p'th interval when N is distributed over P bins.
     396    """
     397
     398    from math import floor
     399
     400    L = int(floor(float(N)/P))
     401    K = N - P*L
     402    if p < K:
     403        Nlo = p*L + p
     404        Nhi = Nlo + L + 1
     405    else:
     406        Nlo = p*L + K
     407        Nhi = Nlo + L
     408
     409    return Nlo, Nhi
     410
     411
     412# Obsolete functions
     413# (for backwards compatibility - remove in version 2.0)
     414
     415def raw_send(x, destination, tag=default_tag, vanilla=0):
     416    send(x, destination, use_buffer=True, tag=tag, vanilla=vanilla)
     417
     418
     419def raw_receive(x, source, tag=default_tag, vanilla=0, return_status=0):
     420    x = receive(source, tag=tag, vanilla=vanilla,
     421              return_status=return_status, buffer=x)
     422    return x
     423
     424def raw_scatter(x, buffer, source, vanilla=0):
     425    scatter(x, source, buffer=buffer, vanilla=vanilla)
     426
     427def raw_gather(x, buffer, source, vanilla=0):
     428    gather(x, source, buffer=buffer, vanilla=0) 
     429
     430def raw_reduce(x, buffer, op, source, vanilla=0):
     431    reduce(x, op, source, buffer=buffer, vanilla=0)
     432
     433def bcast(buffer, root, vanilla=False):
     434    return broadcast(buffer, root, vanilla)
     435
     436def Wtime():
     437    return time()
     438
     439def Get_processor_name():
     440    return get_processor_name()
     441
     442def Initialized():
     443    return initialized()
     444
     445def Finalize():
     446    finalize()
     447
     448def Abort():
     449    abort()
     450
     451def Barrier():
     452    barrier()
     453   
     454     
     455
     456#---------------------------------------------------------
     457# INTERNAL FUNCTIONS
     458#---------------------------------------------------------
     459
     460class Status:
     461    """ MPI Status block returned by receive if
     462        specified with parameter return_status=True
     463    """   
     464 
     465    def __init__(self, status_tuple):
     466        self.source = status_tuple[0]  #Id of sender
     467        self.tag = status_tuple[1]     #Tag of received message
     468        self.error = status_tuple[2]   #MPI Error code
     469        self.length = status_tuple[3]  #Number of elements transmitted
     470        self.size = status_tuple[4]    #Size of one element
     471
     472    def __repr__(self):
     473        return 'Pypar Status Object:\n  source=%d\n  tag=%d\n '+\
     474               'error=%d\n  length=%d\n  size=%d\n'\
     475               %(self.source, self.tag, self.error, self.length, self.size)
     476 
     477    def bytes(self):
     478        """Number of bytes transmitted (excl control info)
     479        """
     480        return self.length * self.size
     481 
     482
     483
     484def create_control_info(x, vanilla=0, return_object=False):
     485    """Determine which protocol to use for communication:
     486       (Numeric) arrays, strings, or vanilla based x's type.
     487
     488       There are three protocols:
     489       'array':   Numeric arrays of type 'i', 'l', 'f', 'd', 'F' or 'D' can be
     490                  communicated  with mpiext.send_array and mpiext.receive_array.
     491       'string':  Text strings can be communicated with mpiext.send_string and
     492                  mpiext.receive_string.
     493       'vanilla': All other types can be communicated as string representations
     494                  provided that the objects
     495                  can be serialised using pickle (or cPickle).
     496                  The latter mode is less efficient than the
     497                  first two but it can handle general structures.
     498
     499       Rules:
     500       If keyword argument vanilla == 1, vanilla is chosen regardless of
     501       x's type.
     502       Otherwise if x is a string, the string protocol is chosen
     503       If x is an array, the 'array' protocol is chosen provided that x has one
     504       of the admissible typecodes.
     505
     506       The optional argument return_object asks to return object as well.
     507       This is useful in case it gets modified as in the case of general structures
     508       using the vanilla protocol.
     509    """
     510
     511    import types
     512
     513    #Default values
     514    protocol = 'vanilla'
     515    typecode = ' '
     516    size = 0
     517    shape = ()
     518
     519    #Determine protocol in case
     520    if not vanilla:
     521        if type(x) == types.StringType:
     522            protocol = 'string'
     523            typecode = 'c'
     524            size = len(x)
     525        elif type(x).__name__ == 'array': #Numeric isn't imported yet
     526            try:
     527                import Numeric
     528            except:
     529                print "WARNING (pypar.py): Numeric module could not be imported,",
     530                print "reverting to vanilla mode"
     531                protocol = 'vanilla'
     532            else: 
     533                typecode = x.typecode()
     534                if typecode in ['i', 'l', 'f', 'd', 'F', 'D']:
     535                    protocol = 'array'
     536                    shape = x.shape
     537                    size = Numeric.product(shape)
     538                else:   
     539                    print "WARNING (pypar.py): Numeric object type %s is not supported."\
     540                          %(x.typecode())
     541                    print "Only types 'i', 'l', 'f', 'd', 'F', 'D' are supported,",
     542                    print "Reverting to vanilla mode."
     543                    protocol = 'vanilla'
     544
     545    #Pickle general structures using the vanilla protocol               
     546    if protocol == 'vanilla':                   
     547        from cPickle import dumps     
     548        x = dumps(x, 1)
     549        size = len(x) # Let count be length of pickled object
     550
     551    #Return   
     552    if return_object:
     553        return [protocol, typecode, size, shape], x
     554    else: 
     555        return [protocol, typecode, size, shape]
    528556
    529557
     
    532560
    533561def send_control_info(control_info, destination):
    534   """Send control info to destination
    535   """
    536   import string
    537 
    538   #Convert to strings
    539   control_info = [str(c) for c in control_info]
    540  
    541   msg = string.join(control_info,control_sep)
    542   send_string(msg, destination, control_tag)
     562    """Send control info to destination
     563    """
     564    import string
     565
     566    #Convert to strings
     567    control_info = [str(c) for c in control_info]
     568 
     569    control_msg = string.join(control_info,control_sep)
     570    if len(control_msg) > control_data_max_size:
     571        errmsg = 'Length of control_info exceeds specified maximium (%d)'\
     572                 %control_data_max_size
     573        errmsg += ' - Please increase it (in pypar.py)' 
     574        raise errmsg
     575 
     576    send_string(control_msg, destination, control_tag)
    543577
    544578 
    545579def receive_control_info(source):
    546   """Receive control info from source
    547   """
    548   import string
    549  
    550   msg = ' '*control_data_max_size
    551 
    552   err, stat = receive_string(msg, source, control_tag)
    553   if err:
    554     raise Exception
    555   #No need to create status object here - it is reserved
    556   #for genuine communications only
    557 
    558   msg = msg[:stat[3]] #Trim buffer to actual received length
    559 
    560   control_info = msg.split(control_sep)
    561 
    562   assert len(control_info) == 4, 'len(control_info) = %d' %len(control_info)
    563   control_info[2] = eval(control_info[2]) #Convert back to int
    564   control_info[3] = eval(control_info[3]) #Convert back to tuple
    565 
    566 
    567   return control_info
    568 
    569 
    570 
    571 
    572 # Auxiliary functions
    573 # used only by raw communication
    574 #
    575 def send_vanilla(x, destination, tag=default_tag):
    576   from cPickle import dumps
    577   from mpiext import send_string
    578 
    579   s=dumps(x, 1)
    580   send_string(s, destination, tag)     
    581   return len(s)
    582 
    583 
    584 def receive_vanilla(x, source, tag=default_tag):
    585   from cPickle import loads, dumps
    586   from mpiext import receive_string
    587 
    588  
    589   #Create buffer of the right size
    590   #(assuming that x is similar to sent x).
    591 
    592   if options['vanilla_bufsize']:
    593     s = ' '*options['vanilla_bufsize']
    594   else: 
    595     s = dumps(x, 1)
    596     s = s + ' '*int(0.1*len(s)) #safety
    597 
    598                      
    599   err, stat = receive_string(s, source, tag)
    600   if err:
    601     raise 'receive_string failed with error code %d' %err
    602 
    603   x = loads(s)
    604 
    605   return x, stat
    606  
     580    """Receive control info from source
     581    """
     582    import string
     583 
     584    msg = ' '*control_data_max_size
     585
     586    stat = receive_string(msg, source, control_tag)
     587    #No need to create status object here - it is reserved
     588    #for payload communications only
     589
     590    msg = msg[:stat[3]] #Trim buffer to actual received length (needed?)
     591
     592    control_info = msg.split(control_sep)
     593
     594    assert len(control_info) == 4, 'len(control_info) = %d' %len(control_info)
     595    control_info[2] = eval(control_info[2]) #Convert back to int
     596    control_info[3] = eval(control_info[3]) #Convert back to tuple
     597
     598
     599    return control_info
     600
    607601
    608602#----------------------------------------------------------------------------
     
    612606
    613607# Take care of situation where module is part of package
    614 import os, string, os.path
     608import sys, os, string, os.path
    615609dirname = os.path.dirname(string.replace(__name__,'.',os.sep)).strip()
    616610
     
    623617
    624618
    625 
    626619# Import MPI extension
    627620#
     
    629622
    630623try:
    631   import mpiext
     624    import mpiext
    632625except:
    633   errmsg = 'ERROR: C extension mpiext could not be imported.\n'
    634   errmsg += 'Please compile mpiext.c e.g. by running\n'
    635   errmsg += '  python install.py\n'
    636   errmsg += 'in the pypar directory, or by using\n'
    637   errmsg += '  python setup.py install\n'
    638   raise Exception, errmsg
     626    errmsg = 'ERROR: C extension mpiext could not be imported.\n'
     627    errmsg += 'Please compile mpiext.c e.g. by running\n'
     628    errmsg += '  python install.py\n'
     629    errmsg += 'in the pypar directory, or by using\n'
     630    errmsg += '  python setup.py install\n'
     631    raise Exception, errmsg
    639632
    640633 
     
    643636
    644637if sys.platform in ['linux2', 'sunos5', 'win32', 'darwin']:
    645   #Linux (LAM,MPICH) or Sun (MPICH)
    646   error = 0  #Sequential execution of MPI is allowed   
     638    #Linux (LAM,MPICH) or Sun (MPICH)
     639    error = 0  #Sequential execution of MPI is allowed   
    647640else:
    648   #Platform: Alpha 'osf1V5' 
    649   cmdstring = '"import mpiext, sys; mpiext.init(sys.argv); mpiext.Finalize()"'
    650   #s = 'cd %s; python -c %s' %(dirname, cmdstring)
    651   s = 'python -c %s >/dev/null 2>/dev/null' %cmdstring 
    652   error = os.system(s)
    653  
    654  
    655   # The check is performed in a separate shell.
    656   # Reason: The Alpha server, LAM/Linux or the Sun cannot recover from a
    657   # try:
    658   #   mpiext.init(sys.argv)
    659 
    660   # However, on LAM/Linux, this test causes system to hang.
    661   # Verified (OMN 12/12/2)
    662   # If lamboot is started, the system, will hang when init is called
    663   # again further down in this file.
    664   # If lamboot is not loaded error will be nozero as it should.
    665   # I don't know how to deal with this
    666   #
    667   #Comparisons of two strategies using LAM
    668   #
    669   # Strategy 1: Assume seq execution is OK (i.e. set error = 0)
    670   # Strategy 2: Try to test if mpi can be initialised (in a separate shell)
    671   #
    672   #
    673   # Strategy 1 (currently used)
    674   #                    | Lam booted  | Lam not booted
    675   #-----------------------------------------------------
    676   #
    677   # Sequential exec    |  OK         | Not OK
    678   # Parallel exec      |  OK         | Not OK 
    679   #
    680   #
    681   # Strategy 2
    682   #                    | Lam booted  | Lam not booted
    683   #-----------------------------------------------------
    684   #
    685   # Sequential exec    |  Hangs      | Not OK
    686   # Parallel exec      |  Hangs      | OK 
    687   #
     641    #Platform: Alpha 'osf1V5' 
     642    cmdstring = '"import mpiext, sys; mpiext.init(sys.argv); mpiext.Finalize()"'
     643    #s = 'cd %s; python -c %s' %(dirname, cmdstring)
     644    s = 'python -c %s >/dev/null 2>/dev/null' %cmdstring 
     645    error = os.system(s)
     646 
     647    # The check is performed in a separate shell.
     648    # Reason: The Alpha server, LAM/Linux or the Sun cannot recover from a
     649    # try:
     650    #   mpiext.init(sys.argv)
     651
     652    # However, on LAM/Linux, this test causes system to hang.
     653    # Verified (OMN 12/12/2)
     654    # If lamboot is started, the system, will hang when init is called
     655    # again further down in this file.
     656    # If lamboot is not loaded error will be nozero as it should.
     657    # I don't know how to deal with this
     658    #
     659    #Comparisons of two strategies using LAM
     660    #
     661    # Strategy 1: Assume seq execution is OK (i.e. set error = 0)
     662    # Strategy 2: Try to test if mpi can be initialised (in a separate shell)
     663    #
     664    #
     665    # Strategy 1 (currently used)
     666    #                    | Lam booted  | Lam not booted
     667    #-----------------------------------------------------
     668    #
     669    # Sequential exec    |  OK         | Not OK
     670    # Parallel exec      |  OK         | Not OK 
     671    #
     672    #
     673    # Strategy 2
     674    #                    | Lam booted  | Lam not booted
     675    #-----------------------------------------------------
     676    #
     677    # Sequential exec    |  Hangs      | Not OK
     678    # Parallel exec      |  Hangs      | OK 
     679    #
    688680
    689681
     
    696688
    697689if error:
    698   print "WARNING: MPI library could not be initialised - running sequentially"
    699 
    700   # Define rudimentary functions to keep sequential programs happy
    701 
    702   def size(): return 1
    703   def rank(): return 0
    704 
    705   def Get_processor_name():
    706     import os
    707     try:
    708       hostname = os.environ['HOST']
    709     except:
    710       try: 
    711         hostname = os.environ['HOSTNAME'] 
    712       except:
    713         hostname = 'Unknown' 
    714 
    715     return hostname
    716      
    717 
    718   def Abort():
    719     import sys
    720     sys.exit()
    721 
    722   def Finalize(): pass
    723  
    724   def Barrier(): pass 
    725 
    726   def Wtime():
    727     import time
    728     return time.time()
     690    print "WARNING: MPI library could not be initialised - running sequentially"
     691
     692    # Define rudimentary functions to keep sequential programs happy
     693
     694    def size(): return 1
     695    def rank(): return 0
     696
     697    def get_processor_name():
     698        import os
     699        try:
     700            hostname = os.environ['HOST']
     701        except:
     702            try: 
     703                hostname = os.environ['HOSTNAME'] 
     704            except:
     705                hostname = 'Unknown' 
     706
     707        return hostname
     708       
     709    def abort():
     710        import sys
     711        sys.exit()
     712
     713    def finalize(): pass
     714 
     715    def barrier(): pass 
     716
     717    def time():
     718        import time
     719        return time.time()
    729720
    730721else:
    731722
    732   from mpiext import size, rank, Barrier, Wtime, Get_processor_name,\
    733                   init, Initialized, Finalize, Abort,\
    734                   send_string, receive_string,\
    735                   send_array, receive_array, bcast_string, bcast_array,\
    736                   scatter_string, scatter_array,\
    737                   gather_string, gather_array,\
    738                   reduce_array,\
    739                   MPI_ANY_TAG as any_tag, MPI_TAG_UB as max_tag, MPI_ANY_SOURCE as any_source,\
    740                   MAX, MIN, SUM, PROD, LAND, BAND,\
    741                   LOR, BOR, LXOR, BXOR
    742 
    743   init(sys.argv) #Initialise MPI with cmd line (needed by MPICH/Linux)
    744 
    745   if rank() == 0:     
    746     print "MPI initialised OK with %d processors" %size()
    747 
    748 
    749 
    750 
    751 
    752 
     723    from mpiext import size, rank, barrier, time,\
     724         get_processor_name,\
     725         init, initialized, finalize, abort,\
     726         send_string, receive_string,\
     727         send_array, receive_array, broadcast_string, broadcast_array,\
     728         scatter_string, scatter_array,\
     729         gather_string, gather_array,\
     730         reduce_array,\
     731         MPI_ANY_TAG as any_tag, MPI_TAG_UB as max_tag,\
     732         MPI_ANY_SOURCE as any_source,\
     733         MAX, MIN, SUM, PROD, LAND, BAND,\
     734         LOR, BOR, LXOR, BXOR
     735
     736    init(sys.argv) #Initialise MPI with cmd line (needed by MPICH/Linux)
     737
     738    if rank() == 0:     
     739        print "Pypar (version %s) initialised MPI OK with %d processors" %(__version__, size())
     740
     741
     742
     743
     744
     745
Note: See TracChangeset for help on using the changeset viewer.