source: trunk/anuga_core/anuga/parallel/pypar_ext.py @ 9679

Last change on this file since 9679 was 9550, checked in by steve, 10 years ago

Changed lots mesh_factory to full name anuga. ...

File size: 16.9 KB
Line 
1# =============================================================================
2# pypar_ext.py - Extension to Parallel Python using MPI
3# Copyright (C) 2012 Stephen G Roberts
4#              (ANU)
5#
6#    This program is free software; you can redistribute it and/or modify
7#    it under the terms of the GNU General Public License as published by
8#    the Free Software Foundation; either version 2 of the License, or
9#    (at your option) any later version.
10#
11#    This program is distributed in the hope that it will be useful,
12#    but WITHOUT ANY WARRANTY; without even the implied warranty of
13#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14#    GNU General Public License (http://www.gnu.org/copyleft/gpl.html)
15#    for more details.
16#
17#    You should have received a copy of the GNU General Public License
18#    along with this program; if not, write to the Free Software
19#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
20#
21#
22# Contact address: Stephen.Roberts@anu.edu.au
23#
24# Version: See pypar_ext.__version__
25# =============================================================================
26
"""Module pypar_ext.py - extension to pypar.

pypar must be imported first so that MPI is initialised.

Public functions:

isend()     -- asynchronous send (arrays)
ireceive()  -- asynchronous receive (arrays)
allreduce() -- wrapper for MPI_Allreduce (arrays)

See the docstrings of the individual functions for detailed documentation.
"""
42
43from numpy import zeros, reshape, product
44#from __metadata__ import __version__, __date__, __author__
45
46
# Constants shared by the communication functions
#
default_tag = 1              # Tag used when the caller does not supply one
control_tag = 13849          # Reserved tag that marks control (metadata) messages
max_tag = 32767              # Largest tag value (MPI_TAG_UB returned 0, so unusable)

control_data_max_size = 64   # Maximum length of the encoded control-info string
control_sep = ':'            # Field separator for control info. Must NOT be ','
                             # because the shape tuple is sent as str(shape),
                             # which itself contains commas.
55
56
57#---------------------------------------------------------------------------
58# Communication functions
59#--------------------------------------------------------------------------
60
def isend(x, destination, use_buffer=False, vanilla=False,
          tag=default_tag, bypass=False):
    """Wrapper for easy MPI send: send x to destination.

       The protocol ('array', 'string' or 'vanilla') is chosen by
       create_control_info, and the corresponding send function is called.
       Unless use_buffer is true, type and size information is first sent
       as a separate control message. This simplifies the matching receive
       call.

       The variable x can be any (picklable) type, but numpy arrays and
       text strings are the most efficient. Setting vanilla=True forces
       vanilla (pickle) mode for any type.

       If use_buffer is true, no control message is sent. In this case the
       corresponding receive call must specify a buffer. Otherwise receive
       creates a new workspace itself.

       If bypass is true, all admin and error checks are bypassed to reduce
       latency, and x is passed straight to isend_array. This should only
       be used for sending numpy arrays. It must be matched with a bypass
       in the corresponding receive call.

       Parameters:
         x:           object to send
         destination: integer id of the receiving process
         use_buffer:  if true, skip the control message (receiver
                      supplies its own buffer)
         vanilla:     if true, force the pickle-based protocol
         tag:         message tag; must differ from the reserved control_tag
         bypass:      if true, send x as a raw array without any checks

       Raises:
         AssertionError if destination is not an integer or tag is the
         reserved control_tag.
         ValueError if an unknown protocol is encountered.
    """
    if bypass:
        isend_array(x, destination, tag)
        return

    # Input check
    errmsg = 'Destination id (%s) must be an integer.' % destination
    assert isinstance(destination, int), errmsg

    errmsg = 'Tag %d is reserved by pypar - please use another.' % control_tag
    assert tag != control_tag, errmsg

    # Create metadata about the object to be sent. For the vanilla protocol
    # x is replaced by its pickled string, so the returned object is used.
    control_info, x = create_control_info(x, vanilla, return_object=True)
    protocol = control_info[0]

    # Transmit control data unless the receiver supplies its own buffer.
    # Test truthiness so that use_buffer=0/None also sends the control
    # message the receiver will be waiting for.
    if not use_buffer:
        send_control_info(control_info, destination)

    # Transmit payload data.
    # NOTE(review): send_array and send_string are not defined or imported
    # in this module (only isend_array, ireceive_array and allreduce_array
    # are imported from mpiextras). This non-bypass path therefore raises
    # NameError unless they are provided elsewhere - confirm.
    if protocol == 'array':
        send_array(x, destination, tag)
    elif protocol in ('string', 'vanilla'):
        send_string(x, destination, tag)
    else:
        raise ValueError('Unknown protocol: %s' % protocol)
