source: trunk/anuga_core/source/anuga/file_conversion/test_2pts.py @ 7776

Last change on this file since 7776 was 7776, checked in by hudson, 13 years ago

Removed redundant data_manager class. Unit tests are running, but may fail.

File size: 7.2 KB
Line 
1# external modules
2import unittest
3import tempfile
4import shutil
5import numpy as num
6
7# ANUGA modules
8from anuga.shallow_water.shallow_water_domain import Domain
9from anuga.coordinate_transforms.geo_reference import Geo_reference
10from anuga.file.sww import Write_sww, SWW_file
11from anuga.abstract_2d_finite_volumes.generic_boundary_conditions \
12                            import Transmissive_boundary
13from anuga.config import netcdf_mode_r, netcdf_mode_w, netcdf_mode_a, \
14                            netcdf_float
15from anuga.geospatial_data.geospatial_data import Geospatial_data
16
17# local modules
18from sdf2pts import sdf2pts
19from sww2pts import sww2pts
20
21
22class Test_2Pts(unittest.TestCase):
23    """ Test files that convert to pts format. """
24   
25    def test_hecras_cross_sections2pts(self):
26        """Test conversion from HECRAS cross sections in ascii format
27        to native NetCDF pts format
28        """
29
30        import time, os
31        from Scientific.IO.NetCDF import NetCDFFile
32
33        #Write test asc file
34        root = 'hecrastest'
35
36        filename = root+'.sdf'
37        fid = open(filename, 'w')
38        fid.write("""
39# RAS export file created on Mon 15Aug2005 11:42
40# by HEC-RAS Version 3.1.1
41
42BEGIN HEADER:
43  UNITS: METRIC
44  DTM TYPE: TIN
45  DTM: v:\1\cit\perth_topo\river_tin
46  STREAM LAYER: c:\\x_local\hecras\21_02_03\up_canning_cent3d.shp
47  CROSS-SECTION LAYER: c:\\x_local\hecras\21_02_03\up_can_xs3d.shp
48  MAP PROJECTION: UTM
49  PROJECTION ZONE: 50
50  DATUM: AGD66
51  VERTICAL DATUM:
52  NUMBER OF REACHES:  19
53  NUMBER OF CROSS-SECTIONS:  2
54END HEADER:
55
56
57BEGIN CROSS-SECTIONS:
58
59  CROSS-SECTION:
60    STREAM ID:Southern-Wungong
61    REACH ID:Southern-Wungong
62    STATION:21410
63    CUT LINE:
64      407546.08 , 6437277.542
65      407329.32 , 6437489.482
66      407283.11 , 6437541.232
67    SURFACE LINE:
68     407546.08,   6437277.54,   52.14
69     407538.88,   6437284.58,   51.07
70     407531.68,   6437291.62,   50.56
71     407524.48,   6437298.66,   49.58
72     407517.28,   6437305.70,   49.09
73     407510.08,   6437312.74,   48.76
74  END:
75
76  CROSS-SECTION:
77    STREAM ID:Swan River
78    REACH ID:Swan Mouth
79    STATION:840.*
80    CUT LINE:
81      381178.0855 , 6452559.0685
82      380485.4755 , 6453169.272
83    SURFACE LINE:
84     381178.09,   6452559.07,   4.17
85     381169.49,   6452566.64,   4.26
86     381157.78,   6452576.96,   4.34
87     381155.97,   6452578.56,   4.35
88     381143.72,   6452589.35,   4.43
89     381136.69,   6452595.54,   4.58
90     381114.74,   6452614.88,   4.41
91     381075.53,   6452649.43,   4.17
92     381071.47,   6452653.00,   3.99
93     381063.46,   6452660.06,   3.67
94     381054.41,   6452668.03,   3.67
95  END:
96END CROSS-SECTIONS:
97""")
98
99        fid.close()
100
101
102        #Convert to NetCDF pts
103        sdf2pts(root)
104
105        #Check contents
106        #Get NetCDF
107        fid = NetCDFFile(root+'.pts', netcdf_mode_r)
108
109        # Get the variables
110        #print fid.variables.keys()
111        points = fid.variables['points']
112        elevation = fid.variables['elevation']
113
114        #Check values
115        ref_points = [[407546.08, 6437277.54],
116                      [407538.88, 6437284.58],
117                      [407531.68, 6437291.62],
118                      [407524.48, 6437298.66],
119                      [407517.28, 6437305.70],
120                      [407510.08, 6437312.74]]
121
122        ref_points += [[381178.09, 6452559.07],
123                       [381169.49, 6452566.64],
124                       [381157.78, 6452576.96],
125                       [381155.97, 6452578.56],
126                       [381143.72, 6452589.35],
127                       [381136.69, 6452595.54],
128                       [381114.74, 6452614.88],
129                       [381075.53, 6452649.43],
130                       [381071.47, 6452653.00],
131                       [381063.46, 6452660.06],
132                       [381054.41, 6452668.03]]
133
134
135        ref_elevation = [52.14, 51.07, 50.56, 49.58, 49.09, 48.76]
136        ref_elevation += [4.17, 4.26, 4.34, 4.35, 4.43, 4.58, 4.41, 4.17, 3.99, 3.67, 3.67]
137
138        #print points[:]
139        #print ref_points
140        assert num.allclose(points, ref_points)
141
142        #print attributes[:]
143        #print ref_elevation
144        assert num.allclose(elevation, ref_elevation)
145
146        #Cleanup
147        fid.close()
148
149
150        os.remove(root + '.sdf')
151        os.remove(root + '.pts')
152
153
154
155
156    def test_sww2pts_centroids(self):
157        """Test that sww information can be converted correctly to pts data at specified coordinates
158        - in this case, the centroids.
159        """
160
161        import time, os
162        from Scientific.IO.NetCDF import NetCDFFile
163        # Used for points that lie outside mesh
164        NODATA_value = 1758323
165
166        # Setup
167        from mesh_factory import rectangular
168
169        # Create shallow water domain
170        domain = Domain(*rectangular(2, 2))
171
172        B = Transmissive_boundary(domain)
173        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
174
175        domain.set_name('datatest')
176
177        ptsfile = domain.get_name() + '_elevation.pts'
178        swwfile = domain.get_name() + '.sww'
179
180        domain.set_datadir('.')
181        domain.format = 'sww'
182        domain.set_quantity('elevation', lambda x,y: -x-y)
183
184        domain.geo_reference = Geo_reference(56,308500,6189000)
185
186        sww = SWW_file(domain)
187        sww.store_connectivity()
188        sww.store_timestep()
189
190        #self.domain.tight_slope_limiters = 1
191        domain.evolve_to_end(finaltime = 0.01)
192        sww.store_timestep()
193
194        # Check contents in NetCDF
195        fid = NetCDFFile(sww.filename, netcdf_mode_r)
196
197        # Get the variables
198        x = fid.variables['x'][:]
199        y = fid.variables['y'][:]
200        elevation = fid.variables['elevation'][:]
201        time = fid.variables['time'][:]
202        stage = fid.variables['stage'][:]
203
204        volumes = fid.variables['volumes'][:]
205
206
207        # Invoke interpolation for vertex points       
208        points = num.concatenate( (x[:,num.newaxis],y[:,num.newaxis]), axis=1 )
209        points = num.ascontiguousarray(points)
210        sww2pts(domain.get_name(),
211                quantity = 'elevation',
212                data_points = points,
213                NODATA_value = NODATA_value)
214        ref_point_values = elevation
215        point_values = Geospatial_data(ptsfile).get_attributes()
216        #print 'P', point_values
217        #print 'Ref', ref_point_values       
218        assert num.allclose(point_values, ref_point_values)       
219
220
221
222        # Invoke interpolation for centroids
223        points = domain.get_centroid_coordinates()
224        #print points
225        sww2pts(domain.get_name(),
226                quantity = 'elevation',
227                data_points = points,
228                NODATA_value = NODATA_value)
229        ref_point_values = [-0.5, -0.5, -1, -1, -1, -1, -1.5, -1.5]   #At centroids
230
231       
232        point_values = Geospatial_data(ptsfile).get_attributes()
233        #print 'P', point_values
234        #print 'Ref', ref_point_values       
235        assert num.allclose(point_values, ref_point_values)       
236
237        fid.close()
238
239        #Cleanup
240        os.remove(sww.filename)
241        os.remove(ptsfile)
242
243
244#-------------------------------------------------------------
245
246if __name__ == "__main__":
247    suite = unittest.makeSuite(Test_2Pts, 'test_sww')
248    runner = unittest.TextTestRunner() #verbosity=2)
249    runner.run(suite)   
Note: See TracBrowser for help on using the repository browser.