source: branches/numpy/anuga/lib/add_csv_header/add_csv_header.py @ 6553

Last change on this file since 6553 was 6553, checked in by rwilson, 15 years ago

Merged trunk into numpy, all tests and validations work.

File size: 3.6 KB
Line 
1#!/usr/bin/env python
2
3'''Routine to add a header to a CSV file.
4'''
5
6import os
7import tempfile
8
9
10##
11# @brief Add a header line to a text CSV file.
12# @param file Path to the file to add header to.
13# @param header_list A list of strings - the header to insert.
14# @param be_green If True, go easy on memory, but slower.
15# @note Checks that the first line of the original file has the same
16#       number of fields as the new header line.
17# @note Raises exception if above is not true.
18# @note The 'be_green' option is not yet implemented.
19def add_csv_header(file, header_list, be_green=False):
20    '''Add a CSV header line to a text file.'''
21
22    # open the file to insert header into
23    try:
24        fd = open(file, 'r')
25    except IOError, e:
26        msg = "Can't open file '%s': %s" % (file, str(e))
27        raise Exception, msg
28    except:
29        msg = "Can't open file '%s'" % file
30        raise Exception, msg
31
32    # get a temporary file.
33    # must create in same directory as input file, as we rename it.
34    input_dir = os.path.dirname(file)
35    (tmp_f, tmp_filename) = tempfile.mkstemp(dir=input_dir)
36    tmp_fd = os.fdopen(tmp_f, 'w')
37
38    # check the header, create header _string.
39    if header_list is None or header_list is []:
40        msg = "add_csv_header: header_list can't be None or []"
41        raise Exception, msg
42
43    header_string = ','.join(header_list) + '\n'
44
45    # copy header to output file, then input file
46    tmp_fd.write(header_string)
47
48    if be_green:
49        first_line = True
50        for line in fd:
51            if first_line:
52                first_line = False
53                columns = line.strip().split(',')
54                if len(columns) != len(header_list):
55                    msg = ("add_csv_header: File %s has %d columns but header "
56                           "has %d columns" % (file, len(columns), len(header_list)))
57                    raise Exception, msg
58            tmp_fd.write(line)
59    else:
60        data = fd.readlines()
61        columns = data[0].strip().split(',')
62
63        if len(columns) != len(header_list):
64            msg = ("add_csv_header: File %s has %d columns but header "
65                   "has %d columns" % (file, len(columns), len(header_list)))
66            raise Exception, msg
67
68        data = ''.join(data)
69        tmp_fd.write(data)
70
71    # close and rename all files
72    tmp_fd.close()
73    fd.close()
74    os.rename(tmp_filename, file)
75
76
77if __name__ == '__main__':
78    import sys
79    import os
80
81    file_data = '1,2,3\n4,5,6\n7,8,9'
82    header = ['alpha', 'bravo', 'charlie']
83    filename = '/tmp/add_csv_header.csv'
84    filename2 = '/tmp/add_csv_header2.csv'
85
86######
87# Create file and test function.
88######
89
90    # create test file
91    fd = open(filename, 'w')
92    fd.write(file_data)
93    fd.close()
94
95    add_csv_header(filename, header)
96
97    # read test file
98    fd = open(filename, 'r')
99    data = fd.readlines()
100    fd.close
101
102    # check if data as expected
103    data = ''.join(data)
104    header_string = ','.join(header) + '\n'
105    expected = header_string + file_data
106
107    msg = 'Expected data:\n%s\ngot:\n%s' % (expected, data)
108    assert expected == data, msg
109
110######
111# Test the 'be_green' option.
112######
113
114    # create test file
115    fd = open(filename2, 'w')
116    fd.write(file_data)
117    fd.close()
118
119    add_csv_header(filename2, header, be_green=True)
120
121    # read test file
122    fd = open(filename2, 'r')
123    data = fd.readlines()
124    fd.close
125
126    # check if data as expected
127    data = ''.join(data)
128    header_string = ','.join(header) + '\n'
129    expected = header_string + file_data
130
131    msg = 'Expected data:\n%s\ngot:\n%s' % (expected, data)
132    assert expected == data, msg
133
134    os.remove(filename)
135    os.remove(filename2)
Note: See TracBrowser for help on using the repository browser.