import numpy as np
from .helper import position_from_rank, rank_from_position


class PPP:
    """Parallel Python Postprocessor for suspend"""

    def __init__(self,comm,func_load,
                 num_ghost,chunks_per_proc,origin,spacing,periodicity,bounds,
                 proc_grid,nxp,nyp,nzp,
                 proc_grid_ext,nxp_ext,nyp_ext,nzp_ext,
                 nghbr,field,symmetries):
        '''Class constructor: it is expected that all communication is done in the
        corresponding class methods, such that every rank already holds the
        required information.'''
        # Initialize object with passed arguments
        self.comm = comm
        self.func_load = func_load
        self.num_ghost = num_ghost
        self.chunks_per_proc = chunks_per_proc
        self.origin = origin
        self.spacing = spacing
        self.periodicity = periodicity
        self.bounds = bounds
        self.proc_grid = proc_grid
        self.nxp,self.nyp,self.nzp = nxp,nyp,nzp
        self.proc_grid_ext = proc_grid_ext
        self.nxp_ext,self.nyp_ext,self.nzp_ext = nxp_ext,nyp_ext,nzp_ext
        self.nghbr = nghbr
        self.field = field
        self.symmetries = symmetries
        # Initialize some deduced variables
        self.rank = comm.Get_rank()
        self.nproc = nxp*nyp*nzp
        self.ip,self.jp,self.kp = self.position_from_rank(self.rank,external=False)
        self.nghx,self.nghy,self.nghz = num_ghost
        self.xperiodic,self.yperiodic,self.zperiodic = periodicity
        return

    @classmethod
    def from_snapshot(cls,snap,chunks_per_proc=(1,1,1),num_ghost=(1,1,1)):
        from mpi4py import MPI
        comm = MPI.COMM_WORLD
        rank = comm.Get_rank()
        # Let rank 0 read the processor grid as well as some info and then broadcast it
        if rank==0:
            proc = snap.procgrid()
            origin = snap.origin()
            spacing = snap.spacing()
            periodicity = snap.periodicity()
            bounds = snap.bounds()
        else:
            proc,origin,spacing,periodicity,bounds = 5*[None]
        proc = comm.bcast(proc,root=0)
        origin = comm.bcast(origin,root=0)
        spacing = comm.bcast(spacing,root=0)
        periodicity = comm.bcast(periodicity,root=0)
        bounds = comm.bcast(bounds,root=0)
        # Make sure all processors got the same arguments
        chunks_per_proc = comm.bcast(chunks_per_proc,root=0)
        num_ghost = comm.bcast(num_ghost,root=0)
        # Set data loading function
        func_load = snap.field_chunk
        # Construct new processor grid
        (proc_grid,nxp,nyp,nzp,
         proc_grid_ext,nxp_ext,nyp_ext,nzp_ext,
         nghbr) = PPP._construct_procgrid(comm,proc,chunks_per_proc,num_ghost,periodicity)
        # Initially, no field is loaded
        field = {}
        symmetries = {}
        return cls(comm,func_load,
                   num_ghost,chunks_per_proc,origin,spacing,periodicity,bounds,
                   proc_grid,nxp,nyp,nzp,
                   proc_grid_ext,nxp_ext,nyp_ext,nzp_ext,
                   nghbr,field,symmetries)
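    # Usage sketch (hedged): build a postprocessor from a snapshot object and
    # load a velocity component. 'snap' is assumed to provide the procgrid()/
    # origin()/spacing()/periodicity()/bounds() readers and the
    # field_chunk(rank,key) loader used above; run under MPI, e.g.
    # `mpirun -n 8 python post.py` (script name hypothetical).
    #
    #   ppp = PPP.from_snapshot(snap,chunks_per_proc=(2,2,1),num_ghost=(2,2,2))
    #   ppp.load_field('u')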
    @classmethod
    def from_state(cls,file,parallel=True,io_limit=None):
        import h5py
        from mpi4py import MPI
        from .field import Field3d
        comm = MPI.COMM_WORLD
        rank = comm.Get_rank()
        # Only use parallel IO if flag is set and h5py has MPIIO support
        parallel = (h5py.h5fd.MPIO>=0) and parallel
        if parallel:
            f = h5py.File(file,'r',driver='mpio',comm=comm)
        else:
            sb = SequentialBlock(io_limit,comm=comm,per_node=True)
            f = h5py.File(file,'r')
        # Read attributes which are the same for all ranks
        g = f['PPP']
        field_names = tuple(x.decode('ascii') for x in g['field_names'])
        field_loaded = tuple(x.decode('ascii') for x in g['field_loaded'])
        origin = {}
        proc_grid = {}
        for key in field_names:
            origin[key] = tuple(g[key]['origin'])
            proc_grid[key] = []
            proc_grid[key].append(tuple(g[key]['ibeg']))
            proc_grid[key].append(tuple(g[key]['iend']))
            proc_grid[key].append(tuple(g[key]['jbeg']))
            proc_grid[key].append(tuple(g[key]['jend']))
            proc_grid[key].append(tuple(g[key]['kbeg']))
            proc_grid[key].append(tuple(g[key]['kend']))
        num_ghost = tuple(g['num_ghost'])
        chunks_per_proc = tuple(g['chunks_per_proc'])
        spacing = tuple(g['spacing'])
        periodicity = tuple(g['periodicity'])
        bounds = tuple(g['bounds'])
        nxp = int(g['nxp'][:])
        nyp = int(g['nyp'][:])
        nzp = int(g['nzp'][:])
        # Independent read
        grp_rank = '{:05d}'.format(rank)
        g = f[grp_rank]
        nghbr = g['nghbr'][:]
        field = {}
        symmetries = {}
        for key in field_loaded:
            field[key] = Field3d.from_file(g,name=key)
            symmetries[key] = g[key]['symmetries'][:]
        f.close()
        assert nxp*nyp*nzp==comm.Get_size(), "The loaded state requires {} processors, but "\
            "we are currently running with {}.".format(nxp*nyp*nzp,comm.Get_size())
        if not parallel:
            sb.proceed()
        func_load = None
        proc_grid_ext = None
        nxp_ext,nyp_ext,nzp_ext = 3*[None]
        return cls(comm,func_load,
                   num_ghost,chunks_per_proc,origin,spacing,periodicity,bounds,
                   proc_grid,nxp,nyp,nzp,
                   proc_grid_ext,nxp_ext,nyp_ext,nzp_ext,
                   nghbr,field,symmetries)

    def load_field(self,key,io_limit=None,dtype=np.float64,verbose=True):
        '''Loads the required chunks from file'''
        from .field import Field3d
        import numpy as np
        # Verbose output
        if verbose and self.rank==0:
            print('[load_field] key={}, io_limit={}'.format(key,io_limit))
        # Block execution of some processors if IO is limited
        sb = SequentialBlock(io_limit,comm=self.comm,per_node=True)
        # Determine which chunks are to be loaded by the current processor
        ip_beg_ext = self.ip*self.chunks_per_proc[0]
        jp_beg_ext = self.jp*self.chunks_per_proc[1]
        kp_beg_ext = self.kp*self.chunks_per_proc[2]
        ip_end_ext = ip_beg_ext+self.chunks_per_proc[0]-1
        jp_end_ext = jp_beg_ext+self.chunks_per_proc[1]-1
        kp_end_ext = kp_beg_ext+self.chunks_per_proc[2]-1
        # Get the total size of the field to be loaded
        key_grid = self._key_grid(key)
        ib = self.proc_grid_ext[key_grid][0][ip_beg_ext]
        ie = self.proc_grid_ext[key_grid][1][ip_end_ext]
        jb = self.proc_grid_ext[key_grid][2][jp_beg_ext]
        je = self.proc_grid_ext[key_grid][3][jp_end_ext]
        kb = self.proc_grid_ext[key_grid][4][kp_beg_ext]
        ke = self.proc_grid_ext[key_grid][5][kp_end_ext]
        nxl = ie-ib+1
        nyl = je-jb+1
        nzl = ke-kb+1
        # Allocate an array to hold the entire field
        data = np.empty(
            (nxl+2*self.nghx,
             nyl+2*self.nghy,
             nzl+2*self.nghz),dtype=dtype)
        # Compute origin of subfield
        origin = (self.origin[key_grid][0]+(ib-1-self.nghx)*self.spacing[0],
                  self.origin[key_grid][1]+(jb-1-self.nghy)*self.spacing[1],
                  self.origin[key_grid][2]+(kb-1-self.nghz)*self.spacing[2])
        # Create a Field3d
        self.field[key] = Field3d(data,origin,self.spacing)
        # Go through each chunk to be read and construct the field
        for ip_ext in range(ip_beg_ext,ip_end_ext+1):
            for jp_ext in range(jp_beg_ext,jp_end_ext+1):
                for kp_ext in range(kp_beg_ext,kp_end_ext+1):
                    # Determine rank of the chunk to be read
                    rank_ext = self.rank_from_position(ip_ext,jp_ext,kp_ext,external=True)
                    # Compute bounds of this chunk
                    ib_ext = self.proc_grid_ext[key_grid][0][ip_ext]
                    ie_ext = self.proc_grid_ext[key_grid][1][ip_ext]
                    jb_ext = self.proc_grid_ext[key_grid][2][jp_ext]
                    je_ext = self.proc_grid_ext[key_grid][3][jp_ext]
                    kb_ext = self.proc_grid_ext[key_grid][4][kp_ext]
                    ke_ext = self.proc_grid_ext[key_grid][5][kp_ext]
                    nxl_ext = ie_ext-ib_ext+1
                    nyl_ext = je_ext-jb_ext+1
                    nzl_ext = ke_ext-kb_ext+1
                    # Read data and insert it
                    subfield = self.func_load(rank_ext,key)
                    self.field[key].insert_subfield(subfield)
        # Continue execution of waiting processors if IO was limited
        sb.proceed()
        if verbose:
            print('[load_field] rank {} done with I/O.'.format(self.rank))
        # Exchange ghost cells
        self.exchange_ghost_cells(key)
        # Initialize symmetries and impose BC
        self.init_symmetry(key)
        self.impose_boundary_conditions(key)
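    # Hedged I/O-throttling example: with io_limit=2 at most two ranks per node
    # execute the reads in load_field() concurrently (see SequentialBlock at the
    # bottom of this module); io_limit=None disables the throttling.
    #
    #   ppp.load_field('p',io_limit=2)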
    def differentiate(self,key,axis,key_out=None,on_pressure_grid=False):
        assert axis<3, "'axis' must be one of 0,1,2."
        if key_out is None:
            key_out = 'd'+key+('dx','dy','dz')[axis]
        key_grid = self._key_grid(key)
        origin = list(self.origin[key_grid])
        shifting_state = self.shifting_state(key,axis=axis)
        if shifting_state==-1:
            shift_origin = 'after'
            origin[axis] += 0.5*self.spacing[axis]
        elif shifting_state==0:
            shift_origin = 'before'
            origin[axis] -= 0.5*self.spacing[axis]
        elif shifting_state==1:
            shift_origin = 'before'
            origin[axis] -= 0.5*self.spacing[axis]
        else:
            raise ValueError("Invalid shifting state.")
        self.copy(key,key_out,skip_data=True)
        self.field[key_out] = self.field[key].derivative(axis,
            only_keep_interior=False,shift_origin=shift_origin)
        self.origin[key_out] = tuple(origin)
        if axis==0:
            self.symmetries[key_out][0,1,1] = -self.symmetries[key_out][0,1,1]
            self.symmetries[key_out][2,1,1] = -self.symmetries[key_out][2,1,1]
        elif axis==1:
            self.symmetries[key_out][1,0,1] = -self.symmetries[key_out][1,0,1]
            self.symmetries[key_out][1,2,1] = -self.symmetries[key_out][1,2,1]
        elif axis==2:
            self.symmetries[key_out][1,1,0] = -self.symmetries[key_out][1,1,0]
            self.symmetries[key_out][1,1,2] = -self.symmetries[key_out][1,1,2]
        # Exchange ghost cells and set boundary conditions
        self.exchange_ghost_cells(key_out)
        self.impose_boundary_conditions(key_out)
        # Shift to pressure grid if desired
        if on_pressure_grid:
            self.shift_to_pressure_grid(key_out,key_out=key_out)
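    # Illustration (hedged; the exact shifting_state convention is assumed): on
    # the staggered grid a two-point derivative lands halfway between its input
    # nodes, so the subfield origin above moves by half a grid spacing, e.g.
    # d('u')/dx ends up on the pressure nodes. The sign flips applied to the
    # symmetries reflect that the wall-normal derivative of a wall-symmetric
    # field is antisymmetric (and vice versa) across the two walls normal to
    # 'axis'.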
    def allocate(self,key_grid,fill=None,pseudo=False,dtype=np.float64):
        from .field import Field3d
        ib = self.proc_grid[key_grid][0][self.ip]
        ie = self.proc_grid[key_grid][1][self.ip]
        jb = self.proc_grid[key_grid][2][self.jp]
        je = self.proc_grid[key_grid][3][self.jp]
        kb = self.proc_grid[key_grid][4][self.kp]
        ke = self.proc_grid[key_grid][5][self.kp]
        origin = (self.origin[key_grid][0]+(ib-1-self.nghx)*self.spacing[0],
                  self.origin[key_grid][1]+(jb-1-self.nghy)*self.spacing[1],
                  self.origin[key_grid][2]+(kb-1-self.nghz)*self.spacing[2])
        dim = (ie-ib+1+2*self.nghx,
               je-jb+1+2*self.nghy,
               ke-kb+1+2*self.nghz)
        if pseudo:
            return Field3d.pseudo_field(dim,origin,self.spacing)
        else:
            return Field3d.allocate(dim,origin,self.spacing,fill=fill,dtype=dtype)

    def qcriterion(self,key_out=None,from_pressure=False,keep_derivatives=False):
        '''Computes Q-criterion'''
        from .field import Field3d
        if key_out is None:
            key_out = 'Q'
        if from_pressure:
            assert 'p' in self.field, "Pressure field is not loaded."
            self.copy('p',key_out,skip_data=True)
            self.field[key_out] = 0.5*self.field['p'].laplacian(only_keep_interior=False)
        else:
            assert 'u' in self.field, "Velocity field is not loaded: u"
            assert 'v' in self.field, "Velocity field is not loaded: v"
            assert 'w' in self.field, "Velocity field is not loaded: w"
            self.copy('p',key_out,skip_data=True,skip_symmetries=True)
            self.init_symmetry(key_out)
            self.field[key_out] = self.allocate('p',fill=0.0,dtype=self.field['u'].dtype())
            keyx = ('x','y','z')
            keyu = ('u','v','w')
            for ii in range(3):
                # Diagonal terms (dui/dxi)^2
                key_tmp = 'd'+keyu[ii]+'d'+keyx[ii]
                parprint(key_tmp)
                self.differentiate(keyu[ii],axis=ii,key_out=key_tmp,on_pressure_grid=True)
                self.field[key_out] += self.field[key_tmp]**2
                if not keep_derivatives:
                    self.delete(key_tmp)
                # Off-diagonal terms 2*(dui/dxj)*(duj/dxi)
                iip1 = (ii+1)%3
                key_tmp1 = 'd'+keyu[ii]+'d'+keyx[iip1]
                key_tmp2 = 'd'+keyu[iip1]+'d'+keyx[ii]
                parprint(key_tmp1,key_tmp2)
                self.differentiate(keyu[ii],axis=iip1,key_out=key_tmp1,on_pressure_grid=True)
                self.differentiate(keyu[iip1],axis=ii,key_out=key_tmp2,on_pressure_grid=True)
                self.field[key_out] += 2.0*self.field[key_tmp1]*self.field[key_tmp2]
                if not keep_derivatives:
                    self.delete(key_tmp1)
                    self.delete(key_tmp2)
            self.field[key_out] *= -0.5
        # Exchange ghost cells and set boundary conditions
        self.exchange_ghost_cells(key_out)
        self.impose_boundary_conditions(key_out)
        return

    #def vorticity(self,axis=None,key_out=None,keep_derivatives=False,on_pressure_grid=True):

    def arithmetic_operation(self,key1,operation,key2,key_out=None,on_pressure_grid=True):
        import operator
        if operation in ('add','+'):
            op = operator.iadd
        elif operation in ('subtract','sub','-'):
            op = operator.isub
        elif operation in ('divide','div','/'):
            op = operator.itruediv
        elif operation in ('multiply','mul','*'):
            op = operator.imul
        else:
            raise ValueError("Invalid operation: {}".format(operation))
        if key_out is None:
            key_out = key1
        if not self.field[key1].has_same_grid(self.field[key2]) or on_pressure_grid:
            self.shift_to_pressure_grid(key1,key_out)
            self.shift_to_pressure_grid(key2,'tmp')
            op(self.field[key_out],self.field['tmp'])
            self.delete('tmp')
        else:
            self.copy(key1,key_out)
            op(self.field[key_out],self.field[key2])
        return

    def add(self,key1,key2,key_out=None,on_pressure_grid=True):
        self.arithmetic_operation(key1,'+',key2,key_out=key_out,on_pressure_grid=on_pressure_grid)
        return

    def subtract(self,key1,key2,key_out=None,on_pressure_grid=True):
        self.arithmetic_operation(key1,'-',key2,key_out=key_out,on_pressure_grid=on_pressure_grid)
        return

    def multiply(self,key1,key2,key_out=None,on_pressure_grid=True):
        self.arithmetic_operation(key1,'*',key2,key_out=key_out,on_pressure_grid=on_pressure_grid)
        return

    def divide(self,key1,key2,key_out=None,on_pressure_grid=True):
        self.arithmetic_operation(key1,'/',key2,key_out=key_out,on_pressure_grid=on_pressure_grid)
        return
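    # Hedged example: form a Reynolds-stress-like product on the pressure grid.
    # Both operands are shifted to the cell centers first, so 'uv' lives on the
    # 'p' grid afterwards ('uv_mean' is a hypothetical, previously created key).
    #
    #   ppp.multiply('u','v',key_out='uv')
    #   ppp.subtract('uv','uv_mean',key_out='uvfluct')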
    def gaussian_filter(self,key,sigma,truncate=4.0,key_out=None,iterate=False,verbose=True):
        '''Applies a Gaussian filter to a field as an in-place operation.
        Sigma is the std of the filter in terms of grid width.'''
        import numpy as np
        from mpi4py import MPI
        if key_out is None:
            key_out = key
        if key!=key_out:
            self.copy(key,key_out,skip_data=True)
        # Compute radius of Gaussian filter
        radius = self.field[key].gaussian_filter_radius(sigma,truncate=truncate)
        if verbose and self.rank==0:
            print('[gaussian_filter] key={}, stencil radius={}'.format(key,radius))
        if not iterate:
            # Assert that we have a sufficient number of ghost cells
            assert all([self.num_ghost[ii]>=radius[ii] for ii in range(3)]),\
                "Too few ghost cells for stencil: {}, {}".format(self.num_ghost,radius)
            niter = 1
        else:
            # Determine number of iterations required for current ghost cell setup
            sigma = np.array(sigma)
            niter = 1
            while not all([self.num_ghost[ii]>=radius[ii] for ii in range(3)]):
                sigma /= np.sqrt(2)
                niter *= 2
                radius = self.field[key].gaussian_filter_radius(sigma,truncate=truncate)
            assert all([radius[ii]>0 if sigma[ii]>0.0 else True for ii in range(3)]),\
                "Iterative procedure leads to invalid stencil radius: "\
                "increase number of ghost cells. {}".format(radius)
            if verbose and self.rank==0:
                print('[gaussian_filter] iterations={}'.format(niter))
        for iiter in range(niter):
            if verbose and self.rank==0:
                print('[gaussian_filter] iter #{:d}: '.format(iiter),end='')
            tbeg = MPI.Wtime()
            # Filter the field (the first pass reads from 'key'; later passes
            # operate on 'key_out' in place)
            self.field[key_out] = self.field[key].gaussian_filter(sigma,
                truncate=truncate,only_keep_interior=False)
            # Exchange ghost cells and set boundary conditions
            self.exchange_ghost_cells(key_out)
            self.impose_boundary_conditions(key_out)
            # Iterate in place from now on
            key = key_out
            if verbose and self.rank==0:
                print('{:g} sec'.format(MPI.Wtime()-tbeg))
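    # Why sigma is reduced by sqrt(2) per step above: convolving two Gaussians
    # of standard deviation s yields one of standard deviation sqrt(2)*s
    # (variances add). Halving the variance per pass while doubling the pass
    # count therefore preserves the total filter width, while the per-pass
    # stencil radius shrinks until it fits into the available ghost layers.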
    def broadcast(self,key,operation,arg,key_out=None):
        '''Broadcasts an operation involving a scalar or matrix on the entire grid.
        If 'arg' is a matrix, it must be three-dimensional and its axes must be
        singular or of length nx/ny/nz.'''
        import numpy as np
        import operator
        # Broadcast argument
        arg = self.comm.bcast(arg,root=0)
        # Select operator
        if operation in ('add','+'):
            op = operator.iadd
        elif operation in ('subtract','sub','-'):
            op = operator.isub
        elif operation in ('divide','div','/'):
            op = operator.itruediv
        elif operation in ('multiply','mul','*'):
            op = operator.imul
        elif operation in ('power','pow','^','**'):
            op = operator.ipow
        else:
            raise ValueError("Invalid operation: {}".format(operation))
        key_grid = self._key_grid(key)
        # Work on a copy if an output key is given, else operate in place
        if key_out is not None:
            self.copy(key,key_out)
        else:
            key_out = key
        if isinstance(arg,np.ndarray):
            sl_arg = 3*[slice(None)]
            for axis in range(3):
                if arg.shape[axis]==1:
                    continue
                elif arg.shape[axis]==self.dim(key,axis=axis):
                    pos = self.position_from_rank(self.rank,external=False)[axis]
                    sl_arg[axis] = slice(
                        self.proc_grid[key_grid][2*axis][pos]-1,
                        self.proc_grid[key_grid][2*axis+1][pos])
                else:
                    raise ValueError("'arg' must either be singular or match global "\
                        "grid dimension. (axis={}: got {:d}, expected {:d})".format(
                        axis,arg.shape[axis],self.dim(key,axis=axis)))
            # Only operate on the interior and communicate ghosts afterwards
            sl_int = tuple(slice(self.num_ghost[ii],-self.num_ghost[ii]) for ii in range(3))
            sl_arg = tuple(sl_arg)
            op(self.field[key_out].data[sl_int],arg[sl_arg])
            # Exchange ghost cells and set boundary conditions
            self.exchange_ghost_cells(key_out)
            self.impose_boundary_conditions(key_out)
        elif isinstance(arg,(int,float)):
            op(self.field[key_out].data,arg)
        return

    def integrate(self,key,integrate_axis,ufunc=None,ignore_nan=False,average=False):
        '''Computes a global integral or average along a given axis, applying the
        function 'ufunc' to each node. The result is returned by rank 0; all
        other ranks return None.'''
        from mpi4py import MPI
        import numpy as np
        if integrate_axis is None:
            integrate_axis = tuple(self.periodicity)
        # Compute local integral, but get weights separately
        idx_origin = tuple(self.num_ghost[ii]-1 for ii in range(3))
        (integ_local,weights_local) = self.field[key].extract_subfield(
            idx_origin,self.chunk_size(key,axis=None)).integral(
            integrate_axis,ufunc=ufunc,ignore_nan=ignore_nan,
            average=average,return_weights=True)
        # Make sure weights_local is a numpy array
        weights_local = np.array(weights_local)
        # Simple implementation: all processors communicate directly to root
        key_grid = self._key_grid(key)
        if self.rank==0:
            dim_final = tuple(1 if integrate_axis[axis] else self.dim(key,axis=axis)
                              for axis in range(3))
            # Allocate memory for the global result
            integ = np.zeros(dim_final,dtype=integ_local.dtype)
            if average:
                weights = np.zeros(dim_final,dtype=weights_local.dtype)
            # Receive from peers
            for rank_src in range(0,self.nproc):
                # Determine target array position
                pos = self.position_from_rank(rank_src,external=False)
                sl = 3*[slice(None)]
                for axis in range(3):
                    if integrate_axis[axis]:
                        sl[axis] = slice(0,1)
                    else:
                        sl[axis] = slice(
                            self.proc_grid[key_grid][2*axis][pos[axis]]-1,
                            self.proc_grid[key_grid][2*axis+1][pos[axis]])
                sl = tuple(sl)
                # Receive the data and accumulate it at the target position
                if rank_src==0:
                    integ[sl] += integ_local
                    if average:
                        weights[sl] += weights_local
                else:
                    integ[sl] += self.comm.recv(source=rank_src,tag=1)
                    if average:
                        weights[sl] += self.comm.recv(source=rank_src,tag=2)
            if average:
                return integ/weights
            else:
                return integ/weights_local
        else:
            self.comm.send(integ_local,dest=0,tag=1)
            if average:
                self.comm.send(weights_local,dest=0,tag=2)
            return None
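    # Hedged example: remove a mean profile computed on the fly. integrate()
    # returns the (1,ny,1)-shaped wall-normal profile on rank 0 and None
    # elsewhere; broadcast() first bcasts its argument from rank 0, so passing
    # the None held by the other ranks is safe.
    #
    #   umean = ppp.integrate('u',integrate_axis=(1,0,1),average=True)
    #   ppp.broadcast('u','-',umean,key_out='ufluct')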
    def shift_to_pressure_grid(self,key,key_out=None):
        from .field import Field3d
        if key_out is None:
            if key in ('u','v','w'):
                raise ValueError("In-place shift of {'u','v','w'} is not permitted.")
            key_out = key
        # If pressure is loaded, use field information; else create a
        # fake Field3d object so that we can use its methods.
        field_ref = self.allocate('p',pseudo=True)
        # Compute the relative shift and apply it
        rel_shift = self.field[key].relative_shift(field_ref)
        self.field[key_out] = self.field[key].shift_origin(rel_shift)
        # Match number of gridpoints
        sl = 3*[slice(None)]
        for axis in range(3):
            if not self.field[key_out].has_same_dim(field_ref,axis=axis):
                sl[axis] = slice(0,field_ref.dim(axis=axis))
        sl = tuple(sl)
        self.field[key_out].data = self.field[key_out].data[sl]
        # Copy metadata of pressure grid
        self.copy('p',key_out,skip_data=True,skip_symmetries=True)
        self.symmetries[key_out] = self.symmetries[key].copy()
        # Exchange ghost cells and set boundary conditions
        self.exchange_ghost_cells(key_out)
        self.impose_boundary_conditions(key_out)

    def init_symmetry(self,key):
        '''Sets mirror symmetries for ghost cells behind the wall. Use
        impose_symmetries_noslip() afterwards to set appropriate no-slip
        boundary conditions.'''
        import numpy as np
        self.symmetries[key] = np.zeros((3,3,3),dtype='i')
        for axis in range(3):
            self.init_mirror_symmetry(key,axis,-1)
            self.init_mirror_symmetry(key,axis,+1)
        return

    def init_mirror_symmetry(self,key,axis,wall,invert=False):
        '''Applies the mirror symmetry [u,v,w](x,-y,z) -> [u,-v,w](x,y,z),
        proposed by Lozano-Duran (JFM 2016) and used for free-slip walls.'''
        if self.periodicity[axis]:
            return
        if wall in (-1,'low'):
            iwall = 0
        elif wall in (1,'high'):
            iwall = 2
        else:
            raise ValueError('Invalid value for "wall". Valid values are -1 / "low" or 1 / "high".')
        keyu = ('u','v','w')
        # Get position in symmetries array to be edited
        sl = [1,1,1]
        sl[axis] = iwall
        sl = tuple(sl)
        if key==keyu[axis]:
            self.symmetries[key][sl] = -1 if not invert else +1
        else:
            self.symmetries[key][sl] = +1 if not invert else -1
        return

    def init_inversion_symmetry(self,key,axis,wall):
        '''Applies the inversion symmetry [u,v,w](x,-y,z) -> [-u,v,-w](x,y,z).'''
        return self.init_mirror_symmetry(key,axis,wall,invert=True)

    def copy(self,key,key_out,skip_data=False,skip_symmetries=False):
        '''Copies a field to a new key.'''
        if key==key_out:
            return
        key_grid = self._key_grid(key)
        self.origin[key_out] = tuple(self.origin[key_grid])
        self.proc_grid[key_out] = self.proc_grid[key_grid].copy()
        if not skip_symmetries:
            self.symmetries[key_out] = self.symmetries[key].copy()
        if not skip_data:
            self.field[key_out] = self.field[key].copy()
        return

    def delete(self,key):
        '''Removes a field from memory.'''
        assert key in self.field, "Field {} cannot be deleted, because it "\
            "does not exist.".format(key)
        import gc
        del self.field[key]
        del self.symmetries[key]
        key_grid = self._key_grid(key)
        if key_grid not in ('u','v','w','p','s'):
            del self.origin[key_grid]
            del self.proc_grid[key_grid]
        gc.collect()
        return
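    # How the (3,3,3) symmetries array is read: it is indexed by neighbor offset
    # +1 per axis, so symmetries[key][0,1,1] belongs to the low-x wall,
    # [2,1,1] to the high-x wall, and so on. A value of -1 mirrors the field
    # into the wall ghost cells with a sign flip (the wall-normal velocity at a
    # free-slip wall), +1 mirrors it unchanged; entries stay 0 in periodic
    # directions, where _symmetrize_wall() never fires because a (wrapped)
    # neighbor exists.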
    def save_state(self,file,parallel=False,verbose=True):
        import h5py
        from mpi4py import MPI
        if verbose and self.rank==0:
            print('[save_state] file={}'.format(file))
        tbeg = MPI.Wtime()
        ascii_type = h5py.string_dtype('ascii',32)
        # Only use parallel IO if flag is set and h5py has MPIIO support
        parallel = (h5py.h5fd.MPIO>=0) and parallel
        if parallel:
            f = h5py.File(file,'w',driver='mpio',comm=self.comm)
        else:
            sb = SequentialBlock(1,comm=self.comm,per_node=False)
            f = h5py.File(file,'w') if self.rank==0 else h5py.File(file,'a')
        # Write attributes which are the same for all ranks
        if parallel or self.rank==0:
            g = f.create_group('/PPP')
            # Create a variable which stores all the field names
            d = g.create_dataset('field_names',len(self.origin),dtype=ascii_type)
            for ii,key in enumerate(self.origin):
                d[ii] = key
            d = g.create_dataset('field_loaded',len(self.field),dtype=ascii_type)
            for ii,key in enumerate(self.field):
                d[ii] = key
            # Store all global field-dependent data
            for key in self.origin:
                g2 = g.create_group(key)
                d = g2.create_dataset('origin',3,dtype='f')
                if self.rank==0:
                    d[:] = self.origin[key]
                d = g2.create_dataset('ibeg',self.nxp,dtype='i')
                if self.rank==0:
                    d[:] = self.proc_grid[key][0]
                d = g2.create_dataset('iend',self.nxp,dtype='i')
                if self.rank==0:
                    d[:] = self.proc_grid[key][1]
                d = g2.create_dataset('jbeg',self.nyp,dtype='i')
                if self.rank==0:
                    d[:] = self.proc_grid[key][2]
                d = g2.create_dataset('jend',self.nyp,dtype='i')
                if self.rank==0:
                    d[:] = self.proc_grid[key][3]
                d = g2.create_dataset('kbeg',self.nzp,dtype='i')
                if self.rank==0:
                    d[:] = self.proc_grid[key][4]
                d = g2.create_dataset('kend',self.nzp,dtype='i')
                if self.rank==0:
                    d[:] = self.proc_grid[key][5]
            # Store all global field-independent data
            d = g.create_dataset('num_ghost',3,dtype='i')
            if self.rank==0:
                d[:] = self.num_ghost
            d = g.create_dataset('chunks_per_proc',3,dtype='i')
            if self.rank==0:
                d[:] = self.chunks_per_proc
            d = g.create_dataset('spacing',3,dtype='f')
            if self.rank==0:
                d[:] = self.spacing
            d = g.create_dataset('periodicity',3,dtype='i')
            if self.rank==0:
                d[:] = self.periodicity
            d = g.create_dataset('bounds',6,dtype='f')
            if self.rank==0:
                d[:] = self.bounds
            d = g.create_dataset('nxp',1,'i')
            if self.rank==0:
                d[:] = self.nxp
            d = g.create_dataset('nyp',1,'i')
            if self.rank==0:
                d[:] = self.nyp
            d = g.create_dataset('nzp',1,'i')
            if self.rank==0:
                d[:] = self.nzp
        # Collectively create groups and datasets for rank-specific data
        for ii in range(self.nproc):
            if parallel or ii==self.rank:
                g1 = f.create_group('{:05d}'.format(ii))
                g1.create_dataset('nghbr',self.nghbr.shape,data=self.nghbr)
                for key in self.field:
                    # To support parallel h5py, we need to create the groups here
                    # and cannot use the 'save' method of Field3d
                    g2 = g1.create_group('{}'.format(key))
                    g2.create_dataset('origin',3,dtype='f')
                    g2.create_dataset('spacing',3,dtype='f')
                    g2.create_dataset('data',
                        self.chunk_size(key,rank=ii,incl_ghost=True),
                        dtype=self.chunk_dtype(key))
                    g2.create_dataset('symmetries',(3,3,3),dtype='i')
        # Independent write
        grp_rank = '{:05d}'.format(self.rank)
        f[grp_rank]['nghbr'][:] = self.nghbr
        for key in self.field:
            f[grp_rank][key]['origin'][:] = self.field[key].origin
            f[grp_rank][key]['spacing'][:] = self.field[key].spacing
            f[grp_rank][key]['data'][:] = self.field[key].data
            f[grp_rank][key]['symmetries'][:] = self.symmetries[key]
        f.close()
        if not parallel:
            sb.proceed()
        self.comm.Barrier()
        tend = MPI.Wtime()
        if verbose and self.rank==0:
            print("[save_state] elapsed time: {:f}".format(tend-tbeg))
        return
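    # Hedged round-trip sketch: persist the full decomposition and all loaded
    # fields, then restart from the file later. from_state() asserts that the
    # restarted run uses the same number of MPI processes (nxp*nyp*nzp).
    #
    #   ppp.save_state('state.h5',parallel=True)
    #   ppp = PPP.from_state('state.h5',parallel=True)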
    def save_for_vtk(self,file,key,stride=(1,1,1),truncate=True,merge_at_root=True,
                     on_pressure_grid=True,verbose=True):
        '''Saves a field for visualization purposes. Each chunk keeps a single
        lower ghost cell, plus a single upper ghost cell only where there is no
        upper neighbor (or when merged at the root).'''
        import h5py
        from mpi4py import MPI
        # Recursive saving if key is a list. Take care of 'truncate'.
        if isinstance(key,(tuple,list)):
            for key_ in key:
                self.save_for_vtk(file,key_,stride=stride,merge_at_root=merge_at_root,
                                  truncate=truncate,on_pressure_grid=on_pressure_grid)
                truncate = False
            return
        # Since the data is usually much smaller than a full 'save_state', only
        # sequential IO is implemented for now.
        if verbose and self.rank==0:
            print('[save_for_vtk] key={}, file={}'.format(key,file))
        tbeg = MPI.Wtime()
        # If flag is set, shift data onto pressure grid first. Use a temporary field for this.
        name = key
        if on_pressure_grid:
            key_tmp = 'tmp'
            self.shift_to_pressure_grid(key,key_out='tmp')
            key = key_tmp
        # Get the subfields and save them
        if merge_at_root:
            fld = self._merge_at_root(key,stride=stride)
            if self.rank==0:
                fld.save(file,name=name,truncate=truncate)
        else:
            sb = SequentialBlock(1,comm=self.comm,per_node=False)
            if self.rank!=0:
                truncate = False
            name += '/{:05d}'.format(self.rank)
            self._subfield_for_vtk(key,stride=stride).save(file,name=name,truncate=truncate)
            sb.proceed()
        # Free the temporary field (if it was created)
        if on_pressure_grid:
            self.delete(key_tmp)
        # Sync (important in case another write to the same file follows!)
        self.comm.Barrier()
        # Print timing
        tend = MPI.Wtime()
        if verbose and self.rank==0:
            print("[save_for_vtk] elapsed time: {:f}".format(tend-tbeg))
        return

    def to_vtk(self,key,stride=(1,1,1),merge_at_root=False):
        '''Returns the field (only its interior + some ghost cells for plotting)
        as a vtk mesh. If other ghost cells are required, use one of the
        _subfield methods and apply .to_vtk() to the result.'''
        from .field import Field3d
        if merge_at_root:
            return self._merge_at_root(key,stride=stride).to_vtk()
        else:
            return self._subfield_for_vtk(key,stride=stride).to_vtk()

    def vtk_contour(self,key,val,stride=(1,1,1)):
        '''Computes an isocontour for the local chunk.'''
        return self._subfield_for_vtk(key,stride=stride).vtk_contour(val)

    def vtk_slice(self,key,normal,origin,stride=(1,1,1)):
        '''Extracts a plane from the field.'''
        return self._subfield_for_vtk(key,stride=stride).vtk_slice(normal,origin)

    def _subfield_for_vtk(self,key,stride=(1,1,1)):
        '''Extracts a subfield from the chunk which has one leading and no
        trailing ghost cell, except when there is no trailing neighbor. This
        guarantees non-overlapping contours.'''
        no_upper_ghost = [True,True,True]
        if self.ip==self.nxp-1: no_upper_ghost[0] = False
        if self.jp==self.nyp-1: no_upper_ghost[1] = False
        if self.kp==self.nzp-1: no_upper_ghost[2] = False
        return self._subfield(key,stride,(1,1,1),no_upper_ghost=no_upper_ghost)

    def _subfield_for_vtk_merge(self,key,stride=(1,1,1)):
        '''Extracts a subfield from the chunk which has no leading and no
        trailing ghost cell, except when there is no trailing neighbor. This
        yields non-overlapping chunks that can be merged at the root.'''
        no_lower_ghost = [True,True,True]
        no_upper_ghost = [True,True,True]
        if self.ip==self.nxp-1: no_upper_ghost[0] = False
        if self.jp==self.nyp-1: no_upper_ghost[1] = False
        if self.kp==self.nzp-1: no_upper_ghost[2] = False
        return self._subfield(key,stride,(1,1,1),no_lower_ghost=no_lower_ghost,
                              no_upper_ghost=no_upper_ghost)

    def _subfield_interior(self,key,stride=(1,1,1)):
        return self._subfield(key,stride,(0,0,0))
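    # ------------------------------------------------------------------
    # The helpers below are referenced throughout this class (__init__,
    # load_field, broadcast, save_state, ...) but are missing from this
    # listing. They are hedged minimal sketches reconstructed from their
    # call sites; the original implementations may differ in detail. Also
    # missing and not reconstructed here: shifting_state(), chunk_dtype(),
    # _merge_at_root(), and the extraction/return tail of _subfield().
    # ------------------------------------------------------------------

    def position_from_rank(self,rank,external=False):
        '''Sketch (assumed): (ip,jp,kp) position of 'rank' in the internal or
        external (chunk) processor grid, via the .helper routine.'''
        if external:
            return position_from_rank(rank,self.nyp_ext,self.nzp_ext)
        return position_from_rank(rank,self.nyp,self.nzp)

    def rank_from_position(self,ip,jp,kp,external=False):
        '''Sketch (assumed): inverse mapping of position_from_rank().'''
        if external:
            return rank_from_position(ip,jp,kp,self.nyp_ext,self.nzp_ext)
        return rank_from_position(ip,jp,kp,self.nyp,self.nzp)

    def chunk_size(self,key,rank=None,axis=None,incl_ghost=False):
        '''Sketch (assumed): interior chunk size of a rank, optionally with
        ghost layers, as a 3-tuple (axis=None) or a single length.'''
        if rank is None:
            ip,jp,kp = self.ip,self.jp,self.kp
        else:
            ip,jp,kp = self.position_from_rank(rank,external=False)
        key_grid = self._key_grid(key)
        n = [self.proc_grid[key_grid][1][ip]-self.proc_grid[key_grid][0][ip]+1,
             self.proc_grid[key_grid][3][jp]-self.proc_grid[key_grid][2][jp]+1,
             self.proc_grid[key_grid][5][kp]-self.proc_grid[key_grid][4][kp]+1]
        if incl_ghost:
            n = [n[0]+2*self.nghx,n[1]+2*self.nghy,n[2]+2*self.nghz]
        return tuple(n) if axis is None else n[axis]

    def dim(self,key,axis=None):
        '''Sketch (assumed): global number of interior grid points, read off the
        last entry of the (1-based, inclusive) processor grid.'''
        key_grid = self._key_grid(key)
        n = (self.proc_grid[key_grid][1][-1],
             self.proc_grid[key_grid][3][-1],
             self.proc_grid[key_grid][5][-1])
        return n if axis is None else n[axis]

    def exchange_ghost_cells(self,key):
        '''Sketch (assumed): fill all ghost layers of 'key' by pairwise exchange
        with the 26 neighbors via _communicate_ghost_cells().'''
        for di in (-1,0,1):
            for dj in (-1,0,1):
                for dk in (-1,0,1):
                    if di==dj==dk==0:
                        continue
                    self._communicate_ghost_cells(key,(di,dj,dk))

    def impose_boundary_conditions(self,key):
        '''Sketch (assumed): apply the stored wall symmetries on every
        non-periodic face via _symmetrize_wall().'''
        for axis in range(3):
            if self.periodicity[axis]:
                continue
            for side in (-1,1):
                position = [0,0,0]
                position[axis] = side
                self._symmetrize_wall(key,position)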
    def _subfield(self,key,stride,num_ghost,no_lower_ghost=(False,False,False),
                  no_upper_ghost=(False,False,False),deep=True):
        '''Returns the field with a stride applied.'''
        stride = np.array(stride,dtype=int)
        num_ghost = np.array(num_ghost,dtype=int)
        no_lower_ghost = np.array(no_lower_ghost,dtype=bool)
        no_upper_ghost = np.array(no_upper_ghost,dtype=bool)
        assert len(stride)==3, "'stride' needs to be of length 3."
        assert len(num_ghost)==3, "'num_ghost' needs to be of length 3."
        assert len(no_lower_ghost)==3, "'no_lower_ghost' needs to be of length 3."
        assert len(no_upper_ghost)==3, "'no_upper_ghost' needs to be of length 3."
        assert all(np.array(self.num_ghost)>=stride*num_ghost), "At least stride*num_ghost "\
            "ghost cells are required in the original field. Have {}, need {}.".format(
            tuple(self.num_ghost),tuple(stride*num_ghost))
        key_grid = self._key_grid(key)
        ib = self.proc_grid[key_grid][0][self.ip]-1
        ie = self.proc_grid[key_grid][1][self.ip]-1
        jb = self.proc_grid[key_grid][2][self.jp]-1
        je = self.proc_grid[key_grid][3][self.jp]-1
        kb = self.proc_grid[key_grid][4][self.kp]-1
        ke = self.proc_grid[key_grid][5][self.kp]-1
        ijkb = np.array((ib,jb,kb))
        ijke = np.array((ie,je,ke))
        # Determine the first point to consider in this chunk for the case of
        # output without ghost cells
        idx_origin = (np.ceil(ijkb/stride)*stride).astype(int)-ijkb+self.num_ghost
        # Now the last point to consider under the same circumstances
        idx_endpoint = (np.floor(ijke/stride)*stride).astype(int)-ijkb+self.num_ghost
        # If we want the subfield to have ghost cells, shift the origin and endpoint
        idx_origin -= (~no_lower_ghost).astype(int)*stride*num_ghost
        idx_endpoint += (~no_upper_ghost).astype(int)*stride*num_ghost
        # Compute the number of points
        num_points = ((idx_endpoint-idx_origin)/stride+1).astype(int)
        # Some last assertions which should never fail
        # print(self.rank,idx_origin,idx_endpoint,num_points,self.field[key].dim(),self.field[key].dim())
        assert all(idx_origin>=0)
        assert all(idx_endpoint<np.array(self.field[key].dim()))

    def _communicate_ghost_cells(self,key,positionDst):
        assert np.max(positionDst)<=1 and np.min(positionDst)>=-1, "communicate_ghost_cells: "\
            "invalid neighbor position {}".format(positionDst)
        # [send/recv] get the rank of the neighbor where data is to be sent to.
        # The position is passed as values -1,0,+1, but needs to be converted to array indices
        ip_dst = positionDst[0]+1
        jp_dst = positionDst[1]+1
        kp_dst = positionDst[2]+1
        ip_src = -positionDst[0]+1
        jp_src = -positionDst[1]+1
        kp_src = -positionDst[2]+1
        rank_dst = self.nghbr[ip_dst,jp_dst,kp_dst]
        rank_src = self.nghbr[ip_src,jp_src,kp_src]
        # [send/recv] create a tag
        tag = ip_dst*100+jp_dst*10+kp_dst
        # [send/recv] get the indices of data to be sent/received
        nxl,nyl,nzl = self.chunk_size(key)
        if positionDst[0]==-1:
            ii_src = slice(self.nghx,2*self.nghx)
            ii_dst = slice(self.nghx+nxl,2*self.nghx+nxl)
        elif positionDst[0]==0:
            ii_src = slice(self.nghx,self.nghx+nxl)
            ii_dst = slice(self.nghx,self.nghx+nxl)
        elif positionDst[0]==1:
            ii_src = slice(nxl,nxl+self.nghx)
            ii_dst = slice(0,self.nghx)
        else:
            raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[0]))
        if positionDst[1]==-1:
            jj_src = slice(self.nghy,2*self.nghy)
            jj_dst = slice(self.nghy+nyl,2*self.nghy+nyl)
        elif positionDst[1]==0:
            jj_src = slice(self.nghy,self.nghy+nyl)
            jj_dst = slice(self.nghy,self.nghy+nyl)
        elif positionDst[1]==1:
            jj_src = slice(nyl,nyl+self.nghy)
            jj_dst = slice(0,self.nghy)
        else:
            raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[1]))
        if positionDst[2]==-1:
            kk_src = slice(self.nghz,2*self.nghz)
            kk_dst = slice(self.nghz+nzl,2*self.nghz+nzl)
        elif positionDst[2]==0:
            kk_src = slice(self.nghz,self.nghz+nzl)
            kk_dst = slice(self.nghz,self.nghz+nzl)
        elif positionDst[2]==1:
            kk_src = slice(nzl,nzl+self.nghz)
            kk_dst = slice(0,self.nghz)
        else:
            raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[2]))
        # [send/recv] Create placeholders for the requests
        reqsend = None
        reqrecv = None
        # [send] now send the data to the neighbor, but only if there is one!
        # Communication must be done non-blocking, and we can use the upper-case
        # routines since this is a numpy array
        if rank_dst>=0:
            sendbuf = np.ascontiguousarray(self.field[key].data[ii_src,jj_src,kk_src])
            reqsend = self.comm.Isend(sendbuf,dest=rank_dst,tag=tag)
        # [recv] the corresponding receive: results are stored in a buffer which
        # will be assigned to the parent array later
        if rank_src>=0:
            recvbuf = np.zeros(
                (ii_dst.stop-ii_dst.start,
                 jj_dst.stop-jj_dst.start,
                 kk_dst.stop-kk_dst.start),dtype=self.field[key].data.dtype)
            reqrecv = self.comm.Irecv(recvbuf,source=rank_src,tag=tag)
        # [recv] wait for data to be received
        if reqrecv is not None:
            reqrecv.wait()
            self.field[key].data[ii_dst,jj_dst,kk_dst] = recvbuf
        # [send] wait for data to be sent
        if reqsend is not None:
            reqsend.wait()

    def _symmetrize_wall(self,key,positionBd):
        import numpy as np
        assert np.max(positionBd)<=1 and np.min(positionBd)>=-1, "symmetrize_wall: invalid neighbor "\
            "position {}".format(positionBd)
        assert np.count_nonzero(positionBd)==1, "symmetrize_wall: only face direction is accepted "\
            "(no edges or corners)"
        inghbr = positionBd[0]+1
        jnghbr = positionBd[1]+1
        knghbr = positionBd[2]+1
        if self.nghbr[inghbr,jnghbr,knghbr]<0:
            sl_dst = [slice(None),slice(None),slice(None)]
            sl_src = [slice(None),slice(None),slice(None)]
            for axis in range(3):
                if positionBd[axis]==-1:
                    # lower boundary: index of first point within the domain
                    idx = self.field[key].nearest_gridpoint(self.bounds[2*axis],axis=axis)
                    dist = self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis]
                    if np.abs(dist)>self.field[key].eps_collapse and np.sign(dist)<0.0:
                        idx += 1
                        dist = self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis]
                    # distance of the first point to the wall: should be either 0 or dx/2
                    assert (dist<=self.field[key].eps_collapse or
                            (dist-0.5*self.spacing[axis])<=self.field[key].eps_collapse)
                    if np.abs(dist)>=0.25*self.spacing[axis]:
                        # no point on boundary
                        sl_dst[axis] = slice(0,idx,1)
                        sl_src[axis] = slice(2*idx-1,idx-1,-1)
                    else:
                        # point on boundary
                        sl_dst[axis] = slice(0,idx,1)
                        sl_src[axis] = slice(2*idx,idx,-1)
                    break
                elif positionBd[axis]==1:
                    # upper boundary: index of last point within the domain
                    idx = self.field[key].nearest_gridpoint(self.bounds[2*axis+1],axis=axis)
                    dist = self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis+1]
                    if np.abs(dist)>self.field[key].eps_collapse and np.sign(dist)>0.0:
                        idx -= 1
                        dist = self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis+1]
                    # distance of the last point to the wall: should be either 0 or -dx/2
                    assert (dist<=self.field[key].eps_collapse or
                            (dist+0.5*self.spacing[axis])<=self.field[key].eps_collapse)
                    if np.abs(dist)>=0.25*self.spacing[axis]:
                        # no point on boundary
                        sl_dst[axis] = slice(idx+1,self.field[key].dim(axis=axis),1)
                        sl_src[axis] = slice(idx,2*idx+1-self.field[key].dim(axis=axis),-1)
                    else:
                        # point on boundary
                        sl_dst[axis] = slice(idx+1,self.field[key].dim(axis=axis),1)
                        sl_src[axis] = slice(idx-1,2*idx-self.field[key].dim(axis=axis),-1)
                    break
            # print(sl_src,sl_dst,self.field[key].dim())
            self.field[key].data[tuple(sl_dst)] = self.symmetries[key][inghbr,jnghbr,knghbr]*\
                self.field[key].data[tuple(sl_src)]
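    # Indexing convention used above: nghbr is a (3,3,3) array holding the rank
    # of every neighbor, addressed by offset+1 per axis, so nghbr[1,1,1] is this
    # rank, nghbr[0,1,1] the lower-x neighbor and nghbr[2,1,1] the upper-x one;
    # -1 marks a missing neighbor (non-periodic domain boundary). The message
    # tag ip_dst*100+jp_dst*10+kp_dst encodes the direction uniquely, so the 26
    # pairwise exchanges of one sweep cannot be confused with one another.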
    @staticmethod
    def _construct_procgrid(comm,proc,chunks_per_proc,num_ghost,periodicity):
        import numpy as np
        rank = comm.Get_rank()
        nxcpp,nycpp,nzcpp = chunks_per_proc
        nghx,nghy,nghz = num_ghost
        xperiodic,yperiodic,zperiodic = periodicity
        # Assert proc
        assert all(k in proc for k in ('u','v','w','p','s')), "'proc' must be a dictionary with "\
            "keys 'u','v','w','p','s'"
        for key in proc:
            assert len(proc[key])==6, "Entries of 'proc' must have length of 6."
        proc_grid_ext = proc
        # Initialize chunks per processor.
        # Verify that this processor grid can be redistributed onto the requested processor layout
        nxp_ext = len(proc_grid_ext['u'][0])
        nyp_ext = len(proc_grid_ext['u'][2])
        nzp_ext = len(proc_grid_ext['u'][4])
        nproc_ext = nxp_ext*nyp_ext*nzp_ext
        #
        assert nxp_ext%nxcpp==0, "Number of chunks must be divisible by the number "\
            "of chunks per process. (nxp_ext={}, nxcpp={})".format(nxp_ext,nxcpp)
        assert nyp_ext%nycpp==0, "Number of chunks must be divisible by the number "\
            "of chunks per process. (nyp_ext={}, nycpp={})".format(nyp_ext,nycpp)
        assert nzp_ext%nzcpp==0, "Number of chunks must be divisible by the number "\
            "of chunks per process. (nzp_ext={}, nzcpp={})".format(nzp_ext,nzcpp)
        # Determine the new processor layout and verify total number of MPI processes
        nxp = nxp_ext//nxcpp
        nyp = nyp_ext//nycpp
        nzp = nzp_ext//nzcpp
        nproc = nxp*nyp*nzp
        assert nproc==comm.Get_size(), "Number of MPI processes does not match the requested "\
            "processor layout. (MPI procs: {}, required procs: {})".format(
            comm.Get_size(),nproc)
        # Construct internal processor grid
        proc_grid = {}
        for key in proc_grid_ext:
            proc_grid[key] = [None]*6
            proc_grid[key][0] = proc_grid_ext[key][0][0::nxcpp]
            proc_grid[key][1] = proc_grid_ext[key][1][nxcpp-1::nxcpp]
            proc_grid[key][2] = proc_grid_ext[key][2][0::nycpp]
            proc_grid[key][3] = proc_grid_ext[key][3][nycpp-1::nycpp]
            proc_grid[key][4] = proc_grid_ext[key][4][0::nzcpp]
            proc_grid[key][5] = proc_grid_ext[key][5][nzcpp-1::nzcpp]
        # Get position in processor grid
        ip,jp,kp = position_from_rank(rank,nyp,nzp)
        # Determine local grid indices and size
        for key in proc_grid:
            nxl = proc_grid[key][1][ip]-proc_grid[key][0][ip]+1
            nyl = proc_grid[key][3][jp]-proc_grid[key][2][jp]+1
            nzl = proc_grid[key][5][kp]-proc_grid[key][4][kp]+1
            # Verify that local grid size is not smaller than ghost cell size
            assert (nxl>=nghx and nyl>=nghy and nzl>=nghz),\
                "Local grid size must not be smaller than the number "\
                "of ghost cells in each direction!"
        # Initialize neighbor array
        nghbr = np.empty((3,3,3),dtype='int')
        # wrap-around x
        ipl = ip-1
        if ipl<0:
            ipl = nxp-1 if xperiodic else -1
        ipr = ip+1
        if ipr>nxp-1:
            ipr = 0 if xperiodic else -1
        inghbr = (ipl,ip,ipr)
        # wrap-around y
        jpl = jp-1
        if jpl<0:
            jpl = nyp-1 if yperiodic else -1
        jpr = jp+1
        if jpr>nyp-1:
            jpr = 0 if yperiodic else -1
        jnghbr = (jpl,jp,jpr)
        # wrap-around z
        kpl = kp-1
        if kpl<0:
            kpl = nzp-1 if zperiodic else -1
        kpr = kp+1
        if kpr>nzp-1:
            kpr = 0 if zperiodic else -1
        knghbr = (kpl,kp,kpr)
        # Construct array of neighbors
        for ii in range(3):
            for jj in range(3):
                for kk in range(3):
                    # Assign rank to neighbor array
                    if inghbr[ii]<0 or jnghbr[jj]<0 or knghbr[kk]<0:
                        nghbr[ii,jj,kk] = -1
                    else:
                        nghbr[ii,jj,kk] = rank_from_position(inghbr[ii],jnghbr[jj],knghbr[kk],nyp,nzp)
        return (proc_grid,nxp,nyp,nzp,
                proc_grid_ext,nxp_ext,nyp_ext,nzp_ext,
                nghbr)
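    # Worked example of the coarsening above: with nxp_ext=4 chunks in x whose
    # (ibeg,iend) pairs are (1,16),(17,32),(33,48),(49,64) and nxcpp=2, the new
    # grid keeps every 2nd ibeg and the matching iend, giving 2 processes with
    # (1,32) and (33,64); each MPI rank later loads its nxcpp*nycpp*nzcpp chunks
    # in load_field().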
    def _key_grid(self,key):
        '''Get the key for the grid: scalar fields all share a common grid, so
        this routine returns 's' if key is 'sX' with X being a (potentially
        multi-digit) number. Otherwise the key itself is returned.'''
        if key[0]=='s' and key[1:].isnumeric():
            return 's'
        else:
            return key


class SequentialBlock:
    '''Blocks execution of the following code for some processors until
    'proceed' is issued.'''
    def __init__(self,batch_size,comm=None,per_node=False,tag=420):
        from mpi4py import MPI
        self.comm = MPI.COMM_WORLD if comm is None else comm
        self.tag = tag
        self.completed = False
        # If batch_size is None, there will be no blocking
        if batch_size is None:
            self.successor = None
            self.predecessor = None
            return
        # Establish the order in which the ranks are run
        if per_node:
            # Generate a list with all ranks on the current node
            nodename = MPI.Get_processor_name()
            nodelist = self.comm.allgather(nodename)
            ranking = [ii for ii,x in enumerate(nodelist) if x==nodename]
            # Get the position of predecessor/successor in the ranking list
            position = ranking.index(self.comm.Get_rank())
            idx_pre = position-batch_size
            idx_suc = position+batch_size
            # Convert to rank
            self.predecessor = ranking[idx_pre] if position-batch_size>=0 else None
            self.successor = ranking[idx_suc] if position+batch_size<len(ranking) else None
        else:
            self.predecessor = self.comm.Get_rank()-batch_size
            self.successor = self.comm.Get_rank()+batch_size
            if self.predecessor<0:
                self.predecessor = None
            if self.successor>=self.comm.Get_size():
                self.successor = None
        # Wait for predecessor
        if self.predecessor is not None:
            self.comm.recv(source=self.predecessor,tag=self.tag)
        return

    def proceed(self):
        self.completed = True
        if self.successor is not None:
            data = None
            self.comm.send(data,dest=self.successor,tag=self.tag)


class GatherIterator:
    '''Sends 'data' sequentially to 'root', which can iterate over it without
    gathering all data at once. Every process which is not 'root' receives None.'''
    def __init__(self,data,comm=None,root=0,barrier=False):
        from mpi4py import MPI
        comm = MPI.COMM_WORLD if comm is None else comm
        self.comm = comm
        self.rank = comm.Get_rank()
        self.size = comm.Get_size()
        self.root = root
        self.barrier = barrier
        self.iter = 0
        self.data = data

    def __iter__(self):
        return self

    def __next__(self):
        if self.iter>=self.size:
            if self.barrier:
                self.comm.Barrier()
            raise StopIteration
        if self.rank==self.root:
            if self.rank==self.iter:
                r = self.data
            else:
                r = self.comm.recv(source=self.iter,tag=0)
        else:
            if self.rank==self.iter:
                self.comm.send(self.data,dest=self.root,tag=0)
            r = None
        self.iter += 1
        return r


def parprint(*args, **kwargs):
    from mpi4py import MPI
    rank = kwargs.pop('rank',0)
    comm = kwargs.pop('comm',MPI.COMM_WORLD)
    if get_rank(comm=comm)==rank:
        print(*args, **kwargs)


def get_rank(comm=None):
    from mpi4py import MPI
    comm = MPI.COMM_WORLD if comm is None else comm
    return comm.Get_rank()


def is_root(comm=None,root=0):
    from mpi4py import MPI
    comm = MPI.COMM_WORLD if comm is None else comm
    return comm.Get_rank()==root


def gather(data,comm=None):
    from mpi4py import MPI
    comm = MPI.COMM_WORLD if comm is None else comm
    return comm.gather(data,root=0)
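
# Hedged demo of the module-level helpers (assumes this file lives inside a
# package; run e.g. as `mpirun -n 4 python -m <package>.<thismodule>`, names
# hypothetical). Each rank streams its rank number to the root, which iterates
# over the values one at a time instead of gathering them all at once.
if __name__ == '__main__':
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    parprint('[demo] running on {} ranks'.format(comm.Get_size()))  # rank 0 only
    for item in GatherIterator(comm.Get_rank(),comm=comm,barrier=True):
        if is_root():
            print('[demo] received:',item)  # non-root ranks receive None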