source: anuga_core/source/anuga_parallel/parallel_shallow_water.py @ 7449

Last change on this file since 7449 was 7449, checked in by steve, 15 years ago

Testing unit tests

File size: 7.6 KB
RevLine 
"""Class Parallel_domain -
2D triangular domains for finite-volume computations of
the shallow water equation, with extra structures to allow
communication between other Parallel_domains and itself

This module contains a specialisation of class Domain
from module shallow_water.py

Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
Geoscience Australia, 2004-2005

"""
13
import time

import numpy as num
import pypar

from anuga.interface import Domain
[3185]20
[3427]21
class Parallel_domain(Domain):
    """Shallow water Domain that synchronises with peer domains through pypar.

    On top of the parent Domain this class:

    * passes ``pypar.rank()`` and ``pypar.size()`` to the parent constructor
      as ``processor`` and ``numproc``;
    * reduces the flux timestep to the minimum over all processes before
      every call to ``Domain.update_timestep``;
    * copies centroid values of the conserved quantities from full cells
      to the matching ghost cells, across processes and locally.

    Wall-clock time (``time.time()`` differences, in seconds) spent in
    communication is accumulated in ``communication_time``,
    ``communication_reduce_time`` and ``communication_broadcast_time``.
    """

    def __init__(self, coordinates, vertices,
                 boundary=None,
                 full_send_dict=None,
                 ghost_recv_dict=None,
                 number_of_full_nodes=None,
                 number_of_full_triangles=None):
        """Build the parent Domain and allocate the communication buffers.

        All arguments are forwarded unchanged to ``Domain.__init__``; the
        processor id and the number of processors are taken from pypar.

        ``full_send_dict`` and ``ghost_recv_dict`` are indexed by processor
        id.  As used in ``update_ghosts``, entry ``[0]`` of each value holds
        the local cell indices and entry ``[2]`` holds the transfer buffer
        with one column per conserved quantity.
        """

        Domain.__init__(self,
                        coordinates,
                        vertices,
                        boundary,
                        full_send_dict=full_send_dict,
                        ghost_recv_dict=ghost_recv_dict,
                        processor=pypar.rank(),
                        numproc=pypar.size(),
                        number_of_full_nodes=number_of_full_nodes,
                        number_of_full_triangles=number_of_full_triangles)

        # Buffers for synchronisation of timesteps.
        # The builtin float is used as dtype because the num.float alias
        # no longer exists in recent NumPy releases.
        self.local_timestep = num.zeros(1, float)
        self.global_timestep = num.zeros(1, float)

        # One slot per processor, filled by update_timestep_1.
        self.local_timesteps = num.zeros(self.numproc, float)

        # Accumulated wall-clock time spent communicating.
        self.communication_time = 0.0
        self.communication_reduce_time = 0.0
        self.communication_broadcast_time = 0.0

    def set_name(self, name):
        """Assign name based on processor number.

        A trailing '.sww' is stripped, then '_P<processor>_<numproc>' is
        appended before the name is handed to ``Domain.set_name``.
        """

        if name.endswith('.sww'):
            name = name[:-4]

        # Call parents method with processor number attached.
        suffix = '_P%d_%d' % (self.processor, self.numproc)
        Domain.set_name(self, name + suffix)

    def check_integrity(self):
        """Run the parent integrity check and verify the quantity order.

        Raises AssertionError unless the first three conserved quantities
        are 'stage', 'xmomentum' and 'ymomentum', in that order.
        """

        Domain.check_integrity(self)

        msg = 'Will need to check global and local numbering'
        assert self.conserved_quantities[0] == 'stage', msg
        assert self.conserved_quantities[1] == 'xmomentum', msg
        assert self.conserved_quantities[2] == 'ymomentum', msg

    def update_timestep_1(self, yieldstep, finaltime):
        """Calculate the common timestep using one broadcast per processor.

        Alternative to ``update_timestep``; it is not bound to that name in
        this class.  Every processor broadcasts its flux timestep in turn,
        the minimum of the collected values becomes ``self.flux_timestep``,
        and only then is ``Domain.update_timestep`` called so that the
        parent works with the synchronised value.
        """

        t0 = time.time()

        # Broadcast local timestep from every processor to every other.
        for pid in range(self.numproc):
            # The buffer is overwritten by each broadcast received, so it
            # has to be reloaded with our own value on every round.
            self.local_timestep[0] = self.flux_timestep
            pypar.broadcast(self.local_timestep, pid, bypass=True)
            self.local_timesteps[pid] = self.local_timestep[0]

        self.flux_timestep = min(self.local_timesteps)

        pypar.barrier()
        self.communication_broadcast_time += time.time() - t0

        # The parent update runs last so it sees the global timestep.
        Domain.update_timestep(self, yieldstep, finaltime)

    def update_timestep(self, yieldstep, finaltime):
        """Synchronise the flux timestep across processes, then update.

        The minimum of ``flux_timestep`` over all processes is collected on
        processor 0 and broadcast back, after which
        ``Domain.update_timestep`` is called with the synchronised value
        in place.
        """

        pypar.barrier()

        # Compute minimal timestep across all processes.
        self.local_timestep[0] = self.flux_timestep

        # Hard-wired choice between a collective reduce and explicit
        # point-to-point messages; the latter is kept as an alternative.
        use_reduce_broadcast = True

        t0 = time.time()
        if use_reduce_broadcast:
            pypar.reduce(self.local_timestep, pypar.MIN, 0,
                         buffer=self.global_timestep)
        else:
            # Alternative: Try using straight send and receives.
            self.global_timestep[0] = self.flux_timestep

            if self.processor == 0:
                for i in range(1, self.numproc):
                    pypar.receive(i, buffer=self.local_timestep)

                    if self.local_timestep[0] < self.global_timestep[0]:
                        self.global_timestep[0] = self.local_timestep[0]
            else:
                pypar.send(self.local_timestep, 0, use_buffer=True)

        self.communication_reduce_time += time.time() - t0

        # Broadcast minimal timestep to all.
        t0 = time.time()
        pypar.broadcast(self.global_timestep, 0)
        self.communication_broadcast_time += time.time() - t0

        self.flux_timestep = self.global_timestep[0]

        # The parent update runs last so that its statistics are based on
        # the synchronised timestep.
        Domain.update_timestep(self, yieldstep, finaltime)

        # FIXME (Ole) We should update the variable min_timestep for use
        # with write_time (or redo write_time)

    def update_ghosts(self):
        """Refresh ghost cell centroid values from the matching full cells.

        Processors take turns in rank order: the processor whose turn it is
        sends the centroid values of its full cells to every other
        processor listed in ``full_send_dict``, while the others receive
        from it if it appears in their ``ghost_recv_dict``.  Afterwards any
        full/ghost pairs registered under this processor's own id are
        copied locally.  The elapsed time is added to
        ``communication_time``.
        """

        t0 = time.time()

        # Update of non-local ghost cells.
        for iproc in range(self.numproc):
            if iproc == self.processor:
                # Send data from iproc processor to other processors.
                for send_proc in self.full_send_dict:
                    if send_proc == iproc:
                        continue

                    Idf = self.full_send_dict[send_proc][0]
                    Xout = self.full_send_dict[send_proc][2]

                    # Pack one column per conserved quantity.
                    for i, q in enumerate(self.conserved_quantities):
                        Q_cv = self.quantities[q].centroid_values
                        Xout[:, i] = num.take(Q_cv, Idf)

                    pypar.send(Xout, int(send_proc), use_buffer=True)

            elif iproc in self.ghost_recv_dict:
                # Receive data from the iproc processor.
                Idg = self.ghost_recv_dict[iproc][0]
                X = self.ghost_recv_dict[iproc][2]

                X = pypar.receive(int(iproc), buffer=X)

                # Unpack each column into the ghost cells.
                for i, q in enumerate(self.conserved_quantities):
                    Q_cv = self.quantities[q].centroid_values
                    num.put(Q_cv, Idg, X[:, i])

        # Local update of ghost cells.
        iproc = self.processor
        if iproc in self.full_send_dict:
            # Entries are stored as local id, global id, value;
            # only the local ids are needed here.
            Idf = self.full_send_dict[iproc][0]
            Idg = self.ghost_recv_dict[iproc][0]

            for q in self.conserved_quantities:
                Q_cv = self.quantities[q].centroid_values
                num.put(Q_cv, Idg, num.take(Q_cv, Idf))

        self.communication_time += time.time() - t0
239
Note: See TracBrowser for help on using the repository browser.