Changeset 2394


Ignore:
Timestamp:
Feb 14, 2006, 12:56:50 PM (18 years ago)
Author:
duncan
Message:

adding interpolation_function capability

Location:
inundation/fit_interpolate
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • inundation/fit_interpolate/interpolate.py

    r2201 r2394  
    2121
    2222from Numeric import zeros, array, Float, Int, dot, transpose, concatenate, \
    23      ArrayType, allclose, take
     23     ArrayType, allclose, take, NewAxis
    2424
    2525from pyvolution.mesh import Mesh
     
    6565
    6666        """
     67
     68        # why no alpha in parameter list?
    6769       
    6870        # Initialise variabels
     
    7375        triangles = ensure_numeric(triangles, Int)
    7476        vertex_coordinates = ensure_numeric(vertex_coordinates, Float)
    75 
    7677        #Build underlying mesh
    7778        if verbose: print 'Building mesh'
     
    282283          Interpolated values at inputted points (z).
    283284        """
    284        
     285        #print "point_coordinates interpolate.interpolate",point_coordinates
    285286        # Can I interpolate, based on previous point_coordinates?
    286287        if point_coordinates is None:
     
    338339    def _get_point_data_z(self, f, verbose=False):
    339340        return self._A * f
    340        
     341
     342
     343
     344class Interpolation_interface:
     345    """Interpolation_interface - creates callable object f(t, id) or f(t,x,y)
     346    which is interpolated from time series defined at vertices of
     347    triangular mesh (such as those stored in sww files)
     348
     349    Let m be the number of vertices, n the number of triangles
     350    and p the number of timesteps.
     351
     352    Mandatory input
     353        time:               px1 array of monotonously increasing times (Float)
     354        quantities:         Dictionary of arrays or 1 array (Float)
     355                            The arrays must either have dimensions pxm or mx1.
     356                            The resulting function will be time dependent in
     357                            the former case while it will be constan with
     358                            respect to time in the latter case.
     359       
     360    Optional input:
     361        quantity_names:     List of keys into the quantities dictionary
     362        vertex_coordinates: mx2 array of coordinates (Float)
     363        triangles:          nx3 array of indices into vertex_coordinates (Int)
     364        interpolation_points: Nx2 array of coordinates to be interpolated to
     365        verbose:            Level of reporting
     366   
     367   
     368    The quantities returned by the callable object are specified by
     369    the list quantities which must contain the names of the
     370    quantities to be returned and also reflect the order, e.g. for
     371    the shallow water wave equation, on would have
     372    quantities = ['stage', 'xmomentum', 'ymomentum']
     373
     374    The parameter interpolation_points decides at which points interpolated
     375    quantities are to be computed whenever object is called.
     376    If None, return average value
     377    """
     378
     379   
     380   
     381    def __init__(self,
     382                 time,
     383                 quantities,
     384                 quantity_names = None, 
     385                 vertex_coordinates = None,
     386                 triangles = None,
     387                 interpolation_points = None,
     388                 verbose = False):
     389        """Initialise object and build spatial interpolation if required
     390        """
     391
     392        from Numeric import array, zeros, Float, alltrue, concatenate,\
     393             reshape, ArrayType
     394
     395
     396        from util import mean, ensure_numeric
     397        from config import time_format
     398        import types
     399
     400
     401        #Check temporal info
     402        time = ensure_numeric(time)       
     403        msg = 'Time must be a monotonuosly '
     404        msg += 'increasing sequence %s' %time
     405        assert alltrue(time[1:] - time[:-1] >= 0 ), msg
     406
     407
     408        #Check if quantities is a single array only
     409        if type(quantities) != types.DictType:
     410            quantities = ensure_numeric(quantities)
     411            quantity_names = ['Attribute']
     412
     413            #Make it a dictionary
     414            quantities = {quantity_names[0]: quantities}
     415
     416
     417        #Use keys if no names are specified
     418        if quantity_names is None:
     419            quantity_names = quantities.keys()
     420
     421
     422        #Check spatial info
     423        if vertex_coordinates is None:
     424            self.spatial = False
     425        else:   
     426            vertex_coordinates = ensure_numeric(vertex_coordinates)
     427
     428            assert triangles is not None, 'Triangles array must be specified'
     429            triangles = ensure_numeric(triangles)
     430            self.spatial = True           
     431
     432             
     433        #Save for use with statistics
     434        self.quantity_names = quantity_names       
     435        self.quantities = quantities       
     436        self.vertex_coordinates = vertex_coordinates
     437        self.interpolation_points = interpolation_points
     438        self.T = time[:]  # Time assumed to be relative to starttime
     439        self.index = 0    # Initial time index
     440        self.precomputed_values = {}
     441           
     442        #Precomputed spatial interpolation if requested
     443        if interpolation_points is not None:
     444            if self.spatial is False:
     445                raise 'Triangles and vertex_coordinates must be specified'
     446           
     447            try:
     448                self.interpolation_points = ensure_numeric(interpolation_points)
     449            except:
     450                msg = 'Interpolation points must be an N x 2 Numeric array '+\
     451                      'or a list of points\n'
     452                msg += 'I got: %s.' %(str(self.interpolation_points)[:60] +\
     453                                      '...')
     454                raise msg
     455
     456
     457            m = len(self.interpolation_points)
     458            p = len(self.T)
     459           
     460            for name in quantity_names:
     461                self.precomputed_values[name] = zeros((p, m), Float)
     462
     463            #Build interpolator
     464            interpol = Interpolate(vertex_coordinates,
     465                                     triangles,
     466                                     #point_coordinates = \
     467                                     #self.interpolation_points,
     468                                     #alpha = 0,
     469                                     verbose = verbose)
     470
     471            if verbose: print 'Interpolate'
     472            for i, t in enumerate(self.T):
     473                #Interpolate quantities at this timestep
     474                if verbose and i%((p+10)/10)==0:
     475                    print ' time step %d of %d' %(i, p)
     476                   
     477                for name in quantity_names:
     478                    if len(quantities[name].shape) == 2:
     479                        result = interpol.interpolate(quantities[name][i,:],
     480                                     point_coordinates = \
     481                                     self.interpolation_points)
     482                    else:
     483                       #Assume no time dependency
     484                       result = interpol.interpolate(quantities[name][:],
     485                                     point_coordinates = \
     486                                     self.interpolation_points)
     487                       
     488                    self.precomputed_values[name][i, :] = result
     489                   
     490            #Report
     491            if verbose:
     492                print self.statistics()
     493                #self.print_statistics()
     494           
     495        else:
     496            #Store quantitites as is
     497            for name in quantity_names:
     498                self.precomputed_values[name] = quantities[name]
     499
     500
     501    def __repr__(self):
     502        #return 'Interpolation function (spatio-temporal)'
     503        return self.statistics()
     504   
     505
     506    def __call__(self, t, point_id = None, x = None, y = None):
     507        """Evaluate f(t), f(t, point_id) or f(t, x, y)
     508
     509        Inputs:
     510          t: time - Model time. Must lie within existing timesteps
     511          point_id: index of one of the preprocessed points.
     512          x, y:     Overrides location, point_id ignored
     513         
     514          If spatial info is present and all of x,y,point_id
     515          are None an exception is raised
     516                   
     517          If no spatial info is present, point_id and x,y arguments are ignored
     518          making f a function of time only.
     519
     520         
     521          FIXME: point_id could also be a slice
     522          FIXME: What if x and y are vectors?
     523          FIXME: What about f(x,y) without t?
     524        """
     525
     526        from math import pi, cos, sin, sqrt
     527        from Numeric import zeros, Float
     528        from util import mean       
     529
     530        if self.spatial is True:
     531            if point_id is None:
     532                if x is None or y is None:
     533                    msg = 'Either point_id or x and y must be specified'
     534                    raise msg
     535            else:
     536                if self.interpolation_points is None:
     537                    msg = 'Interpolation_function must be instantiated ' +\
     538                          'with a list of interpolation points before parameter ' +\
     539                          'point_id can be used'
     540                    raise msg
     541
     542
     543        msg = 'Time interval [%s:%s]' %(self.T[0], self.T[1])
     544        msg += ' does not match model time: %s\n' %t
     545        if t < self.T[0]: raise msg
     546        if t > self.T[-1]: raise msg
     547
     548        oldindex = self.index #Time index
     549
     550        #Find current time slot
     551        while t > self.T[self.index]: self.index += 1
     552        while t < self.T[self.index]: self.index -= 1
     553
     554        if t == self.T[self.index]:
     555            #Protect against case where t == T[-1] (last time)
     556            # - also works in general when t == T[i]
     557            ratio = 0
     558        else:
     559            #t is now between index and index+1
     560            ratio = (t - self.T[self.index])/\
     561                    (self.T[self.index+1] - self.T[self.index])
     562
     563        #Compute interpolated values
     564        q = zeros(len(self.quantity_names), Float)
     565
     566        for i, name in enumerate(self.quantity_names):
     567            Q = self.precomputed_values[name]
     568
     569            if self.spatial is False:
     570                #If there is no spatial info               
     571                assert len(Q.shape) == 1
     572
     573                Q0 = Q[self.index]
     574                if ratio > 0: Q1 = Q[self.index+1]
     575
     576            else:
     577                if x is not None and y is not None:
     578                    #Interpolate to x, y
     579                   
     580                    raise 'x,y interpolation not yet implemented'
     581                else:
     582                    #Use precomputed point
     583                    Q0 = Q[self.index, point_id]
     584                    if ratio > 0: Q1 = Q[self.index+1, point_id]
     585
     586            #Linear temporal interpolation   
     587            if ratio > 0:
     588                q[i] = Q0 + ratio*(Q1 - Q0)
     589            else:
     590                q[i] = Q0
     591
     592
     593        #Return vector of interpolated values
     594        #if len(q) == 1:
     595        #    return q[0]
     596        #else:
     597        #    return q
     598
     599
     600        #Return vector of interpolated values
     601        #FIXME:
     602        if self.spatial is True:
     603            return q
     604        else:
     605            #Replicate q according to x and y
     606            #This is e.g used for Wind_stress
     607            if x is None or y is None:
     608                return q
     609            else:
     610                try:
     611                    N = len(x)
     612                except:
     613                    return q
     614                else:
     615                    from Numeric import ones, Float
     616                    #x is a vector - Create one constant column for each value
     617                    N = len(x)
     618                    assert len(y) == N, 'x and y must have same length'
     619                    res = []
     620                    for col in q:
     621                        res.append(col*ones(N, Float))
     622                       
     623                return res
     624
     625
     626    def statistics(self):
     627        """Output statistics about interpolation_function
     628        """
     629       
     630        vertex_coordinates = self.vertex_coordinates
     631        interpolation_points = self.interpolation_points               
     632        quantity_names = self.quantity_names
     633        quantities = self.quantities
     634        precomputed_values = self.precomputed_values                 
     635               
     636        x = vertex_coordinates[:,0]
     637        y = vertex_coordinates[:,1]               
     638
     639        str =  '------------------------------------------------\n'
     640        str += 'Interpolation_function (spatio-temporal) statistics:\n'
     641        str += '  Extent:\n'
     642        str += '    x in [%f, %f], len(x) == %d\n'\
     643               %(min(x), max(x), len(x))
     644        str += '    y in [%f, %f], len(y) == %d\n'\
     645               %(min(y), max(y), len(y))
     646        str += '    t in [%f, %f], len(t) == %d\n'\
     647               %(min(self.T), max(self.T), len(self.T))
     648        str += '  Quantities:\n'
     649        for name in quantity_names:
     650            q = quantities[name][:].flat
     651            str += '    %s in [%f, %f]\n' %(name, min(q), max(q))
     652
     653        if interpolation_points is not None:   
     654            str += '  Interpolation points (xi, eta):'\
     655                   ' number of points == %d\n' %interpolation_points.shape[0]
     656            str += '    xi in [%f, %f]\n' %(min(interpolation_points[:,0]),
     657                                            max(interpolation_points[:,0]))
     658            str += '    eta in [%f, %f]\n' %(min(interpolation_points[:,1]),
     659                                             max(interpolation_points[:,1]))
     660            str += '  Interpolated quantities (over all timesteps):\n'
     661       
     662            for name in quantity_names:
     663                q = precomputed_values[name][:].flat
     664                str += '    %s at interpolation points in [%f, %f]\n'\
     665                       %(name, min(q), max(q))
     666        str += '------------------------------------------------\n'
     667
     668        return str
     669
     670def interpolate_sww(sww_file, time, interpolation_points,
     671                    quantity_names = None, verbose = False):
     672    """
     673    obsolete.
     674    use file_function in utils
     675    """
     676    #open sww file
     677    x, y, volumes, time, quantities = read_sww(sww_file)
     678    print "x",x
     679    print "y",y
     680   
     681    print "time", time
     682    print "quantities", quantities
     683
     684    #Add the x and y together
     685    vertex_coordinates = concatenate((x[:,NewAxis], y[:,NewAxis]),axis=1)
     686
     687    #Will return the quantity values at the specified times and locations
     688    interp =  Interpolation_interface(
     689                 time,
     690                 quantities,
     691                 quantity_names = quantity_names, 
     692                 vertex_coordinates = vertex_coordinates,
     693                 triangles = volumes,
     694                 interpolation_points = interpolation_points,
     695                 verbose = verbose)
     696
     697
     698def read_sww(file_name):
     699    """
     700    Read in an sww file.
     701   
     702    Input;
     703    file_name - the sww file
     704   
     705    Output;
     706    x - Vector of x values
     707    y - Vector of y values
     708    z - Vector of bed elevation
     709    volumes - Array.  Each row has 3 values, representing
     710    the vertices that define the volume
     711    time - Vector of the times where there is stage information
     712    stage - array with respect to time and vertices (x,y)
     713    """
     714   
     715    #FIXME Have this reader as part of data_manager?
     716   
     717    from Scientific.IO.NetCDF import NetCDFFile     
     718    import tempfile
     719    import sys
     720    import os
     721   
     722    #Check contents
     723    #Get NetCDF
     724   
     725    # see if the file is there.  Throw a QUIET IO error if it isn't
     726    # I don't think I could implement the above
     727   
     728    #throws prints to screen if file not present
     729    junk = tempfile.mktemp(".txt")
     730    fd = open(junk,'w')
     731    stdout = sys.stdout
     732    sys.stdout = fd
     733    fid = NetCDFFile(file_name, 'r')
     734    sys.stdout = stdout
     735    fd.close()
     736    #clean up
     737    os.remove(junk)     
     738     
     739    # Get the variables
     740    x = fid.variables['x'][:]
     741    y = fid.variables['y'][:]
     742    volumes = fid.variables['volumes'][:]
     743    time = fid.variables['time'][:]
     744
     745    keys = fid.variables.keys()
     746    keys.remove("x")
     747    keys.remove("y")
     748    keys.remove("volumes")
     749    keys.remove("time")
     750     #Turn NetCDF objects into Numeric arrays
     751    quantities = {}
     752    for name in keys:
     753        quantities[name] = fid.variables[name][:]
     754           
     755    fid.close()
     756    return x, y, volumes, time, quantities
     757
    341758#-------------------------------------------------------------
    342759if __name__ == "__main__":
  • inundation/fit_interpolate/test_interpolate.py

    r2201 r2394  
    22
    33#TEST
     4
     5#import time, os
     6
     7
    48import sys
     9import os
    510import unittest
    611from math import sqrt
    7 
    8 
     12import tempfile
     13
     14from Scientific.IO.NetCDF import NetCDFFile
     15from Numeric import allclose, array, transpose, zeros, Float
     16
     17
     18# ANUGA code imports
    919from interpolate import *
    10 from Numeric import allclose, array, transpose
    11 
    1220from coordinate_transforms.geo_reference import Geo_reference
     21from shallow_water import Domain, Transmissive_boundary #, mean
     22from util import mean
     23from data_manager import get_dataobject
    1324
    1425def distance(x, y):
     
    2334
    2435    def setUp(self):
    25         pass
     36
     37        import time
     38        from mesh_factory import rectangular
     39
     40
     41        #Create basic mesh
     42        points, vertices, boundary = rectangular(2, 2)
     43
     44        #Create shallow water domain
     45        domain = Domain(points, vertices, boundary)
     46        domain.default_order=2
     47        domain.beta_h = 0
     48
     49
     50        #Set some field values
     51        domain.set_quantity('elevation', lambda x,y: -x)
     52        domain.set_quantity('friction', 0.03)
     53
     54
     55        ######################
     56        # Boundary conditions
     57        B = Transmissive_boundary(domain)
     58        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
     59
     60
     61        ######################
     62        #Initial condition - with jumps
     63
     64        bed = domain.quantities['elevation'].vertex_values
     65        stage = zeros(bed.shape, Float)
     66
     67        h = 0.3
     68        for i in range(stage.shape[0]):
     69            if i % 2 == 0:
     70                stage[i,:] = bed[i,:] + h
     71            else:
     72                stage[i,:] = bed[i,:]
     73
     74        domain.set_quantity('stage', stage)
     75
     76        domain.distribute_to_vertices_and_edges()
     77
     78
     79        self.domain = domain
     80
     81        C = domain.get_vertex_coordinates()
     82        self.X = C[:,0:6:2].copy()
     83        self.Y = C[:,1:6:2].copy()
     84
     85        self.F = bed
     86
     87
    2688
    2789    def tearDown(self):
     
    530592       
    531593
     594
     595    def test_interpolation_interface_time_only(self):
     596        """Test spatio-temporal interpolation
     597        Test that spatio temporal function performs the correct
     598        interpolations in both time and space
     599        """
     600
     601
     602        #Three timesteps
     603        time = [1.0, 5.0, 6.0]
     604       
     605
     606        #One quantity
     607        Q = zeros( (3,6), Float )
     608
     609        #Linear in time and space
     610        a = [0.0, 0.0]
     611        b = [0.0, 2.0]
     612        c = [2.0, 0.0]
     613        d = [0.0, 4.0]
     614        e = [2.0, 2.0]
     615        f = [4.0, 0.0]
     616
     617        points = [a, b, c, d, e, f]
     618       
     619        for i, t in enumerate(time):
     620            Q[i, :] = t*linear_function(points)
     621
     622           
     623        #Check basic interpolation of one quantity using averaging
     624        #(no interpolation points or spatial info)
     625        from util import mean       
     626        I = Interpolation_interface(time, [mean(Q[0,:]),
     627                                          mean(Q[1,:]),
     628                                          mean(Q[2,:])])
     629
     630
     631
     632        #Check temporal interpolation
     633        for i in [0,1,2]:
     634            assert allclose(I(time[i]), mean(Q[i,:]))
     635
     636        #Midway   
     637        assert allclose(I( (time[0] + time[1])/2 ),
     638                        (I(time[0]) + I(time[1]))/2 )
     639
     640        assert allclose(I( (time[1] + time[2])/2 ),
     641                        (I(time[1]) + I(time[2]))/2 )
     642
     643        assert allclose(I( (time[0] + time[2])/2 ),
     644                        (I(time[0]) + I(time[2]))/2 )                 
     645
     646        #1/3
     647        assert allclose(I( (time[0] + time[2])/3 ),
     648                        (I(time[0]) + I(time[2]))/3 )                         
     649
     650
     651        #Out of bounds checks
     652        try:
     653            I(time[0]-1)
     654        except:
     655            pass
     656        else:
     657            raise 'Should raise exception'
     658
     659        try:
     660            I(time[-1]+1)
     661        except:
     662            pass
     663        else:
     664            raise 'Should raise exception'       
     665
     666
     667       
     668
     669    def test_interpolation_interface_spatial_only(self):
     670        """Test spatio-temporal interpolation with constant time
     671        """
     672
     673        #Three timesteps
     674        time = [1.0, 5.0, 6.0]
     675       
     676       
     677        #Setup mesh used to represent fitted function
     678        a = [0.0, 0.0]
     679        b = [0.0, 2.0]
     680        c = [2.0, 0.0]
     681        d = [0.0, 4.0]
     682        e = [2.0, 2.0]
     683        f = [4.0, 0.0]
     684
     685        points = [a, b, c, d, e, f]
     686        #bac, bce, ecf, dbe
     687        triangles = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
     688
     689
     690        #New datapoints where interpolated values are sought
     691        interpolation_points = [[ 0.0, 0.0],
     692                                [ 0.5, 0.5],
     693                                [ 0.7, 0.7],
     694                                [ 1.0, 0.5],
     695                                [ 2.0, 0.4],
     696                                [ 2.8, 1.2]]
     697
     698
     699        #One quantity linear in space
     700        Q = linear_function(points)
     701
     702
     703        #Check interpolation of one quantity using interpolaton points
     704        I = Interpolation_interface(time, Q,
     705                                   vertex_coordinates = points,
     706                                   triangles = triangles,
     707                                   interpolation_points = interpolation_points,
     708                                   verbose = False)
     709
     710
     711        answer = linear_function(interpolation_points)
     712
     713        t = time[0]
     714        for j in range(50): #t in [1, 6]
     715            for id in range(len(interpolation_points)):
     716                assert allclose(I(t, id), answer[id])
     717
     718            t += 0.1   
     719
     720
     721        try:   
     722            I(1)
     723        except:
     724            pass
     725        else:
     726            raise 'Should raise exception'
     727           
     728
     729
     730    def test_interpolation_interface(self):
     731        """Test spatio-temporal interpolation
     732        Test that spatio temporal function performs the correct
     733        interpolations in both time and space
     734        """
     735
     736
     737        #Three timesteps
     738        time = [1.0, 5.0, 6.0]
     739       
     740
     741        #Setup mesh used to represent fitted function
     742        a = [0.0, 0.0]
     743        b = [0.0, 2.0]
     744        c = [2.0, 0.0]
     745        d = [0.0, 4.0]
     746        e = [2.0, 2.0]
     747        f = [4.0, 0.0]
     748
     749        points = [a, b, c, d, e, f]
     750        #bac, bce, ecf, dbe
     751        triangles = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]]
     752
     753
     754        #New datapoints where interpolated values are sought
     755        interpolation_points = [[ 0.0, 0.0],
     756                                [ 0.5, 0.5],
     757                                [ 0.7, 0.7],
     758                                [ 1.0, 0.5],
     759                                [ 2.0, 0.4],
     760                                [ 2.8, 1.2]]
     761
     762
     763        #One quantity
     764        Q = zeros( (3,6), Float )
     765
     766        #Linear in time and space
     767        for i, t in enumerate(time):
     768            Q[i, :] = t*linear_function(points)
     769
     770
     771        #Check interpolation of one quantity using interpolaton points)
     772        I = Interpolation_interface(time, Q,
     773                                   vertex_coordinates = points,
     774                                   triangles = triangles,
     775                                   interpolation_points = interpolation_points,
     776                                   verbose = False)
     777
     778
     779        answer = linear_function(interpolation_points)
     780       
     781        t = time[0]
     782        for j in range(50): #t in [1, 6]
     783            for id in range(len(interpolation_points)):
     784                assert allclose(I(t, id), t*answer[id])
     785
     786            t += 0.1   
     787
     788        try:   
     789            I(1)
     790        except:
     791            pass
     792        else:
     793            raise 'Should raise exception'
     794
     795       
     796    def BADtest_interpolate_sww(self):
     797        """Not a unit test, rather a system test for interpolate_sww
     798        This function is obsolete
     799        """
     800       
     801        self.domain.filename = 'datatest' + str(time.time())
     802        self.domain.format = 'sww'
     803        self.domain.smooth = True
     804        self.domain.reduction = mean
     805
     806        sww = get_dataobject(self.domain)
     807        sww.store_connectivity()
     808        sww.store_timestep('stage')
     809        self.domain.time = 2.
     810        sww.store_timestep('stage')
     811
     812        #print "self.domain.filename",self.domain.filename
     813        interp = interpolate_sww(sww.filename, [0.0, 2.0],
     814                                 [[0,1],[0.5,0.5]],
     815                                 ['stage'])
     816        #assert allclose(interp,[0.0,2.0])
     817
     818        #Cleanup
     819        os.remove(sww.filename)
     820       
    532821#-------------------------------------------------------------
    533822if __name__ == "__main__":
Note: See TracChangeset for help on using the changeset viewer.