1 | #!/usr/bin/env python |
---|
2 | |
---|
3 | '''Routine to add a header to a CSV file. |
---|
4 | ''' |
---|
5 | |
---|
6 | import os |
---|
7 | import tempfile |
---|
8 | |
---|
9 | |
---|
10 | ## |
---|
11 | # @brief Add a header line to a text CSV file. |
---|
12 | # @param file Path to the file to add header to. |
---|
13 | # @param header_list A list of strings - the header to insert. |
---|
14 | # @param be_green If True, go easy on memory, but slower. |
---|
15 | # @note Checks that the first line of the original file has the same |
---|
16 | # number of fields as the new header line. |
---|
17 | # @note Raises exception if above is not true. |
---|
18 | # @note The 'be_green' option is not yet implemented. |
---|
19 | def add_csv_header(file, header_list, be_green=False): |
---|
20 | '''Add a CSV header line to a text file.''' |
---|
21 | |
---|
22 | # open the file to insert header into |
---|
23 | try: |
---|
24 | fd = open(file, 'r') |
---|
25 | except IOError, e: |
---|
26 | msg = "Can't open file '%s': %s" % (file, str(e)) |
---|
27 | raise Exception, msg |
---|
28 | except: |
---|
29 | msg = "Can't open file '%s'" % file |
---|
30 | raise Exception, msg |
---|
31 | |
---|
32 | # get a temporary file. |
---|
33 | # must create in same directory as input file, as we rename it. |
---|
34 | input_dir = os.path.dirname(file) |
---|
35 | (tmp_f, tmp_filename) = tempfile.mkstemp(dir=input_dir) |
---|
36 | tmp_fd = os.fdopen(tmp_f, 'w') |
---|
37 | |
---|
38 | # check the header, create header _string. |
---|
39 | if header_list is None or header_list is []: |
---|
40 | msg = "add_csv_header: header_list can't be None or []" |
---|
41 | raise Exception, msg |
---|
42 | |
---|
43 | header_string = ','.join(header_list) + '\n' |
---|
44 | |
---|
45 | # copy header to output file, then input file |
---|
46 | tmp_fd.write(header_string) |
---|
47 | |
---|
48 | if be_green: |
---|
49 | first_line = True |
---|
50 | for line in fd: |
---|
51 | if first_line: |
---|
52 | first_line = False |
---|
53 | columns = line.strip().split(',') |
---|
54 | if len(columns) != len(header_list): |
---|
55 | msg = ("add_csv_header: File %s has %d columns but header " |
---|
56 | "has %d columns" % (file, len(columns), len(header_list))) |
---|
57 | raise Exception, msg |
---|
58 | tmp_fd.write(line) |
---|
59 | else: |
---|
60 | data = fd.readlines() |
---|
61 | columns = data[0].strip().split(',') |
---|
62 | |
---|
63 | if len(columns) != len(header_list): |
---|
64 | msg = ("add_csv_header: File %s has %d columns but header " |
---|
65 | "has %d columns" % (file, len(columns), len(header_list))) |
---|
66 | raise Exception, msg |
---|
67 | |
---|
68 | data = ''.join(data) |
---|
69 | tmp_fd.write(data) |
---|
70 | |
---|
71 | # close and rename all files |
---|
72 | tmp_fd.close() |
---|
73 | fd.close() |
---|
74 | os.rename(tmp_filename, file) |
---|
75 | |
---|
76 | |
---|
77 | if __name__ == '__main__': |
---|
78 | import sys |
---|
79 | import os |
---|
80 | |
---|
81 | file_data = '1,2,3\n4,5,6\n7,8,9' |
---|
82 | header = ['alpha', 'bravo', 'charlie'] |
---|
83 | filename = '/tmp/add_csv_header.csv' |
---|
84 | filename2 = '/tmp/add_csv_header2.csv' |
---|
85 | |
---|
86 | ###### |
---|
87 | # Create file and test function. |
---|
88 | ###### |
---|
89 | |
---|
90 | # create test file |
---|
91 | fd = open(filename, 'w') |
---|
92 | fd.write(file_data) |
---|
93 | fd.close() |
---|
94 | |
---|
95 | add_csv_header(filename, header) |
---|
96 | |
---|
97 | # read test file |
---|
98 | fd = open(filename, 'r') |
---|
99 | data = fd.readlines() |
---|
100 | fd.close |
---|
101 | |
---|
102 | # check if data as expected |
---|
103 | data = ''.join(data) |
---|
104 | header_string = ','.join(header) + '\n' |
---|
105 | expected = header_string + file_data |
---|
106 | |
---|
107 | msg = 'Expected data:\n%s\ngot:\n%s' % (expected, data) |
---|
108 | assert expected == data, msg |
---|
109 | |
---|
110 | ###### |
---|
111 | # Test the 'be_green' option. |
---|
112 | ###### |
---|
113 | |
---|
114 | # create test file |
---|
115 | fd = open(filename2, 'w') |
---|
116 | fd.write(file_data) |
---|
117 | fd.close() |
---|
118 | |
---|
119 | add_csv_header(filename2, header, be_green=True) |
---|
120 | |
---|
121 | # read test file |
---|
122 | fd = open(filename2, 'r') |
---|
123 | data = fd.readlines() |
---|
124 | fd.close |
---|
125 | |
---|
126 | # check if data as expected |
---|
127 | data = ''.join(data) |
---|
128 | header_string = ','.join(header) + '\n' |
---|
129 | expected = header_string + file_data |
---|
130 | |
---|
131 | msg = 'Expected data:\n%s\ngot:\n%s' % (expected, data) |
---|
132 | assert expected == data, msg |
---|
133 | |
---|
134 | os.remove(filename) |
---|
135 | os.remove(filename2) |
---|