source: trunk/anuga_core/source/anuga/utilities/test_csv_tools.py @ 7967

Last change on this file since 7967 was 7126, checked in by rwilson, 15 years ago

Added test case for expected exception on zero-length file_list input.

File size: 13.2 KB
Line 
1#!/usr/bin/env python
2
3import os
4import unittest
5import tempfile
6import csv
7
8import csv_tools
9
10
11# this dictionary sets the column header string for
12# column number modulo 4.
13col_text_string = {0: 'col_%d',
14                   1: ' col_%d',
15                   2: 'col_%d ',
16                   3: ' col_%d '
17                  }
18
19class Test_CSV_utils(unittest.TestCase):
20
21    NUM_FILES = 10
22    NUM_COLS = 6
23    NUM_LINES = 10
24    OUTPUT_FILE = 'test.csv'
25   
26    def setUp(self):
27        # create temporary scratch directory
28        self.tmp_dir = tempfile.mkdtemp()
29
30        # create 4 test CSV files
31        self.num_files = self.NUM_FILES
32        self.filenames = []
33        for i in range(self.NUM_FILES):
34            self.filenames.append(tempfile.mktemp('.csv'))
35        for (i, fn) in enumerate(self.filenames):
36            fd = open(fn, 'w')
37            csv_fd = csv.writer(fd)
38            # write colums row
39            columns = []
40            for j in range(self.NUM_COLS):
41                columns.append(col_text_string[j % 4] % j)
42            csv_fd.writerow(columns)
43
44            # write data rows
45            for j in xrange(self.NUM_LINES):
46                data = [j, j, '%d.%d' % (j, i)] + ['qwert']*(self.NUM_COLS-3)
47                csv_fd.writerow(data)
48            fd.close()
49
50
51    def tearDown(self):
52        for fn in self.filenames:
53            try:
54                os.remove(fn)
55            except:
56                pass
57        try:
58            os.remove(self.OUTPUT_FILE)
59        except:
60            pass
61
62
63    def test_merge_one_file(self):
64        """Test merging a single CSV file.
65       
66        This is the same as a two coluymn extract, with column rename.
67        """
68
69        file_title_list = [(self.filenames[0], 'test')]
70        csv_tools.merge_csv_key_values(file_title_list, self.OUTPUT_FILE,
71                                       key_col='col_0', data_col='col_3')
72
73        expected = '''col_0,test
740,qwert
751,qwert
762,qwert
773,qwert
784,qwert
795,qwert
806,qwert
817,qwert
828,qwert
839,qwert
84'''
85
86        got = self.get_file_contents(self.OUTPUT_FILE)
87        msg = ('Merging one file,\n'
88               'expected file=\n'
89               '--------------------\n'
90               '%s'
91               '--------------------\n'
92               'got file=\n'
93               '--------------------\n'
94               '%s'
95               '--------------------\n'
96               % (expected, got))
97        self.failUnless(self.str_cmp(got, expected), msg)
98
99
100    def test_merge_two_files(self):
101        """Test merging two CSV files."""
102
103        file_title_list = [(self.filenames[0], 'test0'),
104                           (self.filenames[1], 'test1')]
105        csv_tools.merge_csv_key_values(file_title_list, self.OUTPUT_FILE,
106                                       key_col='col_0', data_col='col_3')
107
108        expected = '''col_0,test0,test1
1090,qwert,qwert
1101,qwert,qwert
1112,qwert,qwert
1123,qwert,qwert
1134,qwert,qwert
1145,qwert,qwert
1156,qwert,qwert
1167,qwert,qwert
1178,qwert,qwert
1189,qwert,qwert
119'''
120
121        got = self.get_file_contents(self.OUTPUT_FILE)
122        msg = ('Merging two files,\n'
123               'expected file=\n'
124               '--------------------\n'
125               '%s'
126               '--------------------\n'
127               'got file=\n'
128               '--------------------\n'
129               '%s'
130               '--------------------\n'
131               % (expected, got))
132        self.failUnless(self.str_cmp(got, expected), msg)
133
134
135    def test_merge_two_files2(self):
136        """Test merging two CSV files."""
137
138        file_title_list = [(self.filenames[0], 'test0'),
139                           (self.filenames[1], 'test1')]
140        csv_tools.merge_csv_key_values(file_title_list, self.OUTPUT_FILE,
141                                       key_col='col_0', data_col='col_2')
142
143        expected = '''col_0,test0,test1
1440,0.0,0.1
1451,1.0,1.1
1462,2.0,2.1
1473,3.0,3.1
1484,4.0,4.1
1495,5.0,5.1
1506,6.0,6.1
1517,7.0,7.1
1528,8.0,8.1
1539,9.0,9.1
154'''
155
156        got = self.get_file_contents(self.OUTPUT_FILE)
157        msg = ('Merging two file,\n'
158               'expected file=\n'
159               '--------------------\n'
160               '%s'
161               '--------------------\n'
162               'got file=\n'
163               '--------------------\n'
164               '%s'
165               '--------------------\n'
166               % (expected, got))
167        self.failUnless(self.str_cmp(got, expected), msg)
168
169
170    def test_merge_four_files(self):
171        """Test merging four CSV files."""
172
173        file_title_list = [(self.filenames[0], 'test0'),
174                           (self.filenames[1], 'test1'),
175                           (self.filenames[2], 'test2'),
176                           (self.filenames[3], 'test3')]
177        csv_tools.merge_csv_key_values(file_title_list, self.OUTPUT_FILE,
178                                       key_col='col_0', data_col='col_2')
179
180        expected = '''col_0,test0,test1,test2,test3
1810,0.0,0.1,0.2,0.3
1821,1.0,1.1,1.2,1.3
1832,2.0,2.1,2.2,2.3
1843,3.0,3.1,3.2,3.3
1854,4.0,4.1,4.2,4.3
1865,5.0,5.1,5.2,5.3
1876,6.0,6.1,6.2,6.3
1887,7.0,7.1,7.2,7.3
1898,8.0,8.1,8.2,8.3
1909,9.0,9.1,9.2,9.3
191'''
192
193        got = self.get_file_contents(self.OUTPUT_FILE)
194        msg = ('Merging four files,\n'
195               'expected file=\n'
196               '--------------------\n'
197               '%s'
198               '--------------------\n'
199               'got file=\n'
200               '--------------------\n'
201               '%s'
202               '--------------------\n'
203               % (expected, got))
204        self.failUnless(self.str_cmp(got, expected), msg)
205
206
207    def test_merge_ten_files(self):
208        """Test merging ten CSV files."""
209
210        file_title_list = [(self.filenames[0], 'test0'),
211                           (self.filenames[1], 'test1'),
212                           (self.filenames[2], 'test2'),
213                           (self.filenames[3], 'test3'),
214                           (self.filenames[4], 'test4'),
215                           (self.filenames[5], 'test5'),
216                           (self.filenames[6], 'test6'),
217                           (self.filenames[7], 'test7'),
218                           (self.filenames[8], 'test8'),
219                           (self.filenames[9], 'test9')]
220        csv_tools.merge_csv_key_values(file_title_list, self.OUTPUT_FILE,
221                                       key_col='col_1', data_col='col_2')
222
223        expected = '''col_1,test0,test1,test2,test3,test4,test5,test6,test7,test8,test9
2240,0.0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9
2251,1.0,1.1,1.2,1.3,1.4,1.5,1.6,1.7,1.8,1.9
2262,2.0,2.1,2.2,2.3,2.4,2.5,2.6,2.7,2.8,2.9
2273,3.0,3.1,3.2,3.3,3.4,3.5,3.6,3.7,3.8,3.9
2284,4.0,4.1,4.2,4.3,4.4,4.5,4.6,4.7,4.8,4.9
2295,5.0,5.1,5.2,5.3,5.4,5.5,5.6,5.7,5.8,5.9
2306,6.0,6.1,6.2,6.3,6.4,6.5,6.6,6.7,6.8,6.9
2317,7.0,7.1,7.2,7.3,7.4,7.5,7.6,7.7,7.8,7.9
2328,8.0,8.1,8.2,8.3,8.4,8.5,8.6,8.7,8.8,8.9
2339,9.0,9.1,9.2,9.3,9.4,9.5,9.6,9.7,9.8,9.9
234'''
235
236        got = self.get_file_contents(self.OUTPUT_FILE)
237        msg = ('Merging four files,\n'
238               'expected file=\n'
239               '--------------------\n'
240               '%s'
241               '--------------------\n'
242               'got file=\n'
243               '--------------------\n'
244               '%s'
245               '--------------------\n'
246               % (expected, got))
247        self.failUnless(self.str_cmp(got, expected), msg)
248
249
250    def test_no_key_column(self):
251        """Test merging two CSV files with expected missing key column."""
252
253        file_title_list = [(self.filenames[0], 'test0'),
254                           (self.filenames[2], 'test2')]
255        self.failUnlessRaises(Exception,
256                              csv_tools.merge_csv_key_values,
257                              file_title_list,
258                              self.OUTPUT_FILE,
259                              key_col='col_A',
260                              data_col='col_2'
261                             )
262
263
264    def test_no_input_files(self):
265        """Test merging *zero* CSV files!"""
266
267        file_title_list = []
268        self.failUnlessRaises(Exception,
269                              csv_tools.merge_csv_key_values,
270                              file_title_list,
271                              self.OUTPUT_FILE,
272                              key_col='col_1',
273                              data_col='col_A'
274                             )
275
276
277    def test_no_data_column(self):
278        """Test merging two CSV files with expected missing data column."""
279
280        file_title_list = [(self.filenames[0], 'test0'),
281                           (self.filenames[2], 'test2')]
282        self.failUnlessRaises(Exception,
283                              csv_tools.merge_csv_key_values,
284                              file_title_list,
285                              self.OUTPUT_FILE,
286                              key_col='col_1',
287                              data_col='col_A'
288                             )
289
290
291    def test_different_num_rows(self):
292        """Test merging two CSV files with different number of rows."""
293
294        # get data from file [1]
295        fd = open(self.filenames[1], 'r')
296        data = fd.readlines()
297        fd.close()
298
299        # delete a row in data and write to test file
300        test_filename = 'my_test.csv'
301        fd = open(test_filename, 'w')
302        fd.write(''.join(data[0:-1]))
303        fd.close()
304
305        file_title_list = [(self.filenames[0], 'test0'),
306                           (test_filename, 'test2')]
307        self.failUnlessRaises(Exception,
308                              csv_tools.merge_csv_key_values,
309                              file_title_list,
310                              self.OUTPUT_FILE,
311                              key_col='col_1',
312                              data_col='col_A'
313                             )
314
315        try:
316            os.remove(test_filename)
317        except:
318            pass
319
320
321    def test_different_key_values(self):
322        """Test merging two CSV files with different key values."""
323
324        # get data from file [1]
325        fd = open(self.filenames[1], 'r')
326        data = fd.readlines()
327        fd.close()
328
329        # chnage a row key value in data and write to test file
330        test_filename = 'my_test.csv'
331        fd = open(test_filename, 'w')
332        data[3] = '1' + data[3]
333        fd.write(''.join(data))
334        fd.close()
335
336        file_title_list = [(self.filenames[0], 'test0'),
337                           (test_filename, 'test2')]
338        self.failUnlessRaises(Exception,
339                              csv_tools.merge_csv_key_values,
340                              file_title_list,
341                              self.OUTPUT_FILE,
342                              key_col='col_1',
343                              data_col='col_A'
344                             )
345
346        try:
347            os.remove(test_filename)
348        except:
349            pass
350
351
352    def test_latex_example(self):
353        """Test merging two CSV files - example from latex doc."""
354
355        fd = open('alpha.csv', 'w')
356        csv_fd = csv.writer(fd)
357        csv_fd.writerow(['time', 'hours', 'stage', 'depth'])
358        csv_fd.writerow(['3600', '1.00', '100.3', '10.2'])
359        csv_fd.writerow(['3636', '1.01', '100.3', '10.0'])
360        csv_fd.writerow(['3672', '1.02', '100.3', '9.7'])
361        csv_fd.writerow(['3708', '1.03', '100.3', '8.9'])
362        csv_fd.writerow(['3744', '1.04', '100.3', '7.1'])
363        fd.close()
364
365        fd = open('beta.csv', 'w')
366        csv_fd = csv.writer(fd)
367        csv_fd.writerow(['time', 'hours', 'stage', 'depth'])
368        csv_fd.writerow(['3600', '1.00', '100.3', '11.3'])
369        csv_fd.writerow(['3636', '1.01', '100.3', '10.5'])
370        csv_fd.writerow(['3672', '1.02', '100.3', '10.0'])
371        csv_fd.writerow(['3708', '1.03', '100.3', '9.7'])
372        csv_fd.writerow(['3744', '1.04', '100.3', '8.2'])
373        fd.close()
374
375        file_title_list = [('alpha.csv', 'alpha'),
376                           ('beta.csv',  'beta')]
377        csv_tools.merge_csv_key_values(file_title_list,
378                                       'gamma.csv',
379                                       key_col='hours',
380                                       data_col='depth')
381
382        expected = '''hours,alpha,beta
3831.00,10.2,11.3
3841.01,10.0,10.5
3851.02,9.7,10.0
3861.03,8.9,9.7
3871.04,7.1,8.2
388'''
389
390        got = self.get_file_contents('gamma.csv')
391        msg = ('Merging two files,\n'
392               'expected file=\n'
393               '--------------------\n'
394               '%s'
395               '--------------------\n'
396               'got file=\n'
397               '--------------------\n'
398               '%s'
399               '--------------------\n'
400               % (expected, got))
401        self.failUnless(self.str_cmp(got, expected), msg)
402
403        try:
404            os.remove('alpha.csv')
405            os.remove('beta.csv')
406            os.remove('gamma.csv')
407        except:
408            pass
409
410
411    def str_cmp(self, str1, str2):
412        '''Compare 2 strings, removing end-of-line stuff first.'''
413
414        s1 = str1.split('\n')
415        s2 = str2.split('\n')
416        for (sub1, sub2) in zip(s1, s2):
417            if sub1 != sub2:
418                return False
419        return True
420
421
422    def get_file_contents(self, filename):
423        '''Return file contents as a string.'''
424
425        fd = open(filename, 'r')
426        data = fd.readlines()
427        fd.close()
428        return ''.join(data).replace('\r', '')
429
430################################################################################
431
432if __name__ == "__main__":
433    suite = unittest.makeSuite(Test_CSV_utils, 'test')
434    runner = unittest.TextTestRunner()
435    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.