"""Classes for implementing damage curves and economic damage Duncan Gray, Ole Nielsen, Jane Sexton, Nick Bartzis Geoscience Australia, 2006 """ from math import sqrt from Scientific.Functions.Interpolation import InterpolatingFunction from Numeric import array, ravel, Float, zeros from random import choice try: import kinds except ImportError: # Hand-built mockup of the things we need from the kinds package, since it # was recently removed from the standard Numeric distro. Some users may # not have it by default. class _bunch: pass class _kinds(_bunch): default_float_kind = _bunch() default_float_kind.MIN = 2.2250738585072014e-308 #smallest +ve number default_float_kind.MAX = 1.7976931348623157e+308 kinds = _kinds() from utilities.numerical_tools import ensure_numeric from pyvolution.data_manager import Exposure_csv from pyvolution.util import file_function from geospatial_data.geospatial_data import ensure_absolute from utilities.numerical_tools import INF from anuga_config import epsilon depth_epsilon = epsilon def inundation_damage(sww_file, exposure_file_in, exposure_file_out=None, ground_floor_height=0.3, overwrite=False, verbose=True, use_cache = True): """ This is the main function for calculating tsunami damage due to inundation. It gets the location of structures from the exposure file and gets the inundation of these structures from the sww file. It then calculates the damage loss. """ csv = Exposure_csv(exposure_file_in) geospatial = csv.get_location() max_depths, max_momentums = calc_max_depth_and_momentum(sww_file, geospatial, ground_floor_height=ground_floor_height, verbose=verbose, use_cache=use_cache) edm = EventDamageModel(max_depths, csv.get_column('SHORE_DIST'), csv.get_column('WALLS'), csv.get_column('STR_VALUE'), csv.get_column('C_VALUE') ) results_dic = edm.calc_damage_and_costs(verbose_csv=True, verbose=verbose) for title, value in results_dic.iteritems(): csv.set_column(title, value, overwrite=overwrite) # Save info back to csv file if exposure_file_out == None: exposure_file_out = exposure_file_in csv.save(exposure_file_out) def add_depth_and_momentum2csv(sww_file, exposure_file_in, exposure_file_out=None, overwrite=False, verbose=True, use_cache = True): """ """ csv = Exposure_csv(exposure_file_in) geospatial = csv.get_location() max_depths, max_momentums = calc_max_depth_and_momentum(sww_file, geospatial, verbose=verbose, use_cache=use_cache) csv.set_column("MAX INUNDATION DEPTH (m)",max_depths, overwrite=overwrite) csv.set_column("MOMENTUM (m^2/s) ",max_momentums, overwrite=overwrite) csv.save(exposure_file_out) def calc_max_depth_and_momentum(sww_file, points, ground_floor_height=0.0, verbose=True, use_cache = True): """ Calculate the maximum inundation height above ground floor (mihagf) for a list of locations. The mihagf value is in the range -ground_floor_height to overflow errors. """ quantities = ['stage', 'elevation', 'xmomentum', 'ymomentum'] points = ensure_absolute(points) point_count = len(points) callable_sww = file_function(sww_file, quantities=quantities, interpolation_points=points, verbose=verbose, use_cache=use_cache) # initialise the max lists max_depths = [-ground_floor_height]*point_count max_momentums = [-ground_floor_height]*point_count for point_i, point in enumerate(points): for time in callable_sww.get_time(): quantities = callable_sww(time,point_i) #print "quantities", quantities w = quantities[0] z = quantities[1] uh = quantities[2] vh = quantities[3] # -ground_floor_height is the minimum value. depth = w - z - ground_floor_height if depth > max_depths[point_i]: max_depths[point_i] = depth if w == INF or z == INF or uh == INF or vh == INF: continue momentum = sqrt(uh*uh + vh*vh) if momentum > max_momentums[point_i]: max_momentums[point_i] = momentum return max_depths, max_momentums class EventDamageModel: """ """ STRUCT_LOSS_TITLE = "Structure Loss ($)" CONTENTS_LOSS_TITLE = "Contents Loss ($)" CONTENTS_DAMAGE_TITLE = "Contents damaged (fraction)" STRUCT_DAMAGE_TITLE = "Structure damaged (fraction)" COLLAPSE_CSV_INFO_TITLE = "Calculation notes" MAX_DEPTH_TITLE = "Inundation height above ground floor (m)" STRUCT_COLLAPSED_TITLE = "collapsed structure if 1" STRUCT_INUNDATED_TITLE = "inundated structure if 1" double_brick_damage_array = array([[-kinds.default_float_kind.MAX, 0.0], [0.0-depth_epsilon, 0.0], [0.0,0.016], [0.1,0.150], [0.3,0.425], [0.5,0.449], [1.0,0.572], [1.5,0.582], [2.0,0.587], [2.5,0.647], [kinds.default_float_kind.MAX,64.7]]) double_brick_damage_curve = InterpolatingFunction( \ (ravel(double_brick_damage_array[:,0:1]),), ravel(double_brick_damage_array[:,1:])) brick_veeer_damage_array = array([[-kinds.default_float_kind.MAX, 0.0], [0.0-depth_epsilon, 0.0], [0.0,0.016], [0.1,0.169], [0.3,0.445], [0.5,0.472], [1.0,0.618], [1.5,0.629], [2.0,0.633], [2.5,0.694], [kinds.default_float_kind.MAX,69.4]]) brick_veeer_damage_curve = InterpolatingFunction( \ (ravel(brick_veeer_damage_array[:,0:1]),), ravel(brick_veeer_damage_array[:,1:])) struct_damage_curve = {'Double Brick':double_brick_damage_curve, 'Brick Veneer':brick_veeer_damage_curve} default_struct_damage_curve = brick_veeer_damage_curve contents_damage_array = array([[-kinds.default_float_kind.MAX, 0.0], [0.0-depth_epsilon, 0.0], [0.0,0.013], [0.1,0.102], [0.3,0.381], [0.5,0.500], [1.0,0.970], [1.5,0.976], [2.0,0.986], [kinds.default_float_kind.MAX,98.6]]) contents_damage_curve = InterpolatingFunction( \ (ravel(contents_damage_array[:,0:1]),), ravel(contents_damage_array[:,1:])) #building collapse probability # inundation depth above ground floor, m depth_upper_limits = [depth_epsilon, 1.0, 2.0, 3.0, 5.0, kinds.default_float_kind.MAX] # shore mistance, m shore_upper_limits = [125,200,250, kinds.default_float_kind.MAX] # Building collapse probability collapse_probability = [[0.0, 0.0, 0.0, 0.0], #Code below assumes 0.0 [0.05, 0.02, 0.01, 0.0], [0.6, 0.3, 0.1, 0.05], [0.8, 0.4, 0.25, 0.15], [0.95, 0.7, 0.5, 0.3], [0.99, 0.9, 0.65, 0.45]] def __init__(self,max_depths, shore_distances, walls, struct_costs, content_costs): """ max depth is Inundation height above ground floor (m), so the ground floor has been taken into account. """ self.max_depths = [float(x) for x in max_depths] self.shore_distances = [float(x) for x in shore_distances] self.walls = walls self.struct_costs = [float(x) for x in struct_costs] self.content_costs = [float(x) for x in content_costs] self.structure_count = len(self.max_depths) #Fixme expand assert self.structure_count == len(self.shore_distances) assert self.structure_count == len(self.walls) assert self.structure_count == len(self.struct_costs) assert self.structure_count == len(self.content_costs) #assert self.structure_count == len(self.) def calc_damage_and_costs(self, verbose_csv=False, verbose=False): self.calc_damage_percentages() collapse_probability = self.calc_collapse_probability() self._calc_collapse_structures(collapse_probability, verbose_csv=verbose_csv) self.calc_cost() results_dict = {self.STRUCT_LOSS_TITLE:self.struct_loss ,self.STRUCT_DAMAGE_TITLE:self.struct_damage ,self.CONTENTS_LOSS_TITLE:self.contents_loss ,self.CONTENTS_DAMAGE_TITLE:self.contents_damage ,self.MAX_DEPTH_TITLE:self.max_depths ,self.STRUCT_COLLAPSED_TITLE:self.struct_collapsed ,self.STRUCT_INUNDATED_TITLE:self.struct_inundated } if verbose_csv: results_dict[self.COLLAPSE_CSV_INFO_TITLE] = self.collapse_csv_info return results_dict def calc_damage_percentages(self): # the data being created struct_damage = zeros(self.structure_count,Float) contents_damage = zeros(self.structure_count,Float) self.struct_inundated = ['']* self.structure_count for i,max_depth,shore_distance,wall in map(None, range(self.structure_count), self.max_depths, self.shore_distances, self.walls): #print "i",i #print "max_depth",max_depth #print "shore_distance",shore_distance #print "wall",wall ## WARNING SKIP IF DEPTH < 0.0 if 0.0 > max_depth: continue # The definition of inundated is if the max_depth is > 0.0 self.struct_inundated[i] = 1.0 #calc structural damage % damage_curve = self.struct_damage_curve.get(wall, self.default_struct_damage_curve) struct_damage[i] = damage_curve(max_depth) contents_damage[i] = self.contents_damage_curve(max_depth) self.struct_damage = struct_damage self.contents_damage = contents_damage def calc_cost(self): # ensure_numeric does not cut it. self.struct_loss = self.struct_damage * \ ensure_numeric(self.struct_costs) self.contents_loss = self.contents_damage * \ ensure_numeric(self.content_costs) def calc_collapse_probability(self): """ return a dict of which structures have x probability of collapse. key is collapse probability value is list of struct indexes with key probability of collapse """ # I could've done this is the calc_damage_percentages and # Just had one loop. # But for ease of testing and bug finding I'm seperating the loops. # I'm make the outer loop for both of them the same though, # so this loop can easily be folded into the other loop. # dict of which structures have x probability of collapse. # key of collapse probability # value of list of struct indexes struct_coll_prob = {} for i,max_depth,shore_distance,wall in map(None, range(self.structure_count), self.max_depths, self.shore_distances, self.walls): #print "i",i #print "max_depth",max_depth #print "shore_distance",shore_distance #print "wall",wall # WARNING ASSUMING THE FIRST BIN OF DEPTHS GIVE A ZERO PROBABILITY depth_upper_limits = self.depth_upper_limits shore_upper_limits = self.shore_upper_limits collapse_probability = self.collapse_probability if max_depth <= depth_upper_limits[0]: continue start = 1 for i_depth, depth_limit in enumerate(depth_upper_limits[start:]): #Have to change i_depth so it indexes into the lists correctly i_depth += start if max_depth <= depth_limit: for i_shore, shore_limit in enumerate(shore_upper_limits): if shore_distance <= shore_limit: coll_prob = collapse_probability[i_depth][i_shore] if 0.0 == collapse_probability[i_depth][i_shore]: break struct_coll_prob.setdefault(coll_prob,[]).append(i) break break return struct_coll_prob def _calc_collapse_structures(self, collapse_probability, verbose_csv=False): """ Given the collapse probabilities, throw the dice and collapse some houses """ self.struct_collapsed = ['']* self.structure_count if verbose_csv: self.collapse_csv_info = ['']* self.structure_count #for a given 'bin', work out how many houses will collapse for probability, house_indexes in collapse_probability.iteritems(): collapse_count = round(len(house_indexes) *probability) if verbose_csv: for i in house_indexes: # This could be sped up I think self.collapse_csv_info[i] = str(probability) + ' prob.( ' \ + str(int(collapse_count)) + ' collapsed out of ' \ + str(len(house_indexes)) + ')' for _ in range(int(collapse_count)): house_index = choice(house_indexes) self.struct_damage[house_index] = 1.0 self.contents_damage[house_index] = 1.0 house_indexes.remove(house_index) self.struct_collapsed[house_index] = 1 # Warning, the collapse_probability list now lists # houses that did not collapse, (though not all of them) #print "",self.collapse_csv_info ############################################################################# if __name__ == "__main__": from Scientific.Functions.Interpolation import InterpolatingFunction from Numeric import array, ravel a = array([[0,0],[1,10]]) c = array([0,1]) axis = ravel(a[:,0:1]) values = ravel(a[:,1:]) #axis = array([0,1]) #values = array([0,10]) print "axis",axis print "values",values i = InterpolatingFunction((axis,), values) print "value",i(0.5) i = InterpolatingFunction((array([0,1]),),array([0,10])) print "value",i(0.5) dict = {1:10, 2:20} print dict.get(100,dict[2]) inundation_damage('source.sww', 'augmented_buildings_high.csv', 'augmented_buildings_high_dsg.csv')