source: trunk/anuga_work/development/Busselton/BP3Hydro/run_model_parallel.py @ 8449

Last change on this file since 8449 was 8449, checked in by martins, 13 years ago

Checking in Busselton NCI code

  • Property svn:executable set to *
File size: 14.9 KB
Line 
1"""
2"""
3import time
4t0 = time.time()
5
6from anuga_parallel import myid, numprocs, distribute, finalize, barrier
7
8if myid == 0:
9    print ' ABOUT to Start Simulation:- Importing Modules' # This section imports all required ANUGA functions. Add or delete as required.
10
11if numprocs > 1:
12    print 'Running in parallel on %g processors' %numprocs
13
14#------------------------------------------------------------------------------
15# IMPORT NECESSARY MODULES
16#------------------------------------------------------------------------------
17
18import os
19import datetime
20import shutil
21import cPickle
22import pypar
23import numpy as num
24import anuga
25
26from os.path import join, exists
27from setup_model import project, trigs_min
28#from anuga import sww_merge
29##from anuga import Domain
30from anuga.geometry.polygon import read_polygon
31from anuga.geometry.polygon_function import Polygon_function
32from anuga.structures.inlet_operator import Inlet_operator
33
34project.output_run = join(project.output_folder, project.output_comment) + "_np" + str(numprocs)   
35project.meshes = join(project.output_run, project.scenario_name) + '.msh'
36
37if myid == 0:
38    print "Running simulation on %i processors" %numprocs
39    print 'min estimated number of triangles', trigs_min
40    print " "
41 
42    if not os.path.exists(project.output_folder):
43        os.makedirs(project.output_folder)
44
45if project.UseCheckRestore:   
46    restore_folder = project.restore_folder + "_np" + str(numprocs)
47    if not os.path.exists(restore_folder):
48        try: 
49            os.makedirs(restore_folder)
50        except OSError:
51            pass
52       
53barrier()
54 
55sanity_error = False
56
57if not exists(project.output_folder):
58    print "Sorry, outputs directory '%s' doesn't exist" % project.output_folder
59    sanity_error = True
60
61if sanity_error:
62    msg = 'You must fix the above errors before continuing.'
63    raise Exception, msg
64   
65#-------------------------------------------------------------------------------
66# Copy scripts to time stamped output directory and capture screen
67# output to file. Copy script must be before screen_catcher
68#-------------------------------------------------------------------------------
69
70if myid == 0:
71    anuga.copy_code_files(project.output_run, __file__, 
72        os.path.join(os.path.dirname(project.__file__),
73        project.__name__+'.py'))
74   
75# note: presently the screen catcher does not work in anuga parallel
76    #from anuga.utilities.file_utils import start_screen_catcher
77    #anuga.start_screen_catcher(project.output_run, myid, numprocs)
78
79
80#-------------------------------------------------------------------------------
81# Create the computational domain based on overall clipping polygon with
82# a tagged boundary and interior regions defined in project.py along with
83# resolutions (maximal area of per triangle) for each polygon
84#-------------------------------------------------------------------------------
85
86#reading the GEMS boundary points
87gems_boundary = anuga.create_sts_boundary(project.event_sts)       
88landward_boundary = anuga.read_polygon(project.landward_boundary)
89
90# Combine GEMS input boundary polyline with landward points
91bounding_polygon_gems = gems_boundary + landward_boundary
92
93# Number of boundary segments
94num_ocean_segments = len(gems_boundary) - 1
95
96# Number of landward_boundary points
97num_land_points = anuga.file_length(project.landward_boundary)
98
99# Boundary tags refer to project.landward_boundary
100# 4 points equals 5 segments start at N
101boundary_tags={'back': range(num_ocean_segments+1,
102                             num_ocean_segments+num_land_points),
103               'side': [num_ocean_segments,
104                        num_ocean_segments+num_land_points],
105               'ocean': range(num_ocean_segments)}
106
107# Build mesh and domain
108
109if not project.UseCheckRestore: 
110    UseCheckPoint = False
111   
112else:
113    files = []
114   
115    for (path, directory, filenames) in os.walk (restore_folder):
116   
117        if len(filenames) == 0:
118            UseCheckPoint = False
119        else:         
120            UseCheckPoint = True 
121            for filename in filenames:
122                files.append(int(filename.rpartition("_")[2]))
123   
124if UseCheckPoint == True:
125    files.sort()
126    lastResPt = files[-1]
127   
128    if myid == 0:
129        print "Loading domain fragments from the last restore point: ", lastResPt
130        print " "
131    barrier()
132   
133    import exceptions 
134   
135    for pronb in range(numprocs):
136
137        if myid == pronb:
138            try:
139                print "Loading domain fragment from cpu ", pronb 
140                restorePointFile = join(project.restore_folder + "_np" + str(numprocs), 'RestorePoint_' + "Proc_" + str(myid) + "_" + str(lastResPt)) 
141                domain = cPickle.load(open(restorePointFile, 'rb'))
142                domain.starttime = lastResPt
143                RestartPoint = "_From" + str(domain.starttime)
144                domain.set_datadir(project.output_run + RestartPoint)
145               
146                if os.path.exists(project.output_run + "_From" + str(domain.starttime)):
147                    domain.set_datadir(project.output_run + "_From" + str(domain.starttime) + "a")
148                   
149                UsePenultimate = False
150               
151            except exceptions.Exception, e:
152               
153                UsePenultimate = True
154 
155                print " "
156                print "    Load failed for cpu ", myid
157                print "        error: ", e
158                print " "
159                raise
160
161    failedcpus = []
162   
163    if UsePenultimate == True:
164        mystatus = 1
165    else:
166        mystatus = 0
167       
168    for cpu in range(numprocs):
169        if myid != cpu:
170            pypar.send(mystatus,cpu)
171            cpustatus = pypar.receive(cpu)
172            if cpustatus == 1:
173                failedcpus.append(cpu)
174   
175    if len(failedcpus) > 0:
176        UsePenultimate = True 
177                 
178    if UsePenultimate:
179       
180        try:
181            lastResPt = files[-(numprocs+1):-numprocs][0]
182        except IndexError:
183            print "Only one restore point and it failed, stopping"
184            import sys
185            sys.exit()
186           
187        lastResPt = files[-(numprocs+1):-numprocs][0]
188       
189        if myid == 0:
190            print "    Loading domain fragments from penultimate restore point: ", lastResPt
191            print " "
192        barrier()
193       
194        for pronb in range(numprocs):
195
196            if myid == pronb:
197               
198                print "Loading domain fragment from cpu ", pronb 
199                restorePointFile = join(project.restore_folder + "_np" + str(numprocs), 'RestorePoint_' + "Proc_" + str(myid) + "_" + str(lastResPt)) 
200                domain = cPickle.load(open(restorePointFile, 'rb'))
201                domain.starttime = lastResPt
202                RestartPoint = "_From" + str(domain.starttime)
203                domain.set_datadir(project.output_run + RestartPoint)
204               
205                if os.path.exists(project.output_run + "_From" + str(domain.starttime)):
206                    domain.set_datadir(project.output_run + "_From" + str(domain.starttime) + "a")
207   
208    if myid == 0:
209        print " "
210        print "Starting from time: ", lastResPt
211        print " "
212       
213else:
214
215    if myid == 0:
216           
217            meshFile = join(project.meshes_folder, project.scenario_name + "_" + project.setup) + '.msh'
218           
219            meshFileRestore = join(project.output_run, project.scenario_name) + '.msh'
220
221            if os.path.exists(meshFile):
222                print "Reading from mesh file..."
223                domain = anuga.create_domain_from_file(meshFile) 
224           
225            elif os.path.exists(meshFileRestore):
226                print "Reading from restore mesh file..."
227                domain = anuga.create_domain_from_file(meshFileRestore) 
228                     
229            else:
230                print "Creating sequential domain..."
231                from anuga.coordinate_transforms.geo_reference import Geo_reference
232                GeoRef = Geo_reference(zone=project.zone, datum="D_GDA_1194")
233                domain = anuga.create_domain_from_regions(bounding_polygon_gems,
234                                                    boundary_tags=boundary_tags,
235                                                    maximum_triangle_area=project.bounding_maxarea,
236                                                    interior_regions=project.interior_regions,
237                                                    mesh_filename=project.meshes,
238                                                    mesh_geo_reference = GeoRef,
239                                                    poly_geo_reference = GeoRef,
240                                                    use_cache=True, 
241                                                    verbose=True)   
242
243            print "Calculating elevation on sequential domain"
244            domain.set_quantity('elevation', 
245                        filename=project.combined_elevation+'.pts',
246                        use_cache=False,
247                        verbose=True,
248                        alpha=project.alpha)
249           
250            domain.set_starttime(project.starttime)
251            domain.set_datadir(project.output_run) 
252               
253            print 'Number of triangles = ', len(domain)
254            print 'The extent is ', domain.get_extent()
255            print domain.statistics()
256   
257    else:
258        domain = None
259
260#-------------------------
261# Distribute domain if run in parallel
262#-------------------------
263if numprocs > 1:
264
265        if UseCheckPoint == False: 
266                     
267            if myid == 0:
268                print "Distributing domain..."
269               
270            domain = distribute(domain)
271
272#--------------------------
273# Set simulation parameters
274#--------------------------
275if not UseCheckPoint:
276    domain.set_name(project.scenario_name)
277    domain.set_minimum_storable_height(project.minimum_storable_height)
278    domain.set_quantity('friction', project.friction)
279
280    polygons = []
281   
282    for filename, stageInit in project.InitialStages:
283            polygon = read_polygon(join(project.polygons_folder, filename))
284            polygons.append((polygon, stageInit))
285           
286    domain.set_quantity('stage', Polygon_function( polygons ))
287     
288    ##--------------------------
289    ## Setup boundary conditions
290    ##--------------------------
291
292    Br = anuga.Reflective_boundary(domain)
293    Bt = anuga.Transmissive_stage_zero_momentum_boundary(domain)
294    Bd = anuga.Dirichlet_boundary([project.tide, 0, 0])
295    Bf = anuga.Field_boundary(project.event_sts,
296                        domain, mean_stage=project.tide,
297                        time_thinning=1,
298                        default_boundary=anuga.Dirichlet_boundary([0, 0, 0]),
299                        boundary_polygon=bounding_polygon_gems,                   
300                        use_cache=True,
301                        verbose=True)
302   
303    if myid == 0:
304        print 'Set boundary - available tags:', domain.get_boundary_tags()
305   
306    domain.set_boundary({'back': Br,
307                         'side': Bt,
308                         'ocean': Bf}) 
309
310    #---------------------------------------------------------------- --------------
311    # APPLY RAINFALL OR FLOWS
312    #-------------------------------------------------------------------------------
313   
314#    hydrograph = anuga.Inflow(domain, center=(347897.734893, 6271599.810148), radius=10, rate=anuga.file_function(join(project.hydrographs_folder, 'VasseDD25.tms'), quantities=['hydrograph']))
315#
316#    domain.forcing_terms.append(hydrograph)
317
318    from anuga_parallel.parallel_operator_factory import Inlet_operator
319
320    for [hydroName,line] in project.riversHydro:
321        Q = anuga.file_function(join(project.hydrographs_folder, hydroName), quantities=['hydrograph'])
322        Inlet_operator(domain, line, Q)
323   
324#------------------------------------------------------------------------------
325# EVOLVE SYSTEM THROUGH TIME
326#------------------------------------------------------------------------------
327
328if UseCheckPoint == False:
329    lastResPt = domain.starttime
330   
331ft = project.finaltime
332
333if project.UseCheckRestore: 
334   
335    restPtInt = project.restPtInt
336    StartingResPt = int(lastResPt + restPtInt)
337   
338    for  restorepoint in xrange(StartingResPt , ft, restPtInt):
339     
340    #    if restorepoint == lastResPt + restPtInt*3:
341    #        import sys
342    #        for proc in range(numprocs):
343    #            if myid == proc:
344    #                print "Simulating crash", proc
345    #        sys.exit()
346
347        if not restorepoint == ft - project.yieldstep:
348           
349            for t in domain.evolve(yieldstep = project.yieldstep, finaltime = restorepoint):
350                if myid == 0:
351                   print domain.timestepping_statistics(), datetime.datetime.today() 
352             
353            if myid == 0:
354                print 'Saving restore point at time: ',  restorepoint
355                print " "
356                   
357            restorePointFile = join(restore_folder, 'RestorePoint_' + "Proc_" + str(myid) + "_" + str(restorepoint)) 
358            cPickle.dump(domain, open(restorePointFile, 'wb'))
359            domain.starttime = restorepoint + project.yieldstep
360            barrier()   #Wait until domain fragments fully written
361            if myid == 0:
362                print "All domain fragments fully written"
363                print " "
364
365    if domain.get_starttime() < ft:
366        for t in domain.evolve(yieldstep = project.yieldstep, finaltime = ft):
367            if myid == 0:
368               print domain.timestepping_statistics(), datetime.datetime.today()
369                         
370else:
371   
372    for t in domain.evolve(yieldstep = project.yieldstep, finaltime = ft):
373       if myid == 0:
374          print domain.timestepping_statistics(), datetime.datetime.today()     
375               
376barrier()
377
378if myid == 0 and numprocs > 1:
379    print " "
380    print 'Number of processors %g ' %numprocs
381    print 'Communication time %.2f seconds'%domain.communication_time
382    print 'Reduction Communication time %.2f seconds'%domain.communication_reduce_time
383    print 'Broadcast time %.2f seconds'%domain.communication_broadcast_time
384   
385if myid ==0:
386
387    Duration = (int((time.time()-t0)/3600), int(((time.time()-t0)%3600)/60), int((time.time()-t0)%60))
388
389    print "Finished"
390    print "Time taken:"
391    print "Hours: %i, Minutes: %i, Seconds: %i" %Duration
392    print "Output written to ~", domain.get_datadir().rpartition("outputs")[2]
393
394    Durationfn = str(Duration[0]) + "h "  +  str(Duration[1]) + "m "  +  str(Duration[2]) + "s"
395    f = open(join(domain.get_datadir(),Durationfn), 'w')
396    f.close
397    g = open(join(domain.get_datadir(),"trigs_min " + str(trigs_min)), 'w')
398    g.close
399   
400    if project.UseCheckRestore: 
401        if os.path.exists(restore_folder): 
402            shutil.rmtree(restore_folder)
403   
404    if numprocs > 1: 
405        from anuga.utilities.sww_merge import sww_merge_parallel
406        sww_file = join(project.output_run,'Busselton')
407        sww_merge_parallel(sww_file,numprocs, delete_old=True)
408
409        if UseCheckPoint == True:
410            sww_file = join(project.output_run + RestartPoint,'Busselton')
411            sww_merge_parallel(sww_file,numprocs, delete_old=True)
412
413finalize()
Note: See TracBrowser for help on using the repository browser.