source: anuga_core/source/anuga_parallel/parallel_advection.py @ 7447

Last change on this file since 7447 was 7447, checked in by steve, 15 years ago

Concentrating code

File size: 6.4 KB
Line 
"""Class Parallel_Domain -
2D triangular domains for finite-volume computations of
the advection equation, with extra structures to allow
communication between other Parallel_Domains and itself

This module contains a specialisation of class Domain from module advection.py

Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
Geoscience Australia, 2004-2005
"""

# NOTE: the docstring must precede any statement (including imports) to be
# recognised as the module's __doc__; it previously followed 'import sys'.
import sys
14
import logging, logging.config

# Module-wide logger for the parallel code; default to WARNING so that
# informational chatter is suppressed unless log.ini says otherwise.
logger = logging.getLogger('parallel')
logger.setLevel(logging.WARNING)

# Best-effort: pick up an optional 'log.ini' configuration file if present.
# Narrow to Exception (was a bare 'except:') so SystemExit/KeyboardInterrupt
# are not silently swallowed while a missing/bad config is still tolerated.
try:
    logging.config.fileConfig('log.ini')
except Exception:
    pass
23
24from anuga.advection import *
25
26
27#from Numeric import zeros, Float, Int, ones, allclose, array
28import numpy as num
29
30import pypar
31
32
class Parallel_Domain(Domain):
    """Advection domain distributed over several MPI processes via pypar.

    Each process holds a sub-mesh of 'full' cells it owns plus 'ghost'
    cells mirroring full cells owned by other processes.  The exchange
    pattern is described by two dictionaries (see __init__).  Timestep
    agreement across processes is enforced by a global MIN reduction.
    """

    def __init__(self,
                 coordinates,
                 vertices,
                 boundary = None,
                 full_send_dict = None,
                 ghost_recv_dict = None,
                 velocity = None):
        """Create the local partition of a parallel advection domain.

        coordinates, vertices, boundary -- local sub-mesh, as for the
            sequential advection Domain.
        full_send_dict -- maps destination processor to a triple
            [local ids, global ids, send buffer] of full cells whose
            values that processor needs (indexing used in update_ghosts).
        ghost_recv_dict -- maps source processor to a triple
            [local ids, global ids, recv buffer] of local ghost cells
            updated from that processor.
        velocity -- advection velocity, passed through to Domain.
        """

        # Parent Domain is told this process' rank and the total process
        # count so it can distinguish full from ghost cells.
        Domain.__init__(self,
                        coordinates,
                        vertices,
                        boundary,
                        velocity = velocity,
                        full_send_dict=full_send_dict,
                        ghost_recv_dict=ghost_recv_dict,
                        processor=pypar.rank(),
                        numproc=pypar.size()
                        )

        N = self.number_of_elements  # NOTE(review): unused in this method


        # Accumulated wall-clock seconds spent in ghost exchange and in
        # the timestep reduction, respectively.
        self.communication_time = 0.0
        self.communication_reduce_time = 0.0


        print 'processor',self.processor
        print 'numproc',self.numproc

    def check_integrity(self):
        """Run parent checks and verify the expected quantity ordering."""
        Domain.check_integrity(self)

        msg = 'Will need to check global and local numbering'
        assert self.conserved_quantities[0] == 'stage', msg

    def update_timestep(self, yieldstep, finaltime):
        """Agree on a global flux timestep, then update local time stats.

        The local flux_timestep is reduced (MIN) onto processor 0 and
        broadcast back, so all processes advance by the same (smallest)
        timestep.  Elapsed time is added to communication_reduce_time.
        """

        #LINDA:
        # moved the calculation so that it is done after timestep
        # has been broadcast

