From 7b8ed3e9af79c694d7e8f418ddd02333ead2fea8 Mon Sep 17 00:00:00 2001 From: "Michael Stumpf (ifhcluster)" Date: Fri, 6 Dec 2019 11:06:01 +0100 Subject: [PATCH] ucf base class added --- python/ucf/ucf.py | 457 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 457 insertions(+) create mode 100644 python/ucf/ucf.py diff --git a/python/ucf/ucf.py b/python/ucf/ucf.py new file mode 100644 index 0000000..70d3754 --- /dev/null +++ b/python/ucf/ucf.py @@ -0,0 +1,457 @@ +import sys +import io +import struct +import warnings +import numpy as np +import tarfile +import time +from datetime import datetime + +def __warning_format(message, category, filename, lineno, file=None, line=None): + return '%s:%s: %s:%s\n' % (filename, lineno, category.__name__, message) +warnings.formatwarning = __warning_format + +class UCF: + """UCF low-level access class""" + def __init__(self): + self.__initializeConstants() + self.__resetPublicProperties() + self.__resetPrivateProperties() + self.__resetCurrentStep() + self.__resetCurrentSet() + + def open(self,file): + """Opens an input stream for reading access. The variable 'file' can be of the following types: + str opens a file on disk whose path is specified by 'file' + tarfile.ExFileObject read from a file inside a tar archive (use tarfile.extractfile to generate an instance for 'file') + bytes data which is already located in memory as a bytes or bytearray object (can be used for streams) + """ + # Check what class 'file' belongs to and treat it accordingly + if isinstance(file,str): + self.File = file + self.__external = False + self.__stream = False + self.__fileID = open(self.File,self.IOMode) + self.__inputAvailable = True + elif isinstance(file,tarfile.ExFileObject): + self.File = file.name + self.__external = True + self.__stream = False + self.__fileID = file + self.__inputAvailable = True + elif isinstance(file,bytes) or isinstance(file,bytearray): + self.File = 'byte-stream' + self.__external = True + self.__stream = True + self.__fileID = io.BytesIO(file) + self.__inputAvailable = True + + # Determine file size + self.__fileID.seek(0,2) + self.FileSize = self.__fileID.tell() + self.__fileID.seek(self.__fileBeg,0) + + # Read the header of the file + self.__readHeaderFile() + + # Scan through file to get the basic structure (steps/sets) + self.__timeStep = np.zeros(self.__scanBuffSize,dtype=np.float64) + self.__posStep = np.zeros(self.__scanBuffSize,dtype=np.int32) + self.__numSetPerStep = np.zeros(self.__scanBuffSize,dtype=np.int32) + istep = 0; + while self.__fileID.tell()self.__scanBuffSize: + warnings.warn('Buffer overflow detected: increase scanBuffSize.') + self.__timeStep = self.__timeStep[0:nstep] + self.__posStep = self.__posStep[0:nstep] + self.__numSetPerStep = self.__numSetPerStep[0:nstep] + + # Set some internal variables + self.NumDataset = np.max(self.__numSetPerStep) + self.NumTimestep = nstep; + self.__isFileHeaderWritten = True; + self.__isStepHeaderWritten = True; + + def initBuffer(self,rank=0,rankijk=(0,0,0),ftype=1999): + """Initialize a buffer to generate a new UCF file.""" + self.__bufNumSteps = 0 + self.__bufStep = [] + self.__bufParams = [] + self.__bufData = [] + self.__bufRank = rank + self.__bufRankijk = rankijk + self.__bufFileType = ftype + self.__bufAvailable = True + + def addStepToBuffer(self,step=1,time=0.0): + """Add a new step to buffer.""" + if not self.__bufAvailable: + raise BufferError('Buffer has not been initialized.') + if step>self.__bufNumSteps: + self.__bufStep.extend([None] for ii in range(self.__bufNumSteps,step)) + self.__bufParams.extend([] for ii in range(self.__bufNumSteps,step)) + self.__bufData.extend([] for ii in range(self.__bufNumSteps,step)) + self.__bufNumSteps = step + self.__bufStep[step-1] = time + + def addDatasetToBuffer(self,data,params=None,step=1,dset=1): + """Add a new dataset to specified step of buffer.""" + if not self.__bufAvailable: + raise BufferError('Buffer has not been initialized.') + if step>self.__bufNumSteps: + raise ValueError('Requested step does not exist.') + if not hasattr(data,'dtype'): + raise TypeError('Cannot determine datatype of provided data') + if not hasattr(data,'nbytes'): + raise TypeError('Cannot determine number of bytes of provided data') + if not hasattr(data,'tobytes'): + raise TypeError('Cannot convert provided data to bytes') + if not hasattr(data,'shape'): + raise TypeError('Cannot determine shape of provided data') + if params is not None and not all(np.issubdtype(type(ii),np.integer) for ii in params): + raise TypeError('Parameters must be provided as integer') + + nset = len(self.__bufData[step-1]) + if dset>nset: + self.__bufParams[step-1].extend(None for ii in range(nset,dset)) + self.__bufData[step-1].extend(None for ii in range(nset,dset)) + self.__bufParams[step-1][dset-1] = params + self.__bufData[step-1][dset-1] = data + + def copyStepToBuffer(self,step_in,step_out=1,recursive=False,singlePrecision=False): + """Copy a step from an input file to output buffer. If recursive copying is activated, all datasets + within the step will be copied, otherwise only the step header is copied without datasets. + If datasets are copied, the precision can be reduced using the 'singlePrecision' flag.""" + if not self.__inputAvailable: + raise IOError('No input file available') + if not self.__bufAvailable: + raise BufferError('Buffer has not been initialized.') + self.addStepToBuffer(step=step_out,time=self.__timeStep[step_in-1]) + if recursive: + for dset in range(0,self.__numSetPerStep[step_in-1]): + self.copyDatasetToBuffer(step_in,dset+1,step_out=step_out,dset_out=dset+1,singlePrecision=singlePrecision) + + def copyDatasetToBuffer(self,step_in,dset_in,step_out=1,dset_out=1,singlePrecision=False): + """Copy a dataset from an input file to output buffer at specified step. The precision of the + dataset can be reduced using the 'singlePrecision' flag.""" + if not self.__inputAvailable: + raise IOError('No input file available') + if not self.__bufAvailable: + raise BufferError('Buffer has not been initialized.') + (data,params) = self.readSet(step_in,dset_in) + if singlePrecision: + if data.dtype==np.dtype('float64'): + data = np.float32(data) + elif data.dtype==np.dtype('int64'): + data = np.int32(data) + self.addDatasetToBuffer(data,params=params,step=step_out,dset=dset_out) + + def flushBuffer(self): + """Returns the buffer as a bytes object, which can be written to a file using a file object.""" + # Sanity check and size gathering + sizeStep = [] + sizeSet = [[]] + for step in range(0,self.__bufNumSteps): + nset = len(self.__bufData[step]) + tmpSizeStep = 0 + if nset==0: + warnings.warn('Step #{} in buffer does not contain any dataset.'.format(step+1),RuntimeWarning) + for dset in range(0,nset): + if self.__bufData[step][dset] is None: + raise ValueError('Step #{}, dataset #{} does not contain any data.'.format(step+1,dset+1)) + if self.__bufParams[step][dset] is None: + warnings.warn('No parameters were provided for step #{}, dataset #{}.'.format(step+1,dset+1),RuntimeWarning) + nparam==0 + else: + nparam = len(self.__bufParams[step][dset]) + sizeSet[step].append(self.__bufData[step][dset].nbytes) + tmpSizeStep += (self.__nHeaderSet+nparam)*self.__nByteHeaderSet + tmpSizeStep += self.__bufData[step][dset].nbytes + sizeStep.append(tmpSizeStep) + + # Create output buffer + obuff = b'' + + # Build file header + magicFile = self.__magicFile + fileVersion = 2 + unixTime = int(time.time()) + fileType = self.__bufFileType + rank = self.__bufRank + (iproc,jproc,kproc) = self.__bufRankijk + if self.Debug: + print('Write the following file header at {} bytes'.format(len(obuff)),file=sys.stderr) + print((magicFile,fileVersion,unixTime,fileType,rank,iproc,jproc,kproc),file=sys.stderr) + obuff += struct.pack('qqqqqqqq',magicFile,fileVersion,unixTime,fileType,rank,iproc,jproc,kproc) + + # Build step header + for step in range(0,self.__bufNumSteps): + if self.Verbosity: + print('Adding step #{} to output buffer'.format(step+1),file=sys.stderr) + magicStep = self.__magicStep + stepBytes = sizeStep[step] + stepTime = self.__bufStep[step] + nset = len(self.__bufData[step]) + if self.Debug: + print('Write the following step header at {} bytes'.format(len(obuff)),file=sys.stderr) + print((magicStep,stepBytes,stepTime,nset),file=sys.stderr) + obuff += struct.pack('qqdq',magicStep,stepBytes,stepTime,nset) + # Build dataset headers + attach data + for dset in range(0,nset): + if self.Verbosity: + print(' dataset #{}'.format(dset+1),file=sys.stderr) + magicSet = self.__magicSet + setSize = sizeSet[step][dset] + nptype = self.__bufData[step][dset].dtype + if nptype==np.dtype('int32'): + dtEncoded = 11 + elif nptype==np.dtype('int64'): + dtEncoded = 12 + elif nptype==np.dtype('float32'): + dtEncoded = 21 + elif nptype==np.dtype('float64'): + dtEncoded = 22 + else: + raise TypeError('Data at step #{}, dataset #{} has an invalid datatype.'.format(step+1,dset+1)) + if self.__bufParams[step][dset] is None: + nparam = 0 + else: + nparam = len(self.__bufParams[step][dset]) + if self.Debug: + print('Write the following set header at {} bytes'.format(len(obuff)),file=sys.stderr) + print((magicSet,setSize,dtEncoded,nparam),file=sys.stderr) + print('with parameters:',file=sys.stderr) + print(self.__bufParams[step][dset],file=sys.stderr) + + obuff += struct.pack('qqqq',magicSet,setSize,dtEncoded,nparam) + if nparam!=0: + obuff += struct.pack(nparam*'q',*self.__bufParams[step][dset]) + obuff += self.__bufData[step][dset].tobytes('F') + # Return bytes + return obuff + + def readSet(self,step=1,dset=1,memmap=False): + """Read a dataset from input file. If 'memmap' is activated, the file will only be read partially on demand.""" + self.__fileID.seek(self.__findSet(step,dset),0) + self.__readHeaderSet(); + params = self.__currentSetParams + if memmap: + if self.__external: + raise TypeError('Cannont memory map from tar-archive (yet)') + else: + data = np.memmap(self.__fileID,dtype=self.__currentSetDatatype,offset=self.__fileID.tell(),mode='c') + else: + if self.__external: + data = np.frombuffer(self.__fileID.read(self.__currentSetSize),dtype=self.__currentSetDatatype) + else: + data = np.fromfile(self.__fileID,dtype=self.__currentSetDatatype,count=self.__currentSetNumElements) + return (data,params) + + def __readHeaderFile(self): + self.__fileID.seek(self.__fileBeg,0); + # Determine endianess + mfmt = "<>" + buff = self.__fileID.read(8) + for fmt in mfmt: + currentMagic = struct.unpack("%sq"%fmt,buff)[0] + if currentMagic==self.__magicFile: + break + if currentMagic!=self.__magicFile: + raise ValueError("Magic mismatch: invalid file header. %l" % currentMagic) + self.Endian = fmt + # Read header + self.__fileID.seek(self.__fileBeg,0); + buff = self.__fileID.read(self.__nHeaderFile*8) + header = struct.unpack("%s%dq"%(self.Endian,8),buff) + if self.Debug: + print("Read the following file header at %d bytes" % 0,file=sys.stderr) + print(header,file=sys.stderr) + # Parse version + self.__versionMajor = np.floor(header[1]/self.__factorMajor) + self.__versionMinor = np.floor(np.mod(header[1],self.__factorMajor)/self.__factorMinor) + self.__versionPatch = np.floor(np.mod(header[1],self.__factorMinor)/self.__factorPatch) + self.CodeVersion = "%d.%d.%d" %(self.__versionMajor,self.__versionMinor,self.__versionPatch) + self.UCFVersion = np.mod(header[1],self.__factorPatch); + # Parse time stamp (UTC) + self.__creationTimeUnix = header[2]; + self.CreationTime = datetime.utcfromtimestamp(self.__creationTimeUnix).strftime('%Y-%m-%d %H:%M:%S') + #Parse file type + self.__typeID = header[3]; + typeDict = { + 0: "grid", + 10: "processor grid", + 1000: "fluid snapshot", + 1010: "scalar snapshot", + 1999: "matlab field data", + 2000: "particle snapshot", + 2001: "particle append", + 2011: "particle lagrange", + 2021: "particle balancing", + 2999: "matlab particle data", + 3000: "statistics fluid", + 3010: "statistics fluid pure", + 3020: "statistics scalar" + } + self.Type = typeDict.get(self.__typeID,"unkmown") + # Parse file class + classDict = { + 1: "field", + 2: "particle", + 3: "statistics" + } + self.Class = classDict.get(np.floor(self.__typeID/self.__factorTypeIDClass),"unknown"); + # Parse IO rank + self.IORank = header[5:8]; + + def __readHeaderStep(self): + # Read and parse + self.__currentStepPosHeader = self.__fileID.tell() + buff = self.__fileID.read(self.__nHeaderStep*8) + header = struct.unpack("%s%dq"%(self.Endian,self.__nHeaderStep),buff) + self.__currentStepPosData = self.__fileID.tell() + currentMagic = header[0] + self.__currentStepSize = header[1] + self.__currentStepTime = struct.unpack("%sd"%self.Endian,buff[16:24])[0] + self.__currentStepNumSet = header[3] + if self.Debug: + print("Read the following step header at %d bytes" % self.__currentStepPosHeader,file=sys.stderr) + print("%d,%d,%f,%d" % (currentMagic,self.__currentStepSize,self.__currentStepTime,self.__currentStepNumSet),file=sys.stderr) + # Check if magic is correct + if currentMagic!=self.__magicStep: + raise ValueError("Magic mismatch: invalid step header. %d" & currentMagic); + + def __readHeaderSet(self): + # Read and parse + self.__currentSetPosHeader = self.__fileID.tell() + buff = self.__fileID.read(self.__nHeaderSet*8) + header = struct.unpack("%s%dq"%(self.Endian,self.__nHeaderSet),buff) + self.__currentSetPosData = self.__fileID.tell() + currentMagic = header[0] + self.__currentSetSize = header[1] + self.__currentSetDatatypeNumeric = header[2] + dtSizeDict = { + 11: 4, + 12: 8, + 21: 4, + 22: 8 + } + dtNameDict = { + 11: "%si4" % self.Endian, + 12: "%si8" % self.Endian, + 21: "%sf4" % self.Endian, + 22: "%sf8" % self.Endian + } + self.__currentSetSizeof = dtSizeDict[self.__currentSetDatatypeNumeric] + self.__currentSetDatatype = dtNameDict[self.__currentSetDatatypeNumeric] + self.__currentSetNumParams = header[3] + self.__currentSetNumElements = np.around(self.__currentSetSize/self.__currentSetSizeof).astype(np.int32) + if self.Debug: + print("Read the following set header at %d bytes" % self.__currentSetPosHeader,file=sys.stderr) + print(header,file=sys.stderr) + # Check if magic is correct + if currentMagic!=self.__magicSet: + raise ValueError("Magic mismatch: invalid dataset header. %d" % currentMagic) + # Read variable number of parameters + buff = self.__fileID.read(self.__currentSetNumParams*8) + self.__currentSetParams = struct.unpack("%s%dq"%(self.Endian,self.__currentSetNumParams),buff) + if self.Debug: + print('with parameters:',file=sys.stderr) + print(self.__currentSetParams,file=sys.stderr) + + def __findSet(self,tstep,dset): + # Check input + if tstep>self.NumTimestep: + raise ValueError("Out of bounds: timestep. %d, %d" %(tstep,self.NumTimestep)) + if dset>self.__numSetPerStep[tstep-1]: + raise ValueError("Out of bounds: dataset. %d, %d" % (dset,self.NumDataset)) + # Navigate to correct set + self.__fileID.seek(self.__posStep[tstep-1],0) + for iset in range(0,dset-1): + self.__readHeaderSet() + self.__fileID.seek(self.__currentSetSize,1) + posHeader = self.__fileID.tell() + if self.Debug: + print("Found step #%d, set #%d at position %d" % (tstep,dset,posHeader),file=sys.stderr) + return posHeader + + def __initializeConstants(self): + self.__magicFile = 81985529216486895; + self.__magicStep = 11944304052957; + self.__magicSet = 240217520921210; + self.__nHeaderFile = 8; + self.__nHeaderStep = 4; + self.__nHeaderSet = 4; + self.__nByteHeaderFile = 8; + self.__nByteHeaderStep = 8; + self.__nByteHeaderSet = 8; + self.__nSetParamsField = 10; + self.__nSetParamsParticle = 16; + self.__factorMajor = 1000000000; + self.__factorMinor = 1000000; + self.__factorPatch = 1000; + self.__factorTypeIDClass = 1000; + self.__factorTypeIDKind = 10; + self.__typeIDmatlabField = 1999; + self.__typeIDmatlabParticle = 2999; + self.__scanBuffSize = 4096; + def __resetPublicProperties(self): + self.File = '' # file name + self.Type = '' # file type + self.Class = '' # file class + self.Endian = '' # endianess + self.CodeVersion = '' # version of the simulation code + self.UCFVersion = '' # version of the data format ("unified container format") + self.NumDataset = 0 # maximum number of datasets in this file (over all time steps) + self.NumTimestep = 0 # number of time steps in this file + self.FileSize = 0 # file size + self.CreationTime = 0 # time of creation + self.IOMode = '' # file opened in read-only or read-write mode? + self.IORank = 0 # rank of processor + col,row,pln + self.Verbosity = 0 # verbose output? + self.Debug = 0 # debug information? + def __resetPrivateProperties(self): + self.__fileID = None + self.__fileBeg = 0 + self.__typeID = 0 + self.__creationTimeUnix = '' + self.__versionMajor = 0 + self.__versionMinor = 0 + self.__versionPatch = 0 + self.__versionFile = 0 + self.__posStep = 0 + self.__numSetPerStep = 0 + self.__timeStep = 0 + self.__inputAvailable = False + self.__stream = False + self.__external = False + self.__bufAvailable = False + def __resetCurrentStep(self): + self.__currentStep = 0 + self.__currentStepPosHeader = 0 + self.__currentStepPosData = 0 + self.__currentStepSize = 0 + self.__currentStepTime = 0 + self.__currentStepNumSet = 0 + def __resetCurrentSet(self): + self.__currentSet = 0 + self.__currentSetPosHeader = 0 + self.__currentSetPosData = 0 + self.__currentSetSize = 0 + self.__currentSetDatatype = 0 + self.__currentSetDatatypeNumeric = 0 + self.__currentSetSizeof = 0 + self.__currentSetNumParams = 0 + self.__currentSetParams = 0 + self.__currentSetNumElements = 0