- Timestamp:
- Jun 17, 2014, 9:46:06 PM (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/anuga_core/source/anuga_parallel/sequential_distribute.py
r8680 r9174 18 18 19 19 20 def sequential_distribute_dump(domain, numprocs=1, verbose=False, debug=False, parameters = None): 21 """ Distribute the domain, create parallel domain and pickle result 22 """ 23 24 25 if debug: 26 verbose = True 27 28 29 30 # FIXME: Dummy assignment (until boundaries are refactored to 31 # be independent of domains until they are applied) 32 bdmap = {} 33 for tag in domain.get_boundary_tags(): 34 bdmap[tag] = None 35 36 domain.set_boundary(bdmap) 37 38 39 domain_name = domain.get_name() 40 domain_dir = domain.get_datadir() 41 domain_store = domain.get_store() 42 domain_minimum_storable_height = domain.minimum_storable_height 43 domain_flow_algorithm = domain.get_flow_algorithm() 44 domain_minimum_allowed_height = domain.get_minimum_allowed_height() 45 georef = domain.geo_reference 46 number_of_global_triangles = domain.number_of_triangles 47 number_of_global_nodes = domain.number_of_nodes 48 boundary_map = domain.boundary_map 49 50 51 #sequential_distribute_mesh(domain, numprocs, verbose=verbose, debug=debug, parameters=parameters) 52 53 54 # Subdivide the mesh 55 if verbose: print 'sequential_distribute: Subdivide mesh' 56 new_nodes, new_triangles, new_boundary, triangles_per_proc, quantities, \ 57 s2p_map, p2s_map = \ 58 pmesh_divide_metis_with_map(domain, numprocs) 59 60 #PETE: s2p_map (maps serial domain triangles to parallel domain triangles) 61 # sp2_map (maps parallel domain triangles to domain triangles) 62 63 64 65 # Build the mesh that should be assigned to each processor, 66 # this includes ghost nodes and the communication pattern 67 if verbose: print 'sequential_distribute: Build submeshes' 68 submesh = build_submesh(new_nodes, new_triangles, new_boundary, quantities, triangles_per_proc, parameters) 69 70 if debug: 71 for p in range(numprocs): 72 N = len(submesh['ghost_nodes'][p]) 73 M = len(submesh['ghost_triangles'][p]) 74 print 'There are %d ghost nodes and %d ghost triangles on proc %d'\ 75 %(N, M, p) 76 77 #if debug: 78 # from pprint import pprint 79 # pprint(submesh) 80 81 82 # extract data to create parallel domain 83 if verbose: print 'sequential_distribute: Distribute submeshes' 84 for p in range(0, numprocs): 85 86 # Build the local mesh for processor 0 20 class Sequential_distribute(object): 21 22 def __init__(self, domain, verbose=False, debug=False, parameters=None): 23 24 if debug: 25 verbose = True 26 27 self.domain = domain 28 self.verbose = verbose 29 self.debug = debug 30 self.parameters = parameters 31 32 33 def distribute(self, numprocs=1): 34 35 self.numprocs = numprocs 36 37 domain = self.domain 38 verbose = self.verbose 39 debug = self.debug 40 parameters = self.parameters 41 42 # FIXME: Dummy assignment (until boundaries are refactored to 43 # be independent of domains until they are applied) 44 bdmap = {} 45 for tag in domain.get_boundary_tags(): 46 bdmap[tag] = None 47 48 domain.set_boundary(bdmap) 49 50 51 self.domain_name = domain.get_name() 52 self.domain_dir = domain.get_datadir() 53 self.domain_store = domain.get_store() 54 self.domain_store_centroids = domain.get_store_centroids() 55 self.domain_minimum_storable_height = domain.minimum_storable_height 56 self.domain_flow_algorithm = domain.get_flow_algorithm() 57 self.domain_minimum_allowed_height = domain.get_minimum_allowed_height() 58 self.domain_georef = domain.geo_reference 59 self.number_of_global_triangles = domain.number_of_triangles 60 self.number_of_global_nodes = domain.number_of_nodes 61 self.boundary_map = domain.boundary_map 62 63 64 # Subdivide the mesh 65 if verbose: print 'sequential_distribute: Subdivide mesh' 66 67 new_nodes, new_triangles, new_boundary, triangles_per_proc, quantities, \ 68 s2p_map, p2s_map = \ 69 pmesh_divide_metis_with_map(domain, numprocs) 70 71 72 # Build the mesh that should be assigned to each processor, 73 # this includes ghost nodes and the communication pattern 74 if verbose: print 'sequential_distribute: Build submeshes' 75 if verbose: print parameters 76 77 submesh = build_submesh(new_nodes, new_triangles, new_boundary, \ 78 quantities, triangles_per_proc, parameters=parameters) 79 80 if verbose: 81 for p in range(numprocs): 82 N = len(submesh['ghost_nodes'][p]) 83 M = len(submesh['ghost_triangles'][p]) 84 print 'There are %d ghost nodes and %d ghost triangles on proc %d'\ 85 %(N, M, p) 86 87 88 self.submesh = submesh 89 self.triangles_per_proc = triangles_per_proc 90 self.p2s_map = p2s_map 91 92 93 def extract_submesh(self, p=0): 94 """Build the local mesh for processor p 95 """ 96 97 submesh = self.submesh 98 triangles_per_proc = self.triangles_per_proc 99 p2s_map = self.p2s_map 100 verbose = self.verbose 101 debug = self.debug 102 103 assert p>=0 104 assert p<self.numprocs 105 106 87 107 points, vertices, boundary, quantities, \ 88 ghost_recv_dict, full_send_dict, tri_map, node_map, ghost_layer_width =\ 89 extract_submesh(submesh, triangles_per_proc, p) 90 91 92 # from pprint import pprint 93 # print '='*80 94 # print p 95 # print '='*80 96 # pprint(tri_map) 97 # print len(tri_map) 98 99 # Keep track of the number full nodes and triangles. 100 # This is useful later if one needs access to a ghost-free domain 101 # Here, we do it for process 0. The others are done in rec_submesh. 108 ghost_recv_dict, full_send_dict, \ 109 tri_map, node_map, tri_l2g, node_l2g, ghost_layer_width =\ 110 extract_submesh(submesh, triangles_per_proc, p2s_map, p) 111 112 102 113 number_of_full_nodes = len(submesh['full_nodes'][p]) 103 114 number_of_full_triangles = len(submesh['full_triangles'][p]) 104 115 105 # Extract l2g maps 106 tri_l2g = extract_l2g_map(tri_map) 107 node_l2g = extract_l2g_map(node_map) 108 109 116 117 if debug: 118 import pprint 119 print 50*"=" 120 print 'NODE_L2G' 121 pprint.pprint(node_l2g) 122 123 pprint.pprint(node_l2g[vertices[:,0]]) 124 125 print 'VERTICES' 126 pprint.pprint(vertices[:,0]) 127 pprint.pprint(new_triangles[tri_l2g,0]) 128 129 assert num.allclose(node_l2g[vertices[:,0]], new_triangles[tri_l2g,0]) 130 assert num.allclose(node_l2g[vertices[:,1]], new_triangles[tri_l2g,1]) 131 assert num.allclose(node_l2g[vertices[:,2]], new_triangles[tri_l2g,2]) 132 133 134 print 'POINTS' 135 pprint.pprint(points) 136 137 assert num.allclose(points[:,0], new_nodes[node_l2g,0]) 138 assert num.allclose(points[:,1], new_nodes[node_l2g,1]) 139 140 141 print 'TRI' 142 pprint.pprint(tri_l2g) 143 pprint.pprint(p2s_map[tri_l2g]) 144 145 146 assert num.allclose(original_triangles[tri_l2orig,0],node_l2g[vertices[:,0]]) 147 assert num.allclose(original_triangles[tri_l2orig,1],node_l2g[vertices[:,1]]) 148 assert num.allclose(original_triangles[tri_l2orig,2],node_l2g[vertices[:,2]]) 149 150 print 'NODES' 151 pprint.pprint(node_map) 152 pprint.pprint(node_l2g) 153 154 #tri_l2orig = p2s_map[tri_l2g] 155 110 156 s2p_map = None 111 157 p2s_map = None … … 118 164 print 'sequential_distribute: P%g, no_full_nodes = %g, no_full_triangles = %g' % (p, number_of_full_nodes, number_of_full_triangles) 119 165 120 121 #args = [points, vertices, boundary]122 166 123 167 kwargs = {'full_send_dict': full_send_dict, … … 125 169 'number_of_full_nodes': number_of_full_nodes, 126 170 'number_of_full_triangles': number_of_full_triangles, 127 'geo_reference': georef,128 'number_of_global_triangles': number_of_global_triangles,129 'number_of_global_nodes': number_of_global_nodes,171 'geo_reference': self.domain_georef, 172 'number_of_global_triangles': self.number_of_global_triangles, 173 'number_of_global_nodes': self.number_of_global_nodes, 130 174 'processor': p, 131 'numproc': numprocs,175 'numproc': self.numprocs, 132 176 's2p_map': s2p_map, 133 177 'p2s_map': p2s_map, ## jj added this … … 136 180 'ghost_layer_width': ghost_layer_width} 137 181 138 #----------------------------------------------------------------------- 139 # Now let's store the data for a parallel_domain via cPickle 140 #----------------------------------------------------------------------- 141 142 #Looks like we reduce storage by a factor of 4 by just 143 # storing the data to create the parallel_domain instead of pickling 144 # a created domain 182 183 boundary_map = self.boundary_map 184 domain_name = self.domain_name 185 domain_dir = self.domain_dir 186 domain_store = self.domain_store 187 domain_store_centroids = self.domain_store_centroids 188 domain_minimum_storable_height = self.domain_minimum_storable_height 189 domain_minimum_allowed_height = self.domain_minimum_allowed_height 190 domain_flow_algorithm = self.domain_flow_algorithm 191 domain_georef = self.domain_georef 192 193 tostore = (kwargs, points, vertices, boundary, quantities, \ 194 boundary_map, \ 195 domain_name, domain_dir, domain_store, domain_store_centroids, \ 196 domain_minimum_storable_height, \ 197 domain_minimum_allowed_height, domain_flow_algorithm, \ 198 domain_georef) 199 200 201 return tostore 202 203 204 205 206 207 208 def sequential_distribute_dump(domain, numprocs=1, verbose=False, debug=False, parameters = None): 209 """ Distribute the domain, create parallel domain and pickle result 210 """ 211 212 213 partition = Sequential_distribute(domain, verbose, debug, parameters) 214 215 partition.distribute(numprocs) 216 217 218 for p in range(0, numprocs): 219 220 tostore = partition.extract_submesh(p) 221 145 222 import cPickle 146 pickle_name = domain_name + '_P%g_%g.pickle'% (numprocs,p)223 pickle_name = partition.domain_name + '_P%g_%g.pickle'% (numprocs,p) 147 224 f = file(pickle_name, 'wb') 148 tostore = (kwargs, points, vertices, boundary, quantities, boundary_map, domain_name, domain_dir, domain_store, domain_minimum_storable_height, \149 domain_minimum_allowed_height, domain_flow_algorithm, georef)150 225 cPickle.dump( tostore, f, protocol=cPickle.HIGHEST_PROTOCOL) 151 226 … … 165 240 pickle_name = filename+'_P%g_%g.pickle'% (numprocs,myid) 166 241 f = file(pickle_name, 'rb') 167 kwargs, points, vertices, boundary, quantities, boundary_map, domain_name, domain_dir, domain_store, domain_minimum_storable_height, \ 168 domain_minimum_allowed_height, domain_flow_algorithm, georef = cPickle.load(f) 242 243 kwargs, points, vertices, boundary, quantities, boundary_map, \ 244 domain_name, domain_dir, domain_store, domain_store_centroids, \ 245 domain_minimum_storable_height, domain_minimum_allowed_height, \ 246 domain_flow_algorithm, georef = cPickle.load(f) 169 247 f.close() 170 248 … … 194 272 parallel_domain.set_name(domain_name) 195 273 parallel_domain.set_datadir(domain_dir) 274 parallel_domain.set_flow_algorithm(domain_flow_algorithm) 196 275 parallel_domain.set_store(domain_store) 276 parallel_domain.set_store_centroids(domain_store_centroids) 197 277 parallel_domain.set_minimum_storable_height(domain_minimum_storable_height) 198 278 parallel_domain.set_minimum_allowed_height(domain_minimum_allowed_height) 199 parallel_domain.set_flow_algorithm(domain_flow_algorithm)200 279 parallel_domain.geo_reference = georef 201 280 … … 203 282 return parallel_domain 204 283 205 def extract_l2g_map(map): 206 # Extract l2g data from corresponding map 207 # Maps 208 209 import numpy as num 210 211 b = num.arange(len(map)) 212 213 l_ids = num.extract(map>-1,map) 214 g_ids = num.extract(map>-1,b) 215 216 217 # print len(g_ids) 218 # print len(l_ids) 219 # print l_ids 220 # print g_ids 221 222 l2g = num.zeros_like(g_ids) 223 l2g[l_ids] = g_ids 224 225 return l2g 226 227 228 229 230 284 285 286 287 288 289
Note: See TracChangeset
for help on using the changeset viewer.