source: anuga_core/source/anuga_parallel/parallel_shallow_water.py @ 3580

Last change on this file since 3580 was 3580, checked in by ole, 18 years ago

Resolved

File size: 9.1 KB
Line 
1"""Class Parallel_Shallow_Water_Domain -
22D triangular domains for finite-volume computations of
3the shallow water equation, with extra structures to allow
4communication between other Parallel_Domains and itself
5
6This module contains a specialisation of class Domain
7from module shallow_water.py
8
9Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
10Geoscience Australia, 2004-2005
11
12"""
13
14import logging, logging.config
15logger = logging.getLogger('parallel')
16logger.setLevel(logging.WARNING)
17
18try:
19    logging.config.fileConfig('log.ini')
20except:
21    pass
22
23
24from anuga.shallow_water.shallow_water_domain import *
25from Numeric import zeros, Float, Int, ones, allclose, array
26
27import pypar
28
29
30class Parallel_Domain(Domain):
31
32    def __init__(self, coordinates, vertices, boundary = None,
33                 full_send_dict = None, ghost_recv_dict = None):
34
35        Domain.__init__(self,
36                        coordinates,
37                        vertices,
38                        boundary,
39                        full_send_dict=full_send_dict,
40                        ghost_recv_dict=ghost_recv_dict,
41                        processor=pypar.rank(),
42                        numproc=pypar.size())
43
44        N = self.number_of_elements
45
46#        self.processor = pypar.rank()
47#        self.numproc   = pypar.size()
48#
49#        # Setup Communication Buffers
50#        self.nsys = 3
51#        for key in full_send_dict:
52#            buffer_shape = full_send_dict[key][0].shape[0]
53#            full_send_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
54#
55#
56#        for key in ghost_recv_dict:
57#            buffer_shape = ghost_recv_dict[key][0].shape[0]
58#            ghost_recv_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
59#
60#        self.full_send_dict  = full_send_dict
61        self.ghost_recv_dict = ghost_recv_dict
62
63        # Buffers for synchronisation of timesteps
64        self.local_timestep = zeros(1, Float)
65        self.global_timestep = zeros(1, Float)
66
67        self.local_timesteps = zeros(self.numproc, Float)
68
69
70        self.communication_time = 0.0
71        self.communication_reduce_time = 0.0
72        self.communication_broadcast_time = 0.0
73
74       
75
76
77    def set_name(self, name):
78        """Assign name based on processor number
79        """
80
81        # Call parents method with processor number attached.
82        Domain.set_name(self, name + '_P%d_%d' %(self.processor, self.numproc))
83
84
85    def check_integrity(self):
86        Domain.check_integrity(self)
87
88        msg = 'Will need to check global and local numbering'
89        assert self.conserved_quantities[0] == 'stage', msg
90        assert self.conserved_quantities[1] == 'xmomentum', msg
91        assert self.conserved_quantities[2] == 'ymomentum', msg
92
93
94    def update_timestep_1(self, yieldstep, finaltime):
95        """Calculate local timestep using broadcasts
96        """
97
98        #LINDA:
99        # Moved below so timestep is found before doing update
100       
101        #Domain.update_timestep(self, yieldstep, finaltime)
102
103        import time
104
105
106        t0 = time.time()
107
108        #Broadcast local timestep from every processor to every other
109        for pid in range(self.numproc):
110            #print 'P%d calling broadcast from %d' %(self.processor, pid)
111            self.local_timestep[0] = self.timestep
112            pypar.broadcast(self.local_timestep, pid, bypass=True)
113            self.local_timesteps[pid] = self.local_timestep[0]
114
115        self.timestep = min(self.local_timesteps)
116
117        pypar.barrier()
118        self.communication_broadcast_time += time.time()-t0
119
120        # LINDA:
121        # Moved timestep to here
122       
123        Domain.update_timestep(self, yieldstep, finaltime)
124
125
126    def update_timestep(self, yieldstep, finaltime):
127        """Calculate local timestep
128        """
129
130        # LINDA: Moved below so timestep is updated before
131        # calculating statistic
132       
133        #Compute minimal timestep on local process
134        #Domain.update_timestep(self, yieldstep, finaltime)
135
136        pypar.barrier()
137
138        import time
139        #Compute minimal timestep across all processes
140        self.local_timestep[0] = self.timestep
141        use_reduce_broadcast = True
142        if use_reduce_broadcast:
143            t0 = time.time()
144            pypar.reduce(self.local_timestep, pypar.MIN, 0,
145                         buffer=self.global_timestep,
146                         bypass=True)
147
148        else:
149            #Alternative: Try using straight send and receives
150            t0 = time.time()
151            self.global_timestep[0] = self.timestep
152
153            if self.processor == 0:
154                for i in range(1, self.numproc):
155                    pypar.receive(i,
156                                  buffer=self.local_timestep,
157                                  bypass=True)
158
159                    if self.local_timestep[0] < self.global_timestep[0]:
160                        self.global_timestep[0] = self.local_timestep[0]
161            else:
162                pypar.send(self.local_timestep, 0,
163                           use_buffer=True, bypass=True)
164
165
166        self.communication_reduce_time += time.time()-t0
167
168
169        #Broadcast minimal timestep to all
170        t0 = time.time()
171        pypar.broadcast(self.global_timestep, 0,
172                        bypass=True)
173
174        self.communication_broadcast_time += time.time()-t0
175
176
177        self.timestep = self.global_timestep[0]
178       
179        # LINDA:
180        # update local stats now
181       
182        #Compute minimal timestep on local process
183        Domain.update_timestep(self, yieldstep, finaltime)
184
185        # FIXME (Ole) We should update the variable min_timestep for use
186        # with write_time (or redo write_time)
187
188    #update_timestep = update_timestep_1
189
190    def update_ghosts(self):
191
192        # We must send the information from the full cells and
193        # receive the information for the ghost cells
194        # We have a dictionary of lists with ghosts expecting updates from
195        # the separate processors
196
197
198        from Numeric import take,put
199        import time
200        t0 = time.time()
201
202        # update of non-local ghost cells
203        for iproc in range(self.numproc):
204            if iproc == self.processor:
205                #Send data from iproc processor to other processors
206                for send_proc in self.full_send_dict:
207                    if send_proc != iproc:
208
209                        Idf  = self.full_send_dict[send_proc][0]
210                        Xout = self.full_send_dict[send_proc][2]
211
212                        for i, q in enumerate(self.conserved_quantities):
213                            #print 'Send',i,q
214                            Q_cv =  self.quantities[q].centroid_values
215                            Xout[:,i] = take(Q_cv,     Idf)
216
217                        pypar.send(Xout, send_proc,
218                                   use_buffer=True, bypass = True)
219
220
221            else:
222                #Receive data from the iproc processor
223                if  self.ghost_recv_dict.has_key(iproc):
224
225                    Idg = self.ghost_recv_dict[iproc][0]
226                    X = self.ghost_recv_dict[iproc][2]
227
228                    X = pypar.receive(iproc, buffer=X, bypass = True)
229
230                    for i, q in enumerate(self.conserved_quantities):
231                        #print 'Receive',i,q
232                        Q_cv =  self.quantities[q].centroid_values
233                        put(Q_cv,     Idg, X[:,i])
234
235        #local update of ghost cells
236        iproc = self.processor
237        if self.full_send_dict.has_key(iproc):
238
239            # LINDA:
240            # now store full as local id, global id, value
241            Idf  = self.full_send_dict[iproc][0]
242
243            # LINDA:
244            # now store ghost as local id, global id, value
245            Idg = self.ghost_recv_dict[iproc][0]
246
247            for i, q in enumerate(self.conserved_quantities):
248                #print 'LOCAL SEND RECEIVE',i,q
249                Q_cv =  self.quantities[q].centroid_values
250                put(Q_cv,     Idg, take(Q_cv,     Idf))
251
252        self.communication_time += time.time()-t0
253
254
255    def write_time(self):
256        if self.min_timestep == self.max_timestep:
257            print 'Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\
258                  %(self.processor, self.time, self.min_timestep, self.number_of_steps,
259                    self.number_of_first_order_steps)
260        elif self.min_timestep > self.max_timestep:
261            print 'Processor %d, Time = %.4f, steps=%d (%d)'\
262                  %(self.processor, self.time, self.number_of_steps,
263                    self.number_of_first_order_steps)
264        else:
265            print 'Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\
266                  %(self.processor, self.time, self.min_timestep,
267                    self.max_timestep, self.number_of_steps,
268                    self.number_of_first_order_steps)
269
270
271    def evolve(self, yieldstep = None, finaltime = None):
272        """Specialisation of basic evolve method from parent class
273        """
274
275        #Initialise real time viz if requested
276        if self.time == 0.0:
277            pass
278
279        #Call basic machinery from parent class
280        for t in Domain.evolve(self, yieldstep, finaltime):
281
282            #Pass control on to outer loop for more specific actions
283            yield(t)
Note: See TracBrowser for help on using the repository browser.