source: anuga_core/source/anuga/abstract_2d_finite_volumes/generic_boundary_conditions.py @ 6471

Last change on this file since 6471 was 6471, checked in by ole, 15 years ago

Made warning for exceeding time limits dependent on the verbose flag and
also made it a simple print rather than a real warning. This was due to annoyances with stderr being used for something this trivial.

File size: 14.0 KB
Line 
1"""boundary.py - Classes for implementing boundary conditions
2"""
3
4from warnings import warn
5
6from anuga.utilities.numerical_tools import NAN   
7from anuga.fit_interpolate.interpolate import Modeltime_too_late
8from anuga.fit_interpolate.interpolate import Modeltime_too_early
9
10import Numeric as num
11
12
13class Boundary:
14    """Base class for boundary conditions.
15       Specific boundary conditions must provide values for
16       the conserved_quantities
17
18       A boundary object has one neighbour; the one it
19       serves.
20    """
21
22    def __init__(self):
23        pass
24
25    def evaluate(self, vol_id=None, edge_id=None):
26        msg = 'Generic class Boundary must be subclassed'
27        raise Exception, msg
28
29
30class Transmissive_boundary(Boundary):
31    """Transmissive boundary returns same conserved quantities as
32    those present in its neighbour volume.
33
34    Underlying domain must be specified when boundary is instantiated
35    """
36
37    def __init__(self, domain = None):
38        Boundary.__init__(self)
39
40        if domain is None:
41            msg = 'Domain must be specified for transmissive boundary'
42            raise Exception, msg
43
44        self.domain = domain
45
46    def __repr__(self):
47        return 'Transmissive_boundary(%s)' %self.domain
48
49    def evaluate(self, vol_id, edge_id):
50        """Transmissive boundaries return the edge values
51        of the volume they serve.
52        """
53
54        q = self.domain.get_conserved_quantities(vol_id, edge = edge_id)
55        return q
56
57
58class Dirichlet_boundary(Boundary):
59    """Dirichlet boundary returns constant values for the
60    conserved quantities
61    """
62
63
64    def __init__(self, conserved_quantities=None):
65        Boundary.__init__(self)
66
67        if conserved_quantities is None:
68            msg = 'Must specify one value for each conserved quantity'
69            raise Exception, msg
70
71        self.conserved_quantities=num.array(conserved_quantities, num.Float)
72
73    def __repr__(self):
74        return 'Dirichlet boundary (%s)' %self.conserved_quantities
75
76    def evaluate(self, vol_id=None, edge_id=None):
77        return self.conserved_quantities
78
79
80
81class Time_boundary(Boundary):
82    """Time dependent boundary returns values for the
83    conserved quantities as a function of time.
84    Must specify domain to get access to model time and a function f(t)
85    which must return conserved quantities as a function time
86    """
87
88    # FIXME (Ole): We should rename f to function to be consistent with
89    # Transmissive_Momentum_Set_Stage_Boundary (cf posting by rrraman)
90    def __init__(self, domain=None,
91                 f=None,
92                 default_boundary=None):
93        Boundary.__init__(self)
94
95        self.default_boundary = default_boundary
96        self.default_boundary_invoked = False    # Flag
97
98        try:
99            q = f(0.0)
100        except Exception, e:
101            msg = 'Function for time boundary could not be executed:\n%s' %e
102            raise msg
103
104
105        try:
106            q = num.array(q, num.Float)
107        except:
108            msg = 'Return value from time boundary function could '
109            msg += 'not be converted into a Numeric array of floats.\n'
110            msg += 'Specified function should return either list or array.\n'
111            msg += 'I got %s' %str(q)
112            raise msg
113
114        msg = 'ERROR: Time boundary function must return a 1d list or array '
115        assert len(q.shape) == 1, msg
116
117        d = len(domain.conserved_quantities)
118        msg = 'Return value for f must be a list or an array of length %d' %d
119        assert len(q) == d, msg
120
121        self.f = f
122        self.domain = domain
123
124    def __repr__(self):
125        return 'Time boundary'
126
127    def evaluate(self, vol_id=None, edge_id=None):
128        # FIXME (Ole): I think this should be get_time(), see ticket:306
129        try:
130            res = self.f(self.domain.time)
131        except Modeltime_too_early, e:
132            raise Modeltime_too_early, e
133        except Modeltime_too_late, e:
134            if self.default_boundary is None:
135                raise Exception, e # Reraise exception
136            else:
137                # Pass control to default boundary
138                res = self.default_boundary.evaluate(vol_id, edge_id)
139               
140                # Ensure that result cannot be manipulated
141                # This is a real danger in case the
142                # default_boundary is a Dirichlet type
143                # for instance.
144                res = res.copy() 
145               
146                if self.default_boundary_invoked is False:
147                    if self.verbose:               
148                        # Issue warning the first time
149                        msg = '%s' %str(e)
150                        msg += 'Instead I will use the default boundary: %s\n'\
151                            %str(self.default_boundary) 
152                        msg += 'Note: Further warnings will be supressed'
153                        print msg
154               
155                    # FIXME (Ole): Replace this crude flag with
156                    # Python's ability to print warnings only once.
157                    # See http://docs.python.org/lib/warning-filter.html
158                    self.default_boundary_invoked = True
159
160        return res
161
162
163
164class File_boundary(Boundary):
165    """The File_boundary reads values for the conserved
166    quantities from an sww NetCDF file, and returns interpolated values
167    at the midpoints of each associated boundary segment.
168    Time dependency is interpolated linearly.
169
170    Assumes that file contains a time series and possibly
171    also spatial info. See docstring for File_function in util.py
172    for details about admissible file formats
173
174    File boundary must read and interpolate from *smoothed* version
175    as stored in sww and cannot work with the discontinuos triangles.
176
177    Example:
178    Bf = File_boundary('source_file.sww', domain)
179
180
181    Note that the resulting solution history is not exactly the same as if
182    the models were coupled as there is no feedback into the source model.
183   
184    Optional keyword argument default_boundary must be either None or
185    an instance of class descending from class Boundary.
186    This will be used in case model time exceeds that available in the
187    underlying data.
188       
189    """
190
191    # FIXME (Ole): I kind of like the name Spatio_Temporal_boundary for this
192    # rather than File_boundary
193
194    def __init__(self, filename, domain, 
195                 time_thinning=1, 
196                 time_limit=None,
197                 boundary_polygon=None,   
198                 default_boundary=None,
199                 use_cache=False, 
200                 verbose=False): 
201
202        import time
203        from anuga.config import time_format
204        from anuga.abstract_2d_finite_volumes.util import file_function
205
206        Boundary.__init__(self)
207
208        # Get x,y vertex coordinates for all triangles
209        V = domain.vertex_coordinates
210
211        # Compute midpoint coordinates for all boundary elements
212        # Only a subset may be invoked when boundary is evaluated but
213        # we don't know which ones at this stage since this object can
214        # be attached to
215        # any tagged boundary later on.
216
217        if verbose: print 'Find midpoint coordinates of entire boundary'
218        self.midpoint_coordinates = num.zeros((len(domain.boundary), 2), num.Float)
219        boundary_keys = domain.boundary.keys()
220
221        xllcorner = domain.geo_reference.get_xllcorner()
222        yllcorner = domain.geo_reference.get_yllcorner()       
223       
224
225        # Make ordering unique #FIXME: should this happen in domain.py?
226        boundary_keys.sort()
227
228        # Record ordering #FIXME: should this also happen in domain.py?
229        self.boundary_indices = {}
230        for i, (vol_id, edge_id) in enumerate(boundary_keys):
231
232            base_index = 3*vol_id
233            x0, y0 = V[base_index, :]
234            x1, y1 = V[base_index+1, :]
235            x2, y2 = V[base_index+2, :]
236           
237            # Compute midpoints
238            if edge_id == 0: m = num.array([(x1 + x2)/2, (y1 + y2)/2], num.Float)
239            if edge_id == 1: m = num.array([(x0 + x2)/2, (y0 + y2)/2], num.Float)
240            if edge_id == 2: m = num.array([(x1 + x0)/2, (y1 + y0)/2], num.Float)
241
242            # Convert to absolute UTM coordinates
243            m[0] += xllcorner
244            m[1] += yllcorner
245           
246            # Register point and index
247            self.midpoint_coordinates[i,:] = m
248
249            # Register index of this boundary edge for use with evaluate
250            self.boundary_indices[(vol_id, edge_id)] = i
251
252        if verbose: print 'Initialise file_function'
253        self.F = file_function(filename,
254                               domain,
255                               quantities=domain.conserved_quantities,
256                               interpolation_points=self.midpoint_coordinates,
257                               time_thinning=time_thinning,
258                               time_limit=time_limit,
259                               use_cache=use_cache, 
260                               verbose=verbose,
261                               boundary_polygon=boundary_polygon)
262                             
263        # Check and store default_boundary
264        msg = 'Keyword argument default_boundary must be either None '
265        msg += 'or a boundary object.\n I got %s' %(str(default_boundary))
266        assert default_boundary is None or\
267            isinstance(default_boundary, Boundary), msg
268        self.default_boundary = default_boundary
269        self.default_boundary_invoked = False    # Flag
270
271        # Store pointer to domain and verbosity
272        self.domain = domain
273        self.verbose = verbose
274
275
276        # Here we'll flag indices outside the mesh as a warning
277        # as suggested by Joaquim Luis in sourceforge posting
278        # November 2007
279        # We won't make it an error as it is conceivable that
280        # only part of mesh boundary is actually used with a given
281        # file boundary sww file.
282        if hasattr(self.F, 'indices_outside_mesh') and\
283               len(self.F.indices_outside_mesh) > 0:
284            msg = 'WARNING: File_boundary has points outside the mesh '
285            msg += 'given in %s. ' %filename
286            msg += 'See warning message issued by Interpolation_function '
287            msg += 'for details (should appear above somewhere if '
288            msg += 'verbose is True).\n'
289            msg += 'This is perfectly OK as long as the points that are '
290            msg += 'outside aren\'t used on the actual boundary segment.'
291            if verbose is True:           
292                print msg
293            #raise Exception(msg)
294
295        # Test that file function can be called
296        q = self.F(0, point_id=0)
297        d = len(domain.conserved_quantities)
298        msg = 'Values specified in file %s must be ' %filename
299        msg += ' a list or an array of length %d' %d
300        assert len(q) == d, msg
301
302
303    def __repr__(self):
304        return 'File boundary'
305
306
307    def evaluate(self, vol_id=None, edge_id=None):
308        """Return linearly interpolated values based on domain.time
309        at midpoint of segment defined by vol_id and edge_id.
310        """
311
312        # FIXME (Ole): I think this should be get_time(), see ticket:306
313        t = self.domain.time
314       
315        if vol_id is not None and edge_id is not None:
316            i = self.boundary_indices[ vol_id, edge_id ]
317           
318            try:
319                res = self.F(t, point_id=i)
320            except Modeltime_too_early, e:
321                raise Modeltime_too_early, e
322            except Modeltime_too_late, e:
323                if self.default_boundary is None:
324                    raise Exception, e # Reraise exception
325                else:
326                    # Pass control to default boundary
327                    res = self.default_boundary.evaluate(vol_id, edge_id)
328                   
329                    # Ensure that result cannot be manipulated
330                    # This is a real danger in case the
331                    # default_boundary is a Dirichlet type
332                    # for instance.
333                    res = res.copy() 
334                   
335                    if self.default_boundary_invoked is False:
336                        # Issue warning the first time
337                        if self.verbose:
338                            msg = '%s' %str(e)
339                            msg += 'Instead I will use the default boundary: %s\n'\
340                                %str(self.default_boundary) 
341                            msg += 'Note: Further warnings will be supressed'
342                            print msg
343                   
344                        # FIXME (Ole): Replace this crude flag with
345                        # Python's ability to print warnings only once.
346                        # See http://docs.python.org/lib/warning-filter.html
347                        self.default_boundary_invoked = True
348                   
349
350            if res == NAN:
351                x,y=self.midpoint_coordinates[i,:]
352                msg = 'NAN value found in file_boundary at '
353                msg += 'point id #%d: (%.2f, %.2f).\n' %(i, x, y)
354
355                if hasattr(self.F, 'indices_outside_mesh') and\
356                       len(self.F.indices_outside_mesh) > 0:
357                    # Check if NAN point is due it being outside
358                    # boundary defined in sww file.
359
360                    if i in self.F.indices_outside_mesh:
361                        msg += 'This point refers to one outside the '
362                        msg += 'mesh defined by the file %s.\n'\
363                               %self.F.filename
364                        msg += 'Make sure that the file covers '
365                        msg += 'the boundary segment it is assigned to '
366                        msg += 'in set_boundary.'
367                    else:
368                        msg += 'This point is inside the mesh defined '
369                        msg += 'the file %s.\n' %self.F.filename
370                        msg += 'Check this file for NANs.'
371                raise Exception, msg
372           
373            return res
374        else:
375            msg = 'Boundary call without point_id not implemented.\n'
376            msg += 'vol_id=%s, edge_id=%s' %(str(vol_id), str(edge_id))
377            raise Exception, msg
Note: See TracBrowser for help on using the repository browser.