source: trunk/anuga_core/source/anuga/shallow_water/test_data_manager.py @ 7872

Last change on this file since 7872 was 7872, checked in by hudson, 14 years ago

Fixed a few unit test errors.

File size: 30.1 KB
Line 
1#!/usr/bin/env python
2#
3
4"""
5Set of tests for the now-defunct data manager module.
6
7These could be split up into their correct modules.
8"""
9
10import unittest
11import copy
12import numpy as num
13import sys
14               
15import tempfile
16import os
17import shutil
18from struct import pack, unpack
19
20from Scientific.IO.NetCDF import NetCDFFile
21
22
23from anuga.anuga_exceptions import ANUGAError
24from anuga.file.sww import SWW_file
25from anuga.coordinate_transforms.geo_reference import Geo_reference                       
26from anuga.coordinate_transforms.redfearn import degminsec2decimal_degrees
27from anuga.abstract_2d_finite_volumes.util import file_function
28from anuga.utilities.system_tools import get_pathname_from_package, \
29                                            get_revision_number
30from anuga.utilities.file_utils import get_all_swwfiles
31from anuga.utilities.file_utils import del_dir
32from anuga.utilities.numerical_tools import ensure_numeric, mean
33from anuga.config import netcdf_mode_r, netcdf_mode_w, netcdf_mode_a
34from anuga.config import netcdf_float, epsilon, g
35from anuga.pmesh.mesh_interface import create_mesh_from_regions
36from anuga.file_conversion.sww2dem import sww2dem_batch
37from anuga.file.csv_file import load_csv_as_dict, load_csv_as_array, \
38                                load_csv_as_building_polygons, \
39                                load_csv_as_polygons
40from anuga.file.sts import create_sts_boundary
41from anuga.file.pts import load_pts_as_polygon
42from anuga.file.sww import Write_sww
43
44
45# import all the boundaries - some are generic, some are shallow water
46from boundaries import Reflective_boundary, \
47            Field_boundary, Transmissive_momentum_set_stage_boundary, \
48            Transmissive_stage_zero_momentum_boundary
49from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\
50     import Transmissive_boundary, Dirichlet_boundary, \
51            Time_boundary, File_boundary, AWI_boundary
52
53# This is needed to run the tests of local functions
54from anuga.file_conversion.urs2sts import urs2sts
55from anuga.coordinate_transforms.redfearn import redfearn
56from anuga.coordinate_transforms.geo_reference import Geo_reference, \
57     DEFAULT_ZONE
58from anuga.geospatial_data.geospatial_data import Geospatial_data
59
60from shallow_water_domain import Domain
61
62# use helper methods from other unit test
63from anuga.file.test_mux import Test_Mux
64
65
66class Test_Data_Manager(Test_Mux):
67    # Class variable
68    verbose = False
69
70    def set_verbose(self):
71        Test_Data_Manager.verbose = True
72       
73    def setUp(self):
74        import time
75        from mesh_factory import rectangular
76       
77        self.verbose = Test_Data_Manager.verbose
78        # Create basic mesh
79        points, vertices, boundary = rectangular(2, 2)
80
81        # Create shallow water domain
82        domain = Domain(points, vertices, boundary)
83        domain.default_order = 2
84
85        # Set some field values
86        domain.set_quantity('elevation', lambda x,y: -x)
87        domain.set_quantity('friction', 0.03)
88
89
90        ######################
91        # Boundary conditions
92        B = Transmissive_boundary(domain)
93        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
94
95
96        ######################
97        #Initial condition - with jumps
98        bed = domain.quantities['elevation'].vertex_values
99        stage = num.zeros(bed.shape, num.float)
100
101        h = 0.3
102        for i in range(stage.shape[0]):
103            if i % 2 == 0:
104                stage[i,:] = bed[i,:] + h
105            else:
106                stage[i,:] = bed[i,:]
107
108        domain.set_quantity('stage', stage)
109
110
111        domain.distribute_to_vertices_and_edges()               
112        self.initial_stage = copy.copy(domain.quantities['stage'].vertex_values)
113
114
115        self.domain = domain
116
117        C = domain.get_vertex_coordinates()
118        self.X = C[:,0:6:2].copy()
119        self.Y = C[:,1:6:2].copy()
120
121        self.F = bed
122
123        #Write A testfile (not realistic. Values aren't realistic)
124        self.test_MOST_file = 'most_small'
125
126        longitudes = [150.66667, 150.83334, 151., 151.16667]
127        latitudes = [-34.5, -34.33333, -34.16667, -34]
128
129        long_name = 'LON'
130        lat_name = 'LAT'
131
132        nx = 4
133        ny = 4
134        six = 6
135
136
137        for ext in ['_ha.nc', '_ua.nc', '_va.nc', '_e.nc']:
138            fid = NetCDFFile(self.test_MOST_file + ext, netcdf_mode_w)
139
140            fid.createDimension(long_name,nx)
141            fid.createVariable(long_name,netcdf_float,(long_name,))
142            fid.variables[long_name].point_spacing='uneven'
143            fid.variables[long_name].units='degrees_east'
144            fid.variables[long_name].assignValue(longitudes)
145
146            fid.createDimension(lat_name,ny)
147            fid.createVariable(lat_name,netcdf_float,(lat_name,))
148            fid.variables[lat_name].point_spacing='uneven'
149            fid.variables[lat_name].units='degrees_north'
150            fid.variables[lat_name].assignValue(latitudes)
151
152            fid.createDimension('TIME',six)
153            fid.createVariable('TIME',netcdf_float,('TIME',))
154            fid.variables['TIME'].point_spacing='uneven'
155            fid.variables['TIME'].units='seconds'
156            fid.variables['TIME'].assignValue([0.0, 0.1, 0.6, 1.1, 1.6, 2.1])
157
158
159            name = ext[1:3].upper()
160            if name == 'E.': name = 'ELEVATION'
161            fid.createVariable(name,netcdf_float,('TIME', lat_name, long_name))
162            fid.variables[name].units='CENTIMETERS'
163            fid.variables[name].missing_value=-1.e+034
164
165            fid.variables[name].assignValue([[[0.3400644, 0, -46.63519, -6.50198],
166                                              [-0.1214216, 0, 0, 0],
167                                              [0, 0, 0, 0],
168                                              [0, 0, 0, 0]],
169                                             [[0.3400644, 2.291054e-005, -23.33335, -6.50198],
170                                              [-0.1213987, 4.581959e-005, -1.594838e-007, 1.421085e-012],
171                                              [2.291054e-005, 4.582107e-005, 4.581715e-005, 1.854517e-009],
172                                              [0, 2.291054e-005, 2.291054e-005, 0]],
173                                             [[0.3400644, 0.0001374632, -23.31503, -6.50198],
174                                              [-0.1212842, 0.0002756907, 0.006325484, 1.380492e-006],
175                                              [0.0001374632, 0.0002749264, 0.0002742863, 6.665601e-008],
176                                              [0, 0.0001374632, 0.0001374632, 0]],
177                                             [[0.3400644, 0.0002520159, -23.29672, -6.50198],
178                                              [-0.1211696, 0.0005075303, 0.01264618, 6.208276e-006],
179                                              [0.0002520159, 0.0005040318, 0.0005027961, 2.23865e-007],
180                                              [0, 0.0002520159, 0.0002520159, 0]],
181                                             [[0.3400644, 0.0003665686, -23.27842, -6.50198],
182                                              [-0.1210551, 0.0007413362, 0.01896192, 1.447638e-005],
183                                              [0.0003665686, 0.0007331371, 0.0007313463, 4.734126e-007],
184                                              [0, 0.0003665686, 0.0003665686, 0]],
185                                             [[0.3400644, 0.0004811212, -23.26012, -6.50198],
186                                              [-0.1209405, 0.0009771062, 0.02527271, 2.617787e-005],
187                                              [0.0004811212, 0.0009622425, 0.0009599366, 8.152277e-007],
188                                              [0, 0.0004811212, 0.0004811212, 0]]])
189
190
191            fid.close()
192
193
194
195
196    def tearDown(self):
197        import os
198        for ext in ['_ha.nc', '_ua.nc', '_va.nc', '_e.nc']:
199            #print 'Trying to remove', self.test_MOST_file + ext
200            os.remove(self.test_MOST_file + ext)
201
202    def test_sww_constant(self):
203        """Test that constant sww information can be written correctly
204        (non smooth)
205        """
206        self.domain.set_name('datatest' + str(id(self)))
207        self.domain.format = 'sww' #Remove??
208        self.domain.smooth = False
209       
210        sww = SWW_file(self.domain)
211        sww.store_connectivity()
212
213        fid = NetCDFFile(sww.filename, netcdf_mode_r)  # Open existing file for append
214
215        # Get the variables
216        x = fid.variables['x']
217        y = fid.variables['y']
218        z = fid.variables['elevation']
219        V = fid.variables['volumes']
220
221        assert num.allclose (x[:], self.X.flatten())
222        assert num.allclose (y[:], self.Y.flatten())
223        assert num.allclose (z[:], self.F.flatten())
224
225        P = len(self.domain)
226        for k in range(P):
227            assert V[k, 0] == 3*k
228            assert V[k, 1] == 3*k+1
229            assert V[k, 2] == 3*k+2
230
231        fid.close()
232        os.remove(sww.filename)
233
234    def test_sww_header(self):
235        """Test that constant sww information can be written correctly
236        (non smooth)
237        """
238        self.domain.set_name('datatest' + str(id(self)))
239        self.domain.format = 'sww' #Remove??
240        self.domain.smooth = False
241
242        sww = SWW_file(self.domain)
243        sww.store_connectivity()
244
245        # Check contents
246        # Get NetCDF
247        fid = NetCDFFile(sww.filename, netcdf_mode_r)  # Open existing file for append
248
249        # Get the variables
250        sww_revision = fid.revision_number
251        try:
252            revision_number = get_revision_number()
253        except:
254            revision_number = None
255           
256        assert str(revision_number) == sww_revision
257        fid.close()
258
259        #print "sww.filename", sww.filename
260        os.remove(sww.filename)
261
262    def test_sww_range(self):
263        """Test that constant sww information can be written correctly
264        Use non-smooth to be able to compare to quantity values.
265        """
266
267        self.domain.set_name('datatest' + str(id(self)))
268        self.domain.format = 'sww'
269        self.domain.set_store_vertices_uniquely(True)
270       
271        sww = SWW_file(self.domain)       
272
273        dqs = self.domain.get_quantity('stage')
274        dqx = self.domain.get_quantity('xmomentum')
275        dqy = self.domain.get_quantity('ymomentum')       
276        xmom_min = ymom_min = stage_min = sys.maxint
277        xmom_max = ymom_max = stage_max = -stage_min       
278        for t in self.domain.evolve(yieldstep = 1, finaltime = 1):
279            wmax = max(dqs.get_values().flatten())
280            if wmax > stage_max: stage_max = wmax
281            wmin = min(dqs.get_values().flatten())
282            if wmin < stage_min: stage_min = wmin           
283           
284            uhmax = max(dqx.get_values().flatten())
285            if uhmax > xmom_max: xmom_max = uhmax
286            uhmin = min(dqx.get_values().flatten())
287            if uhmin < xmom_min: xmom_min = uhmin                       
288           
289            vhmax = max(dqy.get_values().flatten())
290            if vhmax > ymom_max: ymom_max = vhmax
291            vhmin = min(dqy.get_values().flatten())
292            if vhmin < ymom_min: ymom_min = vhmin                                   
293           
294           
295           
296        # Get NetCDF
297        fid = NetCDFFile(sww.filename, netcdf_mode_r) # Open existing file for append
298
299        # Get the variables
300        range = fid.variables['stage_range'][:]
301        assert num.allclose(range,[stage_min, stage_max])
302
303        range = fid.variables['xmomentum_range'][:]
304        #print range
305        assert num.allclose(range, [xmom_min, xmom_max])
306       
307        range = fid.variables['ymomentum_range'][:]
308        #print range
309        assert num.allclose(range, [ymom_min, ymom_max])       
310
311
312       
313        fid.close()
314        os.remove(sww.filename)
315
316    def test_sww_extrema(self):
317        """Test that extrema of quantities can be retrieved at every vertex
318        Extrema are updated at every *internal* timestep
319        """
320
321        domain = self.domain
322       
323        domain.set_name('extrema_test' + str(id(self)))
324        domain.format = 'sww'
325        domain.smooth = True
326
327        assert domain.quantities_to_be_monitored is None
328        assert domain.monitor_polygon is None
329        assert domain.monitor_time_interval is None       
330       
331        domain.set_quantities_to_be_monitored(['xmomentum',
332                                               'ymomentum',
333                                               'stage-elevation'])
334
335        assert domain.monitor_polygon is None
336        assert domain.monitor_time_interval is None
337
338
339        domain.set_quantities_to_be_monitored(['xmomentum',
340                                               'ymomentum',
341                                               'stage-elevation'],
342                                              polygon=domain.get_boundary_polygon(),
343                                              time_interval=[0,1])
344       
345       
346        assert len(domain.quantities_to_be_monitored) == 3
347        assert domain.quantities_to_be_monitored.has_key('stage-elevation')
348        assert domain.quantities_to_be_monitored.has_key('xmomentum')               
349        assert domain.quantities_to_be_monitored.has_key('ymomentum')       
350
351       
352        #domain.protect_against_isolated_degenerate_timesteps = True
353        #domain.tight_slope_limiters = 1
354        domain.tight_slope_limiters = 0 # Backwards compatibility
355        domain.use_centroid_velocities = 0 # Backwards compatibility (7/5/8)
356       
357       
358        sww = SWW_file(domain)
359
360        for t in domain.evolve(yieldstep = 1, finaltime = 1):
361            pass
362            #print domain.timestepping_statistics()
363            domain.quantity_statistics(precision = '%.8f') # Silent
364
365           
366        # Get NetCDF
367        fid = NetCDFFile(sww.filename, netcdf_mode_r) # Open existing file for append
368
369        # Get the variables
370        extrema = fid.variables['stage-elevation.extrema'][:]
371        assert num.allclose(extrema, [0.00, 0.30])
372
373        loc = fid.variables['stage-elevation.min_location'][:]
374        assert num.allclose(loc, [0.16666667, 0.33333333])
375
376        loc = fid.variables['stage-elevation.max_location'][:]       
377        assert num.allclose(loc, [0.8333333, 0.16666667])       
378
379        time = fid.variables['stage-elevation.max_time'][:]
380        assert num.allclose(time, 0.0)               
381
382        extrema = fid.variables['xmomentum.extrema'][:]
383        assert num.allclose(extrema,[-0.06062178, 0.47873023]) or \
384            num.allclose(extrema, [-0.06062178, 0.47847986]) or \
385            num.allclose(extrema, [-0.06062178, 0.47848481]) or \
386            num.allclose(extrema, [-0.06062178, 0.47763887]) # 18/09/09
387       
388        extrema = fid.variables['ymomentum.extrema'][:]
389        assert num.allclose(extrema,[0.00, 0.0625786]) or num.allclose(extrema,[0.00, 0.06062178])
390
391        time_interval = fid.variables['extrema.time_interval'][:]
392        assert num.allclose(time_interval, [0,1])
393       
394        polygon = fid.variables['extrema.polygon'][:]       
395        assert num.allclose(polygon, domain.get_boundary_polygon())
396       
397        fid.close()
398        #print "sww.filename", sww.filename
399        os.remove(sww.filename)
400
401       
402       
403    def test_sww_constant_smooth(self):
404        """Test that constant sww information can be written correctly
405        (non smooth)
406        """
407        self.domain.set_name('datatest' + str(id(self)))
408        self.domain.format = 'sww'
409        self.domain.smooth = True
410
411        sww = SWW_file(self.domain)
412        sww.store_connectivity()
413
414        # Check contents
415        # Get NetCDF
416        fid = NetCDFFile(sww.filename, netcdf_mode_r)  # Open existing file for append
417
418        # Get the variables
419        X = fid.variables['x'][:]
420        Y = fid.variables['y'][:]
421        Z = fid.variables['elevation'][:]
422        V = fid.variables['volumes']
423
424        assert num.allclose([X[0], Y[0]], num.array([0.0, 0.0]))
425        assert num.allclose([X[1], Y[1]], num.array([0.0, 0.5]))
426        assert num.allclose([X[2], Y[2]], num.array([0.0, 1.0]))
427        assert num.allclose([X[4], Y[4]], num.array([0.5, 0.5]))
428        assert num.allclose([X[7], Y[7]], num.array([1.0, 0.5]))
429
430        assert Z[4] == -0.5
431
432        assert V[2,0] == 4
433        assert V[2,1] == 5
434        assert V[2,2] == 1
435        assert V[4,0] == 6
436        assert V[4,1] == 7
437        assert V[4,2] == 3
438
439        fid.close()
440        os.remove(sww.filename)
441       
442
443    def test_sww_variable(self):
444        """Test that sww information can be written correctly
445        """
446        self.domain.set_name('datatest' + str(id(self)))
447        self.domain.format = 'sww'
448        self.domain.smooth = True
449        self.domain.reduction = mean
450
451        sww = SWW_file(self.domain)
452        sww.store_connectivity()
453        sww.store_timestep()
454
455        # Check contents
456        # Get NetCDF
457        fid = NetCDFFile(sww.filename, netcdf_mode_r)  # Open existing file for append
458
459
460        # Get the variables
461        time = fid.variables['time']
462        stage = fid.variables['stage']
463
464        Q = self.domain.quantities['stage']
465        Q0 = Q.vertex_values[:,0]
466        Q1 = Q.vertex_values[:,1]
467        Q2 = Q.vertex_values[:,2]
468
469        A = stage[0,:]
470        #print A[0], (Q2[0,0] + Q1[1,0])/2
471        assert num.allclose(A[0], (Q2[0] + Q1[1])/2)
472        assert num.allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3)
473        assert num.allclose(A[2], Q0[3])
474        assert num.allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3)
475
476        #Center point
477        assert num.allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\
478                                   Q0[5] + Q2[6] + Q1[7])/6)
479       
480        fid.close()
481        os.remove(sww.filename)
482
483
484    def test_sww_variable2(self):
485        """Test that sww information can be written correctly
486        multiple timesteps. Use average as reduction operator
487        """
488
489        import time, os
490        from Scientific.IO.NetCDF import NetCDFFile
491
492        self.domain.set_name('datatest' + str(id(self)))
493        self.domain.format = 'sww'
494        self.domain.smooth = True
495
496        self.domain.reduction = mean
497
498        sww = SWW_file(self.domain)
499        sww.store_connectivity()
500        sww.store_timestep()
501        #self.domain.tight_slope_limiters = 1
502        self.domain.evolve_to_end(finaltime = 0.01)
503        sww.store_timestep()
504
505        # Check contents
506        # Get NetCDF
507        fid = NetCDFFile(sww.filename, netcdf_mode_r)
508
509        # Get the variables
510        x = fid.variables['x']
511        y = fid.variables['y']
512        z = fid.variables['elevation']
513        time = fid.variables['time']
514        stage = fid.variables['stage']
515
516        #Check values
517        Q = self.domain.quantities['stage']
518        Q0 = Q.vertex_values[:,0]
519        Q1 = Q.vertex_values[:,1]
520        Q2 = Q.vertex_values[:,2]
521
522        A = stage[1,:]
523        assert num.allclose(A[0], (Q2[0] + Q1[1])/2)
524        assert num.allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3)
525        assert num.allclose(A[2], Q0[3])
526        assert num.allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3)
527
528        #Center point
529        assert num.allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\
530                                   Q0[5] + Q2[6] + Q1[7])/6)
531
532
533        fid.close()
534
535        #Cleanup
536        os.remove(sww.filename)
537
538
539    def test_sync(self):
540        """test_sync - Test info stored at each timestep is as expected (incl initial condition)
541        """
542
543        import time, os, config
544        from Scientific.IO.NetCDF import NetCDFFile
545
546        self.domain.set_name('synctest')
547        self.domain.format = 'sww'
548        self.domain.smooth = False
549        self.domain.store = True
550
551        self.domain.tight_slope_limiters = True
552        self.domain.use_centroid_velocities = True       
553       
554        # In this case tight_slope_limiters as default
555        # in conjunction with protection
556        # against isolated degenerate timesteps works.
557        #self.domain.tight_slope_limiters = 1
558        #self.domain.protect_against_isolated_degenerate_timesteps = True
559
560        #print 'tight_sl', self.domain.tight_slope_limiters
561       
562
563        #Evolution
564        for t in self.domain.evolve(yieldstep = 1.0, finaltime = 4.0):
565           
566            #########self.domain.write_time(track_speeds=True)
567            stage = self.domain.quantities['stage'].vertex_values
568
569            #Get NetCDF
570            fid = NetCDFFile(self.domain.writer.filename, netcdf_mode_r)
571            stage_file = fid.variables['stage']
572           
573            if t == 0.0:
574                assert num.allclose(stage, self.initial_stage)
575                assert num.allclose(stage_file[:], stage.flatten())
576            else:
577                assert not num.allclose(stage, self.initial_stage)
578                assert not num.allclose(stage_file[:], stage.flatten())
579
580            fid.close()
581
582        os.remove(self.domain.writer.filename)
583
584
585    def test_sww_minimum_storable_height(self):
586        """Test that sww information can be written correctly
587        multiple timesteps using a different reduction operator (min)
588        """
589
590        import time, os
591        from Scientific.IO.NetCDF import NetCDFFile
592
593        self.domain.set_name('datatest' + str(id(self)))
594        self.domain.format = 'sww'
595        self.domain.smooth = True
596        self.domain.reduction = min
597        self.domain.minimum_storable_height = 100
598
599        sww = SWW_file(self.domain)
600        sww.store_connectivity()
601        sww.store_timestep()
602
603        #self.domain.tight_slope_limiters = 1
604        self.domain.evolve_to_end(finaltime = 0.01)
605        sww.store_timestep()
606
607        #Check contents
608        #Get NetCDF
609        fid = NetCDFFile(sww.filename, netcdf_mode_r)
610
611        # Get the variables
612        x = fid.variables['x']
613        y = fid.variables['y']
614        z = fid.variables['elevation']
615        time = fid.variables['time']
616        stage = fid.variables['stage']
617        xmomentum = fid.variables['xmomentum']
618        ymomentum = fid.variables['ymomentum']       
619
620        #Check values
621        Q = self.domain.quantities['stage']
622        Q0 = Q.vertex_values[:,0]
623        Q1 = Q.vertex_values[:,1]
624        Q2 = Q.vertex_values[:,2]
625
626        A = stage[1,:]
627        assert num.allclose(stage[1,:], z[:])
628
629
630        assert num.allclose(xmomentum, 0.0)
631        assert num.allclose(ymomentum, 0.0)       
632       
633        fid.close()
634
635        #Cleanup
636        os.remove(sww.filename)
637
638
639    def test_file_boundary_stsIV_sinewave_ordering(self):
640        """test_file_boundary_stsIV_sinewave_ordering(self):
641        Read correct points from ordering file and apply sts to boundary
642        This one uses a sine wave and compares to time boundary
643        """
644
645        lat_long_points=[[6.01, 97.0], [6.02, 97.0], [6.05, 96.9], [6.0, 97.0]]
646        bounding_polygon=[[6.0, 97.0], [6.01, 97.0], [6.02,97.0], \
647                            [6.02,97.02], [6.00,97.02]]
648        tide = 0.35
649        time_step_count = 50
650        time_step = 0.1
651        times_ref = num.arange(0, time_step_count*time_step, time_step)
652       
653        n=len(lat_long_points)
654        first_tstep=num.ones(n,num.int)
655        last_tstep=(time_step_count)*num.ones(n,num.int)
656       
657        gauge_depth=20*num.ones(n,num.float)
658       
659        ha1=num.ones((n,time_step_count),num.float)
660        ua1=3.*num.ones((n,time_step_count),num.float)
661        va1=2.*num.ones((n,time_step_count),num.float)
662        for i in range(n):
663            ha1[i]=num.sin(times_ref)
664       
665       
666        base_name, files = self.write_mux2(lat_long_points,
667                                           time_step_count, time_step,
668                                           first_tstep, last_tstep,
669                                           depth=gauge_depth,
670                                           ha=ha1,
671                                           ua=ua1,
672                                           va=va1)
673
674        # Write order file
675        file_handle, order_base_name = tempfile.mkstemp("")
676        os.close(file_handle)
677        os.remove(order_base_name)
678        d=","
679        order_file=order_base_name+'order.txt'
680        fid=open(order_file,'w')
681       
682        # Write Header
683        header='index, longitude, latitude\n'
684        fid.write(header)
685        indices=[3,0,1]
686        for i in indices:
687            line=str(i)+d+str(lat_long_points[i][1])+d+\
688                str(lat_long_points[i][0])+"\n"
689            fid.write(line)
690        fid.close()
691
692        sts_file=base_name
693        urs2sts(base_name, basename_out=sts_file,
694                ordering_filename=order_file,
695                mean_stage=tide,
696                verbose=False)
697        self.delete_mux(files)
698       
699       
700       
701        # Now read the sts file and check that values have been stored correctly.
702        fid = NetCDFFile(sts_file + '.sts')
703
704        # Check the time vector
705        times = fid.variables['time'][:]
706       
707        #print times
708
709        # Check sts quantities
710        stage = fid.variables['stage'][:]
711        xmomentum = fid.variables['xmomentum'][:]
712        ymomentum = fid.variables['ymomentum'][:]
713        elevation = fid.variables['elevation'][:]       
714
715        # Create beginnings of boundary polygon based on sts_boundary
716        boundary_polygon = create_sts_boundary(base_name)
717       
718        os.remove(order_file)
719
720        # Append the remaining part of the boundary polygon to be defined by
721        # the user
722        bounding_polygon_utm=[]
723        for point in bounding_polygon:
724            zone,easting,northing=redfearn(point[0],point[1])
725            bounding_polygon_utm.append([easting,northing])
726
727        boundary_polygon.append(bounding_polygon_utm[3])
728        boundary_polygon.append(bounding_polygon_utm[4])
729
730        #print 'boundary_polygon', boundary_polygon
731       
732        plot=False
733        if plot:
734            from pylab import plot,show,axis
735            boundary_polygon=ensure_numeric(boundary_polygon)
736            bounding_polygon_utm=ensure_numeric(bounding_polygon_utm)
737            #plot(lat_long_points[:,0],lat_long_points[:,1],'o')
738            plot(boundary_polygon[:,0], boundary_polygon[:,1])
739            plot(bounding_polygon_utm[:,0],bounding_polygon_utm[:,1])
740            show()
741
742        assert num.allclose(bounding_polygon_utm,boundary_polygon)
743
744
745        extent_res=1000000
746        meshname = 'urs_test_mesh' + '.tsh'
747        interior_regions=None
748        boundary_tags={'ocean': [0,1], 'otherocean': [2,3,4]}
749       
750        # have to change boundary tags from last example because now bounding
751        # polygon starts in different place.
752        create_mesh_from_regions(boundary_polygon,
753                                 boundary_tags=boundary_tags,
754                                 maximum_triangle_area=extent_res,
755                                 filename=meshname,
756                                 interior_regions=interior_regions,
757                                 verbose=False)
758       
759        domain_fbound = Domain(meshname)
760        domain_fbound.set_quantity('stage', tide)
761        Bf = File_boundary(sts_file+'.sts', 
762                           domain_fbound, 
763                           boundary_polygon=boundary_polygon)
764        Br = Reflective_boundary(domain_fbound)
765
766        domain_fbound.set_boundary({'ocean': Bf,'otherocean': Br})
767        finaltime=time_step*(time_step_count-1)
768        yieldstep=time_step
769        temp_fbound=num.zeros(int(finaltime/yieldstep)+1,num.float)
770   
771        for i, t in enumerate(domain_fbound.evolve(yieldstep=yieldstep,
772                                                   finaltime=finaltime, 
773                                                   skip_initial_step=False)):
774            temp_fbound[i]=domain_fbound.quantities['stage'].centroid_values[2]
775   
776       
777        domain_time = Domain(meshname)
778        domain_time.set_quantity('stage', tide)
779        Br = Reflective_boundary(domain_time)
780        Bw = Time_boundary(domain=domain_time,
781                         f=lambda t: [num.sin(t)+tide,3.*(20.+num.sin(t)+tide),2.*(20.+num.sin(t)+tide)])
782        domain_time.set_boundary({'ocean': Bw,'otherocean': Br})
783       
784        temp_time=num.zeros(int(finaltime/yieldstep)+1,num.float)
785        for i, t in enumerate(domain_time.evolve(yieldstep=yieldstep,
786                                                   finaltime=finaltime, 
787                                                   skip_initial_step=False)):
788            temp_time[i]=domain_time.quantities['stage'].centroid_values[2]
789       
790        assert num.allclose(temp_fbound, temp_time)               
791        assert num.allclose(domain_fbound.quantities['stage'].vertex_values,
792                            domain_time.quantities['stage'].vertex_values)
793                       
794        assert num.allclose(domain_fbound.quantities['xmomentum'].vertex_values,
795                            domain_time.quantities['xmomentum'].vertex_values)                       
796                       
797        assert num.allclose(domain_fbound.quantities['ymomentum'].vertex_values,
798                            domain_time.quantities['ymomentum'].vertex_values)                                               
799       
800
801        try:
802            os.remove(sts_file+'.sts')
803        except:
804            # Windoze can't remove this file for some reason
805            pass
806       
807        os.remove(meshname)
808       
809 
810    def test_points2polygon(self): 
811        att_dict = {}
812        pointlist = num.array([[1.0, 0.0],[0.0, 1.0],[0.0, 0.0]])
813        att_dict['elevation'] = num.array([10.1, 0.0, 10.4])
814        att_dict['brightness'] = num.array([10.0, 1.0, 10.4])
815       
816        fileName = tempfile.mktemp(".csv")
817       
818        G = Geospatial_data(pointlist, att_dict)
819       
820        G.export_points_file(fileName)
821       
822        polygon = load_pts_as_polygon(fileName)
823       
824        # This test may fail if the order changes
825        assert (polygon == [[0.0, 0.0],[1.0, 0.0],[0.0, 1.0]])
826       
827
828
829#-------------------------------------------------------------
830
831if __name__ == "__main__":
832    #suite = unittest.makeSuite(Test_Data_Manager, 'test_sww2domain2')
833    suite = unittest.makeSuite(Test_Data_Manager, 'test_sww')
834   
835   
836   
837    # FIXME(Ole): When Ross has implemented logging, we can
838    # probably get rid of all this:
839    if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V':
840        Test_Data_Manager.verbose=True
841        saveout = sys.stdout   
842        filename = ".temp_verbose"
843        fid = open(filename, 'w')
844        sys.stdout = fid
845    else:
846        pass
847    runner = unittest.TextTestRunner() #verbosity=2)
848    runner.run(suite)
849
850    # Cleaning up
851    if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V':
852        sys.stdout = saveout
853        fid.close() 
854        os.remove(filename)
855
856
Note: See TracBrowser for help on using the repository browser.