ucf base class added

This commit is contained in:
Michael Stumpf (ifhcluster) 2019-12-06 11:06:01 +01:00
parent 0490050e24
commit 7b8ed3e9af
1 changed files with 457 additions and 0 deletions

457
python/ucf/ucf.py Normal file
View File

@ -0,0 +1,457 @@
import sys
import io
import struct
import warnings
import numpy as np
import tarfile
import time
from datetime import datetime
def __warning_format(message, category, filename, lineno, file=None, line=None):
return '%s:%s: %s:%s\n' % (filename, lineno, category.__name__, message)
warnings.formatwarning = __warning_format
class UCF:
"""UCF low-level access class"""
def __init__(self):
self.__initializeConstants()
self.__resetPublicProperties()
self.__resetPrivateProperties()
self.__resetCurrentStep()
self.__resetCurrentSet()
def open(self,file):
"""Opens an input stream for reading access. The variable 'file' can be of the following types:
str opens a file on disk whose path is specified by 'file'
tarfile.ExFileObject read from a file inside a tar archive (use tarfile.extractfile to generate an instance for 'file')
bytes data which is already located in memory as a bytes or bytearray object (can be used for streams)
"""
# Check what class 'file' belongs to and treat it accordingly
if isinstance(file,str):
self.File = file
self.__external = False
self.__stream = False
self.__fileID = open(self.File,self.IOMode)
self.__inputAvailable = True
elif isinstance(file,tarfile.ExFileObject):
self.File = file.name
self.__external = True
self.__stream = False
self.__fileID = file
self.__inputAvailable = True
elif isinstance(file,bytes) or isinstance(file,bytearray):
self.File = 'byte-stream'
self.__external = True
self.__stream = True
self.__fileID = io.BytesIO(file)
self.__inputAvailable = True
# Determine file size
self.__fileID.seek(0,2)
self.FileSize = self.__fileID.tell()
self.__fileID.seek(self.__fileBeg,0)
# Read the header of the file
self.__readHeaderFile()
# Scan through file to get the basic structure (steps/sets)
self.__timeStep = np.zeros(self.__scanBuffSize,dtype=np.float64)
self.__posStep = np.zeros(self.__scanBuffSize,dtype=np.int32)
self.__numSetPerStep = np.zeros(self.__scanBuffSize,dtype=np.int32)
istep = 0;
while self.__fileID.tell()<self.FileSize:
self.__readHeaderStep();
self.__timeStep[istep] = self.__currentStepTime
self.__posStep[istep] = self.__fileID.tell()
self.__numSetPerStep[istep] = self.__currentStepNumSet
if self.__currentStepSize==-1:
break
else:
self.__fileID.seek(self.__currentStepSize,1)
istep = istep+1
nstep = istep
# Truncate buffered arrays
if nstep>self.__scanBuffSize:
warnings.warn('Buffer overflow detected: increase scanBuffSize.')
self.__timeStep = self.__timeStep[0:nstep]
self.__posStep = self.__posStep[0:nstep]
self.__numSetPerStep = self.__numSetPerStep[0:nstep]
# Set some internal variables
self.NumDataset = np.max(self.__numSetPerStep)
self.NumTimestep = nstep;
self.__isFileHeaderWritten = True;
self.__isStepHeaderWritten = True;
def initBuffer(self,rank=0,rankijk=(0,0,0),ftype=1999):
"""Initialize a buffer to generate a new UCF file."""
self.__bufNumSteps = 0
self.__bufStep = []
self.__bufParams = []
self.__bufData = []
self.__bufRank = rank
self.__bufRankijk = rankijk
self.__bufFileType = ftype
self.__bufAvailable = True
def addStepToBuffer(self,step=1,time=0.0):
"""Add a new step to buffer."""
if not self.__bufAvailable:
raise BufferError('Buffer has not been initialized.')
if step>self.__bufNumSteps:
self.__bufStep.extend([None] for ii in range(self.__bufNumSteps,step))
self.__bufParams.extend([] for ii in range(self.__bufNumSteps,step))
self.__bufData.extend([] for ii in range(self.__bufNumSteps,step))
self.__bufNumSteps = step
self.__bufStep[step-1] = time
def addDatasetToBuffer(self,data,params=None,step=1,dset=1):
"""Add a new dataset to specified step of buffer."""
if not self.__bufAvailable:
raise BufferError('Buffer has not been initialized.')
if step>self.__bufNumSteps:
raise ValueError('Requested step does not exist.')
if not hasattr(data,'dtype'):
raise TypeError('Cannot determine datatype of provided data')
if not hasattr(data,'nbytes'):
raise TypeError('Cannot determine number of bytes of provided data')
if not hasattr(data,'tobytes'):
raise TypeError('Cannot convert provided data to bytes')
if not hasattr(data,'shape'):
raise TypeError('Cannot determine shape of provided data')
if params is not None and not all(np.issubdtype(type(ii),np.integer) for ii in params):
raise TypeError('Parameters must be provided as integer')
nset = len(self.__bufData[step-1])
if dset>nset:
self.__bufParams[step-1].extend(None for ii in range(nset,dset))
self.__bufData[step-1].extend(None for ii in range(nset,dset))
self.__bufParams[step-1][dset-1] = params
self.__bufData[step-1][dset-1] = data
def copyStepToBuffer(self,step_in,step_out=1,recursive=False,singlePrecision=False):
"""Copy a step from an input file to output buffer. If recursive copying is activated, all datasets
within the step will be copied, otherwise only the step header is copied without datasets.
If datasets are copied, the precision can be reduced using the 'singlePrecision' flag."""
if not self.__inputAvailable:
raise IOError('No input file available')
if not self.__bufAvailable:
raise BufferError('Buffer has not been initialized.')
self.addStepToBuffer(step=step_out,time=self.__timeStep[step_in-1])
if recursive:
for dset in range(0,self.__numSetPerStep[step_in-1]):
self.copyDatasetToBuffer(step_in,dset+1,step_out=step_out,dset_out=dset+1,singlePrecision=singlePrecision)
def copyDatasetToBuffer(self,step_in,dset_in,step_out=1,dset_out=1,singlePrecision=False):
"""Copy a dataset from an input file to output buffer at specified step. The precision of the
dataset can be reduced using the 'singlePrecision' flag."""
if not self.__inputAvailable:
raise IOError('No input file available')
if not self.__bufAvailable:
raise BufferError('Buffer has not been initialized.')
(data,params) = self.readSet(step_in,dset_in)
if singlePrecision:
if data.dtype==np.dtype('float64'):
data = np.float32(data)
elif data.dtype==np.dtype('int64'):
data = np.int32(data)
self.addDatasetToBuffer(data,params=params,step=step_out,dset=dset_out)
def flushBuffer(self):
"""Returns the buffer as a bytes object, which can be written to a file using a file object."""
# Sanity check and size gathering
sizeStep = []
sizeSet = [[]]
for step in range(0,self.__bufNumSteps):
nset = len(self.__bufData[step])
tmpSizeStep = 0
if nset==0:
warnings.warn('Step #{} in buffer does not contain any dataset.'.format(step+1),RuntimeWarning)
for dset in range(0,nset):
if self.__bufData[step][dset] is None:
raise ValueError('Step #{}, dataset #{} does not contain any data.'.format(step+1,dset+1))
if self.__bufParams[step][dset] is None:
warnings.warn('No parameters were provided for step #{}, dataset #{}.'.format(step+1,dset+1),RuntimeWarning)
nparam==0
else:
nparam = len(self.__bufParams[step][dset])
sizeSet[step].append(self.__bufData[step][dset].nbytes)
tmpSizeStep += (self.__nHeaderSet+nparam)*self.__nByteHeaderSet
tmpSizeStep += self.__bufData[step][dset].nbytes
sizeStep.append(tmpSizeStep)
# Create output buffer
obuff = b''
# Build file header
magicFile = self.__magicFile
fileVersion = 2
unixTime = int(time.time())
fileType = self.__bufFileType
rank = self.__bufRank
(iproc,jproc,kproc) = self.__bufRankijk
if self.Debug:
print('Write the following file header at {} bytes'.format(len(obuff)),file=sys.stderr)
print((magicFile,fileVersion,unixTime,fileType,rank,iproc,jproc,kproc),file=sys.stderr)
obuff += struct.pack('qqqqqqqq',magicFile,fileVersion,unixTime,fileType,rank,iproc,jproc,kproc)
# Build step header
for step in range(0,self.__bufNumSteps):
if self.Verbosity:
print('Adding step #{} to output buffer'.format(step+1),file=sys.stderr)
magicStep = self.__magicStep
stepBytes = sizeStep[step]
stepTime = self.__bufStep[step]
nset = len(self.__bufData[step])
if self.Debug:
print('Write the following step header at {} bytes'.format(len(obuff)),file=sys.stderr)
print((magicStep,stepBytes,stepTime,nset),file=sys.stderr)
obuff += struct.pack('qqdq',magicStep,stepBytes,stepTime,nset)
# Build dataset headers + attach data
for dset in range(0,nset):
if self.Verbosity:
print(' dataset #{}'.format(dset+1),file=sys.stderr)
magicSet = self.__magicSet
setSize = sizeSet[step][dset]
nptype = self.__bufData[step][dset].dtype
if nptype==np.dtype('int32'):
dtEncoded = 11
elif nptype==np.dtype('int64'):
dtEncoded = 12
elif nptype==np.dtype('float32'):
dtEncoded = 21
elif nptype==np.dtype('float64'):
dtEncoded = 22
else:
raise TypeError('Data at step #{}, dataset #{} has an invalid datatype.'.format(step+1,dset+1))
if self.__bufParams[step][dset] is None:
nparam = 0
else:
nparam = len(self.__bufParams[step][dset])
if self.Debug:
print('Write the following set header at {} bytes'.format(len(obuff)),file=sys.stderr)
print((magicSet,setSize,dtEncoded,nparam),file=sys.stderr)
print('with parameters:',file=sys.stderr)
print(self.__bufParams[step][dset],file=sys.stderr)
obuff += struct.pack('qqqq',magicSet,setSize,dtEncoded,nparam)
if nparam!=0:
obuff += struct.pack(nparam*'q',*self.__bufParams[step][dset])
obuff += self.__bufData[step][dset].tobytes('F')
# Return bytes
return obuff
def readSet(self,step=1,dset=1,memmap=False):
"""Read a dataset from input file. If 'memmap' is activated, the file will only be read partially on demand."""
self.__fileID.seek(self.__findSet(step,dset),0)
self.__readHeaderSet();
params = self.__currentSetParams
if memmap:
if self.__external:
raise TypeError('Cannont memory map from tar-archive (yet)')
else:
data = np.memmap(self.__fileID,dtype=self.__currentSetDatatype,offset=self.__fileID.tell(),mode='c')
else:
if self.__external:
data = np.frombuffer(self.__fileID.read(self.__currentSetSize),dtype=self.__currentSetDatatype)
else:
data = np.fromfile(self.__fileID,dtype=self.__currentSetDatatype,count=self.__currentSetNumElements)
return (data,params)
def __readHeaderFile(self):
self.__fileID.seek(self.__fileBeg,0);
# Determine endianess
mfmt = "<>"
buff = self.__fileID.read(8)
for fmt in mfmt:
currentMagic = struct.unpack("%sq"%fmt,buff)[0]
if currentMagic==self.__magicFile:
break
if currentMagic!=self.__magicFile:
raise ValueError("Magic mismatch: invalid file header. %l" % currentMagic)
self.Endian = fmt
# Read header
self.__fileID.seek(self.__fileBeg,0);
buff = self.__fileID.read(self.__nHeaderFile*8)
header = struct.unpack("%s%dq"%(self.Endian,8),buff)
if self.Debug:
print("Read the following file header at %d bytes" % 0,file=sys.stderr)
print(header,file=sys.stderr)
# Parse version
self.__versionMajor = np.floor(header[1]/self.__factorMajor)
self.__versionMinor = np.floor(np.mod(header[1],self.__factorMajor)/self.__factorMinor)
self.__versionPatch = np.floor(np.mod(header[1],self.__factorMinor)/self.__factorPatch)
self.CodeVersion = "%d.%d.%d" %(self.__versionMajor,self.__versionMinor,self.__versionPatch)
self.UCFVersion = np.mod(header[1],self.__factorPatch);
# Parse time stamp (UTC)
self.__creationTimeUnix = header[2];
self.CreationTime = datetime.utcfromtimestamp(self.__creationTimeUnix).strftime('%Y-%m-%d %H:%M:%S')
#Parse file type
self.__typeID = header[3];
typeDict = {
0: "grid",
10: "processor grid",
1000: "fluid snapshot",
1010: "scalar snapshot",
1999: "matlab field data",
2000: "particle snapshot",
2001: "particle append",
2011: "particle lagrange",
2021: "particle balancing",
2999: "matlab particle data",
3000: "statistics fluid",
3010: "statistics fluid pure",
3020: "statistics scalar"
}
self.Type = typeDict.get(self.__typeID,"unkmown")
# Parse file class
classDict = {
1: "field",
2: "particle",
3: "statistics"
}
self.Class = classDict.get(np.floor(self.__typeID/self.__factorTypeIDClass),"unknown");
# Parse IO rank
self.IORank = header[5:8];
def __readHeaderStep(self):
# Read and parse
self.__currentStepPosHeader = self.__fileID.tell()
buff = self.__fileID.read(self.__nHeaderStep*8)
header = struct.unpack("%s%dq"%(self.Endian,self.__nHeaderStep),buff)
self.__currentStepPosData = self.__fileID.tell()
currentMagic = header[0]
self.__currentStepSize = header[1]
self.__currentStepTime = struct.unpack("%sd"%self.Endian,buff[16:24])[0]
self.__currentStepNumSet = header[3]
if self.Debug:
print("Read the following step header at %d bytes" % self.__currentStepPosHeader,file=sys.stderr)
print("%d,%d,%f,%d" % (currentMagic,self.__currentStepSize,self.__currentStepTime,self.__currentStepNumSet),file=sys.stderr)
# Check if magic is correct
if currentMagic!=self.__magicStep:
raise ValueError("Magic mismatch: invalid step header. %d" & currentMagic);
def __readHeaderSet(self):
# Read and parse
self.__currentSetPosHeader = self.__fileID.tell()
buff = self.__fileID.read(self.__nHeaderSet*8)
header = struct.unpack("%s%dq"%(self.Endian,self.__nHeaderSet),buff)
self.__currentSetPosData = self.__fileID.tell()
currentMagic = header[0]
self.__currentSetSize = header[1]
self.__currentSetDatatypeNumeric = header[2]
dtSizeDict = {
11: 4,
12: 8,
21: 4,
22: 8
}
dtNameDict = {
11: "%si4" % self.Endian,
12: "%si8" % self.Endian,
21: "%sf4" % self.Endian,
22: "%sf8" % self.Endian
}
self.__currentSetSizeof = dtSizeDict[self.__currentSetDatatypeNumeric]
self.__currentSetDatatype = dtNameDict[self.__currentSetDatatypeNumeric]
self.__currentSetNumParams = header[3]
self.__currentSetNumElements = np.around(self.__currentSetSize/self.__currentSetSizeof).astype(np.int32)
if self.Debug:
print("Read the following set header at %d bytes" % self.__currentSetPosHeader,file=sys.stderr)
print(header,file=sys.stderr)
# Check if magic is correct
if currentMagic!=self.__magicSet:
raise ValueError("Magic mismatch: invalid dataset header. %d" % currentMagic)
# Read variable number of parameters
buff = self.__fileID.read(self.__currentSetNumParams*8)
self.__currentSetParams = struct.unpack("%s%dq"%(self.Endian,self.__currentSetNumParams),buff)
if self.Debug:
print('with parameters:',file=sys.stderr)
print(self.__currentSetParams,file=sys.stderr)
def __findSet(self,tstep,dset):
# Check input
if tstep>self.NumTimestep:
raise ValueError("Out of bounds: timestep. %d, %d" %(tstep,self.NumTimestep))
if dset>self.__numSetPerStep[tstep-1]:
raise ValueError("Out of bounds: dataset. %d, %d" % (dset,self.NumDataset))
# Navigate to correct set
self.__fileID.seek(self.__posStep[tstep-1],0)
for iset in range(0,dset-1):
self.__readHeaderSet()
self.__fileID.seek(self.__currentSetSize,1)
posHeader = self.__fileID.tell()
if self.Debug:
print("Found step #%d, set #%d at position %d" % (tstep,dset,posHeader),file=sys.stderr)
return posHeader
def __initializeConstants(self):
self.__magicFile = 81985529216486895;
self.__magicStep = 11944304052957;
self.__magicSet = 240217520921210;
self.__nHeaderFile = 8;
self.__nHeaderStep = 4;
self.__nHeaderSet = 4;
self.__nByteHeaderFile = 8;
self.__nByteHeaderStep = 8;
self.__nByteHeaderSet = 8;
self.__nSetParamsField = 10;
self.__nSetParamsParticle = 16;
self.__factorMajor = 1000000000;
self.__factorMinor = 1000000;
self.__factorPatch = 1000;
self.__factorTypeIDClass = 1000;
self.__factorTypeIDKind = 10;
self.__typeIDmatlabField = 1999;
self.__typeIDmatlabParticle = 2999;
self.__scanBuffSize = 4096;
def __resetPublicProperties(self):
self.File = '' # file name
self.Type = '' # file type
self.Class = '' # file class
self.Endian = '' # endianess
self.CodeVersion = '' # version of the simulation code
self.UCFVersion = '' # version of the data format ("unified container format")
self.NumDataset = 0 # maximum number of datasets in this file (over all time steps)
self.NumTimestep = 0 # number of time steps in this file
self.FileSize = 0 # file size
self.CreationTime = 0 # time of creation
self.IOMode = '' # file opened in read-only or read-write mode?
self.IORank = 0 # rank of processor + col,row,pln
self.Verbosity = 0 # verbose output?
self.Debug = 0 # debug information?
def __resetPrivateProperties(self):
self.__fileID = None
self.__fileBeg = 0
self.__typeID = 0
self.__creationTimeUnix = ''
self.__versionMajor = 0
self.__versionMinor = 0
self.__versionPatch = 0
self.__versionFile = 0
self.__posStep = 0
self.__numSetPerStep = 0
self.__timeStep = 0
self.__inputAvailable = False
self.__stream = False
self.__external = False
self.__bufAvailable = False
def __resetCurrentStep(self):
self.__currentStep = 0
self.__currentStepPosHeader = 0
self.__currentStepPosData = 0
self.__currentStepSize = 0
self.__currentStepTime = 0
self.__currentStepNumSet = 0
def __resetCurrentSet(self):
self.__currentSet = 0
self.__currentSetPosHeader = 0
self.__currentSetPosData = 0
self.__currentSetSize = 0
self.__currentSetDatatype = 0
self.__currentSetDatatypeNumeric = 0
self.__currentSetSizeof = 0
self.__currentSetNumParams = 0
self.__currentSetParams = 0
self.__currentSetNumElements = 0