# source: inundation/parallel/parallel_advection.py @ 3413
#
# Last change on this file since 3413 was 3315, checked in by steve, 18 years ago
# File size: 6.1 KB
# Line
"""Class Parallel_Domain -
2D triangular domains for finite-volume computations of
the advection equation, with extra structures to allow
communication between other Parallel_Domains and itself

This module contains a specialisation of class Domain from module advection.py

Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
Geoscience Australia, 2004-2005
"""

import sys
from os import sep

# Extend the import search path with the sibling 'pyvolution' directory.
# The path is relative, so it is resolved against the current working
# directory at import time.
sys.path.append('..'+sep+'pyvolution')

import logging
import logging.config

logger = logging.getLogger('parallel')
logger.setLevel(logging.WARNING)

# Logging configuration from 'log.ini' is optional (best effort): if the file
# is missing or malformed, keep the defaults set above.  Catch Exception
# rather than using a bare except so that KeyboardInterrupt and SystemExit
# are not swallowed.
try:
    logging.config.fileConfig('log.ini')
except Exception:
    pass

from pyvolution.advection_vtk import *
from Numeric import zeros, Float, Int, ones, allclose, array
import pypar


class Parallel_Domain(Domain):
    """Advection domain that exchanges data with other processors via pypar.

    Specialises ``Domain`` by reducing the timestep across all processors
    and by copying 'stage' centroid values from full cells to the ghost
    cells that mirror them, both between processors and locally.
    """

    def __init__(self,
                 coordinates,
                 vertices,
                 boundary=None,
                 full_send_dict=None,
                 ghost_recv_dict=None,
                 velocity=None):
        """Build the domain and register this process with the parent class.

        All arguments are forwarded unchanged to ``Domain.__init__``; the
        processor rank and processor count are taken from pypar.

        full_send_dict / ghost_recv_dict are indexed by processor id in
        update_ghosts; there, element [0] of each entry is used as an index
        array into the stage centroid values and element [2] as the
        communication buffer.
        """

        Domain.__init__(self,
                        coordinates,
                        vertices,
                        boundary,
                        velocity=velocity,
                        full_send_dict=full_send_dict,
                        ghost_recv_dict=ghost_recv_dict,
                        processor=pypar.rank(),
                        numproc=pypar.size())

        # Accumulated wall-clock seconds spent in update_ghosts and in the
        # reduce/broadcast part of update_timestep, respectively.
        self.communication_time = 0.0
        self.communication_reduce_time = 0.0

        # self.processor / self.numproc are presumably set by Domain.__init__
        # from the keyword arguments above -- confirm against the parent.
        print('processor %s' % (self.processor,))
        print('numproc %s' % (self.numproc,))

    def check_integrity(self):
        """Run the parent integrity check, then require that the first
        conserved quantity is 'stage'.

        Raises AssertionError if that is not the case.
        """
        Domain.check_integrity(self)

        msg = 'Will need to check global and local numbering'
        assert self.conserved_quantities[0] == 'stage', msg

    def update_timestep(self, yieldstep, finaltime):
        """Replace the local timestep by the minimum over all processors,
        then let the parent class do its timestep update.

        The parent's update_timestep is deliberately called after the
        reduced timestep has been stored in self.timestep.
        """

        import time
        t0 = time.time()

        # pypar's reduce appears to work only on Numeric arrays, so wrap the
        # scalar timestep in length-1 arrays for the communication.
        ltimestep = ones(1, Float)
        ltimestep[0] = self.timestep
        gtimestep = zeros(1, Float)  # Buffer for the result

        # Minimum is reduced onto processor 0 and then broadcast from it.
        pypar.raw_reduce(ltimestep, gtimestep, pypar.MIN, 0)
        pypar.broadcast(gtimestep, 0)

        self.timestep = gtimestep[0]

        self.communication_reduce_time += time.time() - t0

        # Parent update runs last so that it sees the reduced timestep.
        Domain.update_timestep(self, yieldstep, finaltime)

    def update_ghosts(self):
        """Copy 'stage' centroid values from full cells into ghost cells.

        Processors take turns in rank order.  On its own turn a processor
        sends to every other processor listed in full_send_dict; on any
        other turn it receives from the processor whose turn it is, provided
        ghost_recv_dict has an entry for that processor.  Finally, ghost
        cells whose full cell lives on this same processor are updated
        directly.  The elapsed time is added to self.communication_time.
        """

        from Numeric import take, put
        import time
        t0 = time.time()

        stage_cv = self.quantities['stage'].centroid_values

        # Update of non-local ghost cells
        for iproc in range(self.numproc):
            if iproc == self.processor:
                # Our turn: send data to the other processors
                for send_proc in self.full_send_dict:
                    if send_proc != iproc:

                        Idf = self.full_send_dict[send_proc][0]
                        Xout = self.full_send_dict[send_proc][2]

                        # Column 0 of the buffer carries the stage values:
                        # Xout[i,0] = stage_cv[Idf[i]] for every i
                        Xout[:, 0] = take(stage_cv, Idf)

                        pypar.send(Xout, send_proc)

            else:
                # Receive data from the iproc processor
                if iproc in self.ghost_recv_dict:

                    # Per the original author's note, each entry is stored
                    # as local id, global id, value.
                    Idg = self.ghost_recv_dict[iproc][0]
                    X = self.ghost_recv_dict[iproc][2]

                    X = pypar.receive(iproc, X)

                    # stage_cv[Idg[i]] = X[i,0] for every i
                    put(stage_cv, Idg, X[:, 0])

        # Local update of ghost cells
        iproc = self.processor
        if iproc in self.full_send_dict:

            # Entry [0] holds the local ids of the full cells and of the
            # ghost cells, respectively (see note above).
            Idf = self.full_send_dict[iproc][0]
            Idg = self.ghost_recv_dict[iproc][0]

            # stage_cv[Idg[i]] = stage_cv[Idf[i]] for every i
            put(stage_cv, Idg, take(stage_cv, Idf))

        self.communication_time += time.time() - t0

    def write_time(self):
        """Print a one-line progress report prefixed by the processor id.

        The timestep is shown as a single value when min and max agree, as
        an interval when min < max, and omitted when min > max.
        """
        if self.min_timestep == self.max_timestep:
            print('Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'
                  % (self.processor, self.time, self.min_timestep,
                     self.number_of_steps,
                     self.number_of_first_order_steps))
        elif self.min_timestep > self.max_timestep:
            # NOTE(review): presumably the state before any step has been
            # recorded -- confirm against the parent class.
            print('Processor %d, Time = %.4f, steps=%d (%d)'
                  % (self.processor, self.time, self.number_of_steps,
                     self.number_of_first_order_steps))
        else:
            print('Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'
                  % (self.processor, self.time, self.min_timestep,
                     self.max_timestep, self.number_of_steps,
                     self.number_of_first_order_steps))

    def evolve(self, yieldstep=None, finaltime=None):
        """Specialisation of basic evolve method from parent class.

        Generator: yields every value produced by ``Domain.evolve`` so the
        caller's outer loop can act between yield steps.
        """

        # Call basic machinery from parent class
        for t in Domain.evolve(self, yieldstep, finaltime):

            # Pass control on to outer loop for more specific actions
            yield t
# Note: See TracBrowser for help on using the repository browser.