Simplified MPI calls because of problems when communicating large amounts of data. To be tested on FH2

Michael Krayer 2020-04-01 13:24:52 +02:00
parent b3748eb210
commit 672da18687
1 changed file with 20 additions and 35 deletions
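In place of the class-wide request and buffer lists, each routine now keeps its MPI request handles locally and lets them go out of scope once the communication has completed. A minimal mpi4py sketch of that local-request pattern, using an illustrative ring exchange of a small numpy array (the array contents and the neighbour choice are made up, not taken from this repository):

# Sketch of the "local requests + waitall" pattern adopted by this commit.
# Run with e.g.: mpirun -n 4 python sketch_local_requests.py
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Requests live in a local list instead of a class attribute
requests = []

# Illustrative ring exchange: pass a small array to the next rank
sendbuf = np.full(4, rank, dtype='float64')
recvbuf = np.empty(4, dtype='float64')
requests.append(comm.Isend(sendbuf, dest=(rank + 1) % size))
requests.append(comm.Irecv(recvbuf, source=(rank - 1) % size))

# Wait for all local requests, then simply let them be garbage collected
MPI.Request.waitall(requests)
print('rank', rank, 'received', recvbuf)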


@@ -72,10 +72,6 @@ class ibmppp:
'p': [None,None,None],
's1': [None,None,None]
} # (nxl,nyl,nzl) size of local chunk (without ghost cells)
# Declare internal variables for MPI requests
self.__mpireq = []
self.__mpibufdata = []
self.__mpibufidx = []
# Evaluate flow type and set boundary conditions
self.__setBoundaryConditions(flowType)
# Prepare the processor layout
@@ -203,6 +199,9 @@ class ibmppp:
# There is no need to distinguish between master and slave particles here, since we will
# only use them for masking the fields. Also, we do not care about particles in ghost cells.
if self.__rank==0:
# Setup MPI send request list
reqsend = []
# Read input file
file_input = self.__dir_base+'particles.bin'
pp,col = ucf.readParticles(file_input,step=1)
# Remove time dimension, because we are dealing with a single time step exclusively
@@ -249,17 +248,14 @@
(pp[col['z'],:]<=bdmax[2]+pp[col['r'],:]))
# Send them to that processor
sendbuf = (np.ascontiguousarray(pp[:,li_part]),col)
self.__mpireq.append(self.__comm.isend(sendbuf,dest=rank_dst))
reqsend.append(self.__comm.isend(sendbuf,dest=rank_dst))
# Every rank needs to receive the particles; rank 0 sends them to itself
buffsize = 32*1024*1024
req = self.__comm.irecv(buffsize,source=0)
(self.particles,self.col) = req.wait()
reqrecv = self.__comm.irecv(buffsize,source=0)
(self.particles,self.col) = reqrecv.wait()
if self.__rank==0:
# Wait for everyone to finish
MPI.Request.waitall(self.__mpireq)
# Communication is done! Clear the list of requests
self.__mpireq.clear()
self.__comm.Barrier()
MPI.Request.waitall(reqsend)
def loadField(self,key,dtype='float64'):
'''Reads chunks from files'''
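For reference, the hunk above distributes the particles with pickled point-to-point messages: rank 0 isends an (array, column-dict) tuple per destination rank, every rank posts an irecv with an explicit buffer size large enough for the pickled message, and rank 0 finally waits on its local send requests. A self-contained sketch of that pattern with made-up particle data (the 4-row layout, the column names and the slab-in-x selection are illustrative stand-ins for the real radius-aware overlap test):

# Sketch of the particle-distribution pattern shown above.
# Run with e.g.: mpirun -n 4 python sketch_distribute_particles.py
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    reqsend = []
    # Made-up particle data: rows are x, y, z, r; one column per particle
    col = {'x': 0, 'y': 1, 'z': 2, 'r': 3}
    pp = np.random.rand(4, 1000)
    for rank_dst in range(size):
        # Illustrative selection: give every rank a slab of the unit interval in x
        xlo, xhi = rank_dst / size, (rank_dst + 1) / size
        li_part = (pp[col['x'], :] >= xlo) & (pp[col['x'], :] < xhi)
        sendbuf = (np.ascontiguousarray(pp[:, li_part]), col)
        reqsend.append(comm.isend(sendbuf, dest=rank_dst))

# Every rank (rank 0 included) receives its particles; the explicit buffer
# size must be large enough to hold the pickled message
buffsize = 32 * 1024 * 1024
reqrecv = comm.irecv(buffsize, source=0)
particles, col = reqrecv.wait()

if rank == 0:
    # Only now is it safe to drop the send buffers
    MPI.Request.waitall(reqsend)

print('rank', rank, 'holds', particles.shape[1], 'particles')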
@@ -1047,12 +1043,7 @@ class ibmppp:
def exchangeGhostCells(self,key):
'''Communicates all ghost cells of specified field'''
# Clear previous MPI buffers. They should be empty anyway, but just to be sure.
self.__mpireq.clear()
self.__mpibufdata.clear()
self.__mpibufidx.clear()
# Trigger non-blocking communication:
# requests will be stored in internal class variable "__mpireq"
# Communicate faces (6 faces)
self.__communicateGhostCells(key,(-1,0,0)) # left
self.__communicateGhostCells(key,(+1,0,0)) # right
@@ -1082,20 +1073,6 @@ class ibmppp:
self.__communicateGhostCells(key,(+1,-1,+1)) # right,down,back
self.__communicateGhostCells(key,(+1,+1,-1)) # right,up,front
self.__communicateGhostCells(key,(+1,+1,+1)) # right,up,back
# Wait for communication to finish
MPI.Request.waitall(self.__mpireq)
self.__comm.Barrier()
# Communication is done! Clear the list of requests
self.__mpireq.clear()
# Assign buffer to array
for ibuf in range(0,len(self.__mpibufdata)):
ii,jj,kk = self.__mpibufidx.pop()
self.field[key][ii,jj,kk] = self.__mpibufdata.pop()
# Verify that buffer is empty
if len(self.__mpibufdata)!=0:
raise RuntimeError('MPI data buffer not empty after ghost cell exchange!')
if len(self.__mpibufidx)!=0:
raise RuntimeError('MPI index buffer not empty after ghost cell exchange!')
def imposeBoundaryConditions(self,key):
'''Imposes symmetry boundary conditions on each non-periodic wall'''
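The 26 calls above cover the 6 faces, 12 edges and 8 corners of the local chunk, i.e. every neighbour offset in {-1,0,+1}^3 except (0,0,0). For reference, an equivalent compact way to generate that list of directions (the class itself keeps the explicit calls):

# Sketch only: enumerate all 3**3 - 1 = 26 neighbour directions of a cubic chunk
from itertools import product

directions = [d for d in product((-1, 0, +1), repeat=3) if d != (0, 0, 0)]
faces   = [d for d in directions if sum(abs(c) for c in d) == 1]   #  6 faces
edges   = [d for d in directions if sum(abs(c) for c in d) == 2]   # 12 edges
corners = [d for d in directions if sum(abs(c) for c in d) == 3]   #  8 corners
assert (len(faces), len(edges), len(corners)) == (6, 12, 8)
# e.g.: for d in faces + edges + corners: self.__communicateGhostCells(key, d)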
@@ -1348,17 +1325,25 @@ class ibmppp:
kk_dst = slice(0,self.__nghz)
else:
raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[2]))
# [send/recv] Initialise the send and receive request handles
reqsend = None
reqrecv = None
# [send] now send the data to the neighbor, but only if there is one!
# Communication must be non-blocking, and we can use the upper-case routines since this is a numpy array
if rank_dst is not None:
sendbuf = np.ascontiguousarray(self.field[key][ii_src,jj_src,kk_src])
self.__mpireq.append(self.__comm.Isend(sendbuf,dest=rank_dst))
# [recv] the corresponding receive: results are stored in a buffer, which will be taken care of in the calling routine
reqsend = self.__comm.Isend(sendbuf,dest=rank_dst)
# [recv] the corresponding receive: results are stored in a buffer which will be assigned to the parent array later
if rank_src is not None:
recvbuf = np.zeros((ii_dst.stop-ii_dst.start,jj_dst.stop-jj_dst.start,kk_dst.stop-kk_dst.start))
self.__mpibufdata.append(recvbuf)
self.__mpibufidx.append((ii_dst,jj_dst,kk_dst))
self.__mpireq.append(self.__comm.Irecv(recvbuf,source=rank_src))
reqrecv = self.__comm.Irecv(recvbuf,source=rank_src)
# [recv] wait for data to be received
if reqrecv is not None:
reqrecv.wait()
self.field[key][ii_dst,jj_dst,kk_dst] = recvbuf
# [send] wait for data to be sent
if reqsend is not None:
reqsend.wait()
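The rewritten routine above posts one Isend/Irecv pair per direction and completes it on the spot, copying the received buffer straight into the ghost slice, instead of parking requests and buffers in class-level lists. A stand-alone sketch of this per-direction pattern for a 1-D decomposition with one ghost layer (grid size, field layout and the periodic ring topology are illustrative assumptions, not the repository's setup):

# Sketch of the per-direction ghost cell exchange shown above (1-D version).
# Run with e.g.: mpirun -n 4 python sketch_ghost_exchange.py
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

ngh = 1                                       # one ghost layer on each side
nx = 8                                        # interior cells per rank (illustrative)
field = np.full(nx + 2 * ngh, float(rank))    # interior plus ghost cells

# Illustrative periodic ring, so there is always a neighbour on both sides
rank_dst = (rank + 1) % size                  # gets our rightmost interior cells
rank_src = (rank - 1) % size                  # provides our left ghost cells

# [send] contiguous copy of the rightmost interior cells, sent with the upper-case routine
sendbuf = np.ascontiguousarray(field[nx:nx + ngh])
reqsend = comm.Isend(sendbuf, dest=rank_dst)

# [recv] left ghost cells arrive in a temporary buffer
recvbuf = np.zeros(ngh)
reqrecv = comm.Irecv(recvbuf, source=rank_src)

# [recv] wait, then assign the buffer to the ghost slice of the field
reqrecv.wait()
field[0:ngh] = recvbuf

# [send] make sure the send has completed before the send buffer is reused
reqsend.wait()
print('rank', rank, 'left ghost cells:', field[0:ngh])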
def __allocateField(self,key,keytemplate,shift=[0,0,0],symmetry=None):
'''Allocates a new field with name key. Copies grid and processor layout from keytemplate