source: anuga_core/source/anuga_parallel/parallel_advection.py @ 7449

Last change on this file since 7449 was 7449, checked in by steve, 15 years ago

Testing unit tests

File size: 6.2 KB
Line 
1import sys
2
3
4"""Class Parallel_domain -
2D triangular domains for finite-volume computations of
the advection equation, with extra structures to allow
communication between other Parallel_domains and itself.

This module contains a specialisation of class Domain from module advection.py

Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
Geoscience Australia, 2004-2005
13"""
14
15from anuga.advection import *
16
17
18import numpy as num
19
20import pypar
21
22
class Parallel_domain(Domain):
    """Advection ``Domain`` with ghost-cell exchange between processors.

    Specialises ``Domain`` (star-imported from ``anuga.advection``) by

    * synchronising ``flux_timestep`` across all processors with a
      pypar MIN-reduce followed by a broadcast, and
    * copying 'stage' centroid values from full cells to the ghost
      cells that mirror them, both on other processors and locally.

    Two wall-clock accumulators are maintained:

    * ``communication_time`` -- time spent in ``update_ghosts``
    * ``communication_reduce_time`` -- time spent reducing and
      broadcasting the timestep in ``update_timestep``
    """

    def __init__(self,
                 coordinates,
                 vertices,
                 boundary=None,
                 full_send_dict=None,
                 ghost_recv_dict=None,
                 velocity=None):
        """Build the domain and register this process's rank and size.

        ``coordinates``, ``vertices``, ``boundary`` and ``velocity`` are
        forwarded unchanged to ``Domain.__init__``.

        ``full_send_dict`` and ``ghost_recv_dict`` are dictionaries keyed
        by processor id.  As used by ``update_ghosts``, entry ``[0]`` of
        each value holds cell indices into the 'stage' centroid values
        and entry ``[2]`` is the 2D communication buffer whose column 0
        carries the values.
        """

        Domain.__init__(self,
                        coordinates,
                        vertices,
                        boundary,
                        velocity=velocity,
                        full_send_dict=full_send_dict,
                        ghost_recv_dict=ghost_recv_dict,
                        processor=pypar.rank(),
                        numproc=pypar.size())

        # Accumulated wall-clock time spent communicating
        self.communication_time = 0.0
        self.communication_reduce_time = 0.0

        # Single-string form prints identically under Python 2 and 3
        print('processor %s' % (self.processor,))
        print('numproc %s' % (self.numproc,))

    def check_integrity(self):
        """Run the parent integrity check, then require that the first
        conserved quantity is 'stage'.

        Raises AssertionError if it is not.
        """

        Domain.check_integrity(self)

        msg = 'Will need to check global and local numbering'
        assert self.conserved_quantities[0] == 'stage', msg

    def update_timestep(self, yieldstep, finaltime):
        """Replace the local flux timestep by the minimum over all
        processors, then let the parent class update its time stats.

        ``yieldstep`` and ``finaltime`` are passed straight through to
        ``Domain.update_timestep``.
        """

        import time
        t0 = time.time()

        # For some reason it looks like pypar only reduces numeric arrays
        # hence we need to create some dummy arrays for communication.
        # (Builtin float is what the removed alias num.float referred to.)
        ltimestep = num.ones(1, float)
        ltimestep[0] = self.flux_timestep
        gtimestep = num.zeros(1, float)  # Buffer for results

        # Minimum over all processors is collected on processor 0 ...
        gtimestep = pypar.reduce(ltimestep, pypar.MIN, 0, buffer=gtimestep)

        # ... and then distributed from processor 0 to everybody
        pypar.broadcast(gtimestep, 0)

        self.flux_timestep = gtimestep[0]

        self.communication_reduce_time += time.time() - t0

        # LINDA:
        # The parent's update is done only after the timestep has been
        # broadcast, so that the time stats use the agreed timestep
        Domain.update_timestep(self, yieldstep, finaltime)

    def update_ghosts(self):
        """Refresh 'stage' centroid values in all ghost cells.

        We must send the information from the full cells and receive
        the information for the ghost cells.  ``full_send_dict`` and
        ``ghost_recv_dict`` list, per processor, which cells take part.
        """

        import time
        t0 = time.time()

        stage_cv = self.quantities['stage'].centroid_values

        # Update of non-local ghost cells.  Processors take turns in rank
        # order: on its own turn a processor sends to every other
        # processor in full_send_dict; on any other turn it receives from
        # the processor whose turn it is, if ghost data is expected.
        for iproc in range(self.numproc):
            if iproc == self.processor:
                # Send data from this processor to the other processors
                for send_proc in self.full_send_dict:
                    if send_proc != iproc:
                        Idf = self.full_send_dict[send_proc][0]
                        Xout = self.full_send_dict[send_proc][2]

                        # Gather full-cell values into column 0 of the
                        # send buffer
                        Xout[:, 0] = num.take(stage_cv, Idf)

                        pypar.send(Xout, send_proc)
            else:
                # Receive data from the iproc processor
                if iproc in self.ghost_recv_dict:
                    # LINDA:
                    # now store ghost as local id, global id, value
                    Idg = self.ghost_recv_dict[iproc][0]
                    X = self.ghost_recv_dict[iproc][2]

                    X = pypar.receive(iproc, X)

                    # Scatter received values into the ghost cells
                    num.put(stage_cv, Idg, X[:, 0])

        # Local update of ghost cells: ghosts whose full cell lives on
        # this same processor are copied directly, without communication
        iproc = self.processor
        if iproc in self.full_send_dict:
            # LINDA:
            # now store full as local id, global id, value
            Idf = self.full_send_dict[iproc][0]

            # LINDA:
            # now store ghost as local id, global id, value
            Idg = self.ghost_recv_dict[iproc][0]

            num.put(stage_cv, Idg, num.take(stage_cv, Idf))

        self.communication_time += time.time() - t0
172
173
174    ## def write_time(self):
175    ##     if self.min_timestep == self.max_timestep:
176    ##         print 'Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\
177    ##               %(self.processor, self.time, self.min_timestep, self.number_of_steps,
178    ##                 self.number_of_first_order_steps)
179    ##     elif self.min_timestep > self.max_timestep:
180    ##         print 'Processor %d, Time = %.4f, steps=%d (%d)'\
181    ##               %(self.processor, self.time, self.number_of_steps,
182    ##                 self.number_of_first_order_steps)
183    ##     else:
184    ##         print 'Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\
185    ##               %(self.processor, self.time, self.min_timestep,
186    ##                 self.max_timestep, self.number_of_steps,
187    ##                 self.number_of_first_order_steps)
188
189
190
191    ## def evolve(self, yieldstep = None, finaltime = None):
192    ##     """Specialisation of basic evolve method from parent class
193    ##     """
194
195    ##     #Initialise real time viz if requested
196    ##     if self.time == 0.0:
197    ##         pass
198
199    ##     #Call basic machinery from parent class
200    ##     for t in Domain.evolve(self, yieldstep, finaltime):
201
202    ##         #Pass control on to outer loop for more specific actions
203    ##         yield(t)
204
Note: See TracBrowser for help on using the repository browser.