source: inundation/parallel/parallel_shallow_water.py @ 3184

Last change on this file since 3184 was 3117, checked in by ole, 19 years ago

Added number of processors to sww filename

File size: 8.9 KB
Line 
1import sys
2from os import sep
3
4# FIXME: This should be removed. Set the environment variable PYTHONPATH to
5# the directory ...../anuga/inundation instead
6#sys.path.append('..'+sep+'pyvolution')
7
8"""Class Parallel_Shallow_Water_Domain -
92D triangular domains for finite-volume computations of
10the shallow water equation, with extra structures to allow
11communication between other Parallel_Domains and itself
12
13This module contains a specialisation of class Domain
14from module shallow_water.py
15
16Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
17Geoscience Australia, 2004-2005
18"""
19
20import logging, logging.config
# Dedicated logger for the parallel code; only warnings and above by default.
logger = logging.getLogger('parallel')
logger.setLevel(logging.WARNING)

# Optionally refine the logging setup from a 'log.ini' file in the current
# working directory.
try:
    logging.config.fileConfig('log.ini')
except Exception:
    # Best effort only: a missing or malformed log.ini must not prevent the
    # module from being imported, so the defaults set above stay in force.
    # Catching Exception (rather than using a bare except) makes the intent
    # explicit and avoids masking interpreter-exit requests.
    pass
