source: inundation/parallel/parallel_shallow_water.py @ 3095

Last change on this file since 3095 was 3090, checked in by ole, 19 years ago

Changed mesh to neighbour_mesh

File size: 8.7 KB
Line 
1import sys
2from os import sep
3
4# FIXME: This should be removed. Set evironment variable PYTHONPATH to
5# directory ...../anuga/inundation instead
6sys.path.append('..'+sep+'pyvolution')
7
8"""Class Parallel_Shallow_Water_Domain -
92D triangular domains for finite-volume computations of
10the shallow water equation, with extra structures to allow
11communication between other Parallel_Domains and itself
12
13This module contains a specialisation of class Domain
14from module shallow_water.py
15
16Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
17Geoscience Australia, 2004-2005
18"""
19
20import logging, logging.config
21logger = logging.getLogger('parallel')
22logger.setLevel(logging.WARNING)
23
24try:
25    logging.config.fileConfig('log.ini')
26except:
27    pass
28
29from pyvolution.shallow_water import *
30from Numeric import zeros, Float, Int, ones, allclose, array
31import pypar
32
33
34class Parallel_Domain(Domain):
35
36    def __init__(self, coordinates, vertices, boundary = None,
37                 full_send_dict = None, ghost_recv_dict = None):
38
39        Domain.__init__(self,
40                        coordinates,
41                        vertices,
42                        boundary,
43                        full_send_dict=full_send_dict,
44                        ghost_recv_dict=ghost_recv_dict,
45                        processor=pypar.rank(),
46                        numproc=pypar.size())
47
48        N = self.number_of_elements
49
50#        self.processor = pypar.rank()
51#        self.numproc   = pypar.size()
52#
53#        # Setup Communication Buffers
54#        self.nsys = 3
55#        for key in full_send_dict:
56#            buffer_shape = full_send_dict[key][0].shape[0]
57#            full_send_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
58#
59#
60#        for key in ghost_recv_dict:
61#            buffer_shape = ghost_recv_dict[key][0].shape[0]
62#            ghost_recv_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
63#
64#        self.full_send_dict  = full_send_dict
65        self.ghost_recv_dict = ghost_recv_dict
66
67        # Buffers for synchronisation of timesteps
68        self.local_timestep = zeros(1, Float)
69        self.global_timestep = zeros(1, Float)
70
71        self.local_timesteps = zeros(self.numproc, Float)
72
73
74        self.communication_time = 0.0
75        self.communication_reduce_time = 0.0
76        self.communication_broadcast_time = 0.0
77
78
79
80    def check_integrity(self):
81        Domain.check_integrity(self)
82
83        msg = 'Will need to check global and local numbering'
84        assert self.conserved_quantities[0] == 'stage', msg
85        assert self.conserved_quantities[1] == 'xmomentum', msg
86        assert self.conserved_quantities[2] == 'ymomentum', msg
87
88
89    def update_timestep_1(self, yieldstep, finaltime):
90        """Calculate local timestep using broadcasts
91        """
92
93
94        Domain.update_timestep(self, yieldstep, finaltime)
95
96        import time
97
98
99        t0 = time.time()
100
101        #Broadcast local timestep from every processor to every other
102        for pid in range(self.numproc):
103            #print 'P%d calling broadcast from %d' %(self.processor, pid)
104            self.local_timestep[0] = self.timestep
105            pypar.broadcast(self.local_timestep, pid, bypass=True)
106            self.local_timesteps[pid] = self.local_timestep[0]
107
108        self.timestep = min(self.local_timesteps)
109
110        pypar.barrier()
111        self.communication_broadcast_time += time.time()-t0
112
113
114
115
116    def update_timestep(self, yieldstep, finaltime):
117        """Calculate local timestep
118        """
119       
120        #Compute minimal timestep on local process
121        Domain.update_timestep(self, yieldstep, finaltime)
122
123        pypar.barrier()
124
125        import time
126        #Compute minimal timestep across all processes
127        self.local_timestep[0] = self.timestep
128        use_reduce_broadcast = True
129        if use_reduce_broadcast:
130            t0 = time.time()
131            pypar.reduce(self.local_timestep, pypar.MIN, 0,
132                         buffer=self.global_timestep,
133                         bypass=True)
134
135        else:
136            #Alternative: Try using straight send and receives
137            t0 = time.time()
138            self.global_timestep[0] = self.timestep
139
140            if self.processor == 0:
141                for i in range(1, self.numproc):
142                    pypar.receive(i,
143                                  buffer=self.local_timestep,
144                                  bypass=True)
145
146                    if self.local_timestep[0] < self.global_timestep[0]:
147                        self.global_timestep[0] = self.local_timestep[0]
148            else:
149                pypar.send(self.local_timestep, 0,
150                           use_buffer=True, bypass=True)
151
152
153        self.communication_reduce_time += time.time()-t0
154
155
156        #Broadcast minimal timestep to all
157        t0 = time.time()
158        pypar.broadcast(self.global_timestep, 0,
159                        bypass=True)
160
161        self.communication_broadcast_time += time.time()-t0
162
163
164        self.timestep = self.global_timestep[0]
165
166
167    #update_timestep = update_timestep_1
168
169    def update_ghosts(self):
170
171        # We must send the information from the full cells and
172        # receive the information for the ghost cells
173        # We have a dictionary of lists with ghosts expecting updates from
174        # the separate processors
175
176
177        from Numeric import take,put
178        import time
179        t0 = time.time()
180
181        # update of non-local ghost cells
182        for iproc in range(self.numproc):
183            if iproc == self.processor:
184                #Send data from iproc processor to other processors
185                for send_proc in self.full_send_dict:
186                    if send_proc != iproc:
187
188                        Idf  = self.full_send_dict[send_proc][0]
189                        Xout = self.full_send_dict[send_proc][2]
190
191                        for i, q in enumerate(self.conserved_quantities):
192                            #print 'Send',i,q
193                            Q_cv =  self.quantities[q].centroid_values
194                            Xout[:,i] = take(Q_cv,     Idf)
195
196                        pypar.send(Xout, send_proc,
197                                   use_buffer=True, bypass = True)
198
199
200            else:
201                #Receive data from the iproc processor
202                if  self.ghost_recv_dict.has_key(iproc):
203
204                    Idg = self.ghost_recv_dict[iproc][0]
205                    X = self.ghost_recv_dict[iproc][2]
206
207                    X = pypar.receive(iproc, buffer=X, bypass = True)
208
209                    for i, q in enumerate(self.conserved_quantities):
210                        #print 'Receive',i,q
211                        Q_cv =  self.quantities[q].centroid_values
212                        put(Q_cv,     Idg, X[:,i])
213
214        #local update of ghost cells
215        iproc = self.processor
216        if self.full_send_dict.has_key(iproc):
217
218            # LINDA:
219            # now store full as local id, global id, value
220            Idf  = self.full_send_dict[iproc][0]
221
222            # LINDA:
223            # now store ghost as local id, global id, value
224            Idg = self.ghost_recv_dict[iproc][0]
225
226            for i, q in enumerate(self.conserved_quantities):
227                #print 'LOCAL SEND RECEIVE',i,q
228                Q_cv =  self.quantities[q].centroid_values
229                put(Q_cv,     Idg, take(Q_cv,     Idf))
230
231        self.communication_time += time.time()-t0
232
233
234    def write_time(self):
235        if self.min_timestep == self.max_timestep:
236            print 'Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\
237                  %(self.processor, self.time, self.min_timestep, self.number_of_steps,
238                    self.number_of_first_order_steps)
239        elif self.min_timestep > self.max_timestep:
240            print 'Processor %d, Time = %.4f, steps=%d (%d)'\
241                  %(self.processor, self.time, self.number_of_steps,
242                    self.number_of_first_order_steps)
243        else:
244            print 'Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\
245                  %(self.processor, self.time, self.min_timestep,
246                    self.max_timestep, self.number_of_steps,
247                    self.number_of_first_order_steps)
248
249
250    def evolve(self, yieldstep = None, finaltime = None):
251        """Specialisation of basic evolve method from parent class
252        """
253
254        #Initialise real time viz if requested
255        if self.time == 0.0:
256            pass
257
258        #Call basic machinery from parent class
259        for t in Domain.evolve(self, yieldstep, finaltime):
260
261            #Pass control on to outer loop for more specific actions
262            yield(t)
Note: See TracBrowser for help on using the repository browser.