Changeset 7733


Ignore:
Timestamp:
May 19, 2010, 10:13:06 AM (14 years ago)
Author:
hudson
Message:

Fixed unit test failures.

Location:
anuga_core/source/anuga
Files:
2 added
4 edited

Legend:

Unmodified
Added
Removed
  • anuga_core/source/anuga/shallow_water/shallow_water_domain.py

    r7731 r7733  
    8383import numpy as num
    8484
    85 from anuga.abstract_2d_finite_volumes.neighbour_mesh import segment_midpoints
    8685from anuga.abstract_2d_finite_volumes.domain import Domain as Generic_Domain
    87 from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\
    88      import Boundary
    89 from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\
    90      import File_boundary
    91 from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\
    92      import Dirichlet_boundary
    93 from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\
    94      import Time_boundary
    95 from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\
    96      import Transmissive_boundary
    97 from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\
    98      import AWI_boundary
    99 
     86
     87from anuga.shallow_water.forcing import Cross_section
    10088from anuga.pmesh.mesh_interface import create_mesh_from_regions
    10189from anuga.utilities.numerical_tools import gradient, mean, ensure_numeric
     
    14321420
    14331421
    1434 ################################################################################
    1435 # Experimental auxiliary functions
    1436 ################################################################################
    1437 
    1438 ##
    1439 # @brief Check forcefield parameter.
    1440 # @param f Object to check.
    1441 # @note 'f' may be a callable object or a scalar value.
    1442 def check_forcefield(f):
    1443     """Check that force object is as expected.
    1444    
    1445     Check that f is either:
    1446     1: a callable object f(t,x,y), where x and y are vectors
    1447        and that it returns an array or a list of same length
    1448        as x and y
    1449     2: a scalar
    1450     """
    1451 
    1452     if callable(f):
    1453         N = 3
    1454         x = num.ones(3, num.float)
    1455         y = num.ones(3, num.float)
    1456         try:
    1457             q = f(1.0, x=x, y=y)
    1458         except Exception, e:
    1459             msg = 'Function %s could not be executed:\n%s' %(f, e)
    1460             # FIXME: Reconsider this semantics
    1461             raise Exception, msg
    1462 
    1463         try:
    1464             q = num.array(q, num.float)
    1465         except:
    1466             msg = ('Return value from vector function %s could not '
    1467                    'be converted into a numeric array of floats.\nSpecified '
    1468                    'function should return either list or array.' % f)
    1469             raise Exception, msg
    1470 
    1471         # Is this really what we want?
    1472         # info is "(func name, filename, defining line)"
    1473         func_info = (f.func_name, f.func_code.co_filename,
    1474                      f.func_code.co_firstlineno)
    1475         func_msg = 'Function %s (defined in %s, line %d)' % func_info
    1476         try:
    1477             result_len = len(q)
    1478         except:
    1479             msg = '%s must return vector' % func_msg
    1480             self.fail(msg)
    1481         msg = '%s must return vector of length %d' % (func_msg, N)
    1482         assert result_len == N, msg
    1483     else:
    1484         try:
    1485             f = float(f)
    1486         except:
    1487             msg = ('Force field %s must be a scalar value coercible to float.'
    1488                    % str(f))
    1489             raise Exception, msg
    1490 
    1491     return f
    1492 
    1493 
    1494 ##
    1495 # Class to apply a wind stress to a domain.
    1496 class Wind_stress:
    1497     """Apply wind stress to water momentum in terms of
    1498     wind speed [m/s] and wind direction [degrees]
    1499     """
    1500 
    1501     ##
    1502     # @brief Create an instance of Wind_stress.
    1503     # @param *args
    1504     # @param **kwargs
    1505     def __init__(self, *args, **kwargs):
    1506         """Initialise windfield from wind speed s [m/s]
    1507         and wind direction phi [degrees]
    1508 
    1509         Inputs v and phi can be either scalars or Python functions, e.g.
    1510 
    1511         W = Wind_stress(10, 178)
    1512 
    1513         #FIXME - 'normal' degrees are assumed for now, i.e. the
    1514         vector (1,0) has zero degrees.
    1515         We may need to convert from 'compass' degrees later on and also
    1516         map from True north to grid north.
    1517 
    1518         Arguments can also be Python functions of t,x,y as in
    1519 
    1520         def speed(t,x,y):
    1521             ...
    1522             return s
    1523 
    1524         def angle(t,x,y):
    1525             ...
    1526             return phi
    1527 
    1528         where x and y are vectors.
    1529 
    1530         and then pass the functions in
    1531 
    1532         W = Wind_stress(speed, angle)
    1533 
    1534         The instantiated object W can be appended to the list of
    1535         forcing_terms as in
    1536 
    1537         Alternatively, one vector valued function for (speed, angle)
    1538         can be applied, providing both quantities simultaneously.
    1539         As in
    1540         W = Wind_stress(F), where returns (speed, angle) for each t.
    1541 
    1542         domain.forcing_terms.append(W)
    1543         """
    1544 
    1545         from anuga.config import rho_a, rho_w, eta_w
    1546 
    1547         if len(args) == 2:
    1548             s = args[0]
    1549             phi = args[1]
    1550         elif len(args) == 1:
    1551             # Assume vector function returning (s, phi)(t,x,y)
    1552             vector_function = args[0]
    1553             s = lambda t,x,y: vector_function(t,x=x,y=y)[0]
    1554             phi = lambda t,x,y: vector_function(t,x=x,y=y)[1]
    1555         else:
    1556            # Assume info is in 2 keyword arguments
    1557            if len(kwargs) == 2:
    1558                s = kwargs['s']
    1559                phi = kwargs['phi']
    1560            else:
    1561                raise Exception, 'Assumes two keyword arguments: s=..., phi=....'
    1562 
    1563         self.speed = check_forcefield(s)
    1564         self.phi = check_forcefield(phi)
    1565 
    1566         self.const = eta_w*rho_a/rho_w
    1567 
    1568     ##
    1569     # @brief 'execute' this class instance.
    1570     # @param domain
    1571     def __call__(self, domain):
    1572         """Evaluate windfield based on values found in domain"""
    1573 
    1574         from math import pi, cos, sin, sqrt
    1575 
    1576         xmom_update = domain.quantities['xmomentum'].explicit_update
    1577         ymom_update = domain.quantities['ymomentum'].explicit_update
    1578 
    1579         N = len(domain)    # number_of_triangles
    1580         t = domain.time
    1581 
    1582         if callable(self.speed):
    1583             xc = domain.get_centroid_coordinates()
    1584             s_vec = self.speed(t, xc[:,0], xc[:,1])
    1585         else:
    1586             # Assume s is a scalar
    1587             try:
    1588                 s_vec = self.speed * num.ones(N, num.float)
    1589             except:
    1590                 msg = 'Speed must be either callable or a scalar: %s' %self.s
    1591                 raise msg
    1592 
    1593         if callable(self.phi):
    1594             xc = domain.get_centroid_coordinates()
    1595             phi_vec = self.phi(t, xc[:,0], xc[:,1])
    1596         else:
    1597             # Assume phi is a scalar
    1598 
    1599             try:
    1600                 phi_vec = self.phi * num.ones(N, num.float)
    1601             except:
    1602                 msg = 'Angle must be either callable or a scalar: %s' %self.phi
    1603                 raise msg
    1604 
    1605         assign_windfield_values(xmom_update, ymom_update,
    1606                                 s_vec, phi_vec, self.const)
    1607 
    1608 
    1609 ##
    1610 # @brief Assign wind field values
    1611 # @param xmom_update
    1612 # @param ymom_update
    1613 # @param s_vec
    1614 # @param phi_vec
    1615 # @param const
    1616 def assign_windfield_values(xmom_update, ymom_update,
    1617                             s_vec, phi_vec, const):
    1618     """Python version of assigning wind field to update vectors.
    1619     A C version also exists (for speed)
    1620     """
    1621 
    1622     from math import pi, cos, sin, sqrt
    1623 
    1624     N = len(s_vec)
    1625     for k in range(N):
    1626         s = s_vec[k]
    1627         phi = phi_vec[k]
    1628 
    1629         # Convert to radians
    1630         phi = phi*pi/180
    1631 
    1632         # Compute velocity vector (u, v)
    1633         u = s*cos(phi)
    1634         v = s*sin(phi)
    1635 
    1636         # Compute wind stress
    1637         S = const * sqrt(u**2 + v**2)
    1638         xmom_update[k] += S*u
    1639         ymom_update[k] += S*v
    1640 
    1641 
    1642 ##
    1643 # @brief A class for a general explicit forcing term.
    1644 class General_forcing:
    1645     """General explicit forcing term for update of quantity
    1646 
    1647     This is used by Inflow and Rainfall for instance
    1648 
    1649 
    1650     General_forcing(quantity_name, rate, center, radius, polygon)
    1651 
    1652     domain:     ANUGA computational domain
    1653     quantity_name: Name of quantity to update.
    1654                    It must be a known conserved quantity.
    1655 
    1656     rate [?/s]: Total rate of change over the specified area.
    1657                 This parameter can be either a constant or a
    1658                 function of time. Positive values indicate increases,
    1659                 negative values indicate decreases.
    1660                 Rate can be None at initialisation but must be specified
    1661                 before forcing term is applied (i.e. simulation has started).
    1662 
    1663     center [m]: Coordinates at center of flow point
    1664     radius [m]: Size of circular area
    1665     polygon:    Arbitrary polygon
    1666     default_rate: Rate to be used if rate fails (e.g. if model time exceeds its data)
    1667                   Admissible types: None, constant number or function of t
    1668 
    1669 
    1670     Either center, radius or polygon can be specified but not both.
    1671     If neither are specified the entire domain gets updated.
    1672     All coordinates to be specified in absolute UTM coordinates (x, y) assuming the zone of domain.
    1673 
    1674     Inflow or Rainfall for examples of use
    1675     """
    1676 
    1677 
    1678     # FIXME (AnyOne) : Add various methods to allow spatial variations
    1679 
    1680     ##
    1681     # @brief Create an instance of this forcing term.
    1682     # @param domain
    1683     # @param quantity_name
    1684     # @param rate
    1685     # @param center
    1686     # @param radius
    1687     # @param polygon
    1688     # @param default_rate
    1689     # @param verbose
    1690     def __init__(self,
    1691                  domain,
    1692                  quantity_name,
    1693                  rate=0.0,
    1694                  center=None,
    1695                  radius=None,
    1696                  polygon=None,
    1697                  default_rate=None,
    1698                  verbose=False):
    1699 
    1700         from math import pi, cos, sin
    1701 
    1702         if center is None:
    1703             msg = 'I got radius but no center.'
    1704             assert radius is None, msg
    1705 
    1706         if radius is None:
    1707             msg += 'I got center but no radius.'
    1708             assert center is None, msg
    1709 
    1710         self.domain = domain
    1711         self.quantity_name = quantity_name
    1712         self.rate = rate
    1713         self.center = ensure_numeric(center)
    1714         self.radius = radius
    1715         self.polygon = polygon
    1716         self.verbose = verbose
    1717         self.value = 0.0    # Can be used to remember value at
    1718                             # previous timestep in order to obtain rate
    1719 
    1720         # Get boundary (in absolute coordinates)
    1721         bounding_polygon = domain.get_boundary_polygon()
    1722 
    1723         # Update area if applicable
    1724         if center is not None and radius is not None:
    1725             assert len(center) == 2
    1726             msg = 'Polygon cannot be specified when center and radius are'
    1727             assert polygon is None, msg
    1728 
    1729             # Check that circle center lies within the mesh.
    1730             msg = 'Center %s specified for forcing term did not' % str(center)
    1731             msg += 'fall within the domain boundary.'
    1732             assert is_inside_polygon(center, bounding_polygon), msg
    1733 
    1734             # Check that circle periphery lies within the mesh.
    1735             N = 100
    1736             periphery_points = []
    1737             for i in range(N):
    1738                 theta = 2*pi*i/100
    1739 
    1740                 x = center[0] + radius*cos(theta)
    1741                 y = center[1] + radius*sin(theta)
    1742 
    1743                 periphery_points.append([x,y])
    1744 
    1745             for point in periphery_points:
    1746                 msg = 'Point %s on periphery for forcing term' % str(point)
    1747                 msg += ' did not fall within the domain boundary.'
    1748                 assert is_inside_polygon(point, bounding_polygon), msg
    1749 
    1750         if polygon is not None:
    1751             # Check that polygon lies within the mesh.
    1752             for point in self.polygon:
    1753                 msg = 'Point %s in polygon for forcing term' % str(point)
    1754                 msg += ' did not fall within the domain boundary.'
    1755                 assert is_inside_polygon(point, bounding_polygon), msg
    1756 
    1757         # Pointer to update vector
    1758         self.update = domain.quantities[self.quantity_name].explicit_update
    1759 
    1760         # Determine indices in flow area
    1761         N = len(domain)
    1762         points = domain.get_centroid_coordinates(absolute=True)
    1763 
    1764         # Calculate indices in exchange area for this forcing term
    1765         self.exchange_indices = None
    1766         if self.center is not None and self.radius is not None:
    1767             # Inlet is circular
    1768             inlet_region = 'center=%s, radius=%s' % (self.center, self.radius)
    1769 
    1770             self.exchange_indices = []
    1771             for k in range(N):
    1772                 x, y = points[k,:]    # Centroid
    1773 
    1774                 c = self.center
    1775                 if ((x-c[0])**2+(y-c[1])**2) < self.radius**2:
    1776                     self.exchange_indices.append(k)
    1777 
    1778         if self.polygon is not None:
    1779             # Inlet is polygon
    1780             inlet_region = 'polygon=%s' % (self.polygon)
    1781             self.exchange_indices = inside_polygon(points, self.polygon)
    1782 
    1783         if self.exchange_indices is None:
    1784             self.exchange_area = polygon_area(bounding_polygon)
    1785         else:   
    1786             if len(self.exchange_indices) == 0:
    1787                 msg = 'No triangles have been identified in '
    1788                 msg += 'specified region: %s' % inlet_region
    1789                 raise Exception, msg
    1790 
    1791             # Compute exchange area as the sum of areas of triangles identified
    1792             # by circle or polygon
    1793             self.exchange_area = 0.0
    1794             for i in self.exchange_indices:
    1795                 self.exchange_area += domain.areas[i]
    1796            
    1797 
    1798         msg = 'Exchange area in forcing term'
    1799         msg += ' has area = %f' %self.exchange_area
    1800         assert self.exchange_area > 0.0           
    1801            
    1802                
    1803 
    1804            
    1805         # Check and store default_rate
    1806         msg = ('Keyword argument default_rate must be either None '
    1807                'or a function of time.\nI got %s.' % str(default_rate))
    1808         assert (default_rate is None or
    1809                 type(default_rate) in [IntType, FloatType] or
    1810                 callable(default_rate)), msg
    1811 
    1812         if default_rate is not None:
    1813             # If it is a constant, make it a function
    1814             if not callable(default_rate):
    1815                 tmp = default_rate
    1816                 default_rate = lambda t: tmp
    1817 
    1818             # Check that default_rate is a function of one argument
    1819             try:
    1820                 default_rate(0.0)
    1821             except:
    1822                 raise Exception, msg
    1823 
    1824         self.default_rate = default_rate
    1825         self.default_rate_invoked = False    # Flag
    1826 
    1827     ##
    1828     # @brief Execute this instance.
    1829     # @param domain
    1830     def __call__(self, domain):
    1831         """Apply inflow function at time specified in domain, update stage"""
    1832 
    1833         # Call virtual method allowing local modifications
    1834         t = domain.get_time()
    1835         try:
    1836             rate = self.update_rate(t)
    1837         except Modeltime_too_early, e:
    1838             raise Modeltime_too_early, e
    1839         except Modeltime_too_late, e:
    1840             if self.default_rate is None:
    1841                 msg = '%s: ANUGA is trying to run longer than specified data.\n' %str(e)
    1842                 msg += 'You can specify keyword argument default_rate in the '
    1843                 msg += 'forcing function to tell it what to do in the absence of time data.'
    1844                 raise Modeltime_too_late, msg   
    1845             else:
    1846                 # Pass control to default rate function
    1847                 rate = self.default_rate(t)
    1848 
    1849                 if self.default_rate_invoked is False:
    1850                     # Issue warning the first time
    1851                     msg = ('%s\n'
    1852                            'Instead I will use the default rate: %s\n'
    1853                            'Note: Further warnings will be supressed'
    1854                            % (str(e), str(self.default_rate)))
    1855                     warn(msg)
    1856 
    1857                     # FIXME (Ole): Replace this crude flag with
    1858                     # Python's ability to print warnings only once.
    1859                     # See http://docs.python.org/lib/warning-filter.html
    1860                     self.default_rate_invoked = True
    1861 
    1862         if rate is None:
    1863             msg = ('Attribute rate must be specified in General_forcing '
    1864                    'or its descendants before attempting to call it')
    1865             raise Exception, msg
    1866 
    1867         # Now rate is a number
    1868         if self.verbose is True:
    1869             log.critical('Rate of %s at time = %.2f = %f'
    1870                          % (self.quantity_name, domain.get_time(), rate))
    1871 
    1872         if self.exchange_indices is None:
    1873             self.update[:] += rate
    1874         else:
    1875             # Brute force assignment of restricted rate
    1876             for k in self.exchange_indices:
    1877                 self.update[k] += rate
    1878 
    1879     ##
    1880     # @brief Update the internal rate.
    1881     # @param t A callable or scalar used to set the rate.
    1882     # @return The new rate.
    1883     def update_rate(self, t):
    1884         """Virtual method allowing local modifications by writing an
    1885         overriding version in descendant
    1886         """
    1887 
    1888         if callable(self.rate):
    1889             rate = self.rate(t)
    1890         else:
    1891             rate = self.rate
    1892 
    1893         return rate
    1894 
    1895     ##
    1896     # @brief Get values for the specified quantity.
    1897     # @param quantity_name Name of the quantity of interest.
    1898     # @return The value(s) of the quantity.
    1899     # @note If 'quantity_name' is None, use self.quantity_name.
    1900     def get_quantity_values(self, quantity_name=None):
    1901         """Return values for specified quantity restricted to opening
    1902 
    1903         Optionally a quantity name can be specified if values from another
    1904         quantity is sought
    1905         """
    1906 
    1907         if quantity_name is None:
    1908             quantity_name = self.quantity_name
    1909 
    1910         q = self.domain.quantities[quantity_name]
    1911         return q.get_values(location='centroids',
    1912                             indices=self.exchange_indices)
    1913 
    1914     ##
    1915     # @brief Set value for the specified quantity.
    1916     # @param val The value object used to set value.
    1917     # @param quantity_name Name of the quantity of interest.
    1918     # @note If 'quantity_name' is None, use self.quantity_name.
    1919     def set_quantity_values(self, val, quantity_name=None):
    1920         """Set values for specified quantity restricted to opening
    1921 
    1922         Optionally a quantity name can be specified if values from another
    1923         quantity is sought
    1924         """
    1925 
    1926         if quantity_name is None:
    1927             quantity_name = self.quantity_name
    1928 
    1929         q = self.domain.quantities[self.quantity_name]
    1930         q.set_values(val,
    1931                      location='centroids',
    1932                      indices=self.exchange_indices)
    1933 
    1934 
    1935 ##
    1936 # @brief A class for rainfall forcing function.
    1937 # @note Inherits from General_forcing.
    1938 class Rainfall(General_forcing):
    1939     """Class Rainfall - general 'rain over entire domain' forcing term.
    1940 
    1941     Used for implementing Rainfall over the entire domain.
    1942 
    1943         Current Limited to only One Gauge..
    1944 
    1945         Need to add Spatial Varying Capability
    1946         (This module came from copying and amending the Inflow Code)
    1947 
    1948     Rainfall(rain)
    1949 
    1950     domain
    1951     rain [mm/s]:  Total rain rate over the specified domain.
    1952                   NOTE: Raingauge Data needs to reflect the time step.
    1953                   IE: if Gauge is mm read at a time step, then the input
    1954                   here is as mm/(timeStep) so 10mm in 5minutes becomes
    1955                   10/(5x60) = 0.0333mm/s.
    1956 
    1957                   This parameter can be either a constant or a
    1958                   function of time. Positive values indicate inflow,
    1959                   negative values indicate outflow.
    1960                   (and be used for Infiltration - Write Seperate Module)
    1961                   The specified flow will be divided by the area of
    1962                   the inflow region and then applied to update the
    1963                   stage quantity.
    1964 
    1965     polygon: Specifies a polygon to restrict the rainfall.
    1966 
    1967     Examples
    1968     How to put them in a run File...
    1969 
    1970     #------------------------------------------------------------------------
    1971     # Setup specialised forcing terms
    1972     #------------------------------------------------------------------------
    1973     # This is the new element implemented by Ole and Rudy to allow direct
    1974     # input of Rainfall in mm/s
    1975 
    1976     catchmentrainfall = Rainfall(rain=file_function('Q100_2hr_Rain.tms'))
    1977                         # Note need path to File in String.
    1978                         # Else assumed in same directory
    1979 
    1980     domain.forcing_terms.append(catchmentrainfall)
    1981     """
    1982 
    1983     ##
    1984     # @brief Create an instance of the class.
    1985     # @param domain Domain of interest.
    1986     # @param rate Total rain rate over the specified domain (mm/s).
    1987     # @param center
    1988     # @param radius
    1989     # @param polygon Polygon  to restrict rainfall.
    1990     # @param default_rate
    1991     # @param verbose True if this instance is to be verbose.
    1992     def __init__(self,
    1993                  domain,
    1994                  rate=0.0,
    1995                  center=None,
    1996                  radius=None,
    1997                  polygon=None,
    1998                  default_rate=None,
    1999                  verbose=False):
    2000 
    2001         # Converting mm/s to m/s to apply in ANUGA)
    2002         if callable(rate):
    2003             rain = lambda t: rate(t)/1000.0
    2004         else:
    2005             rain = rate/1000.0
    2006 
    2007         if default_rate is not None:
    2008             if callable(default_rate):
    2009                 default_rain = lambda t: default_rate(t)/1000.0
    2010             else:
    2011                 default_rain = default_rate/1000.0
    2012         else:
    2013             default_rain = None
    2014 
    2015            
    2016            
    2017         General_forcing.__init__(self,
    2018                                  domain,
    2019                                  'stage',
    2020                                  rate=rain,
    2021                                  center=center,
    2022                                  radius=radius,
    2023                                  polygon=polygon,
    2024                                  default_rate=default_rain,
    2025                                  verbose=verbose)
    2026 
    2027 
    2028 ##
    2029 # @brief A class for inflow (rain and drain) forcing function.
    2030 # @note Inherits from General_forcing.
    2031 class Inflow(General_forcing):
    2032     """Class Inflow - general 'rain and drain' forcing term.
    2033 
    2034     Useful for implementing flows in and out of the domain.
    2035 
    2036     Inflow(flow, center, radius, polygon)
    2037 
    2038     domain
    2039     rate [m^3/s]: Total flow rate over the specified area.
    2040                   This parameter can be either a constant or a
    2041                   function of time. Positive values indicate inflow,
    2042                   negative values indicate outflow.
    2043                   The specified flow will be divided by the area of
    2044                   the inflow region and then applied to update stage.
    2045     center [m]: Coordinates at center of flow point
    2046     radius [m]: Size of circular area
    2047     polygon:    Arbitrary polygon.
    2048 
    2049     Either center, radius or polygon must be specified
    2050 
    2051     Examples
    2052 
    2053     # Constant drain at 0.003 m^3/s.
    2054     # The outflow area is 0.07**2*pi=0.0154 m^2
    2055     # This corresponds to a rate of change of 0.003/0.0154 = 0.2 m/s
    2056     #
    2057     Inflow((0.7, 0.4), 0.07, -0.003)
    2058 
    2059 
    2060     # Tap turning up to a maximum inflow of 0.0142 m^3/s.
    2061     # The inflow area is 0.03**2*pi = 0.00283 m^2
    2062     # This corresponds to a rate of change of 0.0142/0.00283 = 5 m/s
    2063     # over the specified area
    2064     Inflow((0.5, 0.5), 0.03, lambda t: min(0.01*t, 0.0142))
    2065 
    2066 
    2067     #------------------------------------------------------------------------
    2068     # Setup specialised forcing terms
    2069     #------------------------------------------------------------------------
    2070     # This is the new element implemented by Ole to allow direct input
    2071     # of Inflow in m^3/s
    2072 
    2073     hydrograph = Inflow(center=(320, 300), radius=10,
    2074                         rate=file_function('Q/QPMF_Rot_Sub13.tms'))
    2075 
    2076     domain.forcing_terms.append(hydrograph)
    2077     """
    2078 
    2079     ##
    2080     # @brief Create an instance of the class.
    2081     # @param domain Domain of interest.
    2082     # @param rate Total rain rate over the specified domain (mm/s).
    2083     # @param center
    2084     # @param radius
    2085     # @param polygon Polygon  to restrict rainfall.
    2086     # @param default_rate
    2087     # @param verbose True if this instance is to be verbose.
    2088     def __init__(self,
    2089                  domain,
    2090                  rate=0.0,
    2091                  center=None,
    2092                  radius=None,
    2093                  polygon=None,
    2094                  default_rate=None,
    2095                  verbose=False):
    2096         # Create object first to make area is available
    2097         General_forcing.__init__(self,
    2098                                  domain,
    2099                                  'stage',
    2100                                  rate=rate,
    2101                                  center=center,
    2102                                  radius=radius,
    2103                                  polygon=polygon,
    2104                                  default_rate=default_rate,
    2105                                  verbose=verbose)
    2106 
    2107     ##
    2108     # @brief Update the instance rate.
    2109     # @param t New rate object.
    2110     def update_rate(self, t):
    2111         """Virtual method allowing local modifications by writing an
    2112         overriding version in descendant
    2113 
    2114         This one converts m^3/s to m/s which can be added directly
    2115         to 'stage' in ANUGA
    2116         """
    2117 
    2118         if callable(self.rate):
    2119             _rate = self.rate(t)/self.exchange_area
    2120         else:
    2121             _rate = self.rate/self.exchange_area
    2122 
    2123         return _rate
    2124 
    2125 
    2126 ##
    2127 # @brief A class for creating cross sections.
    2128 # @note Inherits from General_forcing.
    2129 class Cross_section:
    2130     """Class Cross_section - a class to setup a cross section from
    2131     which you can then calculate flow and energy through cross section
    2132 
    2133 
    2134     Cross_section(domain, polyline)
    2135 
    2136     domain:
    2137     polyline: Representation of desired cross section - it may contain
    2138               multiple sections allowing for complex shapes. Assume
    2139               absolute UTM coordinates.
    2140               Format [[x0, y0], [x1, y1], ...]
    2141     verbose:
    2142     """
    2143 
    2144     ##
    2145     # @brief Create an instance of the class.
    2146     # @param domain Domain of interest.
    2147     # @param polyline Polyline defining cross section
    2148     # @param verbose True if this instance is to be verbose.
    2149     def __init__(self,
    2150                  domain,
    2151                  polyline=None,
    2152                  verbose=False):
    2153        
    2154         self.domain = domain
    2155         self.polyline = polyline
    2156         self.verbose = verbose
    2157        
    2158         # Find all intersections and associated triangles.
    2159         self.segments = self.domain.get_intersecting_segments(self.polyline,
    2160                                                               use_cache=True,
    2161                                                               verbose=self.verbose)
    2162        
    2163         # Get midpoints
    2164         self.midpoints = segment_midpoints(self.segments)
    2165 
    2166         # Make midpoints Geospatial instances
    2167         self.midpoints = ensure_geospatial(self.midpoints, self.domain.geo_reference)
    2168 
    2169     ##
    2170     # @brief set verbose mode
    2171     def set_verbose(self,verbose=True):
    2172         """Set verbose mode true or flase
    2173         """
    2174 
    2175         self.verbose=verbose
    2176 
    2177     ##
    2178     # @brief calculate current flow through cross section
    2179     def get_flow_through_cross_section(self):
    2180         """ Output: Total flow [m^3/s] across cross section.
    2181         """
    2182 
    2183         # Get interpolated values
    2184         xmomentum = self.domain.get_quantity('xmomentum')
    2185         ymomentum = self.domain.get_quantity('ymomentum')
    2186 
    2187         uh = xmomentum.get_values(interpolation_points=self.midpoints,
    2188                                   use_cache=True)
    2189         vh = ymomentum.get_values(interpolation_points=self.midpoints,
    2190                                   use_cache=True)
    2191 
    2192         # Compute and sum flows across each segment
    2193         total_flow = 0
    2194         for i in range(len(uh)):
    2195             # Inner product of momentum vector with segment normal [m^2/s]
    2196             normal = self.segments[i].normal
    2197             normal_momentum = uh[i]*normal[0] + vh[i]*normal[1]
    2198 
    2199             # Flow across this segment [m^3/s]
    2200             segment_flow = normal_momentum*self.segments[i].length
    2201 
    2202             # Accumulate
    2203             total_flow += segment_flow
    2204 
    2205         return total_flow
    2206  
    2207 
    2208     ##
    2209     # @brief calculate current energy flow through cross section
    2210     def get_energy_through_cross_section(self, kind='total'):
    2211         """Obtain average energy head [m] across specified cross section.
    2212 
    2213         Output:
    2214             E: Average energy [m] across given segments for all stored times.
    2215 
    2216         The average velocity is computed for each triangle intersected by
    2217         the polyline and averaged weighted by segment lengths.
    2218 
    2219         The typical usage of this function would be to get average energy of
    2220         flow in a channel, and the polyline would then be a cross section
    2221         perpendicular to the flow.
    2222 
    2223         #FIXME (Ole) - need name for this energy reflecting that its dimension
    2224         is [m].
    2225         """
    2226 
    2227         from anuga.config import g, epsilon, velocity_protection as h0
    2228        
    2229         # Get interpolated values
    2230         stage = self.domain.get_quantity('stage')
    2231         elevation = self.domain.get_quantity('elevation')
    2232         xmomentum = self.domain.get_quantity('xmomentum')
    2233         ymomentum = self.domain.get_quantity('ymomentum')
    2234 
    2235         w = stage.get_values(interpolation_points=self.midpoints, use_cache=True)
    2236         z = elevation.get_values(interpolation_points=self.midpoints, use_cache=True)
    2237         uh = xmomentum.get_values(interpolation_points=self.midpoints,
    2238                                   use_cache=True)
    2239         vh = ymomentum.get_values(interpolation_points=self.midpoints,
    2240                                   use_cache=True)
    2241         h = w-z                # Depth
    2242 
    2243         # Compute total length of polyline for use with weighted averages
    2244         total_line_length = 0.0
    2245         for segment in self.segments:
    2246             total_line_length += segment.length
    2247 
    2248         # Compute and sum flows across each segment
    2249         average_energy = 0.0
    2250         for i in range(len(w)):
    2251             # Average velocity across this segment
    2252             if h[i] > epsilon:
    2253                 # Use protection against degenerate velocities
    2254                 u = uh[i]/(h[i] + h0/h[i])
    2255                 v = vh[i]/(h[i] + h0/h[i])
    2256             else:
    2257                 u = v = 0.0
    2258 
    2259             speed_squared = u*u + v*v
    2260             kinetic_energy = 0.5*speed_squared/g
    2261 
    2262             if kind == 'specific':
    2263                 segment_energy = h[i] + kinetic_energy
    2264             elif kind == 'total':
    2265                 segment_energy = w[i] + kinetic_energy
    2266             else:
    2267                 msg = 'Energy kind must be either "specific" or "total".'
    2268                 msg += ' I got %s' %kind
    2269 
    2270             # Add to weighted average
    2271             weigth = self.segments[i].length/total_line_length
    2272             average_energy += segment_energy*weigth
    2273 
    2274         return average_energy
    2275 
    2276 
    22771422
    22781423################################################################################
  • anuga_core/source/anuga/shallow_water/test_shallow_water_domain.py

    r7731 r7733  
    55from math import pi, sqrt
    66import tempfile
     7
     8from anuga.shallow_water import Domain
    79
    810from anuga.config import g, epsilon
     
    1416from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
    1517from anuga.abstract_2d_finite_volumes.quantity import Quantity
     18from anuga.shallow_water.forcing import Inflow, Cross_section
     19from anuga.geospatial_data.geospatial_data import ensure_geospatial
    1620
    1721from anuga.utilities.system_tools import get_pathname_from_package
    1822
    19 from anuga.shallow_water import Domain
    2023from anuga.abstract_2d_finite_volumes.generic_boundary_conditions \
    2124        import Dirichlet_boundary
    22 from anuga.shallow_water.shallow_water_domain import Rainfall, Wind_stress
    23 from anuga.shallow_water.shallow_water_domain import Inflow, Cross_section
     25from anuga.shallow_water.forcing import Rainfall, Wind_stress
     26from anuga.shallow_water.forcing import Inflow, Cross_section
    2427from anuga.shallow_water.data_manager import get_flow_through_cross_section
    2528
     
    171174
    172175
    173 # Variable windfield implemented using functions
    174 def speed(t, x, y):
    175     """Large speeds halfway between center and edges
    176 
    177     Low speeds at center and edges
    178     """
    179 
    180     from math import exp, cos, pi
    181 
    182     x = num.array(x)
    183     y = num.array(y)
    184 
    185     N = len(x)
    186     s = 0*x  #New array
    187 
    188     for k in range(N):
    189         r = num.sqrt(x[k]**2 + y[k]**2)
    190         factor = exp(-(r-0.15)**2)
    191         s[k] = 4000 * factor * (cos(t*2*pi/150) + 2)
    192 
    193     return s
    194176
    195177def scalar_func(t, x, y):
     
    201183    return 17.7
    202184
    203 def scalar_func_list(t, x, y):
    204     """Function that returns a scalar.
    205 
    206     Used to test error message when numeric array is expected
    207     """
    208 
    209     return [17.7]
    210 
    211 def angle(t, x, y):
    212     """Rotating field
    213     """
    214     from math import atan, pi
    215 
    216     x = num.array(x)
    217     y = num.array(y)
    218 
    219     N = len(x)
    220     a = 0 * x    # New array
    221 
    222     for k in range(N):
    223         r = num.sqrt(x[k]**2 + y[k]**2)
    224 
    225         angle = atan(y[k]/x[k])
    226 
    227         if x[k] < 0:
    228             angle += pi
    229 
    230         # Take normal direction
    231         angle -= pi/2
    232 
    233         # Ensure positive radians
    234         if angle < 0:
    235             angle += 2*pi
    236 
    237         a[k] = angle/pi*180
    238 
    239     return a
    240185
    241186
     
    22352180                            4*S)
    22362181       
    2237     def test_constant_wind_stress(self):
    2238         from anuga.config import rho_a, rho_w, eta_w
    2239         from math import pi, cos, sin
    2240 
    2241         a = [0.0, 0.0]
    2242         b = [0.0, 2.0]
    2243         c = [2.0, 0.0]
    2244         d = [0.0, 4.0]
    2245         e = [2.0, 2.0]
    2246         f = [4.0, 0.0]
    2247 
    2248         points = [a, b, c, d, e, f]
    2249         #             bac,     bce,     ecf,     dbe
    2250         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2251 
    2252         domain = Domain(points, vertices)
    2253 
    2254         #Flat surface with 1m of water
    2255         domain.set_quantity('elevation', 0)
    2256         domain.set_quantity('stage', 1.0)
    2257         domain.set_quantity('friction', 0)
    2258 
    2259         Br = Reflective_boundary(domain)
    2260         domain.set_boundary({'exterior': Br})
    2261 
    2262         #Setup only one forcing term, constant wind stress
    2263         s = 100
    2264         phi = 135
    2265         domain.forcing_terms = []
    2266         domain.forcing_terms.append(Wind_stress(s, phi))
    2267 
    2268         domain.compute_forcing_terms()
    2269 
    2270         const = eta_w*rho_a / rho_w
    2271 
    2272         #Convert to radians
    2273         phi = phi*pi / 180
    2274 
    2275         #Compute velocity vector (u, v)
    2276         u = s*cos(phi)
    2277         v = s*sin(phi)
    2278 
    2279         #Compute wind stress
    2280         S = const * num.sqrt(u**2 + v**2)
    2281 
    2282         assert num.allclose(domain.quantities['stage'].explicit_update, 0)
    2283         assert num.allclose(domain.quantities['xmomentum'].explicit_update, S*u)
    2284         assert num.allclose(domain.quantities['ymomentum'].explicit_update, S*v)
    2285 
    2286     def test_variable_wind_stress(self):
    2287         from anuga.config import rho_a, rho_w, eta_w
    2288         from math import pi, cos, sin
    2289 
    2290         a = [0.0, 0.0]
    2291         b = [0.0, 2.0]
    2292         c = [2.0, 0.0]
    2293         d = [0.0, 4.0]
    2294         e = [2.0, 2.0]
    2295         f = [4.0, 0.0]
    2296 
    2297         points = [a, b, c, d, e, f]
    2298         #             bac,     bce,     ecf,     dbe
    2299         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2300 
    2301         domain = Domain(points, vertices)
    2302 
    2303         #Flat surface with 1m of water
    2304         domain.set_quantity('elevation', 0)
    2305         domain.set_quantity('stage', 1.0)
    2306         domain.set_quantity('friction', 0)
    2307 
    2308         Br = Reflective_boundary(domain)
    2309         domain.set_boundary({'exterior': Br})
    2310 
    2311         domain.time = 5.54    # Take a random time (not zero)
    2312 
    2313         #Setup only one forcing term, constant wind stress
    2314         s = 100
    2315         phi = 135
    2316         domain.forcing_terms = []
    2317         domain.forcing_terms.append(Wind_stress(s=speed, phi=angle))
    2318 
    2319         domain.compute_forcing_terms()
    2320 
    2321         #Compute reference solution
    2322         const = eta_w*rho_a / rho_w
    2323 
    2324         N = len(domain)    # number_of_triangles
    2325 
    2326         xc = domain.get_centroid_coordinates()
    2327         t = domain.time
    2328 
    2329         x = xc[:,0]
    2330         y = xc[:,1]
    2331         s_vec = speed(t,x,y)
    2332         phi_vec = angle(t,x,y)
    2333 
    2334         for k in range(N):
    2335             # Convert to radians
    2336             phi = phi_vec[k]*pi / 180
    2337             s = s_vec[k]
    2338 
    2339             # Compute velocity vector (u, v)
    2340             u = s*cos(phi)
    2341             v = s*sin(phi)
    2342 
    2343             # Compute wind stress
    2344             S = const * num.sqrt(u**2 + v**2)
    2345 
    2346             assert num.allclose(domain.quantities['stage'].explicit_update[k],
    2347                                 0)
    2348             assert num.allclose(domain.quantities['xmomentum'].\
    2349                                      explicit_update[k],
    2350                                 S*u)
    2351             assert num.allclose(domain.quantities['ymomentum'].\
    2352                                      explicit_update[k],
    2353                                 S*v)
    2354 
    2355     def test_windfield_from_file(self):
    2356         import time
    2357         from anuga.config import rho_a, rho_w, eta_w
    2358         from math import pi, cos, sin
    2359         from anuga.config import time_format
    2360         from anuga.abstract_2d_finite_volumes.util import file_function
    2361 
    2362         a = [0.0, 0.0]
    2363         b = [0.0, 2.0]
    2364         c = [2.0, 0.0]
    2365         d = [0.0, 4.0]
    2366         e = [2.0, 2.0]
    2367         f = [4.0, 0.0]
    2368 
    2369         points = [a, b, c, d, e, f]
    2370         #             bac,     bce,     ecf,     dbe
    2371         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2372 
    2373         domain = Domain(points, vertices)
    2374 
    2375         # Flat surface with 1m of water
    2376         domain.set_quantity('elevation', 0)
    2377         domain.set_quantity('stage', 1.0)
    2378         domain.set_quantity('friction', 0)
    2379 
    2380         Br = Reflective_boundary(domain)
    2381         domain.set_boundary({'exterior': Br})
    2382 
    2383         domain.time = 7    # Take a time that is represented in file (not zero)
    2384 
    2385         # Write wind stress file (ensure that domain.time is covered)
    2386         # Take x=1 and y=0
    2387         filename = 'test_windstress_from_file'
    2388         start = time.mktime(time.strptime('2000', '%Y'))
    2389         fid = open(filename + '.txt', 'w')
    2390         dt = 1    # One second interval
    2391         t = 0.0
    2392         while t <= 10.0:
    2393             t_string = time.strftime(time_format, time.gmtime(t+start))
    2394 
    2395             fid.write('%s, %f %f\n' %
    2396                       (t_string, speed(t,[1],[0])[0], angle(t,[1],[0])[0]))
    2397             t += dt
    2398 
    2399         fid.close()
    2400 
    2401         # Convert ASCII file to NetCDF (Which is what we really like!)
    2402         from data_manager import timefile2netcdf
    2403 
    2404         timefile2netcdf(filename)
    2405         os.remove(filename + '.txt')
    2406 
    2407         # Setup wind stress
    2408         F = file_function(filename + '.tms',
    2409                           quantities=['Attribute0', 'Attribute1'])
    2410         os.remove(filename + '.tms')
    2411 
    2412         W = Wind_stress(F)
    2413 
    2414         domain.forcing_terms = []
    2415         domain.forcing_terms.append(W)
    2416 
    2417         domain.compute_forcing_terms()
    2418 
    2419         # Compute reference solution
    2420         const = eta_w*rho_a / rho_w
    2421 
    2422         N = len(domain)    # number_of_triangles
    2423 
    2424         t = domain.time
    2425 
    2426         s = speed(t, [1], [0])[0]
    2427         phi = angle(t, [1], [0])[0]
    2428 
    2429         # Convert to radians
    2430         phi = phi*pi / 180
    2431 
    2432         # Compute velocity vector (u, v)
    2433         u = s*cos(phi)
    2434         v = s*sin(phi)
    2435 
    2436         # Compute wind stress
    2437         S = const * num.sqrt(u**2 + v**2)
    2438 
    2439         for k in range(N):
    2440             assert num.allclose(domain.quantities['stage'].explicit_update[k],
    2441                                 0)
    2442             assert num.allclose(domain.quantities['xmomentum'].\
    2443                                     explicit_update[k],
    2444                                 S*u)
    2445             assert num.allclose(domain.quantities['ymomentum'].\
    2446                                     explicit_update[k],
    2447                                 S*v)
    2448 
    2449     def test_windfield_from_file_seconds(self):
    2450         import time
    2451         from anuga.config import rho_a, rho_w, eta_w
    2452         from math import pi, cos, sin
    2453         from anuga.config import time_format
    2454         from anuga.abstract_2d_finite_volumes.util import file_function
    2455 
    2456         a = [0.0, 0.0]
    2457         b = [0.0, 2.0]
    2458         c = [2.0, 0.0]
    2459         d = [0.0, 4.0]
    2460         e = [2.0, 2.0]
    2461         f = [4.0, 0.0]
    2462 
    2463         points = [a, b, c, d, e, f]
    2464         #             bac,     bce,     ecf,     dbe
    2465         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2466 
    2467         domain = Domain(points, vertices)
    2468 
    2469         # Flat surface with 1m of water
    2470         domain.set_quantity('elevation', 0)
    2471         domain.set_quantity('stage', 1.0)
    2472         domain.set_quantity('friction', 0)
    2473 
    2474         Br = Reflective_boundary(domain)
    2475         domain.set_boundary({'exterior': Br})
    2476 
    2477         domain.time = 7    # Take a time that is represented in file (not zero)
    2478 
    2479         # Write wind stress file (ensure that domain.time is covered)
    2480         # Take x=1 and y=0
    2481         filename = 'test_windstress_from_file'
    2482         start = time.mktime(time.strptime('2000', '%Y'))
    2483         fid = open(filename + '.txt', 'w')
    2484         dt = 0.5    # Half second interval
    2485         t = 0.0
    2486         while t <= 10.0:
    2487             fid.write('%s, %f %f\n'
    2488                       % (str(t), speed(t, [1], [0])[0], angle(t, [1], [0])[0]))
    2489             t += dt
    2490 
    2491         fid.close()
    2492 
    2493         # Convert ASCII file to NetCDF (Which is what we really like!)
    2494         from data_manager import timefile2netcdf
    2495 
    2496         timefile2netcdf(filename, time_as_seconds=True)
    2497         os.remove(filename + '.txt')
    2498 
    2499         # Setup wind stress
    2500         F = file_function(filename + '.tms',
    2501                           quantities=['Attribute0', 'Attribute1'])
    2502         os.remove(filename + '.tms')
    2503 
    2504         W = Wind_stress(F)
    2505 
    2506         domain.forcing_terms = []
    2507         domain.forcing_terms.append(W)
    2508 
    2509         domain.compute_forcing_terms()
    2510 
    2511         # Compute reference solution
    2512         const = eta_w*rho_a / rho_w
    2513 
    2514         N = len(domain)    # number_of_triangles
    2515 
    2516         t = domain.time
    2517 
    2518         s = speed(t, [1], [0])[0]
    2519         phi = angle(t, [1], [0])[0]
    2520 
    2521         # Convert to radians
    2522         phi = phi*pi / 180
    2523 
    2524         # Compute velocity vector (u, v)
    2525         u = s*cos(phi)
    2526         v = s*sin(phi)
    2527 
    2528         # Compute wind stress
    2529         S = const * num.sqrt(u**2 + v**2)
    2530 
    2531         for k in range(N):
    2532             assert num.allclose(domain.quantities['stage'].explicit_update[k],
    2533                                 0)
    2534             assert num.allclose(domain.quantities['xmomentum'].\
    2535                                     explicit_update[k],
    2536                                 S*u)
    2537             assert num.allclose(domain.quantities['ymomentum'].\
    2538                                     explicit_update[k],
    2539                                 S*v)
    2540 
    2541     def test_wind_stress_error_condition(self):
    2542         """Test that windstress reacts properly when forcing functions
    2543         are wrong - e.g. returns a scalar
    2544         """
    2545 
    2546         from math import pi, cos, sin
    2547         from anuga.config import rho_a, rho_w, eta_w
    2548 
    2549         a = [0.0, 0.0]
    2550         b = [0.0, 2.0]
    2551         c = [2.0, 0.0]
    2552         d = [0.0, 4.0]
    2553         e = [2.0, 2.0]
    2554         f = [4.0, 0.0]
    2555 
    2556         points = [a, b, c, d, e, f]
    2557         #             bac,     bce,     ecf,     dbe
    2558         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2559 
    2560         domain = Domain(points, vertices)
    2561 
    2562         # Flat surface with 1m of water
    2563         domain.set_quantity('elevation', 0)
    2564         domain.set_quantity('stage', 1.0)
    2565         domain.set_quantity('friction', 0)
    2566 
    2567         Br = Reflective_boundary(domain)
    2568         domain.set_boundary({'exterior': Br})
    2569 
    2570         domain.time = 5.54    # Take a random time (not zero)
    2571 
    2572         # Setup only one forcing term, bad func
    2573         domain.forcing_terms = []
    2574 
    2575         try:
    2576             domain.forcing_terms.append(Wind_stress(s=scalar_func_list,
    2577                                                     phi=angle))
    2578         except AssertionError:
    2579             pass
    2580         else:
    2581             msg = 'Should have raised exception'
    2582             raise Exception, msg
    2583 
    2584         try:
    2585             domain.forcing_terms.append(Wind_stress(s=speed, phi=scalar_func))
    2586         except Exception:
    2587             pass
    2588         else:
    2589             msg = 'Should have raised exception'
    2590             raise Exception, msg
    2591 
    2592         try:
    2593             domain.forcing_terms.append(Wind_stress(s=speed, phi='xx'))
    2594         except:
    2595             pass
    2596         else:
    2597             msg = 'Should have raised exception'
    2598             raise Exception, msg
    2599 
    2600     def test_rainfall(self):
    2601         from math import pi, cos, sin
    2602 
    2603         a = [0.0, 0.0]
    2604         b = [0.0, 2.0]
    2605         c = [2.0, 0.0]
    2606         d = [0.0, 4.0]
    2607         e = [2.0, 2.0]
    2608         f = [4.0, 0.0]
    2609 
    2610         points = [a, b, c, d, e, f]
    2611         #             bac,     bce,     ecf,     dbe
    2612         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2613 
    2614         domain = Domain(points, vertices)
    2615 
    2616         # Flat surface with 1m of water
    2617         domain.set_quantity('elevation', 0)
    2618         domain.set_quantity('stage', 1.0)
    2619         domain.set_quantity('friction', 0)
    2620 
    2621         Br = Reflective_boundary(domain)
    2622         domain.set_boundary({'exterior': Br})
    2623 
    2624         # Setup only one forcing term, constant rainfall
    2625         domain.forcing_terms = []
    2626         domain.forcing_terms.append(Rainfall(domain, rate=2.0))
    2627 
    2628         domain.compute_forcing_terms()
    2629         assert num.allclose(domain.quantities['stage'].explicit_update,
    2630                             2.0/1000)
    2631 
    2632     def test_rainfall_restricted_by_polygon(self):
    2633         from math import pi, cos, sin
    2634 
    2635         a = [0.0, 0.0]
    2636         b = [0.0, 2.0]
    2637         c = [2.0, 0.0]
    2638         d = [0.0, 4.0]
    2639         e = [2.0, 2.0]
    2640         f = [4.0, 0.0]
    2641 
    2642         points = [a, b, c, d, e, f]
    2643         #             bac,     bce,     ecf,     dbe
    2644         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2645 
    2646         domain = Domain(points, vertices)
    2647 
    2648         # Flat surface with 1m of water
    2649         domain.set_quantity('elevation', 0)
    2650         domain.set_quantity('stage', 1.0)
    2651         domain.set_quantity('friction', 0)
    2652 
    2653         Br = Reflective_boundary(domain)
    2654         domain.set_boundary({'exterior': Br})
    2655 
    2656         # Setup only one forcing term, constant rainfall
    2657         # restricted to a polygon enclosing triangle #1 (bce)
    2658         domain.forcing_terms = []
    2659         R = Rainfall(domain, rate=2.0, polygon=[[1,1], [2,1], [2,2], [1,2]])
    2660 
    2661         assert num.allclose(R.exchange_area, 2)
    2662 
    2663         domain.forcing_terms.append(R)
    2664 
    2665         domain.compute_forcing_terms()
    2666 
    2667         assert num.allclose(domain.quantities['stage'].explicit_update[1],
    2668                             2.0/1000)
    2669         assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
    2670         assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
    2671 
    2672     def test_time_dependent_rainfall_restricted_by_polygon(self):
    2673         a = [0.0, 0.0]
    2674         b = [0.0, 2.0]
    2675         c = [2.0, 0.0]
    2676         d = [0.0, 4.0]
    2677         e = [2.0, 2.0]
    2678         f = [4.0, 0.0]
    2679 
    2680         points = [a, b, c, d, e, f]
    2681         #             bac,     bce,     ecf,     dbe
    2682         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2683 
    2684         domain = Domain(points, vertices)
    2685 
    2686         # Flat surface with 1m of water
    2687         domain.set_quantity('elevation', 0)
    2688         domain.set_quantity('stage', 1.0)
    2689         domain.set_quantity('friction', 0)
    2690 
    2691         Br = Reflective_boundary(domain)
    2692         domain.set_boundary({'exterior': Br})
    2693 
    2694         # Setup only one forcing term, time dependent rainfall
    2695         # restricted to a polygon enclosing triangle #1 (bce)
    2696         domain.forcing_terms = []
    2697         R = Rainfall(domain,
    2698                      rate=lambda t: 3*t + 7,
    2699                      polygon = [[1,1], [2,1], [2,2], [1,2]])
    2700 
    2701         assert num.allclose(R.exchange_area, 2)
    2702        
    2703         domain.forcing_terms.append(R)
    2704 
    2705         domain.time = 10.
    2706 
    2707         domain.compute_forcing_terms()
    2708 
    2709         assert num.allclose(domain.quantities['stage'].explicit_update[1],
    2710                             (3*domain.time + 7)/1000)
    2711         assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
    2712         assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
    2713 
    2714     def test_time_dependent_rainfall_using_starttime(self):
    2715         rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
    2716 
    2717         a = [0.0, 0.0]
    2718         b = [0.0, 2.0]
    2719         c = [2.0, 0.0]
    2720         d = [0.0, 4.0]
    2721         e = [2.0, 2.0]
    2722         f = [4.0, 0.0]
    2723 
    2724         points = [a, b, c, d, e, f]
    2725         #             bac,     bce,     ecf,     dbe
    2726         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2727 
    2728         domain = Domain(points, vertices)
    2729 
    2730         # Flat surface with 1m of water
    2731         domain.set_quantity('elevation', 0)
    2732         domain.set_quantity('stage', 1.0)
    2733         domain.set_quantity('friction', 0)
    2734 
    2735         Br = Reflective_boundary(domain)
    2736         domain.set_boundary({'exterior': Br})
    2737 
    2738         # Setup only one forcing term, time dependent rainfall
    2739         # restricted to a polygon enclosing triangle #1 (bce)
    2740         domain.forcing_terms = []
    2741         R = Rainfall(domain,
    2742                      rate=lambda t: 3*t + 7,
    2743                      polygon=rainfall_poly)                     
    2744 
    2745         assert num.allclose(R.exchange_area, 2)
    2746        
    2747         domain.forcing_terms.append(R)
    2748 
    2749         # This will test that time used in the forcing function takes
    2750         # startime into account.
    2751         domain.starttime = 5.0
    2752 
    2753         domain.time = 7.
    2754 
    2755         domain.compute_forcing_terms()
    2756 
    2757         assert num.allclose(domain.quantities['stage'].explicit_update[1],
    2758                             (3*domain.get_time() + 7)/1000)
    2759         assert num.allclose(domain.quantities['stage'].explicit_update[1],
    2760                             (3*(domain.time + domain.starttime) + 7)/1000)
    2761 
    2762         # Using internal time her should fail
    2763         assert not num.allclose(domain.quantities['stage'].explicit_update[1],
    2764                                 (3*domain.time + 7)/1000)
    2765 
    2766         assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
    2767         assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
    2768 
    2769     def test_time_dependent_rainfall_using_georef(self):
    2770         """test_time_dependent_rainfall_using_georef
    2771 
    2772         This will also test the General forcing term using georef
    2773         """
    2774 
    2775         # Mesh in zone 56 (absolute coords)
    2776         x0 = 314036.58727982
    2777         y0 = 6224951.2960092
    2778 
    2779         rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float)
    2780         rainfall_poly += [x0, y0]
    2781 
    2782         a = [0.0, 0.0]
    2783         b = [0.0, 2.0]
    2784         c = [2.0, 0.0]
    2785         d = [0.0, 4.0]
    2786         e = [2.0, 2.0]
    2787         f = [4.0, 0.0]
    2788 
    2789         points = [a, b, c, d, e, f]
    2790         #             bac,     bce,     ecf,     dbe
    2791         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2792 
    2793         domain = Domain(points, vertices,
    2794                         geo_reference=Geo_reference(56, x0, y0))
    2795 
    2796         # Flat surface with 1m of water
    2797         domain.set_quantity('elevation', 0)
    2798         domain.set_quantity('stage', 1.0)
    2799         domain.set_quantity('friction', 0)
    2800 
    2801         Br = Reflective_boundary(domain)
    2802         domain.set_boundary({'exterior': Br})
    2803 
    2804         # Setup only one forcing term, time dependent rainfall
    2805         # restricted to a polygon enclosing triangle #1 (bce)
    2806         domain.forcing_terms = []
    2807         R = Rainfall(domain,
    2808                      rate=lambda t: 3*t + 7,
    2809                      polygon=rainfall_poly)
    2810 
    2811         assert num.allclose(R.exchange_area, 2)
    2812        
    2813         domain.forcing_terms.append(R)
    2814 
    2815         # This will test that time used in the forcing function takes
    2816         # startime into account.
    2817         domain.starttime = 5.0
    2818 
    2819         domain.time = 7.
    2820 
    2821         domain.compute_forcing_terms()
    2822 
    2823         assert num.allclose(domain.quantities['stage'].explicit_update[1],
    2824                             (3*domain.get_time() + 7)/1000)
    2825         assert num.allclose(domain.quantities['stage'].explicit_update[1],
    2826                             (3*(domain.time + domain.starttime) + 7)/1000)
    2827 
    2828         # Using internal time her should fail
    2829         assert not num.allclose(domain.quantities['stage'].explicit_update[1],
    2830                                 (3*domain.time + 7)/1000)
    2831 
    2832         assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
    2833         assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
    2834 
    2835     def test_time_dependent_rainfall_restricted_by_polygon_with_default(self):
    2836         """
    2837         Test that default rainfall can be used when given rate runs out of data.
    2838         """
    2839 
    2840         a = [0.0, 0.0]
    2841         b = [0.0, 2.0]
    2842         c = [2.0, 0.0]
    2843         d = [0.0, 4.0]
    2844         e = [2.0, 2.0]
    2845         f = [4.0, 0.0]
    2846 
    2847         points = [a, b, c, d, e, f]
    2848         #             bac,     bce,     ecf,     dbe
    2849         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2850 
    2851         domain = Domain(points, vertices)
    2852 
    2853         # Flat surface with 1m of water
    2854         domain.set_quantity('elevation', 0)
    2855         domain.set_quantity('stage', 1.0)
    2856         domain.set_quantity('friction', 0)
    2857 
    2858         Br = Reflective_boundary(domain)
    2859         domain.set_boundary({'exterior': Br})
    2860 
    2861         # Setup only one forcing term, time dependent rainfall
    2862         # that expires at t==20
    2863         from anuga.fit_interpolate.interpolate import Modeltime_too_late
    2864 
    2865         def main_rate(t):
    2866             if t > 20:
    2867                 msg = 'Model time exceeded.'
    2868                 raise Modeltime_too_late, msg
    2869             else:
    2870                 return 3*t + 7
    2871 
    2872         domain.forcing_terms = []
    2873         R = Rainfall(domain,
    2874                      rate=main_rate,
    2875                      polygon = [[1,1], [2,1], [2,2], [1,2]],
    2876                      default_rate=5.0)
    2877 
    2878         assert num.allclose(R.exchange_area, 2)
    2879        
    2880         domain.forcing_terms.append(R)
    2881 
    2882         domain.time = 10.
    2883 
    2884         domain.compute_forcing_terms()
    2885 
    2886         assert num.allclose(domain.quantities['stage'].explicit_update[1],
    2887                             (3*domain.time+7)/1000)
    2888         assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
    2889         assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
    2890 
    2891         domain.time = 100.
    2892         domain.quantities['stage'].explicit_update[:] = 0.0     # Reset
    2893         domain.compute_forcing_terms()
    2894 
    2895         assert num.allclose(domain.quantities['stage'].explicit_update[1],
    2896                             5.0/1000) # Default value
    2897         assert num.allclose(domain.quantities['stage'].explicit_update[0], 0)
    2898         assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0)
    2899 
    2900     def test_rainfall_forcing_with_evolve(self):
    2901         """test_rainfall_forcing_with_evolve
    2902 
    2903         Test how forcing terms are called within evolve
    2904         """
    2905 
    2906         # FIXME(Ole): This test is just to experiment
    2907 
    2908         a = [0.0, 0.0]
    2909         b = [0.0, 2.0]
    2910         c = [2.0, 0.0]
    2911         d = [0.0, 4.0]
    2912         e = [2.0, 2.0]
    2913         f = [4.0, 0.0]
    2914 
    2915         points = [a, b, c, d, e, f]
    2916         #             bac,     bce,     ecf,     dbe
    2917         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2918 
    2919         domain = Domain(points, vertices)
    2920 
    2921         # Flat surface with 1m of water
    2922         domain.set_quantity('elevation', 0)
    2923         domain.set_quantity('stage', 1.0)
    2924         domain.set_quantity('friction', 0)
    2925 
    2926         Br = Reflective_boundary(domain)
    2927         domain.set_boundary({'exterior': Br})
    2928 
    2929         # Setup only one forcing term, time dependent rainfall
    2930         # that expires at t==20
    2931         from anuga.fit_interpolate.interpolate import Modeltime_too_late
    2932 
    2933         def main_rate(t):
    2934             if t > 20:
    2935                 msg = 'Model time exceeded.'
    2936                 raise Modeltime_too_late, msg
    2937             else:
    2938                 return 3*t + 7
    2939 
    2940         domain.forcing_terms = []
    2941         R = Rainfall(domain,
    2942                      rate=main_rate,
    2943                      polygon=[[1,1], [2,1], [2,2], [1,2]],
    2944                      default_rate=5.0)
    2945 
    2946         assert num.allclose(R.exchange_area, 2)
    2947        
    2948         domain.forcing_terms.append(R)
    2949 
    2950         for t in domain.evolve(yieldstep=1, finaltime=25):
    2951             pass
    2952             #FIXME(Ole):  A test here is hard because explicit_update also
    2953             # receives updates from the flux calculation.
    2954 
    2955 
    2956     def test_rainfall_forcing_with_evolve_1(self):
    2957         """test_rainfall_forcing_with_evolve
    2958 
    2959         Test how forcing terms are called within evolve.
    2960         This test checks that proper exception is thrown when no default_rate is set
    2961         """
    2962 
    2963 
    2964         a = [0.0, 0.0]
    2965         b = [0.0, 2.0]
    2966         c = [2.0, 0.0]
    2967         d = [0.0, 4.0]
    2968         e = [2.0, 2.0]
    2969         f = [4.0, 0.0]
    2970 
    2971         points = [a, b, c, d, e, f]
    2972         #             bac,     bce,     ecf,     dbe
    2973         vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
    2974 
    2975         domain = Domain(points, vertices)
    2976 
    2977         # Flat surface with 1m of water
    2978         domain.set_quantity('elevation', 0)
    2979         domain.set_quantity('stage', 1.0)
    2980         domain.set_quantity('friction', 0)
    2981 
    2982         Br = Reflective_boundary(domain)
    2983         domain.set_boundary({'exterior': Br})
    2984 
    2985         # Setup only one forcing term, time dependent rainfall
    2986         # that expires at t==20
    2987         from anuga.fit_interpolate.interpolate import Modeltime_too_late
    2988 
    2989         def main_rate(t):
    2990             if t > 20:
    2991                 msg = 'Model time exceeded.'
    2992                 raise Modeltime_too_late, msg
    2993             else:
    2994                 return 3*t + 7
    2995 
    2996         domain.forcing_terms = []
    2997         R = Rainfall(domain,
    2998                      rate=main_rate,
    2999                      polygon=[[1,1], [2,1], [2,2], [1,2]])
    3000 
    3001 
    3002         assert num.allclose(R.exchange_area, 2)
    3003        
    3004         domain.forcing_terms.append(R)
    3005         #for t in domain.evolve(yieldstep=1, finaltime=25):
    3006         #    pass
    3007                
    3008         try:
    3009             for t in domain.evolve(yieldstep=1, finaltime=25):
    3010                 pass
    3011         except Modeltime_too_late, e:
    3012             # Test that error message is as expected
    3013             assert 'can specify keyword argument default_rate in the forcing function' in str(e)
    3014         else:
    3015             raise Exception, 'Should have raised exception'
    3016 
    3017            
     2182
    30182183           
    30192184    def test_inflow_using_circle(self):
     
    66165781                import rectangular_cross
    66175782        from anuga.shallow_water import Domain
    6618         from anuga.shallow_water.shallow_water_domain import Inflow
     5783        from anuga.shallow_water.forcing import Inflow
    66195784        from anuga.shallow_water.data_manager \
    66205785                import get_flow_through_cross_section
     
    67075872        from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
    67085873        from anuga.shallow_water import Domain
    6709         from anuga.shallow_water.shallow_water_domain import Inflow
    67105874        from anuga.shallow_water.data_manager import get_flow_through_cross_section
    67115875
     
    72766440        #----------------------------------------------------------------------
    72776441        from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
    7278         from anuga.shallow_water import Domain
    7279         from anuga.shallow_water.shallow_water_domain import Reflective_boundary
    7280         from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
    7281         from anuga.shallow_water.shallow_water_domain import Inflow_boundary
    72826442        from anuga.shallow_water.data_manager import get_flow_through_cross_section
    72836443        from anuga.abstract_2d_finite_volumes.util import sww2csv_gauges, csv2timeseries_graphs
     
    74166576        from anuga.shallow_water.shallow_water_domain import Reflective_boundary
    74176577        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
    7418         from anuga.shallow_water.shallow_water_domain import Inflow
     6578        from anuga.shallow_water.forcing import Inflow
    74196579        from anuga.shallow_water.data_manager \
    74206580                import get_flow_through_cross_section
  • anuga_core/source/anuga/shallow_water_balanced/test_swb_conservation.py

    r7573 r7733  
    458458        from anuga.shallow_water.shallow_water_domain import Reflective_boundary
    459459        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
    460         from anuga.shallow_water.shallow_water_domain import Inflow
     460        from anuga.shallow_water.forcing import Inflow
    461461        from anuga.shallow_water.data_manager \
    462462                import get_flow_through_cross_section
     
    551551        from anuga.shallow_water.shallow_water_domain import Reflective_boundary
    552552        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
    553         from anuga.shallow_water.shallow_water_domain import Inflow
     553        from anuga.shallow_water.forcing import Inflow
    554554        from anuga.shallow_water.data_manager import get_flow_through_cross_section
    555555
  • anuga_core/source/anuga/shallow_water_balanced/test_swb_forcing_terms.py

    r7562 r7733  
    13621362        from anuga.shallow_water.shallow_water_domain import Reflective_boundary
    13631363        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
    1364         from anuga.shallow_water.shallow_water_domain import Inflow
     1364        from anuga.shallow_water.forcing import Inflow
    13651365        from anuga.shallow_water.data_manager \
    13661366                import get_flow_through_cross_section
     
    15211521        from anuga.shallow_water.shallow_water_domain import Reflective_boundary
    15221522        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
    1523         from anuga.shallow_water.shallow_water_domain import Inflow_boundary
     1523        from anuga.shallow_water.forcing import Inflow_boundary
    15241524        from anuga.shallow_water.data_manager import get_flow_through_cross_section
    15251525        from anuga.abstract_2d_finite_volumes.util import sww2csv_gauges, csv2timeseries_graphs
     
    16581658        from anuga.shallow_water.shallow_water_domain import Reflective_boundary
    16591659        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
    1660         from anuga.shallow_water.shallow_water_domain import Inflow
     1660        from anuga.shallow_water.forcing import Inflow
    16611661        from anuga.shallow_water.data_manager \
    16621662                import get_flow_through_cross_section
Note: See TracChangeset for help on using the changeset viewer.