1 | #!/usr/bin/env python |
---|
2 | |
---|
3 | |
---|
4 | import unittest |
---|
5 | import tempfile |
---|
6 | import random |
---|
7 | import Numeric as num |
---|
8 | import zlib |
---|
9 | from os.path import join, split, sep |
---|
10 | from anuga.config import netcdf_mode_r, netcdf_mode_w, netcdf_mode_a |
---|
11 | |
---|
12 | |
---|
13 | # Please, don't add anuga.utilities to these imports. |
---|
14 | # I'm trying to keep this file general, so it works for EQRM and ANUGA |
---|
15 | # EQRM also uses this file, but has a different directory structure |
---|
16 | from system_tools import * |
---|
17 | |
---|
18 | class Test_system_tools(unittest.TestCase): |
---|
19 | def setUp(self): |
---|
20 | pass |
---|
21 | |
---|
22 | def tearDown(self): |
---|
23 | pass |
---|
24 | |
---|
25 | def test_user_name(self): |
---|
26 | user = get_user_name() |
---|
27 | |
---|
28 | # print user |
---|
29 | assert isinstance(user, basestring), 'User name should be a string' |
---|
30 | |
---|
31 | def test_host_name(self): |
---|
32 | host = get_host_name() |
---|
33 | |
---|
34 | # print host |
---|
35 | assert isinstance(host, basestring), 'User name should be a string' |
---|
36 | |
---|
37 | def test_compute_checksum(self): |
---|
38 | """test_compute_checksum(self): |
---|
39 | |
---|
40 | Check that checksums on files are OK |
---|
41 | """ |
---|
42 | |
---|
43 | from tempfile import mkstemp, mktemp |
---|
44 | |
---|
45 | # Generate a text file |
---|
46 | tmp_fd , tmp_name = mkstemp(suffix='.tmp', dir='.') |
---|
47 | fid = os.fdopen(tmp_fd, 'w+b') |
---|
48 | string = 'My temp file with textual content. AAAABBBBCCCC1234' |
---|
49 | fid.write(string) |
---|
50 | fid.close() |
---|
51 | |
---|
52 | # Have to apply the 64 bit fix here since we aren't comparing two |
---|
53 | # files, but rather a string and a file. |
---|
54 | ref_crc = safe_crc(string) |
---|
55 | |
---|
56 | checksum = compute_checksum(tmp_name) |
---|
57 | assert checksum == ref_crc |
---|
58 | |
---|
59 | os.remove(tmp_name) |
---|
60 | |
---|
61 | |
---|
62 | |
---|
63 | # Binary file |
---|
64 | tmp_fd , tmp_name = mkstemp(suffix='.tmp', dir='.') |
---|
65 | fid = os.fdopen(tmp_fd, 'w+b') |
---|
66 | |
---|
67 | string = 'My temp file with binary content. AAAABBBBCCCC1234' |
---|
68 | fid.write(string) |
---|
69 | fid.close() |
---|
70 | |
---|
71 | ref_crc = safe_crc(string) |
---|
72 | checksum = compute_checksum(tmp_name) |
---|
73 | |
---|
74 | assert checksum == ref_crc |
---|
75 | |
---|
76 | os.remove(tmp_name) |
---|
77 | |
---|
78 | # Binary NetCDF File X 2 (use mktemp's name) |
---|
79 | |
---|
80 | try: |
---|
81 | from Scientific.IO.NetCDF import NetCDFFile |
---|
82 | except ImportError: |
---|
83 | # This code is also used by EQRM which does not require NetCDF |
---|
84 | pass |
---|
85 | else: |
---|
86 | test_array = num.array([[7.0, 3.14], [-31.333, 0.0]]) |
---|
87 | |
---|
88 | # First file |
---|
89 | filename1 = mktemp(suffix='.nc', dir='.') |
---|
90 | fid = NetCDFFile(filename1, netcdf_mode_w) |
---|
91 | fid.createDimension('two', 2) |
---|
92 | fid.createVariable('test_array', num.Float, |
---|
93 | ('two', 'two')) |
---|
94 | fid.variables['test_array'][:] = test_array |
---|
95 | fid.close() |
---|
96 | |
---|
97 | # Second file |
---|
98 | filename2 = mktemp(suffix='.nc', dir='.') |
---|
99 | fid = NetCDFFile(filename2, netcdf_mode_w) |
---|
100 | fid.createDimension('two', 2) |
---|
101 | fid.createVariable('test_array', num.Float, |
---|
102 | ('two', 'two')) |
---|
103 | fid.variables['test_array'][:] = test_array |
---|
104 | fid.close() |
---|
105 | |
---|
106 | |
---|
107 | checksum1 = compute_checksum(filename1) |
---|
108 | checksum2 = compute_checksum(filename2) |
---|
109 | assert checksum1 == checksum2 |
---|
110 | |
---|
111 | |
---|
112 | os.remove(filename1) |
---|
113 | os.remove(filename2) |
---|
114 | |
---|
115 | |
---|
116 | def test_compute_checksum_real(self): |
---|
117 | """test_compute_checksum(self): |
---|
118 | |
---|
119 | Check that checksums on a png file is OK |
---|
120 | """ |
---|
121 | |
---|
122 | # Get path where this test is run |
---|
123 | # I'm trying to keep this file general, so it works for EQRM and ANUGA |
---|
124 | path, tail = split(__file__) |
---|
125 | if path == '': |
---|
126 | path = '.' + sep |
---|
127 | |
---|
128 | filename = path + sep + 'crc_test_file.png' |
---|
129 | |
---|
130 | ref_crc = 1203293305 # Computed on Windows box |
---|
131 | checksum = compute_checksum(filename) |
---|
132 | |
---|
133 | msg = 'Computed checksum = %s, should have been %s'\ |
---|
134 | %(checksum, ref_crc) |
---|
135 | assert checksum == ref_crc, msg |
---|
136 | #print checksum |
---|
137 | |
---|
138 | |
---|
139 | def test_get_vars_in_expression(self): |
---|
140 | '''Test the 'get vars from expression' code.''' |
---|
141 | |
---|
142 | def test_it(source, expected): |
---|
143 | result = get_vars_in_expression(source) |
---|
144 | result.sort() |
---|
145 | expected.sort() |
---|
146 | msg = ("Source: '%s'\nResult: %s\nExpected: %s" |
---|
147 | % (source, str(result), str(expected))) |
---|
148 | self.failUnlessEqual(result, expected, msg) |
---|
149 | |
---|
150 | source = 'fred' |
---|
151 | expected = ['fred'] |
---|
152 | test_it(source, expected) |
---|
153 | |
---|
154 | source = 'tom + dick' |
---|
155 | expected = ['tom', 'dick'] |
---|
156 | test_it(source, expected) |
---|
157 | |
---|
158 | source = 'tom * (dick + harry)' |
---|
159 | expected = ['tom', 'dick', 'harry'] |
---|
160 | test_it(source, expected) |
---|
161 | |
---|
162 | source = 'tom + dick**0.5 / (harry - tom)' |
---|
163 | expected = ['tom', 'dick', 'harry'] |
---|
164 | test_it(source, expected) |
---|
165 | |
---|
166 | |
---|
167 | def test_tar_untar_files(self): |
---|
168 | '''Test that tarring & untarring files is OK.''' |
---|
169 | |
---|
170 | num_lines = 100 |
---|
171 | line_size = 100 |
---|
172 | |
---|
173 | # these test files must exist in the current directory |
---|
174 | # create them with random data |
---|
175 | files = ('alpha', 'beta', 'gamma') |
---|
176 | for file in files: |
---|
177 | fd = open(file, 'w') |
---|
178 | line = '' |
---|
179 | for i in range(num_lines): |
---|
180 | for j in range(line_size): |
---|
181 | line += chr(random.randint(ord('A'), ord('Z'))) |
---|
182 | line += '\n' |
---|
183 | fd.write(line) |
---|
184 | fd.close() |
---|
185 | |
---|
186 | # name of tar file and test (temp) directory |
---|
187 | tar_filename = 'test.tgz' |
---|
188 | tmp_dir = tempfile.mkdtemp() |
---|
189 | |
---|
190 | # tar and untar the test files into a temporary directory |
---|
191 | tar_file(files, tar_filename) |
---|
192 | untar_file(tar_filename, tmp_dir) |
---|
193 | |
---|
194 | # see if original files and untarred ones are the same |
---|
195 | for file in files: |
---|
196 | fd = open(file, 'r') |
---|
197 | orig = fd.readlines() |
---|
198 | fd.close() |
---|
199 | |
---|
200 | fd = open(os.path.join(tmp_dir, file), 'r') |
---|
201 | copy = fd.readlines() |
---|
202 | fd.close() |
---|
203 | |
---|
204 | msg = "Original file %s isn't the same as untarred copy?" % file |
---|
205 | self.failUnless(orig == copy, msg) |
---|
206 | |
---|
207 | # clean up |
---|
208 | for file in files: |
---|
209 | os.remove(file) |
---|
210 | os.remove(tar_filename) |
---|
211 | |
---|
212 | |
---|
213 | def test_file_digest(self): |
---|
214 | '''Test that file digest functions give 'correct' answer. |
---|
215 | |
---|
216 | Not a good test as we get 'expected_digest' from a digest file, |
---|
217 | but *does* alert us if the digest algorithm gives us a different string. |
---|
218 | ''' |
---|
219 | |
---|
220 | # we expect this digest string from the data file |
---|
221 | expected_digest = '831a1dde6edd365ec4163a47871fa21b' |
---|
222 | |
---|
223 | # prepare test directory and filenames |
---|
224 | tmp_dir = tempfile.mkdtemp() |
---|
225 | data_file = os.path.join(tmp_dir, 'test.data') |
---|
226 | digest_file = os.path.join(tmp_dir, 'test.digest') |
---|
227 | |
---|
228 | # create the data file |
---|
229 | data_line = 'The quick brown fox jumps over the lazy dog. 0123456789\n' |
---|
230 | fd = open(data_file, 'w') |
---|
231 | for line in range(100): |
---|
232 | fd.write(data_line) |
---|
233 | fd.close() |
---|
234 | |
---|
235 | # create the digest file |
---|
236 | make_digest_file(data_file, digest_file) |
---|
237 | |
---|
238 | # get digest string for the data file |
---|
239 | digest = get_file_hexdigest(data_file) |
---|
240 | |
---|
241 | # check that digest is as expected, string |
---|
242 | msg = ("Digest string wrong, got '%s', expected '%s'" |
---|
243 | % (digest, expected_digest)) |
---|
244 | self.failUnless(expected_digest == digest, msg) |
---|
245 | |
---|
246 | # check that digest is as expected, file |
---|
247 | msg = ("Digest file wrong, got '%s', expected '%s'" |
---|
248 | % (digest, expected_digest)) |
---|
249 | fd = open(digest_file, 'r') |
---|
250 | digest = fd.readline() |
---|
251 | fd.close() |
---|
252 | self.failUnless(expected_digest == digest, msg) |
---|
253 | |
---|
254 | def test_file_length_function(self): |
---|
255 | '''Test that file_length() give 'correct' answer.''' |
---|
256 | |
---|
257 | # prepare test directory and filenames |
---|
258 | tmp_dir = tempfile.mkdtemp() |
---|
259 | test_file1 = os.path.join(tmp_dir, 'test.file1') |
---|
260 | test_file2 = os.path.join(tmp_dir, 'test.file2') |
---|
261 | test_file3 = os.path.join(tmp_dir, 'test.file3') |
---|
262 | test_file4 = os.path.join(tmp_dir, 'test.file4') |
---|
263 | |
---|
264 | # create files of known length |
---|
265 | fd = open(test_file1, 'w') # 0 lines |
---|
266 | fd.close |
---|
267 | fd = open(test_file2, 'w') # 5 lines, all '\n' |
---|
268 | for i in range(5): |
---|
269 | fd.write('\n') |
---|
270 | fd.close() |
---|
271 | fd = open(test_file3, 'w') # 25 chars, no \n, 1 lines |
---|
272 | fd.write('no newline at end of line') |
---|
273 | fd.close() |
---|
274 | fd = open(test_file4, 'w') # 1000 lines |
---|
275 | for i in range(1000): |
---|
276 | fd.write('The quick brown fox jumps over the lazy dog.\n') |
---|
277 | fd.close() |
---|
278 | |
---|
279 | # use file_length() to get and check lengths |
---|
280 | size1 = file_length(test_file1) |
---|
281 | msg = 'Expected file_length() to return 0, but got %d' % size1 |
---|
282 | self.failUnless(size1 == 0, msg) |
---|
283 | size2 = file_length(test_file2) |
---|
284 | msg = 'Expected file_length() to return 5, but got %d' % size2 |
---|
285 | self.failUnless(size2 == 5, msg) |
---|
286 | size3 = file_length(test_file3) |
---|
287 | msg = 'Expected file_length() to return 1, but got %d' % size3 |
---|
288 | self.failUnless(size3 == 1, msg) |
---|
289 | size4 = file_length(test_file4) |
---|
290 | msg = 'Expected file_length() to return 1000, but got %d' % size4 |
---|
291 | self.failUnless(size4 == 1000, msg) |
---|
292 | |
---|
293 | |
---|
294 | #------------------------------------------------------------- |
---|
295 | if __name__ == "__main__": |
---|
296 | suite = unittest.makeSuite(Test_system_tools, 'test') |
---|
297 | runner = unittest.TextTestRunner() |
---|
298 | runner.run(suite) |
---|
299 | |
---|