source: trunk/anuga_core/source/anuga_parallel/sequential_distribute.py @ 9426

Last change on this file since 9426 was 9416, checked in by steve, 10 years ago

Moved patong/gauges data

File size: 10.4 KB
Line 
1"""Trying to lump parallel stuff into simpler interface
2
3
4"""
5
6import numpy as num
7
8
9from anuga import Domain
10
11from anuga_parallel.distribute_mesh  import send_submesh
12from anuga_parallel.distribute_mesh  import rec_submesh
13from anuga_parallel.distribute_mesh  import extract_submesh
14
15# Mesh partitioning using Metis
16from anuga_parallel.distribute_mesh import build_submesh
17from anuga_parallel.distribute_mesh import pmesh_divide_metis_with_map
18
19from anuga_parallel.parallel_shallow_water import Parallel_domain
20
21
22
23class Sequential_distribute(object):
24
25    def __init__(self, domain, verbose=False, debug=False, parameters=None):
26
27        if debug:
28            verbose = True
29           
30        self.domain = domain
31        self.verbose = verbose
32        self.debug = debug
33        self.parameters = parameters
34
35
36    def distribute(self, numprocs=1):
37
38        self.numprocs = numprocs
39       
40        domain = self.domain
41        verbose = self.verbose
42        debug = self.debug
43        parameters = self.parameters
44
45        # FIXME: Dummy assignment (until boundaries are refactored to
46        # be independent of domains until they are applied)
47        bdmap = {}
48        for tag in domain.get_boundary_tags():
49            bdmap[tag] = None
50
51        domain.set_boundary(bdmap)
52
53
54        self.domain_name = domain.get_name()
55        self.domain_dir = domain.get_datadir()
56        self.domain_store = domain.get_store()
57        self.domain_store_centroids = domain.get_store_centroids()
58        self.domain_minimum_storable_height = domain.minimum_storable_height
59        self.domain_flow_algorithm = domain.get_flow_algorithm()
60        self.domain_minimum_allowed_height = domain.get_minimum_allowed_height()
61        self.domain_georef = domain.geo_reference
62        self.number_of_global_triangles = domain.number_of_triangles
63        self.number_of_global_nodes = domain.number_of_nodes
64        self.boundary_map = domain.boundary_map
65
66
67        # Subdivide the mesh
68        if verbose: print 'sequential_distribute: Subdivide mesh'
69
70        new_nodes, new_triangles, new_boundary, triangles_per_proc, quantities, \
71               s2p_map, p2s_map = \
72               pmesh_divide_metis_with_map(domain, numprocs)
73
74
75        # Build the mesh that should be assigned to each processor,
76        # this includes ghost nodes and the communication pattern
77        if verbose: print 'sequential_distribute: Build submeshes'
78        if verbose: print 'sequential_distribute: parameters = ',parameters
79
80        submesh = build_submesh(new_nodes, new_triangles, new_boundary, \
81                                quantities, triangles_per_proc, parameters=parameters)
82
83        if verbose:
84            for p in range(numprocs):
85                N = len(submesh['ghost_nodes'][p])
86                M = len(submesh['ghost_triangles'][p])
87                print 'There are %d ghost nodes and %d ghost triangles on proc %d'\
88                      %(N, M, p)
89
90
91        self.submesh = submesh
92        self.triangles_per_proc = triangles_per_proc
93        self.p2s_map =  p2s_map
94
95
96    def extract_submesh(self, p=0):
97        """Build the local mesh for processor p
98        """
99
100        submesh = self.submesh
101        triangles_per_proc = self.triangles_per_proc
102        p2s_map = self.p2s_map
103        verbose = self.verbose
104        debug = self.debug
105
106        assert p>=0
107        assert p<self.numprocs
108       
109       
110        points, vertices, boundary, quantities, \
111            ghost_recv_dict, full_send_dict, \
112            tri_map, node_map, tri_l2g, node_l2g, ghost_layer_width =\
113              extract_submesh(submesh, triangles_per_proc, p2s_map, p)
114             
115
116        number_of_full_nodes = len(submesh['full_nodes'][p])
117        number_of_full_triangles = len(submesh['full_triangles'][p])
118
119
120        if debug:
121            import pprint
122            print  50*"="
123            print 'NODE_L2G'
124            pprint.pprint(node_l2g)
125       
126            pprint.pprint(node_l2g[vertices[:,0]])
127       
128            print 'VERTICES'
129            pprint.pprint(vertices[:,0])
130            pprint.pprint(new_triangles[tri_l2g,0])
131       
132            assert num.allclose(node_l2g[vertices[:,0]], new_triangles[tri_l2g,0])       
133            assert num.allclose(node_l2g[vertices[:,1]], new_triangles[tri_l2g,1]) 
134            assert num.allclose(node_l2g[vertices[:,2]], new_triangles[tri_l2g,2]) 
135       
136
137            print 'POINTS'
138            pprint.pprint(points)
139       
140            assert num.allclose(points[:,0], new_nodes[node_l2g,0])
141            assert num.allclose(points[:,1], new_nodes[node_l2g,1])
142
143
144            print 'TRI'
145            pprint.pprint(tri_l2g)
146            pprint.pprint(p2s_map[tri_l2g])
147       
148
149            assert num.allclose(original_triangles[tri_l2orig,0],node_l2g[vertices[:,0]])
150            assert num.allclose(original_triangles[tri_l2orig,1],node_l2g[vertices[:,1]])
151            assert num.allclose(original_triangles[tri_l2orig,2],node_l2g[vertices[:,2]])
152
153            print 'NODES'
154            pprint.pprint(node_map)
155            pprint.pprint(node_l2g)     
156       
157        #tri_l2orig = p2s_map[tri_l2g]       
158       
159        s2p_map = None
160        p2s_map = None
161
162        #------------------------------------------------------------------------
163        # Build the parallel domain for this processor using partion structures
164        #------------------------------------------------------------------------
165
166        if verbose:
167            print 'sequential_distribute: P%g, no_full_nodes = %g, no_full_triangles = %g' % (p, number_of_full_nodes, number_of_full_triangles)
168
169
170        kwargs = {'full_send_dict': full_send_dict,
171                'ghost_recv_dict': ghost_recv_dict,
172                'number_of_full_nodes': number_of_full_nodes,
173                'number_of_full_triangles': number_of_full_triangles,
174                'geo_reference': self.domain_georef,
175                'number_of_global_triangles':  self.number_of_global_triangles,
176                'number_of_global_nodes':  self.number_of_global_nodes,
177                'processor':  p,
178                'numproc':  self.numprocs,
179                's2p_map':  s2p_map,
180                'p2s_map':  p2s_map, ## jj added this
181                'tri_l2g':  tri_l2g, ## SR added this
182                'node_l2g':  node_l2g,
183                'ghost_layer_width':  ghost_layer_width}
184
185
186        boundary_map = self.boundary_map
187        domain_name = self.domain_name
188        domain_dir = self.domain_dir
189        domain_store = self.domain_store
190        domain_store_centroids = self.domain_store_centroids
191        domain_minimum_storable_height = self.domain_minimum_storable_height
192        domain_minimum_allowed_height = self.domain_minimum_allowed_height
193        domain_flow_algorithm = self.domain_flow_algorithm
194        domain_georef = self.domain_georef
195           
196        tostore = (kwargs, points, vertices, boundary, quantities, \
197                   boundary_map, \
198                   domain_name, domain_dir, domain_store, domain_store_centroids, \
199                   domain_minimum_storable_height, \
200                   domain_minimum_allowed_height, domain_flow_algorithm, \
201                   domain_georef)
202
203
204        return tostore
205
206
207
208                       
209
210   
def sequential_distribute_dump(domain, numprocs=1, verbose=False, partition_dir='.', debug=False, parameters = None):
    """Distribute the domain and pickle one partition file per processor.

    domain        -- sequential anuga Domain to partition
    numprocs      -- number of submeshes (and pickle files) to produce
    verbose/debug -- passed through to Sequential_distribute
    partition_dir -- directory for the pickle files (created if needed)
    parameters    -- optional dict passed through to build_submesh

    Writes files named <domain_name>_P<numprocs>_<p>.pickle.
    """

    import cPickle
    from os.path import join

    partition = Sequential_distribute(domain, verbose, debug, parameters)

    partition.distribute(numprocs)

    # Make sure the partition_dir exists
    if partition_dir != '.':
        import os
        import errno
        try:
            os.makedirs(partition_dir)
        except OSError as exception:
            # Directory already existing is fine; anything else is real.
            if exception.errno != errno.EEXIST:
                raise

    for p in range(0, numprocs):

        tostore = partition.extract_submesh(p)

        pickle_name = partition.domain_name + '_P%g_%g.pickle'% (numprocs,p)
        pickle_name = join(partition_dir, pickle_name)

        # Previously the file handle was never closed (resource leak);
        # ensure it is closed even if dump fails.
        f = open(pickle_name, 'wb')
        try:
            cPickle.dump(tostore, f, protocol=cPickle.HIGHEST_PROTOCOL)
        finally:
            f.close()

    return
