source: anuga_core/source/anuga/lib/add_csv_header/add_csv_header.py @ 6355

Last change on this file since 6355 was 6355, checked in by rwilson, 15 years ago

Made add_csv_header() create its temp file in the same directory as the input file, so we can always rename.

File size: 2.5 KB
Line 
1#!/usr/bin/env python
2
3'''Routine to add a header to a CSV file.
4'''
5
6import os
7import tempfile
8
9
##
# @brief Add a header line to a text CSV file.
# @param file Path to the file to add header to.
# @param header_list A list of strings - the header to insert.
# @param be_green If True, go easy on memory, but slower.
# @note Checks that the first line of the original file has the same
#       number of fields as the new header line.
# @note Raises Exception if the file can't be opened or is empty, if
#       header_list is None or empty, or if the above is not true.
# @note All checks are done before the temporary file is created; if
#       writing or renaming then fails, the temporary file is removed.
# @note The 'be_green' option is not yet implemented.
def add_csv_header(file, header_list, be_green=False):
    '''Add a CSV header line to a text file.

    The header and the original contents are written to a temporary file
    in the same directory as 'file', which is then renamed over 'file'.
    '''

    # open the file to insert header into
    try:
        fd = open(file, 'r')
    except IOError as e:
        msg = "Can't open file '%s': %s" % (file, str(e))
        raise Exception(msg)
    except Exception:
        # not a bare 'except:', so KeyboardInterrupt/SystemExit pass through
        msg = "Can't open file '%s'" % file
        raise Exception(msg)

    # read everything up front so the input is closed even if reading fails
    try:
        data = fd.readlines()
    finally:
        fd.close()

    # check the header, create header string.
    # ('header_list is []' is an identity test and can never be true,
    # so test the length instead.)
    if header_list is None or len(header_list) == 0:
        msg = "add_csv_header: header_list can't be None or []"
        raise Exception(msg)

    header_string = ','.join(header_list) + '\n'

    # an empty file has no first line to compare the header against
    if not data:
        msg = "add_csv_header: File %s is empty" % file
        raise Exception(msg)

    columns = data[0].strip().split(',')

    if len(columns) != len(header_list):
        msg = ("add_csv_header: File %s has %d columns but header "
               "has %d columns" % (file, len(columns), len(header_list)))
        raise Exception(msg)

    # get a temporary file, only now that all checks have passed.
    # must create in same directory as input file, as we rename it.
    # an empty dirname means the current directory.
    input_dir = os.path.dirname(file) or os.curdir
    (tmp_f, tmp_filename) = tempfile.mkstemp(dir=input_dir)

    renamed = False
    try:
        # copy header to output file, then input file
        tmp_fd = os.fdopen(tmp_f, 'w')
        try:
            tmp_fd.write(header_string)
            tmp_fd.writelines(data)
        finally:
            tmp_fd.close()

        # replace the original file with the new one
        os.rename(tmp_filename, file)
        renamed = True
    finally:
        # don't leave a stray temporary file behind on failure
        if not renamed:
            os.remove(tmp_filename)
62
63
if __name__ == '__main__':
    # Self-test: write a small CSV file, add a header to it, then check
    # that the file holds the header line followed by the original data.
    file_data = '1,2,3\n4,5,6\n7,8,9'
    header = ['alpha', 'bravo', 'charlie']
    # use the platform's temporary directory rather than assuming '/tmp'
    filename = os.path.join(tempfile.gettempdir(), 'add_csv_header.csv')

    # create test file
    fd = open(filename, 'w')
    try:
        fd.write(file_data)
    finally:
        fd.close()

    try:
        add_csv_header(filename, header)

        # read test file
        fd = open(filename, 'r')
        try:
            data = fd.readlines()
        finally:
            fd.close()      # the original 'fd.close' never called close()

        # check if data as expected
        data = ''.join(data)
        header_string = ','.join(header) + '\n'
        expected = header_string + file_data

        msg = 'Expected data:\n%s\ngot:\n%s' % (expected, data)
        assert expected == data, msg
    finally:
        # don't leave the test file lying around
        if os.path.exists(filename):
            os.remove(filename)
90
Note: See TracBrowser for help on using the repository browser.