source: anuga_core/source/anuga_parallel/parallel_advection.py @ 5763

Last change on this file since 5763 was 5763, checked in by steve, 16 years ago

Fixed a few bugs in the parallel shallow water code. It seems to
be working for the low-level code. Still need to check Ole's parallel API.

File size: 6.1 KB
Line 
1import sys
2
3
4"""Class Parallel_Domain -
52D triangular domains for finite-volume computations of
6the advection equation, with extra structures to allow
7communication between other Parallel_Domains and itself
8
9This module contains a specialisation of class Domain from module advection.py
10
11Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
12Geoscience Australia, 2004-2005
13"""
14
15import logging, logging.config
# Module-level logger for the parallel code; only warnings and above are
# emitted unless an external configuration says otherwise.
logger = logging.getLogger('parallel')
logger.setLevel(logging.WARNING)

# Loading 'log.ini' is best effort: if the file is missing or malformed we
# silently keep the settings above.  Only ordinary errors are swallowed, so
# KeyboardInterrupt / SystemExit still propagate (a bare ``except:`` would
# have hidden those as well).
try:
    logging.config.fileConfig('log.ini')
except Exception:
    pass
23
24from anuga.advection import *
25from Numeric import zeros, Float, Int, ones, allclose, array
26import pypar
27
28
class Parallel_Domain(Domain):
    """Advection Domain with pypar-based inter-processor communication.

    Specialises ``Domain`` (from ``anuga.advection``) so that the global
    timestep is agreed between all processors and the 'stage' centroid
    values of ghost cells are refreshed from the processors owning the
    corresponding full cells.

    Attributes set here:
        communication_time        -- accumulated wall-clock seconds spent
                                     in update_ghosts
        communication_reduce_time -- accumulated wall-clock seconds spent
                                     in the reduce/broadcast of the timestep
    """

    def __init__(self,
                 coordinates,
                 vertices,
                 boundary=None,
                 full_send_dict=None,
                 ghost_recv_dict=None,
                 velocity=None):
        """Build the domain and tag it with this process' pypar rank.

        All arguments are forwarded unchanged to ``Domain.__init__``;
        ``processor`` and ``numproc`` are filled in from ``pypar.rank()``
        and ``pypar.size()``.
        """

        Domain.__init__(self,
                        coordinates,
                        vertices,
                        boundary,
                        velocity=velocity,
                        full_send_dict=full_send_dict,
                        ghost_recv_dict=ghost_recv_dict,
                        processor=pypar.rank(),
                        numproc=pypar.size())

        # Timers accumulated by update_ghosts and update_timestep
        self.communication_time = 0.0
        self.communication_reduce_time = 0.0

        # self.processor / self.numproc are presumably stored by
        # Domain.__init__ from the keyword arguments above -- TODO confirm.
        # Single-argument form prints the same text as
        # ``print 'processor', self.processor``.
        print('processor %s' % (self.processor,))
        print('numproc %s' % (self.numproc,))

    def check_integrity(self):
        """Run the parent integrity check, then require that the first
        conserved quantity is 'stage'.

        Raises AssertionError otherwise.
        """
        Domain.check_integrity(self)

        # NOTE(review): the message does not describe the condition being
        # asserted; it reads like a reminder left by the original authors.
        msg = 'Will need to check global and local numbering'
        assert self.conserved_quantities[0] == 'stage', msg

    def update_timestep(self, yieldstep, finaltime):
        """Agree on a global flux timestep, then update the time stats.

        The local ``flux_timestep`` is reduced with ``pypar.MIN`` onto
        processor 0 and broadcast back, so every processor continues with
        the same value.  Only afterwards is ``Domain.update_timestep``
        called (LINDA: the calculation was moved so that it is done after
        the timestep has been broadcast).
        """
        import time
        t0 = time.time()

        # For some reason it looks like pypar only reduces numeric arrays,
        # hence the one-element dummy arrays used for communication.
        local_step = ones(1, Float)
        local_step[0] = self.flux_timestep
        global_step = zeros(1, Float)  # Buffer for the reduced result

        pypar.raw_reduce(local_step, global_step, pypar.MIN, 0)
        pypar.broadcast(global_step, 0)

        self.flux_timestep = global_step[0]

        self.communication_reduce_time += time.time() - t0

        # Now update time stats using the agreed timestep
        Domain.update_timestep(self, yieldstep, finaltime)

    def update_ghosts(self):
        """Refresh the 'stage' centroid values of all ghost cells.

        Information is sent from the full cells and received for the ghost
        cells.  ``full_send_dict`` and ``ghost_recv_dict`` are keyed by
        processor id; per the original notes each entry stores
        (local id, global id, value), of which entry 0 (indices) and
        entry 2 (communication buffer) are used here.
        """
        from Numeric import take, put
        import time
        t0 = time.time()

        stage_cv = self.quantities['stage'].centroid_values

        # Update of non-local ghost cells.  Processors take turns: when it
        # is our turn we send to everybody else, otherwise we receive from
        # the processor whose turn it is.
        for iproc in range(self.numproc):
            if iproc == self.processor:
                # Send data from this processor to the other processors
                for send_proc in self.full_send_dict:
                    if send_proc != iproc:
                        Idf = self.full_send_dict[send_proc][0]
                        Xout = self.full_send_dict[send_proc][2]

                        # Pack the full-cell values into column 0
                        Xout[:, 0] = take(stage_cv, Idf)

                        pypar.send(Xout, send_proc)
            elif iproc in self.ghost_recv_dict:
                # Receive data from the iproc processor
                Idg = self.ghost_recv_dict[iproc][0]
                X = self.ghost_recv_dict[iproc][2]

                X = pypar.receive(iproc, X)

                # Unpack column 0 into the ghost cells
                put(stage_cv, Idg, X[:, 0])

        # Local update of ghost cells (ghosts whose full cell lives on
        # this same processor): a plain copy, no communication needed.
        iproc = self.processor
        if iproc in self.full_send_dict:
            Idf = self.full_send_dict[iproc][0]
            Idg = self.ghost_recv_dict[iproc][0]

            put(stage_cv, Idg, take(stage_cv, Idf))

        self.communication_time += time.time() - t0

    def write_time(self):
        """Print the current time and step statistics, prefixed with the
        processor id.

        The timestep is shown as a single value when min and max agree,
        as a range when min < max, and omitted when min > max.
        """
        if self.min_timestep == self.max_timestep:
            print('Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'
                  % (self.processor, self.time, self.min_timestep,
                     self.number_of_steps,
                     self.number_of_first_order_steps))
        elif self.min_timestep > self.max_timestep:
            # presumably no step has been recorded yet -- TODO confirm
            print('Processor %d, Time = %.4f, steps=%d (%d)'
                  % (self.processor, self.time, self.number_of_steps,
                     self.number_of_first_order_steps))
        else:
            print('Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'
                  % (self.processor, self.time, self.min_timestep,
                     self.max_timestep, self.number_of_steps,
                     self.number_of_first_order_steps))

    def evolve(self, yieldstep=None, finaltime=None):
        """Specialisation of basic evolve method from parent class.

        Generator: yields every time value produced by ``Domain.evolve``.
        """
        # Call basic machinery from parent class and pass control on to
        # the outer loop for more specific actions
        for t in Domain.evolve(self, yieldstep, finaltime):
            yield t
Note: See TracBrowser for help on using the repository browser.