1 | import unittest |
---|
2 | import tempfile |
---|
3 | import os |
---|
4 | import shutil |
---|
5 | import sys |
---|
6 | |
---|
7 | from anuga.utilities.file_utils import copy_code_files, get_all_swwfiles |
---|
8 | from anuga.utilities.file_utils import del_dir |
---|
9 | from sww_merge import sww_merge |
---|
10 | |
---|
11 | |
---|
class Test_FileUtils(unittest.TestCase):
    """Tests for copy_code_files(), get_all_swwfiles() and sww_merge()."""

    def test_copy_code_files(self):
        """Test that the copy_code_files() function is sane.

        Five small files are created in a source directory and copied to a
        destination directory using the three call forms exercised below
        (single path, several paths, tuple of paths); each file must then
        exist in the destination.
        """

        def create_file(f):
            # The file's own path is used as content so it is non-empty.
            with open(f, 'w') as fd:
                fd.write('%s\n' % f)

        # create working directories and test files
        work_dir = tempfile.mkdtemp()
        try:
            dst_dir = tempfile.mkdtemp(dir=work_dir)
            src_dir = tempfile.mkdtemp(dir=work_dir)

            names = ['file1', 'file2', 'file3', 'file4', 'file5']
            paths = [os.path.join(src_dir, name) for name in names]
            for path in paths:
                create_file(path)
            filename1, filename2, filename3, filename4, filename5 = paths

            # exercise the copy function
            copy_code_files(dst_dir, filename1)
            copy_code_files(dst_dir, filename1, filename2)
            copy_code_files(dst_dir, (filename4, filename5, filename3))

            # test that files were actually copied
            for name in names:
                copied = os.path.join(dst_dir, name)
                self.assertTrue(os.access(copied, os.F_OK))
        finally:
            # clean up even when an assertion or the copy itself fails
            shutil.rmtree(work_dir)

    def test_get_all_swwfiles(self):
        """get_all_swwfiles('', 'test.txt') is expected to raise IOError.

        The original test marks this argument combination as invalid.
        """
        # assertRaises reports a proper test failure when nothing is raised;
        # raising a plain string (as before) is itself a TypeError.
        self.assertRaises(IOError, get_all_swwfiles, '', 'test.txt')

    def test_get_all_swwfiles1(self):
        """get_all_swwfiles() must list every 'test*.sww' file in a directory.

        Four .sww files are created in a fresh temporary directory; the
        returned collection must contain each file name with its last four
        characters (the '.sww' suffix) removed, and nothing else.
        """
        temp_dir = tempfile.mkdtemp('', 'sww_test')
        try:
            filenames = []
            for _ in range(4):
                # mkstemp creates the file atomically, unlike the deprecated
                # mktemp which only returns a (possibly racy) name.
                fd, filename = tempfile.mkstemp('.sww', 'test', temp_dir)
                with os.fdopen(fd, 'w') as fid:
                    fid.write('hello')
                filenames.append(filename)

            look_in = os.path.dirname(filenames[0])
            iterate = get_all_swwfiles(look_in, 'test')
        finally:
            # remove the directory whether or not the lookup succeeded
            del_dir(temp_dir)

        for filename in filenames:
            base_name = os.path.basename(filename)[:-4]
            self.assertTrue(base_name in iterate)

        self.assertEqual(len(iterate), 4)

    def test_merge_swwfiles(self):
        """sww_merge() must produce the requested output file.

        Two small domains named 'test1' and 'test2' are evolved, then
        'test1.sww' and 'test2.sww' are merged into 'test_out.sww', whose
        existence is checked.
        """
        from anuga.abstract_2d_finite_volumes.mesh_factory import \
            rectangular, rectangular_cross
        from anuga.shallow_water.shallow_water_domain import Domain
        from anuga.abstract_2d_finite_volumes.generic_boundary_conditions \
            import Dirichlet_boundary

        Bd = Dirichlet_boundary([0.5, 0., 0.])

        def evolve_domain(mesh, name, elevation, stage):
            # Build a shallow water domain on the given mesh and run it to
            # finaltime=1 in steps of 0.5, discarding the yielded times.
            domain = Domain(*mesh)
            domain.set_name(name)
            domain.set_quantity('elevation', elevation)
            domain.set_quantity('stage', stage)
            domain.set_boundary({'left': Bd, 'right': Bd,
                                 'top': Bd, 'bottom': Bd})
            for _ in domain.evolve(yieldstep=0.5, finaltime=1):
                pass

        outfile = 'test_out.sww'
        try:
            evolve_domain(rectangular_cross(2, 2), 'test1', 2, 5)
            evolve_domain(rectangular(3, 3), 'test2', 3, 50)

            sww_merge(['test1.sww', 'test2.sww'], outfile)
            self.assertTrue(os.access(outfile, os.F_OK))
        finally:
            # remove temp files; removal is skipped on win32 exactly as in
            # the original test (the reason is not visible in this file).
            if not sys.platform == 'win32':
                for path in ('test1.sww', 'test2.sww', outfile):
                    # a failed run may not have produced every file
                    if os.path.exists(path):
                        os.remove(path)
---|
149 | |
---|
150 | |
---|
151 | |
---|
152 | #------------------------------------------------------------- |
---|
153 | |
---|
if __name__ == "__main__":
    # unittest.makeSuite() was deprecated in Python 3.11 and removed in 3.13;
    # TestLoader collects the same methods (default prefix is 'test').
    suite = unittest.TestLoader().loadTestsFromTestCase(Test_FileUtils)
    # Pass verbosity=2 to TextTestRunner for per-test output.
    runner = unittest.TextTestRunner()
    runner.run(suite)
---|