Changeset 7762
- Timestamp:
- Jun 1, 2010, 1:08:26 PM (14 years ago)
- Location:
- trunk/anuga_core/source/anuga/file
- Files:
-
- 2 added
- 2 edited
- 4 copied
Legend:
- Unmodified
- Added
- Removed
-
trunk/anuga_core/source/anuga/file/sts.py
r7760 r7762 1 2 3 ## 4 # @brief A class to write STS files. 1 import numpy as num 2 from anuga.config import max_float 3 from anuga.config import netcdf_float, netcdf_float32, netcdf_int 4 from anuga.utilities.numerical_tools import ensure_numeric 5 from anuga.coordinate_transforms.geo_reference import Geo_reference, \ 6 ensure_geo_reference 7 8 5 9 class Write_sts: 6 sts_quantities = ['stage','xmomentum','ymomentum'] 10 """ A class to write STS files. 11 """ 12 sts_quantities = ['stage', 'xmomentum', 'ymomentum'] 7 13 RANGE = '_range' 8 14 EXTREMA = ':extrema' … … 73 79 outfile.createVariable('x', sts_precision, ('number_of_points',)) 74 80 outfile.createVariable('y', sts_precision, ('number_of_points',)) 75 outfile.createVariable('elevation',sts_precision, ('number_of_points',)) 81 outfile.createVariable('elevation', sts_precision, \ 82 ('number_of_points',)) 76 83 77 84 q = 'elevation' … … 288 295 y = fid.variables['y'][:] + yllcorner 289 296 290 x = num.reshape(x, (len(x), 1))291 y = num.reshape(y, (len(y), 1))297 x = num.reshape(x, (len(x), 1)) 298 y = num.reshape(y, (len(y), 1)) 292 299 sts_points = num.concatenate((x,y), axis=1) 293 300 -
trunk/anuga_core/source/anuga/file/test_mux.py
r7760 r7762 4 4 import os 5 5 from struct import pack, unpack 6 from Scientific.IO.NetCDF import NetCDFFile 6 7 7 8 from anuga.utilities.numerical_tools import ensure_numeric 8 9 from anuga.coordinate_transforms.redfearn import redfearn 10 from anuga.coordinate_transforms.geo_reference import Geo_reference 9 11 10 12 from mux import WAVEHEIGHT_MUX2_LABEL, EAST_VELOCITY_MUX2_LABEL, \ … … 12 14 13 15 from mux import read_mux2_py 16 from anuga.file_conversion.urs2sts import urs2sts 14 17 15 18 class TestCase(unittest.TestCase): … … 246 249 247 250 return base_name, files 251 248 252 249 253 def test_urs2sts_read_mux2_pyI(self): … … 1094 1098 1095 1099 self.delete_mux(filesII) 1096 1097 1100 1101 1102 1103 def test_urs2sts_nonstandard_projection_reverse(self): 1104 """ 1105 Test that a point not in the specified zone can occur first 1106 """ 1107 tide=0 1108 time_step_count = 3 1109 time_step = 2 1110 lat_long_points =[(-21.,113.5),(-21.,114.5),(-21.,114.), (-21.,115.)] 1111 n=len(lat_long_points) 1112 first_tstep=num.ones(n,num.int) 1113 first_tstep[0]+=1 1114 first_tstep[2]+=1 1115 last_tstep=(time_step_count)*num.ones(n,num.int) 1116 last_tstep[0]-=1 1117 1118 gauge_depth=20*num.ones(n,num.float) 1119 ha=2*num.ones((n,time_step_count),num.float) 1120 ha[0]=num.arange(0,time_step_count) 1121 ha[1]=num.arange(time_step_count,2*time_step_count) 1122 ha[2]=num.arange(2*time_step_count,3*time_step_count) 1123 ha[3]=num.arange(3*time_step_count,4*time_step_count) 1124 ua=5*num.ones((n,time_step_count),num.float) 1125 va=-10*num.ones((n,time_step_count),num.float) 1126 1127 base_name, files = self.write_mux2(lat_long_points, 1128 time_step_count, time_step, 1129 first_tstep, last_tstep, 1130 depth=gauge_depth, 1131 ha=ha, 1132 ua=ua, 1133 va=va) 1134 1135 urs2sts(base_name, 1136 basename_out=base_name, 1137 zone=50, 1138 mean_stage=tide,verbose=False) 1139 1140 # now I want to check the sts file ... 1141 sts_file = base_name + '.sts' 1142 1143 #Let's interigate the sww file 1144 # Note, the sww info is not gridded. It is point data. 1145 fid = NetCDFFile(sts_file) 1146 1147 # Make x and y absolute 1148 x = fid.variables['x'][:] 1149 y = fid.variables['y'][:] 1150 1151 geo_reference = Geo_reference(NetCDFObject=fid) 1152 points = geo_reference.get_absolute(map(None, x, y)) 1153 points = ensure_numeric(points) 1154 1155 x = points[:,0] 1156 y = points[:,1] 1157 1158 # Check that all coordinate are correctly represented 1159 # Using the non standard projection (50) 1160 for i in range(4): 1161 zone, e, n = redfearn(lat_long_points[i][0], lat_long_points[i][1], 1162 zone=50) 1163 assert num.allclose([x[i],y[i]], [e,n]) 1164 assert zone==geo_reference.zone 1165 1166 self.delete_mux(files) 1167 1168 1169 def test_urs2stsII(self): 1170 """ 1171 Test multiple sources 1172 """ 1173 tide=0 1174 time_step_count = 3 1175 time_step = 2 1176 lat_long_points =[(-21.5,114.5),(-21,114.5),(-21.5,115), (-21.,115.)] 1177 n=len(lat_long_points) 1178 first_tstep=num.ones(n,num.int) 1179 first_tstep[0]+=1 1180 first_tstep[2]+=1 1181 last_tstep=(time_step_count)*num.ones(n,num.int) 1182 last_tstep[0]-=1 1183 1184 gauge_depth=20*num.ones(n,num.float) 1185 ha=2*num.ones((n,time_step_count),num.float) 1186 ha[0]=num.arange(0,time_step_count) 1187 ha[1]=num.arange(time_step_count,2*time_step_count) 1188 ha[2]=num.arange(2*time_step_count,3*time_step_count) 1189 ha[3]=num.arange(3*time_step_count,4*time_step_count) 1190 ua=5*num.ones((n,time_step_count),num.float) 1191 va=-10*num.ones((n,time_step_count),num.float) 1192 1193 # Create two identical mux files to be combined by urs2sts 1194 base_nameI, filesI = self.write_mux2(lat_long_points, 1195 time_step_count, time_step, 1196 first_tstep, last_tstep, 1197 depth=gauge_depth, 1198 ha=ha, 1199 ua=ua, 1200 va=va) 1201 1202 base_nameII, filesII = self.write_mux2(lat_long_points, 1203 time_step_count, time_step, 1204 first_tstep, last_tstep, 1205 depth=gauge_depth, 1206 ha=ha, 1207 ua=ua, 1208 va=va) 1209 1210 # Call urs2sts with multiple mux files 1211 urs2sts([base_nameI, base_nameII], 1212 basename_out=base_nameI, 1213 weights=[1.0, 1.0], 1214 mean_stage=tide, 1215 verbose=False) 1216 1217 # now I want to check the sts file ... 1218 sts_file = base_nameI + '.sts' 1219 1220 #Let's interrogate the sts file 1221 # Note, the sts info is not gridded. It is point data. 1222 fid = NetCDFFile(sts_file) 1223 1224 # Make x and y absolute 1225 x = fid.variables['x'][:] 1226 y = fid.variables['y'][:] 1227 1228 geo_reference = Geo_reference(NetCDFObject=fid) 1229 points = geo_reference.get_absolute(map(None, x, y)) 1230 points = ensure_numeric(points) 1231 1232 x = points[:,0] 1233 y = points[:,1] 1234 1235 #Check that first coordinate is correctly represented 1236 #Work out the UTM coordinates for first point 1237 zone, e, n = redfearn(lat_long_points[0][0], lat_long_points[0][1]) 1238 assert num.allclose([x[0],y[0]], [e,n]) 1239 1240 #Check the time vector 1241 times = fid.variables['time'][:] 1242 1243 times_actual = [] 1244 for i in range(time_step_count): 1245 times_actual.append(time_step * i) 1246 1247 assert num.allclose(ensure_numeric(times), 1248 ensure_numeric(times_actual)) 1249 1250 #Check first value 1251 stage = fid.variables['stage'][:] 1252 xmomentum = fid.variables['xmomentum'][:] 1253 ymomentum = fid.variables['ymomentum'][:] 1254 elevation = fid.variables['elevation'][:] 1255 1256 # Set original data used to write mux file to be zero when gauges are 1257 # not recdoring 1258 1259 ha[0][0]=0.0 1260 ha[0][time_step_count-1]=0.0 1261 ha[2][0]=0.0 1262 ua[0][0]=0.0 1263 ua[0][time_step_count-1]=0.0 1264 ua[2][0]=0.0 1265 va[0][0]=0.0 1266 va[0][time_step_count-1]=0.0 1267 va[2][0]=0.0; 1268 1269 # The stage stored in the .sts file should be the sum of the stage 1270 # in the two mux2 files because both have weights = 1. In this case 1271 # the mux2 files are the same so stage == 2.0 * ha 1272 #print 2.0*num.transpose(ha) - stage 1273 assert num.allclose(2.0*num.transpose(ha), stage) #Meters 1274 1275 #Check the momentums - ua 1276 #momentum = velocity*(stage-elevation) 1277 # elevation = - depth 1278 #momentum = velocity_ua *(stage+depth) 1279 1280 depth=num.zeros((len(lat_long_points),time_step_count),num.float) 1281 for i in range(len(lat_long_points)): 1282 depth[i]=gauge_depth[i]+tide+2.0*ha[i] 1283 #2.0*ha necessary because using two files with weights=1 are used 1284 1285 # The xmomentum stored in the .sts file should be the sum of the ua 1286 # in the two mux2 files multiplied by the depth. 1287 assert num.allclose(2.0*num.transpose(ua*depth), xmomentum) 1288 1289 #Check the momentums - va 1290 #momentum = velocity*(stage-elevation) 1291 # elevation = - depth 1292 #momentum = velocity_va *(stage+depth) 1293 1294 # The ymomentum stored in the .sts file should be the sum of the va 1295 # in the two mux2 files multiplied by the depth. 1296 assert num.allclose(2.0*num.transpose(va*depth), ymomentum) 1297 1298 # check the elevation values. 1299 # -ve since urs measures depth, sww meshers height, 1300 assert num.allclose(-elevation, gauge_depth) #Meters 1301 1302 fid.close() 1303 self.delete_mux(filesI) 1304 self.delete_mux(filesII) 1305 os.remove(sts_file) 1306 1307 1308 def test_urs2sts0(self): 1309 """ 1310 Test single source 1311 """ 1312 tide=0 1313 time_step_count = 3 1314 time_step = 2 1315 lat_long_points =[(-21.5,114.5),(-21,114.5),(-21.5,115), (-21.,115.)] 1316 n=len(lat_long_points) 1317 first_tstep=num.ones(n,num.int) 1318 first_tstep[0]+=1 1319 first_tstep[2]+=1 1320 last_tstep=(time_step_count)*num.ones(n,num.int) 1321 last_tstep[0]-=1 1322 1323 gauge_depth=20*num.ones(n,num.float) 1324 ha=2*num.ones((n,time_step_count),num.float) 1325 ha[0]=num.arange(0,time_step_count) 1326 ha[1]=num.arange(time_step_count,2*time_step_count) 1327 ha[2]=num.arange(2*time_step_count,3*time_step_count) 1328 ha[3]=num.arange(3*time_step_count,4*time_step_count) 1329 ua=5*num.ones((n,time_step_count),num.float) 1330 va=-10*num.ones((n,time_step_count),num.float) 1331 1332 base_name, files = self.write_mux2(lat_long_points, 1333 time_step_count, time_step, 1334 first_tstep, last_tstep, 1335 depth=gauge_depth, 1336 ha=ha, 1337 ua=ua, 1338 va=va) 1339 1340 urs2sts(base_name, 1341 basename_out=base_name, 1342 mean_stage=tide,verbose=False) 1343 1344 # now I want to check the sts file ... 1345 sts_file = base_name + '.sts' 1346 1347 #Let's interigate the sww file 1348 # Note, the sww info is not gridded. It is point data. 1349 fid = NetCDFFile(sts_file) 1350 1351 # Make x and y absolute 1352 x = fid.variables['x'][:] 1353 y = fid.variables['y'][:] 1354 1355 geo_reference = Geo_reference(NetCDFObject=fid) 1356 points = geo_reference.get_absolute(map(None, x, y)) 1357 points = ensure_numeric(points) 1358 1359 x = points[:,0] 1360 y = points[:,1] 1361 1362 #Check that first coordinate is correctly represented 1363 #Work out the UTM coordinates for first point 1364 for i in range(4): 1365 zone, e, n = redfearn(lat_long_points[i][0], lat_long_points[i][1]) 1366 assert num.allclose([x[i],y[i]], [e,n]) 1367 1368 #Check the time vector 1369 times = fid.variables['time'][:] 1370 1371 times_actual = [] 1372 for i in range(time_step_count): 1373 times_actual.append(time_step * i) 1374 1375 assert num.allclose(ensure_numeric(times), 1376 ensure_numeric(times_actual)) 1377 1378 #Check first value 1379 stage = fid.variables['stage'][:] 1380 xmomentum = fid.variables['xmomentum'][:] 1381 ymomentum = fid.variables['ymomentum'][:] 1382 elevation = fid.variables['elevation'][:] 1383 1384 # Set original data used to write mux file to be zero when gauges are 1385 #not recdoring 1386 ha[0][0]=0.0 1387 ha[0][time_step_count-1]=0.0; 1388 ha[2][0]=0.0; 1389 ua[0][0]=0.0 1390 ua[0][time_step_count-1]=0.0; 1391 ua[2][0]=0.0; 1392 va[0][0]=0.0 1393 va[0][time_step_count-1]=0.0; 1394 va[2][0]=0.0; 1395 1396 assert num.allclose(num.transpose(ha),stage) #Meters 1397 1398 #Check the momentums - ua 1399 #momentum = velocity*(stage-elevation) 1400 # elevation = - depth 1401 #momentum = velocity_ua *(stage+depth) 1402 1403 depth=num.zeros((len(lat_long_points),time_step_count),num.float) 1404 for i in range(len(lat_long_points)): 1405 depth[i]=gauge_depth[i]+tide+ha[i] 1406 assert num.allclose(num.transpose(ua*depth),xmomentum) 1407 1408 #Check the momentums - va 1409 #momentum = velocity*(stage-elevation) 1410 # elevation = - depth 1411 #momentum = velocity_va *(stage+depth) 1412 1413 assert num.allclose(num.transpose(va*depth),ymomentum) 1414 1415 # check the elevation values. 1416 # -ve since urs measures depth, sww meshers height, 1417 assert num.allclose(-elevation, gauge_depth) #Meters 1418 1419 fid.close() 1420 self.delete_mux(files) 1421 os.remove(sts_file) 1422 1423 def test_urs2sts_nonstandard_meridian(self): 1424 """ 1425 Test single source using the meridian from zone 50 as a nonstandard meridian 1426 """ 1427 tide=0 1428 time_step_count = 3 1429 time_step = 2 1430 lat_long_points =[(-21.,114.5),(-21.,113.5),(-21.,114.), (-21.,115.)] 1431 n=len(lat_long_points) 1432 first_tstep=num.ones(n,num.int) 1433 first_tstep[0]+=1 1434 first_tstep[2]+=1 1435 last_tstep=(time_step_count)*num.ones(n,num.int) 1436 last_tstep[0]-=1 1437 1438 gauge_depth=20*num.ones(n,num.float) 1439 ha=2*num.ones((n,time_step_count),num.float) 1440 ha[0]=num.arange(0,time_step_count) 1441 ha[1]=num.arange(time_step_count,2*time_step_count) 1442 ha[2]=num.arange(2*time_step_count,3*time_step_count) 1443 ha[3]=num.arange(3*time_step_count,4*time_step_count) 1444 ua=5*num.ones((n,time_step_count),num.float) 1445 va=-10*num.ones((n,time_step_count),num.float) 1446 1447 base_name, files = self.write_mux2(lat_long_points, 1448 time_step_count, time_step, 1449 first_tstep, last_tstep, 1450 depth=gauge_depth, 1451 ha=ha, 1452 ua=ua, 1453 va=va) 1454 1455 urs2sts(base_name, 1456 basename_out=base_name, 1457 central_meridian=123, 1458 mean_stage=tide, 1459 verbose=False) 1460 1461 # now I want to check the sts file ... 1462 sts_file = base_name + '.sts' 1463 1464 #Let's interigate the sww file 1465 # Note, the sww info is not gridded. It is point data. 1466 fid = NetCDFFile(sts_file) 1467 1468 # Make x and y absolute 1469 x = fid.variables['x'][:] 1470 y = fid.variables['y'][:] 1471 1472 geo_reference = Geo_reference(NetCDFObject=fid) 1473 points = geo_reference.get_absolute(map(None, x, y)) 1474 points = ensure_numeric(points) 1475 1476 x = points[:,0] 1477 y = points[:,1] 1478 1479 # Check that all coordinate are correctly represented 1480 # Using the non standard projection (50) 1481 for i in range(4): 1482 zone, e, n = redfearn(lat_long_points[i][0], 1483 lat_long_points[i][1], 1484 central_meridian=123) 1485 assert num.allclose([x[i],y[i]], [e,n]) 1486 assert zone==-1 1487 1488 self.delete_mux(files) 1489 1490 1098 1491 ################################################################################ 1099 1492 -
trunk/anuga_core/source/anuga/file/test_read_sww.py
r7755 r7762 9 9 import os 10 10 import numpy as num 11 12 13 import sww 11 14 12 # This is needed to run the tests of local functions13 import data_manager14 15 15 16 class Test_read_sww(unittest.TestCase): … … 35 36 # Import necessary modules 36 37 #--------------------------------------------------------------------- 37 from anuga.interface import rectangular_cross 38 from anuga.interface import Domain 38 from anuga.abstract_2d_finite_volumes.mesh_factory import \ 39 rectangular_cross 40 from anuga.shallow_water.shallow_water_domain import Domain 39 41 from boundaries import Reflective_boundary 40 42 from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\ 41 import Dirichlet_boundary 42 from anuga.interface import Time_boundary 43 import Dirichlet_boundary, Time_boundary 43 44 44 45 #--------------------------------------------------------------------- … … 104 105 #M, N = stage.shape 105 106 106 sww_file = data_manager.Read_sww(source)107 sww_file = sww.Read_sww(source) 107 108 108 109 #print 'last frame number',sww_file.get_last_frame_number()
Note: See TracChangeset
for help on using the changeset viewer.