Changeset 7693
- Timestamp:
- Apr 23, 2010, 4:46:14 PM (15 years ago)
- Location:
- anuga_core/source/anuga/abstract_2d_finite_volumes
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
anuga_core/source/anuga/abstract_2d_finite_volumes/gauge.py
r7691 r7693 23 23 from math import sqrt 24 24 25 def _quantities2csv(quantities, point_quantities, centroids, point_i): 26 points_list = [] 27 28 for quantity in quantities: 29 #core quantities that are exported from the interpolator 30 if quantity == 'stage': 31 points_list.append(point_quantities[0]) 32 33 if quantity == 'elevation': 34 points_list.append(point_quantities[1]) 35 36 if quantity == 'xmomentum': 37 points_list.append(point_quantities[2]) 38 39 if quantity == 'ymomentum': 40 points_list.append(point_quantities[3]) 41 42 #derived quantities that are calculated from the core ones 43 if quantity == 'depth': 44 points_list.append(point_quantities[0] 45 - point_quantities[1]) 46 47 if quantity == 'momentum': 48 momentum = sqrt(point_quantities[2]**2 49 + point_quantities[3]**2) 50 points_list.append(momentum) 51 52 if quantity == 'speed': 53 #if depth is less than 0.001 then speed = 0.0 54 if point_quantities[0] - point_quantities[1] < 0.001: 55 vel = 0.0 56 else: 57 if point_quantities[2] < 1.0e6: 58 momentum = sqrt(point_quantities[2]**2 59 + point_quantities[3]**2) 60 vel = momentum / (point_quantities[0] 61 - point_quantities[1]) 62 else: 63 momentum = 0 64 vel = 0 65 66 points_list.append(vel) 67 68 if quantity == 'bearing': 69 points_list.append(calc_bearing(point_quantities[2], 70 point_quantities[3])) 71 if quantity == 'xcentroid': 72 points_list.append(centroids[point_i][0]) 73 74 if quantity == 'ycentroid': 75 points_list.append(centroids[point_i][1]) 76 77 return points_list 78 79 25 80 ## 26 81 # @brief Take gauge readings, given a gauge file and a sww mesh … … 163 218 heading.insert(0,'time') 164 219 heading.insert(1,'hours') 165 166 #create a list of csv writers for all the points and write header167 points_writer = []168 for point_i,point in enumerate(points):169 points_writer.append(writer(file(dir_name + sep + gauge_file170 + point_name[point_i] + '.csv', "wb")))171 points_writer[point_i].writerow(heading)172 220 173 221 if verbose: log.critical('Writing csv files') … … 187 235 quake_offset_time = callable_sww.starttime 188 236 237 for point_i, point in enumerate(points_array): 238 is_opened = False 189 239 for time in callable_sww.get_time(): 190 for point_i, point in enumerate(points_array): 191 #add domain starttime to relative time. 192 quake_time = time + quake_offset_time 193 points_list = [quake_time, quake_time/3600.]# fudge around SWW time bug 194 point_quantities = callable_sww(time, point_i) # __call__ is overridden 195 196 for quantity in quantities: 197 if quantity == NAN: 198 log.critical('quantity does not exist in %s' 199 % callable_sww.get_name) 200 else: 201 #core quantities that are exported from the interpolator 202 if quantity == 'stage': 203 points_list.append(point_quantities[0]) 204 205 if quantity == 'elevation': 206 points_list.append(point_quantities[1]) 207 208 if quantity == 'xmomentum': 209 points_list.append(point_quantities[2]) 210 211 if quantity == 'ymomentum': 212 points_list.append(point_quantities[3]) 213 214 #derived quantities that are calculated from the core ones 215 if quantity == 'depth': 216 points_list.append(point_quantities[0] 217 - point_quantities[1]) 218 219 if quantity == 'momentum': 220 momentum = sqrt(point_quantities[2]**2 221 + point_quantities[3]**2) 222 points_list.append(momentum) 223 224 if quantity == 'speed': 225 #if depth is less than 0.001 then speed = 0.0 226 if point_quantities[0] - point_quantities[1] < 0.001: 227 vel = 0.0 228 else: 229 if point_quantities[2] < 1.0e6: 230 momentum = sqrt(point_quantities[2]**2 231 + point_quantities[3]**2) 232 vel = momentum / (point_quantities[0] 233 - point_quantities[1]) 234 else: 235 momentum = 0 236 vel = 0 237 238 points_list.append(vel) 239 240 if quantity == 'bearing': 241 points_list.append(calc_bearing(point_quantities[2], 242 point_quantities[3])) 243 if quantity == 'xcentroid': 244 points_list.append(callable_sww.centroids[point_i][0]) 245 246 if quantity == 'ycentroid': 247 points_list.append(callable_sww.centroids[point_i][1]) 248 249 points_writer[point_i].writerow(points_list) 250 240 #add domain starttime to relative time. 241 quake_time = time + quake_offset_time 242 point_quantities = callable_sww(time, point_i) # __call__ is overridden 243 244 if point_quantities[0] != NAN: 245 if is_opened == False: 246 points_writer = writer(file(dir_name + sep + gauge_file 247 + point_name[point_i] + '.csv', "wb")) 248 points_writer.writerow(heading) 249 is_opened = True 250 points_list = [quake_time, quake_time/3600.] + _quantities2csv(quantities, point_quantities, callable_sww.centroids, point_i) 251 points_writer.writerow(points_list) 252 else: 253 msg = 'gauge' + point_name[point_i] + 'falls off the mesh in file ' + sww_file + '.' 254 log.warning(msg) 251 255 ## 252 256 # @brief Read a .sww file and plot the time series. -
anuga_core/source/anuga/abstract_2d_finite_volumes/test_gauge.py
r7690 r7693 301 301 302 302 303 #def test_sww2csv_gauge_point_off_mesh(self):304 #from anuga.pmesh.mesh import Mesh305 #from anuga.shallow_water import Domain, Transmissive_boundary306 #from csv import reader,writer307 #import time308 #import string309 310 #"""Most of this test was copied from test_interpolate311 #test_interpole_sww2csv312 313 #This is testing the sww2csv_gauges function with one gauge off the mesh, by creating a sww file and314 #then exporting the gauges and checking the results.315 316 #This tests the correct values for when a gauge is off the mesh, which is important for parallel.317 #"""318 319 #domain = self.domain320 # self._create_sww()303 def test_sww2csv_gauge_point_off_mesh(self): 304 from anuga.pmesh.mesh import Mesh 305 from anuga.shallow_water import Domain, Transmissive_boundary 306 from csv import reader,writer 307 import time 308 import string 309 310 """Most of this test was copied from test_interpolate 311 test_interpole_sww2csv 312 313 This is testing the sww2csv_gauges function with one gauge off the mesh, by creating a sww file and 314 then exporting the gauges and checking the results. 315 316 This tests the correct values for when a gauge is off the mesh, which is important for parallel. 317 """ 318 319 domain = self.domain 320 sww = self._create_sww() 321 321 322 # # test the function 323 # points = [[50.0,1.],[50.5,-20.25]] 324 325 # # points_file = tempfile.mktemp(".csv") 326 # points_file = 'test_point.csv' 327 # file_id = open(points_file,"w") 328 # file_id.write("name,easting,northing \n\ 329 # point1, 50.0, 1.0\n\ 330 # point2, 50.5, 20.25\n") 331 # file_id.close() 332 333 # sww2csv_gauges(sww.filename, 334 # points_file, 335 # quantities=['stage', 'elevation'], 336 # use_cache=False, 337 # verbose=False) 338 339 # point1_answers_array = [[0.0,1.0,-5.0], [2.0,10.0,-5.0]] 340 # point1_filename = 'gauge_point1.csv' 341 # point1_handle = file(point1_filename) 342 # point1_reader = reader(point1_handle) 343 # point1_reader.next() 344 345 # line=[] 346 # for i,row in enumerate(point1_reader): 347 # # print 'i',i,'row',row 348 # # note the 'hole' (element 1) below - skip the new 'hours' field 349 # line.append([float(row[0]),float(row[2]),float(row[3])]) 350 # #print 'line',line[i],'point1',point1_answers_array[i] 351 # assert num.allclose(line[i], point1_answers_array[i]) 352 353 # point2_answers_array = [[0.0,1.0,-0.5], [2.0,10.0,-0.5]] 354 # point2_filename = 'gauge_point2.csv' 355 # point2_handle = file(point2_filename) 356 # point2_reader = reader(point2_handle) 357 # point2_reader.next() 358 359 # line=[] 360 # for i,row in enumerate(point2_reader): 361 # # print 'i',i,'row',row 362 # # note the 'hole' (element 1) below - skip the new 'hours' field 363 # line.append([float(row[0]),float(row[2]),float(row[3])]) 364 # # print 'line',line[i],'point1',point1_answers_array[i] 365 # assert num.allclose(line[i], point2_answers_array[i]) 366 367 # # clean up 368 # point1_handle.close() 369 # point2_handle.close() 370 # os.remove(points_file) 371 # # os.remove(point1_filename) 372 # # os.remove(point2_filename) 322 # test the function 323 points = [[50.0,1.],[50.5,-20.25]] 324 325 # points_file = tempfile.mktemp(".csv") 326 points_file = 'test_point.csv' 327 file_id = open(points_file,"w") 328 file_id.write("name,easting,northing \n\ 329 offmesh1, 50.0, 1.0\n\ 330 offmesh2, 50.5, 20.25\n") 331 file_id.close() 332 333 points_files = ['offmesh1.csv', 'offmesh2.csv'] 334 335 for point_filename in points_files: 336 if os.path.exists(point_filename): os.remove(point_filename) 337 338 sww2csv_gauges(self.sww.filename, 339 points_file, 340 quantities=['stage', 'elevation', 'bearing'], 341 use_cache=False, 342 verbose=False) 343 344 for point_filename in points_files: 345 assert not os.path.exists(point_filename) 346 347 os.remove(points_file) 373 348 374 349 -
anuga_core/source/anuga/abstract_2d_finite_volumes/util.py
r7690 r7693 14 14 from shutil import copy 15 15 16 from anuga.utilities.numerical_tools import ensure_numeric 16 from anuga.utilities.numerical_tools import ensure_numeric, angle 17 17 18 18 from math import sqrt, atan, degrees … … 300 300 # * moving the reference direction from [1,0] to North 301 301 # * changing from counter clockwise to clocwise. 302 303 angle = degrees(atan(vh/(uh+1.e-15))) 304 305 if (0 < angle < 90.0): 306 if vh > 0: 307 bearing = 90.0 - abs(angle) 308 if vh < 0: 309 bearing = 270.0 - abs(angle) 310 311 if (-90 < angle < 0): 312 if vh < 0: 313 bearing = 90.0 - (angle) 314 if vh > 0: 315 bearing = 270.0 - (angle) 316 if angle == 0: bearing = 0.0 317 318 return bearing 302 303 return degrees(angle([uh, vh], [0, -1])) 319 304 320 305
Note: See TracChangeset
for help on using the changeset viewer.