import sys
import io
import struct
import warnings
import numpy as np
import tarfile
import time
from datetime import datetime, timezone


def __warning_format(message, category, filename, lineno, file=None, line=None):
    """Render warnings on one compact line: '<file>:<line>: <Category>:<message>'."""
    return '%s:%s: %s:%s\n' % (filename, lineno, category.__name__, message)


warnings.formatwarning = __warning_format


#############################
# Low-level class interface #
#############################

class UCF:
    """UCF ("unified container format") low-level access class.

    File layout as implied by the reader/writer below (all header words are
    64-bit integers in the file's native byte order):

      file header : magic, version, unix time, file type, rank, iproc, jproc, kproc
      per step    : magic, payload size in bytes, time (float64), number of datasets
      per dataset : magic, data size in bytes, datatype id (11/12/21/22 for
                    i4/i8/f4/f8), number of parameters, followed by the integer
                    parameters and the raw data (Fortran order).
    """

    def __init__(self, file=None, verbosity=False, debug=False):
        self.__initializeConstants()
        self.__resetPublicProperties()
        self.__resetPrivateProperties()
        self.__resetCurrentStep()
        self.__resetCurrentSet()
        if file is not None:
            self.open(file)
        self.Debug = debug
        self.Verbosity = verbosity

    def open(self, file):
        """Opens an input stream for reading access.

        The variable 'file' can be of the following types:
          str                   opens a file on disk whose path is specified by 'file'
          tarfile.ExFileObject  read from a file inside a tar archive (use
                                tarfile.extractfile to generate an instance for 'file')
          bytes                 data which is already located in memory as a bytes or
                                bytearray object (can be used for streams)

        Raises:
          TypeError  if 'file' is of none of the supported types.
        """
        # Check what class 'file' belongs to and treat it accordingly
        if isinstance(file, str):
            self.File = file
            self.__external = False
            self.__stream = False
            self.__fileID = open(self.File, 'rb')
            self.__inputAvailable = True
        elif isinstance(file, tarfile.ExFileObject):
            self.File = file.name
            self.__external = True
            self.__stream = False
            self.__fileID = file
            self.__inputAvailable = True
        elif isinstance(file, (bytes, bytearray)):
            self.File = 'byte-stream'
            self.__external = True
            self.__stream = True
            self.__fileID = io.BytesIO(file)
            self.__inputAvailable = True
        else:
            # bug fix: the original fell through and crashed on __fileID = None
            raise TypeError('Unsupported input type: {}'.format(type(file)))
        # Determine file size
        self.__fileID.seek(0, 2)
        self.FileSize = self.__fileID.tell()
        self.__fileID.seek(self.__fileBeg, 0)
        # Read the header of the file
        self.__readHeaderFile()
        # Scan through file to get the basic structure (steps/sets)
        self.__timeStep = np.zeros(self.__scanBuffSize, dtype=np.float64)
        # int64 (bug fix): byte offsets overflow int32 for files larger than 2 GiB
        self.__posStep = np.zeros(self.__scanBuffSize, dtype=np.int64)
        self.__numSetPerStep = np.zeros(self.__scanBuffSize, dtype=np.int32)
        istep = 0
        # NOTE(review): this scan loop was garbled in the original source; it is
        # reconstructed from how __findSet()/readSet() consume __posStep,
        # __timeStep and __numSetPerStep -- confirm against a reference file.
        while self.__fileID.tell() < self.FileSize:
            if istep >= self.__scanBuffSize:
                # checked before writing (the original warned only after the
                # loop, where an IndexError would already have been raised)
                warnings.warn('Buffer overflow detected: increase scanBuffSize.')
                break
            self.__readHeaderStep()
            self.__timeStep[istep] = self.__currentStepTime
            self.__posStep[istep] = self.__currentStepPosData
            self.__numSetPerStep[istep] = self.__currentStepNumSet
            # Jump over the payload of this step to the next step header
            self.__fileID.seek(self.__currentStepSize, 1)
            istep += 1
        nstep = istep
        self.__timeStep = self.__timeStep[0:nstep]
        self.__posStep = self.__posStep[0:nstep]
        self.__numSetPerStep = self.__numSetPerStep[0:nstep]
        # Set some internal variables
        self.NumDataset = np.max(self.__numSetPerStep)
        self.NumTimestep = nstep
        self.__isFileHeaderWritten = True
        self.__isStepHeaderWritten = True

    def close(self):
        """Closes input file object and resets the instance to a pristine state."""
        # tar members are owned by the enclosing TarFile and must not be closed here
        if self.__fileID is not None and not isinstance(self.__fileID, tarfile.ExFileObject):
            self.__fileID.close()
        # bug fix: the original read 'self.__init__' without the call
        # parentheses, which is a no-op and never reset the object
        self.__init__()

    def addFileHeaderToBuffer(self, rank=0, rankijk=(0, 0, 0), ftype=1999):
        """Initialize a buffer to generate a new UCF file."""
        self.__bufNumSteps = 0
        self.__bufStep = []      # per step: output time (float)
        self.__bufParams = []    # per step: list of per-dataset parameter tuples
        self.__bufData = []      # per step: list of per-dataset numpy arrays
        self.__bufRank = rank
        self.__bufRankijk = rankijk
        self.__bufFileType = ftype
        self.__bufAvailable = True

    def addStepToBuffer(self, step=1, time=0.0):
        """Add a new step to buffer.

        Raises:
          BufferError  if addFileHeaderToBuffer has not been called first.
        """
        if not self.__bufAvailable:
            raise BufferError('Buffer has not been initialized.')
        if step > self.__bufNumSteps:
            # Pad intermediate steps so that index 'step-1' becomes addressable.
            # bug fix: the time placeholder was '[None]' (a list) in the
            # original, while 'None' is what the rest of the code expects.
            self.__bufStep.extend(None for ii in range(self.__bufNumSteps, step))
            self.__bufParams.extend([] for ii in range(self.__bufNumSteps, step))
            self.__bufData.extend([] for ii in range(self.__bufNumSteps, step))
            self.__bufNumSteps = step
        self.__bufStep[step-1] = time

    def addDatasetToBuffer(self, data, params=None, step=1, dset=1):
        """Add a new dataset to specified step of buffer.

        'data' must behave like a numpy array (dtype, nbytes, tobytes, shape);
        'params' is an optional sequence of integers stored in the set header.

        Raises:
          BufferError  if the buffer has not been initialized.
          ValueError   if 'step' has not been added to the buffer yet.
          TypeError    if 'data' or 'params' do not satisfy the requirements above.
        """
        if not self.__bufAvailable:
            raise BufferError('Buffer has not been initialized.')
        if step > self.__bufNumSteps:
            raise ValueError('Requested step does not exist.')
        if not hasattr(data, 'dtype'):
            raise TypeError('Cannot determine datatype of provided data')
        if not hasattr(data, 'nbytes'):
            raise TypeError('Cannot determine number of bytes of provided data')
        if not hasattr(data, 'tobytes'):
            raise TypeError('Cannot convert provided data to bytes')
        if not hasattr(data, 'shape'):
            raise TypeError('Cannot determine shape of provided data')
        if params is not None and not all(np.issubdtype(type(ii), np.integer) for ii in params):
            raise TypeError('Parameters must be provided as integer')
        nset = len(self.__bufData[step-1])
        if dset > nset:
            # Pad intermediate datasets with None placeholders
            self.__bufParams[step-1].extend(None for ii in range(nset, dset))
            self.__bufData[step-1].extend(None for ii in range(nset, dset))
        self.__bufParams[step-1][dset-1] = params
        self.__bufData[step-1][dset-1] = data

    def copyFileHeaderToBuffer(self):
        """Initialize the output buffer from the currently opened input file's header."""
        if not self.__inputAvailable:
            raise IOError('No input file available')
        self.addFileHeaderToBuffer(rank=self.IORank[0], rankijk=self.IORank[1:], ftype=self.__typeID)

    def copyStepToBuffer(self, step_in, step_out=1, recursive=False, singlePrecision=False):
        """Copy a step from an input file to output buffer.

        If recursive copying is activated, all datasets within the step will be
        copied, otherwise only the step header is copied without datasets. If
        datasets are copied, the precision can be reduced using the
        'singlePrecision' flag."""
        if not self.__inputAvailable:
            raise IOError('No input file available')
        if not self.__bufAvailable:
            raise BufferError('Buffer has not been initialized.')
        self.addStepToBuffer(step=step_out, time=self.__timeStep[step_in-1])
        if recursive:
            for dset in range(0, self.__numSetPerStep[step_in-1]):
                self.copyDatasetToBuffer(step_in, dset+1, step_out=step_out,
                                         dset_out=dset+1, singlePrecision=singlePrecision)

    def copyDatasetToBuffer(self, step_in, dset_in, step_out=1, dset_out=1, singlePrecision=False):
        """Copy a dataset from an input file to output buffer at specified step.

        The precision of the dataset can be reduced using the 'singlePrecision'
        flag."""
        if not self.__inputAvailable:
            raise IOError('No input file available')
        if not self.__bufAvailable:
            raise BufferError('Buffer has not been initialized.')
        (data, params) = self.readSet(step_in, dset_in)
        if singlePrecision:
            # Only 64-bit types are narrowed; everything else passes through
            if data.dtype == np.dtype('float64'):
                data = np.float32(data)
            elif data.dtype == np.dtype('int64'):
                data = np.int32(data)
        self.addDatasetToBuffer(data, params=params, step=step_out, dset=dset_out)

    def flushBuffer(self):
        """Returns the buffer as a bytes object, which can be written to a file
        using a file object.

        Raises:
          ValueError  if a buffered dataset slot was never filled with data.
          TypeError   if a buffered array has a dtype other than i4/i8/f4/f8.
        """
        # Sanity check and size gathering
        sizeStep = []
        sizeSet = []
        for step in range(0, self.__bufNumSteps):
            # bug fix: the original pre-allocated only ONE inner list
            # (sizeSet = [[]]), raising IndexError for buffers with >1 step
            sizeSet.append([])
            nset = len(self.__bufData[step])
            tmpSizeStep = 0
            if nset == 0:
                warnings.warn('Step #{} in buffer does not contain any dataset.'.format(step+1), RuntimeWarning)
            for dset in range(0, nset):
                if self.__bufData[step][dset] is None:
                    raise ValueError('Step #{}, dataset #{} does not contain any data.'.format(step+1, dset+1))
                if self.__bufParams[step][dset] is None:
                    warnings.warn('No parameters were provided for step #{}, dataset #{}.'.format(step+1, dset+1), RuntimeWarning)
                    # bug fix: the original had 'nparam==0' (a no-op comparison)
                    nparam = 0
                else:
                    nparam = len(self.__bufParams[step][dset])
                sizeSet[step].append(self.__bufData[step][dset].nbytes)
                tmpSizeStep += (self.__nHeaderSet+nparam)*self.__nByteHeaderSet
                tmpSizeStep += self.__bufData[step][dset].nbytes
            sizeStep.append(tmpSizeStep)
        # Create output buffer
        obuff = b''
        # Build file header
        magicFile = self.__magicFile
        fileVersion = 2
        unixTime = int(time.time())
        fileType = self.__bufFileType
        rank = self.__bufRank
        (iproc, jproc, kproc) = self.__bufRankijk
        if self.Debug:
            print('Write the following file header at {} bytes'.format(len(obuff)), file=sys.stderr)
            print((magicFile, fileVersion, unixTime, fileType, rank, iproc, jproc, kproc), file=sys.stderr)
        obuff += struct.pack('qqqqqqqq', magicFile, fileVersion, unixTime, fileType, rank, iproc, jproc, kproc)
        # Build step header
        for step in range(0, self.__bufNumSteps):
            if self.Verbosity:
                print('Adding step #{} to output buffer'.format(step+1), file=sys.stderr)
            magicStep = self.__magicStep
            stepBytes = sizeStep[step]
            stepTime = self.__bufStep[step]
            nset = len(self.__bufData[step])
            if self.Debug:
                print('Write the following step header at {} bytes'.format(len(obuff)), file=sys.stderr)
                print((magicStep, stepBytes, stepTime, nset), file=sys.stderr)
            obuff += struct.pack('qqdq', magicStep, stepBytes, stepTime, nset)
            # Build dataset headers + attach data
            for dset in range(0, nset):
                if self.Verbosity:
                    print('   dataset #{}'.format(dset+1), file=sys.stderr)
                magicSet = self.__magicSet
                setSize = sizeSet[step][dset]
                nptype = self.__bufData[step][dset].dtype
                # Encode numpy dtype as the on-disk datatype id
                if nptype == np.dtype('int32'):
                    dtEncoded = 11
                elif nptype == np.dtype('int64'):
                    dtEncoded = 12
                elif nptype == np.dtype('float32'):
                    dtEncoded = 21
                elif nptype == np.dtype('float64'):
                    dtEncoded = 22
                else:
                    raise TypeError('Data at step #{}, dataset #{} has an invalid datatype.'.format(step+1, dset+1))
                if self.__bufParams[step][dset] is None:
                    nparam = 0
                else:
                    nparam = len(self.__bufParams[step][dset])
                if self.Debug:
                    print('Write the following set header at {} bytes'.format(len(obuff)), file=sys.stderr)
                    print((magicSet, setSize, dtEncoded, nparam), file=sys.stderr)
                    print('with parameters:', file=sys.stderr)
                    print(self.__bufParams[step][dset], file=sys.stderr)
                obuff += struct.pack('qqqq', magicSet, setSize, dtEncoded, nparam)
                if nparam != 0:
                    obuff += struct.pack(nparam*'q', *self.__bufParams[step][dset])
                # Data is stored in Fortran (column-major) order
                obuff += self.__bufData[step][dset].tobytes('F')
        # Return bytes
        return obuff

    def readSet(self, step=1, dset=1, memmap=False):
        """Read a dataset from input file.

        If 'memmap' is activated, the file will only be read partially on
        demand.

        Returns:
          (data, params)  where 'data' is a flat numpy array and 'params' is
          the tuple of integer parameters stored in the set header.
        """
        if not self.__inputAvailable:
            raise IOError('No input file available')
        self.__fileID.seek(self.__findSet(step, dset), 0)
        self.__readHeaderSet()
        params = self.__currentSetParams
        if memmap:
            if self.__external:
                raise TypeError('Cannot memory map from tar-archive (yet)')
            else:
                # NOTE(review): no 'shape'/'count' is given, so the map extends
                # to the end of the file, not just this dataset -- confirm.
                data = np.memmap(self.__fileID, dtype=self.__currentSetDatatype,
                                 offset=self.__fileID.tell(), mode='c')
        else:
            if self.__external:
                # File-like objects from tar/bytes do not support np.fromfile
                data = np.frombuffer(self.__fileID.read(self.__currentSetSize),
                                     dtype=self.__currentSetDatatype)
            else:
                data = np.fromfile(self.__fileID, dtype=self.__currentSetDatatype,
                                   count=self.__currentSetNumElements)
        return (data, params)

    def __readHeaderFile(self):
        """Read and parse the file header; determines endianness, version,
        creation time, file type/class and IO rank."""
        self.__fileID.seek(self.__fileBeg, 0)
        # Determine endianness by probing the magic number in both byte orders
        mfmt = "<>"
        buff = self.__fileID.read(8)
        for fmt in mfmt:
            currentMagic = struct.unpack("%sq" % fmt, buff)[0]
            if currentMagic == self.__magicFile:
                break
        if currentMagic != self.__magicFile:
            raise ValueError('Magic mismatch: invalid file header. {}'.format(currentMagic))
        self.Endian = fmt
        # Read header
        self.__fileID.seek(self.__fileBeg, 0)
        buff = self.__fileID.read(self.__nHeaderFile*8)
        # consistency fix: use __nHeaderFile instead of the literal 8
        header = struct.unpack("%s%dq" % (self.Endian, self.__nHeaderFile), buff)
        if self.Debug:
            print('Read the following file header at 0 bytes', file=sys.stderr)
            print(header, file=sys.stderr)
        # Parse version (encoded as major*1e9 + minor*1e6 + patch*1e3 + ucf)
        self.__versionMajor = np.floor(header[1]/self.__factorMajor)
        self.__versionMinor = np.floor(np.mod(header[1], self.__factorMajor)/self.__factorMinor)
        self.__versionPatch = np.floor(np.mod(header[1], self.__factorMinor)/self.__factorPatch)
        self.CodeVersion = "%d.%d.%d" % (self.__versionMajor, self.__versionMinor, self.__versionPatch)
        self.UCFVersion = np.mod(header[1], self.__factorPatch)
        # Parse time stamp (UTC); fromtimestamp(tz=utc) replaces the
        # deprecated utcfromtimestamp and yields the same string
        self.__creationTimeUnix = header[2]
        self.CreationTime = datetime.fromtimestamp(self.__creationTimeUnix, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
        # Parse file type
        self.__typeID = header[3]
        typeDict = {
            0: "grid",
            10: "processor grid",
            1000: "fluid snapshot",
            1010: "scalar snapshot",
            1999: "matlab field data",
            2000: "particle snapshot",
            2001: "particle append",
            2011: "particle lagrange",
            2021: "particle balancing",
            2999: "matlab particle data",
            3000: "statistics fluid",
            3010: "statistics fluid pure",
            3020: "statistics scalar"
        }
        # bug fix: default string was misspelled 'unkmown'
        self.Type = typeDict.get(self.__typeID, "unknown")
        # Parse file class (thousands digit of the type id)
        classDict = {
            1: "field",
            2: "particle",
            3: "statistics"
        }
        self.Class = classDict.get(np.floor(self.__typeID/self.__factorTypeIDClass), "unknown")
        # Parse IO rank: (rank, iproc, jproc, kproc)
        self.IORank = header[4:8]

    def __readHeaderStep(self):
        """Read a step header at the current file position into the
        __currentStep* fields."""
        # Read and parse
        self.__currentStepPosHeader = self.__fileID.tell()
        buff = self.__fileID.read(self.__nHeaderStep*8)
        header = struct.unpack("%s%dq" % (self.Endian, self.__nHeaderStep), buff)
        self.__currentStepPosData = self.__fileID.tell()
        currentMagic = header[0]
        self.__currentStepSize = header[1]
        # Word 2 is actually a float64; re-interpret those 8 bytes as a double
        self.__currentStepTime = struct.unpack("%sd" % self.Endian, buff[16:24])[0]
        self.__currentStepNumSet = header[3]
        if self.Debug:
            print("Read the following step header at %d bytes" % self.__currentStepPosHeader, file=sys.stderr)
            print("%d,%d,%f,%d" % (currentMagic, self.__currentStepSize, self.__currentStepTime, self.__currentStepNumSet), file=sys.stderr)
        # Check if magic is correct
        # bug fix: the original used '&' (bitwise and) instead of '%' here,
        # which raised TypeError instead of the intended ValueError
        if currentMagic != self.__magicStep:
            raise ValueError("Magic mismatch: invalid step header. %d" % currentMagic)

    def __readHeaderSet(self):
        """Read a dataset header (incl. its variable-length parameter list) at
        the current file position into the __currentSet* fields."""
        # Read and parse
        self.__currentSetPosHeader = self.__fileID.tell()
        buff = self.__fileID.read(self.__nHeaderSet*8)
        header = struct.unpack("%s%dq" % (self.Endian, self.__nHeaderSet), buff)
        self.__currentSetPosData = self.__fileID.tell()
        currentMagic = header[0]
        self.__currentSetSize = header[1]
        self.__currentSetDatatypeNumeric = header[2]
        # Datatype id -> element size in bytes
        dtSizeDict = {
            11: 4,
            12: 8,
            21: 4,
            22: 8
        }
        # Datatype id -> numpy dtype string (with endianness prefix)
        dtNameDict = {
            11: "%si4" % self.Endian,
            12: "%si8" % self.Endian,
            21: "%sf4" % self.Endian,
            22: "%sf8" % self.Endian
        }
        self.__currentSetSizeof = dtSizeDict[self.__currentSetDatatypeNumeric]
        self.__currentSetDatatype = dtNameDict[self.__currentSetDatatypeNumeric]
        self.__currentSetNumParams = header[3]
        self.__currentSetNumElements = np.around(self.__currentSetSize/self.__currentSetSizeof).astype(np.int32)
        if self.Debug:
            print("Read the following set header at %d bytes" % self.__currentSetPosHeader, file=sys.stderr)
            print(header, file=sys.stderr)
        # Check if magic is correct
        if currentMagic != self.__magicSet:
            raise ValueError("Magic mismatch: invalid dataset header. %d" % currentMagic)
        # Read variable number of parameters
        buff = self.__fileID.read(self.__currentSetNumParams*8)
        self.__currentSetParams = struct.unpack("%s%dq" % (self.Endian, self.__currentSetNumParams), buff)
        if self.Debug:
            print('with parameters:', file=sys.stderr)
            print(self.__currentSetParams, file=sys.stderr)

    def __findSet(self, tstep, dset):
        """Return the byte position of the header of dataset 'dset' (1-based)
        within time step 'tstep' (1-based), leaving the file positioned there."""
        # Check input
        if tstep > self.NumTimestep:
            raise ValueError("Out of bounds: timestep. %d, %d" % (tstep, self.NumTimestep))
        if dset > self.__numSetPerStep[tstep-1]:
            raise ValueError("Out of bounds: dataset. %d, %d" % (dset, self.NumDataset))
        # Navigate to correct set: skip over the preceding dataset headers/payloads
        self.__fileID.seek(self.__posStep[tstep-1], 0)
        for iset in range(0, dset-1):
            self.__readHeaderSet()
            self.__fileID.seek(self.__currentSetSize, 1)
        posHeader = self.__fileID.tell()
        if self.Debug:
            print("Found step #%d, set #%d at position %d" % (tstep, dset, posHeader), file=sys.stderr)
        return posHeader

    def __initializeConstants(self):
        """Define the fixed magic numbers, header geometries and encoding factors."""
        self.__magicFile = 81985529216486895    # 0x0123456789ABCDEF
        self.__magicStep = 11944304052957
        self.__magicSet = 240217520921210
        self.__nHeaderFile = 8      # 64-bit words in the file header
        self.__nHeaderStep = 4      # 64-bit words in a step header
        self.__nHeaderSet = 4       # 64-bit words in a set header
        self.__nByteHeaderFile = 8  # bytes per header word
        self.__nByteHeaderStep = 8
        self.__nByteHeaderSet = 8
        self.__nSetParamsField = 10
        self.__nSetParamsParticle = 16
        # Version encoding: major*1e9 + minor*1e6 + patch*1e3 + ucf
        self.__factorMajor = 1000000000
        self.__factorMinor = 1000000
        self.__factorPatch = 1000
        self.__factorTypeIDClass = 1000
        self.__factorTypeIDKind = 10
        self.__typeIDmatlabField = 1999
        self.__typeIDmatlabParticle = 2999
        self.__scanBuffSize = 4096  # maximum number of time steps handled per file

    def __resetPublicProperties(self):
        self.File = ''            # file name
        self.Type = ''            # file type
        self.Class = ''           # file class
        self.Endian = ''          # endianess
        self.CodeVersion = ''     # version of the simulation code
        self.UCFVersion = ''      # version of the data format ("unified container format")
        self.NumDataset = 0       # maximum number of datasets in this file (over all time steps)
        self.NumTimestep = 0      # number of time steps in this file
        self.FileSize = 0         # file size
        self.CreationTime = 0     # time of creation
        self.IOMode = ''          # file opened in read-only or read-write mode?
        self.IORank = 0           # rank of processor + col,row,pln
        self.Verbosity = 0        # verbose output?
        self.Debug = 0            # debug information?

    def __resetPrivateProperties(self):
        self.__fileID = None
        self.__fileBeg = 0
        self.__typeID = 0
        self.__creationTimeUnix = ''
        self.__versionMajor = 0
        self.__versionMinor = 0
        self.__versionPatch = 0
        self.__versionFile = 0
        self.__posStep = 0
        self.__numSetPerStep = 0
        self.__timeStep = 0
        self.__inputAvailable = False
        self.__stream = False
        self.__external = False
        self.__bufAvailable = False

    def __resetCurrentStep(self):
        self.__currentStep = 0
        self.__currentStepPosHeader = 0
        self.__currentStepPosData = 0
        self.__currentStepSize = 0
        self.__currentStepTime = 0
        self.__currentStepNumSet = 0

    def __resetCurrentSet(self):
        self.__currentSet = 0
        self.__currentSetPosHeader = 0
        self.__currentSetPosData = 0
        self.__currentSetSize = 0
        self.__currentSetDatatype = 0
        self.__currentSetDatatypeNumeric = 0
        self.__currentSetSizeof = 0
        self.__currentSetNumParams = 0
        self.__currentSetParams = 0
        self.__currentSetNumElements = 0


#################################
# High-level function interface #
#################################

def readGrid(file, verbosity=False, debug=False):
    """Read a grid file: returns [x, y, z, ...] coordinate arrays, one triple
    per dataset (with the last triple duplicated for legacy files)."""
    obj = UCF(file=file, verbosity=verbosity, debug=debug)
    output = []
    for iset in range(0, obj.NumDataset):
        (data, params) = obj.readSet(step=1, dset=iset+1)
        nx = params[0]
        ny = params[1]
        nz = params[2]
        output.append(data[0:nx])
        output.append(data[nx:nx+ny])
        output.append(data[nx+ny:nx+ny+nz])
    # Legacy files carry fewer datasets; duplicate the last coordinate triple
    # presumably to emulate the newer layout -- TODO confirm
    #if obj.UCFVersion<2:
    if obj.NumDataset < 5:
        output.extend(output[-3:])
    obj.close()
    return output


def readProcgrid(file, verbosity=False, debug=False):
    """Read a processor-grid file: returns [ibeg, iend, jbeg, jend, kbeg, kend]
    index arrays, one sextuple per dataset."""
    obj = UCF(file=file, verbosity=verbosity, debug=debug)
    output = []
    for iset in range(0, obj.NumDataset):
        (data, params) = obj.readSet(step=1, dset=iset+1)
        nxp = params[0]
        nyp = params[1]
        nzp = params[2]
        output.append(data[0:nxp])                              # ibeg
        output.append(data[nxp:2*nxp])                          # iend
        output.append(data[2*nxp:2*nxp+nyp])                    # jbeg
        output.append(data[2*nxp+nyp:2*nxp+2*nyp])              # jend
        output.append(data[2*nxp+2*nyp:2*nxp+2*nyp+nzp])        # kbeg
        # bug fix: the kend upper bound read '2*nxp+2*nyp*2*nzp' (a '*' typo)
        output.append(data[2*nxp+2*nyp+nzp:2*nxp+2*nyp+2*nzp])  # kend
    # Legacy files carry fewer datasets; duplicate the last index sextuple
    # presumably to emulate the newer layout -- TODO confirm
    #if obj.UCFVersion<2:
    if obj.NumDataset < 5:
        output.extend(output[-6:])
    obj.close()
    return output


def readFieldChunk(file, step=1, dset=-1, verbosity=False, debug=False):
    """Read field chunks: returns a list of dicts (one per dataset) with the
    chunk geometry parameters, the reshaped 3D 'data' array (Fortran order,
    incl. ghost layers) and the writing processor's rank."""
    obj = UCF(file=file, verbosity=verbosity, debug=debug)
    if not isinstance(dset, list):
        if dset == -1:
            dset = range(1, obj.NumDataset+1)   # fix that maybe later (this is maximum over all timesteps)
        else:
            dset = [dset]
    output = []
    for ii in dset:
        tmp = dict()
        (data, params) = obj.readSet(step=step, dset=ii)
        tmp['ighost'] = params[0]
        tmp['ibeg'] = params[1]
        tmp['jbeg'] = params[2]
        tmp['kbeg'] = params[3]
        tmp['nxl'] = params[4]
        tmp['nyl'] = params[5]
        tmp['nzl'] = params[6]
        tmp['nx'] = params[7]
        tmp['ny'] = params[8]
        tmp['nz'] = params[9]
        # Local extent plus ghost layers on both sides, Fortran (column-major) order
        tmp['data'] = data.reshape((tmp['nxl']+2*tmp['ighost'],
                                    tmp['nyl']+2*tmp['ighost'],
                                    tmp['nzl']+2*tmp['ighost']), order='F')
        tmp['rank'] = obj.IORank[0]
        tmp['rankijk'] = obj.IORank[1:]
        output.append(tmp)
    obj.close()
    return output


def readParticles(file, step=-1, verbosity=False, debug=False):
    """Read particle snapshots.

    Returns:
      (pp, col)  where 'pp' is a numpy array of shape (ncol, np, ntime) and
      'col' is a dictionary mapping column names to row indices in 'pp'.
    """
    # Check what kind of file was passed: standalone, tar, bytes
    # TBD: tar is not supported yet
    obj = UCF(file=file, verbosity=verbosity, debug=debug)
    if not isinstance(step, list):
        if step == -1:
            step = range(1, obj.NumTimestep+1)
        else:
            step = [step]
    # The output will be the following:
    # 1) numpy array with dimension (ncol,np,ntime)
    # 2) dictionary which specifies the columns
    # We read the data step by step in a list, which is then converted to a 3D array
    pp = []
    for ii in step:
        (data, params) = obj.readSet(step=ii, dset=1)
        npart = params[0]
        ncol = params[1]
        ncol_rank = params[2]
        ncol_hybrid = params[3]
        ncol_dem = params[4]
        ncol_scalar = params[5]
        nscal = ncol_scalar//2   # each scalar contributes two columns (value + flux)
        pp.append(data.reshape((ncol, npart), order='F'))
    # Close UCF object
    obj.close()
    # Convert list of 2D arrays to 3D array
    pp = np.stack(pp, axis=2)
    # Create the dictionary
    col = colmap_from_flags(ncol_rank, ncol_hybrid, ncol_dem, nscal)
    # Return result
    return (pp, col)


def colmap_from_flags(irank, ihybrid, idem, iscal):
    '''Creates a dictionary which specifies the columns of a particle array.

    irank/ihybrid/idem act as flags (>0 enables the respective columns) and
    iscal is the number of scalar value/flux column pairs.'''
    col = {}
    ioffset = 0
    # Optional leading columns
    if irank > 0:
        col['rank'] = ioffset; ioffset += 1
    if ihybrid > 0:
        col['id'] = ioffset; ioffset += 1
    # Mandatory columns: position, radius, density, angle, velocities,
    # angular velocities, forces and torques
    col['x'] = ioffset; ioffset += 1
    col['y'] = ioffset; ioffset += 1
    col['z'] = ioffset; ioffset += 1
    col['r'] = ioffset; ioffset += 1
    col['rho'] = ioffset; ioffset += 1
    col['ax'] = ioffset; ioffset += 1
    col['ay'] = ioffset; ioffset += 1
    col['az'] = ioffset; ioffset += 1
    col['u'] = ioffset; ioffset += 1
    col['v'] = ioffset; ioffset += 1
    col['w'] = ioffset; ioffset += 1
    col['ox'] = ioffset; ioffset += 1
    col['oy'] = ioffset; ioffset += 1
    col['oz'] = ioffset; ioffset += 1
    col['fx'] = ioffset; ioffset += 1
    col['fy'] = ioffset; ioffset += 1
    col['fz'] = ioffset; ioffset += 1
    col['tx'] = ioffset; ioffset += 1
    col['ty'] = ioffset; ioffset += 1
    col['tz'] = ioffset; ioffset += 1
    # Optional DEM collision force/torque columns
    if idem > 0:
        col['fxc'] = ioffset; ioffset += 1
        col['fyc'] = ioffset; ioffset += 1
        col['fzc'] = ioffset; ioffset += 1
        col['txc'] = ioffset; ioffset += 1
        col['tyc'] = ioffset; ioffset += 1
        col['tzc'] = ioffset; ioffset += 1
    # Optional scalar columns: one value/flux pair per scalar
    if iscal > 0:
        for ii in range(0, iscal):
            col['s'+str(ii)] = ioffset; ioffset += 1
            col['q'+str(ii)] = ioffset; ioffset += 1
    return col