import warnings


def __warning_format(message, category, filename, lineno, file=None, line=None):
    """Compact one-line warning format: 'filename:lineno: Category:message'."""
    return '%s:%s: %s:%s\n' % (filename, lineno, category.__name__, message)


warnings.formatwarning = __warning_format


class UCF:
    """UCF low-level access class"""

    def __init__(self, file=None, verbosity=False, debug=False):
        """Initialize constants and state; optionally open 'file' immediately."""
        self.__initializeConstants()
        self.__resetPublicProperties()
        self.__resetPrivateProperties()
        self.__resetCurrentStep()
        self.__resetCurrentSet()
        if file is not None:
            self.open(file)
        self.Debug = debug
        self.Verbosity = verbosity

    def open(self, file):
        """Opens an input stream for reading access.

        The variable 'file' can be of the following types:
          str    opens a file on disk whose path is specified by 'file'
          bytes  data which is already located in memory as a bytes or
                 bytearray object (can be used for streams)
        """
        from io import BytesIO
        import numpy
        # Check what class 'file' belongs to and treat it accordingly
        if isinstance(file, str):
            self.File = file
            self.__fileobj = open(self.File, 'rb')
            self.__inputAvailable = True
        elif isinstance(file, bytes):
            self.File = 'byte-stream'
            self.__fileobj = BytesIO(file)
            self.__inputAvailable = True
        # Determine file size
        self.__fileobj.seek(0, 2)
        self.FileSize = self.__fileobj.tell()
        self.__fileobj.seek(self.__fileBeg, 0)
        # Read the header of the file
        self.__readHeaderFile()
        # Scan through file to get the basic structure (steps/sets)
        self.__timeStep = numpy.zeros(self.__scanBuffSize, dtype=numpy.float64)
        self.__posStep = numpy.zeros(self.__scanBuffSize, dtype=numpy.int64)
        self.__numSetPerStep = numpy.zeros(self.__scanBuffSize, dtype=numpy.int32)
        # NOTE(review): the scan loop below was corrupted in the source
        # (comparison operators and loop body were stripped); reconstructed
        # from the surrounding reader state -- verify against the original.
        istep = 0
        while self.__fileobj.tell() < self.FileSize:
            if istep >= self.__scanBuffSize:
                # Buffer full: warn and stop scanning instead of IndexError.
                warnings.warn('Buffer overflow detected: increase scanBuffSize.')
                break
            self.__readHeaderStep()
            self.__timeStep[istep] = self.__currentStepTime
            self.__posStep[istep] = self.__fileobj.tell()
            self.__numSetPerStep[istep] = self.__currentStepNumSet
            # Skip over the datasets of this step
            self.__fileobj.seek(self.__currentStepSize, 1)
            istep += 1
        nstep = istep
        # Trim scan buffers to the number of steps actually found
        self.__timeStep = self.__timeStep[0:nstep]
        self.__posStep = self.__posStep[0:nstep]
        self.__numSetPerStep = self.__numSetPerStep[0:nstep]
        # Set some internal variables
        self.NumDataset = numpy.max(self.__numSetPerStep)
        self.NumTimestep = nstep
        self.__isFileHeaderWritten = True
        self.__isStepHeaderWritten = True

    def close(self):
        """Closes input file object"""
        self.__fileobj.close()
        self.__init__()

    def addFileHeaderToBuffer(self, rank=0, rankijk=(0, 0, 0), ftype=1999):
        """Initialize a buffer to generate a new UCF file."""
        self.__bufNumSteps = 0
        self.__bufStep = []
        self.__bufParams = []
        self.__bufData = []
        self.__bufRank = rank
        self.__bufRankijk = rankijk
        self.__bufFileType = ftype
        self.__bufAvailable = True

    def addStepToBuffer(self, step=1, time=0.0):
        """Add a new step to buffer."""
        if not self.__bufAvailable:
            raise BufferError('Buffer has not been initialized.')
        if step > self.__bufNumSteps:
            # Grow per-step bookkeeping lists up to the requested step.
            # BUG FIX: the original extended __bufStep with [None] *lists*;
            # entries must be scalars (flushBuffer packs them as doubles).
            self.__bufStep.extend(None for ii in range(self.__bufNumSteps, step))
            self.__bufParams.extend([] for ii in range(self.__bufNumSteps, step))
            self.__bufData.extend([] for ii in range(self.__bufNumSteps, step))
            self.__bufNumSteps = step
        self.__bufStep[step-1] = time

    def addDatasetToBuffer(self, data, params=None, step=1, dset=1):
        """Add a new dataset to specified step of buffer."""
        # BUG FIX: 'import numpy' originally preceded the docstring, which
        # turned the docstring into a bare expression statement.
        import numpy
        if not self.__bufAvailable:
            raise BufferError('Buffer has not been initialized.')
        if step > self.__bufNumSteps:
            raise ValueError('Requested step does not exist.')
        # Duck-type check: 'data' must behave like a numpy array
        if not hasattr(data, 'dtype'):
            raise TypeError('Cannot determine datatype of provided data')
        if not hasattr(data, 'nbytes'):
            raise TypeError('Cannot determine number of bytes of provided data')
        if not hasattr(data, 'tobytes'):
            raise TypeError('Cannot convert provided data to bytes')
        if not hasattr(data, 'shape'):
            raise TypeError('Cannot determine shape of provided data')
        if params is not None and not all(numpy.issubdtype(type(ii), numpy.integer) for ii in params):
            raise TypeError('Parameters must be provided as integer')
        nset = len(self.__bufData[step-1])
        if dset > nset:
            # Grow the per-dataset slots of this step up to 'dset'
            self.__bufParams[step-1].extend(None for ii in range(nset, dset))
            self.__bufData[step-1].extend(None for ii in range(nset, dset))
        self.__bufParams[step-1][dset-1] = params
        self.__bufData[step-1][dset-1] = data

    def copyFileHeaderToBuffer(self):
        """Copy the file header of the input file into the output buffer."""
        if not self.__inputAvailable:
            raise IOError('No input file available')
        self.addFileHeaderToBuffer(rank=self.IORank[0], rankijk=self.IORank[1:],
                                   ftype=self.__typeID)
