source: anuga_core/source/anuga/abstract_2d_finite_volumes/generic_boundary_conditions.py @ 6011

Last change on this file since 6011 was 6011, checked in by ole, 15 years ago

Moved obsolete code away

File size: 12.3 KB
Line 
1"""boundary.py - Classes for implementing boundary conditions
2"""
3
4from warnings import warn
5
6from anuga.utilities.numerical_tools import NAN   
7from anuga.fit_interpolate.interpolate import Modeltime_too_late
8from anuga.fit_interpolate.interpolate import Modeltime_too_early
9
10
11class Boundary:
12    """Base class for boundary conditions.
13       Specific boundary conditions must provide values for
14       the conserved_quantities
15
16       A boundary object has one neighbour; the one it
17       serves.
18    """
19
20    def __init__(self):
21        pass
22
23    def evaluate(self, vol_id=None, edge_id=None):
24        msg = 'Generic class Boundary must be subclassed'
25        raise Exception, msg
26
27
28class Transmissive_boundary(Boundary):
29    """Transmissive boundary returns same conserved quantities as
30    those present in its neighbour volume.
31
32    Underlying domain must be specified when boundary is instantiated
33    """
34
35    def __init__(self, domain = None):
36        Boundary.__init__(self)
37
38        if domain is None:
39            msg = 'Domain must be specified for transmissive boundary'
40            raise Exception, msg
41
42        self.domain = domain
43
44    def __repr__(self):
45        return 'Transmissive_boundary(%s)' %self.domain
46
47    def evaluate(self, vol_id, edge_id):
48        """Transmissive boundaries return the edge values
49        of the volume they serve.
50        """
51
52        q = self.domain.get_conserved_quantities(vol_id, edge = edge_id)
53        return q
54
55
56class Dirichlet_boundary(Boundary):
57    """Dirichlet boundary returns constant values for the
58    conserved quantities
59    """
60
61
62    def __init__(self, conserved_quantities=None):
63        Boundary.__init__(self)
64
65        if conserved_quantities is None:
66            msg = 'Must specify one value for each conserved quantity'
67            raise Exception, msg
68
69        from Numeric import array, Float
70        self.conserved_quantities=array(conserved_quantities).astype(Float)
71
72    def __repr__(self):
73        return 'Dirichlet boundary (%s)' %self.conserved_quantities
74
75    def evaluate(self, vol_id=None, edge_id=None):
76        return self.conserved_quantities
77
78
79
80class Time_boundary(Boundary):
81    """Time dependent boundary returns values for the
82    conserved quantities as a function of time.
83    Must specify domain to get access to model time and a function f(t)
84    which must return conserved quantities as a function time
85    """
86
87    # FIXME (Ole): We should rename f to function to be consistent with
88    # Transmissive_Momentum_Set_Stage_Boundary (cf posting by rrraman)
89    def __init__(self, domain = None, f = None):
90        Boundary.__init__(self)
91
92        try:
93            q = f(0.0)
94        except Exception, e:
95            msg = 'Function for time boundary could not be executed:\n%s' %e
96            raise msg
97
98
99        from Numeric import array, Float
100        try:
101            q = array(q).astype(Float)
102        except:
103            msg = 'Return value from time boundary function could '
104            msg += 'not be converted into a Numeric array of floats.\n'
105            msg += 'Specified function should return either list or array.\n'
106            msg += 'I got %s' %str(q)
107            raise msg
108
109        msg = 'ERROR: Time boundary function must return a 1d list or array '
110        assert len(q.shape) == 1, msg
111
112        d = len(domain.conserved_quantities)
113        msg = 'Return value for f must be a list or an array of length %d' %d
114        assert len(q) == d, msg
115
116        self.f = f
117        self.domain = domain
118
119    def __repr__(self):
120        return 'Time boundary'
121
122    def evaluate(self, vol_id=None, edge_id=None):
123        # FIXME (Ole): I think this should be get_time(), see ticket:306
124        return self.f(self.domain.time)
125
126
127
128class File_boundary(Boundary):
129    """The File_boundary reads values for the conserved
130    quantities from an sww NetCDF file, and returns interpolated values
131    at the midpoints of each associated boundary segment.
132    Time dependency is interpolated linearly.
133
134    Assumes that file contains a time series and possibly
135    also spatial info. See docstring for File_function in util.py
136    for details about admissible file formats
137
138    File boundary must read and interpolate from *smoothed* version
139    as stored in sww and cannot work with the discontinuos triangles.
140
141    Example:
142    Bf = File_boundary('source_file.sww', domain)
143
144
145    Note that the resulting solution history is not exactly the same as if
146    the models were coupled as there is no feedback into the source model.
147   
148    Optional keyword argument default_boundary must be either None or
149    an instance of class descending from class Boundary.
150    This will be used in case model time exceeds that available in the
151    underlying data.
152       
153    """
154
155    # FIXME (Ole): I kind of like the name Spatio_Temporal_boundary for this
156    # rather than File_boundary
157
158    def __init__(self, filename, domain, 
159                 time_thinning=1, 
160                 boundary_polygon=None,   
161                 default_boundary=None,
162                 use_cache=False, 
163                 verbose=False): 
164
165        import time
166        from Numeric import array, zeros, Float
167        from anuga.config import time_format
168        from anuga.abstract_2d_finite_volumes.util import file_function
169
170        Boundary.__init__(self)
171
172        # Get x,y vertex coordinates for all triangles
173        V = domain.vertex_coordinates
174
175        # Compute midpoint coordinates for all boundary elements
176        # Only a subset may be invoked when boundary is evaluated but
177        # we don't know which ones at this stage since this object can
178        # be attached to
179        # any tagged boundary later on.
180
181        if verbose: print 'Find midpoint coordinates of entire boundary'
182        self.midpoint_coordinates = zeros( (len(domain.boundary), 2), Float)
183        boundary_keys = domain.boundary.keys()
184
185        xllcorner = domain.geo_reference.get_xllcorner()
186        yllcorner = domain.geo_reference.get_yllcorner()       
187       
188
189        # Make ordering unique #FIXME: should this happen in domain.py?
190        boundary_keys.sort()
191
192        # Record ordering #FIXME: should this also happen in domain.py?
193        self.boundary_indices = {}
194        for i, (vol_id, edge_id) in enumerate(boundary_keys):
195
196            base_index = 3*vol_id
197            x0, y0 = V[base_index, :]
198            x1, y1 = V[base_index+1, :]
199            x2, y2 = V[base_index+2, :]
200           
201            # Compute midpoints
202            if edge_id == 0: m = array([(x1 + x2)/2, (y1 + y2)/2])
203            if edge_id == 1: m = array([(x0 + x2)/2, (y0 + y2)/2])
204            if edge_id == 2: m = array([(x1 + x0)/2, (y1 + y0)/2])
205
206            # Convert to absolute UTM coordinates
207            m[0] += xllcorner
208            m[1] += yllcorner
209           
210            # Register point and index
211            self.midpoint_coordinates[i,:] = m
212
213            # Register index of this boundary edge for use with evaluate
214            self.boundary_indices[(vol_id, edge_id)] = i
215
216        if verbose: print 'Initialise file_function'
217        self.F = file_function(filename,
218                               domain,
219                               quantities=domain.conserved_quantities,
220                               interpolation_points=self.midpoint_coordinates,
221                               time_thinning=time_thinning,
222                               use_cache=use_cache, 
223                               verbose=verbose,
224                               boundary_polygon=boundary_polygon)
225                             
226        # Check and store default_boundary
227        msg = 'Keyword argument default_boundary must be either None '
228        msg += 'or a boundary object.\n I got %s' %(str(default_boundary))
229        assert default_boundary is None or\
230            isinstance(default_boundary, Boundary), msg
231        self.default_boundary = default_boundary
232        self.default_boundary_invoked = False    # Flag
233
234        # Store pointer to domain and verbosity
235        self.domain = domain
236        self.verbose = verbose
237
238
239        # Here we'll flag indices outside the mesh as a warning
240        # as suggested by Joaquim Luis in sourceforge posting
241        # November 2007
242        # We won't make it an error as it is conceivable that
243        # only part of mesh boundary is actually used with a given
244        # file boundary sww file.
245        if hasattr(self.F, 'indices_outside_mesh') and\
246               len(self.F.indices_outside_mesh) > 0:
247            msg = 'WARNING: File_boundary has points outside the mesh '
248            msg += 'given in %s. ' %filename
249            msg += 'See warning message issued by Interpolation_function '
250            msg += 'for details (should appear above somewhere if '
251            msg += 'verbose is True).\n'
252            msg += 'This is perfectly OK as long as the points that are '
253            msg += 'outside aren\'t used on the actual boundary segment.'
254            if verbose is True:           
255                print msg
256            #raise Exception(msg)
257
258        # Test that file function can be called
259        q = self.F(0, point_id=0)
260        d = len(domain.conserved_quantities)
261        msg = 'Values specified in file %s must be ' %filename
262        msg += ' a list or an array of length %d' %d
263        assert len(q) == d, msg
264
265
266    def __repr__(self):
267        return 'File boundary'
268
269
270    def evaluate(self, vol_id=None, edge_id=None):
271        """Return linearly interpolated values based on domain.time
272        at midpoint of segment defined by vol_id and edge_id.
273        """
274
275        # FIXME (Ole): I think this should be get_time(), see ticket:306
276        t = self.domain.time
277       
278        if vol_id is not None and edge_id is not None:
279            i = self.boundary_indices[ vol_id, edge_id ]
280           
281            try:
282                res = self.F(t, point_id=i)
283            except Modeltime_too_early, e:
284                raise Modeltime_too_early, e
285            except Modeltime_too_late, e:
286                if self.default_boundary is None:
287                    raise Exception, e # Reraise exception
288                else:
289                    # Pass control to default boundary
290                    res = self.default_boundary.evaluate(vol_id, edge_id)
291                   
292                    # Ensure that result cannot be manipulated
293                    # This is a real danger in case the
294                    # default_boundary is a Dirichlet type
295                    # for instance.
296                    res = res.copy() 
297                   
298                    if self.default_boundary_invoked is False:
299                        # Issue warning the first time
300                        msg = '%s' %str(e)
301                        msg += 'Instead I will use the default boundary: %s\n'\
302                            %str(self.default_boundary) 
303                        msg += 'Note: Further warnings will be supressed'
304                        warn(msg)
305                   
306                        # FIXME (Ole): Replace this crude flag with
307                        # Python's ability to print warnings only once.
308                        # See http://docs.python.org/lib/warning-filter.html
309                        self.default_boundary_invoked = True
310                   
311
312            if res == NAN:
313                x,y=self.midpoint_coordinates[i,:]
314                msg = 'NAN value found in file_boundary at '
315                msg += 'point id #%d: (%.2f, %.2f).\n' %(i, x, y)
316
317                if hasattr(self.F, 'indices_outside_mesh') and\
318                       len(self.F.indices_outside_mesh) > 0:
319                    # Check if NAN point is due it being outside
320                    # boundary defined in sww file.
321
322                    if i in self.F.indices_outside_mesh:
323                        msg += 'This point refers to one outside the '
324                        msg += 'mesh defined by the file %s.\n'\
325                               %self.F.filename
326                        msg += 'Make sure that the file covers '
327                        msg += 'the boundary segment it is assigned to '
328                        msg += 'in set_boundary.'
329                    else:
330                        msg += 'This point is inside the mesh defined '
331                        msg += 'the file %s.\n' %self.F.filename
332                        msg += 'Check this file for NANs.'
333                raise Exception, msg
334           
335            return res
336        else:
337            msg = 'Boundary call without point_id not implemented.\n'
338            msg += 'vol_id=%s, edge_id=%s' %(str(vol_id), str(edge_id))
339            raise Exception, msg
Note: See TracBrowser for help on using the repository browser.