source: trunk/anuga_core/anuga/parallel/parallel_generic_communications.py @ 9679

Last change on this file since 9679 was 9529, checked in by steve, 10 years ago

Removed pypar_extras folder as it really only contains two files

File size: 5.8 KB
Line 
"""
Generic implementation of update_timestep and update_ghosts for
parallel domains (e.g. shallow_water or advection)

Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
Geoscience Australia, 2004-2010

"""
9
10import numpy as num
11
12import anuga.utilities.parallel_abstraction as pypar
13
14
15
16
def setup_buffers(domain):
    """Create the buffers and timers used for timestep synchronisation.

    The following attributes are (re)initialised on ``domain``:

    local_timestep   -- length-1 float array holding this process's timestep
    global_timestep  -- length-1 float array receiving the reduced timestep
    local_timesteps  -- float array with one slot per process
                        (length ``domain.numproc``)
    communication_time, communication_reduce_time,
    communication_broadcast_time -- accumulated wall-clock timers, reset to 0.0

    domain -- object providing the integer attribute ``numproc``.
    """

    # Use the builtin ``float`` as the dtype: the ``num.float`` alias was
    # deprecated in NumPy 1.20 and removed in 1.24.  Both give float64.
    domain.local_timestep = num.zeros(1, float)
    domain.global_timestep = num.zeros(1, float)

    domain.local_timesteps = num.zeros(domain.numproc, float)

    domain.communication_time = 0.0
    domain.communication_reduce_time = 0.0
    domain.communication_broadcast_time = 0.0
29
30
def communicate_flux_timestep(domain, yieldstep, finaltime):
    """Replace the local flux timestep by the minimum over all processes.

    ``domain.flux_timestep`` is copied into ``domain.local_timestep``,
    reduced with ``pypar.MIN`` into ``domain.global_timestep`` through
    ``par_exts.allreduce``, and the reduced value is written back to
    ``domain.flux_timestep``.

    The time spent in the reduction is added to
    ``domain.communication_reduce_time``.

    domain    -- parallel domain whose buffers were created by setup_buffers
    yieldstep -- not used by this function
    finaltime -- not used by this function
    """

    import time

    # Stage the local value in the send buffer and start the reduce timer
    domain.local_timestep[0] = domain.flux_timestep
    reduce_started = time.time()

    import anuga.parallel.pypar_ext as par_exts

    par_exts.allreduce(
        domain.local_timestep,
        pypar.MIN,
        buffer=domain.global_timestep,
        bypass=True,
    )

    reduce_elapsed = time.time() - reduce_started
    domain.communication_reduce_time += reduce_elapsed

    # No separate broadcast is issued any more (presumably the allreduce
    # already leaves the result on every process -- confirm against
    # pypar_ext).  The broadcast timer is still updated so that the
    # attribute keeps being maintained; it only measures the two clock reads.
    broadcast_started = time.time()
    broadcast_elapsed = time.time() - broadcast_started
    domain.communication_broadcast_time += broadcast_elapsed

    domain.flux_timestep = domain.global_timestep[0]
