source: anuga_work/development/parallel/parallel_advection.py @ 4243

Last change on this file since 4243 was 3514, checked in by duncan, 18 years ago

Hi all,
I'm doing a change in the anuga structure, moving the code to

\anuga_core\source\anuga

After you have done an svn update, the PYTHONPATH has to be changed to:
PYTHONPATH = anuga_core/source/

This is part of the changes required to make installation of anuga quicker and to reduce the size of our sandpits.

If any imports are broken, try fixing them, for example by adding the "anuga." prefix to them. If this seems to have really broken things, email/phone me.

Cheers
Duncan

File size: 6.1 KB
Line 
1import sys
2from os import sep
3sys.path.append('..'+sep+'pyvolution')
4
5"""Class Parallel_Domain -
62D triangular domains for finite-volume computations of
7the advection equation, with extra structures to allow
8communication between other Parallel_Domains and itself
9
10This module contains a specialisation of class Domain from module advection.py
11
12Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
13Geoscience Australia, 2004-2005
14"""
15
import logging
import logging.config

# Module-level logger for the parallel code. By default only warnings and
# more severe records are let through.
logger = logging.getLogger('parallel')
logger.setLevel(logging.WARNING)

# Best-effort optional configuration: try to load 'log.ini' (resolved
# relative to the current working directory). A missing or malformed file
# is deliberately ignored, but only ordinary errors are swallowed so that
# KeyboardInterrupt and SystemExit still propagate.
try:
    logging.config.fileConfig('log.ini')
except Exception:
    pass
