source: trunk/anuga_core/source/anuga/utilities/file_utils.py @ 8879

Last change on this file since 8879 was 8146, checked in by wilsonr, 14 years ago

Removed '@brief' comments.

File size: 8.2 KB
Line 
1""" Generic file utilities for creating, parsing deleting
2    and naming files in a manner consistent across ANUGA.
3"""
4
5
6import os, sys
7import csv
8import numpy as num
9import shutil
10import log
11
12from exceptions import IOError
13
def make_filename(s):
    """Return a standardised filename derived from the string s.

    Surrounding whitespace is stripped, each remaining space becomes an
    underscore, parentheses are dropped and, finally, pairs of underscores
    are replaced by a single underscore.

    The underscore replacement is one pass only, so a run of three or more
    underscores is shortened but not necessarily reduced to one.

    s  Filename to process.

    Returns the new filename string.
    """

    # Applied strictly in this order; the last rule tidies up after the
    # earlier ones.
    substitutions = ((' ', '_'), ('(', ''), (')', ''), ('__', '_'))

    cleaned = s.strip()
    for old, new in substitutions:
        cleaned = cleaned.replace(old, new)

    return cleaned
29
30
def check_dir(path, verbose=None):
    """Check that specified path exists.
    If path does not exist it will be created if possible

    USAGE:
       check_dir(path, verbose)

    ARGUMENTS:
        path -- Directory.  A leading ~ or ~username is expanded.
                An empty string is returned unchanged.
        verbose -- Flag verbose output (default: None)

    RETURN VALUE:
        Verified path including trailing separator.

        If the directory is not both readable and writable and cannot be
        created with os.mkdir (only the last path component is created),
        a warning is logged and a fallback directory is returned instead:
        '/tmp/' on Unix-like systems, 'C:' + os.sep on Windows.
    """

    # Indexing an empty string would raise IndexError; hand it back as is
    # and do not try to create anything.
    if path == '':
        return path

    # sys.platform is 'win32' on native Windows (never 'nt' or 'dos');
    # os.name is 'nt' there.
    unix = not (os.name == 'nt' or sys.platform.startswith('win'))

    # add terminal separator, if it's not already there
    if not path.endswith(os.sep):
        path = path + os.sep

    # expand ~ or ~username in path
    path = os.path.expanduser(path)

    # create directory if required
    # NOTE: the access modes must be OR-ed together; 'os.R_OK and os.W_OK'
    # evaluates to os.W_OK alone and would skip the read check.
    if not os.access(path, os.R_OK | os.W_OK):
        try:
            os.mkdir(path)
        except OSError:
            log.critical('WARNING: Directory %s could not be created.' % path)
            if unix:
                path = '/tmp/'
            else:
                path = 'C:' + os.sep

            log.critical("Using directory '%s' instead" % path)
        else:
            # Change access rights if possible.  os.chmod avoids building a
            # shell command from the path (spaces, quotes, metacharacters).
            if unix:
                try:
                    os.chmod(path, 0o775)
                except OSError:
                    pass  # best effort only: the directory itself exists
            else:
                pass  # FIXME: What about access rights under Windows?

            if verbose:
                log.critical('MESSAGE: Directory %s created.' % path)

    return path
82
83
def del_dir(path):
    """Recursively delete directory path and all its contents

    Does nothing if path is not a directory.

    Sub-directories are descended into unless they are symbolic links; a
    link is removed itself and its target is left untouched.  A file that
    cannot be removed is logged and skipped; the final os.rmdir of the
    containing directory will then raise OSError because the directory is
    not empty.
    """

    if not os.path.isdir(path):
        return

    for entry in os.listdir(path):
        entry_path = os.path.join(path, entry)

        if os.path.isdir(entry_path) and not os.path.islink(entry_path):
            del_dir(entry_path)
        else:
            try:
                os.remove(entry_path)
            except OSError:
                # Only filesystem errors are tolerated here; anything else
                # (e.g. KeyboardInterrupt) must propagate.
                log.critical("Could not remove file %s" % entry_path)

    os.rmdir(path)
