source: inundation/ga/storm_surge/parallel/parallel_advection.py @ 1599

Last change on this file since 1599 was 1588, checked in by steve, 19 years ago

Sped up communications with numeric put and take commands

File size: 6.2 KB
Line 
1import sys
2from os import sep
3sys.path.append('..'+sep+'pyvolution')
4
5"""Class Parallel_Domain -
62D triangular domains for finite-volume computations of
7the advection equation, with extra structures to allow
8communication between other Parallel_Domains and itself
9
10This module contains a specialisation of class Domain from module advection.py
11
12Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
13Geoscience Australia, 2004-2005
14"""
15
16import logging, logging.config
17logger = logging.getLogger('parallel')
18logger.setLevel(logging.WARNING)
19
20try:
21    logging.config.fileConfig('log.ini')
22except:
23    pass
24
25from advection import *
26from Numeric import zeros, Float, Int, ones, allclose, array
27import pypar
28
29
30class Parallel_Domain(Domain):
31
32    def __init__(self, coordinates, vertices, boundary = None,
33                 full_send_dict = None, ghost_recv_dict = None,
34                 velocity = None):
35
36        self.processor = pypar.rank()
37        self.numproc   = pypar.size()
38
39        Domain.__init__(self, coordinates, vertices, boundary,
40                        velocity = velocity)
41
42        N = self.number_of_elements
43
44        self.processor = pypar.rank()
45        self.numproc   = pypar.size()
46
47
48        # Setup Communication Buffers
49        self.nsys = 1
50        for key in full_send_dict:
51            buffer_shape = full_send_dict[key][0].shape[0]
52            full_send_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
53
54
55        for key in ghost_recv_dict:
56            buffer_shape = ghost_recv_dict[key][0].shape[0]
57            ghost_recv_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float))
58
59        self.full_send_dict  = full_send_dict
60        self.ghost_recv_dict = ghost_recv_dict
61
62        self.communication_time = 0.0
63        self.communication_reduce_time = 0.0
64
65    def check_integrity(self):
66        Domain.check_integrity(self)
67
68        msg = 'Will need to check global and local numbering'
69        assert self.conserved_quantities[0] == 'stage', msg
70
71    def update_timestep(self, yieldstep, finaltime):
72
73        # Calculate local timestep
74        Domain.update_timestep(self, yieldstep, finaltime)
75
76        import time
77        t0 = time.time()
78
79        # For some reason it looks like pypar only reduces numeric arrays
80        # hence we need to create some dummy arrays for communication
81        ltimestep = ones( 1, Float )
82        ltimestep[0] = self.timestep
83        gtimestep = zeros( 1, Float) # Buffer for results
84
85        pypar.raw_reduce(ltimestep, gtimestep, pypar.MIN, 0)
86        pypar.broadcast(gtimestep,0)
87
88        self.timestep = gtimestep[0]
89
90        self.communication_reduce_time += time.time()-t0
91
92
93    def update_ghosts(self):
94
95        # We must send the information from the full cells and
96        # receive the information for the ghost cells
97        # We have a dictionary of lists with ghosts expecting updates from
98        # the separate processors
99
100        from Numeric import take,put
101        import time
102        t0 = time.time()
103
104        stage_cv = self.quantities['stage'].centroid_values
105
106        # update of non-local ghost cells
107        for iproc in range(self.numproc):
108            if iproc == self.processor:
109                #Send data from iproc processor to other processors
110                for send_proc in self.full_send_dict:
111                    if send_proc != iproc:
112
113                        Idf  = self.full_send_dict[send_proc][0]
114                        Xout = self.full_send_dict[send_proc][2]
115
116                        N = len(Idf)
117
118                        #for i in range(N):
119                        #    Xout[i,0] = stage_cv[Idf[i]]
120                        Xout[:,0] = take(stage_cv, Idf)
121
122                        pypar.send(Xout,send_proc)
123
124
125            else:
126                #Receive data from the iproc processor
127                if  self.ghost_recv_dict.has_key(iproc):
128
129                    # LINDA:
130                    # now store ghost as local id, global id, value
131                    Idg = self.ghost_recv_dict[iproc][0]
132                    X = self.ghost_recv_dict[iproc][2]
133
134                    X = pypar.receive(iproc,X)
135                    N = len(Idg)
136
137                    put(stage_cv, Idg, X[:,0])
138                    #for i in range(N):
139                    #    stage_cv[Idg[i]] = X[i,0]
140
141
142        #local update of ghost cells
143        iproc = self.processor
144        if self.full_send_dict.has_key(iproc):
145
146            # LINDA:
147            # now store full as local id, global id, value
148            Idf  = self.full_send_dict[iproc][0]
149
150            # LINDA:
151            # now store ghost as local id, global id, value
152            Idg = self.ghost_recv_dict[iproc][0]
153
154            N = len(Idg)
155
156            #for i in range(N):
157            #    #print i,Idg[i],Idf[i]
158            #    stage_cv[Idg[i]] = stage_cv[Idf[i]]
159
160            put(stage_cv, Idg, take(stage_cv, Idf))
161
162
163        self.communication_time += time.time()-t0
164
165
166    def write_time(self):
167        if self.min_timestep == self.max_timestep:
168            print 'Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\
169                  %(self.processor, self.time, self.min_timestep, self.number_of_steps,
170                    self.number_of_first_order_steps)
171        elif self.min_timestep > self.max_timestep:
172            print 'Processor %d, Time = %.4f, steps=%d (%d)'\
173                  %(self.processor, self.time, self.number_of_steps,
174                    self.number_of_first_order_steps)
175        else:
176            print 'Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\
177                  %(self.processor, self.time, self.min_timestep,
178                    self.max_timestep, self.number_of_steps,
179                    self.number_of_first_order_steps)
180
181
182
183    def evolve(self, yieldstep = None, finaltime = None):
184        """Specialisation of basic evolve method from parent class
185        """
186
187        #Initialise real time viz if requested
188        if self.time == 0.0:
189            pass
190
191        #Call basic machinery from parent class
192        for t in Domain.evolve(self, yieldstep, finaltime):
193
194            #Pass control on to outer loop for more specific actions
195            yield(t)
Note: See TracBrowser for help on using the repository browser.