# Changeset 6121

Ignore:
Timestamp:
Jan 7, 2009, 4:34:45 PM (14 years ago)
Message:

Changed culvert polygons to compute enquiry points and verify that they are always outside exchange area.
Worked on volumetric conservation of culverts - not succeeding. Test is disabled.

Location:
anuga_core/source/anuga
Files:
6 edited

Unmodified
Removed
• ## anuga_core/source/anuga/culvert_flows/culvert_class.py

 r6111 from anuga.utilities.polygon import plot_polygons from anuga.utilities.numerical_tools import mean from anuga.utilities.numerical_tools import ensure_numeric from anuga.config import g, epsilon from Numeric import take, sqrt from anuga.config import velocity_protection from Numeric import allclose from Numeric import sqrt, sum import sys # FIXME(Ole): Write in C and reuse this function by similar code in interpolate.py # FIXME(Ole): Write in C and reuse this function by similar code # in interpolate.py def interpolate_linearly(x, xvec, yvec): msg = 'Input to function interpolate_linearly could not be converted ' msg += 'to numerical scalar: x = %s' % str(x) try: x = float(x) except: raise Exception, msg # Check bounds if x < xvec[0]: msg = 'Value provided = %.2f, interpolation minimum = %.2f.' %(x, xvec[0]) msg = 'Value provided = %.2f, interpolation minimum = %.2f.'\ % (x, xvec[0]) raise Below_interval, msg if x > xvec[-1]: msg = 'Value provided = %.2f, interpolation maximum = %.2f.' %(x, xvec[-1]) msg = 'Value provided = %.2f, interpolation maximum = %.2f.'\ %(x, xvec[-1]) raise Above_interval, msg barrel_velocity = float(fields[2].strip()) rating_curve.append( [head_difference, flow_rate, barrel_velocity] ) rating_curve.append([head_difference, flow_rate, barrel_velocity]) if i == 0: end_point0=None, end_point1=None, enquiry_point0=None, enquiry_point1=None, update_interval=None, log_file=False, verbose=False): from Numeric import sqrt, sum number_of_barrels=number_of_barrels) # Select enquiry points if enquiry_point0 is None: enquiry_point0 = P['enquiry_point0'] if enquiry_point1 is None: enquiry_point1 = P['enquiry_point1'] if verbose is True: pass #               P['exchange_polygon0'], #               P['exchange_polygon1'], #               P['enquiry_polygon0'], #               P['enquiry_polygon1']], #               [enquiry_point0, 1.005*enquiry_point0], #               [enquiry_point1, 1.005*enquiry_point1]], #              figname='culvert_polygon_output') # Compute the average point for enquiry enquiry_point0 = sum(P['enquiry_polygon0'][:2])/2 enquiry_point1 = sum(P['enquiry_polygon1'][:2])/2 self.enquiry_points = [enquiry_point0, enquiry_point1] self.enquiry_indices = [] for point in self.enquiry_points: dq = domain.quantities for i, opening in enumerate(self.openings): #elevation = dq['elevation'].get_values(location='centroids', #                                      interpolation_points=[self.enquiry_points[i]]) elevation = dq['elevation'].get_values(location='centroids', indices=[self.enquiry_indices[i]]) self.verbose = verbose self.last_update = 0.0 # For use with update_interval self.last_time = 0.0 self.update_interval = update_interval def __call__(self, domain): from anuga.utilities.numerical_tools import mean from anuga.config import g, epsilon from Numeric import take, sqrt from anuga.config import velocity_protection # Time stuff if self.update_interval is None: update = True delta_t = domain.timestep # Next timestep has been computed in domain.py else: if time - self.last_update > self.update_interval or time == 0.0: update = True delta_t = self.update_interval s = '\nTime = %.2f, delta_t = %f' %(time, delta_t) if hasattr(self, 'log_filename'): log_to_file(self.log_filename, s) if update is True: self.last_update = time dq = domain.quantities # Compute mean values of selected quantitites in the # enquiry area in front of the culvert # Stage and velocity comes from enquiry area # and elevation from exchange area stage = dq['stage'].get_values(location='centroids', log_to_file(self.log_filename, msg) except Above_interval, e: Q = self.rating_curve[-1,1] Q = self.rating_curve[-1,1] msg = '%.2fs: Delta head greater than rating curve maximum: ' %time msg += str(e) Q *= self.number_of_barrels # Adjust Q downwards depending on available water at inlet stage = self.inlet.get_quantity_values(quantity_name='stage') elevation = self.inlet.get_quantity_values(quantity_name='elevation') depth = stage-elevation V = 0 for i, d in enumerate(depth): V += d * domain.areas[i] #Vsimple = mean(depth)*self.inlet.exchange_area # Current volume in exchange area #print 'Q', Q, 'dt', delta_t, 'Q*dt', Q*delta_t, 'V', V, 'Vsimple', Vsimple dt = delta_t if Q*dt > V: Q_reduced = 0.9*V/dt # Reduce with safety factor msg = '%.2fs: Computed extraction for this time interval (Q*dt) is ' % time msg += 'greater than current volume (V) at inlet.\n' msg += ' Q will be reduced from %.2f m^3/s to %.2f m^3/s.' % (Q, Q_reduced) #print msg if self.verbose is True: print msg if hasattr(self, 'log_filename'): log_to_file(self.log_filename, msg) Q = Q_reduced self.inlet.rate = -Q self.outlet.rate = Q fid.close() # Store value of time self.last_time = time culvert_routine=None, number_of_barrels=1, enquiry_point0=None, enquiry_point1=None, update_interval=None, verbose=False): number_of_barrels=number_of_barrels) # Select enquiry points if enquiry_point0 is None: enquiry_point0 = P['enquiry_point0'] if enquiry_point1 is None: enquiry_point1 = P['enquiry_point1'] if verbose is True: pass #               P['exchange_polygon0'], #               P['exchange_polygon1'], #               P['enquiry_polygon0'], #               P['enquiry_polygon1']], #               [enquiry_point0, 1.005*enquiry_point0], #               [enquiry_point1, 1.005*enquiry_point1]], #              figname='culvert_polygon_output') #import sys; sys.exit() # Compute the average point for enquiry enquiry_point0 = sum(P['enquiry_polygon0'][:2])/2 enquiry_point1 = sum(P['enquiry_polygon1'][:2])/2 self.enquiry_points = [enquiry_point0, enquiry_point1] self.enquiry_indices = [] for point in self.enquiry_points: for key in P.keys(): if key in ['exchange_polygon0', 'exchange_polygon1', 'enquiry_polygon0', 'enquiry_polygon1']: 'exchange_polygon1']: for point in P[key]: self.invert_levels = [invert_level0, invert_level1] #self.enquiry_polygons = [P['enquiry_polygon0'], P['enquiry_polygon1']] self.enquiry_polylines = [P['enquiry_polygon0'][:2], P['enquiry_polygon1'][:2]] #self.enquiry_polylines = [P['enquiry_polygon0'][:2], #                          P['enquiry_polygon1'][:2]] self.vector = P['vector'] self.length = P['length']; assert self.length > 0.0 def __call__(self, domain): from anuga.utilities.numerical_tools import mean from anuga.config import g, epsilon from Numeric import take, sqrt from anuga.config import velocity_protection log_filename = self.log_filename time = domain.get_time() # Short hand dq = domain.quantities update = False if self.update_interval is None: update = True delta_t = domain.timestep # Next timestep has been computed in domain.py else: if time - self.last_update > self.update_interval or time == 0.0: update = True #print 'call', time, time - self.last_update delta_t = self.update_interval s = '\nTime = %.2f, delta_t = %f' %(time, delta_t) if hasattr(self, 'log_filename'): log_to_file(log_filename, s) if update is True: #print 'Updating', time, time - self.last_update self.last_update = time delta_t = time-self.last_time s = '\nTime = %.2f, delta_t = %f' %(time, delta_t) log_to_file(log_filename, s) msg = 'Time did not advance' if time > 0.0: assert delta_t > 0.0, msg openings = self.openings   # There are two Opening [0] and [1] for i, opening in enumerate(openings): dq = domain.quantities # Compute mean values of selected quantitites in the # exchange area in front of the culvert # Stage and velocity comes from enquiry area # and elevation from exchange area stage = dq['stage'].get_values(location='centroids', indices=opening.exchange_indices) stage = opening.get_quantity_values(quantity_name='stage') w = mean(stage) # Average stage z = invert_level else: elevation = dq['elevation'].get_values(location='centroids', indices=opening.exchange_indices) elevation = opening.get_quantity_values(quantity_name='elevation') z = mean(elevation) # Average elevation # Average measures of energy in front of this opening #polyline = self.enquiry_polylines[i] #opening.total_energy = domain.get_energy_through_cross_section(polyline, #                                                               kind='total') id = [self.enquiry_indices[i]] indices=id) elevation = dq['elevation'].get_values(location='centroids', indices=id) indices=id) xmomentum = dq['xmomentum'].get_values(location='centroids', indices=id) indices=id) ymomentum = dq['xmomentum'].get_values(location='centroids', indices=id) # Update momentum delta_t = time - self.last_time if delta_t > 0.0: xmomentum_rate = outlet_mom_x - outlet.momentum[0].value log_to_file(log_filename, s) # Store value of time self.last_time = time self.outlet.momentum[1](domain) # Store value of time #FIXME(Ole): Maybe only every time we update self.last_time = time
