source: trunk/anuga_core/source/anuga_parallel/parallel_api.py @ 9020

Last change on this file since 9020 was 9020, checked in by steve, 11 years ago

sww_merge should now work with set_store_centroids in parallel

File size: 12.8 KB
Line 
1"""Trying to lump parallel stuff into simpler interface
2
3
4"""
5
6import numpy as num
7
8# The abstract Python-MPI interface
9from anuga_parallel.parallel_abstraction import size, rank, get_processor_name
10from anuga_parallel.parallel_abstraction import finalize, send, receive
11from anuga_parallel.parallel_abstraction import pypar_available, barrier
12
13
14# ANUGA parallel engine (only load if pypar can)
15if pypar_available:
16    from anuga_parallel.distribute_mesh  import send_submesh
17    from anuga_parallel.distribute_mesh  import rec_submesh
18    from anuga_parallel.distribute_mesh  import extract_submesh
19
20    # Mesh partitioning using Metis
21    from anuga_parallel.distribute_mesh import build_submesh
22    from anuga_parallel.distribute_mesh import pmesh_divide_metis_with_map
23
24    from anuga_parallel.parallel_shallow_water import Parallel_domain
25
26from anuga.abstract_2d_finite_volumes.neighbour_mesh import Mesh
27
28#------------------------------------------------------------------------------
29# Read in processor information
30#------------------------------------------------------------------------------
31
32numprocs = size()
33myid = rank()
34processor_name = get_processor_name()
35#print 'I am processor %d of %d on node %s' %(myid, numprocs, processor_name)
36
37
38
39
40def distribute(domain, verbose=False, debug=False, parameters = None):
41    """ Distribute the domain to all processes
42
43    parameters values 
44    """
45
46
47    if debug:
48        verbose = True
49       
50    barrier()
51
52    # FIXME: Dummy assignment (until boundaries are refactored to
53    # be independent of domains until they are applied)
54    if myid == 0:
55        bdmap = {}
56        for tag in domain.get_boundary_tags():
57            bdmap[tag] = None
58   
59   
60        domain.set_boundary(bdmap)
61
62
63    if not pypar_available or numprocs == 1 : return domain # Bypass
64
65    # For some obscure reason this communication must happen prior to
66    # the more complex mesh distribution - Oh Well!
67    if myid == 0:
68        domain_name = domain.get_name()
69        domain_dir = domain.get_datadir()
70        domain_store = domain.get_store()
71        domain_store_centroids = domain.get_store_centroids()
72        domain_smooth = domain.smooth
73        domain_reduction = domain.reduction
74        domain_minimum_storable_height = domain.minimum_storable_height
75        domain_flow_algorithm = domain.get_flow_algorithm()
76        domain_minimum_allowed_height = domain.get_minimum_allowed_height()
77        georef = domain.geo_reference
78        number_of_global_triangles = domain.number_of_triangles
79        number_of_global_nodes = domain.number_of_nodes
80       
81        # FIXME - what other attributes need to be transferred?
82
83        for p in xrange(1, numprocs):
84            # FIXME SR: Creates cPickle dump
85            send((domain_name, domain_dir, domain_store, \
86                  domain_store_centroids, domain_smooth, domain_reduction, \
87                  domain_minimum_storable_height, domain_flow_algorithm, \
88                  domain_minimum_allowed_height, georef, \
89                  number_of_global_triangles, number_of_global_nodes), p)
90    else:
91        if verbose: print 'P%d: Receiving domain attributes' %(myid)
92
93        domain_name, domain_dir, domain_store, \
94                  domain_store_centroids, domain_smooth, domain_reduction, \
95                  domain_minimum_storable_height, domain_flow_algorithm, \
96                  domain_minimum_allowed_height, georef, \
97                  number_of_global_triangles, \
98                  number_of_global_nodes = receive(0)
99
100
101
102    # Distribute boundary conditions
103    # FIXME: This cannot handle e.g. Time_boundaries due to
104    # difficulties pickling functions
105    if myid == 0:
106        boundary_map = domain.boundary_map
107        for p in xrange(1, numprocs):
108            # FIXME SR: Creates cPickle dump
109            send(boundary_map, p)
110    else:
111        if verbose: print 'P%d: Receiving boundary map' %(myid)       
112
113        boundary_map = receive(0)
114       
115
116    send_s2p = False
117
118    if myid == 0:
119        # Partition and distribute mesh.
120        # Structures returned is in the
121        # correct form for the ANUGA data structure
122
123
124        points, vertices, boundary, quantities,\
125                ghost_recv_dict, full_send_dict,\
126                number_of_full_nodes, number_of_full_triangles,\
127                s2p_map, p2s_map, tri_map, node_map, ghost_layer_width =\
128                distribute_mesh(domain, verbose=verbose, debug=debug, parameters=parameters)
129           
130        # Extract l2g maps
131        tri_l2g  = extract_l2g_map(tri_map)
132        node_l2g = extract_l2g_map(node_map)
133
134        if debug:
135            print 'P%d' %myid
136            print 'tri_map ',tri_map
137            print 'node_map',node_map
138            print 'tri_l2g', tri_l2g
139            print 'node_l2g', node_l2g
140            print 's2p_map', s2p_map
141            print 'p2s_map', p2s_map
142
143
144        def protocol(x):
145            vanilla=False
146            import pypar
147            control_info, x = pypar.create_control_info(x, vanilla, return_object=True)
148            print 'protocol', control_info[0]
149           
150        # Send serial to parallel (s2p) and parallel to serial (p2s) triangle mapping to proc 1 .. numprocs
151
152
153        if send_s2p :
154            n = len(s2p_map)
155            s2p_map_keys_flat = num.reshape(num.array(s2p_map.keys(),num.int), (n,1) )
156            s2p_map_values_flat = num.array(s2p_map.values(),num.int)
157            s2p_map_flat = num.concatenate( (s2p_map_keys_flat, s2p_map_values_flat), axis=1 )
158
159            n = len(p2s_map)
160            p2s_map_keys_flat = num.reshape(num.array(p2s_map.keys(),num.int), (n,2) )
161            p2s_map_values_flat = num.reshape(num.array(p2s_map.values(),num.int) , (n,1))
162            p2s_map_flat = num.concatenate( (p2s_map_keys_flat, p2s_map_values_flat), axis=1 )
163
164            for p in range(1, numprocs):
165
166                # FIXME SR: Creates cPickle dump
167                send(s2p_map_flat, p)
168                # FIXME SR: Creates cPickle dump
169                #print p2s_map
170                send(p2s_map_flat, p)
171        else:
172            if verbose: print 'Not sending s2p_map and p2s_map'
173            s2p_map = None
174            p2s_map = None
175
176        if verbose: print 'Communication done'
177       
178    else:
179        # Read in the mesh partition that belongs to this
180        # processor
181        if verbose: print 'P%d: Receiving submeshes' %(myid)               
182        points, vertices, boundary, quantities,\
183                ghost_recv_dict, full_send_dict,\
184                number_of_full_nodes, number_of_full_triangles, \
185                tri_map, node_map, ghost_layer_width =\
186                rec_submesh(0, verbose)
187
188
189
190        # Extract l2g maps
191        tri_l2g  = extract_l2g_map(tri_map)
192        node_l2g = extract_l2g_map(node_map)
193       
194        # Recieve serial to parallel (s2p) and parallel to serial (p2s) triangle mapping
195        if send_s2p :
196            s2p_map_flat = receive(0)
197            s2p_map = dict.fromkeys(s2p_map_flat[:,0], s2p_map_flat[:,1:2])
198
199            p2s_map_flat = receive(0)
200            p2s_map_keys = [tuple(x) for x in p2s_map_flat[:,0:1]]
201
202            p2s_map = dict.fromkeys(p2s_map_keys, p2s_map_flat[:,2])
203        else:
204            s2p_map = None
205            p2s_map = None
206           
207    #------------------------------------------------------------------------
208    # Build the domain for this processor using partion structures
209    #------------------------------------------------------------------------
210
211    if verbose: print 'myid = %g, no_full_nodes = %g, no_full_triangles = %g' % (myid, number_of_full_nodes, number_of_full_triangles)
212
213   
214    domain = Parallel_domain(points, vertices, boundary,
215                             full_send_dict=full_send_dict,
216                             ghost_recv_dict=ghost_recv_dict,
217                             number_of_full_nodes=number_of_full_nodes,
218                             number_of_full_triangles=number_of_full_triangles,
219                             geo_reference=georef,
220                             number_of_global_triangles = number_of_global_triangles,
221                             number_of_global_nodes = number_of_global_nodes,
222                             s2p_map = s2p_map,
223                             p2s_map = p2s_map, ## jj added this
224                             tri_l2g = tri_l2g, ## SR added this
225                             node_l2g = node_l2g,
226                             ghost_layer_width = ghost_layer_width)
227
228    #------------------------------------------------------------------------
229    # Transfer initial conditions to each subdomain
230    #------------------------------------------------------------------------
231    for q in quantities:
232        domain.set_quantity(q, quantities[q]) 
233
234
235    #------------------------------------------------------------------------
236    # Transfer boundary conditions to each subdomain
237    #------------------------------------------------------------------------
238    boundary_map['ghost'] = None  # Add binding to ghost boundary
239    domain.set_boundary(boundary_map)
240
241
242    #------------------------------------------------------------------------
243    # Transfer other attributes to each subdomain
244    #------------------------------------------------------------------------
245    domain.set_name(domain_name)
246    domain.set_datadir(domain_dir)
247    domain.set_store(domain_store)
248    domain.set_store_centroids(domain_store_centroids)
249    domain.set_store_vertices_smoothly(domain_smooth,domain_reduction)
250    domain.set_minimum_storable_height(domain_minimum_storable_height)
251    domain.set_minimum_allowed_height(domain_minimum_allowed_height)
252    domain.set_flow_algorithm(domain_flow_algorithm)
253    domain.geo_reference = georef   
254
255    #------------------------------------------------------------------------
256    # Return parallel domain to all nodes
257    #------------------------------------------------------------------------
258    return domain   
259
260
261
262
263
264
265def distribute_mesh(domain, verbose=False, debug=False, parameters=None):
266
267
268    if debug:
269        verbose = True
270
271    numprocs = size()
272
273   
274    # Subdivide the mesh
275    if verbose: print 'Subdivide mesh'
276    new_nodes, new_triangles, new_boundary, triangles_per_proc, quantities, \
277           s2p_map, p2s_map = \
278           pmesh_divide_metis_with_map(domain, numprocs)
279
280    #PETE: s2p_map (maps serial domain triangles to parallel domain triangles)
281    #      sp2_map (maps parallel domain triangles to domain triangles)
282
283
284
285    # Build the mesh that should be assigned to each processor,
286    # this includes ghost nodes and the communication pattern
287    if verbose: print 'Build submeshes'   
288    submesh = build_submesh(new_nodes, new_triangles, new_boundary, quantities, triangles_per_proc, parameters)
289
290    if verbose:
291        for p in range(numprocs):
292            N = len(submesh['ghost_nodes'][p])               
293            M = len(submesh['ghost_triangles'][p])
294            print 'There are %d ghost nodes and %d ghost triangles on proc %d'\
295                  %(N, M, p)
296
297    #if debug:
298    #    from pprint import pprint
299    #    pprint(submesh)
300
301
302    # Send the mesh partition to the appropriate processor
303    if verbose: print 'Distribute submeshes'       
304    for p in range(1, numprocs):
305        send_submesh(submesh, triangles_per_proc, p, verbose)
306
307    # Build the local mesh for processor 0
308    points, vertices, boundary, quantities, \
309            ghost_recv_dict, full_send_dict, tri_map, node_map, ghost_layer_width =\
310              extract_submesh(submesh, triangles_per_proc,0)
311
312    # Keep track of the number full nodes and triangles.
313    # This is useful later if one needs access to a ghost-free domain
314    # Here, we do it for process 0. The others are done in rec_submesh.
315    number_of_full_nodes = len(submesh['full_nodes'][0])
316    number_of_full_triangles = len(submesh['full_triangles'][0])
317       
318    #print
319    #for p in range(numprocs):
320    #    print 'Process %d:' %(p)
321    #
322    #    print 'full_triangles:'
323    #    print submesh['full_triangles'][p]
324    #
325    #    print 'full_nodes:'
326    #    print submesh['full_nodes'][p]
327    #
328    #    print 'ghost_triangles:'
329    #    print submesh['ghost_triangles'][p]#
330    #
331    #    print 'ghost_nodes:'
332    #   print submesh['ghost_nodes'][p]                               
333    #    print
334    #
335    #print 'Receive dict'
336    #print ghost_recv_dict
337    #
338    #print 'Send dict'
339    #print full_send_dict       
340
341
342    # Return structures necessary for building the parallel domain
343    return points, vertices, boundary, quantities,\
344           ghost_recv_dict, full_send_dict,\
345           number_of_full_nodes, number_of_full_triangles, \
346           s2p_map, p2s_map, tri_map, node_map, ghost_layer_width
347   
348
349
350def extract_l2g_map(map):
351    # Extract l2g data  from corresponding map
352    # Maps
353
354    import numpy as num
355   
356    b = num.arange(len(map))
357
358    l_ids = num.extract(map>-1,map)
359    g_ids = num.extract(map>-1,b)
360
361#    print len(g_ids)
362#    print len(l_ids)
363#    print l_ids
364
365    l2g = num.zeros_like(g_ids)
366    l2g[l_ids] = g_ids
367
368    return l2g
369
Note: See TracBrowser for help on using the repository browser.