From 672da1868709f5a69e203a4921751265ebb22ff1 Mon Sep 17 00:00:00 2001
From: Michael Krayer
Date: Wed, 1 Apr 2020 13:24:52 +0200
Subject: [PATCH] Simplified MPI calls because of problems when communicating a
 lot of data. To be tested on FH2

---
 python/ibmppp/ibmppp.py | 55 +++++++++++++++--------------------------
 1 file changed, 20 insertions(+), 35 deletions(-)

diff --git a/python/ibmppp/ibmppp.py b/python/ibmppp/ibmppp.py
index 5d8f090..038817e 100644
--- a/python/ibmppp/ibmppp.py
+++ b/python/ibmppp/ibmppp.py
@@ -72,10 +72,6 @@ class ibmppp:
                              'p':  [None,None,None],
                              's1': [None,None,None] }
         # (nxl,nyl,nzl) size of local chunk (without ghost cells)
-        # Declare internal variables for MPI requests
-        self.__mpireq = []
-        self.__mpibufdata = []
-        self.__mpibufidx = []
         # Evaluate flow type and set boundary conditions
         self.__setBoundaryConditions(flowType)
         # Prepare the processor layout
@@ -203,6 +199,9 @@ class ibmppp:
         # There is No need to distinguish between master and slave particles here, since we will
         # only use them for masking the fields. Also we do not care about particles in ghost cells.
         if self.__rank==0:
+            # Setup MPI send request list
+            reqsend = []
+            # Read input file
             file_input = self.__dir_base+'particles.bin'
             pp,col = ucf.readParticles(file_input,step=1)
             # Remove time dimension, because we are dealing with a single time step exclusively
@@ -249,17 +248,14 @@ class ibmppp:
                            (pp[col['z'],:]<=bdmax[2]+pp[col['r'],:]))
                # Send them to that processor
                sendbuf = (np.ascontiguousarray(pp[:,li_part]),col)
-               self.__mpireq.append(self.__comm.isend(sendbuf,dest=rank_dst))
+               reqsend.append(self.__comm.isend(sendbuf,dest=rank_dst))
         # Every rank needs to receive the particles, rank 0 send them to itself
         buffsize = 32*1024*1024
-        req = self.__comm.irecv(buffsize,source=0)
-        (self.particles,self.col) = req.wait()
+        reqrecv = self.__comm.irecv(buffsize,source=0)
+        (self.particles,self.col) = reqrecv.wait()
         if self.__rank==0:
             # Wait for everyone to finish
-            MPI.Request.waitall(self.__mpireq)
-            # Communication is done! Clear the list of requests
-            self.__mpireq.clear()
-            self.__comm.Barrier()
+            MPI.Request.waitall(reqsend)
 
     def loadField(self,key,dtype='float64'):
         '''Reads chunks from files'''
@@ -1047,12 +1043,7 @@ class ibmppp:
 
     def exchangeGhostCells(self,key):
         '''Communicates all ghost cells of specified field'''
-        # Clear previous MPI buffers. They should be empty anyway, but just to be sure.
-        self.__mpireq.clear()
-        self.__mpibufdata.clear()
-        self.__mpibufidx.clear()
         # Trigger non-blocking communication:
-        # requests will be stored in internal class variable "__mpireq"
         # Communicate faces (6 faces)
         self.__communicateGhostCells(key,(-1,0,0)) # left
         self.__communicateGhostCells(key,(+1,0,0)) # right
@@ -1082,20 +1073,6 @@ class ibmppp:
         self.__communicateGhostCells(key,(+1,-1,+1)) # right,down,back
         self.__communicateGhostCells(key,(+1,+1,-1)) # right,up,front
         self.__communicateGhostCells(key,(+1,+1,+1)) # right,up,back
-        # Wait for communication to finish
-        MPI.Request.waitall(self.__mpireq)
-        self.__comm.Barrier()
-        # Communication is done! Clear the list of requests
-        self.__mpireq.clear()
-        # Assign buffer to array
-        for ibuf in range(0,len(self.__mpibufdata)):
-            ii,jj,kk = self.__mpibufidx.pop()
-            self.field[key][ii,jj,kk] = self.__mpibufdata.pop()
-        # Verify that buffer is empty
-        if len(self.__mpibufdata)!=0:
-            raise RuntimeError('MPI data buffer not empty after ghost cell exchange!')
-        if len(self.__mpibufidx)!=0:
-            raise RuntimeError('MPI index buffer not empty after ghost cell exchange!')
 
     def imposeBoundaryConditions(self,key):
         '''Imposes symmetry boundary conditions on each non-periodic wall'''
@@ -1348,17 +1325,25 @@ class ibmppp:
             kk_dst = slice(0,self.__nghz)
         else:
             raise ValueError('Invalid direction for ghost cell exchange: {}'.format(positionDst[2]))
+        # [send/recv] Initialize request handles (stay None if there is no neighbor)
+        reqsend = None
+        reqrecv = None
         # [send] now send the data to the neighbor, but only if there is one!
         # Communication must be done non-blocking, and we can use upper-case routines since this is a numpy array
         if rank_dst is not None:
             sendbuf = np.ascontiguousarray(self.field[key][ii_src,jj_src,kk_src])
-            self.__mpireq.append(self.__comm.Isend(sendbuf,dest=rank_dst))
+            reqsend = self.__comm.Isend(sendbuf,dest=rank_dst)
-        # [recv] the corresponding receive: results are stored in a buffer, which will be taken care of in the calling routine
+        # [recv] the corresponding receive: results are stored in a buffer which will be assigned to the parent array later
         if rank_src is not None:
             recvbuf = np.zeros((ii_dst.stop-ii_dst.start,jj_dst.stop-jj_dst.start,kk_dst.stop-kk_dst.start))
-            self.__mpibufdata.append(recvbuf)
-            self.__mpibufidx.append((ii_dst,jj_dst,kk_dst))
-            self.__mpireq.append(self.__comm.Irecv(recvbuf,source=rank_src))
+            reqrecv = self.__comm.Irecv(recvbuf,source=rank_src)
+        # [recv] wait for data to be received, then assign it to the ghost cells
+        if reqrecv is not None:
+            reqrecv.wait()
+            self.field[key][ii_dst,jj_dst,kk_dst] = recvbuf
+        # [send] wait for data to be sent
+        if reqsend is not None:
+            reqsend.wait()
 
     def __allocateField(self,key,keytemplate,shift=[0,0,0],symmetry=None):
         '''Alocates a new field with name key. Copy grid and processor layout from keytemplate
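
The sketch below is not part of the patch. It is a minimal, self-contained illustration of the per-exchange Isend/Irecv-then-wait pattern that the hunks above switch to, assuming only mpi4py and NumPy are available. The array shape, the periodic ring neighbors, and all variable names are illustrative assumptions; in ibmppp the neighbor ranks come from the processor layout and may be None at non-periodic walls, which is what the reqsend/reqrecv guards handle.

# Minimal sketch (assumptions: mpi4py, NumPy, a 1-layer ghost region along the
# first axis, periodic ring neighbors). Run with e.g. `mpirun -n 2 python sketch.py`.
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Local field with one ghost layer on each side of the first axis.
ngh = 1
field = np.full((4 + 2*ngh, 4, 4), float(rank))

# Ring neighbors stand in for the rank_src/rank_dst of the real processor layout;
# here they are never None, but the guards below mirror the non-periodic case.
rank_dst = (rank + 1) % size   # receives our rightmost interior layer
rank_src = (rank - 1) % size   # fills our left ghost layer

reqsend = None
reqrecv = None

# [send] non-blocking send of a contiguous copy of the interior slab
sendbuf = np.ascontiguousarray(field[-2*ngh:-ngh, :, :])
reqsend = comm.Isend(sendbuf, dest=rank_dst)

# [recv] matching non-blocking receive into a preallocated buffer
recvbuf = np.zeros((ngh,) + field.shape[1:])
reqrecv = comm.Irecv(recvbuf, source=rank_src)

# Wait for the receive, copy the buffer into the ghost cells, then make sure
# the send has completed before sendbuf can be reused or freed.
if reqrecv is not None:
    reqrecv.Wait()
    field[:ngh, :, :] = recvbuf
if reqsend is not None:
    reqsend.Wait()

print(rank, field[0, 0, 0])  # each rank now holds its left neighbor's value

Because every call to the exchange routine completes its own send and receive before returning, no requests, buffers, or index lists have to be accumulated in class-level state, which is the simplification this patch makes.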