From f8e728149f612757ebd7a8a81173d4bfb1d978e6 Mon Sep 17 00:00:00 2001
From: Michael Krayer
Date: Mon, 7 Jun 2021 16:29:51 +0200
Subject: [PATCH] fixed grid bug for scalars: refined grids are still not possible; created new class SequentialBlock which replaces the 'baton' routines.

---
 parallel.py | 197 ++++++++++++++++++++++++++++++----------------------
 1 file changed, 115 insertions(+), 82 deletions(-)

diff --git a/parallel.py b/parallel.py
index 5a92556..1d09f61 100644
--- a/parallel.py
+++ b/parallel.py
@@ -81,7 +81,7 @@ class PPP:
         if parallel:
             f = h5py.File(file,'r',driver='mpio',comm=comm)
         else:
-            baton_wait(io_limit,comm=comm)
+            sb = SequentialBlock(io_limit,comm=comm,per_node=True)
             f = h5py.File(file,'r')
         # Read attributes which are the same for all ranks
         g = f['PPP']
@@ -118,7 +118,7 @@ class PPP:
         f.close()
         assert nxp*nyp*nzp==comm.Get_size(), "The loaded state requires {} processors, but "\
             "we are currently running with {}.".format(nxp*nyp*nzp,comm.Get_size())
-        if not parallel: baton_pass(io_limit,comm=comm)
+        if not parallel: sb.proceed()
         func_load = None
         proc_grid_ext = None
         nxp_ext,nyp_ext,nzp_ext = 3*[None]
@@ -128,12 +128,15 @@ class PPP:
                    proc_grid_ext,nxp_ext,nyp_ext,nzp_ext,
                    nghbr,field,symmetries)
 
-    def load_field(self,key,io_limit=None,barrier=False,dtype=np.float64):
+    def load_field(self,key,io_limit=None,dtype=np.float64,verbose=False):
         '''Loads the required chunks from file'''
         from .field import Field3d
         import numpy as np
+        # Verbose output
+        if verbose and self.rank==0:
+            print('[load_field] loading {} (io_limit={})'.format(key,io_limit))
         # Block execution of some processors if IO is limited
-        self._baton_wait(io_limit)
+        sb = SequentialBlock(io_limit,comm=self.comm,per_node=True)
         # Determine which chunks are to be loaded by the current processor
         ip_beg_ext = self.ip*self.chunks_per_proc[0]
         jp_beg_ext = self.jp*self.chunks_per_proc[1]
@@ -142,12 +145,13 @@ class PPP:
         jp_end_ext = jp_beg_ext+self.chunks_per_proc[1]-1
         kp_end_ext = kp_beg_ext+self.chunks_per_proc[2]-1
         # Get the total size of the field to be loaded
-        ib = self.proc_grid_ext[key][0][ip_beg_ext]
-        ie = self.proc_grid_ext[key][1][ip_end_ext]
-        jb = self.proc_grid_ext[key][2][jp_beg_ext]
-        je = self.proc_grid_ext[key][3][jp_end_ext]
-        kb = self.proc_grid_ext[key][4][kp_beg_ext]
-        ke = self.proc_grid_ext[key][5][kp_end_ext]
+        key_grid = self._key_grid(key)
+        ib = self.proc_grid_ext[key_grid][0][ip_beg_ext]
+        ie = self.proc_grid_ext[key_grid][1][ip_end_ext]
+        jb = self.proc_grid_ext[key_grid][2][jp_beg_ext]
+        je = self.proc_grid_ext[key_grid][3][jp_end_ext]
+        kb = self.proc_grid_ext[key_grid][4][kp_beg_ext]
+        ke = self.proc_grid_ext[key_grid][5][kp_end_ext]
         nxl = ie-ib+1
         nyl = je-jb+1
         nzl = ke-kb+1
@@ -157,9 +161,9 @@ class PPP:
                          nyl+2*self.nghy,
                          nzl+2*self.nghz),dtype=dtype)
         # Compute origin of subfield
-        origin = (self.origin[key][0]+(ib-1-self.nghx)*self.spacing[0],
-                  self.origin[key][1]+(jb-1-self.nghy)*self.spacing[1],
-                  self.origin[key][2]+(kb-1-self.nghz)*self.spacing[2])
+        origin = (self.origin[key_grid][0]+(ib-1-self.nghx)*self.spacing[0],
+                  self.origin[key_grid][1]+(jb-1-self.nghy)*self.spacing[1],
+                  self.origin[key_grid][2]+(kb-1-self.nghz)*self.spacing[2])
         # Create a Field3d
         self.field[key] = Field3d(data,origin,self.spacing)
         # Go through each chunk to be read and construct the field
@@ -169,12 +173,12 @@ class PPP:
                     # Determine rank of the chunk to be read
                     rank_ext = self.rank_from_position(ip_ext,jp_ext,kp_ext,external=True)
                     # Compute bounds of this chunk
-                    ib_ext = self.proc_grid_ext[key][0][ip_ext]
-                    ie_ext = self.proc_grid_ext[key][1][ip_ext]
-                    jb_ext = self.proc_grid_ext[key][2][jp_ext]
-                    je_ext = self.proc_grid_ext[key][3][jp_ext]
-                    kb_ext = self.proc_grid_ext[key][4][kp_ext]
-                    ke_ext = self.proc_grid_ext[key][5][kp_ext]
+                    ib_ext = self.proc_grid_ext[key_grid][0][ip_ext]
+                    ie_ext = self.proc_grid_ext[key_grid][1][ip_ext]
+                    jb_ext = self.proc_grid_ext[key_grid][2][jp_ext]
+                    je_ext = self.proc_grid_ext[key_grid][3][jp_ext]
+                    kb_ext = self.proc_grid_ext[key_grid][4][kp_ext]
+                    ke_ext = self.proc_grid_ext[key_grid][5][kp_ext]
                     nxl_ext = ie_ext-ib_ext+1
                     nyl_ext = je_ext-jb_ext+1
                     nzl_ext = ke_ext-kb_ext+1
@@ -182,20 +186,21 @@ class PPP:
                     subfield = self.func_load(rank_ext,key)
                     self.field[key].insert_subfield(subfield)
         # Continue execution of waiting processors if IO was limited
-        self._baton_pass(io_limit)
+        sb.proceed()
+        if verbose:
+            print('[load_field] rank {} done with I/O.'.format(self.rank))
         # Exchange ghost cells
         self.exchange_ghost_cells(key)
         # Initialize symmetries and impose BC
         self.init_symmetry(key)
         self.impose_boundary_conditions(key)
-        # Syncronize processes if requested
-        if barrier: self.comm.Barrier()
 
     def differentiate(self,key,axis,key_out=None,on_pressure_grid=False):
         assert axis<3, "'axis' must be one of 0,1,2."
         if key_out is None:
             key_out = 'd'+key+('/dx','/dy','/dz')[axis]
-        origin = list(self.origin[key])
+        key_grid = self._key_grid(key)
+        origin = list(self.origin[key_grid])
         shifting_state = self.shifting_state(key,axis=axis)
         if shifting_state==-1:
             shift_origin = 'after'
@@ -208,10 +213,9 @@ class PPP:
             origin[axis] -= 0.5*self.spacing[axis]
         else:
             raise ValueError("Invalid shifting state.")
+        self.copy(key,key_out,skip_data=True)
         self.field[key_out] = self.field[key].derivative(axis,only_keep_interior=False,shift_origin=shift_origin)
-        self.origin[key_out] = tuple(origin)
-        self.symmetries[key_out] = self.symmetries[key].copy()
-        self.proc_grid[key_out] = self.proc_grid[key].copy()
+        self.origin[key_out] = tuple(origin)
         if axis==0:
             self.symmetries[key_out][0,1,1] = -self.symmetries[key_out][0,1,1]
             self.symmetries[key_out][2,1,1] = -self.symmetries[key_out][2,1,1]
@@ -362,6 +366,7 @@ class PPP:
             op = operator.ipow
         else:
             raise ValueError("Invalid operation: {}".format(operation))
+        key_grid = self._key_grid(key)
         if isinstance(arg,np.ndarray):
             sl_arg = 3*[slice(None)]
             for axis in range(3):
@@ -370,8 +375,8 @@ class PPP:
                 elif arg.shape[axis]==self.dim(key,axis=axis):
                     pos = self.position_from_rank(self.rank,external=False)[axis]
                     sl_arg[axis] = slice(
-                        self.proc_grid[key][2*axis][pos]-1,
-                        self.proc_grid[key][2*axis+1][pos])
+                        self.proc_grid[key_grid][2*axis][pos]-1,
+                        self.proc_grid[key_grid][2*axis+1][pos])
                 else:
                     raise ValueError("'arg' must either be singular or match global "\
                        "grid dimension. (axis={}: got {:d}, expected {:d}".format(
@@ -412,6 +417,7 @@ class PPP:
         # Make sure weights_local is a numpy array
         weights_local = np.array(weights_local)
         # Simple implementation: all processor communicate directly to root
+        key_grid = self._key_grid(key)
         if self.rank==0:
             dim_final = tuple(1 if integrate_axis[axis] else self.dim(key,axis=axis) for axis in range(3))
             # Allocate a nice spot of memory
@@ -428,8 +434,8 @@ class PPP:
                         sl[axis] = slice(0,1)
                     else:
                         sl[axis] = slice(
-                            self.proc_grid[key][2*axis][pos[axis]]-1,
-                            self.proc_grid[key][2*axis+1][pos[axis]])
+                            self.proc_grid[key_grid][2*axis][pos[axis]]-1,
+                            self.proc_grid[key_grid][2*axis+1][pos[axis]])
                 sl = tuple(sl)
                 # Receive data and put it in a good spot
                 if rank_src==0:
@@ -512,8 +518,9 @@ class PPP:
         '''Copies a field to a new key.'''
         if key==key_out:
             return
-        self.origin[key_out] = tuple(self.origin[key])
-        self.proc_grid[key_out] = self.proc_grid[key].copy()
+        key_grid = self._key_grid(key)
+        self.origin[key_out] = tuple(self.origin[key_grid])
+        self.proc_grid[key_out] = self.proc_grid[key_grid].copy()
         if not skip_symmetries:
             self.symmetries[key_out] = self.symmetries[key].copy()
         if not skip_data:
@@ -527,9 +534,10 @@ class PPP:
         import gc
         del self.field[key]
         del self.symmetries[key]
-        if key not in ('u','v','w','p','s'):
-            del self.origin[key]
-            del self.proc_grid[key]
+        key_grid = self._key_grid(key)
+        if key_grid not in ('u','v','w','p','s'):
+            del self.origin[key_grid]
+            del self.proc_grid[key_grid]
         gc.collect()
         return
 
@@ -543,7 +551,7 @@ class PPP:
         if parallel:
             f = h5py.File(file,'w',driver='mpio',comm=self.comm)
         else:
-            self._baton_wait(1)
+            sb = SequentialBlock(1,comm=self.comm,per_node=False)
             f = h5py.File(file,'w') if self.rank==0 else h5py.File(file,'a')
         # Write attributes which are the same for all ranks
         if parallel or self.rank==0:
@@ -613,7 +621,7 @@ class PPP:
             f[grp_rank][key]['data'][:] = self.field[key].data
             f[grp_rank][key]['symmetries'][:] = self.symmetries[key]
         f.close()
-        if not parallel: self._baton_pass(1)
+        if not parallel: sb.proceed()
         self.comm.Barrier()
         tend = MPI.Wtime()
         if self.rank==0:
@@ -647,11 +655,11 @@ class PPP:
             fld = self._merge_at_root(key,stride=stride)
             if self.rank==0: fld.save(file,name=name,truncate=truncate)
         else:
-            self._baton_wait(1)
+            sb = SequentialBlock(1,comm=self.comm,per_node=False)
            if self.rank!=0: truncate=False
            name += '/{:05d}'.format(self.rank)
            self._subfield_for_vtk(key,stride=stride).save(file,name=name,truncate=truncate)
-            self._baton_pass(1)
+            sb.proceed()
         # Free the temporary field (if it was created)
         if on_pressure_grid: self.delete(key_tmp)
         # Sync (important in case there is another write to the same file following!)
@@ -719,12 +727,13 @@ class PPP:
         assert all(np.array(self.num_ghost)>=stride*num_ghost), "At least stride*num_ghost "\
            "ghost cells are required in the original field. Have {}, need {}.".format(
                tuple(self.num_ghost),tuple(stride*num_ghost))
-        ib = self.proc_grid[key][0][self.ip]-1
-        ie = self.proc_grid[key][1][self.ip]-1
-        jb = self.proc_grid[key][2][self.jp]-1
-        je = self.proc_grid[key][3][self.jp]-1
-        kb = self.proc_grid[key][4][self.kp]-1
-        ke = self.proc_grid[key][5][self.kp]-1
+        key_grid = self._key_grid(key)
+        ib = self.proc_grid[key_grid][0][self.ip]-1
+        ie = self.proc_grid[key_grid][1][self.ip]-1
+        jb = self.proc_grid[key_grid][2][self.jp]-1
+        je = self.proc_grid[key_grid][3][self.jp]-1
+        kb = self.proc_grid[key_grid][4][self.kp]-1
+        ke = self.proc_grid[key_grid][5][self.kp]-1
         ijkb = np.array((ib,jb,kb))
         ijke = np.array((ie,je,ke))
         # Determine the first point to consider in this chunk for the case of output
@@ -751,7 +760,8 @@ class PPP:
         if self.rank==0:
             stride = np.array(stride)
             # Allocate the full output field
-            origin = np.array(self.origin[key])
+            key_grid = self._key_grid(key)
+            origin = np.array(self.origin[key_grid])
             spacing = stride*np.array(self.spacing)
             dim = (np.array(self.dim(key,axis=None))/stride+1).astype(int)
             # the following seems necessary and seems to work, but i didn't think much about it
@@ -799,7 +809,8 @@ class PPP:
         if axis is None:
             return tuple(self.shifting_state(key,axis=ii) for ii in range(3))
         assert axis<3, "'axis' must be one of 0,1,2."
-        return int(round((self.origin[key][axis]-self.bounds[2*axis])/(0.5*self.spacing[axis])))
+        key_grid = self._key_grid(key)
+        return int(round((self.origin[key_grid][axis]-self.bounds[2*axis])/(0.5*self.spacing[axis])))
 
     def chunk_size(self,key,axis=None,rank=None,incl_ghost=False):
         '''Returns size of a chunk.'''
@@ -807,11 +818,11 @@ class PPP:
             return tuple(self.chunk_size(key,
                 axis=ii,rank=rank,incl_ghost=incl_ghost)
                 for ii in range(3))
         assert axis<3, "'axis' must be one of 0,1,2."
-        #return self.field[key].dim(axis=axis)-2*self.num_ghost[axis]
+        key_grid = self._key_grid(key)
         if rank is None:
             rank=self.rank
         pos = self.position_from_rank(rank,external=False)
-        r = self.proc_grid[key][2*axis+1][pos[axis]]-self.proc_grid[key][2*axis][pos[axis]]+1
+        r = self.proc_grid[key_grid][2*axis+1][pos[axis]]-self.proc_grid[key_grid][2*axis][pos[axis]]+1
         if incl_ghost: r+=2*self.num_ghost[axis]
         return r
@@ -825,7 +836,8 @@ class PPP:
         if axis is None:
             return tuple(self.dim(key,axis=ii) for ii in range(3))
         assert axis<3, "'axis' must be one of 0,1,2."
-        return self.proc_grid[key][2*axis+1][-1]
+        key_grid = self._key_grid(key)
+        return self.proc_grid[key_grid][2*axis+1][-1]
 
     def exchange_ghost_cells(self,key):
         '''Communicates all ghost cells of specified field'''
@@ -1088,15 +1100,59 @@ class PPP:
                    proc_grid_ext,nxp_ext,nyp_ext,nzp_ext,
                    nghbr)
 
-    def _baton_wait(self,batch_size,tag=420):
-        '''Block execution until an empty message from rank-batch_wait
-        is received (issued by _baton_pass)'''
-        baton_wait(batch_size,comm=self.comm,tag=tag)
+    def _key_grid(self,key):
+        '''Get the key for the grid: scalar fields all share a common grid,
+        so this routine returns 's' if key is 'sX' with X being a (potentially
+        multi-digit) number. Otherwise the key itself is returned.'''
+        if key[0]=='s' and key[1:].isnumeric():
+            return 's'
+        else:
+            return key
 
-    def _baton_pass(self,batch_size,tag=420):
-        '''Sends an empty message to rank+batch_wait to unblock its
-        execution (issued by _baton_wait)'''
-        baton_pass(batch_size,comm=self.comm,tag=tag)
+class SequentialBlock:
+    '''Block execution of following code for some processors until
+    'proceed' is issued.'''
+    def __init__(self,batch_size,comm=None,per_node=False,tag=420):
+        from mpi4py import MPI
+        self.comm = MPI.COMM_WORLD if comm is None else comm
+        self.tag = tag
+        self.completed = False
+        # If batch_size is None, there will be no blocking
+        if batch_size is None:
+            self.successor = None
+            self.predecessor = None
+            return
+        # Establish the order at which the ranks are run
+        if per_node:
+            # Generate a list with all ranks on the current node
+            nodename = MPI.Get_processor_name()
+            nodelist = self.comm.allgather(nodename)
+            ranking = [ii for ii,x in enumerate(nodelist) if x==nodename]
+            # Get the position of predecessor/successor in ranking list
+            position = ranking.index(self.comm.Get_rank())
+            idx_pre = position-batch_size
+            idx_suc = position+batch_size
+            # Convert to rank
+            self.predecessor = ranking[idx_pre] if position-batch_size>=0 else None
+            self.successor = ranking[idx_suc] if position+batch_size<len(ranking) else None
+        else:
+            # Otherwise order the ranks plainly within the communicator
+            self.predecessor = self.comm.Get_rank()-batch_size
+            self.successor = self.comm.Get_rank()+batch_size
+            if self.predecessor<0:
+                self.predecessor = None
+            if self.successor>=self.comm.Get_size():
+                self.successor = None
+        # Wait for predecessor
+        if self.predecessor is not None:
+            self.comm.recv(source=self.predecessor,tag=self.tag)
+        return
+
+    def proceed(self):
+        self.completed = True
+        if self.successor is not None:
+            data = None
+            self.comm.send(data,dest=self.successor,tag=self.tag)
 
 class GatherIterator:
     '''Sends 'data' sequentially to 'root' which can iterate over it
@@ -1150,27 +1206,4 @@ def is_root(comm=None,root=0):
 def gather(data,comm=None):
     from mpi4py import MPI
     comm = MPI.COMM_WORLD if comm is None else comm
-    return comm.gather(data,root=0)
-
-def baton_wait(batch_size,comm=None,tag=420):
-    '''Block execution until an empty message from rank-batch_wait
-    is received (issued by _baton_pass)'''
-    from mpi4py import MPI
-    comm = MPI.COMM_WORLD if comm is None else comm
-    rank = comm.Get_rank()
-    if batch_size is not None:
-        if rank>=batch_size:
-            source = rank-batch_size
-            comm.recv(source=source,tag=tag)
-
-def baton_pass(batch_size,comm=None,tag=420):
-    '''Sends an empty message to rank+batch_wait to unblock its
-    execution (issued by _baton_wait)'''
-    from mpi4py import MPI
-    comm = MPI.COMM_WORLD if comm is None else comm
-    rank = comm.Get_rank()
-    if batch_size is not None:
-        dest = rank+batch_size
-        if dest<comm.Get_size():
-            comm.send(None,dest=dest,tag=tag)
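
Usage sketch (illustrative only, not part of the patch): the SequentialBlock/proceed pair
replaces the old baton_wait/baton_pass calls. The constructor blocks until the predecessor
rank has called proceed(), so only a limited number of ranks execute the guarded section at
a time. Assuming the patched parallel.py is importable as a module and mpi4py is available,
a minimal serialized-write section could look like this (the output file name is hypothetical):

    from mpi4py import MPI
    from parallel import SequentialBlock

    comm = MPI.COMM_WORLD
    # With batch_size=1 and per_node=False, rank r blocks until rank r-1 has
    # called proceed(), so at most one rank is inside the guarded section.
    sb = SequentialBlock(1, comm=comm, per_node=False)
    with open('out.txt', 'a') as f:   # hypothetical output file, appended rank by rank
        f.write('rank {} writing\n'.format(comm.Get_rank()))
    sb.proceed()   # unblock the successor rank
    comm.Barrier()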