#!/usr/bin/env python #TEST #import time, os import sys import os import unittest from math import sqrt import tempfile from Scientific.IO.NetCDF import NetCDFFile from Numeric import allclose, array, transpose, zeros, Float # ANUGA code imports from interpolate import * from coordinate_transforms.geo_reference import Geo_reference from shallow_water import Domain, Transmissive_boundary #, mean from anuga.pyvolution.util import mean from anuga.pyvolution.data_manager import get_dataobject def distance(x, y): return sqrt( sum( (array(x)-array(y))**2 )) def linear_function(point): point = array(point) return point[:,0]+point[:,1] class Test_Interpolate(unittest.TestCase): def setUp(self): import time from mesh_factory import rectangular #Create basic mesh points, vertices, boundary = rectangular(2, 2) #Create shallow water domain domain = Domain(points, vertices, boundary) domain.default_order=2 domain.beta_h = 0 #Set some field values domain.set_quantity('elevation', lambda x,y: -x) domain.set_quantity('friction', 0.03) ###################### # Boundary conditions B = Transmissive_boundary(domain) domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B}) ###################### #Initial condition - with jumps bed = domain.quantities['elevation'].vertex_values stage = zeros(bed.shape, Float) h = 0.3 for i in range(stage.shape[0]): if i % 2 == 0: stage[i,:] = bed[i,:] + h else: stage[i,:] = bed[i,:] domain.set_quantity('stage', stage) domain.distribute_to_vertices_and_edges() self.domain = domain C = domain.get_vertex_coordinates() self.X = C[:,0:6:2].copy() self.Y = C[:,1:6:2].copy() self.F = bed def tearDown(self): pass def test_datapoint_at_centroid(self): a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] points = [a, b, c] vertices = [ [1,0,2] ] #bac data = [ [2.0/3, 2.0/3] ] #Use centroid as one data point interp = Interpolate(points, vertices) assert allclose(interp._build_interpolation_matrix_A(data).todense(), [[1./3, 1./3, 1./3]]) def test_quad_tree(self): p0 = [-10.0, -10.0] p1 = [20.0, -10.0] p2 = [-10.0, 20.0] p3 = [10.0, 50.0] p4 = [30.0, 30.0] p5 = [50.0, 10.0] p6 = [40.0, 60.0] p7 = [60.0, 40.0] p8 = [-66.0, 20.0] p9 = [10.0, -66.0] points = [p0, p1, p2, p3, p4, p5, p6, p7, p8, p9] triangles = [ [0, 1, 2], [3, 2, 4], [4, 2, 1], [4, 1, 5], [3, 4, 6], [6, 4, 7], [7, 4, 5], [8, 0, 2], [0, 9, 1]] data = [ [4,4] ] interp = Interpolate(points, triangles, max_vertices_per_cell = 4) #print "PDSG - interp.get_A()", interp.get_A() answer = [ [ 0.06666667, 0.46666667, 0.46666667, 0., 0., 0. , 0., 0., 0., 0.]] assert allclose(interp._build_interpolation_matrix_A(data).todense(), answer) #interp.set_point_coordinates([[-30, -30]]) #point outside of mesh #print "PDSG - interp.get_A()", interp.get_A() data = [[-30, -30]] answer = [ [ 0.0, 0.0, 0.0, 0., 0., 0. , 0., 0., 0., 0.]] assert allclose(interp._build_interpolation_matrix_A(data).todense(), answer) #point outside of quad tree root cell #interp.set_point_coordinates([[-70, -70]]) #print "PDSG - interp.get_A()", interp.get_A() data = [[-70, -70]] answer = [ [ 0.0, 0.0, 0.0, 0., 0., 0. , 0., 0., 0., 0.]] assert allclose(interp._build_interpolation_matrix_A(data).todense(), answer) def test_datapoints_at_vertices(self): """Test that data points coinciding with vertices yield a diagonal matrix """ a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] points = [a, b, c] vertices = [ [1,0,2] ] #bac data = points #Use data at vertices interp = Interpolate(points, vertices) answer = [[1., 0., 0.], [0., 1., 0.], [0., 0., 1.]] assert allclose(interp._build_interpolation_matrix_A(data).todense(), answer) def test_datapoints_on_edge_midpoints(self): """Try datapoints midway on edges - each point should affect two matrix entries equally """ a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] points = [a, b, c] vertices = [ [1,0,2] ] #bac data = [ [0., 1.], [1., 0.], [1., 1.] ] answer = [[0.5, 0.5, 0.0], #Affects vertex 1 and 0 [0.5, 0.0, 0.5], #Affects vertex 0 and 2 [0.0, 0.5, 0.5]] interp = Interpolate(points, vertices, data) assert allclose(interp._build_interpolation_matrix_A(data).todense(), answer) def test_datapoints_on_edges(self): """Try datapoints on edges - each point should affect two matrix entries in proportion """ a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] points = [a, b, c] vertices = [ [1,0,2] ] #bac data = [ [0., 1.5], [1.5, 0.], [1.5, 0.5] ] answer = [[0.25, 0.75, 0.0], #Affects vertex 1 and 0 [0.25, 0.0, 0.75], #Affects vertex 0 and 2 [0.0, 0.25, 0.75]] interp = Interpolate(points, vertices, data) assert allclose(interp._build_interpolation_matrix_A(data).todense(), answer) def test_arbitrary_datapoints(self): """Try arbitrary datapoints """ from Numeric import sum a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] points = [a, b, c] vertices = [ [1,0,2] ] #bac data = [ [0.2, 1.5], [0.123, 1.768], [1.43, 0.44] ] interp = Interpolate(points, vertices, data) #print "interp.get_A()", interp.get_A() results = interp._build_interpolation_matrix_A(data).todense() assert allclose(sum(results, axis=1), 1.0) #FIXME - have to change this test to check default info def NO_test_arbitrary_datapoints_some_outside(self): """Try arbitrary datapoints one outside the triangle. That one should be ignored """ from Numeric import sum a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0,0.0] points = [a, b, c] vertices = [ [1,0,2] ] #bac data = [ [0.2, 1.5], [0.123, 1.768], [1.43, 0.44], [5.0, 7.0]] interp = Interpolate(points, vertices, data, precrop = True) results = interp._build_interpolation_matrix_A(data).todense() assert allclose(sum(results, axis=1), 1.0) interp = Interpolate(points, vertices, data, precrop = False) results = interp._build_interpolation_matrix_A(data).todense() assert allclose(sum(results, axis=1), [1,1,1,0]) # this causes a memory error in scipy.sparse def test_more_triangles(self): a = [-1.0, 0.0] b = [3.0, 4.0] c = [4.0,1.0] d = [-3.0, 2.0] #3 e = [-1.0,-2.0] f = [1.0, -2.0] #5 points = [a, b, c, d,e,f] triangles = [[0,1,3],[1,0,2],[0,4,5], [0,5,2]] #abd bac aef afc #Data points data = [ [-3., 2.0], [-2, 1], [0.0, 1], [0, 3], [2, 3], [-1.0/3,-4./3] ] interp = Interpolate(points, triangles) answer = [[0.0, 0.0, 0.0, 1.0, 0.0, 0.0], #Affects point d [0.5, 0.0, 0.0, 0.5, 0.0, 0.0], #Affects points a and d [0.75, 0.25, 0.0, 0.0, 0.0, 0.0], #Affects points a and b [0.0, 0.5, 0.0, 0.5, 0.0, 0.0], #Affects points a and d [0.25, 0.75, 0.0, 0.0, 0.0, 0.0], #Affects points a and b [1./3, 0.0, 0.0, 0.0, 1./3, 1./3]] #Affects points a, e and f A = interp._build_interpolation_matrix_A(data).todense() for i in range(A.shape[0]): for j in range(A.shape[1]): if not allclose(A[i,j], answer[i][j]): print i,j,':',A[i,j], answer[i][j] #results = interp._build_interpolation_matrix_A(data).todense() assert allclose(A, answer) def test_interpolate_attributes_to_points(self): v0 = [0.0, 0.0] v1 = [0.0, 5.0] v2 = [5.0, 0.0] vertices = [v0, v1, v2] triangles = [ [1,0,2] ] #bac d0 = [1.0, 1.0] d1 = [1.0, 2.0] d2 = [3.0, 1.0] point_coords = [ d0, d1, d2] interp = Interpolate(vertices, triangles, point_coords) f = linear_function(vertices) z = interp.interpolate(f, point_coords) answer = linear_function(point_coords) assert allclose(z, answer) def test_interpolate_attributes_to_pointsII(self): a = [-1.0, 0.0] b = [3.0, 4.0] c = [4.0, 1.0] d = [-3.0, 2.0] #3 e = [-1.0, -2.0] f = [1.0, -2.0] #5 vertices = [a, b, c, d,e,f] triangles = [[0,1,3], [1,0,2], [0,4,5], [0,5,2]] #abd bac aef afc point_coords = [[-2.0, 2.0], [-1.0, 1.0], [0.0, 2.0], [1.0, 1.0], [2.0, 1.0], [0.0, 0.0], [1.0, 0.0], [0.0, -1.0], [-0.2, -0.5], [-0.9, -1.5], [0.5, -1.9], [3.0, 1.0]] interp = Interpolate(vertices, triangles) f = linear_function(vertices) z = interp.interpolate(f, point_coords) answer = linear_function(point_coords) #print "z",z #print "answer",answer assert allclose(z, answer) def test_interpolate_attributes_to_pointsIII(self): """Test linear interpolation of known values at vertices to new points inside a triangle """ a = [0.0, 0.0] b = [0.0, 5.0] c = [5.0, 0.0] d = [5.0, 5.0] vertices = [a, b, c, d] triangles = [ [1,0,2], [2,3,1] ] #bac, cdb #Points within triangle 1 d0 = [1.0, 1.0] d1 = [1.0, 2.0] d2 = [3.0, 1.0] #Point within triangle 2 d3 = [4.0, 3.0] #Points on common edge d4 = [2.5, 2.5] d5 = [4.0, 1.0] #Point on common vertex d6 = [0., 5.] point_coords = [d0, d1, d2, d3, d4, d5, d6] interp = Interpolate(vertices, triangles) #Known values at vertices #Functions are x+y, x+2y, 2x+y, x-y-5 f = [ [0., 0., 0., -5.], # (0,0) [5., 10., 5., -10.], # (0,5) [5., 5., 10.0, 0.], # (5,0) [10., 15., 15., -5.]] # (5,5) z = interp.interpolate(f, point_coords) answer = [ [2., 3., 3., -5.], # (1,1) [3., 5., 4., -6.], # (1,2) [4., 5., 7., -3.], # (3,1) [7., 10., 11., -4.], # (4,3) [5., 7.5, 7.5, -5.], # (2.5, 2.5) [5., 6., 9., -2.], # (4,1) [5., 10., 5., -10.]] # (0,5) #print "***********" #print "z",z #print "answer",answer #print "***********" #Should an error message be returned if points are outside # of the mesh? Not currently. assert allclose(z, answer) def test_interpolate_point_outside_of_mesh(self): """Test linear interpolation of known values at vertices to new points inside a triangle """ a = [0.0, 0.0] b = [0.0, 5.0] c = [5.0, 0.0] d = [5.0, 5.0] vertices = [a, b, c, d] triangles = [ [1,0,2], [2,3,1] ] #bac, cdb #Far away point d7 = [-1., -1.] point_coords = [ d7] interp = Interpolate(vertices, triangles) #Known values at vertices #Functions are x+y, x+2y, 2x+y, x-y-5 f = [ [0., 0., 0., -5.], # (0,0) [5., 10., 5., -10.], # (0,5) [5., 5., 10.0, 0.], # (5,0) [10., 15., 15., -5.]] # (5,5) z = interp.interpolate(f, point_coords) answer = [ [0., 0., 0., 0.]] # (-1,-1) #print "***********" #print "z",z #print "answer",answer #print "***********" #Should an error message be returned if points are outside # of the mesh? Not currently. assert allclose(z, answer) def test_interpolate_attributes_to_pointsIV(self): a = [-1.0, 0.0] b = [3.0, 4.0] c = [4.0, 1.0] d = [-3.0, 2.0] #3 e = [-1.0, -2.0] f = [1.0, -2.0] #5 vertices = [a, b, c, d,e,f] triangles = [[0,1,3], [1,0,2], [0,4,5], [0,5,2]] #abd bac aef afc point_coords = [[-2.0, 2.0], [-1.0, 1.0], [0.0, 2.0], [1.0, 1.0], [2.0, 1.0], [0.0, 0.0], [1.0, 0.0], [0.0, -1.0], [-0.2, -0.5], [-0.9, -1.5], [0.5, -1.9], [3.0, 1.0]] interp = Interpolate(vertices, triangles) f = array([linear_function(vertices),2*linear_function(vertices) ]) f = transpose(f) #print "f",f z = interp.interpolate(f, point_coords) answer = [linear_function(point_coords), 2*linear_function(point_coords) ] answer = transpose(answer) #print "z",z #print "answer",answer assert allclose(z, answer) def test_interpolate_blocking(self): a = [-1.0, 0.0] b = [3.0, 4.0] c = [4.0, 1.0] d = [-3.0, 2.0] #3 e = [-1.0, -2.0] f = [1.0, -2.0] #5 vertices = [a, b, c, d,e,f] triangles = [[0,1,3], [1,0,2], [0,4,5], [0,5,2]] #abd bac aef afc point_coords = [[-2.0, 2.0], [-1.0, 1.0], [0.0, 2.0], [1.0, 1.0], [2.0, 1.0], [0.0, 0.0], [1.0, 0.0], [0.0, -1.0], [-0.2, -0.5], [-0.9, -1.5], [0.5, -1.9], [3.0, 1.0]] interp = Interpolate(vertices, triangles) f = array([linear_function(vertices),2*linear_function(vertices) ]) f = transpose(f) #print "f",f for blocking_max in range(len(point_coords)+2): #if True: # blocking_max = 5 z = interp.interpolate(f, point_coords, start_blocking_len=blocking_max) answer = [linear_function(point_coords), 2*linear_function(point_coords) ] answer = transpose(answer) #print "z",z #print "answer",answer assert allclose(z, answer) def test_interpolate_reuse(self): a = [-1.0, 0.0] b = [3.0, 4.0] c = [4.0, 1.0] d = [-3.0, 2.0] #3 e = [-1.0, -2.0] f = [1.0, -2.0] #5 vertices = [a, b, c, d,e,f] triangles = [[0,1,3], [1,0,2], [0,4,5], [0,5,2]] #abd bac aef afc point_coords = [[-2.0, 2.0], [-1.0, 1.0], [0.0, 2.0], [1.0, 1.0], [2.0, 1.0], [0.0, 0.0], [1.0, 0.0], [0.0, -1.0], [-0.2, -0.5], [-0.9, -1.5], [0.5, -1.9], [3.0, 1.0]] interp = Interpolate(vertices, triangles) f = array([linear_function(vertices),2*linear_function(vertices) ]) f = transpose(f) z = interp.interpolate(f, point_coords, start_blocking_len=20) answer = [linear_function(point_coords), 2*linear_function(point_coords) ] answer = transpose(answer) #print "z",z #print "answer",answer assert allclose(z, answer) assert allclose(interp._A_can_be_reused, True) z = interp.interpolate(f) assert allclose(z, answer) # This causes blocking to occur. z = interp.interpolate(f, start_blocking_len=10) assert allclose(z, answer) assert allclose(interp._A_can_be_reused, False) #A is recalculated z = interp.interpolate(f) assert allclose(z, answer) assert allclose(interp._A_can_be_reused, True) interp = Interpolate(vertices, triangles) #Must raise an exception, no points specified try: z = interp.interpolate(f) except: pass def test_interpolation_interface_time_only(self): """Test spatio-temporal interpolation Test that spatio temporal function performs the correct interpolations in both time and space """ #Three timesteps time = [1.0, 5.0, 6.0] #One quantity Q = zeros( (3,6), Float ) #Linear in time and space a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] for i, t in enumerate(time): Q[i, :] = t*linear_function(points) #Check basic interpolation of one quantity using averaging #(no interpolation points or spatial info) from anuga.pyvolution.util import mean I = Interpolation_interface(time, [mean(Q[0,:]), mean(Q[1,:]), mean(Q[2,:])]) #Check temporal interpolation for i in [0,1,2]: assert allclose(I(time[i]), mean(Q[i,:])) #Midway assert allclose(I( (time[0] + time[1])/2 ), (I(time[0]) + I(time[1]))/2 ) assert allclose(I( (time[1] + time[2])/2 ), (I(time[1]) + I(time[2]))/2 ) assert allclose(I( (time[0] + time[2])/2 ), (I(time[0]) + I(time[2]))/2 ) #1/3 assert allclose(I( (time[0] + time[2])/3 ), (I(time[0]) + I(time[2]))/3 ) #Out of bounds checks try: I(time[0]-1) except: pass else: raise 'Should raise exception' try: I(time[-1]+1) except: pass else: raise 'Should raise exception' def test_interpolation_interface_spatial_only(self): """Test spatio-temporal interpolation with constant time """ #Three timesteps time = [1.0, 5.0, 6.0] #Setup mesh used to represent fitted function a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe triangles = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] #New datapoints where interpolated values are sought interpolation_points = [[ 0.0, 0.0], [ 0.5, 0.5], [ 0.7, 0.7], [ 1.0, 0.5], [ 2.0, 0.4], [ 2.8, 1.2]] #One quantity linear in space Q = linear_function(points) #Check interpolation of one quantity using interpolaton points I = Interpolation_interface(time, Q, vertex_coordinates = points, triangles = triangles, interpolation_points = interpolation_points, verbose = False) answer = linear_function(interpolation_points) t = time[0] for j in range(50): #t in [1, 6] for id in range(len(interpolation_points)): assert allclose(I(t, id), answer[id]) t += 0.1 try: I(1) except: pass else: raise 'Should raise exception' def test_interpolation_interface(self): """Test spatio-temporal interpolation Test that spatio temporal function performs the correct interpolations in both time and space """ #Three timesteps time = [1.0, 5.0, 6.0] #Setup mesh used to represent fitted function a = [0.0, 0.0] b = [0.0, 2.0] c = [2.0, 0.0] d = [0.0, 4.0] e = [2.0, 2.0] f = [4.0, 0.0] points = [a, b, c, d, e, f] #bac, bce, ecf, dbe triangles = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] #New datapoints where interpolated values are sought interpolation_points = [[ 0.0, 0.0], [ 0.5, 0.5], [ 0.7, 0.7], [ 1.0, 0.5], [ 2.0, 0.4], [ 2.8, 1.2]] #One quantity Q = zeros( (3,6), Float ) #Linear in time and space for i, t in enumerate(time): Q[i, :] = t*linear_function(points) #Check interpolation of one quantity using interpolaton points) I = Interpolation_interface(time, Q, vertex_coordinates = points, triangles = triangles, interpolation_points = interpolation_points, verbose = False) answer = linear_function(interpolation_points) t = time[0] for j in range(50): #t in [1, 6] for id in range(len(interpolation_points)): assert allclose(I(t, id), t*answer[id]) t += 0.1 try: I(1) except: pass else: raise 'Should raise exception' def BADtest_interpolate_sww(self): """Not a unit test, rather a system test for interpolate_sww This function is obsolete """ self.domain.filename = 'datatest' + str(time.time()) self.domain.format = 'sww' self.domain.smooth = True self.domain.reduction = mean sww = get_dataobject(self.domain) sww.store_connectivity() sww.store_timestep('stage') self.domain.time = 2. sww.store_timestep('stage') #print "self.domain.filename",self.domain.filename interp = interpolate_sww(sww.filename, [0.0, 2.0], [[0,1],[0.5,0.5]], ['stage']) #assert allclose(interp,[0.0,2.0]) #Cleanup os.remove(sww.filename) #------------------------------------------------------------- if __name__ == "__main__": suite = unittest.makeSuite(Test_Interpolate,'test') runner = unittest.TextTestRunner(verbosity=1) runner.run(suite)