source: trunk/anuga_core/source/anuga/damage_modelling/test_inundation_damage.py @ 7778

Last change on this file since 7778 was 7778, checked in by hudson, 14 years ago

Cleaned up unit tests to use API.

File size: 21.7 KB
Line 
1#!/usr/bin/env python
2#
3
4import unittest
5import tempfile
6import os, sys
7import time
8import csv
9
10#from anuga.damage.inundation_damage import _calc_collapse_structures
11from inundation_damage import *
12from anuga.geospatial_data.geospatial_data import Geospatial_data
13from anuga.pmesh.mesh import Mesh
14from anuga.coordinate_transforms.geo_reference import Geo_reference
15from anuga.utilities.numerical_tools import mean
16from anuga.file.sww import SWW_file
17
18import numpy as num
19
20
21def elevation_function(x, y):
22    return -x
23
24class Test_inundation_damage(unittest.TestCase):
25    # Class variable
26    verbose = False
27
28    def set_verbose(self):
29        Test_Data_Manager.verbose = True
30       
31    def setUp(self):
32        #print "****set up****"
33        # Create an sww file
34       
35        # Set up an sww that has a geo ref.
36        # have it cover an area in Australia.  'gong maybe
37        #Don't have many triangles though!
38       
39        #Site Name:    GDA-MGA: (UTM with GRS80 ellipsoid)
40        #Zone:   56   
41        #Easting:  222908.705  Northing: 6233785.284
42        #Latitude:   -34  0 ' 0.00000 ''  Longitude: 150  0 ' 0.00000 ''
43        #Grid Convergence:  -1  40 ' 43.13 ''  Point Scale: 1.00054660
44
45        #geo-ref
46        #Zone:   56   
47        #Easting:  220000  Northing: 6230000
48
49
50        #have  a big area covered.
51        mesh_file = tempfile.mktemp(".tsh")
52        points_lat_long = [[-33,152],[-35,152],[-35,150],[-33,150]]
53        spat = Geospatial_data(data_points=points_lat_long,
54                               points_are_lats_longs=True)
55        points_ab = spat.get_data_points( absolute = True)
56       
57        geo =  Geo_reference(56,400000,6000000)
58        spat.set_geo_reference(geo)
59        m = Mesh()
60        m.add_vertices(spat)
61        m.auto_segment()
62        m.generate_mesh(verbose=False)
63        m.export_mesh_file(mesh_file)
64       
65        #Create shallow water domain
66        domain = Domain(mesh_file)
67
68        os.remove(mesh_file)
69       
70        domain.default_order=2
71        #Set some field values
72        #domain.set_quantity('stage', 1.0)
73        domain.set_quantity('elevation', -0.5)
74        domain.set_quantity('friction', 0.03)
75
76        ######################
77        # Boundary conditions
78        B = Transmissive_boundary(domain)
79        domain.set_boundary( {'exterior': B})
80
81        ######################
82        #Initial condition - with jumps
83        bed = domain.quantities['elevation'].vertex_values
84        stage = num.zeros(bed.shape, num.float)
85
86        h = 0.3
87        for i in range(stage.shape[0]):
88            if i % 2 == 0:
89                stage[i,:] = bed[i,:] + h
90            else:
91                stage[i,:] = bed[i,:]
92
93        domain.set_quantity('stage', stage)
94        domain.set_quantity('xmomentum', stage*22.0)
95        domain.set_quantity('ymomentum', stage*55.0)
96
97        domain.distribute_to_vertices_and_edges()
98
99        self.domain = domain
100
101        C = domain.get_vertex_coordinates()
102        self.X = C[:,0:6:2].copy()
103        self.Y = C[:,1:6:2].copy()
104
105        self.F = bed
106     
107        #sww_file = tempfile.mktemp("")
108        self.domain.set_name('tid_P0')
109        self.domain.format = 'sww'
110        self.domain.smooth = True
111        self.domain.reduction = mean
112
113        sww = SWW_file(self.domain)
114        sww.store_connectivity()
115        sww.store_timestep()
116        self.domain.time = 2.
117        sww.store_timestep()
118        self.sww = sww # so it can be deleted
119       
120        #Create another sww file
121        mesh_file = tempfile.mktemp(".tsh")
122        points_lat_long = [[-35,152],[-36,152],[-36,150],[-35,150]]
123        spat = Geospatial_data(data_points=points_lat_long,
124                               points_are_lats_longs=True)
125        points_ab = spat.get_data_points( absolute = True)
126       
127        geo =  Geo_reference(56,400000,6000000)
128        spat.set_geo_reference(geo)
129        m = Mesh()
130        m.add_vertices(spat)
131        m.auto_segment()
132        m.generate_mesh(verbose=False)
133        m.export_mesh_file(mesh_file)
134       
135        #Create shallow water domain
136        domain = Domain(mesh_file)
137
138        os.remove(mesh_file)
139       
140        domain.default_order=2
141        #Set some field values
142        #domain.set_quantity('stage', 1.0)
143        domain.set_quantity('elevation', -40)
144        domain.set_quantity('friction', 0.03)
145
146        ######################
147        # Boundary conditions
148        B = Transmissive_boundary(domain)
149        domain.set_boundary( {'exterior': B})
150
151        ######################
152        #Initial condition - with jumps
153        bed = domain.quantities['elevation'].vertex_values
154        stage = num.zeros(bed.shape, num.float)
155
156        h = 30.
157        for i in range(stage.shape[0]):
158            if i % 2 == 0:
159                stage[i,:] = bed[i,:] + h
160            else:
161                stage[i,:] = bed[i,:]
162
163        domain.set_quantity('stage', stage)
164        domain.set_quantity('xmomentum', stage*22.0)
165        domain.set_quantity('ymomentum', stage*55.0)
166
167        domain.distribute_to_vertices_and_edges()
168
169        self.domain2 = domain
170
171        C = domain.get_vertex_coordinates()
172        self.X2 = C[:,0:6:2].copy()
173        self.Y2 = C[:,1:6:2].copy()
174
175        self.F2 = bed
176     
177        #sww_file = tempfile.mktemp("")
178        domain.set_name('tid_P1')
179        domain.format = 'sww'
180        domain.smooth = True
181        domain.reduction = mean
182
183        sww = SWW_file(domain)
184        sww.store_connectivity()
185        sww.store_timestep()
186        domain.time = 2.
187        sww.store_timestep()
188        self.swwII = sww # so it can be deleted
189
190        # print "sww.filename", sww.filename
191        #Create a csv file
192        self.csv_file = tempfile.mktemp(".csv")
193        fd = open(self.csv_file,'wb')
194        writer = csv.writer(fd)
195        writer.writerow(['LONGITUDE','LATITUDE',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
196        writer.writerow(['151.5','-34','199770','130000','Metal','Timber',20.])
197        writer.writerow(['151','-34.5','150000','76000','Metal','Double Brick',200.])
198        writer.writerow(['151','-34.25','150000','76000','Metal','Brick Veneer',200.])
199        fd.close()
200
201        #Create a csv file
202        self.csv_fileII = tempfile.mktemp(".csv")
203        fd = open(self.csv_fileII,'wb')
204        writer = csv.writer(fd)
205        writer.writerow(['LONGITUDE','LATITUDE',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
206        writer.writerow(['151.5','-34','199770','130000','Metal','Timber',20.])
207        writer.writerow(['151','-34.5','150000','76000','Metal','Double Brick',200.])
208        writer.writerow(['151','-34.25','150000','76000','Metal','Brick Veneer',200.])
209        fd.close()
210       
211    def tearDown(self):
212        #print "***** tearDown  ********"
213
214        # FIXME (Ole): Sometimes this fails - is the file open or is it sometimes not created?
215        try:
216            # Sometimes this fails - don't know why.
217            # Seems to be that the file is not created, since after it
218            # fails there are no sww files in the anuga directory
219            os.remove(self.sww.filename)
220            os.remove(self.swwII.filename)
221        except OSError:
222            pass
223        os.remove(self.csv_file)
224        os.remove(self.csv_fileII)
225
226   
227    def test_inundation_damage(self):
228
229        # Note, this isn't testing the results,
230        # just that is all runs
231        sww_file = self.domain.get_name() + "." + self.domain.format
232        #print "sww_file",sww_file
233        inundation_damage(sww_file, self.csv_file, verbose=False)
234
235   
236    def test_inundation_damage_list_as_input(self):
237
238        # Note, this isn't testing the results,
239        # just that is all runs
240        sww_file = self.domain.get_name() + "." + self.domain.format
241        #print "sww_file",sww_file
242        inundation_damage(sww_file,
243                          [self.csv_file, self.csv_fileII], verbose=False)
244
245    def test_inundation_damage2(self):
246
247        # create mesh
248        mesh_file = tempfile.mktemp(".tsh")   
249        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
250        m = Mesh()
251        m.add_vertices(points)
252        m.auto_segment()
253        m.generate_mesh(verbose=False)
254        m.export_mesh_file(mesh_file)
255       
256        #Create shallow water domain
257        domain = Domain(mesh_file)
258        os.remove(mesh_file)
259       
260        domain.default_order=2
261
262        #Set some field values
263        domain.set_quantity('elevation', elevation_function)
264        domain.set_quantity('friction', 0.03)
265        domain.set_quantity('xmomentum', 22.0)
266        domain.set_quantity('ymomentum', 55.0)
267
268        ######################
269        # Boundary conditions
270        B = Transmissive_boundary(domain)
271        domain.set_boundary( {'exterior': B})
272
273        # This call mangles the stage values.
274        domain.distribute_to_vertices_and_edges()
275        domain.set_quantity('stage', 0.3)
276
277        #sww_file = tempfile.mktemp("")
278        domain.set_name('datatest' + str(time.time()))
279        domain.format = 'sww'
280        domain.smooth = True
281        domain.reduction = mean
282
283        sww = SWW_file(domain)
284        sww.store_connectivity()
285        sww.store_timestep()
286        domain.set_quantity('stage', -0.3)
287        domain.time = 2.
288        sww.store_timestep()
289       
290        #Create a csv file
291        csv_file = tempfile.mktemp(".csv")
292        fd = open(csv_file,'wb')
293        writer = csv.writer(fd)
294        writer.writerow(['x', 'y', STR_VALUE_LABEL, CONT_VALUE_LABEL, \
295        'ROOF_TYPE', WALL_TYPE_LABEL, SHORE_DIST_LABEL])
296        writer.writerow([5.5,0.5,'10','130000','Metal','Timber',20])
297        writer.writerow([4.5,1.0,'150','76000','Metal','Double Brick',20])
298        writer.writerow([0.1,1.5,'100','76000','Metal','Brick Veneer',300])
299        writer.writerow([6.1,1.5,'100','76000','Metal','Brick Veneer',300])
300        fd.close()
301
302        sww_file = domain.get_name() + "." + domain.format
303        #print "sww_file",sww_file
304        inundation_damage(sww_file, csv_file, verbose=False)
305
306        csv_handle = Exposure(csv_file)
307        struct_loss = csv_handle.get_column(EventDamageModel.STRUCT_LOSS_TITLE)
308        #print "struct_loss",struct_loss
309        struct_loss = [float(x) for x in struct_loss]
310        assert num.allclose(struct_loss,[10,150,16.9,0])
311        depth = csv_handle.get_column(EventDamageModel.MAX_DEPTH_TITLE)
312        #print "depth",depth
313        depth = [float(x) for x in depth]
314        assert num.allclose(depth,[5.5,4.5,0.1,-0.3])
315        os.remove(sww.filename)
316        os.remove(csv_file)
317         
318    def test_inundation_damage_list(self):
319
320        # create mesh
321        mesh_file = tempfile.mktemp(".tsh")   
322        points = [[0.0,0.0],[6.0,0.0],[6.0,6.0],[0.0,6.0]]
323        m = Mesh()
324        m.add_vertices(points)
325        m.auto_segment()
326        m.generate_mesh(verbose=False)
327        m.export_mesh_file(mesh_file)
328       
329        #Create shallow water domain
330        domain = Domain(mesh_file)
331        os.remove(mesh_file)
332       
333        domain.default_order=2
334
335        #Set some field values
336        domain.set_quantity('elevation', elevation_function)
337        domain.set_quantity('friction', 0.03)
338        domain.set_quantity('xmomentum', 22.0)
339        domain.set_quantity('ymomentum', 55.0)
340
341        ######################
342        # Boundary conditions
343        B = Transmissive_boundary(domain)
344        domain.set_boundary( {'exterior': B})
345
346        # This call mangles the stage values.
347        domain.distribute_to_vertices_and_edges()
348        domain.set_quantity('stage', 0.3)
349
350        #sww_file = tempfile.mktemp("")
351        domain.set_name('datatest' + str(time.time()))
352        domain.format = 'sww'
353        domain.smooth = True
354        domain.reduction = mean
355
356        sww = SWW_file(domain)
357        sww.store_connectivity()
358        sww.store_timestep()
359        domain.set_quantity('stage', -0.3)
360        domain.time = 2.
361        sww.store_timestep()
362       
363        #Create a csv file
364        csv_file = tempfile.mktemp(".csv")
365        fd = open(csv_file,'wb')
366        writer = csv.writer(fd)
367        writer.writerow(['x','y',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
368        writer.writerow([5.5,0.5,'10','130000','Metal','Timber',20])
369        writer.writerow([4.5,1.0,'150','76000','Metal','Double Brick',20])
370        writer.writerow([0.1,1.5,'100','76000','Metal','Brick Veneer',300])
371        writer.writerow([6.1,1.5,'100','76000','Metal','Brick Veneer',300])
372        fd.close()
373       
374        extension = ".csv"
375        csv_fileII = tempfile.mktemp(extension)
376        fd = open(csv_fileII,'wb')
377        writer = csv.writer(fd)
378        writer.writerow(['x','y',STR_VALUE_LABEL,CONT_VALUE_LABEL,'ROOF_TYPE',WALL_TYPE_LABEL, SHORE_DIST_LABEL])
379        writer.writerow([5.5,0.5,'10','130000','Metal','Timber',20])
380        writer.writerow([4.5,1.0,'150','76000','Metal','Double Brick',20])
381        writer.writerow([0.1,1.5,'100','76000','Metal','Brick Veneer',300])
382        writer.writerow([6.1,1.5,'100','76000','Metal','Brick Veneer',300])
383        fd.close()
384       
385        sww_file = domain.get_name() + "." + domain.format
386        #print "sww_file",sww_file
387        marker='_gosh'
388        inundation_damage(sww_file, [csv_file, csv_fileII],
389                          exposure_file_out_marker=marker,
390                          verbose=False)
391
392        # Test one file
393        csv_handle = Exposure(csv_file[:-4]+marker+extension)
394        struct_loss = csv_handle.get_column(EventDamageModel.STRUCT_LOSS_TITLE)
395        #print "struct_loss",struct_loss
396        struct_loss = [float(x) for x in struct_loss]
397        assert num.allclose(struct_loss,[10,150,16.9,0])       
398        depth = csv_handle.get_column(EventDamageModel.MAX_DEPTH_TITLE)
399        #print "depth",depth
400        depth = [float(x) for x in depth]
401        assert num.allclose(depth,[5.5,4.5,0.1,-0.3])
402       
403        # Test another file
404        csv_handle = Exposure(csv_fileII[:-4]+marker+extension)
405        struct_loss = csv_handle.get_column(EventDamageModel.STRUCT_LOSS_TITLE)
406        #print "struct_loss",struct_loss
407        struct_loss = [float(x) for x in struct_loss]
408        assert num.allclose(struct_loss,[10,150,16.9,0])       
409        depth = csv_handle.get_column(EventDamageModel.MAX_DEPTH_TITLE)
410        #print "depth",depth
411        depth = [float(x) for x in depth]
412        assert num.allclose(depth,[5.5,4.5,0.1,-0.3]) 
413        os.remove(sww.filename)
414        os.remove(csv_file)
415        os.remove(csv_fileII)
416       
417    def ztest_add_depth_and_momentum2csv(self):
418        sww_file = self.domain.get_name() + "." + self.domain.format
419        #print "sww_file",sww_file
420       
421        out_csv = tempfile.mktemp(".csv")
422        print "out_csv",out_csv
423        add_depth_and_momentum2csv(sww_file, self.csv_file,
424                                   out_csv, verbose=False)
425       
426    def test_calc_damage_percentages(self):
427        max_depths = [-0.3, 0.0, 1.0,-0.3, 0.0, 1.0,-0.3, 0.0, 1.0]
428        shore_distances = [100, 100, 100, 100, 100, 100, 100, 100, 100]
429        walls = ['Double Brick',
430                 'Double Brick',
431                 'Double Brick',
432                 'Timber',
433                 'Timber',
434                 'Timber',
435                 'Brick Veneer',
436                 'Brick Veneer',
437                 'Brick Veneer']
438        struct_costs = [10,
439                        10,
440                        10,
441                        10,
442                        10,
443                        10,
444                        1,
445                        1,
446                        1]
447        content_costs = [100,
448                        100,
449                        100,
450                        100,
451                        100,
452                        100,
453                        10,
454                        10,
455                        10]
456
457        edm = EventDamageModel(max_depths, shore_distances, walls,
458                               struct_costs, content_costs)
459        edm.calc_damage_percentages()
460        assert num.allclose(edm.struct_damage,[0.0,0.016,0.572,
461                                               0.0,0.016,0.618,
462                                               0.0,0.016,0.618])
463        assert num.allclose(edm.contents_damage,[0.0,0.013,0.970,
464                                                 0.0,0.013,0.970,
465                                                 0.0,0.013,0.970])
466        edm.calc_cost()
467        assert num.allclose(edm.struct_loss,[0.0,.16,5.72,
468                                             0.0,.16,6.18,
469                                             0.0,0.016,0.618])
470        assert num.allclose(edm.contents_loss,[0.0,1.3,97,
471                                               0.0,1.3,97,
472                                               0.0,0.13,9.7])
473       
474       
475    def test_calc_collapse_structures(self):
476        edm = EventDamageModel([0.0]*17, [0.0]*17, [0.0]*17,
477                               [0.0]*17, [0.0]*17)
478        edm.struct_damage = num.zeros(17,num.float) 
479        edm.contents_damage = num.zeros(17,num.float) 
480        collapse_probability = {0.4:[0], #0
481                                0.6:[1], #1
482                                0.5:[2], #1
483                                0.25:[3,4], #1
484                                0.1:[5,6,7,8], #0
485                                0.2:[9,10,11,12,13,14,15,16]} #2
486        edm._calc_collapse_structures(collapse_probability, verbose_csv=True)
487
488        self.failUnless( edm.struct_damage[0]  == 0.0 and
489                         edm.contents_damage[0]  == 0.0,
490                        'Error!')
491        self.failUnless( edm.struct_damage[1]  == 1.0 and
492                         edm.contents_damage[1]  == 1.0,
493                        'Error!')
494        self.failUnless( edm.struct_damage[2]  == 1.0 and
495                         edm.contents_damage[2]  == 1.0,
496                        'Error!')
497        self.failUnless( edm.struct_damage[3]+ edm.struct_damage[4] == 1.0 and
498                         edm.contents_damage[3] + edm.contents_damage[4] ==1.0,
499                        'Error!')
500        sum_struct = 0.0
501        sum_contents = 0.0
502        for i in [5,6,7,8]:
503            sum_struct += edm.struct_damage[i]
504            sum_contents += edm.contents_damage[i]
505        print "", 
506        self.failUnless( sum_struct == 0.0 and sum_contents  == 0.0,
507                        'Error!')
508        sum_struct = 0.0
509        sum_contents = 0.0
510        for i in [9,10,11,12,13,14,15,16]:
511            sum_struct += edm.struct_damage[i]
512            sum_contents += edm.contents_damage[i]
513        self.failUnless( sum_struct == 2.0 and sum_contents  == 2.0,
514                        'Error!')
515       
516    def test_calc_collapse_probability(self):
517        depth =          [0.0, 0.5, 0.5  , 1.5, 2.5, 4.5, 10000, 2.0]
518        shore_distance = [0.0, 125, 250.1, 0.0, 150, 225, 10000, 251]
519        dummy = depth
520        edm = EventDamageModel(depth, shore_distance, dummy, dummy, dummy)
521        struct_coll_prob = edm.calc_collapse_probability()
522        answer = {0.05:[1,7],
523                  0.6:[3],
524                  0.4:[4],
525                  0.5:[5],
526                  0.45:[6]}
527        #print "struct_coll_prob",struct_coll_prob
528        #print "answer",answer
529
530        self.failUnless( struct_coll_prob ==  answer,
531                        'Error!')
532       
533       
534    def test_calc_damage_and_costs(self):
535                             
536        max_depths = [-0.3, 0.0, 1.0,-0.3, 0.0, 1.0,-0.3, 0.0, 10.0]
537        shore_distances = [100, 100, 100, 100, 100, 100, 100, 100, 100]
538        walls = ['Double Brick',
539                 'Double Brick',
540                 'Double Brick',
541                 'Timber',
542                 'Timber',
543                 'Timber',
544                 'Brick Veneer',
545                 'Brick Veneer',
546                 'Brick Veneer']
547        struct_costs = [10,
548                        10,
549                        10,
550                        10,
551                        10,
552                        10,
553                        1,
554                        1,
555                        1]
556        content_costs = [100,
557                        100,
558                        100,
559                        100,
560                        100,
561                        100,
562                        10,
563                        10,
564                        10]
565
566        edm = EventDamageModel(max_depths, shore_distances, walls,
567                               struct_costs, content_costs)
568        results_dic = edm.calc_damage_and_costs(verbose_csv=True)
569        #print "results_dic",results_dic
570       
571    def test_calc_max_depth_and_momentum(self):
572        sww_file = "tid" # self.domain.get_name() + "." + self.domain.format
573        points_lat_long = [[-34, 151.5],[-35.5, 151.5],[-50, 151]]
574        spat = Geospatial_data(data_points=points_lat_long,
575                               points_are_lats_longs=True)
576        points_ab = spat.get_data_points( absolute = True)
577        deps, _ = calc_max_depth_and_momentum(sww_file,
578                                              points_ab,
579                                              verbose=self.verbose,
580                                              use_cache = False)
581
582        # Test values based on returned results, so not an excellent test
583       
584        assert num.allclose(deps[0],0.113204555211)
585        assert num.allclose(deps[1],11.3215)
586        assert num.allclose(deps[2],0.0) # this value is outside both sww files
587       
588#-------------------------------------------------------------
589if __name__ == "__main__":
590    if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V':
591        Test_inundation_damage.verbose=True
592        saveout = sys.stdout   
593        filename = ".temp_verbose"
594        fid = open(filename, 'w')
595        sys.stdout = fid
596    else:
597        pass
598    suite = unittest.makeSuite(Test_inundation_damage,'test')
599    runner = unittest.TextTestRunner()
600    runner.run(suite)
601
602    # Cleaning up
603    if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V':
604        sys.stdout = saveout
605        fid.close() 
606        os.remove(filename)
Note: See TracBrowser for help on using the repository browser.