source: anuga_core/source/anuga_parallel/parallel_shallow_water.py @ 3926

Last change on this file since 3926 was 3926, checked in by ole, 17 years ago

First step towards keeping track of full nodes and triangles in
parallel domains.

File size: 9.5 KB
Line 
"""Class Parallel_Shallow_Water_Domain -
2D triangular domains for finite-volume computations of
the shallow water equation, with extra structures to allow
communication between other Parallel_Domains and itself

This module contains a specialisation of class Domain
from module shallow_water.py

Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
Geoscience Australia, 2004-2005

"""
13
14import logging, logging.config
# Module-level logger for the parallel code; only warnings and above are
# emitted unless a configuration file says otherwise.
logger = logging.getLogger('parallel')
logger.setLevel(logging.WARNING)

# Loading 'log.ini' is best effort: a missing or malformed file must not
# prevent the module from being imported.  Only ordinary errors are
# swallowed, so KeyboardInterrupt and SystemExit still propagate.
try:
    logging.config.fileConfig('log.ini')
except Exception:
    pass
22
23
24from anuga.shallow_water.shallow_water_domain import *
25from Numeric import zeros, Float, Int, ones, allclose, array
26
27import pypar
28
29
class Parallel_Domain(Domain):
    """Shallow water Domain that communicates with peer processes via pypar.

    On top of the parent Domain this class
      * agrees on a common (minimal) timestep across all processes,
      * exchanges centroid values of the conserved quantities between
        full cells and ghost cells, and
      * accumulates the wall-clock time spent in communication.
    """

    def __init__(self, coordinates, vertices,
                 boundary=None,
                 full_send_dict=None,
                 ghost_recv_dict=None,
                 number_of_full_nodes=0,
                 number_of_full_triangles=0):
        """Create the domain for the calling process.

        All arguments are forwarded unchanged to Domain.__init__, together
        with the processor rank (pypar.rank()) and the number of processes
        (pypar.size()).

        full_send_dict and ghost_recv_dict are keyed by processor id.
        update_ghosts reads entry [0] of each value as an index array and
        entry [2] as the transfer buffer.
        """

        Domain.__init__(self,
                        coordinates,
                        vertices,
                        boundary,
                        full_send_dict=full_send_dict,
                        ghost_recv_dict=ghost_recv_dict,
                        processor=pypar.rank(),
                        numproc=pypar.size(),
                        number_of_full_nodes=number_of_full_nodes,
                        number_of_full_triangles=number_of_full_triangles)

        # self.processor, self.numproc, self.full_send_dict and the
        # communication buffers used to be set up here; that code was
        # disabled when the arguments started being passed to
        # Domain.__init__ (presumably the parent now does this - TODO
        # confirm).
        #
        # NOTE(review): the assignment below is the one line of that old
        # setup that was left active.  If Domain.__init__ replaces a None
        # ghost_recv_dict with a default, this overwrites it with None
        # again - confirm against Domain before removing or guarding it.
        self.ghost_recv_dict = ghost_recv_dict

        # Buffers for synchronisation of timesteps
        self.local_timestep = zeros(1, Float)
        self.global_timestep = zeros(1, Float)
        self.local_timesteps = zeros(self.numproc, Float)

        # Accumulated wall-clock time (seconds) spent communicating
        self.communication_time = 0.0
        self.communication_reduce_time = 0.0
        self.communication_broadcast_time = 0.0

    def set_name(self, name):
        """Assign name based on processor number

        A trailing '.sww' is stripped and '_P<processor>_<numproc>' is
        appended before the name is handed to Domain.set_name.
        """

        if name.endswith('.sww'):
            name = name[:-4]

        # Call parents method with processor number attached.
        Domain.set_name(self, name + '_P%d_%d' % (self.processor,
                                                  self.numproc))

    def check_integrity(self):
        """Run the parent checks and verify the conserved quantity order.

        update_ghosts packs quantities into buffer columns by their
        position in self.conserved_quantities, so the first three entries
        are required to be stage, xmomentum and ymomentum, in that order.
        """
        Domain.check_integrity(self)

        msg = 'Will need to check global and local numbering'
        expected = ('stage', 'xmomentum', 'ymomentum')
        for index, quantity_name in enumerate(expected):
            assert self.conserved_quantities[index] == quantity_name, msg

    def update_timestep_1(self, yieldstep, finaltime):
        """Calculate local timestep using broadcasts

        Alternative to update_timestep: every process broadcasts its own
        timestep in turn and each process takes the minimum.  It is not
        used unless it is assigned to update_timestep (see the note after
        update_timestep).
        """

        import time

        t0 = time.time()

        # Broadcast local timestep from every processor to every other
        for pid in range(self.numproc):
            # The buffer holds the value broadcast by the previous pid, so
            # it has to be reloaded with our own timestep on every pass.
            self.local_timestep[0] = self.timestep
            pypar.broadcast(self.local_timestep, pid, bypass=True)
            self.local_timesteps[pid] = self.local_timestep[0]

        self.timestep = min(self.local_timesteps)

        pypar.barrier()
        self.communication_broadcast_time += time.time() - t0

        # The parent update runs last so that it sees the agreed timestep
        # rather than the local one.
        Domain.update_timestep(self, yieldstep, finaltime)

    def update_timestep(self, yieldstep, finaltime):
        """Calculate local timestep

        The minimum of self.timestep over all processes is reduced onto
        process 0, broadcast back to everybody and stored in
        self.timestep.  Only then is Domain.update_timestep called, so the
        parent works with the global value.
        """

        pypar.barrier()

        import time

        # Compute minimal timestep across all processes
        self.local_timestep[0] = self.timestep

        # Hard-wired switch between the two reduction strategies; the
        # send/receive branch is kept as a reference implementation and is
        # currently never executed.
        use_reduce_broadcast = True

        t0 = time.time()
        if use_reduce_broadcast:
            pypar.reduce(self.local_timestep, pypar.MIN, 0,
                         buffer=self.global_timestep,
                         bypass=True)
        else:
            # Alternative: Try using straight send and receives
            self.global_timestep[0] = self.timestep

            if self.processor == 0:
                for i in range(1, self.numproc):
                    pypar.receive(i,
                                  buffer=self.local_timestep,
                                  bypass=True)

                    if self.local_timestep[0] < self.global_timestep[0]:
                        self.global_timestep[0] = self.local_timestep[0]
            else:
                pypar.send(self.local_timestep, 0,
                           use_buffer=True, bypass=True)

        self.communication_reduce_time += time.time() - t0

        # Broadcast minimal timestep to all
        t0 = time.time()
        pypar.broadcast(self.global_timestep, 0,
                        bypass=True)
        self.communication_broadcast_time += time.time() - t0

        self.timestep = self.global_timestep[0]

        # Update local statistics now that the timestep is the global one
        Domain.update_timestep(self, yieldstep, finaltime)

        # FIXME (Ole) We should update the variable min_timestep for use
        # with write_time (or redo write_time)

    # To use the broadcast-only variant instead, enable:
    # update_timestep = update_timestep_1

    def update_ghosts(self):
        """Copy centroid values of full cells into the matching ghost cells.

        Processes take turns: when it is this process's turn it sends one
        buffer to every other processor listed in self.full_send_dict,
        otherwise it receives from the process whose turn it is, provided
        that process appears in self.ghost_recv_dict.  Ghost cells fed by
        full cells of this same process are updated last, without any
        message passing.

        Dictionary entries are stored as local id, global id, value
        buffer; only the local ids ([0]) and the buffer ([2]) are used
        here.  The elapsed time is added to self.communication_time.
        """

        from Numeric import take, put
        import time

        t0 = time.time()

        # update of non-local ghost cells
        for iproc in range(self.numproc):
            if iproc == self.processor:
                # Send data from iproc processor to other processors
                for send_proc in self.full_send_dict:
                    if send_proc == iproc:
                        continue

                    Idf = self.full_send_dict[send_proc][0]
                    Xout = self.full_send_dict[send_proc][2]

                    # One buffer column per conserved quantity
                    for i, q in enumerate(self.conserved_quantities):
                        Q_cv = self.quantities[q].centroid_values
                        Xout[:, i] = take(Q_cv, Idf)

                    pypar.send(Xout, send_proc,
                               use_buffer=True, bypass=True)

            elif iproc in self.ghost_recv_dict:
                # Receive data from the iproc processor
                Idg = self.ghost_recv_dict[iproc][0]
                X = self.ghost_recv_dict[iproc][2]

                X = pypar.receive(iproc, buffer=X, bypass=True)

                for i, q in enumerate(self.conserved_quantities):
                    Q_cv = self.quantities[q].centroid_values
                    put(Q_cv, Idg, X[:, i])

        # local update of ghost cells
        iproc = self.processor
        if iproc in self.full_send_dict:
            Idf = self.full_send_dict[iproc][0]
            Idg = self.ghost_recv_dict[iproc][0]

            for q in self.conserved_quantities:
                Q_cv = self.quantities[q].centroid_values
                put(Q_cv, Idg, take(Q_cv, Idf))

        self.communication_time += time.time() - t0

    def write_time(self):
        """Print time and timestep statistics prefixed by processor id.

        Three layouts are used: a single timestep when min and max agree,
        no timestep at all when min exceeds max, and the interval
        [min, max] otherwise.
        """
        if self.min_timestep == self.max_timestep:
            print('Processor %d/%d, Time = %.4f, delta t = %.8f, steps=%d (%d)'
                  % (self.processor, self.numproc,
                     self.time, self.min_timestep, self.number_of_steps,
                     self.number_of_first_order_steps))
        elif self.min_timestep > self.max_timestep:
            print('Processor %d/%d, Time = %.4f, steps=%d (%d)'
                  % (self.processor, self.numproc,
                     self.time, self.number_of_steps,
                     self.number_of_first_order_steps))
        else:
            print('Processor %d/%d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'
                  % (self.processor, self.numproc,
                     self.time, self.min_timestep,
                     self.max_timestep, self.number_of_steps,
                     self.number_of_first_order_steps))

    def evolve(self, yieldstep=None, finaltime=None):
        """Specialisation of basic evolve method from parent class

        Generator that re-yields every time value produced by
        Domain.evolve, so the caller's loop can act at each yield.
        """

        # Call basic machinery from parent class
        for t in Domain.evolve(self, yieldstep, finaltime):

            # Pass control on to outer loop for more specific actions
            yield t
Note: See TracBrowser for help on using the repository browser.