Ticket #368: run_model_parallel.py

File run_model_parallel.py, 13.8 KB (added by steve, 12 years ago)

Script with elevation set for the sequential domain

Line 
1"""
2"""
3import time
4t0 = time.time()
5
6from anuga_parallel import myid, numprocs, distribute, finalize, barrier
7
8if myid == 0:
9    print ' ABOUT to Start Simulation:- Importing Modules' # This section imports all required ANUGA functions. Add or delete as required.
10
11if numprocs > 1:
12    print 'Running in parallel on %g processors' %numprocs
13
14#------------------------------------------------------------------------------
15# IMPORT NECESSARY MODULES
16#------------------------------------------------------------------------------
17
18import os
19import datetime
20import shutil
21import cPickle
22import pypar
23import numpy as num
24import anuga
25
26from os.path import join, exists
27from setup_model import project, trigs_min
28from anuga import sww_merge
29from anuga.geometry.polygon import read_polygon
30from anuga.geometry.polygon_function import Polygon_function
31from anuga.structures.inlet_operator import Inlet_operator
32
33project.output_run = join(project.output_folder, project.output_comment) + "_np" + str(numprocs)   
34project.meshes = join(project.output_run, project.scenario_name) + '.msh'
35
36if myid == 0:
37    print "Running simulation on %i processors" %numprocs
38    print 'min estimated number of triangles', trigs_min
39    print " "
40 
41    if not os.path.exists(project.output_folder):
42        os.makedirs(project.output_folder)
43
44if project.UseCheckRestore:   
45    restore_folder = project.restore_folder + "_np" + str(numprocs)
46    if not os.path.exists(restore_folder):
47        try: 
48            os.makedirs(restore_folder)
49        except OSError:
50            pass
51       
52barrier()
53 
54sanity_error = False
55
56if not exists(project.output_folder):
57    print "Sorry, outputs directory '%s' doesn't exist" % project.output_folder
58    sanity_error = True
59
60if sanity_error:
61    msg = 'You must fix the above errors before continuing.'
62    raise Exception, msg
63   
64#-------------------------------------------------------------------------------
65# Copy scripts to time stamped output directory and capture screen
66# output to file. Copy script must be before screen_catcher
67#-------------------------------------------------------------------------------
68
69if myid == 0:
70    anuga.copy_code_files(project.output_run, __file__, 
71        os.path.join(os.path.dirname(project.__file__),
72        project.__name__+'.py'))
73   
74    # note: presently the screen catcher does not work in anuga parallel
75    #anuga.abstract_2d_finite_volumes.util.start_screen_catcher(project.output_run, myid, numprocs)
76
77#-------------------------------------------------------------------------------
78# Create the computational domain based on overall clipping polygon with
79# a tagged boundary and interior regions defined in project.py along with
80# resolutions (maximal area of per triangle) for each polygon
81#-------------------------------------------------------------------------------
82
83#reading the GEMS boundary points
84gems_boundary = anuga.create_sts_boundary(project.event_sts)       
85landward_boundary = anuga.read_polygon(project.landward_boundary)
86
87# Combine GEMS input boundary polyline with landward points
88bounding_polygon_gems = gems_boundary + landward_boundary
89
90# Number of boundary segments
91num_ocean_segments = len(gems_boundary) - 1
92
93# Number of landward_boundary points
94num_land_points = anuga.file_length(project.landward_boundary)
95
96# Boundary tags refer to project.landward_boundary
97# 4 points equals 5 segments start at N
98boundary_tags={'back': range(num_ocean_segments+1,
99                             num_ocean_segments+num_land_points),
100               'side': [num_ocean_segments,
101                        num_ocean_segments+num_land_points],
102               'ocean': range(num_ocean_segments)}
103
104# Build mesh and domain
105
106if not project.UseCheckRestore: 
107    UseCheckPoint = False
108   
109else:
110    files = []
111   
112    for (path, directory, filenames) in os.walk (restore_folder):
113   
114        if len(filenames) == 0:
115            UseCheckPoint = False
116        else:         
117            UseCheckPoint = True 
118            for filename in filenames:
119                files.append(int(filename.rpartition("_")[2]))
120   
121if UseCheckPoint == True:
122    files.sort()
123    lastResPt = files[-1]
124   
125    if myid == 0:
126        print "Loading domain fragments from the last restore point: ", lastResPt
127        print " "
128    barrier()
129   
130    import exceptions 
131   
132    for pronb in range(numprocs):
133
134        if myid == pronb:
135            try:
136                print "Loading domain fragment from cpu ", pronb 
137                restorePointFile = join(project.restore_folder + "_np" + str(numprocs), 'RestorePoint_' + "Proc_" + str(myid) + "_" + str(lastResPt)) 
138                domain = cPickle.load(open(restorePointFile, 'rb'))
139                domain.starttime = lastResPt
140                RestartPoint = "_From" + str(domain.starttime)
141                domain.set_datadir(project.output_run + RestartPoint)
142               
143                if os.path.exists(project.output_run + "_From" + str(domain.starttime)):
144                    domain.set_datadir(project.output_run + "_From" + str(domain.starttime) + "a")
145                   
146                UsePenultimate = False
147               
148            except exceptions.Exception, e:
149               
150                UsePenultimate = True
151 
152                print " "
153                print "    Load failed for cpu ", myid
154                print "        error: ", e
155                print " "
156                raise
157
158    failedcpus = []
159   
160    if UsePenultimate == True:
161        mystatus = 1
162    else:
163        mystatus = 0
164       
165    for cpu in range(numprocs):
166        if myid != cpu:
167            pypar.send(mystatus,cpu)
168            cpustatus = pypar.receive(cpu)
169            if cpustatus == 1:
170                failedcpus.append(cpu)
171   
172    if len(failedcpus) > 0:
173        UsePenultimate = True 
174                 
175    if UsePenultimate:
176       
177        try:
178            lastResPt = files[-(numprocs+1):-numprocs][0]
179        except IndexError:
180            print "Only one restore point and it failed, stopping"
181            import sys
182            sys.exit()
183           
184        lastResPt = files[-(numprocs+1):-numprocs][0]
185       
186        if myid == 0:
187            print "    Loading domain fragments from penultimate restore point: ", lastResPt
188            print " "
189        barrier()
190       
191        for pronb in range(numprocs):
192
193            if myid == pronb:
194               
195                print "Loading domain fragment from cpu ", pronb 
196                restorePointFile = join(project.restore_folder + "_np" + str(numprocs), 'RestorePoint_' + "Proc_" + str(myid) + "_" + str(lastResPt)) 
197                domain = cPickle.load(open(restorePointFile, 'rb'))
198                domain.starttime = lastResPt
199                RestartPoint = "_From" + str(domain.starttime)
200                domain.set_datadir(project.output_run + RestartPoint)
201               
202                if os.path.exists(project.output_run + "_From" + str(domain.starttime)):
203                    domain.set_datadir(project.output_run + "_From" + str(domain.starttime) + "a")
204   
205    if myid == 0:
206        print " "
207        print "Starting from time: ", lastResPt
208        print " "
209       
210else:
211
212    if myid == 0:
213           
214        meshFile = join(project.meshes_folder, project.scenario_name + "_" + project.setup) + '.msh'
215           
216        meshFileRestore = join(project.output_run, project.scenario_name) + '.msh'
217
218        if os.path.exists(meshFile):
219            print "Reading from mesh file..."
220            domain = anuga.create_domain_from_file(meshFile) 
221           
222        elif os.path.exists(meshFileRestore):
223            print "Reading from restore mesh file..."
224            domain = anuga.create_domain_from_file(meshFileRestore) 
225                     
226        else:
227            print "Creating sequential domain..."
228            from anuga.coordinate_transforms.geo_reference import Geo_reference
229            GeoRef = Geo_reference(zone=project.zone, datum="D_GDA_1194")
230            domain = anuga.create_domain_from_regions(bounding_polygon_gems,
231                                                    boundary_tags=boundary_tags,
232                                                    maximum_triangle_area=project.bounding_maxarea,
233                                                    interior_regions=project.interior_regions,
234                                                    mesh_filename=project.meshes,
235                                                    mesh_geo_reference = GeoRef,
236                                                    poly_geo_reference = GeoRef,
237                                                    use_cache=True, 
238                                                    verbose=True)
239                                                   
240            print "Calculating elevation on sequential domain"
241            domain.set_quantity('elevation', 
242                        filename=project.combined_elevation+'.pts',
243                        use_cache=False,
244                        verbose=True,
245                        alpha=project.alpha)
246
247            domain.set_starttime(project.starttime)
248            domain.set_datadir(project.output_run) 
249               
250            print 'Number of triangles = ', len(domain)
251            print 'The extent is ', domain.get_extent()
252            print domain.statistics()
253   
254    else:
255        domain = None
256   
257#-------------------------
258# Distribute domain if run in parallel
259#-------------------------
260if numprocs > 1:
261
262    if UseCheckPoint == False: 
263             
264        if myid == 0:
265            print "Distributing domain..."
266
267
268    domain = distribute(domain)
269
270#--------------------------
271# Set simulation parameters
272#--------------------------
273if not UseCheckPoint:
274    domain.set_name(project.scenario_name)
275    domain.set_minimum_storable_height(project.minimum_storable_height)
276    domain.set_quantity('friction', project.friction)
277   
278    polygons = []
279   
280    for filename, stageInit in project.InitialStages:
281            polygon = read_polygon(join(project.polygons_folder, filename))
282            polygons.append((polygon, stageInit))
283           
284    domain.set_quantity('stage', Polygon_function( polygons ))
285     
286    ##--------------------------
287    ## Setup boundary conditions
288    ##--------------------------
289
290    Br = anuga.Reflective_boundary(domain)
291    Bt = anuga.Transmissive_stage_zero_momentum_boundary(domain)
292    Bd = anuga.Dirichlet_boundary([project.tide, 0, 0])
293    Bf = anuga.Field_boundary(project.event_sts,
294                        domain, mean_stage=project.tide,
295                        time_thinning=1,
296                        default_boundary=anuga.Dirichlet_boundary([0, 0, 0]),
297                        boundary_polygon=bounding_polygon_gems,                   
298                        use_cache=True,
299                        verbose=True)
300   
301    if myid == 0:
302        print 'Set boundary - available tags:', domain.get_boundary_tags()
303   
304    domain.set_boundary({'back': Br,
305                         'side': Bt,
306                         'ocean': Bf}) 
307   
308#------------------------------------------------------------------------------
309# EVOLVE SYSTEM THROUGH TIME
310#------------------------------------------------------------------------------
311
312if UseCheckPoint == False:
313    lastResPt = domain.starttime
314   
315ft = project.finaltime
316
317if project.UseCheckRestore: 
318   
319    restPtInt = project.restPtInt
320    StartingResPt = int(lastResPt + restPtInt)
321   
322    for  restorepoint in xrange(StartingResPt , ft, restPtInt):
323     
324    #    if restorepoint == lastResPt + restPtInt*3:
325    #        import sys
326    #        for proc in range(numprocs):
327    #            if myid == proc:
328    #                print "Simulating crash", proc
329    #        sys.exit()
330   
331        for t in domain.evolve(yieldstep = project.yieldstep, finaltime = restorepoint):
332            if myid == 0:
333               print domain.timestepping_statistics(), datetime.datetime.today() 
334         
335        if myid == 0:
336            if restorepoint != ft:
337                print 'Saving restore point at time: ',  restorepoint
338                print " "
339               
340     
341        if restorepoint != ft:
342            restorePointFile = join(restore_folder, 'RestorePoint_' + "Proc_" + str(myid) + "_" + str(restorepoint)) 
343            cPickle.dump(domain, open(restorePointFile, 'wb'))
344            domain.starttime = restorepoint + project.yieldstep
345            barrier()   #Wait until domain fragments fully written
346            if myid == 0:
347                print "All domain fragments fully written"
348                print " "
349               
350else:
351   
352    for t in domain.evolve(yieldstep = project.yieldstep, finaltime = ft):
353       if myid == 0:
354          print domain.timestepping_statistics(), datetime.datetime.today()     
355               
356barrier()
357
358if myid == 0 and numprocs > 1:
359    print 'Number of processors %g ' %numprocs
360    print 'Communication time %.2f seconds'%domain.communication_time
361    print 'Reduction Communication time %.2f seconds'%domain.communication_reduce_time
362    print 'Broadcast time %.2f seconds'%domain.communication_broadcast_time
363   
364if myid ==0:
365
366    Duration = (int((time.time()-t0)/3600), int(((time.time()-t0)%3600)/60), int((time.time()-t0)%60))
367
368    print "Finished"
369    print "Time taken:"
370    print "Hours: %i, Minutes: %i, Seconds: %i" %Duration
371    print "Output written to ~", domain.get_datadir().rpartition("outputs")[2]
372
373    Durationfn = str(Duration[0]) + "h "  +  str(Duration[1]) + "m "  +  str(Duration[2]) + "s"
374    f = open(join(domain.get_datadir(),Durationfn), 'w')
375    f.close
376    g = open(join(domain.get_datadir(),"trigs_min " + str(trigs_min)), 'w')
377    g.close
378   
379    if numprocs > 1:   
380
381        sww_file = join(project.output_run,'Busselton')
382        sww_merge(sww_file,numprocs)
383       
384        if UseCheckPoint == True:
385            sww_file = join(project.output_run + RestartPoint,'Busselton')
386            sww_merge(sww_file,numprocs)
387           
388            if os.path.exists(restore_folder): 
389                shutil.rmtree(restore_folder)
390
391finalize()