added downsampler

This commit is contained in:
Michael Stumpf (ifhcluster) 2020-02-10 14:50:10 +01:00
parent 1409eb5fca
commit d6f021fc12
2 changed files with 269 additions and 6 deletions

View File

@ -11,14 +11,21 @@ def __warning_format(message, category, filename, lineno, file=None, line=None):
return '%s:%s: %s:%s\n' % (filename, lineno, category.__name__, message) return '%s:%s: %s:%s\n' % (filename, lineno, category.__name__, message)
warnings.formatwarning = __warning_format warnings.formatwarning = __warning_format
#############################
# Low-level class interface #
#############################
class UCF: class UCF:
"""UCF low-level access class""" """UCF low-level access class"""
def __init__(self): def __init__(self,file=None,verbosity=False,debug=False):
self.__initializeConstants() self.__initializeConstants()
self.__resetPublicProperties() self.__resetPublicProperties()
self.__resetPrivateProperties() self.__resetPrivateProperties()
self.__resetCurrentStep() self.__resetCurrentStep()
self.__resetCurrentSet() self.__resetCurrentSet()
if file is not None:
self.open(file)
self.Debug = debug
self.Verbosity = verbosity
def open(self,file): def open(self,file):
"""Opens an input stream for reading access. The variable 'file' can be of the following types: """Opens an input stream for reading access. The variable 'file' can be of the following types:
@ -31,7 +38,7 @@ class UCF:
self.File = file self.File = file
self.__external = False self.__external = False
self.__stream = False self.__stream = False
self.__fileID = open(self.File,self.IOMode) self.__fileID = open(self.File,'rb')
self.__inputAvailable = True self.__inputAvailable = True
elif isinstance(file,tarfile.ExFileObject): elif isinstance(file,tarfile.ExFileObject):
self.File = file.name self.File = file.name
@ -86,10 +93,11 @@ class UCF:
def close(self): def close(self):
"""Closes input file object""" """Closes input file object"""
self.__fileID.close() if not isinstance(self.__fileID,tarfile.ExFileObject):
self.__fileID.close()
self.__init__ self.__init__
def initBuffer(self,rank=0,rankijk=(0,0,0),ftype=1999): def addFileHeaderToBuffer(self,rank=0,rankijk=(0,0,0),ftype=1999):
"""Initialize a buffer to generate a new UCF file.""" """Initialize a buffer to generate a new UCF file."""
self.__bufNumSteps = 0 self.__bufNumSteps = 0
self.__bufStep = [] self.__bufStep = []
@ -135,6 +143,11 @@ class UCF:
self.__bufParams[step-1][dset-1] = params self.__bufParams[step-1][dset-1] = params
self.__bufData[step-1][dset-1] = data self.__bufData[step-1][dset-1] = data
def copyFileHeaderToBuffer(self):
    """Start the output buffer with a file header taken from the opened input.

    The first entry of IORank is passed on as 'rank', the remaining entries
    as 'rankijk', and the type ID of the input as 'ftype'.
    Raises IOError when no input is available.
    """
    if not self.__inputAvailable:
        raise IOError('No input file available')
    header_args = dict(rank=self.IORank[0],
                       rankijk=self.IORank[1:],
                       ftype=self.__typeID)
    self.addFileHeaderToBuffer(**header_args)
