source: anuga_work/development/parallel/parallel_shallow_water.py @ 4113

Last change on this file since 4113 was 3557, checked in by ole, 19 years ago

nitnoi

File size: 9.3 KB
Line 
1import sys
2from os import sep
3
4# FIXME: This should be removed. Set evironment variable PYTHONPATH to
5# directory ...../anuga/inundation instead
6#sys.path.append('..'+sep+'pyvolution')
7
8"""Class Parallel_Shallow_Water_Domain -
92D triangular domains for finite-volume computations of
10the shallow water equation, with extra structures to allow
11communication between other Parallel_Domains and itself
12
13This module contains a specialisation of class Domain
14from module shallow_water.py
15
16Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
17Geoscience Australia, 2004-2005
18"""
19
20import logging, logging.config
21logger = logging.getLogger('parallel')
22logger.setLevel(logging.WARNING)
23
24try:
25    logging.config.fileConfig('log.ini')
26except:
27    pass
28
29from anuga.pyvolution.shallow_water import *
30from Numeric import zeros, Float, Int, ones, allclose, array
31
32import pypar
33
34
35class Parallel_Domain(Domain):
36
37    def __init__(self, coordinates, vertices, boundary = None,
38                 full_send_dict = None, ghost_recv_dict = None):
39
40        Domain.__init__(self,
41                        coordinates,
42                        vertices,
43                        boundary,
44                        full_send_dict=full_send_dict,
45                        ghost_recv_dict=ghost_recv_dict,
46                        processor=pypar.rank(),
47                        numproc=pypar.size())
48
49        N = self.number_of_elements
50
51#        self.processor = pypar.rank()
52#        self.numproc   = pypar.size()
53#
54#        # Setup Communication Buffers
55#        self.nsys = 3
56#        for key in full_send_dict:
57#            buffer_shape = full_send_dict[key][0].shape[0]
58#            full_send_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
59#
60#
61#        for key in ghost_recv_dict:
62#            buffer_shape = ghost_recv_dict[key][0].shape[0]
63#            ghost_recv_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
64#
65#        self.full_send_dict  = full_send_dict
66        self.ghost_recv_dict = ghost_recv_dict
67
68        # Buffers for synchronisation of timesteps
69        self.local_timestep = zeros(1, Float)
70        self.global_timestep = zeros(1, Float)
71
72        self.local_timesteps = zeros(self.numproc, Float)
73
74
75        self.communication_time = 0.0
76        self.communication_reduce_time = 0.0
77        self.communication_broadcast_time = 0.0
78
79       
80
81
82    def set_name(self, name):
83        """Assign name based on processor number
84        """
85
86        # Call parents method with processor number attached.
87        Domain.set_name(self, name + '_P%d_%d' %(self.processor, self.numproc))
88
89
90    def check_integrity(self):
91        Domain.check_integrity(self)
92
93        msg = 'Will need to check global and local numbering'
94        assert self.conserved_quantities[0] == 'stage', msg
95        assert self.conserved_quantities[1] == 'xmomentum', msg
96        assert self.conserved_quantities[2] == 'ymomentum', msg
97
98
99    def update_timestep_1(self, yieldstep, finaltime):
100        """Calculate local timestep using broadcasts
101        """
102
103        #LINDA:
104        # Moved below so timestep is found before doing update
105       
106        #Domain.update_timestep(self, yieldstep, finaltime)
107
108        import time
109
110
111        t0 = time.time()
112
113        #Broadcast local timestep from every processor to every other
114        for pid in range(self.numproc):
115            #print 'P%d calling broadcast from %d' %(self.processor, pid)
116            self.local_timestep[0] = self.timestep
117            pypar.broadcast(self.local_timestep, pid, bypass=True)
118            self.local_timesteps[pid] = self.local_timestep[0]
119
120        self.timestep = min(self.local_timesteps)
121
122        pypar.barrier()
123        self.communication_broadcast_time += time.time()-t0
124
125        # LINDA:
126        # Moved timestep to here
127       
128        Domain.update_timestep(self, yieldstep, finaltime)
129
130
131    def update_timestep(self, yieldstep, finaltime):
132        """Calculate local timestep
133        """
134
135        # LINDA: Moved below so timestep is updated before
136        # calculating statistic
137       
138        #Compute minimal timestep on local process
139        #Domain.update_timestep(self, yieldstep, finaltime)
140
141        pypar.barrier()
142
143        import time
144        #Compute minimal timestep across all processes
145        self.local_timestep[0] = self.timestep
146        use_reduce_broadcast = True
147        if use_reduce_broadcast:
148            t0 = time.time()
149            pypar.reduce(self.local_timestep, pypar.MIN, 0,
150                         buffer=self.global_timestep,
151                         bypass=True)
152
153        else:
154            #Alternative: Try using straight send and receives
155            t0 = time.time()
156            self.global_timestep[0] = self.timestep
157
158            if self.processor == 0:
159                for i in range(1, self.numproc):
160                    pypar.receive(i,
161                                  buffer=self.local_timestep,
162                                  bypass=True)
163
164                    if self.local_timestep[0] < self.global_timestep[0]:
165                        self.global_timestep[0] = self.local_timestep[0]
166            else:
167                pypar.send(self.local_timestep, 0,
168                           use_buffer=True, bypass=True)
169
170
171        self.communication_reduce_time += time.time()-t0
172
173
174        #Broadcast minimal timestep to all
175        t0 = time.time()
176        pypar.broadcast(self.global_timestep, 0,
177                        bypass=True)
178
179        self.communication_broadcast_time += time.time()-t0
180
181
182        self.timestep = self.global_timestep[0]
183       
184        # LINDA:
185        # update local stats now
186       
187        #Compute minimal timestep on local process
188        Domain.update_timestep(self, yieldstep, finaltime)
189
190        # FIXME (Ole) We should update the variable min_timestep for use
191        # with write_time (or redo write_time)
192
193    #update_timestep = update_timestep_1
194
195    def update_ghosts(self):
196
197        # We must send the information from the full cells and
198        # receive the information for the ghost cells
199        # We have a dictionary of lists with ghosts expecting updates from
200        # the separate processors
201
202
203        from Numeric import take,put
204        import time
205        t0 = time.time()
206
207        # update of non-local ghost cells
208        for iproc in range(self.numproc):
209            if iproc == self.processor:
210                #Send data from iproc processor to other processors
211                for send_proc in self.full_send_dict:
212                    if send_proc != iproc:
213
214                        Idf  = self.full_send_dict[send_proc][0]
215                        Xout = self.full_send_dict[send_proc][2]
216
217                        for i, q in enumerate(self.conserved_quantities):
218                            #print 'Send',i,q
219                            Q_cv =  self.quantities[q].centroid_values
220                            Xout[:,i] = take(Q_cv,     Idf)
221
222                        pypar.send(Xout, send_proc,
223                                   use_buffer=True, bypass = True)
224
225
226            else:
227                #Receive data from the iproc processor
228                if  self.ghost_recv_dict.has_key(iproc):
229
230                    Idg = self.ghost_recv_dict[iproc][0]
231                    X = self.ghost_recv_dict[iproc][2]
232
233                    X = pypar.receive(iproc, buffer=X, bypass = True)
234
235                    for i, q in enumerate(self.conserved_quantities):
236                        #print 'Receive',i,q
237                        Q_cv =  self.quantities[q].centroid_values
238                        put(Q_cv,     Idg, X[:,i])
239
240        #local update of ghost cells
241        iproc = self.processor
242        if self.full_send_dict.has_key(iproc):
243
244            # LINDA:
245            # now store full as local id, global id, value
246            Idf  = self.full_send_dict[iproc][0]
247
248            # LINDA:
249            # now store ghost as local id, global id, value
250            Idg = self.ghost_recv_dict[iproc][0]
251
252            for i, q in enumerate(self.conserved_quantities):
253                #print 'LOCAL SEND RECEIVE',i,q
254                Q_cv =  self.quantities[q].centroid_values
255                put(Q_cv,     Idg, take(Q_cv,     Idf))
256
257        self.communication_time += time.time()-t0
258
259
260    def write_time(self):
261        if self.min_timestep == self.max_timestep:
262            print 'Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\
263                  %(self.processor, self.time, self.min_timestep, self.number_of_steps,
264                    self.number_of_first_order_steps)
265        elif self.min_timestep > self.max_timestep:
266            print 'Processor %d, Time = %.4f, steps=%d (%d)'\
267                  %(self.processor, self.time, self.number_of_steps,
268                    self.number_of_first_order_steps)
269        else:
270            print 'Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\
271                  %(self.processor, self.time, self.min_timestep,
272                    self.max_timestep, self.number_of_steps,
273                    self.number_of_first_order_steps)
274
275
276    def evolve(self, yieldstep = None, finaltime = None):
277        """Specialisation of basic evolve method from parent class
278        """
279
280        #Initialise real time viz if requested
281        if self.time == 0.0:
282            pass
283
284        #Call basic machinery from parent class
285        for t in Domain.evolve(self, yieldstep, finaltime):
286
287            #Pass control on to outer loop for more specific actions
288            yield(t)
Note: See TracBrowser for help on using the repository browser.