# NOTE(review): the defs below are methods of class UCF -- the enclosing
# class statement is flattened/corrupted in this source.  Private-name
# mangling (self.__x -> self._UCF__x) applies once re-indented into the class.

def copyStepToBuffer(self, step_in, step_out=1, recursive=False, singlePrecision=False):
    """Copy a step from an input file to output buffer.

    If recursive copying is activated, all datasets within the step will be
    copied, otherwise only the step header is copied without datasets. If
    datasets are copied, the precision can be reduced using the
    'singlePrecision' flag."""
    if not self.__inputAvailable:
        raise IOError('No input file available')
    if not self.__bufAvailable:
        raise BufferError('Buffer has not been initialized.')
    self.addStepToBuffer(step=step_out, time=self.__timeStep[step_in-1])
    if recursive:
        for dset in range(0, self.__numSetPerStep[step_in-1]):
            self.copyDatasetToBuffer(step_in, dset+1, step_out=step_out,
                                     dset_out=dset+1,
                                     singlePrecision=singlePrecision)


def copyDatasetToBuffer(self, step_in, dset_in, step_out=1, dset_out=1, singlePrecision=False):
    """Copy a dataset from an input file to output buffer at specified step.

    The precision of the dataset can be reduced using the 'singlePrecision'
    flag."""
    import numpy
    if not self.__inputAvailable:
        raise IOError('No input file available')
    if not self.__bufAvailable:
        raise BufferError('Buffer has not been initialized.')
    (data, params) = self.readSet(step_in, dset_in)
    if singlePrecision:
        # Demote 64-bit data to the matching 32-bit type
        if data.dtype == numpy.dtype('float64'):
            data = numpy.float32(data)
        elif data.dtype == numpy.dtype('int64'):
            data = numpy.int32(data)
    self.addDatasetToBuffer(data, params=params, step=step_out, dset=dset_out)