69   
70   
71
def communicate_ghosts_blocking(domain):
    """Update ghost cell values using blocking sends and receives.

    Centroid values of every conserved quantity are sent from the full
    cells of this process and received into its ghost cells.

    ``domain.full_send_dict`` and ``domain.ghost_recv_dict`` are keyed by
    process number.  For each entry, element ``[0]`` holds the cell indices
    and element ``[2]`` holds a 2-D buffer with one column per conserved
    quantity.

    The elapsed wall-clock time is added to ``domain.communication_time``.

    domain -- parallel domain providing ``numproc``, ``processor``,
              ``conserved_quantities``, ``quantities``, ``full_send_dict``
              and ``ghost_recv_dict``.
    """

    import numpy as num
    import time
    t0 = time.time()

    # Update of non-local ghost cells.  The processes take turns: on turn
    # ``iproc`` that process sends to all of its neighbours while every
    # other process receives from it (if it expects anything from it).
    for iproc in range(domain.numproc):
        if iproc == domain.processor:
            # Send data from this process to the other processes
            for send_proc in domain.full_send_dict:
                if send_proc != iproc:

                    Idf = domain.full_send_dict[send_proc][0]
                    Xout = domain.full_send_dict[send_proc][2]

                    # Pack one column per conserved quantity
                    for i, q in enumerate(domain.conserved_quantities):
                        Q_cv = domain.quantities[q].centroid_values
                        Xout[:, i] = num.take(Q_cv, Idf)

                    pypar.send(Xout, int(send_proc),
                               use_buffer=True, bypass=True)

        else:
            # Receive data from the iproc process.
            # ``in`` replaces dict.has_key(), which was removed in Python 3.
            if iproc in domain.ghost_recv_dict:

                Idg = domain.ghost_recv_dict[iproc][0]
                X = domain.ghost_recv_dict[iproc][2]

                X = pypar.receive(int(iproc), buffer=X, bypass=True)

                # Unpack each column into the ghost cells
                for i, q in enumerate(domain.conserved_quantities):
                    Q_cv = domain.quantities[q].centroid_values
                    num.put(Q_cv, Idg, X[:, i])

    # Local update of ghost cells: full and ghost cells that both live on
    # this process are copied directly, without any communication.
    iproc = domain.processor
    if iproc in domain.full_send_dict:

        # Original note (LINDA): full cells are now stored as
        # local id, global id, value
        Idf = domain.full_send_dict[iproc][0]

        # Original note (LINDA): ghost cells are now stored as
        # local id, global id, value
        Idg = domain.ghost_recv_dict[iproc][0]

        for i, q in enumerate(domain.conserved_quantities):
            Q_cv = domain.quantities[q].centroid_values
            num.put(Q_cv, Idg, num.take(Q_cv, Idf))

    domain.communication_time += time.time() - t0
133
134
135
def communicate_ghosts_asynchronous(domain):
    """Update ghost cell values through a single dictionary-driven exchange.

    The routine works in three phases:

    1. Centroid values of every conserved quantity are gathered from the
       full cells into the send buffers held in ``domain.full_send_dict``.
    2. ``mpiextras.send_recv_via_dicts`` exchanges the send buffers with
       the receive buffers held in ``domain.ghost_recv_dict`` (using
       isend/irecv, according to the original comments).
    3. The received values are scattered from the receive buffers into
       the ghost cells.

    In both dictionaries element ``[0]`` of an entry holds the cell indices
    and element ``[2]`` holds a 2-D buffer with one column per conserved
    quantity.

    The elapsed wall-clock time is added to ``domain.communication_time``.
    """

    import numpy as num
    import time
    started = time.time()

    # Phase 1: fill the send buffers with the full cell data
    for send_entry in domain.full_send_dict.values():
        full_ids = send_entry[0]
        send_buffer = send_entry[2]

        for column, name in enumerate(domain.conserved_quantities):
            centroids = domain.quantities[name].centroid_values
            send_buffer[:, column] = num.take(centroids, full_ids)

    # Phase 2: do all the communication via the buffers stored in
    # full_send_dict and ghost_recv_dict
    from anuga.parallel import mpiextras

    mpiextras.send_recv_via_dicts(domain.full_send_dict,
                                  domain.ghost_recv_dict)

    # Phase 3: copy data from the receive buffers into the domain
    for recv_entry in domain.ghost_recv_dict.values():
        ghost_ids = recv_entry[0]
        recv_buffer = recv_entry[2]

        for column, name in enumerate(domain.conserved_quantities):
            centroids = domain.quantities[name].centroid_values
            num.put(centroids, ghost_ids, recv_buffer[:, column])

    elapsed = time.time() - started
    domain.communication_time += elapsed
206
207
Note: See TracBrowser for help on using the repository browser.