source: inundation/parallel/parallel_shallow_water.py @ 3422

Last change on this file since 3422 was 3185, checked in by linda, 19 years ago

Updated shallow water equation to broadcast timestep after flux
calculation

File size: 9.2 KB
Line 
1import sys
2from os import sep
3
4# FIXME: This should be removed. Set evironment variable PYTHONPATH to
5# directory ...../anuga/inundation instead
6#sys.path.append('..'+sep+'pyvolution')
7
8"""Class Parallel_Shallow_Water_Domain -
92D triangular domains for finite-volume computations of
10the shallow water equation, with extra structures to allow
11communication between other Parallel_Domains and itself
12
13This module contains a specialisation of class Domain
14from module shallow_water.py
15
16Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
17Geoscience Australia, 2004-2005
18"""
19
20import logging, logging.config
21logger = logging.getLogger('parallel')
22logger.setLevel(logging.WARNING)
23
24try:
25    logging.config.fileConfig('log.ini')
26except:
27    pass
28
29from pyvolution.shallow_water import *
30from Numeric import zeros, Float, Int, ones, allclose, array
31#from pypar_dist import pypar
32import pypar
33
34class Parallel_Domain(Domain):
35
36    def __init__(self, coordinates, vertices, boundary = None,
37                 full_send_dict = None, ghost_recv_dict = None):
38
39        Domain.__init__(self,
40                        coordinates,
41                        vertices,
42                        boundary,
43                        full_send_dict=full_send_dict,
44                        ghost_recv_dict=ghost_recv_dict,
45                        processor=pypar.rank(),
46                        numproc=pypar.size())
47
48        N = self.number_of_elements
49
50#        self.processor = pypar.rank()
51#        self.numproc   = pypar.size()
52#
53#        # Setup Communication Buffers
54#        self.nsys = 3
55#        for key in full_send_dict:
56#            buffer_shape = full_send_dict[key][0].shape[0]
57#            full_send_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
58#
59#
60#        for key in ghost_recv_dict:
61#            buffer_shape = ghost_recv_dict[key][0].shape[0]
62#            ghost_recv_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
63#
64#        self.full_send_dict  = full_send_dict
65        self.ghost_recv_dict = ghost_recv_dict
66
67        # Buffers for synchronisation of timesteps
68        self.local_timestep = zeros(1, Float)
69        self.global_timestep = zeros(1, Float)
70
71        self.local_timesteps = zeros(self.numproc, Float)
72
73
74        self.communication_time = 0.0
75        self.communication_reduce_time = 0.0
76        self.communication_broadcast_time = 0.0
77
78
79    def set_name(self, name):
80        """Assign name based on processor number
81        """
82
83        # Call parents method with processor number attached.
84        Domain.set_name(self, name + '_P%d_%d' %(self.processor, self.numproc))
85
86
87    def check_integrity(self):
88        Domain.check_integrity(self)
89
90        msg = 'Will need to check global and local numbering'
91        assert self.conserved_quantities[0] == 'stage', msg
92        assert self.conserved_quantities[1] == 'xmomentum', msg
93        assert self.conserved_quantities[2] == 'ymomentum', msg
94
95
96    def update_timestep_1(self, yieldstep, finaltime):
97        """Calculate local timestep using broadcasts
98        """
99
100        #LINDA:
101        # Moved below so timestep is found before doing update
102       
103        #Domain.update_timestep(self, yieldstep, finaltime)
104
105        import time
106
107
108        t0 = time.time()
109
110        #Broadcast local timestep from every processor to every other
111        for pid in range(self.numproc):
112            #print 'P%d calling broadcast from %d' %(self.processor, pid)
113            self.local_timestep[0] = self.timestep
114            pypar.broadcast(self.local_timestep, pid, bypass=True)
115            self.local_timesteps[pid] = self.local_timestep[0]
116
117        self.timestep = min(self.local_timesteps)
118
119        pypar.barrier()
120        self.communication_broadcast_time += time.time()-t0
121
122        # LINDA:
123        # Moved timestep to here
124       
125        Domain.update_timestep(self, yieldstep, finaltime)
126
127
128    def update_timestep(self, yieldstep, finaltime):
129        """Calculate local timestep
130        """
131
132        # LINDA: Moved below so timestep is updated before
133        # calculating statistic
134       
135        #Compute minimal timestep on local process
136        #Domain.update_timestep(self, yieldstep, finaltime)
137
138        pypar.barrier()
139
140        import time
141        #Compute minimal timestep across all processes
142        self.local_timestep[0] = self.timestep
143        use_reduce_broadcast = True
144        if use_reduce_broadcast:
145            t0 = time.time()
146            pypar.reduce(self.local_timestep, pypar.MIN, 0,
147                         buffer=self.global_timestep,
148                         bypass=True)
149
150        else:
151            #Alternative: Try using straight send and receives
152            t0 = time.time()
153            self.global_timestep[0] = self.timestep
154
155            if self.processor == 0:
156                for i in range(1, self.numproc):
157                    pypar.receive(i,
158                                  buffer=self.local_timestep,
159                                  bypass=True)
160
161                    if self.local_timestep[0] < self.global_timestep[0]:
162                        self.global_timestep[0] = self.local_timestep[0]
163            else:
164                pypar.send(self.local_timestep, 0,
165                           use_buffer=True, bypass=True)
166
167
168        self.communication_reduce_time += time.time()-t0
169
170
171        #Broadcast minimal timestep to all
172        t0 = time.time()
173        pypar.broadcast(self.global_timestep, 0,
174                        bypass=True)
175
176        self.communication_broadcast_time += time.time()-t0
177
178
179        self.timestep = self.global_timestep[0]
180       
181        # LINDA:
182        # update local stats now
183       
184        #Compute minimal timestep on local process
185        Domain.update_timestep(self, yieldstep, finaltime)
186
187    #update_timestep = update_timestep_1
188
189    def update_ghosts(self):
190
191        # We must send the information from the full cells and
192        # receive the information for the ghost cells
193        # We have a dictionary of lists with ghosts expecting updates from
194        # the separate processors
195
196
197        from Numeric import take,put
198        import time
199        t0 = time.time()
200
201        # update of non-local ghost cells
202        for iproc in range(self.numproc):
203            if iproc == self.processor:
204                #Send data from iproc processor to other processors
205                for send_proc in self.full_send_dict:
206                    if send_proc != iproc:
207
208                        Idf  = self.full_send_dict[send_proc][0]
209                        Xout = self.full_send_dict[send_proc][2]
210
211                        for i, q in enumerate(self.conserved_quantities):
212                            #print 'Send',i,q
213                            Q_cv =  self.quantities[q].centroid_values
214                            Xout[:,i] = take(Q_cv,     Idf)
215
216                        pypar.send(Xout, send_proc,
217                                   use_buffer=True, bypass = True)
218
219
220            else:
221                #Receive data from the iproc processor
222                if  self.ghost_recv_dict.has_key(iproc):
223
224                    Idg = self.ghost_recv_dict[iproc][0]
225                    X = self.ghost_recv_dict[iproc][2]
226
227                    X = pypar.receive(iproc, buffer=X, bypass = True)
228
229                    for i, q in enumerate(self.conserved_quantities):
230                        #print 'Receive',i,q
231                        Q_cv =  self.quantities[q].centroid_values
232                        put(Q_cv,     Idg, X[:,i])
233
234        #local update of ghost cells
235        iproc = self.processor
236        if self.full_send_dict.has_key(iproc):
237
238            # LINDA:
239            # now store full as local id, global id, value
240            Idf  = self.full_send_dict[iproc][0]
241
242            # LINDA:
243            # now store ghost as local id, global id, value
244            Idg = self.ghost_recv_dict[iproc][0]
245
246            for i, q in enumerate(self.conserved_quantities):
247                #print 'LOCAL SEND RECEIVE',i,q
248                Q_cv =  self.quantities[q].centroid_values
249                put(Q_cv,     Idg, take(Q_cv,     Idf))
250
251        self.communication_time += time.time()-t0
252
253
254    def write_time(self):
255        if self.min_timestep == self.max_timestep:
256            print 'Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\
257                  %(self.processor, self.time, self.min_timestep, self.number_of_steps,
258                    self.number_of_first_order_steps)
259        elif self.min_timestep > self.max_timestep:
260            print 'Processor %d, Time = %.4f, steps=%d (%d)'\
261                  %(self.processor, self.time, self.number_of_steps,
262                    self.number_of_first_order_steps)
263        else:
264            print 'Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\
265                  %(self.processor, self.time, self.min_timestep,
266                    self.max_timestep, self.number_of_steps,
267                    self.number_of_first_order_steps)
268
269
270    def evolve(self, yieldstep = None, finaltime = None):
271        """Specialisation of basic evolve method from parent class
272        """
273
274        #Initialise real time viz if requested
275        if self.time == 0.0:
276            pass
277
278        #Call basic machinery from parent class
279        for t in Domain.evolve(self, yieldstep, finaltime):
280
281            #Pass control on to outer loop for more specific actions
282            yield(t)
Note: See TracBrowser for help on using the repository browser.