source: anuga_core/source/anuga_parallel/parallel_shallow_water.py @ 4686

Last change on this file since 4686 was 4398, checked in by nick, 18 years ago

Commented out "write_time"; since parallel_shallow_water.py inherits from shallow_water_domain.py, "write_time" will now come from Domain in shallow_water_domain.py.

This was done by svn merge -r4394:4395 .

File size: 9.8 KB
Line 
1"""Class Parallel_Shallow_Water_Domain -
22D triangular domains for finite-volume computations of
3the shallow water equation, with extra structures to allow
4communication between other Parallel_Domains and itself
5
6This module contains a specialisation of class Domain
7from module shallow_water.py
8
9Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
10Geoscience Australia, 2004-2005
11
12"""
13
14import logging, logging.config
# Module-level logger for the parallel code; only warnings and above are
# emitted unless a configuration file says otherwise.
logger = logging.getLogger('parallel')
logger.setLevel(logging.WARNING)

# Optional logging configuration: 'log.ini' is looked up relative to the
# current working directory.  Loading it is best-effort, so a missing or
# malformed file is deliberately ignored.  Only ordinary errors are
# swallowed -- KeyboardInterrupt and SystemExit still propagate.
try:
    logging.config.fileConfig('log.ini')
except Exception:
    pass
22
23
24from anuga.shallow_water.shallow_water_domain import *
25from Numeric import zeros, Float, Int, ones, allclose, array
26
27import pypar
28
29
class Parallel_Domain(Domain):
    """Shallow water Domain with inter-processor communication via pypar.

    On top of the parent Domain this class
      * appends the processor rank and processor count to the output name,
      * synchronises the timestep so every processor uses the global minimum,
      * exchanges centroid values of the conserved quantities between the
        full cells of one processor and the ghost cells of another.

    Time spent communicating is accumulated in the attributes
    communication_time, communication_reduce_time and
    communication_broadcast_time.
    """

    def __init__(self, coordinates, vertices,
                 boundary=None,
                 full_send_dict=None,
                 ghost_recv_dict=None,
                 number_of_full_nodes=None,
                 number_of_full_triangles=None):
        """Create the domain for the processor this code is running on.

        All arguments are forwarded to Domain.__init__ together with
        processor=pypar.rank() and numproc=pypar.size().

        full_send_dict and ghost_recv_dict are keyed by processor id.
        update_ghosts reads entry [0] of each value as an array of
        triangle indices and entry [2] as the communication buffer.
        """

        Domain.__init__(self,
                        coordinates,
                        vertices,
                        boundary,
                        full_send_dict=full_send_dict,
                        ghost_recv_dict=ghost_recv_dict,
                        processor=pypar.rank(),
                        numproc=pypar.size(),
                        number_of_full_nodes=number_of_full_nodes,
                        number_of_full_triangles=number_of_full_triangles)

        # NOTE(review): the dictionaries are also handed to Domain.__init__
        # above, which presumably stores them and sets up the buffers.
        # This assignment is kept so the attribute is exactly the object
        # passed in by the caller -- confirm whether it is still needed.
        self.ghost_recv_dict = ghost_recv_dict

        # Buffers for synchronisation of timesteps
        self.local_timestep = zeros(1, Float)
        self.global_timestep = zeros(1, Float)

        # One slot per processor (used by update_timestep_1)
        self.local_timesteps = zeros(self.numproc, Float)

        # Accumulated wall clock time spent in communication
        self.communication_time = 0.0
        self.communication_reduce_time = 0.0
        self.communication_broadcast_time = 0.0

    def set_name(self, name):
        """Assign name based on processor number

        A trailing '.sww' is stripped and '_P<processor>_<numproc>' is
        appended before the name is passed on to Domain.set_name.
        """

        if name.endswith('.sww'):
            name = name[:-4]

        # Call parents method with processor number attached.
        Domain.set_name(self, name + '_P%d_%d' % (self.processor,
                                                  self.numproc))

    def check_integrity(self):
        """Run the parent's integrity check and verify quantity order.

        update_ghosts packs the conserved quantities into buffer columns
        in the order they are listed, so the first three are required to
        be 'stage', 'xmomentum' and 'ymomentum'.

        Raises AssertionError if the order differs.
        """

        Domain.check_integrity(self)

        msg = 'Will need to check global and local numbering'
        assert self.conserved_quantities[0] == 'stage', msg
        assert self.conserved_quantities[1] == 'xmomentum', msg
        assert self.conserved_quantities[2] == 'ymomentum', msg

    def update_timestep_1(self, yieldstep, finaltime):
        """Calculate local timestep using broadcasts

        Alternative to update_timestep: every processor broadcasts its
        own timestep to all others and the minimum is taken locally.
        It is not used unless it is assigned to update_timestep.
        """

        import time

        t0 = time.time()

        # Broadcast local timestep from every processor to every other
        for pid in range(self.numproc):
            # The buffer is refilled on every pass because the broadcast
            # overwrites it with the value belonging to processor pid.
            self.local_timestep[0] = self.timestep
            pypar.broadcast(self.local_timestep, pid, bypass=True)
            self.local_timesteps[pid] = self.local_timestep[0]

        self.timestep = min(self.local_timesteps)

        pypar.barrier()
        self.communication_broadcast_time += time.time() - t0

        # LINDA: the parent update is called last so that the global
        # timestep has been found before the update is done.
        Domain.update_timestep(self, yieldstep, finaltime)

    def update_timestep(self, yieldstep, finaltime):
        """Calculate local timestep

        The minimal timestep across all processors is found on
        processor 0, broadcast to everyone and stored in self.timestep
        before Domain.update_timestep is called.
        """

        import time

        pypar.barrier()

        # Compute minimal timestep across all processes
        self.local_timestep[0] = self.timestep

        # Hard-wired switch between the collective reduce and the
        # hand-written send/receive alternative kept for experiments.
        use_reduce_broadcast = True

        t0 = time.time()
        if use_reduce_broadcast:
            pypar.reduce(self.local_timestep, pypar.MIN, 0,
                         buffer=self.global_timestep,
                         bypass=True)
        else:
            # Alternative: Try using straight send and receives
            self.global_timestep[0] = self.timestep

            if self.processor == 0:
                for i in range(1, self.numproc):
                    pypar.receive(i,
                                  buffer=self.local_timestep,
                                  bypass=True)

                    if self.local_timestep[0] < self.global_timestep[0]:
                        self.global_timestep[0] = self.local_timestep[0]
            else:
                pypar.send(self.local_timestep, 0,
                           use_buffer=True, bypass=True)

        self.communication_reduce_time += time.time() - t0

        # Broadcast minimal timestep to all
        t0 = time.time()
        pypar.broadcast(self.global_timestep, 0,
                        bypass=True)
        self.communication_broadcast_time += time.time() - t0

        self.timestep = self.global_timestep[0]

        # LINDA: the parent update is called last so that its statistics
        # are calculated from the synchronised timestep.
        Domain.update_timestep(self, yieldstep, finaltime)

        # FIXME (Ole) We should update the variable min_timestep for use
        # with write_time (or redo write_time)

    def update_ghosts(self):
        """Copy centroid values from full cells into matching ghost cells.

        Processors take turns in rank order: the processor whose turn it
        is sends one buffer to every other processor listed in
        full_send_dict, while the others receive from it if it appears
        in their ghost_recv_dict.  Column i of a buffer holds the
        centroid values of the i'th conserved quantity.

        Finally, ghost cells whose full counterpart lives on this very
        processor are updated by a direct local copy.

        The elapsed time is added to self.communication_time.
        """

        from Numeric import take, put
        import time

        t0 = time.time()

        # Update of non-local ghost cells
        for iproc in range(self.numproc):
            if iproc == self.processor:
                # Send data from iproc processor to other processors
                for send_proc in self.full_send_dict:
                    if send_proc == iproc:
                        continue

                    Idf = self.full_send_dict[send_proc][0]
                    Xout = self.full_send_dict[send_proc][2]

                    for i, q in enumerate(self.conserved_quantities):
                        Q_cv = self.quantities[q].centroid_values
                        Xout[:, i] = take(Q_cv, Idf)

                    pypar.send(Xout, send_proc,
                               use_buffer=True, bypass=True)

            elif iproc in self.ghost_recv_dict:
                # Receive data from the iproc processor
                Idg = self.ghost_recv_dict[iproc][0]
                X = self.ghost_recv_dict[iproc][2]

                X = pypar.receive(iproc, buffer=X, bypass=True)

                for i, q in enumerate(self.conserved_quantities):
                    Q_cv = self.quantities[q].centroid_values
                    put(Q_cv, Idg, X[:, i])

        # Local update of ghost cells
        iproc = self.processor
        if iproc in self.full_send_dict:

            # LINDA: entries are stored as local id, global id, value
            Idf = self.full_send_dict[iproc][0]
            Idg = self.ghost_recv_dict[iproc][0]

            for q in self.conserved_quantities:
                Q_cv = self.quantities[q].centroid_values
                put(Q_cv, Idg, take(Q_cv, Idf))

        self.communication_time += time.time() - t0