def copyStepToBuffer(self,step_in,step_out=1,recursive=False,singlePrecision=False): def copyStepToBuffer(self,step_in,step_out=1,recursive=False,singlePrecision=False):
"""Copy a step from an input file to output buffer. If recursive copying is activated, all datasets """Copy a step from an input file to output buffer. If recursive copying is activated, all datasets
within the step will be copied, otherwise only the step header is copied without datasets. within the step will be copied, otherwise only the step header is copied without datasets.
@ -318,9 +331,9 @@ class UCF:
2: "particle", 2: "particle",
3: "statistics" 3: "statistics"
} }
self.Class = classDict.get(np.floor(self.__typeID/self.__factorTypeIDClass),"unknown"); self.Class = classDict.get(np.floor(self.__typeID/self.__factorTypeIDClass),"unknown")
# Parse IO rank # Parse IO rank
self.IORank = header[5:8]; self.IORank = header[4:8]
def __readHeaderStep(self): def __readHeaderStep(self):
# Read and parse # Read and parse
@ -462,3 +475,72 @@ class UCF:
self.__currentSetNumParams = 0 self.__currentSetNumParams = 0
self.__currentSetParams = 0 self.__currentSetParams = 0
self.__currentSetNumElements = 0 self.__currentSetNumElements = 0
#################################
# High-level function interface #
#################################
def readGrid(file,verbosity=False,debug=False):
    """Read the coordinate vectors of every grid stored in a UCF grid file.

    Each dataset of step 1 is read; its first three parameters are taken as
    the vector lengths (nx, ny, nz) and its data vector is split into the
    x, y and z coordinates accordingly.

    Parameters:
        file: input handed to the UCF constructor (anything UCF.open accepts).
        verbosity, debug: forwarded to the UCF constructor.

    Returns:
        A flat list [x, y, z, x, y, z, ...] holding three arrays per dataset.
        For files with UCFVersion < 2 the last three arrays are appended a
        second time.
    """
    obj = UCF(file=file,verbosity=verbosity,debug=debug)
    output = []
    try:
        # Datasets are addressed 1-based everywhere else readSet is called
        # (dset=iset+1), so iterate 1..NumDataset instead of 0..NumDataset-1.
        for iset in range(1,obj.NumDataset+1):
            (data,params) = obj.readSet(step=1,dset=iset)
            (nx,ny,nz) = params[0:3]
            output.append(data[0:nx])
            output.append(data[nx:nx+ny])
            output.append(data[nx+ny:nx+ny+nz])
        # Query the version while the handle is still open.
        # NOTE(review): presumably old files lack one grid and the last one is
        # reused in its place -- confirm against the file format description.
        if obj.UCFVersion<2:
            output.extend(output[-3:])
    finally:
        # Release the input even if reading a dataset failed.
        obj.close()
    return output
def readProcgrid(file,verbosity=False,debug=False):
    """Read the processor-grid index bounds stored in a UCF file.

    Each dataset of step 1 is read; its first three parameters are taken as
    the counts (nxp, nyp, nzp) and its data vector is split into six
    consecutive arrays: ibeg, iend (nxp entries each), jbeg, jend (nyp
    entries each) and kbeg, kend (nzp entries each).

    Parameters:
        file: input handed to the UCF constructor (anything UCF.open accepts).
        verbosity, debug: forwarded to the UCF constructor.

    Returns:
        A flat list with six arrays per dataset in the order given above.
        For files with UCFVersion < 2 the last six arrays are appended a
        second time.
    """
    obj = UCF(file=file,verbosity=verbosity,debug=debug)
    output = []
    try:
        # Datasets are addressed 1-based everywhere else readSet is called
        # (dset=iset+1), so iterate 1..NumDataset instead of 0..NumDataset-1.
        for iset in range(1,obj.NumDataset+1):
            (data,params) = obj.readSet(step=1,dset=iset)
            (nxp,nyp,nzp) = params[0:3]
            # Walk through the vector with a running offset. This replaces the
            # hand-written bounds, whose last one read '2*nyp*2*nzp' where
            # '2*nyp+2*nzp' was meant and could run past the end of kend.
            start = 0
            for count in (nxp,nxp,nyp,nyp,nzp,nzp): # ibeg,iend,jbeg,jend,kbeg,kend
                output.append(data[start:start+count])
                start += count
        # Query the version while the handle is still open.
        if obj.UCFVersion<2:
            output.extend(output[-6:])
    finally:
        # Release the input even if reading a dataset failed.
        obj.close()
    return output
def readFieldChunk(file,step=1,dset=-1,verbosity=False,debug=False):
    """Read one or more field datasets of a chunk file into dictionaries.

    Parameters:
        file: input handed to the UCF constructor (anything UCF.open accepts).
        step: step to read from.
        dset: a single dataset index, a list/tuple/range of indices, or -1
            to read datasets 1..NumDataset.
        verbosity, debug: forwarded to the UCF constructor.

    Returns:
        A list with one dict per requested dataset. The keys 'ighost',
        'ibeg', 'jbeg', 'kbeg', 'nxl', 'nyl', 'nzl', 'nx', 'ny', 'nz' hold
        the first ten dataset parameters in that order; 'data' is the data
        reshaped (Fortran order) to (nxl+2*ighost, nyl+2*ighost,
        nzl+2*ighost); 'rank' and 'rankijk' are IORank[0] and IORank[1:].
    """
    keys = ('ighost','ibeg','jbeg','kbeg','nxl','nyl','nzl','nx','ny','nz')
    obj = UCF(file=file,verbosity=verbosity,debug=debug)
    output = []
    try:
        # Tuples and ranges are index collections too; previously only lists
        # were recognised and anything else was wrapped as a single index.
        if isinstance(dset,(list,tuple,range)):
            selection = dset
        elif dset==-1:
            selection = range(1,obj.NumDataset+1) # fix that maybe later (this is maximum over all timesteps)
        else:
            selection = [dset]
        for ii in selection:
            (data,params) = obj.readSet(step=step,dset=ii)
            tmp = dict(zip(keys,params[0:10]))
            pad = 2*tmp['ighost']
            tmp['data'] = data.reshape((tmp['nxl']+pad,
                                        tmp['nyl']+pad,
                                        tmp['nzl']+pad),
                                       order='F')
            tmp['rank'] = obj.IORank[0]
            tmp['rankijk']= obj.IORank[1:]
            output.append(tmp)
    finally:
        # Release the input even if reading or reshaping failed.
        obj.close()
    return output

