# suspendtools/parallel.py

class PPP:
"""Parallel Python Postprocessor for suspend"""
def __init__(self,comm,func_load,num_ghost,precision,origin,spacing,periodicity,bounds,proc,chunks_per_proc):
        '''Constructor. Except for comm, only rank 0 needs to pass
        initialized data; all other ranks may pass None.'''
self.comm = comm
self.rank = comm.Get_rank()
self.func_load = func_load
self.init_settings(num_ghost,precision)
self.init_domain(origin,spacing,periodicity,bounds)
self.init_procgrid(proc,chunks_per_proc)
self.field = {}
self.symmetries = {}
return
@classmethod
def from_snapshot(cls,snap,chunks_per_proc=(1,1,1),num_ghost=(1,1,1),precision='float64'):
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank==0:
proc = snap.procgrid()
origin = snap.origin()
spacing = snap.spacing()
periodicity = snap.periodicity()
bounds = snap.bounds()
else:
proc = None
origin = None
spacing = None
periodicity = None
bounds = None
func_load = snap.field_chunk
return cls(comm,func_load,num_ghost,precision,origin,spacing,periodicity,bounds,proc,chunks_per_proc)
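    # A minimal usage sketch (assumptions: a 'snap' object providing the
    # accessors used above; run under MPI, e.g. `mpirun -n 8 python post.py`):
    #
    #     ppp = PPP.from_snapshot(snap,chunks_per_proc=(2,2,2),num_ghost=(2,2,2))
    #     ppp.load_field('u',io_limit=4)  # at most 4 ranks read at once
    #
    # With (2,2,2) chunks per process, 8 MPI ranks cover a 4x4x4 input
    # processor grid (64 chunks).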
def init_settings(self,num_ghost,precision):
'''Initializes PPP settings for all processors.'''
# TBD: some assertions
self.num_ghost = self.comm.bcast(num_ghost,root=0)
self.precision = self.comm.bcast(precision,root=0)
# Some shortcuts
self.nghx,self.nghy,self.nghz = self.num_ghost
    def init_symmetries(self,key,mirror=(True,True)):
        '''Sets the symmetries for ghost cells behind a wall'''
        # Example: wall in y
        #   no-slip boundary (no mirror)   free-slip boundary (mirror)
        #     u -> -u                        u ->  u
        #     v -> -v                        v -> -v
        #     w -> -w                        w ->  w
        #     p ->  p                        p ->  p
import numpy as np
self.symmetries[key] = np.zeros((3,3,3),dtype='i')
if not self.xperiodic:
if key=='u':
self.symmetries[key][0,1,1] = -1
self.symmetries[key][2,1,1] = -1
else:
self.symmetries[key][0,1,1] = 1 if mirror[0] else -1
self.symmetries[key][2,1,1] = 1 if mirror[1] else -1
if not self.yperiodic:
if key=='v':
self.symmetries[key][1,0,1] = -1
self.symmetries[key][1,2,1] = -1
else:
self.symmetries[key][1,0,1] = 1 if mirror[0] else -1
self.symmetries[key][1,2,1] = 1 if mirror[1] else -1
if not self.zperiodic:
if key=='w':
self.symmetries[key][1,1,0] = -1
self.symmetries[key][1,1,2] = -1
else:
self.symmetries[key][1,1,0] = 1 if mirror[0] else -1
self.symmetries[key][1,1,2] = 1 if mirror[1] else -1
def init_domain(self,origin,spacing,periodicity,bounds):
'''Sets up domain information for all processors'''
# TBD: some assertions
self.origin = self.comm.bcast(origin,root=0)
self.spacing = self.comm.bcast(spacing,root=0)
self.periodicity = self.comm.bcast(periodicity,root=0)
self.bounds = self.comm.bcast(bounds,root=0)
# Some shortcuts
self.xperiodic,self.yperiodic,self.zperiodic = self.periodicity
return
    def init_procgrid(self,proc,chunks_per_proc):
        '''Read input processor grid, compute processor grid for workers'''
        # Note: requires nghx, xperiodic to be set
        import numpy as np
        self.chunks_per_proc = self.comm.bcast(chunks_per_proc,root=0)
        # Use the broadcast value so that non-root ranks may pass None
        self.nxcpp = self.chunks_per_proc[0]
        self.nycpp = self.chunks_per_proc[1]
        self.nzcpp = self.chunks_per_proc[2]
if self.rank==0:
# Assert proc and add it to class
assert all(k in proc for k in ('u','v','w','p','s')), "'proc' must be a dictionary with "\
"keys 'u','v','w','p','s'"
for key in proc:
assert len(proc[key])==6, "Entries of 'proc' must have length of 6."
proc_grid_ext = proc
            # Verify that this processor grid can be redistributed onto the
            # requested processor layout
nxp_ext = len(proc_grid_ext['u'][0])
nyp_ext = len(proc_grid_ext['u'][2])
nzp_ext = len(proc_grid_ext['u'][4])
nproc_ext = nxp_ext*nyp_ext*nzp_ext
#
            assert nxp_ext%self.nxcpp==0, "Number of input processors in x must be "\
                "divisible by the number of chunks per process. (nxp_ext={}, nxcpp={})".format(
                nxp_ext,self.nxcpp)
            assert nyp_ext%self.nycpp==0, "Number of input processors in y must be "\
                "divisible by the number of chunks per process. (nyp_ext={}, nycpp={})".format(
                nyp_ext,self.nycpp)
            assert nzp_ext%self.nzcpp==0, "Number of input processors in z must be "\
                "divisible by the number of chunks per process. (nzp_ext={}, nzcpp={})".format(
                nzp_ext,self.nzcpp)
# Determine the new processor layout and verify total number of MPI processes
nxp = nxp_ext//self.nxcpp
nyp = nyp_ext//self.nycpp
nzp = nzp_ext//self.nzcpp
nproc = nxp*nyp*nzp
assert nproc==self.comm.Get_size(), "Number of MPI processes does not match the requested "\
"processor layout. (MPI procs: {}, required procs: {})".format(
self.comm.Get_size(),nproc)
# Construct internal processor grid
proc_grid = {}
for key in proc_grid_ext:
proc_grid[key] = [None]*6
proc_grid[key][0] = proc_grid_ext[key][0][0::self.nxcpp]
proc_grid[key][1] = proc_grid_ext[key][1][self.nxcpp-1::self.nxcpp]
proc_grid[key][2] = proc_grid_ext[key][2][0::self.nycpp]
proc_grid[key][3] = proc_grid_ext[key][3][self.nycpp-1::self.nycpp]
proc_grid[key][4] = proc_grid_ext[key][4][0::self.nzcpp]
proc_grid[key][5] = proc_grid_ext[key][5][self.nzcpp-1::self.nzcpp]
else:
proc_grid_ext = None
proc_grid = None
nxp_ext,nyp_ext,nzp_ext,nproc_ext = None,None,None,None
nxp,nyp,nzp,nproc = None,None,None,None
# Broadcast the data
self.proc_grid_ext = self.comm.bcast(proc_grid_ext,root=0)
self.proc_grid = self.comm.bcast(proc_grid,root=0)
self.nxp_ext,self.nyp_ext,\
self.nzp_ext,self.nproc_ext = self.comm.bcast((nxp_ext,nyp_ext,nzp_ext,nproc_ext),root=0)
self.nxp,self.nyp,\
self.nzp,self.nproc = self.comm.bcast((nxp,nyp,nzp,nproc),root=0)
# Get position in processor grid
self.ip,self.jp,self.kp = self.position_from_rank(self.rank,external=False)
# Determine local grid indices and size
for key in self.proc_grid:
nxl = self.proc_grid[key][1][self.ip]-self.proc_grid[key][0][self.ip]+1
nyl = self.proc_grid[key][3][self.jp]-self.proc_grid[key][2][self.jp]+1
nzl = self.proc_grid[key][5][self.kp]-self.proc_grid[key][4][self.kp]+1
            # Verify that the local grid size is at least the ghost cell width
            assert (nxl>=self.nghx and nyl>=self.nghy and nzl>=self.nghz),\
                "Local grid size must be at least the number "\
                "of ghost cells in each direction!"
# Initialize neighbor array
nghbr = np.empty((3,3,3),dtype='int')
# wrap-around x
ipl = self.ip-1
if ipl<0:
if self.xperiodic: ipl = self.nxp-1
else: ipl = -1
ipr = self.ip+1
if ipr>self.nxp-1:
if self.xperiodic: ipr = 0
else: ipr = -1
inghbr = (ipl,self.ip,ipr)
# wrap-around y
jpl = self.jp-1
if jpl<0:
if self.yperiodic: jpl = self.nyp-1
else: jpl = -1
jpr = self.jp+1
if jpr>self.nyp-1:
if self.yperiodic: jpr = 0
else: jpr = -1
jnghbr = (jpl,self.jp,jpr)
# wrap-around z
kpl = self.kp-1
if kpl<0:
if self.zperiodic: kpl = self.nzp-1
else: kpl = -1
kpr = self.kp+1
if kpr>self.nzp-1:
if self.zperiodic: kpr = 0
else: kpr = -1
knghbr = (kpl,self.kp,kpr)
# Construct array of neighbors
for ip in range(3):
for jp in range(3):
for kp in range(3):
# Assign rank to neighbor array
if inghbr[ip]<0 or jnghbr[jp]<0 or knghbr[kp]<0:
nghbr[ip,jp,kp] = -1
else:
nghbr[ip,jp,kp] = self.rank_from_position(inghbr[ip],jnghbr[jp],knghbr[kp],external=False)
# Save neighbors as class variable
self.nghbr = nghbr
def load_field(self,key,io_limit=None,barrier=False):
'''Loads the required chunks from file'''
from .field import Field3d
import numpy as np
# Block execution of some processors if IO is limited
self._baton_wait(io_limit)
# Determine which chunks are to be loaded by the current processor
ip_beg_ext = self.ip*self.chunks_per_proc[0]
jp_beg_ext = self.jp*self.chunks_per_proc[1]
kp_beg_ext = self.kp*self.chunks_per_proc[2]
ip_end_ext = ip_beg_ext+self.chunks_per_proc[0]-1
jp_end_ext = jp_beg_ext+self.chunks_per_proc[1]-1
kp_end_ext = kp_beg_ext+self.chunks_per_proc[2]-1
# Get the total size of the field to be loaded
ib = self.proc_grid_ext[key][0][ip_beg_ext]
ie = self.proc_grid_ext[key][1][ip_end_ext]
jb = self.proc_grid_ext[key][2][jp_beg_ext]
je = self.proc_grid_ext[key][3][jp_end_ext]
kb = self.proc_grid_ext[key][4][kp_beg_ext]
ke = self.proc_grid_ext[key][5][kp_end_ext]
nxl = ie-ib+1
nyl = je-jb+1
nzl = ke-kb+1
# Allocate an array to hold the entire field
data = np.empty(
(nxl+2*self.nghx,
nyl+2*self.nghy,
nzl+2*self.nghz),dtype=self.precision)
# Compute origin of subfield
origin = (self.origin[key][0]+(ib-1-self.nghx)*self.spacing[0],
self.origin[key][1]+(jb-1-self.nghy)*self.spacing[1],
self.origin[key][2]+(kb-1-self.nghz)*self.spacing[2])
# Create a Field3d
self.field[key] = Field3d(data,origin,self.spacing)
# Go through each chunk to be read and construct the field
for ip_ext in range(ip_beg_ext,ip_end_ext+1):
for jp_ext in range(jp_beg_ext,jp_end_ext+1):
for kp_ext in range(kp_beg_ext,kp_end_ext+1):
# Determine rank of the chunk to be read
rank_ext = self.rank_from_position(ip_ext,jp_ext,kp_ext,external=True)
                    # Read this chunk and insert it into the assembled field
                    subfield = self.func_load(rank_ext,key)
self.field[key].insert_subfield(subfield)
# Continue execution of waiting processors if IO was limited
self._baton_pass(io_limit)
# Exchange ghost cells
self.exchange_ghost_cells(key)
# Initialize symmetries and impose BC
self.init_symmetries(key)
self.impose_boundary_conditions(key)
        # Synchronize processes if requested
if barrier: self.comm.Barrier()
    def differentiate(self,key,axis,key_out=None):
        '''Differentiates a field along 'axis' on the staggered grid. The
        result is stored under 'key_out' (default: key plus axis letter).'''
        assert axis<3, "'axis' must be one of 0,1,2."
        if key_out is None:
            key_out = key+('x','y','z')[axis]
        origin = list(self.origin[key])
shifting_state = self.shifting_state(key,axis=axis)
if shifting_state==-1:
padding = 'after'
origin[axis] += 0.5*self.spacing[axis]
elif shifting_state==0:
padding = 'before'
origin[axis] -= 0.5*self.spacing[axis]
elif shifting_state==1:
padding = 'before'
origin[axis] -= 0.5*self.spacing[axis]
else:
raise ValueError("Invalid shifting state.")
self.field[key_out] = self.field[key].derivative(axis,padding=padding)
self.origin[key_out] = tuple(origin)
        # (spacing is global and per-axis, shared by all fields)
self.symmetries[key_out] = self.symmetries[key].copy()
self.proc_grid[key_out] = self.proc_grid[key].copy()
self.proc_grid_ext[key_out] = self.proc_grid_ext[key].copy()
if axis==0:
self.symmetries[key_out][0,1,1] = -self.symmetries[key_out][0,1,1]
self.symmetries[key_out][2,1,1] = -self.symmetries[key_out][2,1,1]
elif axis==1:
self.symmetries[key_out][1,0,1] = -self.symmetries[key_out][1,0,1]
self.symmetries[key_out][1,2,1] = -self.symmetries[key_out][1,2,1]
elif axis==2:
self.symmetries[key_out][1,1,0] = -self.symmetries[key_out][1,1,0]
self.symmetries[key_out][1,1,2] = -self.symmetries[key_out][1,1,2]
# Exchange ghost cells and set boundary conditions
self.exchange_ghost_cells(key_out)
self.impose_boundary_conditions(key_out)
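    # Example (sketch): building a velocity-gradient entry from a loaded
    # field; 'u' is assumed to have been loaded via load_field:
    #
    #     ppp.differentiate('u',axis=1)                 # stored under 'uy'
    #     ppp.differentiate('u',axis=1,key_out='dudy')  # or under a custom key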
    def gaussian_filter(self,key,sigma,truncate=4.0,key_out=None,iterate=False):
        '''Applies a Gaussian filter to a field. If 'key_out' is None, the
        operation is performed in place. 'sigma' is the standard deviation of
        the filter in units of the grid spacing.'''
        import numpy as np
        if key_out is None:
            key_out = key
        else:
            self.origin[key_out] = tuple(self.origin[key])
            self.symmetries[key_out] = self.symmetries[key].copy()
            self.proc_grid[key_out] = self.proc_grid[key].copy()
            self.proc_grid_ext[key_out] = self.proc_grid_ext[key].copy()
# Compute radius of Gaussian filter
radius = self.field[key].gaussian_filter_radius(sigma,truncate=truncate)
if not iterate:
# Assert that we have sufficient amount of ghost cells
assert all([self.num_ghost[ii]>=radius[ii] for ii in range(3)]),\
"Too few ghost cells for stencil: {}, {}".format(self.num_ghost,radius)
niter = 1
        else:
            # Determine the number of iterations required for the current
            # ghost cell setup. Gaussian variances add under convolution, so
            # applying the filter twice with sigma/sqrt(2) equals one
            # application with sigma.
            sigma = np.array(sigma)
            niter = 1
while not all([self.num_ghost[ii]>=radius[ii] for ii in range(3)]):
sigma /= np.sqrt(2)
niter *= 2
radius = self.field[key].gaussian_filter_radius(sigma,truncate=truncate)
assert all([radius[ii]>0 if sigma[ii]>0.0 else True for ii in range(3)]),\
"Iterative procedure leads to invalid stencil radius: "\
"increase number of ghost cells. {}".format(radius)
            print('Iterations: {}, stencil radius: {}'.format(niter,radius))
        for iiter in range(niter):
            # Filter the field; the first pass may write to a new key
self.field[key_out] = self.field[key].gaussian_filter(sigma,
truncate=truncate,border='constant',const_val=0.0)
# Exchange ghost cells and set boundary conditions
self.exchange_ghost_cells(key_out)
self.impose_boundary_conditions(key_out)
            # Iterate in place from now on
            key = key_out
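    # Example (sketch): smoothing a loaded pressure field with a filter wider
    # than the ghost layer by iterating, and filtering 'u' into a new key so
    # the original is kept:
    #
    #     ppp.gaussian_filter('p',sigma=(4.0,4.0,4.0),iterate=True)
    #     ppp.gaussian_filter('u',sigma=(1.0,1.0,1.0),key_out='uf')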
    def broadcast(self,key,arg,operation):
        '''Applies an in-place operation with a scalar or array operand to
        the entire grid. If 'arg' is an array, it must be three-dimensional
        and each of its axes must be singular or of global length nx/ny/nz.'''
import numpy as np
import operator
if operation in ('add','+'):
op = operator.iadd
elif operation in ('subtract','sub','-'):
op = operator.isub
elif operation in ('divide','div','/'):
op = operator.itruediv
elif operation in ('multiply','mul','*'):
op = operator.imul
elif operation in ('power','pow','^','**'):
op = operator.ipow
else:
raise ValueError("Invalid operation: {}".format(operation))
if isinstance(arg,np.ndarray):
sl_arg = 3*[slice(None)]
for axis in range(3):
if arg.shape[axis]==1:
continue
elif arg.shape[axis]==self.numpoints(key,axis=axis):
pos = self.position_from_rank(self.rank,external=False)[axis]
sl_arg[axis] = slice(
self.proc_grid[key][2*axis][pos]-1,
self.proc_grid[key][2*axis+1][pos])
                else:
                    raise ValueError("'arg' must either be singular or match global "\
                        "grid dimension. (axis={}: got {:d}, expected {:d})".format(
                        axis,arg.shape[axis],self.numpoints(key,axis=axis)))
            # Only operate on the interior and communicate ghosts afterwards
sl_int = tuple(slice(self.num_ghost[ii],-self.num_ghost[ii]) for ii in range(3))
sl_arg = tuple(sl_arg)
op(self.field[key].data[sl_int],arg[sl_arg])
# Exchange ghost cells and set boundary conditions
self.exchange_ghost_cells(key)
self.impose_boundary_conditions(key)
        elif isinstance(arg,(int,float)):
            op(self.field[key].data,arg)
        else:
            raise TypeError("'arg' must be a scalar or a numpy array.")
        return
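    # Example (sketch): subtracting a precomputed mean profile from the full
    # field and rescaling; 'mean_u' is a hypothetical numpy array of shape
    # (1,ny,1) matching the global grid in y:
    #
    #     ppp.broadcast('u',mean_u,'-')
    #     ppp.broadcast('u',2.0,'*')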
def integrate(self,key,integrate_axis,ufunc=None,ignore_nan=False,average=False):
'''Computes a global integral or average along a given axis applying the
function 'ufunc' to each node. The result is returned by rank 0, all other
ranks return None.'''
from mpi4py import MPI
import numpy as np
        if integrate_axis is None:
            # By default, integrate along the periodic directions
            integrate_axis = tuple(self.periodicity)
# Compute local integral, but get weights separately
idx_origin = tuple(self.num_ghost[ii]-1 for ii in range(3))
(integ_local,weights_local) = self.field[key].extract_subfield(
idx_origin,self.chunk_size(key,axis=None)).integral(
integrate_axis,ufunc=ufunc,ignore_nan=ignore_nan,
average=average,return_weights=True)
# Make sure weights_local is a numpy array
weights_local = np.array(weights_local)
        # Simple implementation: all processors communicate directly with root
if self.rank==0:
dim_final = tuple(1 if integrate_axis[axis] else self.numpoints(key,axis=axis) for axis in range(3))
# Allocate a nice spot of memory
integ = np.zeros(dim_final,dtype=integ_local.dtype)
if average:
weights = np.zeros(dim_final,dtype=weights_local.dtype)
# Receive from peers
for rank_src in range(0,self.nproc):
# Determine target array position
pos = self.position_from_rank(rank_src,external=False)
                sl = 3*[slice(None)]
                for axis in range(3):
                    if integrate_axis[axis]:
                        sl[axis] = slice(0,1)
                    else:
                        sl[axis] = slice(
                            self.proc_grid[key][2*axis][pos[axis]]-1,
                            self.proc_grid[key][2*axis+1][pos[axis]])
                # Indexing with a list of slices is invalid in modern numpy
                sl = tuple(sl)
# Receive data and put it in a good spot
if rank_src==0:
integ[sl] += integ_local
if average:
weights[sl] += weights_local
else:
integ[sl] += self.comm.recv(source=rank_src,tag=1)
if average:
weights[sl] += self.comm.recv(source=rank_src,tag=2)
            if average:
                return integ/weights
            else:
                # The accumulated sum already is the global integral
                return integ
else:
self.comm.send(integ_local,dest=0,tag=1)
if average:
self.comm.send(weights_local,dest=0,tag=2)
return None
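    # Example (sketch): computing a wall-normal RMS profile on rank 0 by
    # averaging u^2 over the (periodic) x and z directions:
    #
    #     import numpy as np
    #     prof = ppp.integrate('u',(True,False,True),ufunc=np.square,average=True)
    #     if ppp.rank==0:
    #         urms = np.sqrt(prof[0,:,0])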
    def vtk_contour(self,key,val):
        '''Computes an isocontour for the local chunk, keeping one ghost layer.'''
        if any([self.num_ghost[ii]>1 for ii in range(3)]):
            idx_origin = tuple(self.num_ghost[ii]-1 for ii in range(3))
            numpoints = tuple(self.field[key].numpoints[ii]-2*(self.num_ghost[ii]-1)
                for ii in range(3))
            return self.field[key].extract_subfield(
                idx_origin,numpoints).vtk_contour(val)
        else:
            return self.field[key].vtk_contour(val)
def vtk_slice(self,key,normal,origin):
        '''Extracts a plane from the field, keeping one ghost layer.'''
if any([self.num_ghost[ii]>1 for ii in range(3)]):
idx_origin = tuple(self.num_ghost[ii]-1 for ii in range(3))
numpoints = tuple(self.field[key].numpoints[ii]-2*(self.num_ghost[ii]-1)
for ii in range(3))
return self.field[key].extract_subfield(
idx_origin,numpoints).vtk_slice(normal,origin)
else:
return self.field[key].vtk_slice(normal,origin)
    def rank_from_position(self,ip,jp,kp,external=False):
        '''Maps a position in the processor grid to an MPI rank (x-major).'''
if external:
nyp,nzp = self.nyp_ext,self.nzp_ext
else:
nyp,nzp = self.nyp,self.nzp
return ip*nyp*nzp+jp*nzp+kp
    def position_from_rank(self,rank,external=False):
        '''Maps an MPI rank to its position in the processor grid.'''
if external:
nyp,nzp = self.nyp_ext,self.nzp_ext
else:
nyp,nzp = self.nyp,self.nzp
ip = rank//(nyp*nzp)
jp = (rank//nzp)%nyp
kp = rank%nzp
return (ip,jp,kp)
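    # Worked example: ranks are laid out x-major, so for (nxp,nyp,nzp)=(2,3,4)
    # rank = ip*12+jp*4+kp, e.g. position (1,2,3) <-> rank 23;
    # position_from_rank inverts this with integer division and modulo.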
    def shifting_state(self,key,axis=None):
        '''Returns the staggering of a field along 'axis': the offset of its
        origin from the lower domain bound in units of half a grid spacing.'''
if axis is None:
return tuple(self.shifting_state(key,axis=ii) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
return int(round((self.origin[key][axis]-self.bounds[2*axis])/(0.5*self.spacing[axis])))
def chunk_size(self,key,axis=None):
'''Returns size of chunk without ghost cells.'''
if axis is None:
return tuple(self.chunk_size(key,axis=ii) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
return self.field[key].numpoints[axis]-2*self.num_ghost[axis]
def numpoints(self,key,axis=None):
'''Returns the total number of gridpoints across all processors
without ghost cells.'''
if axis is None:
return tuple(self.numpoints(key,axis=ii) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
return self.proc_grid[key][2*axis+1][-1]
def exchange_ghost_cells(self,key):
'''Communicates all ghost cells of specified field'''
        # Trigger communication with all 26 neighbors: 6 faces, 12 edges and
        # 8 corners. All ranks traverse the directions in the same order, so
        # the non-blocking sends and receives match up.
        for di in (-1,0,+1):
            for dj in (-1,0,+1):
                for dk in (-1,0,+1):
                    if (di,dj,dk)==(0,0,0):
                        continue
                    self._communicate_ghost_cells(key,(di,dj,dk))
def impose_boundary_conditions(self,key):
'''Imposes symmetry boundary conditions on each non-periodic wall'''
self._symmetrize_wall(key,(-1,0,0))
self._symmetrize_wall(key,(+1,0,0))
self._symmetrize_wall(key,(0,-1,0))
self._symmetrize_wall(key,(0,+1,0))
self._symmetrize_wall(key,(0,0,-1))
self._symmetrize_wall(key,(0,0,+1))
def _communicate_ghost_cells(self,key,positionDst):
'''Triggers communication of ghost cells'''
import numpy as np
assert np.max(positionDst)<=1 and np.min(positionDst)>=-1, "communicate_ghost_cells: "\
"invalid neighbor position {}".format(positionDst)
# [send/recv] get the rank of the neighbor where data is to be sent to
# The position is passed as values -1,0,+1, but need to be converted to array indices
ip_dst = positionDst[0]+1
jp_dst = positionDst[1]+1
kp_dst = positionDst[2]+1
ip_src = -positionDst[0]+1
jp_src = -positionDst[1]+1
kp_src = -positionDst[2]+1
rank_dst = self.nghbr[ip_dst,jp_dst,kp_dst]
rank_src = self.nghbr[ip_src,jp_src,kp_src]
# [send/recv] create a tag
tag = ip_dst*100+jp_dst*10+kp_dst
# [send/recv] get the indices of data to be sent/received
nxl,nyl,nzl = self.chunk_size(key)
if positionDst[0]==-1:
ii_src = slice(self.nghx,2*self.nghx)
ii_dst = slice(self.nghx+nxl,2*self.nghx+nxl)
elif positionDst[0]==0:
ii_src = slice(self.nghx,self.nghx+nxl)
ii_dst = slice(self.nghx,self.nghx+nxl)
elif positionDst[0]==1:
ii_src = slice(nxl,nxl+self.nghx)
ii_dst = slice(0,self.nghx)
else:
raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[0]))
if positionDst[1]==-1:
jj_src = slice(self.nghy,2*self.nghy)
jj_dst = slice(self.nghy+nyl,2*self.nghy+nyl)
elif positionDst[1]==0:
jj_src = slice(self.nghy,self.nghy+nyl)
jj_dst = slice(self.nghy,self.nghy+nyl)
elif positionDst[1]==1:
jj_src = slice(nyl,nyl+self.nghy)
jj_dst = slice(0,self.nghy)
else:
raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[1]))
if positionDst[2]==-1:
kk_src = slice(self.nghz,2*self.nghz)
kk_dst = slice(self.nghz+nzl,2*self.nghz+nzl)
elif positionDst[2]==0:
kk_src = slice(self.nghz,self.nghz+nzl)
kk_dst = slice(self.nghz,self.nghz+nzl)
elif positionDst[2]==1:
kk_src = slice(nzl,nzl+self.nghz)
kk_dst = slice(0,self.nghz)
else:
raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[2]))
        # [send/recv] initialize request handles
        reqsend = None
        reqrecv = None
# [send] now send the data to the neighbor, but only if there is one!
# Communication must be done non-blocking, and we can use upper-case routines since this is a numpy array
if rank_dst>=0:
sendbuf = np.ascontiguousarray(self.field[key].data[ii_src,jj_src,kk_src])
reqsend = self.comm.Isend(sendbuf,dest=rank_dst,tag=tag)
# [recv] the corresponding receive: results are stored in a buffer which will be assigned to the parent array later
if rank_src>=0:
recvbuf = np.zeros(
(ii_dst.stop-ii_dst.start,
jj_dst.stop-jj_dst.start,
kk_dst.stop-kk_dst.start),dtype=self.precision)
reqrecv = self.comm.Irecv(recvbuf,source=rank_src,tag=tag)
# [recv] wait for data to be received
if reqrecv is not None:
reqrecv.wait()
self.field[key].data[ii_dst,jj_dst,kk_dst] = recvbuf
# [send] wait for data to be sent
if reqsend is not None:
reqsend.wait()
    def _symmetrize_wall(self,key,positionBd):
        '''Fills ghost cells behind a non-periodic wall by mirroring interior
        values with the sign stored in self.symmetries[key].'''
        import numpy as np
assert np.max(positionBd)<=1 and np.min(positionBd)>=-1, "symmetrize_wall: invalid neighbor "\
"position {}".format(positionBd)
assert np.count_nonzero(positionBd)==1, "symmetrize_wall: only face direction is accepted "\
"(no edges or corners)"
inghbr = positionBd[0]+1
jnghbr = positionBd[1]+1
knghbr = positionBd[2]+1
if self.nghbr[inghbr,jnghbr,knghbr]<0:
sl_dst = [slice(None),slice(None),slice(None)]
sl_src = [slice(None),slice(None),slice(None)]
for axis in range(3):
if positionBd[axis]==-1: # lower boundary
# index of first point within the domain
idx = self.field[key].nearest_gridpoint(self.bounds[2*axis],axis=axis,lower=True)+1
# distance first point to wall: should be either 0 or dx/2
dist = np.abs(self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis])
if dist>=0.25*self.spacing[axis]: # no point on boundary
sl_dst[axis] = slice(0,idx,1)
sl_src[axis] = slice(2*idx-1,idx-1,-1)
else: # point on boundary
sl_dst[axis] = slice(0,idx,1)
sl_src[axis] = slice(2*idx,idx,-1)
break
elif positionBd[axis]==1: # upper boundary
# index of last point within the domain
idx = self.field[key].nearest_gridpoint(self.bounds[2*axis+1],axis=axis,lower=True)
# distance last point to wall: should be either 0 or -dx/2
dist = np.abs(self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis+1])
if dist>=0.25*self.spacing[axis]: # no point on boundary
sl_dst[axis] = slice(idx+1,self.field[key].numpoints[axis],1)
sl_src[axis] = slice(idx,2*idx+1-self.field[key].numpoints[axis],-1)
else: # point on boundary
sl_dst[axis] = slice(idx+1,self.field[key].numpoints[axis],1)
sl_src[axis] = slice(idx-1,2*idx-self.field[key].numpoints[axis],-1)
break
self.field[key].data[tuple(sl_dst)] = self.symmetries[key][inghbr,jnghbr,knghbr]*\
self.field[key].data[tuple(sl_src)]
    def _baton_wait(self,batch_size,tag=420):
        '''Blocks execution until an empty message from rank-batch_size
        is received (issued by _baton_pass)'''
        if batch_size is not None:
            if self.rank>=batch_size:
                source = self.rank-batch_size
                self.comm.recv(source=source,tag=tag)
    def _baton_pass(self,batch_size,tag=420):
        '''Sends an empty message to rank+batch_size to unblock its
        execution (received by _baton_wait)'''
        if batch_size is not None:
            dest = self.rank+batch_size
            if dest<self.comm.Get_size():
                self.comm.send(None,dest=dest,tag=tag)
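    # The two baton helpers implement a simple IO throttle: with io_limit=B,
    # ranks 0..B-1 start reading immediately, and each rank passes the baton
    # to rank+B when done, so at most B ranks access the filesystem at a
    # time. A standalone sketch of the pattern ('do_io' is hypothetical):
    #
    #     B = 4
    #     if rank>=B: comm.recv(source=rank-B,tag=420)
    #     do_io(rank)
    #     if rank+B<comm.Get_size(): comm.send(None,dest=rank+B,tag=420)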
class GatherIterator:
'''Sends 'data' sequentially to 'root' which can iterate over it
without gathering all data at once. Every process which is not 'root'
receives None.'''
def __init__(self,data,comm=None,root=0,barrier=False):
from mpi4py import MPI
comm = MPI.COMM_WORLD if comm is None else comm
self.comm = comm
self.rank = comm.Get_rank()
self.size = comm.Get_size()
self.root = root
self.barrier = barrier
self.iter = 0
self.data = data
def __iter__(self):
return self
def __next__(self):
if self.iter>=self.size:
if self.barrier: self.comm.Barrier()
raise StopIteration
if self.rank==self.root:
if self.rank==self.iter:
r = self.data
else:
r = self.comm.recv(source=self.iter,tag=0)
else:
if self.rank==self.iter:
self.comm.send(self.data,dest=self.root,tag=0)
r = None
self.iter += 1
return r
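# A minimal usage sketch for GatherIterator: rank 0 consumes every rank's
# chunk in rank order without gathering them all at once; 'local_result'
# and 'process' are hypothetical:
#
#     for chunk in GatherIterator(local_result):
#         if chunk is not None:  # only rank 0 receives actual data
#             process(chunk)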
def parprint(*args, **kwargs):
    '''Prints only on the given rank (default: rank 0 of COMM_WORLD).'''
    from mpi4py import MPI
    rank = kwargs.pop('rank', 0)
    comm = kwargs.pop('comm', MPI.COMM_WORLD)
    if get_rank(comm=comm)==rank:
        print(*args, **kwargs)
def get_rank(comm=None):
    '''Returns the rank of the calling process (default: COMM_WORLD).'''
    from mpi4py import MPI
    comm = MPI.COMM_WORLD if comm is None else comm
    return comm.Get_rank()
def is_root(comm=None,root=0):
    '''Returns True if the calling process is 'root' (default: rank 0).'''
    from mpi4py import MPI
    comm = MPI.COMM_WORLD if comm is None else comm
    return comm.Get_rank()==root
def gather(data,comm=None):
    '''Gathers 'data' from all processes onto rank 0; other ranks get None.'''
    from mpi4py import MPI
    comm = MPI.COMM_WORLD if comm is None else comm
    return comm.gather(data,root=0)
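# A minimal sketch tying the module-level helpers together; run under MPI,
# e.g. `mpirun -n 4 python -m suspendtools.parallel`:
if __name__ == '__main__':
    parprint('only rank 0 prints this')
    ranks = gather(get_rank())
    if is_root():
        print('gathered ranks: {}'.format(ranks))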