def flushBuffer(self):
    """Returns the buffer as a bytes object, which can be written to a file
    using a file object."""
    from time import time
    from sys import stderr
    from struct import pack
    import numpy as np
    # Sanity check and size gathering
    sizeStep = []
    sizeSet = []
    for step in range(0, self.__bufNumSteps):
        # BUG FIX: original initialized sizeSet as [[]] once, which raised
        # IndexError for any buffer holding more than one step.
        sizeSet.append([])
        nset = len(self.__bufData[step])
        tmpSizeStep = 0
        if nset == 0:
            warnings.warn('Step #{} in buffer does not contain any dataset.'.format(step+1), RuntimeWarning)
        for dset in range(0, nset):
            if self.__bufData[step][dset] is None:
                raise ValueError('Step #{}, dataset #{} does not contain any data.'.format(step+1, dset+1))
            if self.__bufParams[step][dset] is None:
                warnings.warn('No parameters were provided for step #{}, dataset #{}.'.format(step+1, dset+1), RuntimeWarning)
                # BUG FIX: original read 'nparam==0' (a no-op comparison),
                # leaving nparam undefined or stale for parameter-less sets.
                nparam = 0
            else:
                nparam = len(self.__bufParams[step][dset])
            sizeSet[step].append(self.__bufData[step][dset].nbytes)
            tmpSizeStep += (self.__nHeaderSet+nparam)*self.__nByteHeaderSet
            tmpSizeStep += self.__bufData[step][dset].nbytes
        sizeStep.append(tmpSizeStep)
    # Create output buffer
    obuff = b''
    # Build file header
    magicFile = self.__magicFile
    fileVersion = 2
    unixTime = int(time())
    fileType = self.__bufFileType
    rank = self.__bufRank
    (iproc, jproc, kproc) = self.__bufRankijk
    if self.Debug:
        print('Write the following file header at {} bytes'.format(len(obuff)), file=stderr)
        print((magicFile, fileVersion, unixTime, fileType, rank, iproc, jproc, kproc), file=stderr)
    obuff += pack('qqqqqqqq', magicFile, fileVersion, unixTime, fileType, rank, iproc, jproc, kproc)
    # Build step header
    for step in range(0, self.__bufNumSteps):
        if self.Verbosity:
            print('Adding step #{} to output buffer'.format(step+1), file=stderr)
        magicStep = self.__magicStep
        stepBytes = sizeStep[step]
        stepTime = self.__bufStep[step]
        nset = len(self.__bufData[step])
        if self.Debug:
            print('Write the following step header at {} bytes'.format(len(obuff)), file=stderr)
            print((magicStep, stepBytes, stepTime, nset), file=stderr)
        obuff += pack('qqdq', magicStep, stepBytes, stepTime, nset)
        # Build dataset headers + attach data
        for dset in range(0, nset):
            if self.Verbosity:
                print(' dataset #{}'.format(dset+1), file=stderr)
            magicSet = self.__magicSet
            setSize = sizeSet[step][dset]
            nptype = self.__bufData[step][dset].dtype
            # Encode the numpy dtype as the UCF integer datatype id.
            # BUG FIX: original used 'numpy.dtype' although numpy is only
            # imported here as 'np' (NameError).
            if nptype == np.dtype('int32'):
                dtEncoded = 11
            elif nptype == np.dtype('int64'):
                dtEncoded = 12
            elif nptype == np.dtype('float32'):
                dtEncoded = 21
            elif nptype == np.dtype('float64'):
                dtEncoded = 22
            else:
                raise TypeError('Data at step #{}, dataset #{} has an invalid datatype.'.format(step+1, dset+1))
            if self.__bufParams[step][dset] is None:
                nparam = 0
            else:
                nparam = len(self.__bufParams[step][dset])
            if self.Debug:
                print('Write the following set header at {} bytes'.format(len(obuff)), file=stderr)
                print((magicSet, setSize, dtEncoded, nparam), file=stderr)
                print('with parameters:', file=stderr)
                print(self.__bufParams[step][dset], file=stderr)
            obuff += pack('qqqq', magicSet, setSize, dtEncoded, nparam)
            if nparam != 0:
                obuff += pack(nparam*'q', *self.__bufParams[step][dset])
            # Data bytes are appended in Fortran (column-major) order
            obuff += self.__bufData[step][dset].tobytes('F')
    # Return bytes
    return obuff


def getTime(self, step=1):
    """Return the simulation time of timestep 'step' (1-based).

    NOTE(review): the original body was corrupted in the source;
    reconstructed from the surrounding accessors -- verify.
    """
    assert step-1 < self.NumTimestep
    return self.__timeStep[step-1]


