source: trunk/anuga_core/source/anuga_parallel/parallel_inlet.py @ 8283

Last change on this file since 8283 was 8227, checked in by janes, 13 years ago

Added additional comments for parallel fractional operator codes

File size: 15.5 KB
Line 
1# To change this template, choose Tools | Templates
2# and open the template in the editor.
3
4import anuga.geometry.polygon
5from anuga.geometry.polygon import inside_polygon, is_inside_polygon, line_intersect
6from anuga.config import velocity_protection, g
7import math
8
9import numpy as num
10from anuga.structures.inlet import Inlet
11import pypar
12
13class Parallel_Inlet(Inlet):
14    """Contains information associated with each inlet
15    """
16
17    """
18    Parallel inlet:
19   
20    master_proc - coordinates all processors associated with this inlet
21    usually the processors with domains which contains parts of this inlet.
22   
23    procs - is the list of all processors associated with this inlet.
24
25    (We assume that the above arguments are determined correctly by the parallel_operator_factory)
26    """
27
28    def __init__(self, domain, line, master_proc = 0, procs = None, verbose=False):
29
30        self.domain = domain
31        self.line = line
32        self.verbose = verbose
33        self.master_proc = master_proc
34
35        if procs is None:
36            self.procs = [self.master_proc]
37        else:
38            self.procs = procs
39
40        self.myid = pypar.rank()
41
42        self.compute_triangle_indices()
43        self.compute_area()
44        self.compute_inlet_length()
45
46
47
48    def compute_triangle_indices(self):
49
50        # Get boundary of full triangles (in absolute coordinates)
51        vertex_coordinates = self.domain.get_full_vertex_coordinates(absolute=True)
52
53        # PETE: we can assume that the ghost triangles in this domain exist in another
54        # domain, therefore any part of the inlet corresponding to them are accounted for.
55
56        self.triangle_indices = line_intersect(vertex_coordinates, self.line)
57
58        for i in self.triangle_indices:
59            assert self.domain.tri_full_flag[i] == 1
60
61    def compute_area(self):
62
63        # Compute inlet area as the sum of areas of triangles identified
64        # by line. Must be called after compute_inlet_triangle_indices().
65        if len(self.triangle_indices) == 0:
66            region = 'Inlet line=%s' % (self.line)
67            msg = 'No triangles have been identified in region '
68            print "WARNING: " + msg
69
70        self.area = 0.0
71        for j in self.triangle_indices:
72            self.area += self.domain.areas[j]
73
74        msg = 'Inlet exchange area has area = %f' % self.area
75        assert self.area >= 0.0
76
77    def compute_inlet_length(self):
78        """ Compute the length of the inlet within this domain (as
79        defined by the input line
80        """
81
82        point0 = self.line[0]
83        point1 = self.line[1]
84
85        self.inlet_length = anuga.geometry.polygon.line_length(self.line)
86
87
88    def get_inlet_length(self):
89        # LOCAL
90        msg = "Warning: compute_inlet_length not implemented"
91        warnings.warn(msg)
92        return self.inlet_length
93
94    def get_line(self):
95        return self.line
96
97    def get_area(self):
98        # LOCAL
99        return self.area
100
101    def get_global_area(self):
102        # GLOBAL: Master processor gathers area from all child processors, and returns value
103
104        # WARNING: requires synchronization, must be called by all procs associated
105        # with this inlet
106       
107        local_area = self.area
108        area = local_area
109
110        if self.myid == self.master_proc:
111
112            for i in self.procs:
113                if i == self.master_proc: continue
114
115                val = pypar.receive(i)
116                area = area + val
117        else:
118            pypar.send(area, self.master_proc)
119
120        return area
121
122
123    def get_areas(self):
124        # Must be called after compute_inlet_triangle_indices().
125        # LOCAL
126       
127        return self.domain.areas.take(self.triangle_indices)
128
129
130    def get_stages(self):
131        # LOCAL
132
133        return self.domain.quantities['stage'].centroid_values.take(self.triangle_indices)
134
135
136    def get_average_stage(self):
137        # LOCAL
138
139        return num.sum(self.get_stages()*self.get_areas())/self.area
140
141    def get_global_average_stage(self):
142        # GLOBAL: Master processor gathers stages from all child processors, and returns average
143
144        # WARNING: requires synchronization, must be called by all procs associated
145        # with this inlet
146
147        local_stage = num.sum(self.get_stages()*self.get_areas())
148        global_area = self.get_global_area()
149        global_stage = local_stage
150
151        if self.myid == self.master_proc:
152            for i in self.procs:
153                if i == self.master_proc: continue
154
155                val = pypar.receive(i)
156                global_stage = global_stage + val
157        else:
158            pypar.send(local_stage, self.master_proc)
159
160
161        return global_stage/global_area
162
163    def get_elevations(self):
164        # LOCAL
165        return self.domain.quantities['elevation'].centroid_values.take(self.triangle_indices)
166
167    def get_average_elevation(self):
168        # LOCAL
169        return num.sum(self.get_elevations()*self.get_areas())/self.area
170
171
172    def get_xmoms(self):
173        # LOCAL
174        return self.domain.quantities['xmomentum'].centroid_values.take(self.triangle_indices)
175
176
177    def get_average_xmom(self):
178        # LOCAL
179        return num.sum(self.get_xmoms()*self.get_areas())/self.area
180
181    def get_global_average_xmom(self):
182        # GLOBAL: master proc gathers all xmom values and returns average
183        # WARNING: requires synchronization, must be called by all procs associated
184        # with this inlet
185
186        global_area = self.get_global_area()
187        local_xmoms = num.sum(self.get_xmoms()*self.get_areas())
188        global_xmoms = local_xmoms
189
190        if self.myid == self.master_proc:
191            for i in self.procs:
192                if i == self.master_proc: continue
193
194                val = pypar.receive(i)
195                global_xmoms = global_xmoms + val
196        else:
197            pypar.send(local_xmoms, self.master_proc)
198
199
200        return global_xmoms/global_area
201
202    def get_ymoms(self):
203        # LOCAL
204        return self.domain.quantities['ymomentum'].centroid_values.take(self.triangle_indices)
205
206
207    def get_average_ymom(self):
208        # LOCAL
209        return num.sum(self.get_ymoms()*self.get_areas())/self.area
210
211    def get_global_average_ymom(self):
212        # GLOBAL: master proc gathers all ymom values and returns average
213        # WARNING: requires synchronization, must be called by all procs associated
214        # with this inlet
215
216        global_area = self.get_global_area()
217        local_ymoms = num.sum(self.get_ymoms()*self.get_areas())
218        global_ymoms = local_ymoms
219
220        if self.myid == self.master_proc:
221            for i in self.procs:
222                if i == self.master_proc: continue
223
224                val = pypar.receive(i)
225                global_ymoms = global_ymoms + val
226        else:
227            pypar.send(local_ymoms, self.master_proc)
228
229
230        return global_ymoms/global_area
231
232    def get_depths(self):
233        # LOCAL
234        return self.get_stages() - self.get_elevations()
235
236
237    def get_total_water_volume(self):
238        # LOCAL
239       return num.sum(self.get_depths()*self.get_areas())
240
241    def get_global_total_water_volume(self):
242        # GLOBAL: master proc gathers total water volumes from each proc and returns average
243        # WARNING: requires synchronization, must be called by all procs associated
244        # with this inlet
245
246        local_volume = num.sum(self.get_depths()*self.get_areas())
247        volume = local_volume
248
249        if self.myid == self.master_proc:
250
251            for i in self.procs:
252                if i == self.master_proc: continue
253
254                val = pypar.receive(i)
255                volume = volume + val
256        else:
257            pypar.send(volume, self.master_proc)
258
259        return volume
260
261    def get_average_depth(self):
262        # LOCAL
263        return self.get_total_water_volume()/self.area
264
265    def get_global_average_depth(self):
266        # GLOBAL: master proc gathers all depth values and returns average
267        # WARNING: requires synchronization, must be called by all procs associated
268        # with this inlet
269       
270        area = self.get_global_area()
271        total_water_volume = self.get_global_total_water_volume()
272
273        return total_water_volume / area
274
275
276    def get_velocities(self):
277        #LOCAL
278            depths = self.get_depths()
279            u = self.get_xmoms()/(depths + velocity_protection/depths)
280            v = self.get_ymoms()/(depths + velocity_protection/depths)
281
282            return u, v
283
284
285    def get_xvelocities(self):
286        #LOCAL
287            depths = self.get_depths()
288            return self.get_xmoms()/(depths + velocity_protection/depths)
289
290    def get_yvelocities(self):
291        #LOCAL
292            depths = self.get_depths()
293            return self.get_ymoms()/(depths + velocity_protection/depths)
294
295
296    def get_average_speed(self):
297        #LOCAL
298            u, v = self.get_velocities()
299
300            average_u = num.sum(u*self.get_areas())/self.area
301            average_v = num.sum(v*self.get_areas())/self.area
302
303            return math.sqrt(average_u**2 + average_v**2)
304
305
306    def get_average_velocity_head(self):
307        #LOCAL
308        return 0.5*self.get_average_speed()**2/g
309
310
311    def get_average_total_energy(self):
312        #LOCAL
313        return self.get_average_velocity_head() + self.get_average_stage()
314
315
316    def get_average_specific_energy(self):
317        #LOCAL
318        return self.get_average_velocity_head() + self.get_average_depth()
319
320
321# Set routines (ALL LOCAL)
322
323    def set_depths(self,depth):
324
325        self.domain.quantities['stage'].centroid_values.put(self.triangle_indices, self.get_elevations() + depth)
326
327
328    def set_stages(self,stage):
329
330        self.domain.quantities['stage'].centroid_values.put(self.triangle_indices, stage)
331
332
333    def set_xmoms(self,xmom):
334
335        self.domain.quantities['xmomentum'].centroid_values.put(self.triangle_indices, xmom)
336
337
338    def set_ymoms(self,ymom):
339
340        self.domain.quantities['ymomentum'].centroid_values.put(self.triangle_indices, ymom)
341
342
343    def set_elevations(self,elevation):
344
345        self.domain.quantities['elevation'].centroid_values.put(self.triangle_indices, elevation)
346
347
348    def set_stages_evenly(self,volume):
349        """ Distribute volume of water over
350        inlet exchange region so that stage is level
351        """
352        # WARNING: requires synchronization, must be called by all procs associated
353        # with this inlet
354
355        centroid_coordinates = self.domain.get_full_centroid_coordinates(absolute=True)
356        areas = self.get_areas()
357        stages = self.get_stages()
358
359        stages_order = stages.argsort()
360
361        # PETE: send stages and areas, apply merging procedure
362
363        s_areas = {}
364        s_stages = {}
365        s_stages_order = {}
366        total_stages = len(stages)
367
368        if self.myid == self.master_proc:
369            s_areas[self.myid] = areas
370            s_stages[self.myid] = stages
371            s_stages_order[self.myid] = stages_order
372     
373            # Recieve areas, stages, and stages order
374            for i in self.procs:
375                if i != self.master_proc:
376                    s_areas[i] = pypar.receive(i)
377                    s_stages[i] = pypar.receive(i)
378                    s_stages_order[i] = pypar.receive(i)
379                    total_stages = total_stages + len(s_stages[i])
380
381        else:
382            # Send areas, stages, and stages order to master proc of inlet
383            pypar.send(areas, self.master_proc)
384            pypar.send(stages, self.master_proc)
385            pypar.send(stages_order, self.master_proc)
386
387        # merge sorted stage order
388        if self.myid == self.master_proc:
389            pos = {}
390            summed_volume = 0.
391            summed_areas = 0.
392            prev_stage = 0.
393            num_stages = 0.
394            first = True
395           
396            for i in self.procs:
397                pos[i] = 0
398
399            while num_stages < total_stages:
400                # Determine current minimum stage of all the processors in s_stages
401                num_stages = num_stages + 1
402                current_stage = num.finfo(num.float32).max
403                index = -1
404
405                for i in self.procs:
406                    if pos[i] >= len(s_stages[i]):
407                        continue
408                       
409                    if s_stages[i][s_stages_order[i][pos[i]]] < current_stage:
410                        current_stage = s_stages[i][s_stages_order[i][pos[i]]]
411                        index = i
412
413                # If first iteration, then only update summed_areas, position, and prev|current stage
414               
415                if first:
416                    first = False
417                    summed_areas = s_areas[index][s_stages_order[index][pos[index]]]
418                    pos[index] = pos[index] + 1
419                    prev_stage = current_stage
420                    continue
421
422                assert index >= 0, "Index out of bounds"
423
424                # Update summed volume and summed areas
425                tmp_volume = summed_volume + (summed_areas * (current_stage - prev_stage))
426
427                # Terminate if volume exceeded
428                if tmp_volume >= volume:
429                    break
430
431                summed_areas = summed_areas + s_areas[index][s_stages_order[index][pos[index]]]
432                pos[index] = pos[index] + 1
433                summed_volume = tmp_volume
434               
435                # Update position of index processor and current stage
436                prev_stage = current_stage
437
438            # Calculate new stage
439            new_stage = prev_stage + (volume - summed_volume) / summed_areas
440
441            # Send postion and new stage to all processors
442            for i in self.procs:
443                if i != self.master_proc:
444                    pypar.send(pos[i], i)
445                    pypar.send(new_stage, i)
446
447            # Update own depth
448            stages[stages_order[0:pos[self.myid]]] = new_stage
449        else:
450            pos = pypar.receive(self.master_proc)
451            new_stage = pypar.receive(self.master_proc)
452            stages[stages_order[0:pos]] = new_stage
453
454        self.set_stages(stages)
455
456        stages = self.get_stages()
457        stages_order = stages.argsort()
458
459    def set_depths_evenly(self,volume):
460        """ Distribute volume over all exchange
461        cells with equal depth of water
462        """
463        new_depth = self.get_average_depth() + (volume/self.get_area())
464        self.set_depths(new_depth)
465
466    def get_master_proc(self):
467        return self.master_proc
468
469    def __parallel_safe(self):
470
471        return True
472
473    def statistics(self):
474        # WARNING: requires synchronization, must be called by all procs associated
475        # with this inlet
476
477        message = ''
478
479        tri_indices = {}
480
481        if self.myid == self.master_proc:
482            tri_indices[self.myid] = self.triangle_indices
483           
484            for proc in self.procs:
485                if proc == self.master_proc: continue
486               
487                tri_indices[proc] = pypar.receive(proc)
488
489        else:
490            pypar.send(self.triangle_indices, self.master_proc)
491
492
493        if self.myid == self.master_proc:
494            message += '=====================================\n'
495            message +=  'Inlet\n' 
496            message += '=====================================\n'
497
498            for proc in self.procs:
499                message += '======> inlet triangle indices and centres at P%d\n' %(proc)
500                message += '%s' % tri_indices[proc]
501                message += '\n'
502               
503                message += '%s' % self.domain.get_centroid_coordinates()[tri_indices[proc]]
504                message += '\n'
505               
506            message += 'line\n'
507            message += '%s' % self.line
508            message += '\n'
509
510        return message
511
512__author__="pete"
513__date__ ="$16/08/2011 6:49:42 PM$"
514
515if __name__ == "__main__":
516    print "Hello World"
Note: See TracBrowser for help on using the repository browser.