source: trunk/anuga_core/anuga/parallel/parallel_inlet.py @ 9710

Last change on this file since 9710 was 9710, checked in by steve, 10 years ago

Setting up install scripts

File size: 18.4 KB
Line 
1# To change this template, choose Tools | Templates
2# and open the template in the editor.
3
4import anuga.geometry.polygon
5from anuga.geometry.polygon import inside_polygon, is_inside_polygon, line_intersect
6from anuga.config import velocity_protection, g
7import math
8
9import numpy as num
10from anuga.structures.inlet import Inlet
11import warnings
12
13class Parallel_Inlet(Inlet):
14    """Contains information associated with each inlet
15    """
16
17    """
18    Parallel inlet:
19   
20    master_proc - coordinates all processors associated with this inlet
21    usually the processors with domains which contains parts of this inlet.
22   
23    procs - is the list of all processors associated with this inlet.
24
25    (We assume that the above arguments are determined correctly by the parallel_operator_factory)
26    """
27
28    def __init__(self, domain, poly, master_proc = 0, procs = None, verbose=False):
29
30        self.domain = domain
31        self.poly = num.asarray(poly, dtype=num.float64)
32        self.verbose = verbose
33
34        self.line = True
35        if len(self.poly) > 2:
36            self.line = False
37
38        self.master_proc = master_proc
39
40        if procs is None:
41            self.procs = [self.master_proc]
42        else:
43            self.procs = procs
44
45        import pypar
46        self.myid = pypar.rank()
47
48        self.compute_triangle_indices()
49        self.compute_area()
50        #self.compute_inlet_length()
51
52
53
54    def compute_triangle_indices(self):
55
56
57        domain_centroids = self.domain.get_centroid_coordinates(absolute=True)
58        vertex_coordinates = self.domain.get_full_vertex_coordinates(absolute=True)
59
60
61        if self.line: # poly is a line
62            self.triangle_indices = line_intersect(vertex_coordinates, self.poly)
63
64        else: # poly is a polygon
65
66            tris_0 = line_intersect(vertex_coordinates, [self.poly[0],self.poly[1]])
67            tris_1 = inside_polygon(domain_centroids, self.poly)
68            self.triangle_indices = num.union1d(tris_0, tris_1)
69            #print self.triangle_indices
70
71        for i in self.triangle_indices:
72            assert self.domain.tri_full_flag[i] == 1
73           
74
75    def compute_area(self):
76
77        # Compute inlet area as the sum of areas of triangles identified
78        # by line. Must be called after compute_inlet_triangle_indices().
79        if len(self.triangle_indices) == 0:
80            region = 'Inlet line=%s' % (self.line)
81            msg = 'No triangles have been identified in region '
82            print "WARNING: " + msg
83
84        self.area = 0.0
85        for j in self.triangle_indices:
86            self.area += self.domain.areas[j]
87
88        msg = 'Inlet exchange area has area = %f' % self.area
89        assert self.area >= 0.0
90
91    def compute_inlet_length(self):
92        """ Compute the length of the inlet within this domain (as
93        defined by the input line
94        """
95
96        point0 = self.line[0]
97        point1 = self.line[1]
98
99        self.inlet_length = anuga.geometry.polygon.line_length(self.line)
100
101
102    def get_inlet_length(self):
103        # LOCAL
104        msg = "Warning: compute_inlet_length not implemented"
105        warnings.warn(msg)
106        return self.inlet_length
107
108    def get_line(self):
109        return self.line
110
111    def get_area(self):
112        # LOCAL
113        return self.area
114
115    def get_global_area(self):
116        # GLOBAL: Master processor gathers area from all child processors, and returns value
117
118        # WARNING: requires synchronization, must be called by all procs associated
119        # with this inlet
120
121        import pypar
122        local_area = self.area
123        area = local_area
124
125        if self.myid == self.master_proc:
126
127            for i in self.procs:
128                if i == self.master_proc: continue
129
130                val = pypar.receive(i)
131                area = area + val
132        else:
133            pypar.send(area, self.master_proc)
134
135        return area
136
137
138    def get_areas(self):
139        # Must be called after compute_inlet_triangle_indices().
140        # LOCAL
141       
142        return self.domain.areas.take(self.triangle_indices)
143
144
145    def get_stages(self):
146        # LOCAL
147
148        return self.domain.quantities['stage'].centroid_values.take(self.triangle_indices)
149
150
151    def get_average_stage(self):
152        # LOCAL
153
154        return num.sum(self.get_stages()*self.get_areas())/self.area
155
156    def get_global_average_stage(self):
157        # GLOBAL: Master processor gathers stages from all child processors, and returns average
158
159        # WARNING: requires synchronization, must be called by all procs associated
160        # with this inlet
161
162        import pypar
163        local_stage = num.sum(self.get_stages()*self.get_areas())
164        global_area = self.get_global_area()
165
166
167
168        global_stage = local_stage
169
170        if self.myid == self.master_proc:
171            for i in self.procs:
172                if i == self.master_proc: continue
173
174                val = pypar.receive(i)
175                global_stage = global_stage + val
176        else:
177            pypar.send(local_stage, self.master_proc)
178
179
180        if global_area > 0.0:
181            return global_stage/global_area
182        else:
183            return 0.0
184
185    def get_elevations(self):
186        # LOCAL
187        return self.domain.quantities['elevation'].centroid_values.take(self.triangle_indices)
188
189    def get_average_elevation(self):
190        # LOCAL
191
192        if self.area > 0:
193            return num.sum(self.get_elevations()*self.get_areas())/self.area
194        else:
195            return 0.0
196
197    def get_global_average_elevation(self):
198        # GLOBAL: Master processor gathers elevations from all child processors, and returns average
199
200        # WARNING: requires synchronization, must be called by all procs associated
201        # with this inlet
202
203        import pypar
204        local_elevation = num.sum(self.get_elevations()*self.get_areas())
205        global_area = self.get_global_area()
206
207
208
209        global_elevation = local_elevation
210
211        if self.myid == self.master_proc:
212            for i in self.procs:
213                if i == self.master_proc: continue
214
215                val = pypar.receive(i)
216                global_elevation = global_elevation + val
217        else:
218            pypar.send(local_elevation, self.master_proc)
219
220
221        if global_area > 0.0:
222            return global_elevation/global_area
223        else:
224            return 0.0
225
226    def get_xmoms(self):
227        # LOCAL
228        return self.domain.quantities['xmomentum'].centroid_values.take(self.triangle_indices)
229
230
231    def get_average_xmom(self):
232        # LOCAL
233
234        if self.area > 0:
235            return num.sum(self.get_xmoms()*self.get_areas())/self.area
236        else:
237            return 0.0
238
239    def get_global_average_xmom(self):
240        # GLOBAL: master proc gathers all xmom values and returns average
241        # WARNING: requires synchronization, must be called by all procs associated
242        # with this inlet
243
244        import pypar
245        global_area = self.get_global_area()
246        local_xmoms = num.sum(self.get_xmoms()*self.get_areas())
247        global_xmoms = local_xmoms
248
249        if self.myid == self.master_proc:
250            for i in self.procs:
251                if i == self.master_proc: continue
252
253                val = pypar.receive(i)
254                global_xmoms = global_xmoms + val
255        else:
256            pypar.send(local_xmoms, self.master_proc)
257
258
259        if global_area > 0.0:
260            return global_xmoms/global_area
261        else:
262            return 0.0
263
264
265    def get_ymoms(self):
266        # LOCAL
267        return self.domain.quantities['ymomentum'].centroid_values.take(self.triangle_indices)
268
269
270    def get_average_ymom(self):
271        # LOCAL
272        return num.sum(self.get_ymoms()*self.get_areas())/self.area
273
274    def get_global_average_ymom(self):
275        # GLOBAL: master proc gathers all ymom values and returns average
276        # WARNING: requires synchronization, must be called by all procs associated
277        # with this inlet
278
279        import pypar
280        global_area = self.get_global_area()
281        local_ymoms = num.sum(self.get_ymoms()*self.get_areas())
282        global_ymoms = local_ymoms
283
284        if self.myid == self.master_proc:
285            for i in self.procs:
286                if i == self.master_proc: continue
287
288                val = pypar.receive(i)
289                global_ymoms = global_ymoms + val
290        else:
291            pypar.send(local_ymoms, self.master_proc)
292
293
294        if global_area > 0.0:
295            return global_ymoms/global_area
296        else:
297            return 0.0
298
299    def get_depths(self):
300        # LOCAL
301        return self.get_stages() - self.get_elevations()
302
303
304    def get_total_water_volume(self):
305        # LOCAL
306       return num.sum(self.get_depths()*self.get_areas())
307
308    def get_global_total_water_volume(self):
309        # GLOBAL: master proc gathers total water volumes from each proc and returns average
310        # WARNING: requires synchronization, must be called by all procs associated
311        # with this inlet
312
313        import pypar
314        local_volume = num.sum(self.get_depths()*self.get_areas())
315        volume = local_volume
316
317        if self.myid == self.master_proc:
318
319            for i in self.procs:
320                if i == self.master_proc: continue
321
322                val = pypar.receive(i)
323                volume = volume + val
324        else:
325            pypar.send(volume, self.master_proc)
326
327        return volume
328
329    def get_average_depth(self):
330        # LOCAL
331
332        if self.area > 0.0:
333            return self.get_total_water_volume()/self.area
334        else:
335            return 0.0
336
337    def get_global_average_depth(self):
338        # GLOBAL: master proc gathers all depth values and returns average
339        # WARNING: requires synchronization, must be called by all procs associated
340        # with this inlet
341       
342        area = self.get_global_area()
343        total_water_volume = self.get_global_total_water_volume()
344
345
346        if area > 0.0:
347            return total_water_volume / area
348        else:
349            return 0.0
350
351
352    def get_velocities(self):
353        #LOCAL
354        depths = self.get_depths()
355        u = depths*self.get_xmoms()/(depths**2 + velocity_protection)
356        v = depths*self.get_ymoms()/(depths**2 + velocity_protection)
357
358        return u, v
359
360
361    def get_xvelocities(self):
362        #LOCAL
363        depths = self.get_depths()
364        return depth*self.get_xmoms()/(depths**2 + velocity_protection)
365
366    def get_yvelocities(self):
367        #LOCAL
368        depths = self.get_depths()
369        return depths*self.get_ymoms()/(depths**2 + velocity_protection)
370
371
372    def get_average_speed(self):
373        #LOCAL
374        u, v = self.get_velocities()
375
376        average_u = num.sum(u*self.get_areas())/self.area
377        average_v = num.sum(v*self.get_areas())/self.area
378
379        return math.sqrt(average_u**2 + average_v**2)
380
381
382    def get_average_velocity_head(self):
383        #LOCAL
384        return 0.5*self.get_average_speed()**2/g
385
386
387    def get_average_total_energy(self):
388        #LOCAL
389        return self.get_average_velocity_head() + self.get_average_stage()
390
391
392    def get_average_specific_energy(self):
393        #LOCAL
394        return self.get_average_velocity_head() + self.get_average_depth()
395
396
397# Set routines (ALL LOCAL)
398
399    def set_depths(self,depth):
400
401        self.domain.quantities['stage'].centroid_values.put(self.triangle_indices, self.get_elevations() + depth)
402
403
404    def set_stages(self,stage):
405
406        self.domain.quantities['stage'].centroid_values.put(self.triangle_indices, stage)
407
408
409    def set_xmoms(self,xmom):
410
411        self.domain.quantities['xmomentum'].centroid_values.put(self.triangle_indices, xmom)
412
413
414    def set_ymoms(self,ymom):
415
416        self.domain.quantities['ymomentum'].centroid_values.put(self.triangle_indices, ymom)
417
418
419    def set_elevations(self,elevation):
420
421        self.domain.quantities['elevation'].centroid_values.put(self.triangle_indices, elevation)
422
423
424    def set_stages_evenly(self,volume):
425        """ Distribute volume of water over
426        inlet exchange region so that stage is level
427        """
428        # WARNING: requires synchronization, must be called by all procs associated
429        # with this inlet
430
431        import pypar
432        centroid_coordinates = self.domain.get_full_centroid_coordinates(absolute=True)
433        areas = self.get_areas()
434        stages = self.get_stages()
435
436        stages_order = stages.argsort()
437
438        # PETE: send stages and areas, apply merging procedure
439
440        s_areas = {}
441        s_stages = {}
442        s_stages_order = {}
443        total_stages = len(stages)
444
445        if self.myid == self.master_proc:
446            s_areas[self.myid] = areas
447            s_stages[self.myid] = stages
448            s_stages_order[self.myid] = stages_order
449     
450            # Recieve areas, stages, and stages order
451            for i in self.procs:
452                if i != self.master_proc:
453                    s_areas[i] = pypar.receive(i)
454                    s_stages[i] = pypar.receive(i)
455                    s_stages_order[i] = pypar.receive(i)
456                    total_stages = total_stages + len(s_stages[i])
457
458        else:
459            # Send areas, stages, and stages order to master proc of inlet
460            pypar.send(areas, self.master_proc)
461            pypar.send(stages, self.master_proc)
462            pypar.send(stages_order, self.master_proc)
463
464        # merge sorted stage order
465        if self.myid == self.master_proc:
466            pos = {}
467            summed_volume = 0.
468            summed_areas = 0.
469            prev_stage = 0.
470            num_stages = 0.
471            first = True
472           
473            for i in self.procs:
474                pos[i] = 0
475
476            while num_stages < total_stages:
477                # Determine current minimum stage of all the processors in s_stages
478                num_stages = num_stages + 1
479                current_stage = num.finfo(num.float32).max
480                index = -1
481
482                for i in self.procs:
483                    if pos[i] >= len(s_stages[i]):
484                        continue
485                       
486                    if s_stages[i][s_stages_order[i][pos[i]]] < current_stage:
487                        current_stage = s_stages[i][s_stages_order[i][pos[i]]]
488                        index = i
489
490                # If first iteration, then only update summed_areas, position, and prev|current stage
491               
492                if first:
493                    first = False
494                    summed_areas = s_areas[index][s_stages_order[index][pos[index]]]
495                    pos[index] = pos[index] + 1
496                    prev_stage = current_stage
497                    continue
498
499                assert index >= 0, "Index out of bounds"
500
501                # Update summed volume and summed areas
502                tmp_volume = summed_volume + (summed_areas * (current_stage - prev_stage))
503
504                # Terminate if volume exceeded
505                if tmp_volume >= volume:
506                    break
507
508                summed_areas = summed_areas + s_areas[index][s_stages_order[index][pos[index]]]
509                pos[index] = pos[index] + 1
510                summed_volume = tmp_volume
511               
512                # Update position of index processor and current stage
513                prev_stage = current_stage
514
515            # Calculate new stage
516            new_stage = prev_stage + (volume - summed_volume) / summed_areas
517
518            # Send postion and new stage to all processors
519            for i in self.procs:
520                if i != self.master_proc:
521                    pypar.send(pos[i], i)
522                    pypar.send(new_stage, i)
523
524            # Update own depth
525            stages[stages_order[0:pos[self.myid]]] = new_stage
526        else:
527            pos = pypar.receive(self.master_proc)
528            new_stage = pypar.receive(self.master_proc)
529            stages[stages_order[0:pos]] = new_stage
530
531        self.set_stages(stages)
532
533        stages = self.get_stages()
534        stages_order = stages.argsort()
535
536    def set_depths_evenly(self,volume):
537        """ Distribute volume over all exchange
538        cells with equal depth of water
539        """
540        new_depth = self.get_average_depth() + (volume/self.get_area())
541        self.set_depths(new_depth)
542
543    def get_master_proc(self):
544        return self.master_proc
545
546    def parallel_safe(self):
547
548        return True
549
550    def statistics(self):
551        # WARNING: requires synchronization, must be called by all procs associated
552        # with this inlet
553
554        import pypar
555
556        message = ''
557
558        tri_indices = {}
559
560        if self.myid == self.master_proc:
561            tri_indices[self.myid] = self.triangle_indices
562           
563            for proc in self.procs:
564                if proc == self.master_proc: continue
565               
566                tri_indices[proc] = pypar.receive(proc)
567
568        else:
569            pypar.send(self.triangle_indices, self.master_proc)
570
571
572        if self.myid == self.master_proc:
573            message += '=====================================\n'
574            message +=  'Inlet\n' 
575            message += '=====================================\n'
576
577            for proc in self.procs:
578                message += '======> inlet triangle indices and centres and elevation at P%d\n' %(proc)
579                message += '%s' % tri_indices[proc]
580                message += '\n'
581               
582                message += '%s' % self.domain.get_centroid_coordinates()[tri_indices[proc]]
583                message += '\n'
584
585                elev = self.domain.quantities['elevation'].centroid_values[tri_indices[proc]]
586                message += '%s' % elev
587                message += '\n'
588               
589                try:
590                    elevation_difference = elev.max() - elev.min()
591                except ValueError:
592                    elevation_difference = 0.0
593                   
594                if not num.allclose(elevation_difference, 0.):
595                    message += 'Elevation range of ' + str(elevation_difference) 
596                    message += 'Warning: Non-constant inlet elevation can lead to well-balancing problems'
597
598                try:
599                    # If the inlet does not have an enquiry point this will
600                    # fail gracefully
601                    message += '\n'
602                    message += 'Enquiry point:'
603                    message += '%s' % self.domain.get_centroid_coordinates()[self.enquiry_index]
604                    message += '\n'
605                    message += 'Enquiry Index:'
606                    message += '%s' % self.enquiry_index
607                    message += '\n'
608                except:
609                    pass
610
611               
612            message += 'line\n'
613            message += '%s' % self.line
614            message += '\n'
615
616        return message
617
618__author__="pete"
619__date__ ="$16/08/2011 6:49:42 PM$"
620
621if __name__ == "__main__":
622    print "Hello World"
Note: See TracBrowser for help on using the repository browser.