source: trunk/anuga_core/source/anuga_parallel/parallel_inlet.py @ 8877

Last change on this file since 8877 was 8875, checked in by steve, 12 years ago

Setting up parallel_structure to allow enquiry of data

File size: 16.2 KB
Line 
1# To change this template, choose Tools | Templates
2# and open the template in the editor.
3
4import anuga.geometry.polygon
5from anuga.geometry.polygon import inside_polygon, is_inside_polygon, line_intersect
6from anuga.config import velocity_protection, g
7import math
8
9import numpy as num
10from anuga.structures.inlet import Inlet
11
12
13class Parallel_Inlet(Inlet):
14    """Contains information associated with each inlet
15    """
16
17    """
18    Parallel inlet:
19   
20    master_proc - coordinates all processors associated with this inlet
21    usually the processors with domains which contains parts of this inlet.
22   
23    procs - is the list of all processors associated with this inlet.
24
25    (We assume that the above arguments are determined correctly by the parallel_operator_factory)
26    """
27
28    def __init__(self, domain, poly, master_proc = 0, procs = None, verbose=False):
29
30        self.domain = domain
31        self.poly = num.asarray(poly, dtype=num.float64)
32        self.verbose = verbose
33
34        self.line = True
35        if len(self.poly) > 2:
36            self.line = False
37
38        self.master_proc = master_proc
39
40        if procs is None:
41            self.procs = [self.master_proc]
42        else:
43            self.procs = procs
44
45        import pypar
46        self.myid = pypar.rank()
47
48        self.compute_triangle_indices()
49        self.compute_area()
50        #self.compute_inlet_length()
51
52
53
54    def compute_triangle_indices(self):
55
56
57        domain_centroids = self.domain.get_centroid_coordinates(absolute=True)
58        vertex_coordinates = self.domain.get_full_vertex_coordinates(absolute=True)
59
60
61        if self.line: # poly is a line
62            self.triangle_indices = line_intersect(vertex_coordinates, self.poly)
63
64        else: # poly is a polygon
65
66            self.triangle_indices = inside_polygon(domain_centroids, self.poly)
67
68
69        for i in self.triangle_indices:
70            assert self.domain.tri_full_flag[i] == 1
71           
72
73    def compute_area(self):
74
75        # Compute inlet area as the sum of areas of triangles identified
76        # by line. Must be called after compute_inlet_triangle_indices().
77        if len(self.triangle_indices) == 0:
78            region = 'Inlet line=%s' % (self.line)
79            msg = 'No triangles have been identified in region '
80            print "WARNING: " + msg
81
82        self.area = 0.0
83        for j in self.triangle_indices:
84            self.area += self.domain.areas[j]
85
86        msg = 'Inlet exchange area has area = %f' % self.area
87        assert self.area >= 0.0
88
89    def compute_inlet_length(self):
90        """ Compute the length of the inlet within this domain (as
91        defined by the input line
92        """
93
94        point0 = self.line[0]
95        point1 = self.line[1]
96
97        self.inlet_length = anuga.geometry.polygon.line_length(self.line)
98
99
100    def get_inlet_length(self):
101        # LOCAL
102        msg = "Warning: compute_inlet_length not implemented"
103        warnings.warn(msg)
104        return self.inlet_length
105
106    def get_line(self):
107        return self.line
108
109    def get_area(self):
110        # LOCAL
111        return self.area
112
113    def get_global_area(self):
114        # GLOBAL: Master processor gathers area from all child processors, and returns value
115
116        # WARNING: requires synchronization, must be called by all procs associated
117        # with this inlet
118
119        import pypar
120        local_area = self.area
121        area = local_area
122
123        if self.myid == self.master_proc:
124
125            for i in self.procs:
126                if i == self.master_proc: continue
127
128                val = pypar.receive(i)
129                area = area + val
130        else:
131            pypar.send(area, self.master_proc)
132
133        return area
134
135
136    def get_areas(self):
137        # Must be called after compute_inlet_triangle_indices().
138        # LOCAL
139       
140        return self.domain.areas.take(self.triangle_indices)
141
142
143    def get_stages(self):
144        # LOCAL
145
146        return self.domain.quantities['stage'].centroid_values.take(self.triangle_indices)
147
148
149    def get_average_stage(self):
150        # LOCAL
151
152        return num.sum(self.get_stages()*self.get_areas())/self.area
153
154    def get_global_average_stage(self):
155        # GLOBAL: Master processor gathers stages from all child processors, and returns average
156
157        # WARNING: requires synchronization, must be called by all procs associated
158        # with this inlet
159
160        import pypar
161        local_stage = num.sum(self.get_stages()*self.get_areas())
162        global_area = self.get_global_area()
163
164
165
166        global_stage = local_stage
167
168        if self.myid == self.master_proc:
169            for i in self.procs:
170                if i == self.master_proc: continue
171
172                val = pypar.receive(i)
173                global_stage = global_stage + val
174        else:
175            pypar.send(local_stage, self.master_proc)
176
177
178        if global_area > 0.0:
179            return global_stage/global_area
180        else:
181            return 0.0
182
183    def get_elevations(self):
184        # LOCAL
185        return self.domain.quantities['elevation'].centroid_values.take(self.triangle_indices)
186
187    def get_average_elevation(self):
188        # LOCAL
189
190        if self.area > 0:
191            return num.sum(self.get_elevations()*self.get_areas())/self.area
192        else:
193            return 0.0
194
195
196    def get_xmoms(self):
197        # LOCAL
198        return self.domain.quantities['xmomentum'].centroid_values.take(self.triangle_indices)
199
200
201    def get_average_xmom(self):
202        # LOCAL
203
204        if self.area > 0:
205            return num.sum(self.get_xmoms()*self.get_areas())/self.area
206        else:
207            return 0.0
208
209    def get_global_average_xmom(self):
210        # GLOBAL: master proc gathers all xmom values and returns average
211        # WARNING: requires synchronization, must be called by all procs associated
212        # with this inlet
213
214        import pypar
215        global_area = self.get_global_area()
216        local_xmoms = num.sum(self.get_xmoms()*self.get_areas())
217        global_xmoms = local_xmoms
218
219        if self.myid == self.master_proc:
220            for i in self.procs:
221                if i == self.master_proc: continue
222
223                val = pypar.receive(i)
224                global_xmoms = global_xmoms + val
225        else:
226            pypar.send(local_xmoms, self.master_proc)
227
228
229        if global_area > 0.0:
230            return global_xmoms/global_area
231        else:
232            return 0.0
233
234
235    def get_ymoms(self):
236        # LOCAL
237        return self.domain.quantities['ymomentum'].centroid_values.take(self.triangle_indices)
238
239
240    def get_average_ymom(self):
241        # LOCAL
242        return num.sum(self.get_ymoms()*self.get_areas())/self.area
243
244    def get_global_average_ymom(self):
245        # GLOBAL: master proc gathers all ymom values and returns average
246        # WARNING: requires synchronization, must be called by all procs associated
247        # with this inlet
248
249        import pypar
250        global_area = self.get_global_area()
251        local_ymoms = num.sum(self.get_ymoms()*self.get_areas())
252        global_ymoms = local_ymoms
253
254        if self.myid == self.master_proc:
255            for i in self.procs:
256                if i == self.master_proc: continue
257
258                val = pypar.receive(i)
259                global_ymoms = global_ymoms + val
260        else:
261            pypar.send(local_ymoms, self.master_proc)
262
263
264        if global_area > 0.0:
265            return global_ymoms/global_area
266        else:
267            return 0.0
268
269    def get_depths(self):
270        # LOCAL
271        return self.get_stages() - self.get_elevations()
272
273
274    def get_total_water_volume(self):
275        # LOCAL
276       return num.sum(self.get_depths()*self.get_areas())
277
278    def get_global_total_water_volume(self):
279        # GLOBAL: master proc gathers total water volumes from each proc and returns average
280        # WARNING: requires synchronization, must be called by all procs associated
281        # with this inlet
282
283        import pypar
284        local_volume = num.sum(self.get_depths()*self.get_areas())
285        volume = local_volume
286
287        if self.myid == self.master_proc:
288
289            for i in self.procs:
290                if i == self.master_proc: continue
291
292                val = pypar.receive(i)
293                volume = volume + val
294        else:
295            pypar.send(volume, self.master_proc)
296
297        return volume
298
299    def get_average_depth(self):
300        # LOCAL
301
302        if self.area > 0.0:
303            return self.get_total_water_volume()/self.area
304        else:
305            return 0.0
306
307    def get_global_average_depth(self):
308        # GLOBAL: master proc gathers all depth values and returns average
309        # WARNING: requires synchronization, must be called by all procs associated
310        # with this inlet
311       
312        area = self.get_global_area()
313        total_water_volume = self.get_global_total_water_volume()
314
315
316        if area > 0.0:
317            return total_water_volume / area
318        else:
319            return 0.0
320
321
322    def get_velocities(self):
323        #LOCAL
324        depths = self.get_depths()
325        u = depths*self.get_xmoms()/(depths**2 + velocity_protection)
326        v = depths*self.get_ymoms()/(depths**2 + velocity_protection)
327
328        return u, v
329
330
331    def get_xvelocities(self):
332        #LOCAL
333        depths = self.get_depths()
334        return depth*self.get_xmoms()/(depths**2 + velocity_protection)
335
336    def get_yvelocities(self):
337        #LOCAL
338        depths = self.get_depths()
339        return depths*self.get_ymoms()/(depths**2 + velocity_protection)
340
341
342    def get_average_speed(self):
343        #LOCAL
344        u, v = self.get_velocities()
345
346        average_u = num.sum(u*self.get_areas())/self.area
347        average_v = num.sum(v*self.get_areas())/self.area
348
349        return math.sqrt(average_u**2 + average_v**2)
350
351
352    def get_average_velocity_head(self):
353        #LOCAL
354        return 0.5*self.get_average_speed()**2/g
355
356
357    def get_average_total_energy(self):
358        #LOCAL
359        return self.get_average_velocity_head() + self.get_average_stage()
360
361
362    def get_average_specific_energy(self):
363        #LOCAL
364        return self.get_average_velocity_head() + self.get_average_depth()
365
366
367# Set routines (ALL LOCAL)
368
369    def set_depths(self,depth):
370
371        self.domain.quantities['stage'].centroid_values.put(self.triangle_indices, self.get_elevations() + depth)
372
373
374    def set_stages(self,stage):
375
376        self.domain.quantities['stage'].centroid_values.put(self.triangle_indices, stage)
377
378
379    def set_xmoms(self,xmom):
380
381        self.domain.quantities['xmomentum'].centroid_values.put(self.triangle_indices, xmom)
382
383
384    def set_ymoms(self,ymom):
385
386        self.domain.quantities['ymomentum'].centroid_values.put(self.triangle_indices, ymom)
387
388
389    def set_elevations(self,elevation):
390
391        self.domain.quantities['elevation'].centroid_values.put(self.triangle_indices, elevation)
392
393
394    def set_stages_evenly(self,volume):
395        """ Distribute volume of water over
396        inlet exchange region so that stage is level
397        """
398        # WARNING: requires synchronization, must be called by all procs associated
399        # with this inlet
400
401        import pypar
402        centroid_coordinates = self.domain.get_full_centroid_coordinates(absolute=True)
403        areas = self.get_areas()
404        stages = self.get_stages()
405
406        stages_order = stages.argsort()
407
408        # PETE: send stages and areas, apply merging procedure
409
410        s_areas = {}
411        s_stages = {}
412        s_stages_order = {}
413        total_stages = len(stages)
414
415        if self.myid == self.master_proc:
416            s_areas[self.myid] = areas
417            s_stages[self.myid] = stages
418            s_stages_order[self.myid] = stages_order
419     
420            # Recieve areas, stages, and stages order
421            for i in self.procs:
422                if i != self.master_proc:
423                    s_areas[i] = pypar.receive(i)
424                    s_stages[i] = pypar.receive(i)
425                    s_stages_order[i] = pypar.receive(i)
426                    total_stages = total_stages + len(s_stages[i])
427
428        else:
429            # Send areas, stages, and stages order to master proc of inlet
430            pypar.send(areas, self.master_proc)
431            pypar.send(stages, self.master_proc)
432            pypar.send(stages_order, self.master_proc)
433
434        # merge sorted stage order
435        if self.myid == self.master_proc:
436            pos = {}
437            summed_volume = 0.
438            summed_areas = 0.
439            prev_stage = 0.
440            num_stages = 0.
441            first = True
442           
443            for i in self.procs:
444                pos[i] = 0
445
446            while num_stages < total_stages:
447                # Determine current minimum stage of all the processors in s_stages
448                num_stages = num_stages + 1
449                current_stage = num.finfo(num.float32).max
450                index = -1
451
452                for i in self.procs:
453                    if pos[i] >= len(s_stages[i]):
454                        continue
455                       
456                    if s_stages[i][s_stages_order[i][pos[i]]] < current_stage:
457                        current_stage = s_stages[i][s_stages_order[i][pos[i]]]
458                        index = i
459
460                # If first iteration, then only update summed_areas, position, and prev|current stage
461               
462                if first:
463                    first = False
464                    summed_areas = s_areas[index][s_stages_order[index][pos[index]]]
465                    pos[index] = pos[index] + 1
466                    prev_stage = current_stage
467                    continue
468
469                assert index >= 0, "Index out of bounds"
470
471                # Update summed volume and summed areas
472                tmp_volume = summed_volume + (summed_areas * (current_stage - prev_stage))
473
474                # Terminate if volume exceeded
475                if tmp_volume >= volume:
476                    break
477
478                summed_areas = summed_areas + s_areas[index][s_stages_order[index][pos[index]]]
479                pos[index] = pos[index] + 1
480                summed_volume = tmp_volume
481               
482                # Update position of index processor and current stage
483                prev_stage = current_stage
484
485            # Calculate new stage
486            new_stage = prev_stage + (volume - summed_volume) / summed_areas
487
488            # Send postion and new stage to all processors
489            for i in self.procs:
490                if i != self.master_proc:
491                    pypar.send(pos[i], i)
492                    pypar.send(new_stage, i)
493
494            # Update own depth
495            stages[stages_order[0:pos[self.myid]]] = new_stage
496        else:
497            pos = pypar.receive(self.master_proc)
498            new_stage = pypar.receive(self.master_proc)
499            stages[stages_order[0:pos]] = new_stage
500
501        self.set_stages(stages)
502
503        stages = self.get_stages()
504        stages_order = stages.argsort()
505
506    def set_depths_evenly(self,volume):
507        """ Distribute volume over all exchange
508        cells with equal depth of water
509        """
510        new_depth = self.get_average_depth() + (volume/self.get_area())
511        self.set_depths(new_depth)
512
513    def get_master_proc(self):
514        return self.master_proc
515
516    def parallel_safe(self):
517
518        return True
519
520    def statistics(self):
521        # WARNING: requires synchronization, must be called by all procs associated
522        # with this inlet
523
524        import pypar
525
526        message = ''
527
528        tri_indices = {}
529
530        if self.myid == self.master_proc:
531            tri_indices[self.myid] = self.triangle_indices
532           
533            for proc in self.procs:
534                if proc == self.master_proc: continue
535               
536                tri_indices[proc] = pypar.receive(proc)
537
538        else:
539            pypar.send(self.triangle_indices, self.master_proc)
540
541
542        if self.myid == self.master_proc:
543            message += '=====================================\n'
544            message +=  'Inlet\n' 
545            message += '=====================================\n'
546
547            for proc in self.procs:
548                message += '======> inlet triangle indices and centres at P%d\n' %(proc)
549                message += '%s' % tri_indices[proc]
550                message += '\n'
551               
552                message += '%s' % self.domain.get_centroid_coordinates()[tri_indices[proc]]
553                message += '\n'
554               
555            message += 'line\n'
556            message += '%s' % self.line
557            message += '\n'
558
559        return message
560
561__author__="pete"
562__date__ ="$16/08/2011 6:49:42 PM$"
563
564if __name__ == "__main__":
565    print "Hello World"
Note: See TracBrowser for help on using the repository browser.