Changeset 7770
- Timestamp:
- Jun 2, 2010, 5:17:47 PM (15 years ago)
- Location:
- trunk/anuga_core
- Files:
-
- 3 added
- 11 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/anuga_core/documentation/user_manual/demos/channel1.py
r7086 r7770 7 7 # Import necessary modules 8 8 #------------------------------------------------------------------------------ 9 10 # Import standard shallow water domain and standard boundaries. 11 import anuga 12 13 # Import rectangular cross mesh generator 9 14 from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular_cross 10 from anuga.shallow_water import Domain 11 from anuga.shallow_water import Reflective_boundary 12 from anuga.shallow_water import Dirichlet_boundary 15 13 16 14 17 #------------------------------------------------------------------------------ … … 18 21 len1=10.0, len2=5.0) # Mesh 19 22 20 domain = Domain(points, vertices, boundary) # Create domain23 domain = anuga.Domain(points, vertices, boundary) # Create domain 21 24 domain.set_name('channel1') # Output name 22 25 … … 35 38 # Setup boundary conditions 36 39 #------------------------------------------------------------------------------ 37 Bi = Dirichlet_boundary([0.4, 0, 0]) # Inflow38 Br = Reflective_boundary(domain) # Solid reflective wall40 Bi = anuga.Dirichlet_boundary([0.4, 0, 0]) # Inflow 41 Br = anuga.Reflective_boundary(domain) # Solid reflective wall 39 42 40 43 domain.set_boundary({'left': Bi, 'right': Br, 'top': Br, 'bottom': Br}) -
trunk/anuga_core/source/anuga/file/sww.py
r7767 r7770 1196 1196 1197 1197 ## 1198 # @brief Get mesh and quantity data from an SWW file. 1199 # @param filename Path to data file to read. 1200 # @param quantities UNUSED! 1201 # @param verbose True if this function is to be verbose. 1202 # @return (mesh, quantities, time) where mesh is the mesh data, quantities is 1203 # a dictionary of {name: value}, and time is the time vector. 1204 # @note Quantities extracted: 'elevation', 'stage', 'xmomentum' and 'ymomentum' 1205 def get_mesh_and_quantities_from_file(filename, 1206 quantities=None, 1207 verbose=False): 1208 """Get and rebuild mesh structure and associated quantities from sww file 1209 1210 Input: 1211 filename - Name os sww file 1212 quantities - Names of quantities to load 1213 1214 Output: 1215 mesh - instance of class Interpolate 1216 (including mesh and interpolation functionality) 1217 quantities - arrays with quantity values at each mesh node 1218 time - vector of stored timesteps 1219 1220 This function is used by e.g.: 1221 get_interpolated_quantities_at_polyline_midpoints 1222 """ 1223 1224 # FIXME (Ole): Maybe refactor filefunction using this more fundamental code. 1225 1226 import types 1227 from Scientific.IO.NetCDF import NetCDFFile 1228 from shallow_water import Domain 1229 from anuga.abstract_2d_finite_volumes.neighbour_mesh import Mesh 1230 1231 if verbose: log.critical('Reading from %s' % filename) 1232 1233 fid = NetCDFFile(filename, netcdf_mode_r) # Open existing file for read 1234 time = fid.variables['time'][:] # Time vector 1235 time += fid.starttime[0] 1236 1237 # Get the variables as numeric arrays 1238 x = fid.variables['x'][:] # x-coordinates of nodes 1239 y = fid.variables['y'][:] # y-coordinates of nodes 1240 elevation = fid.variables['elevation'][:] # Elevation 1241 stage = fid.variables['stage'][:] # Water level 1242 xmomentum = fid.variables['xmomentum'][:] # Momentum in the x-direction 1243 ymomentum = fid.variables['ymomentum'][:] # Momentum in the y-direction 1244 1245 # Mesh (nodes (Mx2), triangles (Nx3)) 1246 nodes = num.concatenate((x[:,num.newaxis], y[:,num.newaxis]), axis=1) 1247 triangles = fid.variables['volumes'][:] 1248 1249 # Get geo_reference 1250 try: 1251 geo_reference = Geo_reference(NetCDFObject=fid) 1252 except: #AttributeError, e: 1253 # Sww files don't have to have a geo_ref 1254 geo_reference = None 1255 1256 if verbose: log.critical(' building mesh from sww file %s' % filename) 1257 1258 boundary = None 1259 1260 #FIXME (Peter Row): Should this be in mesh? 1261 if fid.smoothing != 'Yes': 1262 nodes = nodes.tolist() 1263 triangles = triangles.tolist() 1264 nodes, triangles, boundary = weed(nodes, triangles, boundary) 1265 1266 try: 1267 mesh = Mesh(nodes, triangles, boundary, geo_reference=geo_reference) 1268 except AssertionError, e: 1269 fid.close() 1270 msg = 'Domain could not be created: %s. "' % e 1271 raise DataDomainError, msg 1272 1273 quantities = {} 1274 quantities['elevation'] = elevation 1275 quantities['stage'] = stage 1276 quantities['xmomentum'] = xmomentum 1277 quantities['ymomentum'] = ymomentum 1278 1279 fid.close() 1280 1281 return mesh, quantities, time 1282 1283 1284 1285 ## 1198 1286 # @brief 1199 1287 # @parm time … … 1251 1339 1252 1340 1253 1254 1255 1341 ## 1256 1342 # @brief … … 1310 1396 1311 1397 return coordinates, volumes, boundary 1398 1399 1400 -
trunk/anuga_core/source/anuga/file/test_sww.py
r7767 r7770 8 8 from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular 9 9 from anuga.shallow_water.shallow_water_domain import Domain 10 from sww import load_sww_as_domain 10 from sww import load_sww_as_domain, weed, get_mesh_and_quantities_from_file 11 11 12 12 # boundary functions … … 158 158 159 159 160 def test_get_mesh_and_quantities_from_sww_file(self): 161 """test_get_mesh_and_quantities_from_sww_file(self): 162 """ 163 164 # Generate a test sww file with non trivial georeference 165 166 import time, os 167 from Scientific.IO.NetCDF import NetCDFFile 168 169 # Setup 170 from mesh_factory import rectangular 171 172 # Create basic mesh (100m x 5m) 173 width = 5 174 length = 50 175 t_end = 10 176 points, vertices, boundary = rectangular(length, width, 50, 5) 177 178 # Create shallow water domain 179 domain = Domain(points, vertices, boundary, 180 geo_reference = Geo_reference(56,308500,6189000)) 181 182 domain.set_name('flowtest') 183 swwfile = domain.get_name() + '.sww' 184 domain.set_datadir('.') 185 186 Br = Reflective_boundary(domain) # Side walls 187 Bd = Dirichlet_boundary([1, 0, 0]) # inflow 188 189 domain.set_boundary( {'left': Bd, 'right': Bd, 'top': Br, 'bottom': Br}) 190 191 for t in domain.evolve(yieldstep=1, finaltime = t_end): 192 pass 193 194 195 # Read it 196 197 # Get mesh and quantities from sww file 198 X = get_mesh_and_quantities_from_file(swwfile, 199 quantities=['elevation', 200 'stage', 201 'xmomentum', 202 'ymomentum'], 203 verbose=False) 204 mesh, quantities, time = X 205 206 207 # Check that mesh has been recovered 208 assert num.alltrue(mesh.triangles == domain.get_triangles()) 209 assert num.allclose(mesh.nodes, domain.get_nodes()) 210 211 # Check that time has been recovered 212 assert num.allclose(time, range(t_end+1)) 213 214 # Check that quantities have been recovered 215 # (sww files use single precision) 216 z=domain.get_quantity('elevation').get_values(location='unique vertices') 217 assert num.allclose(quantities['elevation'], z) 218 219 for q in ['stage', 'xmomentum', 'ymomentum']: 220 # Get quantity at last timestep 221 q_ref=domain.get_quantity(q).get_values(location='unique vertices') 222 223 #print q,quantities[q] 224 q_sww=quantities[q][-1,:] 225 226 msg = 'Quantity %s failed to be recovered' %q 227 assert num.allclose(q_ref, q_sww, atol=1.0e-6), msg 228 229 # Cleanup 230 os.remove(swwfile) 231 232 233 234 def test_weed(self): 235 coordinates1 = [[0.,0.],[1.,0.],[1.,1.],[1.,0.],[2.,0.],[1.,1.]] 236 volumes1 = [[0,1,2],[3,4,5]] 237 boundary1= {(0,1): 'external',(1,2): 'not external',(2,0): 'external',(3,4): 'external',(4,5): 'external',(5,3): 'not external'} 238 coordinates2,volumes2,boundary2=weed(coordinates1,volumes1,boundary1) 239 240 points2 = {(0.,0.):None,(1.,0.):None,(1.,1.):None,(2.,0.):None} 241 242 assert len(points2)==len(coordinates2) 243 for i in range(len(coordinates2)): 244 coordinate = tuple(coordinates2[i]) 245 assert points2.has_key(coordinate) 246 points2[coordinate]=i 247 248 for triangle in volumes1: 249 for coordinate in triangle: 250 assert coordinates2[points2[tuple(coordinates1[coordinate])]][0]==coordinates1[coordinate][0] 251 assert coordinates2[points2[tuple(coordinates1[coordinate])]][1]==coordinates1[coordinate][1] 252 160 253 ################################################################################# 161 254 -
trunk/anuga_core/source/anuga/file_conversion/file_conversion.py
r7765 r7770 488 488 489 489 490 ## 491 # @brief 492 # @param filename 493 # @param x 494 # @param y 495 # @param z 496 def write_obj(filename, x, y, z): 497 """Store x,y,z vectors into filename (obj format). 498 499 Vectors are assumed to have dimension (M,3) where 500 M corresponds to the number elements. 501 triangles are assumed to be disconnected 502 503 The three numbers in each vector correspond to three vertices, 504 505 e.g. the x coordinate of vertex 1 of element i is in x[i,1] 506 """ 507 508 import os.path 509 510 root, ext = os.path.splitext(filename) 511 if ext == '.obj': 512 FN = filename 513 else: 514 FN = filename + '.obj' 515 516 outfile = open(FN, 'wb') 517 outfile.write("# Triangulation as an obj file\n") 518 519 M, N = x.shape 520 assert N == 3 #Assuming three vertices per element 521 522 for i in range(M): 523 for j in range(N): 524 outfile.write("v %f %f %f\n" % (x[i,j], y[i,j], z[i,j])) 525 526 for i in range(M): 527 base = i * N 528 outfile.write("f %d %d %d\n" % (base+1, base+2, base+3)) 529 530 outfile.close() 531 -
trunk/anuga_core/source/anuga/file_conversion/test_file_conversion.py
r7758 r7770 8 8 from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular 9 9 from anuga.shallow_water.sww_file import SWW_file 10 from data_managerimport extent_sww10 from anuga.file.sww import extent_sww 11 11 from anuga.config import netcdf_float, epsilon, g 12 12 from Scientific.IO.NetCDF import NetCDFFile 13 13 from anuga.file_conversion.file_conversion import tsh2sww, \ 14 14 pmesh_to_domain_instance 15 16 from anuga.file.mux import WAVEHEIGHT_MUX_LABEL, EAST_VELOCITY_LABEL, \ 17 NORTH_VELOCITY_LABEL 15 18 16 19 import sys … … 1570 1573 #print "sww_file",sww_file 1571 1574 #dem_file = tempfile.mktemp(".dem") 1572 domain = sww2domain(sww_file) ###, dem_file)1575 domain = load_sww_as_domain(sww_file) ###, dem_file) 1573 1576 domain.check_integrity() 1574 1577 -
trunk/anuga_core/source/anuga/file_conversion/urs2sww.py
r7758 r7770 1 import numpy as num 2 from Scientific.IO.NetCDF import NetCDFFile 3 4 from anuga.file.urs import Read_urs 5 6 from anuga.coordinate_transforms.redfearn import redfearn, \ 7 convert_from_latlon_to_utm 8 9 from anuga.geospatial_data.geospatial_data import ensure_absolute, \ 10 Geospatial_data 11 12 from mux import WAVEHEIGHT_MUX_LABEL, EAST_VELOCITY_LABEL, \ 13 NORTH_VELOCITY_LABEL 14 15 from anuga.utilities.numerical_tools import ensure_numeric 16 17 from anuga.config import netcdf_mode_r, netcdf_mode_w, netcdf_mode_a, \ 18 netcdf_float 19 20 from sww_file import Read_sww, Write_sww 21 22 1 23 ################################################################################ 2 24 # CONVERTING UNGRIDDED URS DATA TO AN SWW FILE 3 25 ################################################################################ 4 5 WAVEHEIGHT_MUX_LABEL = '-z-mux'6 EAST_VELOCITY_LABEL = '-e-mux'7 NORTH_VELOCITY_LABEL = '-n-mux'8 26 9 27 ## … … 95 113 mux = {} 96 114 for quantity, file in map(None, quantities, files_in): 97 mux[quantity] = Urs_points(file)115 mux[quantity] = Read_urs(file) 98 116 99 117 # Could check that the depth is the same. (hashing) … … 143 161 for i in range(a_mux.time_step_count): 144 162 mux_times.append(a_mux.time_step * i) 145 (mux_times_start_i, mux_times_fin_i) = mux2sww_time(mux_times, mint, maxt)163 (mux_times_start_i, mux_times_fin_i) = read_time_from_mux(mux_times, mint, maxt) 146 164 times = mux_times[mux_times_start_i:mux_times_fin_i] 147 165 … … 209 227 210 228 # Do some conversions while writing the sww file 229 230 231 def read_time_from_mux(mux_times, mint, maxt): 232 """ 233 Read a list of mux times. 234 Return start and finish times which lie within the passed time period. 235 """ 236 237 if mint == None: 238 mux_times_start_i = 0 239 else: 240 mux_times_start_i = num.searchsorted(mux_times, mint) 241 242 if maxt == None: 243 mux_times_fin_i = len(mux_times) 244 else: 245 maxt += 0.5 # so if you specify a time where there is 246 # data that time will be included 247 mux_times_fin_i = num.searchsorted(mux_times, maxt) 248 249 return mux_times_start_i, mux_times_fin_i 250 -
trunk/anuga_core/source/anuga/file_conversion/urs2txt.py
r7758 r7770 1 1 from anuga.file.urs import Read_urs 2 2 3 3 ## … … 20 20 mux = {} 21 21 for quantity, file in map(None, quantities, files_in): 22 mux[quantity] = Urs_points(file)22 mux[quantity] = Read_urs(file) 23 23 24 24 # Could check that the depth is the same. (hashing) -
trunk/anuga_core/source/anuga/pmesh/mesh_interface.py
r7711 r7770 26 26 interior_regions=None, 27 27 interior_holes=None, 28 hole_tags=None, 28 29 poly_geo_reference=None, 29 30 mesh_geo_reference=None, … … 53 54 throw an error 54 55 55 Interior_holes is a list of ploygons for each hole. 56 Interior_holes is a list of polygons for each hole. 57 hole_tags is an optional list of boundary tags for the holes, see 58 boundary_tags parameter. 56 59 57 60 This function does not allow segments to share points - use underlying … … 92 95 'interior_regions': interior_regions, 93 96 'interior_holes': interior_holes, 97 'hole_tags': hole_tags, 94 98 'poly_geo_reference': poly_geo_reference, 95 99 'mesh_geo_reference': mesh_geo_reference, … … 127 131 interior_regions=None, 128 132 interior_holes=None, 133 hole_tags=None, 129 134 poly_geo_reference=None, 130 135 mesh_geo_reference=None, … … 311 316 for polygon in interior_holes: 312 317 m.add_hole_from_polygon(polygon, 318 segment_tags=hole_tags, 313 319 geo_reference=poly_geo_reference) 314 320 -
trunk/anuga_core/source/anuga/shallow_water/data_manager.py
r7769 r7770 268 268 269 269 270 ##271 # @brief272 # @param filename273 # @param x274 # @param y275 # @param z276 def write_obj(filename, x, y, z):277 """Store x,y,z vectors into filename (obj format).278 279 Vectors are assumed to have dimension (M,3) where280 M corresponds to the number elements.281 triangles are assumed to be disconnected282 283 The three numbers in each vector correspond to three vertices,284 285 e.g. the x coordinate of vertex 1 of element i is in x[i,1]286 """287 288 import os.path289 290 root, ext = os.path.splitext(filename)291 if ext == '.obj':292 FN = filename293 else:294 FN = filename + '.obj'295 296 outfile = open(FN, 'wb')297 outfile.write("# Triangulation as an obj file\n")298 299 M, N = x.shape300 assert N == 3 #Assuming three vertices per element301 302 for i in range(M):303 for j in range(N):304 outfile.write("v %f %f %f\n" % (x[i,j], y[i,j], z[i,j]))305 306 for i in range(M):307 base = i * N308 outfile.write("f %d %d %d\n" % (base+1, base+2, base+3))309 310 outfile.close()311 270 312 271 ## … … 949 908 log.critical(msg) 950 909 951 ################################################################################952 # Functions to obtain diagnostics from sww files953 ################################################################################954 955 956 957 ##958 # @brief Get values for quantities interpolated to polyline midpoints from SWW.959 # @param filename Path to file to read.960 # @param quantity_names Quantity names to get.961 # @param polyline Representation of desired cross-section.962 # @param verbose True if this function is to be verbose.963 # @return (segments, i_func) where segments is a list of Triangle_intersection964 # instances and i_func is an instance of Interpolation_function.965 # @note For 'polyline' assume absolute UTM coordinates.966 def get_interpolated_quantities_at_polyline_midpoints(filename,967 quantity_names=None,968 polyline=None,969 verbose=False):970 """Get values for quantities interpolated to polyline midpoints from SWW971 972 Input:973 filename - Name of sww file974 quantity_names - Names of quantities to load975 polyline: Representation of desired cross section - it may contain976 multiple sections allowing for complex shapes. Assume977 absolute UTM coordinates.978 Format [[x0, y0], [x1, y1], ...]979 980 Output:981 segments: list of instances of class Triangle_intersection982 interpolation_function: Instance of class Interpolation_function983 984 985 This function is used by get_flow_through_cross_section and986 get_energy_through_cross_section987 """988 989 from anuga.fit_interpolate.interpolate import Interpolation_function990 991 # Get mesh and quantities from sww file992 X = get_mesh_and_quantities_from_file(filename,993 quantities=quantity_names,994 verbose=verbose)995 mesh, quantities, time = X996 997 # Find all intersections and associated triangles.998 segments = mesh.get_intersecting_segments(polyline, verbose=verbose)999 1000 # Get midpoints1001 interpolation_points = segment_midpoints(segments)1002 1003 # Interpolate1004 if verbose:1005 log.critical('Interpolating - total number of interpolation points = %d'1006 % len(interpolation_points))1007 1008 I = Interpolation_function(time,1009 quantities,1010 quantity_names=quantity_names,1011 vertex_coordinates=mesh.nodes,1012 triangles=mesh.triangles,1013 interpolation_points=interpolation_points,1014 verbose=verbose)1015 1016 return segments, I1017 1018 1019 ##1020 # @brief Obtain flow (m^3/s) perpendicular to specified cross section.1021 # @param filename Path to file to read.1022 # @param polyline Representation of desired cross-section.1023 # @param verbose Trie if this function is to be verbose.1024 # @return (time, Q) where time and Q are lists of time and flow respectively.1025 def get_flow_through_cross_section(filename, polyline, verbose=False):1026 """Obtain flow (m^3/s) perpendicular to specified cross section.1027 1028 Inputs:1029 filename: Name of sww file1030 polyline: Representation of desired cross section - it may contain1031 multiple sections allowing for complex shapes. Assume1032 absolute UTM coordinates.1033 Format [[x0, y0], [x1, y1], ...]1034 1035 Output:1036 time: All stored times in sww file1037 Q: Hydrograph of total flow across given segments for all stored times.1038 1039 The normal flow is computed for each triangle intersected by the polyline1040 and added up. Multiple segments at different angles are specified the1041 normal flows may partially cancel each other.1042 1043 The typical usage of this function would be to get flow through a channel,1044 and the polyline would then be a cross section perpendicular to the flow.1045 """1046 1047 quantity_names =['elevation',1048 'stage',1049 'xmomentum',1050 'ymomentum']1051 1052 # Get values for quantities at each midpoint of poly line from sww file1053 X = get_interpolated_quantities_at_polyline_midpoints(filename,1054 quantity_names=\1055 quantity_names,1056 polyline=polyline,1057 verbose=verbose)1058 segments, interpolation_function = X1059 1060 # Get vectors for time and interpolation_points1061 time = interpolation_function.time1062 interpolation_points = interpolation_function.interpolation_points1063 1064 if verbose: log.critical('Computing hydrograph')1065 1066 # Compute hydrograph1067 Q = []1068 for t in time:1069 total_flow = 01070 for i in range(len(interpolation_points)):1071 elevation, stage, uh, vh = interpolation_function(t, point_id=i)1072 normal = segments[i].normal1073 1074 # Inner product of momentum vector with segment normal [m^2/s]1075 normal_momentum = uh*normal[0] + vh*normal[1]1076 1077 # Flow across this segment [m^3/s]1078 segment_flow = normal_momentum * segments[i].length1079 1080 # Accumulate1081 total_flow += segment_flow1082 1083 # Store flow at this timestep1084 Q.append(total_flow)1085 1086 1087 return time, Q1088 1089 1090 ##1091 # @brief Get average energy across a cross-section.1092 # @param filename Path to file of interest.1093 # @param polyline Representation of desired cross-section.1094 # @param kind Select energy to compute: 'specific' or 'total'.1095 # @param verbose True if this function is to be verbose.1096 # @return (time, E) where time and E are lists of timestep and energy.1097 def get_energy_through_cross_section(filename,1098 polyline,1099 kind='total',1100 verbose=False):1101 """Obtain average energy head [m] across specified cross section.1102 1103 Inputs:1104 polyline: Representation of desired cross section - it may contain1105 multiple sections allowing for complex shapes. Assume1106 absolute UTM coordinates.1107 Format [[x0, y0], [x1, y1], ...]1108 kind: Select which energy to compute.1109 Options are 'specific' and 'total' (default)1110 1111 Output:1112 E: Average energy [m] across given segments for all stored times.1113 1114 The average velocity is computed for each triangle intersected by1115 the polyline and averaged weighted by segment lengths.1116 1117 The typical usage of this function would be to get average energy of1118 flow in a channel, and the polyline would then be a cross section1119 perpendicular to the flow.1120 1121 #FIXME (Ole) - need name for this energy reflecting that its dimension1122 is [m].1123 """1124 1125 from anuga.config import g, epsilon, velocity_protection as h01126 1127 quantity_names =['elevation',1128 'stage',1129 'xmomentum',1130 'ymomentum']1131 1132 # Get values for quantities at each midpoint of poly line from sww file1133 X = get_interpolated_quantities_at_polyline_midpoints(filename,1134 quantity_names=\1135 quantity_names,1136 polyline=polyline,1137 verbose=verbose)1138 segments, interpolation_function = X1139 1140 # Get vectors for time and interpolation_points1141 time = interpolation_function.time1142 interpolation_points = interpolation_function.interpolation_points1143 1144 if verbose: log.critical('Computing %s energy' % kind)1145 1146 # Compute total length of polyline for use with weighted averages1147 total_line_length = 0.01148 for segment in segments:1149 total_line_length += segment.length1150 1151 # Compute energy1152 E = []1153 for t in time:1154 average_energy = 0.01155 for i, p in enumerate(interpolation_points):1156 elevation, stage, uh, vh = interpolation_function(t, point_id=i)1157 1158 # Depth1159 h = depth = stage-elevation1160 1161 # Average velocity across this segment1162 if h > epsilon:1163 # Use protection against degenerate velocities1164 u = uh / (h + h0/h)1165 v = vh / (h + h0/h)1166 else:1167 u = v = 0.01168 1169 speed_squared = u*u + v*v1170 kinetic_energy = 0.5 * speed_squared / g1171 1172 if kind == 'specific':1173 segment_energy = depth + kinetic_energy1174 elif kind == 'total':1175 segment_energy = stage + kinetic_energy1176 else:1177 msg = 'Energy kind must be either "specific" or "total". '1178 msg += 'I got %s' % kind1179 1180 # Add to weighted average1181 weigth = segments[i].length / total_line_length1182 average_energy += segment_energy * weigth1183 1184 # Store energy at this timestep1185 E.append(average_energy)1186 1187 return time, E1188 1189 1190 ##1191 # @brief Return highest elevation where depth > 0.1192 # @param filename Path to SWW file of interest.1193 # @param polygon If specified resrict to points inside this polygon.1194 # @param time_interval If specified resrict to within the time specified.1195 # @param verbose True if this function is to be verbose.1196 def get_maximum_inundation_elevation(filename,1197 polygon=None,1198 time_interval=None,1199 verbose=False):1200 """Return highest elevation where depth > 01201 1202 Usage:1203 max_runup = get_maximum_inundation_elevation(filename,1204 polygon=None,1205 time_interval=None,1206 verbose=False)1207 1208 filename is a NetCDF sww file containing ANUGA model output.1209 Optional arguments polygon and time_interval restricts the maximum1210 runup calculation1211 to a points that lie within the specified polygon and time interval.1212 1213 If no inundation is found within polygon and time_interval the return value1214 is None signifying "No Runup" or "Everything is dry".1215 1216 See general function get_maximum_inundation_data for details.1217 """1218 1219 runup, _ = get_maximum_inundation_data(filename,1220 polygon=polygon,1221 time_interval=time_interval,1222 verbose=verbose)1223 return runup1224 1225 1226 ##1227 # @brief Return location of highest elevation where h > 01228 # @param filename Path to SWW file to read.1229 # @param polygon If specified resrict to points inside this polygon.1230 # @param time_interval If specified resrict to within the time specified.1231 # @param verbose True if this function is to be verbose.1232 def get_maximum_inundation_location(filename,1233 polygon=None,1234 time_interval=None,1235 verbose=False):1236 """Return location of highest elevation where h > 01237 1238 Usage:1239 max_runup_location = get_maximum_inundation_location(filename,1240 polygon=None,1241 time_interval=None,1242 verbose=False)1243 1244 filename is a NetCDF sww file containing ANUGA model output.1245 Optional arguments polygon and time_interval restricts the maximum1246 runup calculation1247 to a points that lie within the specified polygon and time interval.1248 1249 If no inundation is found within polygon and time_interval the return value1250 is None signifying "No Runup" or "Everything is dry".1251 1252 See general function get_maximum_inundation_data for details.1253 """1254 1255 _, max_loc = get_maximum_inundation_data(filename,1256 polygon=polygon,1257 time_interval=time_interval,1258 verbose=verbose)1259 return max_loc1260 1261 1262 ##1263 # @brief Compute maximum run up height from SWW file.1264 # @param filename Path to SWW file to read.1265 # @param polygon If specified resrict to points inside this polygon.1266 # @param time_interval If specified resrict to within the time specified.1267 # @param use_centroid_values1268 # @param verbose True if this function is to be verbose.1269 # @return (maximal_runup, maximal_runup_location)1270 def get_maximum_inundation_data(filename, polygon=None, time_interval=None,1271 use_centroid_values=False,1272 verbose=False):1273 """Compute maximum run up height from sww file.1274 1275 Usage:1276 runup, location = get_maximum_inundation_data(filename,1277 polygon=None,1278 time_interval=None,1279 verbose=False)1280 1281 Algorithm is as in get_maximum_inundation_elevation from1282 shallow_water_domain except that this function works with the sww file and1283 computes the maximal runup height over multiple timesteps.1284 1285 Optional arguments polygon and time_interval restricts the maximum runup1286 calculation to a points that lie within the specified polygon and time1287 interval.1288 1289 Polygon is assumed to be in (absolute) UTM coordinates in the same zone1290 as domain.1291 1292 If no inundation is found within polygon and time_interval the return value1293 is None signifying "No Runup" or "Everything is dry".1294 """1295 1296 # We are using nodal values here as that is what is stored in sww files.1297 1298 # Water depth below which it is considered to be 0 in the model1299 # FIXME (Ole): Allow this to be specified as a keyword argument as well1300 1301 from anuga.geometry.polygon import inside_polygon1302 from anuga.config import minimum_allowed_height1303 from Scientific.IO.NetCDF import NetCDFFile1304 1305 dir, base = os.path.split(filename)1306 1307 iterate_over = get_all_swwfiles(dir, base)1308 1309 # Read sww file1310 if verbose: log.critical('Reading from %s' % filename)1311 # FIXME: Use general swwstats (when done)1312 1313 maximal_runup = None1314 maximal_runup_location = None1315 1316 for file, swwfile in enumerate (iterate_over):1317 # Read sww file1318 filename = join(dir, swwfile+'.sww')1319 1320 if verbose: log.critical('Reading from %s' % filename)1321 # FIXME: Use general swwstats (when done)1322 1323 fid = NetCDFFile(filename)1324 1325 # Get geo_reference1326 # sww files don't have to have a geo_ref1327 try:1328 geo_reference = Geo_reference(NetCDFObject=fid)1329 except AttributeError, e:1330 geo_reference = Geo_reference() # Default georef object1331 1332 xllcorner = geo_reference.get_xllcorner()1333 yllcorner = geo_reference.get_yllcorner()1334 zone = geo_reference.get_zone()1335 1336 # Get extent1337 volumes = fid.variables['volumes'][:]1338 x = fid.variables['x'][:] + xllcorner1339 y = fid.variables['y'][:] + yllcorner1340 1341 # Get the relevant quantities (Convert from single precison)1342 elevation = num.array(fid.variables['elevation'][:], num.float)1343 stage = num.array(fid.variables['stage'][:], num.float)1344 1345 # Here's where one could convert nodal information to centroid1346 # information but is probably something we need to write in C.1347 # Here's a Python thought which is NOT finished!!!1348 if use_centroid_values is True:1349 x = get_centroid_values(x, volumes)1350 y = get_centroid_values(y, volumes)1351 elevation = get_centroid_values(elevation, volumes)1352 1353 # Spatial restriction1354 if polygon is not None:1355 msg = 'polygon must be a sequence of points.'1356 assert len(polygon[0]) == 2, msg1357 # FIXME (Ole): Make a generic polygon input check in polygon.py1358 # and call it here1359 points = num.ascontiguousarray(num.concatenate((x[:,num.newaxis],1360 y[:,num.newaxis]),1361 axis=1))1362 point_indices = inside_polygon(points, polygon)1363 1364 # Restrict quantities to polygon1365 elevation = num.take(elevation, point_indices, axis=0)1366 stage = num.take(stage, point_indices, axis=1)1367 1368 # Get info for location of maximal runup1369 points_in_polygon = num.take(points, point_indices, axis=0)1370 1371 x = points_in_polygon[:,0]1372 y = points_in_polygon[:,1]1373 else:1374 # Take all points1375 point_indices = num.arange(len(x))1376 1377 # Temporal restriction1378 time = fid.variables['time'][:]1379 all_timeindices = num.arange(len(time))1380 if time_interval is not None:1381 msg = 'time_interval must be a sequence of length 2.'1382 assert len(time_interval) == 2, msg1383 msg = 'time_interval %s must not be decreasing.' % time_interval1384 assert time_interval[1] >= time_interval[0], msg1385 msg = 'Specified time interval [%.8f:%.8f] ' % tuple(time_interval)1386 msg += 'must does not match model time interval: [%.8f, %.8f]\n' \1387 % (time[0], time[-1])1388 if time_interval[1] < time[0]: raise ValueError(msg)1389 if time_interval[0] > time[-1]: raise ValueError(msg)1390 1391 # Take time indices corresponding to interval (& is bitwise AND)1392 timesteps = num.compress((time_interval[0] <= time) \1393 & (time <= time_interval[1]),1394 all_timeindices)1395 1396 msg = 'time_interval %s did not include any model timesteps.' \1397 % time_interval1398 assert not num.alltrue(timesteps == 0), msg1399 else:1400 # Take them all1401 timesteps = all_timeindices1402 1403 fid.close()1404 1405 # Compute maximal runup for each timestep1406 #maximal_runup = None1407 #maximal_runup_location = None1408 #maximal_runups = [None]1409 #maximal_runup_locations = [None]1410 1411 for i in timesteps:1412 if use_centroid_values is True:1413 stage_i = get_centroid_values(stage[i,:], volumes)1414 else:1415 stage_i = stage[i,:]1416 1417 depth = stage_i - elevation1418 1419 # Get wet nodes i.e. nodes with depth>0 within given region1420 # and timesteps1421 wet_nodes = num.compress(depth > minimum_allowed_height,1422 num.arange(len(depth)))1423 1424 if num.alltrue(wet_nodes == 0):1425 runup = None1426 else:1427 # Find maximum elevation among wet nodes1428 wet_elevation = num.take(elevation, wet_nodes, axis=0)1429 runup_index = num.argmax(wet_elevation)1430 runup = max(wet_elevation)1431 assert wet_elevation[runup_index] == runup # Must be True1432 1433 if runup > maximal_runup:1434 maximal_runup = runup # works even if maximal_runup is None1435 1436 # Record location1437 wet_x = num.take(x, wet_nodes, axis=0)1438 wet_y = num.take(y, wet_nodes, axis=0)1439 maximal_runup_location = [wet_x[runup_index],wet_y[runup_index]]1440 1441 return maximal_runup, maximal_runup_location1442 910 1443 911 -
trunk/anuga_core/source/anuga/shallow_water/test_data_manager.py
r7769 r7770 1351 1351 assert min(min(domain2.get_quantity('ymomentum').get_values()))==filler 1352 1352 1353 1354 1355 def test_weed(self):1356 from data_manager import weed1357 1358 coordinates1 = [[0.,0.],[1.,0.],[1.,1.],[1.,0.],[2.,0.],[1.,1.]]1359 volumes1 = [[0,1,2],[3,4,5]]1360 boundary1= {(0,1): 'external',(1,2): 'not external',(2,0): 'external',(3,4): 'external',(4,5): 'external',(5,3): 'not external'}1361 coordinates2,volumes2,boundary2=weed(coordinates1,volumes1,boundary1)1362 1363 points2 = {(0.,0.):None,(1.,0.):None,(1.,1.):None,(2.,0.):None}1364 1365 assert len(points2)==len(coordinates2)1366 for i in range(len(coordinates2)):1367 coordinate = tuple(coordinates2[i])1368 assert points2.has_key(coordinate)1369 points2[coordinate]=i1370 1371 for triangle in volumes1:1372 for coordinate in triangle:1373 assert coordinates2[points2[tuple(coordinates1[coordinate])]][0]==coordinates1[coordinate][0]1374 assert coordinates2[points2[tuple(coordinates1[coordinate])]][1]==coordinates1[coordinate][1]1375 1353 1376 1354 … … 2488 2466 2489 2467 2490 def test_get_maximum_inundation(self):2491 """Test that sww information can be converted correctly to maximum2492 runup elevation and location (without and with georeferencing)2493 2494 This test creates a slope and a runup which is maximal (~11m) at around 10s2495 and levels out to the boundary condition (1m) at about 30s.2496 """2497 2498 import time, os2499 from Scientific.IO.NetCDF import NetCDFFile2500 2501 #Setup2502 2503 from mesh_factory import rectangular2504 2505 # Create basic mesh (100m x 100m)2506 points, vertices, boundary = rectangular(20, 5, 100, 50)2507 2508 # Create shallow water domain2509 domain = Domain(points, vertices, boundary)2510 domain.default_order = 22511 domain.set_minimum_storable_height(0.01)2512 2513 domain.set_name('runuptest')2514 swwfile = domain.get_name() + '.sww'2515 2516 domain.set_datadir('.')2517 domain.format = 'sww'2518 domain.smooth = True2519 2520 # FIXME (Ole): Backwards compatibility2521 # Look at sww file and see what happens when2522 # domain.tight_slope_limiters = 12523 domain.tight_slope_limiters = 02524 domain.use_centroid_velocities = 0 # Backwards compatibility (7/5/8)2525 2526 Br = Reflective_boundary(domain)2527 Bd = Dirichlet_boundary([1.0,0,0])2528 2529 2530 #---------- First run without geo referencing2531 2532 domain.set_quantity('elevation', lambda x,y: -0.2*x + 14) # Slope2533 domain.set_quantity('stage', -6)2534 domain.set_boundary( {'left': Br, 'right': Bd, 'top': Br, 'bottom': Br})2535 2536 for t in domain.evolve(yieldstep=1, finaltime = 50):2537 pass2538 2539 2540 # Check maximal runup2541 runup = get_maximum_inundation_elevation(swwfile)2542 location = get_maximum_inundation_location(swwfile)2543 #print 'Runup, location', runup, location2544 assert num.allclose(runup, 11) or num.allclose(runup, 12) # old limiters2545 assert num.allclose(location[0], 15) or num.allclose(location[0], 10)2546 2547 # Check final runup2548 runup = get_maximum_inundation_elevation(swwfile, time_interval=[45,50])2549 location = get_maximum_inundation_location(swwfile, time_interval=[45,50])2550 # print 'Runup, location:',runup, location2551 assert num.allclose(runup, 1)2552 assert num.allclose(location[0], 65)2553 2554 # Check runup restricted to a polygon2555 p = [[50,1], [99,1], [99,49], [50,49]]2556 runup = get_maximum_inundation_elevation(swwfile, polygon=p)2557 location = get_maximum_inundation_location(swwfile, polygon=p)2558 #print runup, location2559 assert num.allclose(runup, 4)2560 assert num.allclose(location[0], 50)2561 2562 # Check that mimimum_storable_height works2563 fid = NetCDFFile(swwfile, netcdf_mode_r) # Open existing file2564 2565 stage = fid.variables['stage'][:]2566 z = fid.variables['elevation'][:]2567 xmomentum = fid.variables['xmomentum'][:]2568 ymomentum = fid.variables['ymomentum'][:]2569 2570 2571 2572 for i in range(stage.shape[0]):2573 h = stage[i]-z # depth vector at time step i2574 2575 # Check every node location2576 for j in range(stage.shape[1]):2577 # Depth being either exactly zero implies2578 # momentum being zero.2579 # Or else depth must be greater than or equal to2580 # the minimal storable height2581 if h[j] == 0.0:2582 assert xmomentum[i,j] == 0.02583 assert ymomentum[i,j] == 0.02584 else:2585 assert h[j] >= domain.minimum_storable_height2586 2587 fid.close()2588 2589 # Cleanup2590 os.remove(swwfile)2591 2592 2593 2594 #------------- Now the same with georeferencing2595 2596 domain.time=0.02597 E = 3085002598 N = 61890002599 #E = N = 02600 domain.geo_reference = Geo_reference(56, E, N)2601 2602 domain.set_quantity('elevation', lambda x,y: -0.2*x + 14) # Slope2603 domain.set_quantity('stage', -6)2604 domain.set_boundary( {'left': Br, 'right': Bd, 'top': Br, 'bottom': Br})2605 2606 for t in domain.evolve(yieldstep=1, finaltime = 50):2607 pass2608 2609 # Check maximal runup2610 runup = get_maximum_inundation_elevation(swwfile)2611 location = get_maximum_inundation_location(swwfile)2612 assert num.allclose(runup, 11) or num.allclose(runup, 12) # old limiters2613 assert num.allclose(location[0], 15+E) or num.allclose(location[0], 10+E)2614 2615 # Check final runup2616 runup = get_maximum_inundation_elevation(swwfile, time_interval=[45,50])2617 location = get_maximum_inundation_location(swwfile, time_interval=[45,50])2618 assert num.allclose(runup, 1)2619 assert num.allclose(location[0], 65+E)2620 2621 # Check runup restricted to a polygon2622 p = num.array([[50,1], [99,1], [99,49], [50,49]], num.int) + num.array([E, N], num.int) #array default#2623 2624 runup = get_maximum_inundation_elevation(swwfile, polygon=p)2625 location = get_maximum_inundation_location(swwfile, polygon=p)2626 assert num.allclose(runup, 4)2627 assert num.allclose(location[0], 50+E)2628 2629 2630 # Cleanup2631 os.remove(swwfile)2632 2633 2634 def test_get_mesh_and_quantities_from_sww_file(self):2635 """test_get_mesh_and_quantities_from_sww_file(self):2636 """2637 2638 # Generate a test sww file with non trivial georeference2639 2640 import time, os2641 from Scientific.IO.NetCDF import NetCDFFile2642 2643 # Setup2644 from mesh_factory import rectangular2645 2646 # Create basic mesh (100m x 5m)2647 width = 52648 length = 502649 t_end = 102650 points, vertices, boundary = rectangular(length, width, 50, 5)2651 2652 # Create shallow water domain2653 domain = Domain(points, vertices, boundary,2654 geo_reference = Geo_reference(56,308500,6189000))2655 2656 domain.set_name('flowtest')2657 swwfile = domain.get_name() + '.sww'2658 domain.set_datadir('.')2659 2660 Br = Reflective_boundary(domain) # Side walls2661 Bd = Dirichlet_boundary([1, 0, 0]) # inflow2662 2663 domain.set_boundary( {'left': Bd, 'right': Bd, 'top': Br, 'bottom': Br})2664 2665 for t in domain.evolve(yieldstep=1, finaltime = t_end):2666 pass2667 2668 2669 # Read it2670 2671 # Get mesh and quantities from sww file2672 X = get_mesh_and_quantities_from_file(swwfile,2673 quantities=['elevation',2674 'stage',2675 'xmomentum',2676 'ymomentum'],2677 verbose=False)2678 mesh, quantities, time = X2679 2680 2681 # Check that mesh has been recovered2682 assert num.alltrue(mesh.triangles == domain.get_triangles())2683 assert num.allclose(mesh.nodes, domain.get_nodes())2684 2685 # Check that time has been recovered2686 assert num.allclose(time, range(t_end+1))2687 2688 # Check that quantities have been recovered2689 # (sww files use single precision)2690 z=domain.get_quantity('elevation').get_values(location='unique vertices')2691 assert num.allclose(quantities['elevation'], z)2692 2693 for q in ['stage', 'xmomentum', 'ymomentum']:2694 # Get quantity at last timestep2695 q_ref=domain.get_quantity(q).get_values(location='unique vertices')2696 2697 #print q,quantities[q]2698 q_sww=quantities[q][-1,:]2699 2700 msg = 'Quantity %s failed to be recovered' %q2701 assert num.allclose(q_ref, q_sww, atol=1.0e-6), msg2702 2703 # Cleanup2704 os.remove(swwfile)2705 2706 2707 def test_get_flow_through_cross_section(self):2708 """test_get_flow_through_cross_section(self):2709 2710 Test that the total flow through a cross section can be2711 correctly obtained from an sww file.2712 2713 This test creates a flat bed with a known flow through it and tests2714 that the function correctly returns the expected flow.2715 2716 The specifics are2717 u = 2 m/s2718 h = 1 m2719 w = 3 m (width of channel)2720 2721 q = u*h*w = 6 m^3/s2722 2723 #---------- First run without geo referencing2724 2725 """2726 2727 import time, os2728 from Scientific.IO.NetCDF import NetCDFFile2729 2730 # Setup2731 from mesh_factory import rectangular2732 2733 # Create basic mesh (20m x 3m)2734 width = 32735 length = 202736 t_end = 32737 points, vertices, boundary = rectangular(length, width,2738 length, width)2739 2740 # Create shallow water domain2741 domain = Domain(points, vertices, boundary)2742 domain.default_order = 22743 domain.set_minimum_storable_height(0.01)2744 2745 domain.set_name('flowtest')2746 swwfile = domain.get_name() + '.sww'2747 2748 domain.set_datadir('.')2749 domain.format = 'sww'2750 domain.smooth = True2751 2752 h = 1.02753 u = 2.02754 uh = u*h2755 2756 Br = Reflective_boundary(domain) # Side walls2757 Bd = Dirichlet_boundary([h, uh, 0]) # 2 m/s across the 3 m inlet:2758 2759 2760 2761 domain.set_quantity('elevation', 0.0)2762 domain.set_quantity('stage', h)2763 domain.set_quantity('xmomentum', uh)2764 domain.set_boundary( {'left': Bd, 'right': Bd, 'top': Br, 'bottom': Br})2765 2766 for t in domain.evolve(yieldstep=1, finaltime = t_end):2767 pass2768 2769 # Check that momentum is as it should be in the interior2770 2771 I = [[0, width/2.],2772 [length/2., width/2.],2773 [length, width/2.]]2774 2775 f = file_function(swwfile,2776 quantities=['stage', 'xmomentum', 'ymomentum'],2777 interpolation_points=I,2778 verbose=False)2779 for t in range(t_end+1):2780 for i in range(3):2781 assert num.allclose(f(t, i), [1, 2, 0], atol=1.0e-6)2782 2783 2784 # Check flows through the middle2785 for i in range(5):2786 x = length/2. + i*0.23674563 # Arbitrary2787 cross_section = [[x, 0], [x, width]]2788 time, Q = get_flow_through_cross_section(swwfile,2789 cross_section,2790 verbose=False)2791 2792 assert num.allclose(Q, uh*width)2793 2794 2795 2796 # Try the same with partial lines2797 x = length/2.2798 for i in range(5):2799 start_point = [length/2., i*width/5.]2800 #print start_point2801 2802 cross_section = [start_point, [length/2., width]]2803 time, Q = get_flow_through_cross_section(swwfile,2804 cross_section,2805 verbose=False)2806 2807 #print i, Q, (width-start_point[1])2808 assert num.allclose(Q, uh*(width-start_point[1]))2809 2810 2811 # Verify no flow when line is parallel to flow2812 cross_section = [[length/2.-10, width/2.], [length/2.+10, width/2.]]2813 time, Q = get_flow_through_cross_section(swwfile,2814 cross_section,2815 verbose=False)2816 2817 #print i, Q2818 assert num.allclose(Q, 0, atol=1.0e-5)2819 2820 2821 # Try with lines on an angle (all flow still runs through here)2822 cross_section = [[length/2., 0], [length/2.+width, width]]2823 time, Q = get_flow_through_cross_section(swwfile,2824 cross_section,2825 verbose=False)2826 2827 assert num.allclose(Q, uh*width)2828 2829 2830 2831 2832 def test_get_flow_through_cross_section_with_geo(self):2833 """test_get_flow_through_cross_section(self):2834 2835 Test that the total flow through a cross section can be2836 correctly obtained from an sww file.2837 2838 This test creates a flat bed with a known flow through it and tests2839 that the function correctly returns the expected flow.2840 2841 The specifics are2842 u = 2 m/s2843 h = 2 m2844 w = 3 m (width of channel)2845 2846 q = u*h*w = 12 m^3/s2847 2848 2849 This run tries it with georeferencing and with elevation = -12850 2851 """2852 2853 import time, os2854 from Scientific.IO.NetCDF import NetCDFFile2855 2856 # Setup2857 from mesh_factory import rectangular2858 2859 # Create basic mesh (20m x 3m)2860 width = 32861 length = 202862 t_end = 12863 points, vertices, boundary = rectangular(length, width,2864 length, width)2865 2866 # Create shallow water domain2867 domain = Domain(points, vertices, boundary,2868 geo_reference = Geo_reference(56,308500,6189000))2869 2870 domain.default_order = 22871 domain.set_minimum_storable_height(0.01)2872 2873 domain.set_name('flowtest')2874 swwfile = domain.get_name() + '.sww'2875 2876 domain.set_datadir('.')2877 domain.format = 'sww'2878 domain.smooth = True2879 2880 e = -1.02881 w = 1.02882 h = w-e2883 u = 2.02884 uh = u*h2885 2886 Br = Reflective_boundary(domain) # Side walls2887 Bd = Dirichlet_boundary([w, uh, 0]) # 2 m/s across the 3 m inlet:2888 2889 2890 2891 2892 domain.set_quantity('elevation', e)2893 domain.set_quantity('stage', w)2894 domain.set_quantity('xmomentum', uh)2895 domain.set_boundary( {'left': Bd, 'right': Bd, 'top': Br, 'bottom': Br})2896 2897 for t in domain.evolve(yieldstep=1, finaltime = t_end):2898 pass2899 2900 # Check that momentum is as it should be in the interior2901 2902 I = [[0, width/2.],2903 [length/2., width/2.],2904 [length, width/2.]]2905 2906 I = domain.geo_reference.get_absolute(I)2907 f = file_function(swwfile,2908 quantities=['stage', 'xmomentum', 'ymomentum'],2909 interpolation_points=I,2910 verbose=False)2911 2912 for t in range(t_end+1):2913 for i in range(3):2914 #print i, t, f(t, i)2915 assert num.allclose(f(t, i), [w, uh, 0], atol=1.0e-6)2916 2917 2918 # Check flows through the middle2919 for i in range(5):2920 x = length/2. + i*0.23674563 # Arbitrary2921 cross_section = [[x, 0], [x, width]]2922 2923 cross_section = domain.geo_reference.get_absolute(cross_section)2924 time, Q = get_flow_through_cross_section(swwfile,2925 cross_section,2926 verbose=False)2927 2928 assert num.allclose(Q, uh*width)2929 2930 2931 2932 def test_get_energy_through_cross_section(self):2933 """test_get_energy_through_cross_section(self):2934 2935 Test that the specific and total energy through a cross section can be2936 correctly obtained from an sww file.2937 2938 This test creates a flat bed with a known flow through it and tests2939 that the function correctly returns the expected energies.2940 2941 The specifics are2942 u = 2 m/s2943 h = 1 m2944 w = 3 m (width of channel)2945 2946 q = u*h*w = 6 m^3/s2947 Es = h + 0.5*v*v/g # Specific energy head [m]2948 Et = w + 0.5*v*v/g # Total energy head [m]2949 2950 2951 This test uses georeferencing2952 2953 """2954 2955 import time, os2956 from Scientific.IO.NetCDF import NetCDFFile2957 2958 # Setup2959 from mesh_factory import rectangular2960 2961 # Create basic mesh (20m x 3m)2962 width = 32963 length = 202964 t_end = 12965 points, vertices, boundary = rectangular(length, width,2966 length, width)2967 2968 # Create shallow water domain2969 domain = Domain(points, vertices, boundary,2970 geo_reference = Geo_reference(56,308500,6189000))2971 2972 domain.default_order = 22973 domain.set_minimum_storable_height(0.01)2974 2975 domain.set_name('flowtest')2976 swwfile = domain.get_name() + '.sww'2977 2978 domain.set_datadir('.')2979 domain.format = 'sww'2980 domain.smooth = True2981 2982 e = -1.02983 w = 1.02984 h = w-e2985 u = 2.02986 uh = u*h2987 2988 Br = Reflective_boundary(domain) # Side walls2989 Bd = Dirichlet_boundary([w, uh, 0]) # 2 m/s across the 3 m inlet:2990 2991 2992 domain.set_quantity('elevation', e)2993 domain.set_quantity('stage', w)2994 domain.set_quantity('xmomentum', uh)2995 domain.set_boundary( {'left': Bd, 'right': Bd, 'top': Br, 'bottom': Br})2996 2997 for t in domain.evolve(yieldstep=1, finaltime = t_end):2998 pass2999 3000 # Check that momentum is as it should be in the interior3001 3002 I = [[0, width/2.],3003 [length/2., width/2.],3004 [length, width/2.]]3005 3006 I = domain.geo_reference.get_absolute(I)3007 f = file_function(swwfile,3008 quantities=['stage', 'xmomentum', 'ymomentum'],3009 interpolation_points=I,3010 verbose=False)3011 3012 for t in range(t_end+1):3013 for i in range(3):3014 #print i, t, f(t, i)3015 assert num.allclose(f(t, i), [w, uh, 0], atol=1.0e-6)3016 3017 3018 # Check energies through the middle3019 for i in range(5):3020 x = length/2. + i*0.23674563 # Arbitrary3021 cross_section = [[x, 0], [x, width]]3022 3023 cross_section = domain.geo_reference.get_absolute(cross_section)3024 3025 time, Es = get_energy_through_cross_section(swwfile,3026 cross_section,3027 kind='specific',3028 verbose=False)3029 assert num.allclose(Es, h + 0.5*u*u/g)3030 3031 time, Et = get_energy_through_cross_section(swwfile,3032 cross_section,3033 kind='total',3034 verbose=False)3035 assert num.allclose(Et, w + 0.5*u*u/g)3036 3037 3038 2468 3039 2469 def test_get_all_swwfiles(self): -
trunk/anuga_core/source/anuga/shallow_water/test_shallow_water_domain.py
r7769 r7770 29 29 from anuga.shallow_water.forcing import Rainfall, Wind_stress 30 30 from anuga.shallow_water.forcing import Inflow, Cross_section 31 from anuga.shallow_water. data_managerimport get_flow_through_cross_section31 from anuga.shallow_water.sww_interrogate import get_flow_through_cross_section 32 32 33 33 # boundary functions … … 1257 1257 assert num.alltrue(wet_elevations < final_runup_height) 1258 1258 assert num.allclose(wet_elevations, z) 1259 1260 def test_get_maximum_inundation_from_sww(self):1261 """test_get_maximum_inundation_from_sww(self)1262 1263 Test of get_maximum_inundation_elevation()1264 and get_maximum_inundation_location() from data_manager.py1265 1266 This is based on test_get_maximum_inundation_3(self) but works with the1267 stored results instead of with the internal data structure.1268 1269 This test uses the underlying get_maximum_inundation_data for tests1270 """1271 1272 from anuga.abstract_2d_finite_volumes.mesh_factory \1273 import rectangular_cross1274 from data_manager import get_maximum_inundation_elevation1275 from data_manager import get_maximum_inundation_location1276 from data_manager import get_maximum_inundation_data1277 1278 initial_runup_height = -0.41279 final_runup_height = -0.31280 1281 #--------------------------------------------------------------1282 # Setup computational domain1283 #--------------------------------------------------------------1284 N = 101285 points, vertices, boundary = rectangular_cross(N, N)1286 domain = Domain(points, vertices, boundary)1287 domain.set_name('runup_test')1288 domain.set_maximum_allowed_speed(1.0)1289 1290 # FIXME: This works better with old limiters so far1291 domain.tight_slope_limiters = 01292 1293 #--------------------------------------------------------------1294 # Setup initial conditions1295 #--------------------------------------------------------------1296 def topography(x, y):1297 return -x/2 # linear bed slope1298 1299 # Use function for elevation1300 domain.set_quantity('elevation', topography)1301 domain.set_quantity('friction', 0.) # Zero friction1302 # Constant negative initial stage1303 domain.set_quantity('stage', initial_runup_height)1304 1305 #--------------------------------------------------------------1306 # Setup boundary conditions1307 #--------------------------------------------------------------1308 Br = Reflective_boundary(domain) # Reflective wall1309 Bd = Dirichlet_boundary([final_runup_height, 0, 0]) # Constant inflow1310 1311 # All reflective to begin with (still water)1312 domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br})1313 1314 #--------------------------------------------------------------1315 # Test initial inundation height1316 #--------------------------------------------------------------1317 indices = domain.get_wet_elements()1318 z = domain.get_quantity('elevation').\1319 get_values(location='centroids', indices=indices)1320 assert num.alltrue(z < initial_runup_height)1321 1322 q_ref = domain.get_maximum_inundation_elevation()1323 # First order accuracy1324 assert num.allclose(q_ref, initial_runup_height, rtol=1.0/N)1325 1326 #--------------------------------------------------------------1327 # Let triangles adjust1328 #--------------------------------------------------------------1329 for t in domain.evolve(yieldstep = 0.1, finaltime = 1.0):1330 pass1331 1332 #--------------------------------------------------------------1333 # Test inundation height again1334 #--------------------------------------------------------------1335 q_ref = domain.get_maximum_inundation_elevation()1336 q = get_maximum_inundation_elevation('runup_test.sww')1337 msg = 'We got %f, should have been %f' % (q, q_ref)1338 assert num.allclose(q, q_ref, rtol=1.0/N), msg1339 1340 q = get_maximum_inundation_elevation('runup_test.sww')1341 msg = 'We got %f, should have been %f' % (q, initial_runup_height)1342 assert num.allclose(q, initial_runup_height, rtol = 1.0/N), msg1343 1344 # Test error condition if time interval is out1345 try:1346 q = get_maximum_inundation_elevation('runup_test.sww',1347 time_interval=[2.0, 3.0])1348 except ValueError:1349 pass1350 else:1351 msg = 'should have caught wrong time interval'1352 raise Exception, msg1353 1354 # Check correct time interval1355 q, loc = get_maximum_inundation_data('runup_test.sww',1356 time_interval=[0.0, 3.0])1357 msg = 'We got %f, should have been %f' % (q, initial_runup_height)1358 assert num.allclose(q, initial_runup_height, rtol = 1.0/N), msg1359 assert num.allclose(-loc[0]/2, q) # From topography formula1360 1361 #--------------------------------------------------------------1362 # Update boundary to allow inflow1363 #--------------------------------------------------------------1364 domain.set_boundary({'right': Bd})1365 1366 #--------------------------------------------------------------1367 # Evolve system through time1368 #--------------------------------------------------------------1369 q_max = None1370 for t in domain.evolve(yieldstep = 0.1, finaltime = 3.0,1371 skip_initial_step = True):1372 q = domain.get_maximum_inundation_elevation()1373 if q > q_max:1374 q_max = q1375 1376 #--------------------------------------------------------------1377 # Test inundation height again1378 #--------------------------------------------------------------1379 indices = domain.get_wet_elements()1380 z = domain.get_quantity('elevation').\1381 get_values(location='centroids', indices=indices)1382 1383 assert num.alltrue(z < final_runup_height)1384 1385 q = domain.get_maximum_inundation_elevation()1386 # First order accuracy1387 assert num.allclose(q, final_runup_height, rtol=1.0/N)1388 1389 q, loc = get_maximum_inundation_data('runup_test.sww',1390 time_interval=[3.0, 3.0])1391 msg = 'We got %f, should have been %f' % (q, final_runup_height)1392 assert num.allclose(q, final_runup_height, rtol=1.0/N), msg1393 assert num.allclose(-loc[0]/2, q) # From topography formula1394 1395 q = get_maximum_inundation_elevation('runup_test.sww')1396 loc = get_maximum_inundation_location('runup_test.sww')1397 msg = 'We got %f, should have been %f' % (q, q_max)1398 assert num.allclose(q, q_max, rtol=1.0/N), msg1399 assert num.allclose(-loc[0]/2, q) # From topography formula1400 1401 q = get_maximum_inundation_elevation('runup_test.sww',1402 time_interval=[0, 3])1403 msg = 'We got %f, should have been %f' % (q, q_max)1404 assert num.allclose(q, q_max, rtol=1.0/N), msg1405 1406 # Check polygon mode1407 # Runup region1408 polygon = [[0.3, 0.0], [0.9, 0.0], [0.9, 1.0], [0.3, 1.0]]1409 q = get_maximum_inundation_elevation('runup_test.sww',1410 polygon = polygon,1411 time_interval=[0, 3])1412 msg = 'We got %f, should have been %f' % (q, q_max)1413 assert num.allclose(q, q_max, rtol=1.0/N), msg1414 1415 # Offshore region1416 polygon = [[0.9, 0.0], [1.0, 0.0], [1.0, 1.0], [0.9, 1.0]]1417 q, loc = get_maximum_inundation_data('runup_test.sww',1418 polygon = polygon,1419 time_interval=[0, 3])1420 msg = 'We got %f, should have been %f' % (q, -0.475)1421 assert num.allclose(q, -0.475, rtol=1.0/N), msg1422 assert is_inside_polygon(loc, polygon)1423 assert num.allclose(-loc[0]/2, q) # From topography formula1424 1425 # Dry region1426 polygon = [[0.0, 0.0], [0.4, 0.0], [0.4, 1.0], [0.0, 1.0]]1427 q, loc = get_maximum_inundation_data('runup_test.sww',1428 polygon = polygon,1429 time_interval=[0, 3])1430 msg = 'We got %s, should have been None' % (q)1431 assert q is None, msg1432 msg = 'We got %s, should have been None' % (loc)1433 assert loc is None, msg1434 1435 # Check what happens if no time point is within interval1436 try:1437 q = get_maximum_inundation_elevation('runup_test.sww',1438 time_interval=[2.75, 2.75])1439 except AssertionError:1440 pass1441 else:1442 msg = 'Time interval should have raised an exception'1443 raise Exception, msg1444 1445 # Cleanup1446 try:1447 os.remove(domain.get_name() + '.sww')1448 except:1449 pass1450 #FIXME(Ole): Windows won't allow removal of this1451 1259 1452 1260 def test_get_flow_through_cross_section_with_geo(self): … … 6085 5893 from anuga.shallow_water.shallow_water_domain import Dirichlet_boundary 6086 5894 from anuga.shallow_water.shallow_water_domain import Rainfall 6087 from anuga.shallow_water. data_managerimport get_flow_through_cross_section5895 from anuga.shallow_water.sww_interrogate import get_flow_through_cross_section 6088 5896 6089 5897 #----------------------------------------------------------------------
Note: See TracChangeset
for help on using the changeset viewer.