source: anuga_core/source/anuga/abstract_2d_finite_volumes/generic_boundary_conditions.py @ 6475

Last change on this file since 6475 was 6475, checked in by myall, 14 years ago

fixed error with verbose keyword for time_boundary

File size: 14.0 KB
Line 
1"""boundary.py - Classes for implementing boundary conditions
2"""
3
4from warnings import warn
5
6from anuga.utilities.numerical_tools import NAN   
7from anuga.fit_interpolate.interpolate import Modeltime_too_late
8from anuga.fit_interpolate.interpolate import Modeltime_too_early
9
10import Numeric as num
11
12
13class Boundary:
14    """Base class for boundary conditions.
15       Specific boundary conditions must provide values for
16       the conserved_quantities
17
18       A boundary object has one neighbour; the one it
19       serves.
20    """
21
22    def __init__(self):
23        pass
24
25    def evaluate(self, vol_id=None, edge_id=None):
26        msg = 'Generic class Boundary must be subclassed'
27        raise Exception, msg
28
29
30class Transmissive_boundary(Boundary):
31    """Transmissive boundary returns same conserved quantities as
32    those present in its neighbour volume.
33
34    Underlying domain must be specified when boundary is instantiated
35    """
36
37    def __init__(self, domain = None):
38        Boundary.__init__(self)
39
40        if domain is None:
41            msg = 'Domain must be specified for transmissive boundary'
42            raise Exception, msg
43
44        self.domain = domain
45
46    def __repr__(self):
47        return 'Transmissive_boundary(%s)' %self.domain
48
49    def evaluate(self, vol_id, edge_id):
50        """Transmissive boundaries return the edge values
51        of the volume they serve.
52        """
53
54        q = self.domain.get_conserved_quantities(vol_id, edge = edge_id)
55        return q
56
57
58class Dirichlet_boundary(Boundary):
59    """Dirichlet boundary returns constant values for the
60    conserved quantities
61    """
62
63
64    def __init__(self, conserved_quantities=None):
65        Boundary.__init__(self)
66
67        if conserved_quantities is None:
68            msg = 'Must specify one value for each conserved quantity'
69            raise Exception, msg
70
71        self.conserved_quantities=num.array(conserved_quantities, num.Float)
72
73    def __repr__(self):
74        return 'Dirichlet boundary (%s)' %self.conserved_quantities
75
76    def evaluate(self, vol_id=None, edge_id=None):
77        return self.conserved_quantities
78
79
80
81class Time_boundary(Boundary):
82    """Time dependent boundary returns values for the
83    conserved quantities as a function of time.
84    Must specify domain to get access to model time and a function f(t)
85    which must return conserved quantities as a function time
86    """
87
88    # FIXME (Ole): We should rename f to function to be consistent with
89    # Transmissive_Momentum_Set_Stage_Boundary (cf posting by rrraman)
90    def __init__(self, domain=None,
91                 f=None,
92                 default_boundary=None,
93                 verbose=False):
94        Boundary.__init__(self)
95
96        self.default_boundary = default_boundary
97        self.default_boundary_invoked = False    # Flag
98        self.domain = domain
99        self.verbose = verbose
100
101        try:
102            q = f(0.0)
103        except Exception, e:
104            msg = 'Function for time boundary could not be executed:\n%s' %e
105            raise msg
106
107
108        try:
109            q = num.array(q, num.Float)
110        except:
111            msg = 'Return value from time boundary function could '
112            msg += 'not be converted into a Numeric array of floats.\n'
113            msg += 'Specified function should return either list or array.\n'
114            msg += 'I got %s' %str(q)
115            raise msg
116
117        msg = 'ERROR: Time boundary function must return a 1d list or array '
118        assert len(q.shape) == 1, msg
119
120        d = len(domain.conserved_quantities)
121        msg = 'Return value for f must be a list or an array of length %d' %d
122        assert len(q) == d, msg
123
124        self.f = f
125        self.domain = domain
126
127    def __repr__(self):
128        return 'Time boundary'
129
130    def evaluate(self, vol_id=None, edge_id=None):
131        # FIXME (Ole): I think this should be get_time(), see ticket:306
132        try:
133            res = self.f(self.domain.time)
134        except Modeltime_too_early, e:
135            raise Modeltime_too_early, e
136        except Modeltime_too_late, e:
137            if self.default_boundary is None:
138                raise Exception, e # Reraise exception
139            else:
140                # Pass control to default boundary
141                res = self.default_boundary.evaluate(vol_id, edge_id)
142               
143                # Ensure that result cannot be manipulated
144                # This is a real danger in case the
145                # default_boundary is a Dirichlet type
146                # for instance.
147                res = res.copy() 
148               
149                if self.default_boundary_invoked is False:
150                    if self.verbose:               
151                        # Issue warning the first time
152                        msg = '%s' %str(e)
153                        msg += 'Instead I will use the default boundary: %s\n'\
154                            %str(self.default_boundary) 
155                        msg += 'Note: Further warnings will be supressed'
156                        print msg
157               
158                    # FIXME (Ole): Replace this crude flag with
159                    # Python's ability to print warnings only once.
160                    # See http://docs.python.org/lib/warning-filter.html
161                    self.default_boundary_invoked = True
162
163        return res
164
165
166
167class File_boundary(Boundary):
168    """The File_boundary reads values for the conserved
169    quantities from an sww NetCDF file, and returns interpolated values
170    at the midpoints of each associated boundary segment.
171    Time dependency is interpolated linearly.
172
173    Assumes that file contains a time series and possibly
174    also spatial info. See docstring for File_function in util.py
175    for details about admissible file formats
176
177    File boundary must read and interpolate from *smoothed* version
178    as stored in sww and cannot work with the discontinuos triangles.
179
180    Example:
181    Bf = File_boundary('source_file.sww', domain)
182
183
184    Note that the resulting solution history is not exactly the same as if
185    the models were coupled as there is no feedback into the source model.
186   
187    Optional keyword argument default_boundary must be either None or
188    an instance of class descending from class Boundary.
189    This will be used in case model time exceeds that available in the
190    underlying data.
191       
192    """
193
194    # FIXME (Ole): I kind of like the name Spatio_Temporal_boundary for this
195    # rather than File_boundary
196
197    def __init__(self, filename, domain, 
198                 time_thinning=1, 
199                 time_limit=None,
200                 boundary_polygon=None,   
201                 default_boundary=None,
202                 use_cache=False, 
203                 verbose=False): 
204
205        import time
206        from anuga.config import time_format
207        from anuga.abstract_2d_finite_volumes.util import file_function
208
209        Boundary.__init__(self)
210
211        # Get x,y vertex coordinates for all triangles
212        V = domain.vertex_coordinates
213
214        # Compute midpoint coordinates for all boundary elements
215        # Only a subset may be invoked when boundary is evaluated but
216        # we don't know which ones at this stage since this object can
217        # be attached to
218        # any tagged boundary later on.
219
220        if verbose: print 'Find midpoint coordinates of entire boundary'
221        self.midpoint_coordinates = num.zeros((len(domain.boundary), 2), num.Float)
222        boundary_keys = domain.boundary.keys()
223
224        xllcorner = domain.geo_reference.get_xllcorner()
225        yllcorner = domain.geo_reference.get_yllcorner()       
226       
227
228        # Make ordering unique #FIXME: should this happen in domain.py?
229        boundary_keys.sort()
230
231        # Record ordering #FIXME: should this also happen in domain.py?
232        self.boundary_indices = {}
233        for i, (vol_id, edge_id) in enumerate(boundary_keys):
234
235            base_index = 3*vol_id
236            x0, y0 = V[base_index, :]
237            x1, y1 = V[base_index+1, :]
238            x2, y2 = V[base_index+2, :]
239           
240            # Compute midpoints
241            if edge_id == 0: m = num.array([(x1 + x2)/2, (y1 + y2)/2], num.Float)
242            if edge_id == 1: m = num.array([(x0 + x2)/2, (y0 + y2)/2], num.Float)
243            if edge_id == 2: m = num.array([(x1 + x0)/2, (y1 + y0)/2], num.Float)
244
245            # Convert to absolute UTM coordinates
246            m[0] += xllcorner
247            m[1] += yllcorner
248           
249            # Register point and index
250            self.midpoint_coordinates[i,:] = m
251
252            # Register index of this boundary edge for use with evaluate
253            self.boundary_indices[(vol_id, edge_id)] = i
254
255        if verbose: print 'Initialise file_function'
256        self.F = file_function(filename,
257                               domain,
258                               quantities=domain.conserved_quantities,
259                               interpolation_points=self.midpoint_coordinates,
260                               time_thinning=time_thinning,
261                               time_limit=time_limit,
262                               use_cache=use_cache, 
263                               verbose=verbose,
264                               boundary_polygon=boundary_polygon)
265                             
266        # Check and store default_boundary
267        msg = 'Keyword argument default_boundary must be either None '
268        msg += 'or a boundary object.\n I got %s' %(str(default_boundary))
269        assert default_boundary is None or\
270            isinstance(default_boundary, Boundary), msg
271        self.default_boundary = default_boundary
272        self.default_boundary_invoked = False    # Flag
273
274        # Store pointer to domain and verbosity
275        self.domain = domain
276        self.verbose = verbose
277
278
279        # Here we'll flag indices outside the mesh as a warning
280        # as suggested by Joaquim Luis in sourceforge posting
281        # November 2007
282        # We won't make it an error as it is conceivable that
283        # only part of mesh boundary is actually used with a given
284        # file boundary sww file.
285        if hasattr(self.F, 'indices_outside_mesh') and\
286               len(self.F.indices_outside_mesh) > 0:
287            msg = 'WARNING: File_boundary has points outside the mesh '
288            msg += 'given in %s. ' %filename
289            msg += 'See warning message issued by Interpolation_function '
290            msg += 'for details (should appear above somewhere if '
291            msg += 'verbose is True).\n'
292            msg += 'This is perfectly OK as long as the points that are '
293            msg += 'outside aren\'t used on the actual boundary segment.'
294            if verbose is True:           
295                print msg
296            #raise Exception(msg)
297
298        # Test that file function can be called
299        q = self.F(0, point_id=0)
300        d = len(domain.conserved_quantities)
301        msg = 'Values specified in file %s must be ' %filename
302        msg += ' a list or an array of length %d' %d
303        assert len(q) == d, msg
304
305
306    def __repr__(self):
307        return 'File boundary'
308
309
310    def evaluate(self, vol_id=None, edge_id=None):
311        """Return linearly interpolated values based on domain.time
312        at midpoint of segment defined by vol_id and edge_id.
313        """
314
315        # FIXME (Ole): I think this should be get_time(), see ticket:306
316        t = self.domain.time
317       
318        if vol_id is not None and edge_id is not None:
319            i = self.boundary_indices[ vol_id, edge_id ]
320           
321            try:
322                res = self.F(t, point_id=i)
323            except Modeltime_too_early, e:
324                raise Modeltime_too_early, e
325            except Modeltime_too_late, e:
326                if self.default_boundary is None:
327                    raise Exception, e # Reraise exception
328                else:
329                    # Pass control to default boundary
330                    res = self.default_boundary.evaluate(vol_id, edge_id)
331                   
332                    # Ensure that result cannot be manipulated
333                    # This is a real danger in case the
334                    # default_boundary is a Dirichlet type
335                    # for instance.
336                    res = res.copy() 
337                   
338                    if self.default_boundary_invoked is False:
339                        # Issue warning the first time
340                        if self.verbose:
341                            msg = '%s' %str(e)
342                            msg += 'Instead I will use the default boundary: %s\n'\
343                                %str(self.default_boundary) 
344                            msg += 'Note: Further warnings will be supressed'
345                            print msg
346                   
347                        # FIXME (Ole): Replace this crude flag with
348                        # Python's ability to print warnings only once.
349                        # See http://docs.python.org/lib/warning-filter.html
350                        self.default_boundary_invoked = True
351                   
352
353            if res == NAN:
354                x,y=self.midpoint_coordinates[i,:]
355                msg = 'NAN value found in file_boundary at '
356                msg += 'point id #%d: (%.2f, %.2f).\n' %(i, x, y)
357
358                if hasattr(self.F, 'indices_outside_mesh') and\
359                       len(self.F.indices_outside_mesh) > 0:
360                    # Check if NAN point is due it being outside
361                    # boundary defined in sww file.
362
363                    if i in self.F.indices_outside_mesh:
364                        msg += 'This point refers to one outside the '
365                        msg += 'mesh defined by the file %s.\n'\
366                               %self.F.filename
367                        msg += 'Make sure that the file covers '
368                        msg += 'the boundary segment it is assigned to '
369                        msg += 'in set_boundary.'
370                    else:
371                        msg += 'This point is inside the mesh defined '
372                        msg += 'the file %s.\n' %self.F.filename
373                        msg += 'Check this file for NANs.'
374                raise Exception, msg
375           
376            return res
377        else:
378            msg = 'Boundary call without point_id not implemented.\n'
379            msg += 'vol_id=%s, edge_id=%s' %(str(vol_id), str(edge_id))
380            raise Exception, msg
Note: See TracBrowser for help on using the repository browser.