source: inundation/parallel/parallel_shallow_water.py @ 2813

Last change on this file since 2813 was 2813, checked in by steve, 18 years ago

Moving ghosts into domain.py

File size: 8.6 KB
Line 
1import sys
2from os import sep
3sys.path.append('..'+sep+'pyvolution')
4
5"""Class Parallel_Shallow_Water_Domain -
62D triangular domains for finite-volume computations of
7the shallow water equation, with extra structures to allow
8communication between other Parallel_Domains and itself
9
10This module contains a specialisation of class Domain
11from module shallow_water.py
12
13Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
14Geoscience Australia, 2004-2005
15"""
16
17import logging, logging.config
18logger = logging.getLogger('parallel')
19logger.setLevel(logging.WARNING)
20
21try:
22    logging.config.fileConfig('log.ini')
23except:
24    pass
25
26from shallow_water import *
27from Numeric import zeros, Float, Int, ones, allclose, array
28import pypar
29
30
31class Parallel_Domain(Domain):
32
33    def __init__(self, coordinates, vertices, boundary = None,
34                 full_send_dict = None, ghost_recv_dict = None):
35
36        Domain.__init__(self,
37                        coordinates,
38                        vertices,
39                        boundary,
40                        full_send_dict=full_send_dict,
41                        ghost_recv_dict=ghost_recv_dict,
42                        processor=pypar.rank(),
43                        numproc=pypar.size())
44
45        N = self.number_of_elements
46
47#        self.processor = pypar.rank()
48#        self.numproc   = pypar.size()
49#
50#        # Setup Communication Buffers
51#        self.nsys = 3
52#        for key in full_send_dict:
53#            buffer_shape = full_send_dict[key][0].shape[0]
54#            full_send_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
55#
56#
57#        for key in ghost_recv_dict:
58#            buffer_shape = ghost_recv_dict[key][0].shape[0]
59#            ghost_recv_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
60#
61#        self.full_send_dict  = full_send_dict
62        self.ghost_recv_dict = ghost_recv_dict
63
64        # Buffers for synchronisation of timesteps
65        self.local_timestep = zeros(1, Float)
66        self.global_timestep = zeros(1, Float)
67
68        self.local_timesteps = zeros(self.numproc, Float)
69
70
71        self.communication_time = 0.0
72        self.communication_reduce_time = 0.0
73        self.communication_broadcast_time = 0.0
74
75
76
77    def check_integrity(self):
78        Domain.check_integrity(self)
79
80        msg = 'Will need to check global and local numbering'
81        assert self.conserved_quantities[0] == 'stage', msg
82        assert self.conserved_quantities[1] == 'xmomentum', msg
83        assert self.conserved_quantities[2] == 'ymomentum', msg
84
85
86    def update_timestep_1(self, yieldstep, finaltime):
87        """Calculate local timestep using broadcasts
88        """
89
90
91        Domain.update_timestep(self, yieldstep, finaltime)
92
93        import time
94
95
96        t0 = time.time()
97
98        #Broadcast local timestep from every processor to every other
99        for pid in range(self.numproc):
100            #print 'P%d calling broadcast from %d' %(self.processor, pid)
101            self.local_timestep[0] = self.timestep
102            pypar.broadcast(self.local_timestep, pid, bypass=True)
103            self.local_timesteps[pid] = self.local_timestep[0]
104
105        self.timestep = min(self.local_timesteps)
106
107        pypar.barrier()
108        self.communication_broadcast_time += time.time()-t0
109
110
111
112
113    def update_timestep(self, yieldstep, finaltime):
114        """Calculate local timestep
115        """
116
117
118        #Compute minimal timestep on local process
119        Domain.update_timestep(self, yieldstep, finaltime)
120
121        pypar.barrier()
122
123        import time
124        #Compute minimal timestep across all processes
125        self.local_timestep[0] = self.timestep
126        use_reduce_broadcast = True
127        if use_reduce_broadcast:
128            t0 = time.time()
129            pypar.reduce(self.local_timestep, pypar.MIN, 0,
130                         buffer=self.global_timestep,
131                         bypass=True)
132
133        else:
134            #Alternative: Try using straight send and receives
135            t0 = time.time()
136            self.global_timestep[0] = self.timestep
137
138            if self.processor == 0:
139                for i in range(1, self.numproc):
140                    pypar.receive(i,
141                                  buffer=self.local_timestep,
142                                  bypass=True)
143
144                    if self.local_timestep[0] < self.global_timestep[0]:
145                        self.global_timestep[0] = self.local_timestep[0]
146            else:
147                pypar.send(self.local_timestep, 0,
148                           use_buffer=True, bypass=True)
149
150
151
152
153        self.communication_reduce_time += time.time()-t0
154
155
156
157
158        #Broadcast minimal timestep to all
159        t0 = time.time()
160        pypar.broadcast(self.global_timestep, 0,
161                        bypass=True)
162
163        self.communication_broadcast_time += time.time()-t0
164
165
166        self.timestep = self.global_timestep[0]
167
168
169    #update_timestep = update_timestep_1
170
171    def update_ghosts(self):
172
173        # We must send the information from the full cells and
174        # receive the information for the ghost cells
175        # We have a dictionary of lists with ghosts expecting updates from
176        # the separate processors
177
178
179        from Numeric import take,put
180        import time
181        t0 = time.time()
182
183        # update of non-local ghost cells
184        for iproc in range(self.numproc):
185            if iproc == self.processor:
186                #Send data from iproc processor to other processors
187                for send_proc in self.full_send_dict:
188                    if send_proc != iproc:
189
190                        Idf  = self.full_send_dict[send_proc][0]
191                        Xout = self.full_send_dict[send_proc][2]
192
193                        for i, q in enumerate(self.conserved_quantities):
194                            #print 'Send',i,q
195                            Q_cv =  self.quantities[q].centroid_values
196                            Xout[:,i] = take(Q_cv,     Idf)
197
198                        pypar.send(Xout, send_proc,
199                                   use_buffer=True, bypass = True)
200
201
202            else:
203                #Receive data from the iproc processor
204                if  self.ghost_recv_dict.has_key(iproc):
205
206                    Idg = self.ghost_recv_dict[iproc][0]
207                    X = self.ghost_recv_dict[iproc][2]
208
209                    X = pypar.receive(iproc, buffer=X, bypass = True)
210
211                    for i, q in enumerate(self.conserved_quantities):
212                        #print 'Receive',i,q
213                        Q_cv =  self.quantities[q].centroid_values
214                        put(Q_cv,     Idg, X[:,i])
215
216        #local update of ghost cells
217        iproc = self.processor
218        if self.full_send_dict.has_key(iproc):
219
220            # LINDA:
221            # now store full as local id, global id, value
222            Idf  = self.full_send_dict[iproc][0]
223
224            # LINDA:
225            # now store ghost as local id, global id, value
226            Idg = self.ghost_recv_dict[iproc][0]
227
228            for i, q in enumerate(self.conserved_quantities):
229                #print 'LOCAL SEND RECEIVE',i,q
230                Q_cv =  self.quantities[q].centroid_values
231                put(Q_cv,     Idg, take(Q_cv,     Idf))
232
233        self.communication_time += time.time()-t0
234
235
236    def write_time(self):
237        if self.min_timestep == self.max_timestep:
238            print 'Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\
239                  %(self.processor, self.time, self.min_timestep, self.number_of_steps,
240                    self.number_of_first_order_steps)
241        elif self.min_timestep > self.max_timestep:
242            print 'Processor %d, Time = %.4f, steps=%d (%d)'\
243                  %(self.processor, self.time, self.number_of_steps,
244                    self.number_of_first_order_steps)
245        else:
246            print 'Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\
247                  %(self.processor, self.time, self.min_timestep,
248                    self.max_timestep, self.number_of_steps,
249                    self.number_of_first_order_steps)
250
251
252    def evolve(self, yieldstep = None, finaltime = None):
253        """Specialisation of basic evolve method from parent class
254        """
255
256        #Initialise real time viz if requested
257        if self.time == 0.0:
258            pass
259
260        #Call basic machinery from parent class
261        for t in Domain.evolve(self, yieldstep, finaltime):
262
263            #Pass control on to outer loop for more specific actions
264            yield(t)
Note: See TracBrowser for help on using the repository browser.