source: trunk/anuga_core/source/anuga/utilities/file_utils.py @ 7756

Last change on this file since 7756 was 7756, checked in by hudson, 14 years ago

Fixed broken unit tests in file utils.

File size: 12.9 KB
Line 
1""" Generic file utilities for creating, parsing deleting
2    and naming files in a manner consistent across ANUGA.
3"""
4
5
6import os, sys
7import csv
8import numpy as num
9import shutil
10
11##
12# @brief Convert a possible filename into a standard form.
13# @param s Filename to process.
14# @return The new filename string.
def make_filename(s):
    """Transform argument string into a suitable filename.

    Surrounding whitespace is stripped, spaces become underscores,
    parentheses are dropped and double underscores are collapsed in one
    left-to-right pass (a run of three or more underscores is shortened
    but not necessarily reduced to a single one).
    """

    cleaned = s.strip()

    # Order matters: '__' is collapsed last, after the substitutions
    # that can produce adjacent underscores.
    for old, new in ((' ', '_'), ('(', ''), (')', ''), ('__', '_')):
        cleaned = cleaned.replace(old, new)

    return cleaned
26
27
28##
29# @brief Check that a specified filesystem directory path exists.
30# @param path The directory path to check.
31# @param verbose True if this function is to be verbose.
32# @note If directory path doesn't exist, it will be created.
def check_dir(path, verbose=None):
    """Check that specified path exists.
    If path does not exist it will be created if possible

    USAGE:
       check_dir(path, verbose)

    ARGUMENTS:
        path -- Directory ('~' and '~username' are expanded)
        verbose -- Flag verbose output (default: None)

    RETURN VALUE:
        Verified path including trailing separator.  If the directory
        is not accessible and cannot be created, a fallback directory
        ('/tmp/', or 'C:' plus separator on Windows) is returned
        instead.  An empty path is returned unchanged.
    """

    # NOTE(review): 'log' is not imported in the visible part of this
    # file -- confirm it is in scope, otherwise the messages below fail.

    # An empty path cannot be indexed or created; hand it back as is.
    if path == '':
        return path

    unix = sys.platform not in ('nt', 'dos', 'win32')

    # add terminal separator, if it's not already there
    if not path.endswith(os.sep):
        path = path + os.sep

    # expand ~ or ~username in path
    path = os.path.expanduser(path)

    # Nothing to do if the directory is already readable and writable.
    # The mode flags must be OR-ed: 'os.R_OK and os.W_OK' is just os.W_OK.
    if os.access(path, os.R_OK | os.W_OK):
        return path

    # create directory if required
    try:
        os.mkdir(path)
    except OSError:
        log.critical('WARNING: Directory %s could not be created.' % path)
        if unix:
            path = '/tmp/'
        else:
            path = 'C:' + os.sep

        log.critical("Using directory '%s' instead" % path)
        return path

    # Change access rights if possible (best effort, failure is ignored)
    if unix:
        try:
            os.chmod(path, 0o775)
        except OSError:
            pass
    # FIXME: What about access rights under Windows?

    if verbose:
        log.critical('MESSAGE: Directory %s created.' % path)

    return path
84
85
86##
87# @brief Delete directory and all sub-directories.
88# @param path Path to the directory to delete.
def del_dir(path):
    """Recursively delete directory path and all its contents.

    Nothing happens if path is not a directory.  Symbolic links found
    inside are removed, not followed.  A file that cannot be removed is
    reported through log.critical; the final os.rmdir() then raises
    OSError because the directory is not empty.
    """

    if not os.path.isdir(path):
        return

    for entry in os.listdir(path):
        full_path = os.path.join(path, entry)

        if os.path.isdir(full_path) and not os.path.islink(full_path):
            del_dir(full_path)
        else:
            try:
                os.remove(full_path)
            except OSError:
                # Only filesystem errors are expected here; anything else
                # (e.g. KeyboardInterrupt) must propagate.
                log.critical("Could not remove file %s" % full_path)

    os.rmdir(path)
106
107
108##
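109# @brief Call a removal function on a path, logging any OSError it raises.
110# @param path Filesystem path to remove.
111# @param func Callable that performs the removal (e.g. os.remove, os.rmdir).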
112# @param verbose True if this function is to be verbose.
113# @note ANOTHER OPTION, IF NEED IN THE FUTURE, Nick B 7/2007
def rmgeneric(path, func, verbose=False):
    """Call func(path) and log, rather than raise, any OSError.

    path -- the path handed to func
    func -- callable taking the path (removeall passes os.remove and
            os.rmdir)
    verbose -- if true, log a message after a successful call
    """

    ERROR_STR = """Error removing %(path)s, %(error)s """

    try:
        func(path)
    except OSError as e:
        # 'except OSError, (errno, strerror)' is Python 2 only; the
        # strerror attribute carries the same text on both 2 and 3.
        log.critical(ERROR_STR % {'path': path, 'error': e.strerror})
    else:
        if verbose:
            log.critical('Removed %s' % path)
122
123
124##
125# @brief Remove all files and sub-directories inside a directory.
126# @param path Filesystem path to directory to remove.
127# @param verbose True if this function is to be verbose.
def removeall(path, verbose=False):
    """Remove the files and sub-directories inside directory path.

    The directory path itself is left in place.  Nothing happens if
    path is not a directory.  Entries that are neither regular files
    nor directories are skipped.  Removal errors are logged by
    rmgeneric rather than raised.

    path -- directory whose contents are removed
    verbose -- if true, every successful removal is logged
    """

    if not os.path.isdir(path):
        return

    for entry in os.listdir(path):
        fullpath = os.path.join(path, entry)
        if os.path.isfile(fullpath):
            rmgeneric(fullpath, os.remove, verbose)
        elif os.path.isdir(fullpath):
            # Empty the sub-directory first so that os.rmdir can succeed.
            removeall(fullpath, verbose)
            rmgeneric(fullpath, os.rmdir, verbose)
141
142
143##
144# @brief Create a standard filename.
145# @param datadir Directory where file is to be created.
146# @param filename Filename 'stem'.
147# @param format Format of the file, becomes filename extension.
148# @param size Size of file, becomes part of filename.
149# @param time Time (float), becomes part of filename.
150# @return The complete filename path, including directory.
151# @note The containing directory is created, if necessary.
def create_filename(datadir, filename, format, size=None, time=None):
    """Build a standard file name inside datadir.

    The result is check_dir(datadir) followed by filename, then
    '_size<size>' if size is given, then '_time<time>' (two decimals)
    if time is given, and finally '.' plus format.
    """

    pieces = [check_dir(datadir), filename]

    if size is not None:
        pieces.append('_size%d' % size)

    if time is not None:
        pieces.append('_time%.2f' % time)

    pieces.append('.' + format)

    return ''.join(pieces)
164
165
166##
167# @brief Get all files with a standard name and a given set of attributes.
168# @param datadir Directory files must be in.
169# @param filename Filename stem.
170# @param format Filename extension.
171# @param size Filename size.
172# @return A list of filenames (including directory) that match the attributes.
def get_files(datadir, filename, format, size):
    """Get all file (names) with given name, size and format.

    Matches names of the form <filename>_size<size>*.<format>, which is
    what create_filename produces, and also the older
    <filename>_size=<size>*.<format> spelling that this function used
    to look for exclusively.

    Returns a list of matching paths (directory included).
    """

    import glob

    # check_dir already returns the directory with a trailing separator.
    stem = check_dir(datadir) + filename
    tail = '%d*.%s' % (size, format)

    matches = glob.glob(stem + '_size' + tail)
    matches.extend(glob.glob(stem + '_size=' + tail))

    return matches
183
184
185##
186# @brief Find all entries in a directory whose names contain a given string.
187# @param look_in_dir Path to the directory to look in.
188# @param base_name String that files must contain.
189# @param verbose True if this function is to be verbose.
def get_all_directories_with_name(look_in_dir='', base_name='', verbose=False):
    '''
    Lists the entries of "look_in_dir" whose names contain "base_name".

    Every entry returned by os.listdir is considered; an empty
    "look_in_dir" means the current directory.

    Returns: a list of strings (entry names, without the directory)

    Raises: IOError if nothing matches

    Usage:     iterate_over = get_all_directories_with_name(dir, name)
    then:      for swwfile in iterate_over:
                   do stuff

    Check "export_grids" and "get_maximum_inundation_data" for examples
    '''

    search_dir = look_in_dir
    if search_dir == "":
        search_dir = "."                                   # Unix compatibility

    matches = []
    for entry in os.listdir(search_dir):
        if base_name in entry:
            matches.append(entry)

    if not matches:
        raise IOError('No files of the base name %s' % base_name)

    if verbose:
        log.critical('iterate over %s' % matches)

    return matches
216
217
218
219##
220# @brief Find all SWW files in a directory with given stem name.
221# @param look_in_dir The directory to look in.
222# @param base_name The file stem name.
223# @param verbose True if this function is to be verbose.
224# @return A list of found filename strings.
225# @note Will accept 'base_name' with or without '.sww' extension.
226# @note If no files found, raises IOError exception.
def get_all_swwfiles(look_in_dir='', base_name='', verbose=False):
    '''
    Finds all the sww files in a "look_in_dir" which contains a "base_name".
    will accept base_name with or without the extension ".sww"

    An empty "look_in_dir" means the current directory.

    Returns: a list of strings (file names with the ".sww" removed)

    Raises: IOError if base_name has an extension other than ".sww",
            or if no file matches

    Usage:     iterate_over = get_all_swwfiles(dir, name)
    then
               for swwfile in iterate_over:
                   do stuff

    Check "export_grids" and "get_maximum_inundation_data" for examples
    '''

    # plus tests the extension
    name, extension = os.path.splitext(base_name)

    if extension != '' and extension != '.sww':
        # Report stem + extension; base_name already contains the
        # extension, so formatting it again would print it twice.
        msg = 'file %s%s must be a NetCDF sww file!' % (name, extension)
        raise IOError(msg)

    if look_in_dir == "":
        look_in_dir = "."                                   # Unix compatibility

    iterate_over = [x[:-4] for x in os.listdir(look_in_dir)
                    if name in x and x.endswith('.sww')]

    if not iterate_over:
        raise IOError('No files of the base name %s' % name)

    if verbose:
        log.critical('iterate over %s' % iterate_over)

    return iterate_over
261
262
263##
264# @brief Find all files in a directory that contain a string and have extension.
265# @param look_in_dir Path to the directory to look in.
266# @param base_name Stem filename of the file(s) of interest.
267# @param extension Extension of the files to look for.
268# @param verbose True if this function is to be verbose.
269# @return A list of found filename strings.
270# @note If no files found, raises IOError exception.
def get_all_files_with_extension(look_in_dir='',
                                 base_name='',
                                 extension='.sww',
                                 verbose=False):
    '''Find all files in a directory with given stem name and extension.
    Finds all the files in "look_in_dir" whose names contain "base_name"
    and end with "extension" (any length, not only four characters).

    An empty "look_in_dir" means the current directory.

    Returns: a list of strings (file names with the extension removed)

    Raises: IOError if base_name carries a different extension,
            or if no file matches

    Usage:     iterate_over = get_all_files_with_extension(dir, name, ext)
    then
               for filename in iterate_over:
                   do stuff

    Check "export_grids" and "get_maximum_inundation_data" for examples
    '''

    # plus tests the extension
    name, ext = os.path.splitext(base_name)

    if ext != '' and ext != extension:
        msg = 'base_name %s must be a file with %s extension!' \
              % (base_name, extension)
        raise IOError(msg)

    if look_in_dir == "":
        look_in_dir = "."                               # Unix compatibility

    # Strip exactly len(extension) characters; slicing with an explicit
    # end index also behaves for an empty extension (x[:-0] would not).
    ext_len = len(extension)
    iterate_over = [x[:len(x) - ext_len] for x in os.listdir(look_in_dir)
                    if name in x and x.endswith(extension)]

    if not iterate_over:
        msg = 'No files of the base name %s in %s' % (name, look_in_dir)
        raise IOError(msg)

    if verbose:
        log.critical('iterate over %s' % iterate_over)

    return iterate_over
309
310
311##
312# @brief Read a CSV file and convert to a dictionary of {key: column}.
313# @param file_name The path to the file to read.
314# @param title_check_list List of titles that *must* be columns in the file.
315# @return Two dicts: ({key:column}, {title:index}).
316# @note WARNING: Values are returned as strings.
def load_csv_as_dict(file_name, title_check_list=None):
    """
    Load in the csv as a dictionary, title as key and column info as value.
    Also, create a dictionary, title as key and column index as value,
    to keep track of the column order.

    The first row is taken as the header; titles are stripped of
    surrounding whitespace.  Data values are left exactly as read.

    Two dictionaries are returned: ({title: [values]}, {title: index}).
    A file with a header but no data rows gives an empty first
    dictionary.

    Raises IOError if the file is empty, if a title listed in
    title_check_list is missing, or if a data row does not have the
    same number of entries as the header.

    WARNING: Values are returned as strings.
             Do this to change a list of strings to a list of floats
                 time = [float(x) for x in time]
    """

    # FIXME(Ole): Consider dealing with files without headers
    # FIXME(Ole): Consider a wrapper automatically converting text fields
    #             to the right type by trying for: int, float, string

    attribute_dic = {}
    title_index_dic = {}
    titles_stripped = []  # List of titles

    # The with-block guarantees the file is closed, also on error.
    with open(file_name) as fid:
        reader = csv.reader(fid)

        # Read in and manipulate the title info
        try:
            titles = next(reader)
        except StopIteration:
            msg = 'File %s is empty, no header row found' % file_name
            raise IOError(msg)

        for i, title in enumerate(titles):
            header = title.strip()
            titles_stripped.append(header)
            title_index_dic[header] = i
        title_count = len(titles_stripped)

        # Check required columns
        if title_check_list is not None:
            for title_check in title_check_list:
                if title_check not in title_index_dic:
                    msg = 'Reading error. This row is not present %s' \
                          % title_check
                    raise IOError(msg)

        # Create a dictionary of column values, indexed by column title
        for line in reader:
            n = len(line)  # Number of entries
            if n != title_count:
                msg = 'Entry in file %s had %d columns ' % (file_name, n)
                msg += 'although there were %d headers' % title_count
                raise IOError(msg)
            for i, value in enumerate(line):
                attribute_dic.setdefault(titles_stripped[i], []).append(value)

    return attribute_dic, title_index_dic
366
367
368         
369##
370# @brief Convert CSV file to a dictionary of arrays.
371# @param file_name The path to the file to read.
def load_csv_as_array(file_name):
    """
    Read a CSV file of the form:

    time, discharge, velocity
    0.0,  1.2,       0.0
    0.1,  3.2,       1.1
    ...

    and return a dictionary mapping each column title to a numeric
    array of that column's values, each converted with float().


    See underlying function load_csv_as_dict for more details.
    """

    columns, _ = load_csv_as_dict(file_name)

    arrays = {}
    for title, values in columns.items():
        arrays[title] = num.array([float(v) for v in values])

    return arrays
394
395
396
def copy_code_files(dir_name, filename1, filename2=None, verbose=False):
    """Copies "filename1" and "filename2" to "dir_name".

    Each 'filename' may be a string or list of filename strings.

    Filenames must be absolute pathnames

    The destination directory is created (one level, mode 0777 before
    the umask is applied) if it does not exist.
    """

    # NOTE(review): 'log' is not imported in the visible part of this
    # file -- confirm it is in scope before calling with verbose=True.

    ##
    # @brief copies a file or sequence to destination directory.
    # @param dest The destination directory to copy to.
    # @param file_or_seq A filename string or sequence of filename strings.
    def copy_file_or_sequence(dest, file_or_seq):
        # A single name must not be iterated character by character:
        # on Python 3 str has __iter__, on Python 2 it does not.
        is_single = (isinstance(file_or_seq, (str, bytes))
                     or not hasattr(file_or_seq, '__iter__'))
        if is_single:
            names = [file_or_seq]
        else:
            names = file_or_seq

        for name in names:
            shutil.copy(name, dest)
            if verbose:
                log.critical('File %s copied' % name)

    # check we have a destination directory, create if necessary
    if not os.path.isdir(dir_name):
        if verbose:
            log.critical('Make directory %s' % dir_name)
        os.mkdir(dir_name, 0o777)

    if verbose:
        log.critical('Output directory: %s' % dir_name)

    copy_file_or_sequence(dir_name, filename1)

    if filename2 is not None:
        copy_file_or_sequence(dir_name, filename2)
Note: See TracBrowser for help on using the repository browser.