245
246
def sequential_distribute_load(filename = 'domain', partition_dir = '.', verbose = False):
    """Load this processor's pickled partition and rebuild its domain.

    The pickle file name is derived from filename, the total number of
    processors and this processor's id (myid).
    """

    import os.path

    from anuga_parallel import myid, numprocs

    basename = filename + '_P%g_%g.pickle'% (numprocs,myid)
    full_pickle_name = os.path.join(partition_dir, basename)

    return sequential_distribute_load_pickle_file(full_pickle_name, numprocs, verbose = verbose)
258
259
def sequential_distribute_load_pickle_file(pickle_name, np=1, verbose = False):
    """Rebuild a (parallel) domain from a partition pickle file.

    pickle_name -- path to a file written by sequential_distribute_dump
    np          -- total number of processors; np > 1 builds a
                   Parallel_domain, otherwise a sequential Domain
    """

    import cPickle

    pickle_file = file(pickle_name, 'rb')
    (kwargs, points, vertices, boundary, quantities, boundary_map,
     domain_name, domain_dir, domain_store, domain_store_centroids,
     domain_minimum_storable_height, domain_minimum_allowed_height,
     domain_flow_algorithm, georef) = cPickle.load(pickle_file)
    pickle_file.close()

    #---------------------------------------------------------------------------
    # Create domain (parallel if np>1)
    #---------------------------------------------------------------------------
    if np > 1:
        domain = Parallel_domain(points, vertices, boundary, **kwargs)
    else:
        domain = Domain(points, vertices, boundary, **kwargs)

    #------------------------------------------------------------------------
    # Copy in quantity data
    #------------------------------------------------------------------------
    for quantity_name in quantities:
        domain.set_quantity(quantity_name, quantities[quantity_name])

    #------------------------------------------------------------------------
    # Transfer boundary conditions to each subdomain
    #------------------------------------------------------------------------
    boundary_map['ghost'] = None  # Add binding to ghost boundary
    domain.set_boundary(boundary_map)

    #------------------------------------------------------------------------
    # Transfer other attributes to each subdomain
    #------------------------------------------------------------------------
    domain.set_name(domain_name)
    domain.set_datadir(domain_dir)
    domain.set_flow_algorithm(domain_flow_algorithm)
    domain.set_store(domain_store)
    domain.set_store_centroids(domain_store_centroids)
    domain.set_minimum_storable_height(domain_minimum_storable_height)
    domain.set_minimum_allowed_height(domain_minimum_allowed_height)
    domain.geo_reference = georef

    return domain
Note: See TracBrowser for help on using the repository browser.