source: trunk/anuga_core/anuga/parallel/mpiextras.c @ 9679

Last change on this file since 9679 was 9534, checked in by steve, 10 years ago

Adding in mpiextras.c to parallel folder

File size: 19.1 KB
Line 
1/************************************************************************/
2/* PyPAR - Parallel Python using MPI                                    */
3/* Copyright (C) 2001 - 2009 Ole Nielsen                                */
4/*                                                                      */
5/* See enclosed README file for details of installation and use.        */
6/*                                                                      */   
7/* This program is free software; you can redistribute it and/or modify */
8/* it under the terms of the GNU General Public License as published by */
9/* the Free Software Foundation; either version 2 of the License, or    */
10/* (at your option) any later version.                                  */
11/*                                                                      */     
12/* This program is distributed in the hope that it will be useful,      */
13/* but WITHOUT ANY WARRANTY; without even the implied warranty of       */
14/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the        */
15/* GNU General Public License (http://www.gnu.org/copyleft/gpl.html)    */
16/* for more details.                                                    */
17/*                                                                      */
18/* You should have received a copy of the GNU General Public License    */
19/* along with this program; if not, write to the Free Software          */
20/*  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307*/
21/*                                                                      */
22/*                                                                      */
23/* Contact address: Ole.Moller.Nielsen@gmail.com                        */
24/*                                                                      */
25/* version (see __version__ in pypar.py)                                */
26/* date (see __date__ in pypar.py)                                      */
27/************************************************************************/
28
29
30#include "Python.h"
31#include "mpi.h"
32#include "math.h"
33#include "numpy/arrayobject.h"
34
35
36/* Handle MPI constants export (shamelessly stolen from _cursesmodule.c)*/
37#define SetDictInt(string,ch) \
38        PyDict_SetItemString(ModDict, string, PyInt_FromLong((long) (ch)));
39
40/* Remap struct MPI_op to int (easier to handle by python)*/
41#define MAX 1
42#define MIN 2
43#define SUM 3
44#define PROD 4
45#define LAND 5
46#define BAND 6
47#define LOR 7
48#define BOR 8
49#define LXOR 9
50#define BXOR 10
51#define MAXLOC 11
52#define MINLOC 12
53/*#define REPLACE 13 // Not available on all MPI systems */
54
55
56static char errmsg[132];  /*Used to create exception messages*/
57
58/* MPI_Bsend() related variables. */
59//static void *pt_buf;  /* Pointer to allocated buffer. */
60//static int buf_size;  /* Size of buffer to allocate. */
61
62int length(PyArrayObject *x) { 
63  /* Compute the total length of contiguous array */
64  /* Necessary for communicating multi dimensional arrays */
65 
66  int i, length;
67 
68  /* Compute total length of contiguous data */
69  length = 1;
70  for (i=0; i<x->nd; i++) {
71    length *= x->dimensions[i];
72  } 
73   
74  return length;
75} 
76
77/**********************************************************/
78/* type_map                                               */
79/*                                                        */
80/**********************************************************/
81MPI_Datatype type_map(PyArrayObject *x, int *count) { 
82  /* Return the MPI Datatype corresponding to                       
83     the Python data type as follows 
84 
85     TYPE    py_type  mpi_type  bytes  symbol
86     ----------------------------------------
87     INT       4        6         4      'i'
88     LONG      5        8         8      'l'
89     FLOAT     6       10         4      'f' 
90     DOUBLE    12      11         8      'd'
91 
92     Also return the total number of elements in the array
93 
94     The Python datatype COMPLEX ('F') and COMPLEX_DOUBLE ('D')
95     is treated as a special case to the absence of an
96     MPI_COMPLEX datatype:
97 
98     Complex arrays are mapped to float or double arrays with real
99     and imaginary parts alternating and count is updated. */
100 
101  int py_type;
102  MPI_Datatype mpi_type;
103  char err_msg[64];
104
105  *count = length(x);
106 
107  py_type = PyArray_TYPE(x);
108  if (py_type == NPY_DOUBLE) 
109    mpi_type = MPI_DOUBLE;
110  else if (py_type == NPY_INT) 
111    mpi_type = MPI_INT;
112  else if (py_type == NPY_CDOUBLE) {
113    mpi_type = MPI_DOUBLE;
114    (*count) *= 2;
115  } else if (py_type == NPY_FLOAT) 
116    mpi_type = MPI_FLOAT;
117  else if (py_type == NPY_LONG)   
118    mpi_type = MPI_LONG; 
119  else if (py_type == NPY_CFLOAT) {
120    mpi_type = MPI_FLOAT;
121    (*count) *= 2;
122  } else {
123    sprintf(err_msg, 
124            "Array must be of type int or float. I got py_type == %d", 
125            py_type);
126    PyErr_SetString(PyExc_ValueError, err_msg);
127    return (MPI_Datatype) NULL;
128  }     
129
130  //printf("Types: py_type=%d, mpi_type=%d\n", py_type, (int) mpi_type);
131 
132  return mpi_type;
133}   
134
135/**********************************************************/
136/* op_map                                                 */
137/*                                                        */
138/**********************************************************/
139MPI_Op op_map(int py_op) { 
140 
141  MPI_Op mpi_op;
142 
143  if (py_op == MAX) 
144    mpi_op = MPI_MAX;
145  else if (py_op == MIN)   
146    mpi_op = MPI_MIN; 
147  else if (py_op == SUM)   
148    mpi_op = MPI_SUM; 
149  else if (py_op == PROD)   
150    mpi_op = MPI_PROD; 
151  else if (py_op == LAND)   
152    mpi_op = MPI_LAND; 
153  else if (py_op == BAND)   
154    mpi_op = MPI_BAND; 
155  else if (py_op == LOR)   
156    mpi_op = MPI_LOR; 
157  else if (py_op == BOR)   
158    mpi_op = MPI_BOR; 
159  else if (py_op == LXOR)   
160    mpi_op = MPI_LXOR; 
161  else if (py_op == BXOR)   
162    mpi_op = MPI_BXOR; 
163  else if (py_op == MAXLOC)
164    mpi_op = MPI_MAXLOC;
165  else if (py_op == MINLOC)   
166    mpi_op = MPI_MINLOC; 
167  /*else if (py_op == REPLACE)*/   
168  /*  mpi_op = MPI_REPLACE; */ 
169  else {
170    PyErr_SetString(PyExc_ValueError, "Operation unknown");
171    return (MPI_Op) NULL;
172  }     
173 
174  return mpi_op;
175} 
176
177
178/**********************************************************/
179/* isend_array (asynchronous)                             */
180/* Send Numpy array of type float, double, int, or long   */
181/*                                                        */
182/**********************************************************/
183static PyObject *isend_array(PyObject *self, PyObject *args) {
184  PyObject *input;
185  PyArrayObject *x;
186  int destination, tag, error, count, myid;
187  MPI_Datatype mpi_type;
188
189  /* process the parameters */
190  if (!PyArg_ParseTuple(args, "Oii", &input, &destination, &tag))
191    return NULL;
192
193  /* Make Numpy array from general sequence type (no cost if already Numpy). */
194  x = (PyArrayObject *)
195    PyArray_ContiguousFromObject(input, NPY_NOTYPE, 0, 0);
196
197  /* Input check and determination of MPI type */
198  mpi_type = type_map(x, &count);
199  if (!mpi_type) return NULL;
200
201  /* call the MPI routine */
202  error = MPI_Send(x->data, count, mpi_type, destination, tag,
203                   MPI_COMM_WORLD);
204  Py_DECREF(x);
205
206  if (error != 0) {
207    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
208    sprintf(errmsg, "Proc %d: MPI_Send failed with error code %d\n",
209            myid, error);
210    PyErr_SetString(PyExc_RuntimeError, errmsg);
211    return NULL;
212  }
213
214  Py_INCREF(Py_None);
215  return (Py_None);
216}
217
218
219
220
221/*************************************************************/
222/* ireceive_array (asynchronous)                             */
223/* Receive Numpy array of type float, double, int, or long   */
224/*                                                           */
225/*************************************************************/
226static PyObject *ireceive_array(PyObject *self, PyObject *args) {
227  PyArrayObject *x;
228  int source, tag, error, st_length, size, count, myid;
229  MPI_Datatype mpi_type;
230  MPI_Status status;
231
232   
233  if (!PyArg_ParseTuple(args, "Oii", &x, &source, &tag))
234    return NULL;   
235   
236  /* Input check and determination of MPI type */         
237  mpi_type = type_map(x, &count);
238  if (!mpi_type) return NULL; 
239     
240  /* call the MPI routine */
241  error =  MPI_Recv(x->data, count, mpi_type, source, tag,
242                    MPI_COMM_WORLD, &status);
243         
244  /* Do not DECREF x as it must be returned to Python */
245         
246  if (error != 0) {
247    MPI_Comm_rank(MPI_COMM_WORLD, &myid);   
248    sprintf(errmsg, "Proc %d: MPI_Recv failed with error code %d\n", 
249            myid, error);
250    PyErr_SetString(PyExc_RuntimeError, errmsg);   
251    return NULL;
252  } 
253   
254         
255  MPI_Get_count(&status, mpi_type, &st_length); 
256  /* status.st_length is not available in all MPI implementations */
257  /* Alternative is: MPI_Get_elements(MPI_Status *, MPI_Datatype, int *); */
258         
259     
260  /* FIXME: This might not be watertight on all platforms */
261  /* Need C equivalent to itemsize().*/
262  if (mpi_type == MPI_DOUBLE) {
263    size = sizeof(double);  /*8 */
264  } else if (mpi_type == MPI_LONG) {
265    size = sizeof(long); /*8? */
266  } else if (mpi_type == MPI_FLOAT) {
267    size = sizeof(float);
268  } else if (mpi_type == MPI_INT) {   
269    size = sizeof(int); 
270  } else {
271    size = 4; 
272  }
273   
274  return Py_BuildValue("(iiiii)", status.MPI_SOURCE, status.MPI_TAG,
275  status.MPI_ERROR, st_length, size); 
276}
277
278
279
280
281
282/*************************************************************/
283/* allreduce_array                                           */
284/* Allreduce Numpy array of type float, double, int, or long */
285/*                                                           */
286/*************************************************************/
287static PyObject *allreduce_array(PyObject *self, PyObject *args) {
288  PyArrayObject *x;
289  PyArrayObject *d;
290  int op, error, count, count1, myid;
291  MPI_Datatype mpi_type, buffer_type;
292  MPI_Op mpi_op;
293
294  /* process the parameters */
295  if (!PyArg_ParseTuple(args, "OOi", &x, &d, &op)) {
296    PyErr_SetString(PyExc_RuntimeError,
297                    "mpiext.c (allreduce_array): could not parse input");
298    return NULL;
299  }
300
301  /* Input check and determination of MPI type */
302  mpi_type = type_map(x, &count);
303  if (!mpi_type) {
304    PyErr_SetString(PyExc_RuntimeError,
305                    "mpiext.c (allreduce_array): could not determine mpi_type");
306    return NULL;
307  }
308
309  /* This error is caught at the pypar level - so we won't end up here
310     unless mpiext is being used independently */
311  buffer_type = type_map(d, &count1);
312  if (mpi_type != buffer_type) {
313    sprintf(errmsg, "mpiext.c (allreduce_array): Input array and buffer must be of the same type.");
314    PyErr_SetString(PyExc_RuntimeError, errmsg);
315
316    return NULL;
317  }
318
319  if (count != count1) {
320    PyErr_SetString(PyExc_RuntimeError,
321                    "mpiext.c (allreduce_array): Input array and buffer must have same length");
322    return NULL;
323  }
324
325  /* Input check and determination of MPI op */
326  mpi_op = op_map(op);
327  if (!mpi_op) {
328    PyErr_SetString(PyExc_RuntimeError,
329                    "mpiext.c (allreduce_array): could not determine mpi_op");
330    return NULL;
331  }
332
333  if (op == MAXLOC || op == MINLOC) {
334    PyErr_SetString(PyExc_RuntimeError,
335                    "mpiext.c (allreduce_array): MAXLOC and MINLOC are not implemented");
336    return NULL;
337  }
338  else {
339    /* call the MPI routine */
340    error =  MPI_Allreduce(x->data, d->data, count, mpi_type, mpi_op, \
341                        MPI_COMM_WORLD);
342  }
343
344  if (error != 0) {
345    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
346    sprintf(errmsg, "Proc %d: MPI_Allreduce failed with error code %d\n",
347            myid, error);
348    PyErr_SetString(PyExc_RuntimeError, errmsg);
349    return NULL;
350  }
351
352  Py_INCREF(Py_None);
353  return (Py_None);
354}
355
356
357/*************************************************************/
358/* do multiple isends and irecv of Numpy array buffers        */
359/* of type float, double, int, or long                       */
360/*                                                           */
361/*************************************************************/
362static PyObject *sendrecv_array(PyObject *self, PyObject *args) {
363  PyObject *send_bufs;
364  PyArrayObject *send_dest;
365  PyObject *recv_bufs;
366  PyArrayObject *recv_dest;
367
368  PyObject *seq;
369
370  PyArrayObject *x;
371  //int op, error, count, count1, myid;
372  //MPI_Datatype mpi_type, buffer_type;
373  //MPI_Op mpi_op;
374  int i, len;
375
376  double *xdata;
377
378  /* process the parameters */
379  if (!PyArg_ParseTuple(args, "OOOO", &send_bufs, &send_dest, &recv_bufs, &recv_dest)) {
380    PyErr_SetString(PyExc_RuntimeError,
381                    "mpiextras.c (sendrecv_array): could not parse input");
382    return NULL;
383  }
384
385  seq = PySequence_Fast(send_bufs, "expected a sequence");
386  len = PySequence_Size(send_bufs);
387  printf("len of sequence %d\n",len);
388  for (i = 0; i < len; i++) {
389    x = (PyArrayObject *) PySequence_Fast_GET_ITEM(seq, i);
390    //lenx = x->dimensions[0];
391    printf("buf size %d\n",len);
392    xdata = (double *) x->data;
393    printf("x->data[0] %g\n",xdata[0]);
394  }
395  Py_DECREF(seq);
396
397
398  Py_INCREF(Py_None);
399  return (Py_None);
400}
401
402
403
404/*************************************************************/
405/* do multiple isends and irecv of Numpy array buffers        */
406/* of type float, double, int, or long                       */
407/*                                                           */
408/*************************************************************/
409static PyObject *send_recv_via_dicts(PyObject *self, PyObject *args) {
410
411  PyObject *send_dict;
412  PyObject *recv_dict;
413
414  //PyObject *seq;
415
416  //PyObject *list;
417  PyArrayObject *X;
418  //PyArrayObject *Id;
419  //int op, error, count, count1, myid;
420  //MPI_Datatype mpi_type, buffer_type;
421  //MPI_Op mpi_op;
422  int k, lenx;
423  int num_recv=0;
424  int num_send=0;
425
426  int ierr;
427  MPI_Request requests[40];
428  MPI_Status statuses[40];
429
430  Py_ssize_t pos = 0;
431
432  PyObject *key, *value;
433  //long *Iddata;
434  //double *xdata;
435
436  /* process the parameters */
437  if (!PyArg_ParseTuple(args, "OO", &send_dict, &recv_dict)) {
438    PyErr_SetString(PyExc_RuntimeError,
439                    "mpiextras.c (sendrecv_array): could not parse input");
440    return NULL;
441  }
442
443  //----------------------------------------------------------------------------
444  // Do the recv first
445  //----------------------------------------------------------------------------
446  num_recv = PyDict_Size(recv_dict);
447  //printf("num_recv = %d\n",num_recv);
448  if (num_recv>20) {
449    PyErr_SetString(PyExc_RuntimeError,
450                    "mpiextras.c; Number of recv communication buffers > 10");
451    return NULL;
452  }
453
454  pos = 0;
455  k = 0;
456  while (PyDict_Next(recv_dict, &pos, &key, &value)) {
457    int i = PyInt_AS_LONG(key);
458    //printf("key %d\n",i);
459
460    //Id = (PyArrayObject *) PyList_GetItem(value, 0);
461    X   = (PyArrayObject *) PyList_GetItem(value, 2);
462
463    lenx = X->dimensions[0]*X->dimensions[1];
464
465
466    //printf("buf size %d by 3\n",lenx/3);
467    //xdata = (double *) X->data;
468    //Iddata = (long *) Id->data;
469
470    //printf("k = %d \n",k);
471    ierr = MPI_Irecv(X->data, lenx, MPI_DOUBLE, i, 123, MPI_COMM_WORLD, &requests[k]);
472    if (ierr>0) {
473        PyErr_SetString(PyExc_RuntimeError,
474                    "mpiextras.c; error from MPI_Irecv");
475        return NULL;
476      }
477    //printf("ierr = %d \n",ierr);
478    k++;
479    //for (k=0 ; k < lenx ; k++) printf("X[%d] = %g Id[%d] = %ld \n",k,xdata[k],k,Iddata[k]);
480}
481  //----------------------------------------------------------------------------
482  // Do the sends second
483  //----------------------------------------------------------------------------
484  num_send = PyDict_Size(send_dict);
485  //printf("num_send = %d\n",num_send);
486  if (num_send>20) {
487    PyErr_SetString(PyExc_RuntimeError,
488                    "mpiextras.c; Number of send communication buffers > 10");
489    return NULL;
490  }
491
492  pos = 0;
493  while (PyDict_Next(send_dict, &pos, &key, &value)) {
494    int i = PyInt_AS_LONG(key);
495    //printf("key %d\n",i);
496
497    //Id = (PyArrayObject *) PyList_GetItem(value, 0);
498    X   = (PyArrayObject *) PyList_GetItem(value, 2);
499
500    lenx = X->dimensions[0]*X->dimensions[1];
501
502    //printf("buf size %d by 3 \n",lenx/3);
503    //xdata = (double *) X->data;
504    //Iddata = (long *) Id->data;
505    //for (k=0 ; k < lenx ; k++) printf("X[%d] = %g Id[%d] = %ld \n",k,xdata[k],k,Iddata[k]);
506
507    //printf("k = %d \n",k);
508    ierr = MPI_Isend(X->data, lenx, MPI_DOUBLE, i, 123, MPI_COMM_WORLD, &requests[k]);
509    //printf("ierr = %d \n",ierr);
510   
511    k++;
512}
513
514
515
516  //printf("k = %d\n",k);
517
518
519  //----------------------------------------------------------------------------
520  // Now complete communication. We could put some computation between the
521  // communication calls above and this call.
522  //----------------------------------------------------------------------------
523  ierr =  MPI_Waitall(k,requests,statuses);
524
525
526
527
528
529/*
530  for (i = 0; i < len; i++) {
531    x = (PyArrayObject *) PySequence_Fast_GET_ITEM(seq, i);
532    lenx = x->dimensions[0];
533    printf("buf size %d\n",len);
534    xdata = (double *) x->data;
535    printf("x->data[0] %g\n",xdata[0]);y
536  }
537  Py_DECREF(seq);
538*/
539
540
541/*
542//   Input check and determination of MPI type
543  mpi_type = type_map(x, &count);
544  if (!mpi_type) {
545    PyErr_SetString(PyExc_RuntimeError,
546                    "mpiext.c (allreduce_array): could not determine mpi_type");
547    return NULL;
548  }
549
550
551  //This error is caught at the pypar level - so we won't end up here
552  //  unless mpiext is being used independently
553  buffer_type = type_map(d, &count1);
554  if (mpi_type != buffer_type) {
555    sprintf(errmsg, "mpiext.c (allreduce_array): Input array and buffer must be of the same type.");
556    PyErr_SetString(PyExc_RuntimeError, errmsg);
557
558    return NULL;
559  }
560
561  if (count != count1) {
562    PyErr_SetString(PyExc_RuntimeError,
563                    "mpiext.c (allreduce_array): Input array and buffer must have same length");
564    return NULL;
565  }
566
567  // Input check and determination of MPI op
568  mpi_op = op_map(op);
569  if (!mpi_op) {
570    PyErr_SetString(PyExc_RuntimeError,
571                    "mpiext.c (allreduce_array): could not determine mpi_op");
572    return NULL;
573  }
574
575  if (op == MAXLOC || op == MINLOC) {
576    PyErr_SetString(PyExc_RuntimeError,
577                    "mpiext.c (allreduce_array): MAXLOC and MINLOC are not implemented");
578    return NULL;
579  }
580  else {
581    // call the MPI routine
582    error =  MPI_Allreduce(x->data, d->data, count, mpi_type, mpi_op, \
583                        MPI_COMM_WORLD);
584  }
585
586  if (error != 0) {
587    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
588    sprintf(errmsg, "Proc %d: MPI_Allreduce failed with error code %d\n",
589            myid, error);
590    PyErr_SetString(PyExc_RuntimeError, errmsg);
591    return NULL;
592  }
593*/
594
595
596  Py_INCREF(Py_None);
597  return (Py_None);
598}
599
600
601 
602/**********************************/
603/* Method table for python module */
604/**********************************/
605static struct PyMethodDef MethodTable[] = {
606  {"isend_array", isend_array, METH_VARARGS},
607  {"ireceive_array", ireceive_array, METH_VARARGS},
608  {"allreduce_array", allreduce_array, METH_VARARGS},
609  {"sendrecv_array", sendrecv_array, METH_VARARGS},
610  {"send_recv_via_dicts", send_recv_via_dicts, METH_VARARGS},
611  {NULL, NULL}
612};
613
614
615/***************************/
616/* Module initialisation   */
617/***************************/
618void initmpiextras(void){
619  PyObject *m, *ModDict;
620 
621  m = Py_InitModule("mpiextras", MethodTable);
622 
623  /* to handle MPI symbolic constants*/
624  ModDict = PyModule_GetDict(m); 
625  SetDictInt("MPI_ANY_TAG", MPI_ANY_TAG);
626  SetDictInt("MPI_TAG_UB", MPI_TAG_UB); 
627  SetDictInt("MPI_ANY_SOURCE", MPI_ANY_SOURCE);
628  SetDictInt("MAX", MAX);
629  SetDictInt("MIN", MIN);
630  SetDictInt("SUM", SUM);
631  SetDictInt("PROD", PROD);
632  SetDictInt("LAND", LAND);
633  SetDictInt("BAND", BAND);
634  SetDictInt("LOR", LOR);
635  SetDictInt("BOR", BOR);
636  SetDictInt("LXOR", LXOR);
637  SetDictInt("BXOR", BXOR);
638
639   
640  import_array();     /* Necessary for handling of numpy structures  */
641}
Note: See TracBrowser for help on using the repository browser.