class PPP: """Parallel Python Postprocessor for suspend""" def __init__(self,comm,func_load,num_ghost,precision,origin,spacing,periodicity,bounds,proc,chunks_per_proc): '''Constructor: except for comm, only rank 0 needs initialized data.''' self.comm = comm self.rank = comm.Get_rank() self.func_load = func_load self.init_settings(num_ghost,precision) self.init_domain(origin,spacing,periodicity,bounds) self.init_procgrid(proc,chunks_per_proc) self.field = {} self.symmetries = {} return @classmethod def from_snapshot(cls,snap,chunks_per_proc=(1,1,1),num_ghost=(1,1,1),precision='float64'): from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.Get_rank() if rank==0: proc = snap.procgrid() origin = snap.origin() spacing = snap.spacing() periodicity = snap.periodicity() bounds = snap.bounds() else: proc = None origin = None spacing = None periodicity = None bounds = None func_load = snap.field_chunk return cls(comm,func_load,num_ghost,precision,origin,spacing,periodicity,bounds,proc,chunks_per_proc) def init_settings(self,num_ghost,precision): '''Initializes PPP settings for all processors.''' # TBD: some assertions self.num_ghost = self.comm.bcast(num_ghost,root=0) self.precision = self.comm.bcast(precision,root=0) # Some shortcuts self.nghx,self.nghy,self.nghz = self.num_ghost def init_symmetries(self,key,mirror=(True,True)): '''Sets the symmetries for ghost cells behind the wall''' # Example: wall in y # No-slip boundary (no mirror) free-slip boundary (mirror) # u -> -u u -> u # v -> -v v -> -v # w -> -w w -> w # p -> p p -> p import numpy as np self.symmetries[key] = np.zeros((3,3,3),dtype='i') if not self.xperiodic: if key=='u': self.symmetries[key][0,1,1] = -1 self.symmetries[key][2,1,1] = -1 else: self.symmetries[key][0,1,1] = 1 if mirror[0] else -1 self.symmetries[key][2,1,1] = 1 if mirror[1] else -1 if not self.yperiodic: if key=='v': self.symmetries[key][1,0,1] = -1 self.symmetries[key][1,2,1] = -1 else: self.symmetries[key][1,0,1] = 1 if mirror[0] else -1 self.symmetries[key][1,2,1] = 1 if mirror[1] else -1 if not self.zperiodic: if key=='w': self.symmetries[key][1,1,0] = -1 self.symmetries[key][1,1,2] = -1 else: self.symmetries[key][1,1,0] = 1 if mirror[0] else -1 self.symmetries[key][1,1,2] = 1 if mirror[1] else -1 def init_domain(self,origin,spacing,periodicity,bounds): '''Sets up domain information for all processors''' # TBD: some assertions self.origin = self.comm.bcast(origin,root=0) self.spacing = self.comm.bcast(spacing,root=0) self.periodicity = self.comm.bcast(periodicity,root=0) self.bounds = self.comm.bcast(bounds,root=0) # Some shortcuts self.xperiodic,self.yperiodic,self.zperiodic = self.periodicity return def init_procgrid(self,proc,chunks_per_proc): # Note: requires nghx, xperiodic to be set '''Read input processor grid, compute processor grid for workers''' import numpy as np self.chunks_per_proc = self.comm.bcast(chunks_per_proc,root=0) self.nxcpp = chunks_per_proc[0] self.nycpp = chunks_per_proc[1] self.nzcpp = chunks_per_proc[2] if self.rank==0: # Assert proc and add it to class assert all(k in proc for k in ('u','v','w','p','s')), "'proc' must be a dictionary with "\ "keys 'u','v','w','p','s'" for key in proc: assert len(proc[key])==6, "Entries of 'proc' must have length of 6." proc_grid_ext = proc # Initialize chunks per processor # Verify that this processor grid can be redistributed onto the requested processor layout nxp_ext = len(proc_grid_ext['u'][0]) nyp_ext = len(proc_grid_ext['u'][2]) nzp_ext = len(proc_grid_ext['u'][4]) nproc_ext = nxp_ext*nyp_ext*nzp_ext # assert nxp_ext%self.nxcpp==0, "Number of processors must be divisible by the number "\ "of processors per process. (nxp_ext={}, nxcpp={})".format( nxp_ext,self.nxcpp) assert nyp_ext%self.nycpp==0, "Number of processors must be divisible by the number "\ "of processors per process. (nyp_ext={}, nycpp={})".format( nyp_ext,self.nycpp) assert nzp_ext%self.nzcpp==0, "Number of processors must be divisible by the number "\ "of processors per process. (nzp_ext={}, nzcpp={})".format( nzp_ext,self.nzcpp) # Determine the new processor layout and verify total number of MPI processes nxp = nxp_ext//self.nxcpp nyp = nyp_ext//self.nycpp nzp = nzp_ext//self.nzcpp nproc = nxp*nyp*nzp assert nproc==self.comm.Get_size(), "Number of MPI processes does not match the requested "\ "processor layout. (MPI procs: {}, required procs: {})".format( self.comm.Get_size(),nproc) # Construct internal processor grid proc_grid = {} for key in proc_grid_ext: proc_grid[key] = [None]*6 proc_grid[key][0] = proc_grid_ext[key][0][0::self.nxcpp] proc_grid[key][1] = proc_grid_ext[key][1][self.nxcpp-1::self.nxcpp] proc_grid[key][2] = proc_grid_ext[key][2][0::self.nycpp] proc_grid[key][3] = proc_grid_ext[key][3][self.nycpp-1::self.nycpp] proc_grid[key][4] = proc_grid_ext[key][4][0::self.nzcpp] proc_grid[key][5] = proc_grid_ext[key][5][self.nzcpp-1::self.nzcpp] else: proc_grid_ext = None proc_grid = None nxp_ext,nyp_ext,nzp_ext,nproc_ext = None,None,None,None nxp,nyp,nzp,nproc = None,None,None,None # Broadcast the data self.proc_grid_ext = self.comm.bcast(proc_grid_ext,root=0) self.proc_grid = self.comm.bcast(proc_grid,root=0) self.nxp_ext,self.nyp_ext,\ self.nzp_ext,self.nproc_ext = self.comm.bcast((nxp_ext,nyp_ext,nzp_ext,nproc_ext),root=0) self.nxp,self.nyp,\ self.nzp,self.nproc = self.comm.bcast((nxp,nyp,nzp,nproc),root=0) # Get position in processor grid self.ip,self.jp,self.kp = self.position_from_rank(self.rank,external=False) # Determine local grid indices and size for key in self.proc_grid: nxl = self.proc_grid[key][1][self.ip]-self.proc_grid[key][0][self.ip]+1 nyl = self.proc_grid[key][3][self.jp]-self.proc_grid[key][2][self.jp]+1 nzl = self.proc_grid[key][5][self.kp]-self.proc_grid[key][4][self.kp]+1 # Verify that local grid size is not smaller than ghost cell size assert (nxl>=self.nghx and nyl>=self.nghy and nzl>=self.nghz),\ "Local grid size must be greater than number "\ "of ghost cells in each direction!" # Initialize neighbor array nghbr = np.empty((3,3,3),dtype='int') # wrap-around x ipl = self.ip-1 if ipl<0: if self.xperiodic: ipl = self.nxp-1 else: ipl = -1 ipr = self.ip+1 if ipr>self.nxp-1: if self.xperiodic: ipr = 0 else: ipr = -1 inghbr = (ipl,self.ip,ipr) # wrap-around y jpl = self.jp-1 if jpl<0: if self.yperiodic: jpl = self.nyp-1 else: jpl = -1 jpr = self.jp+1 if jpr>self.nyp-1: if self.yperiodic: jpr = 0 else: jpr = -1 jnghbr = (jpl,self.jp,jpr) # wrap-around z kpl = self.kp-1 if kpl<0: if self.zperiodic: kpl = self.nzp-1 else: kpl = -1 kpr = self.kp+1 if kpr>self.nzp-1: if self.zperiodic: kpr = 0 else: kpr = -1 knghbr = (kpl,self.kp,kpr) # Construct array of neighbors for ip in range(3): for jp in range(3): for kp in range(3): # Assign rank to neighbor array if inghbr[ip]<0 or jnghbr[jp]<0 or knghbr[kp]<0: nghbr[ip,jp,kp] = -1 else: nghbr[ip,jp,kp] = self.rank_from_position(inghbr[ip],jnghbr[jp],knghbr[kp],external=False) # Save neighbors as class variable self.nghbr = nghbr def load_field(self,key,io_limit=None,barrier=False): '''Loads the required chunks from file''' from .field import Field3d import numpy as np # Block execution of some processors if IO is limited self._baton_wait(io_limit) # Determine which chunks are to be loaded by the current processor ip_beg_ext = self.ip*self.chunks_per_proc[0] jp_beg_ext = self.jp*self.chunks_per_proc[1] kp_beg_ext = self.kp*self.chunks_per_proc[2] ip_end_ext = ip_beg_ext+self.chunks_per_proc[0]-1 jp_end_ext = jp_beg_ext+self.chunks_per_proc[1]-1 kp_end_ext = kp_beg_ext+self.chunks_per_proc[2]-1 # Get the total size of the field to be loaded ib = self.proc_grid_ext[key][0][ip_beg_ext] ie = self.proc_grid_ext[key][1][ip_end_ext] jb = self.proc_grid_ext[key][2][jp_beg_ext] je = self.proc_grid_ext[key][3][jp_end_ext] kb = self.proc_grid_ext[key][4][kp_beg_ext] ke = self.proc_grid_ext[key][5][kp_end_ext] nxl = ie-ib+1 nyl = je-jb+1 nzl = ke-kb+1 # Allocate an array to hold the entire field data = np.empty( (nxl+2*self.nghx, nyl+2*self.nghy, nzl+2*self.nghz),dtype=self.precision) # Compute origin of subfield origin = (self.origin[key][0]+(ib-1-self.nghx)*self.spacing[0], self.origin[key][1]+(jb-1-self.nghy)*self.spacing[1], self.origin[key][2]+(kb-1-self.nghz)*self.spacing[2]) # Create a Field3d self.field[key] = Field3d(data,origin,self.spacing) # Go through each chunk to be read and construct the field for ip_ext in range(ip_beg_ext,ip_end_ext+1): for jp_ext in range(jp_beg_ext,jp_end_ext+1): for kp_ext in range(kp_beg_ext,kp_end_ext+1): # Determine rank of the chunk to be read rank_ext = self.rank_from_position(ip_ext,jp_ext,kp_ext,external=True) # Compute bounds of this chunk ib_ext = self.proc_grid_ext[key][0][ip_ext] ie_ext = self.proc_grid_ext[key][1][ip_ext] jb_ext = self.proc_grid_ext[key][2][jp_ext] je_ext = self.proc_grid_ext[key][3][jp_ext] kb_ext = self.proc_grid_ext[key][4][kp_ext] ke_ext = self.proc_grid_ext[key][5][kp_ext] nxl_ext = ie_ext-ib_ext+1 nyl_ext = je_ext-jb_ext+1 nzl_ext = ke_ext-kb_ext+1 # Read data and insert it subfield = self.func_load(rank_ext,key) self.field[key].insert_subfield(subfield) # Continue execution of waiting processors if IO was limited self._baton_pass(io_limit) # Exchange ghost cells self.exchange_ghost_cells(key) # Initialize symmetries and impose BC self.init_symmetries(key) self.impose_boundary_conditions(key) # Syncronize processes if requested if barrier: self.comm.Barrier() def differentiate(self,key,axis,key_out=None): assert axis<3, "'axis' must be one of 0,1,2." if key_out is None: key_out = key+('x','y','z')[axis] origin = list(self.origin) shifting_state = self.shifting_state(key,axis=axis) if shifting_state==-1: padding = 'after' origin[axis] += 0.5*self.spacing[axis] elif shifting_state==0: padding = 'before' origin[axis] -= 0.5*self.spacing[axis] elif shifting_state==1: padding = 'before' origin[axis] -= 0.5*self.spacing[axis] else: raise ValueError("Invalid shifting state.") self.field[key_out] = self.field[key].derivative(axis,padding=padding) self.origin[key_out] = tuple(origin) self.spacing[key_out] = self.spacing[key].copy() self.symmetries[key_out] = self.symmetries[key].copy() self.proc_grid[key_out] = self.proc_grid[key].copy() self.proc_grid_ext[key_out] = self.proc_grid_ext[key].copy() if axis==0: self.symmetries[key_out][0,1,1] = -self.symmetries[key_out][0,1,1] self.symmetries[key_out][2,1,1] = -self.symmetries[key_out][2,1,1] elif axis==1: self.symmetries[key_out][1,0,1] = -self.symmetries[key_out][1,0,1] self.symmetries[key_out][1,2,1] = -self.symmetries[key_out][1,2,1] elif axis==2: self.symmetries[key_out][1,1,0] = -self.symmetries[key_out][1,1,0] self.symmetries[key_out][1,1,2] = -self.symmetries[key_out][1,1,2] # Exchange ghost cells and set boundary conditions self.exchange_ghost_cells(key_out) self.impose_boundary_conditions(key_out) def gaussian_filter(self,key,sigma,truncate=4.0,key_out=None,iterate=False): '''Applies a gaussian filter to a field as in-place operation. Sigma is the std of the filter in terms of grid width.''' import numpy as np if key_out is None: key_out = key else: self.origin[key_out] = self.origin[key].copy() self.spacing[key_out] = self.spacing[key].copy() # Compute radius of Gaussian filter radius = self.field[key].gaussian_filter_radius(sigma,truncate=truncate) if not iterate: # Assert that we have sufficient amount of ghost cells assert all([self.num_ghost[ii]>=radius[ii] for ii in range(3)]),\ "Too few ghost cells for stencil: {}, {}".format(self.num_ghost,radius) niter = 1 else: # Determine number of iterations required for current ghost cell setup sigma = np.array(sigma) niter = 1 while not all([self.num_ghost[ii]>=radius[ii] for ii in range(3)]): sigma /= np.sqrt(2) niter *= 2 radius = self.field[key].gaussian_filter_radius(sigma,truncate=truncate) assert all([radius[ii]>0 if sigma[ii]>0.0 else True for ii in range(3)]),\ "Iterative procedure leads to invalid stencil radius: "\ "increase number of ghost cells. {}".format(radius) print('Iterations: {}, stencil radius: {}'.format(niter,radius)) for iiter in range(niter): # Filter field: if key_out is None, perform operation inplace self.field[key_out] = self.field[key].gaussian_filter(sigma, truncate=truncate,border='constant',const_val=0.0) # Exchange ghost cells and set boundary conditions self.exchange_ghost_cells(key_out) self.impose_boundary_conditions(key_out) # Iterate inplace from now on key = key_out def broadcast(self,key,arg,operation): '''Broadcasts an inplace operation involving a scalar or matrix on the entire grid. If 'arg' is a matrix, it must be three-dimensional and its axes must be singular or of length nx/ny/nz.''' import numpy as np import operator if operation in ('add','+'): op = operator.iadd elif operation in ('subtract','sub','-'): op = operator.isub elif operation in ('divide','div','/'): op = operator.itruediv elif operation in ('multiply','mul','*'): op = operator.imul elif operation in ('power','pow','^','**'): op = operator.ipow else: raise ValueError("Invalid operation: {}".format(operation)) if isinstance(arg,np.ndarray): sl_arg = 3*[slice(None)] for axis in range(3): if arg.shape[axis]==1: continue elif arg.shape[axis]==self.numpoints(key,axis=axis): pos = self.position_from_rank(self.rank,external=False)[axis] sl_arg[axis] = slice( self.proc_grid[key][2*axis][pos]-1, self.proc_grid[key][2*axis+1][pos]) else: raise ValueError("'arg' must either be singular or match global "\ "grid dimension. (axis={}: got {:d}, expected {:d}".format( axis,arg.shape[axis],self.numpoints(key,axis=axis))) # Only operate on interior and communcate ghosts later sl_int = tuple(slice(self.num_ghost[ii],-self.num_ghost[ii]) for ii in range(3)) sl_arg = tuple(sl_arg) op(self.field[key].data[sl_int],arg[sl_arg]) # Exchange ghost cells and set boundary conditions self.exchange_ghost_cells(key) self.impose_boundary_conditions(key) elif isinstance(arg,(int,float)): op(self.field[key].data,arg) return def integrate(self,key,integrate_axis,ufunc=None,ignore_nan=False,average=False): '''Computes a global integral or average along a given axis applying the function 'ufunc' to each node. The result is returned by rank 0, all other ranks return None.''' from mpi4py import MPI import numpy as np if integrate_axis is None: integrate_axis = tuple(self.periodicity) # Compute local integral, but get weights separately idx_origin = tuple(self.num_ghost[ii]-1 for ii in range(3)) (integ_local,weights_local) = self.field[key].extract_subfield( idx_origin,self.chunk_size(key,axis=None)).integral( integrate_axis,ufunc=ufunc,ignore_nan=ignore_nan, average=average,return_weights=True) # Make sure weights_local is a numpy array weights_local = np.array(weights_local) # Simple implementation: all processor communicate directly to root if self.rank==0: dim_final = tuple(1 if integrate_axis[axis] else self.numpoints(key,axis=axis) for axis in range(3)) # Allocate a nice spot of memory integ = np.zeros(dim_final,dtype=integ_local.dtype) if average: weights = np.zeros(dim_final,dtype=weights_local.dtype) # Receive from peers for rank_src in range(0,self.nproc): # Determine target array position pos = self.position_from_rank(rank_src,external=False) sl = 3*[slice(None)] for axis in range(3): if integrate_axis[axis]: sl[axis] = slice(0,1) else: sl[axis] = slice( self.proc_grid[key][2*axis][pos[axis]]-1, self.proc_grid[key][2*axis+1][pos[axis]]) # Receive data and put it in a good spot if rank_src==0: integ[sl] += integ_local if average: weights[sl] += weights_local else: integ[sl] += self.comm.recv(source=rank_src,tag=1) if average: weights[sl] += self.comm.recv(source=rank_src,tag=2) if average: return integ/weights else: return integ/weights_local else: self.comm.send(integ_local,dest=0,tag=1) if average: self.comm.send(weights_local,dest=0,tag=2) return None def vtk_contour(self,key,val): '''Compute isocontour for chunks.''' if any([self.num_ghost[ii]>1 for ii in range(3)]): idx_origin = tuple(self.num_ghost[ii]-1 for ii in range(3)) numpoints = self.chunk_size(axis=None) return self.field[key].extract_subfield( idx_origin,numpoints).vtk_contour(val) else: return self.field[key].vtk_contour(val) return def vtk_slice(self,key,normal,origin): '''Extracts a plane from field.''' if any([self.num_ghost[ii]>1 for ii in range(3)]): idx_origin = tuple(self.num_ghost[ii]-1 for ii in range(3)) numpoints = tuple(self.field[key].numpoints[ii]-2*(self.num_ghost[ii]-1) for ii in range(3)) return self.field[key].extract_subfield( idx_origin,numpoints).vtk_slice(normal,origin) else: return self.field[key].vtk_slice(normal,origin) return def rank_from_position(self,ip,jp,kp,external=False): if external: nyp,nzp = self.nyp_ext,self.nzp_ext else: nyp,nzp = self.nyp,self.nzp return ip*nyp*nzp+jp*nzp+kp def position_from_rank(self,rank,external=False): if external: nyp,nzp = self.nyp_ext,self.nzp_ext else: nyp,nzp = self.nyp,self.nzp ip = rank//(nyp*nzp) jp = (rank//nzp)%nyp kp = rank%nzp return (ip,jp,kp) def shifting_state(self,key,axis=None): if axis is None: return tuple(self.shifting_state(key,axis=ii) for ii in range(3)) assert axis<3, "'axis' must be one of 0,1,2." return int(round((self.origin[key][axis]-self.bounds[2*axis])/(0.5*self.spacing[axis]))) def chunk_size(self,key,axis=None): '''Returns size of chunk without ghost cells.''' if axis is None: return tuple(self.chunk_size(key,axis=ii) for ii in range(3)) assert axis<3, "'axis' must be one of 0,1,2." return self.field[key].numpoints[axis]-2*self.num_ghost[axis] def numpoints(self,key,axis=None): '''Returns the total number of gridpoints across all processors without ghost cells.''' if axis is None: return tuple(self.numpoints(key,axis=ii) for ii in range(3)) assert axis<3, "'axis' must be one of 0,1,2." return self.proc_grid[key][2*axis+1][-1] def exchange_ghost_cells(self,key): '''Communicates all ghost cells of specified field''' # Trigger non-blocking communication: # Communicate faces (6 faces) self._communicate_ghost_cells(key,(-1,0,0)) # left self._communicate_ghost_cells(key,(+1,0,0)) # right self._communicate_ghost_cells(key,(0,-1,0)) # down self._communicate_ghost_cells(key,(0,+1,0)) # up self._communicate_ghost_cells(key,(0,0,-1)) # front self._communicate_ghost_cells(key,(0,0,+1)) # back # Communicate edges (12 edges) self._communicate_ghost_cells(key,(-1,-1,0)) # left,down self._communicate_ghost_cells(key,(-1,0,-1)) # left,front self._communicate_ghost_cells(key,(-1,+1,0)) # left,up self._communicate_ghost_cells(key,(-1,0,+1)) # left,back self._communicate_ghost_cells(key,(+1,-1,0)) # right,down self._communicate_ghost_cells(key,(+1,0,-1)) # right,front self._communicate_ghost_cells(key,(+1,+1,0)) # right,up self._communicate_ghost_cells(key,(+1,0,+1)) # right,back self._communicate_ghost_cells(key,(0,-1,-1)) # down,front self._communicate_ghost_cells(key,(0,-1,+1)) # down,back self._communicate_ghost_cells(key,(0,+1,-1)) # up,front self._communicate_ghost_cells(key,(0,+1,+1)) # up,back # Communicate corners (8 corners) self._communicate_ghost_cells(key,(-1,-1,-1)) # left,down,front self._communicate_ghost_cells(key,(-1,-1,+1)) # left,down,back self._communicate_ghost_cells(key,(-1,+1,-1)) # left,up,front self._communicate_ghost_cells(key,(-1,+1,+1)) # left,up,back self._communicate_ghost_cells(key,(+1,-1,-1)) # right,down,front self._communicate_ghost_cells(key,(+1,-1,+1)) # right,down,back self._communicate_ghost_cells(key,(+1,+1,-1)) # right,up,front self._communicate_ghost_cells(key,(+1,+1,+1)) # right,up,back def impose_boundary_conditions(self,key): '''Imposes symmetry boundary conditions on each non-periodic wall''' self._symmetrize_wall(key,(-1,0,0)) self._symmetrize_wall(key,(+1,0,0)) self._symmetrize_wall(key,(0,-1,0)) self._symmetrize_wall(key,(0,+1,0)) self._symmetrize_wall(key,(0,0,-1)) self._symmetrize_wall(key,(0,0,+1)) def _communicate_ghost_cells(self,key,positionDst): '''Triggers communication of ghost cells''' import numpy as np assert np.max(positionDst)<=1 and np.min(positionDst)>=-1, "communicate_ghost_cells: "\ "invalid neighbor position {}".format(positionDst) # [send/recv] get the rank of the neighbor where data is to be sent to # The position is passed as values -1,0,+1, but need to be converted to array indices ip_dst = positionDst[0]+1 jp_dst = positionDst[1]+1 kp_dst = positionDst[2]+1 ip_src = -positionDst[0]+1 jp_src = -positionDst[1]+1 kp_src = -positionDst[2]+1 rank_dst = self.nghbr[ip_dst,jp_dst,kp_dst] rank_src = self.nghbr[ip_src,jp_src,kp_src] # [send/recv] create a tag tag = ip_dst*100+jp_dst*10+kp_dst # [send/recv] get the indices of data to be sent/received nxl,nyl,nzl = self.chunk_size(key) if positionDst[0]==-1: ii_src = slice(self.nghx,2*self.nghx) ii_dst = slice(self.nghx+nxl,2*self.nghx+nxl) elif positionDst[0]==0: ii_src = slice(self.nghx,self.nghx+nxl) ii_dst = slice(self.nghx,self.nghx+nxl) elif positionDst[0]==1: ii_src = slice(nxl,nxl+self.nghx) ii_dst = slice(0,self.nghx) else: raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[0])) if positionDst[1]==-1: jj_src = slice(self.nghy,2*self.nghy) jj_dst = slice(self.nghy+nyl,2*self.nghy+nyl) elif positionDst[1]==0: jj_src = slice(self.nghy,self.nghy+nyl) jj_dst = slice(self.nghy,self.nghy+nyl) elif positionDst[1]==1: jj_src = slice(nyl,nyl+self.nghy) jj_dst = slice(0,self.nghy) else: raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[1])) if positionDst[2]==-1: kk_src = slice(self.nghz,2*self.nghz) kk_dst = slice(self.nghz+nzl,2*self.nghz+nzl) elif positionDst[2]==0: kk_src = slice(self.nghz,self.nghz+nzl) kk_dst = slice(self.nghz,self.nghz+nzl) elif positionDst[2]==1: kk_src = slice(nzl,nzl+self.nghz) kk_dst = slice(0,self.nghz) else: raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[2])) # [send/recv] Create a list for requests reqsend = None reqrecv = None # [send] now send the data to the neighbor, but only if there is one! # Communication must be done non-blocking, and we can use upper-case routines since this is a numpy array if rank_dst>=0: sendbuf = np.ascontiguousarray(self.field[key].data[ii_src,jj_src,kk_src]) reqsend = self.comm.Isend(sendbuf,dest=rank_dst,tag=tag) # [recv] the corresponding receive: results are stored in a buffer which will be assigned to the parent array later if rank_src>=0: recvbuf = np.zeros( (ii_dst.stop-ii_dst.start, jj_dst.stop-jj_dst.start, kk_dst.stop-kk_dst.start),dtype=self.precision) reqrecv = self.comm.Irecv(recvbuf,source=rank_src,tag=tag) # [recv] wait for data to be received if reqrecv is not None: reqrecv.wait() self.field[key].data[ii_dst,jj_dst,kk_dst] = recvbuf # [send] wait for data to be sent if reqsend is not None: reqsend.wait() def _symmetrize_wall(self,key,positionBd): import numpy as np assert np.max(positionBd)<=1 and np.min(positionBd)>=-1, "symmetrize_wall: invalid neighbor "\ "position {}".format(positionBd) assert np.count_nonzero(positionBd)==1, "symmetrize_wall: only face direction is accepted "\ "(no edges or corners)" inghbr = positionBd[0]+1 jnghbr = positionBd[1]+1 knghbr = positionBd[2]+1 if self.nghbr[inghbr,jnghbr,knghbr]<0: sl_dst = [slice(None),slice(None),slice(None)] sl_src = [slice(None),slice(None),slice(None)] for axis in range(3): if positionBd[axis]==-1: # lower boundary # index of first point within the domain idx = self.field[key].nearest_gridpoint(self.bounds[2*axis],axis=axis,lower=True)+1 # distance first point to wall: should be either 0 or dx/2 dist = np.abs(self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis]) if dist>=0.25*self.spacing[axis]: # no point on boundary sl_dst[axis] = slice(0,idx,1) sl_src[axis] = slice(2*idx-1,idx-1,-1) else: # point on boundary sl_dst[axis] = slice(0,idx,1) sl_src[axis] = slice(2*idx,idx,-1) break elif positionBd[axis]==1: # upper boundary # index of last point within the domain idx = self.field[key].nearest_gridpoint(self.bounds[2*axis+1],axis=axis,lower=True) # distance last point to wall: should be either 0 or -dx/2 dist = np.abs(self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis+1]) if dist>=0.25*self.spacing[axis]: # no point on boundary sl_dst[axis] = slice(idx+1,self.field[key].numpoints[axis],1) sl_src[axis] = slice(idx,2*idx+1-self.field[key].numpoints[axis],-1) else: # point on boundary sl_dst[axis] = slice(idx+1,self.field[key].numpoints[axis],1) sl_src[axis] = slice(idx-1,2*idx-self.field[key].numpoints[axis],-1) break self.field[key].data[tuple(sl_dst)] = self.symmetries[key][inghbr,jnghbr,knghbr]*\ self.field[key].data[tuple(sl_src)] def _baton_wait(self,batch_size,tag=420): '''Block execution until an empty message from rank-batch_wait is received (issued by _baton_pass)''' from mpi4py import MPI if batch_size is not None: if self.rank>=batch_size: source = self.rank-batch_size self.comm.recv(source=source,tag=tag) def _baton_pass(self,batch_size,tag=420): '''Sends an empty message to rank+batch_wait to unblock its execution (issued by _baton_wait)''' from mpi4py import MPI if batch_size is not None: dest = self.rank+batch_size if dest=self.size: if self.barrier: self.comm.Barrier() raise StopIteration if self.rank==self.root: if self.rank==self.iter: r = self.data else: r = self.comm.recv(source=self.iter,tag=0) else: if self.rank==self.iter: self.comm.send(self.data,dest=self.root,tag=0) r = None self.iter += 1 return r def parprint(*args, **kwargs): from mpi4py import MPI rank = kwargs.pop('rank') if 'rank' in kwargs else 0 comm = kwargs.pop('comm') if 'comm' in kwargs else MPI.COMM_WORLD if get_rank(comm=comm)==rank: print(*args, **kwargs) def get_rank(comm=None): from mpi4py import MPI comm = MPI.COMM_WORLD if comm is None else comm return comm.Get_rank() def is_root(comm=None,root=0): from mpi4py import MPI comm = MPI.COMM_WORLD if comm is None else comm return comm.Get_rank()==root def gather(data,comm=None): from mpi4py import MPI comm = MPI.COMM_WORLD if comm is None else comm return comm.gather(data,root=0)