def __findPosition(self, tstep=1, dset=1):
    """Seek the input file to the header of dataset 'dset' in timestep
    'tstep' (both 1-based) and return that byte position.

    NOTE(review): the original 'def' line of this method was lost to source
    corruption; name and signature are reconstructed -- verify against callers.
    """
    from sys import stderr
    if tstep > self.NumTimestep:
        raise ValueError("Out of bounds: timestep. %d, %d" % (tstep, self.NumTimestep))
    if dset > self.__numSetPerStep[tstep-1]:
        raise ValueError("Out of bounds: dataset. %d, %d" % (dset, self.NumDataset))
    # Navigate to correct set
    self.__fileobj.seek(self.__posStep[tstep-1], 0)
    for iset in range(0, dset-1):
        self.__readHeaderSet()
        self.__fileobj.seek(self.__currentSetSize, 1)
    posHeader = self.__fileobj.tell()
    if self.Debug:
        print("Found step #%d, set #%d at position %d" % (tstep, dset, posHeader), file=stderr)
    return posHeader


def __initializeConstants(self):
    """Define format magic markers, header sizes and encoding factors."""
    self.__magicFile = 81985529216486895   # magic marker: file header
    self.__magicStep = 11944304052957      # magic marker: step header
    self.__magicSet = 240217520921210      # magic marker: set header
    self.__nHeaderFile = 8                 # fields in file header
    self.__nHeaderStep = 4                 # fields in step header
    self.__nHeaderSet = 4                  # fields in set header
    self.__nByteHeaderFile = 8             # bytes per header field
    self.__nByteHeaderStep = 8
    self.__nByteHeaderSet = 8
    self.__nSetParamsField = 10
    self.__nSetParamsParticle = 16
    self.__factorMajor = 1000000000        # version-number encoding factors
    self.__factorMinor = 1000000
    self.__factorPatch = 1000
    self.__factorTypeIDClass = 1000        # type-id encoding factors
    self.__factorTypeIDKind = 10
    self.__typeIDmatlabField = 1999
    self.__typeIDmatlabParticle = 2999
    self.__scanBuffSize = 131072           # max steps scanned in open()


def __resetPublicProperties(self):
    """Reset all user-visible attributes to their defaults."""
    self.File = ''           # file name
    self.Type = ''           # file type
    self.Class = ''          # file class
    self.Endian = ''         # endianess
    self.CodeVersion = ''    # version of the simulation code
    self.UCFVersion = ''     # version of the data format ("unified container format")
    self.NumDataset = 0      # maximum number of datasets in this file (over all time steps)
    self.NumTimestep = 0     # number of time steps in this file
    self.FileSize = 0        # file size
    self.CreationTime = 0    # time of creation
    self.IOMode = ''         # file opened in read-only or read-write mode?
    self.IORank = 0          # rank of processor + col,row,pln
    self.Verbosity = 0       # verbose output?
    self.Debug = 0           # debug information?
# NOTE(review): the three defs below are methods of class UCF -- the
# enclosing class statement is flattened/corrupted in this source.

def __resetPrivateProperties(self):
    """Reset internal reader state (file object, versions, scan tables)."""
    self.__fileobj = None
    self.__fileBeg = 0
    self.__typeID = 0
    self.__creationTimeUnix = ''
    self.__versionMajor = 0
    self.__versionMinor = 0
    self.__versionPatch = 0
    self.__versionFile = 0
    self.__posStep = 0
    self.__numSetPerStep = 0
    self.__timeStep = 0
    self.__inputAvailable = False
    self.__bufAvailable = False


def __resetCurrentStep(self):
    """Reset bookkeeping for the most recently read step header."""
    self.__currentStep = 0
    self.__currentStepPosHeader = 0
    self.__currentStepPosData = 0
    self.__currentStepSize = 0
    self.__currentStepTime = 0
    self.__currentStepNumSet = 0


def __resetCurrentSet(self):
    """Reset bookkeeping for the most recently read set header."""
    self.__currentSet = 0
    self.__currentSetPosHeader = 0
    self.__currentSetPosData = 0
    self.__currentSetSize = 0
    self.__currentSetDatatype = 0
    self.__currentSetDatatypeNumeric = 0
    self.__currentSetSizeof = 0
    self.__currentSetNumParams = 0
    self.__currentSetParams = 0
    self.__currentSetNumElements = 0


