source: anuga_core/source/anuga/damage_modelling/test_inundation_damage.py @ 7675

Last change on this file since 7675 was 7342, checked in by ole, 15 years ago

Refactored stofrage of quantities to use new concept of static and dynamic quantities.
This is in preparation for flexibly allowing quantities such as elevation or friction
to be time dependent.

All tests and the okushiri validation pass.

File size: 21.8 KB
Line 
1#!/usr/bin/env python
2#
3
4import unittest
5import tempfile
6import os, sys
7import time
8import csv
9
10#from anuga.damage.inundation_damage import _calc_collapse_structures
11from inundation_damage import *
12from anuga.geospatial_data.geospatial_data import Geospatial_data
13from anuga.pmesh.mesh import Mesh
14from anuga.coordinate_transforms.geo_reference import Geo_reference
15from anuga.shallow_water import Domain, Transmissive_boundary
16from anuga.utilities.numerical_tools import mean
17from anuga.shallow_water.data_manager import SWW_file
18
19import numpy as num
20
21
22def elevation_function(x, y):
23    return -x
24
25class Test_inundation_damage(unittest.TestCase):
26    # Class variable
27    verbose = False
28
29    def set_verbose(self):
30        Test_Data_Manager.verbose = True
31       
32    def setUp(self):
33        #print "****set up****"
34        # Create an sww file
35       
36        # Set up an sww that has a geo ref.
37        # have it cover an area in Australia.  'gong maybe
38        #Don't have many triangles though!
39       
40        #Site Name:    GDA-MGA: (UTM with GRS80 ellipsoid)
41        #Zone:   56   
42        #Easting:  222908.705  Northing: 6233785.284
43        #Latitude:   -34  0 ' 0.00000 ''  Longitude: 150  0 ' 0.00000 ''
44        #Grid Convergence:  -1  40 ' 43.13 ''  Point Scale: 1.00054660
45
46        #geo-ref
47        #Zone:   56   
48        #Easting:  220000  Northing: 6230000
49
50
51        #have  a big area covered.
52        mesh_file = tempfile.mktemp(".tsh")
53        points_lat_long = [[-33,152],[-35,152],[-35,150],[-33,150]]
54        spat = Geospatial_data(data_points=points_lat_long,
55                               points_are_lats_longs=True)
56        points_ab = spat.get_data_points( absolute = True)
57       
58        geo =  Geo_reference(56,400000,6000000)
59        spat.set_geo_reference(geo)
60        m = Mesh()
61        m.add_vertices(spat)
62        m.auto_segment()
63        m.generate_mesh(verbose=False)
64        m.export_mesh_file(mesh_file)
65       
66        #Create shallow water domain
67        domain = Domain(mesh_file)
68
69        os.remove(mesh_file)
70       
71        domain.default_order=2
72        #Set some field values
73        #domain.set_quantity('stage', 1.0)
74        domain.set_quantity('elevation', -0.5)
75        domain.set_quantity('friction', 0.03)
76
77        ######################
78        # Boundary conditions
79        B = Transmissive_boundary(domain)
80        domain.set_boundary( {'exterior': B})
81
82        ######################
83        #Initial condition - with jumps
84        bed = domain.quantities['elevation'].vertex_values
85        stage = num.zeros(bed.shape, num.float)
86
87        h = 0.3
88        for i in range(stage.shape[0]):
89            if i % 2 == 0:
90                stage[i,:] = bed[i,:] + h
91            else:
92                stage[i,:] = bed[i,:]
93
94        domain.set_quantity('stage', stage)
95        domain.set_quantity('xmomentum', stage*22.0)
96        domain.set_quantity('ymomentum', stage*55.0)
97
98        domain.distribute_to_vertices_and_edges()
99
100        self.domain = domain
101
102        C = domain.get_vertex_coordinates()
103        self.X = C[:,0:6:2].copy()
104        self.Y = C[:,1:6:2].copy()
105
106        self.F = bed
107     
108        #sww_file = tempfile.mktemp("")
109        self.domain.set_name('tid_P0')
110        self.domain.format = 'sww'
111        self.domain.smooth = True
112        self.domain.reduction = mean
113
114        sww = SWW_file(self.domain)
115        sww.store_connectivity()
116        sww.store_timestep()
117        self.domain.time = 2.
118        sww.store_timestep()
119        self.sww = sww # so it can be deleted
120       
121        #Create another sww file
122        mesh_file = tempfile.mktemp(".tsh")
123        points_lat_long = [[-35,152],[-36,152],[-36,150],[-35,150]]
124        spat = Geospatial_data(data_points=points_lat_long,
125                               points_are_lats_longs=True)
126        points_ab = spat.get_data_points( absolute = True)
127       
128        geo =  Geo_reference(56,400000,6000000)
129        spat.set_geo_reference(geo)
130        m = Mesh()
131        m.add_vertices(spat)
132        m.auto_segment()
133        m.generate_mesh(verbose=False)
134        m.export_mesh_file(mesh_file)
135       
136        #Create shallow water domain
137        domain = Domain(mesh_file)
138
139        os.remove(mesh_file)
140       
141        domain.default_order=2
142        #Set some field values
143        #domain.set_quantity('stage', 1.0)
144        domain.set_quantity('elevation', -40)
145        domain.set_quantity('friction', 0.03)
146
147        ######################
148        # Boundary conditions
149        B = Transmissive_boundary(domain)
150        domain.set_boundary( {'exterior': B})
151
152        ######################
153        #Initial condition - with jumps
154        bed = domain.quantities['elevation'].vertex_values
155        stage = num.zeros(bed.shape, num.float)
156
157        h = 30.
158        for i in range(stage.shape[0]):
159            if i % 2 == 0:
160                stage[i,:] = bed[i,:] + h
161            else:
162                stage[i,:] = bed[i,:]
163
164        domain.set_quantity('stage', stage)
165        domain.set_quantity('xmomentum', stage*22.0)
166        domain.set_quantity('ymomentum', stage*55.0)
167
168        domain.distribute_to_vertices_and_edges()
169
170        self.domain2 = domain
171
172        C = domain.get_vertex_coordinates()
173        self.X2 = C[:,0:6:2].copy()
174        self.Y2 = C[:,1:6:2].copy()
175
176        self.F2 = bed
177     
178        #sww_file = tempfile.mktemp("")
179        domain.set_name('tid_P1')
180        domain.format = 'sww'
181        domain.smooth = True
182        domain.reduction = mean
183
184        sww = SWW_file(domain)
185        sww.store_connectivity()
186        sww.store_timestep()
187        domain.time = 2.
188        sww.store_timestep()
189        self.swwII = sww # so it can be deleted
190
191        # print "sww.filename", sww.filename
192        #Create a csv file
193        self.csv_file = tempfile.mktemp(".csv")
194        fd = open(self.csv_file,'wb')
195        writer = csv.writer(fd)
196        writer.writerow(['LONGITUDE','LATITUDE',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
197        writer.writerow(['151.5','-34','199770','130000','Metal','Timber',20.])
198        writer.writerow(['151','-34.5','150000','76000','Metal','Double Brick',200.])
199        writer.writerow(['151','-34.25','150000','76000','Metal','Brick Veneer',200.])
200        fd.close()
201
202        #Create a csv file
203        self.csv_fileII = tempfile.mktemp(".csv")
204        fd = open(self.csv_fileII,'wb')
205        writer = csv.writer(fd)
206        writer.writerow(['LONGITUDE','LATITUDE',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
207        writer.writerow(['151.5','-34','199770','130000','Metal','Timber',20.])
208        writer.writerow(['151','-34.5','150000','76000','Metal','Double Brick',200.])
209        writer.writerow(['151','-34.25','150000','76000','Metal','Brick Veneer',200.])
210        fd.close()
211       
212    def tearDown(self):
213        #print "***** tearDown  ********"
214
215        # FIXME (Ole): Sometimes this fails - is the file open or is it sometimes not created?
216        try:
217            # Sometimes this fails - don't know why.
218            # Seems to be that the file is not created, since after it
219            # fails there are no sww files in the anuga directory
220            os.remove(self.sww.filename)
221            os.remove(self.swwII.filename)
222        except OSError:
223            pass
224        os.remove(self.csv_file)
225        os.remove(self.csv_fileII)
226
227   
228    def test_inundation_damage(self):
229
230        # Note, this isn't testing the results,
231        # just that is all runs
232        sww_file = self.domain.get_name() + "." + self.domain.format
233        #print "sww_file",sww_file
234        inundation_damage(sww_file, self.csv_file, verbose=False)
235
236   
237    def test_inundation_damage_list_as_input(self):
238
239        # Note, this isn't testing the results,
240        # just that is all runs
241        sww_file = self.domain.get_name() + "." + self.domain.format
242        #print "sww_file",sww_file
243        inundation_damage(sww_file,
244                          [self.csv_file, self.csv_fileII], verbose=False)
245
246    def test_inundation_damage2(self):
247
248        # create mesh
249        mesh_file = tempfile.mktemp(".tsh")   
250        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
251        m = Mesh()
252        m.add_vertices(points)
253        m.auto_segment()
254        m.generate_mesh(verbose=False)
255        m.export_mesh_file(mesh_file)
256       
257        #Create shallow water domain
258        domain = Domain(mesh_file)
259        os.remove(mesh_file)
260       
261        domain.default_order=2
262
263        #Set some field values
264        domain.set_quantity('elevation', elevation_function)
265        domain.set_quantity('friction', 0.03)
266        domain.set_quantity('xmomentum', 22.0)
267        domain.set_quantity('ymomentum', 55.0)
268
269        ######################
270        # Boundary conditions
271        B = Transmissive_boundary(domain)
272        domain.set_boundary( {'exterior': B})
273
274        # This call mangles the stage values.
275        domain.distribute_to_vertices_and_edges()
276        domain.set_quantity('stage', 0.3)
277
278        #sww_file = tempfile.mktemp("")
279        domain.set_name('datatest' + str(time.time()))
280        domain.format = 'sww'
281        domain.smooth = True
282        domain.reduction = mean
283
284        sww = SWW_file(domain)
285        sww.store_connectivity()
286        sww.store_timestep()
287        domain.set_quantity('stage', -0.3)
288        domain.time = 2.
289        sww.store_timestep()
290       
291        #Create a csv file
292        csv_file = tempfile.mktemp(".csv")
293        fd = open(csv_file,'wb')
294        writer = csv.writer(fd)
295        writer.writerow(['x','y',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
296        writer.writerow([5.5,0.5,'10','130000','Metal','Timber',20])
297        writer.writerow([4.5,1.0,'150','76000','Metal','Double Brick',20])
298        writer.writerow([0.1,1.5,'100','76000','Metal','Brick Veneer',300])
299        writer.writerow([6.1,1.5,'100','76000','Metal','Brick Veneer',300])
300        fd.close()
301
302        sww_file = domain.get_name() + "." + domain.format
303        #print "sww_file",sww_file
304        inundation_damage(sww_file, csv_file, verbose=False)
305
306        csv_handle = Exposure_csv(csv_file)
307        struct_loss = csv_handle.get_column(EventDamageModel.STRUCT_LOSS_TITLE)
308        #print "struct_loss",struct_loss
309        struct_loss = [float(x) for x in struct_loss]
310        assert num.allclose(struct_loss,[10,150,16.9,0])
311        depth = csv_handle.get_column(EventDamageModel.MAX_DEPTH_TITLE)
312        #print "depth",depth
313        depth = [float(x) for x in depth]
314        assert num.allclose(depth,[5.5,4.5,0.1,-0.3])
315        os.remove(sww.filename)
316        os.remove(csv_file)
317         
318    def test_inundation_damage_list(self):
319
320        # create mesh
321        mesh_file = tempfile.mktemp(".tsh")   
322        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
323        m = Mesh()
324        m.add_vertices(points)
325        m.auto_segment()
326        m.generate_mesh(verbose=False)
327        m.export_mesh_file(mesh_file)
328       
329        #Create shallow water domain
330        domain = Domain(mesh_file)
331        os.remove(mesh_file)
332       
333        domain.default_order=2
334
335        #Set some field values
336        domain.set_quantity('elevation', elevation_function)
337        domain.set_quantity('friction', 0.03)
338        domain.set_quantity('xmomentum', 22.0)
339        domain.set_quantity('ymomentum', 55.0)
340
341        ######################
342        # Boundary conditions
343        B = Transmissive_boundary(domain)
344        domain.set_boundary( {'exterior': B})
345
346        # This call mangles the stage values.
347        domain.distribute_to_vertices_and_edges()
348        domain.set_quantity('stage', 0.3)
349
350        #sww_file = tempfile.mktemp("")
351        domain.set_name('datatest' + str(time.time()))
352        domain.format = 'sww'
353        domain.smooth = True
354        domain.reduction = mean
355
356        sww = SWW_file(domain)
357        sww.store_connectivity()
358        sww.store_timestep()
359        domain.set_quantity('stage', -0.3)
360        domain.time = 2.
361        sww.store_timestep()
362       
363        #Create a csv file
364        csv_file = tempfile.mktemp(".csv")
365        fd = open(csv_file,'wb')
366        writer = csv.writer(fd)
367        writer.writerow(['x','y',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
368        writer.writerow([5.5,0.5,'10','130000','Metal','Timber',20])
369        writer.writerow([4.5,1.0,'150','76000','Metal','Double Brick',20])
370        writer.writerow([0.1,1.5,'100','76000','Metal','Brick Veneer',300])
371        writer.writerow([6.1,1.5,'100','76000','Metal','Brick Veneer',300])
372        fd.close()
373       
374        extension = ".csv"
375        csv_fileII = tempfile.mktemp(extension)
376        fd = open(csv_fileII,'wb')
377        writer = csv.writer(fd)
378        writer.writerow(['x','y',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
379        writer.writerow([5.5,0.5,'10','130000','Metal','Timber',20])
380        writer.writerow([4.5,1.0,'150','76000','Metal','Double Brick',20])
381        writer.writerow([0.1,1.5,'100','76000','Metal','Brick Veneer',300])
382        writer.writerow([6.1,1.5,'100','76000','Metal','Brick Veneer',300])
383        fd.close()
384       
385        sww_file = domain.get_name() + "." + domain.format
386        #print "sww_file",sww_file
387        marker='_gosh'
388        inundation_damage(sww_file, [csv_file, csv_fileII],
389                          exposure_file_out_marker=marker,
390                          verbose=False)
391
392        # Test one file
393        csv_handle = Exposure_csv(csv_file[:-4]+marker+extension)
394        struct_loss = csv_handle.get_column(EventDamageModel.STRUCT_LOSS_TITLE)
395        #print "struct_loss",struct_loss
396        struct_loss = [float(x) for x in struct_loss]
397        assert num.allclose(struct_loss,[10,150,16.9,0])       
398        depth = csv_handle.get_column(EventDamageModel.MAX_DEPTH_TITLE)
399        #print "depth",depth
400        depth = [float(x) for x in depth]
401        assert num.allclose(depth,[5.5,4.5,0.1,-0.3])
402       
403        # Test another file
404        csv_handle = Exposure_csv(csv_fileII[:-4]+marker+extension)
405        struct_loss = csv_handle.get_column(EventDamageModel.STRUCT_LOSS_TITLE)
406        #print "struct_loss",struct_loss
407        struct_loss = [float(x) for x in struct_loss]
408        assert num.allclose(struct_loss,[10,150,16.9,0])       
409        depth = csv_handle.get_column(EventDamageModel.MAX_DEPTH_TITLE)
410        #print "depth",depth
411        depth = [float(x) for x in depth]
412        assert num.allclose(depth,[5.5,4.5,0.1,-0.3]) 
413        os.remove(sww.filename)
414        os.remove(csv_file)
415        os.remove(csv_fileII)
416       
417    def ztest_add_depth_and_momentum2csv(self):
418        sww_file = self.domain.get_name() + "." + self.domain.format
419        #print "sww_file",sww_file
420       
421        out_csv = tempfile.mktemp(".csv")
422        print "out_csv",out_csv
423        add_depth_and_momentum2csv(sww_file, self.csv_file,
424                                   out_csv, verbose=False)
425       
426    def test_calc_damage_percentages(self):
427        max_depths = [-0.3, 0.0, 1.0,-0.3, 0.0, 1.0,-0.3, 0.0, 1.0]
428        shore_distances = [100, 100, 100, 100, 100, 100, 100, 100, 100]
429        walls = ['Double Brick',
430                 'Double Brick',
431                 'Double Brick',
432                 'Timber',
433                 'Timber',
434                 'Timber',
435                 'Brick Veneer',
436                 'Brick Veneer',
437                 'Brick Veneer']
438        struct_costs = [10,
439                        10,
440                        10,
441                        10,
442                        10,
443                        10,
444                        1,
445                        1,
446                        1]
447        content_costs = [100,
448                        100,
449                        100,
450                        100,
451                        100,
452                        100,
453                        10,
454                        10,
455                        10]
456
457        edm = EventDamageModel(max_depths, shore_distances, walls,
458                               struct_costs, content_costs)
459        edm.calc_damage_percentages()
460        assert num.allclose(edm.struct_damage,[0.0,0.016,0.572,
461                                               0.0,0.016,0.618,
462                                               0.0,0.016,0.618])
463        assert num.allclose(edm.contents_damage,[0.0,0.013,0.970,
464                                                 0.0,0.013,0.970,
465                                                 0.0,0.013,0.970])
466        edm.calc_cost()
467        assert num.allclose(edm.struct_loss,[0.0,.16,5.72,
468                                             0.0,.16,6.18,
469                                             0.0,0.016,0.618])
470        assert num.allclose(edm.contents_loss,[0.0,1.3,97,
471                                               0.0,1.3,97,
472                                               0.0,0.13,9.7])
473       
474       
475    def test_calc_collapse_structures(self):
476        edm = EventDamageModel([0.0]*17, [0.0]*17, [0.0]*17,
477                               [0.0]*17, [0.0]*17)
478        edm.struct_damage = num.zeros(17,num.float) 
479        edm.contents_damage = num.zeros(17,num.float) 
480        collapse_probability = {0.4:[0], #0
481                                0.6:[1], #1
482                                0.5:[2], #1
483                                0.25:[3,4], #1
484                                0.1:[5,6,7,8], #0
485                                0.2:[9,10,11,12,13,14,15,16]} #2
486        edm._calc_collapse_structures(collapse_probability, verbose_csv=True)
487
488        self.failUnless( edm.struct_damage[0]  == 0.0 and
489                         edm.contents_damage[0]  == 0.0,
490                        'Error!')
491        self.failUnless( edm.struct_damage[1]  == 1.0 and
492                         edm.contents_damage[1]  == 1.0,
493                        'Error!')
494        self.failUnless( edm.struct_damage[2]  == 1.0 and
495                         edm.contents_damage[2]  == 1.0,
496                        'Error!')
497        self.failUnless( edm.struct_damage[3]+ edm.struct_damage[4] == 1.0 and
498                         edm.contents_damage[3] + edm.contents_damage[4] ==1.0,
499                        'Error!')
500        sum_struct = 0.0
501        sum_contents = 0.0
502        for i in [5,6,7,8]:
503            sum_struct += edm.struct_damage[i]
504            sum_contents += edm.contents_damage[i]
505        print "", 
506        self.failUnless( sum_struct == 0.0 and sum_contents  == 0.0,
507                        'Error!')
508        sum_struct = 0.0
509        sum_contents = 0.0
510        for i in [9,10,11,12,13,14,15,16]:
511            sum_struct += edm.struct_damage[i]
512            sum_contents += edm.contents_damage[i]
513        self.failUnless( sum_struct == 2.0 and sum_contents  == 2.0,
514                        'Error!')
515       
516    def test_calc_collapse_probability(self):
517        depth =          [0.0, 0.5, 0.5  , 1.5, 2.5, 4.5, 10000, 2.0]
518        shore_distance = [0.0, 125, 250.1, 0.0, 150, 225, 10000, 251]
519        dummy = depth
520        edm = EventDamageModel(depth, shore_distance, dummy, dummy, dummy)
521        struct_coll_prob = edm.calc_collapse_probability()
522        answer = {0.05:[1,7],
523                  0.6:[3],
524                  0.4:[4],
525                  0.5:[5],
526                  0.45:[6]}
527        #print "struct_coll_prob",struct_coll_prob
528        #print "answer",answer
529
530        self.failUnless( struct_coll_prob ==  answer,
531                        'Error!')
532       
533       
534    def test_calc_damage_and_costs(self):
535                             
536        max_depths = [-0.3, 0.0, 1.0,-0.3, 0.0, 1.0,-0.3, 0.0, 10.0]
537        shore_distances = [100, 100, 100, 100, 100, 100, 100, 100, 100]
538        walls = ['Double Brick',
539                 'Double Brick',
540                 'Double Brick',
541                 'Timber',
542                 'Timber',
543                 'Timber',
544                 'Brick Veneer',
545                 'Brick Veneer',
546                 'Brick Veneer']
547        struct_costs = [10,
548                        10,
549                        10,
550                        10,
551                        10,
552                        10,
553                        1,
554                        1,
555                        1]
556        content_costs = [100,
557                        100,
558                        100,
559                        100,
560                        100,
561                        100,
562                        10,
563                        10,
564                        10]
565
566        edm = EventDamageModel(max_depths, shore_distances, walls,
567                               struct_costs, content_costs)
568        results_dic = edm.calc_damage_and_costs(verbose_csv=True)
569        #print "results_dic",results_dic
570       
571    def test_calc_max_depth_and_momentum(self):
572        sww_file = "tid" # self.domain.get_name() + "." + self.domain.format
573        points_lat_long = [[-34, 151.5],[-35.5, 151.5],[-50, 151]]
574        spat = Geospatial_data(data_points=points_lat_long,
575                               points_are_lats_longs=True)
576        points_ab = spat.get_data_points( absolute = True)
577        deps, _ = calc_max_depth_and_momentum(sww_file,
578                                              points_ab,
579                                              verbose=self.verbose,
580                                              use_cache = False)
581
582        # Test values based on returned results, so not an excellent test
583       
584        assert num.allclose(deps[0],0.113204555211)
585        assert num.allclose(deps[1],11.3215)
586        assert num.allclose(deps[2],0.0) # this value is outside both sww files
587       
588#-------------------------------------------------------------
589if __name__ == "__main__":
590    if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V':
591        Test_inundation_damage.verbose=True
592        saveout = sys.stdout   
593        filename = ".temp_verbose"
594        fid = open(filename, 'w')
595        sys.stdout = fid
596    else:
597        pass
598    suite = unittest.makeSuite(Test_inundation_damage,'test')
599    runner = unittest.TextTestRunner()
600    runner.run(suite)
601
602    # Cleaning up
603    if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V':
604        sys.stdout = saveout
605        fid.close() 
606        os.remove(filename)
Note: See TracBrowser for help on using the repository browser.