source: anuga_core/source/anuga/damage_modelling/test_inundation_damage.py @ 7743

Last change on this file since 7743 was 7743, checked in by hudson, 14 years ago

Removed more modules from data_handler: code to do with building destruction.

File size: 21.8 KB
Line 
1#!/usr/bin/env python
2#
3
4import unittest
5import tempfile
6import os, sys
7import time
8import csv
9
10#from anuga.damage.inundation_damage import _calc_collapse_structures
11from inundation_damage import *
12from anuga.geospatial_data.geospatial_data import Geospatial_data
13from anuga.pmesh.mesh import Mesh
14from anuga.coordinate_transforms.geo_reference import Geo_reference
15from anuga.shallow_water import Domain, Transmissive_boundary
16from anuga.utilities.numerical_tools import mean
17from anuga.shallow_water.sww_file import SWW_file
18
19import numpy as num
20
21
22def elevation_function(x, y):
23    return -x
24
25class Test_inundation_damage(unittest.TestCase):
26    # Class variable
27    verbose = False
28
29    def set_verbose(self):
30        Test_Data_Manager.verbose = True
31       
32    def setUp(self):
33        #print "****set up****"
34        # Create an sww file
35       
36        # Set up an sww that has a geo ref.
37        # have it cover an area in Australia.  'gong maybe
38        #Don't have many triangles though!
39       
40        #Site Name:    GDA-MGA: (UTM with GRS80 ellipsoid)
41        #Zone:   56   
42        #Easting:  222908.705  Northing: 6233785.284
43        #Latitude:   -34  0 ' 0.00000 ''  Longitude: 150  0 ' 0.00000 ''
44        #Grid Convergence:  -1  40 ' 43.13 ''  Point Scale: 1.00054660
45
46        #geo-ref
47        #Zone:   56   
48        #Easting:  220000  Northing: 6230000
49
50
51        #have  a big area covered.
52        mesh_file = tempfile.mktemp(".tsh")
53        points_lat_long = [[-33,152],[-35,152],[-35,150],[-33,150]]
54        spat = Geospatial_data(data_points=points_lat_long,
55                               points_are_lats_longs=True)
56        points_ab = spat.get_data_points( absolute = True)
57       
58        geo =  Geo_reference(56,400000,6000000)
59        spat.set_geo_reference(geo)
60        m = Mesh()
61        m.add_vertices(spat)
62        m.auto_segment()
63        m.generate_mesh(verbose=False)
64        m.export_mesh_file(mesh_file)
65       
66        #Create shallow water domain
67        domain = Domain(mesh_file)
68
69        os.remove(mesh_file)
70       
71        domain.default_order=2
72        #Set some field values
73        #domain.set_quantity('stage', 1.0)
74        domain.set_quantity('elevation', -0.5)
75        domain.set_quantity('friction', 0.03)
76
77        ######################
78        # Boundary conditions
79        B = Transmissive_boundary(domain)
80        domain.set_boundary( {'exterior': B})
81
82        ######################
83        #Initial condition - with jumps
84        bed = domain.quantities['elevation'].vertex_values
85        stage = num.zeros(bed.shape, num.float)
86
87        h = 0.3
88        for i in range(stage.shape[0]):
89            if i % 2 == 0:
90                stage[i,:] = bed[i,:] + h
91            else:
92                stage[i,:] = bed[i,:]
93
94        domain.set_quantity('stage', stage)
95        domain.set_quantity('xmomentum', stage*22.0)
96        domain.set_quantity('ymomentum', stage*55.0)
97
98        domain.distribute_to_vertices_and_edges()
99
100        self.domain = domain
101
102        C = domain.get_vertex_coordinates()
103        self.X = C[:,0:6:2].copy()
104        self.Y = C[:,1:6:2].copy()
105
106        self.F = bed
107     
108        #sww_file = tempfile.mktemp("")
109        self.domain.set_name('tid_P0')
110        self.domain.format = 'sww'
111        self.domain.smooth = True
112        self.domain.reduction = mean
113
114        sww = SWW_file(self.domain)
115        sww.store_connectivity()
116        sww.store_timestep()
117        self.domain.time = 2.
118        sww.store_timestep()
119        self.sww = sww # so it can be deleted
120       
121        #Create another sww file
122        mesh_file = tempfile.mktemp(".tsh")
123        points_lat_long = [[-35,152],[-36,152],[-36,150],[-35,150]]
124        spat = Geospatial_data(data_points=points_lat_long,
125                               points_are_lats_longs=True)
126        points_ab = spat.get_data_points( absolute = True)
127       
128        geo =  Geo_reference(56,400000,6000000)
129        spat.set_geo_reference(geo)
130        m = Mesh()
131        m.add_vertices(spat)
132        m.auto_segment()
133        m.generate_mesh(verbose=False)
134        m.export_mesh_file(mesh_file)
135       
136        #Create shallow water domain
137        domain = Domain(mesh_file)
138
139        os.remove(mesh_file)
140       
141        domain.default_order=2
142        #Set some field values
143        #domain.set_quantity('stage', 1.0)
144        domain.set_quantity('elevation', -40)
145        domain.set_quantity('friction', 0.03)
146
147        ######################
148        # Boundary conditions
149        B = Transmissive_boundary(domain)
150        domain.set_boundary( {'exterior': B})
151
152        ######################
153        #Initial condition - with jumps
154        bed = domain.quantities['elevation'].vertex_values
155        stage = num.zeros(bed.shape, num.float)
156
157        h = 30.
158        for i in range(stage.shape[0]):
159            if i % 2 == 0:
160                stage[i,:] = bed[i,:] + h
161            else:
162                stage[i,:] = bed[i,:]
163
164        domain.set_quantity('stage', stage)
165        domain.set_quantity('xmomentum', stage*22.0)
166        domain.set_quantity('ymomentum', stage*55.0)
167
168        domain.distribute_to_vertices_and_edges()
169
170        self.domain2 = domain
171
172        C = domain.get_vertex_coordinates()
173        self.X2 = C[:,0:6:2].copy()
174        self.Y2 = C[:,1:6:2].copy()
175
176        self.F2 = bed
177     
178        #sww_file = tempfile.mktemp("")
179        domain.set_name('tid_P1')
180        domain.format = 'sww'
181        domain.smooth = True
182        domain.reduction = mean
183
184        sww = SWW_file(domain)
185        sww.store_connectivity()
186        sww.store_timestep()
187        domain.time = 2.
188        sww.store_timestep()
189        self.swwII = sww # so it can be deleted
190
191        # print "sww.filename", sww.filename
192        #Create a csv file
193        self.csv_file = tempfile.mktemp(".csv")
194        fd = open(self.csv_file,'wb')
195        writer = csv.writer(fd)
196        writer.writerow(['LONGITUDE','LATITUDE',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
197        writer.writerow(['151.5','-34','199770','130000','Metal','Timber',20.])
198        writer.writerow(['151','-34.5','150000','76000','Metal','Double Brick',200.])
199        writer.writerow(['151','-34.25','150000','76000','Metal','Brick Veneer',200.])
200        fd.close()
201
202        #Create a csv file
203        self.csv_fileII = tempfile.mktemp(".csv")
204        fd = open(self.csv_fileII,'wb')
205        writer = csv.writer(fd)
206        writer.writerow(['LONGITUDE','LATITUDE',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
207        writer.writerow(['151.5','-34','199770','130000','Metal','Timber',20.])
208        writer.writerow(['151','-34.5','150000','76000','Metal','Double Brick',200.])
209        writer.writerow(['151','-34.25','150000','76000','Metal','Brick Veneer',200.])
210        fd.close()
211       
212    def tearDown(self):
213        #print "***** tearDown  ********"
214
215        # FIXME (Ole): Sometimes this fails - is the file open or is it sometimes not created?
216        try:
217            # Sometimes this fails - don't know why.
218            # Seems to be that the file is not created, since after it
219            # fails there are no sww files in the anuga directory
220            os.remove(self.sww.filename)
221            os.remove(self.swwII.filename)
222        except OSError:
223            pass
224        os.remove(self.csv_file)
225        os.remove(self.csv_fileII)
226
227   
228    def test_inundation_damage(self):
229
230        # Note, this isn't testing the results,
231        # just that is all runs
232        sww_file = self.domain.get_name() + "." + self.domain.format
233        #print "sww_file",sww_file
234        inundation_damage(sww_file, self.csv_file, verbose=False)
235
236   
237    def test_inundation_damage_list_as_input(self):
238
239        # Note, this isn't testing the results,
240        # just that is all runs
241        sww_file = self.domain.get_name() + "." + self.domain.format
242        #print "sww_file",sww_file
243        inundation_damage(sww_file,
244                          [self.csv_file, self.csv_fileII], verbose=False)
245
246    def test_inundation_damage2(self):
247
248        # create mesh
249        mesh_file = tempfile.mktemp(".tsh")   
250        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
251        m = Mesh()
252        m.add_vertices(points)
253        m.auto_segment()
254        m.generate_mesh(verbose=False)
255        m.export_mesh_file(mesh_file)
256       
257        #Create shallow water domain
258        domain = Domain(mesh_file)
259        os.remove(mesh_file)
260       
261        domain.default_order=2
262
263        #Set some field values
264        domain.set_quantity('elevation', elevation_function)
265        domain.set_quantity('friction', 0.03)
266        domain.set_quantity('xmomentum', 22.0)
267        domain.set_quantity('ymomentum', 55.0)
268
269        ######################
270        # Boundary conditions
271        B = Transmissive_boundary(domain)
272        domain.set_boundary( {'exterior': B})
273
274        # This call mangles the stage values.
275        domain.distribute_to_vertices_and_edges()
276        domain.set_quantity('stage', 0.3)
277
278        #sww_file = tempfile.mktemp("")
279        domain.set_name('datatest' + str(time.time()))
280        domain.format = 'sww'
281        domain.smooth = True
282        domain.reduction = mean
283
284        sww = SWW_file(domain)
285        sww.store_connectivity()
286        sww.store_timestep()
287        domain.set_quantity('stage', -0.3)
288        domain.time = 2.
289        sww.store_timestep()
290       
291        #Create a csv file
292        csv_file = tempfile.mktemp(".csv")
293        fd = open(csv_file,'wb')
294        writer = csv.writer(fd)
295        writer.writerow(['x', 'y', STR_VALUE_LABEL, CONT_VALUE_LABEL, \
296        'ROOF_TYPE', WALL_TYPE_LABEL, SHORE_DIST_LABEL])
297        writer.writerow([5.5,0.5,'10','130000','Metal','Timber',20])
298        writer.writerow([4.5,1.0,'150','76000','Metal','Double Brick',20])
299        writer.writerow([0.1,1.5,'100','76000','Metal','Brick Veneer',300])
300        writer.writerow([6.1,1.5,'100','76000','Metal','Brick Veneer',300])
301        fd.close()
302
303        sww_file = domain.get_name() + "." + domain.format
304        #print "sww_file",sww_file
305        inundation_damage(sww_file, csv_file, verbose=False)
306
307        csv_handle = Exposure(csv_file)
308        struct_loss = csv_handle.get_column(EventDamageModel.STRUCT_LOSS_TITLE)
309        #print "struct_loss",struct_loss
310        struct_loss = [float(x) for x in struct_loss]
311        assert num.allclose(struct_loss,[10,150,16.9,0])
312        depth = csv_handle.get_column(EventDamageModel.MAX_DEPTH_TITLE)
313        #print "depth",depth
314        depth = [float(x) for x in depth]
315        assert num.allclose(depth,[5.5,4.5,0.1,-0.3])
316        os.remove(sww.filename)
317        os.remove(csv_file)
318         
319    def test_inundation_damage_list(self):
320
321        # create mesh
322        mesh_file = tempfile.mktemp(".tsh")   
323        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
324        m = Mesh()
325        m.add_vertices(points)
326        m.auto_segment()
327        m.generate_mesh(verbose=False)
328        m.export_mesh_file(mesh_file)
329       
330        #Create shallow water domain
331        domain = Domain(mesh_file)
332        os.remove(mesh_file)
333       
334        domain.default_order=2
335
336        #Set some field values
337        domain.set_quantity('elevation', elevation_function)
338        domain.set_quantity('friction', 0.03)
339        domain.set_quantity('xmomentum', 22.0)
340        domain.set_quantity('ymomentum', 55.0)
341
342        ######################
343        # Boundary conditions
344        B = Transmissive_boundary(domain)
345        domain.set_boundary( {'exterior': B})
346
347        # This call mangles the stage values.
348        domain.distribute_to_vertices_and_edges()
349        domain.set_quantity('stage', 0.3)
350
351        #sww_file = tempfile.mktemp("")
352        domain.set_name('datatest' + str(time.time()))
353        domain.format = 'sww'
354        domain.smooth = True
355        domain.reduction = mean
356
357        sww = SWW_file(domain)
358        sww.store_connectivity()
359        sww.store_timestep()
360        domain.set_quantity('stage', -0.3)
361        domain.time = 2.
362        sww.store_timestep()
363       
364        #Create a csv file
365        csv_file = tempfile.mktemp(".csv")
366        fd = open(csv_file,'wb')
367        writer = csv.writer(fd)
368        writer.writerow(['x','y',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
369        writer.writerow([5.5,0.5,'10','130000','Metal','Timber',20])
370        writer.writerow([4.5,1.0,'150','76000','Metal','Double Brick',20])
371        writer.writerow([0.1,1.5,'100','76000','Metal','Brick Veneer',300])
372        writer.writerow([6.1,1.5,'100','76000','Metal','Brick Veneer',300])
373        fd.close()
374       
375        extension = ".csv"
376        csv_fileII = tempfile.mktemp(extension)
377        fd = open(csv_fileII,'wb')
378        writer = csv.writer(fd)
379        writer.writerow(['x','y',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
380        writer.writerow([5.5,0.5,'10','130000','Metal','Timber',20])
381        writer.writerow([4.5,1.0,'150','76000','Metal','Double Brick',20])
382        writer.writerow([0.1,1.5,'100','76000','Metal','Brick Veneer',300])
383        writer.writerow([6.1,1.5,'100','76000','Metal','Brick Veneer',300])
384        fd.close()
385       
386        sww_file = domain.get_name() + "." + domain.format
387        #print "sww_file",sww_file
388        marker='_gosh'
389        inundation_damage(sww_file, [csv_file, csv_fileII],
390                          exposure_file_out_marker=marker,
391                          verbose=False)
392
393        # Test one file
394        csv_handle = Exposure(csv_file[:-4]+marker+extension)
395        struct_loss = csv_handle.get_column(EventDamageModel.STRUCT_LOSS_TITLE)
396        #print "struct_loss",struct_loss
397        struct_loss = [float(x) for x in struct_loss]
398        assert num.allclose(struct_loss,[10,150,16.9,0])       
399        depth = csv_handle.get_column(EventDamageModel.MAX_DEPTH_TITLE)
400        #print "depth",depth
401        depth = [float(x) for x in depth]
402        assert num.allclose(depth,[5.5,4.5,0.1,-0.3])
403       
404        # Test another file
405        csv_handle = Exposure(csv_fileII[:-4]+marker+extension)
406        struct_loss = csv_handle.get_column(EventDamageModel.STRUCT_LOSS_TITLE)
407        #print "struct_loss",struct_loss
408        struct_loss = [float(x) for x in struct_loss]
409        assert num.allclose(struct_loss,[10,150,16.9,0])       
410        depth = csv_handle.get_column(EventDamageModel.MAX_DEPTH_TITLE)
411        #print "depth",depth
412        depth = [float(x) for x in depth]
413        assert num.allclose(depth,[5.5,4.5,0.1,-0.3]) 
414        os.remove(sww.filename)
415        os.remove(csv_file)
416        os.remove(csv_fileII)
417       
418    def ztest_add_depth_and_momentum2csv(self):
419        sww_file = self.domain.get_name() + "." + self.domain.format
420        #print "sww_file",sww_file
421       
422        out_csv = tempfile.mktemp(".csv")
423        print "out_csv",out_csv
424        add_depth_and_momentum2csv(sww_file, self.csv_file,
425                                   out_csv, verbose=False)
426       
427    def test_calc_damage_percentages(self):
428        max_depths = [-0.3, 0.0, 1.0,-0.3, 0.0, 1.0,-0.3, 0.0, 1.0]
429        shore_distances = [100, 100, 100, 100, 100, 100, 100, 100, 100]
430        walls = ['Double Brick',
431                 'Double Brick',
432                 'Double Brick',
433                 'Timber',
434                 'Timber',
435                 'Timber',
436                 'Brick Veneer',
437                 'Brick Veneer',
438                 'Brick Veneer']
439        struct_costs = [10,
440                        10,
441                        10,
442                        10,
443                        10,
444                        10,
445                        1,
446                        1,
447                        1]
448        content_costs = [100,
449                        100,
450                        100,
451                        100,
452                        100,
453                        100,
454                        10,
455                        10,
456                        10]
457
458        edm = EventDamageModel(max_depths, shore_distances, walls,
459                               struct_costs, content_costs)
460        edm.calc_damage_percentages()
461        assert num.allclose(edm.struct_damage,[0.0,0.016,0.572,
462                                               0.0,0.016,0.618,
463                                               0.0,0.016,0.618])
464        assert num.allclose(edm.contents_damage,[0.0,0.013,0.970,
465                                                 0.0,0.013,0.970,
466                                                 0.0,0.013,0.970])
467        edm.calc_cost()
468        assert num.allclose(edm.struct_loss,[0.0,.16,5.72,
469                                             0.0,.16,6.18,
470                                             0.0,0.016,0.618])
471        assert num.allclose(edm.contents_loss,[0.0,1.3,97,
472                                               0.0,1.3,97,
473                                               0.0,0.13,9.7])
474       
475       
476    def test_calc_collapse_structures(self):
477        edm = EventDamageModel([0.0]*17, [0.0]*17, [0.0]*17,
478                               [0.0]*17, [0.0]*17)
479        edm.struct_damage = num.zeros(17,num.float) 
480        edm.contents_damage = num.zeros(17,num.float) 
481        collapse_probability = {0.4:[0], #0
482                                0.6:[1], #1
483                                0.5:[2], #1
484                                0.25:[3,4], #1
485                                0.1:[5,6,7,8], #0
486                                0.2:[9,10,11,12,13,14,15,16]} #2
487        edm._calc_collapse_structures(collapse_probability, verbose_csv=True)
488
489        self.failUnless( edm.struct_damage[0]  == 0.0 and
490                         edm.contents_damage[0]  == 0.0,
491                        'Error!')
492        self.failUnless( edm.struct_damage[1]  == 1.0 and
493                         edm.contents_damage[1]  == 1.0,
494                        'Error!')
495        self.failUnless( edm.struct_damage[2]  == 1.0 and
496                         edm.contents_damage[2]  == 1.0,
497                        'Error!')
498        self.failUnless( edm.struct_damage[3]+ edm.struct_damage[4] == 1.0 and
499                         edm.contents_damage[3] + edm.contents_damage[4] ==1.0,
500                        'Error!')
501        sum_struct = 0.0
502        sum_contents = 0.0
503        for i in [5,6,7,8]:
504            sum_struct += edm.struct_damage[i]
505            sum_contents += edm.contents_damage[i]
506        print "", 
507        self.failUnless( sum_struct == 0.0 and sum_contents  == 0.0,
508                        'Error!')
509        sum_struct = 0.0
510        sum_contents = 0.0
511        for i in [9,10,11,12,13,14,15,16]:
512            sum_struct += edm.struct_damage[i]
513            sum_contents += edm.contents_damage[i]
514        self.failUnless( sum_struct == 2.0 and sum_contents  == 2.0,
515                        'Error!')
516       
517    def test_calc_collapse_probability(self):
518        depth =          [0.0, 0.5, 0.5  , 1.5, 2.5, 4.5, 10000, 2.0]
519        shore_distance = [0.0, 125, 250.1, 0.0, 150, 225, 10000, 251]
520        dummy = depth
521        edm = EventDamageModel(depth, shore_distance, dummy, dummy, dummy)
522        struct_coll_prob = edm.calc_collapse_probability()
523        answer = {0.05:[1,7],
524                  0.6:[3],
525                  0.4:[4],
526                  0.5:[5],
527                  0.45:[6]}
528        #print "struct_coll_prob",struct_coll_prob
529        #print "answer",answer
530
531        self.failUnless( struct_coll_prob ==  answer,
532                        'Error!')
533       
534       
535    def test_calc_damage_and_costs(self):
536                             
537        max_depths = [-0.3, 0.0, 1.0,-0.3, 0.0, 1.0,-0.3, 0.0, 10.0]
538        shore_distances = [100, 100, 100, 100, 100, 100, 100, 100, 100]
539        walls = ['Double Brick',
540                 'Double Brick',
541                 'Double Brick',
542                 'Timber',
543                 'Timber',
544                 'Timber',
545                 'Brick Veneer',
546                 'Brick Veneer',
547                 'Brick Veneer']
548        struct_costs = [10,
549                        10,
550                        10,
551                        10,
552                        10,
553                        10,
554                        1,
555                        1,
556                        1]
557        content_costs = [100,
558                        100,
559                        100,
560                        100,
561                        100,
562                        100,
563                        10,
564                        10,
565                        10]
566
567        edm = EventDamageModel(max_depths, shore_distances, walls,
568                               struct_costs, content_costs)
569        results_dic = edm.calc_damage_and_costs(verbose_csv=True)
570        #print "results_dic",results_dic
571       
572    def test_calc_max_depth_and_momentum(self):
573        sww_file = "tid" # self.domain.get_name() + "." + self.domain.format
574        points_lat_long = [[-34, 151.5],[-35.5, 151.5],[-50, 151]]
575        spat = Geospatial_data(data_points=points_lat_long,
576                               points_are_lats_longs=True)
577        points_ab = spat.get_data_points( absolute = True)
578        deps, _ = calc_max_depth_and_momentum(sww_file,
579                                              points_ab,
580                                              verbose=self.verbose,
581                                              use_cache = False)
582
583        # Test values based on returned results, so not an excellent test
584       
585        assert num.allclose(deps[0],0.113204555211)
586        assert num.allclose(deps[1],11.3215)
587        assert num.allclose(deps[2],0.0) # this value is outside both sww files
588       
589#-------------------------------------------------------------
590if __name__ == "__main__":
591    if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V':
592        Test_inundation_damage.verbose=True
593        saveout = sys.stdout   
594        filename = ".temp_verbose"
595        fid = open(filename, 'w')
596        sys.stdout = fid
597    else:
598        pass
599    suite = unittest.makeSuite(Test_inundation_damage,'test')
600    runner = unittest.TextTestRunner()
601    runner.run(suite)
602
603    # Cleaning up
604    if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V':
605        sys.stdout = saveout
606        fid.close() 
607        os.remove(filename)
Note: See TracBrowser for help on using the repository browser.