Ignore:
Timestamp:
Aug 14, 2012, 8:46:07 PM (13 years ago)
Author:
steve
Message:

Adding working asynchronous update_ghosts

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/anuga_core/source/anuga_parallel/parallel_generic_communications.py

    r8511 r8513  
    3333    """
    3434
    35 
    36 
    3735    import time
    3836
     
    4139    t0 = time.time()
    4240
    43     allreduce = True
    44    
    45     if allreduce:
    46         import anuga_parallel.pypar_extras as par_extras
    47         par_extras.allreduce(domain.local_timestep, pypar.MIN,
     41
     42    import anuga_parallel.pypar_extras as par_exts
     43
     44    par_exts.allreduce(domain.local_timestep, pypar.MIN,
    4845                      buffer=domain.global_timestep,
    4946                      bypass=True)
    50         domain.communication_reduce_time += time.time()-t0
    51     else:
    52         pypar.reduce(domain.local_timestep, pypar.MIN, 0,
    53                       buffer=domain.global_timestep,
    54                       bypass=True)
    55 
    56         domain.communication_reduce_time += time.time()-t0
    57 
    58 
    59         #Broadcast minimal timestep to all processors
    60         t0 = time.time()
    61         pypar.broadcast(domain.global_timestep, 0)#, bypass=True)
    62 
    63         domain.communication_broadcast_time += time.time()-t0
     47
     48    domain.communication_reduce_time += time.time()-t0
     49
     50
     51
     52
     53#    pypar.reduce(domain.local_timestep, pypar.MIN, 0,
     54#                      buffer=domain.global_timestep,
     55#                      bypass=True)
     56#
     57#
     58#    domain.communication_reduce_time += time.time()-t0
     59
     60
     61    #Broadcast minimal timestep to all processors
     62    t0 = time.time()
     63    #pypar.broadcast(domain.global_timestep, 0)#, bypass=True)
     64
     65    domain.communication_broadcast_time += time.time()-t0
    6466
    6567    #old_flux_timestep = domain.flux_timestep
     
    6870   
    6971
    70 def communicate_ghosts(domain):
     72def communicate_ghosts_blocking(domain):
    7173
    7274    # We must send the information from the full cells and
     
    130132    domain.communication_time += time.time()-t0
    131133
     134
     135
     136def communicate_ghosts_asynchronous(domain):
     137
     138    # We must send the information from the full cells and
     139    # receive the information for the ghost cells
     140    # We have a dictionary of lists with ghosts expecting updates from
     141    # the separate processors
     142    # Using isend and irecv
     143
     144    import numpy as num
     145    import time
     146    t0 = time.time()
     147
     148    # update of non-local ghost cells by copying full cell data into the
     149    # Xout buffer arrays
     150
     151    #iproc == domain.processor
     152
     153    #Setup send buffer arrays for sending full data to other processors
     154    for send_proc in domain.full_send_dict:
     155        Idf  = domain.full_send_dict[send_proc][0]
     156        Xout = domain.full_send_dict[send_proc][2]
     157
     158        for i, q in enumerate(domain.conserved_quantities):
     159            #print 'Store send data',i,q
     160            Q_cv =  domain.quantities[q].centroid_values
     161            Xout[:,i] = num.take(Q_cv, Idf)
     162
     163
     164
     165#    from pprint import pprint
     166#
     167#    if pypar.rank() == 0:
     168#        print 'Before commun 0'
     169#        pprint(domain.full_send_dict)
     170#
     171#    if pypar.rank() == 1:
     172#        print 'Before commun 1'
     173#        pprint(domain.full_send_dict)
     174
     175
     176    # Do all the comuunication using isend/irecv via the buffers in the
     177    # full_send_dict and ghost_recv_dict
     178    from anuga_parallel.pypar_extras import mpiextras
     179
     180    mpiextras.send_recv_via_dicts(domain.full_send_dict,domain.ghost_recv_dict)
     181
     182#
     183#    if pypar.rank() == 0:
     184#        print 'After commun 0'
     185#        pprint(domain.ghost_recv_dict)
     186#
     187#    if pypar.rank() == 1:
     188#        print 'After commun 1'
     189#        pprint(domain.ghost_recv_dict)
     190
     191    # Now copy data from receive buffers to the domain
     192    for recv_proc in domain.ghost_recv_dict:
     193        Idg  = domain.ghost_recv_dict[recv_proc][0]
     194        X    = domain.ghost_recv_dict[recv_proc][2]
     195
     196        #print recv_proc
     197        #print X
     198
     199        for i, q in enumerate(domain.conserved_quantities):
     200            #print 'Read receive data',i,q
     201            Q_cv =  domain.quantities[q].centroid_values
     202            num.put(Q_cv, Idg, X[:,i])
     203
     204
     205    domain.communication_time += time.time()-t0
     206
     207
Note: See TracChangeset for help on using the changeset viewer.