# suspendtools/ucf.py

import warnings
def __warning_format(message,category,filename,lineno,line=None):
    return '%s:%s: %s: %s\n' % (filename,lineno,category.__name__,message)
warnings.formatwarning = __warning_format
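# With this formatter a warning renders on a single line, e.g.
# (path and line number are illustrative):
#   suspendtools/ucf.py:63: UserWarning: Buffer overflow detected: increase scanBuffSize.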
class UCF:
"""UCF low-level access class"""
def __init__(self,file=None,verbosity=False,debug=False):
self.__initializeConstants()
self.__resetPublicProperties()
self.__resetPrivateProperties()
self.__resetCurrentStep()
self.__resetCurrentSet()
if file is not None:
self.open(file)
self.Debug = debug
self.Verbosity = verbosity
def open(self,file):
"""Opens an input stream for reading access. The variable 'file' can be of the following types:
str opens a file on disk whose path is specified by 'file'
bytes data which is already located in memory as a bytes or bytearray object (can be used for streams)
"""
from io import BytesIO
import numpy
# Check what class 'file' belongs to and treat it accordingly
        if isinstance(file,str):
            self.File = file
            self.__fileobj = open(self.File,'rb')
            self.__inputAvailable = True
        elif isinstance(file,(bytes,bytearray)):
            self.File = 'byte-stream'
            self.__fileobj = BytesIO(file)
            self.__inputAvailable = True
        else:
            raise TypeError("'file' must be a str path or a bytes/bytearray object.")
# Determine file size
self.__fileobj.seek(0,2)
self.FileSize = self.__fileobj.tell()
self.__fileobj.seek(self.__fileBeg,0)
# Read the header of the file
self.__readHeaderFile()
# Scan through file to get the basic structure (steps/sets)
self.__timeStep = numpy.zeros(self.__scanBuffSize,dtype=numpy.float64)
self.__posStep = numpy.zeros(self.__scanBuffSize,dtype=numpy.int64)
self.__numSetPerStep = numpy.zeros(self.__scanBuffSize,dtype=numpy.int32)
        istep = 0
        while self.__fileobj.tell()<self.FileSize:
            self.__readHeaderStep()
self.__timeStep[istep] = self.__currentStepTime
self.__posStep[istep] = self.__fileobj.tell()
self.__numSetPerStep[istep] = self.__currentStepNumSet
istep = istep+1
if self.__currentStepSize==-1:
break
else:
self.__fileobj.seek(self.__currentStepSize,1)
nstep = istep
# Truncate buffered arrays
        if nstep>=self.__scanBuffSize:
            # With more steps than the scan buffer holds, the loop above would
            # already have raised an IndexError; warn once the buffer is full.
            warnings.warn('Buffer overflow detected: increase scanBuffSize.')
self.__timeStep = self.__timeStep[0:nstep]
self.__posStep = self.__posStep[0:nstep]
self.__numSetPerStep = self.__numSetPerStep[0:nstep]
# Set some internal variables
self.NumDataset = numpy.max(self.__numSetPerStep)
self.NumTimestep = nstep
self.__isFileHeaderWritten = True
self.__isStepHeaderWritten = True
def close(self):
"""Closes input file object"""
self.__fileobj.close()
self.__init__()
def addFileHeaderToBuffer(self,rank=0,rankijk=(0,0,0),ftype=1999):
"""Initialize a buffer to generate a new UCF file."""
self.__bufNumSteps = 0
self.__bufStep = []
self.__bufParams = []
self.__bufData = []
self.__bufRank = rank
self.__bufRankijk = rankijk
self.__bufFileType = ftype
self.__bufAvailable = True
def addStepToBuffer(self,step=1,time=0.0):
"""Add a new step to buffer."""
if not self.__bufAvailable:
raise BufferError('Buffer has not been initialized.')
if step>self.__bufNumSteps:
            self.__bufStep.extend(None for ii in range(self.__bufNumSteps,step))
self.__bufParams.extend([] for ii in range(self.__bufNumSteps,step))
self.__bufData.extend([] for ii in range(self.__bufNumSteps,step))
self.__bufNumSteps = step
self.__bufStep[step-1] = time
def addDatasetToBuffer(self,data,params=None,step=1,dset=1):
        """Add a new dataset to specified step of buffer."""
        import numpy
if not self.__bufAvailable:
raise BufferError('Buffer has not been initialized.')
if step>self.__bufNumSteps:
raise ValueError('Requested step does not exist.')
if not hasattr(data,'dtype'):
raise TypeError('Cannot determine datatype of provided data')
if not hasattr(data,'nbytes'):
raise TypeError('Cannot determine number of bytes of provided data')
if not hasattr(data,'tobytes'):
raise TypeError('Cannot convert provided data to bytes')
if not hasattr(data,'shape'):
raise TypeError('Cannot determine shape of provided data')
if params is not None and not all(numpy.issubdtype(type(ii),numpy.integer) for ii in params):
raise TypeError('Parameters must be provided as integer')
nset = len(self.__bufData[step-1])
if dset>nset:
self.__bufParams[step-1].extend(None for ii in range(nset,dset))
self.__bufData[step-1].extend(None for ii in range(nset,dset))
self.__bufParams[step-1][dset-1] = params
self.__bufData[step-1][dset-1] = data
def copyFileHeaderToBuffer(self):
if not self.__inputAvailable:
raise IOError('No input file available')
self.addFileHeaderToBuffer(rank=self.IORank[0],rankijk=self.IORank[1:],ftype=self.__typeID)
def copyStepToBuffer(self,step_in,step_out=1,recursive=False,singlePrecision=False):
"""Copy a step from an input file to output buffer. If recursive copying is activated, all datasets
within the step will be copied, otherwise only the step header is copied without datasets.
If datasets are copied, the precision can be reduced using the 'singlePrecision' flag."""
if not self.__inputAvailable:
raise IOError('No input file available')
if not self.__bufAvailable:
raise BufferError('Buffer has not been initialized.')
self.addStepToBuffer(step=step_out,time=self.__timeStep[step_in-1])
if recursive:
for dset in range(0,self.__numSetPerStep[step_in-1]):
self.copyDatasetToBuffer(step_in,dset+1,step_out=step_out,dset_out=dset+1,singlePrecision=singlePrecision)
def copyDatasetToBuffer(self,step_in,dset_in,step_out=1,dset_out=1,singlePrecision=False):
"""Copy a dataset from an input file to output buffer at specified step. The precision of the
dataset can be reduced using the 'singlePrecision' flag."""
import numpy
if not self.__inputAvailable:
raise IOError('No input file available')
if not self.__bufAvailable:
raise BufferError('Buffer has not been initialized.')
(data,params) = self.readSet(step_in,dset_in)
if singlePrecision:
if data.dtype==numpy.dtype('float64'):
data = numpy.float32(data)
elif data.dtype==numpy.dtype('int64'):
data = numpy.int32(data)
self.addDatasetToBuffer(data,params=params,step=step_out,dset=dset_out)
def flushBuffer(self):
"""Returns the buffer as a bytes object, which can be written to a file using a file object."""
from time import time
from sys import stderr
from struct import pack
        import numpy
# Sanity check and size gathering
        sizeStep = []
        sizeSet = []
        for step in range(0,self.__bufNumSteps):
            sizeSet.append([]) # one list of dataset sizes per step
            nset = len(self.__bufData[step])
            tmpSizeStep = 0
if nset==0:
warnings.warn('Step #{} in buffer does not contain any dataset.'.format(step+1),RuntimeWarning)
for dset in range(0,nset):
if self.__bufData[step][dset] is None:
raise ValueError('Step #{}, dataset #{} does not contain any data.'.format(step+1,dset+1))
if self.__bufParams[step][dset] is None:
warnings.warn('No parameters were provided for step #{}, dataset #{}.'.format(step+1,dset+1),RuntimeWarning)
                    nparam = 0
else:
nparam = len(self.__bufParams[step][dset])
sizeSet[step].append(self.__bufData[step][dset].nbytes)
tmpSizeStep += (self.__nHeaderSet+nparam)*self.__nByteHeaderSet
tmpSizeStep += self.__bufData[step][dset].nbytes
sizeStep.append(tmpSizeStep)
# Create output buffer
obuff = b''
# Build file header
magicFile = self.__magicFile
fileVersion = 2
unixTime = int(time())
fileType = self.__bufFileType
rank = self.__bufRank
(iproc,jproc,kproc) = self.__bufRankijk
if self.Debug:
print('Write the following file header at {} bytes'.format(len(obuff)),file=stderr)
print((magicFile,fileVersion,unixTime,fileType,rank,iproc,jproc,kproc),file=stderr)
obuff += pack('qqqqqqqq',magicFile,fileVersion,unixTime,fileType,rank,iproc,jproc,kproc)
# Build step header
for step in range(0,self.__bufNumSteps):
if self.Verbosity:
print('Adding step #{} to output buffer'.format(step+1),file=stderr)
magicStep = self.__magicStep
stepBytes = sizeStep[step]
stepTime = self.__bufStep[step]
nset = len(self.__bufData[step])
if self.Debug:
print('Write the following step header at {} bytes'.format(len(obuff)),file=stderr)
print((magicStep,stepBytes,stepTime,nset),file=stderr)
obuff += pack('qqdq',magicStep,stepBytes,stepTime,nset)
# Build dataset headers + attach data
for dset in range(0,nset):
if self.Verbosity:
print(' dataset #{}'.format(dset+1),file=stderr)
magicSet = self.__magicSet
setSize = sizeSet[step][dset]
nptype = self.__bufData[step][dset].dtype
if nptype==numpy.dtype('int32'):
dtEncoded = 11
elif nptype==numpy.dtype('int64'):
dtEncoded = 12
elif nptype==numpy.dtype('float32'):
dtEncoded = 21
elif nptype==numpy.dtype('float64'):
dtEncoded = 22
else:
raise TypeError('Data at step #{}, dataset #{} has an invalid datatype.'.format(step+1,dset+1))
if self.__bufParams[step][dset] is None:
nparam = 0
else:
nparam = len(self.__bufParams[step][dset])
if self.Debug:
print('Write the following set header at {} bytes'.format(len(obuff)),file=stderr)
print((magicSet,setSize,dtEncoded,nparam),file=stderr)
print('with parameters:',file=stderr)
print(self.__bufParams[step][dset],file=stderr)
obuff += pack('qqqq',magicSet,setSize,dtEncoded,nparam)
if nparam!=0:
obuff += pack(nparam*'q',*self.__bufParams[step][dset])
obuff += self.__bufData[step][dset].tobytes('F')
# Return bytes
return obuff
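    # Typical round trip with the buffer API (a minimal sketch; 'out.ucf' is a
    # hypothetical path):
    #
    #   ucf = UCF()
    #   ucf.addFileHeaderToBuffer()
    #   ucf.addStepToBuffer(step=1,time=0.0)
    #   ucf.addDatasetToBuffer(numpy.arange(8,dtype=numpy.float64),params=(8,),step=1,dset=1)
    #   with open('out.ucf','wb') as f:
    #       f.write(ucf.flushBuffer())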
    def getTime(self,step=1):
        """Returns the physical time of the given (1-based) step."""
        assert step-1<self.NumTimestep, "'step' exceeds number of time steps."
        return self.__timeStep[step-1]
def readSet(self,step=1,dset=1,memmap=False):
"""Read a dataset from input file. If 'memmap' is activated, the file will only be read partially on demand."""
import numpy
from io import BytesIO
if not self.__inputAvailable:
raise IOError('No input file available')
self.__fileobj.seek(self.__findSet(step,dset),0)
        self.__readHeaderSet()
params = self.__currentSetParams
if isinstance(self.__fileobj,BytesIO):
data = numpy.frombuffer(self.__fileobj.read(self.__currentSetSize),dtype=self.__currentSetDatatype)
else:
data = numpy.fromfile(self.__fileobj,dtype=self.__currentSetDatatype,count=self.__currentSetNumElements)
return (data,params)
def __readHeaderFile(self):
from datetime import datetime
from sys import stderr
from struct import unpack
import numpy
        self.__fileobj.seek(self.__fileBeg,0)
        # Determine endianness
        mfmt = "<>"
buff = self.__fileobj.read(8)
for fmt in mfmt:
currentMagic = unpack("%sq"%fmt,buff)[0]
if currentMagic==self.__magicFile:
break
if currentMagic!=self.__magicFile:
raise ValueError('Magic mismatch: invalid file header. {}'.format(currentMagic))
self.Endian = fmt
# Read header
        self.__fileobj.seek(self.__fileBeg,0)
        buff = self.__fileobj.read(self.__nHeaderFile*8)
        header = unpack("%s%dq"%(self.Endian,self.__nHeaderFile),buff)
if self.Debug:
print('Read the following file header at 0 bytes',file=stderr)
print(header,file=stderr)
# Parse version
self.__versionMajor = numpy.floor(header[1]/self.__factorMajor)
self.__versionMinor = numpy.floor(numpy.mod(header[1],self.__factorMajor)/self.__factorMinor)
self.__versionPatch = numpy.floor(numpy.mod(header[1],self.__factorMinor)/self.__factorPatch)
self.CodeVersion = "%d.%d.%d" %(self.__versionMajor,self.__versionMinor,self.__versionPatch)
        self.UCFVersion = numpy.mod(header[1],self.__factorPatch)
        # Parse time stamp (UTC)
        self.__creationTimeUnix = header[2]
        self.CreationTime = datetime.utcfromtimestamp(self.__creationTimeUnix).strftime('%Y-%m-%d %H:%M:%S')
        # Parse file type
        self.__typeID = header[3]
typeDict = {
0: "grid",
10: "processor grid",
1000: "fluid snapshot",
1010: "scalar snapshot",
1999: "matlab field data",
2000: "particle snapshot",
2001: "particle append",
2011: "particle lagrange",
2021: "particle balancing",
2999: "matlab particle data",
3000: "statistics fluid",
3010: "statistics fluid pure",
3020: "statistics scalar"
}
        self.Type = typeDict.get(self.__typeID,"unknown")
# Parse file class
classDict = {
1: "field",
2: "particle",
3: "statistics"
}
        self.Class = classDict.get(int(self.__typeID//self.__factorTypeIDClass),"unknown")
# Parse IO rank
self.IORank = header[4:8]
def __readHeaderStep(self):
from sys import stderr
from struct import unpack
# Read and parse
self.__currentStepPosHeader = self.__fileobj.tell()
buff = self.__fileobj.read(self.__nHeaderStep*8)
header = unpack("%s%dq"%(self.Endian,self.__nHeaderStep),buff)
self.__currentStepPosData = self.__fileobj.tell()
currentMagic = header[0]
self.__currentStepSize = header[1]
self.__currentStepTime = unpack("%sd"%self.Endian,buff[16:24])[0]
self.__currentStepNumSet = header[3]
if self.Debug:
print("Read the following step header at %d bytes" % self.__currentStepPosHeader,file=stderr)
print("%d,%d,%f,%d" % (currentMagic,self.__currentStepSize,self.__currentStepTime,self.__currentStepNumSet),file=stderr)
# Check if magic is correct
if currentMagic!=self.__magicStep:
raise ValueError("Magic mismatch: invalid step header. %d" % currentMagic);
def __readHeaderSet(self):
from sys import stderr
from struct import unpack
import numpy
# Read and parse
self.__currentSetPosHeader = self.__fileobj.tell()
buff = self.__fileobj.read(self.__nHeaderSet*8)
header = unpack("%s%dq"%(self.Endian,self.__nHeaderSet),buff)
self.__currentSetPosData = self.__fileobj.tell()
currentMagic = header[0]
self.__currentSetSize = header[1]
self.__currentSetDatatypeNumeric = header[2]
dtSizeDict = {
11: 4,
12: 8,
21: 4,
22: 8
}
dtNameDict = {
11: "%si4" % self.Endian,
12: "%si8" % self.Endian,
21: "%sf4" % self.Endian,
22: "%sf8" % self.Endian
}
self.__currentSetSizeof = dtSizeDict[self.__currentSetDatatypeNumeric]
self.__currentSetDatatype = dtNameDict[self.__currentSetDatatypeNumeric]
self.__currentSetNumParams = header[3]
        # The set size is a multiple of the element size, so integer division
        # is exact (and cannot overflow like the previous int32 cast)
        self.__currentSetNumElements = self.__currentSetSize//self.__currentSetSizeof
if self.Debug:
print("Read the following set header at %d bytes" % self.__currentSetPosHeader,file=stderr)
print(header,file=stderr)
# Check if magic is correct
if currentMagic!=self.__magicSet:
raise ValueError("Magic mismatch: invalid dataset header. %d" % currentMagic)
# Read variable number of parameters
buff = self.__fileobj.read(self.__currentSetNumParams*8)
self.__currentSetParams = unpack("%s%dq"%(self.Endian,self.__currentSetNumParams),buff)
if self.Debug:
print('with parameters:',file=stderr)
print(self.__currentSetParams,file=stderr)
def __findSet(self,tstep,dset):
from sys import stderr
# Check input
if tstep>self.NumTimestep:
raise ValueError("Out of bounds: timestep. %d, %d" %(tstep,self.NumTimestep))
        if dset>self.__numSetPerStep[tstep-1]:
            raise ValueError("Out of bounds: dataset. %d, %d" % (dset,self.__numSetPerStep[tstep-1]))
# Navigate to correct set
self.__fileobj.seek(self.__posStep[tstep-1],0)
for iset in range(0,dset-1):
self.__readHeaderSet()
self.__fileobj.seek(self.__currentSetSize,1)
posHeader = self.__fileobj.tell()
if self.Debug:
print("Found step #%d, set #%d at position %d" % (tstep,dset,posHeader),file=stderr)
return posHeader
def __initializeConstants(self):
        self.__magicFile = 81985529216486895
        self.__magicStep = 11944304052957
        self.__magicSet = 240217520921210
        self.__nHeaderFile = 8
        self.__nHeaderStep = 4
        self.__nHeaderSet = 4
        self.__nByteHeaderFile = 8
        self.__nByteHeaderStep = 8
        self.__nByteHeaderSet = 8
        self.__nSetParamsField = 10
        self.__nSetParamsParticle = 16
        self.__factorMajor = 1000000000
        self.__factorMinor = 1000000
        self.__factorPatch = 1000
        self.__factorTypeIDClass = 1000
        self.__factorTypeIDKind = 10
        self.__typeIDmatlabField = 1999
        self.__typeIDmatlabParticle = 2999
        self.__scanBuffSize = 131072
def __resetPublicProperties(self):
self.File = '' # file name
self.Type = '' # file type
self.Class = '' # file class
        self.Endian = '' # endianness
self.CodeVersion = '' # version of the simulation code
self.UCFVersion = '' # version of the data format ("unified container format")
self.NumDataset = 0 # maximum number of datasets in this file (over all time steps)
self.NumTimestep = 0 # number of time steps in this file
self.FileSize = 0 # file size
self.CreationTime = 0 # time of creation
self.IOMode = '' # file opened in read-only or read-write mode?
self.IORank = 0 # rank of processor + col,row,pln
self.Verbosity = 0 # verbose output?
self.Debug = 0 # debug information?
def __resetPrivateProperties(self):
self.__fileobj = None
self.__fileBeg = 0
self.__typeID = 0
        self.__creationTimeUnix = 0
self.__versionMajor = 0
self.__versionMinor = 0
self.__versionPatch = 0
self.__versionFile = 0
self.__posStep = 0
self.__numSetPerStep = 0
self.__timeStep = 0
self.__inputAvailable = False
self.__bufAvailable = False
def __resetCurrentStep(self):
self.__currentStep = 0
self.__currentStepPosHeader = 0
self.__currentStepPosData = 0
self.__currentStepSize = 0
self.__currentStepTime = 0
self.__currentStepNumSet = 0
def __resetCurrentSet(self):
self.__currentSet = 0
self.__currentSetPosHeader = 0
self.__currentSetPosData = 0
self.__currentSetSize = 0
self.__currentSetDatatype = 0
self.__currentSetDatatypeNumeric = 0
self.__currentSetSizeof = 0
self.__currentSetNumParams = 0
self.__currentSetParams = 0
self.__currentSetNumElements = 0
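# Minimal read-back sketch for a single UCF file or in-memory stream
# ('out.ucf' is a hypothetical path, e.g. written with the buffer API above):
#
#   ucf = UCF('out.ucf')
#   print(ucf.Type,ucf.NumTimestep,ucf.NumDataset)
#   (data,params) = ucf.readSet(step=1,dset=1)
#   ucf.close()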
class UCFTarFile:
'''Superclass for instances of ucf.tar files.'''
def __init__(self,file_tar,file_index=None):
self.handler = Ustar(file_tar,file_index)
self.verbose = False
self.debug = False
self._buffer_grid = None
self._buffer_proc = None
self._buffer_params = None
def read_grid(self,key=None):
self._initialize_buffer_grid()
if key is not None:
gidx = self.gidx_from_key(key)
return self._buffer_grid[gidx]
return self._buffer_grid
def read_procgrid(self,key=None):
self._initialize_buffer_proc()
if key is not None:
gidx = self.gidx_from_key(key)
return self._buffer_proc[gidx]
return self._buffer_proc
def read_parameters(self):
self._initialize_buffer_params()
return self._buffer_params
def bounds(self,axis=None):
params = self.read_parameters()
        keys = ('a','b','c','d','e','f') # two bounds per axis: (a,b)=x, (c,d)=y, (e,f)=z
if axis is None:
return tuple(params.getfloat('geometry',key) for key in keys)
else:
assert axis<3, "'axis' must be smaller than 3"
return tuple(params.getfloat('geometry',key) for key in keys[2*axis:2*(axis+1)])
def periodicity(self,axis=None):
params = self.read_parameters()
keys = ('xperiodic','yperiodic','zperiodic')
if axis is None:
return tuple(params.getboolean('geometry',key) for key in keys)
else:
assert axis<3, "'axis' must be smaller than 3"
return params.getboolean('geometry',keys[axis])
def period(self,axis=None):
if axis is None:
return tuple(self.period(axis=ii) for ii in range(0,3))
else:
assert axis<3, "'axis' must be smaller than 3"
params = self.read_parameters()
key_bound = ('b','d','f')
key_period = ('xperiodic','yperiodic','zperiodic')
period = params.getfloat('geometry',key_bound[axis])
isPeriodic = params.getboolean('geometry',key_period[axis])
return period if isPeriodic else None
def gidx_from_key(self,key):
if key[0]=='u': return 0
elif key[0]=='v': return 1
elif key[0]=='w': return 2
elif key[0]=='p': return 3
elif key[0]=='s': return 4
else: raise ValueError("Invalid value of 'key'.")
def _initialize_buffer_grid(self):
if self._buffer_grid is None:
data = self.handler.read('grid.bin')
self._buffer_grid = read_grid(data,verbosity=self.verbose,debug=self.debug)
return
def _initialize_buffer_proc(self):
if self._buffer_proc is None:
data = self.handler.read('proc.bin')
self._buffer_proc = read_procgrid(data,verbosity=self.verbose,debug=self.debug)
return
def _initialize_buffer_params(self):
if self._buffer_params is None:
data = self.handler.read('parameters.asc')
self._buffer_params = read_parameters(data)
return
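# The geometry helpers read the '[geometry]' section of parameters.asc.
# A usage sketch ('snapshot.ucf.tar' is a hypothetical archive name):
#
#   snap = UCFSnapshot('snapshot.ucf.tar')
#   bounds = snap.bounds()              # (a,b,c,d,e,f), two bounds per axis
#   periodic = snap.periodicity(axis=0) # True/False for the x axis
#   Lx = snap.period(axis=0)            # extent if periodic, else None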
class UCFSnapshot(UCFTarFile):
'''Handles a snapshot.ucf.tar file.'''
def __init__(self,file_tar,file_index=None):
super(UCFSnapshot,self).__init__(file_tar,file_index=file_index)
        if any('uvwp' in name for name in self.handler.file_name):
            self.type = 'uvwp'
        elif any('scal' in name for name in self.handler.file_name):
            self.type = 'scal'
        else:
            raise ValueError("Archive contains neither 'uvwp' nor 'scal' files.")
self._nproc = sum(self.type in s for s in self.handler.file_name)
self._nprocijk = None
def read_particles(self):
data = self.handler.read('particles.bin')
return read_particles(data,step=1,verbosity=self.verbose,debug=self.debug)
def read_chunk(self,rank,dset=None,keep_ghost=True):
file_target = self.type+'.{:05d}'.format(rank)
data = self.handler.read(file_target)
return read_chunk(data,step=1,dset=dset,keep_ghost=keep_ghost,
verbosity=self.verbose,debug=self.debug)
def field_chunk(self,rank,key,keep_ghost=True):
'''Returns chunk data as Field3d object.'''
from .field import Field3d
# Support a list/tuple of keys: this is not the most efficient
# implementation but convenient
if isinstance(key,(list,tuple)):
return [self.field_chunk(rank,k,keep_ghost=keep_ghost)
for k in key]
dset = self.dset_from_key(key)
chunk = self.read_chunk(rank,dset=dset,keep_ghost=keep_ghost)
grid = self.read_grid(key=key)
dx,dy,dz = (grid[0][1]-grid[0][0],
grid[1][1]-grid[1][0],
grid[2][1]-grid[2][0])
xo,yo,zo = (grid[0][chunk['ibeg']]-chunk['ighost']*dx,
grid[1][chunk['jbeg']]-chunk['ighost']*dy,
grid[2][chunk['kbeg']]-chunk['ighost']*dz)
# Per convention periodicity requires ghost cells for Field3d
if keep_ghost:
period = tuple(self.period(ii) if self.nproc(axis=ii)==1
else None for ii in range(0,3))
else:
period = (None,None,None)
return Field3d(chunk['data'],(xo,yo,zo),(dx,dy,dz),period)
    def field_complete(self):
        # TBD: assemble the complete field from all per-rank chunks
        return
def particles(self,select_col=None):
'''Returns particle data as Particles object.'''
from .particle import Particles
pp,col,time = self.read_particles()
return Particles.from_array(pp,col,time,self.period(),select_col=select_col)
def nproc(self,axis=None):
if axis is None:
return self._nproc
else:
assert axis<3, "'axis' must be smaller than 3"
self._initialize_nprocijk()
return self._nprocijk[axis]
def ijk_from_rank(self,rank):
assert rank<self.nproc(), "'rank' exceeds highest rank."
ijk = (
rank//(self.nproc(axis=1)*self.nproc(axis=2)),
(rank//self.nproc(axis=2))%self.nproc(axis=1),
rank%self.nproc(axis=2)
)
return ijk
def rank_from_ijk(self,ijk):
assert len(ijk)==3, "len(ijk) must be 3."
assert all([ijk[ii]<self.nproc(axis=ii) for ii in range(0,3)])
return ijk[0]*self.nproc(axis=1)*self.nproc(axis=2)+ijk[1]*self.nproc(axis=2)+ijk[2]
    def dset_from_key(self,key):
        assert self.has_key(key), "Snapshot does not contain requested key."
        # readSet()/read_chunk() use 1-based dataset indices
        if key[0]=='u': return 1
        elif key[0]=='v': return 2
        elif key[0]=='w': return 3
        elif key[0]=='p': return 4
        elif key[0]=='s': return int(key[1:])
        else: raise ValueError("Invalid value of 'key'.")
def has_key(self,key):
if key in ('u','v','w','p') and self.type=='uvwp':
return True
elif key[0]=='s' and self.type=='scal':
return True
else:
return False
def _initialize_nprocijk(self):
self._initialize_buffer_proc()
self._nprocijk = tuple([len(self._buffer_proc[2*ii]) for ii in range(0,3)])
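# Per-rank chunk access, sketched ('snapshot.ucf.tar' and the key are
# illustrative):
#
#   snap = UCFSnapshot('snapshot.ucf.tar')
#   chunk = snap.read_chunk(rank=0,dset=1)  # dict with 'data' plus index info
#   u = snap.field_chunk(rank=0,key='u')    # same chunk wrapped as Field3d
#   ijk = snap.ijk_from_rank(0)             # rank <-> (i,j,k) conversion
#   assert snap.rank_from_ijk(ijk)==0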
class UCFAuxiliary(UCFTarFile):
    '''Handles an auxiliary.ucf.tar file.'''
def __init__(self,file_tar,file_index=None):
super(UCFAuxiliary,self).__init__(file_tar,file_index=file_index)
# TBD: support statistics files
def read_particles(self):
data = self.handler.read('append_particles.bin')
return read_particles(data,step=None,verbosity=self.verbose,debug=self.debug)
def trajectories(self,select_col=None):
from .particle import Particles,Trajectories
pp,col,time = self.read_particles()
part = Particles.from_array(pp[:,:,0],col,time[0],self.period(),select_col=select_col)
traj = Trajectories(part,copy_particles=False)
for ii in range(1,len(time)):
traj += Particles.from_array(pp[:,:,ii],col,time[ii],self.period(),select_col=select_col)
return traj
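# Trajectory extraction, sketched ('auxiliary.ucf.tar' is a hypothetical
# archive name):
#
#   aux = UCFAuxiliary('auxiliary.ucf.tar')
#   traj = aux.trajectories()  # Trajectories built from all appended steps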
class Ustar:
    '''Minimalistic ustar implementation meant to be used with ucf.tar files'''
def __init__(self,file_tar,file_index=None):
self.path = file_tar
self.num_files = 0
self.file_name = []
self.file_size = []
self.file_offset = []
if file_index:
self.import_index_file(file_index)
else:
self.import_tar_file()
def __del__(self):
return
def import_tar_file(self):
'''Imports information on tar archive from scanning it'''
from tarfile import TarFile,USTAR_FORMAT
from struct import unpack
with open(self.path,'rb') as f:
tarinfos = TarFile(fileobj=f,format=USTAR_FORMAT).getmembers()
self.num_files = 0
for tarinfo in tarinfos:
self.num_files += 1
self.file_name.append(tarinfo.name)
self.file_offset.append(tarinfo.offset_data)
self.file_size.append(tarinfo.size)
return
def import_index_file(self,file_index):
'''Imports information on tar archive from .taridx file'''
from struct import unpack
with open(file_index,'rb') as f:
self.num_files = unpack('<q',f.read(8))[0]
self.file_name = []
self.file_size = []
self.file_offset = []
for ifile in range(0,self.num_files):
self.file_name.append(f.read(256).decode().strip().rstrip('\0'))
self.file_offset.append(unpack('<q',f.read(8))[0])
self.file_size.append(unpack('<q',f.read(8))[0])
return
def read(self,file):
'''Reads a file from the archive into memory. Data is returned as bytes.'''
idx = self.file_name.index(file)
with open(self.path,'rb') as f:
f.seek(self.file_offset[idx])
return f.read(self.file_size[idx])
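# A .taridx index file lets Ustar locate members without scanning the whole
# archive. Sketch (file names are illustrative):
#
#   tar = Ustar('snapshot.ucf.tar',file_index='snapshot.ucf.taridx')
#   raw = tar.read('grid.bin')  # bytes, directly consumable by UCF(raw)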
def read_grid(file,verbosity=False,debug=False):
obj = UCF(file=file,verbosity=verbosity,debug=debug)
output = []
for iset in range(0,obj.NumDataset):
(data,params) = obj.readSet(step=1,dset=iset+1)
nx = params[0]
ny = params[1]
nz = params[2]
output.append((data[0:nx],data[nx:nx+ny],data[nx+ny:nx+ny+nz]))
    if obj.NumDataset<5:
        # Pad to five grids (u,v,w,p,s) by reusing the last one, so that
        # gidx_from_key() can always index the scalar grid (gidx 4)
        output.extend(output[-1:])
obj.close()
return output
def read_procgrid(file,verbosity=False,debug=False):
obj = UCF(file=file,verbosity=verbosity,debug=debug)
output = []
for iset in range(0,obj.NumDataset):
(data,params) = obj.readSet(step=1,dset=iset+1)
nxp = params[0]
nyp = params[1]
nzp = params[2]
output.append((
data[0:nxp], # ibeg
data[nxp:2*nxp], # iend
data[2*nxp:2*nxp+nyp], # jbeg
data[2*nxp+nyp:2*nxp+2*nyp], # jend
data[2*nxp+2*nyp:2*nxp+2*nyp+nzp], # kbeg
            data[2*nxp+2*nyp+nzp:2*nxp+2*nyp+2*nzp] # kend
))
#if obj.UCFVersion<2:
if obj.NumDataset<5:
output.extend(output[-1:])
obj.close()
return output
def read_chunk(file,step=1,dset=None,keep_ghost=True,verbosity=False,debug=False):
obj = UCF(file=file,verbosity=verbosity,debug=debug)
if dset is None:
        dset = range(1,obj.NumDataset+1) # NB: NumDataset is the maximum over all time steps
returnList = True
elif isinstance(dset,(list,tuple)):
returnList = True
else:
dset = [dset]
returnList = False
output = []
for ii in dset:
tmp = dict()
(data,params) = obj.readSet(step=step,dset=ii)
tmp['ighost'] = params[0]
tmp['ibeg'] = params[1]
tmp['jbeg'] = params[2]
tmp['kbeg'] = params[3]
tmp['nxl'] = params[4]
tmp['nyl'] = params[5]
tmp['nzl'] = params[6]
tmp['nx'] = params[7]
tmp['ny'] = params[8]
tmp['nz'] = params[9]
tmp['data'] = data.reshape((tmp['nxl']+2*tmp['ighost'],
tmp['nyl']+2*tmp['ighost'],
tmp['nzl']+2*tmp['ighost']),
order='F')
tmp['rank'] = obj.IORank[0]
tmp['rankijk']= obj.IORank[1:]
if not keep_ghost and tmp['ighost']:
tmp['data'] = tmp['data'][1:-1,1:-1,1:-1]
tmp['ighost'] = 0
output.append(tmp)
obj.close()
if not returnList:
return output[0]
return output
def read_particles(file,step=None,verbosity=False,debug=False):
import numpy
obj = UCF(file=file,verbosity=verbosity,debug=debug)
if not isinstance(step,list):
if step is None:
step = range(1,obj.NumTimestep+1)
else:
step = [step]
# The output will be the following:
# 1) numpy array with dimension (ncol,np,ntime)
# 2) dictionary which specifies the columns
# We read the data step by step in a list, which is then converted to a 3D array
pp = []
time = []
for ii in step:
(data,params) = obj.readSet(step=ii,dset=1)
npart = params[0]
ncol = params[1]
ncol_rank = params[2]
ncol_hybrid = params[3]
ncol_dem = params[4]
ncol_scalar = params[5]
nscal = ncol_scalar//2
pp.append(data.reshape((ncol,npart),order='F'))
time.append(obj.getTime(ii))
    # Close UCF object
obj.close()
# Convert list of 2D arrays to 3D array
pp = numpy.stack(pp,axis=2)
time = numpy.array(time)
# Create the dictionary
col = colmap_from_flags(ncol_rank,ncol_hybrid,ncol_dem,nscal)
# Return result
return (pp,col,time)
def read_parameters(file):
import configparser
params = configparser.ConfigParser()
if isinstance(file,str):
        with open(file,'r') as f:
            # ConfigParser.read() expects path names; read_file() takes a file object
            params.read_file(f)
elif isinstance(file,bytes):
params.read_string(file.decode('ascii'))
return params
def colmap_from_flags(irank,ihybrid,idem,iscal):
'''Creates a dictionary which specifies the columns of a particle array.'''
col = {}
ioffset = 0
if irank>0:
col['rank'] = ioffset; ioffset+=1
if ihybrid>0:
col['id'] = ioffset; ioffset+=1
col['x'] = ioffset; ioffset+=1
col['y'] = ioffset; ioffset+=1
col['z'] = ioffset; ioffset+=1
col['r'] = ioffset; ioffset+=1
col['rho']= ioffset; ioffset+=1
col['ax'] = ioffset; ioffset+=1
col['ay'] = ioffset; ioffset+=1
col['az'] = ioffset; ioffset+=1
col['u'] = ioffset; ioffset+=1
col['v'] = ioffset; ioffset+=1
col['w'] = ioffset; ioffset+=1
col['ox'] = ioffset; ioffset+=1
col['oy'] = ioffset; ioffset+=1
col['oz'] = ioffset; ioffset+=1
col['fx'] = ioffset; ioffset+=1
col['fy'] = ioffset; ioffset+=1
col['fz'] = ioffset; ioffset+=1
col['tx'] = ioffset; ioffset+=1
col['ty'] = ioffset; ioffset+=1
col['tz'] = ioffset; ioffset+=1
if idem>0:
col['fxc'] = ioffset; ioffset+=1
col['fyc'] = ioffset; ioffset+=1
col['fzc'] = ioffset; ioffset+=1
col['txc'] = ioffset; ioffset+=1
col['tyc'] = ioffset; ioffset+=1
col['tzc'] = ioffset; ioffset+=1
if iscal>0:
for ii in range(0,iscal):
col['s'+str(ii)] = ioffset; ioffset+=1
col['q'+str(ii)] = ioffset; ioffset+=1
return col
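# Example: with only the rank column enabled,
#   colmap_from_flags(1,0,0,0) -> {'rank': 0, 'x': 1, 'y': 2, 'z': 3, ...}
# so the x positions of the first stored step are pp[col['x'],:,0].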
def grid_chunk(chunk,gridg):
'''Returns the grid vectors for chunk (including ghost cells if they exist)'''
import numpy
xg,yg,zg = gridg
# Shift indices so that they start from zero
ib = chunk['ibeg']-1
jb = chunk['jbeg']-1
kb = chunk['kbeg']-1
nxl = chunk['nxl']
nyl = chunk['nyl']
nzl = chunk['nzl']
ighost = chunk['ighost']
    if ighost:
        # Pad one grid point on each side (assumes a single ghost layer)
        dx = xg[1]-xg[0]
        dy = yg[1]-yg[0]
        dz = zg[1]-zg[0]
xl = numpy.zeros(nxl+2)
yl = numpy.zeros(nyl+2)
zl = numpy.zeros(nzl+2)
xl[0] = xg[ib]-dx
xl[1:-1] = xg[ib:ib+nxl]
xl[-1] = xg[ib+nxl-1]+dx
yl[0] = yg[jb]-dy
yl[1:-1] = yg[jb:jb+nyl]
yl[-1] = yg[jb+nyl-1]+dy
zl[0] = zg[kb]-dz
zl[1:-1] = zg[kb:kb+nzl]
zl[-1] = zg[kb+nzl-1]+dz
else:
        # Without ghost cells the chunk spans exactly nxl/nyl/nzl grid points
        xl = xg[ib:ib+nxl]
        yl = yg[jb:jb+nyl]
        zl = zg[kb:kb+nzl]
return (xl,yl,zl)
def grid_chunk_origin_spacing(chunk,gridg):
'''Returns origin and spacing of regular grid of chunk data'''
xg,yg,zg = gridg
# Shift indices so that they start from zero
ib = chunk['ibeg']-1
jb = chunk['jbeg']-1
kb = chunk['kbeg']-1
dx = xg[1]-xg[0]
dy = yg[1]-yg[0]
dz = zg[1]-zg[0]
xo = xg[ib]-chunk['ighost']*dx
yo = yg[jb]-chunk['ighost']*dy
zo = zg[kb]-chunk['ighost']*dz
return (xo,yo,zo),(dx,dy,dz)
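if __name__ == '__main__':
    # Smoke test (a minimal sketch): write a tiny single-step UCF stream to
    # memory with the buffer API and read it back. No files are touched.
    import numpy
    writer = UCF()
    writer.addFileHeaderToBuffer()
    writer.addStepToBuffer(step=1,time=0.5)
    writer.addDatasetToBuffer(numpy.linspace(0.0,1.0,8),params=(8,),step=1,dset=1)
    stream = writer.flushBuffer()
    reader = UCF(stream)
    (data,params) = reader.readSet(step=1,dset=1)
    assert params==(8,)
    assert reader.getTime(1)==0.5
    reader.close()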