source: anuga_core/source/anuga/utilities/file_utils.py @ 7744

Last change on this file since 7744 was 7744, checked in by hudson, 14 years ago

Split off sww2dem from data_manager.

File size: 11.7 KB
Line 
""" Generic file utilities for creating, parsing, deleting
    and naming files in a manner consistent across ANUGA.
"""
4
5
import csv
import glob
import os
import sys

import numpy as num

# NOTE(review): several functions below call log.critical(...), but no `log`
# name is imported in this module as shown; presumably it should come from the
# project's logging helper -- confirm and add that import.
9
##
# @brief Convert a possible filename into a standard form.
# @param s Filename to process.
# @return The new filename string.
def make_filename(s):
    """Transform the argument string into a suitable filename.

    Surrounding whitespace is stripped, every space becomes an underscore,
    both kinds of parentheses are dropped, and finally each non-overlapping
    '__' pair is replaced by a single '_' in one left-to-right pass (so a
    run of three underscores is reduced to two, not one).
    """

    cleaned = s.strip().replace(' ', '_')

    # Drop opening and closing parentheses alike.
    for bracket in '()':
        cleaned = cleaned.replace(bracket, '')

    return cleaned.replace('__', '_')
25
26
##
# @brief Check that a specified filesystem directory path exists.
# @param path The directory path to check.
# @param verbose True if this function is to be verbose.
# @return The verified path (with trailing separator) or a fallback directory.
# @note If directory path doesn't exist, it will be created.
def check_dir(path, verbose=None):
    """Check that specified path exists.
    If path does not exist it will be created if possible

    USAGE:
       check_dir(path, verbose):

    ARGUMENTS:
        path -- Directory
        verbose -- Flag verbose output (default: None)

    RETURN VALUE:
        Verified path including trailing separator.
        An empty path is returned unchanged.
        If the directory could not be created, a fallback directory is
        returned instead: '/tmp/' on Unix-like platforms, 'C:' + os.sep
        otherwise.
    """

    # An empty path means "no directory": there is nothing to verify or
    # create, and indexing into it would fail.
    if path == '':
        return path

    unix = sys.platform not in ('nt', 'dos', 'win32')

    # add terminal separator, if it's not already there
    if not path.endswith(os.sep):
        path = path + os.sep

    # expand ~ or ~username in path
    path = os.path.expanduser(path)

    # Nothing to do if the directory is already readable AND writable.
    # The mode flags are bit masks, so they must be combined with '|'.
    if os.access(path, os.R_OK | os.W_OK):
        return path

    # create directory if required
    try:
        os.mkdir(path)
    except OSError:
        log.critical('WARNING: Directory %s could not be created.' % path)
        if unix:
            path = '/tmp/'
        else:
            path = 'C:' + os.sep

        log.critical("Using directory '%s' instead" % path)
    else:
        # Change access rights if possible.  Done through os.chmod rather
        # than a shell command so that unusual characters in the path are
        # harmless; failure here is not fatal because the directory exists.
        if unix:
            try:
                os.chmod(path, 0o775)
            except OSError:
                pass
        # FIXME: What about access rights under Windows?

        if verbose:
            log.critical('MESSAGE: Directory %s created.' % path)

    return path
83
84
##
# @brief Delete directory and all sub-directories.
# @param path Path to the directory to delete.
def del_dir(path):
    """Recursively delete directory path and all its contents

    Does nothing if path is not a directory.  Real sub-directories are
    descended into; everything else (including symbolic links, which are
    never followed) is removed with os.remove.  A failure to remove an entry
    is logged and the walk continues; the final os.rmdir on path is still
    attempted afterwards.
    """

    if not os.path.isdir(path):
        return

    for entry in os.listdir(path):
        X = os.path.join(path, entry)

        if os.path.isdir(X) and not os.path.islink(X):
            del_dir(X)
        else:
            try:
                os.remove(X)
            except OSError:
                log.critical("Could not remove file %s" % X)

    os.rmdir(path)
105
106
##
# @brief Apply a removal function to a path, logging any OSError it raises.
# @param path Filesystem path handed to func.
# @param func Callable taking the path as its only argument.
# @param verbose True if this function is to be verbose.
# @note ANOTHER OPTION, IF NEED IN THE FUTURE, Nick B 7/2007
def rmgeneric(path, func, verbose=False):
    """Call func(path); log instead of raising if it fails with OSError.

    On success a 'Removed ...' message is logged when verbose is true.
    Exceptions other than OSError propagate to the caller.
    """

    ERROR_STR = """Error removing %(path)s, %(error)s """

    try:
        func(path)
    except OSError as err:
        log.critical(ERROR_STR % {'path': path, 'error': err.strerror})
    else:
        if verbose:
            log.critical('Removed %s' % path)
