# Changeset 623

Ignore:
Timestamp:
Nov 24, 2004, 4:43:40 PM (19 years ago)
Message:

Created and tested general File_function (reading and interpolating time series from file) and used it to refactor and simplify File_boundary

Location:
inundation/ga/storm_surge/pyvolution
Files:
6 edited

Unmodified
Removed

• ## inundation/ga/storm_surge/pyvolution/shallow_water.py

 r620 """ #FIXME: Experimental #FIXME: Experimental (from old version). Not in use at the moment #Shortcuts xmomc[k] = ymomc[k] = 0.0 #FIXME: Delete #From 'newstyle #if hc[k] < domain.minimum_allowed_height: def check_forcefield(f): """Check that f is either 1: a callable object of x,y,t, where x and y are vectors 1: a callable object f(t,x,y), where x and y are vectors and that it returns an array or a list of same length as x and y y = ones(3, Float) try: q = f(x, y, 1.0) q = f(1.0, x, y) except Exception, e: msg = 'Function %s could not be executed:\n%s' %(f, e) class Wind_stress: """Apply wind stress to water momentum """Apply wind stress to water momentum in terms of wind speed [m/s] and wind direction [degrees] """ map from True north to grid north. Arguments can also be Python functions of x,y,t as in def speed(x,y,t): Arguments can also be Python functions of t,x,y as in def speed(t,x,y): ... return s def angle(x,y,t): def angle(t,x,y): ... return phi forcing_terms as in Alternatively, one vector valued function for (speed, angle) can be applied, providing both quantities simultaneously. FIXME: Not yet implemented domain.forcing_terms.append(W) """ if callable(self.speed): xc = domain.get_centroid_coordinates() s_vec = self.speed(xc[:,0], xc[:,1], t) s_vec = self.speed(t, xc[:,0], xc[:,1]) else: #Assume s is a scalar if callable(self.phi): xc = domain.get_centroid_coordinates() phi_vec = self.phi(xc[:,0], xc[:,1], t) phi_vec = self.phi(t, xc[:,0], xc[:,1]) else: #Assume phi is a scalar ymom_update[k] += S*v class Wind_stress_from_file: """Apply wind stress read from a file to water momentum At this stage the wind field is assumed to depend on time only, i.e no spatial dependency. FIXME: This class may be incorporated in the generic Wind_stress class Superseded """ def __init__(self, filename): """Initialise windfield from a file with the following format time [DD/MM/YY hh:mm:ss], speed [m/s] direction [degrees] e.g. 03/09/04 19:15:00, 9.53 39 domain.forcing_terms.append(W) """ import time from Numeric import array from config import time_format from config import rho_a, rho_w, eta_w self.const = eta_w*rho_a/rho_w try: fid = open(filename) except Exception, e: msg = 'Wind stress file %s could not be opened: %s\n'\ %(filename, e) raise msg line = fid.readline() fid.close() fields = line.split(',') msg = 'File %s must have the format date, values' assert len(fields) == 2, msg try: starttime = time.mktime(time.strptime(fields[0], time_format)) except ValueError: msg = 'First field in file %s must be' %filename msg += ' date-time with format %s.\n' %time_format msg += 'I got %s instead.' %fields[0] raise msg #Split values values = [] for value in fields[1].split(): values.append(float(value)) q = array(values) msg = 'ERROR: File function must return a 1d list or array ' assert len(q.shape) == 1, msg msg = 'Values specified in file must convert to an array of length 2' assert len(q) == 2, msg self.filename = filename self.domain = domain self.starttime = starttime if domain.starttime is None: domain.starttime = starttime else: msg = 'Start time as specified in domain (%s) is earlier ' 'than the starttime of file %s: %s'\ %(domain.starttime, self.filename, self.starttime) if self.starttime > domain.starttime: raise msg self.read_times() #Now read all times in. def read_times(self): from Numeric import zeros, Float, alltrue from config import time_format import time fid = open(self.filename) lines = fid.readlines() fid.close() N = len(lines) T = zeros(N, Float)       #Time Q = zeros((N, 2), Float)  #Wind field (s, phi) for i, line in enumerate(lines): fields = line.split(',') real_time = time.mktime(time.strptime(fields[0], time_format)) T[i] = real_time - self.start_time for j, value in enumerate(fields[1].split()): Q[i, j] = float(value) msg = 'File %s must list time as a monotonuosly ' %self.filename msg += 'increasing sequence' assert alltrue( T[1:] - T[:-1] > 0 ), msg self.T = T     #Time self.Q = Q     #Windfied self.index = 0 #Initial index def __repr__(self): return 'Wind field from file' def __call__(self, domain): """Evaluate windfield based on values found in domain """ from math import pi, cos, sin, sqrt xmom_update = domain.quantities['xmomentum'].explicit_update ymom_update = domain.quantities['ymomentum'].explicit_update N = domain.number_of_elements t = self.domain.time msg = 'Time given in File %s does not match model time'\ %self.filename if t < self.T[0]: raise msg if t > self.T[-1]: raise msg oldindex = self.index #Find slot while t > self.T[self.index]: self.index += 1 while t < self.T[self.index]: self.index -= 1 #t is now between index and index+1 ratio = (t - self.T[self.index])/\ (self.T[self.index+1] - self.T[self.index]) #Compute interpolated values for s and phi q = self.Q[self.index,:] +\ ratio*(self.Q[self.index+1,:] - self.Q[self.index,:]) s = q[0] #Convert to radians phi = q[1]*pi/180 #Compute velocity vector (u, v) u = s*cos(phi) v = s*sin(phi) #Compute wind stress for this time step S = self.const * sqrt(u**2 + v**2) xmom_update += S*u ymom_update += S*v