# TBD: refactor -> UCFCollection, UCFTarFile constructors etc
class UCFTarFile:
    '''Superclass for instances of ucf.tar files.'''

    def __init__(self, file_tar, file_index=None):
        # Ustar is the tar-archive access helper (defined elsewhere in the package)
        self.handler = Ustar(file_tar, file_index)
        self.verbose = False
        self.debug = False
        # Lazily filled caches, see _initialize_buffer_* below
        self._buffer_grid = None
        self._buffer_proc = None
        self._buffer_params = None

    def read_grid(self, key=None):
        """Return the cached grid table, or only the entry matching 'key'."""
        self._initialize_buffer_grid()
        if key is not None:
            gidx = self.gidx_from_key(key)
            return self._buffer_grid[gidx]
        return self._buffer_grid

    def read_procgrid(self, key=None):
        """Return the cached processor-grid table, or only the entry for 'key'."""
        self._initialize_buffer_proc()
        if key is not None:
            gidx = self.gidx_from_key(key)
            return self._buffer_proc[gidx]
        return self._buffer_proc

    def read_parameters(self):
        """Return the cached parameter object parsed from 'parameters.asc'."""
        self._initialize_buffer_params()
        return self._buffer_params

    def bounds(self, axis=None):
        """Domain bounds (a,b,c,d,e,f), or the (min,max) pair for 'axis'."""
        params = self.read_parameters()
        keys = ('a', 'b', 'c', 'd', 'e', 'f')
        if axis is None:
            return tuple(params.getfloat('geometry', key) for key in keys)
        else:
            assert axis < 3, "'axis' must be smaller than 3"
            return tuple(params.getfloat('geometry', key) for key in keys[2*axis:2*(axis+1)])

    def periodicity(self, axis=None):
        """Periodicity flags for all three axes, or the flag for 'axis'."""
        params = self.read_parameters()
        keys = ('xperiodic', 'yperiodic', 'zperiodic')
        if axis is None:
            return tuple(params.getboolean('geometry', key) for key in keys)
        else:
            assert axis < 3, "'axis' must be smaller than 3"
            return params.getboolean('geometry', keys[axis])

    def period(self, axis=None):
        """Domain period per axis; None for non-periodic axes."""
        if axis is None:
            return tuple(self.period(axis=ii) for ii in range(0, 3))
        else:
            assert axis < 3, "'axis' must be smaller than 3"
            params = self.read_parameters()
            key_bound = ('b', 'd', 'f')
            key_period = ('xperiodic', 'yperiodic', 'zperiodic')
            period = params.getfloat('geometry', key_bound[axis])
            isPeriodic = params.getboolean('geometry', key_period[axis])
            return period if isPeriodic else None

    def procgrid(self):
        """Processor-grid dimensions per variable as a dict keyed u/v/w/p/s."""
        self._initialize_buffer_proc()
        proc = {}
        offset = 0
        for key in ('u', 'v', 'w', 'p', 's'):
            proc[key] = tuple(self._buffer_proc[offset])
            offset += 1
        return proc

    def origin(self):
        """First grid point per variable as a dict keyed u/v/w/p/s."""
        self._initialize_buffer_grid()
        origin = {}
        offset = 0
        for key in ('u', 'v', 'w', 'p', 's'):
            origin[key] = (self._buffer_grid[offset][0][0],
                           self._buffer_grid[offset][1][0],
                           self._buffer_grid[offset][2][0])
            offset += 1
        return origin

    def spacing(self):
        """Grid spacing (dx,dy,dz) from the first two points of grid entry 0."""
        self._initialize_buffer_grid()
        spacing = (self._buffer_grid[0][0][1]-self._buffer_grid[0][0][0],
                   self._buffer_grid[0][1][1]-self._buffer_grid[0][1][0],
                   self._buffer_grid[0][2][1]-self._buffer_grid[0][2][0])
        return spacing

    def gidx_from_key(self, key):
        """Map a variable key (by its first letter u/v/w/p/s) to a grid index."""
        if key[0] == 'u':
            return 0
        elif key[0] == 'v':
            return 1
        elif key[0] == 'w':
            return 2
        elif key[0] == 'p':
            return 3
        elif key[0] == 's':
            return 4
        else:
            raise ValueError("Invalid value of 'key'.")

    def _initialize_buffer_grid(self):
        """Load and cache 'grid.bin' on first use."""
        if self._buffer_grid is None:
            data = self.handler.read('grid.bin')
            self._buffer_grid = read_grid(data, verbosity=self.verbose, debug=self.debug)
        return

    def _initialize_buffer_proc(self):
        """Load and cache 'proc.bin' on first use."""
        if self._buffer_proc is None:
            data = self.handler.read('proc.bin')
            self._buffer_proc = read_procgrid(data, verbosity=self.verbose, debug=self.debug)
        return

    def _initialize_buffer_params(self):
        """Load and cache 'parameters.asc' on first use."""
        if self._buffer_params is None:
            data = self.handler.read('parameters.asc')
            self._buffer_params = read_parameters(data)
        return
