source: anuga_core/source/anuga/damage_modelling/test_inundation_damage.py @ 5589

Last change on this file since 5589 was 5442, checked in by ole, 16 years ago

Retired h-limiter and beta_h as per ticket:194.
All unit tests and validation tests pass.

File size: 22.7 KB
Line 
1#!/usr/bin/env python
2#
3
4import unittest
5import tempfile
6import os, sys
7import time
8import csv
9
10#from anuga.damage.inundation_damage import _calc_collapse_structures
11from inundation_damage import *
12from anuga.geospatial_data.geospatial_data import Geospatial_data
13from anuga.pmesh.mesh import Mesh
14from anuga.coordinate_transforms.geo_reference import Geo_reference
15from anuga.shallow_water import Domain, Transmissive_boundary
16from anuga.utilities.numerical_tools import mean
17from anuga.shallow_water.data_manager import get_dataobject
18
19from Numeric import zeros, Float, allclose
20
21
22def elevation_function(x, y):
23    return -x
24
25class Test_inundation_damage(unittest.TestCase):
26    # Class variable
27    verbose = False
28
29    def set_verbose(self):
30        Test_Data_Manager.verbose = True
31       
32    def setUp(self):
33        #print "****set up****"
34        # Create an sww file
35       
36        # Set up an sww that has a geo ref.
37        # have it cover an area in Australia.  'gong maybe
38        #Don't have many triangles though!
39       
40        #Site Name:    GDA-MGA: (UTM with GRS80 ellipsoid)
41        #Zone:   56   
42        #Easting:  222908.705  Northing: 6233785.284
43        #Latitude:   -34  0 ' 0.00000 ''  Longitude: 150  0 ' 0.00000 ''
44        #Grid Convergence:  -1  40 ' 43.13 ''  Point Scale: 1.00054660
45
46        #geo-ref
47        #Zone:   56   
48        #Easting:  220000  Northing: 6230000
49
50
51        #have  a big area covered.
52        mesh_file = tempfile.mktemp(".tsh")
53        points_lat_long = [[-33,152],[-35,152],[-35,150],[-33,150]]
54        spat = Geospatial_data(data_points=points_lat_long,
55                               points_are_lats_longs=True)
56        points_ab = spat.get_data_points( absolute = True)
57       
58        geo =  Geo_reference(56,400000,6000000)
59        spat.set_geo_reference(geo)
60        m = Mesh()
61        m.add_vertices(spat)
62        m.auto_segment()
63        m.generate_mesh(verbose=False)
64        m.export_mesh_file(mesh_file)
65       
66        #Create shallow water domain
67        domain = Domain(mesh_file)
68
69        os.remove(mesh_file)
70       
71        domain.default_order=2
72        #Set some field values
73        #domain.set_quantity('stage', 1.0)
74        domain.set_quantity('elevation', -0.5)
75        domain.set_quantity('friction', 0.03)
76
77        ######################
78        # Boundary conditions
79        B = Transmissive_boundary(domain)
80        domain.set_boundary( {'exterior': B})
81
82        ######################
83        #Initial condition - with jumps
84        bed = domain.quantities['elevation'].vertex_values
85        stage = zeros(bed.shape, Float)
86
87        h = 0.3
88        for i in range(stage.shape[0]):
89            if i % 2 == 0:
90                stage[i,:] = bed[i,:] + h
91            else:
92                stage[i,:] = bed[i,:]
93
94        domain.set_quantity('stage', stage)
95        domain.set_quantity('xmomentum', stage*22.0)
96        domain.set_quantity('ymomentum', stage*55.0)
97
98        domain.distribute_to_vertices_and_edges()
99
100        self.domain = domain
101
102        C = domain.get_vertex_coordinates()
103        self.X = C[:,0:6:2].copy()
104        self.Y = C[:,1:6:2].copy()
105
106        self.F = bed
107     
108        #sww_file = tempfile.mktemp("")
109        self.domain.set_name('tid_P0')
110        self.domain.format = 'sww'
111        self.domain.smooth = True
112        self.domain.reduction = mean
113
114        sww = get_dataobject(self.domain)
115        sww.store_connectivity()
116        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
117        self.domain.time = 2.
118        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
119        self.sww = sww # so it can be deleted
120       
121        #Create another sww file
122        mesh_file = tempfile.mktemp(".tsh")
123        points_lat_long = [[-35,152],[-36,152],[-36,150],[-35,150]]
124        spat = Geospatial_data(data_points=points_lat_long,
125                               points_are_lats_longs=True)
126        points_ab = spat.get_data_points( absolute = True)
127       
128        geo =  Geo_reference(56,400000,6000000)
129        spat.set_geo_reference(geo)
130        m = Mesh()
131        m.add_vertices(spat)
132        m.auto_segment()
133        m.generate_mesh(verbose=False)
134        m.export_mesh_file(mesh_file)
135       
136        #Create shallow water domain
137        domain = Domain(mesh_file)
138
139        os.remove(mesh_file)
140       
141        domain.default_order=2
142        #Set some field values
143        #domain.set_quantity('stage', 1.0)
144        domain.set_quantity('elevation', -40)
145        domain.set_quantity('friction', 0.03)
146
147        ######################
148        # Boundary conditions
149        B = Transmissive_boundary(domain)
150        domain.set_boundary( {'exterior': B})
151
152        ######################
153        #Initial condition - with jumps
154        bed = domain.quantities['elevation'].vertex_values
155        stage = zeros(bed.shape, Float)
156
157        h = 30.
158        for i in range(stage.shape[0]):
159            if i % 2 == 0:
160                stage[i,:] = bed[i,:] + h
161            else:
162                stage[i,:] = bed[i,:]
163
164        domain.set_quantity('stage', stage)
165        domain.set_quantity('xmomentum', stage*22.0)
166        domain.set_quantity('ymomentum', stage*55.0)
167
168        domain.distribute_to_vertices_and_edges()
169
170        self.domain2 = domain
171
172        C = domain.get_vertex_coordinates()
173        self.X2 = C[:,0:6:2].copy()
174        self.Y2 = C[:,1:6:2].copy()
175
176        self.F2 = bed
177     
178        #sww_file = tempfile.mktemp("")
179        domain.set_name('tid_P1')
180        domain.format = 'sww'
181        domain.smooth = True
182        domain.reduction = mean
183
184        sww = get_dataobject(domain)
185        sww.store_connectivity()
186        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
187        domain.time = 2.
188        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
189        self.swwII = sww # so it can be deleted
190
191        # print "sww.filename", sww.filename
192        #Create a csv file
193        self.csv_file = tempfile.mktemp(".csv")
194        fd = open(self.csv_file,'wb')
195        writer = csv.writer(fd)
196        writer.writerow(['LONGITUDE','LATITUDE',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
197        writer.writerow(['151.5','-34','199770','130000','Metal','Timber',20.])
198        writer.writerow(['151','-34.5','150000','76000','Metal','Double Brick',200.])
199        writer.writerow(['151','-34.25','150000','76000','Metal','Brick Veneer',200.])
200        fd.close()
201
202        #Create a csv file
203        self.csv_fileII = tempfile.mktemp(".csv")
204        fd = open(self.csv_fileII,'wb')
205        writer = csv.writer(fd)
206        writer.writerow(['LONGITUDE','LATITUDE',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
207        writer.writerow(['151.5','-34','199770','130000','Metal','Timber',20.])
208        writer.writerow(['151','-34.5','150000','76000','Metal','Double Brick',200.])
209        writer.writerow(['151','-34.25','150000','76000','Metal','Brick Veneer',200.])
210        fd.close()
211       
212    def tearDown(self):
213        #print "***** tearDown  ********"
214
215        # FIXME (Ole): Sometimes this fails - is the file open or is it sometimes not created?
216        try:
217            # Sometimes this fails - don't know why.
218            # Seems to be that the file is not created, since after it
219            # fails there are no sww files in the anuga directory
220            os.remove(self.sww.filename)
221            os.remove(self.swwII.filename)
222        except OSError:
223            pass
224        os.remove(self.csv_file)
225        os.remove(self.csv_fileII)
226
227   
228    def test_inundation_damage(self):
229
230        # Note, this isn't testing the results,
231        # just that is all runs
232        sww_file = self.domain.get_name() + "." + self.domain.format
233        #print "sww_file",sww_file
234        inundation_damage(sww_file, self.csv_file, verbose=False)
235
236   
237    def test_inundation_damage_list_as_input(self):
238
239        # Note, this isn't testing the results,
240        # just that is all runs
241        sww_file = self.domain.get_name() + "." + self.domain.format
242        #print "sww_file",sww_file
243        inundation_damage(sww_file,
244                          [self.csv_file, self.csv_fileII], verbose=False)
245
246    def test_inundation_damage2(self):
247
248        # create mesh
249        mesh_file = tempfile.mktemp(".tsh")   
250        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
251        m = Mesh()
252        m.add_vertices(points)
253        m.auto_segment()
254        m.generate_mesh(verbose=False)
255        m.export_mesh_file(mesh_file)
256       
257        #Create shallow water domain
258        domain = Domain(mesh_file)
259        os.remove(mesh_file)
260       
261        domain.default_order=2
262
263        #Set some field values
264        domain.set_quantity('elevation', elevation_function)
265        domain.set_quantity('friction', 0.03)
266        domain.set_quantity('xmomentum', 22.0)
267        domain.set_quantity('ymomentum', 55.0)
268
269        ######################
270        # Boundary conditions
271        B = Transmissive_boundary(domain)
272        domain.set_boundary( {'exterior': B})
273
274        # This call mangles the stage values.
275        domain.distribute_to_vertices_and_edges()
276        domain.set_quantity('stage', 0.3)
277
278        #sww_file = tempfile.mktemp("")
279        domain.set_name('datatest' + str(time.time()))
280        domain.format = 'sww'
281        domain.smooth = True
282        domain.reduction = mean
283
284        sww = get_dataobject(domain)
285        sww.store_connectivity()
286        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
287        domain.set_quantity('stage', -0.3)
288        domain.time = 2.
289        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
290       
291        #Create a csv file
292        csv_file = tempfile.mktemp(".csv")
293        fd = open(csv_file,'wb')
294        writer = csv.writer(fd)
295        writer.writerow(['x','y',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
296        writer.writerow([5.5,0.5,'10','130000','Metal','Timber',20])
297        writer.writerow([4.5,1.0,'150','76000','Metal','Double Brick',20])
298        writer.writerow([0.1,1.5,'100','76000','Metal','Brick Veneer',300])
299        writer.writerow([6.1,1.5,'100','76000','Metal','Brick Veneer',300])
300        fd.close()
301
302        sww_file = domain.get_name() + "." + domain.format
303        #print "sww_file",sww_file
304        inundation_damage(sww_file, csv_file, verbose=False)
305
306        csv_handle = Exposure_csv(csv_file)
307        struct_loss = csv_handle.get_column(EventDamageModel.STRUCT_LOSS_TITLE)
308        #print "struct_loss",struct_loss
309        struct_loss = [float(x) for x in struct_loss]
310        assert allclose(struct_loss,[10,150,16.9,0])
311        depth = csv_handle.get_column(EventDamageModel.MAX_DEPTH_TITLE)
312        #print "depth",depth
313        depth = [float(x) for x in depth]
314        assert allclose(depth,[5.5,4.5,0.1,-0.3])
315        os.remove(sww.filename)
316        os.remove(csv_file)
317         
318    def test_inundation_damage_list(self):
319
320        # create mesh
321        mesh_file = tempfile.mktemp(".tsh")   
322        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
323        m = Mesh()
324        m.add_vertices(points)
325        m.auto_segment()
326        m.generate_mesh(verbose=False)
327        m.export_mesh_file(mesh_file)
328       
329        #Create shallow water domain
330        domain = Domain(mesh_file)
331        os.remove(mesh_file)
332       
333        domain.default_order=2
334
335        #Set some field values
336        domain.set_quantity('elevation', elevation_function)
337        domain.set_quantity('friction', 0.03)
338        domain.set_quantity('xmomentum', 22.0)
339        domain.set_quantity('ymomentum', 55.0)
340
341        ######################
342        # Boundary conditions
343        B = Transmissive_boundary(domain)
344        domain.set_boundary( {'exterior': B})
345
346        # This call mangles the stage values.
347        domain.distribute_to_vertices_and_edges()
348        domain.set_quantity('stage', 0.3)
349
350        #sww_file = tempfile.mktemp("")
351        domain.set_name('datatest' + str(time.time()))
352        domain.format = 'sww'
353        domain.smooth = True
354        domain.reduction = mean
355
356        sww = get_dataobject(domain)
357        sww.store_connectivity()
358        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
359        domain.set_quantity('stage', -0.3)
360        domain.time = 2.
361        sww.store_timestep(['stage', 'xmomentum', 'ymomentum'])
362       
363        #Create a csv file
364        csv_file = tempfile.mktemp(".csv")
365        fd = open(csv_file,'wb')
366        writer = csv.writer(fd)
367        writer.writerow(['x','y',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
368        writer.writerow([5.5,0.5,'10','130000','Metal','Timber',20])
369        writer.writerow([4.5,1.0,'150','76000','Metal','Double Brick',20])
370        writer.writerow([0.1,1.5,'100','76000','Metal','Brick Veneer',300])
371        writer.writerow([6.1,1.5,'100','76000','Metal','Brick Veneer',300])
372        fd.close()
373       
374        extension = ".csv"
375        csv_fileII = tempfile.mktemp(extension)
376        fd = open(csv_fileII,'wb')
377        writer = csv.writer(fd)
378        writer.writerow(['x','y',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
379        writer.writerow([5.5,0.5,'10','130000','Metal','Timber',20])
380        writer.writerow([4.5,1.0,'150','76000','Metal','Double Brick',20])
381        writer.writerow([0.1,1.5,'100','76000','Metal','Brick Veneer',300])
382        writer.writerow([6.1,1.5,'100','76000','Metal','Brick Veneer',300])
383        fd.close()
384       
385        sww_file = domain.get_name() + "." + domain.format
386        #print "sww_file",sww_file
387        marker='_gosh'
388        inundation_damage(sww_file, [csv_file, csv_fileII],
389                          exposure_file_out_marker=marker,
390                          verbose=False)
391
392        # Test one file
393        csv_handle = Exposure_csv(csv_file[:-4]+marker+extension)
394        struct_loss = csv_handle.get_column(EventDamageModel.STRUCT_LOSS_TITLE)
395        #print "struct_loss",struct_loss
396        struct_loss = [float(x) for x in struct_loss]
397        assert allclose(struct_loss,[10,150,16.9,0])       
398        depth = csv_handle.get_column(EventDamageModel.MAX_DEPTH_TITLE)
399        #print "depth",depth
400        depth = [float(x) for x in depth]
401        assert allclose(depth,[5.5,4.5,0.1,-0.3])
402       
403        # Test another file
404        csv_handle = Exposure_csv(csv_fileII[:-4]+marker+extension)
405        struct_loss = csv_handle.get_column(EventDamageModel.STRUCT_LOSS_TITLE)
406        #print "struct_loss",struct_loss
407        struct_loss = [float(x) for x in struct_loss]
408        assert allclose(struct_loss,[10,150,16.9,0])       
409        depth = csv_handle.get_column(EventDamageModel.MAX_DEPTH_TITLE)
410        #print "depth",depth
411        depth = [float(x) for x in depth]
412        assert allclose(depth,[5.5,4.5,0.1,-0.3]) 
413        os.remove(sww.filename)
414        os.remove(csv_file)
415        os.remove(csv_fileII)
416       
417    def ztest_add_depth_and_momentum2csv(self):
418        sww_file = self.domain.get_name() + "." + self.domain.format
419        #print "sww_file",sww_file
420       
421        out_csv = tempfile.mktemp(".csv")
422        print "out_csv",out_csv
423        add_depth_and_momentum2csv(sww_file, self.csv_file,
424                                   out_csv, verbose=False)
425       
426    def test_calc_damage_percentages(self):
427        max_depths = [-0.3, 0.0, 1.0,-0.3, 0.0, 1.0,-0.3, 0.0, 1.0]
428        shore_distances = [100, 100, 100, 100, 100, 100, 100, 100, 100]
429        walls = ['Double Brick',
430                 'Double Brick',
431                 'Double Brick',
432                 'Timber',
433                 'Timber',
434                 'Timber',
435                 'Brick Veneer',
436                 'Brick Veneer',
437                 'Brick Veneer']
438        struct_costs = [10,
439                        10,
440                        10,
441                        10,
442                        10,
443                        10,
444                        1,
445                        1,
446                        1]
447        content_costs = [100,
448                        100,
449                        100,
450                        100,
451                        100,
452                        100,
453                        10,
454                        10,
455                        10]
456
457        edm = EventDamageModel(max_depths, shore_distances, walls,
458                               struct_costs, content_costs)
459        edm.calc_damage_percentages()
460        assert allclose(edm.struct_damage,[0.0,0.016,0.572,
461                                            0.0,0.016,0.618,
462                                            0.0,0.016,0.618])
463        assert allclose(edm.contents_damage,[0.0,0.013,0.970,
464                                             0.0,0.013,0.970,
465                                             0.0,0.013,0.970])
466        edm.calc_cost()
467        assert allclose(edm.struct_loss,[0.0,.16,5.72,
468                                            0.0,.16,6.18,
469                                            0.0,0.016,0.618])
470        assert allclose(edm.contents_loss,[0.0,1.3,97,
471                                             0.0,1.3,97,
472                                             0.0,0.13,9.7])
473       
474       
475    def test_calc_collapse_structures(self):
476        edm = EventDamageModel([0.0]*17, [0.0]*17, [0.0]*17,
477                               [0.0]*17, [0.0]*17)
478        edm.struct_damage = zeros(17,Float) 
479        edm.contents_damage = zeros(17,Float) 
480        collapse_probability = {0.4:[0], #0
481                                0.6:[1], #1
482                                0.5:[2], #1
483                                0.25:[3,4], #1
484                                0.1:[5,6,7,8], #0
485                                0.2:[9,10,11,12,13,14,15,16]} #2
486        edm._calc_collapse_structures(collapse_probability, verbose_csv=True)
487
488        self.failUnless( edm.struct_damage[0]  == 0.0 and
489                         edm.contents_damage[0]  == 0.0,
490                        'Error!')
491        self.failUnless( edm.struct_damage[1]  == 1.0 and
492                         edm.contents_damage[1]  == 1.0,
493                        'Error!')
494        self.failUnless( edm.struct_damage[2]  == 1.0 and
495                         edm.contents_damage[2]  == 1.0,
496                        'Error!')
497        self.failUnless( edm.struct_damage[3]+ edm.struct_damage[4] == 1.0 and
498                         edm.contents_damage[3] + edm.contents_damage[4] ==1.0,
499                        'Error!')
500        sum_struct = 0.0
501        sum_contents = 0.0
502        for i in [5,6,7,8]:
503            sum_struct += edm.struct_damage[i]
504            sum_contents += edm.contents_damage[i]
505        print "", 
506        self.failUnless( sum_struct == 0.0 and sum_contents  == 0.0,
507                        'Error!')
508        sum_struct = 0.0
509        sum_contents = 0.0
510        for i in [9,10,11,12,13,14,15,16]:
511            sum_struct += edm.struct_damage[i]
512            sum_contents += edm.contents_damage[i]
513        self.failUnless( sum_struct == 2.0 and sum_contents  == 2.0,
514                        'Error!')
515       
516    def test_calc_collapse_probability(self):
517        depth =          [0.0, 0.5, 0.5  , 1.5, 2.5, 4.5, 10000, 2.0]
518        shore_distance = [0.0, 125, 250.1, 0.0, 150, 225, 10000, 251]
519        dummy = depth
520        edm = EventDamageModel(depth, shore_distance, dummy, dummy, dummy)
521        struct_coll_prob = edm.calc_collapse_probability()
522        answer = {0.05:[1,7],
523                  0.6:[3],
524                  0.4:[4],
525                  0.5:[5],
526                  0.45:[6]}
527        #print "struct_coll_prob",struct_coll_prob
528        #print "answer",answer
529
530        self.failUnless( struct_coll_prob ==  answer,
531                        'Error!')
532       
533       
534    def test_calc_damage_and_costs(self):
535                             
536        max_depths = [-0.3, 0.0, 1.0,-0.3, 0.0, 1.0,-0.3, 0.0, 10.0]
537        shore_distances = [100, 100, 100, 100, 100, 100, 100, 100, 100]
538        walls = ['Double Brick',
539                 'Double Brick',
540                 'Double Brick',
541                 'Timber',
542                 'Timber',
543                 'Timber',
544                 'Brick Veneer',
545                 'Brick Veneer',
546                 'Brick Veneer']
547        struct_costs = [10,
548                        10,
549                        10,
550                        10,
551                        10,
552                        10,
553                        1,
554                        1,
555                        1]
556        content_costs = [100,
557                        100,
558                        100,
559                        100,
560                        100,
561                        100,
562                        10,
563                        10,
564                        10]
565
566        edm = EventDamageModel(max_depths, shore_distances, walls,
567                               struct_costs, content_costs)
568        results_dic = edm.calc_damage_and_costs(verbose_csv=True)
569        #print "results_dic",results_dic
570       
571    def test_calc_max_depth_and_momentum(self):
572        sww_file = "tid" # self.domain.get_name() + "." + self.domain.format
573        points_lat_long = [[-34, 151.5],[-35.5, 151.5],[-50, 151]]
574        spat = Geospatial_data(data_points=points_lat_long,
575                               points_are_lats_longs=True)
576        points_ab = spat.get_data_points( absolute = True)
577        deps, _ = calc_max_depth_and_momentum(sww_file,
578                                              points_ab,
579                                              verbose=self.verbose,
580                                              use_cache = False)
581
582        # Test values based on returned results, so not an excellent test
583       
584        assert allclose(deps[0],0.113204555211)
585        assert allclose(deps[1],11.3215)
586        assert allclose(deps[2],0.0) # this value is outside both sww files
587       
588#-------------------------------------------------------------
589if __name__ == "__main__":
590    if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V':
591        Test_inundation_damage.verbose=True
592        saveout = sys.stdout   
593        filename = ".temp_verbose"
594        fid = open(filename, 'w')
595        sys.stdout = fid
596    else:
597        pass
598    suite = unittest.makeSuite(Test_inundation_damage,'test')
599    #suite = unittest.makeSuite(Test_inundation_damage,'test_inundation_damage_list_as_input')
600    runner = unittest.TextTestRunner()
601    runner.run(suite)
602
603    # Cleaning up
604    if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V':
605        sys.stdout = saveout
606        fid.close() 
607        os.remove(filename)
Note: See TracBrowser for help on using the repository browser.