source: trunk/anuga_core/source/anuga_parallel/test_parallel_file_boundary.py @ 8757

Last change on this file since 8757 was 8757, checked in by steve, 11 years ago

Speed up the check_integrity procedure of neighbour_mesh

File size: 20.6 KB
Line 
1
2"""Run parallel shallow water domain.
3
4   run using command like:
5
6   mpirun -np m python run_parallel_sw_merimbula.py
7
8   where m is the number of processors to be used.
9   
10   Will produce sww files with names domain_Pn_m.sww where m is number of processors and
11   n in [0, m-1] refers to specific processor that owned this part of the partitioned mesh.
12"""
13
14#------------------------------------------------------------------------------
15# Import necessary modules
16#------------------------------------------------------------------------------
17
18import os
19import sys
20import time
21import pypar
22import numpy as num
23import unittest
24import tempfile
25from struct import pack, unpack
26from Scientific.IO.NetCDF import NetCDFFile
27import copy
28
29#------------------------
30# ANUGA Modules
31#------------------------
32from anuga.utilities.numerical_tools import ensure_numeric
33from anuga.utilities.util_ext        import double_precision
34from anuga.utilities.norms           import l1_norm, l2_norm, linf_norm
35       
36from anuga import Domain
37from anuga import Reflective_boundary
38from anuga import Dirichlet_boundary
39from anuga import Time_boundary
40from anuga import Transmissive_boundary
41from anuga import File_boundary
42
43from mux import WAVEHEIGHT_MUX2_LABEL, EAST_VELOCITY_MUX2_LABEL, \
44                NORTH_VELOCITY_MUX2_LABEL
45from mux import read_mux2_py
46from anuga.file_conversion.urs2sts import urs2sts
47from anuga.file.urs import Read_urs
48from anuga.file.sts import create_sts_boundary
49
50from anuga.utilities.numerical_tools import ensure_numeric
51from anuga.coordinate_transforms.redfearn import redfearn
52from anuga.coordinate_transforms.geo_reference import Geo_reference
53
54from anuga import rectangular_cross_domain
55from anuga.pmesh.mesh_interface import create_mesh_from_regions
56from anuga import create_domain_from_file
57
58from anuga_parallel import distribute, myid, numprocs, send, receive, barrier, finalize
59
60from anuga.file.test_mux import Test_Mux
61
62class Test_urs2sts_parallel(Test_Mux):
63    """ A suite of tests to test urs2sts file conversion functions.
64        These tests are quite coarse-grained: converting a file
65        and checking that its headers and some of its contents
66        are correct.
67    """ 
68
69    def sequential_test_time_varying_file_boundary_sts(self):
70        """sequential_ltest_time_varying_file_boundary_sts_sequential(self):
71        Read correct points from ordering file and apply sts to boundary. The boundary is time varying. FIXME add to test_urs2sts.
72        """
73        lat_long_points=[[6.01,97.0],[6.02,97.0],[6.05,96.9],[6.0,97.0]]
74        bounding_polygon=[[6.0,97.0],[6.01,97.0],[6.02,97.0],
75                          [6.02,97.02],[6.00,97.02]]
76        tide = 3.0
77        time_step_count = 65
78        time_step = 2.
79        n=len(lat_long_points)
80        first_tstep=num.ones(n,num.int)
81        last_tstep=(time_step_count)*num.ones(n,num.int)
82        finaltime=num.float(time_step*(time_step_count-1))
83        yieldstep=num.float(time_step)
84        gauge_depth=20*num.ones(n,num.float)
85        ha=2*num.ones((n,time_step_count),num.float)
86        ua=10*num.ones((n,time_step_count),num.float)
87        va=-10*num.ones((n,time_step_count),num.float)
88
89        times=num.arange(0., num.float(time_step_count*time_step), time_step)
90        for i in range(n):
91            #ha[i]+=num.sin(times)
92            ha[i]+=times/finaltime
93
94
95
96        sts_file="test"
97        if myid==0:
98            base_name, files = self.write_mux2(lat_long_points,
99                                               time_step_count,
100                                               time_step,
101                                               first_tstep,
102                                               last_tstep,
103                                               depth=gauge_depth,
104                                               ha=ha,
105                                               ua=ua,
106                                               va=va)
107            # base name will not exist, but 3 other files are created
108
109            # Write order file
110            file_handle, order_base_name = tempfile.mkstemp("")
111            os.close(file_handle)
112            os.remove(order_base_name)
113            d=","
114            order_file=order_base_name+'order.txt'
115            fid=open(order_file,'w')
116       
117            # Write Header
118            header='index, longitude, latitude\n'
119            fid.write(header)
120            indices=[3,0,1]
121            for i in indices:
122                line=str(i)+d+str(lat_long_points[i][1])+d+\
123                    str(lat_long_points[i][0])+"\n"
124                fid.write(line)
125            fid.close()
126
127            urs2sts(base_name,
128                    basename_out=sts_file,
129                    ordering_filename=order_file,
130                    mean_stage=tide,
131                    verbose=False)
132            self.delete_mux(files)
133
134            assert(os.access(sts_file+'.sts', os.F_OK))
135
136            os.remove(order_file)
137
138        barrier()
139        boundary_polygon = create_sts_boundary(sts_file)
140
141        # Append the remaining part of the boundary polygon to be defined by
142        # the user
143        bounding_polygon_utm=[]
144        for point in bounding_polygon:
145            zone,easting,northing=redfearn(point[0],point[1])
146            bounding_polygon_utm.append([easting,northing])
147
148        boundary_polygon.append(bounding_polygon_utm[3])
149        boundary_polygon.append(bounding_polygon_utm[4])
150
151        assert num.allclose(bounding_polygon_utm,boundary_polygon)
152
153
154        extent_res=1000000
155        meshname = 'urs_test_mesh' + '.tsh'
156        interior_regions=None
157        boundary_tags={'ocean': [0,1], 'otherocean': [2,3,4]}
158       
159        # have to change boundary tags from last example because now bounding
160        # polygon starts in different place.
161        if myid==0:
162            create_mesh_from_regions(boundary_polygon,
163                                     boundary_tags=boundary_tags,
164                                     maximum_triangle_area=extent_res,
165                                     filename=meshname,
166                                     interior_regions=interior_regions,
167                                     verbose=True)
168
169        barrier()
170       
171        domain_fbound = Domain(meshname)
172        domain_fbound.set_quantities_to_be_stored(None)
173        domain_fbound.set_quantity('stage', tide)
174        if verbose: print "Creating file boundary condition"
175        Bf = File_boundary(sts_file+'.sts',
176                           domain_fbound,
177                           boundary_polygon=boundary_polygon)
178        Br = Reflective_boundary(domain_fbound)
179
180        domain_fbound.set_boundary({'ocean': Bf,'otherocean': Br})
181
182        temp_fbound=num.zeros(int(finaltime/yieldstep)+1,num.float)
183        if verbose: print "Evolving domain with file boundary condition"
184        for i, t in enumerate(domain_fbound.evolve(yieldstep=yieldstep,
185                                                   finaltime=finaltime, 
186                                                   skip_initial_step = False)):
187            temp_fbound[i]=domain_fbound.quantities['stage'].centroid_values[2]
188            if verbose: domain_fbound.write_time()
189           
190       
191        domain_drchlt = Domain(meshname)
192        domain_drchlt.set_quantities_to_be_stored(None)
193        domain_drchlt.set_starttime(time_step)
194        domain_drchlt.set_quantity('stage', tide)
195        Br = Reflective_boundary(domain_drchlt)
196        #Bd = Dirichlet_boundary([2.0+tide,220+10*tide,-220-10*tide])
197        Bd = Time_boundary(domain=domain_drchlt, f=lambda t: [2.0+t/finaltime+tide,220.+10.*tide+10.*t/finaltime,-220.-10.*tide-10.*t/finaltime])
198        #Bd = Time_boundary(domain=domain_drchlt,f=lambda t: [2.0+num.sin(t)+tide,10.*(2+20.+num.sin(t)+tide),-10.*(2+20.+num.sin(t)+tide)])
199        domain_drchlt.set_boundary({'ocean': Bd,'otherocean': Br})
200        temp_drchlt=num.zeros(int(finaltime/yieldstep)+1,num.float)
201       
202        for i, t in enumerate(domain_drchlt.evolve(yieldstep=yieldstep,
203                                                   finaltime=finaltime, 
204                                                   skip_initial_step = False)):
205            temp_drchlt[i]=domain_drchlt.quantities['stage'].centroid_values[2]
206            #domain_drchlt.write_time()
207       
208        #print domain_fbound.quantities['stage'].vertex_values
209        #print domain_drchlt.quantities['stage'].vertex_values
210                   
211        assert num.allclose(temp_fbound,temp_drchlt),temp_fbound-temp_drchlt
212
213       
214        assert num.allclose(domain_fbound.quantities['stage'].vertex_values,
215                            domain_drchlt.quantities['stage'].vertex_values)
216                       
217        assert num.allclose(domain_fbound.quantities['xmomentum'].vertex_values,
218                            domain_drchlt.quantities['xmomentum'].vertex_values)                       
219                       
220        assert num.allclose(domain_fbound.quantities['ymomentum'].vertex_values,
221                            domain_drchlt.quantities['ymomentum'].vertex_values)
222       
223        if not sys.platform == 'win32':
224            if myid==0: os.remove(sts_file+'.sts')
225       
226        if myid==0: os.remove(meshname)
227
228    def parallel_test_time_varying_file_boundary_sts(self):
229        """ parallel_test_time_varying_file_boundary_sts_sequential(self):
230            Read correct points from ordering file and apply sts to boundary.
231            The boundary is time varying. Compares sequential result with
232            distributed result found using anuga_parallel
233        """
234
235        #------------------------------------------------------------
236        # Define test variables
237        #------------------------------------------------------------
238        lat_long_points=[[6.01,97.0],[6.02,97.0],[6.05,96.9],[6.0,97.0]]
239        bounding_polygon=[[6.0,97.0],[6.01,97.0],[6.02,97.0],
240                          [6.02,97.02],[6.00,97.02]]
241        tide = 3.0
242        time_step_count = 65
243        time_step = 2
244        n=len(lat_long_points)
245        first_tstep=num.ones(n,num.int)
246        last_tstep=(time_step_count)*num.ones(n,num.int)
247        finaltime=num.float(time_step*(time_step_count-1))
248        yieldstep=num.float(time_step)
249        gauge_depth=20*num.ones(n,num.float)
250        ha=2*num.ones((n,time_step_count),num.float)
251        ua=10*num.ones((n,time_step_count),num.float)
252        va=-10*num.ones((n,time_step_count),num.float)
253
254        times=num.arange(0, time_step_count*time_step, time_step)
255        for i in range(n):
256            #ha[i]+=num.sin(times)
257            ha[i]+=times/finaltime
258
259        #------------------------------------------------------------
260        # Write mux data to file then convert to sts format
261        #------------------------------------------------------------
262        sts_file="test"
263        if myid==0:
264            base_name, files = self.write_mux2(lat_long_points,
265                                               time_step_count,
266                                               time_step,
267                                               first_tstep,
268                                               last_tstep,
269                                               depth=gauge_depth,
270                                               ha=ha,
271                                               ua=ua,
272                                               va=va)
273            # base name will not exist, but 3 other files are created
274
275            # Write order file
276            file_handle, order_base_name = tempfile.mkstemp("")
277            os.close(file_handle)
278            os.remove(order_base_name)
279            d=","
280            order_file=order_base_name+'order.txt'
281            fid=open(order_file,'w')
282       
283            # Write Header
284            header='index, longitude, latitude\n'
285            fid.write(header)
286            indices=[3,0,1]
287            for i in indices:
288                line=str(i)+d+str(lat_long_points[i][1])+d+\
289                    str(lat_long_points[i][0])+"\n"
290                fid.write(line)
291            fid.close()
292
293            urs2sts(base_name,
294                    basename_out=sts_file,
295                    ordering_filename=order_file,
296                    mean_stage=tide,
297                    verbose=False)
298            self.delete_mux(files)
299
300            assert(os.access(sts_file+'.sts', os.F_OK))
301
302            os.remove(order_file)
303
304        barrier()
305        #------------------------------------------------------------
306        # Define boundary_polygon on each processor. This polygon defines the
307        # urs boundary and lies on a portion of the bounding_polygon
308        #------------------------------------------------------------
309        boundary_polygon = create_sts_boundary(sts_file)
310
311        # Append the remaining part of the boundary polygon to be defined by
312        # the user
313        bounding_polygon_utm=[]
314        for point in bounding_polygon:
315            zone,easting,northing=redfearn(point[0],point[1])
316            bounding_polygon_utm.append([easting,northing])
317
318        boundary_polygon.append(bounding_polygon_utm[3])
319        boundary_polygon.append(bounding_polygon_utm[4])
320
321
322        assert num.allclose(bounding_polygon_utm,boundary_polygon)
323
324        extent_res=10000
325        meshname = 'urs_test_mesh' + '.tsh'
326        interior_regions=None
327        boundary_tags={'ocean': [0,1], 'otherocean': [2,3,4]}
328       
329        #------------------------------------------------------------
330        # Create mesh on the master processor and store in file. This file
331        # is read in by each slave processor when needed
332        #------------------------------------------------------------
333        if myid==0:
334            create_mesh_from_regions(boundary_polygon,
335                                     boundary_tags=boundary_tags,
336                                     maximum_triangle_area=extent_res,
337                                     filename=meshname,
338                                     interior_regions=interior_regions,
339                                     verbose=True)
340       
341
342            # barrier()
343            domain_fbound = Domain(meshname)
344            domain_fbound.set_quantities_to_be_stored(None)
345            domain_fbound.set_quantity('stage', tide)
346            # print domain_fbound.mesh.get_boundary_polygon()
347        else:
348            domain_fbound=None
349
350        barrier()
351        if ( verbose and myid == 0 ): 
352            print 'DISTRIBUTING PARALLEL DOMAIN'
353        domain_fbound = distribute(domain_fbound)
354
355        #--------------------------------------------------------------------
356        # Find which sub_domain in which the interpolation points are located
357        #
358        # Sometimes the interpolation points sit exactly
359        # between two centroids, so in the parallel run we
360        # reset the interpolation points to the centroids
361        # found in the sequential run
362        #--------------------------------------------------------------------
363        interpolation_points = [[279000,664000], [280250,664130], 
364                                    [279280,665400], [280500,665000]]
365
366        interpolation_points=num.array(interpolation_points)
367
368        #if myid==0:
369        #    import pylab as P
370        #    boundary_polygon=num.array(boundary_polygon)
371        #    P.plot(boundary_polygon[:,0],boundary_polygon[:,1])
372        #    P.plot(interpolation_points[:,0],interpolation_points[:,1],'ko')
373        #    P.show()
374
375        fbound_gauge_values = []
376        fbound_proc_tri_ids = []
377        for i, point in enumerate(interpolation_points):
378            fbound_gauge_values.append([]) # Empty list for timeseries
379
380            try:
381                k = domain_fbound.get_triangle_containing_point(point)
382                if domain_fbound.tri_full_flag[k] == 1:
383                    fbound_proc_tri_ids.append(k)
384                else:
385                    fbound_proc_tri_ids.append(-1)           
386            except:
387                fbound_proc_tri_ids.append(-2)
388
389
390        if verbose: print 'P%d has points = %s' %(myid, fbound_proc_tri_ids)
391
392        #------------------------------------------------------------
393        # Set boundary conditions
394        #------------------------------------------------------------
395        Bf = File_boundary(sts_file+'.sts',
396                           domain_fbound,
397                           boundary_polygon=boundary_polygon)
398        Br = Reflective_boundary(domain_fbound)
399   
400        domain_fbound.set_boundary({'ocean': Bf,'otherocean': Br})
401
402        #------------------------------------------------------------
403        # Evolve the domain on each processor
404        #------------------------------------------------------------ 
405        for i, t in enumerate(domain_fbound.evolve(yieldstep=yieldstep,
406                                                   finaltime=finaltime, 
407                                                   skip_initial_step = False)):
408
409            stage = domain_fbound.get_quantity('stage')
410            for i in range(4):
411                if fbound_proc_tri_ids[i] > -1:
412                    fbound_gauge_values[i].append(stage.centroid_values[fbound_proc_tri_ids[i]])
413       
414        #------------------------------------------------------------
415        # Create domain to be run sequntially on each processor
416        #------------------------------------------------------------
417        domain_drchlt = Domain(meshname)
418        domain_drchlt.set_quantities_to_be_stored(None)
419        domain_drchlt.set_starttime(time_step)
420        domain_drchlt.set_quantity('stage', tide)
421        Br = Reflective_boundary(domain_drchlt)
422        #Bd = Dirichlet_boundary([2.0+tide,220+10*tide,-220-10*tide])
423        Bd = Time_boundary(domain=domain_drchlt, function=lambda t: [2.0+t/finaltime+tide,220.+10.*tide+10.*t/finaltime,-220.-10.*tide-10.*t/finaltime])
424        #Bd = Time_boundary(domain=domain_drchlt,function=lambda t: [2.0+num.sin(t)+tide,10.*(2+20.+num.sin(t)+tide),-10.*(2+20.+num.sin(t)+tide)])
425        domain_drchlt.set_boundary({'ocean': Bd,'otherocean': Br})
426       
427        drchlt_gauge_values = []
428        drchlt_proc_tri_ids = []
429        for i, point in enumerate(interpolation_points):
430            drchlt_gauge_values.append([]) # Empty list for timeseries
431
432            try:
433                k = domain_drchlt.get_triangle_containing_point(point)
434                if domain_drchlt.tri_full_flag[k] == 1:
435                    drchlt_proc_tri_ids.append(k)
436                else:
437                    drchlt_proc_tri_ids.append(-1)           
438            except:
439                drchlt_proc_tri_ids.append(-2)
440
441
442        if verbose: print 'P%d has points = %s' %(myid, drchlt_proc_tri_ids)
443
444        #------------------------------------------------------------
445        # Evolve entire domain on each processor
446        #------------------------------------------------------------
447        for i, t in enumerate(domain_drchlt.evolve(yieldstep=yieldstep,
448                                                   finaltime=finaltime, 
449                                                   skip_initial_step = False)):
450
451            stage = domain_drchlt.get_quantity('stage')
452            for i in range(4):
453                drchlt_gauge_values[i].append(stage.centroid_values[drchlt_proc_tri_ids[i]])
454
455        #------------------------------------------------------------
456        # Compare sequential values with parallel values
457        #------------------------------------------------------------
458        barrier()
459        success = True
460        for i in range(4):
461            if fbound_proc_tri_ids[i] > -1:
462                fbound_gauge_values[i]=num.array(fbound_gauge_values[i])
463                drchlt_gauge_values[i]=num.array(drchlt_gauge_values[i])
464                #print i,fbound_gauge_values[i][4]
465                #print i,drchlt_gauge_values[i][4]
466                success = success and num.allclose(fbound_gauge_values[i], drchlt_gauge_values[i])
467                assert success#, (fbound_gauge_values[i]-drchlt_gauge_values[i])
468
469        #assert_(success)       
470
471        if not sys.platform == 'win32':
472            if myid==0: os.remove(sts_file+'.sts')
473       
474        if myid==0: os.remove(meshname)
475   
476# Because we are doing assertions outside of the TestCase class
477# the PyUnit defined assert_ function can't be used.
478def assert_(condition, msg="Assertion Failed"):
479    if condition == False:
480        #pypar.finalize()
481        raise AssertionError, msg
482
483
484# Test an nprocs-way run of the shallow water equations
485# against the sequential code.
486
487if __name__=="__main__":
488    verbose=True
489    if myid ==0 and verbose: 
490        print 'PARALLEL START'
491    suite = unittest.makeSuite(Test_urs2sts_parallel,'parallel_test')
492    #suite = unittest.makeSuite(Test_urs2sts_parallel,'sequential_test')
493    runner = unittest.TextTestRunner()
494    runner.run(suite)
495
496    #------------------------------------------
497    # Run the code code and compare sequential
498    # results at 4 gauge stations
499    #------------------------------------------
500
501   
502    finalize()
Note: See TracBrowser for help on using the repository browser.