• ## anuga_core/source/anuga/culvert_flows/culvert_polygons.py

 r6098 from math import sqrt from Numeric import array, sum from anuga.utilities.polygon import inside_polygon, polygon_area def create_culvert_polygons(end_point0, end_point1, width, height=None, enquiry_gap_factor=1.0, enquiry_shape_factor=2.0, enquiry_gap_factor=0.2, number_of_barrels=1): """Create polygons at the end of a culvert inlet and outlet. Input (optional): height - culvert height, defaults to width making a square culvert enquiry_gap_factor - sets the distance to the enquiry polygon enquiry_shape_factor - sets the shape of the enquiry polygon (large value widens polygon but reduces height to preserve same area as exchange polygon) enquiry_gap_factor - sets the distance to the enquiry point as fraction of the height number_of_barrels - number of identical pipes. 'exchange_polygon0' - polygon defining the flow area at end_point0 'exchange_polygon1' - polygon defining the flow area at end_point1 'enquiry_polygon0' - polygon defining the enquiry field near end_point0 'enquiry_polygon1' - polygon defining the enquiry field near end_point1 'enquiry_point0' - point beyond exchange_polygon0 'enquiry_point1' - point beyond exchange_polygon1 'vector' 'length' 'normal' """ # Unit direction vector and normal vector /= length normal = array([-dy, dx])/length vector /= length                 # Unit vector in culvert direction normal = array([-dy, dx])/length # Normal vector culvert_polygons['vector'] = vector culvert_polygons['length'] = length culvert_polygons['normal'] = normal # Short hands w = 0.5*width*normal # Perpendicular vector of 1/2 width h = height*vector  # Vector of length=height in the # direction of the culvert h = height*vector    # Vector of length=height in the # direction of the culvert gap = (1 + enquiry_gap_factor)*h # Build exchange polygon 0 # Build exchange polygon and enquiry point for opening 0 p0 = end_point0 + w p1 = end_point0 - w p3 = p0 - h culvert_polygons['exchange_polygon0'] = array([p0,p1,p2,p3]) culvert_polygons['enquiry_point0'] = end_point0 - gap # Build exchange polygon 1 # Build exchange polygon and enquiry point for opening 1 p0 = end_point1 + w p1 = end_point1 - w p3 = p0 + h culvert_polygons['exchange_polygon1'] = array([p0,p1,p2,p3]) culvert_polygons['enquiry_point1'] = end_point1 + gap # Check that enquiry polygons are outside exchange polygons for key1 in ['exchange_polygon0', 'exchange_polygon1']: polygon = culvert_polygons[key1] area = polygon_area(polygon) msg = 'Polygon %s ' %(polygon) msg += ' has area = %f' % area assert area > 0.0, msg # Redefine shorthands for enquiry polygons w = w*enquiry_shape_factor h = h/enquiry_shape_factor gap = (enquiry_gap_factor + h)*vector # Build enquiry polygon 0 p0 = end_point0 + w - gap p1 = end_point0 - w - gap p2 = p1 - h p3 = p0 - h culvert_polygons['enquiry_polygon0'] = array([p0,p1,p2,p3]) # Build enquiry polygon 1 p0 = end_point1 + w + gap p1 = end_point1 - w + gap p2 = p1 + h p3 = p0 + h culvert_polygons['enquiry_polygon1'] = array([p0,p1,p2,p3]) for key2 in ['enquiry_point0', 'enquiry_point1']: point = culvert_polygons[key2] msg = 'Enquiry point falls inside an enquiry point.' msg += 'Email Ole.Nielsen@ga.gov.au' assert not inside_polygon(point, polygon), msg # Return results culvert_polygons['vector'] = vector culvert_polygons['length'] = length culvert_polygons['normal'] = normal return culvert_polygons