181
python/ucftar_downsampler Executable file
View File

@ -0,0 +1,181 @@
#!/usr/bin/env python3
import sys
import io
import tarfile
import argparse
import numpy as np
import ucf
def downsample_grid(data, params, nskip):
    """Keep every nskip-th coordinate of one grid dataset.

    'data' holds the x, y and z coordinate vectors back to back and
    params[0:3] are their lengths. Returns (data, params) where params is a
    list copy whose first three entries are the new lengths.
    """
    params = list(params)
    (nx, ny, nz) = params[0:3]
    x = data[0:nx:nskip]
    y = data[nx:nx+ny:nskip]
    z = data[nx+ny:nx+ny+nz:nskip]
    params[0:3] = (len(x), len(y), len(z))
    return np.concatenate((x, y, z)), params


def downsample_procgrid(data, params, nskip):
    """Map the processor-grid index bounds onto the downsampled grid.

    'data' holds ibeg, iend (params[0] entries each), jbeg, jend (params[1]
    entries each) and kbeg, kend (params[2] entries each) back to back. Every
    index n becomes (n-1)//nskip+1; params are returned unchanged.
    """
    (nxp, nyp, nzp) = params[0:3]
    data = np.asarray(data)
    # All six arrays get the same mapping, so transform them in one go. The
    # upper bound also fixes the former kend slice, which was written as
    # '2*nxp+2*nyp*2*nzp' instead of '2*nxp+2*nyp+2*nzp'.
    total = 2*(nxp+nyp+nzp)
    # NOTE(review): the *beg entries are rounded down like the *end entries,
    # whereas field chunks use the first retained point; for subdomains that
    # do not start on a retained point the two may disagree -- confirm.
    bounds = (data[0:total]-1)//nskip+1
    return bounds.astype(data.dtype, copy=False), params


