Changeset 123 for pypar/pypar.py
- Timestamp:
- Jul 11, 2005, 3:20:03 PM (19 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pypar/pypar.py
r85 r123 31 31 size() -- Number of processors 32 32 rank() -- Id of current processor 33 Get_processor_name() -- Return host name of current node33 get_processor_name() -- Return host name of current node 34 34 35 35 send() -- Blocking send (all types) 36 36 receive() -- Blocking receive (all types) 37 raw_send() -- Blocking send (Numeric arrays and strings) 38 raw_receive() -- Blocking receive (Numeric arrays and strings) 39 bcast() -- Broadcast 40 Wtime() -- MPI wall time 41 Barrier() -- Synchronisation point. Makes processors wait until all processors 37 broadcast() -- Broadcast 38 time() -- MPI wall time 39 barrier() -- Synchronisation point. Makes processors wait until all processors 42 40 have reached this point. 43 Abort() -- Terminate all processes.44 Finalize() -- Cleanup MPI. No parallelism can take place after this point.41 abort() -- Terminate all processes. 42 finalize() -- Cleanup MPI. No parallelism can take place after this point. 45 43 46 44 … … 48 46 """ 49 47 50 __version__ = '1.8.2' 51 __date__ = '16 November 2003' 48 # Meta data 49 __version__ = '1.9.1' 50 __date__ = '15 December 2003' 52 51 __author__ = 'Ole M. Nielsen' 53 52 54 53 55 56 # -----------------------------------------------------------------------------57 # Options directory with default values - to be set by user58 #59 60 options = {61 'vanilla_bufsize': None, # Buffersize (chars) used for vanilla sends62 'verbose': 063 }64 65 54 # Constants 66 55 # 67 max_tag = 32767 # Max tag value . MPI_TAG_UB returned 0 so I found this value (OK in Linux/LAM)56 max_tag = 32767 # Max tag value (MPI_TAG_UB didn't work and returned 0) 68 57 control_tag = 13849 # Reserved tag used to identify control information 69 58 default_tag = 1 # Tag used as default if not specified 70 59 71 72 control_sep = ':' # Separator for fields in control info (NOT ',') 73 control_data_max_size = 256 # Maximal size of string holding control data 74 75 76 import os, sys 77 78 #------------------------------------------------------------------------ 79 # MPI Status block (returned by receive and raw_receive if specified with 80 # parameter return_status=True 81 #------------------------------------------------------------------------ 82 83 class Status: 84 def __init__(self, status_tuple): 85 self.source = status_tuple[0] #Id of sender 86 self.tag = status_tuple[1] #Tag of received message 87 self.error = status_tuple[2] #Error code 88 self.length = status_tuple[3] #Number of elements transmitted 89 self.size = status_tuple[4] #Size of one element 90 91 def __repr__(self): 92 return 'Pypar Status Object:\n source=%d\n tag=%d\n '+\ 93 'error=%d\n length=%d\n size=%d\n'\ 94 %(self.source, self.tag, self.error, self.length, self.size) 95 96 def bytes(self): 97 """Number of bytes transmitted (excl control info) 98 """ 99 return self.length * self.size 100 60 control_sep = ':' # Separator for fields in control info (NOT ',') 61 control_data_max_size = 64 # Maximal size of string holding control data 62 101 63 102 64 #--------------------------------------------------------------------------- … … 104 66 #-------------------------------------------------------------------------- 105 67 106 107 def raw_send(x, destination, tag=default_tag, vanilla=0): 108 """Wrapper for raw MPI send. 109 Send x to destination with tag. 110 111 Automatically determine appropriate protocol 112 and call corresponding send function. 113 114 The variable x can be any (picklable) type, but 115 Numeric variables and text strings will most efficient. 116 Setting vanilla = 1 forces vanilla mode for any type. 117 118 """ 119 120 assert tag != control_tag, 'Tag %d is reserved by pypar - please use another.' %control_tag 121 122 protocol = get_control_info(x, vanilla)[0] 123 if protocol == 'array': 124 send_array(x, destination, tag) 125 elif protocol == 'string': 126 send_string(x, destination, tag) 127 else: 128 send_vanilla(x, destination, tag) 129 130 131 def raw_receive(x, source, tag=default_tag, vanilla=0, return_status=0): 132 """Wrapper for raw MPI receive. 133 Receive something of same size as x from source with tag. 134 135 Automatically determine appropriate protocol 136 and call corresponding receive function. 137 138 The variable x can be any (picklable) type, but 139 Numeric variables and text strings will most efficient. 140 Setting vanilla = 1 forces vanilla mode for any type. 141 """ 142 143 assert tag != control_tag, 'Tag %d is reserved by pypar - please use another.' %control_tag 144 145 protocol = get_control_info(x, vanilla)[0] 146 if protocol == 'array': 147 err, stat = receive_array(x, source, tag) 148 if err: 149 raise 'receive_array failed with error code %d' %err 150 elif protocol == 'string': 151 err, stat = receive_string(x, source, tag) 152 if err: 153 raise 'receive_string failed with error code %d' %err 154 155 x = x[:stat[3]] #Trim buffer to actual received length 156 else: 157 x, stat = receive_vanilla(x, source, tag) 158 159 160 if return_status: 161 return x, Status(stat) 162 else: 163 return x 164 165 166 def send(x, destination, tag=default_tag, vanilla=0): 167 """Wrapper for easy MPI send. 168 Send x to destination with tag. 169 170 Automatically determine appropriate protocol 171 and call corresponding send function. 172 Also passes type and size information on as preceding message to 173 simplify the receive call. 174 175 The variable x can be any (picklable) type, but 176 Numeric variables and text strings will most efficient. 177 Setting vanilla = 1 forces vanilla mode for any type. 178 179 """ 180 import string 181 182 assert tag != control_tag, 'Tag %d is reserved by pypar - please use another.' %control_tag 183 184 control_info = get_control_info(x, vanilla) 185 protocol = control_info[0] 186 187 if protocol == 'array': 188 send_control_info(control_info, destination) 189 190 send_array(x, destination, tag) 191 elif protocol == 'string': 192 send_control_info(control_info, destination) 193 194 send_string(x, destination, tag) 195 elif protocol == 'vanilla': 196 from cPickle import dumps 197 s = dumps(x, 1) 198 control_info[2] = str(len(s)) 199 200 send_control_info(control_info, destination) 201 202 send_string(s, destination, tag) 203 else: 204 raise "Unknown values for protocol: %s" %protocol 205 206 207 def receive(source, tag=default_tag, return_status=0): 208 """Wrapper for easy MPI receive. 209 Receive data from source with tag. 210 211 Assumes preceding message containing protocol, type, size. 212 Create appropriate buffer and receive data. 213 """ 214 import types 215 216 assert type(source) == types.IntType, 'Source must be a scalar: %s' %source 217 assert tag != control_tag, 'Tag %d is reserved by pypar - please use another.' %control_tag 218 219 220 control_info = receive_control_info(source) 221 222 protocol = control_info[0] 223 typecode = control_info[1] 224 size = control_info[2] 225 shape = control_info[3] 226 227 228 if protocol == 'array': 229 import Numeric 230 x = Numeric.zeros(size,typecode) 231 x = Numeric.reshape(x, shape) 232 err, stat = receive_array(x, source, tag) 233 if err: 234 raise 'receive_array failed with error code %d' %err 235 elif protocol == 'string': 236 x = ' '*size 237 err, stat = receive_string(x, source, tag) 238 if err: 239 raise 'receive_string failed with error code %d' %err 240 241 x = x[:stat[3]] #Trim buffer to actual received length 242 elif protocol == 'vanilla': 243 from cPickle import loads 244 s = ' '*size 245 err, stat = receive_string(s, source, tag) 246 if err: 247 raise 'receive_string failed with error code %d' %err 248 249 x = loads(s) 250 else: 251 raise "Unknown values for protocol: %s" %protocol 252 253 254 if return_status: 255 return x, Status(stat) 256 else: 257 return x 258 259 260 def bcast(x, source, vanilla=0): 261 """Wrapper for MPI bcast. 262 Broadcast x from source. 263 264 Automatically determine appropriate protocol 265 and call corresponding send function. 266 267 The variable x can be any (picklable) type, but 268 Numeric variables and text strings will most efficient. 269 Setting vanilla = 1 forces vanilla mode for any type. 270 271 """ 272 273 protocol = get_control_info(x, vanilla)[0] 274 if protocol == 'array': 275 bcast_array(x, source) 276 elif protocol == 'string': 277 bcast_string(x, source) 278 elif protocol == 'vanilla': 279 from cPickle import loads, dumps 280 s = dumps(x, 1) 281 s = s + ' '*int(0.1*len(s)) #safety 282 bcast_string(s, source) 283 x = loads(s) 284 else: 285 raise "Unknown values for protocol: %s" %protocol 286 287 return x 288 289 def raw_scatter(x, buffer, source, vanilla=0): 290 """Wrapper for MPI scatter. 291 Scatter the first nums elements in x to buffer 292 (of size given by nums) from source. 293 294 Automatically determine appropriate protocol 295 and call corresponding send function. 296 297 The variable x can be any (picklable) type, but 298 Numeric variables and text strings will most efficient. 299 Setting vanilla = 1 forces vanilla mode for any type. 300 301 Return buffer 302 """ 303 304 protocol = get_control_info(x, vanilla)[0] 305 if protocol == 'array': 306 scatter_array(x, buffer, source) 307 elif protocol == 'string': 308 scatter_string(x, buffer, source) 309 elif protocol == 'vanilla': 310 raise "Protocol: %s unsupported for scatter" %protocol 311 else: 312 raise "Unknown values for protocol: %s" %protocol 313 314 return buffer 315 316 317 def scatter(x, source, vanilla=0): 318 """Wrapper for easy MPI Scatter receive. 319 Receive data from source with tag. 320 321 Create appropriate buffer and receive data. 322 323 Return scattered result (same type as x) 324 """ 325 326 control_info = get_control_info(x) 327 328 protocol = control_info[0] 329 typecode = control_info[1] 330 size = control_info[2] 331 shape = control_info[3] 332 333 334 if protocol == 'array': 335 import Numeric 336 buffer = Numeric.zeros(size, typecode) 337 buffer = Numeric.reshape(buffer, shape) 338 339 scatter_array(x, buffer, source) 340 elif protocol == 'string': 341 buffer = ' '*size 342 scatter_string(x, buffer, source) 343 elif protocol == 'vanilla': 344 raise "Protocol: %s unsupported for scatter" %protocol 345 else: 346 raise "Unknown values for protocol: %s" %protocol 347 348 return buffer 349 350 351 def raw_gather(x, buffer, source, vanilla=0): 352 """Wrapper for MPI gather. 353 Gather first nums elements in x to buffer (of size given by nums) 354 from source. 355 356 Automatically determine appropriate protocol 357 and call corresponding send function. 358 359 The variable x can be any (picklable) type, but 360 Numeric variables and text strings will most efficient. 361 Setting vanilla = 1 forces vanilla mode for any type. 362 363 """ 364 365 protocol = get_control_info(x, vanilla)[0] 366 if protocol == 'array': 367 gather_array(x, buffer, source) 368 elif protocol == 'string': 369 gather_string(x, buffer, source) 370 elif protocol == 'vanilla': 371 raise "Protocol: %s unsupported for gather" %protocol 372 else: 373 raise "Unknown values for protocol: %s" %protocol 374 375 return buffer 376 377 378 def gather(x, source, vanilla=0): 379 """Wrapper for easy MPI Gather receive. 380 Receive data from source with tag. 381 382 Create appropriate buffer and receive data. 383 """ 384 from mpiext import size 385 numproc = size() 386 387 control_info = get_control_info(x) 388 389 protocol = control_info[0] 390 typecode = control_info[1] 391 size = control_info[2]*numproc 392 shape = control_info[3] 393 394 if protocol == 'array': 395 import Numeric 396 buffer = Numeric.zeros(size,typecode) 397 398 # Modify shape along axis=0 to match size 399 shape = list(shape) 400 shape[0] *= numproc 401 402 buffer = Numeric.reshape(buffer, shape) 403 404 gather_array(x, buffer, source) 405 elif protocol == 'string': 406 buffer = ' '*size 407 gather_string(x, buffer, source) 408 elif protocol == 'vanilla': 409 raise "Protocol: %s unsupported for gather" %protocol 410 else: 411 raise "Unknown values for protocol: %s" %protocol 412 413 return buffer 414 415 416 417 def raw_reduce(x, buffer, op, source, vanilla=0): 418 """Wrapper for MPI_Reduce. 419 Reduce nums elements in x to buffer (of the same size as x) 420 at source applying operation op elementwise. 421 422 Automatically determine appropriate protocol 423 and call corresponding send function. 424 425 """ 426 427 protocol = get_control_info(x, vanilla)[0] 428 if protocol == 'array': 429 if not x.typecode() == buffer.typecode(): 430 raise "Input array and buffer must have the same typecode" 431 reduce_array(x, buffer, op, source) 432 elif (protocol == 'vanilla' or protocol == 'string'): 433 raise "Protocol: %s unsupported for reduce" %protocol 434 else: 435 raise "Unknown values for protocol: %s" %protocol 436 437 return buffer 438 439 440 def reduce(x, nums, op, source, vanilla=0): 441 """Wrapper for easy MPI Gather receive. 442 Receive data from source with tag. 443 444 Create appropriate buffer and receive data. 445 """ 446 447 control_info = get_control_info(x) 448 449 protocol = control_info[0] 450 typecode = control_info[1] 451 s_size = nums 452 453 if protocol == 'array': 454 import Numeric 455 buffer = Numeric.zeros(s_size * size(),typecode) 456 reduce_array(x, buffer, s_size, op, source) 457 elif (protocol == 'vanilla' or protocol == 'string'): 458 raise "Protocol: %s unsupported for reduce" %protocol 459 else: 460 raise "Unknown values for protocol: %s" %protocol 461 462 return buffer 463 464 465 68 def send(x, destination, use_buffer=False, vanilla=False, 69 tag=default_tag, bypass=False): 70 """Wrapper for easy MPI send. 71 Send x to destination. 72 73 Automatically determine appropriate protocol 74 and call corresponding send function. 75 Also passes type and size information on as preceding message to 76 simplify the receive call. 77 78 The variable x can be any (picklable) type, but 79 Numeric variables and text strings will most efficient. 80 Setting vanilla = 1 forces vanilla mode for any type. 81 82 If bypass is True, all admin and error checks 83 get bypassed to reduce the latency. Should only 84 be used for sending Numeric arrays and should be matched 85 with a bypass in the corresponding receive command. 86 87 """ 88 import types, string 89 90 if bypass: 91 send_array(x, destination, tag) 92 return 93 94 #Input check 95 errmsg = 'Destination id (%s) must be an integer.' %destination 96 assert type(destination) == types.IntType, errmsg 97 98 errmsg = 'Tag %d is reserved by pypar - please use another.' %control_tag 99 assert tag != control_tag, errmsg 100 101 #Create metadata about object to be sent 102 control_info, x = create_control_info(x, vanilla, return_object=True) 103 protocol = control_info[0] 104 105 106 #Possibly transmit control data 107 if use_buffer is False: 108 send_control_info(control_info, destination) 109 110 111 #Transmit payload data 112 if protocol == 'array': 113 send_array(x, destination, tag) 114 elif protocol in ['string', 'vanilla']: 115 send_string(x, destination, tag) 116 else: 117 raise 'Unknown protocol: %s' %protocol 118 119 120 def receive(source, buffer=None, vanilla=False, tag=default_tag, 121 return_status=False, bypass=False): 122 """receive - blocking MPI receive 123 124 Receive data from source. 125 126 Optional parameters: 127 buffer: Use specified buffer for received data (faster). Default None. 128 vanilla: Specify to enforce vanilla protocol for any type. Default False 129 tag: Only received messages tagged as specified. Default default_tag 130 return_status: Return Status object along with result. Default False. 131 132 If no buffer is specified, receive will try to receive a 133 preceding message containing protocol, type, size and shape and 134 then create a suitable buffer. 135 136 If buffer is specified the corresponding send must specify 137 use_buffer = True. 138 The variable buffer can be any (picklable) type, but 139 Numeric variables and text strings will most efficient. 140 141 Appropriate protocol will be automatically determined 142 and corresponding receive function called. 143 144 145 If bypass is True, all admin and error checks 146 get bypassed to reduce the latency. Should only 147 be used for receiving Numerical arrays and should 148 be matched with a bypass in the corresponding send command. 149 Also buffer must be specified. 150 """ 151 152 if bypass: 153 #errmsg = 'bypass mode must be used with specified buffer' 154 #assert buffer is not None, msg 155 stat = receive_array(buffer, source, tag) 156 else: 157 158 import types 159 160 #Input check 161 errmsg = 'Source id (%s) must be an integer.' %source 162 assert type(source) == types.IntType, errmsg 163 164 errmsg = 'Tag %d is reserved by pypar - please use another.' %control_tag 165 assert tag != control_tag, errmsg 166 167 168 #Either receive or create metadata about objetc to receive 169 if buffer is None: 170 protocol, typecode, size, shape = receive_control_info(source) 171 else: 172 protocol, typecode, size, shape = create_control_info(buffer, vanilla) 173 174 175 #Receive payload data 176 if protocol == 'array': 177 if buffer is None: 178 import Numeric 179 buffer = Numeric.zeros(size,typecode) 180 buffer = Numeric.reshape(buffer, shape) 181 182 stat = receive_array(buffer, source, tag) 183 184 elif protocol == 'string': 185 if buffer is None: 186 buffer = ' '*size 187 188 stat = receive_string(buffer, source, tag) 189 190 elif protocol == 'vanilla': 191 from cPickle import dumps, loads 192 if buffer is None: 193 s = ' '*size 194 else: 195 s = dumps(buffer, 1) 196 s = s + ' '*int(0.1*len(s)) #safety 197 198 stat = receive_string(s, source, tag) 199 buffer = loads(s) #Replace buffer with received result 200 else: 201 raise 'Unknown protocol: %s' %protocol 202 203 # Return received data and possibly the status object 204 if return_status: 205 return buffer, Status(stat) 206 else: 207 return buffer 208 209 210 def broadcast(buffer, root, vanilla=False): 211 """Wrapper for MPI bcast. 212 213 Broadcast buffer from the process with rank root to all other processes. 214 215 216 Automatically determine appropriate protocol 217 and call corresponding send function. 218 219 The variable buffer can be any (picklable) type, but 220 Numeric variables and text strings will most efficient. 221 Setting vanilla = 1 forces vanilla mode for any type. 222 223 """ 224 225 import types 226 227 #Input check 228 errmsg = 'Root id (%s) must be an integer.' %root 229 assert type(root) == types.IntType, errmsg 230 231 232 #Create metadata about object to be sent 233 protocol = create_control_info(buffer, vanilla)[0] 234 235 236 #Broadcast 237 if protocol == 'array': 238 broadcast_array(buffer, root) 239 elif protocol == 'string': 240 broadcast_string(buffer, root) 241 elif protocol == 'vanilla': 242 from cPickle import loads, dumps 243 s = dumps(buffer, 1) 244 s = s + ' '*int(0.1*len(s)) #safety 245 246 broadcast_string(s, root) 247 buffer = loads(s) 248 else: 249 raise 'Unknown protocol: %s' %protocol 250 251 return buffer 252 253 254 def scatter(x, root, buffer=None, vanilla=False): 255 """Sends data x from process with rank root to all other processes. 256 257 Create appropriate buffer and receive data. 258 Return scattered result (same type as x) 259 260 Scatter makes only sense for arrays or strings 261 """ 262 263 import types 264 from mpiext import size 265 numproc = size() #Needed to determine buffer size 266 267 #Input check 268 errmsg = 'Root id (%s) must be an integer.' %root 269 assert type(root) == types.IntType, errmsg 270 271 272 #Create metadata about object to be sent 273 protocol, typecode, size, shape = create_control_info(x) 274 275 #Scatter 276 if protocol == 'array': 277 if buffer is None: 278 import Numeric 279 280 # Modify shape along axis=0 to match size 281 shape = list(shape) 282 shape[0] /= numproc 283 count = Numeric.product(shape) 284 285 buffer = Numeric.zeros(count, typecode) 286 buffer = Numeric.reshape(buffer, shape) 287 288 scatter_array(x, buffer, root) 289 elif protocol == 'string': 290 if buffer is None: 291 buffer = ' '*(size/numproc) 292 293 scatter_string(x, buffer, root) 294 elif protocol == 'vanilla': 295 errmsg = 'Scatter is only supported for Numeric arrays and strings.\n' 296 errmsg += 'If you wish to distribute a general sequence, ' 297 errmsg += 'please use send and receive commands or broadcast.' 298 raise errmsg 299 else: 300 raise 'Unknown protocol: %s' %protocol 301 302 return buffer 303 304 305 def gather(x, root, buffer=None, vanilla=0): 306 """Gather values from all processes to root 307 308 Create appropriate buffer and receive data. 309 310 Gather only makes sens for arrays or strings 311 """ 312 313 import types 314 from mpiext import size 315 numproc = size() #Needed to determine buffer size 316 317 #Input check 318 errmsg = 'Root id (%s) must be an integer.' %root 319 assert type(root) == types.IntType, errmsg 320 321 #Create metadata about object to be gathered 322 protocol, typecode, size, shape = create_control_info(x) 323 324 #Gather 325 if protocol == 'array': 326 if buffer is None: 327 import Numeric 328 buffer = Numeric.zeros(size*numproc, typecode) 329 330 # Modify shape along axis=0 to match size 331 shape = list(shape) 332 shape[0] *= numproc 333 buffer = Numeric.reshape(buffer, shape) 334 335 gather_array(x, buffer, root) 336 elif protocol == 'string': 337 if buffer is None: 338 buffer = ' '*size*numproc 339 340 gather_string(x, buffer, root) 341 elif protocol == 'vanilla': 342 errmsg = 'Gather is only supported for Numeric arrays and strings.\n' 343 errmsg += 'If you wish to distribute a general sequence, ' 344 errmsg += 'please use send and receive commands or broadcast.' 345 raise errmsg 346 else: 347 raise 'Unknown protocol: %s' %protocol 348 349 return buffer 350 351 352 def reduce(x, op, root, buffer=None, vanilla=0): 353 """Reduce elements in x to buffer (of the same size as x) 354 at root applying operation op elementwise. 355 """ 356 357 import types 358 359 from mpiext import size 360 numproc = size() #Needed to determine buffer size 361 362 363 #Input check 364 errmsg = 'Root id (%s) must be an integer.' %root 365 assert type(root) == types.IntType, errmsg 366 367 #Create metadata about object 368 protocol, typecode, size, shape = create_control_info(x) 369 370 371 #Reduce 372 if protocol == 'array': 373 if buffer is None: 374 import Numeric 375 buffer = Numeric.zeros(size*numproc, typecode) 376 377 # Modify shape along axis=0 to match size 378 shape = list(shape) 379 shape[0] *= numproc 380 buffer = Numeric.reshape(buffer, shape) 381 382 reduce_array(x, buffer, op, root) 383 elif (protocol == 'vanilla' or protocol == 'string'): 384 raise 'Protocol: %s unsupported for reduce' %protocol 385 else: 386 raise 'Unknown protocol: %s' %protocol 387 388 return buffer 389 466 390 467 391 #--------------------------------------------------------- 468 392 # AUXILIARY FUNCTIONS 469 393 #--------------------------------------------------------- 470 def get_control_info(x, vanilla=0): 471 """Determine which protocol to use for communication: 472 (Numeric) arrays, strings, or vanilla based x's type. 473 474 There are three protocols: 475 'array': Numeric arrays of type 'i', 'l', 'f', or 'd' can be communicated 476 with mpiext.send_array and mpiext.receive_array. 477 'string': Text strings can be communicated with mpiext.send_string and 478 mpiext.receive_string. 479 'vanilla': All other types can be communicated using the scripts send_vanilla and 480 receive_vanilla provided that the objects can be serialised using 481 pickle (or cPickle). The latter mode is less efficient than the 482 first two but it can handle complex structures. 483 484 Rules: 485 If keyword argument vanilla == 1, vanilla is chosen regardless of 486 x's type. 487 Otherwise if x is a string, the string protocol is chosen 488 If x is an array, the 'array' protocol is chosen provided that x has one 489 of the admissible typecodes. 490 """ 491 492 import types 493 494 495 protocol = 'vanilla' 496 typecode = ' ' 497 size = '0' 498 shape = () 499 500 if not vanilla: 501 if type(x) == types.StringType: 502 protocol = 'string' 503 typecode = 'c' 504 size = len(x) 505 elif type(x).__name__ == 'array': #Don't use Numeric as it isn't imported yet 506 try: 507 import Numeric 508 except: 509 print "WARNING (pypar.py): Numeric module could not be imported,", 510 print "reverting to vanilla mode" 511 protocol = 'vanilla' 512 else: 513 typecode = x.typecode() 514 if typecode in ['i', 'l', 'f', 'd', 'F', 'D']: 515 protocol = 'array' 516 shape = x.shape 517 size = Numeric.product(shape) 518 else: 519 print "WARNING (pypar.py): Numeric object type %s is not supported."\ 520 %(x.typecode()) 521 print "Only types 'i', 'l', 'f', 'd', 'F', 'D' are supported,", 522 print "Reverting to vanilla mode." 523 protocol = 'vanilla' 524 525 control_info = [protocol, typecode, size, shape] 526 527 return control_info 394 def balance(N, P, p): 395 """Compute p'th interval when N is distributed over P bins. 396 """ 397 398 from math import floor 399 400 L = int(floor(float(N)/P)) 401 K = N - P*L 402 if p < K: 403 Nlo = p*L + p 404 Nhi = Nlo + L + 1 405 else: 406 Nlo = p*L + K 407 Nhi = Nlo + L 408 409 return Nlo, Nhi 410 411 412 # Obsolete functions 413 # (for backwards compatibility - remove in version 2.0) 414 415 def raw_send(x, destination, tag=default_tag, vanilla=0): 416 send(x, destination, use_buffer=True, tag=tag, vanilla=vanilla) 417 418 419 def raw_receive(x, source, tag=default_tag, vanilla=0, return_status=0): 420 x = receive(source, tag=tag, vanilla=vanilla, 421 return_status=return_status, buffer=x) 422 return x 423 424 def raw_scatter(x, buffer, source, vanilla=0): 425 scatter(x, source, buffer=buffer, vanilla=vanilla) 426 427 def raw_gather(x, buffer, source, vanilla=0): 428 gather(x, source, buffer=buffer, vanilla=0) 429 430 def raw_reduce(x, buffer, op, source, vanilla=0): 431 reduce(x, op, source, buffer=buffer, vanilla=0) 432 433 def bcast(buffer, root, vanilla=False): 434 return broadcast(buffer, root, vanilla) 435 436 def Wtime(): 437 return time() 438 439 def Get_processor_name(): 440 return get_processor_name() 441 442 def Initialized(): 443 return initialized() 444 445 def Finalize(): 446 finalize() 447 448 def Abort(): 449 abort() 450 451 def Barrier(): 452 barrier() 453 454 455 456 #--------------------------------------------------------- 457 # INTERNAL FUNCTIONS 458 #--------------------------------------------------------- 459 460 class Status: 461 """ MPI Status block returned by receive if 462 specified with parameter return_status=True 463 """ 464 465 def __init__(self, status_tuple): 466 self.source = status_tuple[0] #Id of sender 467 self.tag = status_tuple[1] #Tag of received message 468 self.error = status_tuple[2] #MPI Error code 469 self.length = status_tuple[3] #Number of elements transmitted 470 self.size = status_tuple[4] #Size of one element 471 472 def __repr__(self): 473 return 'Pypar Status Object:\n source=%d\n tag=%d\n '+\ 474 'error=%d\n length=%d\n size=%d\n'\ 475 %(self.source, self.tag, self.error, self.length, self.size) 476 477 def bytes(self): 478 """Number of bytes transmitted (excl control info) 479 """ 480 return self.length * self.size 481 482 483 484 def create_control_info(x, vanilla=0, return_object=False): 485 """Determine which protocol to use for communication: 486 (Numeric) arrays, strings, or vanilla based x's type. 487 488 There are three protocols: 489 'array': Numeric arrays of type 'i', 'l', 'f', 'd', 'F' or 'D' can be 490 communicated with mpiext.send_array and mpiext.receive_array. 491 'string': Text strings can be communicated with mpiext.send_string and 492 mpiext.receive_string. 493 'vanilla': All other types can be communicated as string representations 494 provided that the objects 495 can be serialised using pickle (or cPickle). 496 The latter mode is less efficient than the 497 first two but it can handle general structures. 498 499 Rules: 500 If keyword argument vanilla == 1, vanilla is chosen regardless of 501 x's type. 502 Otherwise if x is a string, the string protocol is chosen 503 If x is an array, the 'array' protocol is chosen provided that x has one 504 of the admissible typecodes. 505 506 The optional argument return_object asks to return object as well. 507 This is useful in case it gets modified as in the case of general structures 508 using the vanilla protocol. 509 """ 510 511 import types 512 513 #Default values 514 protocol = 'vanilla' 515 typecode = ' ' 516 size = 0 517 shape = () 518 519 #Determine protocol in case 520 if not vanilla: 521 if type(x) == types.StringType: 522 protocol = 'string' 523 typecode = 'c' 524 size = len(x) 525 elif type(x).__name__ == 'array': #Numeric isn't imported yet 526 try: 527 import Numeric 528 except: 529 print "WARNING (pypar.py): Numeric module could not be imported,", 530 print "reverting to vanilla mode" 531 protocol = 'vanilla' 532 else: 533 typecode = x.typecode() 534 if typecode in ['i', 'l', 'f', 'd', 'F', 'D']: 535 protocol = 'array' 536 shape = x.shape 537 size = Numeric.product(shape) 538 else: 539 print "WARNING (pypar.py): Numeric object type %s is not supported."\ 540 %(x.typecode()) 541 print "Only types 'i', 'l', 'f', 'd', 'F', 'D' are supported,", 542 print "Reverting to vanilla mode." 543 protocol = 'vanilla' 544 545 #Pickle general structures using the vanilla protocol 546 if protocol == 'vanilla': 547 from cPickle import dumps 548 x = dumps(x, 1) 549 size = len(x) # Let count be length of pickled object 550 551 #Return 552 if return_object: 553 return [protocol, typecode, size, shape], x 554 else: 555 return [protocol, typecode, size, shape] 528 556 529 557 … … 532 560 533 561 def send_control_info(control_info, destination): 534 """Send control info to destination 535 """ 536 import string 537 538 #Convert to strings 539 control_info = [str(c) for c in control_info] 540 541 msg = string.join(control_info,control_sep) 542 send_string(msg, destination, control_tag) 562 """Send control info to destination 563 """ 564 import string 565 566 #Convert to strings 567 control_info = [str(c) for c in control_info] 568 569 control_msg = string.join(control_info,control_sep) 570 if len(control_msg) > control_data_max_size: 571 errmsg = 'Length of control_info exceeds specified maximium (%d)'\ 572 %control_data_max_size 573 errmsg += ' - Please increase it (in pypar.py)' 574 raise errmsg 575 576 send_string(control_msg, destination, control_tag) 543 577 544 578 545 579 def receive_control_info(source): 546 """Receive control info from source 547 """ 548 import string 549 550 msg = ' '*control_data_max_size 551 552 err, stat = receive_string(msg, source, control_tag) 553 if err: 554 raise Exception 555 #No need to create status object here - it is reserved 556 #for genuine communications only 557 558 msg = msg[:stat[3]] #Trim buffer to actual received length 559 560 control_info = msg.split(control_sep) 561 562 assert len(control_info) == 4, 'len(control_info) = %d' %len(control_info) 563 control_info[2] = eval(control_info[2]) #Convert back to int 564 control_info[3] = eval(control_info[3]) #Convert back to tuple 565 566 567 return control_info 568 569 570 571 572 # Auxiliary functions 573 # used only by raw communication 574 # 575 def send_vanilla(x, destination, tag=default_tag): 576 from cPickle import dumps 577 from mpiext import send_string 578 579 s=dumps(x, 1) 580 send_string(s, destination, tag) 581 return len(s) 582 583 584 def receive_vanilla(x, source, tag=default_tag): 585 from cPickle import loads, dumps 586 from mpiext import receive_string 587 588 589 #Create buffer of the right size 590 #(assuming that x is similar to sent x). 591 592 if options['vanilla_bufsize']: 593 s = ' '*options['vanilla_bufsize'] 594 else: 595 s = dumps(x, 1) 596 s = s + ' '*int(0.1*len(s)) #safety 597 598 599 err, stat = receive_string(s, source, tag) 600 if err: 601 raise 'receive_string failed with error code %d' %err 602 603 x = loads(s) 604 605 return x, stat 606 580 """Receive control info from source 581 """ 582 import string 583 584 msg = ' '*control_data_max_size 585 586 stat = receive_string(msg, source, control_tag) 587 #No need to create status object here - it is reserved 588 #for payload communications only 589 590 msg = msg[:stat[3]] #Trim buffer to actual received length (needed?) 591 592 control_info = msg.split(control_sep) 593 594 assert len(control_info) == 4, 'len(control_info) = %d' %len(control_info) 595 control_info[2] = eval(control_info[2]) #Convert back to int 596 control_info[3] = eval(control_info[3]) #Convert back to tuple 597 598 599 return control_info 600 607 601 608 602 #---------------------------------------------------------------------------- … … 612 606 613 607 # Take care of situation where module is part of package 614 import os, string, os.path608 import sys, os, string, os.path 615 609 dirname = os.path.dirname(string.replace(__name__,'.',os.sep)).strip() 616 610 … … 623 617 624 618 625 626 619 # Import MPI extension 627 620 # … … 629 622 630 623 try: 631 import mpiext624 import mpiext 632 625 except: 633 errmsg = 'ERROR: C extension mpiext could not be imported.\n'634 errmsg += 'Please compile mpiext.c e.g. by running\n'635 errmsg += ' python install.py\n'636 errmsg += 'in the pypar directory, or by using\n'637 errmsg += ' python setup.py install\n'638 raise Exception, errmsg626 errmsg = 'ERROR: C extension mpiext could not be imported.\n' 627 errmsg += 'Please compile mpiext.c e.g. by running\n' 628 errmsg += ' python install.py\n' 629 errmsg += 'in the pypar directory, or by using\n' 630 errmsg += ' python setup.py install\n' 631 raise Exception, errmsg 639 632 640 633 … … 643 636 644 637 if sys.platform in ['linux2', 'sunos5', 'win32', 'darwin']: 645 #Linux (LAM,MPICH) or Sun (MPICH)646 error = 0 #Sequential execution of MPI is allowed638 #Linux (LAM,MPICH) or Sun (MPICH) 639 error = 0 #Sequential execution of MPI is allowed 647 640 else: 648 #Platform: Alpha 'osf1V5' 649 cmdstring = '"import mpiext, sys; mpiext.init(sys.argv); mpiext.Finalize()"' 650 #s = 'cd %s; python -c %s' %(dirname, cmdstring) 651 s = 'python -c %s >/dev/null 2>/dev/null' %cmdstring 652 error = os.system(s) 653 654 655 # The check is performed in a separate shell. 656 # Reason: The Alpha server, LAM/Linux or the Sun cannot recover from a 657 # try: 658 # mpiext.init(sys.argv) 659 660 # However, on LAM/Linux, this test causes system to hang. 661 # Verified (OMN 12/12/2) 662 # If lamboot is started, the system, will hang when init is called 663 # again further down in this file. 664 # If lamboot is not loaded error will be nozero as it should. 665 # I don't know how to deal with this 666 # 667 #Comparisons of two strategies using LAM 668 # 669 # Strategy 1: Assume seq execution is OK (i.e. set error = 0) 670 # Strategy 2: Try to test if mpi can be initialised (in a separate shell) 671 # 672 # 673 # Strategy 1 (currently used) 674 # | Lam booted | Lam not booted 675 #----------------------------------------------------- 676 # 677 # Sequential exec | OK | Not OK 678 # Parallel exec | OK | Not OK 679 # 680 # 681 # Strategy 2 682 # | Lam booted | Lam not booted 683 #----------------------------------------------------- 684 # 685 # Sequential exec | Hangs | Not OK 686 # Parallel exec | Hangs | OK 687 # 641 #Platform: Alpha 'osf1V5' 642 cmdstring = '"import mpiext, sys; mpiext.init(sys.argv); mpiext.Finalize()"' 643 #s = 'cd %s; python -c %s' %(dirname, cmdstring) 644 s = 'python -c %s >/dev/null 2>/dev/null' %cmdstring 645 error = os.system(s) 646 647 # The check is performed in a separate shell. 648 # Reason: The Alpha server, LAM/Linux or the Sun cannot recover from a 649 # try: 650 # mpiext.init(sys.argv) 651 652 # However, on LAM/Linux, this test causes system to hang. 653 # Verified (OMN 12/12/2) 654 # If lamboot is started, the system, will hang when init is called 655 # again further down in this file. 656 # If lamboot is not loaded error will be nozero as it should. 657 # I don't know how to deal with this 658 # 659 #Comparisons of two strategies using LAM 660 # 661 # Strategy 1: Assume seq execution is OK (i.e. set error = 0) 662 # Strategy 2: Try to test if mpi can be initialised (in a separate shell) 663 # 664 # 665 # Strategy 1 (currently used) 666 # | Lam booted | Lam not booted 667 #----------------------------------------------------- 668 # 669 # Sequential exec | OK | Not OK 670 # Parallel exec | OK | Not OK 671 # 672 # 673 # Strategy 2 674 # | Lam booted | Lam not booted 675 #----------------------------------------------------- 676 # 677 # Sequential exec | Hangs | Not OK 678 # Parallel exec | Hangs | OK 679 # 688 680 689 681 … … 696 688 697 689 if error: 698 print "WARNING: MPI library could not be initialised - running sequentially" 699 700 # Define rudimentary functions to keep sequential programs happy 701 702 def size(): return 1 703 def rank(): return 0 704 705 def Get_processor_name(): 706 import os 707 try: 708 hostname = os.environ['HOST'] 709 except: 710 try: 711 hostname = os.environ['HOSTNAME'] 712 except: 713 hostname = 'Unknown' 714 715 return hostname 716 717 718 def Abort(): 719 import sys 720 sys.exit() 721 722 def Finalize(): pass 723 724 def Barrier(): pass 725 726 def Wtime(): 727 import time 728 return time.time() 690 print "WARNING: MPI library could not be initialised - running sequentially" 691 692 # Define rudimentary functions to keep sequential programs happy 693 694 def size(): return 1 695 def rank(): return 0 696 697 def get_processor_name(): 698 import os 699 try: 700 hostname = os.environ['HOST'] 701 except: 702 try: 703 hostname = os.environ['HOSTNAME'] 704 except: 705 hostname = 'Unknown' 706 707 return hostname 708 709 def abort(): 710 import sys 711 sys.exit() 712 713 def finalize(): pass 714 715 def barrier(): pass 716 717 def time(): 718 import time 719 return time.time() 729 720 730 721 else: 731 722 732 from mpiext import size, rank, Barrier, Wtime, Get_processor_name,\ 733 init, Initialized, Finalize, Abort,\ 734 send_string, receive_string,\ 735 send_array, receive_array, bcast_string, bcast_array,\ 736 scatter_string, scatter_array,\ 737 gather_string, gather_array,\ 738 reduce_array,\ 739 MPI_ANY_TAG as any_tag, MPI_TAG_UB as max_tag, MPI_ANY_SOURCE as any_source,\ 740 MAX, MIN, SUM, PROD, LAND, BAND,\ 741 LOR, BOR, LXOR, BXOR 742 743 init(sys.argv) #Initialise MPI with cmd line (needed by MPICH/Linux) 744 745 if rank() == 0: 746 print "MPI initialised OK with %d processors" %size() 747 748 749 750 751 752 723 from mpiext import size, rank, barrier, time,\ 724 get_processor_name,\ 725 init, initialized, finalize, abort,\ 726 send_string, receive_string,\ 727 send_array, receive_array, broadcast_string, broadcast_array,\ 728 scatter_string, scatter_array,\ 729 gather_string, gather_array,\ 730 reduce_array,\ 731 MPI_ANY_TAG as any_tag, MPI_TAG_UB as max_tag,\ 732 MPI_ANY_SOURCE as any_source,\ 733 MAX, MIN, SUM, PROD, LAND, BAND,\ 734 LOR, BOR, LXOR, BXOR 735 736 init(sys.argv) #Initialise MPI with cmd line (needed by MPICH/Linux) 737 738 if rank() == 0: 739 print "Pypar (version %s) initialised MPI OK with %d processors" %(__version__, size()) 740 741 742 743 744 745
Note: See TracChangeset
for help on using the changeset viewer.