[3184] | 1 | import sys |
---|
| 2 | from os import sep |
---|
| 3 | sys.path.append('..'+sep+'pyvolution') |
---|
| 4 | |
---|
| 5 | """Class Parallel_Domain - |
---|
| 6 | 2D triangular domains for finite-volume computations of |
---|
| 7 | the advection equation, with extra structures to allow |
---|
| 8 | communication between other Parallel_Domains and itself |
---|
| 9 | |
---|
| 10 | This module contains a specialisation of class Domain from module advection.py |
---|
| 11 | |
---|
| 12 | Ole Nielsen, Stephen Roberts, Duncan Gray, Christopher Zoppou |
---|
| 13 | Geoscience Australia, 2004-2005 |
---|
| 14 | """ |
---|
| 15 | |
---|
import logging
import logging.config

# Module-wide logger for the parallel code; only warnings and above are
# emitted unless a configuration file says otherwise.
logger = logging.getLogger('parallel')
logger.setLevel(logging.WARNING)

# Optional configuration: if 'log.ini' is missing or malformed we simply
# keep the defaults set above.  Only ordinary errors are ignored, so that
# KeyboardInterrupt and SystemExit still propagate.
try:
    logging.config.fileConfig('log.ini')
except Exception:
    pass
| 24 | |
---|
[3315] | 25 | from pyvolution.advection_vtk import * |
---|
[3184] | 26 | from Numeric import zeros, Float, Int, ones, allclose, array |
---|
| 27 | import pypar |
---|
| 28 | |
---|
| 29 | |
---|
| 30 | class Parallel_Domain(Domain): |
---|
| 31 | |
---|
| 32 | def __init__(self, |
---|
| 33 | coordinates, |
---|
| 34 | vertices, |
---|
| 35 | boundary = None, |
---|
| 36 | full_send_dict = None, |
---|
| 37 | ghost_recv_dict = None, |
---|
| 38 | velocity = None): |
---|
| 39 | |
---|
| 40 | Domain.__init__(self, |
---|
| 41 | coordinates, |
---|
| 42 | vertices, |
---|
| 43 | boundary, |
---|
| 44 | velocity = velocity, |
---|
| 45 | full_send_dict=full_send_dict, |
---|
| 46 | ghost_recv_dict=ghost_recv_dict, |
---|
| 47 | processor=pypar.rank(), |
---|
| 48 | numproc=pypar.size() |
---|
| 49 | ) |
---|
| 50 | |
---|
| 51 | N = self.number_of_elements |
---|
| 52 | |
---|
| 53 | |
---|
| 54 | self.communication_time = 0.0 |
---|
| 55 | self.communication_reduce_time = 0.0 |
---|
| 56 | |
---|
| 57 | |
---|
| 58 | print 'processor',self.processor |
---|
| 59 | print 'numproc',self.numproc |
---|
| 60 | |
---|
| 61 | def check_integrity(self): |
---|
| 62 | Domain.check_integrity(self) |
---|
| 63 | |
---|
| 64 | msg = 'Will need to check global and local numbering' |
---|
| 65 | assert self.conserved_quantities[0] == 'stage', msg |
---|
| 66 | |
---|
| 67 | def update_timestep(self, yieldstep, finaltime): |
---|
| 68 | |
---|
| 69 | #LINDA: |
---|
| 70 | # moved the calculation so that it is done after timestep |
---|
| 71 | # has been broadcast |
---|
| 72 | |
---|
| 73 | # # Calculate local timestep |
---|
| 74 | # Domain.update_timestep(self, yieldstep, finaltime) |
---|
| 75 | |
---|
| 76 | import time |
---|
| 77 | t0 = time.time() |
---|
| 78 | |
---|
| 79 | # For some reason it looks like pypar only reduces numeric arrays |
---|
| 80 | # hence we need to create some dummy arrays for communication |
---|
| 81 | ltimestep = ones( 1, Float ) |
---|
| 82 | ltimestep[0] = self.timestep |
---|
| 83 | gtimestep = zeros( 1, Float) # Buffer for results |
---|
| 84 | |
---|
| 85 | pypar.raw_reduce(ltimestep, gtimestep, pypar.MIN, 0) |
---|
| 86 | pypar.broadcast(gtimestep,0) |
---|
| 87 | |
---|
| 88 | self.timestep = gtimestep[0] |
---|
| 89 | |
---|
| 90 | self.communication_reduce_time += time.time()-t0 |
---|
| 91 | |
---|
| 92 | # LINDA: |
---|
| 93 | # Now update time stats |
---|
| 94 | |
---|
| 95 | # Calculate local timestep |
---|
| 96 | Domain.update_timestep(self, yieldstep, finaltime) |
---|
| 97 | |
---|
| 98 | def update_ghosts(self): |
---|
| 99 | |
---|
| 100 | # We must send the information from the full cells and |
---|
| 101 | # receive the information for the ghost cells |
---|
| 102 | # We have a dictionary of lists with ghosts expecting updates from |
---|
| 103 | # the separate processors |
---|
| 104 | |
---|
| 105 | from Numeric import take,put |
---|
| 106 | import time |
---|
| 107 | t0 = time.time() |
---|
| 108 | |
---|
| 109 | stage_cv = self.quantities['stage'].centroid_values |
---|
| 110 | |
---|
| 111 | # update of non-local ghost cells |
---|
| 112 | for iproc in range(self.numproc): |
---|
| 113 | if iproc == self.processor: |
---|
| 114 | #Send data from iproc processor to other processors |
---|
| 115 | for send_proc in self.full_send_dict: |
---|
| 116 | if send_proc != iproc: |
---|
| 117 | |
---|
| 118 | Idf = self.full_send_dict[send_proc][0] |
---|
| 119 | Xout = self.full_send_dict[send_proc][2] |
---|
| 120 | |
---|
| 121 | N = len(Idf) |
---|
| 122 | |
---|
| 123 | #for i in range(N): |
---|
| 124 | # Xout[i,0] = stage_cv[Idf[i]] |
---|
| 125 | Xout[:,0] = take(stage_cv, Idf) |
---|
| 126 | |
---|
| 127 | pypar.send(Xout,send_proc) |
---|
| 128 | |
---|
| 129 | |
---|
| 130 | else: |
---|
| 131 | #Receive data from the iproc processor |
---|
| 132 | if self.ghost_recv_dict.has_key(iproc): |
---|
| 133 | |
---|
| 134 | # LINDA: |
---|
| 135 | # now store ghost as local id, global id, value |
---|
| 136 | Idg = self.ghost_recv_dict[iproc][0] |
---|
| 137 | X = self.ghost_recv_dict[iproc][2] |
---|
| 138 | |
---|
| 139 | X = pypar.receive(iproc,X) |
---|
| 140 | N = len(Idg) |
---|
| 141 | |
---|
| 142 | put(stage_cv, Idg, X[:,0]) |
---|
| 143 | #for i in range(N): |
---|
| 144 | # stage_cv[Idg[i]] = X[i,0] |
---|
| 145 | |
---|
| 146 | |
---|
| 147 | #local update of ghost cells |
---|
| 148 | iproc = self.processor |
---|
| 149 | if self.full_send_dict.has_key(iproc): |
---|
| 150 | |
---|
| 151 | # LINDA: |
---|
| 152 | # now store full as local id, global id, value |
---|
| 153 | Idf = self.full_send_dict[iproc][0] |
---|
| 154 | |
---|
| 155 | # LINDA: |
---|
| 156 | # now store ghost as local id, global id, value |
---|
| 157 | Idg = self.ghost_recv_dict[iproc][0] |
---|
| 158 | |
---|
| 159 | N = len(Idg) |
---|
| 160 | |
---|
| 161 | #for i in range(N): |
---|
| 162 | # #print i,Idg[i],Idf[i] |
---|
| 163 | # stage_cv[Idg[i]] = stage_cv[Idf[i]] |
---|
| 164 | |
---|
| 165 | put(stage_cv, Idg, take(stage_cv, Idf)) |
---|
| 166 | |
---|
| 167 | |
---|
| 168 | self.communication_time += time.time()-t0 |
---|
| 169 | |
---|
| 170 | |
---|
| 171 | def write_time(self): |
---|
| 172 | if self.min_timestep == self.max_timestep: |
---|
| 173 | print 'Processor %d, Time = %.4f, delta t = %.8f, steps=%d (%d)'\ |
---|
| 174 | %(self.processor, self.time, self.min_timestep, self.number_of_steps, |
---|
| 175 | self.number_of_first_order_steps) |
---|
| 176 | elif self.min_timestep > self.max_timestep: |
---|
| 177 | print 'Processor %d, Time = %.4f, steps=%d (%d)'\ |
---|
| 178 | %(self.processor, self.time, self.number_of_steps, |
---|
| 179 | self.number_of_first_order_steps) |
---|
| 180 | else: |
---|
| 181 | print 'Processor %d, Time = %.4f, delta t in [%.8f, %.8f], steps=%d (%d)'\ |
---|
| 182 | %(self.processor, self.time, self.min_timestep, |
---|
| 183 | self.max_timestep, self.number_of_steps, |
---|
| 184 | self.number_of_first_order_steps) |
---|
| 185 | |
---|
| 186 | |
---|
| 187 | |
---|
| 188 | def evolve(self, yieldstep = None, finaltime = None): |
---|
| 189 | """Specialisation of basic evolve method from parent class |
---|
| 190 | """ |
---|
| 191 | |
---|
| 192 | #Initialise real time viz if requested |
---|
| 193 | if self.time == 0.0: |
---|
| 194 | pass |
---|
| 195 | |
---|
| 196 | #Call basic machinery from parent class |
---|
| 197 | for t in Domain.evolve(self, yieldstep, finaltime): |
---|
| 198 | |
---|
| 199 | #Pass control on to outer loop for more specific actions |
---|
| 200 | yield(t) |
---|