1 | #!/usr/bin/env python |
---|
2 | # |
---|
3 | |
---|
4 | import unittest |
---|
5 | import tempfile |
---|
6 | import os |
---|
7 | import time |
---|
8 | import csv |
---|
9 | |
---|
10 | #from anuga.damage.inundation_damage import _calc_collapse_structures |
---|
11 | from anuga.damage.inundation_damage import * |
---|
12 | from anuga.geospatial_data.geospatial_data import Geospatial_data |
---|
13 | from anuga.pmesh.mesh import Mesh |
---|
14 | from anuga.coordinate_transforms.geo_reference import Geo_reference |
---|
15 | from anuga.pyvolution.shallow_water import Domain, Transmissive_boundary |
---|
16 | from anuga.utilities.numerical_tools import mean |
---|
17 | from anuga.pyvolution.data_manager import get_dataobject |
---|
18 | |
---|
19 | from Numeric import zeros, Float, allclose |
---|
20 | |
---|
21 | |
---|
22 | def elevation_function(x, y): |
---|
23 | return -x |
---|
24 | |
---|
25 | class Test_inundation_damage(unittest.TestCase): |
---|
26 | def setUp(self): |
---|
27 | #print "****set up****" |
---|
28 | # Create an sww file |
---|
29 | |
---|
30 | # Set up an sww that has a geo ref. |
---|
31 | # have it cover an area in Australia. 'gong maybe |
---|
32 | #Don't have many triangles though! |
---|
33 | |
---|
34 | #Site Name: GDA-MGA: (UTM with GRS80 ellipsoid) |
---|
35 | #Zone: 56 |
---|
36 | #Easting: 222908.705 Northing: 6233785.284 |
---|
37 | #Latitude: -34 0 ' 0.00000 '' Longitude: 150 0 ' 0.00000 '' |
---|
38 | #Grid Convergence: -1 40 ' 43.13 '' Point Scale: 1.00054660 |
---|
39 | |
---|
40 | #geo-ref |
---|
41 | #Zone: 56 |
---|
42 | #Easting: 220000 Northing: 6230000 |
---|
43 | |
---|
44 | |
---|
45 | #have a big area covered. |
---|
46 | |
---|
47 | mesh_file = tempfile.mktemp(".tsh") |
---|
48 | |
---|
49 | points_lat_long = [[-33,152],[-35,152],[-35,150],[-33,150]] |
---|
50 | |
---|
51 | spat = Geospatial_data(data_points=points_lat_long, |
---|
52 | points_are_lats_longs=True) |
---|
53 | points_ab = spat.get_data_points( absolute = True) |
---|
54 | |
---|
55 | |
---|
56 | geo = Geo_reference(56,400000,6000000) |
---|
57 | spat.set_geo_reference(geo) |
---|
58 | |
---|
59 | m = Mesh() |
---|
60 | m.add_vertices(spat) |
---|
61 | m.auto_segment() |
---|
62 | m.generate_mesh(verbose=False) |
---|
63 | m.export_mesh_file(mesh_file) |
---|
64 | |
---|
65 | |
---|
66 | #Create shallow water domain |
---|
67 | domain = Domain(mesh_file) |
---|
68 | |
---|
69 | os.remove(mesh_file) |
---|
70 | |
---|
71 | domain.default_order=2 |
---|
72 | domain.beta_h = 0 |
---|
73 | |
---|
74 | |
---|
75 | #Set some field values |
---|
76 | #domain.set_quantity('stage', 1.0) |
---|
77 | domain.set_quantity('elevation', -0.5) |
---|
78 | domain.set_quantity('friction', 0.03) |
---|
79 | |
---|
80 | |
---|
81 | ###################### |
---|
82 | # Boundary conditions |
---|
83 | B = Transmissive_boundary(domain) |
---|
84 | domain.set_boundary( {'exterior': B}) |
---|
85 | |
---|
86 | |
---|
87 | ###################### |
---|
88 | #Initial condition - with jumps |
---|
89 | |
---|
90 | bed = domain.quantities['elevation'].vertex_values |
---|
91 | stage = zeros(bed.shape, Float) |
---|
92 | |
---|
93 | h = 0.3 |
---|
94 | for i in range(stage.shape[0]): |
---|
95 | if i % 2 == 0: |
---|
96 | stage[i,:] = bed[i,:] + h |
---|
97 | else: |
---|
98 | stage[i,:] = bed[i,:] |
---|
99 | |
---|
100 | domain.set_quantity('stage', stage) |
---|
101 | domain.set_quantity('xmomentum', stage*22.0) |
---|
102 | domain.set_quantity('ymomentum', stage*55.0) |
---|
103 | |
---|
104 | domain.distribute_to_vertices_and_edges() |
---|
105 | |
---|
106 | |
---|
107 | self.domain = domain |
---|
108 | |
---|
109 | C = domain.get_vertex_coordinates() |
---|
110 | self.X = C[:,0:6:2].copy() |
---|
111 | self.Y = C[:,1:6:2].copy() |
---|
112 | |
---|
113 | self.F = bed |
---|
114 | |
---|
115 | |
---|
116 | #sww_file = tempfile.mktemp("") |
---|
117 | self.domain.filename = 'datatest' + str(time.time()) |
---|
118 | #self.domain.filename = sww_file |
---|
119 | #print "self.domain.filename",self.domain.filename |
---|
120 | self.domain.format = 'sww' |
---|
121 | self.domain.smooth = True |
---|
122 | self.domain.reduction = mean |
---|
123 | |
---|
124 | sww = get_dataobject(self.domain) |
---|
125 | sww.store_connectivity() |
---|
126 | sww.store_timestep(['stage', 'xmomentum', 'ymomentum']) |
---|
127 | self.domain.time = 2. |
---|
128 | sww.store_timestep(['stage', 'xmomentum', 'ymomentum']) |
---|
129 | self.sww = sww # so it can be deleted |
---|
130 | |
---|
131 | #Create a csv file |
---|
132 | self.csv_file = tempfile.mktemp(".csv") |
---|
133 | fd = open(self.csv_file,'wb') |
---|
134 | writer = csv.writer(fd) |
---|
135 | writer.writerow(['LONGITUDE','LATITUDE','STR_VALUE','C_VALUE','ROOF_TYPE','WALLS', 'SHORE_DIST']) |
---|
136 | writer.writerow(['151.5','-34','199770','130000','Metal','Timber',20.]) |
---|
137 | writer.writerow(['151','-34.5','150000','76000','Metal','Double Brick',200.]) |
---|
138 | writer.writerow(['151','-34.25','150000','76000','Metal','Brick Veneer',200.]) |
---|
139 | fd.close() |
---|
140 | |
---|
141 | |
---|
142 | def tearDown(self): |
---|
143 | #print "***** tearDown ********" |
---|
144 | os.remove(self.sww.filename) |
---|
145 | os.remove(self.csv_file) |
---|
146 | |
---|
147 | |
---|
148 | def test_inundation_damage(self): |
---|
149 | |
---|
150 | # Note, this isn't testing the results, |
---|
151 | # just that is all runs |
---|
152 | sww_file = self.domain.filename + "." + self.domain.format |
---|
153 | #print "sww_file",sww_file |
---|
154 | inundation_damage(sww_file, self.csv_file, verbose=False) |
---|
155 | |
---|
156 | |
---|
157 | def test_inundation_damage2(self): |
---|
158 | |
---|
159 | # create mesh |
---|
160 | mesh_file = tempfile.mktemp(".tsh") |
---|
161 | points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]] |
---|
162 | m = Mesh() |
---|
163 | m.add_vertices(points) |
---|
164 | m.auto_segment() |
---|
165 | m.generate_mesh(verbose=False) |
---|
166 | m.export_mesh_file(mesh_file) |
---|
167 | |
---|
168 | #Create shallow water domain |
---|
169 | domain = Domain(mesh_file) |
---|
170 | os.remove(mesh_file) |
---|
171 | |
---|
172 | domain.default_order=2 |
---|
173 | domain.beta_h = 0 |
---|
174 | |
---|
175 | #Set some field values |
---|
176 | domain.set_quantity('elevation', elevation_function) |
---|
177 | domain.set_quantity('friction', 0.03) |
---|
178 | domain.set_quantity('xmomentum', 22.0) |
---|
179 | domain.set_quantity('ymomentum', 55.0) |
---|
180 | |
---|
181 | ###################### |
---|
182 | # Boundary conditions |
---|
183 | B = Transmissive_boundary(domain) |
---|
184 | domain.set_boundary( {'exterior': B}) |
---|
185 | |
---|
186 | # This call mangles the stage values. |
---|
187 | domain.distribute_to_vertices_and_edges() |
---|
188 | domain.set_quantity('stage', 0.3) |
---|
189 | |
---|
190 | #sww_file = tempfile.mktemp("") |
---|
191 | domain.filename = 'datatest' + str(time.time()) |
---|
192 | #domain.filename = sww_file |
---|
193 | #print "domain.filename",domain.filename |
---|
194 | domain.format = 'sww' |
---|
195 | domain.smooth = True |
---|
196 | domain.reduction = mean |
---|
197 | |
---|
198 | sww = get_dataobject(domain) |
---|
199 | sww.store_connectivity() |
---|
200 | sww.store_timestep(['stage', 'xmomentum', 'ymomentum']) |
---|
201 | domain.set_quantity('stage', -0.3) |
---|
202 | domain.time = 2. |
---|
203 | sww.store_timestep(['stage', 'xmomentum', 'ymomentum']) |
---|
204 | |
---|
205 | #Create a csv file |
---|
206 | csv_file = tempfile.mktemp(".csv") |
---|
207 | fd = open(csv_file,'wb') |
---|
208 | writer = csv.writer(fd) |
---|
209 | writer.writerow(['x','y','STR_VALUE','C_VALUE','ROOF_TYPE','WALLS', 'SHORE_DIST']) |
---|
210 | writer.writerow([5.5,0.5,'10','130000','Metal','Timber',20]) |
---|
211 | writer.writerow([4.5,1.0,'150','76000','Metal','Double Brick',20]) |
---|
212 | writer.writerow([0.1,1.5,'100','76000','Metal','Brick Veneer',300]) |
---|
213 | writer.writerow([6.1,1.5,'100','76000','Metal','Brick Veneer',300]) |
---|
214 | fd.close() |
---|
215 | |
---|
216 | sww_file = domain.filename + "." + domain.format |
---|
217 | #print "sww_file",sww_file |
---|
218 | inundation_damage(sww_file, csv_file, verbose=False) |
---|
219 | |
---|
220 | csv_handle = Exposure_csv(csv_file) |
---|
221 | struct_loss = csv_handle.get_column(EventDamageModel.STRUCT_LOSS_TITLE) |
---|
222 | #print "struct_loss",struct_loss |
---|
223 | struct_loss = [float(x) for x in struct_loss] |
---|
224 | assert allclose(struct_loss,[10,150,16.9,0]) |
---|
225 | depth = csv_handle.get_column(EventDamageModel.MAX_DEPTH_TITLE) |
---|
226 | #print "depth",depth |
---|
227 | depth = [float(x) for x in depth] |
---|
228 | assert allclose(depth,[5.5,4.5,0.1,-0.3]) |
---|
229 | os.remove(sww.filename) |
---|
230 | os.remove(csv_file) |
---|
231 | |
---|
232 | def ztest_add_depth_and_momentum2csv(self): |
---|
233 | sww_file = self.domain.filename + "." + self.domain.format |
---|
234 | #print "sww_file",sww_file |
---|
235 | |
---|
236 | out_csv = tempfile.mktemp(".csv") |
---|
237 | print "out_csv",out_csv |
---|
238 | add_depth_and_momentum2csv(sww_file, self.csv_file, |
---|
239 | out_csv, verbose=False) |
---|
240 | |
---|
241 | def test_calc_damage_percentages(self): |
---|
242 | max_depths = [-0.3, 0.0, 1.0,-0.3, 0.0, 1.0,-0.3, 0.0, 1.0] |
---|
243 | shore_distances = [100, 100, 100, 100, 100, 100, 100, 100, 100] |
---|
244 | walls = ['Double Brick', |
---|
245 | 'Double Brick', |
---|
246 | 'Double Brick', |
---|
247 | 'Timber', |
---|
248 | 'Timber', |
---|
249 | 'Timber', |
---|
250 | 'Brick Veneer', |
---|
251 | 'Brick Veneer', |
---|
252 | 'Brick Veneer'] |
---|
253 | struct_costs = [10, |
---|
254 | 10, |
---|
255 | 10, |
---|
256 | 10, |
---|
257 | 10, |
---|
258 | 10, |
---|
259 | 1, |
---|
260 | 1, |
---|
261 | 1] |
---|
262 | content_costs = [100, |
---|
263 | 100, |
---|
264 | 100, |
---|
265 | 100, |
---|
266 | 100, |
---|
267 | 100, |
---|
268 | 10, |
---|
269 | 10, |
---|
270 | 10] |
---|
271 | |
---|
272 | edm = EventDamageModel(max_depths, shore_distances, walls, |
---|
273 | struct_costs, content_costs) |
---|
274 | edm.calc_damage_percentages() |
---|
275 | assert allclose(edm.struct_damage,[0.0,0.016,0.572, |
---|
276 | 0.0,0.016,0.618, |
---|
277 | 0.0,0.016,0.618]) |
---|
278 | assert allclose(edm.contents_damage,[0.0,0.013,0.970, |
---|
279 | 0.0,0.013,0.970, |
---|
280 | 0.0,0.013,0.970]) |
---|
281 | edm.calc_cost() |
---|
282 | assert allclose(edm.struct_loss,[0.0,.16,5.72, |
---|
283 | 0.0,.16,6.18, |
---|
284 | 0.0,0.016,0.618]) |
---|
285 | assert allclose(edm.contents_loss,[0.0,1.3,97, |
---|
286 | 0.0,1.3,97, |
---|
287 | 0.0,0.13,9.7]) |
---|
288 | |
---|
289 | |
---|
290 | def test_calc_collapse_structures(self): |
---|
291 | edm = EventDamageModel([0.0]*17, [0.0]*17, [0.0]*17, |
---|
292 | [0.0]*17, [0.0]*17) |
---|
293 | edm.struct_damage = zeros(17,Float) |
---|
294 | edm.contents_damage = zeros(17,Float) |
---|
295 | collapse_probability = {0.4:[0], #0 |
---|
296 | 0.6:[1], #1 |
---|
297 | 0.5:[2], #1 |
---|
298 | 0.25:[3,4], #1 |
---|
299 | 0.1:[5,6,7,8], #0 |
---|
300 | 0.2:[9,10,11,12,13,14,15,16]} #2 |
---|
301 | edm._calc_collapse_structures(collapse_probability, verbose_csv=True) |
---|
302 | |
---|
303 | self.failUnless( edm.struct_damage[0] == 0.0 and |
---|
304 | edm.contents_damage[0] == 0.0, |
---|
305 | 'Error!') |
---|
306 | self.failUnless( edm.struct_damage[1] == 1.0 and |
---|
307 | edm.contents_damage[1] == 1.0, |
---|
308 | 'Error!') |
---|
309 | self.failUnless( edm.struct_damage[2] == 1.0 and |
---|
310 | edm.contents_damage[2] == 1.0, |
---|
311 | 'Error!') |
---|
312 | self.failUnless( edm.struct_damage[3]+ edm.struct_damage[4] == 1.0 and |
---|
313 | edm.contents_damage[3] + edm.contents_damage[4] ==1.0, |
---|
314 | 'Error!') |
---|
315 | sum_struct = 0.0 |
---|
316 | sum_contents = 0.0 |
---|
317 | for i in [5,6,7,8]: |
---|
318 | sum_struct += edm.struct_damage[i] |
---|
319 | sum_contents += edm.contents_damage[i] |
---|
320 | print "", |
---|
321 | self.failUnless( sum_struct == 0.0 and sum_contents == 0.0, |
---|
322 | 'Error!') |
---|
323 | sum_struct = 0.0 |
---|
324 | sum_contents = 0.0 |
---|
325 | for i in [9,10,11,12,13,14,15,16]: |
---|
326 | sum_struct += edm.struct_damage[i] |
---|
327 | sum_contents += edm.contents_damage[i] |
---|
328 | self.failUnless( sum_struct == 2.0 and sum_contents == 2.0, |
---|
329 | 'Error!') |
---|
330 | |
---|
331 | def test_calc_collapse_probability(self): |
---|
332 | depth = [0.0, 0.5, 0.5 , 1.5, 2.5, 4.5, 10000, 2.0] |
---|
333 | shore_distance = [0.0, 125, 250.1, 0.0, 150, 225, 10000, 251] |
---|
334 | dummy = depth |
---|
335 | edm = EventDamageModel(depth, shore_distance, dummy, dummy, dummy) |
---|
336 | struct_coll_prob = edm.calc_collapse_probability() |
---|
337 | answer = {0.05:[1,7], |
---|
338 | 0.6:[3], |
---|
339 | 0.4:[4], |
---|
340 | 0.5:[5], |
---|
341 | 0.45:[6]} |
---|
342 | #print "struct_coll_prob",struct_coll_prob |
---|
343 | #print "answer",answer |
---|
344 | |
---|
345 | self.failUnless( struct_coll_prob == answer, |
---|
346 | 'Error!') |
---|
347 | |
---|
348 | |
---|
349 | def test_calc_damage_and_costs(self): |
---|
350 | |
---|
351 | max_depths = [-0.3, 0.0, 1.0,-0.3, 0.0, 1.0,-0.3, 0.0, 10.0] |
---|
352 | shore_distances = [100, 100, 100, 100, 100, 100, 100, 100, 100] |
---|
353 | walls = ['Double Brick', |
---|
354 | 'Double Brick', |
---|
355 | 'Double Brick', |
---|
356 | 'Timber', |
---|
357 | 'Timber', |
---|
358 | 'Timber', |
---|
359 | 'Brick Veneer', |
---|
360 | 'Brick Veneer', |
---|
361 | 'Brick Veneer'] |
---|
362 | struct_costs = [10, |
---|
363 | 10, |
---|
364 | 10, |
---|
365 | 10, |
---|
366 | 10, |
---|
367 | 10, |
---|
368 | 1, |
---|
369 | 1, |
---|
370 | 1] |
---|
371 | content_costs = [100, |
---|
372 | 100, |
---|
373 | 100, |
---|
374 | 100, |
---|
375 | 100, |
---|
376 | 100, |
---|
377 | 10, |
---|
378 | 10, |
---|
379 | 10] |
---|
380 | |
---|
381 | edm = EventDamageModel(max_depths, shore_distances, walls, |
---|
382 | struct_costs, content_costs) |
---|
383 | results_dic = edm.calc_damage_and_costs(verbose_csv=True) |
---|
384 | #print "results_dic",results_dic |
---|
385 | #------------------------------------------------------------- |
---|
386 | if __name__ == "__main__": |
---|
387 | #suite = unittest.makeSuite(Test_inundation_damage,'test_in_damage2') |
---|
388 | suite = unittest.makeSuite(Test_inundation_damage,'test') |
---|
389 | runner = unittest.TextTestRunner() |
---|
390 | runner.run(suite) |
---|
391 | |
---|