101
102
def rmgeneric(path, func, verbose=False):
    """Call the removal function func on path, logging instead of raising.

    path     path handed to func
    func     callable taking the path as its only argument
             (e.g. os.remove or os.rmdir)
    verbose  if True, log a message after a successful call

    An OSError raised by func is caught and reported through the log;
    any other exception propagates to the caller.
    """

    ERROR_STR = """Error removing %(path)s, %(error)s """

    try:
        func(path)
    except OSError as e:
        # strerror is None when the OSError was built without an
        # (errno, strerror) pair; fall back to the full message then.
        strerror = e.strerror if e.strerror is not None else str(e)
        log.critical(ERROR_STR % {'path': path, 'error': strerror})
    else:
        if verbose:
            log.critical('Removed %s' % path)
111
112
def removeall(path, verbose=False):
    """Remove everything inside directory path, leaving path itself in place.

    path     directory to empty; nothing happens if it is not a directory
    verbose  passed on to rmgeneric for every removal, including those in
             sub-directories

    Symbolic links are removed as links and never followed, so the target
    of a link pointing outside the tree is left untouched.  Removal errors
    are handled (logged) by rmgeneric.
    """

    if not os.path.isdir(path):
        return

    for x in os.listdir(path):
        fullpath = os.path.join(path, x)

        # Test for links first: os.path.isdir() follows them, which would
        # otherwise empty the directory a link points to.
        if os.path.islink(fullpath) or os.path.isfile(fullpath):
            rmgeneric(fullpath, os.remove, verbose)
        elif os.path.isdir(fullpath):
            removeall(fullpath, verbose)
            rmgeneric(fullpath, os.rmdir, verbose)
126
127
def create_filename(datadir, filename, format, size=None, time=None):
    """Build a standard filename, including its directory.

    datadir   directory where file is to be created
    filename  filename 'stem'
    format    format of the file, becomes filename extension
    size      optional size, added to the name as '_size<integer>'
    time      optional time (float), added to the name as '_time<t>'
              with two decimal places

    Returns the complete filename path, including directory.

    The directory part comes from check_dir(datadir), which creates the
    containing directory if necessary.
    """

    # check_dir() returns the directory with its trailing separator.
    pieces = [check_dir(datadir), filename]

    if size is not None:
        pieces.append('_size%d' % size)

    if time is not None:
        pieces.append('_time%.2f' % time)

    pieces.extend(['.', format])

    return ''.join(pieces)
153
154
def get_files(datadir, filename, format, size):
    """Get all file (names) with given name, size and format

    datadir   directory to search (verified through check_dir)
    filename  filename 'stem'
    format    filename extension, without the leading dot
    size      integer size encoded in the filename

    Returns a list of matching paths.  Names of the form
    '<filename>_size<size>*.<format>', as produced by create_filename, are
    matched first; names using the older '<filename>_size=<size>*.<format>'
    spelling are still matched and appended to the result.
    """

    import glob

    # check_dir() already returns the directory with a trailing separator.
    stem = check_dir(datadir) + filename

    matches = glob.glob(stem + '_size%d*.%s' % (size, format))

    # Previous versions of this function searched for '_size=' only.
    matches.extend(glob.glob(stem + '_size=%d*.%s' % (size, format)))

    return matches
165
166
def get_all_directories_with_name(look_in_dir='', base_name='', verbose=False):
    '''
    Finds all the entries in "look_in_dir" whose name contains "base_name".

    Note that every entry returned by os.listdir is considered, so plain
    files whose name contains "base_name" are returned as well as
    directories.

    look_in_dir  directory to search; '' means the current directory
    base_name    substring that the entry name must contain
    verbose      if True, log the list that was found

    Returns: a list of strings (entry names, without the directory part)

    Raises IOError if nothing matches.

    Usage:     iterate_over = get_all_directories_with_name(dir, name)
    then:      for swwfile in iterate_over:
                   do stuff

    Check "export_grids" and "get_maximum_inundation_data" for examples
    '''

    if look_in_dir == "":
        look_in_dir = "."                                  # Unix compatibility

    iterate_over = [x for x in os.listdir(look_in_dir) if base_name in x]

    if not iterate_over:
        msg = 'No files of the base name %s' % base_name
        raise IOError(msg)

    if verbose:
        log.critical('iterate over %s' % iterate_over)

    return iterate_over
