source: branches/numpy/anuga/abstract_2d_finite_volumes/generic_boundary_conditions.py @ 6553

Last change on this file since 6553 was 6553, checked in by rwilson, 15 years ago

Merged trunk into numpy, all tests and validations work.

File size: 14.0 KB
Line 
1"""boundary.py - Classes for implementing boundary conditions
2"""
3
4from warnings import warn
5
6from anuga.utilities.numerical_tools import NAN   
7from anuga.fit_interpolate.interpolate import Modeltime_too_late
8from anuga.fit_interpolate.interpolate import Modeltime_too_early
9
10import numpy as num
11
12
13class Boundary:
14    """Base class for boundary conditions.
15       Specific boundary conditions must provide values for
16       the conserved_quantities
17
18       A boundary object has one neighbour; the one it
19       serves.
20    """
21
22    def __init__(self):
23        pass
24
25    def evaluate(self, vol_id=None, edge_id=None):
26        msg = 'Generic class Boundary must be subclassed'
27        raise Exception, msg
28
29
30class Transmissive_boundary(Boundary):
31    """Transmissive boundary returns same conserved quantities as
32    those present in its neighbour volume.
33
34    Underlying domain must be specified when boundary is instantiated
35    """
36
37    def __init__(self, domain = None):
38        Boundary.__init__(self)
39
40        if domain is None:
41            msg = 'Domain must be specified for transmissive boundary'
42            raise Exception, msg
43
44        self.domain = domain
45
46    def __repr__(self):
47        return 'Transmissive_boundary(%s)' %self.domain
48
49    def evaluate(self, vol_id, edge_id):
50        """Transmissive boundaries return the edge values
51        of the volume they serve.
52        """
53
54        q = self.domain.get_conserved_quantities(vol_id, edge = edge_id)
55        return q
56
57
58class Dirichlet_boundary(Boundary):
59    """Dirichlet boundary returns constant values for the
60    conserved quantities
61    """
62
63
64    def __init__(self, conserved_quantities=None):
65        Boundary.__init__(self)
66
67        if conserved_quantities is None:
68            msg = 'Must specify one value for each conserved quantity'
69            raise Exception, msg
70
71        self.conserved_quantities=num.array(conserved_quantities, num.float)
72
73    def __repr__(self):
74        return 'Dirichlet boundary (%s)' %self.conserved_quantities
75
76    def evaluate(self, vol_id=None, edge_id=None):
77        return self.conserved_quantities
78
79
80
81class Time_boundary(Boundary):
82    """Time dependent boundary returns values for the
83    conserved quantities as a function of time.
84    Must specify domain to get access to model time and a function f(t)
85    which must return conserved quantities as a function time
86    """
87
88    # FIXME (Ole): We should rename f to function to be consistent with
89    # Transmissive_Momentum_Set_Stage_Boundary (cf posting by rrraman)
90    def __init__(self, domain=None,
91                 f=None,
92                 default_boundary=None,
93                 verbose=False):
94        Boundary.__init__(self)
95        self.default_boundary = default_boundary
96        self.default_boundary_invoked = False    # Flag
97        self.domain = domain
98        self.verbose = verbose
99
100        try:
101            q = f(0.0)
102        except Exception, e:
103            msg = 'Function for time boundary could not be executed:\n%s' %e
104            raise msg
105
106
107        try:
108            q = num.array(q, num.float)
109        except:
110            msg = 'Return value from time boundary function could '
111            msg += 'not be converted into a numeric array of floats.\n'
112            msg += 'Specified function should return either list or array.\n'
113            msg += 'I got %s' %str(q)
114            raise msg
115
116        msg = 'ERROR: Time boundary function must return a 1d list or array '
117        assert len(q.shape) == 1, msg
118
119        d = len(domain.conserved_quantities)
120        msg = 'Return value for f must be a list or an array of length %d' %d
121        assert len(q) == d, msg
122
123        self.f = f
124        self.domain = domain
125
126    def __repr__(self):
127        return 'Time boundary'
128
129    def evaluate(self, vol_id=None, edge_id=None):
130        # FIXME (Ole): I think this should be get_time(), see ticket:306
131        try:
132            res = self.f(self.domain.time)
133        except Modeltime_too_early, e:
134            raise Modeltime_too_early, e
135        except Modeltime_too_late, e:
136            if self.default_boundary is None:
137                raise Exception, e # Reraise exception
138            else:
139                # Pass control to default boundary
140                res = self.default_boundary.evaluate(vol_id, edge_id)
141               
142                # Ensure that result cannot be manipulated
143                # This is a real danger in case the
144                # default_boundary is a Dirichlet type
145                # for instance.
146                res = res.copy() 
147               
148                if self.default_boundary_invoked is False:
149                    if self.verbose:               
150                        # Issue warning the first time
151                        msg = '%s' %str(e)
152                        msg += 'Instead I will use the default boundary: %s\n'\
153                            %str(self.default_boundary) 
154                        msg += 'Note: Further warnings will be supressed'
155                        print msg
156               
157                    # FIXME (Ole): Replace this crude flag with
158                    # Python's ability to print warnings only once.
159                    # See http://docs.python.org/lib/warning-filter.html
160                    self.default_boundary_invoked = True
161
162        return res
163
164
165
166class File_boundary(Boundary):
167    """The File_boundary reads values for the conserved
168    quantities from an sww NetCDF file, and returns interpolated values
169    at the midpoints of each associated boundary segment.
170    Time dependency is interpolated linearly.
171
172    Assumes that file contains a time series and possibly
173    also spatial info. See docstring for File_function in util.py
174    for details about admissible file formats
175
176    File boundary must read and interpolate from *smoothed* version
177    as stored in sww and cannot work with the discontinuos triangles.
178
179    Example:
180    Bf = File_boundary('source_file.sww', domain)
181
182
183    Note that the resulting solution history is not exactly the same as if
184    the models were coupled as there is no feedback into the source model.
185   
186    Optional keyword argument default_boundary must be either None or
187    an instance of class descending from class Boundary.
188    This will be used in case model time exceeds that available in the
189    underlying data.
190       
191    """
192
193    # FIXME (Ole): I kind of like the name Spatio_Temporal_boundary for this
194    # rather than File_boundary
195
196    def __init__(self, filename, domain, 
197                 time_thinning=1, 
198                 time_limit=None,
199                 boundary_polygon=None,   
200                 default_boundary=None,
201                 use_cache=False, 
202                 verbose=False): 
203
204        import time
205        from anuga.config import time_format
206        from anuga.abstract_2d_finite_volumes.util import file_function
207
208        Boundary.__init__(self)
209
210        # Get x,y vertex coordinates for all triangles
211        V = domain.vertex_coordinates
212
213        # Compute midpoint coordinates for all boundary elements
214        # Only a subset may be invoked when boundary is evaluated but
215        # we don't know which ones at this stage since this object can
216        # be attached to
217        # any tagged boundary later on.
218
219        if verbose: print 'Find midpoint coordinates of entire boundary'
220        self.midpoint_coordinates = num.zeros((len(domain.boundary), 2), num.float)
221        boundary_keys = domain.boundary.keys()
222
223        xllcorner = domain.geo_reference.get_xllcorner()
224        yllcorner = domain.geo_reference.get_yllcorner()       
225       
226
227        # Make ordering unique #FIXME: should this happen in domain.py?
228        boundary_keys.sort()
229
230        # Record ordering #FIXME: should this also happen in domain.py?
231        self.boundary_indices = {}
232        for i, (vol_id, edge_id) in enumerate(boundary_keys):
233
234            base_index = 3*vol_id
235            x0, y0 = V[base_index, :]
236            x1, y1 = V[base_index+1, :]
237            x2, y2 = V[base_index+2, :]
238           
239            # Compute midpoints
240            if edge_id == 0: m = num.array([(x1 + x2)/2, (y1 + y2)/2], num.float)
241            if edge_id == 1: m = num.array([(x0 + x2)/2, (y0 + y2)/2], num.float)
242            if edge_id == 2: m = num.array([(x1 + x0)/2, (y1 + y0)/2], num.float)
243
244            # Convert to absolute UTM coordinates
245            m[0] += xllcorner
246            m[1] += yllcorner
247           
248            # Register point and index
249            self.midpoint_coordinates[i,:] = m
250
251            # Register index of this boundary edge for use with evaluate
252            self.boundary_indices[(vol_id, edge_id)] = i
253
254        if verbose: print 'Initialise file_function'
255        self.F = file_function(filename,
256                               domain,
257                               quantities=domain.conserved_quantities,
258                               interpolation_points=self.midpoint_coordinates,
259                               time_thinning=time_thinning,
260                               time_limit=time_limit,
261                               use_cache=use_cache, 
262                               verbose=verbose,
263                               boundary_polygon=boundary_polygon)
264                             
265        # Check and store default_boundary
266        msg = 'Keyword argument default_boundary must be either None '
267        msg += 'or a boundary object.\n I got %s' %(str(default_boundary))
268        assert default_boundary is None or\
269            isinstance(default_boundary, Boundary), msg
270        self.default_boundary = default_boundary
271        self.default_boundary_invoked = False    # Flag
272
273        # Store pointer to domain and verbosity
274        self.domain = domain
275        self.verbose = verbose
276
277
278        # Here we'll flag indices outside the mesh as a warning
279        # as suggested by Joaquim Luis in sourceforge posting
280        # November 2007
281        # We won't make it an error as it is conceivable that
282        # only part of mesh boundary is actually used with a given
283        # file boundary sww file.
284        if hasattr(self.F, 'indices_outside_mesh') and\
285               len(self.F.indices_outside_mesh) > 0:
286            msg = 'WARNING: File_boundary has points outside the mesh '
287            msg += 'given in %s. ' %filename
288            msg += 'See warning message issued by Interpolation_function '
289            msg += 'for details (should appear above somewhere if '
290            msg += 'verbose is True).\n'
291            msg += 'This is perfectly OK as long as the points that are '
292            msg += 'outside aren\'t used on the actual boundary segment.'
293            if verbose is True:           
294                print msg
295            #raise Exception(msg)
296
297        # Test that file function can be called
298        q = self.F(0, point_id=0)
299        d = len(domain.conserved_quantities)
300        msg = 'Values specified in file %s must be ' %filename
301        msg += ' a list or an array of length %d' %d
302        assert len(q) == d, msg
303
304
305    def __repr__(self):
306        return 'File boundary'
307
308
309    def evaluate(self, vol_id=None, edge_id=None):
310        """Return linearly interpolated values based on domain.time
311        at midpoint of segment defined by vol_id and edge_id.
312        """
313
314        # FIXME (Ole): I think this should be get_time(), see ticket:306
315        t = self.domain.time
316       
317        if vol_id is not None and edge_id is not None:
318            i = self.boundary_indices[ vol_id, edge_id ]
319           
320            try:
321                res = self.F(t, point_id=i)
322            except Modeltime_too_early, e:
323                raise Modeltime_too_early, e
324            except Modeltime_too_late, e:
325                if self.default_boundary is None:
326                    raise Exception, e # Reraise exception
327                else:
328                    # Pass control to default boundary
329                    res = self.default_boundary.evaluate(vol_id, edge_id)
330                   
331                    # Ensure that result cannot be manipulated
332                    # This is a real danger in case the
333                    # default_boundary is a Dirichlet type
334                    # for instance.
335                    res = res.copy() 
336                   
337                    if self.default_boundary_invoked is False:
338                        # Issue warning the first time
339                        if self.verbose:
340                            msg = '%s' %str(e)
341                            msg += 'Instead I will use the default boundary: %s\n'\
342                                %str(self.default_boundary) 
343                            msg += 'Note: Further warnings will be supressed'
344                            print msg
345                   
346                        # FIXME (Ole): Replace this crude flag with
347                        # Python's ability to print warnings only once.
348                        # See http://docs.python.org/lib/warning-filter.html
349                        self.default_boundary_invoked = True
350           
351            if num.any(res == NAN):
352                x,y=self.midpoint_coordinates[i,:]
353                msg = 'NAN value found in file_boundary at '
354                msg += 'point id #%d: (%.2f, %.2f).\n' %(i, x, y)
355
356                if hasattr(self.F, 'indices_outside_mesh') and\
357                       len(self.F.indices_outside_mesh) > 0:
358                    # Check if NAN point is due it being outside
359                    # boundary defined in sww file.
360
361                    if i in self.F.indices_outside_mesh:
362                        msg += 'This point refers to one outside the '
363                        msg += 'mesh defined by the file %s.\n'\
364                               %self.F.filename
365                        msg += 'Make sure that the file covers '
366                        msg += 'the boundary segment it is assigned to '
367                        msg += 'in set_boundary.'
368                    else:
369                        msg += 'This point is inside the mesh defined '
370                        msg += 'the file %s.\n' %self.F.filename
371                        msg += 'Check this file for NANs.'
372                raise Exception, msg
373           
374            return res
375        else:
376            msg = 'Boundary call without point_id not implemented.\n'
377            msg += 'vol_id=%s, edge_id=%s' %(str(vol_id), str(edge_id))
378            raise Exception, msg
Note: See TracBrowser for help on using the repository browser.