source: anuga_core/source/anuga/damage_modelling/test_inundation_damage.py @ 4742

Last change on this file since 4742 was 4742, checked in by duncan, 17 years ago

ticket#196

File size: 22.8 KB
Line 
1#!/usr/bin/env python
2#
3
4import unittest
5import tempfile
6import os, sys
7import time
8import csv
9
10#from anuga.damage.inundation_damage import _calc_collapse_structures
11from inundation_damage import *
12from anuga.geospatial_data.geospatial_data import Geospatial_data
13from anuga.pmesh.mesh import Mesh
14from anuga.coordinate_transforms.geo_reference import Geo_reference
15from anuga.shallow_water import Domain, Transmissive_boundary
16from anuga.utilities.numerical_tools import mean
17from anuga.shallow_water.data_manager import get_dataobject
18
19from Numeric import zeros, Float, allclose
20
21
22def elevation_function(x, y):
23    return -x
24
25class Test_inundation_damage(unittest.TestCase):
26    # Class variable
27    verbose = False
28
29    def set_verbose(self):
30        Test_Data_Manager.verbose = True
31       
32    def setUp(self):
33        #print "****set up****"
34        # Create an sww file
35       
36        # Set up an sww that has a geo ref.
37        # have it cover an area in Australia.  'gong maybe
38        #Don't have many triangles though!
39       
40        #Site Name:    GDA-MGA: (UTM with GRS80 ellipsoid)
41        #Zone:   56   
42        #Easting:  222908.705  Northing: 6233785.284
43        #Latitude:   -34  0 ' 0.00000 ''  Longitude: 150  0 ' 0.00000 ''
44        #Grid Convergence:  -1  40 ' 43.13 ''  Point Scale: 1.00054660
45
46        #geo-ref
47        #Zone:   56   
48        #Easting:  220000  Northing: 6230000
49
50
51        #have  a big area covered.
52        mesh_file = tempfile.mktemp(".tsh")
53        points_lat_long = [[-33,152],[-35,152],[-35,150],[-33,150]]
54        spat = Geospatial_data(data_points=points_lat_long,
55                               points_are_lats_longs=True)
56        points_ab = spat.get_data_points( absolute = True)
57       
58        geo =  Geo_reference(56,400000,6000000)
59        spat.set_geo_reference(geo)
60        m = Mesh()
61        m.add_vertices(spat)
62        m.auto_segment()
63        m.generate_mesh(verbose=False)
64        m.export_mesh_file(mesh_file)
65       
66        #Create shallow water domain
67        domain = Domain(mesh_file)
68
69        os.remove(mesh_file)
70       
71        domain.default_order=2
72        domain.beta_h = 0
73        #Set some field values
74        #domain.set_quantity('stage', 1.0)
75        domain.set_quantity('elevation', -0.5)
76        domain.set_quantity('friction', 0.03)
77
78        ######################
79        # Boundary conditions
80        B = Transmissive_boundary(domain)
81        domain.set_boundary( {'exterior': B})
82
83        ######################
84        #Initial condition - with jumps
85        bed = domain.quantities['elevation'].vertex_values
86        stage = zeros(bed.shape, Float)
87
88        h = 0.3
89        for i in range(stage.shape[0]):
90            if i % 2 == 0:
91                stage[i,:] = bed[i,:] + h
92            else:
93                stage[i,:] = bed[i,:]
94
95        domain.set_quantity('stage', stage)
96        domain.set_quantity('xmomentum', stage*22.0)
97        domain.set_quantity('ymomentum', stage*55.0)
98
99        domain.distribute_to_vertices_and_edges()
100
101        self.domain = domain
102
103        C = domain.get_vertex_coordinates()
104        self.X = C[:,0:6:2].copy()
105        self.Y = C[:,1:6:2].copy()
106
107        self.F = bed
108     
109        #sww_file = tempfile.mktemp("")
110        self.domain.set_name('tid_P0')
111        self.domain.format = 'sww'
112        self.domain.smooth = True
113        self.domain.reduction = mean
114
115        sww = get_dataobject(self.domain)
116        sww.store_connectivity()
117        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
118        self.domain.time = 2.
119        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
120        self.sww = sww # so it can be deleted
121       
122        #Create another sww file
123        mesh_file = tempfile.mktemp(".tsh")
124        points_lat_long = [[-35,152],[-36,152],[-36,150],[-35,150]]
125        spat = Geospatial_data(data_points=points_lat_long,
126                               points_are_lats_longs=True)
127        points_ab = spat.get_data_points( absolute = True)
128       
129        geo =  Geo_reference(56,400000,6000000)
130        spat.set_geo_reference(geo)
131        m = Mesh()
132        m.add_vertices(spat)
133        m.auto_segment()
134        m.generate_mesh(verbose=False)
135        m.export_mesh_file(mesh_file)
136       
137        #Create shallow water domain
138        domain = Domain(mesh_file)
139
140        os.remove(mesh_file)
141       
142        domain.default_order=2
143        domain.beta_h = 0
144        #Set some field values
145        #domain.set_quantity('stage', 1.0)
146        domain.set_quantity('elevation', -40)
147        domain.set_quantity('friction', 0.03)
148
149        ######################
150        # Boundary conditions
151        B = Transmissive_boundary(domain)
152        domain.set_boundary( {'exterior': B})
153
154        ######################
155        #Initial condition - with jumps
156        bed = domain.quantities['elevation'].vertex_values
157        stage = zeros(bed.shape, Float)
158
159        h = 30.
160        for i in range(stage.shape[0]):
161            if i % 2 == 0:
162                stage[i,:] = bed[i,:] + h
163            else:
164                stage[i,:] = bed[i,:]
165
166        domain.set_quantity('stage', stage)
167        domain.set_quantity('xmomentum', stage*22.0)
168        domain.set_quantity('ymomentum', stage*55.0)
169
170        domain.distribute_to_vertices_and_edges()
171
172        self.domain2 = domain
173
174        C = domain.get_vertex_coordinates()
175        self.X2 = C[:,0:6:2].copy()
176        self.Y2 = C[:,1:6:2].copy()
177
178        self.F2 = bed
179     
180        #sww_file = tempfile.mktemp("")
181        domain.set_name('tid_P1')
182        domain.format = 'sww'
183        domain.smooth = True
184        domain.reduction = mean
185
186        sww = get_dataobject(domain)
187        sww.store_connectivity()
188        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
189        domain.time = 2.
190        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
191        self.swwII = sww # so it can be deleted
192
193        # print "sww.filename", sww.filename
194        #Create a csv file
195        self.csv_file = tempfile.mktemp(".csv")
196        fd = open(self.csv_file,'wb')
197        writer = csv.writer(fd)
198        writer.writerow(['LONGITUDE','LATITUDE',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
199        writer.writerow(['151.5','-34','199770','130000','Metal','Timber',20.])
200        writer.writerow(['151','-34.5','150000','76000','Metal','Double Brick',200.])
201        writer.writerow(['151','-34.25','150000','76000','Metal','Brick Veneer',200.])
202        fd.close()
203
204        #Create a csv file
205        self.csv_fileII = tempfile.mktemp(".csv")
206        fd = open(self.csv_fileII,'wb')
207        writer = csv.writer(fd)
208        writer.writerow(['LONGITUDE','LATITUDE',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
209        writer.writerow(['151.5','-34','199770','130000','Metal','Timber',20.])
210        writer.writerow(['151','-34.5','150000','76000','Metal','Double Brick',200.])
211        writer.writerow(['151','-34.25','150000','76000','Metal','Brick Veneer',200.])
212        fd.close()
213       
214    def tearDown(self):
215        #print "***** tearDown  ********"
216
217        # FIXME (Ole): Sometimes this fails - is the file open or is it sometimes not created?
218        try:
219            # Sometimes this fails - don't know why.
220            # Seems to be that the file is not created, since after it
221            # fails there are no sww files in the anuga directory
222            os.remove(self.sww.filename)
223            os.remove(self.swwII.filename)
224        except OSError:
225            pass
226        os.remove(self.csv_file)
227        os.remove(self.csv_fileII)
228
229   
230    def test_inundation_damage(self):
231
232        # Note, this isn't testing the results,
233        # just that is all runs
234        sww_file = self.domain.get_name() + "." + self.domain.format
235        #print "sww_file",sww_file
236        inundation_damage(sww_file, self.csv_file, verbose=False)
237
238   
239    def test_inundation_damage_list_as_input(self):
240
241        # Note, this isn't testing the results,
242        # just that is all runs
243        sww_file = self.domain.get_name() + "." + self.domain.format
244        #print "sww_file",sww_file
245        inundation_damage(sww_file,
246                          [self.csv_file, self.csv_fileII], verbose=False)
247
248    def test_inundation_damage2(self):
249
250        # create mesh
251        mesh_file = tempfile.mktemp(".tsh")   
252        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
253        m = Mesh()
254        m.add_vertices(points)
255        m.auto_segment()
256        m.generate_mesh(verbose=False)
257        m.export_mesh_file(mesh_file)
258       
259        #Create shallow water domain
260        domain = Domain(mesh_file)
261        os.remove(mesh_file)
262       
263        domain.default_order=2
264        domain.beta_h = 0
265
266        #Set some field values
267        domain.set_quantity('elevation', elevation_function)
268        domain.set_quantity('friction', 0.03)
269        domain.set_quantity('xmomentum', 22.0)
270        domain.set_quantity('ymomentum', 55.0)
271
272        ######################
273        # Boundary conditions
274        B = Transmissive_boundary(domain)
275        domain.set_boundary( {'exterior': B})
276
277        # This call mangles the stage values.
278        domain.distribute_to_vertices_and_edges()
279        domain.set_quantity('stage', 0.3)
280
281        #sww_file = tempfile.mktemp("")
282        domain.set_name('datatest' + str(time.time()))
283        domain.format = 'sww'
284        domain.smooth = True
285        domain.reduction = mean
286
287        sww = get_dataobject(domain)
288        sww.store_connectivity()
289        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
290        domain.set_quantity('stage', -0.3)
291        domain.time = 2.
292        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
293       
294        #Create a csv file
295        csv_file = tempfile.mktemp(".csv")
296        fd = open(csv_file,'wb')
297        writer = csv.writer(fd)
298        writer.writerow(['x','y',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
299        writer.writerow([5.5,0.5,'10','130000','Metal','Timber',20])
300        writer.writerow([4.5,1.0,'150','76000','Metal','Double Brick',20])
301        writer.writerow([0.1,1.5,'100','76000','Metal','Brick Veneer',300])
302        writer.writerow([6.1,1.5,'100','76000','Metal','Brick Veneer',300])
303        fd.close()
304
305        sww_file = domain.get_name() + "." + domain.format
306        #print "sww_file",sww_file
307        inundation_damage(sww_file, csv_file, verbose=False)
308
309        csv_handle = Exposure_csv(csv_file)
310        struct_loss = csv_handle.get_column(EventDamageModel.STRUCT_LOSS_TITLE)
311        #print "struct_loss",struct_loss
312        struct_loss = [float(x) for x in struct_loss]
313        assert allclose(struct_loss,[10,150,16.9,0])
314        depth = csv_handle.get_column(EventDamageModel.MAX_DEPTH_TITLE)
315        #print "depth",depth
316        depth = [float(x) for x in depth]
317        assert allclose(depth,[5.5,4.5,0.1,-0.3])
318        os.remove(sww.filename)
319        os.remove(csv_file)
320         
321    def test_inundation_damage_list(self):
322
323        # create mesh
324        mesh_file = tempfile.mktemp(".tsh")   
325        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
326        m = Mesh()
327        m.add_vertices(points)
328        m.auto_segment()
329        m.generate_mesh(verbose=False)
330        m.export_mesh_file(mesh_file)
331       
332        #Create shallow water domain
333        domain = Domain(mesh_file)
334        os.remove(mesh_file)
335       
336        domain.default_order=2
337        domain.beta_h = 0
338
339        #Set some field values
340        domain.set_quantity('elevation', elevation_function)
341        domain.set_quantity('friction', 0.03)
342        domain.set_quantity('xmomentum', 22.0)
343        domain.set_quantity('ymomentum', 55.0)
344
345        ######################
346        # Boundary conditions
347        B = Transmissive_boundary(domain)
348        domain.set_boundary( {'exterior': B})
349
350        # This call mangles the stage values.
351        domain.distribute_to_vertices_and_edges()
352        domain.set_quantity('stage', 0.3)
353
354        #sww_file = tempfile.mktemp("")
355        domain.set_name('datatest' + str(time.time()))
356        domain.format = 'sww'
357        domain.smooth = True
358        domain.reduction = mean
359
360        sww = get_dataobject(domain)
361        sww.store_connectivity()
362        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
363        domain.set_quantity('stage', -0.3)
364        domain.time = 2.
365        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
366       
367        #Create a csv file
368        csv_file = tempfile.mktemp(".csv")
369        fd = open(csv_file,'wb')
370        writer = csv.writer(fd)
371        writer.writerow(['x','y',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
372        writer.writerow([5.5,0.5,'10','130000','Metal','Timber',20])
373        writer.writerow([4.5,1.0,'150','76000','Metal','Double Brick',20])
374        writer.writerow([0.1,1.5,'100','76000','Metal','Brick Veneer',300])
375        writer.writerow([6.1,1.5,'100','76000','Metal','Brick Veneer',300])
376        fd.close()
377       
378        extension = ".csv"
379        csv_fileII = tempfile.mktemp(extension)
380        fd = open(csv_fileII,'wb')
381        writer = csv.writer(fd)
382        writer.writerow(['x','y',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
383        writer.writerow([5.5,0.5,'10','130000','Metal','Timber',20])
384        writer.writerow([4.5,1.0,'150','76000','Metal','Double Brick',20])
385        writer.writerow([0.1,1.5,'100','76000','Metal','Brick Veneer',300])
386        writer.writerow([6.1,1.5,'100','76000','Metal','Brick Veneer',300])
387        fd.close()
388       
389        sww_file = domain.get_name() + "." + domain.format
390        #print "sww_file",sww_file
391        marker='_gosh'
392        inundation_damage(sww_file, [csv_file, csv_fileII],
393                          exposure_file_out_marker=marker,
394                          verbose=False)
395
396        # Test one file
397        csv_handle = Exposure_csv(csv_file[:-4]+marker+extension)
398        struct_loss = csv_handle.get_column(EventDamageModel.STRUCT_LOSS_TITLE)
399        #print "struct_loss",struct_loss
400        struct_loss = [float(x) for x in struct_loss]
401        assert allclose(struct_loss,[10,150,16.9,0])       
402        depth = csv_handle.get_column(EventDamageModel.MAX_DEPTH_TITLE)
403        #print "depth",depth
404        depth = [float(x) for x in depth]
405        assert allclose(depth,[5.5,4.5,0.1,-0.3])
406       
407        # Test another file
408        csv_handle = Exposure_csv(csv_fileII[:-4]+marker+extension)
409        struct_loss = csv_handle.get_column(EventDamageModel.STRUCT_LOSS_TITLE)
410        #print "struct_loss",struct_loss
411        struct_loss = [float(x) for x in struct_loss]
412        assert allclose(struct_loss,[10,150,16.9,0])       
413        depth = csv_handle.get_column(EventDamageModel.MAX_DEPTH_TITLE)
414        #print "depth",depth
415        depth = [float(x) for x in depth]
416        assert allclose(depth,[5.5,4.5,0.1,-0.3]) 
417        os.remove(sww.filename)
418        os.remove(csv_file)
419        os.remove(csv_fileII)
420       
421    def ztest_add_depth_and_momentum2csv(self):
422        sww_file = self.domain.get_name() + "." + self.domain.format
423        #print "sww_file",sww_file
424       
425        out_csv = tempfile.mktemp(".csv")
426        print "out_csv",out_csv
427        add_depth_and_momentum2csv(sww_file, self.csv_file,
428                                   out_csv, verbose=False)
429       
430    def test_calc_damage_percentages(self):
431        max_depths = [-0.3, 0.0, 1.0,-0.3, 0.0, 1.0,-0.3, 0.0, 1.0]
432        shore_distances = [100, 100, 100, 100, 100, 100, 100, 100, 100]
433        walls = ['Double Brick',
434                 'Double Brick',
435                 'Double Brick',
436                 'Timber',
437                 'Timber',
438                 'Timber',
439                 'Brick Veneer',
440                 'Brick Veneer',
441                 'Brick Veneer']
442        struct_costs = [10,
443                        10,
444                        10,
445                        10,
446                        10,
447                        10,
448                        1,
449                        1,
450                        1]
451        content_costs = [100,
452                        100,
453                        100,
454                        100,
455                        100,
456                        100,
457                        10,
458                        10,
459                        10]
460
461        edm = EventDamageModel(max_depths, shore_distances, walls,
462                               struct_costs, content_costs)
463        edm.calc_damage_percentages()
464        assert allclose(edm.struct_damage,[0.0,0.016,0.572,
465                                            0.0,0.016,0.618,
466                                            0.0,0.016,0.618])
467        assert allclose(edm.contents_damage,[0.0,0.013,0.970,
468                                             0.0,0.013,0.970,
469                                             0.0,0.013,0.970])
470        edm.calc_cost()
471        assert allclose(edm.struct_loss,[0.0,.16,5.72,
472                                            0.0,.16,6.18,
473                                            0.0,0.016,0.618])
474        assert allclose(edm.contents_loss,[0.0,1.3,97,
475                                             0.0,1.3,97,
476                                             0.0,0.13,9.7])
477       
478       
479    def test_calc_collapse_structures(self):
480        edm = EventDamageModel([0.0]*17, [0.0]*17, [0.0]*17,
481                               [0.0]*17, [0.0]*17)
482        edm.struct_damage = zeros(17,Float) 
483        edm.contents_damage = zeros(17,Float) 
484        collapse_probability = {0.4:[0], #0
485                                0.6:[1], #1
486                                0.5:[2], #1
487                                0.25:[3,4], #1
488                                0.1:[5,6,7,8], #0
489                                0.2:[9,10,11,12,13,14,15,16]} #2
490        edm._calc_collapse_structures(collapse_probability, verbose_csv=True)
491
492        self.failUnless( edm.struct_damage[0]  == 0.0 and
493                         edm.contents_damage[0]  == 0.0,
494                        'Error!')
495        self.failUnless( edm.struct_damage[1]  == 1.0 and
496                         edm.contents_damage[1]  == 1.0,
497                        'Error!')
498        self.failUnless( edm.struct_damage[2]  == 1.0 and
499                         edm.contents_damage[2]  == 1.0,
500                        'Error!')
501        self.failUnless( edm.struct_damage[3]+ edm.struct_damage[4] == 1.0 and
502                         edm.contents_damage[3] + edm.contents_damage[4] ==1.0,
503                        'Error!')
504        sum_struct = 0.0
505        sum_contents = 0.0
506        for i in [5,6,7,8]:
507            sum_struct += edm.struct_damage[i]
508            sum_contents += edm.contents_damage[i]
509        print "", 
510        self.failUnless( sum_struct == 0.0 and sum_contents  == 0.0,
511                        'Error!')
512        sum_struct = 0.0
513        sum_contents = 0.0
514        for i in [9,10,11,12,13,14,15,16]:
515            sum_struct += edm.struct_damage[i]
516            sum_contents += edm.contents_damage[i]
517        self.failUnless( sum_struct == 2.0 and sum_contents  == 2.0,
518                        'Error!')
519       
520    def test_calc_collapse_probability(self):
521        depth =          [0.0, 0.5, 0.5  , 1.5, 2.5, 4.5, 10000, 2.0]
522        shore_distance = [0.0, 125, 250.1, 0.0, 150, 225, 10000, 251]
523        dummy = depth
524        edm = EventDamageModel(depth, shore_distance, dummy, dummy, dummy)
525        struct_coll_prob = edm.calc_collapse_probability()
526        answer = {0.05:[1,7],
527                  0.6:[3],
528                  0.4:[4],
529                  0.5:[5],
530                  0.45:[6]}
531        #print "struct_coll_prob",struct_coll_prob
532        #print "answer",answer
533
534        self.failUnless( struct_coll_prob ==  answer,
535                        'Error!')
536       
537       
538    def test_calc_damage_and_costs(self):
539                             
540        max_depths = [-0.3, 0.0, 1.0,-0.3, 0.0, 1.0,-0.3, 0.0, 10.0]
541        shore_distances = [100, 100, 100, 100, 100, 100, 100, 100, 100]
542        walls = ['Double Brick',
543                 'Double Brick',
544                 'Double Brick',
545                 'Timber',
546                 'Timber',
547                 'Timber',
548                 'Brick Veneer',
549                 'Brick Veneer',
550                 'Brick Veneer']
551        struct_costs = [10,
552                        10,
553                        10,
554                        10,
555                        10,
556                        10,
557                        1,
558                        1,
559                        1]
560        content_costs = [100,
561                        100,
562                        100,
563                        100,
564                        100,
565                        100,
566                        10,
567                        10,
568                        10]
569
570        edm = EventDamageModel(max_depths, shore_distances, walls,
571                               struct_costs, content_costs)
572        results_dic = edm.calc_damage_and_costs(verbose_csv=True)
573        #print "results_dic",results_dic
574       
575    def test_calc_max_depth_and_momentum(self):
576        sww_file = "tid" # self.domain.get_name() + "." + self.domain.format
577        points_lat_long = [[-34, 151.5],[-35.5, 151.5],[-50, 151]]
578        spat = Geospatial_data(data_points=points_lat_long,
579                               points_are_lats_longs=True)
580        points_ab = spat.get_data_points( absolute = True)
581        deps, _ = calc_max_depth_and_momentum(sww_file,
582                                              points_ab,
583                                              verbose=self.verbose,
584                                              use_cache = False)
585
586        # Test values based on returned results, so not an excellent test
587       
588        assert allclose(deps[0],0.113204555211)
589        assert allclose(deps[1],11.3215)
590        assert allclose(deps[2],0.0) # this value is outside both sww files
591       
592#-------------------------------------------------------------
593if __name__ == "__main__":
594    if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V':
595        Test_inundation_damage.verbose=True
596        saveout = sys.stdout   
597        filename = ".temp_verbose"
598        fid = open(filename, 'w')
599        sys.stdout = fid
600    else:
601        pass
602    suite = unittest.makeSuite(Test_inundation_damage,'test')
603    #suite = unittest.makeSuite(Test_inundation_damage,'test_inundation_damage_list_as_input')
604    runner = unittest.TextTestRunner()
605    runner.run(suite)
606
607    # Cleaning up
608    if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V':
609        sys.stdout = saveout
610        fid.close() 
611        os.remove(filename)
Note: See TracBrowser for help on using the repository browser.