source: anuga_core/source/anuga/abstract_2d_finite_volumes/generic_boundary_conditions.py @ 6449

Last change on this file since 6449 was 6449, checked in by kristy, 15 years ago

added boundary_default to Time_boundary

File size: 13.8 KB
Line 
1"""boundary.py - Classes for implementing boundary conditions
2"""
3
4from warnings import warn
5
6from anuga.utilities.numerical_tools import NAN   
7from anuga.fit_interpolate.interpolate import Modeltime_too_late
8from anuga.fit_interpolate.interpolate import Modeltime_too_early
9
10import Numeric as num
11
12
13class Boundary:
14    """Base class for boundary conditions.
15       Specific boundary conditions must provide values for
16       the conserved_quantities
17
18       A boundary object has one neighbour; the one it
19       serves.
20    """
21
22    def __init__(self):
23        pass
24
25    def evaluate(self, vol_id=None, edge_id=None):
26        msg = 'Generic class Boundary must be subclassed'
27        raise Exception, msg
28
29
30class Transmissive_boundary(Boundary):
31    """Transmissive boundary returns same conserved quantities as
32    those present in its neighbour volume.
33
34    Underlying domain must be specified when boundary is instantiated
35    """
36
37    def __init__(self, domain = None):
38        Boundary.__init__(self)
39
40        if domain is None:
41            msg = 'Domain must be specified for transmissive boundary'
42            raise Exception, msg
43
44        self.domain = domain
45
46    def __repr__(self):
47        return 'Transmissive_boundary(%s)' %self.domain
48
49    def evaluate(self, vol_id, edge_id):
50        """Transmissive boundaries return the edge values
51        of the volume they serve.
52        """
53
54        q = self.domain.get_conserved_quantities(vol_id, edge = edge_id)
55        return q
56
57
58class Dirichlet_boundary(Boundary):
59    """Dirichlet boundary returns constant values for the
60    conserved quantities
61    """
62
63
64    def __init__(self, conserved_quantities=None):
65        Boundary.__init__(self)
66
67        if conserved_quantities is None:
68            msg = 'Must specify one value for each conserved quantity'
69            raise Exception, msg
70
71        self.conserved_quantities=num.array(conserved_quantities, num.Float)
72
73    def __repr__(self):
74        return 'Dirichlet boundary (%s)' %self.conserved_quantities
75
76    def evaluate(self, vol_id=None, edge_id=None):
77        return self.conserved_quantities
78
79
80
81class Time_boundary(Boundary):
82    """Time dependent boundary returns values for the
83    conserved quantities as a function of time.
84    Must specify domain to get access to model time and a function f(t)
85    which must return conserved quantities as a function time
86    """
87
88    # FIXME (Ole): We should rename f to function to be consistent with
89    # Transmissive_Momentum_Set_Stage_Boundary (cf posting by rrraman)
90    def __init__(self, domain=None,
91                 f=None,
92                 default_boundary=None):
93        Boundary.__init__(self)
94
95        self.default_boundary = default_boundary
96        self.default_boundary_invoked = False    # Flag
97
98        try:
99            q = f(0.0)
100        except Exception, e:
101            msg = 'Function for time boundary could not be executed:\n%s' %e
102            raise msg
103
104
105        try:
106            q = num.array(q, num.Float)
107        except:
108            msg = 'Return value from time boundary function could '
109            msg += 'not be converted into a Numeric array of floats.\n'
110            msg += 'Specified function should return either list or array.\n'
111            msg += 'I got %s' %str(q)
112            raise msg
113
114        msg = 'ERROR: Time boundary function must return a 1d list or array '
115        assert len(q.shape) == 1, msg
116
117        d = len(domain.conserved_quantities)
118        msg = 'Return value for f must be a list or an array of length %d' %d
119        assert len(q) == d, msg
120
121        self.f = f
122        self.domain = domain
123
124    def __repr__(self):
125        return 'Time boundary'
126
127    def evaluate(self, vol_id=None, edge_id=None):
128        # FIXME (Ole): I think this should be get_time(), see ticket:306
129        try:
130            res = self.f(self.domain.time)
131        except Modeltime_too_early, e:
132            raise Modeltime_too_early, e
133        except Modeltime_too_late, e:
134            if self.default_boundary is None:
135                raise Exception, e # Reraise exception
136            else:
137                # Pass control to default boundary
138                res = self.default_boundary.evaluate(vol_id, edge_id)
139               
140                # Ensure that result cannot be manipulated
141                # This is a real danger in case the
142                # default_boundary is a Dirichlet type
143                # for instance.
144                res = res.copy() 
145               
146                if self.default_boundary_invoked is False:
147                    # Issue warning the first time
148                    msg = '%s' %str(e)
149                    msg += 'Instead I will use the default boundary: %s\n'\
150                        %str(self.default_boundary) 
151                    msg += 'Note: Further warnings will be supressed'
152                    print msg
153               
154                    # FIXME (Ole): Replace this crude flag with
155                    # Python's ability to print warnings only once.
156                    # See http://docs.python.org/lib/warning-filter.html
157                    self.default_boundary_invoked = True
158
159        return res
160
161
162
163class File_boundary(Boundary):
164    """The File_boundary reads values for the conserved
165    quantities from an sww NetCDF file, and returns interpolated values
166    at the midpoints of each associated boundary segment.
167    Time dependency is interpolated linearly.
168
169    Assumes that file contains a time series and possibly
170    also spatial info. See docstring for File_function in util.py
171    for details about admissible file formats
172
173    File boundary must read and interpolate from *smoothed* version
174    as stored in sww and cannot work with the discontinuos triangles.
175
176    Example:
177    Bf = File_boundary('source_file.sww', domain)
178
179
180    Note that the resulting solution history is not exactly the same as if
181    the models were coupled as there is no feedback into the source model.
182   
183    Optional keyword argument default_boundary must be either None or
184    an instance of class descending from class Boundary.
185    This will be used in case model time exceeds that available in the
186    underlying data.
187       
188    """
189
190    # FIXME (Ole): I kind of like the name Spatio_Temporal_boundary for this
191    # rather than File_boundary
192
193    def __init__(self, filename, domain, 
194                 time_thinning=1, 
195                 time_limit=None,
196                 boundary_polygon=None,   
197                 default_boundary=None,
198                 use_cache=False, 
199                 verbose=False): 
200
201        import time
202        from anuga.config import time_format
203        from anuga.abstract_2d_finite_volumes.util import file_function
204
205        Boundary.__init__(self)
206
207        # Get x,y vertex coordinates for all triangles
208        V = domain.vertex_coordinates
209
210        # Compute midpoint coordinates for all boundary elements
211        # Only a subset may be invoked when boundary is evaluated but
212        # we don't know which ones at this stage since this object can
213        # be attached to
214        # any tagged boundary later on.
215
216        if verbose: print 'Find midpoint coordinates of entire boundary'
217        self.midpoint_coordinates = num.zeros((len(domain.boundary), 2), num.Float)
218        boundary_keys = domain.boundary.keys()
219
220        xllcorner = domain.geo_reference.get_xllcorner()
221        yllcorner = domain.geo_reference.get_yllcorner()       
222       
223
224        # Make ordering unique #FIXME: should this happen in domain.py?
225        boundary_keys.sort()
226
227        # Record ordering #FIXME: should this also happen in domain.py?
228        self.boundary_indices = {}
229        for i, (vol_id, edge_id) in enumerate(boundary_keys):
230
231            base_index = 3*vol_id
232            x0, y0 = V[base_index, :]
233            x1, y1 = V[base_index+1, :]
234            x2, y2 = V[base_index+2, :]
235           
236            # Compute midpoints
237            if edge_id == 0: m = num.array([(x1 + x2)/2, (y1 + y2)/2], num.Float)
238            if edge_id == 1: m = num.array([(x0 + x2)/2, (y0 + y2)/2], num.Float)
239            if edge_id == 2: m = num.array([(x1 + x0)/2, (y1 + y0)/2], num.Float)
240
241            # Convert to absolute UTM coordinates
242            m[0] += xllcorner
243            m[1] += yllcorner
244           
245            # Register point and index
246            self.midpoint_coordinates[i,:] = m
247
248            # Register index of this boundary edge for use with evaluate
249            self.boundary_indices[(vol_id, edge_id)] = i
250
251        if verbose: print 'Initialise file_function'
252        self.F = file_function(filename,
253                               domain,
254                               quantities=domain.conserved_quantities,
255                               interpolation_points=self.midpoint_coordinates,
256                               time_thinning=time_thinning,
257                               time_limit=time_limit,
258                               use_cache=use_cache, 
259                               verbose=verbose,
260                               boundary_polygon=boundary_polygon)
261                             
262        # Check and store default_boundary
263        msg = 'Keyword argument default_boundary must be either None '
264        msg += 'or a boundary object.\n I got %s' %(str(default_boundary))
265        assert default_boundary is None or\
266            isinstance(default_boundary, Boundary), msg
267        self.default_boundary = default_boundary
268        self.default_boundary_invoked = False    # Flag
269
270        # Store pointer to domain and verbosity
271        self.domain = domain
272        self.verbose = verbose
273
274
275        # Here we'll flag indices outside the mesh as a warning
276        # as suggested by Joaquim Luis in sourceforge posting
277        # November 2007
278        # We won't make it an error as it is conceivable that
279        # only part of mesh boundary is actually used with a given
280        # file boundary sww file.
281        if hasattr(self.F, 'indices_outside_mesh') and\
282               len(self.F.indices_outside_mesh) > 0:
283            msg = 'WARNING: File_boundary has points outside the mesh '
284            msg += 'given in %s. ' %filename
285            msg += 'See warning message issued by Interpolation_function '
286            msg += 'for details (should appear above somewhere if '
287            msg += 'verbose is True).\n'
288            msg += 'This is perfectly OK as long as the points that are '
289            msg += 'outside aren\'t used on the actual boundary segment.'
290            if verbose is True:           
291                print msg
292            #raise Exception(msg)
293
294        # Test that file function can be called
295        q = self.F(0, point_id=0)
296        d = len(domain.conserved_quantities)
297        msg = 'Values specified in file %s must be ' %filename
298        msg += ' a list or an array of length %d' %d
299        assert len(q) == d, msg
300
301
302    def __repr__(self):
303        return 'File boundary'
304
305
306    def evaluate(self, vol_id=None, edge_id=None):
307        """Return linearly interpolated values based on domain.time
308        at midpoint of segment defined by vol_id and edge_id.
309        """
310
311        # FIXME (Ole): I think this should be get_time(), see ticket:306
312        t = self.domain.time
313       
314        if vol_id is not None and edge_id is not None:
315            i = self.boundary_indices[ vol_id, edge_id ]
316           
317            try:
318                res = self.F(t, point_id=i)
319            except Modeltime_too_early, e:
320                raise Modeltime_too_early, e
321            except Modeltime_too_late, e:
322                if self.default_boundary is None:
323                    raise Exception, e # Reraise exception
324                else:
325                    # Pass control to default boundary
326                    res = self.default_boundary.evaluate(vol_id, edge_id)
327                   
328                    # Ensure that result cannot be manipulated
329                    # This is a real danger in case the
330                    # default_boundary is a Dirichlet type
331                    # for instance.
332                    res = res.copy() 
333                   
334                    if self.default_boundary_invoked is False:
335                        # Issue warning the first time
336                        msg = '%s' %str(e)
337                        msg += 'Instead I will use the default boundary: %s\n'\
338                            %str(self.default_boundary) 
339                        msg += 'Note: Further warnings will be supressed'
340                        warn(msg)
341                   
342                        # FIXME (Ole): Replace this crude flag with
343                        # Python's ability to print warnings only once.
344                        # See http://docs.python.org/lib/warning-filter.html
345                        self.default_boundary_invoked = True
346                   
347
348            if res == NAN:
349                x,y=self.midpoint_coordinates[i,:]
350                msg = 'NAN value found in file_boundary at '
351                msg += 'point id #%d: (%.2f, %.2f).\n' %(i, x, y)
352
353                if hasattr(self.F, 'indices_outside_mesh') and\
354                       len(self.F.indices_outside_mesh) > 0:
355                    # Check if NAN point is due it being outside
356                    # boundary defined in sww file.
357
358                    if i in self.F.indices_outside_mesh:
359                        msg += 'This point refers to one outside the '
360                        msg += 'mesh defined by the file %s.\n'\
361                               %self.F.filename
362                        msg += 'Make sure that the file covers '
363                        msg += 'the boundary segment it is assigned to '
364                        msg += 'in set_boundary.'
365                    else:
366                        msg += 'This point is inside the mesh defined '
367                        msg += 'the file %s.\n' %self.F.filename
368                        msg += 'Check this file for NANs.'
369                raise Exception, msg
370           
371            return res
372        else:
373            msg = 'Boundary call without point_id not implemented.\n'
374            msg += 'vol_id=%s, edge_id=%s' %(str(vol_id), str(edge_id))
375            raise Exception, msg
Note: See TracBrowser for help on using the repository browser.