source: branches/numpy/anuga/abstract_2d_finite_volumes/generic_boundary_conditions.py @ 6463

Last change on this file since 6463 was 6304, checked in by rwilson, 15 years ago

Initial commit of numpy changes. Still a long way to go.

File size: 12.4 KB
Line 
1"""boundary.py - Classes for implementing boundary conditions
2"""
3
4from warnings import warn
5
6from anuga.utilities.numerical_tools import NAN   
7from anuga.fit_interpolate.interpolate import Modeltime_too_late
8from anuga.fit_interpolate.interpolate import Modeltime_too_early
9
10import numpy as num
11
12
13class Boundary:
14    """Base class for boundary conditions.
15       Specific boundary conditions must provide values for
16       the conserved_quantities
17
18       A boundary object has one neighbour; the one it
19       serves.
20    """
21
22    def __init__(self):
23        pass
24
25    def evaluate(self, vol_id=None, edge_id=None):
26        msg = 'Generic class Boundary must be subclassed'
27        raise Exception, msg
28
29
30class Transmissive_boundary(Boundary):
31    """Transmissive boundary returns same conserved quantities as
32    those present in its neighbour volume.
33
34    Underlying domain must be specified when boundary is instantiated
35    """
36
37    def __init__(self, domain = None):
38        Boundary.__init__(self)
39
40        if domain is None:
41            msg = 'Domain must be specified for transmissive boundary'
42            raise Exception, msg
43
44        self.domain = domain
45
46    def __repr__(self):
47        return 'Transmissive_boundary(%s)' %self.domain
48
49    def evaluate(self, vol_id, edge_id):
50        """Transmissive boundaries return the edge values
51        of the volume they serve.
52        """
53
54        q = self.domain.get_conserved_quantities(vol_id, edge = edge_id)
55        return q
56
57
58class Dirichlet_boundary(Boundary):
59    """Dirichlet boundary returns constant values for the
60    conserved quantities
61    """
62
63
64    def __init__(self, conserved_quantities=None):
65        Boundary.__init__(self)
66
67        if conserved_quantities is None:
68            msg = 'Must specify one value for each conserved quantity'
69            raise Exception, msg
70
71        self.conserved_quantities=num.array(conserved_quantities, num.float)
72
73    def __repr__(self):
74        return 'Dirichlet boundary (%s)' %self.conserved_quantities
75
76    def evaluate(self, vol_id=None, edge_id=None):
77        return self.conserved_quantities
78
79
80
81class Time_boundary(Boundary):
82    """Time dependent boundary returns values for the
83    conserved quantities as a function of time.
84    Must specify domain to get access to model time and a function f(t)
85    which must return conserved quantities as a function time
86    """
87
88    # FIXME (Ole): We should rename f to function to be consistent with
89    # Transmissive_Momentum_Set_Stage_Boundary (cf posting by rrraman)
90    def __init__(self, domain = None, f = None):
91        Boundary.__init__(self)
92
93        try:
94            q = f(0.0)
95        except Exception, e:
96            msg = 'Function for time boundary could not be executed:\n%s' %e
97            raise msg
98
99
100        try:
101            q = num.array(q, num.float)
102        except:
103            msg = 'Return value from time boundary function could '
104            msg += 'not be converted into a numeric array of floats.\n'
105            msg += 'Specified function should return either list or array.\n'
106            msg += 'I got %s' %str(q)
107            raise msg
108
109        msg = 'ERROR: Time boundary function must return a 1d list or array '
110        assert len(q.shape) == 1, msg
111
112        d = len(domain.conserved_quantities)
113        msg = 'Return value for f must be a list or an array of length %d' %d
114        assert len(q) == d, msg
115
116        self.f = f
117        self.domain = domain
118
119    def __repr__(self):
120        return 'Time boundary'
121
122    def evaluate(self, vol_id=None, edge_id=None):
123        # FIXME (Ole): I think this should be get_time(), see ticket:306
124        return self.f(self.domain.time)
125
126
127
128class File_boundary(Boundary):
129    """The File_boundary reads values for the conserved
130    quantities from an sww NetCDF file, and returns interpolated values
131    at the midpoints of each associated boundary segment.
132    Time dependency is interpolated linearly.
133
134    Assumes that file contains a time series and possibly
135    also spatial info. See docstring for File_function in util.py
136    for details about admissible file formats
137
138    File boundary must read and interpolate from *smoothed* version
139    as stored in sww and cannot work with the discontinuos triangles.
140
141    Example:
142    Bf = File_boundary('source_file.sww', domain)
143
144
145    Note that the resulting solution history is not exactly the same as if
146    the models were coupled as there is no feedback into the source model.
147   
148    Optional keyword argument default_boundary must be either None or
149    an instance of class descending from class Boundary.
150    This will be used in case model time exceeds that available in the
151    underlying data.
152       
153    """
154
155    # FIXME (Ole): I kind of like the name Spatio_Temporal_boundary for this
156    # rather than File_boundary
157
158    def __init__(self, filename, domain, 
159                 time_thinning=1, 
160                 time_limit=None,
161                 boundary_polygon=None,   
162                 default_boundary=None,
163                 use_cache=False, 
164                 verbose=False): 
165
166        import time
167        from anuga.config import time_format
168        from anuga.abstract_2d_finite_volumes.util import file_function
169
170        Boundary.__init__(self)
171
172        # Get x,y vertex coordinates for all triangles
173        V = domain.vertex_coordinates
174
175        # Compute midpoint coordinates for all boundary elements
176        # Only a subset may be invoked when boundary is evaluated but
177        # we don't know which ones at this stage since this object can
178        # be attached to
179        # any tagged boundary later on.
180
181        if verbose: print 'Find midpoint coordinates of entire boundary'
182        self.midpoint_coordinates = num.zeros((len(domain.boundary), 2), num.float)
183        boundary_keys = domain.boundary.keys()
184
185        xllcorner = domain.geo_reference.get_xllcorner()
186        yllcorner = domain.geo_reference.get_yllcorner()       
187       
188
189        # Make ordering unique #FIXME: should this happen in domain.py?
190        boundary_keys.sort()
191
192        # Record ordering #FIXME: should this also happen in domain.py?
193        self.boundary_indices = {}
194        for i, (vol_id, edge_id) in enumerate(boundary_keys):
195
196            base_index = 3*vol_id
197            x0, y0 = V[base_index, :]
198            x1, y1 = V[base_index+1, :]
199            x2, y2 = V[base_index+2, :]
200           
201            # Compute midpoints
202            if edge_id == 0: m = num.array([(x1 + x2)/2, (y1 + y2)/2], num.float)
203            if edge_id == 1: m = num.array([(x0 + x2)/2, (y0 + y2)/2], num.float)
204            if edge_id == 2: m = num.array([(x1 + x0)/2, (y1 + y0)/2], num.float)
205
206            # Convert to absolute UTM coordinates
207            m[0] += xllcorner
208            m[1] += yllcorner
209           
210            # Register point and index
211            self.midpoint_coordinates[i,:] = m
212
213            # Register index of this boundary edge for use with evaluate
214            self.boundary_indices[(vol_id, edge_id)] = i
215
216        if verbose: print 'Initialise file_function'
217        self.F = file_function(filename,
218                               domain,
219                               quantities=domain.conserved_quantities,
220                               interpolation_points=self.midpoint_coordinates,
221                               time_thinning=time_thinning,
222                               time_limit=time_limit,
223                               use_cache=use_cache, 
224                               verbose=verbose,
225                               boundary_polygon=boundary_polygon)
226                             
227        # Check and store default_boundary
228        msg = 'Keyword argument default_boundary must be either None '
229        msg += 'or a boundary object.\n I got %s' %(str(default_boundary))
230        assert default_boundary is None or\
231            isinstance(default_boundary, Boundary), msg
232        self.default_boundary = default_boundary
233        self.default_boundary_invoked = False    # Flag
234
235        # Store pointer to domain and verbosity
236        self.domain = domain
237        self.verbose = verbose
238
239
240        # Here we'll flag indices outside the mesh as a warning
241        # as suggested by Joaquim Luis in sourceforge posting
242        # November 2007
243        # We won't make it an error as it is conceivable that
244        # only part of mesh boundary is actually used with a given
245        # file boundary sww file.
246        if hasattr(self.F, 'indices_outside_mesh') and\
247               len(self.F.indices_outside_mesh) > 0:
248            msg = 'WARNING: File_boundary has points outside the mesh '
249            msg += 'given in %s. ' %filename
250            msg += 'See warning message issued by Interpolation_function '
251            msg += 'for details (should appear above somewhere if '
252            msg += 'verbose is True).\n'
253            msg += 'This is perfectly OK as long as the points that are '
254            msg += 'outside aren\'t used on the actual boundary segment.'
255            if verbose is True:           
256                print msg
257            #raise Exception(msg)
258
259        # Test that file function can be called
260        q = self.F(0, point_id=0)
261        d = len(domain.conserved_quantities)
262        msg = 'Values specified in file %s must be ' %filename
263        msg += ' a list or an array of length %d' %d
264        assert len(q) == d, msg
265
266
267    def __repr__(self):
268        return 'File boundary'
269
270
271    def evaluate(self, vol_id=None, edge_id=None):
272        """Return linearly interpolated values based on domain.time
273        at midpoint of segment defined by vol_id and edge_id.
274        """
275
276        # FIXME (Ole): I think this should be get_time(), see ticket:306
277        t = self.domain.time
278       
279        if vol_id is not None and edge_id is not None:
280            i = self.boundary_indices[ vol_id, edge_id ]
281           
282            try:
283                res = self.F(t, point_id=i)
284            except Modeltime_too_early, e:
285                raise Modeltime_too_early, e
286            except Modeltime_too_late, e:
287                if self.default_boundary is None:
288                    raise Exception, e # Reraise exception
289                else:
290                    # Pass control to default boundary
291                    res = self.default_boundary.evaluate(vol_id, edge_id)
292                   
293                    # Ensure that result cannot be manipulated
294                    # This is a real danger in case the
295                    # default_boundary is a Dirichlet type
296                    # for instance.
297                    res = res.copy() 
298                   
299                    if self.default_boundary_invoked is False:
300                        # Issue warning the first time
301                        msg = '%s' %str(e)
302                        msg += 'Instead I will use the default boundary: %s\n'\
303                            %str(self.default_boundary) 
304                        msg += 'Note: Further warnings will be supressed'
305                        warn(msg)
306                   
307                        # FIXME (Ole): Replace this crude flag with
308                        # Python's ability to print warnings only once.
309                        # See http://docs.python.org/lib/warning-filter.html
310                        self.default_boundary_invoked = True
311           
312            if num.any(res == NAN):
313                x,y=self.midpoint_coordinates[i,:]
314                msg = 'NAN value found in file_boundary at '
315                msg += 'point id #%d: (%.2f, %.2f).\n' %(i, x, y)
316
317                if hasattr(self.F, 'indices_outside_mesh') and\
318                       len(self.F.indices_outside_mesh) > 0:
319                    # Check if NAN point is due it being outside
320                    # boundary defined in sww file.
321
322                    if i in self.F.indices_outside_mesh:
323                        msg += 'This point refers to one outside the '
324                        msg += 'mesh defined by the file %s.\n'\
325                               %self.F.filename
326                        msg += 'Make sure that the file covers '
327                        msg += 'the boundary segment it is assigned to '
328                        msg += 'in set_boundary.'
329                    else:
330                        msg += 'This point is inside the mesh defined '
331                        msg += 'the file %s.\n' %self.F.filename
332                        msg += 'Check this file for NANs.'
333                raise Exception, msg
334           
335            return res
336        else:
337            msg = 'Boundary call without point_id not implemented.\n'
338            msg += 'vol_id=%s, edge_id=%s' %(str(vol_id), str(edge_id))
339            raise Exception, msg
Note: See TracBrowser for help on using the repository browser.