1 | #!/usr/bin/env python |
---|
2 | |
---|
3 | '''Routine to add a header to a CSV file. |
---|
4 | ''' |
---|
5 | |
---|
6 | import os |
---|
7 | import tempfile |
---|
8 | |
---|
9 | |
---|
10 | ## |
---|
11 | # @brief Add a header line to a text CSV file. |
---|
12 | # @param file Path to the file to add header to. |
---|
13 | # @param header_list A list of strings - the header to insert. |
---|
14 | # @param be_green If True, go easy on memory, but slower. |
---|
15 | # @note Checks that the first line of the original file has the same |
---|
16 | # number of fields as the new header line. |
---|
17 | # @note Raises exception if above is not true. |
---|
18 | # @note The 'be_green' option is not yet implemented. |
---|
19 | def add_csv_header(file, header_list, be_green=False): |
---|
20 | '''Add a CSV header line to a text file.''' |
---|
21 | |
---|
22 | # open the file to insert header into |
---|
23 | try: |
---|
24 | fd = open(file, 'r') |
---|
25 | except IOError, e: |
---|
26 | msg = "Can't open file '%s': %s" % (file, str(e)) |
---|
27 | raise Exception, msg |
---|
28 | except: |
---|
29 | msg = "Can't open file '%s'" % file |
---|
30 | raise Exception, msg |
---|
31 | |
---|
32 | # get a temporary file |
---|
33 | (tmp_f, tmp_filename) = tempfile.mkstemp() |
---|
34 | tmp_fd = os.fdopen(tmp_f, 'w') |
---|
35 | |
---|
36 | # check the header, create header _string. |
---|
37 | if header_list is None or header_list is []: |
---|
38 | msg = "add_csv_header: header_list can't be None or []" |
---|
39 | raise Exception, msg |
---|
40 | |
---|
41 | header_string = ','.join(header_list) + '\n' |
---|
42 | |
---|
43 | # copy header to output file, then input file |
---|
44 | tmp_fd.write(header_string) |
---|
45 | data = fd.readlines() |
---|
46 | columns = data[0].strip().split(',') |
---|
47 | |
---|
48 | if len(columns) != len(header_list): |
---|
49 | msg = ("add_csv_header: File %s has %d columns but header " |
---|
50 | "has %d columns" % (file, len(columns), len(header_list))) |
---|
51 | raise Exception, msg |
---|
52 | |
---|
53 | data = ''.join(data) |
---|
54 | tmp_fd.write(data) |
---|
55 | |
---|
56 | # close and rename all files |
---|
57 | tmp_fd.close() |
---|
58 | fd.close() |
---|
59 | os.rename(tmp_filename, file) |
---|
60 | |
---|
61 | |
---|
62 | if __name__ == '__main__': |
---|
63 | import sys |
---|
64 | |
---|
65 | file_data = '1,2,3\n4,5,6\n7,8,9' |
---|
66 | header = ['alpha', 'bravo', 'charlie'] |
---|
67 | filename = '/tmp/add_csv_header.csv' |
---|
68 | |
---|
69 | # create test file |
---|
70 | fd = open(filename, 'w') |
---|
71 | fd.write(file_data) |
---|
72 | fd.close() |
---|
73 | |
---|
74 | add_csv_header(filename, header) |
---|
75 | |
---|
76 | # read test file |
---|
77 | fd = open(filename, 'r') |
---|
78 | data = fd.readlines() |
---|
79 | fd.close |
---|
80 | |
---|
81 | # check if data as expected |
---|
82 | data = ''.join(data) |
---|
83 | header_string = ','.join(header) + '\n' |
---|
84 | expected = header_string + file_data |
---|
85 | |
---|
86 | msg = 'Expected data:\n%s\ngot:\n%s' % (expected, data) |
---|
87 | assert expected == data, msg |
---|
88 | |
---|