suspendtools/parallel.py

624 lines
29 KiB
Python

class PPP:
"""Parallel Python Postprocessor for suspend"""
def __init__(self,comm,func_load,num_ghost,precision,origin,spacing,periodicity,bounds,proc,chunks_per_proc):
'''Constructor: except for comm, only rank 0 needs initialized data.'''
self.comm = comm
self.rank = comm.Get_rank()
self.func_load = func_load
self.init_settings(num_ghost,precision)
self.init_domain(origin,spacing,periodicity,bounds)
self.init_procgrid(proc,chunks_per_proc)
self.field = {}
self.symmetries = {}
return
@classmethod
def from_snapshot(cls,snap,chunks_per_proc=(1,1,1),num_ghost=(1,1,1),precision='float64'):
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank==0:
proc = snap.procgrid()
origin = snap.origin()
spacing = snap.spacing()
periodicity = snap.periodicity()
bounds = snap.bounds()
else:
proc = None
origin = None
spacing = None
periodicity = None
bounds = None
func_load = snap.field_chunk
return cls(comm,func_load,num_ghost,precision,origin,spacing,periodicity,bounds,proc,chunks_per_proc)
def init_settings(self,num_ghost,precision):
'''Initializes PPP settings for all processors.'''
# TBD: some assertions
self.num_ghost = self.comm.bcast(num_ghost,root=0)
self.precision = self.comm.bcast(precision,root=0)
# Some shortcuts
self.nghx,self.nghy,self.nghz = self.num_ghost
def init_symmetries(self,key,mirror=(True,True)):
'''Sets the symmetries for ghost cells behind the wall'''
# Example: wall in y
# No-slip boundary (no mirror) free-slip boundary (mirror)
# u -> -u u -> u
# v -> -v v -> -v
# w -> -w w -> w
# p -> p p -> p
import numpy as np
self.symmetries[key] = np.zeros((3,3,3),dtype='i')
if not self.xperiodic:
if key=='u':
self.symmetries[key][0,1,1] = -1
self.symmetries[key][2,1,1] = -1
else:
self.symmetries[key][0,1,1] = 1 if mirror[0] else -1
self.symmetries[key][2,1,1] = 1 if mirror[1] else -1
if not self.yperiodic:
if key=='v':
self.symmetries[key][1,0,1] = -1
self.symmetries[key][1,2,1] = -1
else:
self.symmetries[key][1,0,1] = 1 if mirror[0] else -1
self.symmetries[key][1,2,1] = 1 if mirror[1] else -1
if not self.zperiodic:
if key=='w':
self.symmetries[key][1,1,0] = -1
self.symmetries[key][1,1,2] = -1
else:
self.symmetries[key][1,1,0] = 1 if mirror[0] else -1
self.symmetries[key][1,1,2] = 1 if mirror[1] else -1
def init_domain(self,origin,spacing,periodicity,bounds):
'''Sets up domain information for all processors'''
# TBD: some assertions
self.origin = self.comm.bcast(origin,root=0)
self.spacing = self.comm.bcast(spacing,root=0)
self.periodicity = self.comm.bcast(periodicity,root=0)
self.bounds = self.comm.bcast(bounds,root=0)
# Some shortcuts
self.xperiodic,self.yperiodic,self.zperiodic = self.periodicity
return
def init_procgrid(self,proc,chunks_per_proc):
# Note: requires nghx, xperiodic to be set
'''Read input processor grid, compute processor grid for workers'''
import numpy as np
self.chunks_per_proc = self.comm.bcast(chunks_per_proc,root=0)
self.nxcpp = chunks_per_proc[0]
self.nycpp = chunks_per_proc[1]
self.nzcpp = chunks_per_proc[2]
if self.rank==0:
# Assert proc and add it to class
assert all(k in proc for k in ('u','v','w','p','s')), "'proc' must be a dictionary with "\
"keys 'u','v','w','p','s'"
for key in proc:
assert len(proc[key])==6, "Entries of 'proc' must have length of 6."
proc_grid_ext = proc
# Initialize chunks per processor
# Verify that this processor grid can be redistributed onto the requested processor layout
nxp_ext = len(proc_grid_ext['u'][0])
nyp_ext = len(proc_grid_ext['u'][2])
nzp_ext = len(proc_grid_ext['u'][4])
nproc_ext = nxp_ext*nyp_ext*nzp_ext
#
assert nxp_ext%self.nxcpp==0, "Number of processors must be divisible by the number "\
"of processors per process. (nxp_ext={}, nxcpp={})".format(
nxp_ext,self.nxcpp)
assert nyp_ext%self.nycpp==0, "Number of processors must be divisible by the number "\
"of processors per process. (nyp_ext={}, nycpp={})".format(
nyp_ext,self.nycpp)
assert nzp_ext%self.nzcpp==0, "Number of processors must be divisible by the number "\
"of processors per process. (nzp_ext={}, nzcpp={})".format(
nzp_ext,self.nzcpp)
# Determine the new processor layout and verify total number of MPI processes
nxp = nxp_ext//self.nxcpp
nyp = nyp_ext//self.nycpp
nzp = nzp_ext//self.nzcpp
nproc = nxp*nyp*nzp
assert nproc==self.comm.Get_size(), "Number of MPI processes does not match the requested "\
"processor layout. (MPI procs: {}, required procs: {})".format(
self.comm.Get_size(),nproc)
# Construct internal processor grid
proc_grid = {}
for key in proc_grid_ext:
proc_grid[key] = [None]*6
proc_grid[key][0] = proc_grid_ext[key][0][0::self.nxcpp]
proc_grid[key][1] = proc_grid_ext[key][1][self.nxcpp-1::self.nxcpp]
proc_grid[key][2] = proc_grid_ext[key][2][0::self.nycpp]
proc_grid[key][3] = proc_grid_ext[key][3][self.nycpp-1::self.nycpp]
proc_grid[key][4] = proc_grid_ext[key][4][0::self.nzcpp]
proc_grid[key][5] = proc_grid_ext[key][5][self.nzcpp-1::self.nzcpp]
else:
proc_grid_ext = None
proc_grid = None
nxp_ext,nyp_ext,nzp_ext,nproc_ext = None,None,None,None
nxp,nyp,nzp,nproc = None,None,None,None
# Broadcast the data
self.proc_grid_ext = self.comm.bcast(proc_grid_ext,root=0)
self.proc_grid = self.comm.bcast(proc_grid,root=0)
self.nxp_ext,self.nyp_ext,\
self.nzp_ext,self.nproc_ext = self.comm.bcast((nxp_ext,nyp_ext,nzp_ext,nproc_ext),root=0)
self.nxp,self.nyp,\
self.nzp,self.nproc = self.comm.bcast((nxp,nyp,nzp,nproc),root=0)
# Get position in processor grid
self.ip,self.jp,self.kp = self.position_from_rank(self.rank,external=False)
# Determine local grid indices and size
self.chunk_bounds = {}
self.chunk_size = {}
for key in self.proc_grid:
self.chunk_bounds[key] = [None]*6
self.chunk_bounds[key][0] = self.proc_grid[key][0][self.ip]
self.chunk_bounds[key][1] = self.proc_grid[key][1][self.ip]
self.chunk_bounds[key][2] = self.proc_grid[key][2][self.jp]
self.chunk_bounds[key][3] = self.proc_grid[key][3][self.jp]
self.chunk_bounds[key][4] = self.proc_grid[key][4][self.kp]
self.chunk_bounds[key][5] = self.proc_grid[key][5][self.kp]
self.chunk_size[key] = [None]*3
self.chunk_size[key][0] = self.chunk_bounds[key][1]-self.chunk_bounds[key][0]+1
self.chunk_size[key][1] = self.chunk_bounds[key][3]-self.chunk_bounds[key][2]+1
self.chunk_size[key][2] = self.chunk_bounds[key][5]-self.chunk_bounds[key][4]+1
# Verify that local grid size is not smaller than ghost cell size
assert (self.chunk_size[key][0]>=self.nghx and
self.chunk_size[key][1]>=self.nghy and
self.chunk_size[key][2]>=self.nghz), "Local grid size must be greater than number "\
"of ghost cells in each direction!"
# Initialize neighbor array
nghbr = np.empty((3,3,3),dtype='int')
# wrap-around x
ipl = self.ip-1
if ipl<0:
if self.xperiodic: ipl = self.nxp-1
else: ipl = -1
ipr = self.ip+1
if ipr>self.nxp-1:
if self.xperiodic: ipr = 0
else: ipr = -1
inghbr = (ipl,self.ip,ipr)
# wrap-around y
jpl = self.jp-1
if jpl<0:
if self.yperiodic: jpl = self.nyp-1
else: jpl = -1
jpr = self.jp+1
if jpr>self.nyp-1:
if self.yperiodic: jpr = 0
else: jpr = -1
jnghbr = (jpl,self.jp,jpr)
# wrap-around z
kpl = self.kp-1
if kpl<0:
if self.zperiodic: kpl = self.nzp-1
else: kpl = -1
kpr = self.kp+1
if kpr>self.nzp-1:
if self.zperiodic: kpr = 0
else: kpr = -1
knghbr = (kpl,self.kp,kpr)
# Construct array of neighbors
for ip in range(3):
for jp in range(3):
for kp in range(3):
# Assign rank to neighbor array
if inghbr[ip]<0 or jnghbr[jp]<0 or knghbr[kp]<0:
nghbr[ip,jp,kp] = -1
else:
nghbr[ip,jp,kp] = self.rank_from_position(inghbr[ip],jnghbr[jp],knghbr[kp],external=False)
# Save neighbors as class variable
self.nghbr = nghbr
def load_field(self,key,io_limit=None,barrier=False):
'''Loads the required chunks from file'''
from .field import Field3d
import numpy as np
# Block execution of some processors if IO is limited
self._baton_wait(io_limit)
# Determine which chunks are to be loaded by the current processor
ip_beg_ext = self.ip*self.chunks_per_proc[0]
jp_beg_ext = self.jp*self.chunks_per_proc[1]
kp_beg_ext = self.kp*self.chunks_per_proc[2]
ip_end_ext = ip_beg_ext+self.chunks_per_proc[0]-1
jp_end_ext = jp_beg_ext+self.chunks_per_proc[1]-1
kp_end_ext = kp_beg_ext+self.chunks_per_proc[2]-1
# Get the total size of the field to be loaded
ib = self.proc_grid_ext[key][0][ip_beg_ext]
ie = self.proc_grid_ext[key][1][ip_end_ext]
jb = self.proc_grid_ext[key][2][jp_beg_ext]
je = self.proc_grid_ext[key][3][jp_end_ext]
kb = self.proc_grid_ext[key][4][kp_beg_ext]
ke = self.proc_grid_ext[key][5][kp_end_ext]
nxl = ie-ib+1
nyl = je-jb+1
nzl = ke-kb+1
# Allocate an array to hold the entire field
data = np.empty(
(nxl+2*self.nghx,
nyl+2*self.nghy,
nzl+2*self.nghz),dtype=self.precision)
# Compute origin of subfield
origin = (self.origin[key][0]+(ib-1-self.nghx)*self.spacing[0],
self.origin[key][1]+(jb-1-self.nghy)*self.spacing[1],
self.origin[key][2]+(kb-1-self.nghz)*self.spacing[2])
# Create a Field3d
self.field[key] = Field3d(data,origin,self.spacing)
# Go through each chunk to be read and construct the field
for ip_ext in range(ip_beg_ext,ip_end_ext+1):
for jp_ext in range(jp_beg_ext,jp_end_ext+1):
for kp_ext in range(kp_beg_ext,kp_end_ext+1):
# Determine rank of the chunk to be read
rank_ext = self.rank_from_position(ip_ext,jp_ext,kp_ext,external=True)
# Compute bounds of this chunk
ib_ext = self.proc_grid_ext[key][0][ip_ext]
ie_ext = self.proc_grid_ext[key][1][ip_ext]
jb_ext = self.proc_grid_ext[key][2][jp_ext]
je_ext = self.proc_grid_ext[key][3][jp_ext]
kb_ext = self.proc_grid_ext[key][4][kp_ext]
ke_ext = self.proc_grid_ext[key][5][kp_ext]
nxl_ext = ie_ext-ib_ext+1
nyl_ext = je_ext-jb_ext+1
nzl_ext = ke_ext-kb_ext+1
# Read data and insert it
subfield = self.func_load(rank_ext,key)
self.field[key].insert_subfield(subfield)
# Continue execution of waiting processors if IO was limited
self._baton_pass(io_limit)
# Exchange ghost cells
self.exchange_ghost_cells(key)
# Initialize symmetries and impose BC
self.init_symmetries(key)
self.impose_boundary_conditions(key)
# Syncronize processes if requested
if barrier: self.comm.Barrier()
def differentiate(self,key,axis,key_out=None):
assert axis<3, "'axis' must be one of 0,1,2."
if key_out is None:
key_out = key+('x','y','z')[axis]
origin = list(self.origin)
shifting_state = self.shifting_state(key,axis=axis)
if shifting_state==-1:
padding = 'after'
origin[axis] += 0.5*self.spacing[axis]
elif shifting_state==0:
padding = 'before'
origin[axis] -= 0.5*self.spacing[axis]
elif shifting_state==1:
padding = 'before'
origin[axis] -= 0.5*self.spacing[axis]
else:
raise ValueError("Invalid shifting state.")
self.field[key_out] = self.field[key].derivative(axis,padding=padding)
self.origin[key_out] = tuple(origin)
self.spacing[key_out] = self.spacing[key].copy()
self.symmetries[key_out] = self.symmetries[key].copy()
# TBD: copy everything field specific
# TBD: make sure processor distribution is fine
if axis==0:
self.symmetries[key_out][0,1,1] = -self.symmetries[key_out][0,1,1]
self.symmetries[key_out][2,1,1] = -self.symmetries[key_out][2,1,1]
elif axis==1:
self.symmetries[key_out][1,0,1] = -self.symmetries[key_out][1,0,1]
self.symmetries[key_out][1,2,1] = -self.symmetries[key_out][1,2,1]
elif axis==2:
self.symmetries[key_out][1,1,0] = -self.symmetries[key_out][1,1,0]
self.symmetries[key_out][1,1,2] = -self.symmetries[key_out][1,1,2]
# Exchange ghost cells and set boundary conditions
self.exchange_ghost_cells(key_out)
self.impose_boundary_conditions(key_out)
def gaussian_filter(self,key,sigma,truncate=4.0,key_out=None,iterate=False):
'''Applies a gaussian filter to a field as in-place operation. Sigma is the std of the filter in terms of grid width.'''
import numpy as np
if key_out is None:
key_out = key
else:
self.origin[key_out] = self.origin[key].copy()
self.spacing[key_out] = self.spacing[key].copy()
# Compute radius of Gaussian filter
radius = self.field[key].gaussian_filter_radius(sigma,truncate=truncate)
if not iterate:
# Assert that we have sufficient amount of ghost cells
assert all([self.num_ghost[ii]>=radius[ii] for ii in range(3)]),\
"Too few ghost cells for stencil: {}, {}".format(self.num_ghost,radius)
niter = 1
else:
# Determine number of iterations required for current ghost cell setup
sigma = np.array(sigma)
niter = 1
while not all([self.num_ghost[ii]>=radius[ii] for ii in range(3)]):
sigma /= np.sqrt(2)
niter *= 2
radius = self.field[key].gaussian_filter_radius(sigma,truncate=truncate)
assert all([radius[ii]>0 if sigma[ii]>0.0 else True for ii in range(3)]),\
"Iterative procedure leads to invalid stencil radius: "\
"increase number of ghost cells. {}".format(radius)
print('Iterations: {}, stencil radius: {}'.format(niter,radius))
for iiter in range(niter):
# Filter field: if key_out is None, perform operation inplace
self.field[key_out] = self.field[key].gaussian_filter(sigma,
truncate=truncate,border='constant',const_val=0.0)
# Exchange ghost cells and set boundary conditions
self.exchange_ghost_cells(key_out)
self.impose_boundary_conditions(key_out)
# Iterate inplace from now on
key = key_out
def vtk_contour(self,key,val):
'''Compute isocontour for chunks.'''
if any([self.num_ghost[ii]>1 for ii in range(3)]):
idx_origin = tuple(self.num_ghost[ii]-1 for ii in range(3))
numpoints = tuple(self.field[key].numpoints[ii]-2*(self.num_ghost[ii]-1)
for ii in range(3))
return self.field[key].extract_subfield(
idx_origin,numpoints).vtk_contour(val)
else:
return self.field[key].vtk_contour(val)
return
def vtk_slice(self,key,normal,origin):
'''Extracts a plane from field.'''
if any([self.num_ghost[ii]>1 for ii in range(3)]):
idx_origin = tuple(self.num_ghost[ii]-1 for ii in range(3))
numpoints = tuple(self.field[key].numpoints[ii]-2*(self.num_ghost[ii]-1)
for ii in range(3))
return self.field[key].extract_subfield(
idx_origin,numpoints).vtk_slice(normal,origin)
else:
return self.field[key].vtk_slice(normal,origin)
return
def rank_from_position(self,ip,jp,kp,external=False):
if external:
nyp,nzp = self.nyp_ext,self.nzp_ext
else:
nyp,nzp = self.nyp,self.nzp
return ip*nyp*nzp+jp*nzp+kp
def position_from_rank(self,rank,external=False):
if external:
nyp,nzp = self.nyp_ext,self.nzp_ext
else:
nyp,nzp = self.nyp,self.nzp
ip = rank//(nyp*nzp)
jp = (rank//nzp)%nyp
kp = rank%nzp
return (ip,jp,kp)
def shifting_state(self,key,axis=None):
if axis is None:
return tuple(self.shifting_state(key,axis=ii) for axis in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
return int(round((self.origin[key][axis]-self.bounds[2*axis])/(0.5*self.spacing[axis])))
def exchange_ghost_cells(self,key):
'''Communicates all ghost cells of specified field'''
# Trigger non-blocking communication:
# Communicate faces (6 faces)
self._communicate_ghost_cells(key,(-1,0,0)) # left
self._communicate_ghost_cells(key,(+1,0,0)) # right
self._communicate_ghost_cells(key,(0,-1,0)) # down
self._communicate_ghost_cells(key,(0,+1,0)) # up
self._communicate_ghost_cells(key,(0,0,-1)) # front
self._communicate_ghost_cells(key,(0,0,+1)) # back
# Communicate edges (12 edges)
self._communicate_ghost_cells(key,(-1,-1,0)) # left,down
self._communicate_ghost_cells(key,(-1,0,-1)) # left,front
self._communicate_ghost_cells(key,(-1,+1,0)) # left,up
self._communicate_ghost_cells(key,(-1,0,+1)) # left,back
self._communicate_ghost_cells(key,(+1,-1,0)) # right,down
self._communicate_ghost_cells(key,(+1,0,-1)) # right,front
self._communicate_ghost_cells(key,(+1,+1,0)) # right,up
self._communicate_ghost_cells(key,(+1,0,+1)) # right,back
self._communicate_ghost_cells(key,(0,-1,-1)) # down,front
self._communicate_ghost_cells(key,(0,-1,+1)) # down,back
self._communicate_ghost_cells(key,(0,+1,-1)) # up,front
self._communicate_ghost_cells(key,(0,+1,+1)) # up,back
# Communicate corners (8 corners)
self._communicate_ghost_cells(key,(-1,-1,-1)) # left,down,front
self._communicate_ghost_cells(key,(-1,-1,+1)) # left,down,back
self._communicate_ghost_cells(key,(-1,+1,-1)) # left,up,front
self._communicate_ghost_cells(key,(-1,+1,+1)) # left,up,back
self._communicate_ghost_cells(key,(+1,-1,-1)) # right,down,front
self._communicate_ghost_cells(key,(+1,-1,+1)) # right,down,back
self._communicate_ghost_cells(key,(+1,+1,-1)) # right,up,front
self._communicate_ghost_cells(key,(+1,+1,+1)) # right,up,back
def impose_boundary_conditions(self,key):
'''Imposes symmetry boundary conditions on each non-periodic wall'''
self._symmetrize_wall(key,(-1,0,0))
self._symmetrize_wall(key,(+1,0,0))
self._symmetrize_wall(key,(0,-1,0))
self._symmetrize_wall(key,(0,+1,0))
self._symmetrize_wall(key,(0,0,-1))
self._symmetrize_wall(key,(0,0,+1))
def _communicate_ghost_cells(self,key,positionDst):
'''Triggers communication of ghost cells'''
import numpy as np
assert np.max(positionDst)<=1 and np.min(positionDst)>=-1, "communicate_ghost_cells: "\
"invalid neighbor position {}".format(positionDst)
# [send/recv] get the rank of the neighbor where data is to be sent to
# The position is passed as values -1,0,+1, but need to be converted to array indices
ip_dst = positionDst[0]+1
jp_dst = positionDst[1]+1
kp_dst = positionDst[2]+1
ip_src = -positionDst[0]+1
jp_src = -positionDst[1]+1
kp_src = -positionDst[2]+1
rank_dst = self.nghbr[ip_dst,jp_dst,kp_dst]
rank_src = self.nghbr[ip_src,jp_src,kp_src]
# [send/recv] create a tag
tag = ip_dst*100+jp_dst*10+kp_dst
# [send/recv] get the indices of data to be sent/received
nxl,nyl,nzl = self.chunk_size[key]
if positionDst[0]==-1:
ii_src = slice(self.nghx,2*self.nghx)
ii_dst = slice(self.nghx+nxl,2*self.nghx+nxl)
elif positionDst[0]==0:
ii_src = slice(self.nghx,self.nghx+nxl)
ii_dst = slice(self.nghx,self.nghx+nxl)
elif positionDst[0]==1:
ii_src = slice(nxl,nxl+self.nghx)
ii_dst = slice(0,self.nghx)
else:
raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[0]))
if positionDst[1]==-1:
jj_src = slice(self.nghy,2*self.nghy)
jj_dst = slice(self.nghy+nyl,2*self.nghy+nyl)
elif positionDst[1]==0:
jj_src = slice(self.nghy,self.nghy+nyl)
jj_dst = slice(self.nghy,self.nghy+nyl)
elif positionDst[1]==1:
jj_src = slice(nyl,nyl+self.nghy)
jj_dst = slice(0,self.nghy)
else:
raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[1]))
if positionDst[2]==-1:
kk_src = slice(self.nghz,2*self.nghz)
kk_dst = slice(self.nghz+nzl,2*self.nghz+nzl)
elif positionDst[2]==0:
kk_src = slice(self.nghz,self.nghz+nzl)
kk_dst = slice(self.nghz,self.nghz+nzl)
elif positionDst[2]==1:
kk_src = slice(nzl,nzl+self.nghz)
kk_dst = slice(0,self.nghz)
else:
raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[2]))
# [send/recv] Create a list for requests
reqsend = None
reqrecv = None
# [send] now send the data to the neighbor, but only if there is one!
# Communication must be done non-blocking, and we can use upper-case routines since this is a numpy array
if rank_dst>=0:
sendbuf = np.ascontiguousarray(self.field[key].data[ii_src,jj_src,kk_src])
reqsend = self.comm.Isend(sendbuf,dest=rank_dst,tag=tag)
# [recv] the corresponding receive: results are stored in a buffer which will be assigned to the parent array later
if rank_src>=0:
recvbuf = np.zeros(
(ii_dst.stop-ii_dst.start,
jj_dst.stop-jj_dst.start,
kk_dst.stop-kk_dst.start),dtype=self.precision)
reqrecv = self.comm.Irecv(recvbuf,source=rank_src,tag=tag)
# [recv] wait for data to be received
if reqrecv is not None:
reqrecv.wait()
self.field[key].data[ii_dst,jj_dst,kk_dst] = recvbuf
# [send] wait for data to be sent
if reqsend is not None:
reqsend.wait()
def _symmetrize_wall(self,key,positionBd):
import numpy as np
assert np.max(positionBd)<=1 and np.min(positionBd)>=-1, "symmetrize_wall: invalid neighbor "\
"position {}".format(positionBd)
assert np.count_nonzero(positionBd)==1, "symmetrize_wall: only face direction is accepted "\
"(no edges or corners)"
inghbr = positionBd[0]+1
jnghbr = positionBd[1]+1
knghbr = positionBd[2]+1
if self.nghbr[inghbr,jnghbr,knghbr]<0:
sl_dst = [slice(None),slice(None),slice(None)]
sl_src = [slice(None),slice(None),slice(None)]
for axis in range(3):
if positionBd[axis]==-1: # lower boundary
# index of first point within the domain
idx = self.field[key].nearest_gridpoint(self.bounds[2*axis],axis=axis,lower=True)+1
# distance first point to wall: should be either 0 or dx/2
dist = np.abs(self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis])
if dist>=0.25*self.spacing[axis]: # no point on boundary
sl_dst[axis] = slice(0,idx,1)
sl_src[axis] = slice(2*idx-1,idx-1,-1)
else: # point on boundary
sl_dst[axis] = slice(0,idx,1)
sl_src[axis] = slice(2*idx,idx,-1)
break
elif positionBd[axis]==1: # upper boundary
# index of last point within the domain
idx = self.field[key].nearest_gridpoint(self.bounds[2*axis+1],axis=axis,lower=True)
# distance last point to wall: should be either 0 or -dx/2
dist = np.abs(self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis+1])
if dist>=0.25*self.spacing[axis]: # no point on boundary
sl_dst[axis] = slice(idx+1,self.field[key].numpoints[axis],1)
sl_src[axis] = slice(idx,2*idx+1-self.field[key].numpoints[axis],-1)
else: # point on boundary
sl_dst[axis] = slice(idx+1,self.field[key].numpoints[axis],1)
sl_src[axis] = slice(idx-1,2*idx-self.field[key].numpoints[axis],-1)
break
self.field[key].data[tuple(sl_dst)] = self.symmetries[key][inghbr,jnghbr,knghbr]*\
self.field[key].data[tuple(sl_src)]
def _baton_wait(self,batch_size,tag=420):
'''Block execution until an empty message from rank-batch_wait
is received (issued by _baton_pass)'''
from mpi4py import MPI
if batch_size is not None:
if self.rank>=batch_size:
source = self.rank-batch_size
self.comm.recv(source=source,tag=tag)
def _baton_pass(self,batch_size,tag=420):
'''Sends an empty message to rank+batch_wait to unblock its
execution (issued by _baton_wait)'''
from mpi4py import MPI
if batch_size is not None:
dest = self.rank+batch_size
if dest<self.comm.Get_size():
data = None
self.comm.send(data,dest=dest,tag=tag)
class GatherIterator:
'''Sends 'data' sequentially to 'root' which can iterate over it
without gathering all data at once. Every process which is not 'root'
receives None.'''
def __init__(self,data,comm=None,root=0,barrier=False):
from mpi4py import MPI
comm = MPI.COMM_WORLD if comm is None else comm
self.comm = comm
self.rank = comm.Get_rank()
self.size = comm.Get_size()
self.root = root
self.barrier = barrier
self.iter = 0
self.data = data
def __iter__(self):
return self
def __next__(self):
if self.iter>=self.size:
if self.barrier: self.comm.Barrier()
raise StopIteration
if self.rank==self.root:
if self.rank==self.iter:
r = self.data
else:
r = self.comm.recv(source=self.iter,tag=0)
else:
if self.rank==self.iter:
self.comm.send(self.data,dest=self.root,tag=0)
r = None
self.iter += 1
return r
def parprint(*args, **kwargs):
from mpi4py import MPI
rank = kwargs.pop('rank') if 'rank' in kwargs else 0
comm = kwargs.pop('comm') if 'comm' in kwargs else MPI.COMM_WORLD
if get_rank(comm=comm)==rank:
print(*args, **kwargs)
def get_rank(comm=None):
from mpi4py import MPI
comm = MPI.COMM_WORLD if comm is None else comm
return comm.Get_rank()
def is_root(comm=None,root=0):
from mpi4py import MPI
comm = MPI.COMM_WORLD if comm is None else comm
return comm.Get_rank()==root
def gather(data,comm=None):
from mpi4py import MPI
comm = MPI.COMM_WORLD if comm is None else comm
return comm.gather(data,root=0)