• ## anuga_core/source/anuga/culvert_flows/culvert_routines.py

 r5862 """ #NOTE: # Inlet control:  Delta_Et > Es at the inlet # Outlet control: Delta_Et < Es at the inlet # where Et is total energy (w + 0.5*v^2/g) and # Es is the specific energy (h + 0.5*v^2/g) #   NEW DEFINED CULVERT FLOW---- Flow from INLET 1 ------> INLET 2 (Outlet) if inlet.depth_trigger >= 0.01 and inlet.depth >= 0.01: # Calculate driving energy # FIXME(Ole): Should this be specific energy? E = inlet.total_energy
• ## anuga_core/source/anuga/culvert_flows/test_culvert_class.py

 r6111 def test_that_culvert_dry_bed_rating(self): def test_that_culvert_dry_bed_rating_does_not_produce_flow(self): """test_that_culvert_in_dry_bed_does_not_produce_flow(self): def test_that_culvert_dry_bed_boyd(self): """test_that_culvert_in_dry_bed_does_not_produce_flow(self): def Xtest_that_culvert_rating_limits_flow_in_shallow_inlet_condition(self): """test_that_culvert_rating_limits_flow_in_shallow_inlet_condition Test that culvert on a sloping dry bed limits flows when very little water is present at inlet This one is using the rating curve variant """ path = get_pathname_from_package('anuga.culvert_flows') length = 40. width = 5. dx = dy = 1           # Resolution: Length of subdivisions on both axes points, vertices, boundary = rectangular_cross(int(length/dx), int(width/dy), len1=length, len2=width) domain = Domain(points, vertices, boundary) domain.set_name('Test_culvert_shallow') # Output name domain.set_default_order(2) #---------------------------------------------------------------------- # Setup initial conditions #---------------------------------------------------------------------- def topography(x, y): """Set up a weir A culvert will connect either side """ # General Slope of Topography z=-x/1000 N = len(x) for i in range(N): # Sloping Embankment Across Channel if 5.0 < x[i] < 10.1: # Cut Out Segment for Culvert FACE if  1.0+(x[i]-5.0)/5.0 <  y[i]  < 4.0 - (x[i]-5.0)/5.0: z[i]=z[i] else: z[i] +=  0.5*(x[i] -5.0)    # Sloping Segment  U/S Face if 10.0 < x[i] < 12.1: z[i] +=  2.5                    # Flat Crest of Embankment if 12.0 < x[i] < 14.5: # Cut Out Segment for Culvert FACE if  2.0-(x[i]-12.0)/2.5 <  y[i]  < 3.0 + (x[i]-12.0)/2.5: z[i]=z[i] else: z[i] +=  2.5-1.0*(x[i] -12.0) # Sloping D/S Face return z domain.set_quantity('elevation', topography) domain.set_quantity('friction', 0.01)         # Constant friction domain.set_quantity('stage', expression='elevation + 0.2') # Shallow initial condition # NOTE: Shallow values may cause this test to fail regardless of the # culvert due to initial adjustments. A good value is 0.2 filename = os.path.join(path, 'example_rating_curve.csv') culvert = Culvert_flow_rating(domain, culvert_description_filename=filename, end_point0=[9.0, 2.5], end_point1=[13.0, 2.5], verbose=False) domain.forcing_terms.append(culvert) #----------------------------------------------------------------------- # Setup boundary conditions #----------------------------------------------------------------------- # Inflow based on Flow Depth and Approaching Momentum Br = Reflective_boundary(domain)              # Solid reflective wall domain.set_boundary({'left': Br, 'right': Br, 'top': Br, 'bottom': Br}) #----------------------------------------------------------------------- # Evolve system through time #----------------------------------------------------------------------- ref_volume = domain.get_quantity('stage').get_integral() for t in domain.evolve(yieldstep = 1, finaltime = 25): print domain.timestepping_statistics() new_volume = domain.get_quantity('stage').get_integral() msg = 'Total volume has changed: Is %.2f m^3 should have been %.2f m^3'\ % (new_volume, ref_volume) assert allclose(new_volume, ref_volume), msg def test_that_culvert_dry_bed_boyd_does_not_produce_flow(self): """test_that_culvert_in_dry_bed_boyd_does_not_produce_flow(self): Test that culvert on a sloping dry bed doesn't produce flows #------------------------------------------------------------- if __name__ == "__main__": #suite = unittest.makeSuite(Test_Culvert, 'test_predicted_boyd_flow') suite = unittest.makeSuite(Test_Culvert, 'test') suite = unittest.makeSuite(Test_Culvert, 'test_that_culvert_rating_limits_flow_in_shallow_inlet_condition') #suite = unittest.makeSuite(Test_Culvert, 'test') runner = unittest.TextTestRunner() runner.run(suite)
