Changeset 7733
- Timestamp:
- May 19, 2010, 10:13:06 AM (15 years ago)
- Location:
- anuga_core/source/anuga
- Files:
-
- 2 added
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
anuga_core/source/anuga/shallow_water/shallow_water_domain.py
r7731 r7733 83 83 import numpy as num 84 84 85 from anuga.abstract_2d_finite_volumes.neighbour_mesh import segment_midpoints86 85 from anuga.abstract_2d_finite_volumes.domain import Domain as Generic_Domain 87 from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\ 88 import Boundary 89 from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\ 90 import File_boundary 91 from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\ 92 import Dirichlet_boundary 93 from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\ 94 import Time_boundary 95 from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\ 96 import Transmissive_boundary 97 from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\ 98 import AWI_boundary 99 86 87 from anuga.shallow_water.forcing import Cross_section 100 88 from anuga.pmesh.mesh_interface import create_mesh_from_regions 101 89 from anuga.utilities.numerical_tools import gradient, mean, ensure_numeric … … 1432 1420 1433 1421 1434 ################################################################################1435 # Experimental auxiliary functions1436 ################################################################################1437 1438 ##1439 # @brief Check forcefield parameter.1440 # @param f Object to check.1441 # @note 'f' may be a callable object or a scalar value.1442 def check_forcefield(f):1443 """Check that force object is as expected.1444 1445 Check that f is either:1446 1: a callable object f(t,x,y), where x and y are vectors1447 and that it returns an array or a list of same length1448 as x and y1449 2: a scalar1450 """1451 1452 if callable(f):1453 N = 31454 x = num.ones(3, num.float)1455 y = num.ones(3, num.float)1456 try:1457 q = f(1.0, x=x, y=y)1458 except Exception, e:1459 msg = 'Function %s could not be executed:\n%s' %(f, e)1460 # FIXME: Reconsider this semantics1461 raise Exception, msg1462 1463 try:1464 q = num.array(q, num.float)1465 except:1466 msg = ('Return value from vector function %s could not '1467 'be converted into a numeric array of floats.\nSpecified '1468 'function should return either list or array.' % f)1469 raise Exception, msg1470 1471 # Is this really what we want?1472 # info is "(func name, filename, defining line)"1473 func_info = (f.func_name, f.func_code.co_filename,1474 f.func_code.co_firstlineno)1475 func_msg = 'Function %s (defined in %s, line %d)' % func_info1476 try:1477 result_len = len(q)1478 except:1479 msg = '%s must return vector' % func_msg1480 self.fail(msg)1481 msg = '%s must return vector of length %d' % (func_msg, N)1482 assert result_len == N, msg1483 else:1484 try:1485 f = float(f)1486 except:1487 msg = ('Force field %s must be a scalar value coercible to float.'1488 % str(f))1489 raise Exception, msg1490 1491 return f1492 1493 1494 ##1495 # Class to apply a wind stress to a domain.1496 class Wind_stress:1497 """Apply wind stress to water momentum in terms of1498 wind speed [m/s] and wind direction [degrees]1499 """1500 1501 ##1502 # @brief Create an instance of Wind_stress.1503 # @param *args1504 # @param **kwargs1505 def __init__(self, *args, **kwargs):1506 """Initialise windfield from wind speed s [m/s]1507 and wind direction phi [degrees]1508 1509 Inputs v and phi can be either scalars or Python functions, e.g.1510 1511 W = Wind_stress(10, 178)1512 1513 #FIXME - 'normal' degrees are assumed for now, i.e. the1514 vector (1,0) has zero degrees.1515 We may need to convert from 'compass' degrees later on and also1516 map from True north to grid north.1517 1518 Arguments can also be Python functions of t,x,y as in1519 1520 def speed(t,x,y):1521 ...1522 return s1523 1524 def angle(t,x,y):1525 ...1526 return phi1527 1528 where x and y are vectors.1529 1530 and then pass the functions in1531 1532 W = Wind_stress(speed, angle)1533 1534 The instantiated object W can be appended to the list of1535 forcing_terms as in1536 1537 Alternatively, one vector valued function for (speed, angle)1538 can be applied, providing both quantities simultaneously.1539 As in1540 W = Wind_stress(F), where returns (speed, angle) for each t.1541 1542 domain.forcing_terms.append(W)1543 """1544 1545 from anuga.config import rho_a, rho_w, eta_w1546 1547 if len(args) == 2:1548 s = args[0]1549 phi = args[1]1550 elif len(args) == 1:1551 # Assume vector function returning (s, phi)(t,x,y)1552 vector_function = args[0]1553 s = lambda t,x,y: vector_function(t,x=x,y=y)[0]1554 phi = lambda t,x,y: vector_function(t,x=x,y=y)[1]1555 else:1556 # Assume info is in 2 keyword arguments1557 if len(kwargs) == 2:1558 s = kwargs['s']1559 phi = kwargs['phi']1560 else:1561 raise Exception, 'Assumes two keyword arguments: s=..., phi=....'1562 1563 self.speed = check_forcefield(s)1564 self.phi = check_forcefield(phi)1565 1566 self.const = eta_w*rho_a/rho_w1567 1568 ##1569 # @brief 'execute' this class instance.1570 # @param domain1571 def __call__(self, domain):1572 """Evaluate windfield based on values found in domain"""1573 1574 from math import pi, cos, sin, sqrt1575 1576 xmom_update = domain.quantities['xmomentum'].explicit_update1577 ymom_update = domain.quantities['ymomentum'].explicit_update1578 1579 N = len(domain) # number_of_triangles1580 t = domain.time1581 1582 if callable(self.speed):1583 xc = domain.get_centroid_coordinates()1584 s_vec = self.speed(t, xc[:,0], xc[:,1])1585 else:1586 # Assume s is a scalar1587 try:1588 s_vec = self.speed * num.ones(N, num.float)1589 except:1590 msg = 'Speed must be either callable or a scalar: %s' %self.s1591 raise msg1592 1593 if callable(self.phi):1594 xc = domain.get_centroid_coordinates()1595 phi_vec = self.phi(t, xc[:,0], xc[:,1])1596 else:1597 # Assume phi is a scalar1598 1599 try:1600 phi_vec = self.phi * num.ones(N, num.float)1601 except:1602 msg = 'Angle must be either callable or a scalar: %s' %self.phi1603 raise msg1604 1605 assign_windfield_values(xmom_update, ymom_update,1606 s_vec, phi_vec, self.const)1607 1608 1609 ##1610 # @brief Assign wind field values1611 # @param xmom_update1612 # @param ymom_update1613 # @param s_vec1614 # @param phi_vec1615 # @param const1616 def assign_windfield_values(xmom_update, ymom_update,1617 s_vec, phi_vec, const):1618 """Python version of assigning wind field to update vectors.1619 A C version also exists (for speed)1620 """1621 1622 from math import pi, cos, sin, sqrt1623 1624 N = len(s_vec)1625 for k in range(N):1626 s = s_vec[k]1627 phi = phi_vec[k]1628 1629 # Convert to radians1630 phi = phi*pi/1801631 1632 # Compute velocity vector (u, v)1633 u = s*cos(phi)1634 v = s*sin(phi)1635 1636 # Compute wind stress1637 S = const * sqrt(u**2 + v**2)1638 xmom_update[k] += S*u1639 ymom_update[k] += S*v1640 1641 1642 ##1643 # @brief A class for a general explicit forcing term.1644 class General_forcing:1645 """General explicit forcing term for update of quantity1646 1647 This is used by Inflow and Rainfall for instance1648 1649 1650 General_forcing(quantity_name, rate, center, radius, polygon)1651 1652 domain: ANUGA computational domain1653 quantity_name: Name of quantity to update.1654 It must be a known conserved quantity.1655 1656 rate [?/s]: Total rate of change over the specified area.1657 This parameter can be either a constant or a1658 function of time. Positive values indicate increases,1659 negative values indicate decreases.1660 Rate can be None at initialisation but must be specified1661 before forcing term is applied (i.e. simulation has started).1662 1663 center [m]: Coordinates at center of flow point1664 radius [m]: Size of circular area1665 polygon: Arbitrary polygon1666 default_rate: Rate to be used if rate fails (e.g. if model time exceeds its data)1667 Admissible types: None, constant number or function of t1668 1669 1670 Either center, radius or polygon can be specified but not both.1671 If neither are specified the entire domain gets updated.1672 All coordinates to be specified in absolute UTM coordinates (x, y) assuming the zone of domain.1673 1674 Inflow or Rainfall for examples of use1675 """1676 1677 1678 # FIXME (AnyOne) : Add various methods to allow spatial variations1679 1680 ##1681 # @brief Create an instance of this forcing term.1682 # @param domain1683 # @param quantity_name1684 # @param rate1685 # @param center1686 # @param radius1687 # @param polygon1688 # @param default_rate1689 # @param verbose1690 def __init__(self,1691 domain,1692 quantity_name,1693 rate=0.0,1694 center=None,1695 radius=None,1696 polygon=None,1697 default_rate=None,1698 verbose=False):1699 1700 from math import pi, cos, sin1701 1702 if center is None:1703 msg = 'I got radius but no center.'1704 assert radius is None, msg1705 1706 if radius is None:1707 msg += 'I got center but no radius.'1708 assert center is None, msg1709 1710 self.domain = domain1711 self.quantity_name = quantity_name1712 self.rate = rate1713 self.center = ensure_numeric(center)1714 self.radius = radius1715 self.polygon = polygon1716 self.verbose = verbose1717 self.value = 0.0 # Can be used to remember value at1718 # previous timestep in order to obtain rate1719 1720 # Get boundary (in absolute coordinates)1721 bounding_polygon = domain.get_boundary_polygon()1722 1723 # Update area if applicable1724 if center is not None and radius is not None:1725 assert len(center) == 21726 msg = 'Polygon cannot be specified when center and radius are'1727 assert polygon is None, msg1728 1729 # Check that circle center lies within the mesh.1730 msg = 'Center %s specified for forcing term did not' % str(center)1731 msg += 'fall within the domain boundary.'1732 assert is_inside_polygon(center, bounding_polygon), msg1733 1734 # Check that circle periphery lies within the mesh.1735 N = 1001736 periphery_points = []1737 for i in range(N):1738 theta = 2*pi*i/1001739 1740 x = center[0] + radius*cos(theta)1741 y = center[1] + radius*sin(theta)1742 1743 periphery_points.append([x,y])1744 1745 for point in periphery_points:1746 msg = 'Point %s on periphery for forcing term' % str(point)1747 msg += ' did not fall within the domain boundary.'1748 assert is_inside_polygon(point, bounding_polygon), msg1749 1750 if polygon is not None:1751 # Check that polygon lies within the mesh.1752 for point in self.polygon:1753 msg = 'Point %s in polygon for forcing term' % str(point)1754 msg += ' did not fall within the domain boundary.'1755 assert is_inside_polygon(point, bounding_polygon), msg1756 1757 # Pointer to update vector1758 self.update = domain.quantities[self.quantity_name].explicit_update1759 1760 # Determine indices in flow area1761 N = len(domain)1762 points = domain.get_centroid_coordinates(absolute=True)1763 1764 # Calculate indices in exchange area for this forcing term1765 self.exchange_indices = None1766 if self.center is not None and self.radius is not None:1767 # Inlet is circular1768 inlet_region = 'center=%s, radius=%s' % (self.center, self.radius)1769 1770 self.exchange_indices = []1771 for k in range(N):1772 x, y = points[k,:] # Centroid1773 1774 c = self.center1775 if ((x-c[0])**2+(y-c[1])**2) < self.radius**2:1776 self.exchange_indices.append(k)1777 1778 if self.polygon is not None:1779 # Inlet is polygon1780 inlet_region = 'polygon=%s' % (self.polygon)1781 self.exchange_indices = inside_polygon(points, self.polygon)1782 1783 if self.exchange_indices is None:1784 self.exchange_area = polygon_area(bounding_polygon)1785 else:1786 if len(self.exchange_indices) == 0:1787 msg = 'No triangles have been identified in '1788 msg += 'specified region: %s' % inlet_region1789 raise Exception, msg1790 1791 # Compute exchange area as the sum of areas of triangles identified1792 # by circle or polygon1793 self.exchange_area = 0.01794 for i in self.exchange_indices:1795 self.exchange_area += domain.areas[i]1796 1797 1798 msg = 'Exchange area in forcing term'1799 msg += ' has area = %f' %self.exchange_area1800 assert self.exchange_area > 0.01801 1802 1803 1804 1805 # Check and store default_rate1806 msg = ('Keyword argument default_rate must be either None '1807 'or a function of time.\nI got %s.' % str(default_rate))1808 assert (default_rate is None or1809 type(default_rate) in [IntType, FloatType] or1810 callable(default_rate)), msg1811 1812 if default_rate is not None:1813 # If it is a constant, make it a function1814 if not callable(default_rate):1815 tmp = default_rate1816 default_rate = lambda t: tmp1817 1818 # Check that default_rate is a function of one argument1819 try:1820 default_rate(0.0)1821 except:1822 raise Exception, msg1823 1824 self.default_rate = default_rate1825 self.default_rate_invoked = False # Flag1826 1827 ##1828 # @brief Execute this instance.1829 # @param domain1830 def __call__(self, domain):1831 """Apply inflow function at time specified in domain, update stage"""1832 1833 # Call virtual method allowing local modifications1834 t = domain.get_time()1835 try:1836 rate = self.update_rate(t)1837 except Modeltime_too_early, e:1838 raise Modeltime_too_early, e1839 except Modeltime_too_late, e:1840 if self.default_rate is None:1841 msg = '%s: ANUGA is trying to run longer than specified data.\n' %str(e)1842 msg += 'You can specify keyword argument default_rate in the '1843 msg += 'forcing function to tell it what to do in the absence of time data.'1844 raise Modeltime_too_late, msg1845 else:1846 # Pass control to default rate function1847 rate = self.default_rate(t)1848 1849 if self.default_rate_invoked is False:1850 # Issue warning the first time1851 msg = ('%s\n'1852 'Instead I will use the default rate: %s\n'1853 'Note: Further warnings will be supressed'1854 % (str(e), str(self.default_rate)))1855 warn(msg)1856 1857 # FIXME (Ole): Replace this crude flag with1858 # Python's ability to print warnings only once.1859 # See http://docs.python.org/lib/warning-filter.html1860 self.default_rate_invoked = True1861 1862 if rate is None:1863 msg = ('Attribute rate must be specified in General_forcing '1864 'or its descendants before attempting to call it')1865 raise Exception, msg1866 1867 # Now rate is a number1868 if self.verbose is True:1869 log.critical('Rate of %s at time = %.2f = %f'1870 % (self.quantity_name, domain.get_time(), rate))1871 1872 if self.exchange_indices is None:1873 self.update[:] += rate1874 else:1875 # Brute force assignment of restricted rate1876 for k in self.exchange_indices:1877 self.update[k] += rate1878 1879 ##1880 # @brief Update the internal rate.1881 # @param t A callable or scalar used to set the rate.1882 # @return The new rate.1883 def update_rate(self, t):1884 """Virtual method allowing local modifications by writing an1885 overriding version in descendant1886 """1887 1888 if callable(self.rate):1889 rate = self.rate(t)1890 else:1891 rate = self.rate1892 1893 return rate1894 1895 ##1896 # @brief Get values for the specified quantity.1897 # @param quantity_name Name of the quantity of interest.1898 # @return The value(s) of the quantity.1899 # @note If 'quantity_name' is None, use self.quantity_name.1900 def get_quantity_values(self, quantity_name=None):1901 """Return values for specified quantity restricted to opening1902 1903 Optionally a quantity name can be specified if values from another1904 quantity is sought1905 """1906 1907 if quantity_name is None:1908 quantity_name = self.quantity_name1909 1910 q = self.domain.quantities[quantity_name]1911 return q.get_values(location='centroids',1912 indices=self.exchange_indices)1913 1914 ##1915 # @brief Set value for the specified quantity.1916 # @param val The value object used to set value.1917 # @param quantity_name Name of the quantity of interest.1918 # @note If 'quantity_name' is None, use self.quantity_name.1919 def set_quantity_values(self, val, quantity_name=None):1920 """Set values for specified quantity restricted to opening1921 1922 Optionally a quantity name can be specified if values from another1923 quantity is sought1924 """1925 1926 if quantity_name is None:1927 quantity_name = self.quantity_name1928 1929 q = self.domain.quantities[self.quantity_name]1930 q.set_values(val,1931 location='centroids',1932 indices=self.exchange_indices)1933 1934 1935 ##1936 # @brief A class for rainfall forcing function.1937 # @note Inherits from General_forcing.1938 class Rainfall(General_forcing):1939 """Class Rainfall - general 'rain over entire domain' forcing term.1940 1941 Used for implementing Rainfall over the entire domain.1942 1943 Current Limited to only One Gauge..1944 1945 Need to add Spatial Varying Capability1946 (This module came from copying and amending the Inflow Code)1947 1948 Rainfall(rain)1949 1950 domain1951 rain [mm/s]: Total rain rate over the specified domain.1952 NOTE: Raingauge Data needs to reflect the time step.1953 IE: if Gauge is mm read at a time step, then the input1954 here is as mm/(timeStep) so 10mm in 5minutes becomes1955 10/(5x60) = 0.0333mm/s.1956 1957 This parameter can be either a constant or a1958 function of time. Positive values indicate inflow,1959 negative values indicate outflow.1960 (and be used for Infiltration - Write Seperate Module)1961 The specified flow will be divided by the area of1962 the inflow region and then applied to update the1963 stage quantity.1964 1965 polygon: Specifies a polygon to restrict the rainfall.1966 1967 Examples1968 How to put them in a run File...1969 1970 #------------------------------------------------------------------------1971 # Setup specialised forcing terms1972 #------------------------------------------------------------------------1973 # This is the new element implemented by Ole and Rudy to allow direct1974 # input of Rainfall in mm/s1975 1976 catchmentrainfall = Rainfall(rain=file_function('Q100_2hr_Rain.tms'))1977 # Note need path to File in String.1978 # Else assumed in same directory1979 1980 domain.forcing_terms.append(catchmentrainfall)1981 """1982 1983 ##1984 # @brief Create an instance of the class.1985 # @param domain Domain of interest.1986 # @param rate Total rain rate over the specified domain (mm/s).1987 # @param center1988 # @param radius1989 # @param polygon Polygon to restrict rainfall.1990 # @param default_rate1991 # @param verbose True if this instance is to be verbose.1992 def __init__(self,1993 domain,1994 rate=0.0,1995 center=None,1996 radius=None,1997 polygon=None,1998 default_rate=None,1999 verbose=False):2000 2001 # Converting mm/s to m/s to apply in ANUGA)2002 if callable(rate):2003 rain = lambda t: rate(t)/1000.02004 else:2005 rain = rate/1000.02006 2007 if default_rate is not None:2008 if callable(default_rate):2009 default_rain = lambda t: default_rate(t)/1000.02010 else:2011 default_rain = default_rate/1000.02012 else:2013 default_rain = None2014 2015 2016 2017 General_forcing.__init__(self,2018 domain,2019 'stage',2020 rate=rain,2021 center=center,2022 radius=radius,2023 polygon=polygon,2024 default_rate=default_rain,2025 verbose=verbose)2026 2027 2028 ##2029 # @brief A class for inflow (rain and drain) forcing function.2030 # @note Inherits from General_forcing.2031 class Inflow(General_forcing):2032 """Class Inflow - general 'rain and drain' forcing term.2033 2034 Useful for implementing flows in and out of the domain.2035 2036 Inflow(flow, center, radius, polygon)2037 2038 domain2039 rate [m^3/s]: Total flow rate over the specified area.2040 This parameter can be either a constant or a2041 function of time. Positive values indicate inflow,2042 negative values indicate outflow.2043 The specified flow will be divided by the area of2044 the inflow region and then applied to update stage.2045 center [m]: Coordinates at center of flow point2046 radius [m]: Size of circular area2047 polygon: Arbitrary polygon.2048 2049 Either center, radius or polygon must be specified2050 2051 Examples2052 2053 # Constant drain at 0.003 m^3/s.2054 # The outflow area is 0.07**2*pi=0.0154 m^22055 # This corresponds to a rate of change of 0.003/0.0154 = 0.2 m/s2056 #2057 Inflow((0.7, 0.4), 0.07, -0.003)2058 2059 2060 # Tap turning up to a maximum inflow of 0.0142 m^3/s.2061 # The inflow area is 0.03**2*pi = 0.00283 m^22062 # This corresponds to a rate of change of 0.0142/0.00283 = 5 m/s2063 # over the specified area2064 Inflow((0.5, 0.5), 0.03, lambda t: min(0.01*t, 0.0142))2065 2066 2067 #------------------------------------------------------------------------2068 # Setup specialised forcing terms2069 #------------------------------------------------------------------------2070 # This is the new element implemented by Ole to allow direct input2071 # of Inflow in m^3/s2072 2073 hydrograph = Inflow(center=(320, 300), radius=10,2074 rate=file_function('Q/QPMF_Rot_Sub13.tms'))2075 2076 domain.forcing_terms.append(hydrograph)2077 """2078 2079 ##2080 # @brief Create an instance of the class.2081 # @param domain Domain of interest.2082 # @param rate Total rain rate over the specified domain (mm/s).2083 # @param center2084 # @param radius2085 # @param polygon Polygon to restrict rainfall.2086 # @param default_rate2087 # @param verbose True if this instance is to be verbose.2088 def __init__(self,2089 domain,2090 rate=0.0,2091 center=None,2092 radius=None,2093 polygon=None,2094 default_rate=None,2095 verbose=False):2096 # Create object first to make area is available2097 General_forcing.__init__(self,2098 domain,2099 'stage',2100 rate=rate,2101 center=center,2102 radius=radius,2103 polygon=polygon,2104 default_rate=default_rate,2105 verbose=verbose)2106 2107 ##2108 # @brief Update the instance rate.2109 # @param t New rate object.2110 def update_rate(self, t):2111 """Virtual method allowing local modifications by writing an2112 overriding version in descendant2113 2114 This one converts m^3/s to m/s which can be added directly2115 to 'stage' in ANUGA2116 """2117 2118 if callable(self.rate):2119 _rate = self.rate(t)/self.exchange_area2120 else:2121 _rate = self.rate/self.exchange_area2122 2123 return _rate2124 2125 2126 ##2127 # @brief A class for creating cross sections.2128 # @note Inherits from General_forcing.2129 class Cross_section:2130 """Class Cross_section - a class to setup a cross section from2131 which you can then calculate flow and energy through cross section2132 2133 2134 Cross_section(domain, polyline)2135 2136 domain:2137 polyline: Representation of desired cross section - it may contain2138 multiple sections allowing for complex shapes. Assume2139 absolute UTM coordinates.2140 Format [[x0, y0], [x1, y1], ...]2141 verbose:2142 """2143 2144 ##2145 # @brief Create an instance of the class.2146 # @param domain Domain of interest.2147 # @param polyline Polyline defining cross section2148 # @param verbose True if this instance is to be verbose.2149 def __init__(self,2150 domain,2151 polyline=None,2152 verbose=False):2153 2154 self.domain = domain2155 self.polyline = polyline2156 self.verbose = verbose2157 2158 # Find all intersections and associated triangles.2159 self.segments = self.domain.get_intersecting_segments(self.polyline,2160 use_cache=True,2161 verbose=self.verbose)2162 2163 # Get midpoints2164 self.midpoints = segment_midpoints(self.segments)2165 2166 # Make midpoints Geospatial instances2167 self.midpoints = ensure_geospatial(self.midpoints, self.domain.geo_reference)2168 2169 ##2170 # @brief set verbose mode2171 def set_verbose(self,verbose=True):2172 """Set verbose mode true or flase2173 """2174 2175 self.verbose=verbose2176 2177 ##2178 # @brief calculate current flow through cross section2179 def get_flow_through_cross_section(self):2180 """ Output: Total flow [m^3/s] across cross section.2181 """2182 2183 # Get interpolated values2184 xmomentum = self.domain.get_quantity('xmomentum')2185 ymomentum = self.domain.get_quantity('ymomentum')2186 2187 uh = xmomentum.get_values(interpolation_points=self.midpoints,2188 use_cache=True)2189 vh = ymomentum.get_values(interpolation_points=self.midpoints,2190 use_cache=True)2191 2192 # Compute and sum flows across each segment2193 total_flow = 02194 for i in range(len(uh)):2195 # Inner product of momentum vector with segment normal [m^2/s]2196 normal = self.segments[i].normal2197 normal_momentum = uh[i]*normal[0] + vh[i]*normal[1]2198 2199 # Flow across this segment [m^3/s]2200 segment_flow = normal_momentum*self.segments[i].length2201 2202 # Accumulate2203 total_flow += segment_flow2204 2205 return total_flow2206 2207 2208 ##2209 # @brief calculate current energy flow through cross section2210 def get_energy_through_cross_section(self, kind='total'):2211 """Obtain average energy head [m] across specified cross section.2212 2213 Output:2214 E: Average energy [m] across given segments for all stored times.2215 2216 The average velocity is computed for each triangle intersected by2217 the polyline and averaged weighted by segment lengths.2218 2219 The typical usage of this function would be to get average energy of2220 flow in a channel, and the polyline would then be a cross section2221 perpendicular to the flow.2222 2223 #FIXME (Ole) - need name for this energy reflecting that its dimension2224 is [m].2225 """2226 2227 from anuga.config import g, epsilon, velocity_protection as h02228 2229 # Get interpolated values2230 stage = self.domain.get_quantity('stage')2231 elevation = self.domain.get_quantity('elevation')2232 xmomentum = self.domain.get_quantity('xmomentum')2233 ymomentum = self.domain.get_quantity('ymomentum')2234 2235 w = stage.get_values(interpolation_points=self.midpoints, use_cache=True)2236 z = elevation.get_values(interpolation_points=self.midpoints, use_cache=True)2237 uh = xmomentum.get_values(interpolation_points=self.midpoints,2238 use_cache=True)2239 vh = ymomentum.get_values(interpolation_points=self.midpoints,2240 use_cache=True)2241 h = w-z # Depth2242 2243 # Compute total length of polyline for use with weighted averages2244 total_line_length = 0.02245 for segment in self.segments:2246 total_line_length += segment.length2247 2248 # Compute and sum flows across each segment2249 average_energy = 0.02250 for i in range(len(w)):2251 # Average velocity across this segment2252 if h[i] > epsilon:2253 # Use protection against degenerate velocities2254 u = uh[i]/(h[i] + h0/h[i])2255 v = vh[i]/(h[i] + h0/h[i])2256 else:2257 u = v = 0.02258 2259 speed_squared = u*u + v*v2260 kinetic_energy = 0.5*speed_squared/g2261 2262 if kind == 'specific':2263 segment_energy = h[i] + kinetic_energy2264 elif kind == 'total':2265 segment_energy = w[i] + kinetic_energy2266 else:2267 msg = 'Energy kind must be either "specific" or "total".'2268 msg += ' I got %s' %kind2269 2270 # Add to weighted average2271 weigth = self.segments[i].length/total_line_length2272 average_energy += segment_energy*weigth2273 2274 return average_energy2275 2276 2277 1422 2278 1423 ################################################################################ -
anuga_core/source/anuga/shallow_water/test_shallow_water_domain.py
r7731 r7733 5 5 from math import pi, sqrt 6 6 import tempfile 7 8 from anuga.shallow_water import Domain 7 9 8 10 from anuga.config import g, epsilon … … 14 16 from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross 15 17 from anuga.abstract_2d_finite_volumes.quantity import Quantity 18 from anuga.shallow_water.forcing import Inflow, Cross_section 19 from anuga.geospatial_data.geospatial_data import ensure_geospatial 16 20 17 21 from anuga.utilities.system_tools import get_pathname_from_package 18 22 19 from anuga.shallow_water import Domain20 23 from anuga.abstract_2d_finite_volumes.generic_boundary_conditions \ 21 24 import Dirichlet_boundary 22 from anuga.shallow_water. shallow_water_domainimport Rainfall, Wind_stress23 from anuga.shallow_water. shallow_water_domainimport Inflow, Cross_section25 from anuga.shallow_water.forcing import Rainfall, Wind_stress 26 from anuga.shallow_water.forcing import Inflow, Cross_section 24 27 from anuga.shallow_water.data_manager import get_flow_through_cross_section 25 28 … … 171 174 172 175 173 # Variable windfield implemented using functions174 def speed(t, x, y):175 """Large speeds halfway between center and edges176 177 Low speeds at center and edges178 """179 180 from math import exp, cos, pi181 182 x = num.array(x)183 y = num.array(y)184 185 N = len(x)186 s = 0*x #New array187 188 for k in range(N):189 r = num.sqrt(x[k]**2 + y[k]**2)190 factor = exp(-(r-0.15)**2)191 s[k] = 4000 * factor * (cos(t*2*pi/150) + 2)192 193 return s194 176 195 177 def scalar_func(t, x, y): … … 201 183 return 17.7 202 184 203 def scalar_func_list(t, x, y):204 """Function that returns a scalar.205 206 Used to test error message when numeric array is expected207 """208 209 return [17.7]210 211 def angle(t, x, y):212 """Rotating field213 """214 from math import atan, pi215 216 x = num.array(x)217 y = num.array(y)218 219 N = len(x)220 a = 0 * x # New array221 222 for k in range(N):223 r = num.sqrt(x[k]**2 + y[k]**2)224 225 angle = atan(y[k]/x[k])226 227 if x[k] < 0:228 angle += pi229 230 # Take normal direction231 angle -= pi/2232 233 # Ensure positive radians234 if angle < 0:235 angle += 2*pi236 237 a[k] = angle/pi*180238 239 return a240 185 241 186 … … 2235 2180 4*S) 2236 2181 2237 def test_constant_wind_stress(self): 2238 from anuga.config import rho_a, rho_w, eta_w 2239 from math import pi, cos, sin 2240 2241 a = [0.0, 0.0] 2242 b = [0.0, 2.0] 2243 c = [2.0, 0.0] 2244 d = [0.0, 4.0] 2245 e = [2.0, 2.0] 2246 f = [4.0, 0.0] 2247 2248 points = [a, b, c, d, e, f] 2249 # bac, bce, ecf, dbe 2250 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2251 2252 domain = Domain(points, vertices) 2253 2254 #Flat surface with 1m of water 2255 domain.set_quantity('elevation', 0) 2256 domain.set_quantity('stage', 1.0) 2257 domain.set_quantity('friction', 0) 2258 2259 Br = Reflective_boundary(domain) 2260 domain.set_boundary({'exterior': Br}) 2261 2262 #Setup only one forcing term, constant wind stress 2263 s = 100 2264 phi = 135 2265 domain.forcing_terms = [] 2266 domain.forcing_terms.append(Wind_stress(s, phi)) 2267 2268 domain.compute_forcing_terms() 2269 2270 const = eta_w*rho_a / rho_w 2271 2272 #Convert to radians 2273 phi = phi*pi / 180 2274 2275 #Compute velocity vector (u, v) 2276 u = s*cos(phi) 2277 v = s*sin(phi) 2278 2279 #Compute wind stress 2280 S = const * num.sqrt(u**2 + v**2) 2281 2282 assert num.allclose(domain.quantities['stage'].explicit_update, 0) 2283 assert num.allclose(domain.quantities['xmomentum'].explicit_update, S*u) 2284 assert num.allclose(domain.quantities['ymomentum'].explicit_update, S*v) 2285 2286 def test_variable_wind_stress(self): 2287 from anuga.config import rho_a, rho_w, eta_w 2288 from math import pi, cos, sin 2289 2290 a = [0.0, 0.0] 2291 b = [0.0, 2.0] 2292 c = [2.0, 0.0] 2293 d = [0.0, 4.0] 2294 e = [2.0, 2.0] 2295 f = [4.0, 0.0] 2296 2297 points = [a, b, c, d, e, f] 2298 # bac, bce, ecf, dbe 2299 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2300 2301 domain = Domain(points, vertices) 2302 2303 #Flat surface with 1m of water 2304 domain.set_quantity('elevation', 0) 2305 domain.set_quantity('stage', 1.0) 2306 domain.set_quantity('friction', 0) 2307 2308 Br = Reflective_boundary(domain) 2309 domain.set_boundary({'exterior': Br}) 2310 2311 domain.time = 5.54 # Take a random time (not zero) 2312 2313 #Setup only one forcing term, constant wind stress 2314 s = 100 2315 phi = 135 2316 domain.forcing_terms = [] 2317 domain.forcing_terms.append(Wind_stress(s=speed, phi=angle)) 2318 2319 domain.compute_forcing_terms() 2320 2321 #Compute reference solution 2322 const = eta_w*rho_a / rho_w 2323 2324 N = len(domain) # number_of_triangles 2325 2326 xc = domain.get_centroid_coordinates() 2327 t = domain.time 2328 2329 x = xc[:,0] 2330 y = xc[:,1] 2331 s_vec = speed(t,x,y) 2332 phi_vec = angle(t,x,y) 2333 2334 for k in range(N): 2335 # Convert to radians 2336 phi = phi_vec[k]*pi / 180 2337 s = s_vec[k] 2338 2339 # Compute velocity vector (u, v) 2340 u = s*cos(phi) 2341 v = s*sin(phi) 2342 2343 # Compute wind stress 2344 S = const * num.sqrt(u**2 + v**2) 2345 2346 assert num.allclose(domain.quantities['stage'].explicit_update[k], 2347 0) 2348 assert num.allclose(domain.quantities['xmomentum'].\ 2349 explicit_update[k], 2350 S*u) 2351 assert num.allclose(domain.quantities['ymomentum'].\ 2352 explicit_update[k], 2353 S*v) 2354 2355 def test_windfield_from_file(self): 2356 import time 2357 from anuga.config import rho_a, rho_w, eta_w 2358 from math import pi, cos, sin 2359 from anuga.config import time_format 2360 from anuga.abstract_2d_finite_volumes.util import file_function 2361 2362 a = [0.0, 0.0] 2363 b = [0.0, 2.0] 2364 c = [2.0, 0.0] 2365 d = [0.0, 4.0] 2366 e = [2.0, 2.0] 2367 f = [4.0, 0.0] 2368 2369 points = [a, b, c, d, e, f] 2370 # bac, bce, ecf, dbe 2371 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2372 2373 domain = Domain(points, vertices) 2374 2375 # Flat surface with 1m of water 2376 domain.set_quantity('elevation', 0) 2377 domain.set_quantity('stage', 1.0) 2378 domain.set_quantity('friction', 0) 2379 2380 Br = Reflective_boundary(domain) 2381 domain.set_boundary({'exterior': Br}) 2382 2383 domain.time = 7 # Take a time that is represented in file (not zero) 2384 2385 # Write wind stress file (ensure that domain.time is covered) 2386 # Take x=1 and y=0 2387 filename = 'test_windstress_from_file' 2388 start = time.mktime(time.strptime('2000', '%Y')) 2389 fid = open(filename + '.txt', 'w') 2390 dt = 1 # One second interval 2391 t = 0.0 2392 while t <= 10.0: 2393 t_string = time.strftime(time_format, time.gmtime(t+start)) 2394 2395 fid.write('%s, %f %f\n' % 2396 (t_string, speed(t,[1],[0])[0], angle(t,[1],[0])[0])) 2397 t += dt 2398 2399 fid.close() 2400 2401 # Convert ASCII file to NetCDF (Which is what we really like!) 2402 from data_manager import timefile2netcdf 2403 2404 timefile2netcdf(filename) 2405 os.remove(filename + '.txt') 2406 2407 # Setup wind stress 2408 F = file_function(filename + '.tms', 2409 quantities=['Attribute0', 'Attribute1']) 2410 os.remove(filename + '.tms') 2411 2412 W = Wind_stress(F) 2413 2414 domain.forcing_terms = [] 2415 domain.forcing_terms.append(W) 2416 2417 domain.compute_forcing_terms() 2418 2419 # Compute reference solution 2420 const = eta_w*rho_a / rho_w 2421 2422 N = len(domain) # number_of_triangles 2423 2424 t = domain.time 2425 2426 s = speed(t, [1], [0])[0] 2427 phi = angle(t, [1], [0])[0] 2428 2429 # Convert to radians 2430 phi = phi*pi / 180 2431 2432 # Compute velocity vector (u, v) 2433 u = s*cos(phi) 2434 v = s*sin(phi) 2435 2436 # Compute wind stress 2437 S = const * num.sqrt(u**2 + v**2) 2438 2439 for k in range(N): 2440 assert num.allclose(domain.quantities['stage'].explicit_update[k], 2441 0) 2442 assert num.allclose(domain.quantities['xmomentum'].\ 2443 explicit_update[k], 2444 S*u) 2445 assert num.allclose(domain.quantities['ymomentum'].\ 2446 explicit_update[k], 2447 S*v) 2448 2449 def test_windfield_from_file_seconds(self): 2450 import time 2451 from anuga.config import rho_a, rho_w, eta_w 2452 from math import pi, cos, sin 2453 from anuga.config import time_format 2454 from anuga.abstract_2d_finite_volumes.util import file_function 2455 2456 a = [0.0, 0.0] 2457 b = [0.0, 2.0] 2458 c = [2.0, 0.0] 2459 d = [0.0, 4.0] 2460 e = [2.0, 2.0] 2461 f = [4.0, 0.0] 2462 2463 points = [a, b, c, d, e, f] 2464 # bac, bce, ecf, dbe 2465 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2466 2467 domain = Domain(points, vertices) 2468 2469 # Flat surface with 1m of water 2470 domain.set_quantity('elevation', 0) 2471 domain.set_quantity('stage', 1.0) 2472 domain.set_quantity('friction', 0) 2473 2474 Br = Reflective_boundary(domain) 2475 domain.set_boundary({'exterior': Br}) 2476 2477 domain.time = 7 # Take a time that is represented in file (not zero) 2478 2479 # Write wind stress file (ensure that domain.time is covered) 2480 # Take x=1 and y=0 2481 filename = 'test_windstress_from_file' 2482 start = time.mktime(time.strptime('2000', '%Y')) 2483 fid = open(filename + '.txt', 'w') 2484 dt = 0.5 # Half second interval 2485 t = 0.0 2486 while t <= 10.0: 2487 fid.write('%s, %f %f\n' 2488 % (str(t), speed(t, [1], [0])[0], angle(t, [1], [0])[0])) 2489 t += dt 2490 2491 fid.close() 2492 2493 # Convert ASCII file to NetCDF (Which is what we really like!) 2494 from data_manager import timefile2netcdf 2495 2496 timefile2netcdf(filename, time_as_seconds=True) 2497 os.remove(filename + '.txt') 2498 2499 # Setup wind stress 2500 F = file_function(filename + '.tms', 2501 quantities=['Attribute0', 'Attribute1']) 2502 os.remove(filename + '.tms') 2503 2504 W = Wind_stress(F) 2505 2506 domain.forcing_terms = [] 2507 domain.forcing_terms.append(W) 2508 2509 domain.compute_forcing_terms() 2510 2511 # Compute reference solution 2512 const = eta_w*rho_a / rho_w 2513 2514 N = len(domain) # number_of_triangles 2515 2516 t = domain.time 2517 2518 s = speed(t, [1], [0])[0] 2519 phi = angle(t, [1], [0])[0] 2520 2521 # Convert to radians 2522 phi = phi*pi / 180 2523 2524 # Compute velocity vector (u, v) 2525 u = s*cos(phi) 2526 v = s*sin(phi) 2527 2528 # Compute wind stress 2529 S = const * num.sqrt(u**2 + v**2) 2530 2531 for k in range(N): 2532 assert num.allclose(domain.quantities['stage'].explicit_update[k], 2533 0) 2534 assert num.allclose(domain.quantities['xmomentum'].\ 2535 explicit_update[k], 2536 S*u) 2537 assert num.allclose(domain.quantities['ymomentum'].\ 2538 explicit_update[k], 2539 S*v) 2540 2541 def test_wind_stress_error_condition(self): 2542 """Test that windstress reacts properly when forcing functions 2543 are wrong - e.g. returns a scalar 2544 """ 2545 2546 from math import pi, cos, sin 2547 from anuga.config import rho_a, rho_w, eta_w 2548 2549 a = [0.0, 0.0] 2550 b = [0.0, 2.0] 2551 c = [2.0, 0.0] 2552 d = [0.0, 4.0] 2553 e = [2.0, 2.0] 2554 f = [4.0, 0.0] 2555 2556 points = [a, b, c, d, e, f] 2557 # bac, bce, ecf, dbe 2558 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2559 2560 domain = Domain(points, vertices) 2561 2562 # Flat surface with 1m of water 2563 domain.set_quantity('elevation', 0) 2564 domain.set_quantity('stage', 1.0) 2565 domain.set_quantity('friction', 0) 2566 2567 Br = Reflective_boundary(domain) 2568 domain.set_boundary({'exterior': Br}) 2569 2570 domain.time = 5.54 # Take a random time (not zero) 2571 2572 # Setup only one forcing term, bad func 2573 domain.forcing_terms = [] 2574 2575 try: 2576 domain.forcing_terms.append(Wind_stress(s=scalar_func_list, 2577 phi=angle)) 2578 except AssertionError: 2579 pass 2580 else: 2581 msg = 'Should have raised exception' 2582 raise Exception, msg 2583 2584 try: 2585 domain.forcing_terms.append(Wind_stress(s=speed, phi=scalar_func)) 2586 except Exception: 2587 pass 2588 else: 2589 msg = 'Should have raised exception' 2590 raise Exception, msg 2591 2592 try: 2593 domain.forcing_terms.append(Wind_stress(s=speed, phi='xx')) 2594 except: 2595 pass 2596 else: 2597 msg = 'Should have raised exception' 2598 raise Exception, msg 2599 2600 def test_rainfall(self): 2601 from math import pi, cos, sin 2602 2603 a = [0.0, 0.0] 2604 b = [0.0, 2.0] 2605 c = [2.0, 0.0] 2606 d = [0.0, 4.0] 2607 e = [2.0, 2.0] 2608 f = [4.0, 0.0] 2609 2610 points = [a, b, c, d, e, f] 2611 # bac, bce, ecf, dbe 2612 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2613 2614 domain = Domain(points, vertices) 2615 2616 # Flat surface with 1m of water 2617 domain.set_quantity('elevation', 0) 2618 domain.set_quantity('stage', 1.0) 2619 domain.set_quantity('friction', 0) 2620 2621 Br = Reflective_boundary(domain) 2622 domain.set_boundary({'exterior': Br}) 2623 2624 # Setup only one forcing term, constant rainfall 2625 domain.forcing_terms = [] 2626 domain.forcing_terms.append(Rainfall(domain, rate=2.0)) 2627 2628 domain.compute_forcing_terms() 2629 assert num.allclose(domain.quantities['stage'].explicit_update, 2630 2.0/1000) 2631 2632 def test_rainfall_restricted_by_polygon(self): 2633 from math import pi, cos, sin 2634 2635 a = [0.0, 0.0] 2636 b = [0.0, 2.0] 2637 c = [2.0, 0.0] 2638 d = [0.0, 4.0] 2639 e = [2.0, 2.0] 2640 f = [4.0, 0.0] 2641 2642 points = [a, b, c, d, e, f] 2643 # bac, bce, ecf, dbe 2644 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2645 2646 domain = Domain(points, vertices) 2647 2648 # Flat surface with 1m of water 2649 domain.set_quantity('elevation', 0) 2650 domain.set_quantity('stage', 1.0) 2651 domain.set_quantity('friction', 0) 2652 2653 Br = Reflective_boundary(domain) 2654 domain.set_boundary({'exterior': Br}) 2655 2656 # Setup only one forcing term, constant rainfall 2657 # restricted to a polygon enclosing triangle #1 (bce) 2658 domain.forcing_terms = [] 2659 R = Rainfall(domain, rate=2.0, polygon=[[1,1], [2,1], [2,2], [1,2]]) 2660 2661 assert num.allclose(R.exchange_area, 2) 2662 2663 domain.forcing_terms.append(R) 2664 2665 domain.compute_forcing_terms() 2666 2667 assert num.allclose(domain.quantities['stage'].explicit_update[1], 2668 2.0/1000) 2669 assert num.allclose(domain.quantities['stage'].explicit_update[0], 0) 2670 assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) 2671 2672 def test_time_dependent_rainfall_restricted_by_polygon(self): 2673 a = [0.0, 0.0] 2674 b = [0.0, 2.0] 2675 c = [2.0, 0.0] 2676 d = [0.0, 4.0] 2677 e = [2.0, 2.0] 2678 f = [4.0, 0.0] 2679 2680 points = [a, b, c, d, e, f] 2681 # bac, bce, ecf, dbe 2682 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2683 2684 domain = Domain(points, vertices) 2685 2686 # Flat surface with 1m of water 2687 domain.set_quantity('elevation', 0) 2688 domain.set_quantity('stage', 1.0) 2689 domain.set_quantity('friction', 0) 2690 2691 Br = Reflective_boundary(domain) 2692 domain.set_boundary({'exterior': Br}) 2693 2694 # Setup only one forcing term, time dependent rainfall 2695 # restricted to a polygon enclosing triangle #1 (bce) 2696 domain.forcing_terms = [] 2697 R = Rainfall(domain, 2698 rate=lambda t: 3*t + 7, 2699 polygon = [[1,1], [2,1], [2,2], [1,2]]) 2700 2701 assert num.allclose(R.exchange_area, 2) 2702 2703 domain.forcing_terms.append(R) 2704 2705 domain.time = 10. 2706 2707 domain.compute_forcing_terms() 2708 2709 assert num.allclose(domain.quantities['stage'].explicit_update[1], 2710 (3*domain.time + 7)/1000) 2711 assert num.allclose(domain.quantities['stage'].explicit_update[0], 0) 2712 assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) 2713 2714 def test_time_dependent_rainfall_using_starttime(self): 2715 rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float) 2716 2717 a = [0.0, 0.0] 2718 b = [0.0, 2.0] 2719 c = [2.0, 0.0] 2720 d = [0.0, 4.0] 2721 e = [2.0, 2.0] 2722 f = [4.0, 0.0] 2723 2724 points = [a, b, c, d, e, f] 2725 # bac, bce, ecf, dbe 2726 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2727 2728 domain = Domain(points, vertices) 2729 2730 # Flat surface with 1m of water 2731 domain.set_quantity('elevation', 0) 2732 domain.set_quantity('stage', 1.0) 2733 domain.set_quantity('friction', 0) 2734 2735 Br = Reflective_boundary(domain) 2736 domain.set_boundary({'exterior': Br}) 2737 2738 # Setup only one forcing term, time dependent rainfall 2739 # restricted to a polygon enclosing triangle #1 (bce) 2740 domain.forcing_terms = [] 2741 R = Rainfall(domain, 2742 rate=lambda t: 3*t + 7, 2743 polygon=rainfall_poly) 2744 2745 assert num.allclose(R.exchange_area, 2) 2746 2747 domain.forcing_terms.append(R) 2748 2749 # This will test that time used in the forcing function takes 2750 # startime into account. 2751 domain.starttime = 5.0 2752 2753 domain.time = 7. 2754 2755 domain.compute_forcing_terms() 2756 2757 assert num.allclose(domain.quantities['stage'].explicit_update[1], 2758 (3*domain.get_time() + 7)/1000) 2759 assert num.allclose(domain.quantities['stage'].explicit_update[1], 2760 (3*(domain.time + domain.starttime) + 7)/1000) 2761 2762 # Using internal time her should fail 2763 assert not num.allclose(domain.quantities['stage'].explicit_update[1], 2764 (3*domain.time + 7)/1000) 2765 2766 assert num.allclose(domain.quantities['stage'].explicit_update[0], 0) 2767 assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) 2768 2769 def test_time_dependent_rainfall_using_georef(self): 2770 """test_time_dependent_rainfall_using_georef 2771 2772 This will also test the General forcing term using georef 2773 """ 2774 2775 # Mesh in zone 56 (absolute coords) 2776 x0 = 314036.58727982 2777 y0 = 6224951.2960092 2778 2779 rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float) 2780 rainfall_poly += [x0, y0] 2781 2782 a = [0.0, 0.0] 2783 b = [0.0, 2.0] 2784 c = [2.0, 0.0] 2785 d = [0.0, 4.0] 2786 e = [2.0, 2.0] 2787 f = [4.0, 0.0] 2788 2789 points = [a, b, c, d, e, f] 2790 # bac, bce, ecf, dbe 2791 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2792 2793 domain = Domain(points, vertices, 2794 geo_reference=Geo_reference(56, x0, y0)) 2795 2796 # Flat surface with 1m of water 2797 domain.set_quantity('elevation', 0) 2798 domain.set_quantity('stage', 1.0) 2799 domain.set_quantity('friction', 0) 2800 2801 Br = Reflective_boundary(domain) 2802 domain.set_boundary({'exterior': Br}) 2803 2804 # Setup only one forcing term, time dependent rainfall 2805 # restricted to a polygon enclosing triangle #1 (bce) 2806 domain.forcing_terms = [] 2807 R = Rainfall(domain, 2808 rate=lambda t: 3*t + 7, 2809 polygon=rainfall_poly) 2810 2811 assert num.allclose(R.exchange_area, 2) 2812 2813 domain.forcing_terms.append(R) 2814 2815 # This will test that time used in the forcing function takes 2816 # startime into account. 2817 domain.starttime = 5.0 2818 2819 domain.time = 7. 2820 2821 domain.compute_forcing_terms() 2822 2823 assert num.allclose(domain.quantities['stage'].explicit_update[1], 2824 (3*domain.get_time() + 7)/1000) 2825 assert num.allclose(domain.quantities['stage'].explicit_update[1], 2826 (3*(domain.time + domain.starttime) + 7)/1000) 2827 2828 # Using internal time her should fail 2829 assert not num.allclose(domain.quantities['stage'].explicit_update[1], 2830 (3*domain.time + 7)/1000) 2831 2832 assert num.allclose(domain.quantities['stage'].explicit_update[0], 0) 2833 assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) 2834 2835 def test_time_dependent_rainfall_restricted_by_polygon_with_default(self): 2836 """ 2837 Test that default rainfall can be used when given rate runs out of data. 2838 """ 2839 2840 a = [0.0, 0.0] 2841 b = [0.0, 2.0] 2842 c = [2.0, 0.0] 2843 d = [0.0, 4.0] 2844 e = [2.0, 2.0] 2845 f = [4.0, 0.0] 2846 2847 points = [a, b, c, d, e, f] 2848 # bac, bce, ecf, dbe 2849 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2850 2851 domain = Domain(points, vertices) 2852 2853 # Flat surface with 1m of water 2854 domain.set_quantity('elevation', 0) 2855 domain.set_quantity('stage', 1.0) 2856 domain.set_quantity('friction', 0) 2857 2858 Br = Reflective_boundary(domain) 2859 domain.set_boundary({'exterior': Br}) 2860 2861 # Setup only one forcing term, time dependent rainfall 2862 # that expires at t==20 2863 from anuga.fit_interpolate.interpolate import Modeltime_too_late 2864 2865 def main_rate(t): 2866 if t > 20: 2867 msg = 'Model time exceeded.' 2868 raise Modeltime_too_late, msg 2869 else: 2870 return 3*t + 7 2871 2872 domain.forcing_terms = [] 2873 R = Rainfall(domain, 2874 rate=main_rate, 2875 polygon = [[1,1], [2,1], [2,2], [1,2]], 2876 default_rate=5.0) 2877 2878 assert num.allclose(R.exchange_area, 2) 2879 2880 domain.forcing_terms.append(R) 2881 2882 domain.time = 10. 2883 2884 domain.compute_forcing_terms() 2885 2886 assert num.allclose(domain.quantities['stage'].explicit_update[1], 2887 (3*domain.time+7)/1000) 2888 assert num.allclose(domain.quantities['stage'].explicit_update[0], 0) 2889 assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) 2890 2891 domain.time = 100. 2892 domain.quantities['stage'].explicit_update[:] = 0.0 # Reset 2893 domain.compute_forcing_terms() 2894 2895 assert num.allclose(domain.quantities['stage'].explicit_update[1], 2896 5.0/1000) # Default value 2897 assert num.allclose(domain.quantities['stage'].explicit_update[0], 0) 2898 assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) 2899 2900 def test_rainfall_forcing_with_evolve(self): 2901 """test_rainfall_forcing_with_evolve 2902 2903 Test how forcing terms are called within evolve 2904 """ 2905 2906 # FIXME(Ole): This test is just to experiment 2907 2908 a = [0.0, 0.0] 2909 b = [0.0, 2.0] 2910 c = [2.0, 0.0] 2911 d = [0.0, 4.0] 2912 e = [2.0, 2.0] 2913 f = [4.0, 0.0] 2914 2915 points = [a, b, c, d, e, f] 2916 # bac, bce, ecf, dbe 2917 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2918 2919 domain = Domain(points, vertices) 2920 2921 # Flat surface with 1m of water 2922 domain.set_quantity('elevation', 0) 2923 domain.set_quantity('stage', 1.0) 2924 domain.set_quantity('friction', 0) 2925 2926 Br = Reflective_boundary(domain) 2927 domain.set_boundary({'exterior': Br}) 2928 2929 # Setup only one forcing term, time dependent rainfall 2930 # that expires at t==20 2931 from anuga.fit_interpolate.interpolate import Modeltime_too_late 2932 2933 def main_rate(t): 2934 if t > 20: 2935 msg = 'Model time exceeded.' 2936 raise Modeltime_too_late, msg 2937 else: 2938 return 3*t + 7 2939 2940 domain.forcing_terms = [] 2941 R = Rainfall(domain, 2942 rate=main_rate, 2943 polygon=[[1,1], [2,1], [2,2], [1,2]], 2944 default_rate=5.0) 2945 2946 assert num.allclose(R.exchange_area, 2) 2947 2948 domain.forcing_terms.append(R) 2949 2950 for t in domain.evolve(yieldstep=1, finaltime=25): 2951 pass 2952 #FIXME(Ole): A test here is hard because explicit_update also 2953 # receives updates from the flux calculation. 2954 2955 2956 def test_rainfall_forcing_with_evolve_1(self): 2957 """test_rainfall_forcing_with_evolve 2958 2959 Test how forcing terms are called within evolve. 2960 This test checks that proper exception is thrown when no default_rate is set 2961 """ 2962 2963 2964 a = [0.0, 0.0] 2965 b = [0.0, 2.0] 2966 c = [2.0, 0.0] 2967 d = [0.0, 4.0] 2968 e = [2.0, 2.0] 2969 f = [4.0, 0.0] 2970 2971 points = [a, b, c, d, e, f] 2972 # bac, bce, ecf, dbe 2973 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2974 2975 domain = Domain(points, vertices) 2976 2977 # Flat surface with 1m of water 2978 domain.set_quantity('elevation', 0) 2979 domain.set_quantity('stage', 1.0) 2980 domain.set_quantity('friction', 0) 2981 2982 Br = Reflective_boundary(domain) 2983 domain.set_boundary({'exterior': Br}) 2984 2985 # Setup only one forcing term, time dependent rainfall 2986 # that expires at t==20 2987 from anuga.fit_interpolate.interpolate import Modeltime_too_late 2988 2989 def main_rate(t): 2990 if t > 20: 2991 msg = 'Model time exceeded.' 2992 raise Modeltime_too_late, msg 2993 else: 2994 return 3*t + 7 2995 2996 domain.forcing_terms = [] 2997 R = Rainfall(domain, 2998 rate=main_rate, 2999 polygon=[[1,1], [2,1], [2,2], [1,2]]) 3000 3001 3002 assert num.allclose(R.exchange_area, 2) 3003 3004 domain.forcing_terms.append(R) 3005 #for t in domain.evolve(yieldstep=1, finaltime=25): 3006 # pass 3007 3008 try: 3009 for t in domain.evolve(yieldstep=1, finaltime=25): 3010 pass 3011 except Modeltime_too_late, e: 3012 # Test that error message is as expected 3013 assert 'can specify keyword argument default_rate in the forcing function' in str(e) 3014 else: 3015 raise Exception, 'Should have raised exception' 3016 3017 2182 3018 2183 3019 2184 def test_inflow_using_circle(self): … … 6616 5781 import rectangular_cross 6617 5782 from anuga.shallow_water import Domain 6618 from anuga.shallow_water. shallow_water_domainimport Inflow5783 from anuga.shallow_water.forcing import Inflow 6619 5784 from anuga.shallow_water.data_manager \ 6620 5785 import get_flow_through_cross_section … … 6707 5872 from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross 6708 5873 from anuga.shallow_water import Domain 6709 from anuga.shallow_water.shallow_water_domain import Inflow6710 5874 from anuga.shallow_water.data_manager import get_flow_through_cross_section 6711 5875 … … 7276 6440 #---------------------------------------------------------------------- 7277 6441 from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross 7278 from anuga.shallow_water import Domain7279 from anuga.shallow_water.shallow_water_domain import Reflective_boundary7280 from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary7281 from anuga.shallow_water.shallow_water_domain import Inflow_boundary7282 6442 from anuga.shallow_water.data_manager import get_flow_through_cross_section 7283 6443 from anuga.abstract_2d_finite_volumes.util import sww2csv_gauges, csv2timeseries_graphs … … 7416 6576 from anuga.shallow_water.shallow_water_domain import Reflective_boundary 7417 6577 from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary 7418 from anuga.shallow_water. shallow_water_domainimport Inflow6578 from anuga.shallow_water.forcing import Inflow 7419 6579 from anuga.shallow_water.data_manager \ 7420 6580 import get_flow_through_cross_section -
anuga_core/source/anuga/shallow_water_balanced/test_swb_conservation.py
r7573 r7733 458 458 from anuga.shallow_water.shallow_water_domain import Reflective_boundary 459 459 from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary 460 from anuga.shallow_water. shallow_water_domainimport Inflow460 from anuga.shallow_water.forcing import Inflow 461 461 from anuga.shallow_water.data_manager \ 462 462 import get_flow_through_cross_section … … 551 551 from anuga.shallow_water.shallow_water_domain import Reflective_boundary 552 552 from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary 553 from anuga.shallow_water. shallow_water_domainimport Inflow553 from anuga.shallow_water.forcing import Inflow 554 554 from anuga.shallow_water.data_manager import get_flow_through_cross_section 555 555 -
anuga_core/source/anuga/shallow_water_balanced/test_swb_forcing_terms.py
r7562 r7733 1362 1362 from anuga.shallow_water.shallow_water_domain import Reflective_boundary 1363 1363 from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary 1364 from anuga.shallow_water. shallow_water_domainimport Inflow1364 from anuga.shallow_water.forcing import Inflow 1365 1365 from anuga.shallow_water.data_manager \ 1366 1366 import get_flow_through_cross_section … … 1521 1521 from anuga.shallow_water.shallow_water_domain import Reflective_boundary 1522 1522 from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary 1523 from anuga.shallow_water. shallow_water_domainimport Inflow_boundary1523 from anuga.shallow_water.forcing import Inflow_boundary 1524 1524 from anuga.shallow_water.data_manager import get_flow_through_cross_section 1525 1525 from anuga.abstract_2d_finite_volumes.util import sww2csv_gauges, csv2timeseries_graphs … … 1658 1658 from anuga.shallow_water.shallow_water_domain import Reflective_boundary 1659 1659 from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary 1660 from anuga.shallow_water. shallow_water_domainimport Inflow1660 from anuga.shallow_water.forcing import Inflow 1661 1661 from anuga.shallow_water.data_manager \ 1662 1662 import get_flow_through_cross_section
Note: See TracChangeset
for help on using the changeset viewer.