source: trunk/anuga_core/source/anuga/shallow_water/test_data_manager.py @ 7841

Last change on this file since 7841 was 7841, checked in by hudson, 14 years ago

Refactorings to allow tests to pass.

File size: 64.7 KB
Line 
1#!/usr/bin/env python
2#
3
4"""
5Set of tests for the now-defunct data manager module.
6
7These could be split up into their correct modules.
8"""
9
10import unittest
11import copy
12import numpy as num
13import sys
14               
15import tempfile
16import os
17import shutil
18from struct import pack, unpack
19
20from Scientific.IO.NetCDF import NetCDFFile
21
22
23from anuga.anuga_exceptions import ANUGAError
24from anuga.file.sww import SWW_file
25from anuga.coordinate_transforms.geo_reference import Geo_reference                       
26from anuga.coordinate_transforms.redfearn import degminsec2decimal_degrees
27from anuga.abstract_2d_finite_volumes.util import file_function
28from anuga.utilities.system_tools import get_pathname_from_package, \
29                                            get_revision_number
30from anuga.utilities.file_utils import get_all_swwfiles
31from anuga.utilities.file_utils import del_dir
32from anuga.utilities.numerical_tools import ensure_numeric, mean
33from anuga.config import netcdf_mode_r, netcdf_mode_w, netcdf_mode_a
34from anuga.config import netcdf_float, epsilon, g
35from anuga.pmesh.mesh_interface import create_mesh_from_regions
36from anuga.file_conversion.sww2dem import sww2dem_batch
37from anuga.file.csv_file import load_csv_as_dict, load_csv_as_array, \
38                                load_csv_as_building_polygons, \
39                                load_csv_as_polygons
40from anuga.file.sts import create_sts_boundary
41from anuga.file.pts import load_pts_as_polygon
42from anuga.file.sww import Write_sww
43
44
45# import all the boundaries - some are generic, some are shallow water
46from boundaries import Reflective_boundary, \
47            Field_boundary, Transmissive_momentum_set_stage_boundary, \
48            Transmissive_stage_zero_momentum_boundary
49from anuga.abstract_2d_finite_volumes.generic_boundary_conditions\
50     import Transmissive_boundary, Dirichlet_boundary, \
51            Time_boundary, File_boundary, AWI_boundary
52
53# This is needed to run the tests of local functions
54from anuga.file_conversion.urs2sts import urs2sts
55from anuga.coordinate_transforms.redfearn import redfearn
56from anuga.coordinate_transforms.geo_reference import Geo_reference, \
57     DEFAULT_ZONE
58from anuga.geospatial_data.geospatial_data import Geospatial_data
59
60from shallow_water_domain import Domain
61
62# use helper methods from other unit test
63from anuga.file.test_mux import Test_Mux
64
65
66class Test_Data_Manager(Test_Mux):
67    # Class variable
68    verbose = False
69
70    def set_verbose(self):
71        Test_Data_Manager.verbose = True
72       
73    def setUp(self):
74        import time
75        from mesh_factory import rectangular
76       
77        self.verbose = Test_Data_Manager.verbose
78        # Create basic mesh
79        points, vertices, boundary = rectangular(2, 2)
80
81        # Create shallow water domain
82        domain = Domain(points, vertices, boundary)
83        domain.default_order = 2
84
85        # Set some field values
86        domain.set_quantity('elevation', lambda x,y: -x)
87        domain.set_quantity('friction', 0.03)
88
89
90        ######################
91        # Boundary conditions
92        B = Transmissive_boundary(domain)
93        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
94
95
96        ######################
97        #Initial condition - with jumps
98        bed = domain.quantities['elevation'].vertex_values
99        stage = num.zeros(bed.shape, num.float)
100
101        h = 0.3
102        for i in range(stage.shape[0]):
103            if i % 2 == 0:
104                stage[i,:] = bed[i,:] + h
105            else:
106                stage[i,:] = bed[i,:]
107
108        domain.set_quantity('stage', stage)
109
110
111        domain.distribute_to_vertices_and_edges()               
112        self.initial_stage = copy.copy(domain.quantities['stage'].vertex_values)
113
114
115        self.domain = domain
116
117        C = domain.get_vertex_coordinates()
118        self.X = C[:,0:6:2].copy()
119        self.Y = C[:,1:6:2].copy()
120
121        self.F = bed
122
123        #Write A testfile (not realistic. Values aren't realistic)
124        self.test_MOST_file = 'most_small'
125
126        longitudes = [150.66667, 150.83334, 151., 151.16667]
127        latitudes = [-34.5, -34.33333, -34.16667, -34]
128
129        long_name = 'LON'
130        lat_name = 'LAT'
131
132        nx = 4
133        ny = 4
134        six = 6
135
136
137        for ext in ['_ha.nc', '_ua.nc', '_va.nc', '_e.nc']:
138            fid = NetCDFFile(self.test_MOST_file + ext, netcdf_mode_w)
139
140            fid.createDimension(long_name,nx)
141            fid.createVariable(long_name,netcdf_float,(long_name,))
142            fid.variables[long_name].point_spacing='uneven'
143            fid.variables[long_name].units='degrees_east'
144            fid.variables[long_name].assignValue(longitudes)
145
146            fid.createDimension(lat_name,ny)
147            fid.createVariable(lat_name,netcdf_float,(lat_name,))
148            fid.variables[lat_name].point_spacing='uneven'
149            fid.variables[lat_name].units='degrees_north'
150            fid.variables[lat_name].assignValue(latitudes)
151
152            fid.createDimension('TIME',six)
153            fid.createVariable('TIME',netcdf_float,('TIME',))
154            fid.variables['TIME'].point_spacing='uneven'
155            fid.variables['TIME'].units='seconds'
156            fid.variables['TIME'].assignValue([0.0, 0.1, 0.6, 1.1, 1.6, 2.1])
157
158
159            name = ext[1:3].upper()
160            if name == 'E.': name = 'ELEVATION'
161            fid.createVariable(name,netcdf_float,('TIME', lat_name, long_name))
162            fid.variables[name].units='CENTIMETERS'
163            fid.variables[name].missing_value=-1.e+034
164
165            fid.variables[name].assignValue([[[0.3400644, 0, -46.63519, -6.50198],
166                                              [-0.1214216, 0, 0, 0],
167                                              [0, 0, 0, 0],
168                                              [0, 0, 0, 0]],
169                                             [[0.3400644, 2.291054e-005, -23.33335, -6.50198],
170                                              [-0.1213987, 4.581959e-005, -1.594838e-007, 1.421085e-012],
171                                              [2.291054e-005, 4.582107e-005, 4.581715e-005, 1.854517e-009],
172                                              [0, 2.291054e-005, 2.291054e-005, 0]],
173                                             [[0.3400644, 0.0001374632, -23.31503, -6.50198],
174                                              [-0.1212842, 0.0002756907, 0.006325484, 1.380492e-006],
175                                              [0.0001374632, 0.0002749264, 0.0002742863, 6.665601e-008],
176                                              [0, 0.0001374632, 0.0001374632, 0]],
177                                             [[0.3400644, 0.0002520159, -23.29672, -6.50198],
178                                              [-0.1211696, 0.0005075303, 0.01264618, 6.208276e-006],
179                                              [0.0002520159, 0.0005040318, 0.0005027961, 2.23865e-007],
180                                              [0, 0.0002520159, 0.0002520159, 0]],
181                                             [[0.3400644, 0.0003665686, -23.27842, -6.50198],
182                                              [-0.1210551, 0.0007413362, 0.01896192, 1.447638e-005],
183                                              [0.0003665686, 0.0007331371, 0.0007313463, 4.734126e-007],
184                                              [0, 0.0003665686, 0.0003665686, 0]],
185                                             [[0.3400644, 0.0004811212, -23.26012, -6.50198],
186                                              [-0.1209405, 0.0009771062, 0.02527271, 2.617787e-005],
187                                              [0.0004811212, 0.0009622425, 0.0009599366, 8.152277e-007],
188                                              [0, 0.0004811212, 0.0004811212, 0]]])
189
190
191            fid.close()
192
193
194
195
196    def tearDown(self):
197        import os
198        for ext in ['_ha.nc', '_ua.nc', '_va.nc', '_e.nc']:
199            #print 'Trying to remove', self.test_MOST_file + ext
200            os.remove(self.test_MOST_file + ext)
201
202    def test_sww_constant(self):
203        """Test that constant sww information can be written correctly
204        (non smooth)
205        """
206        self.domain.set_name('datatest' + str(id(self)))
207        self.domain.format = 'sww' #Remove??
208        self.domain.smooth = False
209       
210        sww = SWW_file(self.domain)
211        sww.store_connectivity()
212
213        fid = NetCDFFile(sww.filename, netcdf_mode_r)  # Open existing file for append
214
215        # Get the variables
216        x = fid.variables['x']
217        y = fid.variables['y']
218        z = fid.variables['elevation']
219        V = fid.variables['volumes']
220
221        assert num.allclose (x[:], self.X.flatten())
222        assert num.allclose (y[:], self.Y.flatten())
223        assert num.allclose (z[:], self.F.flatten())
224
225        P = len(self.domain)
226        for k in range(P):
227            assert V[k, 0] == 3*k
228            assert V[k, 1] == 3*k+1
229            assert V[k, 2] == 3*k+2
230
231        fid.close()
232        os.remove(sww.filename)
233
234    def test_sww_header(self):
235        """Test that constant sww information can be written correctly
236        (non smooth)
237        """
238        self.domain.set_name('datatest' + str(id(self)))
239        self.domain.format = 'sww' #Remove??
240        self.domain.smooth = False
241
242        sww = SWW_file(self.domain)
243        sww.store_connectivity()
244
245        # Check contents
246        # Get NetCDF
247        fid = NetCDFFile(sww.filename, netcdf_mode_r)  # Open existing file for append
248
249        # Get the variables
250        sww_revision = fid.revision_number
251        try:
252            revision_number = get_revision_number()
253        except:
254            revision_number = None
255           
256        assert str(revision_number) == sww_revision
257        fid.close()
258
259        #print "sww.filename", sww.filename
260        os.remove(sww.filename)
261
262    def test_sww_range(self):
263        """Test that constant sww information can be written correctly
264        Use non-smooth to be able to compare to quantity values.
265        """
266
267        self.domain.set_name('datatest' + str(id(self)))
268        self.domain.format = 'sww'
269        self.domain.set_store_vertices_uniquely(True)
270       
271        sww = SWW_file(self.domain)       
272
273        dqs = self.domain.get_quantity('stage')
274        dqx = self.domain.get_quantity('xmomentum')
275        dqy = self.domain.get_quantity('ymomentum')       
276        xmom_min = ymom_min = stage_min = sys.maxint
277        xmom_max = ymom_max = stage_max = -stage_min       
278        for t in self.domain.evolve(yieldstep = 1, finaltime = 1):
279            wmax = max(dqs.get_values().flatten())
280            if wmax > stage_max: stage_max = wmax
281            wmin = min(dqs.get_values().flatten())
282            if wmin < stage_min: stage_min = wmin           
283           
284            uhmax = max(dqx.get_values().flatten())
285            if uhmax > xmom_max: xmom_max = uhmax
286            uhmin = min(dqx.get_values().flatten())
287            if uhmin < xmom_min: xmom_min = uhmin                       
288           
289            vhmax = max(dqy.get_values().flatten())
290            if vhmax > ymom_max: ymom_max = vhmax
291            vhmin = min(dqy.get_values().flatten())
292            if vhmin < ymom_min: ymom_min = vhmin                                   
293           
294           
295           
296        # Get NetCDF
297        fid = NetCDFFile(sww.filename, netcdf_mode_r) # Open existing file for append
298
299        # Get the variables
300        range = fid.variables['stage_range'][:]
301        assert num.allclose(range,[stage_min, stage_max])
302
303        range = fid.variables['xmomentum_range'][:]
304        #print range
305        assert num.allclose(range, [xmom_min, xmom_max])
306       
307        range = fid.variables['ymomentum_range'][:]
308        #print range
309        assert num.allclose(range, [ymom_min, ymom_max])       
310
311
312       
313        fid.close()
314        os.remove(sww.filename)
315
316    def test_sww_extrema(self):
317        """Test that extrema of quantities can be retrieved at every vertex
318        Extrema are updated at every *internal* timestep
319        """
320
321        domain = self.domain
322       
323        domain.set_name('extrema_test' + str(id(self)))
324        domain.format = 'sww'
325        domain.smooth = True
326
327        assert domain.quantities_to_be_monitored is None
328        assert domain.monitor_polygon is None
329        assert domain.monitor_time_interval is None       
330       
331        domain.set_quantities_to_be_monitored(['xmomentum',
332                                               'ymomentum',
333                                               'stage-elevation'])
334
335        assert domain.monitor_polygon is None
336        assert domain.monitor_time_interval is None
337
338
339        domain.set_quantities_to_be_monitored(['xmomentum',
340                                               'ymomentum',
341                                               'stage-elevation'],
342                                              polygon=domain.get_boundary_polygon(),
343                                              time_interval=[0,1])
344       
345       
346        assert len(domain.quantities_to_be_monitored) == 3
347        assert domain.quantities_to_be_monitored.has_key('stage-elevation')
348        assert domain.quantities_to_be_monitored.has_key('xmomentum')               
349        assert domain.quantities_to_be_monitored.has_key('ymomentum')       
350
351       
352        #domain.protect_against_isolated_degenerate_timesteps = True
353        #domain.tight_slope_limiters = 1
354        domain.tight_slope_limiters = 0 # Backwards compatibility
355        domain.use_centroid_velocities = 0 # Backwards compatibility (7/5/8)
356       
357       
358        sww = SWW_file(domain)
359
360        for t in domain.evolve(yieldstep = 1, finaltime = 1):
361            pass
362            #print domain.timestepping_statistics()
363            domain.quantity_statistics(precision = '%.8f') # Silent
364
365           
366        # Get NetCDF
367        fid = NetCDFFile(sww.filename, netcdf_mode_r) # Open existing file for append
368
369        # Get the variables
370        extrema = fid.variables['stage-elevation.extrema'][:]
371        assert num.allclose(extrema, [0.00, 0.30])
372
373        loc = fid.variables['stage-elevation.min_location'][:]
374        assert num.allclose(loc, [0.16666667, 0.33333333])
375
376        loc = fid.variables['stage-elevation.max_location'][:]       
377        assert num.allclose(loc, [0.8333333, 0.16666667])       
378
379        time = fid.variables['stage-elevation.max_time'][:]
380        assert num.allclose(time, 0.0)               
381
382        extrema = fid.variables['xmomentum.extrema'][:]
383        assert num.allclose(extrema,[-0.06062178, 0.47873023]) or \
384            num.allclose(extrema, [-0.06062178, 0.47847986]) or \
385            num.allclose(extrema, [-0.06062178, 0.47848481]) or \
386            num.allclose(extrema, [-0.06062178, 0.47763887]) # 18/09/09
387       
388        extrema = fid.variables['ymomentum.extrema'][:]
389        assert num.allclose(extrema,[0.00, 0.0625786]) or num.allclose(extrema,[0.00, 0.06062178])
390
391        time_interval = fid.variables['extrema.time_interval'][:]
392        assert num.allclose(time_interval, [0,1])
393       
394        polygon = fid.variables['extrema.polygon'][:]       
395        assert num.allclose(polygon, domain.get_boundary_polygon())
396       
397        fid.close()
398        #print "sww.filename", sww.filename
399        os.remove(sww.filename)
400
401       
402       
403    def test_sww_constant_smooth(self):
404        """Test that constant sww information can be written correctly
405        (non smooth)
406        """
407        self.domain.set_name('datatest' + str(id(self)))
408        self.domain.format = 'sww'
409        self.domain.smooth = True
410
411        sww = SWW_file(self.domain)
412        sww.store_connectivity()
413
414        # Check contents
415        # Get NetCDF
416        fid = NetCDFFile(sww.filename, netcdf_mode_r)  # Open existing file for append
417
418        # Get the variables
419        X = fid.variables['x'][:]
420        Y = fid.variables['y'][:]
421        Z = fid.variables['elevation'][:]
422        V = fid.variables['volumes']
423
424        assert num.allclose([X[0], Y[0]], num.array([0.0, 0.0]))
425        assert num.allclose([X[1], Y[1]], num.array([0.0, 0.5]))
426        assert num.allclose([X[2], Y[2]], num.array([0.0, 1.0]))
427        assert num.allclose([X[4], Y[4]], num.array([0.5, 0.5]))
428        assert num.allclose([X[7], Y[7]], num.array([1.0, 0.5]))
429
430        assert Z[4] == -0.5
431
432        assert V[2,0] == 4
433        assert V[2,1] == 5
434        assert V[2,2] == 1
435        assert V[4,0] == 6
436        assert V[4,1] == 7
437        assert V[4,2] == 3
438
439        fid.close()
440        os.remove(sww.filename)
441       
442
443    def test_sww_variable(self):
444        """Test that sww information can be written correctly
445        """
446        self.domain.set_name('datatest' + str(id(self)))
447        self.domain.format = 'sww'
448        self.domain.smooth = True
449        self.domain.reduction = mean
450
451        sww = SWW_file(self.domain)
452        sww.store_connectivity()
453        sww.store_timestep()
454
455        # Check contents
456        # Get NetCDF
457        fid = NetCDFFile(sww.filename, netcdf_mode_r)  # Open existing file for append
458
459
460        # Get the variables
461        time = fid.variables['time']
462        stage = fid.variables['stage']
463
464        Q = self.domain.quantities['stage']
465        Q0 = Q.vertex_values[:,0]
466        Q1 = Q.vertex_values[:,1]
467        Q2 = Q.vertex_values[:,2]
468
469        A = stage[0,:]
470        #print A[0], (Q2[0,0] + Q1[1,0])/2
471        assert num.allclose(A[0], (Q2[0] + Q1[1])/2)
472        assert num.allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3)
473        assert num.allclose(A[2], Q0[3])
474        assert num.allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3)
475
476        #Center point
477        assert num.allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\
478                                   Q0[5] + Q2[6] + Q1[7])/6)
479       
480        fid.close()
481        os.remove(sww.filename)
482
483
484    def test_sww_variable2(self):
485        """Test that sww information can be written correctly
486        multiple timesteps. Use average as reduction operator
487        """
488
489        import time, os
490        from Scientific.IO.NetCDF import NetCDFFile
491
492        self.domain.set_name('datatest' + str(id(self)))
493        self.domain.format = 'sww'
494        self.domain.smooth = True
495
496        self.domain.reduction = mean
497
498        sww = SWW_file(self.domain)
499        sww.store_connectivity()
500        sww.store_timestep()
501        #self.domain.tight_slope_limiters = 1
502        self.domain.evolve_to_end(finaltime = 0.01)
503        sww.store_timestep()
504
505
506        # Check contents
507        # Get NetCDF
508        fid = NetCDFFile(sww.filename, netcdf_mode_r)  # Open existing file for append
509
510        # Get the variables
511        x = fid.variables['x']
512        y = fid.variables['y']
513        z = fid.variables['elevation']
514        time = fid.variables['time']
515        stage = fid.variables['stage']
516
517        #Check values
518        Q = self.domain.quantities['stage']
519        Q0 = Q.vertex_values[:,0]
520        Q1 = Q.vertex_values[:,1]
521        Q2 = Q.vertex_values[:,2]
522
523        A = stage[1,:]
524        assert num.allclose(A[0], (Q2[0] + Q1[1])/2)
525        assert num.allclose(A[1], (Q0[1] + Q1[3] + Q2[2])/3)
526        assert num.allclose(A[2], Q0[3])
527        assert num.allclose(A[3], (Q0[0] + Q1[5] + Q2[4])/3)
528
529        #Center point
530        assert num.allclose(A[4], (Q1[0] + Q2[1] + Q0[2] +\
531                                   Q0[5] + Q2[6] + Q1[7])/6)
532
533
534        fid.close()
535
536        #Cleanup
537        os.remove(sww.filename)
538
539    def no_test_sww_variable3(self):
540        """Test that sww information can be written correctly
541        multiple timesteps using a different reduction operator (min)
542        """
543
544        # Different reduction in sww files has been made obsolete.
545       
546        import time, os
547        from Scientific.IO.NetCDF import NetCDFFile
548
549        self.domain.set_name('datatest' + str(id(self)))
550        self.domain.format = 'sww'
551        self.domain.smooth = True
552        self.domain.reduction = min
553
554        sww = SWW_file(self.domain)
555        sww.store_connectivity()
556        sww.store_timestep()
557        #self.domain.tight_slope_limiters = 1
558        self.domain.evolve_to_end(finaltime = 0.01)
559        sww.store_timestep()
560
561
562        #Check contents
563        #Get NetCDF
564        fid = NetCDFFile(sww.filename, netcdf_mode_r)
565
566        # Get the variables
567        x = fid.variables['x']
568        y = fid.variables['y']
569        z = fid.variables['elevation']
570        time = fid.variables['time']
571        stage = fid.variables['stage']
572
573        #Check values
574        Q = self.domain.quantities['stage']
575        Q0 = Q.vertex_values[:,0]
576        Q1 = Q.vertex_values[:,1]
577        Q2 = Q.vertex_values[:,2]
578
579        A = stage[1,:]
580        assert num.allclose(A[0], min(Q2[0], Q1[1]))
581        assert num.allclose(A[1], min(Q0[1], Q1[3], Q2[2]))
582        assert num.allclose(A[2], Q0[3])
583        assert num.allclose(A[3], min(Q0[0], Q1[5], Q2[4]))
584
585        #Center point
586        assert num.allclose(A[4], min(Q1[0], Q2[1], Q0[2],
587                                      Q0[5], Q2[6], Q1[7]))
588
589
590        fid.close()
591
592        #Cleanup
593        os.remove(sww.filename)
594
595
596    def test_sync(self):
597        """test_sync - Test info stored at each timestep is as expected (incl initial condition)
598        """
599
600        import time, os, config
601        from Scientific.IO.NetCDF import NetCDFFile
602
603        self.domain.set_name('synctest')
604        self.domain.format = 'sww'
605        self.domain.smooth = False
606        self.domain.store = True
607
608        self.domain.tight_slope_limiters = True
609        self.domain.use_centroid_velocities = True       
610       
611        # In this case tight_slope_limiters as default
612        # in conjunction with protection
613        # against isolated degenerate timesteps works.
614        #self.domain.tight_slope_limiters = 1
615        #self.domain.protect_against_isolated_degenerate_timesteps = True
616
617        #print 'tight_sl', self.domain.tight_slope_limiters
618       
619
620        #Evolution
621        for t in self.domain.evolve(yieldstep = 1.0, finaltime = 4.0):
622           
623            #########self.domain.write_time(track_speeds=True)
624            stage = self.domain.quantities['stage'].vertex_values
625
626            #Get NetCDF
627            fid = NetCDFFile(self.domain.writer.filename, netcdf_mode_r)
628            stage_file = fid.variables['stage']
629           
630            if t == 0.0:
631                assert num.allclose(stage, self.initial_stage)
632                assert num.allclose(stage_file[:], stage.flatten())
633            else:
634                assert not num.allclose(stage, self.initial_stage)
635                assert not num.allclose(stage_file[:], stage.flatten())
636
637            fid.close()
638
639        os.remove(self.domain.writer.filename)
640
641
642    def test_sww_minimum_storable_height(self):
643        """Test that sww information can be written correctly
644        multiple timesteps using a different reduction operator (min)
645        """
646
647        import time, os
648        from Scientific.IO.NetCDF import NetCDFFile
649
650        self.domain.set_name('datatest' + str(id(self)))
651        self.domain.format = 'sww'
652        self.domain.smooth = True
653        self.domain.reduction = min
654        self.domain.minimum_storable_height = 100
655
656        sww = SWW_file(self.domain)
657        sww.store_connectivity()
658        sww.store_timestep()
659
660        #self.domain.tight_slope_limiters = 1
661        self.domain.evolve_to_end(finaltime = 0.01)
662        sww.store_timestep()
663
664
665        #Check contents
666        #Get NetCDF
667        fid = NetCDFFile(sww.filename, netcdf_mode_r)
668
669
670        # Get the variables
671        x = fid.variables['x']
672        y = fid.variables['y']
673        z = fid.variables['elevation']
674        time = fid.variables['time']
675        stage = fid.variables['stage']
676        xmomentum = fid.variables['xmomentum']
677        ymomentum = fid.variables['ymomentum']       
678
679        #Check values
680        Q = self.domain.quantities['stage']
681        Q0 = Q.vertex_values[:,0]
682        Q1 = Q.vertex_values[:,1]
683        Q2 = Q.vertex_values[:,2]
684
685        A = stage[1,:]
686        assert num.allclose(stage[1,:], z[:])
687
688
689        assert num.allclose(xmomentum, 0.0)
690        assert num.allclose(ymomentum, 0.0)       
691       
692        fid.close()
693
694        #Cleanup
695        os.remove(sww.filename)
696
697
698    def Not_a_test_sww_DSG(self):
699        """Not a test, rather a look at the sww format
700        """
701
702        import time, os
703        from Scientific.IO.NetCDF import NetCDFFile
704
705        self.domain.set_name('datatest' + str(id(self)))
706        self.domain.format = 'sww'
707        self.domain.smooth = True
708        self.domain.reduction = mean
709
710        sww = SWW_file(self.domain)
711        sww.store_connectivity()
712        sww.store_timestep()
713
714        #Check contents
715        #Get NetCDF
716        fid = NetCDFFile(sww.filename, netcdf_mode_r)
717
718        # Get the variables
719        x = fid.variables['x']
720        y = fid.variables['y']
721        z = fid.variables['elevation']
722
723        volumes = fid.variables['volumes']
724        time = fid.variables['time']
725
726        # 2D
727        stage = fid.variables['stage']
728
729        X = x[:]
730        Y = y[:]
731        Z = z[:]
732        V = volumes[:]
733        T = time[:]
734        S = stage[:,:]
735
736#         print "****************************"
737#         print "X ",X
738#         print "****************************"
739#         print "Y ",Y
740#         print "****************************"
741#         print "Z ",Z
742#         print "****************************"
743#         print "V ",V
744#         print "****************************"
745#         print "Time ",T
746#         print "****************************"
747#         print "Stage ",S
748#         print "****************************"
749
750
751        fid.close()
752
753        #Cleanup
754        os.remove(sww.filename)
755
756
757
758    def test_export_grid(self):
759        """
760        test_export_grid(self):
761        Test that sww information can be converted correctly to asc/prj
762        format readable by e.g. ArcView
763        """
764
765        import time, os
766        from Scientific.IO.NetCDF import NetCDFFile
767
768        try:
769            os.remove('teg*.sww')
770        except:
771            pass
772
773        #Setup
774        self.domain.set_name('teg')
775
776        prjfile = self.domain.get_name() + '_elevation.prj'
777        ascfile = self.domain.get_name() + '_elevation.asc'
778        swwfile = self.domain.get_name() + '.sww'
779
780        self.domain.set_datadir('.')
781        self.domain.smooth = True
782        self.domain.set_quantity('elevation', lambda x,y: -x-y)
783        self.domain.set_quantity('stage', 1.0)
784
785        self.domain.geo_reference = Geo_reference(56,308500,6189000)
786
787        sww = SWW_file(self.domain)
788        sww.store_connectivity()
789        sww.store_timestep()
790        self.domain.evolve_to_end(finaltime = 0.01)
791        sww.store_timestep()
792
793        cellsize = 0.25
794        #Check contents
795        #Get NetCDF
796
797        fid = NetCDFFile(sww.filename, netcdf_mode_r)
798
799        # Get the variables
800        x = fid.variables['x'][:]
801        y = fid.variables['y'][:]
802        z = fid.variables['elevation'][:]
803        time = fid.variables['time'][:]
804        stage = fid.variables['stage'][:]
805
806        fid.close()
807
808        #Export to ascii/prj files
809        sww2dem_batch(self.domain.get_name(),
810                quantities = 'elevation',
811                cellsize = cellsize,
812                verbose = self.verbose,
813                format = 'asc')
814
815        #Check asc file
816        ascid = open(ascfile)
817        lines = ascid.readlines()
818        ascid.close()
819
820        L = lines[2].strip().split()
821        assert L[0].strip().lower() == 'xllcorner'
822        assert num.allclose(float(L[1].strip().lower()), 308500)
823
824        L = lines[3].strip().split()
825        assert L[0].strip().lower() == 'yllcorner'
826        assert num.allclose(float(L[1].strip().lower()), 6189000)
827
828        #Check grid values
829        for j in range(5):
830            L = lines[6+j].strip().split()
831            y = (4-j) * cellsize
832            for i in range(5):
833                assert num.allclose(float(L[i]), -i*cellsize - y)
834               
835        #Cleanup
836        os.remove(prjfile)
837        os.remove(ascfile)
838        os.remove(swwfile)
839
840    def test_export_gridII(self):
841        """
842        test_export_gridII(self):
843        Test that sww information can be converted correctly to asc/prj
844        format readable by e.g. ArcView
845        """
846
847        import time, os
848        from Scientific.IO.NetCDF import NetCDFFile
849
850        try:
851            os.remove('teg*.sww')
852        except:
853            pass
854
855        #Setup
856        self.domain.set_name('tegII')
857
858        swwfile = self.domain.get_name() + '.sww'
859
860        self.domain.set_datadir('.')
861        self.domain.smooth = True
862        self.domain.set_quantity('elevation', lambda x,y: -x-y)
863        self.domain.set_quantity('stage', 1.0)
864
865        self.domain.geo_reference = Geo_reference(56,308500,6189000)
866
867        sww = SWW_file(self.domain)
868        sww.store_connectivity()
869        sww.store_timestep()
870        self.domain.evolve_to_end(finaltime = 0.01)
871        sww.store_timestep()
872
873        cellsize = 0.25
874        #Check contents
875        #Get NetCDF
876
877        fid = NetCDFFile(sww.filename, netcdf_mode_r)
878
879        # Get the variables
880        x = fid.variables['x'][:]
881        y = fid.variables['y'][:]
882        z = fid.variables['elevation'][:]
883        time = fid.variables['time'][:]
884        stage = fid.variables['stage'][:]
885        xmomentum = fid.variables['xmomentum'][:]
886        ymomentum = fid.variables['ymomentum'][:]       
887
888        #print 'stage', stage
889        #print 'xmom', xmomentum
890        #print 'ymom', ymomentum       
891
892        fid.close()
893
894        #Export to ascii/prj files
895        if True:
896            sww2dem_batch(self.domain.get_name(),
897                        quantities = ['elevation', 'depth'],
898                        cellsize = cellsize,
899                        verbose = self.verbose,
900                        format = 'asc')
901
902        else:
903            sww2dem_batch(self.domain.get_name(),
904                quantities = ['depth'],
905                cellsize = cellsize,
906                verbose = self.verbose,
907                format = 'asc')
908
909
910            export_grid(self.domain.get_name(),
911                quantities = ['elevation'],
912                cellsize = cellsize,
913                verbose = self.verbose,
914                format = 'asc')
915
916        prjfile = self.domain.get_name() + '_elevation.prj'
917        ascfile = self.domain.get_name() + '_elevation.asc'
918       
919        #Check asc file
920        ascid = open(ascfile)
921        lines = ascid.readlines()
922        ascid.close()
923
924        L = lines[2].strip().split()
925        assert L[0].strip().lower() == 'xllcorner'
926        assert num.allclose(float(L[1].strip().lower()), 308500)
927
928        L = lines[3].strip().split()
929        assert L[0].strip().lower() == 'yllcorner'
930        assert num.allclose(float(L[1].strip().lower()), 6189000)
931
932        #print "ascfile", ascfile
933        #Check grid values
934        for j in range(5):
935            L = lines[6+j].strip().split()
936            y = (4-j) * cellsize
937            for i in range(5):
938                #print " -i*cellsize - y",  -i*cellsize - y
939                #print "float(L[i])", float(L[i])
940                assert num.allclose(float(L[i]), -i*cellsize - y)
941
942        #Cleanup
943        os.remove(prjfile)
944        os.remove(ascfile)
945       
946        #Check asc file
947        ascfile = self.domain.get_name() + '_depth.asc'
948        prjfile = self.domain.get_name() + '_depth.prj'
949        ascid = open(ascfile)
950        lines = ascid.readlines()
951        ascid.close()
952
953        L = lines[2].strip().split()
954        assert L[0].strip().lower() == 'xllcorner'
955        assert num.allclose(float(L[1].strip().lower()), 308500)
956
957        L = lines[3].strip().split()
958        assert L[0].strip().lower() == 'yllcorner'
959        assert num.allclose(float(L[1].strip().lower()), 6189000)
960
961        #Check grid values
962        for j in range(5):
963            L = lines[6+j].strip().split()
964            y = (4-j) * cellsize
965            for i in range(5):
966                #print " -i*cellsize - y",  -i*cellsize - y
967                #print "float(L[i])", float(L[i])               
968                assert num.allclose(float(L[i]), 1 - (-i*cellsize - y))
969
970        #Cleanup
971        os.remove(prjfile)
972        os.remove(ascfile)
973        os.remove(swwfile)
974
975
976    def test_export_gridIII(self):
977        """
978        test_export_gridIII
979        Test that sww information can be converted correctly to asc/prj
980        format readable by e.g. ArcView
981        """
982
983        import time, os
984        from Scientific.IO.NetCDF import NetCDFFile
985
986        try:
987            os.remove('teg*.sww')
988        except:
989            pass
990
991        #Setup
992       
993        self.domain.set_name('tegIII')
994
995        swwfile = self.domain.get_name() + '.sww'
996
997        self.domain.set_datadir('.')
998        self.domain.format = 'sww'
999        self.domain.smooth = True
1000        self.domain.set_quantity('elevation', lambda x,y: -x-y)
1001        self.domain.set_quantity('stage', 1.0)
1002
1003        self.domain.geo_reference = Geo_reference(56,308500,6189000)
1004       
1005        sww = SWW_file(self.domain)
1006        sww.store_connectivity()
1007        sww.store_timestep() #'stage')
1008        self.domain.evolve_to_end(finaltime = 0.01)
1009        sww.store_timestep() #'stage')
1010
1011        cellsize = 0.25
1012        #Check contents
1013        #Get NetCDF
1014
1015        fid = NetCDFFile(sww.filename, netcdf_mode_r)
1016
1017        # Get the variables
1018        x = fid.variables['x'][:]
1019        y = fid.variables['y'][:]
1020        z = fid.variables['elevation'][:]
1021        time = fid.variables['time'][:]
1022        stage = fid.variables['stage'][:]
1023
1024        fid.close()
1025
1026        #Export to ascii/prj files
1027        extra_name_out = 'yeah'
1028        if True:
1029            sww2dem_batch(self.domain.get_name(),
1030                        quantities = ['elevation', 'depth'],
1031                        extra_name_out = extra_name_out,
1032                        cellsize = cellsize,
1033                        verbose = self.verbose,
1034                        format = 'asc')
1035
1036        else:
1037            sww2dem_batch(self.domain.get_name(),
1038                quantities = ['depth'],
1039                cellsize = cellsize,
1040                verbose = self.verbose,
1041                format = 'asc')
1042
1043
1044            sww2dem_batch(self.domain.get_name(),
1045                quantities = ['elevation'],
1046                cellsize = cellsize,
1047                verbose = self.verbose,
1048                format = 'asc')
1049
1050        prjfile = self.domain.get_name() + '_elevation_yeah.prj'
1051        ascfile = self.domain.get_name() + '_elevation_yeah.asc'
1052       
1053        #Check asc file
1054        ascid = open(ascfile)
1055        lines = ascid.readlines()
1056        ascid.close()
1057
1058        L = lines[2].strip().split()
1059        assert L[0].strip().lower() == 'xllcorner'
1060        assert num.allclose(float(L[1].strip().lower()), 308500)
1061
1062        L = lines[3].strip().split()
1063        assert L[0].strip().lower() == 'yllcorner'
1064        assert num.allclose(float(L[1].strip().lower()), 6189000)
1065
1066        #print "ascfile", ascfile
1067        #Check grid values
1068        for j in range(5):
1069            L = lines[6+j].strip().split()
1070            y = (4-j) * cellsize
1071            for i in range(5):
1072                #print " -i*cellsize - y",  -i*cellsize - y
1073                #print "float(L[i])", float(L[i])
1074                assert num.allclose(float(L[i]), -i*cellsize - y)
1075               
1076        #Cleanup
1077        os.remove(prjfile)
1078        os.remove(ascfile)
1079       
1080        #Check asc file
1081        ascfile = self.domain.get_name() + '_depth_yeah.asc'
1082        prjfile = self.domain.get_name() + '_depth_yeah.prj'
1083        ascid = open(ascfile)
1084        lines = ascid.readlines()
1085        ascid.close()
1086
1087        L = lines[2].strip().split()
1088        assert L[0].strip().lower() == 'xllcorner'
1089        assert num.allclose(float(L[1].strip().lower()), 308500)
1090
1091        L = lines[3].strip().split()
1092        assert L[0].strip().lower() == 'yllcorner'
1093        assert num.allclose(float(L[1].strip().lower()), 6189000)
1094
1095        #Check grid values
1096        for j in range(5):
1097            L = lines[6+j].strip().split()
1098            y = (4-j) * cellsize
1099            for i in range(5):
1100                assert num.allclose(float(L[i]), 1 - (-i*cellsize - y))
1101
1102        #Cleanup
1103        os.remove(prjfile)
1104        os.remove(ascfile)
1105        os.remove(swwfile)
1106
1107    def test_export_grid_bad(self):
1108        """Test that sww information can be converted correctly to asc/prj
1109        format readable by e.g. ArcView
1110        """
1111
1112        try:
1113            sww2dem_batch('a_small_round-egg',
1114                        quantities = ['elevation', 'depth'],
1115                        cellsize = 99,
1116                        verbose = self.verbose,
1117                        format = 'asc')
1118        except IOError:
1119            pass
1120        else:
1121            self.failUnless(0 ==1,  'Bad input did not throw exception error!')
1122
1123
1124    def test_file_boundary_stsIV_sinewave_ordering(self):
1125        """test_file_boundary_stsIV_sinewave_ordering(self):
1126        Read correct points from ordering file and apply sts to boundary
1127        This one uses a sine wave and compares to time boundary
1128        """
1129
1130        lat_long_points=[[6.01,97.0],[6.02,97.0],[6.05,96.9],[6.0,97.0]]
1131        bounding_polygon=[[6.0,97.0],[6.01,97.0],[6.02,97.0],[6.02,97.02],[6.00,97.02]]
1132        tide = 0.35
1133        time_step_count = 50
1134        time_step = 0.1
1135        times_ref = num.arange(0, time_step_count*time_step, time_step)
1136       
1137        n=len(lat_long_points)
1138        first_tstep=num.ones(n,num.int)
1139        last_tstep=(time_step_count)*num.ones(n,num.int)
1140       
1141        gauge_depth=20*num.ones(n,num.float)
1142       
1143        ha1=num.ones((n,time_step_count),num.float)
1144        ua1=3.*num.ones((n,time_step_count),num.float)
1145        va1=2.*num.ones((n,time_step_count),num.float)
1146        for i in range(n):
1147            ha1[i]=num.sin(times_ref)
1148       
1149       
1150        base_name, files = self.write_mux2(lat_long_points,
1151                                           time_step_count, time_step,
1152                                           first_tstep, last_tstep,
1153                                           depth=gauge_depth,
1154                                           ha=ha1,
1155                                           ua=ua1,
1156                                           va=va1)
1157
1158        # Write order file
1159        file_handle, order_base_name = tempfile.mkstemp("")
1160        os.close(file_handle)
1161        os.remove(order_base_name)
1162        d=","
1163        order_file=order_base_name+'order.txt'
1164        fid=open(order_file,'w')
1165       
1166        # Write Header
1167        header='index, longitude, latitude\n'
1168        fid.write(header)
1169        indices=[3,0,1]
1170        for i in indices:
1171            line=str(i)+d+str(lat_long_points[i][1])+d+\
1172                str(lat_long_points[i][0])+"\n"
1173            fid.write(line)
1174        fid.close()
1175
1176        sts_file=base_name
1177        urs2sts(base_name, basename_out=sts_file,
1178                ordering_filename=order_file,
1179                mean_stage=tide,
1180                verbose=False)
1181        self.delete_mux(files)
1182       
1183       
1184       
1185        # Now read the sts file and check that values have been stored correctly.
1186        fid = NetCDFFile(sts_file + '.sts')
1187
1188        # Check the time vector
1189        times = fid.variables['time'][:]
1190       
1191        #print times
1192
1193        # Check sts quantities
1194        stage = fid.variables['stage'][:]
1195        xmomentum = fid.variables['xmomentum'][:]
1196        ymomentum = fid.variables['ymomentum'][:]
1197        elevation = fid.variables['elevation'][:]
1198
1199        #print stage
1200        #print xmomentum
1201        #print ymomentum
1202        #print elevation
1203       
1204       
1205
1206        # Create beginnings of boundary polygon based on sts_boundary
1207        boundary_polygon = create_sts_boundary(base_name)
1208       
1209        os.remove(order_file)
1210
1211        # Append the remaining part of the boundary polygon to be defined by
1212        # the user
1213        bounding_polygon_utm=[]
1214        for point in bounding_polygon:
1215            zone,easting,northing=redfearn(point[0],point[1])
1216            bounding_polygon_utm.append([easting,northing])
1217
1218        boundary_polygon.append(bounding_polygon_utm[3])
1219        boundary_polygon.append(bounding_polygon_utm[4])
1220
1221        #print 'boundary_polygon', boundary_polygon
1222       
1223        plot=False
1224        if plot:
1225            from pylab import plot,show,axis
1226            boundary_polygon=ensure_numeric(boundary_polygon)
1227            bounding_polygon_utm=ensure_numeric(bounding_polygon_utm)
1228            #plot(lat_long_points[:,0],lat_long_points[:,1],'o')
1229            plot(boundary_polygon[:,0], boundary_polygon[:,1])
1230            plot(bounding_polygon_utm[:,0],bounding_polygon_utm[:,1])
1231            show()
1232
1233        assert num.allclose(bounding_polygon_utm,boundary_polygon)
1234
1235
1236        extent_res=1000000
1237        meshname = 'urs_test_mesh' + '.tsh'
1238        interior_regions=None
1239        boundary_tags={'ocean': [0,1], 'otherocean': [2,3,4]}
1240       
1241        # have to change boundary tags from last example because now bounding
1242        # polygon starts in different place.
1243        create_mesh_from_regions(boundary_polygon,
1244                                 boundary_tags=boundary_tags,
1245                                 maximum_triangle_area=extent_res,
1246                                 filename=meshname,
1247                                 interior_regions=interior_regions,
1248                                 verbose=False)
1249       
1250        domain_fbound = Domain(meshname)
1251        domain_fbound.set_quantity('stage', tide)
1252        Bf = File_boundary(sts_file+'.sts', 
1253                           domain_fbound, 
1254                           boundary_polygon=boundary_polygon)
1255        Br = Reflective_boundary(domain_fbound)
1256
1257        domain_fbound.set_boundary({'ocean': Bf,'otherocean': Br})
1258        finaltime=time_step*(time_step_count-1)
1259        yieldstep=time_step
1260        temp_fbound=num.zeros(int(finaltime/yieldstep)+1,num.float)
1261   
1262        for i, t in enumerate(domain_fbound.evolve(yieldstep=yieldstep,
1263                                                   finaltime=finaltime, 
1264                                                   skip_initial_step=False)):
1265            temp_fbound[i]=domain_fbound.quantities['stage'].centroid_values[2]
1266   
1267       
1268        domain_time = Domain(meshname)
1269        domain_time.set_quantity('stage', tide)
1270        Br = Reflective_boundary(domain_time)
1271        Bw = Time_boundary(domain=domain_time,
1272                         f=lambda t: [num.sin(t)+tide,3.*(20.+num.sin(t)+tide),2.*(20.+num.sin(t)+tide)])
1273        domain_time.set_boundary({'ocean': Bw,'otherocean': Br})
1274       
1275        temp_time=num.zeros(int(finaltime/yieldstep)+1,num.float)
1276        for i, t in enumerate(domain_time.evolve(yieldstep=yieldstep,
1277                                                   finaltime=finaltime, 
1278                                                   skip_initial_step=False)):
1279            temp_time[i]=domain_time.quantities['stage'].centroid_values[2]
1280
1281
1282
1283        #print temp_fbound
1284        #print temp_time
1285
1286        #print domain_fbound.quantities['stage'].vertex_values
1287        #print domain_time.quantities['stage'].vertex_values
1288       
1289        assert num.allclose(temp_fbound, temp_time)               
1290        assert num.allclose(domain_fbound.quantities['stage'].vertex_values,
1291                            domain_time.quantities['stage'].vertex_values)
1292                       
1293        assert num.allclose(domain_fbound.quantities['xmomentum'].vertex_values,
1294                            domain_time.quantities['xmomentum'].vertex_values)                       
1295                       
1296        assert num.allclose(domain_fbound.quantities['ymomentum'].vertex_values,
1297                            domain_time.quantities['ymomentum'].vertex_values)                                               
1298       
1299
1300        try:
1301            os.remove(sts_file+'.sts')
1302        except:
1303            # Windoze can't remove this file for some reason
1304            pass
1305       
1306        os.remove(meshname)
1307       
1308       
1309
1310       
1311       
1312    def test_file_boundary_sts_time_limit(self):
1313        """test_file_boundary_stsIV_sinewave_ordering(self):
1314        Read correct points from ordering file and apply sts to boundary
1315        This one uses a sine wave and compares to time boundary
1316       
1317        This one tests that times used can be limited by upper limit
1318        """
1319       
1320        lat_long_points=[[6.01,97.0],[6.02,97.0],[6.05,96.9],[6.0,97.0]]
1321        bounding_polygon=[[6.0,97.0],[6.01,97.0],[6.02,97.0],[6.02,97.02],[6.00,97.02]]
1322        tide = 0.35
1323        time_step_count = 50
1324        time_step = 0.1
1325        times_ref = num.arange(0, time_step_count*time_step, time_step)
1326       
1327        n=len(lat_long_points)
1328        first_tstep=num.ones(n,num.int)
1329        last_tstep=(time_step_count)*num.ones(n,num.int)
1330       
1331        gauge_depth=20*num.ones(n,num.float)
1332       
1333        ha1=num.ones((n,time_step_count),num.float)
1334        ua1=3.*num.ones((n,time_step_count),num.float)
1335        va1=2.*num.ones((n,time_step_count),num.float)
1336        for i in range(n):
1337            ha1[i]=num.sin(times_ref)
1338       
1339       
1340        base_name, files = self.write_mux2(lat_long_points,
1341                                           time_step_count, time_step,
1342                                           first_tstep, last_tstep,
1343                                           depth=gauge_depth,
1344                                           ha=ha1,
1345                                           ua=ua1,
1346                                           va=va1)
1347
1348        # Write order file
1349        file_handle, order_base_name = tempfile.mkstemp("")
1350        os.close(file_handle)
1351        os.remove(order_base_name)
1352        d=","
1353        order_file=order_base_name+'order.txt'
1354        fid=open(order_file,'w')
1355       
1356        # Write Header
1357        header='index, longitude, latitude\n'
1358        fid.write(header)
1359        indices=[3,0,1]
1360        for i in indices:
1361            line=str(i)+d+str(lat_long_points[i][1])+d+\
1362                str(lat_long_points[i][0])+"\n"
1363            fid.write(line)
1364        fid.close()
1365
1366        sts_file=base_name
1367        urs2sts(base_name, basename_out=sts_file,
1368                ordering_filename=order_file,
1369                mean_stage=tide,
1370                verbose=False)
1371        self.delete_mux(files)
1372       
1373       
1374       
1375        # Now read the sts file and check that values have been stored correctly.
1376        fid = NetCDFFile(sts_file + '.sts')
1377
1378        # Check the time vector
1379        times = fid.variables['time'][:]
1380        starttime = fid.starttime[0]
1381        #print times
1382        #print starttime
1383
1384        # Check sts quantities
1385        stage = fid.variables['stage'][:]
1386        xmomentum = fid.variables['xmomentum'][:]
1387        ymomentum = fid.variables['ymomentum'][:]
1388        elevation = fid.variables['elevation'][:]
1389
1390       
1391
1392        # Create beginnings of boundary polygon based on sts_boundary
1393        boundary_polygon = create_sts_boundary(base_name)
1394       
1395        os.remove(order_file)
1396
1397        # Append the remaining part of the boundary polygon to be defined by
1398        # the user
1399        bounding_polygon_utm=[]
1400        for point in bounding_polygon:
1401            zone,easting,northing=redfearn(point[0],point[1])
1402            bounding_polygon_utm.append([easting,northing])
1403
1404        boundary_polygon.append(bounding_polygon_utm[3])
1405        boundary_polygon.append(bounding_polygon_utm[4])
1406
1407        #print 'boundary_polygon', boundary_polygon
1408       
1409
1410        assert num.allclose(bounding_polygon_utm,boundary_polygon)
1411
1412
1413        extent_res=1000000
1414        meshname = 'urs_test_mesh' + '.tsh'
1415        interior_regions=None
1416        boundary_tags={'ocean': [0,1], 'otherocean': [2,3,4]}
1417       
1418        # have to change boundary tags from last example because now bounding
1419        # polygon starts in different place.
1420        create_mesh_from_regions(boundary_polygon,
1421                                 boundary_tags=boundary_tags,
1422                                 maximum_triangle_area=extent_res,
1423                                 filename=meshname,
1424                                 interior_regions=interior_regions,
1425                                 verbose=False)
1426       
1427        domain_fbound = Domain(meshname)
1428        domain_fbound.set_quantity('stage', tide)
1429       
1430       
1431        Bf = File_boundary(sts_file+'.sts', 
1432                           domain_fbound, 
1433                           boundary_polygon=boundary_polygon)
1434        time_vec = Bf.F.get_time()
1435        assert num.allclose(Bf.F.starttime, starttime)
1436        assert num.allclose(time_vec, times_ref)                                   
1437       
1438        for time_limit in [0.1, 0.2, 0.5, 1.0, 2.2, 3.0, 4.3, 6.0, 10.0]:
1439            Bf = File_boundary(sts_file+'.sts', 
1440                               domain_fbound, 
1441                               time_limit=time_limit+starttime,
1442                               boundary_polygon=boundary_polygon)
1443       
1444            time_vec = Bf.F.get_time()
1445            assert num.allclose(Bf.F.starttime, starttime)           
1446            assert num.alltrue(time_vec < time_limit)
1447           
1448           
1449        try:   
1450            Bf = File_boundary(sts_file+'.sts', 
1451                               domain_fbound, 
1452                               time_limit=-1+starttime,
1453                               boundary_polygon=boundary_polygon)           
1454            time_vec = Bf.F.get_time()   
1455            print time_vec   
1456        except AssertionError:
1457            pass
1458        else:
1459            raise Exception, 'Should have raised Exception here'
1460
1461    #### END TESTS FOR URS 2 SWW  ###
1462
1463
1464    def test_triangulation(self):
1465        #
1466       
1467       
1468        filename = tempfile.mktemp("_data_manager.sww")
1469        outfile = NetCDFFile(filename, netcdf_mode_w)
1470        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
1471        volumes = (0,1,2)
1472        elevation = [0,1,2]
1473        new_origin = None
1474        new_origin = Geo_reference(56, 0, 0)
1475        times = [0, 10]
1476        number_of_volumes = len(volumes)
1477        number_of_points = len(points_utm)
1478        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])
1479        sww.store_header(outfile, times, number_of_volumes,
1480                         number_of_points, description='fully sick testing',
1481                         verbose=self.verbose,sww_precision=netcdf_float)
1482        sww.store_triangulation(outfile, points_utm, volumes,
1483                                elevation,  new_origin=new_origin,
1484                                verbose=self.verbose)       
1485        outfile.close()
1486        fid = NetCDFFile(filename)
1487
1488        x = fid.variables['x'][:]
1489        y = fid.variables['y'][:]
1490        fid.close()
1491
1492        assert num.allclose(num.array(map(None, x,y)), points_utm)
1493        os.remove(filename)
1494
1495       
1496    def test_triangulationII(self):
1497        #
1498       
1499       
1500        filename = tempfile.mktemp("_data_manager.sww")
1501        outfile = NetCDFFile(filename, netcdf_mode_w)
1502        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
1503        volumes = (0,1,2)
1504        elevation = [0,1,2]
1505        new_origin = None
1506        #new_origin = Geo_reference(56, 0, 0)
1507        times = [0, 10]
1508        number_of_volumes = len(volumes)
1509        number_of_points = len(points_utm)
1510        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
1511        sww.store_header(outfile, times, number_of_volumes,
1512                         number_of_points, description='fully sick testing',
1513                         verbose=self.verbose,sww_precision=netcdf_float)
1514        sww.store_triangulation(outfile, points_utm, volumes,
1515                                new_origin=new_origin,
1516                                verbose=self.verbose)
1517        sww.store_static_quantities(outfile, elevation=elevation)                               
1518                               
1519        outfile.close()
1520        fid = NetCDFFile(filename)
1521
1522        x = fid.variables['x'][:]
1523        y = fid.variables['y'][:]
1524        results_georef = Geo_reference()
1525        results_georef.read_NetCDF(fid)
1526        assert results_georef == Geo_reference(DEFAULT_ZONE, 0, 0)
1527        fid.close()
1528
1529        assert num.allclose(num.array(map(None, x,y)), points_utm)
1530        os.remove(filename)
1531
1532       
1533    def test_triangulation_new_origin(self):
1534        #
1535       
1536       
1537        filename = tempfile.mktemp("_data_manager.sww")
1538        outfile = NetCDFFile(filename, netcdf_mode_w)
1539        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
1540        volumes = (0,1,2)
1541        elevation = [0,1,2]
1542        new_origin = None
1543        new_origin = Geo_reference(56, 1, 554354)
1544        points_utm = new_origin.change_points_geo_ref(points_utm)
1545        times = [0, 10]
1546        number_of_volumes = len(volumes)
1547        number_of_points = len(points_utm)
1548        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
1549        sww.store_header(outfile, times, number_of_volumes,
1550                         number_of_points, description='fully sick testing',
1551                         verbose=self.verbose,sww_precision=netcdf_float)
1552        sww.store_triangulation(outfile, points_utm, volumes,
1553                                elevation,  new_origin=new_origin,
1554                                verbose=self.verbose)
1555        outfile.close()
1556        fid = NetCDFFile(filename)
1557
1558        x = fid.variables['x'][:]
1559        y = fid.variables['y'][:]
1560        results_georef = Geo_reference()
1561        results_georef.read_NetCDF(fid)
1562        assert results_georef == new_origin
1563        fid.close()
1564
1565        absolute = Geo_reference(56, 0,0)
1566        assert num.allclose(num.array(
1567            absolute.change_points_geo_ref(map(None, x,y),
1568                                           new_origin)),points_utm)
1569       
1570        os.remove(filename)
1571       
1572    def test_triangulation_points_georeference(self):
1573        #
1574       
1575       
1576        filename = tempfile.mktemp("_data_manager.sww")
1577        outfile = NetCDFFile(filename, netcdf_mode_w)
1578        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
1579        volumes = (0,1,2)
1580        elevation = [0,1,2]
1581        new_origin = None
1582        points_georeference = Geo_reference(56, 1, 554354)
1583        points_utm = points_georeference.change_points_geo_ref(points_utm)
1584        times = [0, 10]
1585        number_of_volumes = len(volumes)
1586        number_of_points = len(points_utm)
1587        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
1588        sww.store_header(outfile, times, number_of_volumes,
1589                         number_of_points, description='fully sick testing',
1590                         verbose=self.verbose,sww_precision=netcdf_float)
1591        sww.store_triangulation(outfile, points_utm, volumes,
1592                                elevation,  new_origin=new_origin,
1593                                points_georeference=points_georeference,
1594                                verbose=self.verbose)       
1595        outfile.close()
1596        fid = NetCDFFile(filename)
1597
1598        x = fid.variables['x'][:]
1599        y = fid.variables['y'][:]
1600        results_georef = Geo_reference()
1601        results_georef.read_NetCDF(fid)
1602        assert results_georef == points_georeference
1603        fid.close()
1604
1605        assert num.allclose(num.array(map(None, x,y)), points_utm)
1606        os.remove(filename)
1607       
1608    def test_triangulation_2_geo_refs(self):
1609        #
1610       
1611       
1612        filename = tempfile.mktemp("_data_manager.sww")
1613        outfile = NetCDFFile(filename, netcdf_mode_w)
1614        points_utm = num.array([[0.,0.],[1.,1.], [0.,1.]])
1615        volumes = (0,1,2)
1616        elevation = [0,1,2]
1617        new_origin = Geo_reference(56, 1, 1)
1618        points_georeference = Geo_reference(56, 0, 0)
1619        points_utm = points_georeference.change_points_geo_ref(points_utm)
1620        times = [0, 10]
1621        number_of_volumes = len(volumes)
1622        number_of_points = len(points_utm)
1623        sww = Write_sww(['elevation'], ['stage', 'xmomentum', 'ymomentum'])       
1624        sww.store_header(outfile, times, number_of_volumes,
1625                         number_of_points, description='fully sick testing',
1626                         verbose=self.verbose,sww_precision=netcdf_float)
1627        sww.store_triangulation(outfile, points_utm, volumes,
1628                                elevation,  new_origin=new_origin,
1629                                points_georeference=points_georeference,
1630                                verbose=self.verbose)       
1631        outfile.close()
1632        fid = NetCDFFile(filename)
1633
1634        x = fid.variables['x'][:]
1635        y = fid.variables['y'][:]
1636        results_georef = Geo_reference()
1637        results_georef.read_NetCDF(fid)
1638        assert results_georef == new_origin
1639        fid.close()
1640
1641
1642        absolute = Geo_reference(56, 0,0)
1643        assert num.allclose(num.array(
1644            absolute.change_points_geo_ref(map(None, x,y),
1645                                           new_origin)),points_utm)
1646        os.remove(filename)
1647
1648 
1649    def test_points2polygon(self): 
1650        att_dict = {}
1651        pointlist = num.array([[1.0, 0.0],[0.0, 1.0],[0.0, 0.0]])
1652        att_dict['elevation'] = num.array([10.1, 0.0, 10.4])
1653        att_dict['brightness'] = num.array([10.0, 1.0, 10.4])
1654       
1655        fileName = tempfile.mktemp(".csv")
1656       
1657        G = Geospatial_data(pointlist, att_dict)
1658       
1659        G.export_points_file(fileName)
1660       
1661        polygon = load_pts_as_polygon(fileName)
1662       
1663        # This test may fail if the order changes
1664        assert (polygon == [[0.0, 0.0],[1.0, 0.0],[0.0, 1.0]])
1665       
1666   
1667    def test_csv2polygons(self):
1668        """test_csv2polygons
1669        """
1670       
1671        path = get_pathname_from_package('anuga.shallow_water')               
1672        testfile = os.path.join(path, 'polygon_values_example.csv')               
1673
1674        polygons, values = load_csv_as_polygons(testfile, 
1675                                        value_name='floors')
1676
1677        assert len(polygons) == 7, 'Must have seven polygons'
1678        assert len(values) == 7, 'Must have seven values'
1679           
1680        # Known floor values
1681        floors = {'1': 2, '2': 0, '3': 1, '4': 2, '5': 0, '8': 1, '9': 1}
1682       
1683        # Known polygon values
1684        known_polys = {'1': [[422681.61,871117.55],
1685                             [422691.02,871117.60],
1686                             [422690.87,871084.23],
1687                             [422649.36,871081.85],
1688                             [422649.36,871080.39],
1689                             [422631.86,871079.50],
1690                             [422631.72,871086.75],
1691                             [422636.75,871087.20],
1692                             [422636.75,871091.50],
1693                             [422649.66,871092.09],
1694                             [422649.83,871084.91],
1695                             [422652.94,871084.90],
1696                             [422652.84,871092.39],
1697                             [422681.83,871093.73],
1698                             [422681.61,871117.55]],
1699                       '2': [[422664.22,870785.46],
1700                             [422672.48,870780.14],
1701                             [422668.17,870772.62],
1702                             [422660.35,870777.17],
1703                             [422664.22,870785.46]],
1704                       '3': [[422661.30,871215.06],
1705                             [422667.50,871215.70],
1706                             [422668.30,871204.86],
1707                             [422662.21,871204.33],
1708                             [422661.30,871215.06]],
1709                       '4': [[422473.44,871191.22],
1710                             [422478.33,871192.26],
1711                             [422479.52,871186.03],
1712                             [422474.78,871185.14],
1713                             [422473.44,871191.22]],
1714                       '5': [[422369.69,871049.29],
1715                             [422378.63,871053.58],
1716                             [422383.91,871044.51],
1717                             [422374.97,871040.32],
1718                             [422369.69,871049.29]],
1719                       '8': [[422730.56,871203.13],
1720                             [422734.10,871204.90],
1721                             [422735.26,871202.18],
1722                             [422731.87,871200.58],
1723                             [422730.56,871203.13]],
1724                       '9': [[422659.85,871213.80],
1725                             [422660.91,871210.97],
1726                             [422655.42,871208.85],
1727                             [422654.36,871211.68],
1728                             [422659.85,871213.80]]
1729                       }       
1730       
1731
1732       
1733               
1734        for id in ['1', '2', '3', '4', '5' ,'8' ,'9']:
1735            assert id in polygons.keys()
1736            assert id in values.keys()           
1737
1738            assert int(values[id]) == int(floors[id])
1739            assert len(polygons[id]) == len(known_polys[id])
1740            assert num.allclose(polygons[id], known_polys[id])
1741
1742
1743    def test_csv2polygons_with_clipping(self):
1744        """test_csv2polygons with optional clipping
1745        """
1746        #FIXME(Ole): Not Done!!
1747       
1748        path = get_pathname_from_package('anuga.shallow_water')               
1749        testfile = os.path.join(path, 'polygon_values_example.csv')               
1750
1751        polygons, values = load_csv_as_polygons(testfile, 
1752                                        value_name='floors',
1753                                        clipping_polygons=None)
1754
1755        assert len(polygons) == 7, 'Must have seven polygons'
1756        assert len(values) == 7, 'Must have seven values'
1757           
1758        # Known floor values
1759        floors = {'1': 2, '2': 0, '3': 1, '4': 2, '5': 0, '8': 1, '9': 1}
1760       
1761        # Known polygon values
1762        known_polys = {'1': [[422681.61,871117.55],
1763                             [422691.02,871117.60],
1764                             [422690.87,871084.23],
1765                             [422649.36,871081.85],
1766                             [422649.36,871080.39],
1767                             [422631.86,871079.50],
1768                             [422631.72,871086.75],
1769                             [422636.75,871087.20],
1770                             [422636.75,871091.50],
1771                             [422649.66,871092.09],
1772                             [422649.83,871084.91],
1773                             [422652.94,871084.90],
1774                             [422652.84,871092.39],
1775                             [422681.83,871093.73],
1776                             [422681.61,871117.55]],
1777                       '2': [[422664.22,870785.46],
1778                             [422672.48,870780.14],
1779                             [422668.17,870772.62],
1780                             [422660.35,870777.17],
1781                             [422664.22,870785.46]],
1782                       '3': [[422661.30,871215.06],
1783                             [422667.50,871215.70],
1784                             [422668.30,871204.86],
1785                             [422662.21,871204.33],
1786                             [422661.30,871215.06]],
1787                       '4': [[422473.44,871191.22],
1788                             [422478.33,871192.26],
1789                             [422479.52,871186.03],
1790                             [422474.78,871185.14],
1791                             [422473.44,871191.22]],
1792                       '5': [[422369.69,871049.29],
1793                             [422378.63,871053.58],
1794                             [422383.91,871044.51],
1795                             [422374.97,871040.32],
1796                             [422369.69,871049.29]],
1797                       '8': [[422730.56,871203.13],
1798                             [422734.10,871204.90],
1799                             [422735.26,871202.18],
1800                             [422731.87,871200.58],
1801                             [422730.56,871203.13]],
1802                       '9': [[422659.85,871213.80],
1803                             [422660.91,871210.97],
1804                             [422655.42,871208.85],
1805                             [422654.36,871211.68],
1806                             [422659.85,871213.80]]
1807                       }       
1808       
1809
1810       
1811               
1812        for id in ['1', '2', '3', '4', '5' ,'8' ,'9']:
1813            assert id in polygons.keys()
1814            assert id in values.keys()           
1815
1816            assert int(values[id]) == int(floors[id])
1817            assert len(polygons[id]) == len(known_polys[id])
1818            assert num.allclose(polygons[id], known_polys[id])
1819
1820
1821
1822
1823   
1824    def test_csv2building_polygons(self):
1825        """test_csv2building_polygons
1826        """
1827       
1828        path = get_pathname_from_package('anuga.shallow_water')               
1829        testfile = os.path.join(path, 'polygon_values_example.csv')               
1830
1831        polygons, values = load_csv_as_building_polygons(testfile, 
1832                                                 floor_height=3)
1833
1834        assert len(polygons) == 7, 'Must have seven polygons'
1835        assert len(values) == 7, 'Must have seven values'
1836           
1837        # Known floor values
1838        floors = {'1': 6, '2': 0, '3': 3, '4': 6, '5': 0, '8': 3, '9': 3}
1839       
1840               
1841        for id in ['1', '2', '3', '4', '5' ,'8' ,'9']:
1842            assert id in polygons.keys()
1843            assert id in values.keys()           
1844
1845            assert float(values[id]) == float(floors[id])
1846
1847
1848#-------------------------------------------------------------
1849
1850if __name__ == "__main__":
1851    #suite = unittest.makeSuite(Test_Data_Manager, 'test_sww2domain2')
1852    suite = unittest.makeSuite(Test_Data_Manager, 'test_sww')
1853   
1854   
1855   
1856    # FIXME(Ole): When Ross has implemented logging, we can
1857    # probably get rid of all this:
1858    if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V':
1859        Test_Data_Manager.verbose=True
1860        saveout = sys.stdout   
1861        filename = ".temp_verbose"
1862        fid = open(filename, 'w')
1863        sys.stdout = fid
1864    else:
1865        pass
1866    runner = unittest.TextTestRunner() #verbosity=2)
1867    runner.run(suite)
1868
1869    # Cleaning up
1870    if len(sys.argv) > 1 and sys.argv[1][0].upper() == 'V':
1871        sys.stdout = saveout
1872        fid.close() 
1873        os.remove(filename)
1874
1875
Note: See TracBrowser for help on using the repository browser.