- Timestamp:
- Aug 14, 2012, 8:46:07 PM (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/anuga_core/source/anuga_parallel/parallel_generic_communications.py
r8511 r8513 33 33 """ 34 34 35 36 37 35 import time 38 36 … … 41 39 t0 = time.time() 42 40 43 allreduce = True 44 45 if allreduce: 46 import anuga_parallel.pypar_extras as par_extras 47 par_extras.allreduce(domain.local_timestep, pypar.MIN, 41 42 import anuga_parallel.pypar_extras as par_exts 43 44 par_exts.allreduce(domain.local_timestep, pypar.MIN, 48 45 buffer=domain.global_timestep, 49 46 bypass=True) 50 domain.communication_reduce_time += time.time()-t0 51 else: 52 pypar.reduce(domain.local_timestep, pypar.MIN, 0, 53 buffer=domain.global_timestep, 54 bypass=True) 55 56 domain.communication_reduce_time += time.time()-t0 57 58 59 #Broadcast minimal timestep to all processors 60 t0 = time.time() 61 pypar.broadcast(domain.global_timestep, 0)#, bypass=True) 62 63 domain.communication_broadcast_time += time.time()-t0 47 48 domain.communication_reduce_time += time.time()-t0 49 50 51 52 53 # pypar.reduce(domain.local_timestep, pypar.MIN, 0, 54 # buffer=domain.global_timestep, 55 # bypass=True) 56 # 57 # 58 # domain.communication_reduce_time += time.time()-t0 59 60 61 #Broadcast minimal timestep to all processors 62 t0 = time.time() 63 #pypar.broadcast(domain.global_timestep, 0)#, bypass=True) 64 65 domain.communication_broadcast_time += time.time()-t0 64 66 65 67 #old_flux_timestep = domain.flux_timestep … … 68 70 69 71 70 def communicate_ghosts (domain):72 def communicate_ghosts_blocking(domain): 71 73 72 74 # We must send the information from the full cells and … … 130 132 domain.communication_time += time.time()-t0 131 133 134 135 136 def communicate_ghosts_asynchronous(domain): 137 138 # We must send the information from the full cells and 139 # receive the information for the ghost cells 140 # We have a dictionary of lists with ghosts expecting updates from 141 # the separate processors 142 # Using isend and irecv 143 144 import numpy as num 145 import time 146 t0 = time.time() 147 148 # update of non-local ghost cells by copying full cell data into the 149 # Xout buffer arrays 150 151 #iproc == domain.processor 152 153 #Setup send buffer arrays for sending full data to other processors 154 for send_proc in domain.full_send_dict: 155 Idf = domain.full_send_dict[send_proc][0] 156 Xout = domain.full_send_dict[send_proc][2] 157 158 for i, q in enumerate(domain.conserved_quantities): 159 #print 'Store send data',i,q 160 Q_cv = domain.quantities[q].centroid_values 161 Xout[:,i] = num.take(Q_cv, Idf) 162 163 164 165 # from pprint import pprint 166 # 167 # if pypar.rank() == 0: 168 # print 'Before commun 0' 169 # pprint(domain.full_send_dict) 170 # 171 # if pypar.rank() == 1: 172 # print 'Before commun 1' 173 # pprint(domain.full_send_dict) 174 175 176 # Do all the comuunication using isend/irecv via the buffers in the 177 # full_send_dict and ghost_recv_dict 178 from anuga_parallel.pypar_extras import mpiextras 179 180 mpiextras.send_recv_via_dicts(domain.full_send_dict,domain.ghost_recv_dict) 181 182 # 183 # if pypar.rank() == 0: 184 # print 'After commun 0' 185 # pprint(domain.ghost_recv_dict) 186 # 187 # if pypar.rank() == 1: 188 # print 'After commun 1' 189 # pprint(domain.ghost_recv_dict) 190 191 # Now copy data from receive buffers to the domain 192 for recv_proc in domain.ghost_recv_dict: 193 Idg = domain.ghost_recv_dict[recv_proc][0] 194 X = domain.ghost_recv_dict[recv_proc][2] 195 196 #print recv_proc 197 #print X 198 199 for i, q in enumerate(domain.conserved_quantities): 200 #print 'Read receive data',i,q 201 Q_cv = domain.quantities[q].centroid_values 202 num.put(Q_cv, Idg, X[:,i]) 203 204 205 domain.communication_time += time.time()-t0 206 207
Note: See TracChangeset
for help on using the changeset viewer.