121
122
##
# @brief Remove everything inside a directory, including sub-directories.
# @param path Filesystem path to the directory to empty.
# @param verbose True if this function is to be verbose.
# @note The directory 'path' itself is left in place.
def removeall(path, verbose=False):
    """Remove all files and sub-directories found inside path.

    Does nothing if path is not a directory.  Each removal goes through
    rmgeneric, so an OSError is logged rather than raised.  The verbose flag
    is passed on for every removal and to the recursive calls.

    Symbolic links are removed as links and never followed, so the contents
    of a linked directory are left untouched.
    """

    if not os.path.isdir(path):
        return

    for entry in os.listdir(path):
        fullpath = os.path.join(path, entry)

        # Test for a link first: os.path.isdir follows links, and recursing
        # through one would delete files outside of 'path'.
        if os.path.islink(fullpath) or os.path.isfile(fullpath):
            rmgeneric(fullpath, os.remove, verbose)
        elif os.path.isdir(fullpath):
            removeall(fullpath, verbose)
            rmgeneric(fullpath, os.rmdir, verbose)
140
141
##
# @brief Create a standard filename.
# @param datadir Directory where file is to be created.
# @param filename Filename 'stem'.
# @param format Format of the file, becomes filename extension.
# @param size Size of file, becomes part of filename.
# @param time Time (float), becomes part of filename.
# @return The complete filename path, including directory.
# @note The containing directory is created, if necessary.
def create_filename(datadir, filename, format, size=None, time=None):
    """Build '<dir><filename>[_size<N>][_time<T.TT>].<format>'.

    The directory part is whatever check_dir(datadir) returns.  The size
    and time pieces are only added when the corresponding argument is not
    None; size is formatted with %d and time with %.2f.
    """

    pieces = [check_dir(datadir), filename]

    if size is not None:
        pieces.append('_size%d' % size)

    if time is not None:
        pieces.append('_time%.2f' % time)

    pieces.append('.' + format)

    return ''.join(pieces)
163
164
##
# @brief Get all files with a standard name and a given set of attributes.
# @param datadir Directory files must be in.
# @param filename Filename stem.
# @param format Filename extension.
# @param size Filename size.
# @return A list of filenames (including directory) that match the attributes.
def get_files(datadir, filename, format, size):
    """Get all file (names) with given name, size and format

    Matches the names produced by create_filename for this size, i.e.
    '<dir><filename>_size<N>.<format>' and
    '<dir><filename>_size<N>_time<anything>.<format>'.
    """

    # check_dir already returns the directory with a trailing separator.
    stem = check_dir(datadir) + filename + '_size%d' % size

    # Two explicit patterns rather than '<stem>*': a bare wildcard directly
    # after the size would also pick up larger sizes sharing the same
    # leading digits (size 1 matching size 10).
    without_time = glob.glob(stem + '.' + format)
    with_time = glob.glob(stem + '_time*.' + format)

    return without_time + with_time
182
183
##
# @brief Find all entries in a directory whose name contains a given string.
# @param look_in_dir Path to the directory to look in.
# @param base_name String that entry names must contain.
# @param verbose True if this function is to be verbose.
# @return A list of matching entry names (without the directory part).
# @note If nothing matches, raises IOError exception.
def get_all_directories_with_name(look_in_dir='', base_name='', verbose=False):
    '''
    Finds all the entries in a "look_in_dir" whose name contains "base_name".

    No check is made that a match is actually a directory: every name
    returned by os.listdir that contains base_name is included.

    Returns: a list of strings

    Raises: IOError if no entry matches.

    Usage:     iterate_over = get_all_directories_with_name(dir, name)
    then:      for swwfile in iterate_over:
                   do stuff

    Check "export_grids" and "get_maximum_inundation_data" for examples
    '''

    if look_in_dir == "":
        look_in_dir = "."                                  # Unix compatibility

    iterate_over = [x for x in os.listdir(look_in_dir) if base_name in x]

    if not iterate_over:
        msg = 'No files of the base name %s' % base_name
        raise IOError(msg)

    if verbose:
        log.critical('iterate over %s' % iterate_over)

    return iterate_over
215
216
217
##
# @brief Find all SWW files in a directory with given stem name.
# @param look_in_dir The directory to look in.
# @param base_name The file stem name.
# @param verbose True if this function is to be verbose.
# @return A list of found filename strings.
# @note Will accept 'base_name' with or without '.sww' extension.
# @note If no files found, raises IOError exception.
def get_all_swwfiles(look_in_dir='', base_name='', verbose=False):
    '''
    Finds all the sww files in a "look_in_dir" which contains a "base_name".
    will accept base_name with or without the extension ".sww"

    Returns: a list of strings (file names with the ".sww" extension removed)

    Raises: IOError if base_name carries an extension other than ".sww",
            or if no file matches.

    Usage:     iterate_over = get_all_swwfiles(dir, name)
    then
               for swwfile in iterate_over:
                   do stuff

    Check "export_grids" and "get_maximum_inundation_data" for examples
    '''

    # plus tests the extension
    name, extension = os.path.splitext(base_name)

    if extension not in ('', '.sww'):
        # name + extension is the base_name exactly as it was passed in.
        msg = 'file %s%s must be a NetCDF sww file!' % (name, extension)
        raise IOError(msg)

    if look_in_dir == "":
        look_in_dir = "."                                   # Unix compatibility

    iterate_over = [x[:-4] for x in os.listdir(look_in_dir)
                    if name in x and x.endswith('.sww')]

    if not iterate_over:
        msg = 'No files of the base name %s' % name
        raise IOError(msg)

    if verbose:
        log.critical('iterate over %s' % iterate_over)

    return iterate_over