115
116
def ireceive(source, buffer=None, vanilla=False, tag=default_tag,
             return_status=False, bypass=False):
    """Receive data from source.

       Optional parameters:
         buffer: Use specified buffer for received data (faster).
                 Default None.
         vanilla: Enforce the vanilla (pickle) protocol for any type.
                  Default False.
         tag: Only receive messages tagged as specified.
              Default default_tag.
         return_status: Return a Status object along with the result.
                        Default False.
         bypass: Skip all checks (see below). Default False.

       If no buffer is specified, receive first takes a preceding control
       message containing protocol, typecode, size and shape, and uses it
       to create a suitable buffer. The payload is then taken from the
       same source that sent the control message.

       If buffer is specified, the corresponding send must specify
       use_buffer=True. The buffer can be any (picklable) type, but numpy
       arrays and text strings are the most efficient.

       The protocol is determined automatically and the corresponding
       receive function is called.

       If bypass is true, all admin and error checks are bypassed to reduce
       latency, and the data is received directly into buffer via
       ireceive_array. This should only be used for numpy arrays and must
       be matched with a bypass in the corresponding send command. buffer
       must be specified; this is not checked.

       Returns:
         The received data, or (data, Status) if return_status is true.

       Raises:
         AssertionError if source is not an integer or tag is the
         reserved control_tag.
         ValueError for an unknown protocol.
         UnpicklingError if a vanilla payload cannot be unpickled.
    """
    if bypass:
        stat = ireceive_array(buffer, source, tag)
    else:
        # Input check
        errmsg = 'Source id (%s) must be an integer.' % source
        assert isinstance(source, int), errmsg

        errmsg = 'Tag %d is reserved by pypar - please use another.'\
            % control_tag
        assert tag != control_tag, errmsg

        # Either receive or create metadata about the object to receive
        if buffer is None:
            control_info, source = receive_control_info(source,
                                                        return_source=True)
            protocol, typecode, size, shape = control_info
        else:
            protocol, typecode, size, shape = create_control_info(buffer,
                                                                  vanilla)

        # Receive payload data.
        # NOTE(review): receive_array and receive_string are not defined or
        # imported in this module - confirm they are provided elsewhere.
        if protocol == 'array':
            if buffer is None:
                buffer = reshape(zeros(size, typecode), shape)

            stat = receive_array(buffer, source, tag)

        elif protocol == 'string':
            if buffer is None:
                buffer = ' ' * size

            # The preallocated string appears to be filled in place
            stat = receive_string(buffer, source, tag)

        elif protocol == 'vanilla':
            try:
                from cPickle import dumps, loads, UnpicklingError
            except ImportError:
                from pickle import dumps, loads, UnpicklingError

            if buffer is None:
                s = ' ' * size
            else:
                # Size the receive string from a pickle of the caller's
                # buffer, plus 10% headroom.
                s = dumps(buffer, protocol=2)
                s = s + ' ' * int(0.1 * len(s))  # safety

            stat = receive_string(s, source, tag)
            try:
                buffer = loads(s)  # Replace buffer with received result
            except UnpicklingError as err:
                raise UnpicklingError(str(err) + " - '%s'" % s)
        else:
            raise ValueError('Unknown protocol: %s' % protocol)

    # Return received data and possibly the status object
    if return_status:
        return buffer, Status(stat)
    return buffer