262
263'''
264This was removed because it does not need to be redefined in parallel_shallow_water;
265the original "write_time" is good. However, it might need some small edits to work properly
266in parallel. - Nick and Ole, April 2007
267    def write_time(self):
268        if self.min_timestep == self.max_timestep:
269            print 'Processor %d/%d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\
270                  %(self.processor, self.numproc,
271                    self.time, self.min_timestep, self.number_of_steps,
272                    self.number_of_first_order_steps)
273        elif self.min_timestep > self.max_timestep:
274            print 'Processor %d/%d, Time = %.4f, steps=%d (%d)'\
275                  %(self.processor, self.numproc,
276                    self.time, self.number_of_steps,
277                    self.number_of_first_order_steps)
278        else:
279            print 'Processor %d/%d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\
280                  %(self.processor, self.numproc,
281                    self.time, self.min_timestep,
282                    self.max_timestep, self.number_of_steps,
283                    self.number_of_first_order_steps)
284'''
285
286# commented out on the 7/11/06
287#    def evolve(self, yieldstep=None, finaltime=None,
288#               skip_initial_step=False):
289#        """Specialisation of basic evolve method from parent class
290#        """
291
292        #Initialise real time viz if requested
293#        if self.time == 0.0:
294#            pass
295
296        #Call basic machinery from parent class
297#        for t in Domain.evolve(self, yieldstep, finaltime, skip_initial_step):
298
299            #Pass control on to outer loop for more specific actions
300#            yield(t)
Note: See TracBrowser for help on using the repository browser.