class UCFSnapshot(UCFTarFile):
    '''Handles a snapshot.ucf.tar file.'''

    def __init__(self, file_tar, file_index=None):
        """Open the archive and classify it as a 'uvwp' or 'scal' snapshot."""
        super(UCFSnapshot, self).__init__(file_tar, file_index=file_index)
        file_name_string = '\t'.join(self.handler.file_name)
        if 'uvwp' in file_name_string:
            self.type = 'uvwp'
        elif 'scal' in file_name_string:
            self.type = 'scal'
        else:
            raise ValueError("Archive does not contain 'uvwp' nor 'scal' files.")
        # Number of per-processor chunk files of the detected type
        self._nproc = sum(self.type in s for s in self.handler.file_name)
        self._nprocijk = None

    def read_particles(self):
        """Read and decode 'particles.bin' from the archive."""
        data = self.handler.read('particles.bin')
        return read_particles(data, step=1, verbosity=self.verbose, debug=self.debug)

    def read_chunk(self, rank, dset=None, keep_ghost=True):
        """Read the chunk file of processor 'rank' (e.g. 'uvwp.00003')."""
        file_target = self.type + '.{:05d}'.format(rank)
        data = self.handler.read(file_target)
        return read_chunk(data, step=1, dset=dset, keep_ghost=keep_ghost,
                          verbosity=self.verbose, debug=self.debug)

    def field_chunk(self, rank, key, keep_ghost=True):
        '''Returns chunk data as Field3d object.'''
        from .field import Field3d
        # Support a list/tuple of keys: this is not the most efficient
        # implementation but convenient
        if isinstance(key, (list, tuple)):
            return [self.field_chunk(rank, k, keep_ghost=keep_ghost) for k in key]
        dset = self.dset_from_key(key)
        chunk = self.read_chunk(rank, dset=dset, keep_ghost=keep_ghost)
        grid = self.read_grid(key=key)
        dx, dy, dz = (grid[0][1]-grid[0][0],
                      grid[1][1]-grid[1][0],
                      grid[2][1]-grid[2][0])
        # Chunk origin, shifted left by one spacing per ghost layer
        xo, yo, zo = (grid[0][chunk['ibeg']-1]-(chunk['ighost'])*dx,
                      grid[1][chunk['jbeg']-1]-(chunk['ighost'])*dy,
                      grid[2][chunk['kbeg']-1]-(chunk['ighost'])*dz)
        return Field3d(chunk['data'], (xo, yo, zo), (dx, dy, dz))

    def particles(self, select_col=None):
        '''Returns particle data as Particles object.'''
        from .particle import Particles
        pp, col, time = self.read_particles()
        return Particles.from_array(pp, col, time, self.period(), select_col=select_col)

    def nproc(self, axis=None):
        """Total chunk-file count, or the processor count along 'axis' (0..2)."""
        if axis is None:
            return self._nproc
        else:
            assert axis < 3, "'axis' must be smaller than 3"
            self._initialize_nprocijk()
            return self._nprocijk[axis]
