source: anuga_core/source/anuga_parallel/parallel_shallow_water.py @ 6226

Last change on this file since 6226 was 5763, checked in by steve, 17 years ago

Fixed a few bugs in the parallel shallow water code. Seems to
be working for low level code. Need to check Ole's parallel api

File size: 10.1 KB
Line 
1"""Class Parallel_Shallow_Water_Domain -
22D triangular domains for finite-volume computations of
3the shallow water equation, with extra structures to allow
4communication between other Parallel_Domains and itself
5
6This module contains a specialisation of class Domain
7from module shallow_water.py
8
9Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
10Geoscience Australia, 2004-2005
11
12"""
13
14import logging, logging.config
15logger = logging.getLogger('parallel')
16logger.setLevel(logging.WARNING)
17
18try:
19    logging.config.fileConfig('log.ini')
20except:
21    pass
22
23
24from anuga.shallow_water.shallow_water_domain import *
25from Numeric import zeros, Float, Int, ones, allclose, array
26
27import pypar
28
29
class Parallel_Domain(Domain):
    """Shallow water domain specialised for parallel (MPI via pypar) runs.

    Each processor holds its own full triangles plus ghost triangles
    mirroring triangles owned by neighbouring processors.  The mapping
    is described by full_send_dict and ghost_recv_dict (see
    update_ghosts for how they are used).
    """

    def __init__(self, coordinates, vertices,
                 boundary=None,
                 full_send_dict=None,
                 ghost_recv_dict=None,
                 number_of_full_nodes=None,
                 number_of_full_triangles=None):
        """Set up the local sub-domain on this processor.

        coordinates, vertices, boundary: local mesh, as for Domain.
        full_send_dict:  maps neighbour pid -> [local ids, global ids,
                         buffer] of full triangles whose values are sent.
        ghost_recv_dict: maps neighbour pid -> [local ids, global ids,
                         buffer] of ghost triangles updated from pid.
        """

        # Parent constructor also records this processor's rank and the
        # total number of processors, obtained directly from pypar.
        Domain.__init__(self,
                        coordinates,
                        vertices,
                        boundary,
                        full_send_dict=full_send_dict,
                        ghost_recv_dict=ghost_recv_dict,
                        processor=pypar.rank(),
                        numproc=pypar.size(),
                        number_of_full_nodes=number_of_full_nodes,
                        number_of_full_triangles=number_of_full_triangles)

        N = len(self) # number_of_triangles (currently unused here)

        # Historical buffer setup, superseded by Domain.__init__:
#        self.processor = pypar.rank()
#        self.numproc   = pypar.size()
#
#        # Setup Communication Buffers
#        self.nsys = 3
#        for key in full_send_dict:
#            buffer_shape = full_send_dict[key][0].shape[0]
#            full_send_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
#
#
#        for key in ghost_recv_dict:
#            buffer_shape = ghost_recv_dict[key][0].shape[0]
#            ghost_recv_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
#
#        self.full_send_dict  = full_send_dict
        self.ghost_recv_dict = ghost_recv_dict

        # Buffers for synchronisation of timesteps:
        # this processor's candidate timestep ...
        self.local_timestep = zeros(1, Float)
        # ... and the agreed global (minimum) timestep.
        self.global_timestep = zeros(1, Float)

        # One slot per processor, filled by the broadcast variant
        # (update_timestep_1).
        self.local_timesteps = zeros(self.numproc, Float)

        # Accumulated wall-clock time spent in communication, for
        # performance reporting.
        self.communication_time = 0.0
        self.communication_reduce_time = 0.0
        self.communication_broadcast_time = 0.0
