source: trunk/anuga_core/source/anuga/shallow_water/test_data_manager.py @ 7866

Last change on this file since 7866 was 7866, checked in by hudson, 14 years ago

More swb tests passing. Cleaned up some pylint errors.

File size: 30.1 KB
Line 
1#!/usr/bin/env python
2#
3
4"""
5Set of tests for the now-defunct data manager module.
6
7These could be split up into their correct modules.
8"""
9
10import unittest
11import copy
12import numpy as num
13import sys
14               
15import tempfile
16import os
17import shutil
18from struct import pack, unpack
19
20from Scientific.IO.NetCDF import NetCDFFile
21
22
23from anuga.anuga_exceptions import ANUGAError
24from anuga.file.sww import SWW_file
25from anuga.coordinate_transforms.geo_reference import Geo_reference                       
26from anuga.coordinate_transforms.redfearn import degminsec2decimal_degrees
27from anuga.abstract_2d_finite_volumes.util import file_function
28from anuga.utilities.system_tools import get_pathname_from_package, \
29                                            get_revision_number
30from anuga.utilities.file_utils import get_all_swwfiles
31from anuga.utilities.file_utils import del_dir
32from anuga.utilities.numerical_tools import ensure_numeric, mean
33from anuga.config import netcdf_mode_r, netcdf_mode_w, netcdf_mode_a
34from anuga.config import netcdf_float, epsilon, g
35from anuga.pmesh.mesh_interface import create_mesh_from_regions
36from anuga.file_conversion.sww2dem import sww2dem_batch
37from anuga.file.csv_file import load_csv_as_dict, load_csv_as_array, \
38                                load_csv_as_building_polygons, \
39                                load_csv_as_polygons
40from anuga.file.sts import create_sts_boundary
41from anuga.file.pts import load_pts_as_polygon
42from anuga.file.sww import Write_sww
43
44
45# import all the boundaries - some are generic, some are shallow water
46from boundaries import Reflective_boundary, \
47            Field_boundary, Transmissive_momentum_set_stage_boundary, \
48            Transmissive_stage_zero_momentum_boundary
49from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\
50     import Transmissive_boundary, Dirichlet_boundary, \
51            Time_boundary, File_boundary, AWI_boundary
52
53# This is needed to run the tests of local functions
54from anuga.file_conversion.urs2sts import urs2sts
55from anuga.coordinate_transforms.redfearn import redfearn
56from anuga.coordinate_transforms.geo_reference import Geo_reference, \
57     DEFAULT_ZONE
58from anuga.geospatial_data.geospatial_data import Geospatial_data
59
60from shallow_water_domain import Domain
61
62# use helper methods from other unit test
63from anuga.file.test_mux import Test_Mux
64
65
66class Test_Data_Manager(Test_Mux):
67    # Class variable
68    verbose = False
69
70    def set_verbose(self):
71        Test_Data_Manager.verbose = True
72       
73    def setUp(self):
74        import time
75        from mesh_factory import rectangular
76       
77        self.verbose = Test_Data_Manager.verbose
78        # Create basic mesh
79        points, vertices, boundary = rectangular(2, 2)
80
81        # Create shallow water domain
82        domain = Domain(points, vertices, boundary)
83        domain.default_order = 2
84
85        # Set some field values
86        domain.set_quantity('elevation', lambda x,y: -x)
87        domain.set_quantity('friction', 0.03)
88
89
90        ######################
91        # Boundary conditions
92        B = Transmissive_boundary(domain)
93        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
94
95
96        ######################
97        #Initial condition - with jumps
98        bed = domain.quantities['elevation'].vertex_values
99        stage = num.zeros(bed.shape, num.float)
100
101        h = 0.3
102        for i in range(stage.shape[0]):
103            if i % 2 == 0:
104                stage[i,:] = bed[i,:] + h
105            else:
106                stage[i,:] = bed[i,:]
107
108        domain.set_quantity('stage', stage)
109
110
111        domain.distribute_to_vertices_and_edges()               
112        self.initial_stage = copy.copy(domain.quantities['stage'].vertex_values)
113
114
115        self.domain = domain
116
117        C = domain.get_vertex_coordinates()
118        self.X = C[:,0:6:2].copy()
119        self.Y = C[:,1:6:2].copy()
120
121        self.F = bed
122
123        #Write A testfile (not realistic. Values aren't realistic)
124        self.test_MOST_file = 'most_small'
125
126        longitudes = [150.66667, 150.83334, 151., 151.16667]
127        latitudes = [-34.5, -34.33333, -34.16667, -34]
128
129        long_name = 'LON'
130        lat_name = 'LAT'
131
132        nx = 4
133        ny = 4
134        six = 6
135
136
137        for ext in ['_ha.nc', '_ua.nc', '_va.nc', '_e.nc']:
138            fid = NetCDFFile(self.test_MOST_file + ext, netcdf_mode_w)
139
140            fid.createDimension(long_name,nx)
141            fid.createVariable(long_name,netcdf_float,(long_name,))
142            fid.variables[long_name].point_spacing='uneven'
143            fid.variables[long_name].units='degrees_east'
144            fid.variables[long_name].assignValue(longitudes)
145
146            fid.createDimension(lat_name,ny)
147            fid.createVariable(lat_name,netcdf_float,(lat_name,))
148            fid.variables[lat_name].point_spacing='uneven'
149            fid.variables[lat_name].units='degrees_north'
150            fid.variables[lat_name].assignValue(latitudes)
151
152            fid.createDimension('TIME',six)
153            fid.createVariable('TIME',netcdf_float,('TIME',))
154            fid.variables['TIME'].point_spacing='uneven'
155            fid.variables['TIME'].units='seconds'
156            fid.variables['TIME'].assignValue([0.0, 0.1, 0.6, 1.1, 1.6, 2.1])
157
158
159            name = ext[1:3].upper()
160            if name == 'E.': name = 'ELEVATION'
161            fid.createVariable(name,netcdf_float,('TIME', lat_name, long_name))
162            fid.variables[name].units='CENTIMETERS'
163            fid.variables[name].missing_value=-1.e+034
164
165            fid.variables[name].assignValue([[[0.3400644, 0, -46.63519, -6.50198],
166                                              [-0.1214216, 0, 0, 0],
167                                              [0, 0, 0, 0],
168                                              [0, 0, 0, 0]],
169                                             [[0.3400644, 2.291054e-005, -23.33335, -6.50198],
170                                              [-0.1213987, 4.581959e-005, -1.594838e-007, 1.421085e-012],
171                                              [2.291054e-005, 4.582107e-005, 4.581715e-005, 1.854517e-009],
172                                              [0, 2.291054e-005, 2.291054e-005, 0]],
173                                             [[0.3400644, 0.0001374632, -23.31503, -6.50198],
174                                              [-0.1212842, 0.0002756907, 0.006325484, 1.380492e-006],
175                                              [0.0001374632, 0.0002749264, 0.0002742863, 6.665601e-008],
176                                              [0, 0.0001374632, 0.0001374632, 0]],
177                                             [[0.3400644, 0.0002520159, -23.29672, -6.50198],
178                                              [-0.1211696, 0.0005075303, 0.01264618, 6.208276e-006],
179                                              [0.0002520159, 0.0005040318, 0.0005027961, 2.23865e-007],
180                                              [0, 0.0002520159, 0.0002520159, 0]],
181                                             [[0.3400644, 0.0003665686, -23.27842, -6.50198],
182                                              [-0.1210551, 0.0007413362, 0.01896192, 1.447638e-005],
183                                              [0.0003665686, 0.0007331371, 0.0007313463, 4.734126e-007],
184                                              [0, 0.0003665686, 0.0003665686, 0]],
185                                             [[0.3400644, 0.0004811212, -23.26012, -6.50198],
186                                              [-0.1209405, 0.0009771062, 0.02527271, 2.617787e-005],
187                                              [0.0004811212, 0.0009622425, 0.0009599366, 8.152277e-007],
188                                              [0, 0.0004811212, 0.0004811212, 0]]])
189
190
191            fid.close()
192
193
194
195
196    def tearDown(self):
197        import os
198        for ext in ['_ha.nc', '_ua.nc', '_va.nc', '_e.nc']:
199            #print 'Trying to remove', self.test_MOST_file + ext
200            os.remove(self.test_MOST_file + ext)
201
202    def test_sww_constant(self):
203        """Test that constant sww information can be written correctly
204        (non smooth)
205        """
206        self.domain.set_name('datatest' + str(id(self)))
207        self.domain.format = 'sww' #Remove??
208        self.domain.smooth = False
209       
210        sww = SWW_file(self.domain)
211        sww.store_connectivity()
212
213        fid = NetCDFFile(sww.filename, netcdf_mode_r)  # Open existing file for append
214
215        # Get the variables
216        x = fid.variables['x']
217        y = fid.variables['y']
218        z = fid.variables['elevation']
219        V = fid.variables['volumes']
220
221        assert num.allclose (x[:], self.X.flatten())
222        assert num.allclose (y[:], self.Y.flatten())
223        assert num.allclose (z[:], self.F.flatten())
224
225        P = len(self.domain)
226        for k in range(P):
227            assert V[k, 0] == 3*k
228            assert V[k, 1] == 3*k+1
229            assert V[k, 2] == 3*k+2
230
231        fid.close()
232        os.remove(sww.filename)
233
234    def test_sww_header(self):
235        """Test that constant sww information can be written correctly
236        (non smooth)
237        """
238        self.domain.set_name('datatest' + str(id(self)))
239        self.domain.format = 'sww' #Remove??
240        self.domain.smooth = False
241
242        sww = SWW_file(self.domain)
243        sww.store_connectivity()
244
245        # Check contents
246        # Get NetCDF
247        fid = NetCDFFile(sww.filename, netcdf_mode_r)  # Open existing file for append
248
249        # Get the variables
250        sww_revision = fid.revision_number
251        try:
252            revision_number = get_revision_number()
253        except:
254            revision_number = None
255           
256        assert str(revision_number) == sww_revision
257        fid.close()
258
259        #print "sww.filename", sww.filename
260        os.remove(sww.filename)
261
262    def test_sww_range(self):
263        """Test that constant sww information can be written correctly
264        Use non-smooth to be able to compare to quantity values.
265        """
266
267        self.domain.set_name('datatest' + str(id(self)))
268        self.domain.format = 'sww'
269        self.domain.set_store_vertices_uniquely(True)
270       
271        sww = SWW_file(self.domain)       
272
273        dqs = self.domain.get_quantity('stage')
274        dqx = self.domain.get_quantity('xmomentum')
275        dqy = self.domain.get_quantity('ymomentum')       
276        xmom_min = ymom_min = stage_min = sys.maxint
277        xmom_max = ymom_max = stage_max = -stage_min       
278        for t in self.domain.evolve(yieldstep = 1, finaltime = 1):
279            wmax = max(dqs.get_values().flatten())
280            if wmax > stage_max: stage_max = wmax
281            wmin = min(dqs.get_values().flatten())
282            if wmin < stage_min: stage_min = wmin           
283           
284            uhmax = max(dqx.get_values().flatten())
285            if uhmax > xmom_max: xmom_max = uhmax
286            uhmin = min(dqx.get_values().flatten())
287            if uhmin < xmom_min: xmom_min = uhmin                       
288           
289            vhmax = max(dqy.get_values().flatten())
290            if vhmax > ymom_max: ymom_max = vhmax
291            vhmin = min(dqy.get_values().flatten())
292            if vhmin < ymom_min: ymom_min = vhmin                                   
293           
294           
295           
296        # Get NetCDF
297        fid = NetCDFFile(sww.filename, netcdf_mode_r) # Open existing file for append
298
299        # Get the variables
300        range = fid.variables['stage_range'][:]
301        assert num.allclose(range,[stage_min, stage_max])
302
303        range = fid.variables['xmomentum_range'][:]
304        #print range
305        assert num.allclose(range, [xmom_min, xmom_max])
306       
307        range = fid.variables['ymomentum_range'][:]
308        #print range
309        assert num.allclose(range, [ymom_min, ymom_max])       
310
311
312       
313        fid.close()
314        os.remove(sww.filename)
315
316    def test_sww_extrema(self):
317        """Test that extrema of quantities can be retrieved at every vertex
318        Extrema are updated at every *internal* timestep
319        """
320
321        domain = self.domain
322       
323        domain.set_name('extrema_test' + str(id(self)))
324        domain.format = 'sww'
325        domain.smooth = True
326
327        assert domain.quantities_to_be_monitored is None
328        assert domain.monitor_polygon is None
329        assert domain.monitor_time_interval is None       
330       
331        domain.set_quantities_to_be_monitored(['xmomentum',
332                                               'ymomentum',
333                                               'stage-elevation'])
334
335        assert domain.monitor_polygon is None
336        assert domain.monitor_time_interval is None
337
338
339        domain.set_quantities_to_be_monitored(['xmomentum',
340                                               'ymomentum',
341                                               'stage-elevation'],
342                                              polygon=domain.get_boundary_polygon(),
343                                              time_interval=[0,1])
344       
345       
346        assert len(domain.quantities_to_be_monitored) == 3
347        assert domain.quantities_to_be_monitored.has_key('stage-elevation')
348        assert domain.quantities_to_be_monitored.has_key('xmomentum')               
349        assert domain.quantities_to_be_monitored.has_key('ymomentum')       
350
351       
352        #domain.protect_against_isolated_degenerate_timesteps = True
353        #domain.tight_slope_limiters = 1
354        domain.tight_slope_limiters = 0 # Backwards compatibility
355        domain.use_centroid_velocities = 0 # Backwards compatibility (7/5/8)
356       
357       
358        sww = SWW_file(domain)
359
360        for t in domain.evolve(yieldstep = 1, finaltime = 1):
361            pass
362            #print domain.timestepping_statistics()
363            domain.quantity_statistics(precision = '%.8f') # Silent
364
365           
366        # Get NetCDF
367        fid = NetCDFFile(sww.filename, netcdf_mode_r) # Open existing file for append
368
369        # Get the variables
370        extrema = fid.variables['stage-elevation.extrema'][:]
371        assert num.allclose(extrema, [0.00, 0.30])
372
373        loc = fid.variables['stage-elevation.min_location'][:]
374        assert num.allclose(loc, [0.16666667, 0.33333333])
375
376        loc = fid.variables['stage-elevation.max_location'][:]       
377        assert num.allclose(loc, [0.8333333, 0.16666667])       
378
379        time = fid.variables['stage-elevation.max_time'][:]
380        assert num.allclose(time, 0.0)               
381
382        extrema = fid.variables['xmomentum.extrema'][:]
383        assert num.allclose(extrema,[-0.06062178, 0.47873023]) or \
384            num.allclose(extrema, [-0.06062178, 0.47847986]) or \
385            num.allclose(extrema, [-0.06062178, 0.47848481]) or \
386            num.allclose(extrema, [-0.06062178, 0.47763887]) # 18/09/09
387       
388        extrema = fid.variables['ymomentum.extrema'][:]
389        assert num.allclose(extrema,[0.00, 0.0625786]) or num.allclose(extrema,[0.00, 0.06062178])
390
391        time_interval = fid.variables['extrema.time_interval'][:]
392        assert num.allclose(time_interval, [0,1])
393       
394        polygon = fid.variables['extrema.polygon'][:]       
395        assert num.allclose(polygon, domain.get_boundary_polygon())
396       
397        fid.close()
398        #print "sww.filename", sww.filename
399        os.remove(sww.filename)
400
401       
402       
403    def test_sww_constant_smooth(self):
404        """Test that constant sww information can be written correctly
405        (non smooth)
406        """
407        self.domain.set_name('datatest' + str(id(self)))
408        self.domain.format = 'sww'
409        self.domain.smooth = True
410
411        sww = SWW_file(self.domain)
412        sww.store_connectivity()
413
414        # Check contents
415        # Get NetCDF
416        fid = NetCDFFile(sww.filename, netcdf_mode_r)  # Open existing file for append
417
418        # Get the variables
419        X = fid.variables['x'][:]
420        Y = fid.variables['y'][:]
421        Z = fid.variables['elevation'][:]
422        V = fid.variables['volumes']
423
424        assert num.allclose([X[0], Y[0]], num.array([0.0, 0.0]))
425        assert num.allclose([X[1], Y[1]], num.array([0.0, 0.5]))
426        assert num.allclose([X[2], Y[2]], num.array([0.0, 1.0]))
427        assert num.allclose([X[4], Y[4]], num.array([0.5, 0.5]))
428        assert num.allclose([X[7], Y[7]], num.array([1.0, 0.5]))
429
430        assert Z[4] == -0.5
431
432        assert V[2,0] == 4
433        assert V[2,1] == 5
434        assert V[2,2] == 1
435        assert V[4,0] == 6
436        assert V[4,1] == 7
437        assert V[4,2] == 3
438
439        fid.close()
440        os.remove(sww.filename)
441       
442
443    def test_sww_variable(self):
444        """Test that sww information can be written correctly
445        """
446        self.domain.set_name('datatest' + str(id(self)))
447        self.domain.format = 'sww'
448        self.domain.smooth = True
449        self.domain.reduction = mean
450
451        sww = SWW_file(self.domain)
452        sww.store_connectivity()
453        sww.store_timestep()
454
455        # Check contents
456        # Get NetCDF
457        fid = NetCDFFile(sww.filename, netcdf_mode_r)  # Open existing file for append
458
459
460        # Get the variables
461        time = fid.variables['time']
462        stage = fid.variables['stage']
463
464        Q = self.domain.quantities['stage']
465        Q0 = Q.vertex_values[:,0]
466        Q1 = Q.vertex_values[:,1]
467        Q2 = Q.vertex_values[:,2]
468
469        A = stage[0,:]
470        #print A[0], (Q2[0,0] + Q1[1,0])/2
471        assert num.allclose(A[0], (Q2[0] + Q1[1])/2)
472        assert num.allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3)
473        assert num.allclose(A[2], Q0[3])
474        assert num.allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3)
475
476        #Center point
477        assert num.allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\
478                                   Q0[5] + Q2[6] + Q1[7])/6)
479       
480        fid.close()
481        os.remove(sww.filename)
482
483
484    def test_sww_variable2(self):
485        """Test that sww information can be written correctly
486        multiple timesteps. Use average as reduction operator
487        """
488
489        import time, os
490        from Scientific.IO.NetCDF import NetCDFFile
491
492        self.domain.set_name('datatest' + str(id(self)))
493        self.domain.format = 'sww'
494        self.domain.smooth = True
495
496        self.domain.reduction = mean
497
498        sww = SWW_file(self.domain)
499        sww.store_connectivity()
500        sww.store_timestep()
501        #self.domain.tight_slope_limiters = 1
502        self.domain.evolve_to_end(finaltime = 0.01)
503        sww.store_timestep()
504
505        # Check contents
506        # Get NetCDF
507        fid = NetCDFFile(sww.filename, netcdf_mode_r)  # Open existing file for append
508
509        # Get the variables
510        x = fid.variables['x']
511        y = fid.variables['y']
512        z = fid.variables['elevation']
513        time = fid.variables['time']
514        stage = fid.variables['stage']
515
516        #Check values
517        Q = self.domain.quantities['stage']
518        Q0 = Q.vertex_values[:,0]
519        Q1 = Q.vertex_values[:,1]
520        Q2 = Q.vertex_values[:,2]
521
522        A = stage[1,:]
523        assert num.allclose(A[0], (Q2[0] + Q1[1])/2)
524        assert num.allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3)
525        assert num.allclose(A[2], Q0[3])
526        assert num.allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3)
527
528        #Center point
529        assert num.allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\
530                                   Q0[5] + Q2[6] + Q1[7])/6)
531
532
533        fid.close()
534
535        #Cleanup
536        os.remove(sww.filename)
537
538
539    def test_sync(self):
540        """test_sync - Test info stored at each timestep is as expected (incl initial condition)
541        """
542
543        import time, os, config
544        from Scientific.IO.NetCDF import NetCDFFile
545
546        self.domain.set_name('synctest')
547        self.domain.format = 'sww'
548        self.domain.smooth = False
549        self.domain.store = True
550
551        self.domain.tight_slope_limiters = True
552        self.domain.use_centroid_velocities = True       
553       
554        # In this case tight_slope_limiters as default
555        # in conjunction with protection
556        # against isolated degenerate timesteps works.
557        #self.domain.tight_slope_limiters = 1
558        #self.domain.protect_against_isolated_degenerate_timesteps = True
559
560        #print 'tight_sl', self.domain.tight_slope_limiters
561       
562
563        #Evolution
564        for t in self.domain.evolve(yieldstep = 1.0, finaltime = 4.0):
565           
566            #########self.domain.write_time(track_speeds=True)
567            stage = self.domain.quantities['stage'].vertex_values
568
569            #Get NetCDF
570            fid = NetCDFFile(self.domain.writer.filename, netcdf_mode_r)
571            stage_file = fid.variables['stage']
572           
573            if t == 0.0:
574                assert num.allclose(stage, self.initial_stage)
575                assert num.allclose(stage_file[:], stage.flatten())
576            else:
577                assert not num.allclose(stage, self.initial_stage)
578                assert not num.allclose(stage_file[:], stage.flatten())
579
580            fid.close()
581
582        os.remove(self.domain.writer.filename)
583
584
585    def test_sww_minimum_storable_height(self):
586        """Test that sww information can be written correctly
587        multiple timesteps using a different reduction operator (min)
588        """
589
590        import time, os
591        from Scientific.IO.NetCDF import NetCDFFile
592
593        self.domain.set_name('datatest' + str(id(self)))
594        self.domain.format = 'sww'
595        self.domain.smooth = True
596        self.domain.reduction = min
597        self.domain.minimum_storable_height = 100
598
599        sww = SWW_file(self.domain)
600        sww.store_connectivity()
601        sww.store_timestep()
602
603        #self.domain.tight_slope_limiters = 1
604        self.domain.evolve_to_end(finaltime = 0.01)
605        sww.store_timestep()
606
607
608        #Check contents
609        #Get NetCDF
610        fid = NetCDFFile(sww.filename, netcdf_mode_r)
611
612
613        # Get the variables
614        x = fid.variables['x']
615        y = fid.variables['y']
616        z = fid.variables['elevation']
617        time = fid.variables['time']
618        stage = fid.variables['stage']
619        xmomentum = fid.variables['xmomentum']
620        ymomentum = fid.variables['ymomentum']       
621
622        #Check values
623        Q = self.domain.quantities['stage']
624        Q0 = Q.vertex_values[:,0]
625        Q1 = Q.vertex_values[:,1]
626        Q2 = Q.vertex_values[:,2]
627
628        A = stage[1,:]
629        assert num.allclose(stage[1,:], z[:])
630
631
632        assert num.allclose(xmomentum, 0.0)
633        assert num.allclose(ymomentum, 0.0)       
634       
635        fid.close()
636
637        #Cleanup
638        os.remove(sww.filename)
639
640
641    def test_file_boundary_stsIV_sinewave_ordering(self):
642        """test_file_boundary_stsIV_sinewave_ordering(self):
643        Read correct points from ordering file and apply sts to boundary
644        This one uses a sine wave and compares to time boundary
645        """
646
647        lat_long_points=[[6.01,97.0],[6.02,97.0],[6.05,96.9],[6.0,97.0]]
648        bounding_polygon=[[6.0,97.0],[6.01,97.0],[6.02,97.0],[6.02,97.02],[6.00,97.02]]
649        tide = 0.35
650        time_step_count = 50
651        time_step = 0.1
652        times_ref = num.arange(0, time_step_count*time_step, time_step)
653       
654        n=len(lat_long_points)
655        first_tstep=num.ones(n,num.int)
656        last_tstep=(time_step_count)*num.ones(n,num.int)
657       
658        gauge_depth=20*num.ones(n,num.float)
659       
660        ha1=num.ones((n,time_step_count),num.float)
661        ua1=3.*num.ones((n,time_step_count),num.float)
662        va1=2.*num.ones((n,time_step_count),num.float)
663        for i in range(n):
664            ha1[i]=num.sin(times_ref)
665       
666       
667        base_name, files = self.write_mux2(lat_long_points,
668                                           time_step_count, time_step,
669                                           first_tstep, last_tstep,
670                                           depth=gauge_depth,
671                                           ha=ha1,
672                                           ua=ua1,
673                                           va=va1)
674
675        # Write order file
676        file_handle, order_base_name = tempfile.mkstemp("")
677        os.close(file_handle)
678        os.remove(order_base_name)
679        d=","
680        order_file=order_base_name+'order.txt'
681        fid=open(order_file,'w')
682       
683        # Write Header
684        header='index, longitude, latitude\n'
685        fid.write(header)
686        indices=[3,0,1]
687        for i in indices:
688            line=str(i)+d+str(lat_long_points[i][1])+d+\
689                str(lat_long_points[i][0])+"\n"
690            fid.write(line)
691        fid.close()
692
693        sts_file=base_name
694        urs2sts(base_name, basename_out=sts_file,
695                ordering_filename=order_file,
696                mean_stage=tide,
697                verbose=False)
698        self.delete_mux(files)
699       
700       
701       
702        # Now read the sts file and check that values have been stored correctly.
703        fid = NetCDFFile(sts_file + '.sts')
704
705        # Check the time vector
706        times = fid.variables['time'][:]
707       
708        #print times
709
710        # Check sts quantities
711        stage = fid.variables['stage'][:]
712        xmomentum = fid.variables['xmomentum'][:]
713        ymomentum = fid.variables['ymomentum'][:]
714        elevation = fid.variables['elevation'][:]       
715
716        # Create beginnings of boundary polygon based on sts_boundary
717        boundary_polygon = create_sts_boundary(base_name)
718       
719        os.remove(order_file)
720
721        # Append the remaining part of the boundary polygon to be defined by
722        # the user
723        bounding_polygon_utm=[]
724        for point in bounding_polygon:
725            zone,easting,northing=redfearn(point[0],point[1])
726            bounding_polygon_utm.append([easting,northing])
727
728        boundary_polygon.append(bounding_polygon_utm[3])
729        boundary_polygon.append(bounding_polygon_utm[4])
730
731        #print 'boundary_polygon', boundary_polygon
732       
733        plot=False
734        if plot:
735            from pylab import plot,show,axis
736            boundary_polygon=ensure_numeric(boundary_polygon)
737            bounding_polygon_utm=ensure_numeric(bounding_polygon_utm)
738            #plot(lat_long_points[:,0],lat_long_points[:,1],'o')
739            plot(boundary_polygon[:,0], boundary_polygon[:,1])
740            plot(bounding_polygon_utm[:,0],bounding_polygon_utm[:,1])
741            show()
742
743        assert num.allclose(bounding_polygon_utm,boundary_polygon)
744
745
746        extent_res=1000000
747        meshname = 'urs_test_mesh' + '.tsh'
748        interior_regions=None
749        boundary_tags={'ocean': [0,1], 'otherocean': [2,3,4]}
750       
751        # have to change boundary tags from last example because now bounding
752        # polygon starts in different place.
753        create_mesh_from_regions(boundary_polygon,
754                                 boundary_tags=boundary_tags,
755                                 maximum_triangle_area=extent_res,
756                                 filename=meshname,
757                                 interior_regions=interior_regions,
758                                 verbose=False)
759       
760        domain_fbound = Domain(meshname)
761        domain_fbound.set_quantity('stage', tide)
762        Bf = File_boundary(sts_file+'.sts', 
763                           domain_fbound, 
764                           boundary_polygon=boundary_polygon)
765        Br = Reflective_boundary(domain_fbound)
766
767        domain_fbound.set_boundary({'ocean': Bf,'otherocean': Br})
768        finaltime=time_step*(time_step_count-1)
769        yieldstep=time_step
770        temp_fbound=num.zeros(int(finaltime/yieldstep)+1,num.float)
771   
772        for i, t in enumerate(domain_fbound.evolve(yieldstep=yieldstep,
773                                                   finaltime=finaltime, 
774                                                   skip_initial_step=False)):
775            temp_fbound[i]=domain_fbound.quantities['stage'].centroid_values[2]
776   
777       
778        domain_time = Domain(meshname)
779        domain_time.set_quantity('stage', tide)
780        Br = Reflective_boundary(domain_time)
781        Bw = Time_boundary(domain=domain_time,
782                         f=lambda t: [num.sin(t)+tide,3.*(20.+num.sin(t)+tide),2.*(20.+num.sin(t)+tide)])
783        domain_time.set_boundary({'ocean': Bw,'otherocean': Br})
784       
785        temp_time=num.zeros(int(finaltime/yieldstep)+1,num.float)
786        for i, t in enumerate(domain_time.evolve(yieldstep=yieldstep,
787                                                   finaltime=finaltime, 
788                                                   skip_initial_step=False)):
789            temp_time[i]=domain_time.quantities['stage'].centroid_values[2]
790       
791        assert num.allclose(temp_fbound, temp_time)               
792        assert num.allclose(domain_fbound.quantities['stage'].vertex_values,
793                            domain_time.quantities['stage'].vertex_values)
794                       
795        assert num.allclose(domain_fbound.quantities['xmomentum'].vertex_values,
796                            domain_time.quantities['xmomentum'].vertex_values)                       
797                       
798        assert num.allclose(domain_fbound.quantities['ymomentum'].vertex_values,
799                            domain_time.quantities['ymomentum'].vertex_values)                                               
800       
801
802        try:
803            os.remove(sts_file+'.sts')
804        except:
805            # Windoze can't remove this file for some reason
806            pass
807       
808        os.remove(meshname)
809       
810 
811    def test_points2polygon(self): 
812        att_dict = {}
813        pointlist = num.array([[1.0, 0.0],[0.0, 1.0],[0.0, 0.0]])
814        att_dict['elevation'] = num.array([10.1, 0.0, 10.4])
815        att_dict['brightness'] = num.array([10.0, 1.0, 10.4])
816       
817        fileName = tempfile.mktemp(".csv")
818       
819        G = Geospatial_data(pointlist, att_dict)
820       
821        G.export_points_file(fileName)
822       
823        polygon = load_pts_as_polygon(fileName)
824       
825        # This test may fail if the order changes
826        assert (polygon == [[0.0, 0.0],[1.0, 0.0],[0.0, 1.0]])
827       
828
829
830#-------------------------------------------------------------
831
832if __name__ == "__main__":
833    #suite = unittest.makeSuite(Test_Data_Manager, 'test_sww2domain2')
834    suite = unittest.makeSuite(Test_Data_Manager, 'test_sww')
835   
836   
837   
838    # FIXME(Ole): When Ross has implemented logging, we can
839    # probably get rid of all this:
840    if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V':
841        Test_Data_Manager.verbose=True
842        saveout = sys.stdout   
843        filename = ".temp_verbose"
844        fid = open(filename, 'w')
845        sys.stdout = fid
846    else:
847        pass
848    runner = unittest.TextTestRunner() #verbosity=2)
849    runner.run(suite)
850
851    # Cleaning up
852    if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V':
853        sys.stdout = saveout
854        fid.close() 
855        os.remove(filename)
856
857
Note: See TracBrowser for help on using the repository browser.