28
29from pyvolution.shallow_water import *
30from Numeric import zeros, Float, Int, ones, allclose, array
31from pypar_dist import pypar
32
33
class Parallel_Domain(Domain):
    """Shallow water domain for one process of a parallel (pypar) run.

    Specialises Domain with the communication needed to run one sub-domain
    per process: the timestep is synchronised across all processes and the
    centroid values of ghost cells are refreshed from the processes that
    hold the corresponding full cells.

    Attributes set here (in addition to those of Domain):
      local_timestep, global_timestep  -- 1-element Float buffers used when
                                          exchanging timesteps
      local_timesteps                  -- one slot per process, used by
                                          update_timestep_1
      communication_time,
      communication_reduce_time,
      communication_broadcast_time     -- accumulated wall clock seconds
                                          spent in the respective
                                          communication phases
    """

    def __init__(self, coordinates, vertices, boundary=None,
                 full_send_dict=None, ghost_recv_dict=None):
        """Create the sub-domain owned by this process.

        coordinates, vertices, boundary -- passed unchanged to Domain.
        full_send_dict  -- keyed by process id; update_ghosts reads entry
                           [0] as the indices of full cells to send and
                           entry [2] as the send buffer.
        ghost_recv_dict -- keyed by process id; update_ghosts reads entry
                           [0] as the indices of ghost cells to fill and
                           entry [2] as the receive buffer.

        The rank and number of processes are taken from pypar and handed to
        Domain.__init__.
        """

        # NOTE(review): the communication buffers (entry [2] of the
        # dictionaries) are not created in this class; presumably
        # Domain.__init__ sets them up together with self.processor and
        # self.numproc - confirm against pyvolution.shallow_water.
        Domain.__init__(self,
                        coordinates,
                        vertices,
                        boundary,
                        full_send_dict=full_send_dict,
                        ghost_recv_dict=ghost_recv_dict,
                        processor=pypar.rank(),
                        numproc=pypar.size())

        # Only re-bind when a dictionary was actually supplied, so that
        # whatever the base class stored for the None default is not
        # replaced by None (update_ghosts does membership tests on it).
        if ghost_recv_dict is not None:
            self.ghost_recv_dict = ghost_recv_dict

        # Buffers for synchronisation of timesteps
        self.local_timestep = zeros(1, Float)
        self.global_timestep = zeros(1, Float)
        self.local_timesteps = zeros(self.numproc, Float)

        # Accumulated communication timings (seconds)
        self.communication_time = 0.0
        self.communication_reduce_time = 0.0
        self.communication_broadcast_time = 0.0


    def set_name(self, name):
        """Assign name based on processor number

        The suffix '_P<processor>_<numproc>' is appended to name before it
        is passed to Domain.set_name.
        """

        suffix = '_P%d_%d' % (self.processor, self.numproc)
        Domain.set_name(self, name + suffix)


    def check_integrity(self):
        """Run the parent's checks and verify the conserved quantity order.

        update_ghosts packs the conserved quantities into buffer columns by
        position, so the order stage, xmomentum, ymomentum is asserted here.
        Raises AssertionError if the order differs.
        """

        Domain.check_integrity(self)

        msg = 'Will need to check global and local numbering'
        expected = ['stage', 'xmomentum', 'ymomentum']
        for i, name in enumerate(expected):
            assert self.conserved_quantities[i] == name, msg


    def update_timestep_1(self, yieldstep, finaltime):
        """Calculate local timestep using broadcasts

        Alternative to update_timestep: every process in turn broadcasts
        its local timestep, and each process then takes the minimum of the
        collected values. The elapsed time (including the final barrier) is
        added to communication_broadcast_time.
        """

        import time

        # Compute minimal timestep on local process
        Domain.update_timestep(self, yieldstep, finaltime)

        t0 = time.time()

        # Broadcast local timestep from every processor to every other
        for pid in range(self.numproc):
            # Reload the buffer each round: after a broadcast it is read
            # back as the value belonging to process pid.
            self.local_timestep[0] = self.timestep
            pypar.broadcast(self.local_timestep, pid, bypass=True)
            self.local_timesteps[pid] = self.local_timestep[0]

        self.timestep = min(self.local_timesteps)

        pypar.barrier()
        self.communication_broadcast_time += time.time() - t0


    def update_timestep(self, yieldstep, finaltime):
        """Calculate the timestep shared by all processes.

        The local timestep is computed by Domain.update_timestep, the
        minimum over all processes is collected on process 0 and then
        broadcast, and self.timestep is set to that global minimum.
        The time spent collecting is added to communication_reduce_time and
        the time spent broadcasting to communication_broadcast_time.
        """

        import time

        # Compute minimal timestep on local process
        Domain.update_timestep(self, yieldstep, finaltime)

        pypar.barrier()

        # Compute minimal timestep across all processes
        self.local_timestep[0] = self.timestep

        # Hard-wired switch between the two collection strategies; the
        # point-to-point variant is kept as an alternative.
        use_reduce_broadcast = True

        t0 = time.time()
        if use_reduce_broadcast:
            pypar.reduce(self.local_timestep, pypar.MIN, 0,
                         buffer=self.global_timestep,
                         bypass=True)
        else:
            # Alternative: Try using straight send and receives
            self.global_timestep[0] = self.timestep

            if self.processor == 0:
                # Process 0 receives every other local timestep and keeps
                # the smallest one seen so far.
                for i in range(1, self.numproc):
                    pypar.receive(i,
                                  buffer=self.local_timestep,
                                  bypass=True)

                    if self.local_timestep[0] < self.global_timestep[0]:
                        self.global_timestep[0] = self.local_timestep[0]
            else:
                pypar.send(self.local_timestep, 0,
                           use_buffer=True, bypass=True)

        self.communication_reduce_time += time.time() - t0

        # Broadcast minimal timestep to all
        t0 = time.time()
        pypar.broadcast(self.global_timestep, 0,
                        bypass=True)
        self.communication_broadcast_time += time.time() - t0

        self.timestep = self.global_timestep[0]


    #update_timestep = update_timestep_1

    def update_ghosts(self):
        """Refresh the centroid values of all ghost cells.

        We must send the information from the full cells and receive the
        information for the ghost cells. Both dictionaries are keyed by
        process id; entry [0] holds cell indices and entry [2] the
        communication buffer with one column per conserved quantity.

        The processes take turns in rank order: the process whose turn it
        is sends to every other process in its full_send_dict, while the
        remaining processes receive from it if it is in their
        ghost_recv_dict. Finally ghost cells that mirror full cells on this
        same process are copied locally. The elapsed time is added to
        communication_time.
        """

        from Numeric import take, put
        import time

        t0 = time.time()

        # Update of non-local ghost cells
        for iproc in range(self.numproc):
            if iproc == self.processor:
                # Send data from iproc processor to other processors
                for send_proc in self.full_send_dict:
                    if send_proc != iproc:

                        Idf = self.full_send_dict[send_proc][0]
                        Xout = self.full_send_dict[send_proc][2]

                        # Column i of the buffer carries quantity i
                        for i, q in enumerate(self.conserved_quantities):
                            Q_cv = self.quantities[q].centroid_values
                            Xout[:,i] = take(Q_cv, Idf)

                        pypar.send(Xout, send_proc,
                                   use_buffer=True, bypass=True)

            elif iproc in self.ghost_recv_dict:
                # Receive data from the iproc processor
                Idg = self.ghost_recv_dict[iproc][0]
                X = self.ghost_recv_dict[iproc][2]

                X = pypar.receive(iproc, buffer=X, bypass=True)

                for i, q in enumerate(self.conserved_quantities):
                    Q_cv = self.quantities[q].centroid_values
                    put(Q_cv, Idg, X[:,i])

        # Local update of ghost cells
        iproc = self.processor
        if iproc in self.full_send_dict:

            # LINDA:
            # now store full as local id, global id, value
            Idf = self.full_send_dict[iproc][0]

            # LINDA:
            # now store ghost as local id, global id, value
            Idg = self.ghost_recv_dict[iproc][0]

            for i, q in enumerate(self.conserved_quantities):
                Q_cv = self.quantities[q].centroid_values
                put(Q_cv, Idg, take(Q_cv, Idf))

        self.communication_time += time.time() - t0


    def write_time(self):
        """Print a one-line progress report prefixed with the processor id.

        Three formats are used depending on min_timestep and max_timestep:
        a single 'delta t' when they are equal, no timestep information
        when min_timestep exceeds max_timestep (presumably no step has been
        recorded yet), and the interval [min, max] otherwise.
        """

        # The single-argument print(...) form gives identical output under
        # Python 2 and also parses under Python 3.
        if self.min_timestep == self.max_timestep:
            msg = 'Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\
                  %(self.processor, self.time, self.min_timestep,
                    self.number_of_steps,
                    self.number_of_first_order_steps)
        elif self.min_timestep > self.max_timestep:
            msg = 'Processor %d, Time = %.4f, steps=%d (%d)'\
                  %(self.processor, self.time, self.number_of_steps,
                    self.number_of_first_order_steps)
        else:
            msg = 'Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\
                  %(self.processor, self.time, self.min_timestep,
                    self.max_timestep, self.number_of_steps,
                    self.number_of_first_order_steps)

        print(msg)


    def evolve(self, yieldstep=None, finaltime=None):
        """Specialisation of basic evolve method from parent class

        Generator: yields every value produced by Domain.evolve, unchanged.
        """

        # Call basic machinery from parent class
        for t in Domain.evolve(self, yieldstep, finaltime):

            # Pass control on to outer loop for more specific actions
            yield t
Note: See TracBrowser for help on using the repository browser.