80       
81
82
83    def set_name(self, name):
84        """Assign name based on processor number
85        """
86
87        if name.endswith('.sww'):
88            name = name[:-4]
89
90        # Call parents method with processor number attached.
91        Domain.set_name(self, name + '_P%d_%d' %(self.processor, self.numproc))
92
93
94    def check_integrity(self):
95        Domain.check_integrity(self)
96
97        msg = 'Will need to check global and local numbering'
98        assert self.conserved_quantities[0] == 'stage', msg
99        assert self.conserved_quantities[1] == 'xmomentum', msg
100        assert self.conserved_quantities[2] == 'ymomentum', msg
101
102
    def update_timestep_1(self, yieldstep, finaltime):
        """Calculate global timestep using all-to-all broadcasts.

        Every processor broadcasts its flux_timestep in turn; each
        processor collects one value per rank and takes the minimum, so
        all processors agree on the same flux_timestep.  The parent's
        update_timestep is called afterwards so statistics are computed
        from the agreed value.
        """

        #LINDA:
        # Moved below so timestep is found before doing update
        #Domain.update_timestep(self, yieldstep, finaltime)

        import time

        t0 = time.time()

        # Broadcast local timestep from every processor to every other.
        # On iteration pid, processor pid is the root; everyone else
        # receives its value into local_timestep.
        for pid in range(self.numproc):
            #print 'P%d calling broadcast from %d' %(self.processor, pid)
            self.local_timestep[0] = self.flux_timestep
            pypar.broadcast(self.local_timestep, pid, bypass=True)
            self.local_timesteps[pid] = self.local_timestep[0]

        # The global timestep is the most restrictive (smallest) one.
        self.flux_timestep = min(self.local_timesteps)

        #print 'Flux Timestep %d P%d_%d' %(self.flux_timestep, self.processor, self.numproc)

        pypar.barrier()
        self.communication_broadcast_time += time.time()-t0

        # LINDA:
        # Moved timestep to here

        Domain.update_timestep(self, yieldstep, finaltime)
136
    def update_timestep(self, yieldstep, finaltime):
        """Calculate global timestep via reduce-to-root plus broadcast.

        The minimum flux_timestep across all processors is computed
        with a MIN-reduce onto processor 0, then broadcast back, so
        every processor advances with the same timestep.  The parent's
        update_timestep runs last so local statistics use the agreed
        value.
        """

        # LINDA: Moved below so timestep is updated before
        # calculating statistic

        #Compute minimal timestep on local process
        #Domain.update_timestep(self, yieldstep, finaltime)

        pypar.barrier()

        import time
        # Compute minimal timestep across all processes
        self.local_timestep[0] = self.flux_timestep
        use_reduce_broadcast = True
        if use_reduce_broadcast:
            # Collective MIN-reduce of every processor's timestep onto
            # processor 0 (result lands in global_timestep there).
            t0 = time.time()
            pypar.reduce(self.local_timestep, pypar.MIN, 0,
                         buffer=self.global_timestep)#,
                         #bypass=True)

        else:
            # Alternative: explicit send/receive fan-in to processor 0.
            t0 = time.time()
            self.global_timestep[0] = self.flux_timestep

            if self.processor == 0:
                # Receive each processor's timestep; keep the minimum.
                for i in range(1, self.numproc):
                    pypar.receive(i,
                                  buffer=self.local_timestep,
                                  bypass=True)

                    if self.local_timestep[0] < self.global_timestep[0]:
                        self.global_timestep[0] = self.local_timestep[0]
            else:
                pypar.send(self.local_timestep, 0,
                           use_buffer=True, bypass=True)


        self.communication_reduce_time += time.time()-t0


        # Broadcast minimal timestep from processor 0 to all others
        t0 = time.time()
        pypar.broadcast(self.global_timestep, 0)#,
                        #bypass=True)

        self.communication_broadcast_time += time.time()-t0

        #old_timestep = self.flux_timestep
        self.flux_timestep = self.global_timestep[0]
        #print 'Flux Timestep %15.5e %15.5e P%d_%d' %(self.flux_timestep, old_timestep, self.processor, self.numproc)

        # LINDA:
        # update local stats now

        #Compute minimal timestep on local process
        Domain.update_timestep(self, yieldstep, finaltime)

        # FIXME (Ole) We should update the variable min_timestep for use
        # with write_time (or redo write_time)
