source: trunk/anuga_core/source/anuga/file_conversion/test_2pts.py @ 7814

Last change on this file since 7814 was 7814, checked in by hudson, 13 years ago

New filename conventions for file conversion. Filenames must always be passed in with the correct extension.

File size: 7.2 KB
Line 
1# external modules
2import unittest
3import tempfile
4import shutil
5import numpy as num
6
7# ANUGA modules
8from anuga.shallow_water.shallow_water_domain import Domain
9from anuga.coordinate_transforms.geo_reference import Geo_reference
10from anuga.file.sww import Write_sww, SWW_file
11from anuga.abstract_2d_finite_volumes.generic_boundary_conditions \
12                            import Transmissive_boundary
13from anuga.config import netcdf_mode_r, netcdf_mode_w, netcdf_mode_a, \
14                            netcdf_float
15from anuga.geospatial_data.geospatial_data import Geospatial_data
16
17# local modules
18from sdf2pts import sdf2pts
19from sww2pts import sww2pts
20
21
22class Test_2Pts(unittest.TestCase):
23    """ Test files that convert to pts format. """
24   
25    def test_hecras_cross_sections2pts(self):
26        """Test conversion from HECRAS cross sections in ascii format
27        to native NetCDF pts format
28        """
29
30        import time, os
31        from Scientific.IO.NetCDF import NetCDFFile
32
33        #Write test asc file
34        root = 'hecrastest'
35
36        filename = root+'.sdf'
37        fid = open(filename, 'w')
38        fid.write("""
39# RAS export file created on Mon 15Aug2005 11:42
40# by HEC-RAS Version 3.1.1
41
42BEGIN HEADER:
43  UNITS: METRIC
44  DTM TYPE: TIN
45  DTM: v:\1\cit\perth_topo\river_tin
46  STREAM LAYER: c:\\x_local\hecras\21_02_03\up_canning_cent3d.shp
47  CROSS-SECTION LAYER: c:\\x_local\hecras\21_02_03\up_can_xs3d.shp
48  MAP PROJECTION: UTM
49  PROJECTION ZONE: 50
50  DATUM: AGD66
51  VERTICAL DATUM:
52  NUMBER OF REACHES:  19
53  NUMBER OF CROSS-SECTIONS:  2
54END HEADER:
55
56
57BEGIN CROSS-SECTIONS:
58
59  CROSS-SECTION:
60    STREAM ID:Southern-Wungong
61    REACH ID:Southern-Wungong
62    STATION:21410
63    CUT LINE:
64      407546.08 , 6437277.542
65      407329.32 , 6437489.482
66      407283.11 , 6437541.232
67    SURFACE LINE:
68     407546.08,   6437277.54,   52.14
69     407538.88,   6437284.58,   51.07
70     407531.68,   6437291.62,   50.56
71     407524.48,   6437298.66,   49.58
72     407517.28,   6437305.70,   49.09
73     407510.08,   6437312.74,   48.76
74  END:
75
76  CROSS-SECTION:
77    STREAM ID:Swan River
78    REACH ID:Swan Mouth
79    STATION:840.*
80    CUT LINE:
81      381178.0855 , 6452559.0685
82      380485.4755 , 6453169.272
83    SURFACE LINE:
84     381178.09,   6452559.07,   4.17
85     381169.49,   6452566.64,   4.26
86     381157.78,   6452576.96,   4.34
87     381155.97,   6452578.56,   4.35
88     381143.72,   6452589.35,   4.43
89     381136.69,   6452595.54,   4.58
90     381114.74,   6452614.88,   4.41
91     381075.53,   6452649.43,   4.17
92     381071.47,   6452653.00,   3.99
93     381063.46,   6452660.06,   3.67
94     381054.41,   6452668.03,   3.67
95  END:
96END CROSS-SECTIONS:
97""")
98
99        fid.close()
100
101
102        #Convert to NetCDF pts
103        sdf2pts(root+'.sdf')
104
105        #Check contents
106        #Get NetCDF
107        fid = NetCDFFile(root+'.pts', netcdf_mode_r)
108
109        # Get the variables
110        #print fid.variables.keys()
111        points = fid.variables['points']
112        elevation = fid.variables['elevation']
113
114        #Check values
115        ref_points = [[407546.08, 6437277.54],
116                      [407538.88, 6437284.58],
117                      [407531.68, 6437291.62],
118                      [407524.48, 6437298.66],
119                      [407517.28, 6437305.70],
120                      [407510.08, 6437312.74]]
121
122        ref_points += [[381178.09, 6452559.07],
123                       [381169.49, 6452566.64],
124                       [381157.78, 6452576.96],
125                       [381155.97, 6452578.56],
126                       [381143.72, 6452589.35],
127                       [381136.69, 6452595.54],
128                       [381114.74, 6452614.88],
129                       [381075.53, 6452649.43],
130                       [381071.47, 6452653.00],
131                       [381063.46, 6452660.06],
132                       [381054.41, 6452668.03]]
133
134
135        ref_elevation = [52.14, 51.07, 50.56, 49.58, 49.09, 48.76]
136        ref_elevation += [4.17, 4.26, 4.34, 4.35, 4.43, 4.58, 4.41, 4.17, 3.99, 3.67, 3.67]
137
138        #print points[:]
139        #print ref_points
140        assert num.allclose(points, ref_points)
141
142        #print attributes[:]
143        #print ref_elevation
144        assert num.allclose(elevation, ref_elevation)
145
146        #Cleanup
147        fid.close()
148
149
150        os.remove(root + '.sdf')
151        os.remove(root + '.pts')
152
153
154
155
156    def test_sww2pts_centroids(self):
157        """Test that sww information can be converted correctly to pts data at specified coordinates
158        - in this case, the centroids.
159        """
160
161        import time, os
162        from Scientific.IO.NetCDF import NetCDFFile
163        # Used for points that lie outside mesh
164        NODATA_value = 1758323
165
166        # Setup
167        from mesh_factory import rectangular
168
169        # Create shallow water domain
170        domain = Domain(*rectangular(2, 2))
171
172        B = Transmissive_boundary(domain)
173        domain.set_boundary( {'left': B, 'right': B, 'top': B, 'bottom': B})
174
175        domain.set_name('datatest')
176
177        ptsfile = domain.get_name() + '_elevation.pts'
178        swwfile = domain.get_name() + '.sww'
179
180        domain.set_datadir('.')
181        domain.format = 'sww'
182        domain.set_quantity('elevation', lambda x,y: -x-y)
183
184        domain.geo_reference = Geo_reference(56,308500,6189000)
185
186        sww = SWW_file(domain)
187        sww.store_connectivity()
188        sww.store_timestep()
189
190        #self.domain.tight_slope_limiters = 1
191        domain.evolve_to_end(finaltime = 0.01)
192        sww.store_timestep()
193
194        # Check contents in NetCDF
195        fid = NetCDFFile(sww.filename, netcdf_mode_r)
196
197        # Get the variables
198        x = fid.variables['x'][:]
199        y = fid.variables['y'][:]
200        elevation = fid.variables['elevation'][:]
201        time = fid.variables['time'][:]
202        stage = fid.variables['stage'][:]
203
204        volumes = fid.variables['volumes'][:]
205
206
207        # Invoke interpolation for vertex points       
208        points = num.concatenate( (x[:,num.newaxis],y[:,num.newaxis]), axis=1 )
209        points = num.ascontiguousarray(points)
210        sww2pts(domain.get_name() + '.sww',
211                quantity = 'elevation',
212                data_points = points,
213                NODATA_value = NODATA_value)
214        ref_point_values = elevation
215        point_values = Geospatial_data(ptsfile).get_attributes()
216        #print 'P', point_values
217        #print 'Ref', ref_point_values       
218        assert num.allclose(point_values, ref_point_values)       
219
220
221
222        # Invoke interpolation for centroids
223        points = domain.get_centroid_coordinates()
224        #print points
225        sww2pts(domain.get_name() + '.sww',
226                quantity = 'elevation',
227                data_points = points,
228                NODATA_value = NODATA_value)
229        ref_point_values = [-0.5, -0.5, -1, -1, -1, -1, -1.5, -1.5]   #At centroids
230
231       
232        point_values = Geospatial_data(ptsfile).get_attributes()
233        #print 'P', point_values
234        #print 'Ref', ref_point_values       
235        assert num.allclose(point_values, ref_point_values)       
236
237        fid.close()
238
239        #Cleanup
240        os.remove(sww.filename)
241        os.remove(ptsfile)
242
243
244#-------------------------------------------------------------
245
246if __name__ == "__main__":
247    suite = unittest.makeSuite(Test_2Pts, 'test_sww')
248    runner = unittest.TextTestRunner() #verbosity=2)
249    runner.run(suite)   
Note: See TracBrowser for help on using the repository browser.