• ## anuga_core/source/anuga/culvert_flows/test_culvert_polygons.py

 r6098 P = create_culvert_polygons(end_point0, end_point1, width=width, height=height, number_of_barrels=number_of_barrels) end_point1, width=width, height=height, number_of_barrels=number_of_barrels) # Compute area and check that it is greater than 0 for key in ['exchange_polygon0', 'exchange_polygon1', 'enquiry_polygon0', 'enquiry_polygon1']: polygon = P[key] for key1 in ['exchange_polygon0', 'exchange_polygon1']: polygon = P[key1] area = polygon_area(polygon) assert area > 0.0, msg for key2 in ['enquiry_point0', 'enquiry_point1']: point = P[key2] assert not inside_polygon(point, polygon) def test_2(self): P = create_culvert_polygons(end_point0, end_point1, width=width, height=height, number_of_barrels=number_of_barrels) end_point1, width=width, height=height, number_of_barrels=number_of_barrels) # Compute area and check that it is greater than 0 for key in ['exchange_polygon0', 'exchange_polygon1', 'enquiry_polygon0', 'enquiry_polygon1']: polygon = P[key] for key1 in ['exchange_polygon0', 'exchange_polygon1']: polygon = P[key1] area = polygon_area(polygon) msg += ' has area = %f' % area assert area > 0.0, msg for key2 in ['enquiry_point0', 'enquiry_point1']: point = P[key2] assert not inside_polygon(point, polygon)
• ## anuga_core/source/anuga/shallow_water/shallow_water_domain.py

 r6086 def get_quantity_values(self): def get_quantity_values(self, quantity_name=None): """Return values for specified quantity restricted to opening """ q = self.domain.quantities[self.quantity_name] return q.get_values(indices=self.exchange_indices) Optionally a quantity name can be specified if values from another quantity is sought """ if quantity_name is None: quantity_name = self.quantity_name q = self.domain.quantities[quantity_name] return q.get_values(location='centroids', indices=self.exchange_indices) def set_quantity_values(self, val): def set_quantity_values(self, val, quantity_name=None): """Set values for specified quantity restricted to opening """ Optionally a quantity name can be specified if values from another quantity is sought """ if quantity_name is None: quantity_name = self.quantity_name q = self.domain.quantities[self.quantity_name] q.set_values(val, indices=self.exchange_indices) q.set_values(val, location='centroids', indices=self.exchange_indices)
Note: See TracChangeset for help on using the changeset viewer.