Changeset 7770


Ignore:
Timestamp:
Jun 2, 2010, 5:17:47 PM (15 years ago)
Author:
James Hudson
Message:

Broke up data_manager into more modules - some failing tests.

Location:
trunk/anuga_core
Files:
3 added
11 edited

Legend:

Unmodified
Added
Removed
  • trunk/anuga_core/documentation/user_manual/demos/channel1.py

    r7086 r7770  
    77# Import necessary modules
    88#------------------------------------------------------------------------------
     9
     10# Import standard shallow water domain and standard boundaries.
     11import anuga
     12
     13# Import rectangular cross mesh generator
    914from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross
    10 from anuga.shallow_water import Domain
    11 from anuga.shallow_water import Reflective_boundary
    12 from anuga.shallow_water import Dirichlet_boundary
     15
    1316
    1417#------------------------------------------------------------------------------
     
    1821                                               len1=10.0, len2=5.0) # Mesh
    1922
    20 domain = Domain(points, vertices, boundary)  # Create domain
     23domain = anuga.Domain(points, vertices, boundary)  # Create domain
    2124domain.set_name('channel1')                  # Output name
    2225
     
    3538# Setup boundary conditions
    3639#------------------------------------------------------------------------------
    37 Bi = Dirichlet_boundary([0.4, 0, 0])         # Inflow
    38 Br = Reflective_boundary(domain)             # Solid reflective wall
     40Bi = anuga.Dirichlet_boundary([0.4, 0, 0])         # Inflow
     41Br = anuga.Reflective_boundary(domain)             # Solid reflective wall
    3942
    4043domain.set_boundary({'left': Bi, 'right': Br, 'top': Br, 'bottom': Br})
  • trunk/anuga_core/source/anuga/file/sww.py

    r7767 r7770  
    11961196
    11971197##
     1198# @brief Get mesh and quantity data from an SWW file.
     1199# @param filename Path to data file to read.
     1200# @param quantities UNUSED!
     1201# @param verbose True if this function is to be verbose.
     1202# @return (mesh, quantities, time) where mesh is the mesh data, quantities is
     1203#         a dictionary of {name: value}, and time is the time vector.
     1204# @note Quantities extracted: 'elevation', 'stage', 'xmomentum' and 'ymomentum'
     1205def get_mesh_and_quantities_from_file(filename,
     1206                                      quantities=None,
     1207                                      verbose=False):
     1208    """Get and rebuild mesh structure and associated quantities from sww file
     1209
     1210    Input:
     1211        filename - Name os sww file
     1212        quantities - Names of quantities to load
     1213
     1214    Output:
     1215        mesh - instance of class Interpolate
     1216               (including mesh and interpolation functionality)
     1217        quantities - arrays with quantity values at each mesh node
     1218        time - vector of stored timesteps
     1219
     1220    This function is used by e.g.:
     1221        get_interpolated_quantities_at_polyline_midpoints
     1222    """
     1223
     1224    # FIXME (Ole): Maybe refactor filefunction using this more fundamental code.
     1225
     1226    import types
     1227    from Scientific.IO.NetCDF import NetCDFFile
     1228    from shallow_water import Domain
     1229    from anuga.abstract_2d_finite_volumes.neighbour_mesh import Mesh
     1230
     1231    if verbose: log.critical('Reading from %s' % filename)
     1232
     1233    fid = NetCDFFile(filename, netcdf_mode_r)    # Open existing file for read
     1234    time = fid.variables['time'][:]    # Time vector
     1235    time += fid.starttime[0]
     1236
     1237    # Get the variables as numeric arrays
     1238    x = fid.variables['x'][:]                   # x-coordinates of nodes
     1239    y = fid.variables['y'][:]                   # y-coordinates of nodes
     1240    elevation = fid.variables['elevation'][:]   # Elevation
     1241    stage = fid.variables['stage'][:]           # Water level
     1242    xmomentum = fid.variables['xmomentum'][:]   # Momentum in the x-direction
     1243    ymomentum = fid.variables['ymomentum'][:]   # Momentum in the y-direction
     1244
     1245    # Mesh (nodes (Mx2), triangles (Nx3))
     1246    nodes = num.concatenate((x[:,num.newaxis], y[:,num.newaxis]), axis=1)
     1247    triangles = fid.variables['volumes'][:]
     1248
     1249    # Get geo_reference
     1250    try:
     1251        geo_reference = Geo_reference(NetCDFObject=fid)
     1252    except: #AttributeError, e:
     1253        # Sww files don't have to have a geo_ref
     1254        geo_reference = None
     1255
     1256    if verbose: log.critical('    building mesh from sww file %s' % filename)
     1257
     1258    boundary = None
     1259
     1260    #FIXME (Peter Row): Should this be in mesh?
     1261    if fid.smoothing != 'Yes':
     1262        nodes = nodes.tolist()
     1263        triangles = triangles.tolist()
     1264        nodes, triangles, boundary = weed(nodes, triangles, boundary)
     1265
     1266    try:
     1267        mesh = Mesh(nodes, triangles, boundary, geo_reference=geo_reference)
     1268    except AssertionError, e:
     1269        fid.close()
     1270        msg = 'Domain could not be created: %s. "' % e
     1271        raise DataDomainError, msg
     1272
     1273    quantities = {}
     1274    quantities['elevation'] = elevation
     1275    quantities['stage'] = stage
     1276    quantities['xmomentum'] = xmomentum
     1277    quantities['ymomentum'] = ymomentum
     1278
     1279    fid.close()
     1280
     1281    return mesh, quantities, time
     1282
     1283
     1284
     1285##
    11981286# @brief
    11991287# @parm time
     
    12511339
    12521340
    1253 
    1254 
    12551341##
    12561342# @brief
     
    13101396
    13111397    return coordinates, volumes, boundary
     1398
     1399
     1400
  • trunk/anuga_core/source/anuga/file/test_sww.py

    r7767 r7770  
    88from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular
    99from anuga.shallow_water.shallow_water_domain import Domain
    10 from sww import load_sww_as_domain
     10from sww import load_sww_as_domain, weed, get_mesh_and_quantities_from_file
    1111
    1212# boundary functions
     
    158158
    159159
     160    def test_get_mesh_and_quantities_from_sww_file(self):
     161        """test_get_mesh_and_quantities_from_sww_file(self):
     162        """     
     163       
     164        # Generate a test sww file with non trivial georeference
     165       
     166        import time, os
     167        from Scientific.IO.NetCDF import NetCDFFile
     168
     169        # Setup
     170        from mesh_factory import rectangular
     171
     172        # Create basic mesh (100m x 5m)
     173        width = 5
     174        length = 50
     175        t_end = 10
     176        points, vertices, boundary = rectangular(length, width, 50, 5)
     177
     178        # Create shallow water domain
     179        domain = Domain(points, vertices, boundary,
     180                        geo_reference = Geo_reference(56,308500,6189000))
     181
     182        domain.set_name('flowtest')
     183        swwfile = domain.get_name() + '.sww'
     184        domain.set_datadir('.')
     185
     186        Br = Reflective_boundary(domain)    # Side walls
     187        Bd = Dirichlet_boundary([1, 0, 0])  # inflow
     188
     189        domain.set_boundary( {'left': Bd, 'right': Bd, 'top': Br, 'bottom': Br})
     190
     191        for t in domain.evolve(yieldstep=1, finaltime = t_end):
     192            pass
     193
     194       
     195        # Read it
     196
     197        # Get mesh and quantities from sww file
     198        X = get_mesh_and_quantities_from_file(swwfile,
     199                                              quantities=['elevation',
     200                                                          'stage',
     201                                                          'xmomentum',
     202                                                          'ymomentum'],
     203                                              verbose=False)
     204        mesh, quantities, time = X
     205       
     206
     207        # Check that mesh has been recovered
     208        assert num.alltrue(mesh.triangles == domain.get_triangles())
     209        assert num.allclose(mesh.nodes, domain.get_nodes())
     210
     211        # Check that time has been recovered
     212        assert num.allclose(time, range(t_end+1))
     213
     214        # Check that quantities have been recovered
     215        # (sww files use single precision)
     216        z=domain.get_quantity('elevation').get_values(location='unique vertices')
     217        assert num.allclose(quantities['elevation'], z)
     218
     219        for q in ['stage', 'xmomentum', 'ymomentum']:
     220            # Get quantity at last timestep
     221            q_ref=domain.get_quantity(q).get_values(location='unique vertices')
     222
     223            #print q,quantities[q]
     224            q_sww=quantities[q][-1,:]
     225
     226            msg = 'Quantity %s failed to be recovered' %q
     227            assert num.allclose(q_ref, q_sww, atol=1.0e-6), msg
     228           
     229        # Cleanup
     230        os.remove(swwfile)
     231       
     232       
     233
     234    def test_weed(self):
     235        coordinates1 = [[0.,0.],[1.,0.],[1.,1.],[1.,0.],[2.,0.],[1.,1.]]
     236        volumes1 = [[0,1,2],[3,4,5]]
     237        boundary1= {(0,1): 'external',(1,2): 'not external',(2,0): 'external',(3,4): 'external',(4,5): 'external',(5,3): 'not external'}
     238        coordinates2,volumes2,boundary2=weed(coordinates1,volumes1,boundary1)
     239
     240        points2 = {(0.,0.):None,(1.,0.):None,(1.,1.):None,(2.,0.):None}
     241
     242        assert len(points2)==len(coordinates2)
     243        for i in range(len(coordinates2)):
     244            coordinate = tuple(coordinates2[i])
     245            assert points2.has_key(coordinate)
     246            points2[coordinate]=i
     247
     248        for triangle in volumes1:
     249            for coordinate in triangle:
     250                assert coordinates2[points2[tuple(coordinates1[coordinate])]][0]==coordinates1[coordinate][0]
     251                assert coordinates2[points2[tuple(coordinates1[coordinate])]][1]==coordinates1[coordinate][1]
     252
    160253#################################################################################
    161254
  • trunk/anuga_core/source/anuga/file_conversion/file_conversion.py

    r7765 r7770  
    488488
    489489
     490##
     491# @brief
     492# @param filename
     493# @param x
     494# @param y
     495# @param z
     496def write_obj(filename, x, y, z):
     497    """Store x,y,z vectors into filename (obj format).
     498
     499       Vectors are assumed to have dimension (M,3) where
     500       M corresponds to the number elements.
     501       triangles are assumed to be disconnected
     502
     503       The three numbers in each vector correspond to three vertices,
     504
     505       e.g. the x coordinate of vertex 1 of element i is in x[i,1]
     506    """
     507
     508    import os.path
     509
     510    root, ext = os.path.splitext(filename)
     511    if ext == '.obj':
     512        FN = filename
     513    else:
     514        FN = filename + '.obj'
     515
     516    outfile = open(FN, 'wb')
     517    outfile.write("# Triangulation as an obj file\n")
     518
     519    M, N = x.shape
     520    assert N == 3  #Assuming three vertices per element
     521
     522    for i in range(M):
     523        for j in range(N):
     524            outfile.write("v %f %f %f\n" % (x[i,j], y[i,j], z[i,j]))
     525
     526    for i in range(M):
     527        base = i * N
     528        outfile.write("f %d %d %d\n" % (base+1, base+2, base+3))
     529
     530    outfile.close()
     531
  • trunk/anuga_core/source/anuga/file_conversion/test_file_conversion.py

    r7758 r7770  
    88from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular
    99from anuga.shallow_water.sww_file import SWW_file
    10 from data_manager import extent_sww
     10from anuga.file.sww import extent_sww
    1111from anuga.config import netcdf_float, epsilon, g
    1212from Scientific.IO.NetCDF import NetCDFFile
    1313from anuga.file_conversion.file_conversion import tsh2sww, \
    1414                        pmesh_to_domain_instance
     15
     16from anuga.file.mux import WAVEHEIGHT_MUX_LABEL, EAST_VELOCITY_LABEL, \
     17                            NORTH_VELOCITY_LABEL
    1518
    1619import sys
     
    15701573        #print "sww_file",sww_file
    15711574        #dem_file = tempfile.mktemp(".dem")
    1572         domain = sww2domain(sww_file) ###, dem_file)
     1575        domain = load_sww_as_domain(sww_file) ###, dem_file)
    15731576        domain.check_integrity()
    15741577
  • trunk/anuga_core/source/anuga/file_conversion/urs2sww.py

    r7758 r7770  
     1import numpy as num
     2from Scientific.IO.NetCDF import NetCDFFile
     3
     4from anuga.file.urs import Read_urs
     5
     6from anuga.coordinate_transforms.redfearn import redfearn, \
     7     convert_from_latlon_to_utm
     8
     9from anuga.geospatial_data.geospatial_data import ensure_absolute, \
     10                                                    Geospatial_data
     11
     12from mux import WAVEHEIGHT_MUX_LABEL, EAST_VELOCITY_LABEL, \
     13                            NORTH_VELOCITY_LABEL
     14                           
     15from anuga.utilities.numerical_tools import ensure_numeric                           
     16
     17from anuga.config import netcdf_mode_r, netcdf_mode_w, netcdf_mode_a, \
     18                            netcdf_float
     19
     20from sww_file import Read_sww, Write_sww
     21
     22
    123################################################################################
    224# CONVERTING UNGRIDDED URS DATA TO AN SWW FILE
    325################################################################################
    4 
    5 WAVEHEIGHT_MUX_LABEL = '-z-mux'
    6 EAST_VELOCITY_LABEL =  '-e-mux'
    7 NORTH_VELOCITY_LABEL =  '-n-mux'
    826
    927##
     
    95113    mux = {}
    96114    for quantity, file in map(None, quantities, files_in):
    97         mux[quantity] = Urs_points(file)
     115        mux[quantity] = Read_urs(file)
    98116
    99117    # Could check that the depth is the same. (hashing)
     
    143161    for i in range(a_mux.time_step_count):
    144162        mux_times.append(a_mux.time_step * i)
    145     (mux_times_start_i, mux_times_fin_i) = mux2sww_time(mux_times, mint, maxt)
     163    (mux_times_start_i, mux_times_fin_i) = read_time_from_mux(mux_times, mint, maxt)
    146164    times = mux_times[mux_times_start_i:mux_times_fin_i]
    147165
     
    209227
    210228    # Do some conversions while writing the sww file
     229
     230
     231def read_time_from_mux(mux_times, mint, maxt):
     232    """
     233        Read a list of mux times.
     234        Return start and finish times which lie within the passed time period.
     235    """
     236
     237    if mint == None:
     238        mux_times_start_i = 0
     239    else:
     240        mux_times_start_i = num.searchsorted(mux_times, mint)
     241
     242    if maxt == None:
     243        mux_times_fin_i = len(mux_times)
     244    else:
     245        maxt += 0.5 # so if you specify a time where there is
     246                    # data that time will be included
     247        mux_times_fin_i = num.searchsorted(mux_times, maxt)
     248
     249    return mux_times_start_i, mux_times_fin_i
     250
  • trunk/anuga_core/source/anuga/file_conversion/urs2txt.py

    r7758 r7770  
    1 
     1from anuga.file.urs import Read_urs
    22
    33##
     
    2020    mux = {}
    2121    for quantity, file in map(None, quantities, files_in):
    22         mux[quantity] = Urs_points(file)
     22        mux[quantity] = Read_urs(file)
    2323
    2424    # Could check that the depth is the same. (hashing)
  • trunk/anuga_core/source/anuga/pmesh/mesh_interface.py

    r7711 r7770  
    2626                             interior_regions=None,
    2727                             interior_holes=None,
     28                             hole_tags=None,
    2829                             poly_geo_reference=None,
    2930                             mesh_geo_reference=None,
     
    5354    throw an error
    5455   
    55     Interior_holes is a list of ploygons for each hole.
     56    Interior_holes is a list of polygons for each hole.
     57    hole_tags is an optional list of boundary tags for the holes, see
     58                boundary_tags parameter.
    5659
    5760    This function does not allow segments to share points - use underlying
     
    9295              'interior_regions': interior_regions,
    9396              'interior_holes': interior_holes,
     97              'hole_tags': hole_tags,
    9498              'poly_geo_reference': poly_geo_reference,
    9599              'mesh_geo_reference': mesh_geo_reference,
     
    127131                              interior_regions=None,
    128132                              interior_holes=None,
     133                              hole_tags=None,
    129134                              poly_geo_reference=None,
    130135                              mesh_geo_reference=None,
     
    311316        for polygon in interior_holes:
    312317            m.add_hole_from_polygon(polygon,
     318                                    segment_tags=hole_tags,
    313319                                    geo_reference=poly_geo_reference)
    314320       
  • trunk/anuga_core/source/anuga/shallow_water/data_manager.py

    r7769 r7770  
    268268
    269269
    270 ##
    271 # @brief
    272 # @param filename
    273 # @param x
    274 # @param y
    275 # @param z
    276 def write_obj(filename, x, y, z):
    277     """Store x,y,z vectors into filename (obj format).
    278 
    279        Vectors are assumed to have dimension (M,3) where
    280        M corresponds to the number elements.
    281        triangles are assumed to be disconnected
    282 
    283        The three numbers in each vector correspond to three vertices,
    284 
    285        e.g. the x coordinate of vertex 1 of element i is in x[i,1]
    286     """
    287 
    288     import os.path
    289 
    290     root, ext = os.path.splitext(filename)
    291     if ext == '.obj':
    292         FN = filename
    293     else:
    294         FN = filename + '.obj'
    295 
    296     outfile = open(FN, 'wb')
    297     outfile.write("# Triangulation as an obj file\n")
    298 
    299     M, N = x.shape
    300     assert N == 3  #Assuming three vertices per element
    301 
    302     for i in range(M):
    303         for j in range(N):
    304             outfile.write("v %f %f %f\n" % (x[i,j], y[i,j], z[i,j]))
    305 
    306     for i in range(M):
    307         base = i * N
    308         outfile.write("f %d %d %d\n" % (base+1, base+2, base+3))
    309 
    310     outfile.close()
    311270
    312271##
     
    949908        log.critical(msg)
    950909
    951 ################################################################################
    952 # Functions to obtain diagnostics from sww files
    953 ################################################################################
    954 
    955 
    956 
    957 ##
    958 # @brief Get values for quantities interpolated to polyline midpoints from SWW.
    959 # @param filename Path to file to read.
    960 # @param quantity_names Quantity names to get.
    961 # @param polyline Representation of desired cross-section.
    962 # @param verbose True if this function is to be verbose.
    963 # @return (segments, i_func) where segments is a list of Triangle_intersection
    964 #         instances and i_func is an instance of Interpolation_function.
    965 # @note For 'polyline' assume absolute UTM coordinates.
    966 def get_interpolated_quantities_at_polyline_midpoints(filename,
    967                                                       quantity_names=None,
    968                                                       polyline=None,
    969                                                       verbose=False):
    970     """Get values for quantities interpolated to polyline midpoints from SWW
    971 
    972     Input:
    973         filename - Name of sww file
    974         quantity_names - Names of quantities to load
    975         polyline: Representation of desired cross section - it may contain
    976                   multiple sections allowing for complex shapes. Assume
    977                   absolute UTM coordinates.
    978                   Format [[x0, y0], [x1, y1], ...]
    979 
    980     Output:
    981         segments: list of instances of class Triangle_intersection
    982         interpolation_function: Instance of class Interpolation_function
    983 
    984 
    985       This function is used by get_flow_through_cross_section and
    986       get_energy_through_cross_section
    987     """
    988 
    989     from anuga.fit_interpolate.interpolate import Interpolation_function
    990 
    991     # Get mesh and quantities from sww file
    992     X = get_mesh_and_quantities_from_file(filename,
    993                                           quantities=quantity_names,
    994                                           verbose=verbose)
    995     mesh, quantities, time = X
    996 
    997     # Find all intersections and associated triangles.
    998     segments = mesh.get_intersecting_segments(polyline, verbose=verbose)
    999 
    1000     # Get midpoints
    1001     interpolation_points = segment_midpoints(segments)
    1002 
    1003     # Interpolate
    1004     if verbose:
    1005         log.critical('Interpolating - total number of interpolation points = %d'
    1006                      % len(interpolation_points))
    1007 
    1008     I = Interpolation_function(time,
    1009                                quantities,
    1010                                quantity_names=quantity_names,
    1011                                vertex_coordinates=mesh.nodes,
    1012                                triangles=mesh.triangles,
    1013                                interpolation_points=interpolation_points,
    1014                                verbose=verbose)
    1015 
    1016     return segments, I
    1017 
    1018 
    1019 ##
    1020 # @brief Obtain flow (m^3/s) perpendicular to specified cross section.
    1021 # @param filename Path to file to read.
    1022 # @param polyline Representation of desired cross-section.
    1023 # @param verbose Trie if this function is to be verbose.
    1024 # @return (time, Q) where time and Q are lists of time and flow respectively.
    1025 def get_flow_through_cross_section(filename, polyline, verbose=False):
    1026     """Obtain flow (m^3/s) perpendicular to specified cross section.
    1027 
    1028     Inputs:
    1029         filename: Name of sww file
    1030         polyline: Representation of desired cross section - it may contain
    1031                   multiple sections allowing for complex shapes. Assume
    1032                   absolute UTM coordinates.
    1033                   Format [[x0, y0], [x1, y1], ...]
    1034 
    1035     Output:
    1036         time: All stored times in sww file
    1037         Q: Hydrograph of total flow across given segments for all stored times.
    1038 
    1039     The normal flow is computed for each triangle intersected by the polyline
    1040     and added up.  Multiple segments at different angles are specified the
    1041     normal flows may partially cancel each other.
    1042 
    1043     The typical usage of this function would be to get flow through a channel,
    1044     and the polyline would then be a cross section perpendicular to the flow.
    1045     """
    1046 
    1047     quantity_names =['elevation',
    1048                      'stage',
    1049                      'xmomentum',
    1050                      'ymomentum']
    1051 
    1052     # Get values for quantities at each midpoint of poly line from sww file
    1053     X = get_interpolated_quantities_at_polyline_midpoints(filename,
    1054                                                           quantity_names=\
    1055                                                               quantity_names,
    1056                                                           polyline=polyline,
    1057                                                           verbose=verbose)
    1058     segments, interpolation_function = X
    1059 
    1060     # Get vectors for time and interpolation_points
    1061     time = interpolation_function.time
    1062     interpolation_points = interpolation_function.interpolation_points
    1063 
    1064     if verbose: log.critical('Computing hydrograph')
    1065 
    1066     # Compute hydrograph
    1067     Q = []
    1068     for t in time:
    1069         total_flow = 0
    1070         for i in range(len(interpolation_points)):
    1071             elevation, stage, uh, vh = interpolation_function(t, point_id=i)
    1072             normal = segments[i].normal
    1073 
    1074             # Inner product of momentum vector with segment normal [m^2/s]
    1075             normal_momentum = uh*normal[0] + vh*normal[1]
    1076 
    1077             # Flow across this segment [m^3/s]
    1078             segment_flow = normal_momentum * segments[i].length
    1079 
    1080             # Accumulate
    1081             total_flow += segment_flow
    1082 
    1083         # Store flow at this timestep
    1084         Q.append(total_flow)
    1085 
    1086 
    1087     return time, Q
    1088 
    1089 
    1090 ##
    1091 # @brief Get average energy across a cross-section.
    1092 # @param filename Path to file of interest.
    1093 # @param polyline Representation of desired cross-section.
    1094 # @param kind Select energy to compute: 'specific' or 'total'.
    1095 # @param verbose True if this function is to be verbose.
    1096 # @return (time, E) where time and E are lists of timestep and energy.
    1097 def get_energy_through_cross_section(filename,
    1098                                      polyline,
    1099                                      kind='total',
    1100                                      verbose=False):
    1101     """Obtain average energy head [m] across specified cross section.
    1102 
    1103     Inputs:
    1104         polyline: Representation of desired cross section - it may contain
    1105                   multiple sections allowing for complex shapes. Assume
    1106                   absolute UTM coordinates.
    1107                   Format [[x0, y0], [x1, y1], ...]
    1108         kind:     Select which energy to compute.
    1109                   Options are 'specific' and 'total' (default)
    1110 
    1111     Output:
    1112         E: Average energy [m] across given segments for all stored times.
    1113 
    1114     The average velocity is computed for each triangle intersected by
    1115     the polyline and averaged weighted by segment lengths.
    1116 
    1117     The typical usage of this function would be to get average energy of
    1118     flow in a channel, and the polyline would then be a cross section
    1119     perpendicular to the flow.
    1120 
    1121     #FIXME (Ole) - need name for this energy reflecting that its dimension
    1122     is [m].
    1123     """
    1124 
    1125     from anuga.config import g, epsilon, velocity_protection as h0
    1126 
    1127     quantity_names =['elevation',
    1128                      'stage',
    1129                      'xmomentum',
    1130                      'ymomentum']
    1131 
    1132     # Get values for quantities at each midpoint of poly line from sww file
    1133     X = get_interpolated_quantities_at_polyline_midpoints(filename,
    1134                                                           quantity_names=\
    1135                                                               quantity_names,
    1136                                                           polyline=polyline,
    1137                                                           verbose=verbose)
    1138     segments, interpolation_function = X
    1139 
    1140     # Get vectors for time and interpolation_points
    1141     time = interpolation_function.time
    1142     interpolation_points = interpolation_function.interpolation_points
    1143 
    1144     if verbose: log.critical('Computing %s energy' % kind)
    1145 
    1146     # Compute total length of polyline for use with weighted averages
    1147     total_line_length = 0.0
    1148     for segment in segments:
    1149         total_line_length += segment.length
    1150 
    1151     # Compute energy
    1152     E = []
    1153     for t in time:
    1154         average_energy = 0.0
    1155         for i, p in enumerate(interpolation_points):
    1156             elevation, stage, uh, vh = interpolation_function(t, point_id=i)
    1157 
    1158             # Depth
    1159             h = depth = stage-elevation
    1160 
    1161             # Average velocity across this segment
    1162             if h > epsilon:
    1163                 # Use protection against degenerate velocities
    1164                 u = uh / (h + h0/h)
    1165                 v = vh / (h + h0/h)
    1166             else:
    1167                 u = v = 0.0
    1168 
    1169             speed_squared = u*u + v*v
    1170             kinetic_energy = 0.5 * speed_squared / g
    1171 
    1172             if kind == 'specific':
    1173                 segment_energy = depth + kinetic_energy
    1174             elif kind == 'total':
    1175                 segment_energy = stage + kinetic_energy
    1176             else:
    1177                 msg = 'Energy kind must be either "specific" or "total". '
    1178                 msg += 'I got %s' % kind
    1179 
    1180             # Add to weighted average
    1181             weigth = segments[i].length / total_line_length
    1182             average_energy += segment_energy * weigth
    1183 
    1184         # Store energy at this timestep
    1185         E.append(average_energy)
    1186 
    1187     return time, E
    1188 
    1189 
    1190 ##
    1191 # @brief Return highest elevation where depth > 0.
    1192 # @param filename Path to SWW file of interest.
    1193 # @param polygon If specified resrict to points inside this polygon.
    1194 # @param time_interval If specified resrict to within the time specified.
    1195 # @param verbose True if this function is  to be verbose.
    1196 def get_maximum_inundation_elevation(filename,
    1197                                      polygon=None,
    1198                                      time_interval=None,
    1199                                      verbose=False):
    1200     """Return highest elevation where depth > 0
    1201 
    1202     Usage:
    1203     max_runup = get_maximum_inundation_elevation(filename,
    1204                                                  polygon=None,
    1205                                                  time_interval=None,
    1206                                                  verbose=False)
    1207 
    1208     filename is a NetCDF sww file containing ANUGA model output.
    1209     Optional arguments polygon and time_interval restricts the maximum
    1210     runup calculation
    1211     to a points that lie within the specified polygon and time interval.
    1212 
    1213     If no inundation is found within polygon and time_interval the return value
    1214     is None signifying "No Runup" or "Everything is dry".
    1215 
    1216     See general function get_maximum_inundation_data for details.
    1217     """
    1218 
    1219     runup, _ = get_maximum_inundation_data(filename,
    1220                                            polygon=polygon,
    1221                                            time_interval=time_interval,
    1222                                            verbose=verbose)
    1223     return runup
    1224 
    1225 
    1226 ##
    1227 # @brief Return location of highest elevation where h > 0
    1228 # @param filename Path to SWW file to read.
    1229 # @param polygon If specified resrict to points inside this polygon.
    1230 # @param time_interval If specified resrict to within the time specified.
    1231 # @param verbose True if this function is  to be verbose.
    1232 def get_maximum_inundation_location(filename,
    1233                                     polygon=None,
    1234                                     time_interval=None,
    1235                                     verbose=False):
    1236     """Return location of highest elevation where h > 0
    1237 
    1238     Usage:
    1239     max_runup_location = get_maximum_inundation_location(filename,
    1240                                                          polygon=None,
    1241                                                          time_interval=None,
    1242                                                          verbose=False)
    1243 
    1244     filename is a NetCDF sww file containing ANUGA model output.
    1245     Optional arguments polygon and time_interval restricts the maximum
    1246     runup calculation
    1247     to a points that lie within the specified polygon and time interval.
    1248 
    1249     If no inundation is found within polygon and time_interval the return value
    1250     is None signifying "No Runup" or "Everything is dry".
    1251 
    1252     See general function get_maximum_inundation_data for details.
    1253     """
    1254 
    1255     _, max_loc = get_maximum_inundation_data(filename,
    1256                                              polygon=polygon,
    1257                                              time_interval=time_interval,
    1258                                              verbose=verbose)
    1259     return max_loc
    1260 
    1261 
    1262 ##
    1263 # @brief Compute maximum run up height from SWW file.
    1264 # @param filename Path to SWW file to read.
    1265 # @param polygon If specified resrict to points inside this polygon.
    1266 # @param time_interval If specified resrict to within the time specified.
    1267 # @param use_centroid_values
    1268 # @param verbose True if this function is to be verbose.
    1269 # @return (maximal_runup, maximal_runup_location)
    1270 def get_maximum_inundation_data(filename, polygon=None, time_interval=None,
    1271                                 use_centroid_values=False,
    1272                                 verbose=False):
    1273     """Compute maximum run up height from sww file.
    1274 
    1275     Usage:
    1276     runup, location = get_maximum_inundation_data(filename,
    1277                                                   polygon=None,
    1278                                                   time_interval=None,
    1279                                                   verbose=False)
    1280 
    1281     Algorithm is as in get_maximum_inundation_elevation from
    1282     shallow_water_domain except that this function works with the sww file and
    1283     computes the maximal runup height over multiple timesteps.
    1284 
    1285     Optional arguments polygon and time_interval restricts the maximum runup
    1286     calculation to a points that lie within the specified polygon and time
    1287     interval.
    1288 
    1289     Polygon is assumed to be in (absolute) UTM coordinates in the same zone
    1290     as domain.
    1291 
    1292     If no inundation is found within polygon and time_interval the return value
    1293     is None signifying "No Runup" or "Everything is dry".
    1294     """
    1295 
    1296     # We are using nodal values here as that is what is stored in sww files.
    1297 
    1298     # Water depth below which it is considered to be 0 in the model
    1299     # FIXME (Ole): Allow this to be specified as a keyword argument as well
    1300 
    1301     from anuga.geometry.polygon import inside_polygon
    1302     from anuga.config import minimum_allowed_height
    1303     from Scientific.IO.NetCDF import NetCDFFile
    1304 
    1305     dir, base = os.path.split(filename)
    1306 
    1307     iterate_over = get_all_swwfiles(dir, base)
    1308 
    1309     # Read sww file
    1310     if verbose: log.critical('Reading from %s' % filename)
    1311     # FIXME: Use general swwstats (when done)
    1312 
    1313     maximal_runup = None
    1314     maximal_runup_location = None
    1315 
    1316     for file, swwfile in enumerate (iterate_over):
    1317         # Read sww file
    1318         filename = join(dir, swwfile+'.sww')
    1319 
    1320         if verbose: log.critical('Reading from %s' % filename)
    1321         # FIXME: Use general swwstats (when done)
    1322 
    1323         fid = NetCDFFile(filename)
    1324 
    1325         # Get geo_reference
    1326         # sww files don't have to have a geo_ref
    1327         try:
    1328             geo_reference = Geo_reference(NetCDFObject=fid)
    1329         except AttributeError, e:
    1330             geo_reference = Geo_reference() # Default georef object
    1331 
    1332         xllcorner = geo_reference.get_xllcorner()
    1333         yllcorner = geo_reference.get_yllcorner()
    1334         zone = geo_reference.get_zone()
    1335 
    1336         # Get extent
    1337         volumes = fid.variables['volumes'][:]
    1338         x = fid.variables['x'][:] + xllcorner
    1339         y = fid.variables['y'][:] + yllcorner
    1340 
    1341         # Get the relevant quantities (Convert from single precison)
    1342         elevation = num.array(fid.variables['elevation'][:], num.float)
    1343         stage = num.array(fid.variables['stage'][:], num.float)
    1344 
    1345         # Here's where one could convert nodal information to centroid
    1346         # information but is probably something we need to write in C.
    1347         # Here's a Python thought which is NOT finished!!!
    1348         if use_centroid_values is True:
    1349             x = get_centroid_values(x, volumes)
    1350             y = get_centroid_values(y, volumes)
    1351             elevation = get_centroid_values(elevation, volumes)
    1352 
    1353         # Spatial restriction
    1354         if polygon is not None:
    1355             msg = 'polygon must be a sequence of points.'
    1356             assert len(polygon[0]) == 2, msg
    1357             # FIXME (Ole): Make a generic polygon input check in polygon.py
    1358             # and call it here
    1359             points = num.ascontiguousarray(num.concatenate((x[:,num.newaxis],
    1360                                                             y[:,num.newaxis]),
    1361                                                             axis=1))
    1362             point_indices = inside_polygon(points, polygon)
    1363 
    1364             # Restrict quantities to polygon
    1365             elevation = num.take(elevation, point_indices, axis=0)
    1366             stage = num.take(stage, point_indices, axis=1)
    1367 
    1368             # Get info for location of maximal runup
    1369             points_in_polygon = num.take(points, point_indices, axis=0)
    1370 
    1371             x = points_in_polygon[:,0]
    1372             y = points_in_polygon[:,1]
    1373         else:
    1374             # Take all points
    1375             point_indices = num.arange(len(x))
    1376 
    1377         # Temporal restriction
    1378         time = fid.variables['time'][:]
    1379         all_timeindices = num.arange(len(time))
    1380         if time_interval is not None:
    1381             msg = 'time_interval must be a sequence of length 2.'
    1382             assert len(time_interval) == 2, msg
    1383             msg = 'time_interval %s must not be decreasing.' % time_interval
    1384             assert time_interval[1] >= time_interval[0], msg
    1385             msg = 'Specified time interval [%.8f:%.8f] ' % tuple(time_interval)
    1386             msg += 'must does not match model time interval: [%.8f, %.8f]\n' \
    1387                    % (time[0], time[-1])
    1388             if time_interval[1] < time[0]: raise ValueError(msg)
    1389             if time_interval[0] > time[-1]: raise ValueError(msg)
    1390 
    1391             # Take time indices corresponding to interval (& is bitwise AND)
    1392             timesteps = num.compress((time_interval[0] <= time) \
    1393                                      & (time <= time_interval[1]),
    1394                                      all_timeindices)
    1395 
    1396             msg = 'time_interval %s did not include any model timesteps.' \
    1397                   % time_interval
    1398             assert not num.alltrue(timesteps == 0), msg
    1399         else:
    1400             # Take them all
    1401             timesteps = all_timeindices
    1402 
    1403         fid.close()
    1404 
    1405         # Compute maximal runup for each timestep
    1406         #maximal_runup = None
    1407         #maximal_runup_location = None
    1408         #maximal_runups = [None]
    1409         #maximal_runup_locations = [None]
    1410 
    1411         for i in timesteps:
    1412             if use_centroid_values is True:
    1413                 stage_i = get_centroid_values(stage[i,:], volumes)
    1414             else:
    1415                 stage_i = stage[i,:]
    1416 
    1417             depth = stage_i - elevation
    1418 
    1419             # Get wet nodes i.e. nodes with depth>0 within given region
    1420             # and timesteps
    1421             wet_nodes = num.compress(depth > minimum_allowed_height,
    1422                                      num.arange(len(depth)))
    1423 
    1424             if num.alltrue(wet_nodes == 0):
    1425                 runup = None
    1426             else:
    1427                 # Find maximum elevation among wet nodes
    1428                 wet_elevation = num.take(elevation, wet_nodes, axis=0)
    1429                 runup_index = num.argmax(wet_elevation)
    1430                 runup = max(wet_elevation)
    1431                 assert wet_elevation[runup_index] == runup       # Must be True
    1432 
    1433             if runup > maximal_runup:
    1434                 maximal_runup = runup      # works even if maximal_runup is None
    1435 
    1436                 # Record location
    1437                 wet_x = num.take(x, wet_nodes, axis=0)
    1438                 wet_y = num.take(y, wet_nodes, axis=0)
    1439                 maximal_runup_location = [wet_x[runup_index],wet_y[runup_index]]
    1440 
    1441     return maximal_runup, maximal_runup_location
    1442910
    1443911
  • trunk/anuga_core/source/anuga/shallow_water/test_data_manager.py

    r7769 r7770  
    13511351        assert min(min(domain2.get_quantity('ymomentum').get_values()))==filler
    13521352
    1353 
    1354 
    1355     def test_weed(self):
    1356         from data_manager import weed
    1357 
    1358         coordinates1 = [[0.,0.],[1.,0.],[1.,1.],[1.,0.],[2.,0.],[1.,1.]]
    1359         volumes1 = [[0,1,2],[3,4,5]]
    1360         boundary1= {(0,1): 'external',(1,2): 'not external',(2,0): 'external',(3,4): 'external',(4,5): 'external',(5,3): 'not external'}
    1361         coordinates2,volumes2,boundary2=weed(coordinates1,volumes1,boundary1)
    1362 
    1363         points2 = {(0.,0.):None,(1.,0.):None,(1.,1.):None,(2.,0.):None}
    1364 
    1365         assert len(points2)==len(coordinates2)
    1366         for i in range(len(coordinates2)):
    1367             coordinate = tuple(coordinates2[i])
    1368             assert points2.has_key(coordinate)
    1369             points2[coordinate]=i
    1370 
    1371         for triangle in volumes1:
    1372             for coordinate in triangle:
    1373                 assert coordinates2[points2[tuple(coordinates1[coordinate])]][0]==coordinates1[coordinate][0]
    1374                 assert coordinates2[points2[tuple(coordinates1[coordinate])]][1]==coordinates1[coordinate][1]
    13751353
    13761354
     
    24882466       
    24892467
    2490     def test_get_maximum_inundation(self):
    2491         """Test that sww information can be converted correctly to maximum
    2492         runup elevation and location (without and with georeferencing)
    2493 
    2494         This test creates a slope and a runup which is maximal (~11m) at around 10s
    2495         and levels out to the boundary condition (1m) at about 30s.
    2496         """
    2497 
    2498         import time, os
    2499         from Scientific.IO.NetCDF import NetCDFFile
    2500 
    2501         #Setup
    2502 
    2503         from mesh_factory import rectangular
    2504 
    2505         # Create basic mesh (100m x 100m)
    2506         points, vertices, boundary = rectangular(20, 5, 100, 50)
    2507 
    2508         # Create shallow water domain
    2509         domain = Domain(points, vertices, boundary)
    2510         domain.default_order = 2
    2511         domain.set_minimum_storable_height(0.01)
    2512 
    2513         domain.set_name('runuptest')
    2514         swwfile = domain.get_name() + '.sww'
    2515 
    2516         domain.set_datadir('.')
    2517         domain.format = 'sww'
    2518         domain.smooth = True
    2519 
    2520         # FIXME (Ole): Backwards compatibility
    2521         # Look at sww file and see what happens when
    2522         # domain.tight_slope_limiters = 1
    2523         domain.tight_slope_limiters = 0
    2524         domain.use_centroid_velocities = 0 # Backwards compatibility (7/5/8)       
    2525        
    2526         Br = Reflective_boundary(domain)
    2527         Bd = Dirichlet_boundary([1.0,0,0])
    2528 
    2529 
    2530         #---------- First run without geo referencing
    2531        
    2532         domain.set_quantity('elevation', lambda x,y: -0.2*x + 14) # Slope
    2533         domain.set_quantity('stage', -6)
    2534         domain.set_boundary( {'left': Br, 'right': Bd, 'top': Br, 'bottom': Br})
    2535 
    2536         for t in domain.evolve(yieldstep=1, finaltime = 50):
    2537             pass
    2538 
    2539 
    2540         # Check maximal runup
    2541         runup = get_maximum_inundation_elevation(swwfile)
    2542         location = get_maximum_inundation_location(swwfile)
    2543         #print 'Runup, location', runup, location
    2544         assert num.allclose(runup, 11) or num.allclose(runup, 12) # old limiters
    2545         assert num.allclose(location[0], 15) or num.allclose(location[0], 10)
    2546 
    2547         # Check final runup
    2548         runup = get_maximum_inundation_elevation(swwfile, time_interval=[45,50])
    2549         location = get_maximum_inundation_location(swwfile, time_interval=[45,50])
    2550         # print 'Runup, location:',runup, location       
    2551         assert num.allclose(runup, 1)
    2552         assert num.allclose(location[0], 65)
    2553 
    2554         # Check runup restricted to a polygon
    2555         p = [[50,1], [99,1], [99,49], [50,49]]
    2556         runup = get_maximum_inundation_elevation(swwfile, polygon=p)
    2557         location = get_maximum_inundation_location(swwfile, polygon=p)
    2558         #print runup, location       
    2559         assert num.allclose(runup, 4)
    2560         assert num.allclose(location[0], 50)               
    2561 
    2562         # Check that mimimum_storable_height works
    2563         fid = NetCDFFile(swwfile, netcdf_mode_r) # Open existing file
    2564        
    2565         stage = fid.variables['stage'][:]
    2566         z = fid.variables['elevation'][:]
    2567         xmomentum = fid.variables['xmomentum'][:]
    2568         ymomentum = fid.variables['ymomentum'][:]       
    2569 
    2570        
    2571        
    2572         for i in range(stage.shape[0]):
    2573             h = stage[i]-z # depth vector at time step i
    2574            
    2575             # Check every node location
    2576             for j in range(stage.shape[1]):
    2577                 # Depth being either exactly zero implies
    2578                 # momentum being zero.
    2579                 # Or else depth must be greater than or equal to
    2580                 # the minimal storable height
    2581                 if h[j] == 0.0:
    2582                     assert xmomentum[i,j] == 0.0
    2583                     assert ymomentum[i,j] == 0.0               
    2584                 else:
    2585                     assert h[j] >= domain.minimum_storable_height
    2586        
    2587         fid.close()
    2588 
    2589         # Cleanup
    2590         os.remove(swwfile)
    2591        
    2592 
    2593 
    2594         #------------- Now the same with georeferencing
    2595 
    2596         domain.time=0.0
    2597         E = 308500
    2598         N = 6189000
    2599         #E = N = 0
    2600         domain.geo_reference = Geo_reference(56, E, N)
    2601 
    2602         domain.set_quantity('elevation', lambda x,y: -0.2*x + 14) # Slope
    2603         domain.set_quantity('stage', -6)
    2604         domain.set_boundary( {'left': Br, 'right': Bd, 'top': Br, 'bottom': Br})
    2605 
    2606         for t in domain.evolve(yieldstep=1, finaltime = 50):
    2607             pass
    2608 
    2609         # Check maximal runup
    2610         runup = get_maximum_inundation_elevation(swwfile)
    2611         location = get_maximum_inundation_location(swwfile)
    2612         assert num.allclose(runup, 11) or num.allclose(runup, 12) # old limiters
    2613         assert num.allclose(location[0], 15+E) or num.allclose(location[0], 10+E)
    2614 
    2615         # Check final runup
    2616         runup = get_maximum_inundation_elevation(swwfile, time_interval=[45,50])
    2617         location = get_maximum_inundation_location(swwfile, time_interval=[45,50])
    2618         assert num.allclose(runup, 1)
    2619         assert num.allclose(location[0], 65+E)
    2620 
    2621         # Check runup restricted to a polygon
    2622         p = num.array([[50,1], [99,1], [99,49], [50,49]], num.int) + num.array([E, N], num.int)      #array default#
    2623 
    2624         runup = get_maximum_inundation_elevation(swwfile, polygon=p)
    2625         location = get_maximum_inundation_location(swwfile, polygon=p)
    2626         assert num.allclose(runup, 4)
    2627         assert num.allclose(location[0], 50+E)               
    2628 
    2629 
    2630         # Cleanup
    2631         os.remove(swwfile)
    2632 
    2633 
    2634     def test_get_mesh_and_quantities_from_sww_file(self):
    2635         """test_get_mesh_and_quantities_from_sww_file(self):
    2636         """     
    2637        
    2638         # Generate a test sww file with non trivial georeference
    2639        
    2640         import time, os
    2641         from Scientific.IO.NetCDF import NetCDFFile
    2642 
    2643         # Setup
    2644         from mesh_factory import rectangular
    2645 
    2646         # Create basic mesh (100m x 5m)
    2647         width = 5
    2648         length = 50
    2649         t_end = 10
    2650         points, vertices, boundary = rectangular(length, width, 50, 5)
    2651 
    2652         # Create shallow water domain
    2653         domain = Domain(points, vertices, boundary,
    2654                         geo_reference = Geo_reference(56,308500,6189000))
    2655 
    2656         domain.set_name('flowtest')
    2657         swwfile = domain.get_name() + '.sww'
    2658         domain.set_datadir('.')
    2659 
    2660         Br = Reflective_boundary(domain)    # Side walls
    2661         Bd = Dirichlet_boundary([1, 0, 0])  # inflow
    2662 
    2663         domain.set_boundary( {'left': Bd, 'right': Bd, 'top': Br, 'bottom': Br})
    2664 
    2665         for t in domain.evolve(yieldstep=1, finaltime = t_end):
    2666             pass
    2667 
    2668        
    2669         # Read it
    2670 
    2671         # Get mesh and quantities from sww file
    2672         X = get_mesh_and_quantities_from_file(swwfile,
    2673                                               quantities=['elevation',
    2674                                                           'stage',
    2675                                                           'xmomentum',
    2676                                                           'ymomentum'],
    2677                                               verbose=False)
    2678         mesh, quantities, time = X
    2679        
    2680 
    2681         # Check that mesh has been recovered
    2682         assert num.alltrue(mesh.triangles == domain.get_triangles())
    2683         assert num.allclose(mesh.nodes, domain.get_nodes())
    2684 
    2685         # Check that time has been recovered
    2686         assert num.allclose(time, range(t_end+1))
    2687 
    2688         # Check that quantities have been recovered
    2689         # (sww files use single precision)
    2690         z=domain.get_quantity('elevation').get_values(location='unique vertices')
    2691         assert num.allclose(quantities['elevation'], z)
    2692 
    2693         for q in ['stage', 'xmomentum', 'ymomentum']:
    2694             # Get quantity at last timestep
    2695             q_ref=domain.get_quantity(q).get_values(location='unique vertices')
    2696 
    2697             #print q,quantities[q]
    2698             q_sww=quantities[q][-1,:]
    2699 
    2700             msg = 'Quantity %s failed to be recovered' %q
    2701             assert num.allclose(q_ref, q_sww, atol=1.0e-6), msg
    2702            
    2703         # Cleanup
    2704         os.remove(swwfile)
    2705        
    2706 
    2707     def test_get_flow_through_cross_section(self):
    2708         """test_get_flow_through_cross_section(self):
    2709 
    2710         Test that the total flow through a cross section can be
    2711         correctly obtained from an sww file.
    2712        
    2713         This test creates a flat bed with a known flow through it and tests
    2714         that the function correctly returns the expected flow.
    2715 
    2716         The specifics are
    2717         u = 2 m/s
    2718         h = 1 m
    2719         w = 3 m (width of channel)
    2720 
    2721         q = u*h*w = 6 m^3/s
    2722 
    2723         #---------- First run without geo referencing       
    2724        
    2725         """
    2726 
    2727         import time, os
    2728         from Scientific.IO.NetCDF import NetCDFFile
    2729 
    2730         # Setup
    2731         from mesh_factory import rectangular
    2732 
    2733         # Create basic mesh (20m x 3m)
    2734         width = 3
    2735         length = 20
    2736         t_end = 3
    2737         points, vertices, boundary = rectangular(length, width,
    2738                                                  length, width)
    2739 
    2740         # Create shallow water domain
    2741         domain = Domain(points, vertices, boundary)
    2742         domain.default_order = 2
    2743         domain.set_minimum_storable_height(0.01)
    2744 
    2745         domain.set_name('flowtest')
    2746         swwfile = domain.get_name() + '.sww'
    2747 
    2748         domain.set_datadir('.')
    2749         domain.format = 'sww'
    2750         domain.smooth = True
    2751 
    2752         h = 1.0
    2753         u = 2.0
    2754         uh = u*h
    2755 
    2756         Br = Reflective_boundary(domain)     # Side walls
    2757         Bd = Dirichlet_boundary([h, uh, 0])  # 2 m/s across the 3 m inlet:
    2758 
    2759 
    2760        
    2761         domain.set_quantity('elevation', 0.0)
    2762         domain.set_quantity('stage', h)
    2763         domain.set_quantity('xmomentum', uh)
    2764         domain.set_boundary( {'left': Bd, 'right': Bd, 'top': Br, 'bottom': Br})
    2765 
    2766         for t in domain.evolve(yieldstep=1, finaltime = t_end):
    2767             pass
    2768 
    2769         # Check that momentum is as it should be in the interior
    2770 
    2771         I = [[0, width/2.],
    2772              [length/2., width/2.],
    2773              [length, width/2.]]
    2774        
    2775         f = file_function(swwfile,
    2776                           quantities=['stage', 'xmomentum', 'ymomentum'],
    2777                           interpolation_points=I,
    2778                           verbose=False)
    2779         for t in range(t_end+1):
    2780             for i in range(3):
    2781                 assert num.allclose(f(t, i), [1, 2, 0], atol=1.0e-6)
    2782            
    2783 
    2784         # Check flows through the middle
    2785         for i in range(5):
    2786             x = length/2. + i*0.23674563 # Arbitrary
    2787             cross_section = [[x, 0], [x, width]]
    2788             time, Q = get_flow_through_cross_section(swwfile,
    2789                                                      cross_section,
    2790                                                      verbose=False)
    2791 
    2792             assert num.allclose(Q, uh*width)
    2793 
    2794 
    2795        
    2796         # Try the same with partial lines
    2797         x = length/2.
    2798         for i in range(5):
    2799             start_point = [length/2., i*width/5.]
    2800             #print start_point
    2801                            
    2802             cross_section = [start_point, [length/2., width]]
    2803             time, Q = get_flow_through_cross_section(swwfile,
    2804                                                      cross_section,
    2805                                                      verbose=False)
    2806 
    2807             #print i, Q, (width-start_point[1])
    2808             assert num.allclose(Q, uh*(width-start_point[1]))
    2809 
    2810 
    2811         # Verify no flow when line is parallel to flow
    2812         cross_section = [[length/2.-10, width/2.], [length/2.+10, width/2.]]
    2813         time, Q = get_flow_through_cross_section(swwfile,
    2814                                                  cross_section,
    2815                                                  verbose=False)
    2816 
    2817         #print i, Q
    2818         assert num.allclose(Q, 0, atol=1.0e-5)       
    2819 
    2820 
    2821         # Try with lines on an angle (all flow still runs through here)
    2822         cross_section = [[length/2., 0], [length/2.+width, width]]
    2823         time, Q = get_flow_through_cross_section(swwfile,
    2824                                                  cross_section,
    2825                                                  verbose=False)
    2826 
    2827         assert num.allclose(Q, uh*width)       
    2828        
    2829 
    2830 
    2831                                      
    2832     def test_get_flow_through_cross_section_with_geo(self):
    2833         """test_get_flow_through_cross_section(self):
    2834 
    2835         Test that the total flow through a cross section can be
    2836         correctly obtained from an sww file.
    2837        
    2838         This test creates a flat bed with a known flow through it and tests
    2839         that the function correctly returns the expected flow.
    2840 
    2841         The specifics are
    2842         u = 2 m/s
    2843         h = 2 m
    2844         w = 3 m (width of channel)
    2845 
    2846         q = u*h*w = 12 m^3/s
    2847 
    2848 
    2849         This run tries it with georeferencing and with elevation = -1
    2850        
    2851         """
    2852 
    2853         import time, os
    2854         from Scientific.IO.NetCDF import NetCDFFile
    2855 
    2856         # Setup
    2857         from mesh_factory import rectangular
    2858 
    2859         # Create basic mesh (20m x 3m)
    2860         width = 3
    2861         length = 20
    2862         t_end = 1
    2863         points, vertices, boundary = rectangular(length, width,
    2864                                                  length, width)
    2865 
    2866         # Create shallow water domain
    2867         domain = Domain(points, vertices, boundary,
    2868                         geo_reference = Geo_reference(56,308500,6189000))
    2869 
    2870         domain.default_order = 2
    2871         domain.set_minimum_storable_height(0.01)
    2872 
    2873         domain.set_name('flowtest')
    2874         swwfile = domain.get_name() + '.sww'
    2875 
    2876         domain.set_datadir('.')
    2877         domain.format = 'sww'
    2878         domain.smooth = True
    2879 
    2880         e = -1.0
    2881         w = 1.0
    2882         h = w-e
    2883         u = 2.0
    2884         uh = u*h
    2885 
    2886         Br = Reflective_boundary(domain)     # Side walls
    2887         Bd = Dirichlet_boundary([w, uh, 0])  # 2 m/s across the 3 m inlet:
    2888 
    2889 
    2890 
    2891        
    2892         domain.set_quantity('elevation', e)
    2893         domain.set_quantity('stage', w)
    2894         domain.set_quantity('xmomentum', uh)
    2895         domain.set_boundary( {'left': Bd, 'right': Bd, 'top': Br, 'bottom': Br})
    2896 
    2897         for t in domain.evolve(yieldstep=1, finaltime = t_end):
    2898             pass
    2899 
    2900         # Check that momentum is as it should be in the interior
    2901 
    2902         I = [[0, width/2.],
    2903              [length/2., width/2.],
    2904              [length, width/2.]]
    2905        
    2906         I = domain.geo_reference.get_absolute(I)
    2907         f = file_function(swwfile,
    2908                           quantities=['stage', 'xmomentum', 'ymomentum'],
    2909                           interpolation_points=I,
    2910                           verbose=False)
    2911 
    2912         for t in range(t_end+1):
    2913             for i in range(3):
    2914                 #print i, t, f(t, i)           
    2915                 assert num.allclose(f(t, i), [w, uh, 0], atol=1.0e-6)
    2916            
    2917 
    2918         # Check flows through the middle
    2919         for i in range(5):
    2920             x = length/2. + i*0.23674563 # Arbitrary
    2921             cross_section = [[x, 0], [x, width]]
    2922 
    2923             cross_section = domain.geo_reference.get_absolute(cross_section)           
    2924             time, Q = get_flow_through_cross_section(swwfile,
    2925                                                      cross_section,
    2926                                                      verbose=False)
    2927 
    2928             assert num.allclose(Q, uh*width)
    2929 
    2930 
    2931            
    2932     def test_get_energy_through_cross_section(self):
    2933         """test_get_energy_through_cross_section(self):
    2934 
    2935         Test that the specific and total energy through a cross section can be
    2936         correctly obtained from an sww file.
    2937        
    2938         This test creates a flat bed with a known flow through it and tests
    2939         that the function correctly returns the expected energies.
    2940 
    2941         The specifics are
    2942         u = 2 m/s
    2943         h = 1 m
    2944         w = 3 m (width of channel)
    2945 
    2946         q = u*h*w = 6 m^3/s
    2947         Es = h + 0.5*v*v/g  # Specific energy head [m]
    2948         Et = w + 0.5*v*v/g  # Total energy head [m]       
    2949 
    2950 
    2951         This test uses georeferencing
    2952        
    2953         """
    2954 
    2955         import time, os
    2956         from Scientific.IO.NetCDF import NetCDFFile
    2957 
    2958         # Setup
    2959         from mesh_factory import rectangular
    2960 
    2961         # Create basic mesh (20m x 3m)
    2962         width = 3
    2963         length = 20
    2964         t_end = 1
    2965         points, vertices, boundary = rectangular(length, width,
    2966                                                  length, width)
    2967 
    2968         # Create shallow water domain
    2969         domain = Domain(points, vertices, boundary,
    2970                         geo_reference = Geo_reference(56,308500,6189000))
    2971 
    2972         domain.default_order = 2
    2973         domain.set_minimum_storable_height(0.01)
    2974 
    2975         domain.set_name('flowtest')
    2976         swwfile = domain.get_name() + '.sww'
    2977 
    2978         domain.set_datadir('.')
    2979         domain.format = 'sww'
    2980         domain.smooth = True
    2981 
    2982         e = -1.0
    2983         w = 1.0
    2984         h = w-e
    2985         u = 2.0
    2986         uh = u*h
    2987 
    2988         Br = Reflective_boundary(domain)     # Side walls
    2989         Bd = Dirichlet_boundary([w, uh, 0])  # 2 m/s across the 3 m inlet:
    2990 
    2991        
    2992         domain.set_quantity('elevation', e)
    2993         domain.set_quantity('stage', w)
    2994         domain.set_quantity('xmomentum', uh)
    2995         domain.set_boundary( {'left': Bd, 'right': Bd, 'top': Br, 'bottom': Br})
    2996 
    2997         for t in domain.evolve(yieldstep=1, finaltime = t_end):
    2998             pass
    2999 
    3000         # Check that momentum is as it should be in the interior
    3001 
    3002         I = [[0, width/2.],
    3003              [length/2., width/2.],
    3004              [length, width/2.]]
    3005        
    3006         I = domain.geo_reference.get_absolute(I)
    3007         f = file_function(swwfile,
    3008                           quantities=['stage', 'xmomentum', 'ymomentum'],
    3009                           interpolation_points=I,
    3010                           verbose=False)
    3011 
    3012         for t in range(t_end+1):
    3013             for i in range(3):
    3014                 #print i, t, f(t, i)
    3015                 assert num.allclose(f(t, i), [w, uh, 0], atol=1.0e-6)
    3016            
    3017 
    3018         # Check energies through the middle
    3019         for i in range(5):
    3020             x = length/2. + i*0.23674563 # Arbitrary
    3021             cross_section = [[x, 0], [x, width]]
    3022 
    3023             cross_section = domain.geo_reference.get_absolute(cross_section)           
    3024            
    3025             time, Es = get_energy_through_cross_section(swwfile,
    3026                                                        cross_section,
    3027                                                        kind='specific',
    3028                                                        verbose=False)
    3029             assert num.allclose(Es, h + 0.5*u*u/g)
    3030            
    3031             time, Et = get_energy_through_cross_section(swwfile,
    3032                                                         cross_section,
    3033                                                         kind='total',
    3034                                                         verbose=False)
    3035             assert num.allclose(Et, w + 0.5*u*u/g)           
    3036 
    3037            
    30382468       
    30392469    def test_get_all_swwfiles(self):
  • trunk/anuga_core/source/anuga/shallow_water/test_shallow_water_domain.py

    r7769 r7770  
    2929from anuga.shallow_water.forcing import Rainfall, Wind_stress
    3030from anuga.shallow_water.forcing import Inflow, Cross_section
    31 from anuga.shallow_water.data_manager import get_flow_through_cross_section
     31from anuga.shallow_water.sww_interrogate import get_flow_through_cross_section
    3232
    3333# boundary functions
     
    12571257        assert num.alltrue(wet_elevations < final_runup_height)
    12581258        assert num.allclose(wet_elevations, z)
    1259 
    1260     def test_get_maximum_inundation_from_sww(self):
    1261         """test_get_maximum_inundation_from_sww(self)
    1262 
    1263         Test of get_maximum_inundation_elevation()
    1264         and get_maximum_inundation_location() from data_manager.py
    1265 
    1266         This is based on test_get_maximum_inundation_3(self) but works with the
    1267         stored results instead of with the internal data structure.
    1268 
    1269         This test uses the underlying get_maximum_inundation_data for tests
    1270         """
    1271 
    1272         from anuga.abstract_2d_finite_volumes.mesh_factory \
    1273                 import rectangular_cross
    1274         from data_manager import get_maximum_inundation_elevation
    1275         from data_manager import get_maximum_inundation_location
    1276         from data_manager import get_maximum_inundation_data
    1277 
    1278         initial_runup_height = -0.4
    1279         final_runup_height = -0.3
    1280 
    1281         #--------------------------------------------------------------
    1282         # Setup computational domain
    1283         #--------------------------------------------------------------
    1284         N = 10
    1285         points, vertices, boundary = rectangular_cross(N, N)
    1286         domain = Domain(points, vertices, boundary)
    1287         domain.set_name('runup_test')
    1288         domain.set_maximum_allowed_speed(1.0)
    1289 
    1290         # FIXME: This works better with old limiters so far
    1291         domain.tight_slope_limiters = 0
    1292 
    1293         #--------------------------------------------------------------
    1294         # Setup initial conditions
    1295         #--------------------------------------------------------------
    1296         def topography(x, y):
    1297             return -x/2                             # linear bed slope
    1298 
    1299         # Use function for elevation
    1300         domain.set_quantity('elevation', topography)
    1301         domain.set_quantity('friction', 0.)                # Zero friction
    1302         # Constant negative initial stage
    1303         domain.set_quantity('stage', initial_runup_height)
    1304 
    1305         #--------------------------------------------------------------
    1306         # Setup boundary conditions
    1307         #--------------------------------------------------------------
    1308         Br = Reflective_boundary(domain)                       # Reflective wall
    1309         Bd = Dirichlet_boundary([final_runup_height, 0, 0])    # Constant inflow
    1310 
    1311         # All reflective to begin with (still water)
    1312         domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br})
    1313 
    1314         #--------------------------------------------------------------
    1315         # Test initial inundation height
    1316         #--------------------------------------------------------------
    1317         indices = domain.get_wet_elements()
    1318         z = domain.get_quantity('elevation').\
    1319                 get_values(location='centroids', indices=indices)
    1320         assert num.alltrue(z < initial_runup_height)
    1321 
    1322         q_ref = domain.get_maximum_inundation_elevation()
    1323         # First order accuracy
    1324         assert num.allclose(q_ref, initial_runup_height, rtol=1.0/N)
    1325 
    1326         #--------------------------------------------------------------
    1327         # Let triangles adjust
    1328         #--------------------------------------------------------------
    1329         for t in domain.evolve(yieldstep = 0.1, finaltime = 1.0):
    1330             pass
    1331 
    1332         #--------------------------------------------------------------
    1333         # Test inundation height again
    1334         #--------------------------------------------------------------
    1335         q_ref = domain.get_maximum_inundation_elevation()
    1336         q = get_maximum_inundation_elevation('runup_test.sww')
    1337         msg = 'We got %f, should have been %f' % (q, q_ref)
    1338         assert num.allclose(q, q_ref, rtol=1.0/N), msg
    1339 
    1340         q = get_maximum_inundation_elevation('runup_test.sww')
    1341         msg = 'We got %f, should have been %f' % (q, initial_runup_height)
    1342         assert num.allclose(q, initial_runup_height, rtol = 1.0/N), msg
    1343 
    1344         # Test error condition if time interval is out
    1345         try:
    1346             q = get_maximum_inundation_elevation('runup_test.sww',
    1347                                                  time_interval=[2.0, 3.0])
    1348         except ValueError:
    1349             pass
    1350         else:
    1351             msg = 'should have caught wrong time interval'
    1352             raise Exception, msg
    1353 
    1354         # Check correct time interval
    1355         q, loc = get_maximum_inundation_data('runup_test.sww',
    1356                                              time_interval=[0.0, 3.0])
    1357         msg = 'We got %f, should have been %f' % (q, initial_runup_height)
    1358         assert num.allclose(q, initial_runup_height, rtol = 1.0/N), msg
    1359         assert num.allclose(-loc[0]/2, q)    # From topography formula
    1360 
    1361         #--------------------------------------------------------------
    1362         # Update boundary to allow inflow
    1363         #--------------------------------------------------------------
    1364         domain.set_boundary({'right': Bd})
    1365 
    1366         #--------------------------------------------------------------
    1367         # Evolve system through time
    1368         #--------------------------------------------------------------
    1369         q_max = None
    1370         for t in domain.evolve(yieldstep = 0.1, finaltime = 3.0,
    1371                                skip_initial_step = True):
    1372             q = domain.get_maximum_inundation_elevation()
    1373             if q > q_max:
    1374                 q_max = q
    1375 
    1376         #--------------------------------------------------------------
    1377         # Test inundation height again
    1378         #--------------------------------------------------------------
    1379         indices = domain.get_wet_elements()
    1380         z = domain.get_quantity('elevation').\
    1381                 get_values(location='centroids', indices=indices)
    1382 
    1383         assert num.alltrue(z < final_runup_height)
    1384 
    1385         q = domain.get_maximum_inundation_elevation()
    1386         # First order accuracy
    1387         assert num.allclose(q, final_runup_height, rtol=1.0/N)
    1388 
    1389         q, loc = get_maximum_inundation_data('runup_test.sww',
    1390                                              time_interval=[3.0, 3.0])
    1391         msg = 'We got %f, should have been %f' % (q, final_runup_height)
    1392         assert num.allclose(q, final_runup_height, rtol=1.0/N), msg
    1393         assert num.allclose(-loc[0]/2, q)    # From topography formula
    1394 
    1395         q = get_maximum_inundation_elevation('runup_test.sww')
    1396         loc = get_maximum_inundation_location('runup_test.sww')
    1397         msg = 'We got %f, should have been %f' % (q, q_max)
    1398         assert num.allclose(q, q_max, rtol=1.0/N), msg
    1399         assert num.allclose(-loc[0]/2, q)    # From topography formula
    1400 
    1401         q = get_maximum_inundation_elevation('runup_test.sww',
    1402                                              time_interval=[0, 3])
    1403         msg = 'We got %f, should have been %f' % (q, q_max)
    1404         assert num.allclose(q, q_max, rtol=1.0/N), msg
    1405 
    1406         # Check polygon mode
    1407         # Runup region
    1408         polygon = [[0.3, 0.0], [0.9, 0.0], [0.9, 1.0], [0.3, 1.0]]
    1409         q = get_maximum_inundation_elevation('runup_test.sww',
    1410                                              polygon = polygon,
    1411                                              time_interval=[0, 3])
    1412         msg = 'We got %f, should have been %f' % (q, q_max)
    1413         assert num.allclose(q, q_max, rtol=1.0/N), msg
    1414 
    1415         # Offshore region
    1416         polygon = [[0.9, 0.0], [1.0, 0.0], [1.0, 1.0], [0.9, 1.0]]
    1417         q, loc = get_maximum_inundation_data('runup_test.sww',
    1418                                              polygon = polygon,
    1419                                              time_interval=[0, 3])
    1420         msg = 'We got %f, should have been %f' % (q, -0.475)
    1421         assert num.allclose(q, -0.475, rtol=1.0/N), msg
    1422         assert is_inside_polygon(loc, polygon)
    1423         assert num.allclose(-loc[0]/2, q)    # From topography formula
    1424 
    1425         # Dry region
    1426         polygon = [[0.0, 0.0], [0.4, 0.0], [0.4, 1.0], [0.0, 1.0]]
    1427         q, loc = get_maximum_inundation_data('runup_test.sww',
    1428                                              polygon = polygon,
    1429                                              time_interval=[0, 3])
    1430         msg = 'We got %s, should have been None' % (q)
    1431         assert q is None, msg
    1432         msg = 'We got %s, should have been None' % (loc)
    1433         assert loc is None, msg
    1434 
    1435         # Check what happens if no time point is within interval
    1436         try:
    1437             q = get_maximum_inundation_elevation('runup_test.sww',
    1438                                                  time_interval=[2.75, 2.75])
    1439         except AssertionError:
    1440             pass
    1441         else:
    1442             msg = 'Time interval should have raised an exception'
    1443             raise Exception, msg
    1444 
    1445         # Cleanup
    1446         try:
    1447             os.remove(domain.get_name() + '.sww')
    1448         except:
    1449             pass
    1450             #FIXME(Ole): Windows won't allow removal of this
    14511259
    14521260    def test_get_flow_through_cross_section_with_geo(self):
     
    60855893        from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary
    60865894        from anuga.shallow_water.shallow_water_domain import Rainfall
    6087         from anuga.shallow_water.data_manager import get_flow_through_cross_section
     5895        from anuga.shallow_water.sww_interrogate import get_flow_through_cross_section
    60885896
    60895897        #----------------------------------------------------------------------
Note: See TracChangeset for help on using the changeset viewer.