source: branches/numpy/anuga/utilities/test_csv_utils.py @ 7100

Last change on this file since 7100 was 7100, checked in by rwilson, 15 years ago

Added the CSV utility functions into numpy branch.

File size: 10.8 KB
Line 
1#!/usr/bin/env python
2
3import os
4import unittest
5import tempfile
6import csv
7
8import csv_utils
9
10
11# this dictionary sets the column header string for
12# column number modulo 4.
13col_text_string = {0: 'col_%d',
14                   1: ' col_%d',
15                   2: 'col_%d ',
16                   3: ' col_%d '
17                  }
18
19class Test_CSV_utils(unittest.TestCase):
20
21    NUM_FILES = 10
22    NUM_COLS = 6
23    NUM_LINES = 10
24    OUTPUT_FILE = 'test.csv'
25   
26    def setUp(self):
27        # create temporary scratch directory
28        self.tmp_dir = tempfile.mkdtemp()
29
30        # create 4 test CSV files
31        self.num_files = self.NUM_FILES
32        self.filenames = []
33        for i in range(self.NUM_FILES):
34            self.filenames.append(tempfile.mktemp('.csv'))
35        for (i, fn) in enumerate(self.filenames):
36            fd = open(fn, 'w')
37            csv_fd = csv.writer(fd)
38            # write colums row
39            columns = []
40            for j in range(self.NUM_COLS):
41                columns.append(col_text_string[j % 4] % j)
42            csv_fd.writerow(columns)
43
44            # write data rows
45            for j in xrange(self.NUM_LINES):
46                data = [j, j, '%d.%d' % (j, i)] + ['qwert']*(self.NUM_COLS-3)
47                csv_fd.writerow(data)
48            fd.close()
49
50
51    def tearDown(self):
52        for fn in self.filenames:
53            try:
54                os.remove(fn)
55            except:
56                pass
57        try:
58            os.remove(self.OUTPUT_FILE)
59        except:
60            pass
61
62
63    def test_merge_one_file(self):
64        """Test merging a single CSV file.
65       
66        This is the same as a two coluymn extract, with column rename.
67        """
68
69        file_title_list = [(self.filenames[0], 'test')]
70        csv_utils.merge_csv_key_values(file_title_list, self.OUTPUT_FILE,
71                                       key_col='col_0', data_col='col_3')
72
73        expected = '''col_0,test
740,qwert
751,qwert
762,qwert
773,qwert
784,qwert
795,qwert
806,qwert
817,qwert
828,qwert
839,qwert
84'''
85
86        got = self.get_file_contents(self.OUTPUT_FILE)
87        msg = ('Merging one file,\n'
88               'expected file=\n'
89               '--------------------\n'
90               '%s'
91               '--------------------\n'
92               'got file=\n'
93               '--------------------\n'
94               '%s'
95               '--------------------\n'
96               % (expected, got))
97        self.failUnless(self.str_cmp(got, expected), msg)
98
99
100    def test_merge_two_files(self):
101        """Test merging two CSV files."""
102
103        file_title_list = [(self.filenames[0], 'test0'),
104                           (self.filenames[1], 'test1')]
105        csv_utils.merge_csv_key_values(file_title_list, self.OUTPUT_FILE,
106                                       key_col='col_0', data_col='col_3')
107
108        expected = '''col_0,test0,test1
1090,qwert,qwert
1101,qwert,qwert
1112,qwert,qwert
1123,qwert,qwert
1134,qwert,qwert
1145,qwert,qwert
1156,qwert,qwert
1167,qwert,qwert
1178,qwert,qwert
1189,qwert,qwert
119'''
120
121        got = self.get_file_contents(self.OUTPUT_FILE)
122        msg = ('Merging two files,\n'
123               'expected file=\n'
124               '--------------------\n'
125               '%s'
126               '--------------------\n'
127               'got file=\n'
128               '--------------------\n'
129               '%s'
130               '--------------------\n'
131               % (expected, got))
132        self.failUnless(self.str_cmp(got, expected), msg)
133
134
135    def test_merge_two_files2(self):
136        """Test merging two CSV files."""
137
138        file_title_list = [(self.filenames[0], 'test0'),
139                           (self.filenames[1], 'test1')]
140        csv_utils.merge_csv_key_values(file_title_list, self.OUTPUT_FILE,
141                                       key_col='col_0', data_col='col_2')
142
143        expected = '''col_0,test0,test1
1440,0.0,0.1
1451,1.0,1.1
1462,2.0,2.1
1473,3.0,3.1
1484,4.0,4.1
1495,5.0,5.1
1506,6.0,6.1
1517,7.0,7.1
1528,8.0,8.1
1539,9.0,9.1
154'''
155
156        got = self.get_file_contents(self.OUTPUT_FILE)
157        msg = ('Merging two file,\n'
158               'expected file=\n'
159               '--------------------\n'
160               '%s'
161               '--------------------\n'
162               'got file=\n'
163               '--------------------\n'
164               '%s'
165               '--------------------\n'
166               % (expected, got))
167        self.failUnless(self.str_cmp(got, expected), msg)
168
169
170    def test_merge_four_files(self):
171        """Test merging four CSV files."""
172
173        file_title_list = [(self.filenames[0], 'test0'),
174                           (self.filenames[1], 'test1'),
175                           (self.filenames[2], 'test2'),
176                           (self.filenames[3], 'test3')]
177        csv_utils.merge_csv_key_values(file_title_list, self.OUTPUT_FILE,
178                                       key_col='col_0', data_col='col_2')
179
180        expected = '''col_0,test0,test1,test2,test3
1810,0.0,0.1,0.2,0.3
1821,1.0,1.1,1.2,1.3
1832,2.0,2.1,2.2,2.3
1843,3.0,3.1,3.2,3.3
1854,4.0,4.1,4.2,4.3
1865,5.0,5.1,5.2,5.3
1876,6.0,6.1,6.2,6.3
1887,7.0,7.1,7.2,7.3
1898,8.0,8.1,8.2,8.3
1909,9.0,9.1,9.2,9.3
191'''
192
193        got = self.get_file_contents(self.OUTPUT_FILE)
194        msg = ('Merging four files,\n'
195               'expected file=\n'
196               '--------------------\n'
197               '%s'
198               '--------------------\n'
199               'got file=\n'
200               '--------------------\n'
201               '%s'
202               '--------------------\n'
203               % (expected, got))
204        self.failUnless(self.str_cmp(got, expected), msg)
205
206
207    def test_merge_ten_files(self):
208        """Test merging ten CSV files."""
209
210        file_title_list = [(self.filenames[0], 'test0'),
211                           (self.filenames[1], 'test1'),
212                           (self.filenames[2], 'test2'),
213                           (self.filenames[3], 'test3'),
214                           (self.filenames[4], 'test4'),
215                           (self.filenames[5], 'test5'),
216                           (self.filenames[6], 'test6'),
217                           (self.filenames[7], 'test7'),
218                           (self.filenames[8], 'test8'),
219                           (self.filenames[9], 'test9')]
220        csv_utils.merge_csv_key_values(file_title_list, self.OUTPUT_FILE,
221                                       key_col='col_1', data_col='col_2')
222
223        expected = '''col_1,test0,test1,test2,test3,test4,test5,test6,test7,test8,test9
2240,0.0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9
2251,1.0,1.1,1.2,1.3,1.4,1.5,1.6,1.7,1.8,1.9
2262,2.0,2.1,2.2,2.3,2.4,2.5,2.6,2.7,2.8,2.9
2273,3.0,3.1,3.2,3.3,3.4,3.5,3.6,3.7,3.8,3.9
2284,4.0,4.1,4.2,4.3,4.4,4.5,4.6,4.7,4.8,4.9
2295,5.0,5.1,5.2,5.3,5.4,5.5,5.6,5.7,5.8,5.9
2306,6.0,6.1,6.2,6.3,6.4,6.5,6.6,6.7,6.8,6.9
2317,7.0,7.1,7.2,7.3,7.4,7.5,7.6,7.7,7.8,7.9
2328,8.0,8.1,8.2,8.3,8.4,8.5,8.6,8.7,8.8,8.9
2339,9.0,9.1,9.2,9.3,9.4,9.5,9.6,9.7,9.8,9.9
234'''
235
236        got = self.get_file_contents(self.OUTPUT_FILE)
237        msg = ('Merging four files,\n'
238               'expected file=\n'
239               '--------------------\n'
240               '%s'
241               '--------------------\n'
242               'got file=\n'
243               '--------------------\n'
244               '%s'
245               '--------------------\n'
246               % (expected, got))
247        self.failUnless(self.str_cmp(got, expected), msg)
248
249
250    def test_no_key_column(self):
251        """Test merging two CSV files with expected missing key column."""
252
253        file_title_list = [(self.filenames[0], 'test0'),
254                           (self.filenames[2], 'test2')]
255        self.failUnlessRaises(Exception,
256                              csv_utils.merge_csv_key_values,
257                              file_title_list,
258                              self.OUTPUT_FILE,
259                              key_col='col_A',
260                              data_col='col_2'
261                             )
262
263
264    def test_no_data_column(self):
265        """Test merging two CSV files with expected missing data column."""
266
267        file_title_list = [(self.filenames[0], 'test0'),
268                           (self.filenames[2], 'test2')]
269        self.failUnlessRaises(Exception,
270                              csv_utils.merge_csv_key_values,
271                              file_title_list,
272                              self.OUTPUT_FILE,
273                              key_col='col_1',
274                              data_col='col_A'
275                             )
276
277
278    def test_different_num_rows(self):
279        """Test merging two CSV files with different number of rows."""
280
281        # get data from file [1]
282        fd = open(self.filenames[1], 'r')
283        data = fd.readlines()
284        fd.close()
285
286        # delete a row in data and write to test file
287        test_filename = 'my_test.csv'
288        fd = open(test_filename, 'w')
289        fd.write(''.join(data[0:-1]))
290        fd.close()
291
292        file_title_list = [(self.filenames[0], 'test0'),
293                           (test_filename, 'test2')]
294        self.failUnlessRaises(Exception,
295                              csv_utils.merge_csv_key_values,
296                              file_title_list,
297                              self.OUTPUT_FILE,
298                              key_col='col_1',
299                              data_col='col_A'
300                             )
301
302        try:
303            os.remove(test_filename)
304        except:
305            pass
306
307
308    def test_different_key_values(self):
309        """Test merging two CSV files with different key values."""
310
311        # get data from file [1]
312        fd = open(self.filenames[1], 'r')
313        data = fd.readlines()
314        fd.close()
315
316        # chnage a row key value in data and write to test file
317        test_filename = 'my_test.csv'
318        fd = open(test_filename, 'w')
319        data[3] = '1' + data[3]
320        fd.write(''.join(data))
321        fd.close()
322
323        file_title_list = [(self.filenames[0], 'test0'),
324                           (test_filename, 'test2')]
325        self.failUnlessRaises(Exception,
326                              csv_utils.merge_csv_key_values,
327                              file_title_list,
328                              self.OUTPUT_FILE,
329                              key_col='col_1',
330                              data_col='col_A'
331                             )
332
333        try:
334            os.remove(test_filename)
335        except:
336            pass
337
338
339    def str_cmp(self, str1, str2):
340        '''Compare 2 strings, removing end-of-line stuff first.'''
341
342        s1 = str1.split('\n')
343        s2 = str2.split('\n')
344        for (sub1, sub2) in zip(s1, s2):
345            if sub1 != sub2:
346                return False
347        return True
348
349
350    def get_file_contents(self, filename):
351        '''Return file contents as a string.'''
352
353        fd = open(filename, 'r')
354        data = fd.readlines()
355        fd.close()
356        return ''.join(data).replace('\r', '')
357
358################################################################################
359
360if __name__ == "__main__":
361    suite = unittest.makeSuite(Test_CSV_utils, 'test')
362    runner = unittest.TextTestRunner()
363    runner.run(suite)
Note: See TracBrowser for help on using the repository browser.