source: trunk/anuga_core/source/anuga_parallel/parallel_structure_operator.py @ 8283

Last change on this file since 8283 was 8224, checked in by janes, 13 years ago

PETE: Adding parallel fractional operator code

File size: 19.7 KB
Line 
1import anuga
2import numpy as num
3import math
4import parallel_inlet_enquiry 
5
6from anuga.utilities.system_tools import log_to_file
7from anuga.utilities.numerical_tools import ensure_numeric
8from anuga.structures.inlet_enquiry import Inlet_enquiry
9import pypar
10
11class Parallel_Structure_operator(anuga.Operator):
12    """Parallel Structure Operator - transfer water from one rectangular box to another.
13    Sets up the geometry of problem
14   
15    This is the base class for structures (culverts, pipes, bridges etc) that exist across multiple
16    parallel shallow water domains. Inherit from this class (and overwrite discharge_routine method
17    for specific subclasses)
18   
19    Input: Two points, pipe_size (either diameter or width, depth),
20    mannings_rougness,
21    """ 
22
23    counter = 0
24
25    """
26     ===========================================================================================
27     PETE: Inputs to this constructor are identical to the serial
28     structure operator, except for the following arguments:
29     master_proc - the processor that coordinates all processors (with domains) associated with this structure [INT]
30     procs - the list of processors associated with thisstructure (List[INT])
31     inlet_master_proc - master_proc of the first and second inlet (List[2])
32     inlet_procs - list of processors associated with the first and second inlet (LIST[2][INT])
33     enquiry_proc - processor associated the first and second enquiry point (List[2])
34    """
35
36    def __init__(self,
37                 domain,
38                 end_points,
39                 exchange_lines,
40                 enquiry_points,
41                 width,
42                 height,
43                 apron,
44                 manning,
45                 enquiry_gap,
46                 description,
47                 label,
48                 structure_type,
49                 logging,
50                 verbose,
51                 master_proc = 0,
52                 procs = None,
53                 inlet_master_proc = [0,0],
54                 inlet_procs = None,
55                 enquiry_proc = None):
56
57        anuga.Operator.__init__(self,domain)
58
59        # Allocate default processor associations if not specified in arguments
60        # although we assume that such associations are provided correctly by the
61        # parallel_operator_factory.
62
63        self.master_proc = master_proc
64        self.inlet_master_proc = inlet_master_proc
65       
66        if procs is None:
67            self.procs = [master_proc]
68        else:
69            self.procs = procs
70
71        if inlet_procs is None:
72            self.inlet_procs = [[inlet_master_proc[0]],[inlet_master_proc[0]]]
73        else:
74            self.inlet_procs = inlet_procs
75
76        if enquiry_proc is None:
77            self.enquiry_proc = [[inlet_master_proc[0]],[inlet_master_proc[0]]]
78        else:
79            self.enquiry_proc = enquiry_proc
80
81        self.end_points = ensure_numeric(end_points)
82        self.exchange_lines = ensure_numeric(exchange_lines)
83        self.enquiry_points = ensure_numeric(enquiry_points)
84        self.myid = pypar.rank()
85        self.num_procs = pypar.size()
86       
87        if height is None:
88            height = width
89
90        if apron is None:
91            apron = width
92
93        self.width  = width
94        self.height = height
95        self.apron  = apron
96        self.manning = manning
97        self.enquiry_gap = enquiry_gap
98
99        if description == None:
100            self.description = ' '
101        else:
102            self.description = description
103       
104        if label == None:
105            self.label = "structure_%g" % Parallel_Structure_operator.counter + "_P" + str(self.myid)
106        else:
107            self.label = label + '_%g' % Parallel_Structure_operator.counter + "_P" + str(self.myid)
108
109        if structure_type == None:
110            self.structure_type = 'generic structure'
111        else:
112            self.structure_type = structure_type
113           
114        self.verbose = verbose       
115       
116        # Keep count of structures
117        if self.myid == master_proc:
118            Parallel_Structure_operator.counter += 1
119
120        # Slots for recording current statistics
121        self.discharge = 0.0
122        self.velocity = 0.0
123        self.delta_total_energy = 0.0
124        self.driving_energy = 0.0
125       
126        if exchange_lines is not None:
127            self.__process_skew_culvert()
128
129        elif end_points is not None:
130            self.__process_non_skew_culvert()
131        else:
132            raise Exception, 'Define either exchange_lines or end_points'
133       
134        self.inlets = []
135
136        # Allocate parallel inlet enquiry, assign None if processor is not associated with particular
137        # inlet.
138
139        if self.myid in self.inlet_procs[0]:
140            line0 = self.exchange_lines[0] 
141            enquiry_point0 = self.enquiry_points[0]
142            outward_vector0 = self.culvert_vector
143
144            self.inlets.append(parallel_inlet_enquiry.Parallel_Inlet_enquiry(self.domain, line0,
145                               enquiry_point0, self.inlet_master_proc[0], self.inlet_procs[0], 
146                               self.enquiry_proc[0], outward_vector0, self.verbose))
147        else:
148            self.inlets.append(None)
149
150        if self.myid in self.inlet_procs[1]:
151            line1 = self.exchange_lines[1]
152            enquiry_point1 = self.enquiry_points[1]
153            outward_vector1  = - self.culvert_vector
154
155            self.inlets.append(parallel_inlet_enquiry.Parallel_Inlet_enquiry(self.domain, line1,
156                               enquiry_point1, self.inlet_master_proc[1],
157                               self.inlet_procs[1], self.enquiry_proc[1], outward_vector1, self.verbose))
158        else:
159            self.inlets.append(None)
160
161        self.inflow_index = 0
162        self.outflow_index = 1
163
164        self.set_logging(logging)
165
166    def __call__(self):
167
168        timestep = self.domain.get_timestep()
169
170        Q, barrel_speed, outlet_depth = self.discharge_routine()
171
172        # Get attributes of Inflow inlet, all procs associated with inlet must call
173        if self.myid in self.inlet_procs[self.inflow_index]:
174            old_inflow_depth = self.inlets[self.inflow_index].get_global_average_depth()
175            old_inflow_stage = self.inlets[self.inflow_index].get_global_average_stage()
176            old_inflow_xmom = self.inlets[self.inflow_index].get_global_average_xmom()
177            old_inflow_ymom = self.inlets[self.inflow_index].get_global_average_ymom()
178            inflow_area = self.inlets[self.inflow_index].get_global_area()
179
180        # Master proc of inflow inlet sends attributes to master proc of structure
181        if self.myid == self.master_proc:
182            if self.myid != self.inlet_master_proc[self.inflow_index]:
183                old_inflow_depth = pypar.receive(self.inlet_master_proc[self.inflow_index])
184                old_inflow_stage = pypar.receive(self.inlet_master_proc[self.inflow_index])
185                old_inflow_xmom = pypar.receive(self.inlet_master_proc[self.inflow_index])
186                old_inflow_ymom = pypar.receive(self.inlet_master_proc[self.inflow_index])
187                inflow_area = pypar.receive(self.inlet_master_proc[self.inflow_index])
188        elif self.myid == self.inlet_master_proc[self.inflow_index]:
189            pypar.send(old_inflow_depth, self.master_proc)
190            pypar.send(old_inflow_stage, self.master_proc)
191            pypar.send(old_inflow_xmom, self.master_proc)
192            pypar.send(old_inflow_ymom, self.master_proc)
193            pypar.send(inflow_area, self.master_proc)
194
195        # Implement the update of flow over a timestep by
196        # using a semi-implict update. This ensures that
197        # the update does not create a negative depth
198       
199        # Master proc of structure only
200        if self.myid == self.master_proc:
201            if old_inflow_depth > 0.0 :
202                Q_star = Q/old_inflow_depth
203            else:
204                Q_star = 0.0
205
206            factor = 1.0/(1.0 + Q_star*timestep/inflow_area)
207
208            new_inflow_depth = old_inflow_depth*factor
209            new_inflow_xmom = old_inflow_xmom*factor
210            new_inflow_ymom = old_inflow_ymom*factor
211
212        # Master proc of structure sends new inflow attributes to all inflow inlet processors
213
214        if self.myid == self.master_proc:
215            for i in self.inlet_procs[self.inflow_index]:
216                if i == self.master_proc: continue
217                pypar.send(new_inflow_depth, i)
218                pypar.send(new_inflow_xmom, i)
219                pypar.send(new_inflow_ymom, i)
220        elif self.myid in self.inlet_procs[self.inflow_index]:
221            new_inflow_depth = pypar.receive(self.master_proc)
222            new_inflow_xmom = pypar.receive(self.master_proc)
223            new_inflow_ymom = pypar.receive(self.master_proc)
224
225        # Inflow inlet procs sets new attributes
226        if self.myid in self.inlet_procs[self.inflow_index]:
227            self.inlets[self.inflow_index].set_depths(new_inflow_depth)
228            self.inlets[self.inflow_index].set_xmoms(new_inflow_xmom)
229            self.inlets[self.inflow_index].set_ymoms(new_inflow_ymom)
230
231        # Get outflow inlet attributes, all processors associated with outflow inlet must call
232        if self.myid in self.inlet_procs[self.outflow_index]:
233            outflow_area = self.inlets[self.outflow_index].get_global_area()
234            outflow_average_depth = self.inlets[self.outflow_index].get_global_average_depth()
235            outflow_outward_culvert_vector = self.inlets[self.outflow_index].outward_culvert_vector
236
237        # Master proc of outflow inlet sends attribute to master proc of structure
238        if self.myid == self.master_proc:
239            if self.myid != self.inlet_master_proc[self.outflow_index]:
240                outflow_area = pypar.receive(self.inlet_master_proc[self.outflow_index])
241                outflow_average_depth = pypar.receive(self.inlet_master_proc[self.outflow_index])
242                outflow_outward_culvert_vector = pypar.receive(self.inlet_master_proc[self.outflow_index])
243        elif self.myid == self.inlet_master_proc[self.outflow_index]:
244            pypar.send(outflow_area, self.master_proc)
245            pypar.send(outflow_average_depth, self.master_proc)
246            pypar.send(outflow_outward_culvert_vector, self.master_proc)
247
248        # Master proc of structure computes new outflow attributes
249        if self.myid == self.master_proc:
250            loss = (old_inflow_depth - new_inflow_depth)*inflow_area
251
252            # set outflow
253            if old_inflow_depth > 0.0 :
254                timestep_star = timestep*new_inflow_depth/old_inflow_depth
255            else:
256                timestep_star = 0.0
257
258            outflow_extra_depth = Q*timestep_star/outflow_area
259            outflow_direction = - outflow_outward_culvert_vector
260            outflow_extra_momentum = outflow_extra_depth*barrel_speed*outflow_direction
261           
262            gain = outflow_extra_depth*outflow_area
263
264        # Update Stats
265            self.discharge  = Q#outflow_extra_depth*self.outflow.get_area()/timestep
266            self.velocity = barrel_speed#self.discharge/outlet_depth/self.width
267
268            new_outflow_depth = outflow_average_depth + outflow_extra_depth
269
270            if self.use_momentum_jet :
271                # FIXME (SR) Review momentum to account for possible hydraulic jumps at outlet
272                #new_outflow_xmom = outflow.get_average_xmom() + outflow_extra_momentum[0]
273                #new_outflow_ymom = outflow.get_average_ymom() + outflow_extra_momentum[1]
274
275                new_outflow_xmom = barrel_speed*new_outflow_depth*outflow_direction[0]
276                new_outflow_ymom = barrel_speed*new_outflow_depth*outflow_direction[1]
277
278            else:
279                #new_outflow_xmom = outflow.get_average_xmom()
280                #new_outflow_ymom = outflow.get_average_ymom()
281
282                new_outflow_xmom = 0.0
283                new_outflow_ymom = 0.0
284
285            # master proc of structure sends outflow attributes to all outflow procs
286            for i in self.inlet_procs[self.outflow_index]:
287                if i == self.myid: continue
288                pypar.send(new_outflow_depth, i)
289                pypar.send(new_outflow_xmom, i)
290                pypar.send(new_outflow_ymom, i)
291        # outflow inlet procs receives new outflow attributes
292        elif self.myid in self.inlet_procs[self.outflow_index]:
293            new_outflow_depth = pypar.receive(self.master_proc)
294            new_outflow_xmom = pypar.receive(self.master_proc)
295            new_outflow_ymom = pypar.receive(self.master_proc)
296
297        # outflow inlet procs sets new outflow attributes
298        if self.myid in self.inlet_procs[self.outflow_index]:
299            self.inlets[self.outflow_index].set_depths(new_outflow_depth)
300            self.inlets[self.outflow_index].set_xmoms(new_outflow_xmom)
301            self.inlets[self.outflow_index].set_ymoms(new_outflow_ymom)
302
303    def __process_non_skew_culvert(self):
304        """Create lines at the end of a culvert inlet and outlet.
305        At either end two lines will be created; one for the actual flow to pass through and one a little further away
306        for enquiring the total energy at both ends of the culvert and transferring flow.
307        """
308       
309        self.culvert_vector = self.end_points[1] - self.end_points[0]
310        self.culvert_length = math.sqrt(num.sum(self.culvert_vector**2))   
311        assert self.culvert_length > 0.0, 'The length of culvert is less than 0'
312       
313        self.culvert_vector /= self.culvert_length
314       
315        culvert_normal = num.array([-self.culvert_vector[1], self.culvert_vector[0]])  # Normal vector
316        w = 0.5*self.width*culvert_normal # Perpendicular vector of 1/2 width
317
318        self.exchange_lines = []
319
320        # Build exchange polyline and enquiry point
321        if self.enquiry_points is None:
322           
323            gap = (self.apron + self.enquiry_gap)*self.culvert_vector
324            self.enquiry_points = []
325           
326            for i in [0, 1]:
327                p0 = self.end_points[i] + w
328                p1 = self.end_points[i] - w
329                self.exchange_lines.append(num.array([p0, p1]))
330                ep = self.end_points[i] + (2*i - 1)*gap #(2*i - 1) determines the sign of the points
331                self.enquiry_points.append(ep)
332           
333        else:           
334            for i in [0, 1]:
335                p0 = self.end_points[i] + w
336                p1 = self.end_points[i] - w
337                self.exchange_lines.append(num.array([p0, p1]))
338           
339 
340    def __process_skew_culvert(self):   
341       
342        """Compute skew culvert.
343        If exchange lines are given, the enquiry points are determined. This is for enquiring
344        the total energy at both ends of the culvert and transferring flow.
345        """
346           
347        centre_point0 = 0.5*(self.exchange_lines[0][0] + self.exchange_lines[0][1])
348        centre_point1 = 0.5*(self.exchange_lines[1][0] + self.exchange_lines[1][1])
349       
350        if self.end_points is None:
351            self.culvert_vector = centre_point1 - centre_point0
352        else:
353            self.culvert_vector = self.end_points[1] - self.end_points[0]
354       
355        self.culvert_length = math.sqrt(num.sum(self.culvert_vector**2))
356        assert self.culvert_length > 0.0, 'The length of culvert is less than 0'
357       
358        if self.enquiry_points is None:
359       
360            self.culvert_vector /= self.culvert_length
361            gap = (self.apron + self.enquiry_gap)*self.culvert_vector
362       
363            self.enquiry_points = []
364
365            self.enquiry_points.append(centre_point0 - gap)
366            self.enquiry_points.append(centre_point1 + gap)
367           
368
369    def discharge_routine(self):
370
371        msg = 'Need to impelement '
372        raise
373           
374
375    def statistics(self):
376        # Warning: requires synchronization, must be called by all procs associated
377        # with this structure
378
379        message = ' '
380
381        if self.myid == self.master_proc:
382
383            message  = '===============================================\n'
384            message += 'Parallel Structure Operator: %s\n' % self.label
385            message += '===============================================\n'
386
387            message += 'Structure Type: %s\n' % self.structure_type
388
389            message += 'Description\n'
390            message += '%s' % self.description
391            message += '\n'
392       
393        for i, inlet in enumerate(self.inlets):
394            if self.myid == self.master_proc:
395                message += '-------------------------------------\n'
396                message +=  'Inlet %i\n' %(i)
397                message += '-------------------------------------\n'
398
399            if inlet is not None: stats = inlet.statistics()
400
401            if self.myid == self.master_proc:
402                if self.myid != self.inlet_master_proc[i]:
403                    stats = pypar.receive(self.inlet_master_proc[i])                   
404            elif self.myid == self.inlet_master_proc[i]:
405                pypar.send(stats, self.master_proc)
406
407            if self.myid == self.master_proc: message += stats
408 
409
410        if self.myid == self.master_proc: message += '=====================================\n'
411
412        return message
413
414
415    def print_statistics(self):
416        # Warning: requires synchronization, must be called by all procs associated
417        # with this structure
418
419        print self.statistics()
420
421
422    def print_timestepping_statistics(self):
423        # Warning: must be called by the master proc of this structure to obtain
424        # meaningful output
425
426        message = ' '
427
428        if self.myid == self.master_proc:
429            message = '--------------------------------------------------\n'
430            message += 'Parallel Structure report for %s:\n' % self.label
431            message += '-------------------------------------------------\n'
432            message += 'Type: %s\n' % self.structure_type
433            message += 'Discharge [m^3/s]: %.2f\n' % self.discharge
434            message += 'Velocity  [m/s]: %.2f\n' % self.velocity
435            message += 'Inlet Driving Energy %.2f\n' % self.driving_energy
436            message += 'Delta Total Energy %.2f\n' % self.delta_total_energy
437            message += 'Control at this instant: %s\n' % self.case
438
439        print message
440
441
442    def set_logging(self, flag=True):
443        # Warning: requires synchronization, must be called by all procs associated
444        # with this structure
445
446        stats = self.statistics()
447        self.logging = flag
448
449        # If flag is true open file with mode = "w" to form a clean file for logging
450        if self.logging:
451            self.log_filename = self.label + '.log'
452            log_to_file(self.log_filename, stats, mode='w')
453            log_to_file(self.log_filename, 'time,discharge,velocity,driving_energy,delta_total_energy')
454
455            #log_to_file(self.log_filename, self.culvert_type)
456
457
458    def timestepping_statistics(self):
459
460        message  = '%.5f, ' % self.domain.get_time()
461        message += '%.5f, ' % self.discharge
462        message += '%.5f, ' % self.velocity
463        message += '%.5f, ' % self.driving_energy
464        message += '%.5f' % self.delta_total_energy
465
466        return message
467
468
469    def get_inlets(self):
470       
471        return self.inlets
472       
473       
474    def get_culvert_length(self):
475       
476        return self.culvert_length
477       
478       
479    def get_culvert_width(self):       
480        return self.width
481       
482       
483    def get_culvert_diameter(self):
484        return self.width
485       
486       
487    def get_culvert_height(self):
488        return self.height
489
490
491    def get_culvert_apron(self):
492        return self.apron
493
494    # Get id of master proc of this structure
495    def get_master_proc(self):
496        return self.master_proc
497
498    # Get id of master proc of first and second inlet
499    def get_inlet_master_proc(self):
500        return self.inlet_master_proc
501
502    # Get id of processors associated with first and second inlet enquiry points
503    def get_enquiry_proc(self):
504        return self.enquiry_proc
505
506    def __parallel_safe(self):
507        return True
Note: See TracBrowser for help on using the repository browser.