source: anuga_core/source/anuga_parallel/parallel_shallow_water.py @ 7400

Last change on this file since 7400 was 7400, checked in by steve, 15 years ago

Commit a working copy of numpy version of build_commun

File size: 10.1 KB
Line 
1"""Class Parallel_Shallow_Water_Domain -
22D triangular domains for finite-volume computations of
the shallow water equation, with extra structures to allow
communication between this Parallel_Domain and other Parallel_Domains
5
6This module contains a specialisation of class Domain
7from module shallow_water.py
8
9Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
10Geoscience Australia, 2004-2005
11
12"""
13
# Module-level logger for the parallel code; default to WARNING so
# informational chatter is suppressed unless explicitly configured.
import logging, logging.config
logger = logging.getLogger('parallel')
logger.setLevel(logging.WARNING)

# Best effort: pick up logging configuration from 'log.ini' if present.
# The previous bare 'except:' also swallowed SystemExit/KeyboardInterrupt;
# Exception is wide enough for a missing or malformed config file.
try:
    logging.config.fileConfig('log.ini')
except Exception:
    pass
22
23
24from anuga.shallow_water.shallow_water_domain import *
25
26
27import numpy as num
28
29import pypar
30
31
class Parallel_Domain(Domain):
    """Shallow water domain partitioned for parallel computation.

    Specialises the sequential Domain with pypar-based communication:
    ghost cells are refreshed from the full (authoritative) cells of
    neighbouring processors, and the flux timestep is synchronised
    across all processors each evolve step.
    """

    def __init__(self, coordinates, vertices,
                 boundary=None,
                 full_send_dict=None,
                 ghost_recv_dict=None,
                 number_of_full_nodes=None,
                 number_of_full_triangles=None):
        """Initialise the parallel domain.

        coordinates, vertices, boundary: as for the sequential Domain.
        full_send_dict: maps target processor id to
            [local ids, global ids, send buffer] of full cells to send.
        ghost_recv_dict: maps source processor id to
            [local ids, global ids, receive buffer] of ghost cells.
        """

        Domain.__init__(self,
                        coordinates,
                        vertices,
                        boundary,
                        full_send_dict=full_send_dict,
                        ghost_recv_dict=ghost_recv_dict,
                        processor=pypar.rank(),
                        numproc=pypar.size(),
                        number_of_full_nodes=number_of_full_nodes,
                        number_of_full_triangles=number_of_full_triangles)

        self.ghost_recv_dict = ghost_recv_dict

        # Buffers for synchronisation of timesteps
        self.local_timestep = num.zeros(1, num.float)
        self.global_timestep = num.zeros(1, num.float)

        # One slot per processor, used by the broadcast variant
        # update_timestep_1.  (Was 'num.loat' - a typo that raised
        # AttributeError on construction.)
        self.local_timesteps = num.zeros(self.numproc, num.float)

        # Accumulated communication timings in seconds
        self.communication_time = 0.0
        self.communication_reduce_time = 0.0
        self.communication_broadcast_time = 0.0


    def set_name(self, name):
        """Assign name based on processor number.

        The '.sww' extension, if given, is stripped and the processor
        rank and processor count are appended so each processor writes
        to a distinct file.
        """

        if name.endswith('.sww'):
            name = name[:-4]

        # Call parent's method with processor number attached.
        Domain.set_name(self, name + '_P%d_%d' %(self.processor, self.numproc))


    def check_integrity(self):
        """Run parent integrity checks and verify quantity ordering.

        The communication buffers assume the conserved quantities are
        (stage, xmomentum, ymomentum) in that order.
        """
        Domain.check_integrity(self)

        msg = 'Will need to check global and local numbering'
        assert self.conserved_quantities[0] == 'stage', msg
        assert self.conserved_quantities[1] == 'xmomentum', msg
        assert self.conserved_quantities[2] == 'ymomentum', msg


    def update_timestep_1(self, yieldstep, finaltime):
        """Calculate the global timestep using all-to-all broadcasts.

        Alternative to update_timestep, kept for timing comparison:
        every processor broadcasts its flux timestep and each takes
        the minimum over all of them.
        """

        import time
        t0 = time.time()

        # Broadcast local timestep from every processor to every other
        for pid in range(self.numproc):
            self.local_timestep[0] = self.flux_timestep
            pypar.broadcast(self.local_timestep, pid, bypass=True)
            self.local_timesteps[pid] = self.local_timestep[0]

        self.flux_timestep = min(self.local_timesteps)

        pypar.barrier()
        self.communication_broadcast_time += time.time()-t0

        # LINDA: update local statistics only after the global
        # timestep is known
        Domain.update_timestep(self, yieldstep, finaltime)


    def update_timestep(self, yieldstep, finaltime):
        """Calculate the global timestep via reduce + broadcast.

        The minimum flux timestep over all processors is reduced onto
        processor 0 and broadcast back; the parent method then updates
        the local statistics with the agreed timestep.
        """

        pypar.barrier()

        import time

        # Compute minimal timestep across all processes
        self.local_timestep[0] = self.flux_timestep
        use_reduce_broadcast = True
        if use_reduce_broadcast:
            t0 = time.time()
            pypar.reduce(self.local_timestep, pypar.MIN, 0,
                         buffer=self.global_timestep)
        else:
            # Alternative: straight sends and receives, kept for
            # timing comparisons (not normally used)
            t0 = time.time()
            self.global_timestep[0] = self.flux_timestep

            if self.processor == 0:
                for i in range(1, self.numproc):
                    pypar.receive(i,
                                  buffer=self.local_timestep,
                                  bypass=True)

                    if self.local_timestep[0] < self.global_timestep[0]:
                        self.global_timestep[0] = self.local_timestep[0]
            else:
                pypar.send(self.local_timestep, 0,
                           use_buffer=True, bypass=True)

        self.communication_reduce_time += time.time()-t0

        # Broadcast minimal timestep to all processors
        t0 = time.time()
        pypar.broadcast(self.global_timestep, 0)
        self.communication_broadcast_time += time.time()-t0

        self.flux_timestep = self.global_timestep[0]

        # LINDA: update local statistics now that the timestep is agreed
        Domain.update_timestep(self, yieldstep, finaltime)

        # FIXME (Ole) We should update the variable min_timestep for use
        # with write_time (or redo write_time)

    #update_timestep = update_timestep_1

    def update_ghosts(self):
        """Refresh ghost cell values from their owning processors.

        Sends the conserved quantities of full cells listed in
        full_send_dict and receives updates for the ghost cells listed
        in ghost_recv_dict; ghosts owned by this processor are copied
        locally.  (Previously imported take/put from the obsolete
        Numeric package; now uses numpy, consistent with the rest of
        this file.)
        """

        import time
        t0 = time.time()

        # Update of non-local ghost cells
        for iproc in range(self.numproc):
            if iproc == self.processor:
                # Send data from this processor to the others
                for send_proc in self.full_send_dict:
                    if send_proc != iproc:

                        Idf  = self.full_send_dict[send_proc][0]
                        Xout = self.full_send_dict[send_proc][2]

                        # Pack all conserved quantities into the buffer
                        for i, q in enumerate(self.conserved_quantities):
                            Q_cv = self.quantities[q].centroid_values
                            Xout[:,i] = num.take(Q_cv, Idf)

                        pypar.send(Xout, send_proc,
                                   use_buffer=True, bypass = True)

            else:
                # Receive data from processor iproc, if any is expected
                if iproc in self.ghost_recv_dict:

                    Idg = self.ghost_recv_dict[iproc][0]
                    X = self.ghost_recv_dict[iproc][2]

                    X = pypar.receive(iproc, buffer=X, bypass = True)

                    # Scatter received values into the ghost cells
                    for i, q in enumerate(self.conserved_quantities):
                        Q_cv = self.quantities[q].centroid_values
                        num.put(Q_cv, Idg, X[:,i])

        # Local update of ghost cells owned by this processor
        iproc = self.processor
        if iproc in self.full_send_dict:

            # LINDA: entries are stored as local id, global id, value
            Idf = self.full_send_dict[iproc][0]
            Idg = self.ghost_recv_dict[iproc][0]

            for i, q in enumerate(self.conserved_quantities):
                Q_cv = self.quantities[q].centroid_values
                num.put(Q_cv, Idg, num.take(Q_cv, Idf))

        self.communication_time += time.time()-t0
