1 | import unittest |
---|
2 | import tempfile |
---|
3 | import os |
---|
4 | import shutil |
---|
5 | |
---|
6 | from anuga.utilities.file_utils import copy_code_files, get_all_swwfiles |
---|
7 | from anuga.utilities.file_utils import del_dir |
---|
8 | from sww_merge import sww_merge |
---|
9 | |
---|
10 | |
---|
class Test_FileUtils(unittest.TestCase):
    """Tests for copy_code_files, get_all_swwfiles and sww_merge."""

    def test_copy_code_files(self):
        """Check that copy_code_files() copies every file it is given.

        The function is exercised with a single filename, with two
        filenames passed as separate arguments, and with a tuple of
        filenames.
        """

        def create_file(path):
            # Each test file simply contains its own path and a newline.
            with open(path, 'w') as fd:
                fd.write('%s\n' % path)

        # create working directories and test files
        work_dir = tempfile.mkdtemp()
        try:
            dst_dir = tempfile.mkdtemp(dir=work_dir)
            src_dir = tempfile.mkdtemp(dir=work_dir)

            names = ['file1', 'file2', 'file3', 'file4', 'file5']
            paths = [os.path.join(src_dir, name) for name in names]
            for path in paths:
                create_file(path)

            # exercise the copy function
            copy_code_files(dst_dir, paths[0])
            copy_code_files(dst_dir, paths[0], paths[1])
            copy_code_files(dst_dir, (paths[3], paths[4], paths[2]))

            # test that files were actually copied
            for name in names:
                copied = os.path.join(dst_dir, name)
                self.assertTrue(os.access(copied, os.F_OK))
        finally:
            # clean up even when the copy or an assertion fails
            shutil.rmtree(work_dir)

    def test_get_all_swwfiles(self):
        """Check that the invalid call get_all_swwfiles('', 'test.txt')
        raises IOError.
        """
        self.assertRaises(IOError, get_all_swwfiles, '', 'test.txt')

    def test_get_all_swwfiles1(self):
        """Check that get_all_swwfiles() finds four '.sww' files.

        Four files named 'test*.sww' are created in a fresh directory;
        the result must contain each base name without its extension and
        must have exactly four entries.
        """
        temp_dir = tempfile.mkdtemp('', 'sww_test')
        try:
            filenames = []
            for _ in range(4):
                # mkstemp creates the file while choosing its name, so the
                # four names cannot collide (mktemp only returns a name).
                fd, filename = tempfile.mkstemp('.sww', 'test', temp_dir)
                with os.fdopen(fd, 'w') as fid:
                    fid.write('hello')
                filenames.append(filename)

            found = get_all_swwfiles(temp_dir, 'test')
        finally:
            del_dir(temp_dir)

        for filename in filenames:
            # drop the four-character '.sww' extension
            base = os.path.basename(filename)[:-4]
            self.assertTrue(base in found)

        self.assertEqual(len(found), 4)

    def test_merge_swwfiles(self):
        """Check that sww_merge() writes its output file.

        Two domains named 'test1' and 'test2' are evolved, then
        'test1.sww' and 'test2.sww' are merged into 'test_out.sww'.
        All three files are removed afterwards.
        """
        from anuga.abstract_2d_finite_volumes.mesh_factory import \
            rectangular, rectangular_cross
        from anuga.shallow_water.shallow_water_domain import Domain
        # NOTE(review): SWW_file is imported but not used in this test.
        from anuga.file.sww import SWW_file
        from anuga.abstract_2d_finite_volumes.generic_boundary_conditions \
            import Dirichlet_boundary

        Bd = Dirichlet_boundary([0.5, 0., 0.])

        def evolve_domain(mesh, name, elevation, stage):
            # Build a shallow water domain on `mesh` and run it to the end.
            domain = Domain(*mesh)
            domain.set_name(name)
            domain.set_quantity('elevation', elevation)
            domain.set_quantity('stage', stage)
            domain.set_boundary({'left': Bd, 'right': Bd,
                                 'top': Bd, 'bottom': Bd})
            for _ in domain.evolve(yieldstep=0.5, finaltime=1):
                pass

        infiles = ['test1.sww', 'test2.sww']
        outfile = 'test_out.sww'
        try:
            evolve_domain(rectangular_cross(2, 2), 'test1', 2, 5)
            evolve_domain(rectangular(3, 3), 'test2', 3, 50)

            sww_merge(list(infiles), outfile)
            self.assertTrue(os.access(outfile, os.F_OK))
        finally:
            # remove temp files, including the merged output, whether or
            # not the test passed
            for path in infiles + [outfile]:
                if os.path.exists(path):
                    os.remove(path)
147 | |
---|
148 | |
---|
149 | |
---|
150 | #------------------------------------------------------------- |
---|
151 | |
---|
if __name__ == "__main__":
    # Run every test* method of Test_FileUtils with the text runner.
    # loadTestsFromTestCase replaces unittest.makeSuite, which newer Python
    # versions no longer provide; it selects the same 'test'-prefixed methods.
    suite = unittest.TestLoader().loadTestsFromTestCase(Test_FileUtils)
    runner = unittest.TextTestRunner()  # pass verbosity=2 for per-test output
    runner.run(suite)