source: inundation/parallel/parallel_shallow_water.py @ 3431

Last change on this file since 3431 was 3431, checked in by ole, 18 years ago

Comment

File size: 9.3 KB
Line 
1import sys
2from os import sep
3
4# FIXME: This should be removed. Set evironment variable PYTHONPATH to
5# directory ...../anuga/inundation instead
6#sys.path.append('..'+sep+'pyvolution')
7
8"""Class Parallel_Shallow_Water_Domain -
92D triangular domains for finite-volume computations of
10the shallow water equation, with extra structures to allow
11communication between other Parallel_Domains and itself
12
13This module contains a specialisation of class Domain
14from module shallow_water.py
15
16Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
17Geoscience Australia, 2004-2005
18"""
19
20import logging, logging.config
21logger = logging.getLogger('parallel')
22logger.setLevel(logging.WARNING)
23
24try:
25    logging.config.fileConfig('log.ini')
26except:
27    pass
28
29from pyvolution.shallow_water import *
30from Numeric import zeros, Float, Int, ones, allclose, array
31
32from pypar_dist import pypar
33
34
35class Parallel_Domain(Domain):
36
37    def __init__(self, coordinates, vertices, boundary = None,
38                 full_send_dict = None, ghost_recv_dict = None):
39
40        Domain.__init__(self,
41                        coordinates,
42                        vertices,
43                        boundary,
44                        full_send_dict=full_send_dict,
45                        ghost_recv_dict=ghost_recv_dict,
46                        processor=pypar.rank(),
47                        numproc=pypar.size())
48
49        N = self.number_of_elements
50
51#        self.processor = pypar.rank()
52#        self.numproc   = pypar.size()
53#
54#        # Setup Communication Buffers
55#        self.nsys = 3
56#        for key in full_send_dict:
57#            buffer_shape = full_send_dict[key][0].shape[0]
58#            full_send_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
59#
60#
61#        for key in ghost_recv_dict:
62#            buffer_shape = ghost_recv_dict[key][0].shape[0]
63#            ghost_recv_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
64#
65#        self.full_send_dict  = full_send_dict
66        self.ghost_recv_dict = ghost_recv_dict
67
68        # Buffers for synchronisation of timesteps
69        self.local_timestep = zeros(1, Float)
70        self.global_timestep = zeros(1, Float)
71
72        self.local_timesteps = zeros(self.numproc, Float)
73
74
75        self.communication_time = 0.0
76        self.communication_reduce_time = 0.0
77        self.communication_broadcast_time = 0.0
78
79
80    def set_name(self, name):
81        """Assign name based on processor number
82        """
83
84        # Call parents method with processor number attached.
85        Domain.set_name(self, name + '_P%d_%d' %(self.processor, self.numproc))
86
87        #FIXME (Ole): Roll this into domain.py. Only change name if numproc>0 
88
89    def check_integrity(self):
90        Domain.check_integrity(self)
91
92        msg = 'Will need to check global and local numbering'
93        assert self.conserved_quantities[0] == 'stage', msg
94        assert self.conserved_quantities[1] == 'xmomentum', msg
95        assert self.conserved_quantities[2] == 'ymomentum', msg
96
97
98    def update_timestep_1(self, yieldstep, finaltime):
99        """Calculate local timestep using broadcasts
100        """
101
102        #LINDA:
103        # Moved below so timestep is found before doing update
104       
105        #Domain.update_timestep(self, yieldstep, finaltime)
106
107        import time
108
109
110        t0 = time.time()
111
112        #Broadcast local timestep from every processor to every other
113        for pid in range(self.numproc):
114            #print 'P%d calling broadcast from %d' %(self.processor, pid)
115            self.local_timestep[0] = self.timestep
116            pypar.broadcast(self.local_timestep, pid, bypass=True)
117            self.local_timesteps[pid] = self.local_timestep[0]
118
119        self.timestep = min(self.local_timesteps)
120
121        pypar.barrier()
122        self.communication_broadcast_time += time.time()-t0
123
124        # LINDA:
125        # Moved timestep to here
126       
127        Domain.update_timestep(self, yieldstep, finaltime)
128
129
130    def update_timestep(self, yieldstep, finaltime):
131        """Calculate local timestep
132        """
133
134        # LINDA: Moved below so timestep is updated before
135        # calculating statistic
136       
137        #Compute minimal timestep on local process
138        #Domain.update_timestep(self, yieldstep, finaltime)
139
140        pypar.barrier()
141
142        import time
143        #Compute minimal timestep across all processes
144        self.local_timestep[0] = self.timestep
145        use_reduce_broadcast = True
146        if use_reduce_broadcast:
147            t0 = time.time()
148            pypar.reduce(self.local_timestep, pypar.MIN, 0,
149                         buffer=self.global_timestep,
150                         bypass=True)
151
152        else:
153            #Alternative: Try using straight send and receives
154            t0 = time.time()
155            self.global_timestep[0] = self.timestep
156
157            if self.processor == 0:
158                for i in range(1, self.numproc):
159                    pypar.receive(i,
160                                  buffer=self.local_timestep,
161                                  bypass=True)
162
163                    if self.local_timestep[0] < self.global_timestep[0]:
164                        self.global_timestep[0] = self.local_timestep[0]
165            else:
166                pypar.send(self.local_timestep, 0,
167                           use_buffer=True, bypass=True)
168
169
170        self.communication_reduce_time += time.time()-t0
171
172
173        #Broadcast minimal timestep to all
174        t0 = time.time()
175        pypar.broadcast(self.global_timestep, 0,
176                        bypass=True)
177
178        self.communication_broadcast_time += time.time()-t0
179
180
181        self.timestep = self.global_timestep[0]
182       
183        # LINDA:
184        # update local stats now
185       
186        #Compute minimal timestep on local process
187        Domain.update_timestep(self, yieldstep, finaltime)
188
189        # FIXME (Ole) We should update the variable min_timestep for use
190        # with write_time (or redo write_time)
191
192    #update_timestep = update_timestep_1
193
194    def update_ghosts(self):
195
196        # We must send the information from the full cells and
197        # receive the information for the ghost cells
198        # We have a dictionary of lists with ghosts expecting updates from
199        # the separate processors
200
201
202        from Numeric import take,put
203        import time
204        t0 = time.time()
205
206        # update of non-local ghost cells
207        for iproc in range(self.numproc):
208            if iproc == self.processor:
209                #Send data from iproc processor to other processors
210                for send_proc in self.full_send_dict:
211                    if send_proc != iproc:
212
213                        Idf  = self.full_send_dict[send_proc][0]
214                        Xout = self.full_send_dict[send_proc][2]
215
216                        for i, q in enumerate(self.conserved_quantities):
217                            #print 'Send',i,q
218                            Q_cv =  self.quantities[q].centroid_values
219                            Xout[:,i] = take(Q_cv,     Idf)
220
221                        pypar.send(Xout, send_proc,
222                                   use_buffer=True, bypass = True)
223
224
225            else:
226                #Receive data from the iproc processor
227                if  self.ghost_recv_dict.has_key(iproc):
228
229                    Idg = self.ghost_recv_dict[iproc][0]
230                    X = self.ghost_recv_dict[iproc][2]
231
232                    X = pypar.receive(iproc, buffer=X, bypass = True)
233
234                    for i, q in enumerate(self.conserved_quantities):
235                        #print 'Receive',i,q
236                        Q_cv =  self.quantities[q].centroid_values
237                        put(Q_cv,     Idg, X[:,i])
238
239        #local update of ghost cells
240        iproc = self.processor
241        if self.full_send_dict.has_key(iproc):
242
243            # LINDA:
244            # now store full as local id, global id, value
245            Idf  = self.full_send_dict[iproc][0]
246
247            # LINDA:
248            # now store ghost as local id, global id, value
249            Idg = self.ghost_recv_dict[iproc][0]
250
251            for i, q in enumerate(self.conserved_quantities):
252                #print 'LOCAL SEND RECEIVE',i,q
253                Q_cv =  self.quantities[q].centroid_values
254                put(Q_cv,     Idg, take(Q_cv,     Idf))
255
256        self.communication_time += time.time()-t0
257
258
259    def write_time(self):
260        if self.min_timestep == self.max_timestep:
261            print 'Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\
262                  %(self.processor, self.time, self.min_timestep, self.number_of_steps,
263                    self.number_of_first_order_steps)
264        elif self.min_timestep > self.max_timestep:
265            print 'Processor %d, Time = %.4f, steps=%d (%d)'\
266                  %(self.processor, self.time, self.number_of_steps,
267                    self.number_of_first_order_steps)
268        else:
269            print 'Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\
270                  %(self.processor, self.time, self.min_timestep,
271                    self.max_timestep, self.number_of_steps,
272                    self.number_of_first_order_steps)
273
274
275    def evolve(self, yieldstep = None, finaltime = None):
276        """Specialisation of basic evolve method from parent class
277        """
278
279        #Initialise real time viz if requested
280        if self.time == 0.0:
281            pass
282
283        #Call basic machinery from parent class
284        for t in Domain.evolve(self, yieldstep, finaltime):
285
286            #Pass control on to outer loop for more specific actions
287            yield(t)
Note: See TracBrowser for help on using the repository browser.