267
268'''
This was removed because it is not required to be redefined in parallel_shallow_water;
the original "write_time" is good. However, it might need some small edits to work
properly in parallel - Nick and Ole, April 2007
272    def write_time(self):
273        if self.min_timestep == self.max_timestep:
274            print 'Processor %d/%d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\
275                  %(self.processor, self.numproc,
276                    self.time, self.min_timestep, self.number_of_steps,
277                    self.number_of_first_order_steps)
278        elif self.min_timestep > self.max_timestep:
279            print 'Processor %d/%d, Time = %.4f, steps=%d (%d)'\
280                  %(self.processor, self.numproc,
281                    self.time, self.number_of_steps,
282                    self.number_of_first_order_steps)
283        else:
284            print 'Processor %d/%d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\
285                  %(self.processor, self.numproc,
286                    self.time, self.min_timestep,
287                    self.max_timestep, self.number_of_steps,
288                    self.number_of_first_order_steps)
289'''
290
291# commented out on the 7/11/06
292#    def evolve(self, yieldstep=None, finaltime=None,
293#               skip_initial_step=False):
294#        """Specialisation of basic evolve method from parent class
295#        """
296
297        #Initialise real time viz if requested
298#        if self.time == 0.0:
299#            pass
300
301        #Call basic machinery from parent class
302#        for t in Domain.evolve(self, yieldstep, finaltime, skip_initial_step):
303
304            #Pass control on to outer loop for more specific actions
305#            yield(t)
Note: See TracBrowser for help on using the repository browser.