260
261
##
# @brief Find all files in a directory that contain a string and have extension.
# @param look_in_dir Path to the directory to look in.
# @param base_name Stem filename of the file(s) of interest.
# @param extension Extension of the files to look for.
# @param verbose True if this function is to be verbose.
# @return A list of found filename strings.
# @note If no files found, raises IOError exception.
def get_all_files_with_extension(look_in_dir='',
                                 base_name='',
                                 extension='.sww',
                                 verbose=False):
    '''Find all files in a directory with given stem name.
    Finds all the files in a "look_in_dir" whose name contains "base_name"
    and ends with "extension" (of any length).

    Returns: a list of strings (file names with the extension removed)

    Raises: IOError if base_name carries an extension different from
            "extension", or if no file matches.

    Usage:     iterate_over = get_all_files_with_extension(dir, name, ext)
    then
               for filename in iterate_over:
                   do stuff

    Check "export_grids" and "get_maximum_inundation_data" for examples
    '''

    # plus tests the extension
    name, ext = os.path.splitext(base_name)

    if ext != '' and ext != extension:
        msg = 'base_name %s must be a file with %s extension!' \
              % (base_name, extension)
        raise IOError(msg)

    if look_in_dir == "":
        look_in_dir = "."                               # Unix compatibility

    # Strip exactly as many characters as the extension has.  Slicing with
    # an explicit end index also behaves for an empty extension, where
    # x[:-0] would wrongly give ''.
    ext_len = len(extension)
    iterate_over = [x[:len(x) - ext_len] for x in os.listdir(look_in_dir)
                    if name in x and x.endswith(extension)]

    if not iterate_over:
        msg = 'No files of the base name %s in %s' % (name, look_in_dir)
        raise IOError(msg)

    if verbose:
        log.critical('iterate over %s' % iterate_over)

    return iterate_over
308
309
##
# @brief Read a CSV file and convert to a dictionary of {key: column}.
# @param file_name The path to the file to read.
# @param title_check_list List of titles that *must* be columns in the file.
# @return Two dicts: ({key:column}, {title:index}).
# @note WARNING: Values are returned as strings.
def load_csv_as_dict(file_name, title_check_list=None):
    """
    Load in the csv as a dictionary, title as key and column info as value.
    Also, create a dictionary, title as key and column index as value,
    to keep track of the column order.

    Two dictionaries are returned.

    Titles have surrounding whitespace stripped; the values are stored
    exactly as the csv reader delivers them.  If the file holds a header row
    but no data rows, the first dictionary is empty.

    Raises IOError if the file has no header row, if a title listed in
    title_check_list is missing, or if a data row does not have the same
    number of entries as the header.

    WARNING: Values are returned as strings.
             Do this to change a list of strings to a list of floats
                 time = [float(x) for x in time]
    """

    # FIXME(Ole): Consider dealing with files without headers
    # FIXME(Ole): Consider a wrapper automatically converting text fields
    #             to the right type by trying for: int, float, string

    attribute_dic = {}
    title_index_dic = {}
    titles_stripped = [] # List of titles

    # The reader pulls rows lazily, so all reading happens inside the
    # 'with' block; the file is closed on every exit path.
    with open(file_name) as csv_file:
        reader = csv.reader(csv_file)

        # Read in and manipulate the title info
        try:
            titles = next(reader)
        except StopIteration:
            msg = 'File %s is empty, no header row found' % file_name
            raise IOError(msg)

        for i, title in enumerate(titles):
            header = title.strip()
            titles_stripped.append(header)
            title_index_dic[header] = i
        title_count = len(titles_stripped)

        # Check required columns
        if title_check_list is not None:
            for title_check in title_check_list:
                if title_check not in title_index_dic:
                    msg = ('Reading error. This row is not present %s'
                           % title_check)
                    raise IOError(msg)

        # Create a dictionary of column values, indexed by column title
        for line in reader:
            n = len(line) # Number of entries
            if n != title_count:
                msg = 'Entry in file %s had %d columns ' % (file_name, n)
                msg += 'although there were %d headers' % title_count
                raise IOError(msg)
            for i, value in enumerate(line):
                attribute_dic.setdefault(titles_stripped[i], []).append(value)

    return attribute_dic, title_index_dic
365
366
367         
##
# @brief Convert CSV file to a dictionary of arrays.
# @param file_name The path to the file to read.
# @return A dict mapping each column title to a numeric array of its values.
def load_csv_as_array(file_name):
    """
    Read a CSV file laid out as

    time, discharge, velocity
    0.0,  1.2,       0.0
    0.1,  3.2,       1.1
    ...

    and return a dictionary with one numeric array per column title.
    Every value is passed through float() before the array is built.

    The file itself is read by load_csv_as_dict; see that function for
    more details.
    """

    columns, _ = load_csv_as_dict(file_name)

    return dict((title, num.array([float(entry) for entry in entries]))
                for title, entries in columns.items())
Note: See TracBrowser for help on using the repository browser.