#        # Calculate local timestep
#        Domain.update_timestep(self, yieldstep, finaltime)

        import time
        t0 = time.time()

        # For some reason it looks like pypar only reduces numeric arrays
        # hence we need to create some dummy arrays for communication
        ltimestep = num.ones( 1, num.float )
        ltimestep[0] = self.flux_timestep
        gtimestep = num.zeros( 1, num.float ) # Buffer for results

        #ltimestep = self.flux_timeste

        #print self.processor, ltimestep, gtimestep

        # Global minimum of all local flux timesteps, gathered on rank 0.
        gtimestep = pypar.reduce(ltimestep, pypar.MIN, 0, buffer=gtimestep)

        #print self.processor, ltimestep, gtimestep

        # Distribute the agreed timestep from rank 0 to everyone.
        pypar.broadcast(gtimestep,0)

        #print self.processor, ltimestep, gtimestep

        self.flux_timestep = gtimestep[0]

        self.communication_reduce_time += time.time()-t0

        # LINDA:
        # Now update time stats

        # Calculate local timestep
        Domain.update_timestep(self, yieldstep, finaltime)

    def update_ghosts(self):
        """Refresh 'stage' ghost-cell centroid values from their owners.

        Round-robin over all processors: on this process' own turn it
        sends its full-cell values to every destination in
        full_send_dict; on other turns it receives into the ghost cells
        listed in ghost_recv_dict.  Ghosts whose source cells live on
        this same process are then copied locally.  Elapsed time is
        added to communication_time.
        """

        # We must send the information from the full cells and
        # receive the information for the ghost cells
        # We have a dictionary of lists with ghosts expecting updates from
        # the separate processors

        #from Numeric import take,put
        import numpy as num
        import time
        t0 = time.time()

        stage_cv = self.quantities['stage'].centroid_values

        # update of non-local ghost cells
        for iproc in range(self.numproc):
            if iproc == self.processor:
                #Send data from iproc processor to other processors
                for send_proc in self.full_send_dict:
                    if send_proc != iproc:

                        # Idf: local ids of full cells to send;
                        # Xout: preallocated send buffer (column 0 = stage).
                        Idf  = self.full_send_dict[send_proc][0]
                        Xout = self.full_send_dict[send_proc][2]

                        N = len(Idf)  # NOTE(review): unused (vectorised below)

                        #for i in range(N):
                        #    Xout[i,0] = stage_cv[Idf[i]]
                        Xout[:,0] = num.take(stage_cv, Idf)

                        pypar.send(Xout,send_proc)


            else:
                #Receive data from the iproc processor
                if  self.ghost_recv_dict.has_key(iproc):

                    # LINDA:
                    # now store ghost as local id, global id, value
                    Idg = self.ghost_recv_dict[iproc][0]
                    X = self.ghost_recv_dict[iproc][2]

                    X = pypar.receive(iproc,X)
                    N = len(Idg)  # NOTE(review): unused (vectorised below)

                    # Scatter received stage values into local ghost cells.
                    num.put(stage_cv, Idg, X[:,0])
                    #for i in range(N):
                    #    stage_cv[Idg[i]] = X[i,0]


        #local update of ghost cells
        iproc = self.processor
        if self.full_send_dict.has_key(iproc):

            # LINDA:
            # now store full as local id, global id, value
            Idf  = self.full_send_dict[iproc][0]

            # LINDA:
            # now store ghost as local id, global id, value
            Idg = self.ghost_recv_dict[iproc][0]

            N = len(Idg)  # NOTE(review): unused (vectorised below)

            #for i in range(N):
            #    #print i,Idg[i],Idf[i]
            #    stage_cv[Idg[i]] = stage_cv[Idf[i]]

            # Same-process ghosts need no MPI: copy full -> ghost directly.
            num.put(stage_cv, Idg, num.take(stage_cv, Idf))


        self.communication_time += time.time()-t0


    ## def write_time(self):
    ##     if self.min_timestep == self.max_timestep:
    ##         print 'Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\
    ##               %(self.processor, self.time, self.min_timestep, self.number_of_steps,
    ##                 self.number_of_first_order_steps)
    ##     elif self.min_timestep > self.max_timestep:
    ##         print 'Processor %d, Time = %.4f, steps=%d (%d)'\
    ##               %(self.processor, self.time, self.number_of_steps,
    ##                 self.number_of_first_order_steps)
    ##     else:
    ##         print 'Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\
    ##               %(self.processor, self.time, self.min_timestep,
    ##                 self.max_timestep, self.number_of_steps,
    ##                 self.number_of_first_order_steps)



    ## def evolve(self, yieldstep = None, finaltime = None):
    ##     """Specialisation of basic evolve method from parent class
    ##     """

    ##     #Initialise real time viz if requested
    ##     if self.time == 0.0:
    ##         pass

    ##     #Call basic machinery from parent class
    ##     for t in Domain.evolve(self, yieldstep, finaltime):

    ##         #Pass control on to outer loop for more specific actions
    ##         yield(t)
Note: See TracBrowser for help on using the repository browser.