class PPP: """Parallel Python Postprocessor for suspend""" def __init__(self,comm,func_load,num_ghost,precision,origin,spacing,periodicity,bounds,proc,chunks_per_proc): '''Constructor: except for comm, only rank 0 needs initialized data.''' self.comm = comm self.rank = comm.Get_rank() self.func_load = func_load self.init_settings(num_ghost,precision) self.init_domain(origin,spacing,periodicity,bounds) self.init_procgrid(proc,chunks_per_proc) self.field = {} self.symmetries = {} return @classmethod def from_snapshot(cls,snap,chunks_per_proc=(1,1,1),num_ghost=(1,1,1),precision='float64'): from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.Get_rank() if rank==0: proc = snap.procgrid() origin = snap.origin() spacing = snap.spacing() periodicity = snap.periodicity() bounds = snap.bounds() else: proc = None origin = None spacing = None periodicity = None bounds = None func_load = snap.field_chunk return cls(comm,func_load,num_ghost,precision,origin,spacing,periodicity,bounds,proc,chunks_per_proc) def init_settings(self,num_ghost,precision): '''Initializes PPP settings for all processors.''' # TBD: some assertions self.num_ghost = self.comm.bcast(num_ghost,root=0) self.precision = self.comm.bcast(precision,root=0) # Some shortcuts self.nghx,self.nghy,self.nghz = self.num_ghost def init_symmetries(self,key,mirror=(True,True)): '''Sets the symmetries for ghost cells behind the wall''' # Example: wall in y # No-slip boundary (no mirror) free-slip boundary (mirror) # u -> -u u -> u # v -> -v v -> -v # w -> -w w -> w # p -> p p -> p import numpy as np self.symmetries[key] = np.zeros((3,3,3),dtype='i') if not self.xperiodic: if key=='u': self.symmetries[key][0,1,1] = -1 self.symmetries[key][2,1,1] = -1 else: self.symmetries[key][0,1,1] = 1 if mirror[0] else -1 self.symmetries[key][2,1,1] = 1 if mirror[1] else -1 if not self.yperiodic: if key=='v': self.symmetries[key][1,0,1] = -1 self.symmetries[key][1,2,1] = -1 else: self.symmetries[key][1,0,1] = 1 if mirror[0] else -1 self.symmetries[key][1,2,1] = 1 if mirror[1] else -1 if not self.zperiodic: if key=='w': self.symmetries[key][1,1,0] = -1 self.symmetries[key][1,1,2] = -1 else: self.symmetries[key][1,1,0] = 1 if mirror[0] else -1 self.symmetries[key][1,1,2] = 1 if mirror[1] else -1 def init_domain(self,origin,spacing,periodicity,bounds): '''Sets up domain information for all processors''' # TBD: some assertions self.origin = self.comm.bcast(origin,root=0) self.spacing = self.comm.bcast(spacing,root=0) self.periodicity = self.comm.bcast(periodicity,root=0) self.bounds = self.comm.bcast(bounds,root=0) # Some shortcuts self.xperiodic,self.yperiodic,self.zperiodic = self.periodicity return def init_procgrid(self,proc,chunks_per_proc): # Note: requires nghx, xperiodic to be set '''Read input processor grid, compute processor grid for workers''' import numpy as np self.chunks_per_proc = self.comm.bcast(chunks_per_proc,root=0) self.nxcpp = chunks_per_proc[0] self.nycpp = chunks_per_proc[1] self.nzcpp = chunks_per_proc[2] if self.rank==0: # Assert proc and add it to class assert all(k in proc for k in ('u','v','w','p','s')), "'proc' must be a dictionary with "\ "keys 'u','v','w','p','s'" for key in proc: assert len(proc[key])==6, "Entries of 'proc' must have length of 6." proc_grid_ext = proc # Initialize chunks per processor # Verify that this processor grid can be redistributed onto the requested processor layout nxp_ext = len(proc_grid_ext['u'][0]) nyp_ext = len(proc_grid_ext['u'][2]) nzp_ext = len(proc_grid_ext['u'][4]) nproc_ext = nxp_ext*nyp_ext*nzp_ext # assert nxp_ext%self.nxcpp==0, "Number of processors must be divisible by the number "\ "of processors per process. (nxp_ext={}, nxcpp={})".format( nxp_ext,self.nxcpp) assert nyp_ext%self.nycpp==0, "Number of processors must be divisible by the number "\ "of processors per process. (nyp_ext={}, nycpp={})".format( nyp_ext,self.nycpp) assert nzp_ext%self.nzcpp==0, "Number of processors must be divisible by the number "\ "of processors per process. (nzp_ext={}, nzcpp={})".format( nzp_ext,self.nzcpp) # Determine the new processor layout and verify total number of MPI processes nxp = nxp_ext//self.nxcpp nyp = nyp_ext//self.nycpp nzp = nzp_ext//self.nzcpp nproc = nxp*nyp*nzp assert nproc==self.comm.Get_size(), "Number of MPI processes does not match the requested "\ "processor layout. (MPI procs: {}, required procs: {})".format( self.comm.Get_size(),nproc) # Construct internal processor grid proc_grid = {} for key in proc_grid_ext: proc_grid[key] = [None]*6 proc_grid[key][0] = proc_grid_ext[key][0][0::self.nxcpp] proc_grid[key][1] = proc_grid_ext[key][1][self.nxcpp-1::self.nxcpp] proc_grid[key][2] = proc_grid_ext[key][2][0::self.nycpp] proc_grid[key][3] = proc_grid_ext[key][3][self.nycpp-1::self.nycpp] proc_grid[key][4] = proc_grid_ext[key][4][0::self.nzcpp] proc_grid[key][5] = proc_grid_ext[key][5][self.nzcpp-1::self.nzcpp] else: proc_grid_ext = None proc_grid = None nxp_ext,nyp_ext,nzp_ext,nproc_ext = None,None,None,None nxp,nyp,nzp,nproc = None,None,None,None # Broadcast the data self.proc_grid_ext = self.comm.bcast(proc_grid_ext,root=0) self.proc_grid = self.comm.bcast(proc_grid,root=0) self.nxp_ext,self.nyp_ext,\ self.nzp_ext,self.nproc_ext = self.comm.bcast((nxp_ext,nyp_ext,nzp_ext,nproc_ext),root=0) self.nxp,self.nyp,\ self.nzp,self.nproc = self.comm.bcast((nxp,nyp,nzp,nproc),root=0) # Get position in processor grid self.ip,self.jp,self.kp = self.position_from_rank(self.rank,external=False) # Determine local grid indices and size self.chunk_bounds = {} self.chunk_size = {} for key in self.proc_grid: self.chunk_bounds[key] = [None]*6 self.chunk_bounds[key][0] = self.proc_grid[key][0][self.ip] self.chunk_bounds[key][1] = self.proc_grid[key][1][self.ip] self.chunk_bounds[key][2] = self.proc_grid[key][2][self.jp] self.chunk_bounds[key][3] = self.proc_grid[key][3][self.jp] self.chunk_bounds[key][4] = self.proc_grid[key][4][self.kp] self.chunk_bounds[key][5] = self.proc_grid[key][5][self.kp] self.chunk_size[key] = [None]*3 self.chunk_size[key][0] = self.chunk_bounds[key][1]-self.chunk_bounds[key][0]+1 self.chunk_size[key][1] = self.chunk_bounds[key][3]-self.chunk_bounds[key][2]+1 self.chunk_size[key][2] = self.chunk_bounds[key][5]-self.chunk_bounds[key][4]+1 # Verify that local grid size is not smaller than ghost cell size assert (self.chunk_size[key][0]>=self.nghx and self.chunk_size[key][1]>=self.nghy and self.chunk_size[key][2]>=self.nghz), "Local grid size must be greater than number "\ "of ghost cells in each direction!" # Initialize neighbor array nghbr = np.empty((3,3,3),dtype='int') # wrap-around x ipl = self.ip-1 if ipl<0: if self.xperiodic: ipl = self.nxp-1 else: ipl = -1 ipr = self.ip+1 if ipr>self.nxp-1: if self.xperiodic: ipr = 0 else: ipr = -1 inghbr = (ipl,self.ip,ipr) # wrap-around y jpl = self.jp-1 if jpl<0: if self.yperiodic: jpl = self.nyp-1 else: jpl = -1 jpr = self.jp+1 if jpr>self.nyp-1: if self.yperiodic: jpr = 0 else: jpr = -1 jnghbr = (jpl,self.jp,jpr) # wrap-around z kpl = self.kp-1 if kpl<0: if self.zperiodic: kpl = self.nzp-1 else: kpl = -1 kpr = self.kp+1 if kpr>self.nzp-1: if self.zperiodic: kpr = 0 else: kpr = -1 knghbr = (kpl,self.kp,kpr) # Construct array of neighbors for ip in range(3): for jp in range(3): for kp in range(3): # Assign rank to neighbor array if inghbr[ip]<0 or jnghbr[jp]<0 or knghbr[kp]<0: nghbr[ip,jp,kp] = -1 else: nghbr[ip,jp,kp] = self.rank_from_position(inghbr[ip],jnghbr[jp],knghbr[kp],external=False) # Save neighbors as class variable self.nghbr = nghbr def load_field(self,key,io_limit=None,barrier=False): '''Loads the required chunks from file''' from .field import Field3d import numpy as np # Block execution of some processors if IO is limited self._baton_wait(io_limit) # Determine which chunks are to be loaded by the current processor ip_beg_ext = self.ip*self.chunks_per_proc[0] jp_beg_ext = self.jp*self.chunks_per_proc[1] kp_beg_ext = self.kp*self.chunks_per_proc[2] ip_end_ext = ip_beg_ext+self.chunks_per_proc[0]-1 jp_end_ext = jp_beg_ext+self.chunks_per_proc[1]-1 kp_end_ext = kp_beg_ext+self.chunks_per_proc[2]-1 # Get the total size of the field to be loaded ib = self.proc_grid_ext[key][0][ip_beg_ext] ie = self.proc_grid_ext[key][1][ip_end_ext] jb = self.proc_grid_ext[key][2][jp_beg_ext] je = self.proc_grid_ext[key][3][jp_end_ext] kb = self.proc_grid_ext[key][4][kp_beg_ext] ke = self.proc_grid_ext[key][5][kp_end_ext] nxl = ie-ib+1 nyl = je-jb+1 nzl = ke-kb+1 # Allocate an array to hold the entire field data = np.empty( (nxl+2*self.nghx, nyl+2*self.nghy, nzl+2*self.nghz),dtype=self.precision) # Compute origin of subfield origin = (self.origin[key][0]+(ib-1-self.nghx)*self.spacing[0], self.origin[key][1]+(jb-1-self.nghy)*self.spacing[1], self.origin[key][2]+(kb-1-self.nghz)*self.spacing[2]) # Create a Field3d self.field[key] = Field3d(data,origin,self.spacing) # Go through each chunk to be read and construct the field for ip_ext in range(ip_beg_ext,ip_end_ext+1): for jp_ext in range(jp_beg_ext,jp_end_ext+1): for kp_ext in range(kp_beg_ext,kp_end_ext+1): # Determine rank of the chunk to be read rank_ext = self.rank_from_position(ip_ext,jp_ext,kp_ext,external=True) # Compute bounds of this chunk ib_ext = self.proc_grid_ext[key][0][ip_ext] ie_ext = self.proc_grid_ext[key][1][ip_ext] jb_ext = self.proc_grid_ext[key][2][jp_ext] je_ext = self.proc_grid_ext[key][3][jp_ext] kb_ext = self.proc_grid_ext[key][4][kp_ext] ke_ext = self.proc_grid_ext[key][5][kp_ext] nxl_ext = ie_ext-ib_ext+1 nyl_ext = je_ext-jb_ext+1 nzl_ext = ke_ext-kb_ext+1 # Read data and insert it subfield = self.func_load(rank_ext,key) self.field[key].insert_subfield(subfield) # Continue execution of waiting processors if IO was limited self._baton_pass(io_limit) # Exchange ghost cells self.exchange_ghost_cells(key) # Initialize symmetries and impose BC self.init_symmetries(key) self.impose_boundary_conditions(key) # Syncronize processes if requested if barrier: self.comm.Barrier() def differentiate(self,key,axis,key_out=None): assert axis<3, "'axis' must be one of 0,1,2." if key_out is None: key_out = key+('x','y','z')[axis] origin = list(self.origin) shifting_state = self.shifting_state(key,axis=axis) if shifting_state==-1: padding = 'after' origin[axis] += 0.5*self.spacing[axis] elif shifting_state==0: padding = 'before' origin[axis] -= 0.5*self.spacing[axis] elif shifting_state==1: padding = 'before' origin[axis] -= 0.5*self.spacing[axis] else: raise ValueError("Invalid shifting state.") self.field[key_out] = self.field[key].derivative(axis,padding=padding) self.origin[key_out] = tuple(origin) self.spacing[key_out] = self.spacing[key].copy() self.symmetries[key_out] = self.symmetries[key].copy() # TBD: copy everything field specific # TBD: make sure processor distribution is fine if axis==0: self.symmetries[key_out][0,1,1] = -self.symmetries[key_out][0,1,1] self.symmetries[key_out][2,1,1] = -self.symmetries[key_out][2,1,1] elif axis==1: self.symmetries[key_out][1,0,1] = -self.symmetries[key_out][1,0,1] self.symmetries[key_out][1,2,1] = -self.symmetries[key_out][1,2,1] elif axis==2: self.symmetries[key_out][1,1,0] = -self.symmetries[key_out][1,1,0] self.symmetries[key_out][1,1,2] = -self.symmetries[key_out][1,1,2] # Exchange ghost cells and set boundary conditions self.exchange_ghost_cells(key_out) self.impose_boundary_conditions(key_out) def gaussian_filter(self,key,sigma,truncate=4.0,key_out=None,iterate=False): '''Applies a gaussian filter to a field as in-place operation. Sigma is the std of the filter in terms of grid width.''' import numpy as np if key_out is None: key_out = key else: self.origin[key_out] = self.origin[key].copy() self.spacing[key_out] = self.spacing[key].copy() # Compute radius of Gaussian filter radius = self.field[key].gaussian_filter_radius(sigma,truncate=truncate) if not iterate: # Assert that we have sufficient amount of ghost cells assert all([self.num_ghost[ii]>=radius[ii] for ii in range(3)]),\ "Too few ghost cells for stencil: {}, {}".format(self.num_ghost,radius) niter = 1 else: # Determine number of iterations required for current ghost cell setup sigma = np.array(sigma) niter = 1 while not all([self.num_ghost[ii]>=radius[ii] for ii in range(3)]): sigma /= np.sqrt(2) niter *= 2 radius = self.field[key].gaussian_filter_radius(sigma,truncate=truncate) assert all([radius[ii]>0 if sigma[ii]>0.0 else True for ii in range(3)]),\ "Iterative procedure leads to invalid stencil radius: "\ "increase number of ghost cells. {}".format(radius) print('Iterations: {}, stencil radius: {}'.format(niter,radius)) for iiter in range(niter): # Filter field: if key_out is None, perform operation inplace self.field[key_out] = self.field[key].gaussian_filter(sigma, truncate=truncate,border='constant',const_val=0.0) # Exchange ghost cells and set boundary conditions self.exchange_ghost_cells(key_out) self.impose_boundary_conditions(key_out) # Iterate inplace from now on key = key_out def broadcast(self,key,arg,operation): '''Broadcasts an inplace operation involving a scalar or matrix on the entire grid. If 'arg' is a matrix, it must be three-dimensional and its axes must be singular or of length nx/ny/nz.''' import numpy as np import operator if operation in ('add','+'): op = operator.iadd elif operation in ('subtract','sub','-'): op = operator.isub elif operation in ('divide','div','/'): op = operator.itruediv elif operation in ('multiply','mul','*'): op = operator.imul elif operation in ('power','pow','^','**'): op = operator.ipow else: raise ValueError("Invalid operation: {}".format(operation)) if isinstance(arg,np.ndarray): sl_arg = 3*[slice(None)] for axis in range(3): if arg.shape[axis]==1: continue elif arg.shape[axis]==self.proc_grid[key][2*axis+1][-1]: pos = self.position_from_rank(self.rank,external=False)[axis] sl_arg[axis] = slice( self.proc_grid[key][2*axis][pos]-1, self.proc_grid[key][2*axis+1][pos]) else: raise ValueError("'arg' must either be singular or match global "\ "grid dimension. (axis={}: got {:d}, expected {:d}".format( axis,arg.shape[axis],self.proc_grid[key][2*axis+1][-1])) # Only operate on interior and communcate ghosts later sl_int = tuple(slice(self.num_ghost[ii],-self.num_ghost[ii]) for ii in range(3)) sl_arg = tuple(sl_arg) op(self.field[key].data[sl_int],arg[sl_arg]) # Exchange ghost cells and set boundary conditions self.exchange_ghost_cells(key) self.impose_boundary_conditions(key) elif isinstance(arg,(int,float)): op(self.field[key].data,arg) return def vtk_contour(self,key,val): '''Compute isocontour for chunks.''' if any([self.num_ghost[ii]>1 for ii in range(3)]): idx_origin = tuple(self.num_ghost[ii]-1 for ii in range(3)) numpoints = tuple(self.field[key].numpoints[ii]-2*(self.num_ghost[ii]-1) for ii in range(3)) return self.field[key].extract_subfield( idx_origin,numpoints).vtk_contour(val) else: return self.field[key].vtk_contour(val) return def vtk_slice(self,key,normal,origin): '''Extracts a plane from field.''' if any([self.num_ghost[ii]>1 for ii in range(3)]): idx_origin = tuple(self.num_ghost[ii]-1 for ii in range(3)) numpoints = tuple(self.field[key].numpoints[ii]-2*(self.num_ghost[ii]-1) for ii in range(3)) return self.field[key].extract_subfield( idx_origin,numpoints).vtk_slice(normal,origin) else: return self.field[key].vtk_slice(normal,origin) return def rank_from_position(self,ip,jp,kp,external=False): if external: nyp,nzp = self.nyp_ext,self.nzp_ext else: nyp,nzp = self.nyp,self.nzp return ip*nyp*nzp+jp*nzp+kp def position_from_rank(self,rank,external=False): if external: nyp,nzp = self.nyp_ext,self.nzp_ext else: nyp,nzp = self.nyp,self.nzp ip = rank//(nyp*nzp) jp = (rank//nzp)%nyp kp = rank%nzp return (ip,jp,kp) def shifting_state(self,key,axis=None): if axis is None: return tuple(self.shifting_state(key,axis=ii) for axis in range(3)) assert axis<3, "'axis' must be one of 0,1,2." return int(round((self.origin[key][axis]-self.bounds[2*axis])/(0.5*self.spacing[axis]))) def exchange_ghost_cells(self,key): '''Communicates all ghost cells of specified field''' # Trigger non-blocking communication: # Communicate faces (6 faces) self._communicate_ghost_cells(key,(-1,0,0)) # left self._communicate_ghost_cells(key,(+1,0,0)) # right self._communicate_ghost_cells(key,(0,-1,0)) # down self._communicate_ghost_cells(key,(0,+1,0)) # up self._communicate_ghost_cells(key,(0,0,-1)) # front self._communicate_ghost_cells(key,(0,0,+1)) # back # Communicate edges (12 edges) self._communicate_ghost_cells(key,(-1,-1,0)) # left,down self._communicate_ghost_cells(key,(-1,0,-1)) # left,front self._communicate_ghost_cells(key,(-1,+1,0)) # left,up self._communicate_ghost_cells(key,(-1,0,+1)) # left,back self._communicate_ghost_cells(key,(+1,-1,0)) # right,down self._communicate_ghost_cells(key,(+1,0,-1)) # right,front self._communicate_ghost_cells(key,(+1,+1,0)) # right,up self._communicate_ghost_cells(key,(+1,0,+1)) # right,back self._communicate_ghost_cells(key,(0,-1,-1)) # down,front self._communicate_ghost_cells(key,(0,-1,+1)) # down,back self._communicate_ghost_cells(key,(0,+1,-1)) # up,front self._communicate_ghost_cells(key,(0,+1,+1)) # up,back # Communicate corners (8 corners) self._communicate_ghost_cells(key,(-1,-1,-1)) # left,down,front self._communicate_ghost_cells(key,(-1,-1,+1)) # left,down,back self._communicate_ghost_cells(key,(-1,+1,-1)) # left,up,front self._communicate_ghost_cells(key,(-1,+1,+1)) # left,up,back self._communicate_ghost_cells(key,(+1,-1,-1)) # right,down,front self._communicate_ghost_cells(key,(+1,-1,+1)) # right,down,back self._communicate_ghost_cells(key,(+1,+1,-1)) # right,up,front self._communicate_ghost_cells(key,(+1,+1,+1)) # right,up,back def impose_boundary_conditions(self,key): '''Imposes symmetry boundary conditions on each non-periodic wall''' self._symmetrize_wall(key,(-1,0,0)) self._symmetrize_wall(key,(+1,0,0)) self._symmetrize_wall(key,(0,-1,0)) self._symmetrize_wall(key,(0,+1,0)) self._symmetrize_wall(key,(0,0,-1)) self._symmetrize_wall(key,(0,0,+1)) def _communicate_ghost_cells(self,key,positionDst): '''Triggers communication of ghost cells''' import numpy as np assert np.max(positionDst)<=1 and np.min(positionDst)>=-1, "communicate_ghost_cells: "\ "invalid neighbor position {}".format(positionDst) # [send/recv] get the rank of the neighbor where data is to be sent to # The position is passed as values -1,0,+1, but need to be converted to array indices ip_dst = positionDst[0]+1 jp_dst = positionDst[1]+1 kp_dst = positionDst[2]+1 ip_src = -positionDst[0]+1 jp_src = -positionDst[1]+1 kp_src = -positionDst[2]+1 rank_dst = self.nghbr[ip_dst,jp_dst,kp_dst] rank_src = self.nghbr[ip_src,jp_src,kp_src] # [send/recv] create a tag tag = ip_dst*100+jp_dst*10+kp_dst # [send/recv] get the indices of data to be sent/received nxl,nyl,nzl = self.chunk_size[key] if positionDst[0]==-1: ii_src = slice(self.nghx,2*self.nghx) ii_dst = slice(self.nghx+nxl,2*self.nghx+nxl) elif positionDst[0]==0: ii_src = slice(self.nghx,self.nghx+nxl) ii_dst = slice(self.nghx,self.nghx+nxl) elif positionDst[0]==1: ii_src = slice(nxl,nxl+self.nghx) ii_dst = slice(0,self.nghx) else: raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[0])) if positionDst[1]==-1: jj_src = slice(self.nghy,2*self.nghy) jj_dst = slice(self.nghy+nyl,2*self.nghy+nyl) elif positionDst[1]==0: jj_src = slice(self.nghy,self.nghy+nyl) jj_dst = slice(self.nghy,self.nghy+nyl) elif positionDst[1]==1: jj_src = slice(nyl,nyl+self.nghy) jj_dst = slice(0,self.nghy) else: raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[1])) if positionDst[2]==-1: kk_src = slice(self.nghz,2*self.nghz) kk_dst = slice(self.nghz+nzl,2*self.nghz+nzl) elif positionDst[2]==0: kk_src = slice(self.nghz,self.nghz+nzl) kk_dst = slice(self.nghz,self.nghz+nzl) elif positionDst[2]==1: kk_src = slice(nzl,nzl+self.nghz) kk_dst = slice(0,self.nghz) else: raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[2])) # [send/recv] Create a list for requests reqsend = None reqrecv = None # [send] now send the data to the neighbor, but only if there is one! # Communication must be done non-blocking, and we can use upper-case routines since this is a numpy array if rank_dst>=0: sendbuf = np.ascontiguousarray(self.field[key].data[ii_src,jj_src,kk_src]) reqsend = self.comm.Isend(sendbuf,dest=rank_dst,tag=tag) # [recv] the corresponding receive: results are stored in a buffer which will be assigned to the parent array later if rank_src>=0: recvbuf = np.zeros( (ii_dst.stop-ii_dst.start, jj_dst.stop-jj_dst.start, kk_dst.stop-kk_dst.start),dtype=self.precision) reqrecv = self.comm.Irecv(recvbuf,source=rank_src,tag=tag) # [recv] wait for data to be received if reqrecv is not None: reqrecv.wait() self.field[key].data[ii_dst,jj_dst,kk_dst] = recvbuf # [send] wait for data to be sent if reqsend is not None: reqsend.wait() def _symmetrize_wall(self,key,positionBd): import numpy as np assert np.max(positionBd)<=1 and np.min(positionBd)>=-1, "symmetrize_wall: invalid neighbor "\ "position {}".format(positionBd) assert np.count_nonzero(positionBd)==1, "symmetrize_wall: only face direction is accepted "\ "(no edges or corners)" inghbr = positionBd[0]+1 jnghbr = positionBd[1]+1 knghbr = positionBd[2]+1 if self.nghbr[inghbr,jnghbr,knghbr]<0: sl_dst = [slice(None),slice(None),slice(None)] sl_src = [slice(None),slice(None),slice(None)] for axis in range(3): if positionBd[axis]==-1: # lower boundary # index of first point within the domain idx = self.field[key].nearest_gridpoint(self.bounds[2*axis],axis=axis,lower=True)+1 # distance first point to wall: should be either 0 or dx/2 dist = np.abs(self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis]) if dist>=0.25*self.spacing[axis]: # no point on boundary sl_dst[axis] = slice(0,idx,1) sl_src[axis] = slice(2*idx-1,idx-1,-1) else: # point on boundary sl_dst[axis] = slice(0,idx,1) sl_src[axis] = slice(2*idx,idx,-1) break elif positionBd[axis]==1: # upper boundary # index of last point within the domain idx = self.field[key].nearest_gridpoint(self.bounds[2*axis+1],axis=axis,lower=True) # distance last point to wall: should be either 0 or -dx/2 dist = np.abs(self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis+1]) if dist>=0.25*self.spacing[axis]: # no point on boundary sl_dst[axis] = slice(idx+1,self.field[key].numpoints[axis],1) sl_src[axis] = slice(idx,2*idx+1-self.field[key].numpoints[axis],-1) else: # point on boundary sl_dst[axis] = slice(idx+1,self.field[key].numpoints[axis],1) sl_src[axis] = slice(idx-1,2*idx-self.field[key].numpoints[axis],-1) break self.field[key].data[tuple(sl_dst)] = self.symmetries[key][inghbr,jnghbr,knghbr]*\ self.field[key].data[tuple(sl_src)] def _baton_wait(self,batch_size,tag=420): '''Block execution until an empty message from rank-batch_wait is received (issued by _baton_pass)''' from mpi4py import MPI if batch_size is not None: if self.rank>=batch_size: source = self.rank-batch_size self.comm.recv(source=source,tag=tag) def _baton_pass(self,batch_size,tag=420): '''Sends an empty message to rank+batch_wait to unblock its execution (issued by _baton_wait)''' from mpi4py import MPI if batch_size is not None: dest = self.rank+batch_size if dest=self.size: if self.barrier: self.comm.Barrier() raise StopIteration if self.rank==self.root: if self.rank==self.iter: r = self.data else: r = self.comm.recv(source=self.iter,tag=0) else: if self.rank==self.iter: self.comm.send(self.data,dest=self.root,tag=0) r = None self.iter += 1 return r def parprint(*args, **kwargs): from mpi4py import MPI rank = kwargs.pop('rank') if 'rank' in kwargs else 0 comm = kwargs.pop('comm') if 'comm' in kwargs else MPI.COMM_WORLD if get_rank(comm=comm)==rank: print(*args, **kwargs) def get_rank(comm=None): from mpi4py import MPI comm = MPI.COMM_WORLD if comm is None else comm return comm.Get_rank() def is_root(comm=None,root=0): from mpi4py import MPI comm = MPI.COMM_WORLD if comm is None else comm return comm.Get_rank()==root def gather(data,comm=None): from mpi4py import MPI comm = MPI.COMM_WORLD if comm is None else comm return comm.gather(data,root=0)