193
194
195
def get_all_swwfiles(look_in_dir='', base_name='', verbose=False):
    '''
    Finds all the sww files in a "look_in_dir" which contains a "base_name".
    will accept base_name with or without the extension ".sww"

    look_in_dir  directory to search; '' means the current directory
    base_name    substring that the file name (without ".sww") must contain
    verbose      if True, log the list that was found

    Returns: a list of strings (file names with the ".sww" removed)

    Raises IOError if base_name carries an extension other than ".sww",
    or if no file matches.

    Usage:     iterate_over = get_all_swwfiles(dir, name)
    then
               for swwfile in iterate_over:
                   do stuff

    Check "export_grids" and "get_maximum_inundation_data" for examples
    '''

    sww_ext = '.sww'

    # plus tests the extension
    name, extension = os.path.splitext(base_name)

    if extension != '' and extension != sww_ext:
        # base_name still includes its extension, so report it as given.
        msg = 'file %s must be a NetCDF sww file!' % base_name
        raise IOError(msg)

    if look_in_dir == "":
        look_in_dir = "."                                   # Unix compatibility

    iterate_over = []
    for x in os.listdir(look_in_dir):
        if not x.endswith(sww_ext):
            continue
        stem = x[:-len(sww_ext)]
        # Compare against the stem so that "name" cannot match inside the
        # ".sww" suffix itself.
        if name in stem:
            iterate_over.append(stem)

    if not iterate_over:
        msg = 'No files of the base name %s' % name
        raise IOError(msg)

    if verbose:
        log.critical('iterate over %s' % iterate_over)

    return iterate_over
230
231
def get_all_files_with_extension(look_in_dir='',
                                 base_name='',
                                 extension='.sww',
                                 verbose=False):
    '''Find all files in a directory with given stem name and extension.
    Finds all the files in "look_in_dir" whose name ends with "extension"
    and whose stem (the name without that extension) contains "base_name".

    look_in_dir  directory to search; '' means the current directory
    base_name    substring to look for, with or without the extension
    extension    required file extension including the dot, any length
                 (default '.sww')
    verbose      if True, log the list that was found

    Returns: a list of strings (file names with the extension removed)

    Raises IOError if base_name carries an extension different from
    "extension", or if no file matches.

    Usage:     iterate_over = get_all_files_with_extension(dir, name, ext)
    then
               for stem in iterate_over:
                   do stuff

    Check "export_grids" and "get_maximum_inundation_data" for examples
    '''

    # plus tests the extension
    name, ext = os.path.splitext(base_name)

    if ext != '' and ext != extension:
        msg = 'base_name %s must be a file with %s extension!' \
              % (base_name, extension)
        raise IOError(msg)

    if look_in_dir == "":
        look_in_dir = "."                               # Unix compatibility

    iterate_over = []
    for x in os.listdir(look_in_dir):
        if not x.endswith(extension):
            continue
        # Strip exactly the extension, whatever its length (a plain
        # x[:-n] slice would return '' for an empty extension).
        stem = x[:len(x) - len(extension)]
        if name in stem:
            iterate_over.append(stem)

    if not iterate_over:
        msg = 'No files of the base name %s in %s' % (name, look_in_dir)
        raise IOError(msg)

    if verbose:
        log.critical('iterate over %s' % iterate_over)

    return iterate_over
270
271
272
273
def copy_code_files(dir_name, filename1, filename2=None, verbose=False):
    """Copies "filename1" and "filename2" to "dir_name".

    dir_name   destination directory; created with os.mkdir if it does not
               exist (only the last path component is created)
    filename1  a filename string or an iterable of filename strings
    filename2  optional, same form as filename1
    verbose    if True, log the directory and each file copied

    Each 'filename' may be a string or list of filename strings.

    Filenames must be absolute pathnames
    """

    # A string is iterable on Python 3, so it has to be recognised
    # explicitly or it would be copied one character at a time.
    string_types = (str, bytes, type(u''))

    def copy_file_or_sequence(dest, source):
        """Copy one file, or every file of an iterable, into dest."""
        if isinstance(source, string_types) or not hasattr(source, '__iter__'):
            sources = [source]
        else:
            sources = source

        for f in sources:
            shutil.copy(f, dest)
            if verbose:
                log.critical('File %s copied' % f)

    # check we have a destination directory, create if necessary
    if not os.path.isdir(dir_name):
        if verbose:
            log.critical('Make directory %s' % dir_name)
        os.mkdir(dir_name, 0o777)

    if verbose:
        log.critical('Output directory: %s' % dir_name)

    copy_file_or_sequence(dir_name, filename1)

    if filename2 is not None:
        copy_file_or_sequence(dir_name, filename2)
Note: See TracBrowser for help on using the repository browser.