"""Class Parallel_Domain -
2D triangular domains for finite-volume computations of
the shallow water equation, with extra structures to allow
communication between other Parallel_Domains and itself

This module contains a specialisation of class Domain from
module shallow_water.py

Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou
Geoscience Australia, 2004-2005
"""

import sys
from os import sep

# Must run before the shallow_water import below so that module is found.
sys.path.append('..'+sep+'pyvolution')

import logging
import logging.config

logger = logging.getLogger('parallel')
logger.setLevel(logging.WARNING)

# Optional logging configuration: best effort only.  A missing or malformed
# 'log.ini' is ignored, but KeyboardInterrupt/SystemExit are no longer
# swallowed (the original used a bare ``except``).
try:
    logging.config.fileConfig('log.ini')
except Exception:
    pass

from shallow_water import *
from Numeric import zeros, Float, Int, ones, allclose, array
import pypar


class Parallel_Domain(Domain):
    """Shallow water Domain that exchanges data with other processes via pypar.

    On top of the parent Domain the instance keeps:

    full_send_dict   -- per-processor lists; element [0] holds the indices of
                        the centroid values to send, element [2] the send
                        buffer appended by __init__.
    ghost_recv_dict  -- per-processor lists; element [0] holds the indices of
                        the centroid values to overwrite, element [2] the
                        receive buffer appended by __init__.
    communication_time, communication_reduce_time,
    communication_broadcast_time
                     -- accumulated wall-clock seconds spent in update_ghosts
                        and in the reduce / broadcast parts of the timestep
                        synchronisation.
    """

    def __init__(self, coordinates, vertices, boundary = None,
                 full_send_dict = None, ghost_recv_dict = None):
        """Build the domain and allocate the communication buffers.

        coordinates, vertices, boundary are passed straight to
        Domain.__init__.

        full_send_dict and ghost_recv_dict map a processor id to a list
        whose element [0] is an array (only its shape[0] is used here).
        A Float buffer of shape (shape[0], self.nsys) is appended to each
        list IN PLACE, i.e. the caller's dictionaries are modified.
        NOTE(review): update_ghosts reads the buffer at index [2], so each
        list presumably has exactly two elements on entry - confirm against
        the code that builds these dictionaries.

        None (the default) is treated as an empty dictionary; previously the
        defaults raised TypeError because None was iterated directly.
        """

        self.processor = pypar.rank()
        self.numproc = pypar.size()

        Domain.__init__(self, coordinates, vertices, boundary)

        # Set again after the parent constructor, as in the original code.
        # NOTE(review): presumably guards against Domain.__init__
        # overwriting these attributes - confirm before removing either pair.
        self.processor = pypar.rank()
        self.numproc = pypar.size()

        if full_send_dict is None:
            full_send_dict = {}
        if ghost_recv_dict is None:
            ghost_recv_dict = {}

        # Setup Communication Buffers
        # One buffer column per quantity; check_integrity asserts the three
        # conserved quantities stage, xmomentum and ymomentum.
        self.nsys = 3
        self._append_buffers(full_send_dict)
        self._append_buffers(ghost_recv_dict)

        self.full_send_dict = full_send_dict
        self.ghost_recv_dict = ghost_recv_dict

        # Buffers for synchronisation of timesteps
        self.local_timestep = zeros(1, Float)
        self.global_timestep = zeros(1, Float)
        self.local_timesteps = zeros(self.numproc, Float)

        # Accumulated wall-clock communication times (seconds)
        self.communication_time = 0.0
        self.communication_reduce_time = 0.0
        self.communication_broadcast_time = 0.0

    def _append_buffers(self, comm_dict):
        """Append a zeroed (n, self.nsys) Float buffer to every list in
        comm_dict, where n is the length of the list's first element.
        """

        for key in comm_dict:
            buffer_shape = comm_dict[key][0].shape[0]
            comm_dict[key].append(zeros((buffer_shape, self.nsys), Float))

    def check_integrity(self):
        """Run the parent integrity check and assert that the first three
        conserved quantities are stage, xmomentum and ymomentum (in that
        order).  Raises AssertionError otherwise.
        """

        Domain.check_integrity(self)

        msg = 'Will need to check global and local numbering'
        assert self.conserved_quantities[0] == 'stage', msg
        assert self.conserved_quantities[1] == 'xmomentum', msg
        assert self.conserved_quantities[2] == 'ymomentum', msg

    def update_timestep_1(self, yieldstep, finaltime):
        """Calculate local timestep using broadcasts

        Alternative to update_timestep: every processor in turn broadcasts
        its timestep buffer, the values are collected in
        self.local_timesteps and self.timestep is set to their minimum.
        The elapsed time is added to self.communication_broadcast_time.
        """

        Domain.update_timestep(self, yieldstep, finaltime)

        import time
        t0 = time.time()

        #Broadcast local timestep from every processor to every other
        for pid in range(self.numproc):
            # Refill the buffer each round: the previous broadcast has
            # overwritten it with another processor's value.
            self.local_timestep[0] = self.timestep
            pypar.broadcast(self.local_timestep, pid, bypass=True)
            self.local_timesteps[pid] = self.local_timestep[0]

        self.timestep = min(self.local_timesteps)

        pypar.barrier()
        self.communication_broadcast_time += time.time()-t0

    def update_timestep(self, yieldstep, finaltime):
        """Calculate local timestep

        Computes the timestep with Domain.update_timestep, then replaces
        self.timestep with the value obtained by a MIN reduction onto
        processor 0 followed by a broadcast from processor 0.  Elapsed times
        are added to self.communication_reduce_time and
        self.communication_broadcast_time.
        """

        #Compute minimal timestep on local process
        Domain.update_timestep(self, yieldstep, finaltime)

        pypar.barrier()

        import time

        #Compute minimal timestep across all processes
        self.local_timestep[0] = self.timestep

        # Hard-wired switch kept from the original; the else-branch is an
        # experimental alternative that is currently never executed.
        use_reduce_broadcast = True
        if use_reduce_broadcast:
            t0 = time.time()
            pypar.reduce(self.local_timestep, pypar.MIN, 0,
                         buffer=self.global_timestep,
                         bypass=True)
        else:
            #Alternative: Try using straight send and receives
            t0 = time.time()
            self.global_timestep[0] = self.timestep

            if self.processor == 0:
                for i in range(1, self.numproc):
                    pypar.receive(i,
                                  buffer=self.local_timestep,
                                  bypass=True)

                    if self.local_timestep[0] < self.global_timestep[0]:
                        self.global_timestep[0] = self.local_timestep[0]
            else:
                pypar.send(self.local_timestep, 0,
                           use_buffer=True, bypass=True)

        self.communication_reduce_time += time.time()-t0

        #Broadcast minimal timestep to all
        t0 = time.time()
        pypar.broadcast(self.global_timestep, 0,
                        bypass=True)
        self.communication_broadcast_time += time.time()-t0

        self.timestep = self.global_timestep[0]

    # To use the broadcast-only variant instead, rebind:
    #update_timestep = update_timestep_1

    def update_ghosts(self):
        """Copy centroid values of the conserved quantities from full cells
        to the corresponding ghost cells.

        For every processor id in turn: the owning process sends one buffer
        to each other processor listed in self.full_send_dict, while the
        remaining processes receive from it if it is listed in their
        self.ghost_recv_dict.  Afterwards, if this processor's own id is a
        key of self.full_send_dict, values are copied locally.  The elapsed
        time is added to self.communication_time.
        """

        # We must send the information from the full cells and
        # receive the information for the ghost cells
        # We have a dictionary of lists with ghosts expecting updates from
        # the separate processors

        from Numeric import take, put
        import time
        t0 = time.time()

        # update of non-local ghost cells
        for iproc in range(self.numproc):
            if iproc == self.processor:
                #Send data from iproc processor to other processors
                for send_proc in self.full_send_dict:
                    if send_proc != iproc:
                        Idf = self.full_send_dict[send_proc][0]
                        Xout = self.full_send_dict[send_proc][2]

                        # Column i of the buffer carries quantity i
                        for i, q in enumerate(self.conserved_quantities):
                            Q_cv = self.quantities[q].centroid_values
                            Xout[:,i] = take(Q_cv, Idf)

                        pypar.send(Xout, send_proc,
                                   use_buffer=True, bypass = True)
            else:
                #Receive data from the iproc processor
                if iproc in self.ghost_recv_dict:
                    Idg = self.ghost_recv_dict[iproc][0]
                    X = self.ghost_recv_dict[iproc][2]

                    X = pypar.receive(iproc, buffer=X, bypass = True)

                    for i, q in enumerate(self.conserved_quantities):
                        Q_cv = self.quantities[q].centroid_values
                        put(Q_cv, Idg, X[:,i])

        #local update of ghost cells
        iproc = self.processor
        if iproc in self.full_send_dict:
            # LINDA:
            # now store full as local id, global id, value
            Idf = self.full_send_dict[iproc][0]

            # LINDA:
            # now store ghost as local id, global id, value
            # NOTE(review): raises KeyError if this processor is a key of
            # full_send_dict but not of ghost_recv_dict (unchanged from the
            # original).
            Idg = self.ghost_recv_dict[iproc][0]

            for i, q in enumerate(self.conserved_quantities):
                Q_cv = self.quantities[q].centroid_values
                put(Q_cv, Idg, take(Q_cv, Idf))

        self.communication_time += time.time()-t0

    def write_time(self):
        """Print processor id, model time, timestep information and step
        counters to standard output.

        The timestep part depends on how self.min_timestep compares with
        self.max_timestep: a single value when equal, omitted when
        min > max, and a [min, max] interval otherwise.
        """

        # Single-argument print(...) yields the same output under the
        # Python 2 print statement and the Python 3 print function.
        if self.min_timestep == self.max_timestep:
            print('Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\
                  %(self.processor, self.time, self.min_timestep,
                    self.number_of_steps,
                    self.number_of_first_order_steps))
        elif self.min_timestep > self.max_timestep:
            print('Processor %d, Time = %.4f, steps=%d (%d)'\
                  %(self.processor, self.time, self.number_of_steps,
                    self.number_of_first_order_steps))
        else:
            print('Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\
                  %(self.processor, self.time, self.min_timestep,
                    self.max_timestep, self.number_of_steps,
                    self.number_of_first_order_steps))

    def evolve(self, yieldstep = None, finaltime = None):
        """Specialisation of basic evolve method from parent class

        Generator that yields every value produced by Domain.evolve.
        """

        #Call basic machinery from parent class
        for t in Domain.evolve(self, yieldstep, finaltime):
            #Pass control on to outer loop for more specific actions
            yield(t)