source: trunk/anuga_core/anuga/parallel/parallel_api.py @ 9679

Last change on this file since 9679 was 9500, checked in by steve, 10 years ago

setup up to move anuga_parllel to anuga.parallel

File size: 16.6 KB
Line 
1"""Trying to lump parallel stuff into simpler interface
2
3
4"""
5
6import numpy as num
7
8# The abstract Python-MPI interface
9from anuga.utilities.parallel_abstraction import size, rank, get_processor_name
10from anuga.utilities.parallel_abstraction import finalize, send, receive
11from anuga.utilities.parallel_abstraction import pypar_available, barrier
12
13
14
15# ANUGA parallel engine (only load if pypar can)
16if pypar_available:
17    from anuga.parallel.sequential_distribute import sequential_distribute_dump
18    from anuga.parallel.sequential_distribute import sequential_distribute_load
19   
20    from anuga.parallel.distribute_mesh  import send_submesh
21    from anuga.parallel.distribute_mesh  import rec_submesh
22    from anuga.parallel.distribute_mesh  import extract_submesh
23
24    # Mesh partitioning using Metis
25    from anuga.parallel.distribute_mesh import build_submesh
26    from anuga.parallel.distribute_mesh import pmesh_divide_metis_with_map
27
28    from anuga.parallel.parallel_shallow_water import Parallel_domain
29   
30   
31
32from anuga.abstract_2d_finite_volumes.neighbour_mesh import Mesh
33
34#------------------------------------------------------------------------------
35# Read in processor information
36#------------------------------------------------------------------------------
37
38numprocs = size()
39myid = rank()
40processor_name = get_processor_name()
41#print 'I am processor %d of %d on node %s' %(myid, numprocs, processor_name)
42
43
44
45def distribute(domain, verbose=False, debug=False, parameters = None):
46    """ Distribute the domain to all processes
47
48    parameters allows user to change size of ghost layer
49    """
50
51    if not pypar_available or numprocs == 1 : return domain # Bypass
52
53       
54    if myid == 0:
55        from sequential_distribute import Sequential_distribute
56        partition = Sequential_distribute(domain, verbose, debug, parameters)
57
58        partition.distribute(numprocs)
59
60        kwargs, points, vertices, boundary, quantities, boundary_map, \
61                domain_name, domain_dir, domain_store, domain_store_centroids, \
62                domain_minimum_storable_height, domain_minimum_allowed_height, \
63                domain_flow_algorithm, georef = partition.extract_submesh(0)
64       
65        for p in range(1, numprocs):
66
67            tostore = partition.extract_submesh(p)
68
69            send(tostore,p)
70
71    else:
72
73        kwargs, points, vertices, boundary, quantities, boundary_map, \
74                       domain_name, domain_dir, domain_store, domain_store_centroids, \
75                       domain_minimum_storable_height, domain_minimum_allowed_height, \
76                       domain_flow_algorithm, georef = receive(0)
77
78    #---------------------------------------------------------------------------
79    # Now Create parallel domain
80    #---------------------------------------------------------------------------
81    parallel_domain = Parallel_domain(points, vertices, boundary, **kwargs)
82
83
84    #------------------------------------------------------------------------
85    # Copy in quantity data
86    #------------------------------------------------------------------------
87    for q in quantities:
88        try:
89            parallel_domain.set_quantity(q, quantities[q])
90        except KeyError:
91            #print 'Try to create quantity %s'% q
92            from anuga import Quantity
93            Q = Quantity(parallel_domain, name=q, register=True)
94            parallel_domain.set_quantity(q, quantities[q])
95
96    #------------------------------------------------------------------------
97    # Transfer boundary conditions to each subdomain
98    #------------------------------------------------------------------------
99    boundary_map['ghost'] = None  # Add binding to ghost boundary
100    parallel_domain.set_boundary(boundary_map)
101
102
103    #------------------------------------------------------------------------
104    # Transfer other attributes to each subdomain
105    #------------------------------------------------------------------------
106    parallel_domain.set_name(domain_name)
107    parallel_domain.set_datadir(domain_dir)
108    parallel_domain.set_store(domain_store)
109    parallel_domain.set_store_centroids(domain_store_centroids)
110    parallel_domain.set_minimum_storable_height(domain_minimum_storable_height)
111    parallel_domain.set_minimum_allowed_height(domain_minimum_allowed_height)
112    parallel_domain.set_flow_algorithm(domain_flow_algorithm)
113    parallel_domain.geo_reference = georef
114
115
116    return parallel_domain
117
118
119       
120       
121
122
123
124def old_distribute(domain, verbose=False, debug=False, parameters = None):
125    """ Distribute the domain to all processes
126
127    parameters values 
128    """
129
130
131    if debug:
132        verbose = True
133       
134    barrier()
135
136    # FIXME: Dummy assignment (until boundaries are refactored to
137    # be independent of domains until they are applied)
138    if myid == 0:
139        bdmap = {}
140        for tag in domain.get_boundary_tags():
141            bdmap[tag] = None
142   
143   
144        domain.set_boundary(bdmap)
145
146
147    if not pypar_available or numprocs == 1 : return domain # Bypass
148
149    # For some obscure reason this communication must happen prior to
150    # the more complex mesh distribution - Oh Well!
151    if myid == 0:
152        domain_name = domain.get_name()
153        domain_dir = domain.get_datadir()
154        domain_store = domain.get_store()
155        domain_store_centroids = domain.get_store_centroids()
156        domain_smooth = domain.smooth
157        domain_reduction = domain.reduction
158        domain_minimum_storable_height = domain.minimum_storable_height
159        domain_flow_algorithm = domain.get_flow_algorithm()
160        domain_minimum_allowed_height = domain.get_minimum_allowed_height()
161        georef = domain.geo_reference
162        number_of_global_triangles = domain.number_of_triangles
163        number_of_global_nodes = domain.number_of_nodes
164       
165        # FIXME - what other attributes need to be transferred?
166
167        for p in xrange(1, numprocs):
168            # FIXME SR: Creates cPickle dump
169            send((domain_name, domain_dir, domain_store, \
170                  domain_store_centroids, domain_smooth, domain_reduction, \
171                  domain_minimum_storable_height, domain_flow_algorithm, \
172                  domain_minimum_allowed_height, georef, \
173                  number_of_global_triangles, number_of_global_nodes), p)
174    else:
175        if verbose: print 'P%d: Receiving domain attributes' %(myid)
176
177        domain_name, domain_dir, domain_store, \
178                  domain_store_centroids, domain_smooth, domain_reduction, \
179                  domain_minimum_storable_height, domain_flow_algorithm, \
180                  domain_minimum_allowed_height, georef, \
181                  number_of_global_triangles, \
182                  number_of_global_nodes = receive(0)
183
184
185
186    # Distribute boundary conditions
187    # FIXME: This cannot handle e.g. Time_boundaries due to
188    # difficulties pickling functions
189    if myid == 0:
190        boundary_map = domain.boundary_map
191        for p in xrange(1, numprocs):
192            # FIXME SR: Creates cPickle dump
193            send(boundary_map, p)
194    else:
195        if verbose: print 'P%d: Receiving boundary map' %(myid)       
196
197        boundary_map = receive(0)
198       
199
200    send_s2p = False
201
202    if myid == 0:
203        # Partition and distribute mesh.
204        # Structures returned is in the
205        # correct form for the ANUGA data structure
206
207
208        points, vertices, boundary, quantities,\
209                ghost_recv_dict, full_send_dict,\
210                number_of_full_nodes, number_of_full_triangles,\
211                s2p_map, p2s_map, tri_map, node_map, tri_l2g, node_l2g, \
212                ghost_layer_width =\
213                distribute_mesh(domain, verbose=verbose, debug=debug, parameters=parameters)
214           
215        # Extract l2g maps
216        #tri_l2g  = extract_l2g_map(tri_map)
217        #node_l2g = extract_l2g_map(node_map)
218        #tri_l2g = p2s_map[tri_l2g]
219       
220        if debug:
221            print 'P%d' %myid
222            print 'tri_map ',tri_map
223            print 'node_map',node_map
224            print 'tri_l2g', tri_l2g
225            print 'node_l2g', node_l2g
226            print 's2p_map', s2p_map
227            print 'p2s_map', p2s_map
228
229
230        def protocol(x):
231            vanilla=False
232            import pypar
233            control_info, x = pypar.create_control_info(x, vanilla, return_object=True)
234            print 'protocol', control_info[0]
235           
236        # Send serial to parallel (s2p) and parallel to serial (p2s) triangle mapping to proc 1 .. numprocs
237
238
239        if send_s2p :
240            n = len(s2p_map)
241            s2p_map_keys_flat = num.reshape(num.array(s2p_map.keys(),num.int), (n,1) )
242            s2p_map_values_flat = num.array(s2p_map.values(),num.int)
243            s2p_map_flat = num.concatenate( (s2p_map_keys_flat, s2p_map_values_flat), axis=1 )
244
245            n = len(p2s_map)
246            p2s_map_keys_flat = num.reshape(num.array(p2s_map.keys(),num.int), (n,2) )
247            p2s_map_values_flat = num.reshape(num.array(p2s_map.values(),num.int) , (n,1))
248            p2s_map_flat = num.concatenate( (p2s_map_keys_flat, p2s_map_values_flat), axis=1 )
249
250            for p in range(1, numprocs):
251
252                # FIXME SR: Creates cPickle dump
253                send(s2p_map_flat, p)
254                # FIXME SR: Creates cPickle dump
255                #print p2s_map
256                send(p2s_map_flat, p)
257        else:
258            if verbose: print 'Not sending s2p_map and p2s_map'
259            s2p_map = None
260            p2s_map = None
261
262        if verbose: print 'Communication done'
263       
264    else:
265        # Read in the mesh partition that belongs to this
266        # processor
267        if verbose: print 'P%d: Receiving submeshes' %(myid)               
268        points, vertices, boundary, quantities,\
269                ghost_recv_dict, full_send_dict,\
270                number_of_full_nodes, number_of_full_triangles, \
271                tri_map, node_map, tri_l2g, node_l2g, ghost_layer_width =\
272                rec_submesh(0, verbose)
273
274
275
276        # Extract l2g maps
277        #tri_l2g  = extract_l2g_map(tri_map)
278        #node_l2g = extract_l2g_map(node_map)
279        #tri_l2g = p2s_map[tri_l2g]
280       
281        # Receive serial to parallel (s2p) and parallel to serial (p2s) triangle mapping
282        if send_s2p :
283            s2p_map_flat = receive(0)
284            s2p_map = dict.fromkeys(s2p_map_flat[:,0], s2p_map_flat[:,1:2])
285
286            p2s_map_flat = receive(0)
287            p2s_map_keys = [tuple(x) for x in p2s_map_flat[:,0:1]]
288
289            p2s_map = dict.fromkeys(p2s_map_keys, p2s_map_flat[:,2])
290        else:
291            s2p_map = None
292            p2s_map = None
293           
294    #------------------------------------------------------------------------
295    # Build the domain for this processor using partion structures
296    #------------------------------------------------------------------------
297
298    if verbose: print 'myid = %g, no_full_nodes = %g, no_full_triangles = %g' % (myid, number_of_full_nodes, number_of_full_triangles)
299
300   
301    domain = Parallel_domain(points, vertices, boundary,
302                             full_send_dict=full_send_dict,
303                             ghost_recv_dict=ghost_recv_dict,
304                             number_of_full_nodes=number_of_full_nodes,
305                             number_of_full_triangles=number_of_full_triangles,
306                             geo_reference=georef,
307                             number_of_global_triangles = number_of_global_triangles,
308                             number_of_global_nodes = number_of_global_nodes,
309                             s2p_map = s2p_map,
310                             p2s_map = p2s_map, ## jj added this
311                             tri_l2g = tri_l2g, ## SR added this
312                             node_l2g = node_l2g,
313                             ghost_layer_width = ghost_layer_width)
314
315    #------------------------------------------------------------------------
316    # Transfer initial conditions to each subdomain
317    #------------------------------------------------------------------------
318    for q in quantities:
319        domain.set_quantity(q, quantities[q]) 
320
321
322    #------------------------------------------------------------------------
323    # Transfer boundary conditions to each subdomain
324    #------------------------------------------------------------------------
325    boundary_map['ghost'] = None  # Add binding to ghost boundary
326    domain.set_boundary(boundary_map)
327
328
329    #------------------------------------------------------------------------
330    # Transfer other attributes to each subdomain
331    #------------------------------------------------------------------------
332    domain.set_name(domain_name)
333    domain.set_datadir(domain_dir)
334    domain.set_store(domain_store)
335    domain.set_store_centroids(domain_store_centroids)
336    domain.set_store_vertices_smoothly(domain_smooth,domain_reduction)
337    domain.set_minimum_storable_height(domain_minimum_storable_height)
338    domain.set_minimum_allowed_height(domain_minimum_allowed_height)
339    domain.set_flow_algorithm(domain_flow_algorithm)
340    domain.geo_reference = georef   
341
342    #------------------------------------------------------------------------
343    # Return parallel domain to all nodes
344    #------------------------------------------------------------------------
345    return domain   
346
347
348
349
350
351
352def distribute_mesh(domain, verbose=False, debug=False, parameters=None):
353    """ Distribute andsend the mesh info to all the processors.
354    Should only be run from processor 0 and will send info to the other
355    processors.
356    There should be a corresponding  rec_submesh called from all the other
357    processors
358    """
359
360    if debug:
361        verbose = True
362
363    numprocs = size()
364
365   
366    # Subdivide the mesh
367    if verbose: print 'Subdivide mesh'
368    new_nodes, new_triangles, new_boundary, triangles_per_proc, quantities, \
369           s2p_map, p2s_map = \
370           pmesh_divide_metis_with_map(domain, numprocs)
371
372    #PETE: s2p_map (maps serial domain triangles to parallel domain triangles)
373    #      sp2_map (maps parallel domain triangles to domain triangles)
374
375
376
377    # Build the mesh that should be assigned to each processor,
378    # this includes ghost nodes and the communication pattern
379    if verbose: print 'Build submeshes'   
380    submesh = build_submesh(new_nodes, new_triangles, new_boundary, quantities, triangles_per_proc, parameters)
381
382    if verbose:
383        for p in range(numprocs):
384            N = len(submesh['ghost_nodes'][p])               
385            M = len(submesh['ghost_triangles'][p])
386            print 'There are %d ghost nodes and %d ghost triangles on proc %d'\
387                  %(N, M, p)
388
389    #if debug:
390    #    from pprint import pprint
391    #    pprint(submesh)
392
393
394    # Send the mesh partition to the appropriate processor
395    if verbose: print 'Distribute submeshes'       
396    for p in range(1, numprocs):
397        send_submesh(submesh, triangles_per_proc, p2s_map, p, verbose)
398
399    # Build the local mesh for processor 0
400    points, vertices, boundary, quantities, \
401            ghost_recv_dict, full_send_dict, \
402            tri_map, node_map, tri_l2g, node_l2g, ghost_layer_width =\
403              extract_submesh(submesh, triangles_per_proc, p2s_map, 0)
404
405
406             
407    # Keep track of the number full nodes and triangles.
408    # This is useful later if one needs access to a ghost-free domain
409    # Here, we do it for process 0. The others are done in rec_submesh.
410    number_of_full_nodes = len(submesh['full_nodes'][0])
411    number_of_full_triangles = len(submesh['full_triangles'][0])
412       
413    #print
414    #for p in range(numprocs):
415    #    print 'Process %d:' %(p)
416    #
417    #    print 'full_triangles:'
418    #    print submesh['full_triangles'][p]
419    #
420    #    print 'full_nodes:'
421    #    print submesh['full_nodes'][p]
422    #
423    #    print 'ghost_triangles:'
424    #    print submesh['ghost_triangles'][p]#
425    #
426    #    print 'ghost_nodes:'
427    #   print submesh['ghost_nodes'][p]                               
428    #    print
429    #
430    #print 'Receive dict'
431    #print ghost_recv_dict
432    #
433    #print 'Send dict'
434    #print full_send_dict       
435
436
437    # Return structures necessary for building the parallel domain
438    return points, vertices, boundary, quantities,\
439           ghost_recv_dict, full_send_dict,\
440           number_of_full_nodes, number_of_full_triangles, \
441           s2p_map, p2s_map, tri_map, node_map, tri_l2g, node_l2g, \
442           ghost_layer_width
443   
444
445
446## def extract_l2g_map(map):
447##     # Extract l2g data  from corresponding map
448##     # Maps
449
450##     import numpy as num
451   
452##     # FIXME: this is where we loss the original order of
453##     # sequential domain
454##     b = num.arange(len(map))
455
456##     l_ids = num.extract(map>-1,map)
457##     g_ids = num.extract(map>-1,b)
458
459## #    print len(g_ids)
460## #    print len(l_ids)
461## #    print l_ids
462
463##     l2g = num.zeros_like(g_ids)
464##     l2g[l_ids] = g_ids
465
466##     return l2g
467
Note: See TracBrowser for help on using the repository browser.