213
214
215
216
def allreduce(x, op, buffer=None, vanilla=0, bypass=False):
    """Allreduce elements in x to buffer, applying operation op elementwise.

       x must be a numpy array of a typecode supported by the 'array'
       protocol of create_control_info. Strings and other objects are
       rejected.

       If buffer is None, a zero-filled buffer with the typecode of x is
       allocated. Its first axis is numproc times the length of x's first
       axis.
       NOTE(review): the docstring history says buffer should be "of the
       same size as x". The numproc-times-larger allocation looks inherited
       from a gather/reduce routine. Confirm what allreduce_array writes
       before changing it.

       If bypass is true, all admin and error checks are bypassed to reduce
       latency. The buffer must then be specified explicitly, and None is
       returned.

       Parameters:
         x:       numpy array holding this process's contribution
         op:      reduction operation, passed through to allreduce_array
         buffer:  optional result array (same typecode as x)
         vanilla: unused; kept for interface compatibility
         bypass:  if true, call allreduce_array directly without checks

       Returns:
         buffer holding the reduced data (None in bypass mode).

       Raises:
         ValueError if x does not use the 'array' protocol.
         AssertionError if x and buffer have different typecodes.
    """
    if bypass:
        allreduce_array(x, buffer, op)
        return

    from pypar import size as mpi_size
    numproc = mpi_size()  # Needed to determine buffer size

    # Create metadata about object
    protocol, typecode, size, shape = create_control_info(x)

    if protocol in ('vanilla', 'string'):
        raise ValueError('Protocol: %s unsupported for allreduce' % protocol)
    if protocol != 'array':
        raise ValueError('Unknown protocol: %s' % protocol)

    if buffer is None:
        buffer = zeros(size * numproc, typecode)

        # Modify shape along axis=0 to match size
        shape = list(shape)
        shape[0] *= numproc
        buffer = reshape(buffer, shape)

    # Adjacent literals are joined before % is applied, so the whole
    # message is formatted (previously the first half was overwritten).
    msg = ('Data array and buffer must have same type '
           'in allreduce. I got types "%s" and "%s"'
           % (x.dtype.char, buffer.dtype.char))
    assert x.dtype.char == buffer.dtype.char, msg
    allreduce_array(x, buffer, op)

    return buffer
264
265
266
267
268#---------------------------------------------------------
269# INTERNAL FUNCTIONS
270#---------------------------------------------------------
271
class Status:
    """Status block of an MPI receive.

       ireceive returns an instance of this class alongside the data when it
       is called with return_status=True. It wraps the raw status tuple
       (source, tag, error, length, size).
    """

    def __init__(self, status_tuple):
        # Attribute names, in the order they appear in status_tuple:
        #   source - id of sender
        #   tag    - tag of received message
        #   error  - MPI error code
        #   length - number of elements transmitted
        #   size   - size of one element
        field_names = ('source', 'tag', 'error', 'length', 'size')
        for position, name in enumerate(field_names):
            setattr(self, name, status_tuple[position])

    def __repr__(self):
        lines = ['Pypar Status Object:']
        for name in ('source', 'tag', 'error', 'length', 'size'):
            lines.append('  %s=%d' % (name, getattr(self, name)))
        return '\n'.join(lines) + '\n'

    def bytes(self):
        """Return the number of bytes transmitted (excluding control info)."""
        element_count = self.length
        return element_count * self.size
