source: trunk/anuga_core/source/anuga/utilities/test_system_tools.py @ 7876

Last change on this file since 7876 was 7486, checked in by ole, 15 years ago

Reverted changeset:6860 as there are too many types of output from svnversion.
We are now back to svn info.
Also moved nested function declarations out and made them Python-style internal functions using a leading double underscore (__).
Added a simple unit test of this functionality.

File size: 15.4 KB
Line 
1#!/usr/bin/env python
2
3
4import unittest
5import numpy as num
6import random
7import tempfile
8import zlib
9import os
10from os.path import join, split, sep
11from Scientific.IO.NetCDF import NetCDFFile
12from anuga.config import netcdf_mode_r, netcdf_mode_w, netcdf_mode_a
13from anuga.config import netcdf_float, netcdf_char, netcdf_int
14
15
16# Please, don't add anuga.utilities to these imports.
17# I'm trying to keep this file general, so it works for EQRM and ANUGA
18# EQRM also uses this file, but has a different directory structure
19from system_tools import *
20
21class Test_system_tools(unittest.TestCase):
22    def setUp(self):
23        pass
24
25    def tearDown(self):
26        pass
27
28    def test_user_name(self):
29        user = get_user_name()
30
31        # print user
32        assert isinstance(user, basestring), 'User name should be a string'
33
34    def test_host_name(self):
35        host = get_host_name()
36
37        # print host
38        assert isinstance(host, basestring), 'User name should be a string'       
39
40    def test_compute_checksum(self):
41        """test_compute_checksum(self):
42
43        Check that checksums on files are OK
44        """
45
46        from tempfile import mkstemp, mktemp
47
48        # Generate a text file
49        tmp_fd , tmp_name = mkstemp(suffix='.tmp', dir='.')
50        fid = os.fdopen(tmp_fd, 'w+b')
51        string = 'My temp file with textual content. AAAABBBBCCCC1234'
52        fid.write(string)
53        fid.close()
54
55        # Have to apply the 64 bit fix here since we aren't comparing two
56        # files, but rather a string and a file.
57        ref_crc = safe_crc(string)
58
59        checksum = compute_checksum(tmp_name)
60        assert checksum == ref_crc
61
62        os.remove(tmp_name)
63
64
65
66        # Binary file
67        tmp_fd , tmp_name = mkstemp(suffix='.tmp', dir='.')
68        fid = os.fdopen(tmp_fd, 'w+b')
69
70        string = 'My temp file with binary content. AAAABBBBCCCC1234'
71        fid.write(string)
72        fid.close()
73
74        ref_crc = safe_crc(string)
75        checksum = compute_checksum(tmp_name)
76
77        assert checksum == ref_crc
78
79        os.remove(tmp_name)       
80
81        # Binary NetCDF File X 2 (use mktemp's name)
82
83        try:
84            from Scientific.IO.NetCDF import NetCDFFile
85        except ImportError:
86            # This code is also used by EQRM which does not require NetCDF
87            pass
88        else:
89            test_array = num.array([[7.0, 3.14], [-31.333, 0.0]])
90
91            # First file
92            filename1 = mktemp(suffix='.nc', dir='.')
93            fid = NetCDFFile(filename1, netcdf_mode_w)
94            fid.createDimension('two', 2)
95            fid.createVariable('test_array', netcdf_float,
96                               ('two', 'two'))
97            fid.variables['test_array'][:] = test_array
98            fid.close()
99
100            # Second file
101            filename2 = mktemp(suffix='.nc', dir='.')
102            fid = NetCDFFile(filename2, netcdf_mode_w)
103            fid.createDimension('two', 2)
104            fid.createVariable('test_array', netcdf_float,
105                               ('two', 'two'))
106            fid.variables['test_array'][:] = test_array
107            fid.close()
108
109
110            checksum1 = compute_checksum(filename1)
111            checksum2 = compute_checksum(filename2)       
112            assert checksum1 == checksum2
113
114
115            os.remove(filename1)
116            os.remove(filename2)
117
118
119    def test_compute_checksum_real(self):
120        """test_compute_checksum(self):
121
122        Check that checksums on a png file is OK
123        """
124
125        # Get path where this test is run
126        # I'm trying to keep this file general, so it works for EQRM and ANUGA
127        path, tail = split(__file__)
128        if path == '':
129            path = '.' + sep
130
131        filename = path + sep +  'crc_test_file.png'
132
133        ref_crc = 1203293305 # Computed on Windows box
134        checksum = compute_checksum(filename)
135
136        msg = 'Computed checksum = %s, should have been %s'\
137              %(checksum, ref_crc)
138        assert checksum == ref_crc, msg
139        #print checksum
140
141################################################################################
142# Test the clean_line() utility function.
143################################################################################
144
145    # helper routine to test clean_line()
146    def clean_line_test(self, instr, delim, expected):
147        result = clean_line(instr, delim)
148        self.failUnless(result == expected,
149                        "clean_line('%s', '%s'), expected %s, got %s"
150                        % (str(instr), str(delim), str(expected), str(result)))
151
152    def test_clean_line_01(self):
153        self.clean_line_test('abc, ,,xyz,123', ',', ['abc', '', 'xyz', '123'])
154
155    def test_clean_line_02(self):
156        self.clean_line_test(' abc , ,, xyz  , 123  ', ',',
157                             ['abc', '', 'xyz', '123'])
158
159    def test_clean_line_03(self):
160        self.clean_line_test('1||||2', '|', ['1', '2'])
161
162    def test_clean_line_04(self):
163        self.clean_line_test('abc, ,,xyz,123, ', ',',
164                             ['abc', '', 'xyz', '123']) 
165
166    def test_clean_line_05(self):
167        self.clean_line_test('abc, ,,xyz,123, ,    ', ',',
168                             ['abc', '', 'xyz', '123', ''])
169
170    def test_clean_line_06(self):
171        self.clean_line_test(',,abc, ,,xyz,123, ,    ', ',',
172                             ['abc', '', 'xyz', '123', ''])
173
174    def test_clean_line_07(self):
175        self.clean_line_test('|1||||2', '|', ['1', '2'])
176
177    def test_clean_line_08(self):
178        self.clean_line_test(' ,a,, , ,b,c , ,, , ', ',',
179                             ['a', '', '', 'b', 'c', '', ''])
180
181    def test_clean_line_09(self):
182        self.clean_line_test('a:b:c', ':', ['a', 'b', 'c'])
183
184    def test_clean_line_10(self):
185        self.clean_line_test('a:b:c:', ':', ['a', 'b', 'c'])
186
187################################################################################
188# Test the string_to_char() and char_to_string() utility functions.
189################################################################################
190
191    def test_string_to_char(self):
192        import random
193
194        MAX_CHARS = 10
195        MAX_ENTRIES = 10000
196        A_INT = ord('a')
197        Z_INT = ord('z')
198
199        # generate some random strings in a list, with guaranteed lengths
200        str_list = ['x' * MAX_CHARS]        # make first maximum length
201        for entry in xrange(MAX_ENTRIES):
202            length = random.randint(1, MAX_CHARS)
203            s = ''
204            for c in range(length):
205                s += chr(random.randint(A_INT, Z_INT))
206            str_list.append(s)
207
208        x = string_to_char(str_list)
209        new_str_list = char_to_string(x)
210
211        self.failUnlessEqual(new_str_list, str_list)
212
213    # special test - input list is ['']
214    def test_string_to_char2(self):
215        # generate a special list shown bad in load_mesh testing
216        str_list = ['']
217
218        x = string_to_char(str_list)
219        new_str_list = char_to_string(x)
220
221        self.failUnlessEqual(new_str_list, str_list)
222
223
224################################################################################
225# Test the raw I/O to NetCDF files of string data encoded/decoded with
226# string_to_char() and char_to_string().
227################################################################################
228
229    ##
230    # @brief Helper function to write a list of strings to a NetCDF file.
231    # @param filename Path to the file to write.
232    # @param l The list of strings to write.
233    def helper_write_msh_file(self, filename, l):
234        # open the NetCDF file
235        fd = NetCDFFile(filename, netcdf_mode_w)
236        fd.description = 'Test file - string arrays'
237
238        # convert list of strings to num.array
239        al = num.array(string_to_char(l), num.character)
240
241        # write the list
242        fd.createDimension('num_of_strings', al.shape[0])
243        fd.createDimension('size_of_strings', al.shape[1])
244
245        var = fd.createVariable('strings', netcdf_char,
246                                ('num_of_strings', 'size_of_strings'))
247        var[:] = al
248
249        fd.close()
250
251
252    ##
253    # @brief Helper function to read a NetCDF file and return a list of strings.
254    # @param filename Path to the file to read.
255    # @return A list of strings from the file.
256    def helper_read_msh_file(self, filename):
257        fid = NetCDFFile(filename, netcdf_mode_r)
258        mesh = {}
259
260        # Get the 'strings' variable
261        strings = fid.variables['strings'][:]
262
263        fid.close()
264
265        return char_to_string(strings)
266
267
268    # test random strings to a NetCDF file
269    def test_string_to_netcdf(self):
270        import random
271
272        MAX_CHARS = 10
273        MAX_ENTRIES = 10000
274
275        A_INT = ord('a')
276        Z_INT = ord('z')
277
278        FILENAME = 'test.msh'
279
280        # generate some random strings in a list, with guaranteed lengths
281        str_list = ['x' * MAX_CHARS]        # make first maximum length
282        for entry in xrange(MAX_ENTRIES):
283            length = random.randint(1, MAX_CHARS)
284            s = ''
285            for c in range(length):
286                s += chr(random.randint(A_INT, Z_INT))
287            str_list.append(s)
288
289        self.helper_write_msh_file(FILENAME, str_list)
290        new_str_list = self.helper_read_msh_file(FILENAME)
291
292        self.failUnlessEqual(new_str_list, str_list)
293        os.remove(FILENAME)
294
295    # special test - list [''] to a NetCDF file
296    def test_string_to_netcdf2(self):
297        FILENAME = 'test.msh'
298
299        # generate some random strings in a list, with guaranteed lengths
300        str_list = ['']
301
302        self.helper_write_msh_file(FILENAME, str_list)
303        new_str_list = self.helper_read_msh_file(FILENAME)
304
305        self.failUnlessEqual(new_str_list, str_list)
306        os.remove(FILENAME)
307
308
309    def test_get_vars_in_expression(self):
310        '''Test the 'get vars from expression' code.'''
311
312        def test_it(source, expected):
313            result = get_vars_in_expression(source)
314            result.sort()
315            expected.sort()
316            msg = ("Source: '%s'\nResult: %s\nExpected: %s"
317                   % (source, str(result), str(expected)))
318            self.failUnlessEqual(result, expected, msg)
319               
320        source = 'fred'
321        expected = ['fred']
322        test_it(source, expected)
323
324        source = 'tom + dick'
325        expected = ['tom', 'dick']
326        test_it(source, expected)
327
328        source = 'tom * (dick + harry)'
329        expected = ['tom', 'dick', 'harry']
330        test_it(source, expected)
331
332        source = 'tom + dick**0.5 / (harry - tom)'
333        expected = ['tom', 'dick', 'harry']
334        test_it(source, expected)
335
336
337    def test_tar_untar_files(self):
338        '''Test that tarring & untarring files is OK.'''
339
340        num_lines = 100
341        line_size = 100
342
343        # these test files must exist in the current directory
344        # create them with random data
345        files = ('alpha', 'beta', 'gamma')
346        for file in files:
347            fd = open(file, 'w')
348            line = ''
349            for i in range(num_lines):
350                for j in range(line_size):
351                    line += chr(random.randint(ord('A'), ord('Z')))
352                line += '\n'
353                fd.write(line)
354            fd.close()
355
356        # name of tar file and test (temp) directory
357        tar_filename = 'test.tgz'
358        tmp_dir = tempfile.mkdtemp()
359
360        # tar and untar the test files into a temporary directory
361        tar_file(files, tar_filename)
362        untar_file(tar_filename, tmp_dir)
363
364        # see if original files and untarred ones are the same
365        for file in files:
366            fd = open(file, 'r')
367            orig = fd.readlines()
368            fd.close()
369
370            fd = open(os.path.join(tmp_dir, file), 'r')
371            copy = fd.readlines()
372            fd.close()
373
374            msg = "Original file %s isn't the same as untarred copy?" % file
375            self.failUnless(orig == copy, msg)
376
377        # clean up
378        for file in files:
379            os.remove(file)
380        os.remove(tar_filename)
381
382
383    def test_file_digest(self):
384        '''Test that file digest functions give 'correct' answer.
385       
386        Not a good test as we get 'expected_digest' from a digest file,
387        but *does* alert us if the digest algorithm ever changes.
388        '''
389
390        # we expect this digest string from the data file
391        expected_digest = '831a1dde6edd365ec4163a47871fa21b'
392
393        # prepare test directory and filenames
394        tmp_dir = tempfile.mkdtemp()
395        data_file = os.path.join(tmp_dir, 'test.data')
396        digest_file = os.path.join(tmp_dir, 'test.digest')
397
398        # create the data file
399        data_line = 'The quick brown fox jumps over the lazy dog. 0123456789\n'
400        fd = open(data_file, 'w')
401        for line in range(100):
402            fd.write(data_line)
403        fd.close()
404
405        # create the digest file
406        make_digest_file(data_file, digest_file)
407
408        # get digest string for the data file
409        digest = get_file_hexdigest(data_file)
410
411        # check that digest is as expected, string
412        msg = ("Digest string wrong, got '%s', expected '%s'"
413               % (digest, expected_digest))
414        self.failUnless(expected_digest == digest, msg)
415
416        # check that digest is as expected, file
417        msg = ("Digest file wrong, got '%s', expected '%s'"
418               % (digest, expected_digest))
419        fd = open(digest_file, 'r')
420        digest = fd.readline()
421        fd.close()
422        self.failUnless(expected_digest == digest, msg)
423
424
425    def test_file_length_function(self):
426        '''Test that file_length() give 'correct' answer.'''
427
428        # prepare test directory and filenames
429        tmp_dir = tempfile.mkdtemp()
430        test_file1 = os.path.join(tmp_dir, 'test.file1')
431        test_file2 = os.path.join(tmp_dir, 'test.file2')
432        test_file3 = os.path.join(tmp_dir, 'test.file3')
433        test_file4 = os.path.join(tmp_dir, 'test.file4')
434
435        # create files of known length
436        fd = open(test_file1, 'w')      # 0 lines
437        fd.close
438        fd = open(test_file2, 'w')      # 5 lines, all '\n'
439        for i in range(5):
440            fd.write('\n')
441        fd.close()
442        fd = open(test_file3, 'w')      # 25 chars, no \n, 1 lines
443        fd.write('no newline at end of line')
444        fd.close()
445        fd = open(test_file4, 'w')      # 1000 lines
446        for i in range(1000):
447            fd.write('The quick brown fox jumps over the lazy dog.\n')
448        fd.close()
449
450        # use file_length() to get and check lengths
451        size1 = file_length(test_file1)
452        msg = 'Expected file_length() to return 0, but got %d' % size1
453        self.failUnless(size1 == 0, msg)
454        size2 = file_length(test_file2)
455        msg = 'Expected file_length() to return 5, but got %d' % size2
456        self.failUnless(size2 == 5, msg)
457        size3 = file_length(test_file3)
458        msg = 'Expected file_length() to return 1, but got %d' % size3
459        self.failUnless(size3 == 1, msg)
460        size4 = file_length(test_file4)
461        msg = 'Expected file_length() to return 1000, but got %d' % size4
462        self.failUnless(size4 == 1000, msg)
463       
464       
465       
466    def test_get_revision_number(self):
467        """test_get_revision_number
468       
469        Test that a revision number is returned.
470        This should work both from a sandpit with access to Subversion
471        and also in distributions where revision number has been stored
472        explicitly in anuga.stored_version_info.version_info
473        """
474
475        x = get_revision_number()
476        assert int(x)
477
478################################################################################
479
if __name__ == "__main__":
    # Collect every 'test*' method of Test_system_tools and run them with
    # the plain text runner.
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromTestCase(Test_system_tools)
    runner = unittest.TextTestRunner()
    runner.run(suite)
484
Note: See TracBrowser for help on using the repository browser.