200    #update_timestep = update_timestep_1
201
    def update_ghosts(self):
        """Synchronise ghost triangles with their owning processors.

        Centroid values of the conserved quantities from full cells are
        sent to neighbouring processors and received into local ghost
        cells, as described by full_send_dict / ghost_recv_dict.  Each
        dict entry stores [local ids, global ids, communication buffer]
        (per the LINDA notes below).
        """

        # We must send the information from the full cells and
        # receive the information for the ghost cells
        # We have a dictionary of lists with ghosts expecting updates from
        # the separate processors


        from Numeric import take,put
        import time
        t0 = time.time()

        # Update of non-local ghost cells: processors take turns being
        # the sender (iproc); all others receive what they expect from
        # that sender.  This ordering avoids send/receive mismatches.
        for iproc in range(self.numproc):
            if iproc == self.processor:
                #Send data from iproc processor to other processors
                for send_proc in self.full_send_dict:
                    if send_proc != iproc:

                        Idf  = self.full_send_dict[send_proc][0]  # local ids of full cells
                        Xout = self.full_send_dict[send_proc][2]  # send buffer

                        # Pack centroid values: one column per quantity
                        for i, q in enumerate(self.conserved_quantities):
                            #print 'Send',i,q
                            Q_cv =  self.quantities[q].centroid_values
                            Xout[:,i] = take(Q_cv, Idf)

                        pypar.send(Xout, send_proc,
                                   use_buffer=True, bypass = True)


            else:
                #Receive data from the iproc processor, if any expected
                if  self.ghost_recv_dict.has_key(iproc):

                    Idg = self.ghost_recv_dict[iproc][0]  # local ids of ghost cells
                    X = self.ghost_recv_dict[iproc][2]    # receive buffer

                    X = pypar.receive(iproc, buffer=X, bypass = True)

                    # Unpack received columns into ghost centroid values
                    for i, q in enumerate(self.conserved_quantities):
                        #print 'Receive',i,q
                        Q_cv =  self.quantities[q].centroid_values
                        put(Q_cv, Idg, X[:,i])

        # Local update of ghost cells: ghosts whose source triangles
        # live on this same processor are copied directly, without
        # communication.
        iproc = self.processor
        if self.full_send_dict.has_key(iproc):

            # LINDA:
            # now store full as local id, global id, value
            Idf  = self.full_send_dict[iproc][0]

            # LINDA:
            # now store ghost as local id, global id, value
            Idg = self.ghost_recv_dict[iproc][0]

            for i, q in enumerate(self.conserved_quantities):
                #print 'LOCAL SEND RECEIVE',i,q
                Q_cv =  self.quantities[q].centroid_values
                put(Q_cv,     Idg, take(Q_cv,     Idf))

        self.communication_time += time.time()-t0
265
266'''
This was removed due to not being required to be redefined in parallel_shallow_water;
268the original "write_time" is good... however might need some small edits to work properly
269with parallel- Nick and Ole April 2007
270    def write_time(self):
271        if self.min_timestep == self.max_timestep:
272            print 'Processor %d/%d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\
273                  %(self.processor, self.numproc,
274                    self.time, self.min_timestep, self.number_of_steps,
275                    self.number_of_first_order_steps)
276        elif self.min_timestep > self.max_timestep:
277            print 'Processor %d/%d, Time = %.4f, steps=%d (%d)'\
278                  %(self.processor, self.numproc,
279                    self.time, self.number_of_steps,
280                    self.number_of_first_order_steps)
281        else:
282            print 'Processor %d/%d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\
283                  %(self.processor, self.numproc,
284                    self.time, self.min_timestep,
285                    self.max_timestep, self.number_of_steps,
286                    self.number_of_first_order_steps)
287'''
288
289# commented out on the 7/11/06
290#    def evolve(self, yieldstep=None, finaltime=None,
291#               skip_initial_step=False):
292#        """Specialisation of basic evolve method from parent class
293#        """
294
295        #Initialise real time viz if requested
296#        if self.time == 0.0:
297#            pass
298
299        #Call basic machinery from parent class
300#        for t in Domain.evolve(self, yieldstep, finaltime, skip_initial_step):
301
302            #Pass control on to outer loop for more specific actions
303#            yield(t)
Note: See TracBrowser for help on using the repository browser.