24
25from anuga.pyvolution.advection_vtk import *
26from Numeric import zeros, Float, Int, ones, allclose, array
27import pypar
28
29
class Parallel_Domain(Domain):
    """Advection Domain that exchanges data with Domains on other processors.

    Specialises ``Domain`` by
      * agreeing on one global timestep (the minimum over all processors)
        before the parent class updates its time statistics, and
      * copying 'stage' centroid values from full cells to the matching
        ghost cells, both across processors (via pypar) and locally.

    Two timers are accumulated on the instance:
      communication_time         seconds spent in update_ghosts
      communication_reduce_time  seconds spent reducing/broadcasting the
                                 timestep in update_timestep
    """

    def __init__(self,
                 coordinates,
                 vertices,
                 boundary=None,
                 full_send_dict=None,
                 ghost_recv_dict=None,
                 velocity=None):
        """Build the domain and register this process with the parent class.

        coordinates, vertices, boundary and velocity are passed straight
        through to ``Domain.__init__``.

        full_send_dict and ghost_recv_dict are dictionaries keyed by
        processor number. As used in update_ghosts, entry [0] of each value
        is a sequence of local cell indices and entry [2] is a 2D
        communication buffer whose column 0 carries the stage values.

        The processor id and processor count are taken from pypar.
        """

        Domain.__init__(self,
                        coordinates,
                        vertices,
                        boundary,
                        velocity=velocity,
                        full_send_dict=full_send_dict,
                        ghost_recv_dict=ghost_recv_dict,
                        processor=pypar.rank(),
                        numproc=pypar.size())

        # Wall-clock time spent communicating (see class docstring)
        self.communication_time = 0.0
        self.communication_reduce_time = 0.0

        # Single-argument print calls give the same output on Python 2 and 3
        print('processor ' + str(self.processor))
        print('numproc ' + str(self.numproc))

    def check_integrity(self):
        """Run the parent integrity checks and require 'stage' to be the
        first conserved quantity.

        Raises AssertionError if the first conserved quantity is not 'stage'.
        """

        Domain.check_integrity(self)

        # NOTE(review): this message reads like a reminder rather than a
        # description of the failed check - confirm before rewording.
        msg = 'Will need to check global and local numbering'
        assert self.conserved_quantities[0] == 'stage', msg

    def update_timestep(self, yieldstep, finaltime):
        """Replace the local timestep by the global minimum, then let the
        parent class update its time statistics.

        The reduction is done first so that ``Domain.update_timestep`` works
        with the timestep that has been agreed between all processors.
        """

        import time
        t0 = time.time()

        # The timestep is wrapped in one-element Numeric arrays because the
        # reduction is performed on array buffers.
        ltimestep = ones(1, Float)
        ltimestep[0] = self.timestep
        gtimestep = zeros(1, Float)  # Buffer for results

        # Minimum over all processors is gathered on processor 0 ...
        pypar.raw_reduce(ltimestep, gtimestep, pypar.MIN, 0)
        # ... and then distributed from processor 0 to everyone
        pypar.broadcast(gtimestep, 0)

        self.timestep = gtimestep[0]

        self.communication_reduce_time += time.time() - t0

        # Now update time stats using the global timestep
        Domain.update_timestep(self, yieldstep, finaltime)

    def update_ghosts(self):
        """Copy 'stage' centroid values from full cells into ghost cells.

        Each processor in turn sends the values of its full cells to every
        other processor listed in full_send_dict, while the remaining
        processors receive from it if it appears in their ghost_recv_dict.
        Afterwards ghost cells whose full cell lives on this same processor
        are updated by a local copy.

        The elapsed wall-clock time is added to self.communication_time.
        """

        from Numeric import take, put
        import time
        t0 = time.time()

        stage_cv = self.quantities['stage'].centroid_values

        # Update of non-local ghost cells. The order of the sends and
        # receives is kept exactly as is: every processor walks through the
        # same sequence of sender ids.
        for iproc in range(self.numproc):
            if iproc == self.processor:
                # Send data from this processor to the other processors
                for send_proc in self.full_send_dict:
                    if send_proc != iproc:

                        # Local ids of the full cells and the send buffer
                        Idf = self.full_send_dict[send_proc][0]
                        Xout = self.full_send_dict[send_proc][2]

                        # Column 0 of the buffer carries the stage values
                        Xout[:, 0] = take(stage_cv, Idf)

                        pypar.send(Xout, send_proc)

            else:
                # Receive data from the iproc processor
                if iproc in self.ghost_recv_dict:

                    # Local ids of the ghost cells and the receive buffer
                    Idg = self.ghost_recv_dict[iproc][0]
                    X = self.ghost_recv_dict[iproc][2]

                    X = pypar.receive(iproc, X)

                    put(stage_cv, Idg, X[:, 0])

        # Local update of ghost cells (full and ghost on this processor)
        iproc = self.processor
        if iproc in self.full_send_dict:

            # Local ids of the full cells to copy from
            Idf = self.full_send_dict[iproc][0]

            # Local ids of the ghost cells to copy to.
            # NOTE(review): assumes ghost_recv_dict has an entry for this
            # processor whenever full_send_dict has one; otherwise this
            # raises KeyError.
            Idg = self.ghost_recv_dict[iproc][0]

            put(stage_cv, Idg, take(stage_cv, Idf))

        self.communication_time += time.time() - t0

    def write_time(self):
        """Print processor id, model time and timestep statistics.

        The timestep is shown as a single value when the minimum and maximum
        coincide, is left out when the minimum exceeds the maximum, and is
        shown as an interval otherwise.
        """

        if self.min_timestep == self.max_timestep:
            print('Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'
                  % (self.processor, self.time, self.min_timestep,
                     self.number_of_steps,
                     self.number_of_first_order_steps))
        elif self.min_timestep > self.max_timestep:
            print('Processor %d, Time = %.4f, steps=%d (%d)'
                  % (self.processor, self.time, self.number_of_steps,
                     self.number_of_first_order_steps))
        else:
            print('Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'
                  % (self.processor, self.time, self.min_timestep,
                     self.max_timestep, self.number_of_steps,
                     self.number_of_first_order_steps))

    def evolve(self, yieldstep=None, finaltime=None):
        """Specialisation of basic evolve method from parent class

        Generator yielding every value produced by ``Domain.evolve``.
        """

        # Call basic machinery from parent class
        for t in Domain.evolve(self, yieldstep, finaltime):

            # Pass control on to outer loop for more specific actions
            yield t
Note: See TracBrowser for help on using the repository browser.