Changeset 5276
- Timestamp:
- May 5, 2008, 5:24:45 PM (17 years ago)
- Location:
- anuga_core
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
anuga_core/source/anuga/abstract_2d_finite_volumes/neighbour_mesh.py
r4872 r5276 859 859 860 860 return str 861 861 862 862 863 def get_triangle_containing_point(self, point): … … 895 896 896 897 898 def get_intersecting_segments(self, polyline): 899 """Find edges intersected by polyline 900 901 Input: 902 polyline - list of points forming a segmented line 903 904 Output: 905 dictionary where 906 keys are triangle ids for those elements that are intersected 907 values are a list of segments each represented as a triplet: 908 length of the intersecting segment 909 right hand normal 910 midpoint of intersecting segment 911 912 The polyline may break inside any triangle causing multiple segments per triabgle. 913 """ 914 915 pass 916 917 918 897 919 def get_triangle_neighbours(self, tri_id): 898 920 """ Given a triangle id, Return an array of the -
anuga_core/source/anuga/abstract_2d_finite_volumes/test_neighbour_mesh.py
r4872 r5276 1245 1245 assert neighbours == [] 1246 1246 neighbours = mesh.get_triangle_neighbours(2) 1247 assert neighbours == [] 1248 1247 assert neighbours == [] 1248 1249 1250 def NOtest_get_intersecting_segments(self): 1251 """test_get_intersecting_segments(self): 1252 1253 """ 1254 1255 pass 1256 1249 1257 #------------------------------------------------------------- 1250 1258 if __name__ == "__main__": -
anuga_core/source/anuga/shallow_water/data_manager.py
r5275 r5276 3198 3198 3199 3199 3200 def sww2domain(filename, boundary=None,t=None,\3201 fail_if_NaN=True ,NaN_filler=0\3202 ,verbose = False,very_verbose = False):3200 def sww2domain(filename, boundary=None, t=None, 3201 fail_if_NaN=True ,NaN_filler=0, 3202 verbose = False, very_verbose = False): 3203 3203 """ 3204 3204 Usage: domain = sww2domain('file.sww',t=time (default = last time in file)) … … 3233 3233 starttime = fid.starttime[0] 3234 3234 volumes = fid.variables['volumes'][:] #Connectivity 3235 coordinates=transpose(asarray([x.tolist(),y.tolist()])) 3235 coordinates = transpose(asarray([x.tolist(),y.tolist()])) 3236 #FIXME (Ole): Something like this might be better: concatenate( (x, y), axis=1 ) 3237 # or concatenate( (x[:,NewAxis],x[:,NewAxis]), axis=1 ) 3236 3238 3237 3239 conserved_quantities = [] … … 3355 3357 return domain 3356 3358 3359 3357 3360 def interpolated_quantity(saved_quantity,time_interp): 3358 3361 … … 3366 3369 #Return vector of interpolated values 3367 3370 return q 3371 3368 3372 3369 3373 def get_time_interp(time,t=None): … … 5531 5535 #----------------------------------------------- 5532 5536 5533 def get_mesh_and_quantities_from_sww_file(filename, quantity_names, verbose=False): 5537 def get_mesh_and_quantities_from_file(filename, 5538 quantities=None, 5539 verbose=False): 5534 5540 """Get and rebuild mesh structure and the associated quantities from sww file 5535 """ 5536 5537 #FIXME(Ole): This is work in progress 5538 5541 5542 Input: 5543 filename - Name os sww file 5544 quantities - Names of quantities to load 5545 5546 Output: 5547 mesh - instance of class Interpolate 5548 (including mesh and interpolation functionality) 5549 quantities - arrays with quantity values at each mesh node 5550 time - vector of stored timesteps 5551 """ 5552 5553 # FIXME (Ole): Maybe refactor filefunction using this more fundamental code. 5554 5539 5555 import types 5540 # FIXME (Ole): Maybe refactor filefunction using this more fundamental code. 5541 5542 return 5543 5544 # Open NetCDF file 5545 if verbose: print 'Reading', filename 5546 fid = NetCDFFile(filename, 'r') 5547 5548 if type(quantity_names) == types.StringType: 5549 quantity_names = [quantity_names] 5550 5551 if quantity_names is None or len(quantity_names) < 1: 5552 msg = 'No quantities are specified' 5553 raise Exception, msg 5554 5555 # Now assert that requested quantitites (and the independent ones) 5556 # are present in file 5557 missing = [] 5558 for quantity in ['x', 'y', 'volumes', 'time'] + quantity_names: 5559 if not fid.variables.has_key(quantity): 5560 missing.append(quantity) 5561 5562 if len(missing) > 0: 5563 msg = 'Quantities %s could not be found in file %s'\ 5564 %(str(missing), filename) 5556 from Scientific.IO.NetCDF import NetCDFFile 5557 from shallow_water import Domain 5558 from Numeric import asarray, transpose, resize 5559 from anuga.abstract_2d_finite_volumes.neighbour_mesh import Mesh 5560 5561 if verbose: print 'Reading from ', filename 5562 fid = NetCDFFile(filename, 'r') # Open existing file for read 5563 time = fid.variables['time'] # Time vector 5564 time += fid.starttime[0] 5565 5566 # Get the variables as Numeric arrays 5567 x = fid.variables['x'][:] # x-coordinates of nodes 5568 y = fid.variables['y'][:] # y-coordinates of nodes 5569 elevation = fid.variables['elevation'][:] # Elevation 5570 stage = fid.variables['stage'][:] # Water level 5571 xmomentum = fid.variables['xmomentum'][:] # Momentum in the x-direction 5572 ymomentum = fid.variables['ymomentum'][:] # Momentum in the y-direction 5573 5574 5575 # Mesh (nodes (Mx2), triangles (Nx3)) 5576 nodes = concatenate( (x[:, NewAxis], y[:, NewAxis]), axis=1 ) 5577 triangles = fid.variables['volumes'][:] 5578 5579 # Get geo_reference 5580 try: 5581 geo_reference = Geo_reference(NetCDFObject=fid) 5582 except: #AttributeError, e: 5583 # Sww files don't have to have a geo_ref 5584 geo_reference = None 5585 5586 if verbose: print ' building mesh from sww file %s' %filename 5587 boundary = None 5588 5589 #FIXME (Peter Row): Should this be in mesh? 5590 if fid.smoothing != 'Yes': 5591 nodes = nodes.tolist() 5592 triangles = triangles.tolist() 5593 nodes, triangles, boundary=weed(nodes, triangles, boundary) 5594 5595 5596 try: 5597 mesh = Mesh(nodes, triangles, boundary, 5598 geo_reference=geo_reference) 5599 except AssertionError, e: 5565 5600 fid.close() 5566 raise Exception, msg 5567 5568 if not filename.endswith('.sww'): 5569 msg = 'Filename must have extension .sww' 5570 raise Exception, msg 5571 5572 # Get first timestep 5573 try: 5574 starttime = fid.starttime[0] 5575 except ValueError: 5576 msg = 'Could not read starttime from file %s' %filename 5577 raise msg 5578 5579 # Get variables 5580 time = fid.variables['time'][:] 5581 5582 # Get origin 5583 xllcorner = fid.xllcorner[0] 5584 yllcorner = fid.yllcorner[0] 5585 zone = fid.zone[0] 5586 georeference = Geo_reference(zone, xllcorner, yllcorner) 5587 5588 5589 x = fid.variables['x'][:] 5590 y = fid.variables['y'][:] 5591 triangles = fid.variables['volumes'][:] 5592 5593 x = reshape(x, (len(x),1)) 5594 y = reshape(y, (len(y),1)) 5595 vertex_coordinates = concatenate((x,y), axis=1) #m x 2 array 5596 5597 #if interpolation_points is not None: 5598 # # Adjust for georef 5599 # interpolation_points[:,0] -= xllcorner 5600 # interpolation_points[:,1] -= yllcorner 5601 5602 5603 # Produce values for desired data points at 5604 # each timestep for each quantity 5601 msg = 'Domain could not be created: %s. "' %e 5602 raise DataDomainError, msg 5603 5604 5605 5605 quantities = {} 5606 for name in quantity_names: 5607 quantities[name] = fid.variables[name][:] 5608 5609 fid.close() 5610 5611 # Create mesh and quad tree 5612 #interpolator = Interpolate(vertex_coordinates, triangles) 5613 5614 #return interpolator, quantities, geo_reference, time 5606 quantities['elevation'] = elevation 5607 quantities['stage'] = stage 5608 quantities['xmomentum'] = xmomentum 5609 quantities['ymomentum'] = ymomentum 5610 5611 return mesh, quantities, time 5612 5615 5613 5616 5614 … … 5618 5616 polyline, 5619 5617 verbose=False): 5620 """Obtain flow (m^3/s) perpendicular to cross section given by the argument polyline.5618 """Obtain flow (m^3/s) perpendicular to specified cross section. 5621 5619 5622 5620 Inputs: … … 5626 5624 5627 5625 Output: 5628 Q: Hydrograph of total flow across given segments for all stored timesteps. 5629 5630 The normal flow is computed for each triangle intersected by the polyline and added up. 5631 If multiple segments at different angles are specified the normal flows may partially cancel each other. 5626 time: All stored times 5627 Q: Hydrograph of total flow across given segments for all stored times. 5628 5629 The normal flow is computed for each triangle intersected by the polyline and 5630 added up. multiple segments at different angles are specified the normal flows 5631 may partially cancel each other. 5632 5632 5633 5633 """ 5634 5634 5635 5635 # Get mesh and quantities from sww file 5636 X = get_mesh_and_quantities_from_ sww_file(filename,5637 5638 'stage',5639 'xmomentum',5640 'ymomentum'],5641 5642 interpolator, quantities, geo_reference, time = X5636 X = get_mesh_and_quantities_from_file(filename, 5637 quantities=['elevation', 5638 'stage', 5639 'xmomentum', 5640 'ymomentum'], 5641 verbose=verbose) 5642 mesh, quantities, time = X 5643 5643 5644 5644 … … 5646 5646 # Find all intersections and associated triangles. 5647 5647 5648 get_intersecting_segments(polyline)5648 mesh.get_intersecting_segments(polyline) 5649 5649 5650 5650 # Then store for each triangle the length of the intersecting segment(s), 5651 # right hand normal(s) and midpoints. 5652 pass 5651 # right hand normal(s) and midpoints. 5652 5653 return time, Q 5654 5655 5653 5656 5654 5657 -
anuga_core/source/anuga/shallow_water/test_data_manager.py
r5274 r5276 7423 7423 """ 7424 7424 7425 # Generate a test sww file 7425 # Generate a test sww file with non trivial georeference 7426 7426 7427 7427 import time, os … … 7434 7434 # Create basic mesh (100m x 5m) 7435 7435 width = 5 7436 len = 100 7437 points, vertices, boundary = rectangular(len, width, 100, 5) 7436 length = 100 7437 t_end = 10 7438 points, vertices, boundary = rectangular(length, width, 100, 5) 7438 7439 7439 7440 # Create shallow water domain 7440 domain = Domain(points, vertices, boundary) 7441 domain = Domain(points, vertices, boundary, 7442 geo_reference = Geo_reference(56,308500,6189000)) 7441 7443 7442 7444 domain.set_name('flowtest') … … 7449 7451 domain.set_boundary( {'left': Bd, 'right': Bd, 'top': Br, 'bottom': Br}) 7450 7452 7451 for t in domain.evolve(yieldstep=1, finaltime = 50):7453 for t in domain.evolve(yieldstep=1, finaltime = t_end): 7452 7454 pass 7453 7455 7454 7456 7455 7457 # Read it 7456 7457 # FIXME (Ole): TODO 7458 7459 7458 7459 # Get mesh and quantities from sww file 7460 X = get_mesh_and_quantities_from_file(swwfile, 7461 quantities=['elevation', 7462 'stage', 7463 'xmomentum', 7464 'ymomentum'], 7465 verbose=False) 7466 mesh, quantities, time = X 7467 7468 7469 # Check that mesh has been recovered 7470 assert alltrue(mesh.triangles == domain.triangles) 7471 assert allclose(mesh.nodes, domain.nodes) 7472 7473 # Check that time has been recovered 7474 assert allclose(time, range(t_end+1)) 7475 7476 # Check that quantities have been recovered 7477 # (sww files use single precision) 7478 z=domain.get_quantity('elevation').get_values(location='unique vertices') 7479 assert allclose(quantities['elevation'], z) 7480 7481 for q in ['stage', 'xmomentum', 'ymomentum']: 7482 # Get quantity at last timestep 7483 q_ref=domain.get_quantity(q).get_values(location='unique vertices') 7484 7485 #print q,quantities[q] 7486 q_sww=quantities[q][-1,:] 7487 7488 msg = 'Quantity %s failed to be recovered' %q 7489 assert allclose(q_ref, q_sww, atol=1.0e-6), msg 7490 7491 # Cleanup 7492 os.remove(swwfile) 7493 7494 7460 7495 def NOtest_get_flow_through_cross_section(self): 7461 7496 """test_get_flow_through_cross_section(self): … … 7485 7520 # Create basic mesh (100m x 5m) 7486 7521 width = 5 7487 len = 100 7488 points, vertices, boundary = rectangular(len, width, 100, 5) 7522 length = 100 7523 t_end = 10 7524 points, vertices, boundary = rectangular(length, width, 7525 length, width) 7489 7526 7490 7527 # Create shallow water domain … … 7515 7552 domain.set_boundary( {'left': Bd, 'right': Bd, 'top': Br, 'bottom': Br}) 7516 7553 7517 for t in domain.evolve(yieldstep=1, finaltime = 50):7554 for t in domain.evolve(yieldstep=1, finaltime = t_end): 7518 7555 pass 7519 7556 7520 7557 # Check that momentum is as it should be in the interior 7558 7559 I = [[0, width/2.], 7560 [length/2., width/2.], 7561 [length, width/2.]] 7562 7521 7563 f = file_function(swwfile, 7522 7564 quantities=['stage', 'xmomentum', 'ymomentum'], 7523 interpolation_points=[[0,width/2], 7524 [len/2, width/2], 7525 [len, width/2]], 7565 interpolation_points=I, 7526 7566 verbose=False) 7527 7567 7528 for t in range( 50):7568 for t in range(t_end+1): 7529 7569 for i in range(3): 7530 7570 assert allclose(f(t, i), [1, 2, 0]) … … 7533 7573 7534 7574 # Check flow through the middle 7535 cross_section = [[len /2,0], [len/2,width]]7575 cross_section = [[length/2., 0], [length/2., width]] 7536 7576 Q = get_flow_through_cross_section(swwfile, 7537 7577 cross_section, -
anuga_core/test_all.py
r5018 r5276 1 from anuga.utilities.data_audit_wrapper import IP_verified1 #from anuga.utilities.data_audit_wrapper import IP_verified 2 2 from tempfile import mktemp 3 3 … … 57 57 58 58 print 'Verifying data IP' 59 if not IP_verified(temp_dir):60 msg = 'Files have not been verified for IP.\n'61 msg += 'Each data file must have a license file with it.'62 raise Exception, msg59 #if not IP_verified(temp_dir): 60 # msg = 'Files have not been verified for IP.\n' 61 # msg += 'Each data file must have a license file with it.' 62 # raise Exception, msg 63 63 64 64
Note: See TracChangeset
for help on using the changeset viewer.