[3185] | 1 | import sys |
---|
| 2 | from os import sep |
---|
| 3 | |
---|
| 4 | # FIXME: This should be removed. Set evironment variable PYTHONPATH to |
---|
| 5 | # directory ...../anuga/inundation instead |
---|
| 6 | #sys.path.append('..'+sep+'pyvolution') |
---|
| 7 | |
---|
| 8 | """Class Parallel_Shallow_Water_Domain - |
---|
| 9 | 2D triangular domains for finite-volume computations of |
---|
| 10 | the shallow water equation, with extra structures to allow |
---|
| 11 | communication between other Parallel_Domains and itself |
---|
| 12 | |
---|
| 13 | This module contains a specialisation of class Domain |
---|
| 14 | from module shallow_water.py |
---|
| 15 | |
---|
| 16 | Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou |
---|
| 17 | Geoscience Australia, 2004-2005 |
---|
| 18 | """ |
---|
| 19 | |
---|
| 20 | import logging, logging.config |
---|
| 21 | logger = logging.getLogger('parallel') |
---|
| 22 | logger.setLevel(logging.WARNING) |
---|
| 23 | |
---|
| 24 | try: |
---|
| 25 | logging.config.fileConfig('log.ini') |
---|
| 26 | except: |
---|
| 27 | pass |
---|
| 28 | |
---|
| 29 | from pyvolution.shallow_water import * |
---|
| 30 | from Numeric import zeros, Float, Int, ones, allclose, array |
---|
[3429] | 31 | |
---|
[3434] | 32 | import pypar |
---|
[3185] | 33 | |
---|
[3427] | 34 | |
---|
[3185] | 35 | class Parallel_Domain(Domain): |
---|
| 36 | |
---|
| 37 | def __init__(self, coordinates, vertices, boundary = None, |
---|
| 38 | full_send_dict = None, ghost_recv_dict = None): |
---|
| 39 | |
---|
| 40 | Domain.__init__(self, |
---|
| 41 | coordinates, |
---|
| 42 | vertices, |
---|
| 43 | boundary, |
---|
| 44 | full_send_dict=full_send_dict, |
---|
| 45 | ghost_recv_dict=ghost_recv_dict, |
---|
| 46 | processor=pypar.rank(), |
---|
| 47 | numproc=pypar.size()) |
---|
| 48 | |
---|
| 49 | N = self.number_of_elements |
---|
| 50 | |
---|
| 51 | # self.processor = pypar.rank() |
---|
| 52 | # self.numproc = pypar.size() |
---|
| 53 | # |
---|
| 54 | # # Setup Communication Buffers |
---|
| 55 | # self.nsys = 3 |
---|
| 56 | # for key in full_send_dict: |
---|
| 57 | # buffer_shape = full_send_dict[key][0].shape[0] |
---|
| 58 | # full_send_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float)) |
---|
| 59 | # |
---|
| 60 | # |
---|
| 61 | # for key in ghost_recv_dict: |
---|
| 62 | # buffer_shape = ghost_recv_dict[key][0].shape[0] |
---|
| 63 | # ghost_recv_dict[key].append(zeros( (buffer_shape,self.nsys) ,Float)) |
---|
| 64 | # |
---|
| 65 | # self.full_send_dict = full_send_dict |
---|
| 66 | self.ghost_recv_dict = ghost_recv_dict |
---|
| 67 | |
---|
| 68 | # Buffers for synchronisation of timesteps |
---|
| 69 | self.local_timestep = zeros(1, Float) |
---|
| 70 | self.global_timestep = zeros(1, Float) |
---|
| 71 | |
---|
| 72 | self.local_timesteps = zeros(self.numproc, Float) |
---|
| 73 | |
---|
| 74 | |
---|
| 75 | self.communication_time = 0.0 |
---|
| 76 | self.communication_reduce_time = 0.0 |
---|
| 77 | self.communication_broadcast_time = 0.0 |
---|
| 78 | |
---|
| 79 | |
---|
| 80 | def set_name(self, name): |
---|
| 81 | """Assign name based on processor number |
---|
| 82 | """ |
---|
| 83 | |
---|
| 84 | # Call parents method with processor number attached. |
---|
| 85 | Domain.set_name(self, name + '_P%d_%d' %(self.processor, self.numproc)) |
---|
| 86 | |
---|
[3427] | 87 | #FIXME (Ole): Roll this into domain.py. Only change name if numproc>0 |
---|
[3185] | 88 | |
---|
| 89 | def check_integrity(self): |
---|
| 90 | Domain.check_integrity(self) |
---|
| 91 | |
---|
| 92 | msg = 'Will need to check global and local numbering' |
---|
| 93 | assert self.conserved_quantities[0] == 'stage', msg |
---|
| 94 | assert self.conserved_quantities[1] == 'xmomentum', msg |
---|
| 95 | assert self.conserved_quantities[2] == 'ymomentum', msg |
---|
| 96 | |
---|
| 97 | |
---|
| 98 | def update_timestep_1(self, yieldstep, finaltime): |
---|
| 99 | """Calculate local timestep using broadcasts |
---|
| 100 | """ |
---|
| 101 | |
---|
| 102 | #LINDA: |
---|
| 103 | # Moved below so timestep is found before doing update |
---|
| 104 | |
---|
| 105 | #Domain.update_timestep(self, yieldstep, finaltime) |
---|
| 106 | |
---|
| 107 | import time |
---|
| 108 | |
---|
| 109 | |
---|
| 110 | t0 = time.time() |
---|
| 111 | |
---|
| 112 | #Broadcast local timestep from every processor to every other |
---|
| 113 | for pid in range(self.numproc): |
---|
| 114 | #print 'P%d calling broadcast from %d' %(self.processor, pid) |
---|
| 115 | self.local_timestep[0] = self.timestep |
---|
| 116 | pypar.broadcast(self.local_timestep, pid, bypass=True) |
---|
| 117 | self.local_timesteps[pid] = self.local_timestep[0] |
---|
| 118 | |
---|
| 119 | self.timestep = min(self.local_timesteps) |
---|
| 120 | |
---|
| 121 | pypar.barrier() |
---|
| 122 | self.communication_broadcast_time += time.time()-t0 |
---|
| 123 | |
---|
| 124 | # LINDA: |
---|
| 125 | # Moved timestep to here |
---|
| 126 | |
---|
| 127 | Domain.update_timestep(self, yieldstep, finaltime) |
---|
| 128 | |
---|
| 129 | |
---|
| 130 | def update_timestep(self, yieldstep, finaltime): |
---|
| 131 | """Calculate local timestep |
---|
| 132 | """ |
---|
| 133 | |
---|
| 134 | # LINDA: Moved below so timestep is updated before |
---|
| 135 | # calculating statistic |
---|
| 136 | |
---|
| 137 | #Compute minimal timestep on local process |
---|
| 138 | #Domain.update_timestep(self, yieldstep, finaltime) |
---|
| 139 | |
---|
| 140 | pypar.barrier() |
---|
| 141 | |
---|
| 142 | import time |
---|
| 143 | #Compute minimal timestep across all processes |
---|
| 144 | self.local_timestep[0] = self.timestep |
---|
| 145 | use_reduce_broadcast = True |
---|
| 146 | if use_reduce_broadcast: |
---|
| 147 | t0 = time.time() |
---|
| 148 | pypar.reduce(self.local_timestep, pypar.MIN, 0, |
---|
| 149 | buffer=self.global_timestep, |
---|
| 150 | bypass=True) |
---|
| 151 | |
---|
| 152 | else: |
---|
| 153 | #Alternative: Try using straight send and receives |
---|
| 154 | t0 = time.time() |
---|
| 155 | self.global_timestep[0] = self.timestep |
---|
| 156 | |
---|
| 157 | if self.processor == 0: |
---|
| 158 | for i in range(1, self.numproc): |
---|
| 159 | pypar.receive(i, |
---|
| 160 | buffer=self.local_timestep, |
---|
| 161 | bypass=True) |
---|
| 162 | |
---|
| 163 | if self.local_timestep[0] < self.global_timestep[0]: |
---|
| 164 | self.global_timestep[0] = self.local_timestep[0] |
---|
| 165 | else: |
---|
| 166 | pypar.send(self.local_timestep, 0, |
---|
| 167 | use_buffer=True, bypass=True) |
---|
| 168 | |
---|
| 169 | |
---|
| 170 | self.communication_reduce_time += time.time()-t0 |
---|
| 171 | |
---|
| 172 | |
---|
| 173 | #Broadcast minimal timestep to all |
---|
| 174 | t0 = time.time() |
---|
| 175 | pypar.broadcast(self.global_timestep, 0, |
---|
| 176 | bypass=True) |
---|
| 177 | |
---|
| 178 | self.communication_broadcast_time += time.time()-t0 |
---|
| 179 | |
---|
| 180 | |
---|
| 181 | self.timestep = self.global_timestep[0] |
---|
| 182 | |
---|
| 183 | # LINDA: |
---|
| 184 | # update local stats now |
---|
| 185 | |
---|
| 186 | #Compute minimal timestep on local process |
---|
| 187 | Domain.update_timestep(self, yieldstep, finaltime) |
---|
| 188 | |
---|
[3431] | 189 | # FIXME (Ole) We should update the variable min_timestep for use |
---|
| 190 | # with write_time (or redo write_time) |
---|
| 191 | |
---|
[3185] | 192 | #update_timestep = update_timestep_1 |
---|
| 193 | |
---|
| 194 | def update_ghosts(self): |
---|
| 195 | |
---|
| 196 | # We must send the information from the full cells and |
---|
| 197 | # receive the information for the ghost cells |
---|
| 198 | # We have a dictionary of lists with ghosts expecting updates from |
---|
| 199 | # the separate processors |
---|
| 200 | |
---|
| 201 | |
---|
| 202 | from Numeric import take,put |
---|
| 203 | import time |
---|
| 204 | t0 = time.time() |
---|
| 205 | |
---|
| 206 | # update of non-local ghost cells |
---|
| 207 | for iproc in range(self.numproc): |
---|
| 208 | if iproc == self.processor: |
---|
| 209 | #Send data from iproc processor to other processors |
---|
| 210 | for send_proc in self.full_send_dict: |
---|
| 211 | if send_proc != iproc: |
---|
| 212 | |
---|
| 213 | Idf = self.full_send_dict[send_proc][0] |
---|
| 214 | Xout = self.full_send_dict[send_proc][2] |
---|
| 215 | |
---|
| 216 | for i, q in enumerate(self.conserved_quantities): |
---|
| 217 | #print 'Send',i,q |
---|
| 218 | Q_cv = self.quantities[q].centroid_values |
---|
| 219 | Xout[:,i] = take(Q_cv, Idf) |
---|
| 220 | |
---|
| 221 | pypar.send(Xout, send_proc, |
---|
| 222 | use_buffer=True, bypass = True) |
---|
| 223 | |
---|
| 224 | |
---|
| 225 | else: |
---|
| 226 | #Receive data from the iproc processor |
---|
| 227 | if self.ghost_recv_dict.has_key(iproc): |
---|
| 228 | |
---|
| 229 | Idg = self.ghost_recv_dict[iproc][0] |
---|
| 230 | X = self.ghost_recv_dict[iproc][2] |
---|
| 231 | |
---|
| 232 | X = pypar.receive(iproc, buffer=X, bypass = True) |
---|
| 233 | |
---|
| 234 | for i, q in enumerate(self.conserved_quantities): |
---|
| 235 | #print 'Receive',i,q |
---|
| 236 | Q_cv = self.quantities[q].centroid_values |
---|
| 237 | put(Q_cv, Idg, X[:,i]) |
---|
| 238 | |
---|
| 239 | #local update of ghost cells |
---|
| 240 | iproc = self.processor |
---|
| 241 | if self.full_send_dict.has_key(iproc): |
---|
| 242 | |
---|
| 243 | # LINDA: |
---|
| 244 | # now store full as local id, global id, value |
---|
| 245 | Idf = self.full_send_dict[iproc][0] |
---|
| 246 | |
---|
| 247 | # LINDA: |
---|
| 248 | # now store ghost as local id, global id, value |
---|
| 249 | Idg = self.ghost_recv_dict[iproc][0] |
---|
| 250 | |
---|
| 251 | for i, q in enumerate(self.conserved_quantities): |
---|
| 252 | #print 'LOCAL SEND RECEIVE',i,q |
---|
| 253 | Q_cv = self.quantities[q].centroid_values |
---|
| 254 | put(Q_cv, Idg, take(Q_cv, Idf)) |
---|
| 255 | |
---|
| 256 | self.communication_time += time.time()-t0 |
---|
| 257 | |
---|
| 258 | |
---|
| 259 | def write_time(self): |
---|
| 260 | if self.min_timestep == self.max_timestep: |
---|
| 261 | print 'Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\ |
---|
| 262 | %(self.processor, self.time, self.min_timestep, self.number_of_steps, |
---|
| 263 | self.number_of_first_order_steps) |
---|
| 264 | elif self.min_timestep > self.max_timestep: |
---|
| 265 | print 'Processor %d, Time = %.4f, steps=%d (%d)'\ |
---|
| 266 | %(self.processor, self.time, self.number_of_steps, |
---|
| 267 | self.number_of_first_order_steps) |
---|
| 268 | else: |
---|
| 269 | print 'Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\ |
---|
| 270 | %(self.processor, self.time, self.min_timestep, |
---|
| 271 | self.max_timestep, self.number_of_steps, |
---|
| 272 | self.number_of_first_order_steps) |
---|
| 273 | |
---|
| 274 | |
---|
| 275 | def evolve(self, yieldstep = None, finaltime = None): |
---|
| 276 | """Specialisation of basic evolve method from parent class |
---|
| 277 | """ |
---|
| 278 | |
---|
| 279 | #Initialise real time viz if requested |
---|
| 280 | if self.time == 0.0: |
---|
| 281 | pass |
---|
| 282 | |
---|
| 283 | #Call basic machinery from parent class |
---|
| 284 | for t in Domain.evolve(self, yieldstep, finaltime): |
---|
| 285 | |
---|
| 286 | #Pass control on to outer loop for more specific actions |
---|
| 287 | yield(t) |
---|