fixed grid bug for scalar fields: refined grids are still not possible; created new class SequentialBlock which replaces the 'baton' routines.

Michael Krayer 2021-06-07 16:29:51 +02:00
parent eeb14ab818
commit f8e728149f
1 changed file with 115 additions and 82 deletions


@@ -81,7 +81,7 @@ class PPP:
         if parallel:
             f = h5py.File(file,'r',driver='mpio',comm=comm)
         else:
-            baton_wait(io_limit,comm=comm)
+            sb = SequentialBlock(io_limit,comm=comm,per_node=True)
             f = h5py.File(file,'r')
         # Read attributes which are the same for all ranks
         g = f['PPP']
@@ -118,7 +118,7 @@ class PPP:
         f.close()
         assert nxp*nyp*nzp==comm.Get_size(), "The loaded state requires {} processors, but "\
             "we are currently running with {}.".format(nxp*nyp*nzp,comm.Get_size())
-        if not parallel: baton_pass(io_limit,comm=comm)
+        if not parallel: sb.proceed()
         func_load = None
         proc_grid_ext = None
         nxp_ext,nyp_ext,nzp_ext = 3*[None]
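Note: the new class folds the old wait/pass pair into one object. Constructing a SequentialBlock blocks the calling rank until its predecessor signals (batch_size=None disables blocking entirely), and proceed() releases the successor. A minimal sketch of the pattern, assuming SequentialBlock is importable from this module; the file name and h5py calls are illustrative:

    from mpi4py import MPI
    import h5py

    comm = MPI.COMM_WORLD
    io_limit = 2  # at most two ranks per node in the I/O section at once

    sb = SequentialBlock(io_limit, comm=comm, per_node=True)  # blocks until our predecessor signals
    f = h5py.File('state.h5', 'r')                            # throttled I/O section
    # ... read this rank's chunks ...
    f.close()
    sb.proceed()  # unblock the rank io_limit positions after us in the ordering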
@@ -128,12 +128,15 @@ class PPP:
                    proc_grid_ext,nxp_ext,nyp_ext,nzp_ext,
                    nghbr,field,symmetries)
-    def load_field(self,key,io_limit=None,barrier=False,dtype=np.float64):
+    def load_field(self,key,io_limit=None,dtype=np.float64,verbose=False):
         '''Loads the required chunks from file'''
         from .field import Field3d
         import numpy as np
+        # Verbose output
+        if verbose and self.rank==0:
+            print('[load_field] loading {} (io_limit={})'.format(key,io_limit))
         # Block execution of some processors if IO is limited
-        self._baton_wait(io_limit)
+        sb = SequentialBlock(io_limit,comm=self.comm,per_node=True)
         # Determine which chunks are to be loaded by the current processor
         ip_beg_ext = self.ip*self.chunks_per_proc[0]
         jp_beg_ext = self.jp*self.chunks_per_proc[1]
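Note: load_field's signature changed: the barrier keyword is gone (its Barrier call is removed further down) and verbose is new. A hypothetical call under the new signature (the instance name ppp and the key 'u' are illustrative):

    ppp.load_field('u', io_limit=4, verbose=True)
    # rank 0 prints:    [load_field] loading u (io_limit=4)
    # each rank prints: [load_field] rank <r> done with I/O.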
@@ -142,12 +145,13 @@ class PPP:
         jp_end_ext = jp_beg_ext+self.chunks_per_proc[1]-1
         kp_end_ext = kp_beg_ext+self.chunks_per_proc[2]-1
         # Get the total size of the field to be loaded
-        ib = self.proc_grid_ext[key][0][ip_beg_ext]
-        ie = self.proc_grid_ext[key][1][ip_end_ext]
-        jb = self.proc_grid_ext[key][2][jp_beg_ext]
-        je = self.proc_grid_ext[key][3][jp_end_ext]
-        kb = self.proc_grid_ext[key][4][kp_beg_ext]
-        ke = self.proc_grid_ext[key][5][kp_end_ext]
+        key_grid = self._key_grid(key)
+        ib = self.proc_grid_ext[key_grid][0][ip_beg_ext]
+        ie = self.proc_grid_ext[key_grid][1][ip_end_ext]
+        jb = self.proc_grid_ext[key_grid][2][jp_beg_ext]
+        je = self.proc_grid_ext[key_grid][3][jp_end_ext]
+        kb = self.proc_grid_ext[key_grid][4][kp_beg_ext]
+        ke = self.proc_grid_ext[key_grid][5][kp_end_ext]
         nxl = ie-ib+1
         nyl = je-jb+1
         nzl = ke-kb+1
@@ -157,9 +161,9 @@ class PPP:
                          nyl+2*self.nghy,
                          nzl+2*self.nghz),dtype=dtype)
         # Compute origin of subfield
-        origin = (self.origin[key][0]+(ib-1-self.nghx)*self.spacing[0],
-                  self.origin[key][1]+(jb-1-self.nghy)*self.spacing[1],
-                  self.origin[key][2]+(kb-1-self.nghz)*self.spacing[2])
+        origin = (self.origin[key_grid][0]+(ib-1-self.nghx)*self.spacing[0],
+                  self.origin[key_grid][1]+(jb-1-self.nghy)*self.spacing[1],
+                  self.origin[key_grid][2]+(kb-1-self.nghz)*self.spacing[2])
         # Create a Field3d
         self.field[key] = Field3d(data,origin,self.spacing)
         # Go through each chunk to be read and construct the field
@@ -169,12 +173,12 @@ class PPP:
             # Determine rank of the chunk to be read
             rank_ext = self.rank_from_position(ip_ext,jp_ext,kp_ext,external=True)
             # Compute bounds of this chunk
-            ib_ext = self.proc_grid_ext[key][0][ip_ext]
-            ie_ext = self.proc_grid_ext[key][1][ip_ext]
-            jb_ext = self.proc_grid_ext[key][2][jp_ext]
-            je_ext = self.proc_grid_ext[key][3][jp_ext]
-            kb_ext = self.proc_grid_ext[key][4][kp_ext]
-            ke_ext = self.proc_grid_ext[key][5][kp_ext]
+            ib_ext = self.proc_grid_ext[key_grid][0][ip_ext]
+            ie_ext = self.proc_grid_ext[key_grid][1][ip_ext]
+            jb_ext = self.proc_grid_ext[key_grid][2][jp_ext]
+            je_ext = self.proc_grid_ext[key_grid][3][jp_ext]
+            kb_ext = self.proc_grid_ext[key_grid][4][kp_ext]
+            ke_ext = self.proc_grid_ext[key_grid][5][kp_ext]
             nxl_ext = ie_ext-ib_ext+1
             nyl_ext = je_ext-jb_ext+1
             nzl_ext = ke_ext-kb_ext+1
@@ -182,20 +186,21 @@ class PPP:
             subfield = self.func_load(rank_ext,key)
             self.field[key].insert_subfield(subfield)
         # Continue execution of waiting processors if IO was limited
-        self._baton_pass(io_limit)
+        sb.proceed()
+        if verbose:
+            print('[load_field] rank {} done with I/O.'.format(self.rank))
         # Exchange ghost cells
         self.exchange_ghost_cells(key)
         # Initialize symmetries and impose BC
         self.init_symmetry(key)
         self.impose_boundary_conditions(key)
-        # Syncronize processes if requested
-        if barrier: self.comm.Barrier()

     def differentiate(self,key,axis,key_out=None,on_pressure_grid=False):
         assert axis<3, "'axis' must be one of 0,1,2."
         if key_out is None:
             key_out = 'd'+key+('/dx','/dy','/dz')[axis]
-        origin = list(self.origin[key])
+        key_grid = self._key_grid(key)
+        origin = list(self.origin[key_grid])
         shifting_state = self.shifting_state(key,axis=axis)
         if shifting_state==-1:
             shift_origin = 'after'
@@ -208,10 +213,9 @@ class PPP:
             origin[axis] -= 0.5*self.spacing[axis]
         else:
             raise ValueError("Invalid shifting state.")
+        self.copy(key,key_out,skip_data=True)
         self.field[key_out] = self.field[key].derivative(axis,only_keep_interior=False,shift_origin=shift_origin)
         self.origin[key_out] = tuple(origin)
-        self.symmetries[key_out] = self.symmetries[key].copy()
-        self.proc_grid[key_out] = self.proc_grid[key].copy()
         if axis==0:
             self.symmetries[key_out][0,1,1] = -self.symmetries[key_out][0,1,1]
             self.symmetries[key_out][2,1,1] = -self.symmetries[key_out][2,1,1]
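Note: the manual metadata copies in differentiate are replaced by self.copy(key,key_out,skip_data=True), which (see the copy hunk below) transfers the metadata for key_out in one place and resolves the shared scalar grid via _key_grid. Roughly, the call expands to the following sketch (not the exact method body):

    key_grid = self._key_grid(key)                          # 's12' -> 's', other keys unchanged
    self.origin[key_out]     = tuple(self.origin[key_grid])
    self.proc_grid[key_out]  = self.proc_grid[key_grid].copy()
    self.symmetries[key_out] = self.symmetries[key].copy()  # unless skip_symmetries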
@@ -362,6 +366,7 @@ class PPP:
             op = operator.ipow
         else:
             raise ValueError("Invalid operation: {}".format(operation))
+        key_grid = self._key_grid(key)
         if isinstance(arg,np.ndarray):
             sl_arg = 3*[slice(None)]
             for axis in range(3):
@@ -370,8 +375,8 @@ class PPP:
                 elif arg.shape[axis]==self.dim(key,axis=axis):
                     pos = self.position_from_rank(self.rank,external=False)[axis]
                     sl_arg[axis] = slice(
-                        self.proc_grid[key][2*axis][pos]-1,
-                        self.proc_grid[key][2*axis+1][pos])
+                        self.proc_grid[key_grid][2*axis][pos]-1,
+                        self.proc_grid[key_grid][2*axis+1][pos])
                 else:
                     raise ValueError("'arg' must either be singular or match global "\
                         "grid dimension. (axis={}: got {:d}, expected {:d}".format(
@@ -412,6 +417,7 @@ class PPP:
         # Make sure weights_local is a numpy array
         weights_local = np.array(weights_local)
         # Simple implementation: all processor communicate directly to root
+        key_grid = self._key_grid(key)
         if self.rank==0:
             dim_final = tuple(1 if integrate_axis[axis] else self.dim(key,axis=axis) for axis in range(3))
             # Allocate a nice spot of memory
@@ -428,8 +434,8 @@ class PPP:
                     sl[axis] = slice(0,1)
                 else:
                     sl[axis] = slice(
-                        self.proc_grid[key][2*axis][pos[axis]]-1,
-                        self.proc_grid[key][2*axis+1][pos[axis]])
+                        self.proc_grid[key_grid][2*axis][pos[axis]]-1,
+                        self.proc_grid[key_grid][2*axis+1][pos[axis]])
             sl = tuple(sl)
             # Receive data and put it in a good spot
             if rank_src==0:
@@ -512,8 +518,9 @@ class PPP:
         '''Copies a field to a new key.'''
         if key==key_out:
             return
-        self.origin[key_out] = tuple(self.origin[key])
-        self.proc_grid[key_out] = self.proc_grid[key].copy()
+        key_grid = self._key_grid(key)
+        self.origin[key_out] = tuple(self.origin[key_grid])
+        self.proc_grid[key_out] = self.proc_grid[key_grid].copy()
         if not skip_symmetries:
             self.symmetries[key_out] = self.symmetries[key].copy()
         if not skip_data:
@@ -527,9 +534,10 @@ class PPP:
         import gc
         del self.field[key]
         del self.symmetries[key]
-        if key not in ('u','v','w','p','s'):
-            del self.origin[key]
-            del self.proc_grid[key]
+        key_grid = self._key_grid(key)
+        if key_grid not in ('u','v','w','p','s'):
+            del self.origin[key_grid]
+            del self.proc_grid[key_grid]
         gc.collect()
         return
@@ -543,7 +551,7 @@ class PPP:
         if parallel:
             f = h5py.File(file,'w',driver='mpio',comm=self.comm)
         else:
-            self._baton_wait(1)
+            sb = SequentialBlock(1,comm=self.comm,per_node=False)
             f = h5py.File(file,'w') if self.rank==0 else h5py.File(file,'a')
         # Write attributes which are the same for all ranks
         if parallel or self.rank==0:
@@ -613,7 +621,7 @@ class PPP:
             f[grp_rank][key]['data'][:] = self.field[key].data
             f[grp_rank][key]['symmetries'][:] = self.symmetries[key]
         f.close()
-        if not parallel: self._baton_pass(1)
+        if not parallel: sb.proceed()
         self.comm.Barrier()
         tend = MPI.Wtime()
         if self.rank==0:
@@ -647,11 +655,11 @@ class PPP:
             fld = self._merge_at_root(key,stride=stride)
             if self.rank==0: fld.save(file,name=name,truncate=truncate)
         else:
-            self._baton_wait(1)
+            sb = SequentialBlock(1,comm=self.comm,per_node=False)
             if self.rank!=0: truncate=False
             name += '/{:05d}'.format(self.rank)
             self._subfield_for_vtk(key,stride=stride).save(file,name=name,truncate=truncate)
-            self._baton_pass(1)
+            sb.proceed()
         # Free the temporary field (if it was created)
         if on_pressure_grid: self.delete(key_tmp)
         # Sync (important in case there is another write to the same file following!)
@@ -719,12 +727,13 @@ class PPP:
         assert all(np.array(self.num_ghost)>=stride*num_ghost), "At least stride*num_ghost "\
             "ghost cells are required in the original field. Have {}, need {}.".format(
             tuple(self.num_ghost),tuple(stride*num_ghost))
-        ib = self.proc_grid[key][0][self.ip]-1
-        ie = self.proc_grid[key][1][self.ip]-1
-        jb = self.proc_grid[key][2][self.jp]-1
-        je = self.proc_grid[key][3][self.jp]-1
-        kb = self.proc_grid[key][4][self.kp]-1
-        ke = self.proc_grid[key][5][self.kp]-1
+        key_grid = self._key_grid(key)
+        ib = self.proc_grid[key_grid][0][self.ip]-1
+        ie = self.proc_grid[key_grid][1][self.ip]-1
+        jb = self.proc_grid[key_grid][2][self.jp]-1
+        je = self.proc_grid[key_grid][3][self.jp]-1
+        kb = self.proc_grid[key_grid][4][self.kp]-1
+        ke = self.proc_grid[key_grid][5][self.kp]-1
         ijkb = np.array((ib,jb,kb))
         ijke = np.array((ie,je,ke))
         # Determine the first point to consider in this chunk for the case of output
@@ -751,7 +760,8 @@ class PPP:
         if self.rank==0:
             stride = np.array(stride)
             # Allocate the full output field
-            origin = np.array(self.origin[key])
+            key_grid = self._key_grid(key)
+            origin = np.array(self.origin[key_grid])
             spacing = stride*np.array(self.spacing)
             dim = (np.array(self.dim(key,axis=None))/stride+1).astype(int)
             # the following seems necessary and seems to work, but i didn't think much about it
@@ -799,7 +809,8 @@ class PPP:
         if axis is None:
             return tuple(self.shifting_state(key,axis=ii) for ii in range(3))
         assert axis<3, "'axis' must be one of 0,1,2."
-        return int(round((self.origin[key][axis]-self.bounds[2*axis])/(0.5*self.spacing[axis])))
+        key_grid = self._key_grid(key)
+        return int(round((self.origin[key_grid][axis]-self.bounds[2*axis])/(0.5*self.spacing[axis])))

     def chunk_size(self,key,axis=None,rank=None,incl_ghost=False):
         '''Returns size of a chunk.'''
@@ -807,11 +818,11 @@ class PPP:
             return tuple(self.chunk_size(key,
                 axis=ii,rank=rank,incl_ghost=incl_ghost) for ii in range(3))
         assert axis<3, "'axis' must be one of 0,1,2."
-        #return self.field[key].dim(axis=axis)-2*self.num_ghost[axis]
+        key_grid = self._key_grid(key)
         if rank is None:
             rank=self.rank
         pos = self.position_from_rank(rank,external=False)
-        r = self.proc_grid[key][2*axis+1][pos[axis]]-self.proc_grid[key][2*axis][pos[axis]]+1
+        r = self.proc_grid[key_grid][2*axis+1][pos[axis]]-self.proc_grid[key_grid][2*axis][pos[axis]]+1
         if incl_ghost:
             r+=2*self.num_ghost[axis]
         return r
@@ -825,7 +836,8 @@ class PPP:
         if axis is None:
             return tuple(self.dim(key,axis=ii) for ii in range(3))
         assert axis<3, "'axis' must be one of 0,1,2."
-        return self.proc_grid[key][2*axis+1][-1]
+        key_grid = self._key_grid(key)
+        return self.proc_grid[key_grid][2*axis+1][-1]

     def exchange_ghost_cells(self,key):
         '''Communicates all ghost cells of specified field'''
@@ -1088,15 +1100,59 @@ class PPP:
                         proc_grid_ext,nxp_ext,nyp_ext,nzp_ext,
                         nghbr)

-    def _baton_wait(self,batch_size,tag=420):
-        '''Block execution until an empty message from rank-batch_wait
-        is received (issued by _baton_pass)'''
-        baton_wait(batch_size,comm=self.comm,tag=tag)
-
-    def _baton_pass(self,batch_size,tag=420):
-        '''Sends an empty message to rank+batch_wait to unblock its
-        execution (issued by _baton_wait)'''
-        baton_pass(batch_size,comm=self.comm,tag=tag)
+    def _key_grid(self,key):
+        '''Get the key for the grid: scalar fields all share a common grid,
+        so this routine returns 's' if key is 'sX' with X being a (potentially
+        multi-digit) number. Otherwise the key itself is returned.'''
+        if key[0]=='s' and key[1:].isnumeric():
+            return 's'
+        else:
+            return key
+
+class SequentialBlock:
+    '''Block execution of the following code for some processors until
+    'proceed' is issued.'''
+    def __init__(self,batch_size,comm=None,per_node=False,tag=420):
+        from mpi4py import MPI
+        self.comm = MPI.COMM_WORLD if comm is None else comm
+        self.tag = tag
+        self.completed = False
+        # If batch_size is None, there will be no blocking
+        if batch_size is None:
+            self.successor = None
+            self.predecessor = None
+            return
+        # Establish the order in which the ranks are run
+        if per_node:
+            # Generate a list with all ranks on the current node
+            nodename = MPI.Get_processor_name()
+            nodelist = self.comm.allgather(nodename)
+            ranking = [ii for ii,x in enumerate(nodelist) if x==nodename]
+            # Get the position of predecessor/successor in ranking list
+            position = ranking.index(self.comm.Get_rank())
+            idx_pre = position-batch_size
+            idx_suc = position+batch_size
+            # Convert to rank
+            self.predecessor = ranking[idx_pre] if idx_pre>=0 else None
+            self.successor = ranking[idx_suc] if idx_suc<len(ranking) else None
+        else:
+            # The ranking is solely the rank in the communicator
+            position = self.comm.Get_rank()
+            self.predecessor = position-batch_size
+            if self.predecessor<0:
+                self.predecessor = None
+            self.successor = position+batch_size
+            if self.successor>=self.comm.Get_size():
+                self.successor = None
+        # Wait for predecessor
+        if self.predecessor is not None:
+            self.comm.recv(source=self.predecessor,tag=self.tag)
+        return
+
+    def proceed(self):
+        self.completed = True
+        if self.successor is not None:
+            data = None
+            self.comm.send(data,dest=self.successor,tag=self.tag)

 class GatherIterator:
     '''Sends 'data' sequentially to 'root' which can iterate over it
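Note: _key_grid is the scalar-grid fix named in the commit message: all numbered scalar fields share the grid metadata stored under 's', while every other key maps to itself. Illustrative behavior (the instance name ppp is hypothetical):

    ppp._key_grid('s1')     # -> 's'
    ppp._key_grid('s42')    # -> 's'      (multi-digit numbers work via str.isnumeric)
    ppp._key_grid('s')      # -> 's'      (''.isnumeric() is False, so it falls through)
    ppp._key_grid('u')      # -> 'u'
    ppp._key_grid('dp/dx')  # -> 'dp/dx'  (derivative keys are unchanged)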
@@ -1150,27 +1206,4 @@ def is_root(comm=None,root=0):

 def gather(data,comm=None):
     from mpi4py import MPI
     comm = MPI.COMM_WORLD if comm is None else comm
     return comm.gather(data,root=0)
-
-def baton_wait(batch_size,comm=None,tag=420):
-    '''Block execution until an empty message from rank-batch_wait
-    is received (issued by _baton_pass)'''
-    from mpi4py import MPI
-    comm = MPI.COMM_WORLD if comm is None else comm
-    rank = comm.Get_rank()
-    if batch_size is not None:
-        if rank>=batch_size:
-            source = rank-batch_size
-            comm.recv(source=source,tag=tag)
-
-def baton_pass(batch_size,comm=None,tag=420):
-    '''Sends an empty message to rank+batch_wait to unblock its
-    execution (issued by _baton_wait)'''
-    from mpi4py import MPI
-    comm = MPI.COMM_WORLD if comm is None else comm
-    rank = comm.Get_rank()
-    if batch_size is not None:
-        dest = rank+batch_size
-        if dest<comm.Get_size():
-            data = None
-            comm.send(data,dest=dest,tag=tag)
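Note: with the module-level baton functions removed, SequentialBlock is the replacement for code that used them directly. A small standalone sketch of the semantics (the script name and import path are hypothetical; run with e.g. mpirun -n 4 python demo_sequential_block.py):

    # demo_sequential_block.py
    from mpi4py import MPI
    from ppp import SequentialBlock  # hypothetical import path for this module

    comm = MPI.COMM_WORLD
    # batch_size=1, per_node=False: ranks pass through strictly one at a time,
    # since rank r blocks on an empty message from rank r-1
    sb = SequentialBlock(1, comm=comm, per_node=False)
    print('rank {} inside the serialized section'.format(comm.Get_rank()))
    sb.proceed()  # send the empty message that releases rank r+1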