source: inundation/parallel/parallel_shallow_water.py @ 2225

Last change on this file since 2225 was 2090, checked in by linda, 19 years ago

Removed pypar test files and commented pmesh_divide

File size: 8.4 KB
RevLine 
[1558]1import sys
2from os import sep
3sys.path.append('..'+sep+'pyvolution')
4
5"""Class Parallel_Shallow_Water_Domain -
62D triangular domains for finite-volume computations of
7the shallow water equation, with extra structures to allow
8communication between other Parallel_Domains and itself
9
10This module contains a specialisation of class Domain
11from module shallow_water.py
12
13Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
14Geoscience Australia, 2004-2005
15"""
16
17import logging, logging.config
18logger = logging.getLogger('parallel')
19logger.setLevel(logging.WARNING)
20
21try:
22    logging.config.fileConfig('log.ini')
23except:
24    pass
25
26from shallow_water import *
27from Numeric import zeros, Float, Int, ones, allclose, array
28import pypar
29
30
[1575]31class Parallel_Domain(Domain):
[1558]32
33    def __init__(self, coordinates, vertices, boundary = None,
[1563]34                 full_send_dict = None, ghost_recv_dict = None):
[1558]35
36        self.processor = pypar.rank()
37        self.numproc   = pypar.size()
38
[1575]39        Domain.__init__(self, coordinates, vertices, boundary)
[1558]40
41        N = self.number_of_elements
42
43        self.processor = pypar.rank()
44        self.numproc   = pypar.size()
45
[1563]46        # Setup Communication Buffers
47        self.nsys = 3
48        for key in full_send_dict:
49            buffer_shape = full_send_dict[key][0].shape[0]
50            full_send_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
[1558]51
52
[1563]53        for key in ghost_recv_dict:
54            buffer_shape = ghost_recv_dict[key][0].shape[0]
55            ghost_recv_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
56
57        self.full_send_dict  = full_send_dict
[1558]58        self.ghost_recv_dict = ghost_recv_dict
59
[1598]60        # Buffers for synchronisation of timesteps
61        self.local_timestep = zeros(1, Float)
[1601]62        self.global_timestep = zeros(1, Float)
[1563]63
[1607]64        self.local_timesteps = zeros(self.numproc, Float)
65
66
[1558]67        self.communication_time = 0.0
68        self.communication_reduce_time = 0.0
[1607]69        self.communication_broadcast_time = 0.0
[1558]70
71
[1563]72
[1558]73    def check_integrity(self):
[1575]74        Domain.check_integrity(self)
[1558]75
76        msg = 'Will need to check global and local numbering'
77        assert self.conserved_quantities[0] == 'stage', msg
[1563]78        assert self.conserved_quantities[1] == 'xmomentum', msg
79        assert self.conserved_quantities[2] == 'ymomentum', msg
[1558]80
[1607]81
[1601]82    def update_timestep_1(self, yieldstep, finaltime):
83        """Calculate local timestep using broadcasts
84        """
[1558]85
86
[1601]87        Domain.update_timestep(self, yieldstep, finaltime)
88
89        import time
90
91
92        t0 = time.time()
[1607]93
[1601]94        #Broadcast local timestep from every processor to every other
95        for pid in range(self.numproc):
96            #print 'P%d calling broadcast from %d' %(self.processor, pid)
[1607]97            self.local_timestep[0] = self.timestep
[2090]98            pypar.broadcast(self.local_timestep, pid, bypass=True)           
[1607]99            self.local_timesteps[pid] = self.local_timestep[0]
[1601]100
101        self.timestep = min(self.local_timesteps)
102
[1603]103        pypar.barrier()
[1601]104        self.communication_broadcast_time += time.time()-t0
105
106
107
[1607]108
[1558]109    def update_timestep(self, yieldstep, finaltime):
[1603]110        """Calculate local timestep
111        """
[1558]112
[1607]113
[1603]114        #Compute minimal timestep on local process
[1575]115        Domain.update_timestep(self, yieldstep, finaltime)
[1558]116
[1607]117        pypar.barrier()
[1558]118
[1603]119        import time
120        #Compute minimal timestep across all processes
[1598]121        self.local_timestep[0] = self.timestep
122        use_reduce_broadcast = True
123        if use_reduce_broadcast:
[1607]124            t0 = time.time()
[1598]125            pypar.reduce(self.local_timestep, pypar.MIN, 0,
126                         buffer=self.global_timestep,
127                         bypass=True)
[1607]128
[1598]129        else:
[1603]130            #Alternative: Try using straight send and receives
[1607]131            t0 = time.time()
[1598]132            self.global_timestep[0] = self.timestep
[1607]133
[1598]134            if self.processor == 0:
135                for i in range(1, self.numproc):
136                    pypar.receive(i,
137                                  buffer=self.local_timestep,
138                                  bypass=True)
[1558]139
[1598]140                    if self.local_timestep[0] < self.global_timestep[0]:
141                        self.global_timestep[0] = self.local_timestep[0]
142            else:
143                pypar.send(self.local_timestep, 0,
144                           use_buffer=True, bypass=True)
[1558]145
[1603]146
147
148
[1607]149        self.communication_reduce_time += time.time()-t0
[1603]150
[1598]151
[1607]152
[1697]153
[1603]154        #Broadcast minimal timestep to all
[1607]155        t0 = time.time()
[1601]156        pypar.broadcast(self.global_timestep, 0,
157                        bypass=True)
[1607]158
[1601]159        self.communication_broadcast_time += time.time()-t0
160
[1607]161
[1598]162        self.timestep = self.global_timestep[0]
[1558]163
[1563]164
[1607]165    #update_timestep = update_timestep_1
[1601]166
[1558]167    def update_ghosts(self):
168
169        # We must send the information from the full cells and
170        # receive the information for the ghost cells
171        # We have a dictionary of lists with ghosts expecting updates from
172        # the separate processors
173
174
[1588]175        from Numeric import take,put
[1558]176        import time
177        t0 = time.time()
178
179        # update of non-local ghost cells
180        for iproc in range(self.numproc):
181            if iproc == self.processor:
182                #Send data from iproc processor to other processors
183                for send_proc in self.full_send_dict:
184                    if send_proc != iproc:
185
186                        Idf  = self.full_send_dict[send_proc][0]
187                        Xout = self.full_send_dict[send_proc][2]
188
[1697]189                        for i, q in enumerate(self.conserved_quantities):
190                            #print 'Send',i,q
191                            Q_cv =  self.quantities[q].centroid_values
192                            Xout[:,i] = take(Q_cv,     Idf)
[1558]193
[1598]194                        pypar.send(Xout, send_proc,
195                                   use_buffer=True, bypass = True)
[1558]196
197
198            else:
199                #Receive data from the iproc processor
200                if  self.ghost_recv_dict.has_key(iproc):
201
202                    Idg = self.ghost_recv_dict[iproc][0]
203                    X = self.ghost_recv_dict[iproc][2]
204
[1598]205                    X = pypar.receive(iproc, buffer=X, bypass = True)
[1558]206
[1697]207                    for i, q in enumerate(self.conserved_quantities):
208                        #print 'Receive',i,q
209                        Q_cv =  self.quantities[q].centroid_values
210                        put(Q_cv,     Idg, X[:,i])
[1558]211
212        #local update of ghost cells
213        iproc = self.processor
214        if self.full_send_dict.has_key(iproc):
215
216            # LINDA:
217            # now store full as local id, global id, value
218            Idf  = self.full_send_dict[iproc][0]
219
220            # LINDA:
221            # now store ghost as local id, global id, value
222            Idg = self.ghost_recv_dict[iproc][0]
223
[1697]224            for i, q in enumerate(self.conserved_quantities):
225                #print 'LOCAL SEND RECEIVE',i,q
226                Q_cv =  self.quantities[q].centroid_values
227                put(Q_cv,     Idg, take(Q_cv,     Idf))
[1558]228
229        self.communication_time += time.time()-t0
230
231
232    def write_time(self):
233        if self.min_timestep == self.max_timestep:
234            print 'Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\
235                  %(self.processor, self.time, self.min_timestep, self.number_of_steps,
236                    self.number_of_first_order_steps)
237        elif self.min_timestep > self.max_timestep:
238            print 'Processor %d, Time = %.4f, steps=%d (%d)'\
239                  %(self.processor, self.time, self.number_of_steps,
240                    self.number_of_first_order_steps)
241        else:
242            print 'Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\
243                  %(self.processor, self.time, self.min_timestep,
244                    self.max_timestep, self.number_of_steps,
245                    self.number_of_first_order_steps)
246
247
248    def evolve(self, yieldstep = None, finaltime = None):
249        """Specialisation of basic evolve method from parent class
250        """
251
252        #Initialise real time viz if requested
253        if self.time == 0.0:
254            pass
255
256        #Call basic machinery from parent class
[1575]257        for t in Domain.evolve(self, yieldstep, finaltime):
[1558]258
259            #Pass control on to outer loop for more specific actions
260            yield(t)
Note: See TracBrowser for help on using the repository browser.