Changeset 2394
- Timestamp:
- Feb 14, 2006, 12:56:50 PM (19 years ago)
- Location:
- inundation/fit_interpolate
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
inundation/fit_interpolate/interpolate.py
r2201 r2394 21 21 22 22 from Numeric import zeros, array, Float, Int, dot, transpose, concatenate, \ 23 ArrayType, allclose, take 23 ArrayType, allclose, take, NewAxis 24 24 25 25 from pyvolution.mesh import Mesh … … 65 65 66 66 """ 67 68 # why no alpha in parameter list? 67 69 68 70 # Initialise variabels … … 73 75 triangles = ensure_numeric(triangles, Int) 74 76 vertex_coordinates = ensure_numeric(vertex_coordinates, Float) 75 76 77 #Build underlying mesh 77 78 if verbose: print 'Building mesh' … … 282 283 Interpolated values at inputted points (z). 283 284 """ 284 285 #print "point_coordinates interpolate.interpolate",point_coordinates 285 286 # Can I interpolate, based on previous point_coordinates? 286 287 if point_coordinates is None: … … 338 339 def _get_point_data_z(self, f, verbose=False): 339 340 return self._A * f 340 341 342 343 344 class Interpolation_interface: 345 """Interpolation_interface - creates callable object f(t, id) or f(t,x,y) 346 which is interpolated from time series defined at vertices of 347 triangular mesh (such as those stored in sww files) 348 349 Let m be the number of vertices, n the number of triangles 350 and p the number of timesteps. 351 352 Mandatory input 353 time: px1 array of monotonously increasing times (Float) 354 quantities: Dictionary of arrays or 1 array (Float) 355 The arrays must either have dimensions pxm or mx1. 356 The resulting function will be time dependent in 357 the former case while it will be constan with 358 respect to time in the latter case. 359 360 Optional input: 361 quantity_names: List of keys into the quantities dictionary 362 vertex_coordinates: mx2 array of coordinates (Float) 363 triangles: nx3 array of indices into vertex_coordinates (Int) 364 interpolation_points: Nx2 array of coordinates to be interpolated to 365 verbose: Level of reporting 366 367 368 The quantities returned by the callable object are specified by 369 the list quantities which must contain the names of the 370 quantities to be returned and also reflect the order, e.g. for 371 the shallow water wave equation, on would have 372 quantities = ['stage', 'xmomentum', 'ymomentum'] 373 374 The parameter interpolation_points decides at which points interpolated 375 quantities are to be computed whenever object is called. 376 If None, return average value 377 """ 378 379 380 381 def __init__(self, 382 time, 383 quantities, 384 quantity_names = None, 385 vertex_coordinates = None, 386 triangles = None, 387 interpolation_points = None, 388 verbose = False): 389 """Initialise object and build spatial interpolation if required 390 """ 391 392 from Numeric import array, zeros, Float, alltrue, concatenate,\ 393 reshape, ArrayType 394 395 396 from util import mean, ensure_numeric 397 from config import time_format 398 import types 399 400 401 #Check temporal info 402 time = ensure_numeric(time) 403 msg = 'Time must be a monotonuosly ' 404 msg += 'increasing sequence %s' %time 405 assert alltrue(time[1:] - time[:-1] >= 0 ), msg 406 407 408 #Check if quantities is a single array only 409 if type(quantities) != types.DictType: 410 quantities = ensure_numeric(quantities) 411 quantity_names = ['Attribute'] 412 413 #Make it a dictionary 414 quantities = {quantity_names[0]: quantities} 415 416 417 #Use keys if no names are specified 418 if quantity_names is None: 419 quantity_names = quantities.keys() 420 421 422 #Check spatial info 423 if vertex_coordinates is None: 424 self.spatial = False 425 else: 426 vertex_coordinates = ensure_numeric(vertex_coordinates) 427 428 assert triangles is not None, 'Triangles array must be specified' 429 triangles = ensure_numeric(triangles) 430 self.spatial = True 431 432 433 #Save for use with statistics 434 self.quantity_names = quantity_names 435 self.quantities = quantities 436 self.vertex_coordinates = vertex_coordinates 437 self.interpolation_points = interpolation_points 438 self.T = time[:] # Time assumed to be relative to starttime 439 self.index = 0 # Initial time index 440 self.precomputed_values = {} 441 442 #Precomputed spatial interpolation if requested 443 if interpolation_points is not None: 444 if self.spatial is False: 445 raise 'Triangles and vertex_coordinates must be specified' 446 447 try: 448 self.interpolation_points = ensure_numeric(interpolation_points) 449 except: 450 msg = 'Interpolation points must be an N x 2 Numeric array '+\ 451 'or a list of points\n' 452 msg += 'I got: %s.' %(str(self.interpolation_points)[:60] +\ 453 '...') 454 raise msg 455 456 457 m = len(self.interpolation_points) 458 p = len(self.T) 459 460 for name in quantity_names: 461 self.precomputed_values[name] = zeros((p, m), Float) 462 463 #Build interpolator 464 interpol = Interpolate(vertex_coordinates, 465 triangles, 466 #point_coordinates = \ 467 #self.interpolation_points, 468 #alpha = 0, 469 verbose = verbose) 470 471 if verbose: print 'Interpolate' 472 for i, t in enumerate(self.T): 473 #Interpolate quantities at this timestep 474 if verbose and i%((p+10)/10)==0: 475 print ' time step %d of %d' %(i, p) 476 477 for name in quantity_names: 478 if len(quantities[name].shape) == 2: 479 result = interpol.interpolate(quantities[name][i,:], 480 point_coordinates = \ 481 self.interpolation_points) 482 else: 483 #Assume no time dependency 484 result = interpol.interpolate(quantities[name][:], 485 point_coordinates = \ 486 self.interpolation_points) 487 488 self.precomputed_values[name][i, :] = result 489 490 #Report 491 if verbose: 492 print self.statistics() 493 #self.print_statistics() 494 495 else: 496 #Store quantitites as is 497 for name in quantity_names: 498 self.precomputed_values[name] = quantities[name] 499 500 501 def __repr__(self): 502 #return 'Interpolation function (spatio-temporal)' 503 return self.statistics() 504 505 506 def __call__(self, t, point_id = None, x = None, y = None): 507 """Evaluate f(t), f(t, point_id) or f(t, x, y) 508 509 Inputs: 510 t: time - Model time. Must lie within existing timesteps 511 point_id: index of one of the preprocessed points. 512 x, y: Overrides location, point_id ignored 513 514 If spatial info is present and all of x,y,point_id 515 are None an exception is raised 516 517 If no spatial info is present, point_id and x,y arguments are ignored 518 making f a function of time only. 519 520 521 FIXME: point_id could also be a slice 522 FIXME: What if x and y are vectors? 523 FIXME: What about f(x,y) without t? 524 """ 525 526 from math import pi, cos, sin, sqrt 527 from Numeric import zeros, Float 528 from util import mean 529 530 if self.spatial is True: 531 if point_id is None: 532 if x is None or y is None: 533 msg = 'Either point_id or x and y must be specified' 534 raise msg 535 else: 536 if self.interpolation_points is None: 537 msg = 'Interpolation_function must be instantiated ' +\ 538 'with a list of interpolation points before parameter ' +\ 539 'point_id can be used' 540 raise msg 541 542 543 msg = 'Time interval [%s:%s]' %(self.T[0], self.T[1]) 544 msg += ' does not match model time: %s\n' %t 545 if t < self.T[0]: raise msg 546 if t > self.T[-1]: raise msg 547 548 oldindex = self.index #Time index 549 550 #Find current time slot 551 while t > self.T[self.index]: self.index += 1 552 while t < self.T[self.index]: self.index -= 1 553 554 if t == self.T[self.index]: 555 #Protect against case where t == T[-1] (last time) 556 # - also works in general when t == T[i] 557 ratio = 0 558 else: 559 #t is now between index and index+1 560 ratio = (t - self.T[self.index])/\ 561 (self.T[self.index+1] - self.T[self.index]) 562 563 #Compute interpolated values 564 q = zeros(len(self.quantity_names), Float) 565 566 for i, name in enumerate(self.quantity_names): 567 Q = self.precomputed_values[name] 568 569 if self.spatial is False: 570 #If there is no spatial info 571 assert len(Q.shape) == 1 572 573 Q0 = Q[self.index] 574 if ratio > 0: Q1 = Q[self.index+1] 575 576 else: 577 if x is not None and y is not None: 578 #Interpolate to x, y 579 580 raise 'x,y interpolation not yet implemented' 581 else: 582 #Use precomputed point 583 Q0 = Q[self.index, point_id] 584 if ratio > 0: Q1 = Q[self.index+1, point_id] 585 586 #Linear temporal interpolation 587 if ratio > 0: 588 q[i] = Q0 + ratio*(Q1 - Q0) 589 else: 590 q[i] = Q0 591 592 593 #Return vector of interpolated values 594 #if len(q) == 1: 595 # return q[0] 596 #else: 597 # return q 598 599 600 #Return vector of interpolated values 601 #FIXME: 602 if self.spatial is True: 603 return q 604 else: 605 #Replicate q according to x and y 606 #This is e.g used for Wind_stress 607 if x is None or y is None: 608 return q 609 else: 610 try: 611 N = len(x) 612 except: 613 return q 614 else: 615 from Numeric import ones, Float 616 #x is a vector - Create one constant column for each value 617 N = len(x) 618 assert len(y) == N, 'x and y must have same length' 619 res = [] 620 for col in q: 621 res.append(col*ones(N, Float)) 622 623 return res 624 625 626 def statistics(self): 627 """Output statistics about interpolation_function 628 """ 629 630 vertex_coordinates = self.vertex_coordinates 631 interpolation_points = self.interpolation_points 632 quantity_names = self.quantity_names 633 quantities = self.quantities 634 precomputed_values = self.precomputed_values 635 636 x = vertex_coordinates[:,0] 637 y = vertex_coordinates[:,1] 638 639 str = '------------------------------------------------\n' 640 str += 'Interpolation_function (spatio-temporal) statistics:\n' 641 str += ' Extent:\n' 642 str += ' x in [%f, %f], len(x) == %d\n'\ 643 %(min(x), max(x), len(x)) 644 str += ' y in [%f, %f], len(y) == %d\n'\ 645 %(min(y), max(y), len(y)) 646 str += ' t in [%f, %f], len(t) == %d\n'\ 647 %(min(self.T), max(self.T), len(self.T)) 648 str += ' Quantities:\n' 649 for name in quantity_names: 650 q = quantities[name][:].flat 651 str += ' %s in [%f, %f]\n' %(name, min(q), max(q)) 652 653 if interpolation_points is not None: 654 str += ' Interpolation points (xi, eta):'\ 655 ' number of points == %d\n' %interpolation_points.shape[0] 656 str += ' xi in [%f, %f]\n' %(min(interpolation_points[:,0]), 657 max(interpolation_points[:,0])) 658 str += ' eta in [%f, %f]\n' %(min(interpolation_points[:,1]), 659 max(interpolation_points[:,1])) 660 str += ' Interpolated quantities (over all timesteps):\n' 661 662 for name in quantity_names: 663 q = precomputed_values[name][:].flat 664 str += ' %s at interpolation points in [%f, %f]\n'\ 665 %(name, min(q), max(q)) 666 str += '------------------------------------------------\n' 667 668 return str 669 670 def interpolate_sww(sww_file, time, interpolation_points, 671 quantity_names = None, verbose = False): 672 """ 673 obsolete. 674 use file_function in utils 675 """ 676 #open sww file 677 x, y, volumes, time, quantities = read_sww(sww_file) 678 print "x",x 679 print "y",y 680 681 print "time", time 682 print "quantities", quantities 683 684 #Add the x and y together 685 vertex_coordinates = concatenate((x[:,NewAxis], y[:,NewAxis]),axis=1) 686 687 #Will return the quantity values at the specified times and locations 688 interp = Interpolation_interface( 689 time, 690 quantities, 691 quantity_names = quantity_names, 692 vertex_coordinates = vertex_coordinates, 693 triangles = volumes, 694 interpolation_points = interpolation_points, 695 verbose = verbose) 696 697 698 def read_sww(file_name): 699 """ 700 Read in an sww file. 701 702 Input; 703 file_name - the sww file 704 705 Output; 706 x - Vector of x values 707 y - Vector of y values 708 z - Vector of bed elevation 709 volumes - Array. Each row has 3 values, representing 710 the vertices that define the volume 711 time - Vector of the times where there is stage information 712 stage - array with respect to time and vertices (x,y) 713 """ 714 715 #FIXME Have this reader as part of data_manager? 716 717 from Scientific.IO.NetCDF import NetCDFFile 718 import tempfile 719 import sys 720 import os 721 722 #Check contents 723 #Get NetCDF 724 725 # see if the file is there. Throw a QUIET IO error if it isn't 726 # I don't think I could implement the above 727 728 #throws prints to screen if file not present 729 junk = tempfile.mktemp(".txt") 730 fd = open(junk,'w') 731 stdout = sys.stdout 732 sys.stdout = fd 733 fid = NetCDFFile(file_name, 'r') 734 sys.stdout = stdout 735 fd.close() 736 #clean up 737 os.remove(junk) 738 739 # Get the variables 740 x = fid.variables['x'][:] 741 y = fid.variables['y'][:] 742 volumes = fid.variables['volumes'][:] 743 time = fid.variables['time'][:] 744 745 keys = fid.variables.keys() 746 keys.remove("x") 747 keys.remove("y") 748 keys.remove("volumes") 749 keys.remove("time") 750 #Turn NetCDF objects into Numeric arrays 751 quantities = {} 752 for name in keys: 753 quantities[name] = fid.variables[name][:] 754 755 fid.close() 756 return x, y, volumes, time, quantities 757 341 758 #------------------------------------------------------------- 342 759 if __name__ == "__main__": -
inundation/fit_interpolate/test_interpolate.py
r2201 r2394 2 2 3 3 #TEST 4 5 #import time, os 6 7 4 8 import sys 9 import os 5 10 import unittest 6 11 from math import sqrt 7 8 12 import tempfile 13 14 from Scientific.IO.NetCDF import NetCDFFile 15 from Numeric import allclose, array, transpose, zeros, Float 16 17 18 # ANUGA code imports 9 19 from interpolate import * 10 from Numeric import allclose, array, transpose11 12 20 from coordinate_transforms.geo_reference import Geo_reference 21 from shallow_water import Domain, Transmissive_boundary #, mean 22 from util import mean 23 from data_manager import get_dataobject 13 24 14 25 def distance(x, y): … … 23 34 24 35 def setUp(self): 25 pass 36 37 import time 38 from mesh_factory import rectangular 39 40 41 #Create basic mesh 42 points, vertices, boundary = rectangular(2, 2) 43 44 #Create shallow water domain 45 domain = Domain(points, vertices, boundary) 46 domain.default_order=2 47 domain.beta_h = 0 48 49 50 #Set some field values 51 domain.set_quantity('elevation', lambda x,y: -x) 52 domain.set_quantity('friction', 0.03) 53 54 55 ###################### 56 # Boundary conditions 57 B = Transmissive_boundary(domain) 58 domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B}) 59 60 61 ###################### 62 #Initial condition - with jumps 63 64 bed = domain.quantities['elevation'].vertex_values 65 stage = zeros(bed.shape, Float) 66 67 h = 0.3 68 for i in range(stage.shape[0]): 69 if i % 2 == 0: 70 stage[i,:] = bed[i,:] + h 71 else: 72 stage[i,:] = bed[i,:] 73 74 domain.set_quantity('stage', stage) 75 76 domain.distribute_to_vertices_and_edges() 77 78 79 self.domain = domain 80 81 C = domain.get_vertex_coordinates() 82 self.X = C[:,0:6:2].copy() 83 self.Y = C[:,1:6:2].copy() 84 85 self.F = bed 86 87 26 88 27 89 def tearDown(self): … … 530 592 531 593 594 595 def test_interpolation_interface_time_only(self): 596 """Test spatio-temporal interpolation 597 Test that spatio temporal function performs the correct 598 interpolations in both time and space 599 """ 600 601 602 #Three timesteps 603 time = [1.0, 5.0, 6.0] 604 605 606 #One quantity 607 Q = zeros( (3,6), Float ) 608 609 #Linear in time and space 610 a = [0.0, 0.0] 611 b = [0.0, 2.0] 612 c = [2.0, 0.0] 613 d = [0.0, 4.0] 614 e = [2.0, 2.0] 615 f = [4.0, 0.0] 616 617 points = [a, b, c, d, e, f] 618 619 for i, t in enumerate(time): 620 Q[i, :] = t*linear_function(points) 621 622 623 #Check basic interpolation of one quantity using averaging 624 #(no interpolation points or spatial info) 625 from util import mean 626 I = Interpolation_interface(time, [mean(Q[0,:]), 627 mean(Q[1,:]), 628 mean(Q[2,:])]) 629 630 631 632 #Check temporal interpolation 633 for i in [0,1,2]: 634 assert allclose(I(time[i]), mean(Q[i,:])) 635 636 #Midway 637 assert allclose(I( (time[0] + time[1])/2 ), 638 (I(time[0]) + I(time[1]))/2 ) 639 640 assert allclose(I( (time[1] + time[2])/2 ), 641 (I(time[1]) + I(time[2]))/2 ) 642 643 assert allclose(I( (time[0] + time[2])/2 ), 644 (I(time[0]) + I(time[2]))/2 ) 645 646 #1/3 647 assert allclose(I( (time[0] + time[2])/3 ), 648 (I(time[0]) + I(time[2]))/3 ) 649 650 651 #Out of bounds checks 652 try: 653 I(time[0]-1) 654 except: 655 pass 656 else: 657 raise 'Should raise exception' 658 659 try: 660 I(time[-1]+1) 661 except: 662 pass 663 else: 664 raise 'Should raise exception' 665 666 667 668 669 def test_interpolation_interface_spatial_only(self): 670 """Test spatio-temporal interpolation with constant time 671 """ 672 673 #Three timesteps 674 time = [1.0, 5.0, 6.0] 675 676 677 #Setup mesh used to represent fitted function 678 a = [0.0, 0.0] 679 b = [0.0, 2.0] 680 c = [2.0, 0.0] 681 d = [0.0, 4.0] 682 e = [2.0, 2.0] 683 f = [4.0, 0.0] 684 685 points = [a, b, c, d, e, f] 686 #bac, bce, ecf, dbe 687 triangles = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 688 689 690 #New datapoints where interpolated values are sought 691 interpolation_points = [[ 0.0, 0.0], 692 [ 0.5, 0.5], 693 [ 0.7, 0.7], 694 [ 1.0, 0.5], 695 [ 2.0, 0.4], 696 [ 2.8, 1.2]] 697 698 699 #One quantity linear in space 700 Q = linear_function(points) 701 702 703 #Check interpolation of one quantity using interpolaton points 704 I = Interpolation_interface(time, Q, 705 vertex_coordinates = points, 706 triangles = triangles, 707 interpolation_points = interpolation_points, 708 verbose = False) 709 710 711 answer = linear_function(interpolation_points) 712 713 t = time[0] 714 for j in range(50): #t in [1, 6] 715 for id in range(len(interpolation_points)): 716 assert allclose(I(t, id), answer[id]) 717 718 t += 0.1 719 720 721 try: 722 I(1) 723 except: 724 pass 725 else: 726 raise 'Should raise exception' 727 728 729 730 def test_interpolation_interface(self): 731 """Test spatio-temporal interpolation 732 Test that spatio temporal function performs the correct 733 interpolations in both time and space 734 """ 735 736 737 #Three timesteps 738 time = [1.0, 5.0, 6.0] 739 740 741 #Setup mesh used to represent fitted function 742 a = [0.0, 0.0] 743 b = [0.0, 2.0] 744 c = [2.0, 0.0] 745 d = [0.0, 4.0] 746 e = [2.0, 2.0] 747 f = [4.0, 0.0] 748 749 points = [a, b, c, d, e, f] 750 #bac, bce, ecf, dbe 751 triangles = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 752 753 754 #New datapoints where interpolated values are sought 755 interpolation_points = [[ 0.0, 0.0], 756 [ 0.5, 0.5], 757 [ 0.7, 0.7], 758 [ 1.0, 0.5], 759 [ 2.0, 0.4], 760 [ 2.8, 1.2]] 761 762 763 #One quantity 764 Q = zeros( (3,6), Float ) 765 766 #Linear in time and space 767 for i, t in enumerate(time): 768 Q[i, :] = t*linear_function(points) 769 770 771 #Check interpolation of one quantity using interpolaton points) 772 I = Interpolation_interface(time, Q, 773 vertex_coordinates = points, 774 triangles = triangles, 775 interpolation_points = interpolation_points, 776 verbose = False) 777 778 779 answer = linear_function(interpolation_points) 780 781 t = time[0] 782 for j in range(50): #t in [1, 6] 783 for id in range(len(interpolation_points)): 784 assert allclose(I(t, id), t*answer[id]) 785 786 t += 0.1 787 788 try: 789 I(1) 790 except: 791 pass 792 else: 793 raise 'Should raise exception' 794 795 796 def BADtest_interpolate_sww(self): 797 """Not a unit test, rather a system test for interpolate_sww 798 This function is obsolete 799 """ 800 801 self.domain.filename = 'datatest' + str(time.time()) 802 self.domain.format = 'sww' 803 self.domain.smooth = True 804 self.domain.reduction = mean 805 806 sww = get_dataobject(self.domain) 807 sww.store_connectivity() 808 sww.store_timestep('stage') 809 self.domain.time = 2. 810 sww.store_timestep('stage') 811 812 #print "self.domain.filename",self.domain.filename 813 interp = interpolate_sww(sww.filename, [0.0, 2.0], 814 [[0,1],[0.5,0.5]], 815 ['stage']) 816 #assert allclose(interp,[0.0,2.0]) 817 818 #Cleanup 819 os.remove(sww.filename) 820 532 821 #------------------------------------------------------------- 533 822 if __name__ == "__main__":
Note: See TracChangeset
for help on using the changeset viewer.