suspendtools/parallel.py

1251 lines
59 KiB
Python

import numpy as np
from .helper import position_from_rank, rank_from_position
class PPP:
"""Parallel Python Postprocessor for suspend"""
def __init__(self,comm,func_load,
num_ghost,chunks_per_proc,origin,spacing,periodicity,bounds,
proc_grid,nxp,nyp,nzp,
proc_grid_ext,nxp_ext,nyp_ext,nzp_ext,
nghbr,field,symmetries):
'''Class constructor: it is expected that all communcation is done in the corresponding
class methods, such that every rank already holds the required information.'''
# Initialize object with passed arguments
self.comm = comm
self.func_load = func_load
self.num_ghost = num_ghost
self.chunks_per_proc = chunks_per_proc
self.origin = origin
self.spacing = spacing
self.periodicity = periodicity
self.bounds = bounds
self.proc_grid = proc_grid
self.nxp,self.nyp,self.nzp = nxp,nyp,nzp
self.proc_grid_ext = proc_grid_ext
self.nxp_ext,self.nyp_ext,self.nzp_ext = nxp_ext,nyp_ext,nzp_ext
self.nghbr = nghbr
self.field = field
self.symmetries = symmetries
# Initialize some deduced variables
self.rank = comm.Get_rank()
self.nproc = nxp*nyp*nzp
self.ip,self.jp,self.kp = self.position_from_rank(self.rank,external=False)
self.nghx,self.nghy,self.nghz = num_ghost
self.xperiodic,self.yperiodic,self.zperiodic = periodicity
return
@classmethod
def from_snapshot(cls,snap,chunks_per_proc=(1,1,1),num_ghost=(1,1,1)):
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# Let rank 0 read the processor grid as well as some info and then broadcast it
if rank==0:
proc = snap.procgrid()
origin = snap.origin()
spacing = snap.spacing()
periodicity = snap.periodicity()
bounds = snap.bounds()
else:
proc,origin,spacing,periodicity,bounds = 5*[None]
proc = comm.bcast(proc ,root=0)
origin = comm.bcast(origin ,root=0)
spacing = comm.bcast(spacing ,root=0)
periodicity = comm.bcast(periodicity,root=0)
bounds = comm.bcast(bounds ,root=0)
# Make sure all processors got the same arguments
chunks_per_proc = comm.bcast(chunks_per_proc,root=0)
num_ghost = comm.bcast(num_ghost,root=0)
# Set data loading function
func_load = snap.field_chunk
# Construct new processor grid
(proc_grid,nxp,nyp,nzp,
proc_grid_ext,nxp_ext,nyp_ext,nzp_ext,
nghbr) = PPP._construct_procgrid(comm,proc,chunks_per_proc,num_ghost,periodicity)
# Initially, no field is loaded
field = {}
symmetries = {}
return cls(comm,func_load,
num_ghost,chunks_per_proc,origin,spacing,periodicity,bounds,
proc_grid,nxp,nyp,nzp,
proc_grid_ext,nxp_ext,nyp_ext,nzp_ext,
nghbr,field,symmetries)
@classmethod
def from_state(cls,file,parallel=True,io_limit=None):
import h5py
from mpi4py import MPI
from .field import Field3d
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# Only use parallel IO if flag is set and h5py has MPIIO support
parallel = (h5py.h5fd.MPIO>=0) and parallel
if parallel:
f = h5py.File(file,'r',driver='mpio',comm=comm)
else:
sb = SequentialBlock(io_limit,comm=comm,per_node=True)
f = h5py.File(file,'r')
# Read attributes which are the same for all ranks
g = f['PPP']
field_names = tuple(x.decode('ascii') for x in g['field_names'])
field_loaded = tuple(x.decode('ascii') for x in g['field_loaded'])
origin = {}
proc_grid = {}
for key in field_names:
origin[key] = tuple(g[key]['origin'])
proc_grid[key] = []
proc_grid[key].append(tuple(g[key]['ibeg']))
proc_grid[key].append(tuple(g[key]['iend']))
proc_grid[key].append(tuple(g[key]['jbeg']))
proc_grid[key].append(tuple(g[key]['jend']))
proc_grid[key].append(tuple(g[key]['kbeg']))
proc_grid[key].append(tuple(g[key]['kend']))
num_ghost = tuple(g['num_ghost'])
chunks_per_proc = tuple(g['chunks_per_proc'])
spacing = tuple(g['spacing'])
periodicity = tuple(g['periodicity'])
bounds = tuple(g['bounds'])
nxp = int(g['nxp'][:])
nyp = int(g['nyp'][:])
nzp = int(g['nzp'][:])
# Independent read
grp_rank = '{:05d}'.format(rank)
g = f[grp_rank]
nghbr = g['nghbr'][:]
field = {}
symmetries = {}
for key in field_loaded:
field[key] = Field3d.from_file(g,name=key)
symmetries[key] = g[key]['symmetries'][:]
f.close()
assert nxp*nyp*nzp==comm.Get_size(), "The loaded state requires {} processors, but "\
"we are currently running with {}.".format(nxp*nyp*nzp,comm.Get_size())
if not parallel: sb.proceed()
func_load = None
proc_grid_ext = None
nxp_ext,nyp_ext,nzp_ext = 3*[None]
return cls(comm,func_load,
num_ghost,chunks_per_proc,origin,spacing,periodicity,bounds,
proc_grid,nxp,nyp,nzp,
proc_grid_ext,nxp_ext,nyp_ext,nzp_ext,
nghbr,field,symmetries)
def load_field(self,key,io_limit=None,dtype=np.float64,verbose=True):
'''Loads the required chunks from file'''
from .field import Field3d
import numpy as np
# Verbose output
if verbose and self.rank==0:
print('[load_field] key={}, io_limit={}'.format(key,io_limit))
# Block execution of some processors if IO is limited
sb = SequentialBlock(io_limit,comm=self.comm,per_node=True)
# Determine which chunks are to be loaded by the current processor
ip_beg_ext = self.ip*self.chunks_per_proc[0]
jp_beg_ext = self.jp*self.chunks_per_proc[1]
kp_beg_ext = self.kp*self.chunks_per_proc[2]
ip_end_ext = ip_beg_ext+self.chunks_per_proc[0]-1
jp_end_ext = jp_beg_ext+self.chunks_per_proc[1]-1
kp_end_ext = kp_beg_ext+self.chunks_per_proc[2]-1
# Get the total size of the field to be loaded
key_grid = self._key_grid(key)
ib = self.proc_grid_ext[key_grid][0][ip_beg_ext]
ie = self.proc_grid_ext[key_grid][1][ip_end_ext]
jb = self.proc_grid_ext[key_grid][2][jp_beg_ext]
je = self.proc_grid_ext[key_grid][3][jp_end_ext]
kb = self.proc_grid_ext[key_grid][4][kp_beg_ext]
ke = self.proc_grid_ext[key_grid][5][kp_end_ext]
nxl = ie-ib+1
nyl = je-jb+1
nzl = ke-kb+1
# Allocate an array to hold the entire field
data = np.empty(
(nxl+2*self.nghx,
nyl+2*self.nghy,
nzl+2*self.nghz),dtype=dtype)
# Compute origin of subfield
origin = (self.origin[key_grid][0]+(ib-1-self.nghx)*self.spacing[0],
self.origin[key_grid][1]+(jb-1-self.nghy)*self.spacing[1],
self.origin[key_grid][2]+(kb-1-self.nghz)*self.spacing[2])
# Create a Field3d
self.field[key] = Field3d(data,origin,self.spacing)
# Go through each chunk to be read and construct the field
for ip_ext in range(ip_beg_ext,ip_end_ext+1):
for jp_ext in range(jp_beg_ext,jp_end_ext+1):
for kp_ext in range(kp_beg_ext,kp_end_ext+1):
# Determine rank of the chunk to be read
rank_ext = self.rank_from_position(ip_ext,jp_ext,kp_ext,external=True)
# Compute bounds of this chunk
ib_ext = self.proc_grid_ext[key_grid][0][ip_ext]
ie_ext = self.proc_grid_ext[key_grid][1][ip_ext]
jb_ext = self.proc_grid_ext[key_grid][2][jp_ext]
je_ext = self.proc_grid_ext[key_grid][3][jp_ext]
kb_ext = self.proc_grid_ext[key_grid][4][kp_ext]
ke_ext = self.proc_grid_ext[key_grid][5][kp_ext]
nxl_ext = ie_ext-ib_ext+1
nyl_ext = je_ext-jb_ext+1
nzl_ext = ke_ext-kb_ext+1
# Read data and insert it
subfield = self.func_load(rank_ext,key)
self.field[key].insert_subfield(subfield)
# Continue execution of waiting processors if IO was limited
sb.proceed()
if verbose:
print('[load_field] rank {} done with I/O.'.format(self.rank))
# Exchange ghost cells
self.exchange_ghost_cells(key)
# Initialize symmetries and impose BC
self.init_symmetry(key)
self.impose_boundary_conditions(key)
def differentiate(self,key,axis,key_out=None,on_pressure_grid=False):
assert axis<3, "'axis' must be one of 0,1,2."
if key_out is None:
key_out = 'd'+key+('dx','dy','dz')[axis]
key_grid = self._key_grid(key)
origin = list(self.origin[key_grid])
shifting_state = self.shifting_state(key,axis=axis)
if shifting_state==-1:
shift_origin = 'after'
origin[axis] += 0.5*self.spacing[axis]
elif shifting_state==0:
shift_origin = 'before'
origin[axis] -= 0.5*self.spacing[axis]
elif shifting_state==1:
shift_origin = 'before'
origin[axis] -= 0.5*self.spacing[axis]
else:
raise ValueError("Invalid shifting state.")
self.copy(key,key_out,skip_data=True)
self.field[key_out] = self.field[key].derivative(axis,only_keep_interior=False,shift_origin=shift_origin)
self.origin[key_out] = tuple(origin)
if axis==0:
self.symmetries[key_out][0,1,1] = -self.symmetries[key_out][0,1,1]
self.symmetries[key_out][2,1,1] = -self.symmetries[key_out][2,1,1]
elif axis==1:
self.symmetries[key_out][1,0,1] = -self.symmetries[key_out][1,0,1]
self.symmetries[key_out][1,2,1] = -self.symmetries[key_out][1,2,1]
elif axis==2:
self.symmetries[key_out][1,1,0] = -self.symmetries[key_out][1,1,0]
self.symmetries[key_out][1,1,2] = -self.symmetries[key_out][1,1,2]
# Exchange ghost cells and set boundary conditions
self.exchange_ghost_cells(key_out)
self.impose_boundary_conditions(key_out)
# Shift to pressure grid if desired
if on_pressure_grid:
self.shift_to_pressure_grid(key_out,key_out=key_out)
def allocate(self,key_grid,fill=None,pseudo=False,dtype=np.float64):
from .field import Field3d
ib = self.proc_grid[key_grid][0][self.ip]
ie = self.proc_grid[key_grid][1][self.ip]
jb = self.proc_grid[key_grid][2][self.jp]
je = self.proc_grid[key_grid][3][self.jp]
kb = self.proc_grid[key_grid][4][self.kp]
ke = self.proc_grid[key_grid][5][self.kp]
origin = (self.origin[key_grid][0]+(ib-1-self.nghx)*self.spacing[0],
self.origin[key_grid][1]+(jb-1-self.nghy)*self.spacing[1],
self.origin[key_grid][2]+(kb-1-self.nghz)*self.spacing[2])
dim = (ie-ib+1+2*self.nghx,
je-jb+1+2*self.nghy,
ke-kb+1+2*self.nghz)
if pseudo:
return Field3d.pseudo_field(dim,origin,self.spacing)
else:
return Field3d.allocate(dim,origin,self.spacing,fill=fill,dtype=dtype)
def qcriterion(self,key_out=None,from_pressure=False,keep_derivatives=False):
'''Computes Q-criterion'''
from .field import Field3d
if key_out is None:
key_out = 'Q'
if from_pressure:
assert 'p' in self.field, "Pressure field is not loaded."
self.copy('p',key_out,skip_data=True)
self.field[key_out] = 0.5*self.field['p'].laplacian(only_keep_interior=False)
else:
assert 'u' in self.field, "Velocity field is not loaded: u"
assert 'v' in self.field, "Velocity field is not loaded: v"
assert 'w' in self.field, "Velocity field is not loaded: w"
self.copy('p',key_out,skip_data=True,skip_symmetries=True)
self.init_symmetry(key_out)
self.field[key_out] = self.allocate('p',fill=0.0,dtype=self.field['u'].dtype())
keyx = ('x','y','z')
keyu = ('u','v','w')
for ii in range(3):
key_tmp = 'd'+keyu[ii]+'d'+keyx[ii]
parprint(key_tmp)
self.differentiate(keyu[ii],axis=ii,key_out=key_tmp,on_pressure_grid=True)
self.field[key_out] += self.field[key_tmp]**2
if not keep_derivatives:
self.delete(key_tmp)
#
iip1 = (ii+1)%3
key_tmp1 = 'd'+keyu[ii]+'d'+keyx[iip1]
key_tmp2 = 'd'+keyu[iip1]+'d'+keyx[ii]
parprint(key_tmp1,key_tmp2)
self.differentiate(keyu[ii],axis=iip1,key_out=key_tmp1,on_pressure_grid=True)
self.differentiate(keyu[iip1],axis=ii,key_out=key_tmp2,on_pressure_grid=True)
self.field[key_out] += 2.0*self.field[key_tmp1]*self.field[key_tmp2]
if not keep_derivatives:
self.delete(key_tmp1)
self.delete(key_tmp2)
self.field[key_out] *= -0.5
# Exchange ghost cells and set boundary conditions
self.exchange_ghost_cells(key_out)
self.impose_boundary_conditions(key_out)
return
#def vorticity(self,axis=None,key_out=None,keep_derivatives=False,on_pressure_grid=True):
def arithmetic_operation(self,key1,operation,key2,key_out=None,on_pressure_grid=True):
import operator
if operation in ('add','+'):
op = operator.iadd
elif operation in ('subtract','sub','-'):
op = operator.isub
elif operation in ('divide','div','/'):
op = operator.itruediv
elif operation in ('multiply','mul','*'):
op = operator.imul
if key_out is None:
key_out = key1
if not self.field[key1].has_same_grid(self.field[key2]) or on_pressure_grid:
self.shift_to_pressure_grid(key1,key_out)
self.shift_to_pressure_grid(key2,'tmp')
op(self.field[key_out],self.field['tmp'])
self.delete('tmp')
else:
self.copy(key1,key_out)
op(self.field[key_out],self.field[key2])
return
def add(self,key1,key2,key_out=None,on_pressure_grid=True):
self.arithmetic_operation(key1,'+',key2,key_out=key_out,on_pressure_grid=on_pressure_grid)
return
def subtract(self,key1,key2,key_out=None,on_pressure_grid=True):
self.arithmetic_operation(key1,'-',key2,key_out=key_out,on_pressure_grid=on_pressure_grid)
return
def multiply(self,key1,key2,key_out=None,on_pressure_grid=True):
self.arithmetic_operation(key1,'*',key2,key_out=key_out,on_pressure_grid=on_pressure_grid)
return
def divide(self,key1,key2,key_out=None,on_pressure_grid=True):
self.arithmetic_operation(key1,'/',key2,key_out=key_out,on_pressure_grid=on_pressure_grid)
return
def gaussian_filter(self,key,sigma,truncate=4.0,key_out=None,iterate=False,verbose=True):
'''Applies a gaussian filter to a field as in-place operation. Sigma is the std of the filter in terms of grid width.'''
import numpy as np
from mpi4py import MPI
if key_out is None:
key_out = key
if key!=key_out:
self.copy(key,key_out,skip_data=True)
# Compute radius of Gaussian filter
radius = self.field[key].gaussian_filter_radius(sigma,truncate=truncate)
if verbose and self.rank==0:
print('[gaussian_filter] key={}, stencil radius={}'.format(key,radius))
if not iterate:
# Assert that we have sufficient amount of ghost cells
assert all([self.num_ghost[ii]>=radius[ii] for ii in range(3)]),\
"Too few ghost cells for stencil: {}, {}".format(self.num_ghost,radius)
niter = 1
else:
# Determine number of iterations required for current ghost cell setup
sigma = np.array(sigma)
niter = 1
while not all([self.num_ghost[ii]>=radius[ii] for ii in range(3)]):
sigma /= np.sqrt(2)
niter *= 2
radius = self.field[key].gaussian_filter_radius(sigma,truncate=truncate)
assert all([radius[ii]>0 if sigma[ii]>0.0 else True for ii in range(3)]),\
"Iterative procedure leads to invalid stencil radius: "\
"increase number of ghost cells. {}".format(radius)
if verbose and self.rank==0:
print('[gaussian_filter] iterations={}'.format(niter))
for iiter in range(niter):
if verbose and self.rank==0:
print('[gaussian_filter] iter #{:d}: '.format(iiter),end='')
tbeg = MPI.Wtime()
# Filter field: if key_out is None, perform operation inplace
self.field[key_out] = self.field[key].gaussian_filter(sigma,
truncate=truncate,only_keep_interior=False)
# Exchange ghost cells and set boundary conditions
self.exchange_ghost_cells(key_out)
self.impose_boundary_conditions(key_out)
# Iterate inplace from now on
key = key_out
if verbose and self.rank==0:
print('{:g} sec'.format(MPI.Wtime()-tbeg))
def broadcast(self,key,operation,arg,key_out=None):
'''Broadcasts an operation involving a scalar or matrix on
the entire grid. If 'arg' is a matrix, it must be three-dimensional
and its axes must be singular or of length nx/ny/nz.'''
import numpy as np
import operator
# Broadcast argument
arg = self.comm.bcast(arg,root=0)
# Select operator
if operation in ('add','+'):
op = operator.iadd
elif operation in ('subtract','sub','-'):
op = operator.isub
elif operation in ('divide','div','/'):
op = operator.itruediv
elif operation in ('multiply','mul','*'):
op = operator.imul
elif operation in ('power','pow','^','**'):
op = operator.ipow
else:
raise ValueError("Invalid operation: {}".format(operation))
key_grid = self._key_grid(key)
if isinstance(arg,np.ndarray):
sl_arg = 3*[slice(None)]
for axis in range(3):
if arg.shape[axis]==1:
continue
elif arg.shape[axis]==self.dim(key,axis=axis):
pos = self.position_from_rank(self.rank,external=False)[axis]
sl_arg[axis] = slice(
self.proc_grid[key_grid][2*axis][pos]-1,
self.proc_grid[key_grid][2*axis+1][pos])
else:
raise ValueError("'arg' must either be singular or match global "\
"grid dimension. (axis={}: got {:d}, expected {:d}".format(
axis,arg.shape[axis],self.dim(key,axis=axis)))
# Only operate on interior and communcate ghosts later
sl_int = tuple(slice(self.num_ghost[ii],-self.num_ghost[ii]) for ii in range(3))
sl_arg = tuple(sl_arg)
if key_out is None:
op(self.field[key].data[sl_int],arg[sl_arg])
else:
self.copy(key,key_out)
op(self.field[key_out].data[sl_int],arg[sl_arg])
# Exchange ghost cells and set boundary conditions
self.exchange_ghost_cells(key)
self.impose_boundary_conditions(key)
elif isinstance(arg,(int,float)):
if key_out is None:
op(self.field[key].data,arg)
else:
self.copy(key,key_out)
op(self.field[key_out].data,arg)
return
def integrate(self,key,integrate_axis,ufunc=None,ignore_nan=False,average=False):
'''Computes a global integral or average along a given axis applying the
function 'ufunc' to each node. The result is returned by rank 0, all other
ranks return None.'''
from mpi4py import MPI
import numpy as np
if integrate_axis is None:
integrate_axis = tuple(self.periodicity)
# Compute local integral, but get weights separately
idx_origin = tuple(self.num_ghost[ii]-1 for ii in range(3))
(integ_local,weights_local) = self.field[key].extract_subfield(
idx_origin,self.chunk_size(key,axis=None)).integral(
integrate_axis,ufunc=ufunc,ignore_nan=ignore_nan,
average=average,return_weights=True)
# Make sure weights_local is a numpy array
weights_local = np.array(weights_local)
# Simple implementation: all processor communicate directly to root
key_grid = self._key_grid(key)
if self.rank==0:
dim_final = tuple(1 if integrate_axis[axis] else self.dim(key,axis=axis) for axis in range(3))
# Allocate a nice spot of memory
integ = np.zeros(dim_final,dtype=integ_local.dtype)
if average:
weights = np.zeros(dim_final,dtype=weights_local.dtype)
# Receive from peers
for rank_src in range(0,self.nproc):
# Determine target array position
pos = self.position_from_rank(rank_src,external=False)
sl = 3*[slice(None)]
for axis in range(3):
if integrate_axis[axis]:
sl[axis] = slice(0,1)
else:
sl[axis] = slice(
self.proc_grid[key_grid][2*axis][pos[axis]]-1,
self.proc_grid[key_grid][2*axis+1][pos[axis]])
sl = tuple(sl)
# Receive data and put it in a good spot
if rank_src==0:
integ[sl] += integ_local
if average:
weights[sl] += weights_local
else:
integ[sl] += self.comm.recv(source=rank_src,tag=1)
if average:
weights[sl] += self.comm.recv(source=rank_src,tag=2)
if average:
return integ/weights
else:
return integ/weights_local
else:
self.comm.send(integ_local,dest=0,tag=1)
if average:
self.comm.send(weights_local,dest=0,tag=2)
return None
def shift_to_pressure_grid(self,key,key_out=None):
from .field import Field3d
if key_out is None:
if key in ('u','v','w'):
raise ValueError("In-place shift of {'u','v','w'} is not permitted.")
key_out = key
# If pressure is loaded, use field information, else create a
# fake Field3d object so that we can use its methods.
field_ref = self.allocate('p',pseudo=True)
# Compute the relative shift and shift it
rel_shift = self.field[key].relative_shift(field_ref)
self.field[key_out] = self.field[key].shift_origin(rel_shift)
# Match number of gridpoints
sl = 3*[slice(None)]
for axis in range(3):
if not self.field[key_out].has_same_dim(field_ref,axis=axis):
sl[axis] = slice(0,field_ref.dim(axis=axis))
sl = tuple(sl)
self.field[key_out].data = self.field[key_out].data[sl]
# Copy metadata of pressure grid
self.copy('p',key_out,skip_data=True,skip_symmetries=True)
self.symmetries[key_out] = self.symmetries[key].copy()
# Exchange ghost cells and set boundary conditions
self.exchange_ghost_cells(key_out)
self.impose_boundary_conditions(key_out)
def init_symmetry(self,key):
'''Sets mirror symmetries for ghost cells behind the wall. Use
impose_symmetries_noslip() afterwards to set appropriate
no-slip boundary conditions.'''
import numpy as np
self.symmetries[key] = np.zeros((3,3,3),dtype='i')
for axis in range(3):
self.init_mirror_symmetry(key,axis,-1)
self.init_mirror_symmetry(key,axis,+1)
return
def init_mirror_symmetry(self,key,axis,wall,invert=False):
'''Applies symmetry: [u,v,w](x,-y,z) -> [u,-v,w](x,y,z)
Proposed by Lozano-Duran JFM 2016 and used for free-slip.'''
if self.periodicity[axis]:
return
if wall in (-1,'low'): iwall = 0
elif wall in (1,'high'): iwall = 2
else: raise ValueError('Invalid value for "wall". Valid values are -1 / "low" or 1 / "high".')
keyu = ('u','v','w')
# Get position in symmetries array to be edited
sl = [1,1,1]
sl[axis] = iwall
sl = tuple(sl)
if key==keyu[axis]:
self.symmetries[key][sl] = -1 if not invert else +1
else:
self.symmetries[key][sl] = +1 if not invert else -1
return
def init_inversion_symmetry(self,key,axis,wall):
'''Applies symmetry: [u,v,w](x,-y,z) -> [-u,v,-w](x,y,z)'''
return self.init_mirror_symmetry(key,axis,wall,invert=True)
def copy(self,key,key_out,skip_data=False,skip_symmetries=False):
'''Copies a field to a new key.'''
if key==key_out:
return
key_grid = self._key_grid(key)
self.origin[key_out] = tuple(self.origin[key_grid])
self.proc_grid[key_out] = self.proc_grid[key_grid].copy()
if not skip_symmetries:
self.symmetries[key_out] = self.symmetries[key].copy()
if not skip_data:
self.field[key_out] = self.field[key].copy()
return
def delete(self,key):
'''Removes a field from memory.'''
assert key in self.field, "Field {} cannot be deleted, because it "\
"does not exist.".format(key)
import gc
del self.field[key]
del self.symmetries[key]
key_grid = self._key_grid(key)
if key_grid not in ('u','v','w','p','s'):
del self.origin[key_grid]
del self.proc_grid[key_grid]
gc.collect()
return
def save_state(self,file,parallel=False,verbose=True):
import h5py
from mpi4py import MPI
if verbose and self.rank==0:
print('[save_state] file={}'.format(file))
tbeg = MPI.Wtime()
ascii_type = h5py.string_dtype('ascii',32)
# Only use parallel IO if flag is set and h5py has MPIIO support
parallel = (h5py.h5fd.MPIO>=0) and parallel
if parallel:
f = h5py.File(file,'w',driver='mpio',comm=self.comm)
else:
sb = SequentialBlock(1,comm=self.comm,per_node=False)
f = h5py.File(file,'w') if self.rank==0 else h5py.File(file,'a')
# Write attributes which are the same for all ranks
if parallel or self.rank==0:
g = f.create_group('/PPP')
# Create a variable which stores all the field names
d = g.create_dataset('field_names',len(self.origin),dtype=ascii_type)
for ii,key in enumerate(self.origin):
d[ii] = key
d = g.create_dataset('field_loaded',len(self.field),dtype=ascii_type)
for ii,key in enumerate(self.field):
d[ii] = key
# Store all global field dependent data
for key in self.origin:
g2 = g.create_group(key)
d = g2.create_dataset('origin',3,dtype='f')
if self.rank==0: d[:] = self.origin[key]
d = g2.create_dataset('ibeg',self.nxp,dtype='i')
if self.rank==0: d[:] = self.proc_grid[key][0]
d = g2.create_dataset('iend',self.nxp,dtype='i')
if self.rank==0: d[:] = self.proc_grid[key][1]
d = g2.create_dataset('jbeg',self.nyp,dtype='i')
if self.rank==0: d[:] = self.proc_grid[key][2]
d = g2.create_dataset('jend',self.nyp,dtype='i')
if self.rank==0: d[:] = self.proc_grid[key][3]
d = g2.create_dataset('kbeg',self.nzp,dtype='i')
if self.rank==0: d[:] = self.proc_grid[key][4]
d = g2.create_dataset('kend',self.nzp,dtype='i')
if self.rank==0: d[:] = self.proc_grid[key][5]
# Store all global field independent data
d = g.create_dataset('num_ghost',3,dtype='i')
if self.rank==0: d[:] = self.num_ghost
d = g.create_dataset('chunks_per_proc',3,dtype='i')
if self.rank==0: d[:] = self.chunks_per_proc
d = g.create_dataset('spacing',3,dtype='f')
if self.rank==0: d[:] = self.spacing
d = g.create_dataset('periodicity',3,dtype='i')
if self.rank==0: d[:] = self.periodicity
d = g.create_dataset('bounds',6,dtype='f')
if self.rank==0: d[:] = self.bounds
d = g.create_dataset('nxp',1,'i')
if self.rank==0: d[:] = self.nxp
d = g.create_dataset('nyp',1,'i')
if self.rank==0: d[:] = self.nyp
d = g.create_dataset('nzp',1,'i')
if self.rank==0: d[:] = self.nzp
# Collectively create groups and datasets for rank-specific data
for ii in range(self.nproc):
if parallel or ii==self.rank:
g1 = f.create_group('{:05d}'.format(ii))
g1.create_dataset('nghbr',self.nghbr.shape,data=self.nghbr)
for key in self.field:
# To support parallel h5py, we need to create the groups here and cannot use
# the 'save' method of Field3d
g2 = g1.create_group('{}'.format(key))
g2.create_dataset('origin',3,dtype='f')
g2.create_dataset('spacing',3,dtype='f')
g2.create_dataset('data',
self.chunk_size(key,rank=ii,incl_ghost=True),
dtype=self.chunk_dtype(key))
g2.create_dataset('symmetries',(3,3,3),dtype='i')
# Independent write
grp_rank = '{:05d}'.format(self.rank)
f[grp_rank]['nghbr'][:] = self.nghbr
for key in self.field:
f[grp_rank][key]['origin'][:] = self.field[key].origin
f[grp_rank][key]['spacing'][:] = self.field[key].spacing
f[grp_rank][key]['data'][:] = self.field[key].data
f[grp_rank][key]['symmetries'][:] = self.symmetries[key]
f.close()
if not parallel: sb.proceed()
self.comm.Barrier()
tend = MPI.Wtime()
if verbose and self.rank==0:
print("[save_state] elapsed time: {:f}".format(tend-tbeg))
return
def save_for_vtk(self,file,key,stride=(1,1,1),truncate=True,merge_at_root=True,on_pressure_grid=True,verbose=True):
'''Saves a field for visualization purposes. This means it will only have a single
lower ghost cell if there is an upper neighbor, and both a single and an upper
ghost cell if there is no upper neighbor (or merged).'''
import h5py
from mpi4py import MPI
# Recursive saving if key is a list. Take care of 'truncate'.
if isinstance(key,(tuple,list)):
for key_ in key:
self.save_for_vtk(file,key_,stride=stride,merge_at_root=merge_at_root,
truncate=truncate,on_pressure_grid=on_pressure_grid)
truncate = False
return
# Since the data is usually much smaller than a full 'save_state', I only
# implement sequential IO for now.
if verbose and self.rank==0:
print('[save_for_vtk] key={}, file={}'.format(key,file))
tbeg = MPI.Wtime()
# If flag is set, shift data onto pressure grid first. Use a temporary field for this.
name = key
if on_pressure_grid:
key_tmp = 'tmp'
self.shift_to_pressure_grid(key,key_out='tmp')
key = key_tmp
# Get the subfield and save them
if merge_at_root:
fld = self._merge_at_root(key,stride=stride)
if self.rank==0: fld.save(file,name=name,truncate=truncate)
else:
sb = SequentialBlock(1,comm=self.comm,per_node=False)
if self.rank!=0: truncate=False
name += '/{:05d}'.format(self.rank)
self._subfield_for_vtk(key,stride=stride).save(file,name=name,truncate=truncate)
sb.proceed()
# Free the temporary field (if it was created)
if on_pressure_grid: self.delete(key_tmp)
# Sync (important in case there is another write to the same file following!)
self.comm.Barrier()
# Print timing
tend = MPI.Wtime()
if verbose and self.rank==0:
print("[save_for_vtk] elapsed time: {:f}".format(tend-tbeg))
return
def to_vtk(self,key,stride=(1,1,1),merge_at_root=False):
'''Returns the field (only its interior + some ghost cells for plotting)
as a vtk mesh. If other ghost cells are required, use one of the _subfield
methods and apply .to_vtk() to the result.'''
from .field import Field3d
if merge_at_root:
return self._merge_at_root(key,stride=stride).to_vtk()
else:
return self._subfield_for_vtk(key,stride=stride).to_vtk()
def vtk_contour(self,key,val,stride=(1,1,1)):
'''Compute isocontour for chunks.'''
return self._subfield_for_vtk(key,stride=stride).vtk_contour(val)
def vtk_slice(self,key,normal,origin,stride=(1,1,1)):
'''Extracts a plane from field.'''
return self._subfield_for_vtk(key,stride=stride).vtk_slice(normal,origin)
def _subfield_for_vtk(self,key,stride=(1,1,1)):
'''Extracts a subfield from chunk which has one leading and no trailing
ghost cell except when there is no trailing neighbor. This guarantees
non-overlapping contours.'''
no_upper_ghost = [True,True,True]
if self.ip==self.nxp-1: no_upper_ghost[0]=False
if self.jp==self.nyp-1: no_upper_ghost[1]=False
if self.kp==self.nzp-1: no_upper_ghost[2]=False
return self._subfield(key,stride,(1,1,1),no_upper_ghost=no_upper_ghost)
def _subfield_for_vtk_merge(self,key,stride=(1,1,1)):
'''Extracts a subfield from chunk which has one leading and no trailing
ghost cell except when there is no trailing neighbor. This guarantees
non-overlapping contours.'''
no_lower_ghost = [True,True,True]
no_upper_ghost = [True,True,True]
if self.ip==self.nxp-1: no_upper_ghost[0]=False
if self.jp==self.nyp-1: no_upper_ghost[1]=False
if self.kp==self.nzp-1: no_upper_ghost[2]=False
return self._subfield(key,stride,(1,1,1),no_lower_ghost=no_lower_ghost,
no_upper_ghost=no_upper_ghost)
def _subfield_interior(self,key,stride=(1,1,1)):
return self._subfield(key,stride,(0,0,0))
def _subfield(self,key,stride,num_ghost,no_lower_ghost=(False,False,False),
no_upper_ghost=(False,False,False),
deep=True):
'''Returns the field with a stride applied.'''
stride = np.array(stride,dtype=int)
num_ghost = np.array(num_ghost,dtype=int)
no_lower_ghost = np.array(no_lower_ghost,dtype=bool)
no_upper_ghost = np.array(no_upper_ghost,dtype=bool)
assert len(stride)==3, "'stride' needs to be of length 3."
assert len(num_ghost)==3, "'num_ghost' needs to be of length 3."
assert len(no_lower_ghost)==3, "'no_lower_ghost' needs to be of length 3."
assert len(no_upper_ghost)==3, "'no_upper_ghost' needs to be of length 3."
assert all(np.array(self.num_ghost)>=stride*num_ghost), "At least stride*num_ghost "\
"ghost cells are required in the original field. Have {}, need {}.".format(
tuple(self.num_ghost),tuple(stride*num_ghost))
key_grid = self._key_grid(key)
ib = self.proc_grid[key_grid][0][self.ip]-1
ie = self.proc_grid[key_grid][1][self.ip]-1
jb = self.proc_grid[key_grid][2][self.jp]-1
je = self.proc_grid[key_grid][3][self.jp]-1
kb = self.proc_grid[key_grid][4][self.kp]-1
ke = self.proc_grid[key_grid][5][self.kp]-1
ijkb = np.array((ib,jb,kb))
ijke = np.array((ie,je,ke))
# Determine the first point to consider in this chunk for the case of output
# without ghost cells
idx_origin = (np.ceil(ijkb/stride)*stride).astype(int)-ijkb+self.num_ghost
# Now the last point to consider under the same circumstances
idx_endpoint = (np.floor(ijke/stride)*stride).astype(int)-ijkb+self.num_ghost
# If we want the subfield to have ghost cells, shift the origin and endpoint
idx_origin -= (~no_lower_ghost).astype(int)*stride*num_ghost
idx_endpoint += (~no_upper_ghost).astype(int)*stride*num_ghost
# Compute the number of points
num_points = ((idx_endpoint-idx_origin)/stride+1).astype(int)
# Some last assertions which should never fail
# print(self.rank,idx_origin,idx_endpoint,num_points,self.field[key].dim(),self.field[key].dim())
assert all(idx_origin>=0)
assert all(idx_endpoint<np.array(self.field[key].dim()))
# Return the subfield
return self.field[key].extract_subfield(idx_origin,num_points,stride=stride,deep=deep)
def _merge_at_root(self,key,stride=(1,1,1)):
'''Returns the entire field gathered from all processors with a
stride applied as a Field3d.'''
from .field import Field3d
if self.rank==0:
stride = np.array(stride)
# Allocate the full output field
key_grid = self._key_grid(key)
origin = np.array(self.origin[key_grid])
spacing = stride*np.array(self.spacing)
dim = (np.array(self.dim(key,axis=None))/stride+1).astype(int)
# the following seems necessary and seems to work, but i didn't think much about it
for axis in range(3):
if self.dim(key,axis=axis)%stride[axis]!=0:
dim[axis]+=1
output = Field3d.allocate(dim,origin,spacing,dtype=self.field[key].dtype())
# Recieve subfields and insert them
output.insert_subfield(self._subfield_for_vtk_merge(key,stride=stride))
for rank_src in range(1,self.nproc):
output.insert_subfield(self.comm.recv(source=rank_src))
return output
else:
self.comm.send(self._subfield_for_vtk_merge(key,stride=stride),dest=0)
return None
def rank_from_position(self,ip,jp,kp,external=False):
if external:
nyp,nzp = self.nyp_ext,self.nzp_ext
else:
nyp,nzp = self.nyp,self.nzp
return rank_from_position(ip,jp,kp,nyp,nzp)
def position_from_rank(self,rank,external=False):
if external:
nyp,nzp = self.nyp_ext,self.nzp_ext
else:
nyp,nzp = self.nyp,self.nzp
return position_from_rank(rank,nyp,nzp)
def shifting_state(self,key,axis=None):
'''Returns the relative shift of first global grid point to domain
boundaries as either 1 (= +0.5h), 0 (= 0h) or -1 (= -0.5h).'''
if axis is None:
return tuple(self.shifting_state(key,axis=ii) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
key_grid = self._key_grid(key)
return int(round((self.origin[key_grid][axis]-self.bounds[2*axis])/(0.5*self.spacing[axis])))
def chunk_size(self,key,axis=None,rank=None,incl_ghost=False):
'''Returns size of a chunk.'''
if axis is None:
return tuple(self.chunk_size(key,
axis=ii,rank=rank,incl_ghost=incl_ghost) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
key_grid = self._key_grid(key)
if rank is None:
rank=self.rank
pos = self.position_from_rank(rank,external=False)
r = self.proc_grid[key_grid][2*axis+1][pos[axis]]-self.proc_grid[key_grid][2*axis][pos[axis]]+1
if incl_ghost:
r+=2*self.num_ghost[axis]
return r
def chunk_dtype(self,key):
return self.field[key].dtype()
def dim(self,key,axis=None):
'''Returns the total number of gridpoints across all processors
without ghost cells.'''
if axis is None:
return tuple(self.dim(key,axis=ii) for ii in range(3))
assert axis<3, "'axis' must be one of 0,1,2."
key_grid = self._key_grid(key)
return self.proc_grid[key_grid][2*axis+1][-1]
def exchange_ghost_cells(self,key):
'''Communicates all ghost cells of specified field'''
# Trigger non-blocking communication:
# Communicate faces (6 faces)
self._communicate_ghost_cells(key,(-1,0,0)) # left
self._communicate_ghost_cells(key,(+1,0,0)) # right
self._communicate_ghost_cells(key,(0,-1,0)) # down
self._communicate_ghost_cells(key,(0,+1,0)) # up
self._communicate_ghost_cells(key,(0,0,-1)) # front
self._communicate_ghost_cells(key,(0,0,+1)) # back
# Communicate edges (12 edges)
self._communicate_ghost_cells(key,(-1,-1,0)) # left,down
self._communicate_ghost_cells(key,(-1,0,-1)) # left,front
self._communicate_ghost_cells(key,(-1,+1,0)) # left,up
self._communicate_ghost_cells(key,(-1,0,+1)) # left,back
self._communicate_ghost_cells(key,(+1,-1,0)) # right,down
self._communicate_ghost_cells(key,(+1,0,-1)) # right,front
self._communicate_ghost_cells(key,(+1,+1,0)) # right,up
self._communicate_ghost_cells(key,(+1,0,+1)) # right,back
self._communicate_ghost_cells(key,(0,-1,-1)) # down,front
self._communicate_ghost_cells(key,(0,-1,+1)) # down,back
self._communicate_ghost_cells(key,(0,+1,-1)) # up,front
self._communicate_ghost_cells(key,(0,+1,+1)) # up,back
# Communicate corners (8 corners)
self._communicate_ghost_cells(key,(-1,-1,-1)) # left,down,front
self._communicate_ghost_cells(key,(-1,-1,+1)) # left,down,back
self._communicate_ghost_cells(key,(-1,+1,-1)) # left,up,front
self._communicate_ghost_cells(key,(-1,+1,+1)) # left,up,back
self._communicate_ghost_cells(key,(+1,-1,-1)) # right,down,front
self._communicate_ghost_cells(key,(+1,-1,+1)) # right,down,back
self._communicate_ghost_cells(key,(+1,+1,-1)) # right,up,front
self._communicate_ghost_cells(key,(+1,+1,+1)) # right,up,back
def impose_boundary_conditions(self,key):
'''Imposes symmetry boundary conditions on each non-periodic wall'''
self._symmetrize_wall(key,(-1,0,0))
self._symmetrize_wall(key,(+1,0,0))
self._symmetrize_wall(key,(0,-1,0))
self._symmetrize_wall(key,(0,+1,0))
self._symmetrize_wall(key,(0,0,-1))
self._symmetrize_wall(key,(0,0,+1))
def _communicate_ghost_cells(self,key,positionDst):
'''Triggers communication of ghost cells'''
import numpy as np
assert np.max(positionDst)<=1 and np.min(positionDst)>=-1, "communicate_ghost_cells: "\
"invalid neighbor position {}".format(positionDst)
# [send/recv] get the rank of the neighbor where data is to be sent to
# The position is passed as values -1,0,+1, but need to be converted to array indices
ip_dst = positionDst[0]+1
jp_dst = positionDst[1]+1
kp_dst = positionDst[2]+1
ip_src = -positionDst[0]+1
jp_src = -positionDst[1]+1
kp_src = -positionDst[2]+1
rank_dst = self.nghbr[ip_dst,jp_dst,kp_dst]
rank_src = self.nghbr[ip_src,jp_src,kp_src]
# [send/recv] create a tag
tag = ip_dst*100+jp_dst*10+kp_dst
# [send/recv] get the indices of data to be sent/received
nxl,nyl,nzl = self.chunk_size(key)
if positionDst[0]==-1:
ii_src = slice(self.nghx,2*self.nghx)
ii_dst = slice(self.nghx+nxl,2*self.nghx+nxl)
elif positionDst[0]==0:
ii_src = slice(self.nghx,self.nghx+nxl)
ii_dst = slice(self.nghx,self.nghx+nxl)
elif positionDst[0]==1:
ii_src = slice(nxl,nxl+self.nghx)
ii_dst = slice(0,self.nghx)
else:
raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[0]))
if positionDst[1]==-1:
jj_src = slice(self.nghy,2*self.nghy)
jj_dst = slice(self.nghy+nyl,2*self.nghy+nyl)
elif positionDst[1]==0:
jj_src = slice(self.nghy,self.nghy+nyl)
jj_dst = slice(self.nghy,self.nghy+nyl)
elif positionDst[1]==1:
jj_src = slice(nyl,nyl+self.nghy)
jj_dst = slice(0,self.nghy)
else:
raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[1]))
if positionDst[2]==-1:
kk_src = slice(self.nghz,2*self.nghz)
kk_dst = slice(self.nghz+nzl,2*self.nghz+nzl)
elif positionDst[2]==0:
kk_src = slice(self.nghz,self.nghz+nzl)
kk_dst = slice(self.nghz,self.nghz+nzl)
elif positionDst[2]==1:
kk_src = slice(nzl,nzl+self.nghz)
kk_dst = slice(0,self.nghz)
else:
raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[2]))
# [send/recv] Create a list for requests
reqsend = None
reqrecv = None
# [send] now send the data to the neighbor, but only if there is one!
# Communication must be done non-blocking, and we can use upper-case routines since this is a numpy array
if rank_dst>=0:
sendbuf = np.ascontiguousarray(self.field[key].data[ii_src,jj_src,kk_src])
reqsend = self.comm.Isend(sendbuf,dest=rank_dst,tag=tag)
# [recv] the corresponding receive: results are stored in a buffer which will be assigned to the parent array later
if rank_src>=0:
recvbuf = np.zeros(
(ii_dst.stop-ii_dst.start,
jj_dst.stop-jj_dst.start,
kk_dst.stop-kk_dst.start),dtype=self.field[key].data.dtype)
reqrecv = self.comm.Irecv(recvbuf,source=rank_src,tag=tag)
# [recv] wait for data to be received
if reqrecv is not None:
reqrecv.wait()
self.field[key].data[ii_dst,jj_dst,kk_dst] = recvbuf
# [send] wait for data to be sent
if reqsend is not None:
reqsend.wait()
def _symmetrize_wall(self,key,positionBd):
import numpy as np
assert np.max(positionBd)<=1 and np.min(positionBd)>=-1, "symmetrize_wall: invalid neighbor "\
"position {}".format(positionBd)
assert np.count_nonzero(positionBd)==1, "symmetrize_wall: only face direction is accepted "\
"(no edges or corners)"
inghbr = positionBd[0]+1
jnghbr = positionBd[1]+1
knghbr = positionBd[2]+1
if self.nghbr[inghbr,jnghbr,knghbr]<0:
sl_dst = [slice(None),slice(None),slice(None)]
sl_src = [slice(None),slice(None),slice(None)]
for axis in range(3):
if positionBd[axis]==-1: # lower boundary
# index of first point within the domain
idx = self.field[key].nearest_gridpoint(self.bounds[2*axis],axis=axis)
dist = self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis]
if np.abs(dist)>self.field[key].eps_collapse and np.sign(dist)<0.0:
idx += 1
dist = self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis]
# distance first point to wall: should be either 0 or dx/2
assert (dist<=self.field[key].eps_collapse or
(dist-0.5*self.spacing[axis])<=self.field[key].eps_collapse)
if np.abs(dist)>=0.25*self.spacing[axis]: # no point on boundary
sl_dst[axis] = slice(0,idx,1)
sl_src[axis] = slice(2*idx-1,idx-1,-1)
else: # point on boundary
sl_dst[axis] = slice(0,idx,1)
sl_src[axis] = slice(2*idx,idx,-1)
break
elif positionBd[axis]==1: # upper boundary
# index of last point within the domain
idx = self.field[key].nearest_gridpoint(self.bounds[2*axis+1],axis=axis)
dist = self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis+1]
if np.abs(dist)>self.field[key].eps_collapse and np.sign(dist)>0.0:
idx -= 1
dist = self.field[key].coordinate(idx,axis=axis)-self.bounds[2*axis+1]
# distance last point to wall: should be either 0 or -dx/2
assert (dist<=self.field[key].eps_collapse or
(dist+0.5*self.spacing[axis])<=self.field[key].eps_collapse)
if np.abs(dist)>=0.25*self.spacing[axis]: # no point on boundary
sl_dst[axis] = slice(idx+1,self.field[key].dim(axis=axis),1)
sl_src[axis] = slice(idx,2*idx+1-self.field[key].dim(axis=axis),-1)
else: # point on boundary
sl_dst[axis] = slice(idx+1,self.field[key].dim(axis=axis),1)
sl_src[axis] = slice(idx-1,2*idx-self.field[key].dim(axis=axis),-1)
break
# print(sl_src,sl_dst,self.field[key].dim())
self.field[key].data[tuple(sl_dst)] = self.symmetries[key][inghbr,jnghbr,knghbr]*\
self.field[key].data[tuple(sl_src)]
@staticmethod
def _construct_procgrid(comm,proc,chunks_per_proc,num_ghost,periodicity):
import numpy as np
rank = comm.Get_rank()
nxcpp,nycpp,nzcpp = chunks_per_proc
nghx,nghy,nghz = num_ghost
xperiodic,yperiodic,zperiodic = periodicity
# Assert proc
assert all(k in proc for k in ('u','v','w','p','s')), "'proc' must be a dictionary with "\
"keys 'u','v','w','p','s'"
for key in proc:
assert len(proc[key])==6, "Entries of 'proc' must have length of 6."
proc_grid_ext = proc
# Initialize chunks per processor
# Verify that this processor grid can be redistributed onto the requested processor layout
nxp_ext = len(proc_grid_ext['u'][0])
nyp_ext = len(proc_grid_ext['u'][2])
nzp_ext = len(proc_grid_ext['u'][4])
nproc_ext = nxp_ext*nyp_ext*nzp_ext
#
assert nxp_ext%nxcpp==0, "Number of processors must be divisible by the number "\
"of processors per process. (nxp_ext={}, nxcpp={})".format(
nxp_ext,nxcpp)
assert nyp_ext%nycpp==0, "Number of processors must be divisible by the number "\
"of processors per process. (nyp_ext={}, nycpp={})".format(
nyp_ext,nycpp)
assert nzp_ext%nzcpp==0, "Number of processors must be divisible by the number "\
"of processors per process. (nzp_ext={}, nzcpp={})".format(
nzp_ext,nzcpp)
# Determine the new processor layout and verify total number of MPI processes
nxp = nxp_ext//nxcpp
nyp = nyp_ext//nycpp
nzp = nzp_ext//nzcpp
nproc = nxp*nyp*nzp
assert nproc==comm.Get_size(), "Number of MPI processes does not match the requested "\
"processor layout. (MPI procs: {}, required procs: {})".format(
comm.Get_size(),nproc)
# Construct internal processor grid
proc_grid = {}
for key in proc_grid_ext:
proc_grid[key] = [None]*6
proc_grid[key][0] = proc_grid_ext[key][0][0::nxcpp]
proc_grid[key][1] = proc_grid_ext[key][1][nxcpp-1::nxcpp]
proc_grid[key][2] = proc_grid_ext[key][2][0::nycpp]
proc_grid[key][3] = proc_grid_ext[key][3][nycpp-1::nycpp]
proc_grid[key][4] = proc_grid_ext[key][4][0::nzcpp]
proc_grid[key][5] = proc_grid_ext[key][5][nzcpp-1::nzcpp]
# Get position in processor grid
ip,jp,kp = position_from_rank(rank,nyp,nzp)
# Determine local grid indices and size
for key in proc_grid:
nxl = proc_grid[key][1][ip]-proc_grid[key][0][ip]+1
nyl = proc_grid[key][3][jp]-proc_grid[key][2][jp]+1
nzl = proc_grid[key][5][kp]-proc_grid[key][4][kp]+1
# Verify that local grid size is not smaller than ghost cell size
assert (nxl>=nghx and nyl>=nghy and nzl>=nghz),\
"Local grid size must be greater than number "\
"of ghost cells in each direction!"
# Initialize neighbor array
nghbr = np.empty((3,3,3),dtype='int')
# wrap-around x
ipl = ip-1
if ipl<0:
if xperiodic: ipl = nxp-1
else: ipl = -1
ipr = ip+1
if ipr>nxp-1:
if xperiodic: ipr = 0
else: ipr = -1
inghbr = (ipl,ip,ipr)
# wrap-around y
jpl = jp-1
if jpl<0:
if yperiodic: jpl = nyp-1
else: jpl = -1
jpr = jp+1
if jpr>nyp-1:
if yperiodic: jpr = 0
else: jpr = -1
jnghbr = (jpl,jp,jpr)
# wrap-around z
kpl = kp-1
if kpl<0:
if zperiodic: kpl = nzp-1
else: kpl = -1
kpr = kp+1
if kpr>nzp-1:
if zperiodic: kpr = 0
else: kpr = -1
knghbr = (kpl,kp,kpr)
# Construct array of neighbors
for ip in range(3):
for jp in range(3):
for kp in range(3):
# Assign rank to neighbor array
if inghbr[ip]<0 or jnghbr[jp]<0 or knghbr[kp]<0:
nghbr[ip,jp,kp] = -1
else:
nghbr[ip,jp,kp] = rank_from_position(inghbr[ip],jnghbr[jp],knghbr[kp],nyp,nzp)
return (proc_grid,nxp,nyp,nzp,
proc_grid_ext,nxp_ext,nyp_ext,nzp_ext,
nghbr)
def _key_grid(self,key):
'''Get the key for the grid: scalar fields all share a common grid,
so this routine returns 's' if key is 'sX' with X being a (potentially
multi-digit) number. Otherwise the key itself is returned.'''
if key[0]=='s' and key[1:].isnumeric():
return 's'
else:
return key
class SequentialBlock:
'''Block execution of following code for some processors until
'next' is issued.'''
def __init__(self,batch_size,comm=None,per_node=False,tag=420):
from mpi4py import MPI
self.comm = MPI.COMM_WORLD if comm is None else comm
self.tag = tag
self.completed = False
# If batch_size is None, there will be no blocking
if batch_size is None:
self.successor = None
self.predecessor = None
return
# Establish the order at which the ranks are run
if per_node:
# Generate a list with all ranks on the current node
nodename = MPI.Get_processor_name()
nodelist = self.comm.allgather(nodename)
ranking = [ii for ii,x in enumerate(nodelist) if x==nodename]
# Get the position of predecessor/successor in ranking list
position = ranking.index(comm.Get_rank())
idx_pre = position-batch_size
idx_suc = position+batch_size
# Convert to rank
self.predecessor = ranking[idx_pre] if position-batch_size>=0 else None
self.successor = ranking[idx_suc] if position+batch_size<len(ranking) else None
else:
# The ranking is solely the rank in the communicator
position = comm.Get_rank()
self.predecessor = position-batch_size
if self.predecessor<0:
self.predecessor = None
self.successor = position+batch_size
if self.successor>=self.comm.Get_size():
self.successor = None
# Wait for predecessor
if self.predecessor is not None:
self.comm.recv(source=self.predecessor,tag=self.tag)
return
def proceed(self):
self.completed = True
if self.successor is not None:
data = None
self.comm.send(data,dest=self.successor,tag=self.tag)
class GatherIterator:
'''Sends 'data' sequentially to 'root' which can iterate over it
without gathering all data at once. Every process which is not 'root'
receives None.'''
def __init__(self,data,comm=None,root=0,barrier=False):
from mpi4py import MPI
comm = MPI.COMM_WORLD if comm is None else comm
self.comm = comm
self.rank = comm.Get_rank()
self.size = comm.Get_size()
self.root = root
self.barrier = barrier
self.iter = 0
self.data = data
def __iter__(self):
return self
def __next__(self):
if self.iter>=self.size:
if self.barrier: self.comm.Barrier()
raise StopIteration
if self.rank==self.root:
if self.rank==self.iter:
r = self.data
else:
r = self.comm.recv(source=self.iter,tag=0)
else:
if self.rank==self.iter:
self.comm.send(self.data,dest=self.root,tag=0)
r = None
self.iter += 1
return r
def parprint(*args, **kwargs):
from mpi4py import MPI
rank = kwargs.pop('rank') if 'rank' in kwargs else 0
comm = kwargs.pop('comm') if 'comm' in kwargs else MPI.COMM_WORLD
if get_rank(comm=comm)==rank:
print(*args, **kwargs)
def get_rank(comm=None):
from mpi4py import MPI
comm = MPI.COMM_WORLD if comm is None else comm
return comm.Get_rank()
def is_root(comm=None,root=0):
from mpi4py import MPI
comm = MPI.COMM_WORLD if comm is None else comm
return comm.Get_rank()==root
def gather(data,comm=None):
from mpi4py import MPI
comm = MPI.COMM_WORLD if comm is None else comm
return comm.gather(data,root=0)