def ijk_from_rank(self, rank):
    """Convert a linear chunk rank into (i, j, k) processor-grid indices.

    NOTE(review): this is a method of UCFSnapshot whose body was almost
    entirely lost to source corruption; the reconstruction below assumes
    i-fastest rank ordering -- TODO confirm against the writer code.
    """
    assert rank < self._nproc, "'rank' must be smaller than nproc"
    self._initialize_nprocijk()
    ni, nj, nk = self._nprocijk
    return (rank % ni, (rank // ni) % nj, rank // (ni * nj))


def particle_columns(irank, ihybrid, idem, iscal):
    """Map particle-array column names to their column indices.

    NOTE(review): the original name/signature of this function were lost to
    source corruption; reconstructed from its surviving body (the flags
    irank/ihybrid/idem enable optional columns, iscal adds per-scalar
    's<n>'/'q<n>' pairs) -- verify against callers.
    """
    col = {}
    ioffset = 0
    if irank > 0:
        col['rank'] = ioffset; ioffset += 1
    if ihybrid > 0:
        col['id'] = ioffset; ioffset += 1
    # Mandatory columns: position, radius, density, acceleration, velocity,
    # angular velocity, force, torque
    for name in ('x', 'y', 'z', 'r', 'rho',
                 'ax', 'ay', 'az', 'u', 'v', 'w',
                 'ox', 'oy', 'oz', 'fx', 'fy', 'fz',
                 'tx', 'ty', 'tz'):
        col[name] = ioffset
        ioffset += 1
    if idem > 0:
        # Collision force/torque columns (DEM)
        for name in ('fxc', 'fyc', 'fzc', 'txc', 'tyc', 'tzc'):
            col[name] = ioffset
            ioffset += 1
    if iscal > 0:
        for ii in range(iscal):
            col['s' + str(ii+1)] = ioffset; ioffset += 1
            col['q' + str(ii+1)] = ioffset; ioffset += 1
    return col


def grid_chunk(chunk, gridg):
    '''Returns the grid vectors for chunk (including ghost cells if they exist)'''
    import numpy
    xg, yg, zg = gridg
    # Shift indices so that they start from zero
    ib = chunk['ibeg']-1
    jb = chunk['jbeg']-1
    kb = chunk['kbeg']-1
    nxl = chunk['nxl']
    nyl = chunk['nyl']
    nzl = chunk['nzl']
    ighost = chunk['ighost']
    if ighost:
        # Pad the local grid with one extrapolated ghost point on each side
        dx = xg[1]-xg[0]
        dy = yg[1]-yg[0]
        dz = zg[1]-zg[0]
        xl = numpy.zeros(nxl+2)
        yl = numpy.zeros(nyl+2)
        zl = numpy.zeros(nzl+2)
        xl[0] = xg[ib]-dx
        xl[1:-1] = xg[ib:ib+nxl]
        xl[-1] = xg[ib+nxl-1]+dx
        yl[0] = yg[jb]-dy
        yl[1:-1] = yg[jb:jb+nyl]
        yl[-1] = yg[jb+nyl-1]+dy
        zl[0] = zg[kb]-dz
        zl[1:-1] = zg[kb:kb+nzl]
        zl[-1] = zg[kb+nzl-1]+dz
    else:
        # BUG FIX: original read 'xg[ibeg:ib+nxl-1]' etc. -- 'ibeg'/'jbeg'/
        # 'kbeg' are undefined here (NameError) and the stop index dropped
        # the last interior point; the interior slice matches the ghost
        # branch above.
        xl = xg[ib:ib+nxl]
        yl = yg[jb:jb+nyl]
        zl = zg[kb:kb+nzl]
    return (xl, yl, zl)
def grid_chunk_origin_spacing(chunk, gridg):
    '''Returns origin and spacing of regular grid of chunk data'''
    xg, yg, zg = gridg
    # Spacing from the first two global grid points on each axis
    spacing = (xg[1] - xg[0], yg[1] - yg[0], zg[1] - zg[0])
    # 1-based begin indices -> 0-based; shift the origin outward by one
    # spacing per ghost layer
    nghost = chunk['ighost']
    origin = tuple(vec[chunk[key] - 1] - nghost * step
                   for vec, key, step in zip((xg, yg, zg),
                                             ('ibeg', 'jbeg', 'kbeg'),
                                             spacing))
    return origin, spacing