source: trunk/anuga_core/source/anuga_parallel/parallel_api.py @ 8553

Last change on this file since 8553 was 8553, checked in by steve, 13 years ago

Committing faster ghost_layer code

File size: 12.0 KB
Line 
1"""Trying to lump parallel stuff into simpler interface
2
3
4"""
5
6import numpy as num
7
8# The abstract Python-MPI interface
9from anuga_parallel.parallel_abstraction import size, rank, get_processor_name
10from anuga_parallel.parallel_abstraction import finalize, send, receive
11from anuga_parallel.parallel_abstraction import pypar_available, barrier
12
13
14# ANUGA parallel engine (only load if pypar can)
15if pypar_available:
16    from anuga_parallel.distribute_mesh  import send_submesh
17    from anuga_parallel.distribute_mesh  import rec_submesh
18    from anuga_parallel.distribute_mesh  import extract_hostmesh
19
20    # Mesh partitioning using Metis
21    from anuga_parallel.distribute_mesh import build_submesh
22    from anuga_parallel.distribute_mesh import pmesh_divide_metis_with_map
23
24    from anuga_parallel.parallel_shallow_water import Parallel_domain
25
26from anuga.abstract_2d_finite_volumes.neighbour_mesh import Mesh
27
28#------------------------------------------------------------------------------
29# Read in processor information
30#------------------------------------------------------------------------------
31
32numprocs = size()
33myid = rank()
34processor_name = get_processor_name()
35#print 'I am processor %d of %d on node %s' %(myid, numprocs, processor_name)
36
37
38
39
40def distribute(domain, verbose=False, debug=False, parameters = None):
41    """ Distribute the domain to all processes
42    """
43
44
45    if debug:
46        verbose = True
47       
48    barrier()
49
50    # FIXME: Dummy assignment (until boundaries are refactored to
51    # be independent of domains until they are applied)
52    if myid == 0:
53        bdmap = {}
54        for tag in domain.get_boundary_tags():
55            bdmap[tag] = None
56   
57   
58        domain.set_boundary(bdmap)
59
60
61    if not pypar_available or numprocs == 1 : return domain # Bypass
62
63    # For some obscure reason this communication must happen prior to
64    # the more complex mesh distribution - Oh Well!
65    if myid == 0:
66        domain_name = domain.get_name()
67        domain_dir = domain.get_datadir()
68        domain_store = domain.get_store()
69        domain_minimum_storable_height = domain.minimum_storable_height
70        georef = domain.geo_reference
71        number_of_global_triangles = domain.number_of_triangles
72        number_of_global_nodes = domain.number_of_nodes
73       
74        # FIXME - what other attributes need to be transferred?
75
76        for p in xrange(1, numprocs):
77            # FIXME SR: Creates cPickle dump
78            send((domain_name, domain_dir, domain_store, \
79                  domain_minimum_storable_height, georef, \
80                  number_of_global_triangles, number_of_global_nodes), p)
81    else:
82        if verbose: print 'P%d: Receiving domain attributes' %(myid)
83
84        domain_name, domain_dir, domain_store, \
85                  domain_minimum_storable_height, \
86                  georef, number_of_global_triangles, \
87                  number_of_global_nodes = receive(0)
88
89
90
91    # Distribute boundary conditions
92    # FIXME: This cannot handle e.g. Time_boundaries due to
93    # difficulties pickling functions
94    if myid == 0:
95        boundary_map = domain.boundary_map
96        for p in xrange(1, numprocs):
97            # FIXME SR: Creates cPickle dump
98            send(boundary_map, p)
99    else:
100        if verbose: print 'P%d: Receiving boundary map' %(myid)       
101
102        boundary_map = receive(0)
103       
104
105    send_s2p = False
106
107    if myid == 0:
108        # Partition and distribute mesh.
109        # Structures returned is in the
110        # correct form for the ANUGA data structure
111
112
113        points, vertices, boundary, quantities,\
114                ghost_recv_dict, full_send_dict,\
115                number_of_full_nodes, number_of_full_triangles,\
116                s2p_map, p2s_map, tri_map, node_map, ghost_layer_width =\
117                distribute_mesh(domain, verbose=verbose, debug=debug, parameters=parameters)
118           
119        # Extract l2g maps
120        tri_l2g  = extract_l2g_map(tri_map)
121        node_l2g = extract_l2g_map(node_map)
122
123        if debug:
124            print 'P%d' %myid
125            print 'tri_map ',tri_map
126            print 'node_map',node_map
127            print 'tri_l2g', tri_l2g
128            print 'node_l2g', node_l2g
129            print 's2p_map', s2p_map
130            print 'p2s_map', p2s_map
131
132
133        def protocol(x):
134            vanilla=False
135            import pypar
136            control_info, x = pypar.create_control_info(x, vanilla, return_object=True)
137            print 'protocol', control_info[0]
138           
139        # Send serial to parallel (s2p) and parallel to serial (p2s) triangle mapping to proc 1 .. numprocs
140
141
142        if send_s2p :
143            n = len(s2p_map)
144            s2p_map_keys_flat = num.reshape(num.array(s2p_map.keys(),num.int), (n,1) )
145            s2p_map_values_flat = num.array(s2p_map.values(),num.int)
146            s2p_map_flat = num.concatenate( (s2p_map_keys_flat, s2p_map_values_flat), axis=1 )
147
148            n = len(p2s_map)
149            p2s_map_keys_flat = num.reshape(num.array(p2s_map.keys(),num.int), (n,2) )
150            p2s_map_values_flat = num.reshape(num.array(p2s_map.values(),num.int) , (n,1))
151            p2s_map_flat = num.concatenate( (p2s_map_keys_flat, p2s_map_values_flat), axis=1 )
152
153            for p in range(1, numprocs):
154
155                # FIXME SR: Creates cPickle dump
156                send(s2p_map_flat, p)
157                # FIXME SR: Creates cPickle dump
158                #print p2s_map
159                send(p2s_map_flat, p)
160        else:
161            if verbose: print 'Not sending s2p_map and p2s_map'
162            s2p_map = None
163            p2s_map = None
164
165        if verbose: print 'Communication done'
166       
167    else:
168        # Read in the mesh partition that belongs to this
169        # processor
170        if verbose: print 'P%d: Receiving submeshes' %(myid)               
171        points, vertices, boundary, quantities,\
172                ghost_recv_dict, full_send_dict,\
173                number_of_full_nodes, number_of_full_triangles, \
174                tri_map, node_map, ghost_layer_width =\
175                rec_submesh(0, verbose)
176
177
178
179        # Extract l2g maps
180        tri_l2g  = extract_l2g_map(tri_map)
181        node_l2g = extract_l2g_map(node_map)
182       
183        # Recieve serial to parallel (s2p) and parallel to serial (p2s) triangle mapping
184        if send_s2p :
185            s2p_map_flat = receive(0)
186            s2p_map = dict.fromkeys(s2p_map_flat[:,0], s2p_map_flat[:,1:2])
187
188            p2s_map_flat = receive(0)
189            p2s_map_keys = [tuple(x) for x in p2s_map_flat[:,0:1]]
190
191            p2s_map = dict.fromkeys(p2s_map_keys, p2s_map_flat[:,2])
192        else:
193            s2p_map = None
194            p2s_map = None
195           
196    #------------------------------------------------------------------------
197    # Build the domain for this processor using partion structures
198    #------------------------------------------------------------------------
199
200    if verbose: print 'myid = %g, no_full_nodes = %g, no_full_triangles = %g' % (myid, number_of_full_nodes, number_of_full_triangles)
201
202   
203    domain = Parallel_domain(points, vertices, boundary,
204                             full_send_dict=full_send_dict,
205                             ghost_recv_dict=ghost_recv_dict,
206                             number_of_full_nodes=number_of_full_nodes,
207                             number_of_full_triangles=number_of_full_triangles,
208                             geo_reference=georef,
209                             number_of_global_triangles = number_of_global_triangles,
210                             number_of_global_nodes = number_of_global_nodes,
211                             s2p_map = s2p_map,
212                             p2s_map = p2s_map, ## jj added this
213                             tri_l2g = tri_l2g, ## SR added this
214                             node_l2g = node_l2g,
215                             ghost_layer_width = ghost_layer_width)
216
217    #------------------------------------------------------------------------
218    # Transfer initial conditions to each subdomain
219    #------------------------------------------------------------------------
220    for q in quantities:
221        domain.set_quantity(q, quantities[q]) 
222
223
224    #------------------------------------------------------------------------
225    # Transfer boundary conditions to each subdomain
226    #------------------------------------------------------------------------
227    boundary_map['ghost'] = None  # Add binding to ghost boundary
228    domain.set_boundary(boundary_map)
229
230
231    #------------------------------------------------------------------------
232    # Transfer other attributes to each subdomain
233    #------------------------------------------------------------------------
234    domain.set_name(domain_name)
235    domain.set_datadir(domain_dir)
236    domain.set_store(domain_store)
237    domain.set_minimum_storable_height(domain_minimum_storable_height)
238    domain.geo_reference = georef   
239
240    #------------------------------------------------------------------------
241    # Return parallel domain to all nodes
242    #------------------------------------------------------------------------
243    return domain   
244
245
246
247
248
249
250def distribute_mesh(domain, verbose=False, debug=False, parameters=None):
251
252
253    if debug:
254        verbose = True
255
256    numprocs = size()
257
258   
259    # Subdivide the mesh
260    if verbose: print 'Subdivide mesh'
261    new_nodes, new_triangles, new_boundary, triangles_per_proc, quantities, \
262           s2p_map, p2s_map = \
263           pmesh_divide_metis_with_map(domain, numprocs)
264
265    #PETE: s2p_map (maps serial domain triangles to parallel domain triangles)
266    #      sp2_map (maps parallel domain triangles to domain triangles)
267
268
269
270    # Build the mesh that should be assigned to each processor,
271    # this includes ghost nodes and the communication pattern
272    if verbose: print 'Build submeshes'   
273    submesh = build_submesh(new_nodes, new_triangles, new_boundary, quantities, triangles_per_proc, parameters)
274
275    if verbose:
276        for p in range(numprocs):
277            N = len(submesh['ghost_nodes'][p])               
278            M = len(submesh['ghost_triangles'][p])
279            print 'There are %d ghost nodes and %d ghost triangles on proc %d'\
280                  %(N, M, p)
281
282    #if debug:
283    #    from pprint import pprint
284    #    pprint(submesh)
285
286
287    # Send the mesh partition to the appropriate processor
288    if verbose: print 'Distribute submeshes'       
289    for p in range(1, numprocs):
290        send_submesh(submesh, triangles_per_proc, p, verbose)
291
292    # Build the local mesh for processor 0
293    points, vertices, boundary, quantities, \
294            ghost_recv_dict, full_send_dict, tri_map, node_map, ghost_layer_width =\
295              extract_hostmesh(submesh, triangles_per_proc)
296
297    # Keep track of the number full nodes and triangles.
298    # This is useful later if one needs access to a ghost-free domain
299    # Here, we do it for process 0. The others are done in rec_submesh.
300    number_of_full_nodes = len(submesh['full_nodes'][0])
301    number_of_full_triangles = len(submesh['full_triangles'][0])
302       
303    #print
304    #for p in range(numprocs):
305    #    print 'Process %d:' %(p)
306    #
307    #    print 'full_triangles:'
308    #    print submesh['full_triangles'][p]
309    #
310    #    print 'full_nodes:'
311    #    print submesh['full_nodes'][p]
312    #
313    #    print 'ghost_triangles:'
314    #    print submesh['ghost_triangles'][p]#
315    #
316    #    print 'ghost_nodes:'
317    #   print submesh['ghost_nodes'][p]                               
318    #    print
319    #
320    #print 'Receive dict'
321    #print ghost_recv_dict
322    #
323    #print 'Send dict'
324    #print full_send_dict       
325
326
327    # Return structures necessary for building the parallel domain
328    return points, vertices, boundary, quantities,\
329           ghost_recv_dict, full_send_dict,\
330           number_of_full_nodes, number_of_full_triangles, \
331           s2p_map, p2s_map, tri_map, node_map, ghost_layer_width
332   
333
334
335def extract_l2g_map(map):
336    # Extract l2g data  from corresponding map
337    # Maps
338
339    import numpy as num
340   
341    b = num.arange(len(map))
342
343    l_ids = num.extract(map>-1,map)
344    g_ids = num.extract(map>-1,b)
345
346#    print len(g_ids)
347#    print len(l_ids)
348#    print l_ids
349
350    l2g = num.zeros_like(g_ids)
351    l2g[l_ids] = g_ids
352
353    return l2g
354
Note: See TracBrowser for help on using the repository browser.