291
292
293
def create_control_info(x, vanilla=0, return_object=False):
    """Determine which protocol to use for communicating x.

       There are three protocols:
       'array':   numpy arrays of type 'i', 'l', 'f', 'd', 'F' or 'D' can be
                  communicated with mpiext.send_array and mpiext.receive_array.
       'string':  Text strings can be communicated with mpiext.send_string and
                  mpiext.receive_string.
       'vanilla': All other types can be communicated as string
                  representations, provided the objects can be serialised
                  using pickle (or cPickle). This mode is less efficient
                  than the first two, but it can handle general structures.

       Rules:
       If vanilla is true, vanilla is chosen regardless of x's type.
       Otherwise, if x is a string, the string protocol is chosen.
       If x is a numpy array, the 'array' protocol is chosen provided that
       x has one of the admissible typecodes. Otherwise a warning is
       printed and vanilla is used.

       Returns:
         [protocol, typecode, size, shape]. size is the number of array
         elements, the string length, or (for vanilla) the length of the
         pickled object.
         If return_object is true, (control_info, x) is returned instead.
         x is the pickled string for the vanilla protocol and the
         unchanged object otherwise. This is useful because vanilla
         replaces the object.
    """
    # Default values
    protocol = 'vanilla'
    typecode = ' '
    size = 0
    shape = ()

    if not vanilla:
        if type(x) is str:
            protocol = 'string'
            typecode = 'c'
            size = len(x)
        elif type(x).__name__ == 'ndarray':
            typecode = x.dtype.char
            if typecode in ('i', 'l', 'f', 'd', 'F', 'D'):
                protocol = 'array'
                shape = x.shape
                # x.size is a plain int, including for 0-d arrays. There,
                # numpy.product(()) would give the float 1.0, which is sent
                # as '1.0' and breaks zeros() on the receiving side.
                size = int(x.size)
            else:
                print("WARNING (pypar.py): numpy object type %s is not "
                      "supported." % typecode)
                print("Only types 'i', 'l', 'f', 'd', 'F', 'D' are "
                      "supported, Reverting to vanilla mode.")

    # Pickle general structures using the vanilla protocol
    if protocol == 'vanilla':
        try:
            from cPickle import dumps
        except ImportError:
            from pickle import dumps
        x = dumps(x, protocol=2)
        size = len(x)  # Let count be length of pickled object

    control_info = [protocol, typecode, size, shape]
    if return_object:
        return control_info, x
    return control_info
366
367
368
369#----------------------------------------------
370
def send_control_info(control_info, destination):
    """Send control info to destination.

       control_info is [protocol, typecode, size, shape] as produced by
       create_control_info. Each field is converted with str(), and the
       fields are joined with control_sep into a single message. The
       message is sent with the reserved control_tag.

       Raises:
         ValueError if the encoded message is longer than
         control_data_max_size, the buffer size used by
         receive_control_info.
    """
    # Convert to strings and join into a single message
    control_msg = control_sep.join([str(c) for c in control_info])

    if len(control_msg) > control_data_max_size:
        errmsg = ('Length of control_info exceeds specified maximium (%d)'
                  % control_data_max_size)
        errmsg += ' - Please increase it (in pypar.py)'
        raise ValueError(errmsg)

    send_string(control_msg, destination, control_tag)
387
388
def receive_control_info(source, return_source=False):
    """Receive control info from source.

       Receives a message sent with the reserved control_tag and decodes
       it into [protocol, typecode, size, shape].

       The optional argument return_source (due to Jim Bosch) also returns
       the actual source node. This can be used to require that the data
       message comes from the same node.

       Returns:
         control_info, or (control_info, source) if return_source is true.

       Raises:
         AssertionError if the message does not contain exactly 4 fields.
         ValueError/SyntaxError if size or shape is not a Python literal.
    """
    # FIXME (Ole): Perhaps we should include actual source in the control info?
    import ast

    msg = ' ' * control_data_max_size

    stat = receive_string(msg, source, control_tag)
    # No need to create status object here - it is reserved
    # for payload communications only.
    # stat[0] is the sender and stat[3] the received length (cf. Status).

    msg = msg[:stat[3]]  # Trim buffer to actual received length

    control_info = msg.split(control_sep)

    assert len(control_info) == 4, 'len(control_info) = %d' % len(control_info)
    # Decode size and shape. ast.literal_eval only accepts Python literals,
    # whereas eval would execute arbitrary code contained in a message.
    control_info[2] = ast.literal_eval(control_info[2])  # back to int
    control_info[3] = ast.literal_eval(control_info[3])  # back to tuple

    if return_source:
        return control_info, int(stat[0])
    return control_info
