#!/usr/bin/env python
"""Tests for interpolate_sww.

Based on test_data_manager, so there may be code here that is not
strictly needed (e.g. parts of the set-up).
"""

import unittest
from Numeric import zeros, array, allclose, Float

from util import mean
from interpolate_sww import *
from shallow_water import *
from data_manager import *
#from config import epsilon

# Flip to True to dump the raw contents of the sww file written by
# test_sww_DSG; the dump is purely informational and asserts nothing.
PRINT_SWW_CONTENTS = False


def _remove_if_exists(path):
    """Delete *path* if it names an existing file; otherwise do nothing.

    Used from ``finally`` clauses so that a missing file (because the test
    failed before creating it) does not mask the original failure.
    """
    import os

    if path is not None and os.path.exists(path):
        os.remove(path)


class dataTestCase(unittest.TestCase):
    """Write a small shallow water domain to sww and interpolate from it."""

    def setUp(self):
        """Build a 2x2 rectangular domain with a stepped initial level."""
        from mesh_factory import rectangular

        # Create basic mesh
        points, vertices, boundary = rectangular(2, 2)

        # Create shallow water domain
        domain = Domain(points, vertices, boundary)
        domain.default_order = 2

        # Set some field values
        domain.set_quantity('elevation', lambda x, y: -x)
        domain.set_quantity('friction', 0.03)

        ######################
        # Boundary conditions
        B = Transmissive_boundary(domain)
        domain.set_boundary({'left': B, 'right': B, 'top': B, 'bottom': B})

        ######################
        # Initial condition - with jumps: every even-numbered row sits
        # h above the bed, every odd-numbered row sits on the bed.
        bed = domain.quantities['elevation'].vertex_values
        level = zeros(bed.shape, Float)

        h = 0.3
        for i in range(level.shape[0]):
            if i % 2 == 0:
                level[i, :] = bed[i, :] + h
            else:
                level[i, :] = bed[i, :]

        domain.set_quantity('level', level)
        domain.distribute_to_vertices_and_edges()

        self.domain = domain

        C = domain.get_vertex_coordinates()

        self.X = C[:, 0:6:2].copy()
        self.Y = C[:, 1:6:2].copy()

        self.F = bed

    def tearDown(self):
        pass

    def _make_sww(self):
        """Configure the domain for sww output and return its data object.

        The file name carries a timestamp so repeated runs do not collide.
        """
        # Imported locally so the wildcard imports at module level cannot
        # shadow the name.
        import time

        self.domain.filename = 'datatest' + str(time.time())
        self.domain.format = 'sww'
        self.domain.smooth = True
        self.domain.reduction = mean
        return get_dataobject(self.domain)

    def _store_two_timesteps(self, sww):
        """Store connectivity plus the 'level' quantity at t=0 and t=2."""
        sww.store_connectivity()
        sww.store_timestep('level')
        self.domain.time = 2.
        sww.store_timestep('level')

    def test_sww_DSG(self):
        """Not a test, rather a look at the sww format."""
        from Scientific.IO.NetCDF import NetCDFFile

        sww = self._make_sww()
        try:
            self._store_two_timesteps(sww)

            # Check contents
            fid = NetCDFFile(sww.filename, 'r')
            try:
                # Get the variables and read them in full
                X = fid.variables['x'][:]
                Y = fid.variables['y'][:]
                Z = fid.variables['z'][:]
                V = fid.variables['volumes'][:]
                T = fid.variables['time'][:]

                # 2D
                S = fid.variables['stage'][:, :]

                if PRINT_SWW_CONTENTS:
                    banner = "****************************"
                    for label, values in (("X", X), ("Y", Y), ("Z", Z),
                                          ("V", V), ("Time", T),
                                          ("Stage", S)):
                        print(banner)
                        print("%s  %s" % (label, values))
                    print(banner)
            finally:
                fid.close()
        finally:
            # Cleanup, even when something above failed
            _remove_if_exists(sww.filename)

    def test_interpolate_sww(self):
        """Not really a unit test, rather a system test for Interpolate_sww."""
        # Imported locally so the wildcard imports at module level cannot
        # shadow these names.
        import os
        import tempfile
        from Numeric import allclose
        from load_mesh.loadASCII import load_xya_file

        sww = self._make_sww()
        point_file = None
        point_file_out = None
        try:
            self._store_two_timesteps(sww)

            #print "self.domain.filename",self.domain.filename
            interp = Interpolate_sww(sww.filename, 'height')

            assert allclose(interp.time, [0.0, 2.0])

            # Disabled checks kept for reference:
            #answer = [ 0.15, 0.1, 0., -0.3, -0.35, -0.4, -0.7, -0.8, -0.850]
            #stage_t = transpose(interp.stage)
            #assert allclose(stage_t[0], answer)
            #assert allclose(stage_t[1],stage_t[0])

            # Create an .xya file. mkstemp creates the file atomically,
            # avoiding the name race inherent in tempfile.mktemp.
            handle, point_file = tempfile.mkstemp(".xya")
            fd = os.fdopen(handle, 'w')
            try:
                fd.write("# demo \n 0.0, 0.6,2.,4 \n 0.0, 0.9,4,8 \n 0.0,0.1,4.,8 \n 0.4,1.0,4.,8 \n")
            finally:
                fd.close()

            interp.interpolate_xya(point_file)

            answer = [[0.08, 0.08], [0.02, 0.02], [0.14, 0.14], [.08, .08]]
            assert allclose(interp.interpolated_quantity, answer)

            # Create an output .xya file. Only the reserved path is needed,
            # so the descriptor is closed straight away.
            # NOTE(review): assumes write_depth_xya overwrites an existing
            # (empty) file - confirm.
            handle, point_file_out = tempfile.mkstemp(".xya")
            os.close(handle)
            interp.write_depth_xya(point_file_out)

            # Check the output file
            xya_dict = load_xya_file(point_file_out)
            assert allclose(interp.point_coordinates,
                            xya_dict['pointlist'])
            assert allclose(interp.interpolated_quantity,
                            xya_dict['pointattributelist'])

            # The title only has to be present and splittable into lines
            # (a title is assumed to have only one line); its contents
            # are not verified.
            assert xya_dict['title'].split('\n')

            # Try another quantity
            interp = Interpolate_sww(sww.filename, 'stage')
            interp.interpolate_xya(point_file)

            answer = [[0.08, 0.08], [0.02, 0.02], [0.14, 0.14], [-.32, -.32]]
            assert allclose(interp.interpolated_quantity, answer)

            # Look at error catching: both an unknown name and 'z' are
            # expected to be rejected with a KeyError.
            for bad_quantity in ('funky!', 'z'):
                try:
                    interp = Interpolate_sww(sww.filename, bad_quantity)
                except KeyError:
                    pass
                else:
                    self.fail('bad key did not raise an error!')
        finally:
            # Cleanup, even when something above failed
            _remove_if_exists(sww.filename)
            _remove_if_exists(point_file_out)
            _remove_if_exists(point_file)


#-------------------------------------------------------------
if __name__ == "__main__":
    suite = unittest.makeSuite(dataTestCase, 'test')
    runner = unittest.TextTestRunner()
    runner.run(suite)