def downsample_field(data, params, nskip):
    """Keep every nskip-th point (global indices 1, 1+nskip, ...) of a field chunk.

    params[0] is ighost, params[1:4] the begin indices, params[4:7] the local
    sizes and params[7:10] the global sizes. 'data' is reshaped (Fortran
    order) to the local sizes padded by ighost on both sides; only points in
    [beg, beg+n) are selected, so the result carries no padding and
    params[0] is set to 0. Returns (data, params) with params as a list copy.
    """
    ighost = params[0]
    begs = params[1:4]
    nloc = params[4:7]
    nglob = params[7:10]
    data = data.reshape(tuple(n+2*ighost for n in nloc), order='F')
    offsets = []
    begs_out = []
    for (beg, n) in zip(begs, nloc):
        offsets.append([ii-beg+ighost for ii in range(beg, beg+n) if (ii-1) % nskip == 0])
        # First retained global index at or after beg. Computing it directly
        # (instead of from offsets[0]) also works when nothing is retained,
        # and (first-1)//nskip+1 is correct for nskip=1, where the previous
        # 'first//nskip+1' was off by one.
        first = beg + (-(beg-1)) % nskip
        begs_out.append((first-1)//nskip+1)
    data = data[np.ix_(*offsets)]
    params_out = list(params)
    params_out[0] = 0
    params_out[1:4] = begs_out
    params_out[4:7] = data.shape
    params_out[7:10] = [(n-1)//nskip+1 for n in nglob]
    return data, params_out


def convert_ucf(ucfbytes_in, transform, nset=None):
    """Re-encode a UCF file, passing each dataset of step 1 through 'transform'.

    'transform' maps (data, params) to (data, params). 'nset' limits the
    number of datasets processed; by default all NumDataset sets are used.
    Returns the bytes produced by flushBuffer().
    """
    ucfhandle = ucf.UCF(file=ucfbytes_in, verbosity=False)
    ucfhandle.copyFileHeaderToBuffer()
    ucfhandle.copyStepToBuffer(1, step_out=1, recursive=False)
    if nset is None:
        nset = ucfhandle.NumDataset
    for iset in range(1, nset+1):
        (data, params) = ucfhandle.readSet(step=1, dset=iset)
        (data, params) = transform(data, params)
        ucfhandle.addDatasetToBuffer(data, params=params, step=1, dset=iset)
    ucfbytes_out = ucfhandle.flushBuffer()
    ucfhandle.close()
    return ucfbytes_out


def convert_member(name, ucfbytes_in, nskip, single_precision):
    """Return the output bytes for one archive member, selected by its name.

    'parameters.asc' and 'particles.bin' are passed through, 'grid.bin',
    'proc.bin', '*uvwp.*' and '*scal.*' are downsampled.
    """
    def field(data, params):
        (data, params) = downsample_field(data, params, nskip)
        if single_precision:
            data = data.astype(np.float32, casting='same_kind')
        return data, params

    # The checks are deliberately independent (not elif) to keep the output
    # identical to the original chain of if-statements.
    # NOTE(review): members matching none of the checks end up empty in the
    # output archive -- confirm that this is intended.
    parts = []
    if name=='parameters.asc':
        parts.append(ucfbytes_in)
    if name=='particles.bin':
        parts.append(ucfbytes_in)
    if name=='grid.bin':
        parts.append(convert_ucf(ucfbytes_in, lambda d, p: downsample_grid(d, p, nskip)))
    if name=='proc.bin':
        parts.append(convert_ucf(ucfbytes_in, lambda d, p: downsample_procgrid(d, p, nskip)))
    if 'uvwp.' in name:
        # Only the first four datasets are processed for uvwp files.
        parts.append(convert_ucf(ucfbytes_in, field, nset=4))
    if 'scal.' in name:
        parts.append(convert_ucf(ucfbytes_in, field))
    return b''.join(parts)


def main():
    """Parse the command line and convert the input archive member by member."""
    parser = argparse.ArgumentParser(description='Reads an ucf.tar archive, downsamples it and saves it to a new ucf.tar archive. Can be used as a pipe.')
    parser.add_argument("-i", "--infile", metavar='file',nargs='?', default=None, help="name of the input file [default: stdin]", action="store")
    parser.add_argument("-o", "--outfile", metavar='file',nargs='?', default=None, help="name of the output file [default: stdout]", action="store")
    parser.add_argument("-n", "--nskip", metavar='N',nargs='?', type=int, default=2, help="keep every Nth grid point [default: 2]", action="store")
    parser.add_argument("-sp", "--single-precision", help="output data in single-precision? [default: False]", action="store_true")
    args = parser.parse_args()
    nskip = args.nskip
    # A bare '-n' yields None and values below 1 cannot be used as a stride.
    if nskip is None or nskip<1:
        parser.error('nskip must be a positive integer')

    filehandle_in = None
    filehandle_out = None
    try:
        if args.infile is None:
            istream = tarfile.open(fileobj=sys.stdin.buffer,mode='r|',bufsize=512*1024**2,ignore_zeros=True)
        else:
            filehandle_in = open(args.infile,'rb')
            istream = tarfile.open(fileobj=filehandle_in,mode='r')
        # The archive format is selected with 'format'; it was previously
        # passed as 'pax_headers', which left the default format in effect.
        if args.outfile is None:
            ostream = tarfile.open(fileobj=sys.stdout.buffer,mode='w|',bufsize=512*1024**2,format=tarfile.USTAR_FORMAT)
        else:
            filehandle_out = open(args.outfile,'wb')
            ostream = tarfile.open(fileobj=filehandle_out,mode='w',format=tarfile.USTAR_FORMAT)

        # next() returns None once the archive is exhausted.
        for iinfo in iter(istream.next, None):
            print(iinfo.name,file=sys.stderr)
            member = istream.extractfile(iinfo)
            if member is None:
                # Not a regular file (e.g. a directory entry): nothing to read.
                continue
            ucfbytes_out = convert_member(iinfo.name, member.read(), nskip, args.single_precision)
            oinfo = tarfile.TarInfo(name=iinfo.name)
            oinfo.size = len(ucfbytes_out)
            ostream.addfile(oinfo,fileobj=io.BytesIO(ucfbytes_out))
        istream.close()
        ostream.close()
    finally:
        if filehandle_in is not None:
            filehandle_in.close()
        if filehandle_out is not None:
            filehandle_out.close()


if __name__ == '__main__':
    main()