Changeset 8488
- Timestamp:
- Aug 1, 2012, 3:45:49 PM (13 years ago)
- Location:
- trunk/anuga_core/source/anuga
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/anuga_core/source/anuga/file/test_sww.py
r8455 r8488 60 60 Bt = Transmissive_boundary(domain) 61 61 Bd = Dirichlet_boundary([0.2,0.,0.]) 62 Bw = Time_boundary(domain=domain,f =lambda t: [(0.1*sin(t*2*pi)), 0.0, 0.0])62 Bw = Time_boundary(domain=domain,function=lambda t: [(0.1*sin(t*2*pi)), 0.0, 0.0]) 63 63 64 64 #domain.set_boundary({'left': Bd, 'right': Br, 'top': Br, 'bottom': Br}) -
trunk/anuga_core/source/anuga/operators/rain_operators.py
r8480 r8488 12 12 from anuga import Quantity 13 13 import numpy as num 14 from warnings import warn 14 15 import anuga.utilities.log as log 15 16 -
trunk/anuga_core/source/anuga/operators/rate_operators.py
r8477 r8488 14 14 from anuga import indent 15 15 import numpy as num 16 from warnings import warn 16 17 import anuga.utilities.log as log 17 18 … … 51 52 # Local variables 52 53 #------------------------------------------ 53 self.rate = rate54 54 self.indices = indices 55 55 self.factor = factor 56 56 57 #------------------------------------------ 58 # Check and store default_rate 59 #------------------------------------------ 60 msg = ('Keyword argument default_rate must be either None ' 61 'or a function of time.\nI got %s.' % str(default_rate)) 62 assert (default_rate is None or 63 isinstance(default_rate, (int, float)) or 64 callable(default_rate)), msg 65 66 67 #------------------------------------------ 68 # Allow application longer than data 69 #------------------------------------------ 70 if default_rate is not None: 71 # If it is a constant, make it a function 72 if not callable(default_rate): 73 tmp = default_rate 74 default_rate = lambda t: tmp 75 76 # Check that default_rate is a function of one argument 77 try: 78 default_rate(0.0) 79 except: 80 raise Exception(msg) 81 82 self.default_rate = default_rate 57 self.set_rate(rate) 58 self.set_default_rate(default_rate) 59 83 60 self.default_rate_invoked = False # Flag 84 85 86 87 61 88 62 def __call__(self): … … 109 83 % (self.quantity_name, domain.get_time(), rate)) 110 84 111 if self.indices is None: 112 self.stage_c[:] = self.stage_c[:] \ 113 + factor*rate*timestep 114 else: 115 self.stage_c[indices] = self.stage_c[indices] \ 116 + factor*rate*timestep 117 85 if rate >= 0.0: 86 if self.indices is None: 87 self.stage_c[:] = self.stage_c[:] \ 88 + factor*rate*timestep 89 else: 90 self.stage_c[indices] = self.stage_c[indices] \ 91 + factor*rate*timestep 92 else: # Be more careful if rate < 0 93 if self.indices is None: 94 self.stage_c[:] = num.maximun(self.stage_c \ 95 + factor*rate*timestep, self.elev_c ) 96 else: 97 self.stage_c[indices] = num.maximum(self.stage_c[indices] \ 98 + factor*rate*timestep, self.elev_c[indices]) 118 99 119 100 def get_rate(self, t=None): … … 142 123 if self.default_rate_invoked is False: 143 124 # Issue warning the first time 144 msg = (' %s\n'125 msg = ('\n%s' 145 126 'Instead I will use the default rate: %s\n' 146 127 'Note: Further warnings will be supressed' 147 % (str(e), s tr(self.default_rate)))128 % (str(e), self.default_rate(t))) 148 129 warn(msg) 149 130 … … 162 143 163 144 return rate 145 146 147 def set_rate(self, rate): 148 """Ability to change rate while running 149 """ 150 151 #------------------------------------------ 152 # Check and store rate 153 #------------------------------------------ 154 msg = ('Keyword argument rate must be a' 155 'scalar, or a function of time.') 156 assert (isinstance(rate, (int, float)) or 157 callable(rate)), msg 158 self.rate = rate 159 160 161 def set_default_rate(self, default_rate): 162 """ 163 Check and store default_rate 164 """ 165 msg = ('Default_rate must be either None ' 166 'a scalar, or a function of time.\nI got %s.' % str(default_rate)) 167 assert (default_rate is None or 168 isinstance(default_rate, (int, float)) or 169 callable(default_rate)), msg 170 171 172 #------------------------------------------ 173 # Allow longer than data 174 #------------------------------------------ 175 if default_rate is not None: 176 # If it is a constant, make it a function 177 if not callable(default_rate): 178 tmp = default_rate 179 default_rate = lambda t: tmp 180 181 # Check that default_rate is a function of one argument 182 try: 183 default_rate(0.0) 184 except: 185 raise Exception(msg) 186 187 self.default_rate = default_rate 188 164 189 165 190 def parallel_safe(self): -
trunk/anuga_core/source/anuga/operators/test_rate_operators.py
r8405 r8488 4 4 import unittest, os 5 5 import anuga 6 from anuga.shallow_water.shallow_water_domain import Domain 7 from boundaries import Reflective_boundary 8 from anuga.coordinate_transforms.geo_reference import Geo_reference 6 from anuga import Domain 7 from anuga import Reflective_boundary 8 from anuga import rectangular_cross_domain 9 from anuga import file_function 10 11 from anuga.config import netcdf_mode_r, netcdf_mode_w, netcdf_mode_a 9 12 from anuga.file_conversion.file_conversion import timefile2netcdf 10 from forcing import * 11 from mesh_factory import rectangular 12 #from anuga.file_conversion.sts2sww_mesh import sts2sww_mesh 13 from anuga.abstract_2d_finite_volumes.util import file_function 14 from anuga.config import netcdf_mode_r, netcdf_mode_w, netcdf_mode_a 13 from anuga.config import time_format 14 15 from rate_operators import * 15 16 16 17 import numpy as num 17 18 import warnings 18 19 20 def scalar_func_list(t, x, y): 21 """Function that returns a scalar. 22 23 Used to test error message when numeric array is expected 24 """ 25 26 return [17.7] 27 28 29 def speed(t, x, y): 30 """ 31 Variable windfield implemented using functions 32 Large speeds halfway between center and edges 33 34 Low speeds at center and edges 35 """ 36 37 from math import exp, cos, pi 38 39 x = num.array(x) 40 y = num.array(y) 41 42 N = len(x) 43 s = 0*x #New array 44 45 for k in range(N): 46 r = num.sqrt(x[k]**2 + y[k]**2) 47 factor = exp(-(r-0.15)**2) 48 s[k] = 4000 * factor * (cos(t*2*pi/150) + 2) 49 50 return s 51 52 53 def angle(t, x, y): 54 """Rotating field 55 """ 56 from math import atan, pi 57 58 x = num.array(x) 59 y = num.array(y) 60 61 N = len(x) 62 a = 0 * x # New array 63 64 for k in range(N): 65 r = num.sqrt(x[k]**2 + y[k]**2) 66 67 angle = atan(y[k]/x[k]) 68 69 if x[k] < 0: 70 angle += pi 71 72 # Take normal direction 73 angle -= pi/2 74 75 # Ensure positive radians 76 if angle < 0: 77 angle += 2*pi 78 79 a[k] = angle/pi*180 80 81 return a 82 83 def time_varying_speed(t, x, y): 84 """ 85 Variable speed windfield 86 """ 87 88 from math import exp, cos, pi 89 90 x = num.array(x,num.float) 91 y = num.array(y,num.float) 92 93 N = len(x) 94 s = 0*x #New array 95 96 #dx=x[-1]-x[0]; dy = y[-1]-y[0] 97 S=100. 98 for k in range(N): 99 s[k]=S*(1.+t/100.) 100 return s 101 102 103 def time_varying_angle(t, x, y): 104 """Rotating field 105 """ 106 from math import atan, pi 107 108 x = num.array(x,num.float) 109 y = num.array(y,num.float) 110 111 N = len(x) 112 a = 0 * x # New array 113 114 phi=135. 115 for k in range(N): 116 a[k]=phi*(1.+t/100.) 117 118 return a 119 120 121 def time_varying_pressure(t, x, y): 122 """Rotating field 123 """ 124 from math import atan, pi 125 126 x = num.array(x,num.float) 127 y = num.array(y,num.float) 128 129 N = len(x) 130 p = 0 * x # New array 131 132 p0=1000. 133 for k in range(N): 134 p[k]=p0*(1.-t/100.) 135 136 return p 137 138 def spatial_linear_varying_speed(t, x, y): 139 """ 140 Variable speed windfield 141 """ 142 143 from math import exp, cos, pi 144 145 x = num.array(x) 146 y = num.array(y) 147 148 N = len(x) 149 s = 0*x #New array 150 151 #dx=x[-1]-x[0]; dy = y[-1]-y[0] 152 s0=250. 153 ymin=num.min(y) 154 xmin=num.min(x) 155 a=0.000025; b=0.0000125; 156 for k in range(N): 157 s[k]=s0*(1+t/100.)+a*x[k]+b*y[k] 158 return s 159 160 161 def spatial_linear_varying_angle(t, x, y): 162 """Rotating field 163 """ 164 from math import atan, pi 165 166 x = num.array(x) 167 y = num.array(y) 168 169 N = len(x) 170 a = 0 * x # New array 171 172 phi=135. 173 b1=0.000025; b2=0.00001125; 174 for k in range(N): 175 a[k]=phi*(1+t/100.)+b1*x[k]+b2*y[k] 176 return a 177 178 def spatial_linear_varying_pressure(t, x, y): 179 p0=1000; 180 a=0.000025; b=0.0000125; 181 182 x = num.array(x) 183 y = num.array(y) 184 185 N = len(x) 186 p = 0 * x # New array 187 188 for k in range(N): 189 p[k]=p0*(1.-t/100.)+a*x[k]+b*y[k] 190 return p 191 192 193 def grid_1d(x0,dx,nx): 194 x = num.empty(nx,dtype=num.float) 195 for i in range(nx): 196 x[i]=x0+float(i)*dx 197 return x 198 199 200 def ndgrid(x,y): 201 nx = len(x) 202 ny = len(y) 203 X = num.empty(nx*ny,dtype=num.float) 204 Y = num.empty(nx*ny,dtype=num.float) 205 k=0 206 for i in range(nx): 207 for j in range(ny): 208 X[k]=x[i] 209 Y[k]=y[j] 210 k+=1 211 return X,Y 212 213 class Test_Forcing(unittest.TestCase): 19 import time 20 21 22 23 class Test_rate_operators(unittest.TestCase): 214 24 def setUp(self): 215 25 pass … … 217 27 def tearDown(self): 218 28 pass 219 220 def write_wind_pressure_field_sts(self, 221 field_sts_filename, 222 nrows=10, 223 ncols=10, 224 cellsize=25, 225 origin=(0.0,0.0), 226 refzone=50, 227 timestep=1, 228 number_of_timesteps=10, 229 angle=135.0, 230 speed=100.0, 231 pressure=1000.0): 232 233 xllcorner=origin[0] 234 yllcorner=origin[1] 235 starttime = 0; endtime = number_of_timesteps*timestep; 236 no_data = -9999 237 238 time = num.arange(starttime, endtime, timestep, dtype='i') 239 240 x = grid_1d(xllcorner,cellsize,ncols) 241 y = grid_1d(yllcorner,cellsize,nrows) 242 [X,Y] = ndgrid(x,y) 243 number_of_points = nrows*ncols 244 245 wind_speed = num.empty((number_of_timesteps,nrows*ncols),dtype=num.float) 246 wind_angle = num.empty((number_of_timesteps,nrows*ncols),dtype=num.float) 247 barometric_pressure = num.empty((number_of_timesteps,nrows*ncols), 248 dtype=num.float) 249 250 if ( callable(speed) and callable(angle) and callable(pressure) ): 251 x = num.ones(3, num.float) 252 y = num.ones(3, num.float) 253 try: 254 s = speed(1.0, x=x, y=y) 255 a = angle(1.0, x=x, y=y) 256 p = pressure(1.0, x=x, y=y) 257 use_function=True 258 except Exception, e: 259 msg = 'Function could not be executed.\n' 260 raise Exception, msg 261 else: 262 try : 263 speed=float(speed) 264 angle=float(angle) 265 pressure=float(pressure) 266 use_function=False 267 except: 268 msg = ('Force fields must be a scalar value coercible to float.') 269 raise Exception, msg 270 271 for i,t in enumerate(time): 272 if ( use_function ): 273 wind_speed[i,:] = speed(t,X,Y) 274 wind_angle[i,:] = angle(t,X,Y) 275 barometric_pressure[i,:] = pressure(t,X,Y) 276 else: 277 wind_speed[i,:] = speed 278 wind_angle[i,:] = angle 279 barometric_pressure[i,:] = pressure 280 281 # "Creating the field STS NetCDF file" 282 283 fid = NetCDFFile(field_sts_filename+'.sts', 'w') 284 fid.institution = 'Geoscience Australia' 285 fid.description = "description" 286 fid.starttime = 0.0 287 fid.ncols = ncols 288 fid.nrows = nrows 289 fid.cellsize = cellsize 290 fid.no_data = no_data 291 fid.createDimension('number_of_points', number_of_points) 292 fid.createDimension('number_of_timesteps', number_of_timesteps) 293 fid.createDimension('numbers_in_range', 2) 294 295 fid.createVariable('x', 'd', ('number_of_points',)) 296 fid.createVariable('y', 'd', ('number_of_points',)) 297 fid.createVariable('time', 'i', ('number_of_timesteps',)) 298 fid.createVariable('wind_speed', 'd', ('number_of_timesteps', 299 'number_of_points')) 300 fid.createVariable('wind_speed_range', 'd', ('numbers_in_range', )) 301 fid.createVariable('wind_angle', 'd', ('number_of_timesteps', 302 'number_of_points')) 303 fid.createVariable('wind_angle_range', 'd', ('numbers_in_range',)) 304 fid.createVariable('barometric_pressure', 'd', ('number_of_timesteps', 305 'number_of_points')) 306 fid.createVariable('barometric_pressure_range', 'd', ('numbers_in_range',)) 307 308 309 fid.variables['wind_speed_range'][:] = num.array([1e+036, -1e+036]) 310 fid.variables['wind_angle_range'][:] = num.array([1e+036, -1e+036]) 311 fid.variables['barometric_pressure_range'][:] = num.array([1e+036, -1e+036]) 312 fid.variables['time'][:] = time 313 314 ws = fid.variables['wind_speed'] 315 wa = fid.variables['wind_angle'] 316 pr = fid.variables['barometric_pressure'] 317 318 for i in xrange(number_of_timesteps): 319 ws[i] = wind_speed[i,:] 320 wa[i] = wind_angle[i,:] 321 pr[i] = barometric_pressure[i,:] 322 323 origin = anuga.coordinate_transforms.geo_reference.Geo_reference(refzone, 324 xllcorner, 325 yllcorner) 326 geo_ref = anuga.coordinate_transforms.geo_reference.write_NetCDF_georeference(origin, fid) 327 328 fid.variables['x'][:]=X-geo_ref.get_xllcorner() 329 fid.variables['y'][:]=Y-geo_ref.get_yllcorner() 330 331 332 fid.close() 333 334 def test_constant_wind_stress(self): 29 30 31 def test_rate_operator_simple(self): 335 32 from anuga.config import rho_a, rho_w, eta_w 336 33 from math import pi, cos, sin … … 357 54 domain.set_boundary({'exterior': Br}) 358 55 359 #Setup only one forcing term, constant wind stress 360 s = 100 361 phi = 135 362 domain.forcing_terms = [] 363 domain.forcing_terms.append(Wind_stress(s, phi)) 364 365 domain.compute_forcing_terms() 366 367 const = eta_w*rho_a / rho_w 368 369 #Convert to radians 370 phi = phi*pi / 180 371 372 #Compute velocity vector (u, v) 373 u = s*cos(phi) 374 v = s*sin(phi) 375 376 #Compute wind stress 377 S = const * num.sqrt(u**2 + v**2) 378 379 assert num.allclose(domain.quantities['stage'].explicit_update, 0) 380 assert num.allclose(domain.quantities['xmomentum'].explicit_update, S*u) 381 assert num.allclose(domain.quantities['ymomentum'].explicit_update, S*v) 382 383 def test_variable_wind_stress(self): 56 57 # print domain.quantities['stage'].centroid_values 58 # print domain.quantities['xmomentum'].centroid_values 59 # print domain.quantities['ymomentum'].centroid_values 60 61 # Apply operator to these triangles 62 indices = [0,1,3] 63 64 rate = 1.0 65 factor = 10.0 66 default_rate= 0.0 67 68 operator = Rate_operator(domain, rate=rate, factor=factor, \ 69 indices=indices, default_rate = default_rate) 70 71 # Apply Operator 72 domain.timestep = 2.0 73 operator() 74 75 stage_ex = [ 21., 21., 1., 21.] 76 77 78 # print domain.quantities['stage'].centroid_values 79 # print domain.quantities['xmomentum'].centroid_values 80 # print domain.quantities['ymomentum'].centroid_values 81 82 assert num.allclose(domain.quantities['stage'].centroid_values, stage_ex) 83 assert num.allclose(domain.quantities['xmomentum'].centroid_values, 0.0) 84 assert num.allclose(domain.quantities['ymomentum'].centroid_values, 0.0) 85 86 87 def test_rate_operator_negative_rate(self): 384 88 from anuga.config import rho_a, rho_w, eta_w 385 89 from math import pi, cos, sin … … 406 110 domain.set_boundary({'exterior': Br}) 407 111 408 domain.time = 5.54 # Take a random time (not zero) 409 410 #Setup only one forcing term, constant wind stress 411 s = 100 412 phi = 135 413 domain.forcing_terms = [] 414 domain.forcing_terms.append(Wind_stress(s=speed, phi=angle)) 415 416 domain.compute_forcing_terms() 417 418 #Compute reference solution 419 const = eta_w*rho_a / rho_w 420 421 N = len(domain) # number_of_triangles 422 423 xc = domain.get_centroid_coordinates() 424 t = domain.time 425 426 x = xc[:,0] 427 y = xc[:,1] 428 s_vec = speed(t,x,y) 429 phi_vec = angle(t,x,y) 430 431 for k in range(N): 432 # Convert to radians 433 phi = phi_vec[k]*pi / 180 434 s = s_vec[k] 435 436 # Compute velocity vector (u, v) 437 u = s*cos(phi) 438 v = s*sin(phi) 439 440 # Compute wind stress 441 S = const * num.sqrt(u**2 + v**2) 442 443 assert num.allclose(domain.quantities['stage'].explicit_update[k], 444 0) 445 assert num.allclose(domain.quantities['xmomentum'].\ 446 explicit_update[k], 447 S*u) 448 assert num.allclose(domain.quantities['ymomentum'].\ 449 explicit_update[k], 450 S*v) 451 452 def test_windfield_from_file(self): 453 import time 112 # print domain.quantities['elevation'].centroid_values 113 # print domain.quantities['stage'].centroid_values 114 # print domain.quantities['xmomentum'].centroid_values 115 # print domain.quantities['ymomentum'].centroid_values 116 117 # Apply operator to these triangles 118 indices = [0,1,3] 119 120 121 122 #Catchment_Rain_Polygon = read_polygon(join('CatchmentBdy.csv')) 123 #rainfall = file_function(join('1y120m.tms'), quantities=['rainfall']) 124 rate = -1.0 125 factor = 10.0 126 default_rate= 0.0 127 128 operator = Rate_operator(domain, rate=rate, factor=factor, \ 129 indices=indices, default_rate = default_rate) 130 131 132 # Apply Operator 133 domain.timestep = 2.0 134 operator() 135 136 stage_ex = [ 0., 0., 1., 0.] 137 138 # print domain.quantities['elevation'].centroid_values 139 # print domain.quantities['stage'].centroid_values 140 # print domain.quantities['xmomentum'].centroid_values 141 # print domain.quantities['ymomentum'].centroid_values 142 143 assert num.allclose(domain.quantities['stage'].centroid_values, stage_ex) 144 assert num.allclose(domain.quantities['xmomentum'].centroid_values, 0.0) 145 assert num.allclose(domain.quantities['ymomentum'].centroid_values, 0.0) 146 147 148 def test_rate_operator_rate_from_file(self): 454 149 from anuga.config import rho_a, rho_w, eta_w 455 150 from math import pi, cos, sin 456 from anuga.config import time_format457 from anuga.abstract_2d_finite_volumes.util import file_function458 151 459 152 a = [0.0, 0.0] … … 468 161 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 469 162 163 164 #--------------------------------- 165 #Typical ASCII file 166 #--------------------------------- 167 finaltime = 1200 168 filename = 'test_file_function' 169 fid = open(filename + '.txt', 'w') 170 start = time.mktime(time.strptime('2000', '%Y')) 171 dt = 60 #One minute intervals 172 t = 0.0 173 while t <= finaltime: 174 t_string = time.strftime(time_format, time.gmtime(t+start)) 175 fid.write('%s, %f %f %f\n' %(t_string, 2*t, t**2, sin(t*pi/600))) 176 t += dt 177 178 fid.close() 179 180 #Convert ASCII file to NetCDF (Which is what we really like!) 181 timefile2netcdf(filename+'.txt') 182 183 184 #Create file function from time series 185 F = file_function(filename + '.tms', 186 quantities = ['Attribute0', 187 'Attribute1', 188 'Attribute2']) 189 190 #Now try interpolation 191 for i in range(20): 192 t = i*10 193 q = F(t) 194 195 #Exact linear intpolation 196 assert num.allclose(q[0], 2*t) 197 if i%6 == 0: 198 assert num.allclose(q[1], t**2) 199 assert num.allclose(q[2], sin(t*pi/600)) 200 201 #Check non-exact 202 203 t = 90 #Halfway between 60 and 120 204 q = F(t) 205 assert num.allclose( (120**2 + 60**2)/2, q[1] ) 206 assert num.allclose( (sin(120*pi/600) + sin(60*pi/600))/2, q[2] ) 207 208 209 t = 100 #Two thirds of the way between between 60 and 120 210 q = F(t) 211 assert num.allclose( 2*120**2/3 + 60**2/3, q[1] ) 212 assert num.allclose( 2*sin(120*pi/600)/3 + sin(60*pi/600)/3, q[2] ) 213 214 #os.remove(filename + '.txt') 215 #os.remove(filename + '.tms') 216 217 470 218 domain = Domain(points, vertices) 471 219 472 # 220 #Flat surface with 1m of water 473 221 domain.set_quantity('elevation', 0) 474 222 domain.set_quantity('stage', 1.0) … … 478 226 domain.set_boundary({'exterior': Br}) 479 227 480 domain.time = 7 # Take a time that is represented in file (not zero) 481 482 # Write wind stress file (ensure that domain.time is covered) 483 # Take x=1 and y=0 484 filename = 'test_windstress_from_file' 485 start = time.mktime(time.strptime('2000', '%Y')) 486 fid = open(filename + '.txt', 'w') 487 dt = 1 # One second interval 488 t = 0.0 489 while t <= 10.0: 490 t_string = time.strftime(time_format, time.gmtime(t+start)) 491 492 fid.write('%s, %f %f\n' % 493 (t_string, speed(t,[1],[0])[0], angle(t,[1],[0])[0])) 494 t += dt 495 496 fid.close() 497 498 timefile2netcdf(filename + '.txt') 499 os.remove(filename + '.txt') 500 501 # Setup wind stress 502 F = file_function(filename + '.tms', 503 quantities=['Attribute0', 'Attribute1']) 504 os.remove(filename + '.tms') 505 506 W = Wind_stress(F) 507 508 domain.forcing_terms = [] 509 domain.forcing_terms.append(W) 510 511 domain.compute_forcing_terms() 512 513 # Compute reference solution 514 const = eta_w*rho_a / rho_w 515 516 N = len(domain) # number_of_triangles 517 518 t = domain.time 519 520 s = speed(t, [1], [0])[0] 521 phi = angle(t, [1], [0])[0] 522 523 # Convert to radians 524 phi = phi*pi / 180 525 526 # Compute velocity vector (u, v) 527 u = s*cos(phi) 528 v = s*sin(phi) 529 530 # Compute wind stress 531 S = const * num.sqrt(u**2 + v**2) 532 533 for k in range(N): 534 assert num.allclose(domain.quantities['stage'].explicit_update[k], 535 0) 536 assert num.allclose(domain.quantities['xmomentum'].\ 537 explicit_update[k], 538 S*u) 539 assert num.allclose(domain.quantities['ymomentum'].\ 540 explicit_update[k], 541 S*v) 542 543 def test_windfield_from_file_seconds(self): 544 import time 228 # print domain.quantities['elevation'].centroid_values 229 # print domain.quantities['stage'].centroid_values 230 # print domain.quantities['xmomentum'].centroid_values 231 # print domain.quantities['ymomentum'].centroid_values 232 233 # Apply operator to these triangles 234 indices = [0,1,3] 235 236 237 rate = file_function('test_file_function.tms', quantities=['Attribute1']) 238 239 factor = 1000.0 240 default_rate= 17.7 241 242 operator = Rate_operator(domain, rate=rate, factor=factor, \ 243 indices=indices, default_rate = default_rate) 244 245 246 # Apply Operator 247 domain.set_starttime(360.0) 248 domain.timestep = 1.0 249 250 operator() 251 252 253 d = domain.get_time()**2 * factor + 1.0 254 stage_ex0 = [ d, d, 1., d] 255 256 # print d, domain.get_time(), F(360.0) 257 258 # print domain.quantities['elevation'].centroid_values 259 # print domain.quantities['stage'].centroid_values 260 # print domain.quantities['xmomentum'].centroid_values 261 # print domain.quantities['ymomentum'].centroid_values 262 263 assert num.allclose(domain.quantities['stage'].centroid_values, stage_ex0) 264 assert num.allclose(domain.quantities['xmomentum'].centroid_values, 0.0) 265 assert num.allclose(domain.quantities['ymomentum'].centroid_values, 0.0) 266 267 268 domain.set_starttime(-10.0) 269 domain.timestep = 1.0 270 271 try: 272 operator() 273 except: 274 pass 275 else: 276 raise Exception('Should have raised an exception, time too early') 277 278 279 domain.set_starttime(1300.0) 280 domain.timestep = 1.0 281 282 operator() 283 284 d = default_rate*factor + d 285 stage_ex1 = [ d, d, 1., d] 286 287 # print domain.quantities['elevation'].centroid_values 288 # print domain.quantities['stage'].centroid_values 289 # print domain.quantities['xmomentum'].centroid_values 290 # print domain.quantities['ymomentum'].centroid_values 291 292 assert num.allclose(domain.quantities['stage'].centroid_values, stage_ex1) 293 assert num.allclose(domain.quantities['xmomentum'].centroid_values, 0.0) 294 assert num.allclose(domain.quantities['ymomentum'].centroid_values, 0.0) 295 296 297 def test_rate_operator_functions_rate_default_rate(self): 545 298 from anuga.config import rho_a, rho_w, eta_w 546 299 from math import pi, cos, sin 547 from anuga.config import time_format548 from anuga.abstract_2d_finite_volumes.util import file_function549 300 550 301 a = [0.0, 0.0] … … 561 312 domain = Domain(points, vertices) 562 313 563 # 314 #Flat surface with 1m of water 564 315 domain.set_quantity('elevation', 0) 565 316 domain.set_quantity('stage', 1.0) … … 569 320 domain.set_boundary({'exterior': Br}) 570 321 571 domain.time = 7 # Take a time that is represented in file (not zero) 572 573 # Write wind stress file (ensure that domain.time is covered) 574 # Take x=1 and y=0 575 filename = 'test_windstress_from_file' 576 start = time.mktime(time.strptime('2000', '%Y')) 577 fid = open(filename + '.txt', 'w') 578 dt = 0.5 # Half second interval 579 t = 0.0 580 while t <= 10.0: 581 fid.write('%s, %f %f\n' 582 % (str(t), speed(t, [1], [0])[0], angle(t, [1], [0])[0])) 583 t += dt 584 585 fid.close() 586 587 timefile2netcdf(filename + '.txt', time_as_seconds=True) 588 os.remove(filename + '.txt') 589 590 # Setup wind stress 591 F = file_function(filename + '.tms', 592 quantities=['Attribute0', 'Attribute1']) 593 os.remove(filename + '.tms') 594 595 W = Wind_stress(F) 596 597 domain.forcing_terms = [] 598 domain.forcing_terms.append(W) 599 600 domain.compute_forcing_terms() 601 602 # Compute reference solution 603 const = eta_w*rho_a / rho_w 604 605 N = len(domain) # number_of_triangles 606 607 t = domain.time 608 609 s = speed(t, [1], [0])[0] 610 phi = angle(t, [1], [0])[0] 611 612 # Convert to radians 613 phi = phi*pi / 180 614 615 # Compute velocity vector (u, v) 616 u = s*cos(phi) 617 v = s*sin(phi) 618 619 # Compute wind stress 620 S = const * num.sqrt(u**2 + v**2) 621 622 for k in range(N): 623 assert num.allclose(domain.quantities['stage'].explicit_update[k], 624 0) 625 assert num.allclose(domain.quantities['xmomentum'].\ 626 explicit_update[k], 627 S*u) 628 assert num.allclose(domain.quantities['ymomentum'].\ 629 explicit_update[k], 630 S*v) 631 632 def test_wind_stress_error_condition(self): 633 """Test that windstress reacts properly when forcing functions 634 are wrong - e.g. returns a scalar 635 """ 636 637 from math import pi, cos, sin 638 from anuga.config import rho_a, rho_w, eta_w 639 640 a = [0.0, 0.0] 641 b = [0.0, 2.0] 642 c = [2.0, 0.0] 643 d = [0.0, 4.0] 644 e = [2.0, 2.0] 645 f = [4.0, 0.0] 646 647 points = [a, b, c, d, e, f] 648 # bac, bce, ecf, dbe 649 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 650 651 domain = Domain(points, vertices) 652 653 # Flat surface with 1m of water 654 domain.set_quantity('elevation', 0) 655 domain.set_quantity('stage', 1.0) 656 domain.set_quantity('friction', 0) 657 658 Br = Reflective_boundary(domain) 659 domain.set_boundary({'exterior': Br}) 660 661 domain.time = 5.54 # Take a random time (not zero) 662 663 # Setup only one forcing term, bad func 664 domain.forcing_terms = [] 665 666 try: 667 domain.forcing_terms.append(Wind_stress(s=scalar_func_list, 668 phi=angle)) 669 except AssertionError: 670 pass 671 else: 672 msg = 'Should have raised exception' 673 raise Exception, msg 674 675 try: 676 domain.forcing_terms.append(Wind_stress(s=speed, phi=scalar_func)) 677 except Exception: 678 pass 679 else: 680 msg = 'Should have raised exception' 681 raise Exception, msg 682 683 try: 684 domain.forcing_terms.append(Wind_stress(s=speed, phi='xx')) 685 except: 686 pass 687 else: 688 msg = 'Should have raised exception' 689 raise Exception, msg 690 691 def test_rainfall(self): 692 from math import pi, cos, sin 693 694 a = [0.0, 0.0] 695 b = [0.0, 2.0] 696 c = [2.0, 0.0] 697 d = [0.0, 4.0] 698 e = [2.0, 2.0] 699 f = [4.0, 0.0] 700 701 points = [a, b, c, d, e, f] 702 # bac, bce, ecf, dbe 703 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 704 705 domain = Domain(points, vertices) 706 707 # Flat surface with 1m of water 708 domain.set_quantity('elevation', 0) 709 domain.set_quantity('stage', 1.0) 710 domain.set_quantity('friction', 0) 711 712 Br = Reflective_boundary(domain) 713 domain.set_boundary({'exterior': Br}) 714 715 # Setup only one forcing term, constant rainfall 716 domain.forcing_terms = [] 717 domain.forcing_terms.append(Rainfall(domain, rate=2.0)) 718 719 domain.compute_forcing_terms() 720 assert num.allclose(domain.quantities['stage'].explicit_update, 721 2.0/1000) 722 723 def test_rainfall_restricted_by_polygon(self): 724 from math import pi, cos, sin 725 726 a = [0.0, 0.0] 727 b = [0.0, 2.0] 728 c = [2.0, 0.0] 729 d = [0.0, 4.0] 730 e = [2.0, 2.0] 731 f = [4.0, 0.0] 732 733 points = [a, b, c, d, e, f] 734 # bac, bce, ecf, dbe 735 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 736 737 domain = Domain(points, vertices) 738 739 # Flat surface with 1m of water 740 domain.set_quantity('elevation', 0) 741 domain.set_quantity('stage', 1.0) 742 domain.set_quantity('friction', 0) 743 744 Br = Reflective_boundary(domain) 745 domain.set_boundary({'exterior': Br}) 746 747 # Setup only one forcing term, constant rainfall 748 # restricted to a polygon enclosing triangle #1 (bce) 749 domain.forcing_terms = [] 750 R = Rainfall(domain, rate=2.0, polygon=[[1,1], [2,1], [2,2], [1,2]]) 751 752 assert num.allclose(R.exchange_area, 2) 753 754 domain.forcing_terms.append(R) 755 756 domain.compute_forcing_terms() 757 758 assert num.allclose(domain.quantities['stage'].explicit_update[1], 759 2.0/1000) 760 assert num.allclose(domain.quantities['stage'].explicit_update[0], 0) 761 assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) 762 763 def test_time_dependent_rainfall_restricted_by_polygon(self): 764 a = [0.0, 0.0] 765 b = [0.0, 2.0] 766 c = [2.0, 0.0] 767 d = [0.0, 4.0] 768 e = [2.0, 2.0] 769 f = [4.0, 0.0] 770 771 points = [a, b, c, d, e, f] 772 # bac, bce, ecf, dbe 773 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 774 775 domain = Domain(points, vertices) 776 777 # Flat surface with 1m of water 778 domain.set_quantity('elevation', 0) 779 domain.set_quantity('stage', 1.0) 780 domain.set_quantity('friction', 0) 781 782 Br = Reflective_boundary(domain) 783 domain.set_boundary({'exterior': Br}) 784 785 # Setup only one forcing term, time dependent rainfall 786 # restricted to a polygon enclosing triangle #1 (bce) 787 domain.forcing_terms = [] 788 R = Rainfall(domain, 789 rate=lambda t: 3*t + 7, 790 polygon = [[1,1], [2,1], [2,2], [1,2]]) 791 792 assert num.allclose(R.exchange_area, 2) 322 verbose = False 793 323 794 domain.forcing_terms.append(R) 795 796 domain.time = 10. 797 798 domain.compute_forcing_terms() 799 800 assert num.allclose(domain.quantities['stage'].explicit_update[1], 801 (3*domain.time + 7)/1000) 802 assert num.allclose(domain.quantities['stage'].explicit_update[0], 0) 803 assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) 804 805 def test_time_dependent_rainfall_using_starttime(self): 806 rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float) 807 808 a = [0.0, 0.0] 809 b = [0.0, 2.0] 810 c = [2.0, 0.0] 811 d = [0.0, 4.0] 812 e = [2.0, 2.0] 813 f = [4.0, 0.0] 814 815 points = [a, b, c, d, e, f] 816 # bac, bce, ecf, dbe 817 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 818 819 domain = Domain(points, vertices) 820 821 # Flat surface with 1m of water 822 domain.set_quantity('elevation', 0) 823 domain.set_quantity('stage', 1.0) 824 domain.set_quantity('friction', 0) 825 826 Br = Reflective_boundary(domain) 827 domain.set_boundary({'exterior': Br}) 828 829 # Setup only one forcing term, time dependent rainfall 830 # restricted to a polygon enclosing triangle #1 (bce) 831 domain.forcing_terms = [] 832 R = Rainfall(domain, 833 rate=lambda t: 3*t + 7, 834 polygon=rainfall_poly) 835 836 assert num.allclose(R.exchange_area, 2) 837 838 domain.forcing_terms.append(R) 839 840 # This will test that time is set to starttime in set_starttime 841 domain.set_starttime(5.0) 842 843 domain.compute_forcing_terms() 844 845 assert num.allclose(domain.quantities['stage'].explicit_update[1], 846 (3*domain.get_time() + 7)/1000) 847 assert num.allclose(domain.quantities['stage'].explicit_update[1], 848 (3*domain.get_starttime() + 7)/1000) 849 850 assert num.allclose(domain.quantities['stage'].explicit_update[0], 0) 851 assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) 852 853 def test_time_dependent_rainfall_using_georef(self): 854 """test_time_dependent_rainfall_using_georef 855 856 This will also test the General forcing term using georef 857 """ 858 859 # Mesh in zone 56 (absolute coords) 860 x0 = 314036.58727982 861 y0 = 6224951.2960092 862 863 rainfall_poly = ensure_numeric([[1,1], [2,1], [2,2], [1,2]], num.float) 864 rainfall_poly += [x0, y0] 865 866 a = [0.0, 0.0] 867 b = [0.0, 2.0] 868 c = [2.0, 0.0] 869 d = [0.0, 4.0] 870 e = [2.0, 2.0] 871 f = [4.0, 0.0] 872 873 points = [a, b, c, d, e, f] 874 # bac, bce, ecf, dbe 875 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 876 877 domain = Domain(points, vertices, 878 geo_reference=Geo_reference(56, x0, y0)) 879 880 # Flat surface with 1m of water 881 domain.set_quantity('elevation', 0) 882 domain.set_quantity('stage', 1.0) 883 domain.set_quantity('friction', 0) 884 885 Br = Reflective_boundary(domain) 886 domain.set_boundary({'exterior': Br}) 887 888 # Setup only one forcing term, time dependent rainfall 889 # restricted to a polygon enclosing triangle #1 (bce) 890 domain.forcing_terms = [] 891 R = Rainfall(domain, 892 rate=lambda t: 3*t + 7, 893 polygon=rainfall_poly) 894 895 assert num.allclose(R.exchange_area, 2) 896 897 domain.forcing_terms.append(R) 898 899 # This will test that time is set to starttime in set_starttime 900 domain.set_starttime(5.0) 901 902 domain.compute_forcing_terms() 903 904 assert num.allclose(domain.quantities['stage'].explicit_update[1], 905 (3*domain.get_time() + 7)/1000) 906 assert num.allclose(domain.quantities['stage'].explicit_update[1], 907 (3*domain.get_starttime() + 7)/1000) 908 909 assert num.allclose(domain.quantities['stage'].explicit_update[0], 0) 910 assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) 911 912 def test_time_dependent_rainfall_restricted_by_polygon_with_default(self): 913 """ 914 Test that default rainfall can be used when given rate runs out of data. 915 """ 916 917 import warnings 918 warnings.simplefilter('ignore', UserWarning) 919 920 921 a = [0.0, 0.0] 922 b = [0.0, 2.0] 923 c = [2.0, 0.0] 924 d = [0.0, 4.0] 925 e = [2.0, 2.0] 926 f = [4.0, 0.0] 927 928 points = [a, b, c, d, e, f] 929 # bac, bce, ecf, dbe 930 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 931 932 domain = Domain(points, vertices) 933 934 # Flat surface with 1m of water 935 domain.set_quantity('elevation', 0) 936 domain.set_quantity('stage', 1.0) 937 domain.set_quantity('friction', 0) 938 939 Br = Reflective_boundary(domain) 940 domain.set_boundary({'exterior': Br}) 941 942 # Setup only one forcing term, time dependent rainfall 943 # that expires at t==20 944 from anuga.fit_interpolate.interpolate import Modeltime_too_late 324 if verbose: 325 print domain.quantities['elevation'].centroid_values 326 print domain.quantities['stage'].centroid_values 327 print domain.quantities['xmomentum'].centroid_values 328 print domain.quantities['ymomentum'].centroid_values 329 330 # Apply operator to these triangles 331 indices = [0,1,3] 332 factor = 10.0 333 945 334 946 335 def main_rate(t): … … 949 338 raise Modeltime_too_late, msg 950 339 else: 951 return 3*t + 7 952 953 domain.forcing_terms = [] 954 R = Rainfall(domain, 955 rate=main_rate, 956 polygon = [[1,1], [2,1], [2,2], [1,2]], 957 default_rate=5.0) 958 959 assert num.allclose(R.exchange_area, 2) 960 961 domain.forcing_terms.append(R) 962 963 domain.time = 10. 964 965 domain.compute_forcing_terms() 966 967 assert num.allclose(domain.quantities['stage'].explicit_update[1], 968 (3*domain.time+7)/1000) 969 assert num.allclose(domain.quantities['stage'].explicit_update[0], 0) 970 assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) 971 972 domain.time = 100. 973 domain.quantities['stage'].explicit_update[:] = 0.0 # Reset 974 domain.compute_forcing_terms() 975 976 assert num.allclose(domain.quantities['stage'].explicit_update[1], 977 5.0/1000) # Default value 978 assert num.allclose(domain.quantities['stage'].explicit_update[0], 0) 979 assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) 980 981 def test_rainfall_forcing_with_evolve(self): 982 """test_rainfall_forcing_with_evolve 983 984 Test how forcing terms are called within evolve 985 """ 986 987 # FIXME(Ole): This test is just to experiment 988 import warnings 989 warnings.simplefilter('ignore', UserWarning) 990 991 992 a = [0.0, 0.0] 993 b = [0.0, 2.0] 994 c = [2.0, 0.0] 995 d = [0.0, 4.0] 996 e = [2.0, 2.0] 997 f = [4.0, 0.0] 998 999 points = [a, b, c, d, e, f] 1000 # bac, bce, ecf, dbe 1001 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 1002 1003 domain = Domain(points, vertices) 1004 1005 # Flat surface with 1m of water 1006 domain.set_quantity('elevation', 0) 1007 domain.set_quantity('stage', 1.0) 1008 domain.set_quantity('friction', 0) 1009 1010 Br = Reflective_boundary(domain) 1011 domain.set_boundary({'exterior': Br}) 1012 1013 # Setup only one forcing term, time dependent rainfall 1014 # that expires at t==20 1015 from anuga.fit_interpolate.interpolate import Modeltime_too_late 1016 1017 def main_rate(t): 1018 if t > 20: 1019 msg = 'Model time exceeded.' 1020 raise Modeltime_too_late, msg 1021 else: 1022 return 3*t + 7 1023 1024 domain.forcing_terms = [] 1025 R = Rainfall(domain, 1026 rate=main_rate, 1027 polygon=[[1,1], [2,1], [2,2], [1,2]], 1028 default_rate=5.0) 1029 1030 assert num.allclose(R.exchange_area, 2) 1031 1032 domain.forcing_terms.append(R) 1033 1034 for t in domain.evolve(yieldstep=1, finaltime=25): 1035 pass 1036 #FIXME(Ole): A test here is hard because explicit_update also 1037 # receives updates from the flux calculation. 1038 1039 1040 def test_rainfall_forcing_with_evolve_1(self): 1041 """test_rainfall_forcing_with_evolve 1042 1043 Test how forcing terms are called within evolve. 1044 This test checks that proper exception is thrown when no default_rate is set 1045 """ 1046 1047 import warnings 1048 warnings.simplefilter('ignore', UserWarning) 1049 1050 1051 a = [0.0, 0.0] 1052 b = [0.0, 2.0] 1053 c = [2.0, 0.0] 1054 d = [0.0, 4.0] 1055 e = [2.0, 2.0] 1056 f = [4.0, 0.0] 1057 1058 points = [a, b, c, d, e, f] 1059 # bac, bce, ecf, dbe 1060 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 1061 1062 domain = Domain(points, vertices) 1063 1064 # Flat surface with 1m of water 1065 domain.set_quantity('elevation', 0) 1066 domain.set_quantity('stage', 1.0) 1067 domain.set_quantity('friction', 0) 1068 1069 Br = Reflective_boundary(domain) 1070 domain.set_boundary({'exterior': Br}) 1071 1072 # Setup only one forcing term, time dependent rainfall 1073 # that expires at t==20 1074 from anuga.fit_interpolate.interpolate import Modeltime_too_late 1075 1076 def main_rate(t): 1077 if t > 20: 1078 msg = 'Model time exceeded.' 1079 raise Modeltime_too_late, msg 1080 else: 1081 return 3*t + 7 1082 1083 domain.forcing_terms = [] 1084 R = Rainfall(domain, 1085 rate=main_rate, 1086 polygon=[[1,1], [2,1], [2,2], [1,2]]) 1087 1088 1089 assert num.allclose(R.exchange_area, 2) 1090 1091 domain.forcing_terms.append(R) 1092 #for t in domain.evolve(yieldstep=1, finaltime=25): 1093 # pass 1094 1095 try: 1096 for t in domain.evolve(yieldstep=1, finaltime=25): 1097 pass 1098 except Modeltime_too_late, e: 1099 # Test that error message is as expected 1100 assert 'can specify keyword argument default_rate in the forcing function' in str(e) 1101 else: 1102 raise Exception, 'Should have raised exception' 1103 1104 def test_constant_wind_stress_from_file(self): 1105 from anuga.config import rho_a, rho_w, eta_w 1106 from math import pi, cos, sin 1107 from anuga.file_conversion.sts2sww_mesh import sts2sww_mesh 1108 1109 cellsize = 25 1110 nrows=5; ncols = 6; 1111 refzone=50 1112 xllcorner=366000;yllcorner=6369500; 1113 number_of_timesteps = 6 1114 timestep=12*60 1115 eps=2e-16 1116 1117 points, vertices, boundary =rectangular(nrows-2,ncols-2, 1118 len1=cellsize*(ncols-1), 1119 len2=cellsize*(nrows-1), 1120 origin=(xllcorner,yllcorner)) 1121 1122 domain = Domain(points, vertices, boundary) 1123 midpoints = domain.get_centroid_coordinates() 1124 1125 # Flat surface with 1m of water 1126 domain.set_quantity('elevation', 0) 1127 domain.set_quantity('stage', 1.0) 1128 domain.set_quantity('friction', 0) 1129 1130 Br = Reflective_boundary(domain) 1131 domain.set_boundary({'top': Br, 'bottom' :Br, 'left': Br, 'right': Br}) 1132 1133 # Setup only one forcing term, constant wind stress 1134 s = 100 1135 phi = 135 1136 pressure=1000 1137 domain.forcing_terms = [] 1138 field_sts_filename = 'wind_field' 1139 self.write_wind_pressure_field_sts(field_sts_filename, 1140 nrows=nrows, 1141 ncols=ncols, 1142 cellsize=cellsize, 1143 origin=(xllcorner,yllcorner), 1144 refzone=50, 1145 timestep=timestep, 1146 number_of_timesteps=10, 1147 speed=s, 1148 angle=phi, 1149 pressure=pressure) 1150 1151 sts2sww_mesh(field_sts_filename,spatial_thinning=1, 1152 verbose=False) 1153 1154 # Setup wind stress 1155 F = file_function(field_sts_filename+'.sww', domain, 1156 quantities=['wind_speed', 'wind_angle'], 1157 interpolation_points = midpoints) 1158 1159 W = Wind_stress(F,use_coordinates=False) 1160 domain.forcing_terms.append(W) 1161 domain.compute_forcing_terms() 1162 1163 const = eta_w*rho_a / rho_w 1164 1165 # Convert to radians 1166 phi = phi*pi / 180 1167 1168 # Compute velocity vector (u, v) 1169 u = s*cos(phi) 1170 v = s*sin(phi) 1171 1172 # Compute wind stress 1173 S = const * num.sqrt(u**2 + v**2) 1174 1175 assert num.allclose(domain.quantities['stage'].explicit_update, 0) 1176 assert num.allclose(domain.quantities['xmomentum'].explicit_update, S*u) 1177 assert num.allclose(domain.quantities['ymomentum'].explicit_update, S*v) 1178 1179 def test_variable_windfield_from_file(self): 1180 from anuga.config import rho_a, rho_w, eta_w 1181 from math import pi, cos, sin 1182 from anuga.config import time_format 1183 from anuga.file_conversion.sts2sww_mesh import sts2sww_mesh 1184 1185 cellsize = 25 1186 #nrows=25; ncols = 25; 1187 nrows=10; ncols = 10; 1188 refzone=50 1189 xllcorner=366000;yllcorner=6369500; 1190 number_of_timesteps = 10 1191 timestep=1 1192 eps=2.e-16 1193 spatial_thinning=1 1194 1195 points, vertices, boundary =rectangular(nrows-2,ncols-2, 1196 len1=cellsize*(ncols-1), 1197 len2=cellsize*(nrows-1), 1198 origin=(xllcorner,yllcorner)) 1199 1200 time=num.arange(0,10,1,num.float) 1201 eval_time=time[7]; 1202 1203 domain = Domain(points, vertices, boundary) 1204 midpoints = domain.get_centroid_coordinates() 1205 vertexpoints = domain.get_nodes() 1206 1207 """ 1208 x=grid_1d(xllcorner,cellsize,ncols) 1209 y=grid_1d(yllcorner,cellsize,nrows) 1210 X,Y=num.meshgrid(x,y) 1211 interpolation_points=num.empty((X.shape[0]*X.shape[1],2),num.float) 1212 k=0 1213 for i in range(X.shape[0]): 1214 for j in range(X.shape[1]): 1215 interpolation_points[k,0]=X[i,j] 1216 interpolation_points[k,1]=Y[i,j] 1217 k+=1 1218 1219 z=spatial_linear_varying_speed(eval_time,interpolation_points[:,0], 1220 interpolation_points[:,1]) 1221 1222 k=0 1223 Z=num.empty((X.shape[0],X.shape[1]),num.float) 1224 for i in range(X.shape[0]): 1225 for j in range(X.shape[1]): 1226 Z[i,j]=z[k] 1227 k+=1 1228 1229 Q=num.empty((time.shape[0],points.shape[0]),num.float) 1230 for i, t in enumerate(time): 1231 Q[i,:]=spatial_linear_varying_speed(t,points[:,0],points[:,1]) 1232 1233 from interpolate import Interpolation_function 1234 I = Interpolation_function(time,Q, 1235 vertex_coordinates = points, 1236 triangles = domain.triangles, 1237 #interpolation_points = midpoints, 1238 interpolation_points=interpolation_points, 1239 verbose=False) 1240 1241 V=num.empty((X.shape[0],X.shape[1]),num.float) 1242 for k in range(len(interpolation_points)): 1243 assert num.allclose(I(eval_time,k),z[k]) 1244 V[k/X.shape[1],k%X.shape[1]]=I(eval_time,k) 1245 1246 1247 import mpl_toolkits.mplot3d.axes3d as p3 1248 fig=P.figure() 1249 ax = p3.Axes3D(fig) 1250 ax.plot_surface(X,Y,V) 1251 ax.plot_surface(X,Y,Z) 1252 P.show() 1253 1254 1255 """ 1256 1257 # Flat surface with 1m of water 1258 domain.set_quantity('elevation', 0) 1259 domain.set_quantity('stage', 1.0) 1260 domain.set_quantity('friction', 0) 1261 1262 domain.time = 7*timestep # Take a time that is represented in file (not zero) 1263 1264 # Write wind stress file (ensure that domain.time is covered) 1265 1266 field_sts_filename = 'wind_field' 1267 self.write_wind_pressure_field_sts(field_sts_filename, 1268 nrows=nrows, 1269 ncols=ncols, 1270 cellsize=cellsize, 1271 origin=(xllcorner,yllcorner), 1272 refzone=50, 1273 timestep=timestep, 1274 number_of_timesteps=10, 1275 speed=spatial_linear_varying_speed, 1276 angle=spatial_linear_varying_angle, 1277 pressure=spatial_linear_varying_pressure) 1278 1279 1280 sts2sww_mesh(field_sts_filename,spatial_thinning=spatial_thinning, 1281 verbose=False) 1282 1283 # Setup wind stress 1284 FW = file_function(field_sts_filename+'.sww', domain, 1285 quantities=['wind_speed', 'wind_angle'], 1286 interpolation_points = midpoints) 1287 1288 W = Wind_stress(FW,use_coordinates=False) 1289 1290 domain.forcing_terms = [] 1291 domain.forcing_terms.append(W) 1292 1293 domain.compute_forcing_terms() 1294 1295 # Compute reference solution 1296 const = eta_w*rho_a / rho_w 1297 1298 N = len(domain) # number_of_triangles 1299 1300 xc = domain.get_centroid_coordinates() 1301 t = domain.time 1302 1303 x = xc[:,0] 1304 y = xc[:,1] 1305 s_vec = spatial_linear_varying_speed(t,x,y) 1306 phi_vec = spatial_linear_varying_angle(t,x,y) 1307 1308 for k in range(N): 1309 # Convert to radians 1310 phi = phi_vec[k]*pi / 180 1311 s = s_vec[k] 1312 1313 # Compute velocity vector (u, v) 1314 u = s*cos(phi) 1315 v = s*sin(phi) 1316 1317 # Compute wind stress 1318 S = const * num.sqrt(u**2 + v**2) 1319 1320 assert num.allclose(domain.quantities['stage'].explicit_update[k],0) 1321 1322 assert num.allclose(domain.quantities['xmomentum'].\ 1323 explicit_update[k],S*u,eps) 1324 assert num.allclose(domain.quantities['ymomentum'].\ 1325 explicit_update[k],S*v,eps) 1326 1327 os.remove(field_sts_filename+'.sts') 1328 os.remove(field_sts_filename+'.sww') 1329 1330 def test_variable_pressurefield_from_file(self): 1331 from anuga.config import rho_a, rho_w, eta_w 1332 from math import pi, cos, sin 1333 from anuga.config import time_format 1334 from anuga.file_conversion.sts2sww_mesh import sts2sww_mesh 1335 1336 cellsize = 25 1337 #nrows=25; ncols = 25; 1338 nrows=10; ncols = 10; 1339 refzone=50 1340 xllcorner=366000;yllcorner=6369500; 1341 number_of_timesteps = 10 1342 timestep=1 1343 eps=2.e-16 1344 spatial_thinning=1 1345 1346 points, vertices, boundary =rectangular(nrows-2,ncols-2, 1347 len1=cellsize*(ncols-1), 1348 len2=cellsize*(nrows-1), 1349 origin=(xllcorner,yllcorner)) 1350 1351 time=num.arange(0,10,1,num.float) 1352 eval_time=time[7]; 1353 1354 domain = Domain(points, vertices, boundary) 1355 midpoints = domain.get_centroid_coordinates() 1356 vertexpoints = domain.get_nodes() 1357 1358 # Flat surface with 1m of water 1359 domain.set_quantity('elevation', 0) 1360 domain.set_quantity('stage', 1.0) 1361 domain.set_quantity('friction', 0) 1362 1363 domain.time = 7*timestep # Take a time that is represented in file (not zero) 1364 1365 # Write wind stress file (ensure that domain.time is covered) 1366 1367 field_sts_filename = 'wind_field' 1368 self.write_wind_pressure_field_sts(field_sts_filename, 1369 nrows=nrows, 1370 ncols=ncols, 1371 cellsize=cellsize, 1372 origin=(xllcorner,yllcorner), 1373 refzone=50, 1374 timestep=timestep, 1375 number_of_timesteps=10, 1376 speed=spatial_linear_varying_speed, 1377 angle=spatial_linear_varying_angle, 1378 pressure=spatial_linear_varying_pressure) 1379 1380 1381 sts2sww_mesh(field_sts_filename,spatial_thinning=spatial_thinning, 1382 verbose=False) 1383 1384 # Setup barometric pressure 1385 FP = file_function(field_sts_filename+'.sww', domain, 1386 quantities=['barometric_pressure'], 1387 interpolation_points = vertexpoints) 1388 1389 P = Barometric_pressure(FP,use_coordinates=False) 1390 1391 1392 domain.forcing_terms = [] 1393 domain.forcing_terms.append(P) 1394 1395 domain.compute_forcing_terms() 1396 1397 N = len(domain) # number_of_triangles 1398 1399 xc = domain.get_centroid_coordinates() 1400 t = domain.time 1401 1402 x = xc[:,0] 1403 y = xc[:,1] 1404 p_vec = spatial_linear_varying_pressure(t,x,y) 1405 1406 h=1 #depth 1407 px=0.000025 #pressure gradient in x-direction 1408 py=0.0000125 #pressure gradient in y-direction 1409 for k in range(N): 1410 # Convert to radians 1411 p = p_vec[k] 1412 1413 assert num.allclose(domain.quantities['stage'].explicit_update[k],0) 1414 1415 assert num.allclose(domain.quantities['xmomentum'].\ 1416 explicit_update[k],h*px/rho_w) 1417 1418 assert num.allclose(domain.quantities['ymomentum'].\ 1419 explicit_update[k],h*py/rho_w) 1420 1421 os.remove(field_sts_filename+'.sts') 1422 os.remove(field_sts_filename+'.sww') 1423 1424 def test_constant_wind_stress_from_file_evolve(self): 1425 from anuga.config import rho_a, rho_w, eta_w 1426 from math import pi, cos, sin 1427 from anuga.config import time_format 1428 from anuga.file_conversion.sts2sww_mesh import sts2sww_mesh 1429 1430 cellsize = 25 1431 nrows=5; ncols = 6; 1432 refzone=50 1433 xllcorner=366000;yllcorner=6369500; 1434 number_of_timesteps = 27 1435 timestep=1 1436 eps=2e-16 1437 1438 points, vertices, boundary =rectangular(nrows-2,ncols-2, 1439 len1=cellsize*(ncols-1), 1440 len2=cellsize*(nrows-1), 1441 origin=(xllcorner,yllcorner)) 1442 1443 domain = Domain(points, vertices, boundary) 1444 midpoints = domain.get_centroid_coordinates() 1445 1446 # Flat surface with 1m of water 1447 domain.set_quantity('elevation', 0) 1448 domain.set_quantity('stage', 1.0) 1449 domain.set_quantity('friction', 0) 1450 1451 Br = Reflective_boundary(domain) 1452 domain.set_boundary({'top': Br, 'bottom' :Br, 'left': Br, 'right': Br}) 1453 1454 # Setup only one forcing term, constant wind stress 1455 s = 100 1456 phi = 135 1457 field_sts_filename = 'wind_field' 1458 self.write_wind_pressure_field_sts(field_sts_filename, 1459 nrows=nrows, 1460 ncols=ncols, 1461 cellsize=cellsize, 1462 origin=(xllcorner,yllcorner), 1463 refzone=50, 1464 timestep=timestep, 1465 number_of_timesteps=number_of_timesteps, 1466 speed=s, 1467 angle=phi) 1468 1469 sts2sww_mesh(field_sts_filename,spatial_thinning=1, 1470 verbose=False) 1471 1472 # Setup wind stress 1473 F = file_function(field_sts_filename+'.sww', domain, 1474 quantities=['wind_speed', 'wind_angle'], 1475 interpolation_points = midpoints) 1476 1477 W = Wind_stress(F,use_coordinates=False) 1478 domain.forcing_terms.append(W) 1479 1480 valuesUsingFunction=num.empty((3,number_of_timesteps+1,midpoints.shape[0]), 1481 num.float) 1482 i=0 1483 for t in domain.evolve(yieldstep=1, finaltime=number_of_timesteps*timestep): 1484 valuesUsingFunction[0,i]=domain.quantities['stage'].explicit_update 1485 valuesUsingFunction[1,i]=domain.quantities['xmomentum'].explicit_update 1486 valuesUsingFunction[2,i]=domain.quantities['ymomentum'].explicit_update 1487 i+=1 1488 1489 1490 domain_II = Domain(points, vertices, boundary) 1491 1492 # Flat surface with 1m of water 1493 domain_II.set_quantity('elevation', 0) 1494 domain_II.set_quantity('stage', 1.0) 1495 domain_II.set_quantity('friction', 0) 1496 1497 Br = Reflective_boundary(domain_II) 1498 domain_II.set_boundary({'top': Br, 'bottom' :Br, 'left': Br, 'right': Br}) 1499 1500 s = 100 1501 phi = 135 1502 domain_II.forcing_terms = [] 1503 domain_II.forcing_terms.append(Wind_stress(s, phi)) 1504 1505 i=0; 1506 for t in domain_II.evolve(yieldstep=1, 1507 finaltime=number_of_timesteps*timestep): 1508 assert num.allclose(valuesUsingFunction[0,i],domain_II.quantities['stage'].explicit_update), max(valuesUsingFunction[0,i]-domain_II.quantities['stage'].explicit_update) 1509 assert num.allclose(valuesUsingFunction[1,i],domain_II.quantities['xmomentum'].explicit_update) 1510 assert num.allclose(valuesUsingFunction[2,i],domain_II.quantities['ymomentum'].explicit_update) 1511 i+=1 1512 1513 os.remove(field_sts_filename+'.sts') 1514 os.remove(field_sts_filename+'.sww') 1515 1516 def test_temporally_varying_wind_stress_from_file_evolve(self): 1517 from anuga.config import rho_a, rho_w, eta_w 1518 from math import pi, cos, sin 1519 from anuga.config import time_format 1520 from anuga.file_conversion.sts2sww_mesh import sts2sww_mesh 1521 1522 cellsize = 25 1523 #nrows=20; ncols = 20; 1524 nrows=10; ncols = 10; 1525 refzone=50 1526 xllcorner=366000;yllcorner=6369500; 1527 number_of_timesteps = 28 1528 timestep=1. 1529 eps=2e-16 1530 1531 #points, vertices, boundary =rectangular(10,10, 1532 points, vertices, boundary =rectangular(5,5, 1533 len1=cellsize*(ncols-1), 1534 len2=cellsize*(nrows-1), 1535 origin=(xllcorner,yllcorner)) 1536 1537 domain = Domain(points, vertices, boundary) 1538 midpoints = domain.get_centroid_coordinates() 1539 1540 # Flat surface with 1m of water 1541 domain.set_quantity('elevation', 0) 1542 domain.set_quantity('stage', 1.0) 1543 domain.set_quantity('friction', 0) 1544 1545 Br = Reflective_boundary(domain) 1546 domain.set_boundary({'top': Br, 'bottom' :Br, 'left': Br, 'right': Br}) 1547 1548 # Setup only one forcing term, constant wind stress 1549 field_sts_filename = 'wind_field' 1550 self.write_wind_pressure_field_sts(field_sts_filename, 1551 nrows=nrows, 1552 ncols=ncols, 1553 cellsize=cellsize, 1554 origin=(xllcorner,yllcorner), 1555 refzone=50, 1556 timestep=timestep, 1557 number_of_timesteps=number_of_timesteps, 1558 speed=time_varying_speed, 1559 angle=time_varying_angle, 1560 pressure=time_varying_pressure) 1561 1562 sts2sww_mesh(field_sts_filename,spatial_thinning=1, 1563 verbose=False) 1564 1565 # Setup wind stress 1566 F = file_function(field_sts_filename+'.sww', domain, 1567 quantities=['wind_speed', 'wind_angle'], 1568 interpolation_points = midpoints) 1569 1570 #W = Wind_stress(F,use_coordinates=False) 1571 W = Wind_stress_fast(F,filename=field_sts_filename+'.sww', domain=domain) 1572 domain.forcing_terms.append(W) 1573 1574 valuesUsingFunction=num.empty((3,2*number_of_timesteps,midpoints.shape[0]), 1575 num.float) 1576 i=0 1577 for t in domain.evolve(yieldstep=timestep/2., finaltime=(number_of_timesteps-1)*timestep): 1578 valuesUsingFunction[0,i]=domain.quantities['stage'].explicit_update 1579 valuesUsingFunction[1,i]=domain.quantities['xmomentum'].explicit_update 1580 valuesUsingFunction[2,i]=domain.quantities['ymomentum'].explicit_update 1581 i+=1 1582 1583 1584 domain_II = Domain(points, vertices, boundary) 1585 1586 # Flat surface with 1m of water 1587 domain_II.set_quantity('elevation', 0) 1588 domain_II.set_quantity('stage', 1.0) 1589 domain_II.set_quantity('friction', 0) 1590 1591 Br = Reflective_boundary(domain_II) 1592 domain_II.set_boundary({'top': Br, 'bottom' :Br, 'left': Br, 'right': Br}) 1593 1594 domain_II.forcing_terms.append(Wind_stress(s=time_varying_speed, 1595 phi=time_varying_angle)) 1596 1597 i=0; 1598 for t in domain_II.evolve(yieldstep=timestep/2., 1599 finaltime=(number_of_timesteps-1)*timestep): 1600 assert num.allclose(valuesUsingFunction[0,i], 1601 domain_II.quantities['stage'].explicit_update, 1602 eps) 1603 #print i,valuesUsingFunction[1,i] 1604 assert num.allclose(valuesUsingFunction[1,i], 1605 domain_II.quantities['xmomentum'].explicit_update, 1606 eps),(valuesUsingFunction[1,i]- 1607 domain_II.quantities['xmomentum'].explicit_update) 1608 assert num.allclose(valuesUsingFunction[2,i], 1609 domain_II.quantities['ymomentum'].explicit_update, 1610 eps) 1611 #if i==1: assert-1==1 1612 i+=1 1613 1614 os.remove(field_sts_filename+'.sts') 1615 os.remove(field_sts_filename+'.sww') 1616 1617 def test_spatially_varying_wind_stress_from_file_evolve(self): 1618 from anuga.config import rho_a, rho_w, eta_w 1619 from math import pi, cos, sin 1620 from anuga.config import time_format 1621 from anuga.file_conversion.sts2sww_mesh import sts2sww_mesh 1622 1623 cellsize = 25 1624 nrows=20; ncols = 20; 1625 nrows=10; ncols = 10; 1626 refzone=50 1627 xllcorner=366000;yllcorner=6369500; 1628 number_of_timesteps = 28 1629 timestep=1. 1630 eps=2e-16 1631 1632 #points, vertices, boundary =rectangular(10,10, 1633 points, vertices, boundary =rectangular(5,5, 1634 len1=cellsize*(ncols-1), 1635 len2=cellsize*(nrows-1), 1636 origin=(xllcorner,yllcorner)) 1637 1638 domain = Domain(points, vertices, boundary) 1639 midpoints = domain.get_centroid_coordinates() 1640 1641 # Flat surface with 1m of water 1642 domain.set_quantity('elevation', 0) 1643 domain.set_quantity('stage', 1.0) 1644 domain.set_quantity('friction', 0) 1645 1646 Br = Reflective_boundary(domain) 1647 domain.set_boundary({'top': Br, 'bottom' :Br, 'left': Br, 'right': Br}) 1648 1649 # Setup only one forcing term, constant wind stress 1650 field_sts_filename = 'wind_field' 1651 self.write_wind_pressure_field_sts(field_sts_filename, 1652 nrows=nrows, 1653 ncols=ncols, 1654 cellsize=cellsize, 1655 origin=(xllcorner,yllcorner), 1656 refzone=50, 1657 timestep=timestep, 1658 number_of_timesteps=number_of_timesteps, 1659 speed=spatial_linear_varying_speed, 1660 angle=spatial_linear_varying_angle, 1661 pressure=spatial_linear_varying_pressure) 1662 1663 sts2sww_mesh(field_sts_filename,spatial_thinning=1, 1664 verbose=False) 1665 1666 # Setup wind stress 1667 F = file_function(field_sts_filename+'.sww', domain, 1668 quantities=['wind_speed', 'wind_angle'], 1669 interpolation_points = midpoints) 1670 1671 W = Wind_stress(F,use_coordinates=False) 1672 domain.forcing_terms.append(W) 1673 1674 valuesUsingFunction=num.empty((3,number_of_timesteps,midpoints.shape[0]), 1675 num.float) 1676 i=0 1677 for t in domain.evolve(yieldstep=timestep, finaltime=(number_of_timesteps-1)*timestep): 1678 valuesUsingFunction[0,i]=domain.quantities['stage'].explicit_update 1679 valuesUsingFunction[1,i]=domain.quantities['xmomentum'].explicit_update 1680 valuesUsingFunction[2,i]=domain.quantities['ymomentum'].explicit_update 1681 i+=1 1682 1683 1684 domain_II = Domain(points, vertices, boundary) 1685 1686 # Flat surface with 1m of water 1687 domain_II.set_quantity('elevation', 0) 1688 domain_II.set_quantity('stage', 1.0) 1689 domain_II.set_quantity('friction', 0) 1690 1691 Br = Reflective_boundary(domain_II) 1692 domain_II.set_boundary({'top': Br, 'bottom' :Br, 'left': Br, 'right': Br}) 1693 1694 domain_II.forcing_terms.append(Wind_stress(s=spatial_linear_varying_speed, 1695 phi=spatial_linear_varying_angle)) 1696 1697 i=0; 1698 for t in domain_II.evolve(yieldstep=timestep, 1699 finaltime=(number_of_timesteps-1)*timestep): 1700 #print valuesUsingFunction[1,i],domain_II.quantities['xmomentum'].explicit_update 1701 assert num.allclose(valuesUsingFunction[0,i], 1702 domain_II.quantities['stage'].explicit_update, 1703 eps) 1704 assert num.allclose(valuesUsingFunction[1,i], 1705 domain_II.quantities['xmomentum'].explicit_update, 1706 eps) 1707 assert num.allclose(valuesUsingFunction[2,i], 1708 domain_II.quantities['ymomentum'].explicit_update, 1709 eps) 1710 i+=1 1711 1712 os.remove(field_sts_filename+'.sts') 1713 os.remove(field_sts_filename+'.sww') 1714 1715 def test_temporally_varying_pressure_stress_from_file_evolve(self): 1716 from anuga.config import rho_a, rho_w, eta_w 1717 from math import pi, cos, sin 1718 from anuga.config import time_format 1719 from anuga.file_conversion.sts2sww_mesh import sts2sww_mesh 1720 1721 cellsize = 25 1722 #nrows=20; ncols = 20; 1723 nrows=10; ncols = 10; 1724 refzone=50 1725 xllcorner=366000;yllcorner=6369500; 1726 number_of_timesteps = 28 1727 timestep=10. 1728 eps=2e-16 1729 1730 #print "Building mesh" 1731 #points, vertices, boundary =rectangular(10,10, 1732 points, vertices, boundary =rectangular(5,5, 1733 len1=cellsize*(ncols-1), 1734 len2=cellsize*(nrows-1), 1735 origin=(xllcorner,yllcorner)) 1736 1737 domain = Domain(points, vertices, boundary) 1738 vertexpoints = domain.get_nodes() 1739 1740 # Flat surface with 1m of water 1741 domain.set_quantity('elevation', 0) 1742 domain.set_quantity('stage', 1.0) 1743 domain.set_quantity('friction', 0) 1744 1745 Br = Reflective_boundary(domain) 1746 domain.set_boundary({'top': Br, 'bottom' :Br, 'left': Br, 'right': Br}) 1747 1748 # Setup only one forcing term, constant wind stress 1749 field_sts_filename = 'wind_field' 1750 #print 'Writing pressure field sts file' 1751 self.write_wind_pressure_field_sts(field_sts_filename, 1752 nrows=nrows, 1753 ncols=ncols, 1754 cellsize=cellsize, 1755 origin=(xllcorner,yllcorner), 1756 refzone=50, 1757 timestep=timestep, 1758 number_of_timesteps=number_of_timesteps, 1759 speed=time_varying_speed, 1760 angle=time_varying_angle, 1761 pressure=time_varying_pressure) 1762 1763 #print "converting sts to sww" 1764 sts2sww_mesh(field_sts_filename,spatial_thinning=1, 1765 verbose=False) 1766 1767 #print 'initialising file_function' 1768 # Setup wind stress 1769 F = file_function(field_sts_filename+'.sww', domain, 1770 quantities=['barometric_pressure'], 1771 interpolation_points = vertexpoints) 1772 1773 #P = Barometric_pressure(F,use_coordinates=False) 1774 #print 'initialising pressure forcing term' 1775 P = Barometric_pressure_fast(p=F,filename=field_sts_filename+'.sww',domain=domain) 1776 domain.forcing_terms.append(P) 1777 1778 valuesUsingFunction=num.empty((3,2*number_of_timesteps,len(domain)), 1779 num.float) 1780 i=0 1781 import time as timer 1782 t0=timer.time() 1783 for t in domain.evolve(yieldstep=timestep/2., finaltime=(number_of_timesteps-1)*timestep): 1784 valuesUsingFunction[0,i]=domain.quantities['stage'].explicit_update 1785 valuesUsingFunction[1,i]=domain.quantities['xmomentum'].explicit_update 1786 valuesUsingFunction[2,i]=domain.quantities['ymomentum'].explicit_update 1787 i+=1 1788 #domain.write_time() 1789 t1=timer.time() 1790 #print "That took %fs seconds" %(t1-t0) 1791 1792 1793 domain_II = Domain(points, vertices, boundary) 1794 1795 # Flat surface with 1m of water 1796 domain_II.set_quantity('elevation', 0) 1797 domain_II.set_quantity('stage', 1.0) 1798 domain_II.set_quantity('friction', 0) 1799 1800 Br = Reflective_boundary(domain_II) 1801 domain_II.set_boundary({'top': Br, 'bottom' :Br, 'left': Br, 'right': Br}) 1802 1803 domain_II.forcing_terms.append(Barometric_pressure(p=time_varying_pressure)) 1804 1805 i=0; 1806 for t in domain_II.evolve(yieldstep=timestep/2., 1807 finaltime=(number_of_timesteps-1)*timestep): 1808 assert num.allclose(valuesUsingFunction[0,i], 1809 domain_II.quantities['stage'].explicit_update, 1810 eps) 1811 assert num.allclose(valuesUsingFunction[1,i], 1812 domain_II.quantities['xmomentum'].explicit_update, 1813 eps) 1814 assert num.allclose(valuesUsingFunction[2,i], 1815 domain_II.quantities['ymomentum'].explicit_update, 1816 eps) 1817 i+=1 1818 1819 os.remove(field_sts_filename+'.sts') 1820 os.remove(field_sts_filename+'.sww') 1821 1822 def test_spatially_varying_pressure_stress_from_file_evolve(self): 1823 from anuga.config import rho_a, rho_w, eta_w 1824 from math import pi, cos, sin 1825 from anuga.config import time_format 1826 from anuga.file_conversion.sts2sww_mesh import sts2sww_mesh 1827 1828 cellsize = 25 1829 #nrows=20; ncols = 20; 1830 nrows=10; ncols = 10; 1831 refzone=50 1832 xllcorner=366000;yllcorner=6369500; 1833 number_of_timesteps = 28 1834 timestep=1. 1835 eps=2e-16 1836 1837 #points, vertices, boundary =rectangular(10,10, 1838 points, vertices, boundary =rectangular(5,5, 1839 len1=cellsize*(ncols-1), 1840 len2=cellsize*(nrows-1), 1841 origin=(xllcorner,yllcorner)) 1842 1843 domain = Domain(points, vertices, boundary) 1844 vertexpoints = domain.get_nodes() 1845 1846 # Flat surface with 1m of water 1847 domain.set_quantity('elevation', 0) 1848 domain.set_quantity('stage', 1.0) 1849 domain.set_quantity('friction', 0) 1850 1851 Br = Reflective_boundary(domain) 1852 domain.set_boundary({'top': Br, 'bottom' :Br, 'left': Br, 'right': Br}) 1853 1854 # Setup only one forcing term, constant wind stress 1855 field_sts_filename = 'wind_field' 1856 self.write_wind_pressure_field_sts(field_sts_filename, 1857 nrows=nrows, 1858 ncols=ncols, 1859 cellsize=cellsize, 1860 origin=(xllcorner,yllcorner), 1861 refzone=50, 1862 timestep=timestep, 1863 number_of_timesteps=number_of_timesteps, 1864 speed=spatial_linear_varying_speed, 1865 angle=spatial_linear_varying_angle, 1866 pressure=spatial_linear_varying_pressure) 1867 1868 sts2sww_mesh(field_sts_filename,spatial_thinning=1, 1869 verbose=False) 1870 1871 # Setup wind stress 1872 F = file_function(field_sts_filename+'.sww', domain, 1873 quantities=['barometric_pressure'], 1874 interpolation_points = vertexpoints) 1875 1876 P = Barometric_pressure(F,use_coordinates=False) 1877 domain.forcing_terms.append(P) 1878 1879 valuesUsingFunction=num.empty((3,number_of_timesteps,len(domain)), 1880 num.float) 1881 i=0 1882 for t in domain.evolve(yieldstep=timestep, finaltime=(number_of_timesteps-1)*timestep): 1883 valuesUsingFunction[0,i]=domain.quantities['stage'].explicit_update 1884 valuesUsingFunction[1,i]=domain.quantities['xmomentum'].explicit_update 1885 valuesUsingFunction[2,i]=domain.quantities['ymomentum'].explicit_update 1886 i+=1 1887 1888 1889 domain_II = Domain(points, vertices, boundary) 1890 1891 # Flat surface with 1m of water 1892 domain_II.set_quantity('elevation', 0) 1893 domain_II.set_quantity('stage', 1.0) 1894 domain_II.set_quantity('friction', 0) 1895 1896 Br = Reflective_boundary(domain_II) 1897 domain_II.set_boundary({'top': Br, 'bottom' :Br, 'left': Br, 'right': Br}) 1898 1899 domain_II.forcing_terms.append(Barometric_pressure(p=spatial_linear_varying_pressure)) 1900 1901 i=0; 1902 for t in domain_II.evolve(yieldstep=timestep, 1903 finaltime=(number_of_timesteps-1)*timestep): 1904 1905 assert num.allclose(valuesUsingFunction[0,i], 1906 domain_II.quantities['stage'].explicit_update, 1907 eps) 1908 assert num.allclose(valuesUsingFunction[1,i], 1909 domain_II.quantities['xmomentum'].explicit_update, 1910 eps) 1911 assert num.allclose(valuesUsingFunction[2,i], 1912 domain_II.quantities['ymomentum'].explicit_update, 1913 eps) 1914 i+=1 1915 1916 os.remove(field_sts_filename+'.sts') 1917 os.remove(field_sts_filename+'.sww') 1918 1919 def test_gravity(self): 1920 #Assuming no friction 1921 1922 from anuga.config import g 1923 1924 a = [0.0, 0.0] 1925 b = [0.0, 2.0] 1926 c = [2.0, 0.0] 1927 d = [0.0, 4.0] 1928 e = [2.0, 2.0] 1929 f = [4.0, 0.0] 1930 1931 points = [a, b, c, d, e, f] 1932 # bac, bce, ecf, dbe 1933 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 1934 1935 domain = Domain(points, vertices) 1936 B = Reflective_boundary(domain) 1937 domain.set_boundary( {'exterior': B}) 1938 1939 #Set up for a gradient of (3,0) at mid triangle (bce) 1940 def slope(x, y): 1941 return 3*x 1942 1943 h = 0.1 1944 def stage(x, y): 1945 return slope(x, y) + h 1946 1947 domain.set_quantity('elevation', slope) 1948 domain.set_quantity('stage', stage) 1949 1950 for name in domain.conserved_quantities: 1951 assert num.allclose(domain.quantities[name].explicit_update, 0) 1952 assert num.allclose(domain.quantities[name].semi_implicit_update, 0) 1953 1954 domain.update_boundary() 1955 domain.compute_fluxes() 1956 1957 assert num.allclose(domain.quantities['stage'].explicit_update, 0) 1958 assert num.allclose(domain.quantities['xmomentum'].explicit_update, 1959 -g*h*3) 1960 assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0) 1961 1962 1963 1964 def test_manning_friction_old(self): 1965 from anuga.config import g 1966 1967 a = [0.0, 0.0] 1968 b = [0.0, 2.0] 1969 c = [2.0, 0.0] 1970 d = [0.0, 4.0] 1971 e = [2.0, 2.0] 1972 f = [4.0, 0.0] 1973 1974 points = [a, b, c, d, e, f] 1975 # bac, bce, ecf, dbe 1976 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 1977 1978 domain = Domain(points, vertices) 1979 1980 domain.set_sloped_mannings_function(False) 1981 1982 B = Reflective_boundary(domain) 1983 domain.set_boundary( {'exterior': B}) 1984 1985 #Set up for a gradient of (3,0) at mid triangle (bce) 1986 def slope(x, y): 1987 return 3*x 1988 1989 h = 0.1 1990 def stage(x, y): 1991 return slope(x, y) + h 1992 1993 eta = 0.07 1994 domain.set_quantity('elevation', slope) 1995 domain.set_quantity('stage', stage) 1996 domain.set_quantity('friction', eta) 1997 1998 for name in domain.conserved_quantities: 1999 assert num.allclose(domain.quantities[name].explicit_update, 0) 2000 assert num.allclose(domain.quantities[name].semi_implicit_update, 0) 2001 2002 domain.compute_forcing_terms() 2003 2004 assert num.allclose(domain.quantities['stage'].explicit_update, 0) 2005 assert num.allclose(domain.quantities['xmomentum'].explicit_update, 2006 0) 2007 assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0) 2008 2009 assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0) 2010 assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update, 2011 0) 2012 assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update, 2013 0) 2014 2015 #Create some momentum for friction to work with 2016 domain.set_quantity('xmomentum', 1) 2017 S = -g*eta**2 / h**(7.0/3) 2018 2019 domain.compute_forcing_terms() 2020 assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0) 2021 assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update, 2022 S) 2023 assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update, 2024 0) 2025 2026 #A more complex example 2027 domain.quantities['stage'].semi_implicit_update[:] = 0.0 2028 domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0 2029 domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0 2030 2031 domain.set_quantity('xmomentum', 3) 2032 domain.set_quantity('ymomentum', 4) 2033 # sqrt(3^2 +4^2) = 5 2034 2035 S = -g*eta**2 / h**(7.0/3) * 5 2036 2037 domain.compute_forcing_terms() 2038 2039 assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0) 2040 assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,3*S) 2041 assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,4*S) 2042 2043 2044 def test_manning_friction_new(self): 2045 from anuga.config import g 2046 import math 2047 2048 a = [0.0, 0.0] 2049 b = [0.0, 2.0] 2050 c = [2.0, 0.0] 2051 d = [0.0, 4.0] 2052 e = [2.0, 2.0] 2053 f = [4.0, 0.0] 2054 2055 points = [a, b, c, d, e, f] 2056 # bac, bce, ecf, dbe 2057 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2058 2059 domain = Domain(points, vertices) 2060 B = Reflective_boundary(domain) 2061 domain.set_boundary( {'exterior': B}) 2062 2063 # Use the new function which takes into account the extra 2064 # wetted area due to slope of bed 2065 domain.set_sloped_mannings_function(True) 2066 2067 #Set up for a gradient of (3,0) at mid triangle (bce) 2068 def slope(x, y): 2069 return 3*x 2070 2071 h = 0.1 2072 def stage(x, y): 2073 return slope(x, y) + h 2074 2075 eta = 0.07 2076 domain.set_quantity('elevation', slope) 2077 domain.set_quantity('stage', stage) 2078 domain.set_quantity('friction', eta) 2079 2080 for name in domain.conserved_quantities: 2081 assert num.allclose(domain.quantities[name].explicit_update, 0) 2082 assert num.allclose(domain.quantities[name].semi_implicit_update, 0) 2083 2084 domain.compute_forcing_terms() 2085 2086 assert num.allclose(domain.quantities['stage'].explicit_update, 0) 2087 assert num.allclose(domain.quantities['xmomentum'].explicit_update, 2088 0) 2089 assert num.allclose(domain.quantities['ymomentum'].explicit_update, 0) 2090 2091 assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0) 2092 assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update, 2093 0) 2094 assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update, 2095 0) 2096 2097 #Create some momentum for friction to work with 2098 domain.set_quantity('xmomentum', 1) 2099 S = -g*eta**2 / h**(7.0/3) * math.sqrt(10) 2100 2101 domain.compute_forcing_terms() 2102 assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0) 2103 assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update, 2104 S) 2105 assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update, 2106 0) 2107 2108 #A more complex example 2109 domain.quantities['stage'].semi_implicit_update[:] = 0.0 2110 domain.quantities['xmomentum'].semi_implicit_update[:] = 0.0 2111 domain.quantities['ymomentum'].semi_implicit_update[:] = 0.0 2112 2113 domain.set_quantity('xmomentum', 3) 2114 domain.set_quantity('ymomentum', 4) 2115 2116 S = -g*eta**2 *5 / h**(7.0/3) * math.sqrt(10.0) 2117 2118 domain.compute_forcing_terms() 2119 2120 #print 'S', S 2121 #print domain.quantities['xmomentum'].semi_implicit_update 2122 #print domain.quantities['ymomentum'].semi_implicit_update 2123 2124 assert num.allclose(domain.quantities['stage'].semi_implicit_update, 0) 2125 assert num.allclose(domain.quantities['xmomentum'].semi_implicit_update,3*S) 2126 assert num.allclose(domain.quantities['ymomentum'].semi_implicit_update,4*S) 2127 2128 2129 2130 2131 2132 def test_inflow_using_circle(self): 2133 from math import pi, cos, sin 2134 2135 a = [0.0, 0.0] 2136 b = [0.0, 2.0] 2137 c = [2.0, 0.0] 2138 d = [0.0, 4.0] 2139 e = [2.0, 2.0] 2140 f = [4.0, 0.0] 2141 2142 points = [a, b, c, d, e, f] 2143 # bac, bce, ecf, dbe 2144 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2145 2146 domain = Domain(points, vertices) 2147 2148 # Flat surface with 1m of water 2149 domain.set_quantity('elevation', 0) 2150 domain.set_quantity('stage', 1.0) 2151 domain.set_quantity('friction', 0) 2152 2153 Br = Reflective_boundary(domain) 2154 domain.set_boundary({'exterior': Br}) 2155 2156 # Setup only one forcing term, constant inflow of 2 m^3/s 2157 # on a circle affecting triangles #0 and #1 (bac and bce) 2158 domain.forcing_terms = [] 2159 2160 I = Inflow(domain, rate=2.0, center=(1,1), radius=1) 2161 domain.forcing_terms.append(I) 2162 domain.compute_forcing_terms() 2163 2164 2165 A = I.exchange_area 2166 assert num.allclose(A, 4) # Two triangles 2167 2168 assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A) 2169 assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A) 2170 assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) 2171 2172 2173 def test_inflow_using_circle_function(self): 2174 from math import pi, cos, sin 2175 2176 a = [0.0, 0.0] 2177 b = [0.0, 2.0] 2178 c = [2.0, 0.0] 2179 d = [0.0, 4.0] 2180 e = [2.0, 2.0] 2181 f = [4.0, 0.0] 2182 2183 points = [a, b, c, d, e, f] 2184 # bac, bce, ecf, dbe 2185 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2186 2187 domain = Domain(points, vertices) 2188 2189 # Flat surface with 1m of water 2190 domain.set_quantity('elevation', 0) 2191 domain.set_quantity('stage', 1.0) 2192 domain.set_quantity('friction', 0) 2193 2194 Br = Reflective_boundary(domain) 2195 domain.set_boundary({'exterior': Br}) 2196 2197 # Setup only one forcing term, time dependent inflow of 2 m^3/s 2198 # on a circle affecting triangles #0 and #1 (bac and bce) 2199 domain.forcing_terms = [] 2200 I = Inflow(domain, rate=lambda t: 2., center=(1,1), radius=1) 2201 domain.forcing_terms.append(I) 2202 2203 domain.compute_forcing_terms() 2204 2205 A = I.exchange_area 2206 assert num.allclose(A, 4) # Two triangles 2207 2208 assert num.allclose(domain.quantities['stage'].explicit_update[1], 2.0/A) 2209 assert num.allclose(domain.quantities['stage'].explicit_update[0], 2.0/A) 2210 assert num.allclose(domain.quantities['stage'].explicit_update[2:], 0) 2211 2212 2213 2214 2215 def test_inflow_catch_too_few_triangles(self): 2216 """ 2217 Test that exception is thrown if no triangles are covered 2218 by the inflow area 2219 """ 2220 2221 from math import pi, cos, sin 2222 2223 a = [0.0, 0.0] 2224 b = [0.0, 2.0] 2225 c = [2.0, 0.0] 2226 d = [0.0, 4.0] 2227 e = [2.0, 2.0] 2228 f = [4.0, 0.0] 2229 2230 points = [a, b, c, d, e, f] 2231 # bac, bce, ecf, dbe 2232 vertices = [[1,0,2], [1,2,4], [4,2,5], [3,1,4]] 2233 2234 domain = Domain(points, vertices) 2235 2236 # Flat surface with 1m of water 2237 domain.set_quantity('elevation', 0) 2238 domain.set_quantity('stage', 1.0) 2239 domain.set_quantity('friction', 0) 2240 2241 Br = Reflective_boundary(domain) 2242 domain.set_boundary({'exterior': Br}) 2243 2244 # Setup only one forcing term, constant inflow of 2 m^3/s 2245 # on a circle affecting triangles #0 and #1 (bac and bce) 2246 try: 2247 Inflow(domain, rate=2.0, center=(1,1.1), radius=0.01) 2248 except: 2249 pass 2250 else: 2251 msg = 'Should have raised exception' 2252 raise Exception, msg 2253 340 return 3.0 * t + 7.0 341 342 default_rate = lambda t: 3*t + 7 343 344 345 operator = Rate_operator(domain, rate=main_rate, factor=factor, \ 346 indices=indices, default_rate = default_rate) 347 348 349 # Apply Operator 350 domain.timestep = 2.0 351 operator() 352 353 t = operator.get_time() 354 d = operator.get_timestep()*main_rate(t)*factor + 1 355 stage_ex = [ d, d, 1., d] 356 357 if verbose: 358 print domain.quantities['elevation'].centroid_values 359 print domain.quantities['stage'].centroid_values 360 print domain.quantities['xmomentum'].centroid_values 361 print domain.quantities['ymomentum'].centroid_values 362 363 assert num.allclose(domain.quantities['stage'].centroid_values, stage_ex) 364 assert num.allclose(domain.quantities['xmomentum'].centroid_values, 0.0) 365 assert num.allclose(domain.quantities['ymomentum'].centroid_values, 0.0) 366 367 domain.set_starttime(30.0) 368 domain.timestep = 1.0 369 operator() 370 371 t = operator.get_time() 372 d = operator.get_timestep()*default_rate(t)*factor + d 373 stage_ex = [ d, d, 1., d] 374 375 if verbose: 376 print domain.quantities['elevation'].centroid_values 377 print domain.quantities['stage'].centroid_values 378 print domain.quantities['xmomentum'].centroid_values 379 print domain.quantities['ymomentum'].centroid_values 380 381 assert num.allclose(domain.quantities['stage'].centroid_values, stage_ex) 382 assert num.allclose(domain.quantities['xmomentum'].centroid_values, 0.0) 383 assert num.allclose(domain.quantities['ymomentum'].centroid_values, 0.0) 2254 384 2255 385 2256 386 if __name__ == "__main__": 2257 suite = unittest.makeSuite(Test_ Forcing, 'test')387 suite = unittest.makeSuite(Test_rate_operators, 'test') 2258 388 runner = unittest.TextTestRunner(verbosity=1) 2259 389 runner.run(suite) -
trunk/anuga_core/source/anuga/structures/inlet_operator.py
r8361 r8488 46 46 t = self.domain.get_time() 47 47 48 49 50 # Need to run global command on all processors 51 current_volume = self.inlet.get_total_water_volume() 52 48 53 Q1 = self.update_Q(t) 49 54 Q2 = self.update_Q(t + timestep) … … 52 57 volume = Q*timestep 53 58 54 assert volume >= 0.0, 'Q < 0: Water to be removed from an inlet!'59 assert current_volume + volume >= 0.0, 'Requesting too much water to be removed from an inlet!' 55 60 56 61 # store last discharge
Note: See TracChangeset
for help on using the changeset viewer.