• ## inundation/ga/storm_surge/pyvolution/test_generic_boundary_conditions.py

 r324 t_string = time.strftime(time_format, time.gmtime(t)) fid.write('%s,%f %f\n' %(t_string, 1.0*i, sin(i*2*pi/10))) fid.close() F = File_boundary(domain, filename) F = File_boundary(filename, domain) from config import default_boundary_tag os.remove(filename) def test_fileboundary_exception(self): """Test that boundary object complians if number of conserved quantities are wrong """ from domain import Domain from quantity import Quantity import time, os from math import sin, pi from config import time_format a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0,0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe elements = [ [1,0,2], [1,2,4], [4,2,5], [3,1,4] ] domain = Domain(points, elements) domain.conserved_quantities = ['level', 'xmomentum', 'ymomentum'] domain.quantities['level'] =\ Quantity(domain, [[1,2,3], [5,5,5], [0,0,9], [-6, 3, 3]]) domain.quantities['xmomentum'] =\ Quantity(domain, [[2,3,4], [5,5,5], [0,0,9], [-6, 3, 3]]) domain.quantities['ymomentum'] =\ Quantity(domain, [[2,3,4], [5,5,5], [0,0,9], [-6, 3, 3]]) domain.check_integrity() #Write file (with only two values) filename = 'boundarytest' + str(time.time()) fid = open(filename, 'w') start = time.mktime(time.strptime('2000', '%Y')) dt = 5*60  #Five minute intervals for i in range(10): t = start + i*dt t_string = time.strftime(time_format, time.gmtime(t)) fid.write('%s,%f %f\n' %(t_string, 1.0*i, sin(i*2*pi/10))) fid.close() try: F = File_boundary(filename, domain) except AssertionError: pass else: raise 'Should have raised an exception' os.remove(filename) #-------------------------------------------------------------
• ## inundation/ga/storm_surge/pyvolution/test_shallow_water.py

 r612 #Variable windfield implemented using functions def speed(x,y,t): def speed(t,x,y): """Large speeds halfway between center and edges Low speeds at center and edges def scalar_func(x,y,t): def scalar_func(t,x,y): """Function that returns a scalar. Used to test error message when Numeric array is expected def angle(x,y,t): def angle(t,x,y): """Rotating field """ x = xc[:,0] y = xc[:,1] s_vec = speed(x,y,t) phi_vec = angle(x,y,t) s_vec = speed(t,x,y) phi_vec = angle(t,x,y)
• ## inundation/ga/storm_surge/pyvolution/test_util.py

 r258 def test_file_function_time(self): """Test that File function interpolates correctly between given times. No x,y dependency here. """ #Write file import os, time from config import time_format from math import sin, pi finaltime = 1200 filename = 'test_file_function.txt' fid = open(filename, 'w') start = time.mktime(time.strptime('2000', '%Y')) dt = 60  #One minute intervals t = 0.0 while t <= finaltime: t_string = time.strftime(time_format, time.gmtime(t+start)) fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600))) t += dt fid.close() F = File_function(filename) #Now try interpolation for i in range(20): t = i*10 q = F(t) #Exact linear intpolation assert allclose(q[0], 2*t) if i%6 == 0: assert allclose(q[1], t**2) assert allclose(q[2], sin(t*pi/600)) #Check non-exact t = 90 #Halfway between 60 and 120 q = F(t) assert allclose( (120**2 + 60**2)/2, q[1] ) assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] ) t = 100 #Two thirds of the way between between 60 and 120 q = F(t) assert allclose( 2*120**2/3 + 60**2/3, q[1] ) assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] ) def test_file_function_time_with_domain(self): """Test that File function interpolates correctly between given times. No x,y dependency here. Use domain with starttime """ #Write file import os, time, calendar from config import time_format from math import sin, pi from domain import Domain finaltime = 1200 filename = 'test_file_function.txt' fid = open(filename, 'w') start = time.mktime(time.strptime('2000', '%Y')) dt = 60  #One minute intervals t = 0.0 while t <= finaltime: t_string = time.strftime(time_format, time.gmtime(t+start)) fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600))) t += dt fid.close() a = [0.0, 0.0] b = [4.0, 0.0] c = [0.0, 3.0] points = [a, b, c] vertices = [[0,1,2]] domain = Domain(points, vertices) #Check that domain.starttime is updated if non-existing F = File_function(filename, domain) assert allclose(domain.starttime, start) #Check that domain.starttime is updated if too early domain.starttime = start - 1 F = File_function(filename, domain) assert allclose(domain.starttime, start) #Check that domain.starttime isn't updated if later domain.starttime = start + 1 F = File_function(filename, domain) assert allclose(domain.starttime, start+1) #Now try interpolation for i in range(20): t = i*10 q = F(t) #Exact linear intpolation assert allclose(q[0], 2*t) if i%6 == 0: assert allclose(q[1], t**2) assert allclose(q[2], sin(t*pi/600)) #Check non-exact t = 90 #Halfway between 60 and 120 q = F(t) assert allclose( (120**2 + 60**2)/2, q[1] ) assert allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] ) t = 100 #Two thirds of the way between between 60 and 120 q = F(t) assert allclose( 2*120**2/3 + 60**2/3, q[1] ) assert allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] ) #-------------------------------------------------------------
