import numpy as np class PPP: """Parallel Python Postprocessor for suspend""" def __init__(self,comm,func_load, num_ghost,chunks_per_proc,origin,spacing,periodicity,bounds, proc_grid,nxp,nyp,nzp, proc_grid_ext,nxp_ext,nyp_ext,nzp_ext, nghbr,field,symmetries): '''Class constructor: it is expected that all communcation is done in the corresponding class methods, such that every rank already holds the required information.''' # Initialize object with passed arguments self.comm = comm self.func_load = func_load self.num_ghost = num_ghost self.chunks_per_proc = chunks_per_proc self.origin = origin self.spacing = spacing self.periodicity = periodicity self.bounds = bounds self.proc_grid = proc_grid self.nxp,self.nyp,self.nzp = nxp,nyp,nzp self.proc_grid_ext = proc_grid_ext self.nxp_ext,self.nyp_ext,self.nzp_ext = nxp_ext,nyp_ext,nzp_ext self.nghbr = nghbr self.field = field self.symmetries = symmetries # Initialize some deduced variables self.rank = comm.Get_rank() self.nproc = nxp*nyp*nzp self.ip,self.jp,self.kp = self.position_from_rank(self.rank,external=False) self.nghx,self.nghy,self.nghz = num_ghost self.xperiodic,self.yperiodic,self.zperiodic = periodicity return @classmethod def from_snapshot(cls,snap,chunks_per_proc=(1,1,1),num_ghost=(1,1,1)): from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.Get_rank() # Let rank 0 read the processor grid as well as some info and then broadcast it if rank==0: proc = snap.procgrid() origin = snap.origin() spacing = snap.spacing() periodicity = snap.periodicity() bounds = snap.bounds() else: proc,origin,spacing,periodicity,bounds = 5*[None] proc = comm.bcast(proc ,root=0) origin = comm.bcast(origin ,root=0) spacing = comm.bcast(spacing ,root=0) periodicity = comm.bcast(periodicity,root=0) bounds = comm.bcast(bounds ,root=0) # Make sure all processors got the same arguments chunks_per_proc = comm.bcast(chunks_per_proc,root=0) num_ghost = comm.bcast(num_ghost,root=0) # Set data loading function func_load = snap.field_chunk # Construct new processor grid (proc_grid,nxp,nyp,nzp, proc_grid_ext,nxp_ext,nyp_ext,nzp_ext, nghbr) = PPP._construct_procgrid(comm,proc,chunks_per_proc,num_ghost,periodicity) # Initially, no field is loaded field = {} symmetries = {} return cls(comm,func_load, num_ghost,chunks_per_proc,origin,spacing,periodicity,bounds, proc_grid,nxp,nyp,nzp, proc_grid_ext,nxp_ext,nyp_ext,nzp_ext, nghbr,field,symmetries) @classmethod def from_state(cls,filename): import pickle from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.Get_rank() fin = filename+'.{:05d}.pickle'.format(rank) with open(fin,"rb") as f: payload = pickle.load(f) (num_ghost,chunks_per_proc,origin,spacing,periodicity,bounds, proc_grid,nxp,nyp,nzp,nghbr,field,symmetries) = payload assert nxp*nyp*nzp==comm.Get_size(), "The loaded state requires {} processors, but "\ "we are currently running with {}.".format(nxp*nyp*nzp,comm.Get_size()) func_load = None proc_grid_ext = None nxp_ext,nyp_ext,nzp_ext = 3*[None] return cls(comm,func_load, num_ghost,chunks_per_proc,origin,spacing,periodicity,bounds, proc_grid,nxp,nyp,nzp, proc_grid_ext,nxp_ext,nyp_ext,nzp_ext, nghbr,field,symmetries) def load_field(self,key,io_limit=None,barrier=False,dtype=np.float64): '''Loads the required chunks from file''' from .field import Field3d import numpy as np # Block execution of some processors if IO is limited self._baton_wait(io_limit) # Determine which chunks are to be loaded by the current processor ip_beg_ext = self.ip*self.chunks_per_proc[0] jp_beg_ext = self.jp*self.chunks_per_proc[1] kp_beg_ext = self.kp*self.chunks_per_proc[2] ip_end_ext = ip_beg_ext+self.chunks_per_proc[0]-1 jp_end_ext = jp_beg_ext+self.chunks_per_proc[1]-1 kp_end_ext = kp_beg_ext+self.chunks_per_proc[2]-1 # Get the total size of the field to be loaded ib = self.proc_grid_ext[key][0][ip_beg_ext] ie = self.proc_grid_ext[key][1][ip_end_ext] jb = self.proc_grid_ext[key][2][jp_beg_ext] je = self.proc_grid_ext[key][3][jp_end_ext] kb = self.proc_grid_ext[key][4][kp_beg_ext] ke = self.proc_grid_ext[key][5][kp_end_ext] nxl = ie-ib+1 nyl = je-jb+1 nzl = ke-kb+1 # Allocate an array to hold the entire field data = np.empty( (nxl+2*self.nghx, nyl+2*self.nghy, nzl+2*self.nghz),dtype=dtype) # Compute origin of subfield origin = (self.origin[key][0]+(ib-1-self.nghx)*self.spacing[0], self.origin[key][1]+(jb-1-self.nghy)*self.spacing[1], self.origin[key][2]+(kb-1-self.nghz)*self.spacing[2]) # Create a Field3d self.field[key] = Field3d(data,origin,self.spacing) # Go through each chunk to be read and construct the field for ip_ext in range(ip_beg_ext,ip_end_ext+1): for jp_ext in range(jp_beg_ext,jp_end_ext+1): for kp_ext in range(kp_beg_ext,kp_end_ext+1): # Determine rank of the chunk to be read rank_ext = self.rank_from_position(ip_ext,jp_ext,kp_ext,external=True) # Compute bounds of this chunk ib_ext = self.proc_grid_ext[key][0][ip_ext] ie_ext = self.proc_grid_ext[key][1][ip_ext] jb_ext = self.proc_grid_ext[key][2][jp_ext] je_ext = self.proc_grid_ext[key][3][jp_ext] kb_ext = self.proc_grid_ext[key][4][kp_ext] ke_ext = self.proc_grid_ext[key][5][kp_ext] nxl_ext = ie_ext-ib_ext+1 nyl_ext = je_ext-jb_ext+1 nzl_ext = ke_ext-kb_ext+1 # Read data and insert it subfield = self.func_load(rank_ext,key) self.field[key].insert_subfield(subfield) # Continue execution of waiting processors if IO was limited self._baton_pass(io_limit) # Exchange ghost cells self.exchange_ghost_cells(key) # Initialize symmetries and impose BC self.init_symmetry(key) self.impose_boundary_conditions(key) # Syncronize processes if requested if barrier: self.comm.Barrier() def differentiate(self,key,axis,key_out=None,on_pressure_grid=False): assert axis<3, "'axis' must be one of 0,1,2." if key_out is None: key_out = 'd'+key+('/dx','/dy','/dz')[axis] origin = list(self.origin[key]) shifting_state = self.shifting_state(key,axis=axis) if shifting_state==-1: shift_origin = 'after' origin[axis] += 0.5*self.spacing[axis] elif shifting_state==0: shift_origin = 'before' origin[axis] -= 0.5*self.spacing[axis] elif shifting_state==1: shift_origin = 'before' origin[axis] -= 0.5*self.spacing[axis] else: raise ValueError("Invalid shifting state.") self.field[key_out] = self.field[key].derivative(axis,only_keep_interior=False,shift_origin=shift_origin) self.origin[key_out] = tuple(origin) self.symmetries[key_out] = self.symmetries[key].copy() self.proc_grid[key_out] = self.proc_grid[key].copy() if axis==0: self.symmetries[key_out][0,1,1] = -self.symmetries[key_out][0,1,1] self.symmetries[key_out][2,1,1] = -self.symmetries[key_out][2,1,1] elif axis==1: self.symmetries[key_out][1,0,1] = -self.symmetries[key_out][1,0,1] self.symmetries[key_out][1,2,1] = -self.symmetries[key_out][1,2,1] elif axis==2: self.symmetries[key_out][1,1,0] = -self.symmetries[key_out][1,1,0] self.symmetries[key_out][1,1,2] = -self.symmetries[key_out][1,1,2] # Exchange ghost cells and set boundary conditions self.exchange_ghost_cells(key_out) self.impose_boundary_conditions(key_out) # Shift to pressure grid if desired if on_pressure_grid: self.shift_to_pressure_grid(key_out,key_out=key_out) def allocate(self,key_grid,fill=None,pseudo=False,dtype=np.float64): from .field import Field3d ib = self.proc_grid[key_grid][0][self.ip] ie = self.proc_grid[key_grid][1][self.ip] jb = self.proc_grid[key_grid][2][self.jp] je = self.proc_grid[key_grid][3][self.jp] kb = self.proc_grid[key_grid][4][self.kp] ke = self.proc_grid[key_grid][5][self.kp] origin = (self.origin[key_grid][0]+(ib-1-self.nghx)*self.spacing[0], self.origin[key_grid][1]+(jb-1-self.nghy)*self.spacing[1], self.origin[key_grid][2]+(kb-1-self.nghz)*self.spacing[2]) dim = (ie-ib+1+2*self.nghx, je-jb+1+2*self.nghy, ke-kb+1+2*self.nghz) return Field3d.allocate(dim,origin,self.spacing,fill=fill,pseudo=pseudo,dtype=dtype) def qcriterion(self,key_out=None,from_pressure=False,keep_derivatives=False): '''Computes Q-criterion''' from .field import Field3d if key_out is None: key_out = 'Q' if from_pressure: assert 'p' in self.field, "Pressure field is not loaded." self.copy('p',key_out,skip_data=True) self.field[key_out] = 0.5*self.field['p'].laplacian(only_keep_interior=False) else: assert 'u' in self.field, "Velocity field is not loaded: u" assert 'v' in self.field, "Velocity field is not loaded: v" assert 'w' in self.field, "Velocity field is not loaded: w" self.copy('p',key_out,skip_data=True,skip_symmetries=True) self.init_symmetry(key_out) self.field[key_out] = self.allocate('p',fill=0.0,dtype=self.field['u'].dtype()) keyx = ('x','y','z') keyu = ('u','v','w') for ii in range(3): key_tmp = 'd'+keyu[ii]+'/d'+keyx[ii] parprint(key_tmp) self.differentiate(keyu[ii],axis=ii,key_out=key_tmp,on_pressure_grid=True) self.field[key_out] += self.field[key_tmp]**2 if not keep_derivatives: self.delete(key_tmp) # iip1 = (ii+1)%3 key_tmp1 = 'd'+keyu[ii]+'/d'+keyx[iip1] key_tmp2 = 'd'+keyu[iip1]+'/d'+keyx[ii] parprint(key_tmp1,key_tmp2) self.differentiate(keyu[ii],axis=iip1,key_out=key_tmp1,on_pressure_grid=True) self.differentiate(keyu[iip1],axis=ii,key_out=key_tmp2,on_pressure_grid=True) self.field[key_out] += 2.0*self.field[key_tmp1]*self.field[key_tmp2] if not keep_derivatives: self.delete(key_tmp1) self.delete(key_tmp2) self.field[key_out] *= -0.5 # Exchange ghost cells and set boundary conditions self.exchange_ghost_cells(key_out) self.impose_boundary_conditions(key_out) return #def vorticity(self,axis=None,key_out=None,keep_derivatives=False,on_pressure_grid=True): def multiply(self,key1,key2,key_out=None,on_pressure_grid=True): if key_out is None: key_out = key1 if not self.field[key1].has_same_grid(self.field[key2]) or on_pressure_grid: self.shift_to_pressure_grid(key1,key_out) self.shift_to_pressure_grid(key2,'tmp') self.field[key_out] *= self.field['tmp'] self.delete('tmp') else: self.copy(key1,key_out) self.field[key_out] *= self.field[key2] return def gaussian_filter(self,key,sigma,truncate=4.0,key_out=None,iterate=False): '''Applies a gaussian filter to a field as in-place operation. Sigma is the std of the filter in terms of grid width.''' import numpy as np if key_out is None: key_out = key if key!=key_out: self.copy(key,key_out,skip_data=True) # Compute radius of Gaussian filter radius = self.field[key].gaussian_filter_radius(sigma,truncate=truncate) if not iterate: # Assert that we have sufficient amount of ghost cells assert all([self.num_ghost[ii]>=radius[ii] for ii in range(3)]),\ "Too few ghost cells for stencil: {}, {}".format(self.num_ghost,radius) niter = 1 else: # Determine number of iterations required for current ghost cell setup sigma = np.array(sigma) niter = 1 while not all([self.num_ghost[ii]>=radius[ii] for ii in range(3)]): sigma /= np.sqrt(2) niter *= 2 radius = self.field[key].gaussian_filter_radius(sigma,truncate=truncate) assert all([radius[ii]>0 if sigma[ii]>0.0 else True for ii in range(3)]),\ "Iterative procedure leads to invalid stencil radius: "\ "increase number of ghost cells. {}".format(radius) parprint('Gaussian filter: iterations={}, stencil radius={}'.format(niter,radius)) for iiter in range(niter): parprint('Iter #{:d}'.format(iiter)) # Filter field: if key_out is None, perform operation inplace self.field[key_out] = self.field[key].gaussian_filter(sigma, truncate=truncate,only_keep_interior=False) # Exchange ghost cells and set boundary conditions self.exchange_ghost_cells(key_out) self.impose_boundary_conditions(key_out) # Iterate inplace from now on key = key_out def broadcast(self,key,operation,arg,key_out=None): '''Broadcasts an operation involving a scalar or matrix on the entire grid. If 'arg' is a matrix, it must be three-dimensional and its axes must be singular or of length nx/ny/nz.''' import numpy as np import operator # Broadcast argument arg = self.comm.bcast(arg,root=0) # Select operator if operation in ('add','+'): op = operator.iadd elif operation in ('subtract','sub','-'): op = operator.isub elif operation in ('divide','div','/'): op = operator.itruediv elif operation in ('multiply','mul','*'): op = operator.imul elif operation in ('power','pow','^','**'): op = operator.ipow else: raise ValueError("Invalid operation: {}".format(operation)) if isinstance(arg,np.ndarray): sl_arg = 3*[slice(None)] for axis in range(3): if arg.shape[axis]==1: continue elif arg.shape[axis]==self.dim(key,axis=axis): pos = self.position_from_rank(self.rank,external=False)[axis] sl_arg[axis] = slice( self.proc_grid[key][2*axis][pos]-1, self.proc_grid[key][2*axis+1][pos]) else: raise ValueError("'arg' must either be singular or match global "\ "grid dimension. (axis={}: got {:d}, expected {:d}".format( axis,arg.shape[axis],self.dim(key,axis=axis))) # Only operate on interior and communcate ghosts later sl_int = tuple(slice(self.num_ghost[ii],-self.num_ghost[ii]) for ii in range(3)) sl_arg = tuple(sl_arg) if key_out is None: op(self.field[key].data[sl_int],arg[sl_arg]) else: self.copy(key,key_out) op(self.field[key_out].data[sl_int],arg[sl_arg]) # Exchange ghost cells and set boundary conditions self.exchange_ghost_cells(key) self.impose_boundary_conditions(key) elif isinstance(arg,(int,float)): if key_out is None: op(self.field[key].data,arg) else: self.copy(key,key_out) op(self.field[key_out].data,arg) return def integrate(self,key,integrate_axis,ufunc=None,ignore_nan=False,average=False): '''Computes a global integral or average along a given axis applying the function 'ufunc' to each node. The result is returned by rank 0, all other ranks return None.''' from mpi4py import MPI import numpy as np if integrate_axis is None: integrate_axis = tuple(self.periodicity) # Compute local integral, but get weights separately idx_origin = tuple(self.num_ghost[ii]-1 for ii in range(3)) (integ_local,weights_local) = self.field[key].extract_subfield( idx_origin,self.chunk_size(key,axis=None)).integral( integrate_axis,ufunc=ufunc,ignore_nan=ignore_nan, average=average,return_weights=True) # Make sure weights_local is a numpy array weights_local = np.array(weights_local) # Simple implementation: all processor communicate directly to root if self.rank==0: dim_final = tuple(1 if integrate_axis[axis] else self.dim(key,axis=axis) for axis in range(3)) # Allocate a nice spot of memory integ = np.zeros(dim_final,dtype=integ_local.dtype) if average: weights = np.zeros(dim_final,dtype=weights_local.dtype) # Receive from peers for rank_src in range(0,self.nproc): # Determine target array position pos = self.position_from_rank(rank_src,external=False) sl = 3*[slice(None)] for axis in range(3): if integrate_axis[axis]: sl[axis] = slice(0,1) else: sl[axis] = slice( self.proc_grid[key][2*axis][pos[axis]]-1, self.proc_grid[key][2*axis+1][pos[axis]]) # Receive data and put it in a good spot if rank_src==0: integ[sl] += integ_local if average: weights[sl] += weights_local else: integ[sl] += self.comm.recv(source=rank_src,tag=1) if average: weights[sl] += self.comm.recv(source=rank_src,tag=2) if average: return integ/weights else: return integ/weights_local else: self.comm.send(integ_local,dest=0,tag=1) if average: self.comm.send(weights_local,dest=0,tag=2) return None def shift_to_pressure_grid(self,key,key_out=None): from .field import Field3d if key_out is None: if key in ('u','v','w'): raise ValueError("In-place shift of {'u','v','w'} is not permitted.") key_out = key # If pressure is loaded, use field information, else create a # fake Field3d object so that we can use its methods. field_ref = self.allocate('p',pseudo=True) # Compute the relative shift and shift it rel_shift = self.field[key].relative_shift(field_ref) self.field[key_out] = self.field[key].shift_origin(rel_shift) # Match number of gridpoints sl = 3*[slice(None)] for axis in range(3): if not self.field[key_out].has_same_dim(field_ref,axis=axis): sl[axis] = slice(0,field_ref.dim(axis=axis)) self.field[key_out].data = self.field[key_out].data[sl] # Copy metadata of pressure grid self.copy('p',key_out,skip_data=True,skip_symmetries=True) self.symmetries[key_out] = self.symmetries[key].copy() # Exchange ghost cells and set boundary conditions self.exchange_ghost_cells(key_out) self.impose_boundary_conditions(key_out) def init_symmetry(self,key,mirror=(True,True)): '''Sets the symmetries for ghost cells behind the wall''' # Example: wall in y # No-slip boundary (no mirror) free-slip boundary (mirror) # u -> -u u -> u # v -> -v v -> -v # w -> -w w -> w # p -> p p -> p import numpy as np self.symmetries[key] = np.zeros((3,3,3),dtype='i') if not self.xperiodic: if key=='u': self.symmetries[key][0,1,1] = -1 self.symmetries[key][2,1,1] = -1 else: self.symmetries[key][0,1,1] = 1 if mirror[0] else -1 self.symmetries[key][2,1,1] = 1 if mirror[1] else -1 if not self.yperiodic: if key=='v': self.symmetries[key][1,0,1] = -1 self.symmetries[key][1,2,1] = -1 else: self.symmetries[key][1,0,1] = 1 if mirror[0] else -1 self.symmetries[key][1,2,1] = 1 if mirror[1] else -1 if not self.zperiodic: if key=='w': self.symmetries[key][1,1,0] = -1 self.symmetries[key][1,1,2] = -1 else: self.symmetries[key][1,1,0] = 1 if mirror[0] else -1 self.symmetries[key][1,1,2] = 1 if mirror[1] else -1 def copy(self,key,key_out,skip_data=False,skip_symmetries=False): '''Copies a field to a new key.''' if key==key_out: return self.origin[key_out] = tuple(self.origin[key]) self.proc_grid[key_out] = self.proc_grid[key].copy() if not skip_symmetries: self.symmetries[key_out] = self.symmetries[key].copy() if not skip_data: self.field[key_out] = self.field[key].copy() return def delete(self,key): '''Removes a field from memory.''' assert key in self.field, "Field {} cannot be deleted, because it "\ "does not exist.".format(key) import gc del self.field[key] del self.symmetries[key] if key not in ('u','v','w','p','s'): del self.origin[key] del self.proc_grid[key] gc.collect() return def merge(self,key,stride=(1,1,1)): '''Returns the complete (possibly downsampled) field at rank 0.''' return def save_state(self,filename): import pickle fout = filename+'.{:05d}.pickle'.format(self.rank) payload = ( self.num_ghost, self.chunks_per_proc, self.origin, self.spacing, self.periodicity, self.bounds, self.proc_grid, self.nxp, self.nyp, self.nzp, self.nghbr, self.field, self.symmetries) with open(fout,"wb") as f: pickle.dump(payload,f) def to_vtk(self,key,no_ghost=False): '''Returns the field (only its interior + some ghost cells for plotting) as a vtk mesh.''' return self._subfield_for_vtk(key,no_ghost).to_vtk() def vtk_contour(self,key,val): '''Compute isocontour for chunks.''' return self._subfield_for_vtk(key,False).vtk_contour(val) def vtk_slice(self,key,normal,origin): '''Extracts a plane from field.''' return self._subfield_for_vtk(key,False).vtk_slice(normal,origin) def _subfield_for_vtk(self,key,no_ghost): num_ghost = np.array(self.num_ghost) idx_origin = num_ghost dim = np.array(self.field[key].dim(axis=None)) dim -= 2*num_ghost if not no_ghost: idx_origin -= 1 dim += 1 if self.ip==self.nxp-1: dim[0]+=1 if self.jp==self.nyp-1: dim[1]+=1 if self.kp==self.nzp-1: dim[2]+=1 idx_origin = tuple(idx_origin) dim = tuple(dim) return self.field[key].extract_subfield(idx_origin,dim) def rank_from_position(self,ip,jp,kp,external=False): if external: nyp,nzp = self.nyp_ext,self.nzp_ext else: nyp,nzp = self.nyp,self.nzp return self._rank_from_position(ip,jp,kp,nyp,nzp) def position_from_rank(self,rank,external=False): if external: nyp,nzp = self.nyp_ext,self.nzp_ext else: nyp,nzp = self.nyp,self.nzp return self._position_from_rank(rank,nyp,nzp) @staticmethod def _rank_from_position(ip,jp,kp,nyp,nzp): return ip*nyp*nzp+jp*nzp+kp @staticmethod def _position_from_rank(rank,nyp,nzp): ip = rank//(nyp*nzp) jp = (rank//nzp)%nyp kp = rank%nzp return (ip,jp,kp) def shifting_state(self,key,axis=None): '''Returns the relative shift of first global grid point to domain boundaries as either 1 (= +0.5h), 0 (= 0h) or -1 (= -0.5h).''' if axis is None: return tuple(self.shifting_state(key,axis=ii) for ii in range(3)) assert axis<3, "'axis' must be one of 0,1,2." return int(round((self.origin[key][axis]-self.bounds[2*axis])/(0.5*self.spacing[axis]))) def chunk_size(self,key,axis=None): '''Returns size of chunk without ghost cells.''' if axis is None: return tuple(self.chunk_size(key,axis=ii) for ii in range(3)) assert axis<3, "'axis' must be one of 0,1,2." return self.field[key].dim(axis=axis)-2*self.num_ghost[axis] def dim(self,key,axis=None): '''Returns the total number of gridpoints across all processors without ghost cells.''' if axis is None: return tuple(self.dim(key,axis=ii) for ii in range(3)) assert axis<3, "'axis' must be one of 0,1,2." return self.proc_grid[key][2*axis+1][-1] def exchange_ghost_cells(self,key): '''Communicates all ghost cells of specified field''' # Trigger non-blocking communication: # Communicate faces (6 faces) self._communicate_ghost_cells(key,(-1,0,0)) # left self._communicate_ghost_cells(key,(+1,0,0)) # right self._communicate_ghost_cells(key,(0,-1,0)) # down self._communicate_ghost_cells(key,(0,+1,0)) # up self._communicate_ghost_cells(key,(0,0,-1)) # front self._communicate_ghost_cells(key,(0,0,+1)) # back # Communicate edges (12 edges) self._communicate_ghost_cells(key,(-1,-1,0)) # left,down self._communicate_ghost_cells(key,(-1,0,-1)) # left,front self._communicate_ghost_cells(key,(-1,+1,0)) # left,up self._communicate_ghost_cells(key,(-1,0,+1)) # left,back self._communicate_ghost_cells(key,(+1,-1,0)) # right,down self._communicate_ghost_cells(key,(+1,0,-1)) # right,front self._communicate_ghost_cells(key,(+1,+1,0)) # right,up self._communicate_ghost_cells(key,(+1,0,+1)) # right,back self._communicate_ghost_cells(key,(0,-1,-1)) # down,front self._communicate_ghost_cells(key,(0,-1,+1)) # down,back self._communicate_ghost_cells(key,(0,+1,-1)) # up,front self._communicate_ghost_cells(key,(0,+1,+1)) # up,back # Communicate corners (8 corners) self._communicate_ghost_cells(key,(-1,-1,-1)) # left,down,front self._communicate_ghost_cells(key,(-1,-1,+1)) # left,down,back self._communicate_ghost_cells(key,(-1,+1,-1)) # left,up,front self._communicate_ghost_cells(key,(-1,+1,+1)) # left,up,back self._communicate_ghost_cells(key,(+1,-1,-1)) # right,down,front self._communicate_ghost_cells(key,(+1,-1,+1)) # right,down,back self._communicate_ghost_cells(key,(+1,+1,-1)) # right,up,front self._communicate_ghost_cells(key,(+1,+1,+1)) # right,up,back def impose_boundary_conditions(self,key): '''Imposes symmetry boundary conditions on each non-periodic wall''' self._symmetrize_wall(key,(-1,0,0)) self._symmetrize_wall(key,(+1,0,0)) self._symmetrize_wall(key,(0,-1,0)) self._symmetrize_wall(key,(0,+1,0)) self._symmetrize_wall(key,(0,0,-1)) self._symmetrize_wall(key,(0,0,+1)) def _communicate_ghost_cells(self,key,positionDst): '''Triggers communication of ghost cells''' import numpy as np assert np.max(positionDst)<=1 and np.min(positionDst)>=-1, "communicate_ghost_cells: "\ "invalid neighbor position {}".format(positionDst) # [send/recv] get the rank of the neighbor where data is to be sent to # The position is passed as values -1,0,+1, but need to be converted to array indices ip_dst = positionDst[0]+1 jp_dst = positionDst[1]+1 kp_dst = positionDst[2]+1 ip_src = -positionDst[0]+1 jp_src = -positionDst[1]+1 kp_src = -positionDst[2]+1 rank_dst = self.nghbr[ip_dst,jp_dst,kp_dst] rank_src = self.nghbr[ip_src,jp_src,kp_src] # [send/recv] create a tag tag = ip_dst*100+jp_dst*10+kp_dst # [send/recv] get the indices of data to be sent/received nxl,nyl,nzl = self.chunk_size(key) if positionDst[0]==-1: ii_src = slice(self.nghx,2*self.nghx) ii_dst = slice(self.nghx+nxl,2*self.nghx+nxl) elif positionDst[0]==0: ii_src = slice(self.nghx,self.nghx+nxl) ii_dst = slice(self.nghx,self.nghx+nxl) elif positionDst[0]==1: ii_src = slice(nxl,nxl+self.nghx) ii_dst = slice(0,self.nghx) else: raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[0])) if positionDst[1]==-1: jj_src = slice(self.nghy,2*self.nghy) jj_dst = slice(self.nghy+nyl,2*self.nghy+nyl) elif positionDst[1]==0: jj_src = slice(self.nghy,self.nghy+nyl) jj_dst = slice(self.nghy,self.nghy+nyl) elif positionDst[1]==1: jj_src = slice(nyl,nyl+self.nghy) jj_dst = slice(0,self.nghy) else: raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[1])) if positionDst[2]==-1: kk_src = slice(self.nghz,2*self.nghz) kk_dst = slice(self.nghz+nzl,2*self.nghz+nzl) elif positionDst[2]==0: kk_src = slice(self.nghz,self.nghz+nzl) kk_dst = slice(self.nghz,self.nghz+nzl) elif positionDst[2]==1: kk_src = slice(nzl,nzl+self.nghz) kk_dst = slice(0,self.nghz) else: raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[2])) # [send/recv] Create a list for requests reqsend = None reqrecv = None # [send] now send the data to the neighbor, but only if there is one! # Communication must be done non-blocking, and we can use upper-case routines since this is a numpy array if rank_dst>=0: sendbuf = np.ascontiguousarray(self.field[key].data[ii_src,jj_src,kk_src]) reqsend = self.comm.Isend(sendbuf,dest=rank_dst,tag=tag) # [recv] the corresponding receive: results are stored in a buffer which will be assigned to the parent array later if rank_src>=0: recvbuf = np.zeros( (ii_dst.stop-ii_dst.start, jj_dst.stop-jj_dst.start, kk_dst.stop-kk_dst.start),dtype=self.field[key].data.dtype) reqrecv = self.comm.Irecv(recvbuf,source=rank_src,tag=tag) # [recv] wait for data to be received if reqrecv is not None: reqrecv.wait() self.field[key].data[ii_dst,jj_dst,kk_dst] = recvbuf # [send] wait for data to be sent if reqsend is not None: reqsend.wait() def _symmetrize_wall(self,key,positionBd): import numpy as np assert np.max(positionBd)<=1 and np.min(positionBd)>=-1, "symmetrize_wall: invalid neighbor "\ "position {}".format(positionBd) assert np.count_nonzero(positionBd)==1, "symmetrize_wall: only face direction is accepted "\ "(no edges or corners)" inghbr = positionBd[0]+1 jnghbr = positionBd[1]+1 knghbr = positionBd[2]+1 if self.nghbr[inghbr,jnghbr,knghbr]<0: sl_dst = [slice(None),slice(None),slice(None)] sl_src = [slice(None),slice(None),slice(None)] for axis in range(3): if positionBd[axis]==-1: # lower boundary # index of first point within the domain idx = self.field[key].nearest_gridpoint(self.bounds[2*axis],axis=axis,lower=True)+1 # distance first point to wall: should be either 0 or dx/2 dist = np.abs(self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis]) if dist>=0.25*self.spacing[axis]: # no point on boundary sl_dst[axis] = slice(0,idx,1) sl_src[axis] = slice(2*idx-1,idx-1,-1) else: # point on boundary sl_dst[axis] = slice(0,idx,1) sl_src[axis] = slice(2*idx,idx,-1) break elif positionBd[axis]==1: # upper boundary # index of last point within the domain idx = self.field[key].nearest_gridpoint(self.bounds[2*axis+1],axis=axis,lower=True) # distance last point to wall: should be either 0 or -dx/2 dist = np.abs(self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis+1]) if dist>=0.25*self.spacing[axis]: # no point on boundary sl_dst[axis] = slice(idx+1,self.field[key].dim(axis=axis),1) sl_src[axis] = slice(idx,2*idx+1-self.field[key].dim(axis=axis),-1) else: # point on boundary sl_dst[axis] = slice(idx+1,self.field[key].dim(axis=axis),1) sl_src[axis] = slice(idx-1,2*idx-self.field[key].dim(axis=axis),-1) break # print(sl_src,sl_dst,self.field[key].dim()) self.field[key].data[tuple(sl_dst)] = self.symmetries[key][inghbr,jnghbr,knghbr]*\ self.field[key].data[tuple(sl_src)] @staticmethod def _construct_procgrid(comm,proc,chunks_per_proc,num_ghost,periodicity): import numpy as np rank = comm.Get_rank() nxcpp,nycpp,nzcpp = chunks_per_proc nghx,nghy,nghz = num_ghost xperiodic,yperiodic,zperiodic = periodicity # Assert proc assert all(k in proc for k in ('u','v','w','p','s')), "'proc' must be a dictionary with "\ "keys 'u','v','w','p','s'" for key in proc: assert len(proc[key])==6, "Entries of 'proc' must have length of 6." proc_grid_ext = proc # Initialize chunks per processor # Verify that this processor grid can be redistributed onto the requested processor layout nxp_ext = len(proc_grid_ext['u'][0]) nyp_ext = len(proc_grid_ext['u'][2]) nzp_ext = len(proc_grid_ext['u'][4]) nproc_ext = nxp_ext*nyp_ext*nzp_ext # assert nxp_ext%nxcpp==0, "Number of processors must be divisible by the number "\ "of processors per process. (nxp_ext={}, nxcpp={})".format( nxp_ext,nxcpp) assert nyp_ext%nycpp==0, "Number of processors must be divisible by the number "\ "of processors per process. (nyp_ext={}, nycpp={})".format( nyp_ext,nycpp) assert nzp_ext%nzcpp==0, "Number of processors must be divisible by the number "\ "of processors per process. (nzp_ext={}, nzcpp={})".format( nzp_ext,nzcpp) # Determine the new processor layout and verify total number of MPI processes nxp = nxp_ext//nxcpp nyp = nyp_ext//nycpp nzp = nzp_ext//nzcpp nproc = nxp*nyp*nzp assert nproc==comm.Get_size(), "Number of MPI processes does not match the requested "\ "processor layout. (MPI procs: {}, required procs: {})".format( comm.Get_size(),nproc) # Construct internal processor grid proc_grid = {} for key in proc_grid_ext: proc_grid[key] = [None]*6 proc_grid[key][0] = proc_grid_ext[key][0][0::nxcpp] proc_grid[key][1] = proc_grid_ext[key][1][nxcpp-1::nxcpp] proc_grid[key][2] = proc_grid_ext[key][2][0::nycpp] proc_grid[key][3] = proc_grid_ext[key][3][nycpp-1::nycpp] proc_grid[key][4] = proc_grid_ext[key][4][0::nzcpp] proc_grid[key][5] = proc_grid_ext[key][5][nzcpp-1::nzcpp] # Get position in processor grid ip,jp,kp = PPP._position_from_rank(rank,nyp,nzp) # Determine local grid indices and size for key in proc_grid: nxl = proc_grid[key][1][ip]-proc_grid[key][0][ip]+1 nyl = proc_grid[key][3][jp]-proc_grid[key][2][jp]+1 nzl = proc_grid[key][5][kp]-proc_grid[key][4][kp]+1 # Verify that local grid size is not smaller than ghost cell size assert (nxl>=nghx and nyl>=nghy and nzl>=nghz),\ "Local grid size must be greater than number "\ "of ghost cells in each direction!" # Initialize neighbor array nghbr = np.empty((3,3,3),dtype='int') # wrap-around x ipl = ip-1 if ipl<0: if xperiodic: ipl = nxp-1 else: ipl = -1 ipr = ip+1 if ipr>nxp-1: if xperiodic: ipr = 0 else: ipr = -1 inghbr = (ipl,ip,ipr) # wrap-around y jpl = jp-1 if jpl<0: if yperiodic: jpl = nyp-1 else: jpl = -1 jpr = jp+1 if jpr>nyp-1: if yperiodic: jpr = 0 else: jpr = -1 jnghbr = (jpl,jp,jpr) # wrap-around z kpl = kp-1 if kpl<0: if zperiodic: kpl = nzp-1 else: kpl = -1 kpr = kp+1 if kpr>nzp-1: if zperiodic: kpr = 0 else: kpr = -1 knghbr = (kpl,kp,kpr) # Construct array of neighbors for ip in range(3): for jp in range(3): for kp in range(3): # Assign rank to neighbor array if inghbr[ip]<0 or jnghbr[jp]<0 or knghbr[kp]<0: nghbr[ip,jp,kp] = -1 else: nghbr[ip,jp,kp] = PPP._rank_from_position(inghbr[ip],jnghbr[jp],knghbr[kp],nyp,nzp) return (proc_grid,nxp,nyp,nzp, proc_grid_ext,nxp_ext,nyp_ext,nzp_ext, nghbr) def _baton_wait(self,batch_size,tag=420): '''Block execution until an empty message from rank-batch_wait is received (issued by _baton_pass)''' from mpi4py import MPI if batch_size is not None: if self.rank>=batch_size: source = self.rank-batch_size self.comm.recv(source=source,tag=tag) def _baton_pass(self,batch_size,tag=420): '''Sends an empty message to rank+batch_wait to unblock its execution (issued by _baton_wait)''' from mpi4py import MPI if batch_size is not None: dest = self.rank+batch_size if dest=self.size: if self.barrier: self.comm.Barrier() raise StopIteration if self.rank==self.root: if self.rank==self.iter: r = self.data else: r = self.comm.recv(source=self.iter,tag=0) else: if self.rank==self.iter: self.comm.send(self.data,dest=self.root,tag=0) r = None self.iter += 1 return r def parprint(*args, **kwargs): from mpi4py import MPI rank = kwargs.pop('rank') if 'rank' in kwargs else 0 comm = kwargs.pop('comm') if 'comm' in kwargs else MPI.COMM_WORLD if get_rank(comm=comm)==rank: print(*args, **kwargs) def get_rank(comm=None): from mpi4py import MPI comm = MPI.COMM_WORLD if comm is None else comm return comm.Get_rank() def is_root(comm=None,root=0): from mpi4py import MPI comm = MPI.COMM_WORLD if comm is None else comm return comm.Get_rank()==root def gather(data,comm=None): from mpi4py import MPI comm = MPI.COMM_WORLD if comm is None else comm return comm.gather(data,root=0)