source: trunk/anuga_core/source/anuga_parallel/parallel_advection.py @ 7980

Last change on this file since 7980 was 7877, checked in by hudson, 14 years ago

Moved all development files into trunk.

File size: 6.2 KB
Line 
1import sys
2
3
4"""Class Parallel_domain -
52D triangular domains for finite-volume computations of
6the advection equation, with extra structures to allow
7communication between other Parallel_domains and itself
8
9This module contains a specialisation of class Domain from module advection.py
10
11Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
12Geoscience Australia, 2004-2005
13"""
14
15from anuga.advection import *
16
17from anuga import Domain
18
19import numpy as num
20
21import pypar
22
23
24class Parallel_domain(Domain):
25
26    def __init__(self,
27                 coordinates,
28                 vertices,
29                 boundary = None,
30                 full_send_dict = None,
31                 ghost_recv_dict = None,
32                 velocity = None):
33
34        Domain.__init__(self,
35                        coordinates,
36                        vertices,
37                        boundary,
38                        velocity = velocity,
39                        full_send_dict=full_send_dict,
40                        ghost_recv_dict=ghost_recv_dict,
41                        processor=pypar.rank(),
42                        numproc=pypar.size()
43                        )
44
45        N = self.number_of_elements
46
47
48        self.communication_time = 0.0
49        self.communication_reduce_time = 0.0
50
51
52        print 'processor',self.processor
53        print 'numproc',self.numproc
54
55    def check_integrity(self):
56        Domain.check_integrity(self)
57
58        msg = 'Will need to check global and local numbering'
59        assert self.conserved_quantities[0] == 'stage', msg
60
61    def update_timestep(self, yieldstep, finaltime):
62
63        #LINDA:
64        # moved the calculation so that it is done after timestep
65        # has been broadcast
66       
67#        # Calculate local timestep
68#        Domain.update_timestep(self, yieldstep, finaltime)
69
70        import time
71        t0 = time.time()
72
73        # For some reason it looks like pypar only reduces numeric arrays
74        # hence we need to create some dummy arrays for communication
75        ltimestep = num.ones( 1, num.float )
76        ltimestep[0] = self.flux_timestep
77        gtimestep = num.zeros( 1, num.float ) # Buffer for results
78
79        #ltimestep = self.flux_timeste
80
81        #print self.processor, ltimestep, gtimestep
82       
83        gtimestep = pypar.reduce(ltimestep, pypar.MIN, 0, buffer=gtimestep)
84
85        #print self.processor, ltimestep, gtimestep
86       
87        pypar.broadcast(gtimestep,0)
88
89        #print self.processor, ltimestep, gtimestep
90
91        self.flux_timestep = gtimestep[0]
92       
93        self.communication_reduce_time += time.time()-t0
94
95        # LINDA:
96        # Now update time stats
97       
98        # Calculate local timestep
99        Domain.update_timestep(self, yieldstep, finaltime)
100
101    def update_ghosts(self):
102
103        # We must send the information from the full cells and
104        # receive the information for the ghost cells
105        # We have a dictionary of lists with ghosts expecting updates from
106        # the separate processors
107
108        #from Numeric import take,put
109        import numpy as num
110        import time
111        t0 = time.time()
112
113        stage_cv = self.quantities['stage'].centroid_values
114
115        # update of non-local ghost cells
116        for iproc in range(self.numproc):
117            if iproc == self.processor:
118                #Send data from iproc processor to other processors
119                for send_proc in self.full_send_dict:
120                    if send_proc != iproc:
121
122                        Idf  = self.full_send_dict[send_proc][0]
123                        Xout = self.full_send_dict[send_proc][2]
124
125                        N = len(Idf)
126
127                        #for i in range(N):
128                        #    Xout[i,0] = stage_cv[Idf[i]]
129                        Xout[:,0] = num.take(stage_cv, Idf)
130
131                        pypar.send(Xout,send_proc)
132
133
134            else:
135                #Receive data from the iproc processor
136                if  self.ghost_recv_dict.has_key(iproc):
137
138                    # LINDA:
139                    # now store ghost as local id, global id, value
140                    Idg = self.ghost_recv_dict[iproc][0]
141                    X = self.ghost_recv_dict[iproc][2]
142
143                    X = pypar.receive(iproc,X)
144                    N = len(Idg)
145
146                    num.put(stage_cv, Idg, X[:,0])
147                    #for i in range(N):
148                    #    stage_cv[Idg[i]] = X[i,0]
149
150
151        #local update of ghost cells
152        iproc = self.processor
153        if self.full_send_dict.has_key(iproc):
154
155            # LINDA:
156            # now store full as local id, global id, value
157            Idf  = self.full_send_dict[iproc][0]
158
159            # LINDA:
160            # now store ghost as local id, global id, value
161            Idg = self.ghost_recv_dict[iproc][0]
162
163            N = len(Idg)
164
165            #for i in range(N):
166            #    #print i,Idg[i],Idf[i]
167            #    stage_cv[Idg[i]] = stage_cv[Idf[i]]
168
169            num.put(stage_cv, Idg, num.take(stage_cv, Idf))
170
171
172        self.communication_time += time.time()-t0
173
174
175    ## def write_time(self):
176    ##     if self.min_timestep == self.max_timestep:
177    ##         print 'Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\
178    ##               %(self.processor, self.time, self.min_timestep, self.number_of_steps,
179    ##                 self.number_of_first_order_steps)
180    ##     elif self.min_timestep > self.max_timestep:
181    ##         print 'Processor %d, Time = %.4f, steps=%d (%d)'\
182    ##               %(self.processor, self.time, self.number_of_steps,
183    ##                 self.number_of_first_order_steps)
184    ##     else:
185    ##         print 'Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\
186    ##               %(self.processor, self.time, self.min_timestep,
187    ##                 self.max_timestep, self.number_of_steps,
188    ##                 self.number_of_first_order_steps)
189
190
191
192    ## def evolve(self, yieldstep = None, finaltime = None):
193    ##     """Specialisation of basic evolve method from parent class
194    ##     """
195
196    ##     #Initialise real time viz if requested
197    ##     if self.time == 0.0:
198    ##         pass
199
200    ##     #Call basic machinery from parent class
201    ##     for t in Domain.evolve(self, yieldstep, finaltime):
202
203    ##         #Pass control on to outer loop for more specific actions
204    ##         yield(t)
205
Note: See TracBrowser for help on using the repository browser.