#!/usr/bin/env python # import unittest import copy from Numeric import zeros, array, allclose, Float from util import mean from data_manager import * from shallow_water import * from config import epsilon from coordinate_transforms.geo_reference import Geo_reference class Test_Data_Manager(unittest.TestCase): def setUp(self): import time from mesh_factory import rectangular #Create basic mesh points, vertices, boundary = rectangular(2, 2) #Create shallow water domain domain = Domain(points, vertices, boundary) domain.default_order=2 #Set some field values domain.set_quantity('elevation', lambda x,y: -x) domain.set_quantity('friction', 0.03) ###################### # Boundary conditions B = Transmissive_boundary(domain) domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B}) ###################### #Initial condition - with jumps bed = domain.quantities['elevation'].vertex_values stage = zeros(bed.shape, Float) h = 0.3 for i in range(stage.shape[0]): if i % 2 == 0: stage[i,:] = bed[i,:] + h else: stage[i,:] = bed[i,:] domain.set_quantity('stage', stage) self.initial_stage = copy.copy(domain.quantities['stage'].vertex_values) domain.distribute_to_vertices_and_edges() self.domain = domain C = domain.get_vertex_coordinates() self.X = C[:,0:6:2].copy() self.Y = C[:,1:6:2].copy() self.F = bed def tearDown(self): pass # def test_xya(self): # import os # from Numeric import concatenate # import time, os # from Numeric import array, zeros, allclose, Float, concatenate # domain = self.domain # domain.filename = 'datatest' + str(time.time()) # domain.format = 'xya' # domain.smooth = True # xya = get_dataobject(self.domain) # xya.store_all() # #Read back # file = open(xya.filename) # lFile = file.read().split('\n') # lFile = lFile[:-1] # file.close() # os.remove(xya.filename) # #Check contents # if domain.smooth: # self.failUnless(lFile[0] == '9 3 # [attributes]') # else: # self.failUnless(lFile[0] == '24 3 # [attributes]') # #Get smoothed field values with X and Y # X,Y,F,V = domain.get_vertex_values(xy=True, value_array='field_values', # indices = (0,1), precision = Float) # Q,V = domain.get_vertex_values(xy=False, value_array='conserved_quantities', # indices = (0,), precision = Float) # for i, line in enumerate(lFile[1:]): # fields = line.split() # assert len(fields) == 5 # assert allclose(float(fields[0]), X[i]) # assert allclose(float(fields[1]), Y[i]) # assert allclose(float(fields[2]), F[i,0]) # assert allclose(float(fields[3]), Q[i,0]) # assert allclose(float(fields[4]), F[i,1]) def test_sww_constant(self): """Test that constant sww information can be written correctly (non smooth) """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile self.domain.filename = 'datatest' + str(id(self)) self.domain.format = 'sww' self.domain.smooth = False sww = get_dataobject(self.domain) sww.store_connectivity() #Check contents #Get NetCDF fid = NetCDFFile(sww.filename, 'r') #Open existing file for append # Get the variables x = fid.variables['x'] y = fid.variables['y'] z = fid.variables['elevation'] volumes = fid.variables['volumes'] assert allclose (x[:], self.X.flat) assert allclose (y[:], self.Y.flat) assert allclose (z[:], self.F.flat) V = volumes P = len(self.domain) for k in range(P): assert V[k, 0] == 3*k assert V[k, 1] == 3*k+1 assert V[k, 2] == 3*k+2 fid.close() #Cleanup os.remove(sww.filename) def test_sww_constant_smooth(self): """Test that constant sww information can be written correctly (non smooth) """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile self.domain.filename = 'datatest' + str(id(self)) self.domain.format = 'sww' self.domain.smooth = True sww = get_dataobject(self.domain) sww.store_connectivity() #Check contents #Get NetCDF fid = NetCDFFile(sww.filename, 'r') #Open existing file for append # Get the variables x = fid.variables['x'] y = fid.variables['y'] z = fid.variables['elevation'] volumes = fid.variables['volumes'] X = x[:] Y = y[:] assert allclose([X[0], Y[0]], array([0.0, 0.0])) assert allclose([X[1], Y[1]], array([0.0, 0.5])) assert allclose([X[2], Y[2]], array([0.0, 1.0])) assert allclose([X[4], Y[4]], array([0.5, 0.5])) assert allclose([X[7], Y[7]], array([1.0, 0.5])) Z = z[:] assert Z[4] == -0.5 V = volumes assert V[2,0] == 4 assert V[2,1] == 5 assert V[2,2] == 1 assert V[4,0] == 6 assert V[4,1] == 7 assert V[4,2] == 3 fid.close() #Cleanup os.remove(sww.filename) def test_sww_variable(self): """Test that sww information can be written correctly """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile self.domain.filename = 'datatest' + str(id(self)) self.domain.format = 'sww' self.domain.smooth = True self.domain.reduction = mean sww = get_dataobject(self.domain) sww.store_connectivity() sww.store_timestep('stage') #Check contents #Get NetCDF fid = NetCDFFile(sww.filename, 'r') #Open existing file for append # Get the variables x = fid.variables['x'] y = fid.variables['y'] z = fid.variables['elevation'] time = fid.variables['time'] stage = fid.variables['stage'] Q = self.domain.quantities['stage'] Q0 = Q.vertex_values[:,0] Q1 = Q.vertex_values[:,1] Q2 = Q.vertex_values[:,2] A = stage[0,:] #print A[0], (Q2[0,0] + Q1[1,0])/2 assert allclose(A[0], (Q2[0] + Q1[1])/2) assert allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3) assert allclose(A[2], Q0[3]) assert allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3) #Center point assert allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\ Q0[5] + Q2[6] + Q1[7])/6) fid.close() #Cleanup os.remove(sww.filename) def test_sww_variable2(self): """Test that sww information can be written correctly multiple timesteps. Use average as reduction operator """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile self.domain.filename = 'datatest' + str(id(self)) self.domain.format = 'sww' self.domain.smooth = True self.domain.reduction = mean sww = get_dataobject(self.domain) sww.store_connectivity() sww.store_timestep('stage') self.domain.evolve_to_end(finaltime = 0.01) sww.store_timestep('stage') #Check contents #Get NetCDF fid = NetCDFFile(sww.filename, 'r') #Open existing file for append # Get the variables x = fid.variables['x'] y = fid.variables['y'] z = fid.variables['elevation'] time = fid.variables['time'] stage = fid.variables['stage'] #Check values Q = self.domain.quantities['stage'] Q0 = Q.vertex_values[:,0] Q1 = Q.vertex_values[:,1] Q2 = Q.vertex_values[:,2] A = stage[1,:] assert allclose(A[0], (Q2[0] + Q1[1])/2) assert allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3) assert allclose(A[2], Q0[3]) assert allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3) #Center point assert allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\ Q0[5] + Q2[6] + Q1[7])/6) fid.close() #Cleanup os.remove(sww.filename) def test_sww_variable3(self): """Test that sww information can be written correctly multiple timesteps using a different reduction operator (min) """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile self.domain.filename = 'datatest' + str(id(self)) self.domain.format = 'sww' self.domain.smooth = True self.domain.reduction = min sww = get_dataobject(self.domain) sww.store_connectivity() sww.store_timestep('stage') self.domain.evolve_to_end(finaltime = 0.01) sww.store_timestep('stage') #Check contents #Get NetCDF fid = NetCDFFile(sww.filename, 'r') # Get the variables x = fid.variables['x'] y = fid.variables['y'] z = fid.variables['elevation'] time = fid.variables['time'] stage = fid.variables['stage'] #Check values Q = self.domain.quantities['stage'] Q0 = Q.vertex_values[:,0] Q1 = Q.vertex_values[:,1] Q2 = Q.vertex_values[:,2] A = stage[1,:] assert allclose(A[0], min(Q2[0], Q1[1])) assert allclose(A[1], min(Q0[1], Q1[3], Q2[2])) assert allclose(A[2], Q0[3]) assert allclose(A[3], min(Q0[0], Q1[5], Q2[4])) #Center point assert allclose(A[4], min(Q1[0], Q2[1], Q0[2],\ Q0[5], Q2[6], Q1[7])) fid.close() #Cleanup os.remove(sww.filename) def test_sync(self): """Test info stored at each timestep is as expected (incl initial condition) """ import time, os, config from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile self.domain.filename = 'synctest' self.domain.format = 'sww' self.domain.smooth = False self.domain.store = True self.domain.beta_h = 0 #Evolution for t in self.domain.evolve(yieldstep = 1.0, finaltime = 4.0): stage = self.domain.quantities['stage'].vertex_values #Get NetCDF fid = NetCDFFile(self.domain.writer.filename, 'r') stage_file = fid.variables['stage'] if t == 0.0: assert allclose(stage, self.initial_stage) assert allclose(stage_file[:], stage.flat) else: assert not allclose(stage, self.initial_stage) assert not allclose(stage_file[:], stage.flat) fid.close() os.remove(self.domain.writer.filename) def test_sww_DSG(self): """Not a test, rather a look at the sww format """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile self.domain.filename = 'datatest' + str(id(self)) self.domain.format = 'sww' self.domain.smooth = True self.domain.reduction = mean sww = get_dataobject(self.domain) sww.store_connectivity() sww.store_timestep('stage') #Check contents #Get NetCDF fid = NetCDFFile(sww.filename, 'r') # Get the variables x = fid.variables['x'] y = fid.variables['y'] z = fid.variables['elevation'] volumes = fid.variables['volumes'] time = fid.variables['time'] # 2D stage = fid.variables['stage'] X = x[:] Y = y[:] Z = z[:] V = volumes[:] T = time[:] S = stage[:,:] # print "****************************" # print "X ",X # print "****************************" # print "Y ",Y # print "****************************" # print "Z ",Z # print "****************************" # print "V ",V # print "****************************" # print "Time ",T # print "****************************" # print "Stage ",S # print "****************************" fid.close() #Cleanup os.remove(sww.filename) def test_dem2pts(self): """Test conversion from dem in ascii format to native NetCDF xya format """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile #Write test asc file root = 'demtest' filename = root+'.asc' fid = open(filename, 'w') fid.write("""ncols 5 nrows 6 xllcorner 2000.5 yllcorner 3000.5 cellsize 25 NODATA_value -9999 """) #Create linear function ref_points = [] ref_elevation = [] for i in range(6): y = (6-i)*25.0 for j in range(5): x = j*25.0 z = x+2*y ref_points.append( [x,y] ) ref_elevation.append(z) fid.write('%f ' %z) fid.write('\n') fid.close() #Write prj file with metadata metafilename = root+'.prj' fid = open(metafilename, 'w') fid.write("""Projection UTM Zone 56 Datum WGS84 Zunits NO Units METERS Spheroid WGS84 Xshift 0.0000000000 Yshift 10000000.0000000000 Parameters """) fid.close() #Convert to NetCDF pts convert_dem_from_ascii2netcdf(root) dem2pts(root) #Check contents #Get NetCDF fid = NetCDFFile(root+'.pts', 'r') # Get the variables #print fid.variables.keys() points = fid.variables['points'] elevation = fid.variables['elevation'] #Check values #print points[:] #print ref_points assert allclose(points, ref_points) #print attributes[:] #print ref_elevation assert allclose(elevation, ref_elevation) #Cleanup fid.close() os.remove(root + '.pts') os.remove(root + '.dem') os.remove(root + '.asc') os.remove(root + '.prj') def test_sww2asc_elevation(self): """Test that sww information can be converted correctly to asc/prj format readable by e.g. ArcView """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile #Setup self.domain.filename = 'datatest' prjfile = self.domain.filename + '_elevation.prj' ascfile = self.domain.filename + '_elevation.asc' swwfile = self.domain.filename + '.sww' self.domain.set_datadir('.') self.domain.format = 'sww' self.domain.smooth = True self.domain.set_quantity('elevation', lambda x,y: -x-y) self.domain.geo_reference = Geo_reference(56,308500,6189000) sww = get_dataobject(self.domain) sww.store_connectivity() sww.store_timestep('stage') self.domain.evolve_to_end(finaltime = 0.01) sww.store_timestep('stage') cellsize = 0.25 #Check contents #Get NetCDF fid = NetCDFFile(sww.filename, 'r') # Get the variables x = fid.variables['x'][:] y = fid.variables['y'][:] z = fid.variables['elevation'][:] time = fid.variables['time'][:] stage = fid.variables['stage'][:] #Export to ascii/prj files sww2asc(self.domain.filename, quantity = 'elevation', cellsize = cellsize, verbose = False) #Check prj (meta data) prjid = open(prjfile) lines = prjid.readlines() prjid.close() L = lines[0].strip().split() assert L[0].strip().lower() == 'projection' assert L[1].strip().lower() == 'utm' L = lines[1].strip().split() assert L[0].strip().lower() == 'zone' assert L[1].strip().lower() == '56' L = lines[2].strip().split() assert L[0].strip().lower() == 'datum' assert L[1].strip().lower() == 'wgs84' L = lines[3].strip().split() assert L[0].strip().lower() == 'zunits' assert L[1].strip().lower() == 'no' L = lines[4].strip().split() assert L[0].strip().lower() == 'units' assert L[1].strip().lower() == 'meters' L = lines[5].strip().split() assert L[0].strip().lower() == 'spheroid' assert L[1].strip().lower() == 'wgs84' L = lines[6].strip().split() assert L[0].strip().lower() == 'xshift' assert L[1].strip().lower() == '500000' L = lines[7].strip().split() assert L[0].strip().lower() == 'yshift' assert L[1].strip().lower() == '10000000' L = lines[8].strip().split() assert L[0].strip().lower() == 'parameters' #Check asc file ascid = open(ascfile) lines = ascid.readlines() ascid.close() L = lines[0].strip().split() assert L[0].strip().lower() == 'ncols' assert L[1].strip().lower() == '5' L = lines[1].strip().split() assert L[0].strip().lower() == 'nrows' assert L[1].strip().lower() == '5' L = lines[2].strip().split() assert L[0].strip().lower() == 'xllcorner' assert allclose(float(L[1].strip().lower()), 308500) L = lines[3].strip().split() assert L[0].strip().lower() == 'yllcorner' assert allclose(float(L[1].strip().lower()), 6189000) L = lines[4].strip().split() assert L[0].strip().lower() == 'cellsize' assert allclose(float(L[1].strip().lower()), cellsize) L = lines[5].strip().split() assert L[0].strip() == 'NODATA_value' assert L[1].strip().lower() == '-9999' #Check grid values for j in range(5): L = lines[6+j].strip().split() y = (4-j) * cellsize for i in range(5): assert allclose(float(L[i]), -i*cellsize - y) fid.close() #Cleanup os.remove(prjfile) os.remove(ascfile) os.remove(swwfile) def test_sww2asc_stage_reduction(self): """Test that sww information can be converted correctly to asc/prj format readable by e.g. ArcView This tests the reduction of quantity stage using min """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile #Setup self.domain.filename = 'datatest' prjfile = self.domain.filename + '_stage.prj' ascfile = self.domain.filename + '_stage.asc' swwfile = self.domain.filename + '.sww' self.domain.set_datadir('.') self.domain.format = 'sww' self.domain.smooth = True self.domain.set_quantity('elevation', lambda x,y: -x-y) self.domain.geo_reference = Geo_reference(56,308500,6189000) sww = get_dataobject(self.domain) sww.store_connectivity() sww.store_timestep('stage') self.domain.evolve_to_end(finaltime = 0.01) sww.store_timestep('stage') cellsize = 0.25 #Check contents #Get NetCDF fid = NetCDFFile(sww.filename, 'r') # Get the variables x = fid.variables['x'][:] y = fid.variables['y'][:] z = fid.variables['elevation'][:] time = fid.variables['time'][:] stage = fid.variables['stage'][:] #Export to ascii/prj files sww2asc(self.domain.filename, quantity = 'stage', cellsize = cellsize, reduction = min) #Check asc file ascid = open(ascfile) lines = ascid.readlines() ascid.close() L = lines[0].strip().split() assert L[0].strip().lower() == 'ncols' assert L[1].strip().lower() == '5' L = lines[1].strip().split() assert L[0].strip().lower() == 'nrows' assert L[1].strip().lower() == '5' L = lines[2].strip().split() assert L[0].strip().lower() == 'xllcorner' assert allclose(float(L[1].strip().lower()), 308500) L = lines[3].strip().split() assert L[0].strip().lower() == 'yllcorner' assert allclose(float(L[1].strip().lower()), 6189000) L = lines[4].strip().split() assert L[0].strip().lower() == 'cellsize' assert allclose(float(L[1].strip().lower()), cellsize) L = lines[5].strip().split() assert L[0].strip() == 'NODATA_value' assert L[1].strip().lower() == '-9999' #Check grid values (where applicable) for j in range(5): if j%2 == 0: L = lines[6+j].strip().split() jj = 4-j for i in range(5): if i%2 == 0: index = jj/2 + i/2*3 val0 = stage[0,index] val1 = stage[1,index] #print i, j, index, ':', L[i], val0, val1 assert allclose(float(L[i]), min(val0, val1)) fid.close() #Cleanup os.remove(prjfile) os.remove(ascfile) #os.remove(swwfile) def test_sww2asc_missing_points(self): """Test that sww information can be converted correctly to asc/prj format readable by e.g. ArcView This test includes the writing of missing values """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile #Setup mesh not coinciding with rectangle. #This will cause missing values to occur in gridded data points = [ [1.0, 1.0], [0.5, 0.5], [1.0, 0.5], [0.0, 0.0], [0.5, 0.0], [1.0, 0.0]] vertices = [ [4,1,3], [5,2,4], [1,4,2], [2,0,1]] #Create shallow water domain domain = Domain(points, vertices) domain.default_order=2 #Set some field values domain.set_quantity('elevation', lambda x,y: -x-y) domain.set_quantity('friction', 0.03) ###################### # Boundary conditions B = Transmissive_boundary(domain) domain.set_boundary( {'exterior': B} ) ###################### #Initial condition - with jumps bed = domain.quantities['elevation'].vertex_values stage = zeros(bed.shape, Float) h = 0.3 for i in range(stage.shape[0]): if i % 2 == 0: stage[i,:] = bed[i,:] + h else: stage[i,:] = bed[i,:] domain.set_quantity('stage', stage) domain.distribute_to_vertices_and_edges() domain.filename = 'datatest' prjfile = domain.filename + '_elevation.prj' ascfile = domain.filename + '_elevation.asc' swwfile = domain.filename + '.sww' domain.set_datadir('.') domain.format = 'sww' domain.smooth = True domain.geo_reference = Geo_reference(56,308500,6189000) sww = get_dataobject(domain) sww.store_connectivity() sww.store_timestep('stage') cellsize = 0.25 #Check contents #Get NetCDF fid = NetCDFFile(swwfile, 'r') # Get the variables x = fid.variables['x'][:] y = fid.variables['y'][:] z = fid.variables['elevation'][:] time = fid.variables['time'][:] try: geo_reference = Geo_reference(NetCDFObject=fid) except AttributeError, e: geo_reference = Geo_reference(DEFAULT_ZONE,0,0) #Export to ascii/prj files sww2asc(domain.filename, quantity = 'elevation', cellsize = cellsize, verbose = False) #Check asc file ascid = open(ascfile) lines = ascid.readlines() ascid.close() L = lines[0].strip().split() assert L[0].strip().lower() == 'ncols' assert L[1].strip().lower() == '5' L = lines[1].strip().split() assert L[0].strip().lower() == 'nrows' assert L[1].strip().lower() == '5' L = lines[2].strip().split() assert L[0].strip().lower() == 'xllcorner' assert allclose(float(L[1].strip().lower()), 308500) L = lines[3].strip().split() assert L[0].strip().lower() == 'yllcorner' assert allclose(float(L[1].strip().lower()), 6189000) L = lines[4].strip().split() assert L[0].strip().lower() == 'cellsize' assert allclose(float(L[1].strip().lower()), cellsize) L = lines[5].strip().split() assert L[0].strip() == 'NODATA_value' assert L[1].strip().lower() == '-9999' #Check grid values for j in range(5): L = lines[6+j].strip().split() y = (4-j) * cellsize for i in range(5): if i+j >= 4: assert allclose(float(L[i]), -i*cellsize - y) else: #Missing values assert allclose(float(L[i]), -9999) fid.close() #Cleanup os.remove(prjfile) os.remove(ascfile) os.remove(swwfile) def test_ferret2sww(self): """Test that georeferencing etc works when converting from ferret format (lat/lon) to sww format (UTM) """ from Scientific.IO.NetCDF import NetCDFFile #The test file has # LON = 150.66667, 150.83334, 151, 151.16667 # LAT = -34.5, -34.33333, -34.16667, -34 ; # TIME = 0, 0.1, 0.6, 1.1, 1.6, 2.1 ; # # First value (index=0) in small_ha.nc is 0.3400644 cm, # Fourth value (index==3) is -6.50198 cm from coordinate_transforms.redfearn import redfearn fid = NetCDFFile('small_ha.nc') first_value = fid.variables['HA'][:][0,0,0] fourth_value = fid.variables['HA'][:][0,0,3] #Call conversion (with zero origin) ferret2sww('small', verbose=False, origin = (56, 0, 0)) #Work out the UTM coordinates for first point zone, e, n = redfearn(-34.5, 150.66667) #print zone, e, n #Read output file 'small.sww' fid = NetCDFFile('small.sww') x = fid.variables['x'][:] y = fid.variables['y'][:] #Check that first coordinate is correctly represented assert allclose(x[0], e) assert allclose(y[0], n) #Check first value stage = fid.variables['stage'][:] xmomentum = fid.variables['xmomentum'][:] ymomentum = fid.variables['ymomentum'][:] #print ymomentum assert allclose(stage[0,0], first_value/100) #Meters #Check fourth value assert allclose(stage[0,3], fourth_value/100) #Meters fid.close() #Cleanup import os os.remove('small.sww') def test_ferret2sww_2(self): """Test that georeferencing etc works when converting from ferret format (lat/lon) to sww format (UTM) """ from Scientific.IO.NetCDF import NetCDFFile #The test file has # LON = 150.66667, 150.83334, 151, 151.16667 # LAT = -34.5, -34.33333, -34.16667, -34 ; # TIME = 0, 0.1, 0.6, 1.1, 1.6, 2.1 ; # # First value (index=0) in small_ha.nc is 0.3400644 cm, # Fourth value (index==3) is -6.50198 cm from coordinate_transforms.redfearn import redfearn fid = NetCDFFile('small_ha.nc') #Pick a coordinate and a value time_index = 1 lat_index = 0 lon_index = 2 test_value = fid.variables['HA'][:][time_index, lat_index, lon_index] test_time = fid.variables['TIME'][:][time_index] test_lat = fid.variables['LAT'][:][lat_index] test_lon = fid.variables['LON'][:][lon_index] linear_point_index = lat_index*4 + lon_index fid.close() #Call conversion (with zero origin) ferret2sww('small', verbose=False, origin = (56, 0, 0)) #Work out the UTM coordinates for test point zone, e, n = redfearn(test_lat, test_lon) #Read output file 'small.sww' fid = NetCDFFile('small.sww') x = fid.variables['x'][:] y = fid.variables['y'][:] #Check that test coordinate is correctly represented assert allclose(x[linear_point_index], e) assert allclose(y[linear_point_index], n) #Check test value stage = fid.variables['stage'][:] assert allclose(stage[time_index, linear_point_index], test_value/100) fid.close() #Cleanup import os os.remove('small.sww') def test_ferret2sww3(self): """ """ from Scientific.IO.NetCDF import NetCDFFile #The test file has # LON = 150.66667, 150.83334, 151, 151.16667 # LAT = -34.5, -34.33333, -34.16667, -34 ; # ELEVATION = [-1 -2 -3 -4 # -5 -6 -7 -8 # ... # ... -16] # where the top left corner is -1m, # and the ll corner is -13.0m # # First value (index=0) in small_ha.nc is 0.3400644 cm, # Fourth value (index==3) is -6.50198 cm from coordinate_transforms.redfearn import redfearn import os fid1 = NetCDFFile('test_ha.nc','w') fid2 = NetCDFFile('test_ua.nc','w') fid3 = NetCDFFile('test_va.nc','w') fid4 = NetCDFFile('test_e.nc','w') h1_list = [150.66667,150.83334,151.] h2_list = [-34.5,-34.33333] long_name = 'LON' lat_name = 'LAT' nx = 3 ny = 2 for fid in [fid1,fid2,fid3]: fid.createDimension(long_name,nx) fid.createVariable(long_name,'d',(long_name,)) fid.variables[long_name].point_spacing='uneven' fid.variables[long_name].units='degrees_east' fid.variables[long_name].assignValue(h1_list) fid.createDimension(lat_name,ny) fid.createVariable(lat_name,'d',(lat_name,)) fid.variables[lat_name].point_spacing='uneven' fid.variables[lat_name].units='degrees_north' fid.variables[lat_name].assignValue(h2_list) fid.createDimension('TIME',2) fid.createVariable('TIME','d',('TIME',)) fid.variables['TIME'].point_spacing='uneven' fid.variables['TIME'].units='seconds' fid.variables['TIME'].assignValue([0.,1.]) if fid == fid3: break for fid in [fid4]: fid.createDimension(long_name,nx) fid.createVariable(long_name,'d',(long_name,)) fid.variables[long_name].point_spacing='uneven' fid.variables[long_name].units='degrees_east' fid.variables[long_name].assignValue(h1_list) fid.createDimension(lat_name,ny) fid.createVariable(lat_name,'d',(lat_name,)) fid.variables[lat_name].point_spacing='uneven' fid.variables[lat_name].units='degrees_north' fid.variables[lat_name].assignValue(h2_list) name = {} name[fid1]='HA' name[fid2]='UA' name[fid3]='VA' name[fid4]='ELEVATION' units = {} units[fid1]='cm' units[fid2]='cm/s' units[fid3]='cm/s' units[fid4]='m' values = {} values[fid1]=[[[5., 10.,15.], [13.,18.,23.]],[[50.,100.,150.],[130.,180.,230.]]] values[fid2]=[[[1., 2.,3.], [4.,5.,6.]],[[7.,8.,9.],[10.,11.,12.]]] values[fid3]=[[[13., 12.,11.], [10.,9.,8.]],[[7.,6.,5.],[4.,3.,2.]]] values[fid4]=[[-3000,-3100,-3200],[-4000,-5000,-6000]] for fid in [fid1,fid2,fid3]: fid.createVariable(name[fid],'d',('TIME',lat_name,long_name)) fid.variables[name[fid]].point_spacing='uneven' fid.variables[name[fid]].units=units[fid] fid.variables[name[fid]].assignValue(values[fid]) fid.variables[name[fid]].missing_value = -99999999. if fid == fid3: break for fid in [fid4]: fid.createVariable(name[fid],'d',(lat_name,long_name)) fid.variables[name[fid]].point_spacing='uneven' fid.variables[name[fid]].units=units[fid] fid.variables[name[fid]].assignValue(values[fid]) fid.variables[name[fid]].missing_value = -99999999. fid1.sync(); fid1.close() fid2.sync(); fid2.close() fid3.sync(); fid3.close() fid4.sync(); fid4.close() fid1 = NetCDFFile('test_ha.nc','r') fid2 = NetCDFFile('test_e.nc','r') fid3 = NetCDFFile('test_va.nc','r') first_amp = fid1.variables['HA'][:][0,0,0] third_amp = fid1.variables['HA'][:][0,0,2] first_elevation = fid2.variables['ELEVATION'][0,0] third_elevation= fid2.variables['ELEVATION'][:][0,2] first_speed = fid3.variables['VA'][0,0,0] third_speed = fid3.variables['VA'][:][0,0,2] fid1.close() fid2.close() fid3.close() #Call conversion (with zero origin) ferret2sww('test', verbose=False, origin = (56, 0, 0)) os.remove('test_va.nc') os.remove('test_ua.nc') os.remove('test_ha.nc') os.remove('test_e.nc') #Read output file 'test.sww' fid = NetCDFFile('test.sww') #Check first value elevation = fid.variables['elevation'][:] stage = fid.variables['stage'][:] xmomentum = fid.variables['xmomentum'][:] ymomentum = fid.variables['ymomentum'][:] #print ymomentum first_height = first_amp/100 - first_elevation third_height = third_amp/100 - third_elevation first_momentum=first_speed*first_height/100 third_momentum=third_speed*third_height/100 assert allclose(ymomentum[0][0],first_momentum) #Meters assert allclose(ymomentum[0][2],third_momentum) #Meters fid.close() #Cleanup os.remove('test.sww') def test_sww_extent(self): """Not a test, rather a look at the sww format """ import time, os from Numeric import array, zeros, allclose, Float, concatenate from Scientific.IO.NetCDF import NetCDFFile self.domain.filename = 'datatest' + str(id(self)) self.domain.format = 'sww' self.domain.smooth = True self.domain.reduction = mean self.domain.set_datadir('.') sww = get_dataobject(self.domain) sww.store_connectivity() sww.store_timestep('stage') self.domain.time = 2. #Modify stage at second timestep stage = self.domain.quantities['stage'].vertex_values self.domain.set_quantity('stage', stage/2) sww.store_timestep('stage') file_and_extension_name = self.domain.filename + ".sww" #print "file_and_extension_name",file_and_extension_name [xmin, xmax, ymin, ymax, stagemin, stagemax] = \ extent_sww(file_and_extension_name ) assert allclose(xmin, 0.0) assert allclose(xmax, 1.0) assert allclose(ymin, 0.0) assert allclose(ymax, 1.0) assert allclose(stagemin, -0.85) assert allclose(stagemax, 0.15) #Cleanup os.remove(sww.filename) def test_ferret2sww_nz_origin(self): from Scientific.IO.NetCDF import NetCDFFile from coordinate_transforms.redfearn import redfearn #Call conversion (with nonzero origin) ferret2sww('small', verbose=False, origin = (56, 100000, 200000)) #Work out the UTM coordinates for first point zone, e, n = redfearn(-34.5, 150.66667) #Read output file 'small.sww' fid = NetCDFFile('small.sww', 'r') x = fid.variables['x'][:] y = fid.variables['y'][:] #Check that first coordinate is correctly represented assert allclose(x[0], e-100000) assert allclose(y[0], n-200000) fid.close() #Cleanup import os os.remove('small.sww') def test_sww2domain(self): ################################################ #Create a test domain, and evolve and save it. ################################################ from mesh_factory import rectangular from shallow_water import Domain, Reflective_boundary, Dirichlet_boundary,\ Constant_height, Time_boundary, Transmissive_boundary from Numeric import array #Create basic mesh yiel=0.01 points, vertices, boundary = rectangular(10,10) #Create shallow water domain domain = Domain(points, vertices, boundary) domain.geo_reference = Geo_reference(56,11,11) domain.smooth = False domain.visualise = False domain.store = True domain.filename = 'bedslope' domain.default_order=2 #Bed-slope and friction domain.set_quantity('elevation', lambda x,y: -x/3) domain.set_quantity('friction', 0.1) # Boundary conditions from math import sin, pi Br = Reflective_boundary(domain) Bt = Transmissive_boundary(domain) Bd = Dirichlet_boundary([0.2,0.,0.]) Bw = Time_boundary(domain=domain, f=lambda t: [(0.1*sin(t*2*pi)), 0.0, 0.0]) #domain.set_boundary({'left': Bd, 'right': Br, 'top': Br, 'bottom': Br}) domain.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd}) domain.quantities_to_be_stored.extend(['xmomentum','ymomentum']) #Initial condition h = 0.05 elevation = domain.quantities['elevation'].vertex_values domain.set_quantity('stage', elevation + h) #elevation = domain.get_quantity('elevation') #domain.set_quantity('stage', elevation + h) domain.check_integrity() #Evolution for t in domain.evolve(yieldstep = yiel, finaltime = 0.05): # domain.write_time() pass ########################################## #Import the example's file as a new domain ########################################## from data_manager import sww2domain from Numeric import allclose import os filename = domain.datadir+os.sep+domain.filename+'.sww' domain2 = sww2domain(filename,None,fail_if_NaN=False,verbose = False) #points, vertices, boundary = rectangular(15,15) #domain2.boundary = boundary ################### ##NOW TEST IT!!! ################### bits = ['vertex_coordinates'] for quantity in ['elevation']+domain.quantities_to_be_stored: bits.append('quantities["%s"].get_integral()'%quantity) bits.append('get_quantity("%s")'%quantity) for bit in bits: #print 'testing that domain.'+bit+' has been restored' #print bit #print 'done' assert allclose(eval('domain.'+bit),eval('domain2.'+bit)) ###################################### #Now evolve them both, just to be sure ######################################x visualise = False #visualise = True domain.visualise = visualise domain.time = 0. from time import sleep final = .1 domain.set_quantity('friction', 0.1) domain.store = False domain.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd}) for t in domain.evolve(yieldstep = yiel, finaltime = final): if visualise: sleep(1.) #domain.write_time() pass final = final - (domain2.starttime-domain.starttime) #BUT since domain1 gets time hacked back to 0: final = final + (domain2.starttime-domain.starttime) domain2.smooth = False domain2.visualise = visualise domain2.store = False domain2.default_order=2 domain2.set_quantity('friction', 0.1) #Bed-slope and friction # Boundary conditions Bd2=Dirichlet_boundary([0.2,0.,0.]) domain2.boundary = domain.boundary #print 'domain2.boundary' #print domain2.boundary domain2.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd}) #domain2.set_boundary({'exterior': Bd}) domain2.check_integrity() for t in domain2.evolve(yieldstep = yiel, finaltime = final): if visualise: sleep(1.) #domain2.write_time() pass ################### ##NOW TEST IT!!! ################## bits = [ 'vertex_coordinates'] for quantity in ['elevation','xmomentum','ymomentum']:#+domain.quantities_to_be_stored: bits.append('quantities["%s"].get_integral()'%quantity) bits.append('get_quantity("%s")'%quantity) for bit in bits: #print bit assert allclose(eval('domain.'+bit),eval('domain2.'+bit)) def test_sww2domain2(self): ################################################################## #Same as previous test, but this checks how NaNs are handled. ################################################################## from mesh_factory import rectangular from shallow_water import Domain, Reflective_boundary, Dirichlet_boundary,\ Constant_height, Time_boundary, Transmissive_boundary from Numeric import array #Create basic mesh points, vertices, boundary = rectangular(2,2) #Create shallow water domain domain = Domain(points, vertices, boundary) domain.smooth = False domain.visualise = False domain.store = True domain.filename = 'bedslope' domain.default_order=2 domain.quantities_to_be_stored=['stage'] domain.set_quantity('elevation', lambda x,y: -x/3) domain.set_quantity('friction', 0.1) from math import sin, pi Br = Reflective_boundary(domain) Bt = Transmissive_boundary(domain) Bd = Dirichlet_boundary([0.2,0.,0.]) Bw = Time_boundary(domain=domain, f=lambda t: [(0.1*sin(t*2*pi)), 0.0, 0.0]) domain.set_boundary({'left': Bd, 'right': Br, 'top': Br, 'bottom': Br}) h = 0.05 elevation = domain.quantities['elevation'].vertex_values domain.set_quantity('stage', elevation + h) domain.check_integrity() for t in domain.evolve(yieldstep = 1, finaltime = 2.0): pass #domain.write_time() ################################## #Import the file as a new domain ################################## from data_manager import sww2domain from Numeric import allclose import os filename = domain.datadir+os.sep+domain.filename+'.sww' #Fail because NaNs are present try: domain2 = sww2domain(filename,boundary,fail_if_NaN=True,verbose=False) assert True == False except: #Now import it, filling NaNs to be 0 filler = 0 domain2 = sww2domain(filename,None,fail_if_NaN=False,NaN_filler = filler,verbose=False) bits = [ 'geo_reference.get_xllcorner()', 'geo_reference.get_yllcorner()', 'vertex_coordinates'] for quantity in ['elevation']+domain.quantities_to_be_stored: bits.append('quantities["%s"].get_integral()'%quantity) bits.append('get_quantity("%s")'%quantity) for bit in bits: # print 'testing that domain.'+bit+' has been restored' assert allclose(eval('domain.'+bit),eval('domain2.'+bit)) assert max(max(domain2.get_quantity('xmomentum')))==filler assert min(min(domain2.get_quantity('xmomentum')))==filler assert max(max(domain2.get_quantity('ymomentum')))==filler assert min(min(domain2.get_quantity('ymomentum')))==filler #print 'passed' #cleanup #import os #os.remove(domain.datadir+'/'+domain.filename+'.sww') #def test_weed(self): from data_manager import weed coordinates1 = [[0.,0.],[1.,0.],[1.,1.],[1.,0.],[2.,0.],[1.,1.]] volumes1 = [[0,1,2],[3,4,5]] boundary1= {(0,1): 'external',(1,2): 'not external',(2,0): 'external',(3,4): 'external',(4,5): 'external',(5,3): 'not external'} coordinates2,volumes2,boundary2=weed(coordinates1,volumes1,boundary1) points2 = {(0.,0.):None,(1.,0.):None,(1.,1.):None,(2.,0.):None} assert len(points2)==len(coordinates2) for i in range(len(coordinates2)): coordinate = tuple(coordinates2[i]) assert points2.has_key(coordinate) points2[coordinate]=i for triangle in volumes1: for coordinate in triangle: assert coordinates2[points2[tuple(coordinates1[coordinate])]][0]==coordinates1[coordinate][0] assert coordinates2[points2[tuple(coordinates1[coordinate])]][1]==coordinates1[coordinate][1] #FIXME This fails - smooth makes the comparism too hard for allclose def ztest_sww2domain3(self): ################################################ #DOMAIN.SMOOTH = TRUE !!!!!!!!!!!!!!!!!!!!!!!!!! ################################################ from mesh_factory import rectangular from shallow_water import Domain, Reflective_boundary, Dirichlet_boundary,\ Constant_height, Time_boundary, Transmissive_boundary from Numeric import array #Create basic mesh yiel=0.01 points, vertices, boundary = rectangular(10,10) #Create shallow water domain domain = Domain(points, vertices, boundary) domain.geo_reference = Geo_reference(56,11,11) domain.smooth = True domain.visualise = False domain.store = True domain.filename = 'bedslope' domain.default_order=2 #Bed-slope and friction domain.set_quantity('elevation', lambda x,y: -x/3) domain.set_quantity('friction', 0.1) # Boundary conditions from math import sin, pi Br = Reflective_boundary(domain) Bt = Transmissive_boundary(domain) Bd = Dirichlet_boundary([0.2,0.,0.]) Bw = Time_boundary(domain=domain, f=lambda t: [(0.1*sin(t*2*pi)), 0.0, 0.0]) domain.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Bd}) domain.quantities_to_be_stored.extend(['xmomentum','ymomentum']) #Initial condition h = 0.05 elevation = domain.quantities['elevation'].vertex_values domain.set_quantity('stage', elevation + h) #elevation = domain.get_quantity('elevation') #domain.set_quantity('stage', elevation + h) domain.check_integrity() #Evolution for t in domain.evolve(yieldstep = yiel, finaltime = 0.05): # domain.write_time() pass ########################################## #Import the example's file as a new domain ########################################## from data_manager import sww2domain from Numeric import allclose import os filename = domain.datadir+os.sep+domain.filename+'.sww' domain2 = sww2domain(filename,None,fail_if_NaN=False,verbose = False) #points, vertices, boundary = rectangular(15,15) #domain2.boundary = boundary ################### ##NOW TEST IT!!! ################### #FIXME smooth domain so that they can be compared bits = []#'vertex_coordinates'] for quantity in ['elevation']+domain.quantities_to_be_stored: bits.append('quantities["%s"].get_integral()'%quantity) #bits.append('get_quantity("%s")'%quantity) for bit in bits: #print 'testing that domain.'+bit+' has been restored' #print bit #print 'done' #print ('domain.'+bit), eval('domain.'+bit) #print ('domain2.'+bit), eval('domain2.'+bit) assert allclose(eval('domain.'+bit),eval('domain2.'+bit),rtol=1.0e-1,atol=1.e-3) pass ###################################### #Now evolve them both, just to be sure ######################################x visualise = False visualise = True domain.visualise = visualise domain.time = 0. from time import sleep final = .5 domain.set_quantity('friction', 0.1) domain.store = False domain.set_boundary({'left': Bd, 'right': Bd, 'top': Bd, 'bottom': Br}) for t in domain.evolve(yieldstep = yiel, finaltime = final): if visualise: sleep(.03) #domain.write_time() pass domain2.smooth = True domain2.visualise = visualise domain2.store = False domain2.default_order=2 domain2.set_quantity('friction', 0.1) #Bed-slope and friction # Boundary conditions Bd2=Dirichlet_boundary([0.2,0.,0.]) Br2 = Reflective_boundary(domain2) domain2.boundary = domain.boundary #print 'domain2.boundary' #print domain2.boundary domain2.set_boundary({'left': Bd2, 'right': Bd2, 'top': Bd2, 'bottom': Br2}) #domain2.boundary = domain.boundary #domain2.set_boundary({'exterior': Bd}) domain2.check_integrity() for t in domain2.evolve(yieldstep = yiel, finaltime = final): if visualise: sleep(.03) #domain2.write_time() pass ################### ##NOW TEST IT!!! ################## bits = [ 'vertex_coordinates'] for quantity in ['elevation','xmomentum','ymomentum']:#+domain.quantities_to_be_stored: #bits.append('quantities["%s"].get_integral()'%quantity) bits.append('get_quantity("%s")'%quantity) for bit in bits: print bit assert allclose(eval('domain.'+bit),eval('domain2.'+bit)) #------------------------------------------------------------- if __name__ == "__main__": suite = unittest.makeSuite(Test_Data_Manager,'test') #suite = unittest.makeSuite(Test_Data_Manager,'test_sww2domain') runner = unittest.TextTestRunner() runner.run(suite)