#!/usr/bin/env python # import unittest from Numeric import zeros, array, allclose, Float from util import mean from data_manager import * from shallow_water import * from config import epsilon class dataTestCase(unittest.TestCase): def setUp(self): import time from mesh_factory import rectangular #Create basic mesh points, vertices, boundary = rectangular(2, 2) #Create shallow water domain domain = Domain(points, vertices, boundary) domain.default_order=2 #Set some field values domain.set_quantity('elevation', lambda x,y: -x) domain.set_quantity('friction', 0.03) ###################### # Boundary conditions B = Transmissive_boundary(domain) domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B}) ###################### #Initial condition - with jumps bed = domain.quantities['elevation'].vertex_values level = zeros(bed.shape, Float) h = 0.3 for i in range(level.shape[0]): if i % 2 == 0: level[i,:] = bed[i,:] + h else: level[i,:] = bed[i,:] domain.set_quantity('level', level) domain.distribute_to_vertices_and_edges() self.domain = domain C = domain.get_vertex_coordinates() self.X = C[:,0:6:2].copy() self.Y = C[:,1:6:2].copy() self.F = bed def tearDown(self): pass # def test_xya(self): # import os # from Numeric import concatenate # import time, os # from Numeric import array, zeros, allclose, Float, concatenate # domain = self.domain # domain.filename = 'datatest' + str(time.time()) # domain.format = 'xya' # domain.smooth = True # xya = get_dataobject(self.domain) # xya.store_all() # #Read back # file = open(xya.filename) # lFile = file.read().split('\n') # lFile = lFile[:-1] # file.close() # os.remove(xya.filename) # #Check contents # if domain.smooth: # self.failUnless(lFile[0] == '9 3 # [attributes]') # else: # self.failUnless(lFile[0] == '24 3 # [attributes]') # #Get smoothed field values with X and Y # X,Y,F,V = domain.get_vertex_values(xy=True, value_array='field_values', # indices = (0,1), precision = Float) # Q,V = domain.get_vertex_values(xy=False, value_array='conserved_quantities', # indices = (0,), precision = Float) # for i, line in enumerate(lFile[1:]): # fields = line.split() # assert len(fields) == 5 # assert allclose(float(fields[0]), X[i]) # assert allclose(float(fields[1]), Y[i]) # assert allclose(float(fields[2]), F[i,0]) # assert allclose(float(fields[3]), Q[i,0]) # assert allclose(float(fields[4]), F[i,1]) def test_sww_constant(self): """Test that constant sww information can be written correctly (non smooth) """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile self.domain.filename = 'datatest' + str(time.time()) self.domain.format = 'sww' self.domain.smooth = False sww = get_dataobject(self.domain) sww.store_connectivity() #Check contents #Get NetCDF fid = NetCDFFile(sww.filename, 'r') #Open existing file for append # Get the variables x = fid.variables['x'] y = fid.variables['y'] z = fid.variables['z'] volumes = fid.variables['volumes'] assert allclose (x[:], self.X.flat) assert allclose (y[:], self.Y.flat) assert allclose (z[:], self.F.flat) V = volumes P = len(self.domain) for k in range(P): assert V[k, 0] == 3*k assert V[k, 1] == 3*k+1 assert V[k, 2] == 3*k+2 fid.close() #Cleanup os.remove(sww.filename) def test_sww_constant_smooth(self): """Test that constant sww information can be written correctly (non smooth) """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile self.domain.filename = 'datatest' + str(time.time()) self.domain.format = 'sww' self.domain.smooth = True sww = get_dataobject(self.domain) sww.store_connectivity() #Check contents #Get NetCDF fid = NetCDFFile(sww.filename, 'r') #Open existing file for append # Get the variables x = fid.variables['x'] y = fid.variables['y'] z = fid.variables['z'] volumes = fid.variables['volumes'] X = x[:] Y = y[:] assert allclose([X[0], Y[0]], array([0.0, 0.0])) assert allclose([X[1], Y[1]], array([0.0, 0.5])) assert allclose([X[2], Y[2]], array([0.0, 1.0])) assert allclose([X[4], Y[4]], array([0.5, 0.5])) assert allclose([X[7], Y[7]], array([1.0, 0.5])) Z = z[:] assert Z[4] == -0.5 V = volumes assert V[2,0] == 4 assert V[2,1] == 5 assert V[2,2] == 1 assert V[4,0] == 6 assert V[4,1] == 7 assert V[4,2] == 3 fid.close() #Cleanup os.remove(sww.filename) def test_sww_variable(self): """Test that sww information can be written correctly """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile self.domain.filename = 'datatest' + str(time.time()) self.domain.format = 'sww' self.domain.smooth = True self.domain.reduction = mean sww = get_dataobject(self.domain) sww.store_connectivity() sww.store_timestep('level') #Check contents #Get NetCDF fid = NetCDFFile(sww.filename, 'r') #Open existing file for append # Get the variables x = fid.variables['x'] y = fid.variables['y'] z = fid.variables['z'] time = fid.variables['time'] stage = fid.variables['stage'] Q = self.domain.quantities['level'] Q0 = Q.vertex_values[:,0] Q1 = Q.vertex_values[:,1] Q2 = Q.vertex_values[:,2] A = stage[0,:] #print A[0], (Q2[0,0] + Q1[1,0])/2 assert allclose(A[0], (Q2[0] + Q1[1])/2) assert allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3) assert allclose(A[2], Q0[3]) assert allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3) #Center point assert allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\ Q0[5] + Q2[6] + Q1[7])/6) fid.close() #Cleanup os.remove(sww.filename) def test_sww_variable2(self): """Test that sww information can be written correctly multiple timesteps. Use average as reduction operator """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile self.domain.filename = 'datatest' + str(time.time()) self.domain.format = 'sww' self.domain.smooth = True self.domain.reduction = mean sww = get_dataobject(self.domain) sww.store_connectivity() sww.store_timestep('level') self.domain.evolve_to_end(finaltime = 0.01) sww.store_timestep('level') #Check contents #Get NetCDF fid = NetCDFFile(sww.filename, 'r') #Open existing file for append # Get the variables x = fid.variables['x'] y = fid.variables['y'] z = fid.variables['z'] time = fid.variables['time'] stage = fid.variables['stage'] #Check values Q = self.domain.quantities['level'] Q0 = Q.vertex_values[:,0] Q1 = Q.vertex_values[:,1] Q2 = Q.vertex_values[:,2] A = stage[1,:] assert allclose(A[0], (Q2[0] + Q1[1])/2) assert allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3) assert allclose(A[2], Q0[3]) assert allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3) #Center point assert allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\ Q0[5] + Q2[6] + Q1[7])/6) fid.close() #Cleanup os.remove(sww.filename) def test_sww_variable3(self): """Test that sww information can be written correctly multiple timesteps using a different reduction operator (min) """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile self.domain.filename = 'datatest' + str(time.time()) self.domain.format = 'sww' self.domain.smooth = True self.domain.reduction = min sww = get_dataobject(self.domain) sww.store_connectivity() sww.store_timestep('level') self.domain.evolve_to_end(finaltime = 0.01) sww.store_timestep('level') #Check contents #Get NetCDF fid = NetCDFFile(sww.filename, 'r') #Open existing file for append # Get the variables x = fid.variables['x'] y = fid.variables['y'] z = fid.variables['z'] time = fid.variables['time'] stage = fid.variables['stage'] #Check values #Check values Q = self.domain.quantities['level'] Q0 = Q.vertex_values[:,0] Q1 = Q.vertex_values[:,1] Q2 = Q.vertex_values[:,2] A = stage[1,:] assert allclose(A[0], min(Q2[0], Q1[1])) assert allclose(A[1], min(Q0[1], Q1[3], Q2[2])) assert allclose(A[2], Q0[3]) assert allclose(A[3], min(Q0[0], Q1[5], Q2[4])) #Center point assert allclose(A[4], min(Q1[0], Q2[1], Q0[2],\ Q0[5], Q2[6], Q1[7])) fid.close() #Cleanup os.remove(sww.filename) def test_sww_DSG(self): """Not a test, rather a look at the sww format """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile self.domain.filename = 'datatest' + str(time.time()) self.domain.format = 'sww' self.domain.smooth = True self.domain.reduction = mean sww = get_dataobject(self.domain) sww.store_connectivity() sww.store_timestep('level') #Check contents #Get NetCDF fid = NetCDFFile(sww.filename, 'r') # Get the variables x = fid.variables['x'] y = fid.variables['y'] z = fid.variables['z'] volumes = fid.variables['volumes'] time = fid.variables['time'] # 2D stage = fid.variables['stage'] X = x[:] Y = y[:] Z = z[:] V = volumes[:] T = time[:] S = stage[:,:] # print "****************************" # print "X ",X # print "****************************" # print "Y ",Y # print "****************************" # print "Z ",Z # print "****************************" # print "V ",V # print "****************************" # print "Time ",T # print "****************************" # print "Stage ",S # print "****************************" fid.close() #Cleanup os.remove(sww.filename) #------------------------------------------------------------- if __name__ == "__main__": suite = unittest.makeSuite(dataTestCase,'test') runner = unittest.TextTestRunner() runner.run(suite)