Added memory monitoring, because memory usage was causing problems.

Michael Stumpf 2020-04-01 16:29:30 +02:00
parent 672da18687
commit 9f2262ca7a
1 changed file with 32 additions and 38 deletions
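
Sketched below is the core of the monitoring this commit introduces: query the process's peak resident set size via the resource module and reduce the maximum over all MPI ranks so that rank 0 can report it. This is a minimal standalone sketch, assuming mpi4py and a Linux system (where ru_maxrss is reported in KiB); the helper name is illustrative and not part of the commit.

import resource
from mpi4py import MPI

def report_peak_memory(comm):
    # Peak resident set size of this process; on Linux ru_maxrss is in KiB,
    # so dividing by 1024 gives MiB (the same convention used in the commit).
    mem_mib = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.0
    # Reduce to the maximum over all ranks; only rank 0 receives the result.
    mem_mib = comm.reduce(mem_mib, op=MPI.MAX, root=0)
    if comm.Get_rank() == 0:
        print('Current memory usage: {:8.2f} MiB'.format(mem_mib))

if __name__ == '__main__':
    report_peak_memory(MPI.COMM_WORLD)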

@@ -4,10 +4,11 @@ from scipy import ndimage
import ucf
import h5py
import os
import resource
class ibmppp:
"""Parallel Python Postprocessor for the IBM-DNS code"""
def __init__(self,comm,dir_base,flowType,chunksPerProc=(1,1,1),numberOfGhosts=(1,1,1)):
def __init__(self,comm,dir_base,flowType,chunksPerProc=(1,1,1),numberOfGhosts=(1,1,1),precision='float64'):
"""Class constructor"""
# Save method arguments for later use
self.__comm = comm
@@ -20,6 +21,7 @@ class ibmppp:
self.__nghx = numberOfGhosts[0]
self.__nghy = numberOfGhosts[1]
self.__nghz = numberOfGhosts[2]
self.__precision = precision
# Declare variables which hold fields
self.field = {'u': None, 'v': None, 'w': None, 'p': None, 's1': None}
# Declare variables for grid (to be set by "loadGrid()")
@@ -257,7 +259,7 @@ class ibmppp:
# Wait for everyone to finish
MPI.Request.waitall(reqsend)
def loadField(self,key,dtype='float64'):
def loadField(self,key):
'''Reads chunks from files'''
# Determine which chunks are to be loaded by this processor
(ip,jp,kp) = self.__positionFromRank(self.__rank,self.__nxp,self.__nyp,self.__nzp)
@@ -267,7 +269,6 @@ class ibmppp:
ipe1 = ipb1+self.__nxppp-1
jpe1 = jpb1+self.__nyppp-1
kpe1 = kpb1+self.__nzppp-1
#print(ipb1,ipe1,jpb1,jpe1,kpb1,kpe1)
# Get grid bounds for the field to be loaded.
# Also determine file which contains the desired field as well as the
# dataset ID
@@ -301,14 +302,11 @@ class ibmppp:
je = self.__procGrid1[key][3][jpe1]
kb = self.__procGrid1[key][4][kpb1]
ke = self.__procGrid1[key][5][kpe1]
#print(ib,ie,jb,je,kb,ke)
nxl = ie-ib+1
nyl = je-jb+1
nzl = ke-kb+1
#print(nxl,nyl,nzl)
# Allocate an array to hold the field
self.field[key] = np.empty((nxl+2*self.__nghx,nyl+2*self.__nghy,nzl+2*self.__nghz),dtype=dtype,order='F')
#print(q.shape)
self.field[key] = np.empty((nxl+2*self.__nghx,nyl+2*self.__nghy,nzl+2*self.__nghz),dtype=self.__precision,order='F')
# Go through each chunk to be read and construct the field
for ip1 in range(ipb1,ipe1+1):
for jp1 in range(jpb1,jpe1+1):
@@ -316,7 +314,6 @@ class ibmppp:
# Determine rank of the chunk to be read
iprank = self.__rankFromPosition(ip1,jp1,kp1,self.__nxp1,self.__nyp1,self.__nzp1)
file_input = filebase+'{:05d}'.format(iprank)
#print('Loading '+file_input)
# Compute bounds of this chunk
ib1 = self.__procGrid1[key][0][ip1]
ie1 = self.__procGrid1[key][1][ip1]
@@ -324,7 +321,6 @@ class ibmppp:
je1 = self.__procGrid1[key][3][jp1]
kb1 = self.__procGrid1[key][4][kp1]
ke1 = self.__procGrid1[key][5][kp1]
#print(ib1,ie1,jb1,je1,kb1,ke1)
# Read chunk and check its validity
[chunk] = ucf.readFieldChunk(file_input,dset=dset)
if ib1!=chunk['ibeg'] or ie1!=chunk['ibeg']+chunk['nxl']-1:
@@ -335,8 +331,6 @@ class ibmppp:
raise ValueError('Loaded chunk does not correspond to processor grid (kbeg={}, nzl={} vs kbeg={},nzl={}'.format(kb1,ke1-kb1+1,chunk['kbeg'],chunk['nzl']))
ngh1 = chunk['ighost']
# No need to care about ghost cells here, we communicate them later
#print('shape(q)',q[ib1-ib+ngh:ie1-ib+ngh+1,jb1-jb+ngh:je1-jb+ngh+1,kb1-kb+ngh:ke1-kb+ngh+1].shape)
#print('shape(q1)',chunk['data'][ngh1:-ngh1,ngh1:-ngh1,ngh1:-ngh1].shape)
self.field[key][ib1-ib+self.__nghx:ie1-ib+self.__nghx+1,
jb1-jb+self.__nghy:je1-jb+self.__nghy+1,
kb1-kb+self.__nghz:ke1-kb+self.__nghz+1] = chunk['data'][ngh1:-ngh1,ngh1:-ngh1,ngh1:-ngh1]
@@ -820,27 +814,7 @@ class ibmppp:
def collocateVelocity(self):
'''Moves velocity components onto the pressure grid.'''
print('WARNING: collocateVelocity is UNTESTED')
for key in ('u','v','w'):
# Shift field (this is done in-place)
self.shiftField(key,self.__getShiftDirection(key,'p'))
# Match local domain
iin,jin,kin,iout,jout,kout = self.__sliceCollocated(key,'Q')
tmp = np.empty_like(self.field['p'])
tmp[iout,jout,kout] = self.field[key][iin,jin,kin]
self.field[key] = tmp
# Get new processor grid
self.__procGrid[key] = [ii.copy() for ii in self.__procGrid['p']]
self.__procGrid1[key] = [ii.copy() for ii in self.__procGrid1['p']]
self.__localChunkBounds[key] = [ii.copy() for ii in self.__localChunkBounds['p']]
self.__localChunkSize[key] = [ii.copy() for ii in self.__localChunkSize['p']]
# Get new grid
self.__dx[key] = self.__dx['p']
self.grid[key] = [ii.copy() for ii in self.grid['p']]
self.localGrid[key] = [ii.copy() for ii in self.localGrid['p']]
# Exchange ghost cells and set boundary conditions
self.exchangeGhostCells(key)
self.imposeBoundaryConditions(key)
raise NotImplementedError('TBD')
def downsample(self,field,factor=2):
'''Coarsens a field by integer factor.'''
@@ -852,10 +826,10 @@ class ibmppp:
if len(sigma)!=3:
raise ValueError('Expected one value of sigma for each direction, but only got {}.'.format(len(sigma)))
# Compute stencil width and check if we got enough ghost cells
stencil = np.empty((3,))
stencil = [0,0,0]
for ii in range(0,3):
stencil[ii] = 2*int(truncate*sigma[ii]+0.5)+1
if self.__nghx<stencil[0] or self.__nghx<stencil[1] or self.__nghx<stencil[2]:
if self.__nghx<stencil[0] or self.__nghy<stencil[1] or self.__nghz<stencil[2]:
raise ValueError('Too few ghost cells for stencil: ',stencil,(self.__nghx,self.__nghy,self.__nghz))
#print(stencil)
# Run scipy gaussian filter
@@ -1082,6 +1056,24 @@ class ibmppp:
self.__symmetrizeWall(key,(0,+1,0))
self.__symmetrizeWall(key,(0,0,-1))
self.__symmetrizeWall(key,(0,0,+1))
def printMemoryUsage(self):
'''Print estimation of memory usage (max over all processors)'''
mem = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss/1024.
mem = self.__comm.reduce(sendobj=mem,op=MPI.MAX,root=0)
if self.__rank==0:
print('Current memory usage: {:8.2f} MiB'.format(mem))
def printMemoryUsageField(self,key):
'''Print estimation of memory usage per allocated field.'''
bytesNumpy = np.dtype(self.__precision).itemsize
nxl = self.__localChunkSize[key][0]+2*self.__nghx
nyl = self.__localChunkSize[key][1]+2*self.__nghy
nzl = self.__localChunkSize[key][2]+2*self.__nghz
mem = nxl*nyl*nzl*bytesNumpy/1024.**2
mem = self.__comm.reduce(sendobj=mem,op=MPI.MAX,root=0)
if self.__rank==0:
print('Memory usage of {}: {:8.2f} MiB'.format(key,mem))
def __setBoundaryConditions(self,flowType):
'''Sets the symmetries for ghost cells behind the wall'''
@@ -1290,6 +1282,8 @@ class ibmppp:
kp_src = -positionDst[2]+1
rank_dst = self.__nghbr[ip_dst,jp_dst,kp_dst]
rank_src = self.__nghbr[ip_src,jp_src,kp_src]
# [send/recv] create a tag
tag = ip_dst*100+jp_dst*10+kp_dst
# [send/recv] get the indices of data to be sent/received
(nxl,nyl,nzl) = self.__localChunkSize[key]
if positionDst[0]==-1:
@@ -1332,11 +1326,11 @@ class ibmppp:
# Communication must be done non-blocking, and we can use upper-case routines since this is a numpy array
if rank_dst is not None:
sendbuf = np.ascontiguousarray(self.field[key][ii_src,jj_src,kk_src])
reqsend = self.__comm.Isend(sendbuf,dest=rank_dst)
reqsend = self.__comm.Isend(sendbuf,dest=rank_dst,tag=tag)
# [recv] the corresponding receive: results are stored in a buffer which will be assigned to the parent array later
if rank_src is not None:
recvbuf = np.zeros((ii_dst.stop-ii_dst.start,jj_dst.stop-jj_dst.start,kk_dst.stop-kk_dst.start))
reqrecv = self.__comm.Irecv(recvbuf,source=rank_src)
recvbuf = np.zeros((ii_dst.stop-ii_dst.start,jj_dst.stop-jj_dst.start,kk_dst.stop-kk_dst.start),dtype=self.__precision)
reqrecv = self.__comm.Irecv(recvbuf,source=rank_src,tag=tag)
# [recv] wait for data to be received
if reqrecv is not None:
reqrecv.wait()
@@ -1367,7 +1361,7 @@ class ibmppp:
self.field[key] = np.zeros(
(self.__localChunkSize[key][0]+2*self.__nghx,
self.__localChunkSize[key][1]+2*self.__nghy,
self.__localChunkSize[key][2]+2*self.__nghz)
self.__localChunkSize[key][2]+2*self.__nghz),dtype=self.__precision
)
# Shift grid
for idir in range(0,3):
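
A hypothetical usage sketch of the interface after this change: the array dtype is now fixed once through the constructor's new precision argument instead of being passed to each loadField call, and the two new reporting methods are collectives that print from rank 0. The import path, the directory, and the flowType value below are placeholders, not taken from the commit.

from mpi4py import MPI
from ibmppp import ibmppp      # module name assumed; adjust to the actual file name

comm = MPI.COMM_WORLD
pp = ibmppp(comm, '/path/to/run', 'channel', precision='float32')  # placeholder arguments
pp.loadGrid()                  # grid variables are populated by loadGrid(), per the class comments
pp.loadField('u')              # dtype now comes from the constructor, not a dtype argument
pp.printMemoryUsage()          # peak RSS, reduced to the max over ranks, printed by rank 0
pp.printMemoryUsageField('u')  # estimated footprint of the local 'u' array incl. ghost cells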