• ## inundation/ga/storm_surge/pyvolution/util.py

 r274 return sum(x)/len(x) #FIXME: Maybe move to util as this is quite general class File_function: """Read time series from file and return a callable object: f(t,x,y) which will return interpolated values based on the input file. The file format is assumed to be either two fields separated by a comma: time [DD/MM/YY hh:mm:ss], value0 value1 value2 ... or four comma separated fields time [DD/MM/YY hh:mm:ss], x, y, value0 value1 value2 ... In either case, the callable obejct will return a tuple of interpolated values, one each value stated in the file. E.g 31/08/04 00:00:00, 1.328223 0 0 31/08/04 00:15:00, 1.292912 0 0 will provide a time dependent function f(t,x=None,y=None), ignoring x and y, which returns three values per call. NOTE: At this stage the function is assumed to depend on time only, i.e  no spatial dependency!!!!! When that is need we can use the least_squares interpolation. """ def __init__(self, filename, domain=None): """Initialise callable object from a file. See docstring for class File_function If domain is specified, model time (domain,starttime) will be checked and possibly modified ALl times are assumed to be in UTC """ import time, calendar from Numeric import array from config import time_format try: fid = open(filename) except Exception, e: msg = 'File %s could not be opened: %s\n'\ %(filename, e) raise msg line = fid.readline() fid.close() fields = line.split(',') msg = 'File %s must have the format date, value0 value1 value2 ...' msg += ' or date, x, y, value0 value1 value2 ...' mode = len(fields) assert mode in [2,4], msg try: starttime = calendar.timegm(time.strptime(fields[0], time_format)) except ValueError: msg = 'First field in file %s must be' %filename msg += ' date-time with format %s.\n' %time_format msg += 'I got %s instead.' %fields[0] raise msg #Split values values = [] for value in fields[mode-1].split(): values.append(float(value)) q = array(values) msg = 'ERROR: File must contain at least one independent value' assert len(q.shape) == 1, msg self.number_of_values = len(q) self.filename = filename self.starttime = starttime if domain is not None: if domain.starttime is None: domain.starttime = starttime else: msg = 'WARNING: Start time as specified in domain (%s)'\ %domain.starttime msg += ' is earlier than the starttime of file %s: %s.'\ %(self.filename, self.starttime) msg += 'Modifying starttime accordingly.' if self.starttime > domain.starttime: #FIXME: Print depending on some verbosity setting #print msg domain.starttime = self.starttime #Modifying model time if mode == 2: self.read_times() #Now read all times in. else: raise 'x,y dependecy not yet implemented' def read_times(self): """Read time and values """ from Numeric import zeros, Float, alltrue from config import time_format import time, calendar fid = open(self.filename) lines = fid.readlines() fid.close() N = len(lines) d = self.number_of_values T = zeros(N, Float)       #Time Q = zeros((N, d), Float)  #Values for i, line in enumerate(lines): fields = line.split(',') realtime = calendar.timegm(time.strptime(fields[0], time_format)) T[i] = realtime - self.starttime for j, value in enumerate(fields[1].split()): Q[i, j] = float(value) msg = 'File %s must list time as a monotonuosly ' %self.filename msg += 'increasing sequence' assert alltrue( T[1:] - T[:-1] > 0 ), msg self.T = T     #Time self.Q = Q     #Values self.index = 0 #Initial index def __repr__(self): return 'File function' def __call__(self, t, x=None, y=None): """Evaluate f(t,x,y) FIXME: x, y dependency not yet implemented """ from math import pi, cos, sin, sqrt msg = 'Time interval derived from file %s (%s:%s) does not match model time: %s'\ %(self.filename, self.T[0], self.T[1], t) if t < self.T[0]: raise msg if t > self.T[-1]: raise msg oldindex = self.index #Find slot while t > self.T[self.index]: self.index += 1 while t < self.T[self.index]: self.index -= 1 #t is now between index and index+1 ratio = (t - self.T[self.index])/\ (self.T[self.index+1] - self.T[self.index]) #Compute interpolated values for values q = self.Q[self.index,:] +\ ratio*(self.Q[self.index+1,:] - self.Q[self.index,:]) #Return vector of interpolated values return q #################################################################### #Python versions of function that are also implemented in util_gateway.c
Note: See TracChangeset for help on using the changeset viewer.