Changeset 8227
- Timestamp:
- Oct 14, 2011, 3:07:01 PM (14 years ago)
- Location:
- trunk/anuga_core/source/anuga_parallel
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/anuga_core/source/anuga_parallel/parallel_inlet.py
r8224 r8227 15 15 """ 16 16 17 """ 18 Parallel inlet: 19 20 master_proc - coordinates all processors associated with this inlet 21 usually the processors with domains which contains parts of this inlet. 22 23 procs - is the list of all processors associated with this inlet. 24 25 (We assume that the above arguments are determined correctly by the parallel_operator_factory) 26 """ 27 17 28 def __init__(self, domain, line, master_proc = 0, procs = None, verbose=False): 18 29 … … 20 31 self.line = line 21 32 self.verbose = verbose 22 self.master_proc = master_proc #Master processor where global data is gathered33 self.master_proc = master_proc 23 34 24 35 if procs is None: … … 37 48 def compute_triangle_indices(self): 38 49 39 # PETE: Note that all of these are fine but keep in mind that they are local to a domain. 40 41 # Get boundary (in absolute coordinates) 50 # Get boundary of full triangles (in absolute coordinates) 42 51 vertex_coordinates = self.domain.get_full_vertex_coordinates(absolute=True) 43 52 44 # PETE: Eliminate ghost triangle indices, we can assume that it is in the other inlet 53 # PETE: we can assume that the ghost triangles in this domain exist in another 54 # domain, therefore any part of the inlet corresponding to them are accounted for. 45 55 46 56 self.triangle_indices = line_intersect(vertex_coordinates, self.line) 47 57 48 #print "P%d has %d inlet triangles" %(self.myid, len(self.triangle_indices))49 50 #print "Triangle Indices:"51 #52 58 for i in self.triangle_indices: 53 59 assert self.domain.tri_full_flag[i] == 1 54 55 # This effectively does the checks already56 if len(self.triangle_indices) == 0:57 msg = 'Inlet line=%s ' % (self.line)58 msg += 'No triangles intersecting line (Only an enquiry point?)'59 print "WARNING: " + msg60 #raise Exception, msg61 62 63 60 64 61 def compute_area(self): … … 70 67 msg = 'No triangles have been identified in region ' 71 68 print "WARNING: " + msg 72 #raise Exception, msg73 69 74 70 self.area = 0.0 … … 76 72 self.area += self.domain.areas[j] 77 73 78 # PETE: Do a reduction operation to tally up the areas? Must be asynchronous79 # Can we assume that this will be called roughly at the same time?80 # At this point this calculates the local area81 82 74 msg = 'Inlet exchange area has area = %f' % self.area 83 75 assert self.area >= 0.0 84 #assert self.area > 0.085 86 76 87 77 def compute_inlet_length(self): 88 """ Compute the length of the inlet (as78 """ Compute the length of the inlet within this domain (as 89 79 defined by the input line 90 80 """ 91 81 92 # PETE: This is ok, I think this is independent of the domain?93 94 82 point0 = self.line[0] 95 83 point1 = self.line[1] 96 97 #TODO: Go through each point in the line, only count the lenght as the one within the98 #bounding polygon99 84 100 85 self.inlet_length = anuga.geometry.polygon.line_length(self.line) … … 108 93 109 94 def get_line(self): 110 # LOCAL111 95 return self.line 112 96 … … 116 100 117 101 def get_global_area(self): 118 # Master processor gathers area from all child processors, and returns value 102 # GLOBAL: Master processor gathers area from all child processors, and returns value 103 104 # WARNING: requires synchronization, must be called by all procs associated 105 # with this inlet 106 119 107 local_area = self.area 120 108 area = local_area … … 134 122 135 123 def get_areas(self): 136 # LOCAL137 124 # Must be called after compute_inlet_triangle_indices(). 125 # LOCAL 126 138 127 return self.domain.areas.take(self.triangle_indices) 139 128 … … 141 130 def get_stages(self): 142 131 # LOCAL 143 # PETE: Do we provide all the stages, is it ok if we just provide the local stages? 144 # Are there any dependencies? 132 145 133 return self.domain.quantities['stage'].centroid_values.take(self.triangle_indices) 146 134 … … 148 136 def get_average_stage(self): 149 137 # LOCAL 138 150 139 return num.sum(self.get_stages()*self.get_areas())/self.area 151 140 152 141 def get_global_average_stage(self): 153 # LOCAL 142 # GLOBAL: Master processor gathers stages from all child processors, and returns average 143 144 # WARNING: requires synchronization, must be called by all procs associated 145 # with this inlet 146 154 147 local_stage = num.sum(self.get_stages()*self.get_areas()) 155 148 global_area = self.get_global_area() … … 187 180 188 181 def get_global_average_xmom(self): 182 # GLOBAL: master proc gathers all xmom values and returns average 183 # WARNING: requires synchronization, must be called by all procs associated 184 # with this inlet 185 189 186 global_area = self.get_global_area() 190 187 local_xmoms = num.sum(self.get_xmoms()*self.get_areas()) … … 213 210 214 211 def get_global_average_ymom(self): 212 # GLOBAL: master proc gathers all ymom values and returns average 213 # WARNING: requires synchronization, must be called by all procs associated 214 # with this inlet 215 215 216 global_area = self.get_global_area() 216 217 local_ymoms = num.sum(self.get_ymoms()*self.get_areas()) … … 239 240 240 241 def get_global_total_water_volume(self): 242 # GLOBAL: master proc gathers total water volumes from each proc and returns average 243 # WARNING: requires synchronization, must be called by all procs associated 244 # with this inlet 245 241 246 local_volume = num.sum(self.get_depths()*self.get_areas()) 242 247 volume = local_volume … … 259 264 260 265 def get_global_average_depth(self): 266 # GLOBAL: master proc gathers all depth values and returns average 267 # WARNING: requires synchronization, must be called by all procs associated 268 # with this inlet 269 261 270 area = self.get_global_area() 262 271 total_water_volume = self.get_global_total_water_volume() … … 296 305 297 306 def get_average_velocity_head(self): 298 307 #LOCAL 299 308 return 0.5*self.get_average_speed()**2/g 300 309 301 310 302 311 def get_average_total_energy(self): 303 312 #LOCAL 304 313 return self.get_average_velocity_head() + self.get_average_stage() 305 314 306 315 307 316 def get_average_specific_energy(self): 308 317 #LOCAL 309 318 return self.get_average_velocity_head() + self.get_average_depth() 310 319 311 320 312 # Set routines 321 # Set routines (ALL LOCAL) 313 322 314 323 def set_depths(self,depth): … … 336 345 self.domain.quantities['elevation'].centroid_values.put(self.triangle_indices, elevation) 337 346 338 def debug_set_stages_evenly(self,volume):339 """ Distribute volume of water over340 inlet exchange region so that stage is level341 """342 343 areas = self.get_areas()344 stages = self.get_stages()345 346 stages_order = stages.argsort()347 348 # accumulate areas of cells ordered by stage349 summed_areas = num.cumsum(areas[stages_order])350 351 # accumulate the volume need to fill cells352 summed_volume = num.zeros_like(areas)353 summed_volume[1:] = num.cumsum(summed_areas[:-1]*num.diff(stages[stages_order]))354 355 # find the number of cells which will be filled356 index = num.nonzero(summed_volume<=volume)[0][-1]357 358 # calculate stage needed to fill chosen cells with given volume of water359 depth = (volume - summed_volume[index])/summed_areas[index]360 new_stage = stages[stages_order[index]]+depth361 362 363 #print "Summed Volume = " + str(summed_volume)364 #print "Summed Area = " + str(summed_areas)365 #print "Ordered Stages = " + str(stages[stages_order[:]])366 #print "New Stage = " + str(new_stage) + " Volume = " + str(volume) + " Summed Volume = " + str(summed_volume[index]) + " Index = " + str(index+1)367 #print "NS = " + str(new_stage) + " SAr = " + str(summed_areas[index]) + " Vol = " + str(volume) + " SVol = " + str(summed_volume[index]) + " I = " + str(index)368 369 370 stages[stages_order[0:index+1]] = new_stage371 #stages[stages_order[0:index+1]] = stages[stages_order[index]]+depth372 373 374 375 self.set_stages(stages)376 347 377 348 def set_stages_evenly(self,volume): … … 379 350 inlet exchange region so that stage is level 380 351 """ 381 # PETE: THIS must be global and in parallel - this does not appear to set the stage for the part 382 # above the volume 383 # 384 385 ''' 386 if pypar.size() == 1: 387 self.debug_set_stages_evenly(volume) 388 return 389 ''' 352 # WARNING: requires synchronization, must be called by all procs associated 353 # with this inlet 354 390 355 centroid_coordinates = self.domain.get_full_centroid_coordinates(absolute=True) 391 356 areas = self.get_areas() … … 400 365 s_stages_order = {} 401 366 total_stages = len(stages) 402 403 #for i in stages_order:404 # print "[%d, %f, %s]" %(self.myid, stages[i], centroid_coordinates[self.triangle_indices[i]])405 367 406 368 if self.myid == self.master_proc: … … 415 377 s_stages[i] = pypar.receive(i) 416 378 s_stages_order[i] = pypar.receive(i) 417 #print "Recieved from P%d" %(i)418 #print str(s_stages[i])419 379 total_stages = total_stages + len(s_stages[i]) 420 380 421 381 else: 422 # Send areas, stages, and stages order to master 382 # Send areas, stages, and stages order to master proc of inlet 423 383 pypar.send(areas, self.master_proc) 424 384 pypar.send(stages, self.master_proc) … … 433 393 num_stages = 0. 434 394 first = True 435 #sa_debug = [] 436 #sv_debug = [] 437 #s_debug = [] 438 395 439 396 for i in self.procs: 440 397 pos[i] = 0 … … 452 409 if s_stages[i][s_stages_order[i][pos[i]]] < current_stage: 453 410 current_stage = s_stages[i][s_stages_order[i][pos[i]]] 454 #s_debug.append(current_stage)455 411 index = i 456 412 457 # If first iteration, then only update summed_areas, and current stage 458 #print "(%d, %f, %s)" %(index, current_stage, centroid_coordinates[self.triangle_indices[s_stages_order[index][pos[index]]]]) 459 413 # If first iteration, then only update summed_areas, position, and prev|current stage 414 460 415 if first: 461 416 first = False … … 477 432 pos[index] = pos[index] + 1 478 433 summed_volume = tmp_volume 479 #sa_debug.append(summed_areas) 480 #sv_debug.append(summed_volume) 434 481 435 # Update position of index processor and current stage 482 436 prev_stage = current_stage … … 484 438 # Calculate new stage 485 439 new_stage = prev_stage + (volume - summed_volume) / summed_areas 486 487 #print "Ordered Stages = " + str(stages[stages_order[:]])488 #print "NS = " + str(new_stage) + " SAr = " + str(summed_areas) + " Vol = " + str(volume) + " SVol = " + str(summed_volume) + " I = " + str(pos[self.myid])489 440 490 441 # Send postion and new stage to all processors … … 495 446 496 447 # Update own depth 497 #print "P%d, pos = %d, new_stage = %f" %(self.myid, pos[self.myid], new_stage)498 448 stages[stages_order[0:pos[self.myid]]] = new_stage 499 449 else: … … 501 451 new_stage = pypar.receive(self.master_proc) 502 452 stages[stages_order[0:pos]] = new_stage 503 #print "P%d, pos = %d, new_stage = %f" %(self.myid, pos, new_stage)504 #print str(stages)505 453 506 454 self.set_stages(stages) … … 508 456 stages = self.get_stages() 509 457 stages_order = stages.argsort() 510 511 #for i in stages_order:512 #print "eeee: [%d, %f, %s]" %(self.myid, stages[i], centroid_coordinates[self.triangle_indices[i]])513 458 514 459 def set_depths_evenly(self,volume): … … 516 461 cells with equal depth of water 517 462 """ 518 # Is this correct?519 463 new_depth = self.get_average_depth() + (volume/self.get_area()) 520 464 self.set_depths(new_depth) … … 528 472 529 473 def statistics(self): 474 # WARNING: requires synchronization, must be called by all procs associated 475 # with this inlet 476 530 477 message = '' 531 478 -
trunk/anuga_core/source/anuga_parallel/parallel_inlet_enquiry.py
r8224 r8227 12 12 """ 13 13 14 """ 15 master_proc - index of the processor which coordinates all processors 16 associated with this inlet operator. 17 procs - list of all processors associated with this inlet operator 18 enquiry_proc - processor containing inlet enquiry point 19 """ 20 14 21 def __init__(self, domain, polyline, enquiry_pt, master_proc = 0, procs = None, enquiry_proc = -1, 15 22 outward_culvert_vector=None, verbose=False): 16 23 17 # TODO: Include statement that excludes non-participating process 18 24 19 25 parallel_inlet.Parallel_Inlet.__init__(self, domain, polyline, 20 26 master_proc = master_proc, procs = procs, verbose=verbose) 21 #print "Inlet line = " + str(polyline)27 22 28 self.enquiry_pt = enquiry_pt 23 29 self.outward_culvert_vector = outward_culvert_vector … … 53 59 if has_enq_point: 54 60 self.enquiry_index = self.domain.get_triangle_containing_point(self.enquiry_pt) 55 #print "enquiry index = %d" %(self.enquiry_index) 56 61 57 62 if self.enquiry_index in self.triangle_indices: 58 63 msg = 'Enquiry point %s' % (self.enquiry_pt) … … 60 65 raise Exception, msg 61 66 62 if self.enquiry_proc >= 0: assert self.enquiry_proc == self.myid, "Specified enquiry proc does not match actual enquiry proc" 67 if self.enquiry_proc >= 0: 68 assert self.enquiry_proc == self.myid, "Specified enquiry proc does not match actual enquiry proc" 63 69 self.enquiry_proc = self.myid 64 70 assert self.enquiry_index >= 0, "Enquiry point inside polygon, but no triangle index found" … … 68 74 69 75 def get_enquiry_position(self): 70 # GLOBAL76 # WARNING: Must be called by processor containing inlet enquiry point to have effect 71 77 72 78 if self.enquiry_index >= 0: … … 76 82 77 83 def get_enquiry_stage(self): 84 # WARNING: Must be called by processor containing inlet enquiry point to have effect 78 85 79 86 if self.enquiry_index >= 0: … … 85 92 86 93 def get_enquiry_xmom(self): 94 # WARNING: Must be called by processor containing inlet enquiry point to have effect 95 87 96 if self.enquiry_index >= 0: 88 97 return self.domain.quantities['xmomentum'].centroid_values[self.enquiry_index] … … 91 100 92 101 def get_enquiry_ymom(self): 102 # WARNING: Must be called by processor containing inlet enquiry point to have effect 103 93 104 if self.enquiry_index >= 0: 94 105 return self.domain.quantities['ymomentum'].centroid_values[self.enquiry_index] … … 97 108 98 109 def get_enquiry_elevation(self): 110 # WARNING: Must be called by processor containing inlet enquiry point to have effect 111 99 112 if self.enquiry_index >= 0: 100 113 return self.domain.quantities['elevation'].centroid_values[self.enquiry_index] … … 104 117 105 118 def get_enquiry_depth(self): 119 # WARNING: Must be called by processor containing inlet enquiry point to have effect 106 120 107 121 if self.enquiry_index >= 0: … … 112 126 113 127 def get_enquiry_velocity(self): 128 # WARNING: Must be called by processor containing inlet enquiry point to have effect 114 129 115 130 if self.enquiry_index >= 0: … … 124 139 125 140 def get_enquiry_xvelocity(self): 141 # WARNING: Must be called by processor containing inlet enquiry point to have effect 126 142 127 143 if self.enquiry_index >= 0: … … 132 148 133 149 def get_enquiry_yvelocity(self): 150 # WARNING: Must be called by processor containing inlet enquiry point to have effect 134 151 135 152 if self.enquiry_index >= 0: … … 140 157 141 158 def get_enquiry_speed(self): 159 # WARNING: Must be called by processor containing inlet enquiry point to have effect 142 160 143 161 if self.enquiry_index >= 0: … … 150 168 151 169 def get_enquiry_velocity_head(self): 170 # WARNING: Must be called by processor containing inlet enquiry point to have effect 152 171 153 172 if self.enquiry_index >= 0: … … 158 177 159 178 def get_enquiry_total_energy(self): 179 # WARNING: Must be called by processor containing inlet enquiry point to have effect 160 180 161 181 if self.enquiry_index >= 0: … … 166 186 167 187 def get_enquiry_specific_energy(self): 188 # WARNING: Must be called by processor containing inlet enquiry point to have effect 189 168 190 if self.enquiry_index >= 0: 169 191 return self.get_enquiry_velocity_head() + self.get_enquiry_depth() -
trunk/anuga_core/source/anuga_parallel/parallel_inlet_operator.py
r8224 r8227 13 13 14 14 class Parallel_Inlet_operator(Inlet_operator): 15 """Inlet Operator - add water to an inlet. 15 """Parallel Inlet Operator - add water to an inlet potentially 16 shared between different parallel domains. 17 16 18 Sets up the geometry of problem 17 19 … … 22 24 """ 23 25 24 # PETE: This only counts the number of inlets in processor? 25 counter = 0 26 """ 27 master_proc - index of the processor which coordinates all processors 28 associated with this inlet operator. 29 procs - list of all processors associated with this inlet operator 30 31 """ 26 32 27 33 def __init__(self, … … 36 42 verbose = False): 37 43 38 # TODO: Include statement that excludes non-participating process39 # PETE: Only set if domain actually contains the line, EXIT otherwise40 44 self.domain = domain 41 45 self.domain.set_fractional_step_operator(self) 42 46 self.line = numpy.array(line, dtype='d') 43 self.master_proc = master_proc #PETE: id of the master processor that gathers global data47 self.master_proc = master_proc 44 48 45 49 if procs is None: … … 53 57 self.Q = Q 54 58 55 # PETE: Have description mentioning the name of the processor56 59 if description == None: 57 60 self.description = ' ' … … 74 77 Inlet_operator.counter += 1 75 78 76 # PETE: Should the line be global or local? What if enquiry point is elsewhere77 # TODO: Must determine the location of the enquiry point78 79 self.enquiry_point = 0.5*(self.line[0] + self.line[1]) 79 # TODO: Check whether the current processor contains enquiry point, tell the other processors80 # who owns it81 80 82 81 self.outward_vector = self.line 83 82 self.inlet = parallel_inlet.Parallel_Inlet(self.domain, self.line, master_proc = master_proc, 84 83 procs = procs, verbose= verbose) 85 86 #TODO: Should the master processor do this?87 84 self.set_logging(logging) 88 85 … … 91 88 volume = 0 92 89 93 # PETE: The master proc calculates the volume 90 # Only the master proc calculates the volume 91 94 92 if self.myid == self.master_proc: 95 93 timestep = self.domain.get_timestep() … … 105 103 #print "Volume to be removed from Inlet = " + str(volume) 106 104 107 # PETE: this is ok, assume that the master proc for inlet operator is the same as that 108 # for the the inlet itself, thus the other processes need not know the volume 105 # Set stages evenly 109 106 self.inlet.set_stages_evenly(volume) 110 107 111 # Distribute volume evenly over all cells112 #self.inlet.set_depths_evenly(volume)113 108 114 109 def update_Q(self, t): … … 116 111 overriding version in descendant 117 112 """ 118 # Only one processor should call this unless Q is parallelizable113 # Only one processor should call this function unless Q is parallelizable 119 114 if callable(self.Q): 120 115 Q = self.Q(t)[0] … … 125 120 126 121 def statistics(self): 122 # WARNING: requires synchronization, must be called by all procs associated 123 # with this inlet 127 124 128 125 message = '' … … 148 145 149 146 def print_statistics(self): 150 147 # WARNING: requires synchronization, must be called by all procs associated 148 # with this inlet 149 151 150 print self.statistics() 152 151 153 152 154 153 def print_timestepping_statistics(self): 154 # WARNING: Must be called by master proc to have any effect 155 155 156 156 if self.myid == self.master_proc: … … 164 164 165 165 def set_logging(self, flag=True): 166 # WARNING: Must be called by master proc to have any effect 166 167 167 168 stats = self.statistics() … … 187 188 188 189 def log_timestepping_statistics(self): 189 190 # WARNING: Must be called by master proc to have any effect 191 190 192 if self.myid == self.master_proc: 191 193 if self.logging: … … 195 197 196 198 def set_Q(self, Q): 197 199 # LOCAL 198 200 self.Q = Q 199 201 200 202 def get_Q(self): 201 203 # LOCAL 202 204 return self.Q 203 205 204 206 205 207 def get_inlet(self): 206 208 # LOCAL 207 209 return self.inlet 208 210 209 211 def get_line(self): 210 211 212 return self.line 212 213 -
trunk/anuga_core/source/anuga_parallel/parallel_operator_factory.py
r8224 r8227 27 27 import math 28 28 29 29 """ 30 Factory method for Parallel Inlet_operator. All parameters are the same 31 as normal Inlet_Operators master_proc coordinates the allocation process, 32 procs contains the potential list of processors to allocate the inlet to. 33 34 Returns None for calling processors not associated with inlet. Otherwise 35 return an instance of Parallel_Inlet_Operator 36 """ 30 37 31 38 def Inlet_operator(domain, line, Q, master_proc = 0, procs = range(0,pypar.size()), debug = False): 32 39 40 # If not parallel domain then allocate serial Inlet operator 33 41 if isinstance(domain, Parallel_domain) is False: 34 42 if debug: print "Allocating non parallel inlet operator ....." … … 37 45 38 46 myid = pypar.rank() 47 39 48 40 49 alloc, inlet_master_proc, inlet_procs, enquiry_proc = allocate_inlet_procs(domain, line, … … 56 65 return None 57 66 67 """ 68 Factory method for Parallel Boyd_box_operator. All parameters are the same 69 as normal Inlet_Operators master_proc coordinates the allocation process, 70 procs contains the potential list of processors to allocate the inlet to. 71 72 Returns None for calling processors not associated with structure. Otherwise 73 return an instance of Parallel_Inlet_Operator 74 """ 58 75 59 76 def Boyd_box_operator(domain, … … 78 95 debug = False): 79 96 97 # If not parallel domain then allocate serial Boyd box operator 80 98 if isinstance(domain, Parallel_domain) is False: 81 99 if debug: print "Allocating non parallel boyd box operator ....." … … 110 128 apron = width 111 129 130 # Calculate location of inlet enquiry points and exchange lines 112 131 if myid == master_proc: 113 132 if exchange_lines is not None: … … 137 156 enquiry_points_tmp = pypar.receive(master_proc) 138 157 139 line0 = exchange_lines_tmp[0] #self.inlet0_lines[0] 158 # Determine processors associated with first inlet 159 line0 = exchange_lines_tmp[0] 140 160 enquiry_point0 = enquiry_points_tmp[0] 141 161 … … 144 164 procs = procs, debug = debug) 145 165 166 # Determine processors associated with second inlet 146 167 line1 = exchange_lines_tmp[1] 147 168 enquiry_point1 = enquiry_points_tmp[1] … … 195 216 196 217 def __process_non_skew_culvert(end_points, width, enquiry_points, apron, enquiry_gap): 197 # PETE: This can actually be computed by the master198 218 """Create lines at the end of a culvert inlet and outlet. 199 219 At either end two lines will be created; one for the actual flow to pass through and one a little further away … … 272 292 numprocs = pypar.size() 273 293 274 line_procs = []294 inlet_procs = [] 275 295 max_size = -1 276 line_master_proc = -1277 line_enq_proc = -1296 inlet_master_proc = -1 297 inlet_enq_proc = -1 278 298 279 299 # Calculate the number of points of the line inside full polygon 280 281 300 282 301 tri_id = line_intersect(vertex_coordinates, line) … … 304 323 if myid == master_proc: 305 324 # Recieve size of overlap from each processor 306 307 325 # Initialize line_master_proc and inlet_procs 326 308 327 if size > 0: 309 line_procs = [master_proc]328 inlet_procs = [master_proc] 310 329 max_size = size 311 line_master_proc = master_proc330 inlet_master_proc = master_proc 312 331 if has_enq_point: 313 line_enq_proc = master_proc332 inlet_enq_proc = master_proc 314 333 315 334 # Recieve size of overlap … … 320 339 321 340 if x > 0: 322 line_procs.append(i) 323 # Choose line_master_proc as the one with the most overlap 341 inlet_procs.append(i) 342 343 # Choose inlet_master_proc as the one with the most overlap 324 344 if x > max_size: 325 345 max_size = x 326 line_master_proc = i346 inlet_master_proc = i 327 347 328 348 if y is True: 329 assert line_enq_proc == -1, "Enquiry point correspond to more than one proc"330 line_enq_proc = i331 332 assert len( line_procs) > 0, "Line does not intersect any domain"333 assert line_master_proc >= 0, "No master processor assigned"334 if enquiry_point is not None: assert line_enq_proc >= 0, "No enquiry point processor assigned"335 336 # Send line_master_proc and line_procs to all processors in line_procs349 assert inlet_enq_proc == -1, "Enquiry point correspond to more than one proc" 350 inlet_enq_proc = i 351 352 assert len(inlet_procs) > 0, "Line does not intersect any domain" 353 assert inlet_master_proc >= 0, "No master processor assigned" 354 if enquiry_point is not None: assert inlet_enq_proc >= 0, "No enquiry point processor assigned" 355 356 # Send inlet_master_proc and inlet_procs to all processors in inlet_procs 337 357 for i in procs: 338 358 if i != master_proc: 339 pypar.send( line_master_proc, i)340 pypar.send( line_procs, i)341 pypar.send( line_enq_proc, i)359 pypar.send(inlet_master_proc, i) 360 pypar.send(inlet_procs, i) 361 pypar.send(inlet_enq_proc, i) 342 362 343 363 else: … … 345 365 pypar.send(has_enq_point, master_proc) 346 366 347 line_master_proc = pypar.receive(master_proc)348 line_procs = pypar.receive(master_proc)349 line_enq_proc = pypar.receive(master_proc)350 if has_enq_point: assert line_enq_proc == myid, "Enquiry found in proc, but not declared globally"367 inlet_master_proc = pypar.receive(master_proc) 368 inlet_procs = pypar.receive(master_proc) 369 inlet_enq_proc = pypar.receive(master_proc) 370 if has_enq_point: assert inlet_enq_proc == myid, "Enquiry found in proc, but not declared globally" 351 371 352 372 if size > 0: 353 return True, line_master_proc, line_procs, line_enq_proc354 else: 355 return False, line_master_proc, line_procs, line_enq_proc373 return True, inlet_master_proc, inlet_procs, inlet_enq_proc 374 else: 375 return False, inlet_master_proc, inlet_procs, inlet_enq_proc 356 376 357 377
Note: See TracChangeset
for help on using the changeset viewer.