418
419
420#----------------------------------------------------------------------------
421# Initialise module
422#----------------------------------------------------------------------------
423
424
# Take care of situation where module is part of package
import sys, os, string, os.path
dirname = os.path.dirname(__name__.replace('.', os.sep)).strip()

if not dirname:
    dirname = '.'

if dirname[-1] != os.sep:
    dirname += os.sep


# Import MPI extension
#
# Verify that the C extension mpiextras can be imported. This is
# deliberately best-effort: on failure an error message is printed and
# `error` is set, instead of aborting the import of this module.

try:
    import mpiextras
except Exception:
    errmsg = 'ERROR: C extension mpiextras could not be imported.\n'
    errmsg += 'Please compile mpiextras.c manually.\n'
    error = 1
    print(errmsg)
else:

    # Determine if MPI program is allowed to run sequentially on current
    # platform. Attempting to check this automatically may cause some
    # systems to hang.
    #
    # sys.platform is 'linux2' on most Python 2 builds, 'linux3' on some
    # Python 2 builds running on Linux 3.x kernels and 'linux' on
    # Python 3.3+, so match on the prefix rather than on 'linux2' alone.
    if (sys.platform.startswith('linux') or
            sys.platform in ['sunos5', 'win32', 'darwin']):
        # Linux (LAM,MPICH) or Sun (MPICH)
        error = 0  # Sequential execution of MPI is allowed
    else:
        # Platform: Alpha 'osf1V5'
        cmdstring = '"import mpiext, sys; mpiext.init(sys.argv); mpiext.finalize()"'
        s = 'python -c %s >/dev/null 2>/dev/null' % cmdstring
        error = os.system(s)

        # The check is performed in a separate shell.
        # Reason: The Alpha server, LAM/Linux or the Sun cannot recover from a
        # try:
        #   mpiext.init(sys.argv)

        # However, on LAM/Linux, this test causes system to hang.
        # Verified (OMN 12/12/2)
        # If lamboot is started, the system, will hang when init is called
        # again further down in this file.
        # If lamboot is not loaded error will be nozero as it should.
        # I don't know how to deal with this
        #
        # Comparisons of two strategies using LAM
        #
        # Strategy 1: Assume seq execution is OK (i.e. set error = 0)
        # Strategy 2: Try to test if mpi can be initialised (in a separate shell)
        #
        #
        # Strategy 1 (currently used)
        #                    | Lam booted  | Lam not booted
        # -----------------------------------------------------
        #
        # Sequential exec    |  OK         | Not OK
        # Parallel exec      |  OK         | Not OK
        #
        #
        # Strategy 2
        #                    | Lam booted  | Lam not booted
        # -----------------------------------------------------
        #
        # Sequential exec    |  Hangs      | Not OK
        # Parallel exec      |  Hangs      | OK
        #



# Initialise MPI
#
# If the extension could not be imported, or MPI cannot run on this
# platform, only a warning is printed. isend_array, ireceive_array and
# allreduce_array are then left undefined.

if error:
    print("WARNING: MPI library could not be initialised")

else:
    from mpiextras import \
         isend_array, \
         ireceive_array, \
         allreduce_array

    # Work around bug in OpenMPI (December 2009):
    # https://bugs.launchpad.net/ubuntu/+source/petsc4py/+bug/232036
    # libmpi is loaded with RTLD_GLOBAL so that its symbols are globally
    # visible (presumably needed by libraries OpenMPI loads later - see bug).
    from ctypes import CDLL, RTLD_GLOBAL
    try:
        CDLL('libmpi.so', RTLD_GLOBAL)
    except OSError:
        # No library named 'libmpi.so' on this system (e.g. darwin and
        # win32, both accepted above, use other file names). Skip the
        # work around rather than aborting the import of this module.
        print("WARNING: could not load libmpi.so with RTLD_GLOBAL; "
              "OpenMPI work around not applied")
    # End work around
521
522
523
524
525
526
527
528
Note: See TracBrowser for help on using the repository browser.