- Timestamp:
- May 16, 2007, 2:58:52 PM (17 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
anuga_core/source/anuga/shallow_water/test_data_manager.py
r4437 r4453 1 1 #!/usr/bin/env python 2 2 # 3 3 4 4 5 import unittest … … 22 23 import data_manager 23 24 from anuga.coordinate_transforms.redfearn import redfearn 24 from anuga.coordinate_transforms.geo_reference import Geo_reference 25 from anuga.coordinate_transforms.geo_reference import Geo_reference, \ 26 DEFAULT_ZONE 25 27 26 28 class Test_Data_Manager(unittest.TestCase): … … 168 170 (non smooth) 169 171 """ 170 171 import time, os172 from Numeric import array, zeros, allclose, Float, concatenate173 from Scientific.IO.NetCDF import NetCDFFile174 175 172 self.domain.set_name('datatest' + str(id(self))) 176 173 self.domain.format = 'sww' #Remove?? 177 174 self.domain.smooth = False 178 175 179 176 sww = get_dataobject(self.domain) 180 177 sww.store_connectivity() 181 178 182 #Check contents183 #Get NetCDF184 179 fid = NetCDFFile(sww.filename, 'r') #Open existing file for append 185 180 … … 188 183 y = fid.variables['y'] 189 184 z = fid.variables['elevation'] 190 191 volumes = fid.variables['volumes'] 192 185 V = fid.variables['volumes'] 193 186 194 187 assert allclose (x[:], self.X.flat) 195 188 assert allclose (y[:], self.Y.flat) 196 189 assert allclose (z[:], self.F.flat) 197 198 V = volumes199 190 200 191 P = len(self.domain) … … 204 195 assert V[k, 2] == 3*k+2 205 196 206 207 fid.close() 208 209 #Cleanup 197 fid.close() 210 198 os.remove(sww.filename) 211 199 212 200 def test_sww_header(self): 201 """Test that constant sww information can be written correctly 202 (non smooth) 203 """ 204 self.domain.set_name('datatest' + str(id(self))) 205 self.domain.format = 'sww' #Remove?? 206 self.domain.smooth = False 207 208 sww = get_dataobject(self.domain) 209 sww.store_connectivity() 210 211 #Check contents 212 #Get NetCDF 213 fid = NetCDFFile(sww.filename, 'r') #Open existing file for append 214 215 # Get the variables 216 sww_revision = fid.revision_number 217 try: 218 revision_number = get_revision_number() 219 except: 220 revision_number = None 221 222 assert str(revision_number) == sww_revision 223 fid.close() 224 225 #print "sww.filename", sww.filename 226 os.remove(sww.filename) 227 228 def test_sww_range(self): 229 """Test that constant sww information can be written correctly 230 (non smooth) 231 """ 232 self.domain.set_name('datatest' + str(id(self))) 233 self.domain.format = 'sww' 234 self.domain.smooth = True 235 236 sww = get_dataobject(self.domain) 237 sww.store_connectivity() 238 for t in self.domain.evolve(yieldstep = 1, finaltime = 1): 239 pass 240 241 #Get NetCDF 242 fid = NetCDFFile(sww.filename, 'r') #Open existing file for append 243 244 from anuga.shallow_water.shallow_water_domain import Domain 245 sww_quantities = Domain.conserved_quantities 246 # Get the variables 247 range = fid.variables['stage_range'][:] 248 assert allclose(range,[-0.93519, 0.15]) 249 range = fid.variables['xmomentum_range'][:] 250 assert allclose(range,[0,0.46950444]) 251 range = fid.variables['ymomentum_range'][:] 252 assert allclose(range,[0,0.02174380]) 253 254 fid.close() 255 #print "sww.filename", sww.filename 256 os.remove(sww.filename) 257 213 258 def test_sww_constant_smooth(self): 214 259 """Test that constant sww information can be written correctly 215 260 (non smooth) 216 261 """ 217 218 import time, os219 from Numeric import array, zeros, allclose, Float, concatenate220 from Scientific.IO.NetCDF import NetCDFFile221 222 262 self.domain.set_name('datatest' + str(id(self))) 223 263 self.domain.format = 'sww' … … 232 272 233 273 # Get the variables 234 x = fid.variables['x'] 235 y = fid.variables['y'] 236 z = fid.variables['elevation'] 237 238 volumes = fid.variables['volumes'] 239 240 X = x[:] 241 Y = y[:] 274 X = fid.variables['x'][:] 275 Y = fid.variables['y'][:] 276 Z = fid.variables['elevation'][:] 277 V = fid.variables['volumes'] 242 278 243 279 assert allclose([X[0], Y[0]], array([0.0, 0.0])) 244 280 assert allclose([X[1], Y[1]], array([0.0, 0.5])) 245 281 assert allclose([X[2], Y[2]], array([0.0, 1.0])) 246 247 282 assert allclose([X[4], Y[4]], array([0.5, 0.5])) 248 249 283 assert allclose([X[7], Y[7]], array([1.0, 0.5])) 250 284 251 Z = z[:]252 285 assert Z[4] == -0.5 253 286 254 V = volumes255 287 assert V[2,0] == 4 256 288 assert V[2,1] == 5 257 289 assert V[2,2] == 1 258 259 290 assert V[4,0] == 6 260 291 assert V[4,1] == 7 261 292 assert V[4,2] == 3 262 293 263 264 fid.close() 265 266 #Cleanup 294 fid.close() 267 295 os.remove(sww.filename) 268 269 296 270 297 271 298 def test_sww_variable(self): 272 299 """Test that sww information can be written correctly 273 300 """ 274 275 import time, os276 from Numeric import array, zeros, allclose, Float, concatenate277 from Scientific.IO.NetCDF import NetCDFFile278 279 301 self.domain.set_name('datatest' + str(id(self))) 280 302 self.domain.format = 'sww' … … 290 312 fid = NetCDFFile(sww.filename, 'r') #Open existing file for append 291 313 314 315 # Get the variables 316 time = fid.variables['time'] 317 stage = fid.variables['stage'] 318 319 Q = self.domain.quantities['stage'] 320 Q0 = Q.vertex_values[:,0] 321 Q1 = Q.vertex_values[:,1] 322 Q2 = Q.vertex_values[:,2] 323 324 A = stage[0,:] 325 #print A[0], (Q2[0,0] + Q1[1,0])/2 326 assert allclose(A[0], (Q2[0] + Q1[1])/2) 327 assert allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3) 328 assert allclose(A[2], Q0[3]) 329 assert allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3) 330 331 #Center point 332 assert allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\ 333 Q0[5] + Q2[6] + Q1[7])/6) 334 335 fid.close() 336 os.remove(sww.filename) 337 338 339 def test_sww_variable2(self): 340 """Test that sww information can be written correctly 341 multiple timesteps. Use average as reduction operator 342 """ 343 344 import time, os 345 from Numeric import array, zeros, allclose, Float, concatenate 346 from Scientific.IO.NetCDF import NetCDFFile 347 348 self.domain.set_name('datatest' + str(id(self))) 349 self.domain.format = 'sww' 350 self.domain.smooth = True 351 352 self.domain.reduction = mean 353 354 sww = get_dataobject(self.domain) 355 sww.store_connectivity() 356 sww.store_timestep('stage') 357 #self.domain.limit2007 = 1 358 self.domain.evolve_to_end(finaltime = 0.01) 359 sww.store_timestep('stage') 360 361 362 #Check contents 363 #Get NetCDF 364 fid = NetCDFFile(sww.filename, 'r') #Open existing file for append 292 365 293 366 # Get the variables … … 298 371 stage = fid.variables['stage'] 299 372 300 373 #Check values 301 374 Q = self.domain.quantities['stage'] 302 375 Q0 = Q.vertex_values[:,0] … … 304 377 Q2 = Q.vertex_values[:,2] 305 378 306 A = stage[0,:] 307 #print A[0], (Q2[0,0] + Q1[1,0])/2 379 A = stage[1,:] 308 380 assert allclose(A[0], (Q2[0] + Q1[1])/2) 309 381 assert allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3) … … 316 388 317 389 318 319 390 fid.close() 320 391 … … 322 393 os.remove(sww.filename) 323 394 324 325 def test_sww_variable2(self): 395 def test_sww_variable3(self): 326 396 """Test that sww information can be written correctly 327 multiple timesteps . Use average as reduction operator397 multiple timesteps using a different reduction operator (min) 328 398 """ 329 399 … … 335 405 self.domain.format = 'sww' 336 406 self.domain.smooth = True 337 338 self.domain.reduction = mean 407 self.domain.reduction = min 339 408 340 409 sww = get_dataobject(self.domain) … … 348 417 #Check contents 349 418 #Get NetCDF 350 fid = NetCDFFile(sww.filename, 'r') #Open existing file for append351 352 # Get the variables353 x = fid.variables['x']354 y = fid.variables['y']355 z = fid.variables['elevation']356 time = fid.variables['time']357 stage = fid.variables['stage']358 359 #Check values360 Q = self.domain.quantities['stage']361 Q0 = Q.vertex_values[:,0]362 Q1 = Q.vertex_values[:,1]363 Q2 = Q.vertex_values[:,2]364 365 A = stage[1,:]366 assert allclose(A[0], (Q2[0] + Q1[1])/2)367 assert allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3)368 assert allclose(A[2], Q0[3])369 assert allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3)370 371 #Center point372 assert allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\373 Q0[5] + Q2[6] + Q1[7])/6)374 375 376 fid.close()377 378 #Cleanup379 os.remove(sww.filename)380 381 def test_sww_variable3(self):382 """Test that sww information can be written correctly383 multiple timesteps using a different reduction operator (min)384 """385 386 import time, os387 from Numeric import array, zeros, allclose, Float, concatenate388 from Scientific.IO.NetCDF import NetCDFFile389 390 self.domain.set_name('datatest' + str(id(self)))391 self.domain.format = 'sww'392 self.domain.smooth = True393 self.domain.reduction = min394 395 sww = get_dataobject(self.domain)396 sww.store_connectivity()397 sww.store_timestep('stage')398 #self.domain.limit2007 = 1399 self.domain.evolve_to_end(finaltime = 0.01)400 sww.store_timestep('stage')401 402 403 #Check contents404 #Get NetCDF405 419 fid = NetCDFFile(sww.filename, 'r') 406 407 420 408 421 # Get the variables … … 1707 1720 os.remove(prjfile) 1708 1721 os.remove(ascfile) 1709 #os.remove(swwfile)1722 os.remove(swwfile) 1710 1723 1711 1724 … … 1820 1833 os.remove(prjfile) 1821 1834 os.remove(ascfile) 1822 #os.remove(swwfile)1835 os.remove(swwfile) 1823 1836 1824 1837 … … 2856 2869 ################### 2857 2870 2858 #os.remove(domain.get_name() + '.sww')2859 2871 os.remove(filename) 2860 2872 … … 4065 4077 # is the sww file readable? 4066 4078 #Lets see if we can convert it to a dem! 4079 # if you uncomment, remember to delete the file 4067 4080 #print "sww_file",sww_file 4068 4081 #dem_file = tempfile.mktemp(".dem") … … 4092 4105 os.remove(sww_file) 4093 4106 4094 # remove dem file4095 #os.remove(dem_file)4096 4107 4097 4108 def test_get_min_max_indexes(self): … … 5063 5074 5064 5075 5076 time = fid.variables['time'][:] 5077 #print "time", time 5078 assert allclose([0.,0.5,1.], time) 5079 assert fid.starttime == 0.0 5065 5080 #Check that first coordinate is correctly represented 5066 5081 #Work out the UTM coordinates for first point … … 5168 5183 fid.close() 5169 5184 self.delete_mux(files) 5185 os.remove(sww_file) 5186 5187 def test_urs2sww_minmaxmintmaxt(self): 5188 5189 #longitudes = [150.66667, 150.83334, 151., 151.16667] 5190 #latitudes = [-34.5, -34.33333, -34.16667, -34] 5191 5192 tide = 1 5193 base_name, files = self.create_mux() 5194 urs2sww(base_name, 5195 mint=0.25, 5196 maxt=0.75, 5197 mean_stage=tide, 5198 remove_nc_files=True, 5199 verbose=self.verbose 5200 ) 5201 sww_file = base_name + '.sww' 5202 5203 #Let's interigate the sww file 5204 # Note, the sww info is not gridded. It is point data. 5205 fid = NetCDFFile(sww_file) 5206 5207 5208 time = fid.variables['time'][:] 5209 assert allclose(time, [0.0]) # the time is relative 5210 assert fid.starttime == 0.5 5211 5212 fid.close() 5213 self.delete_mux(files) 5214 #print "sww_file", sww_file 5170 5215 os.remove(sww_file) 5171 5216 … … 5670 5715 number_of_volumes = fid.variables['volumes'] 5671 5716 #print "number_of_volumes",len(number_of_volumes) 5672 assert allclose(1 2, len(number_of_volumes))5717 assert allclose(16, len(number_of_volumes)) 5673 5718 5674 5719 fid.close() … … 5956 6001 # extend this so it interpolates onto the boundary. 5957 6002 # have it fail if there is NaN 5958 5959 def davids_test_points_urs_ungridded2sww(self): 5960 tide = 5.0 5961 base_name = 'o' 5962 urs_ungridded2sww(base_name, mean_stage=tide) 5963 os.remove( base_name + '.sww') 5964 # extend this so it interpolates onto the boundary. 5965 # have it fail if there is NaN 5966 5967 def not_really_test_urs2txt(self): 5968 # not really a test, since it doesn't check the output data 5969 5970 #This will write 3 non-gridded mux files, for testing. 5971 #If no quantities are passed in, 5972 #na and va quantities will be the Easting values. 5973 #Depth and ua will be the Northing value. 5974 # this was manually checked to be correct 5975 5976 tide = 1 5977 time_step_count = 3 5978 time_step = 2 5979 5980 #Zone: 50 5981 #Easting: 240992.578 Northing: 7620442.472 5982 #Latitude: -21 30 ' 0.00000 '' Longitude: 114 30 ' 0.00000 '' 5983 5984 # This is gridded 5985 lat_long_points =[(-21.5,114.5),(-21,114.5),(-21.5,115), (-21.,115.)] 5986 base_name, files = self.write_mux(lat_long_points, 5987 time_step_count, time_step) 5988 urs2txt(base_name, 0) 5989 print "base_name", base_name 5990 5991 self.delete_mux(files) 5992 #os.remove(sww_file) 5993 # delete the txt file if this becomes automatic 5994 5995 def daves_urs2txt(self): 5996 # not really a test, since it doesn't check the output data 5997 5998 #This will write 3 non-gridded mux files, for testing. 5999 #If no quantities are passed in, 6000 #na and va quantities will be the Easting values. 6001 #Depth and ua will be the Northing value. 6002 # this was manually checked to be correct 6003 6004 tide = 1 6005 time_step_count = 3 6006 time_step = 2 6007 6008 #Zone: 50 6009 #Easting: 240992.578 Northing: 7620442.472 6010 #Latitude: -21 30 ' 0.00000 '' Longitude: 114 30 ' 0.00000 '' 6011 6012 # This is gridded 6013 lat_long_points =[(-21.5,114.5),(-21,114.5),(-21.5,115), (-21.,115.)] 6014 base_name, files = self.write_mux(lat_long_points, 6015 time_step_count, time_step) 6016 urs2txt(base_name, 0) 6017 print "base_name", base_name 6018 6019 self.delete_mux(files) 6020 #os.remove(sww_file) 6021 # delete the txt file if this becomes automatic 6003 6004 def test_triangulation(self): 6005 # 6006 # 6007 6008 filename = tempfile.mktemp("_data_manager.sww") 6009 outfile = NetCDFFile(filename, "w") 6010 points_utm = array([[0.,0.],[1.,1.], [0.,1.]]) 6011 volumes = (0,1,2) 6012 elevation = [0,1,2] 6013 new_origin = None 6014 new_origin = Geo_reference(56, 0, 0) 6015 times = [0, 10] 6016 number_of_volumes = len(volumes) 6017 number_of_points = len(points_utm) 6018 sww = Write_sww() 6019 sww.header(outfile, times, number_of_volumes, 6020 number_of_points, description='fully sick testing', 6021 verbose=self.verbose) 6022 sww.triangulation(outfile, points_utm, volumes, 6023 elevation, new_origin=new_origin, 6024 verbose=self.verbose) 6025 outfile.close() 6026 fid = NetCDFFile(filename) 6027 6028 x = fid.variables['x'][:] 6029 y = fid.variables['y'][:] 6030 fid.close() 6031 6032 assert allclose(array(map(None, x,y)), points_utm) 6033 os.remove(filename) 6034 6035 6036 def test_triangulationII(self): 6037 # 6038 # 6039 6040 filename = tempfile.mktemp("_data_manager.sww") 6041 outfile = NetCDFFile(filename, "w") 6042 points_utm = array([[0.,0.],[1.,1.], [0.,1.]]) 6043 volumes = (0,1,2) 6044 elevation = [0,1,2] 6045 new_origin = None 6046 #new_origin = Geo_reference(56, 0, 0) 6047 times = [0, 10] 6048 number_of_volumes = len(volumes) 6049 number_of_points = len(points_utm) 6050 sww = Write_sww() 6051 sww.header(outfile, times, number_of_volumes, 6052 number_of_points, description='fully sick testing', 6053 verbose=self.verbose) 6054 sww.triangulation(outfile, points_utm, volumes, 6055 elevation, new_origin=new_origin, 6056 verbose=self.verbose) 6057 outfile.close() 6058 fid = NetCDFFile(filename) 6059 6060 x = fid.variables['x'][:] 6061 y = fid.variables['y'][:] 6062 results_georef = Geo_reference() 6063 results_georef.read_NetCDF(fid) 6064 assert results_georef == Geo_reference(DEFAULT_ZONE, 0, 0) 6065 fid.close() 6066 6067 assert allclose(array(map(None, x,y)), points_utm) 6068 os.remove(filename) 6069 6070 6071 def test_triangulation_new_origin(self): 6072 # 6073 # 6074 6075 filename = tempfile.mktemp("_data_manager.sww") 6076 outfile = NetCDFFile(filename, "w") 6077 points_utm = array([[0.,0.],[1.,1.], [0.,1.]]) 6078 volumes = (0,1,2) 6079 elevation = [0,1,2] 6080 new_origin = None 6081 new_origin = Geo_reference(56, 1, 554354) 6082 points_utm = new_origin.change_points_geo_ref(points_utm) 6083 times = [0, 10] 6084 number_of_volumes = len(volumes) 6085 number_of_points = len(points_utm) 6086 sww = Write_sww() 6087 sww.header(outfile, times, number_of_volumes, 6088 number_of_points, description='fully sick testing', 6089 verbose=self.verbose) 6090 sww.triangulation(outfile, points_utm, volumes, 6091 elevation, new_origin=new_origin, 6092 verbose=self.verbose) 6093 outfile.close() 6094 fid = NetCDFFile(filename) 6095 6096 x = fid.variables['x'][:] 6097 y = fid.variables['y'][:] 6098 results_georef = Geo_reference() 6099 results_georef.read_NetCDF(fid) 6100 assert results_georef == new_origin 6101 fid.close() 6102 6103 absolute = Geo_reference(56, 0,0) 6104 assert allclose(array( \ 6105 absolute.change_points_geo_ref(map(None, x,y), 6106 new_origin)),points_utm) 6107 6108 os.remove(filename) 6109 6110 def test_triangulation_points_georeference(self): 6111 # 6112 # 6113 6114 filename = tempfile.mktemp("_data_manager.sww") 6115 outfile = NetCDFFile(filename, "w") 6116 points_utm = array([[0.,0.],[1.,1.], [0.,1.]]) 6117 volumes = (0,1,2) 6118 elevation = [0,1,2] 6119 new_origin = None 6120 points_georeference = Geo_reference(56, 1, 554354) 6121 points_utm = points_georeference.change_points_geo_ref(points_utm) 6122 times = [0, 10] 6123 number_of_volumes = len(volumes) 6124 number_of_points = len(points_utm) 6125 sww = Write_sww() 6126 sww.header(outfile, times, number_of_volumes, 6127 number_of_points, description='fully sick testing', 6128 verbose=self.verbose) 6129 sww.triangulation(outfile, points_utm, volumes, 6130 elevation, new_origin=new_origin, 6131 points_georeference=points_georeference, 6132 verbose=self.verbose) 6133 outfile.close() 6134 fid = NetCDFFile(filename) 6135 6136 x = fid.variables['x'][:] 6137 y = fid.variables['y'][:] 6138 results_georef = Geo_reference() 6139 results_georef.read_NetCDF(fid) 6140 assert results_georef == points_georeference 6141 fid.close() 6142 6143 assert allclose(array(map(None, x,y)), points_utm) 6144 os.remove(filename) 6145 6146 def test_triangulation_2_geo_refs(self): 6147 # 6148 # 6149 6150 filename = tempfile.mktemp("_data_manager.sww") 6151 outfile = NetCDFFile(filename, "w") 6152 points_utm = array([[0.,0.],[1.,1.], [0.,1.]]) 6153 volumes = (0,1,2) 6154 elevation = [0,1,2] 6155 new_origin = Geo_reference(56, 1, 1) 6156 points_georeference = Geo_reference(56, 0, 0) 6157 points_utm = points_georeference.change_points_geo_ref(points_utm) 6158 times = [0, 10] 6159 number_of_volumes = len(volumes) 6160 number_of_points = len(points_utm) 6161 sww = Write_sww() 6162 sww.header(outfile, times, number_of_volumes, 6163 number_of_points, description='fully sick testing', 6164 verbose=self.verbose) 6165 sww.triangulation(outfile, points_utm, volumes, 6166 elevation, new_origin=new_origin, 6167 points_georeference=points_georeference, 6168 verbose=self.verbose) 6169 outfile.close() 6170 fid = NetCDFFile(filename) 6171 6172 x = fid.variables['x'][:] 6173 y = fid.variables['y'][:] 6174 results_georef = Geo_reference() 6175 results_georef.read_NetCDF(fid) 6176 assert results_georef == new_origin 6177 fid.close() 6178 6179 6180 absolute = Geo_reference(56, 0,0) 6181 assert allclose(array( \ 6182 absolute.change_points_geo_ref(map(None, x,y), 6183 new_origin)),points_utm) 6184 os.remove(filename) 6022 6185 #------------------------------------------------------------- 6023 6186 if __name__ == "__main__": 6024 6187 #suite = unittest.makeSuite(Test_Data_Manager,'test_urs2sww_origin') 6025 #suite = unittest.makeSuite(Test_Data_Manager,' cache_test_URS_points_needed_and_urs_ungridded2sww')6026 #suite = unittest.makeSuite(Test_Data_Manager,'test_ urs_ungridded_hole')6188 #suite = unittest.makeSuite(Test_Data_Manager,'test_sww_header') 6189 #suite = unittest.makeSuite(Test_Data_Manager,'test_sww_range') 6027 6190 if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V': 6028 6191 Test_Data_Manager.verbose=True
Note: See TracChangeset
for help on using the changeset viewer.