source: trunk/anuga_core/source/anuga/utilities/test_file_utils.py @ 7820

Last change on this file since 7820 was 7820, checked in by hudson, 14 years ago

Added unit test for sww_merge.

File size: 5.1 KB
RevLine 
[7756]1import unittest
2import tempfile
3import os
4import shutil
[7745]5
[7780]6from anuga.utilities.file_utils import copy_code_files, get_all_swwfiles
7from anuga.utilities.file_utils import del_dir
[7820]8from sww_merge import sww_merge
[7745]9
[7756]10
[7745]11class Test_FileUtils(unittest.TestCase):
12               
13    def test_copy_code_files(self):
14        '''test that the copy_code_files() function is sane.'''
15
16        def create_file(f):
17            fd = open(f, 'w')
18            fd.write('%s\n' % f)
19            fd.close()
20
21        # create working directories and test files
22        work_dir = tempfile.mkdtemp()
23        dst_dir = tempfile.mkdtemp(dir=work_dir)
24        src_dir = tempfile.mkdtemp(dir=work_dir)
25
26        f1 = 'file1'       
27        filename1 = os.path.join(src_dir, f1)
28        create_file(filename1)
29        f2 = 'file2'       
30        filename2 = os.path.join(src_dir, f2)
31        create_file(filename2)
32        f3 = 'file3'       
33        filename3 = os.path.join(src_dir, f3)
34        create_file(filename3)
35        f4 = 'file4'       
36        filename4 = os.path.join(src_dir, f4)
37        create_file(filename4)
38        f5 = 'file5'       
39        filename5 = os.path.join(src_dir, f5)
40        create_file(filename5)
41
42        # exercise the copy function
43        copy_code_files(dst_dir, filename1)
44        copy_code_files(dst_dir, filename1, filename2)
45        copy_code_files(dst_dir, (filename4, filename5, filename3))
46
47        # test that files were actually copied
[7756]48        self.failUnless(os.access(os.path.join(dst_dir, f1), os.F_OK))
49        self.failUnless(os.access(os.path.join(dst_dir, f2), os.F_OK))
50        self.failUnless(os.access(os.path.join(dst_dir, f3), os.F_OK))
51        self.failUnless(os.access(os.path.join(dst_dir, f4), os.F_OK))
52        self.failUnless(os.access(os.path.join(dst_dir, f5), os.F_OK))
[7745]53
54        # clean up
55        shutil.rmtree(work_dir)
[7780]56       
57    def test_get_all_swwfiles(self):
58        try:
59            swwfiles = get_all_swwfiles('','test.txt')  #Invalid
60        except IOError:
61            pass
62        else:
63            raise 'Should have raised exception' 
64       
65    def test_get_all_swwfiles1(self):
66       
67        temp_dir = tempfile.mkdtemp('','sww_test')
68        filename0 = tempfile.mktemp('.sww','test',temp_dir)
69        filename1 = tempfile.mktemp('.sww','test',temp_dir)
70        filename2 = tempfile.mktemp('.sww','test',temp_dir)
71        filename3 = tempfile.mktemp('.sww','test',temp_dir)
72       
73        #print'filename', filename0,filename1,filename2,filename3
74       
75        fid0 = open(filename0, 'w')
76        fid1 = open(filename1, 'w')
77        fid2 = open(filename2, 'w')
78        fid3 = open(filename3, 'w')
[7745]79
[7780]80        fid0.write('hello')
81        fid1.write('hello')
82        fid2.write('hello')
83        fid3.write('hello')
84       
85        fid0.close()
86        fid1.close()
87        fid2.close()
88        fid3.close()
89       
90       
91        dir, name0 = os.path.split(filename0)
92        #print 'dir',dir,name0
93       
94        iterate=get_all_swwfiles(dir,'test')
95       
96        del_dir(temp_dir)
97#        removeall(temp_dir)
98
99        _, name0 = os.path.split(filename0) 
100        #print'name0',name0[:-4],iterate[0]   
101        _, name1 = os.path.split(filename1)       
102        _, name2 = os.path.split(filename2)       
103        _, name3 = os.path.split(filename3)       
104
105        assert name0[:-4] in iterate
106        assert name1[:-4] in iterate
107        assert name2[:-4] in iterate
108        assert name3[:-4] in iterate
109       
110        assert len(iterate)==4           
111
[7820]112    def test_merge_swwfiles(self):
113        from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular, \
114                                                                    rectangular_cross
115        from anuga.shallow_water.shallow_water_domain import Domain
116        from anuga.file.sww import SWW_file
117        from anuga.abstract_2d_finite_volumes.generic_boundary_conditions import \
118            Dirichlet_boundary
119
120        Bd = Dirichlet_boundary([0.5, 0., 0.])
121
122        # Create shallow water domain
123        domain = Domain(*rectangular_cross(2, 2))
124        domain.set_name('test1')
125        domain.set_quantity('elevation', 2)
126        domain.set_quantity('stage', 5)
127        domain.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd})
128        for t in domain.evolve(yieldstep=0.5, finaltime=1):
129            pass
130           
131        domain = Domain(*rectangular(3, 3))
132        domain.set_name('test2')
133        domain.set_quantity('elevation', 3)
134        domain.set_quantity('stage', 50)
135        domain.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd})
136        for t in domain.evolve(yieldstep=0.5, finaltime=1):
137            pass
138               
139        outfile = 'test_out.sww'
140        sww_merge(['test1.sww', 'test2.sww'], outfile)
141        self.failUnless(os.access(outfile, os.F_OK)) 
142       
143        # remove temp files
144        os.remove('test1.sww')
145        os.remove('test2.sww')
146      #  os.remove(outfile)     
147       
148       
149
[7745]150#-------------------------------------------------------------
151
152if __name__ == "__main__":
[7820]153    suite = unittest.makeSuite(Test_FileUtils, 'test')
[7765]154    runner = unittest.TextTestRunner() #verbosity=2)
155    runner.run(suite)   
Note: See TracBrowser for help on using the repository browser.