do not create file if it exists

This commit is contained in:
Michael Stumpf (ifhcluster) 2019-09-11 14:26:10 +02:00
parent 73bec97625
commit bd31202047
1 changed files with 496 additions and 0 deletions

496
python/ratarindex.py Executable file
View File

@ -0,0 +1,496 @@
#!/usr/bin/env python3
import os, re, sys, stat, tarfile, argparse
import itertools
from collections import namedtuple
from timeit import default_timer as timer
printDebug = 1
def overrides( parentClass ):
def overrider( method ):
assert( method.__name__ in dir( parentClass ) )
return method
return overrider
FileInfo = namedtuple( "FileInfo", "offset size mtime mode type linkname uid gid istar" )
class IndexedTar( object ):
"""
This class reads once through a whole TAR archive and stores TAR file offsets for all packed files
in an index to support fast seeking to a given file.
"""
__slots__ = (
'tarFileName',
'fileIndex',
'mountRecursively',
'cacheFolder',
'possibleIndexFilePaths',
'indexFileName',
)
# these allowed backends also double as extensions for the index file to look for
availableSerializationBackends = [
'pickle',
'pickle2',
'pickle3',
'custom',
'cbor',
'msgpack',
'rapidjson',
'ujson',
'simplejson'
]
availableCompressions = [
'', # no compression
'lz4',
'gz',
]
def __init__( self, pathToTar = None, fileObject = None, writeIndex = False,
recursive = False, serializationBackend = None ):
self.tarFileName = os.path.normpath( pathToTar )
# Stores the file hierarchy in a dictionary with keys being either the file and containing file metainformation
# or keys being a folder name and containing recursively defined dictionary.
self.fileIndex = {}
self.mountRecursively = recursive
self.cacheFolder = os.path.expanduser( "~/.ratarmount" ) # will be used for storing if current path is read-only
self.possibleIndexFilePaths = [
self.tarFileName + ".index",
self.cacheFolder + "/" + self.tarFileName.replace( "/", "_" ) + ".index"
]
if serializationBackend not in self.supportedIndexExtensions():
serializationBackend = 'custom'
print( "[Warning] Serialization backend not supported. Defaulting to '" + serializationBackend + "'!" )
# this is the actual index file, which will be used in the end, and by default
self.indexFileName = self.possibleIndexFilePaths[0] + "." + serializationBackend
if fileObject is not None:
if writeIndex:
print( "Can't write out index for file object input. Ignoring this option." )
self.createIndex( fileObject )
else:
# first try loading the index for the given serialization backend
if serializationBackend is not None:
for indexPath in self.possibleIndexFilePaths:
indexPathWitExt = indexPath + "." + serializationBackend
if self.indexIsLoaded():
break
if os.path.isfile( indexPathWitExt ):
if os.path.getsize( indexPathWitExt ) == 0:
os.remove( indexPathWitExt )
else:
writeIndex = False
if not self.indexIsLoaded():
with open( self.tarFileName, 'rb' ) as file:
self.createIndex( file )
if writeIndex:
for indexPath in self.possibleIndexFilePaths:
indexPath += "." + serializationBackend
try:
folder = os.path.dirname( indexPath )
if not os.path.exists( folder ):
os.mkdir( folder )
f = open( indexPath, 'wb' )
f.close()
os.remove( indexPath )
self.indexFileName = indexPath
break
except IOError:
if printDebug >= 2:
print( "Could not create file:", indexPath )
try:
self.writeIndex( self.indexFileName )
except IOError:
print( "[Info] Could not write TAR index to file." )
@staticmethod
def supportedIndexExtensions():
return [ '.'.join( combination ).strip( '.' )
for combination in itertools.product( IndexedTar.availableSerializationBackends,
IndexedTar.availableCompressions ) ]
@staticmethod
def dump( toDump, file ):
if isinstance( toDump, dict ):
file.write( b'\x01' ) # magic code meaning "start dictionary object"
for key, value in toDump.items():
file.write( b'\x03' ) # magic code meaning "serialized key value pair"
IndexedTar.dump( key, file )
IndexedTar.dump( value, file )
file.write( b'\x02' ) # magic code meaning "close dictionary object"
elif isinstance( toDump, FileInfo ):
import msgpack
serialized = msgpack.dumps( toDump )
file.write( b'\x05' ) # magic code meaning "msgpack object"
file.write( len( serialized ).to_bytes( 4, byteorder = 'little' ) )
file.write( serialized )
elif isinstance( toDump, str ):
serialized = toDump.encode()
file.write( b'\x04' ) # magic code meaning "string object"
file.write( len( serialized ).to_bytes( 4, byteorder = 'little' ) )
file.write( serialized )
else:
print( "Ignoring unsupported type to write:", toDump )
@staticmethod
def load( file ):
elementType = file.read( 1 )
if elementType == b'\x01': # start of dictionary
result = {}
dictElementType = file.read( 1 )
while len( dictElementType ) != 0:
if dictElementType == b'\x02':
break
elif dictElementType == b'\x03':
import msgpack
keyType = file.read( 1 )
if keyType != b'\x04': # key must be string object
raise Exception( 'Custom TAR index loader: invalid file format' )
size = int.from_bytes( file.read( 4 ), byteorder = 'little' )
key = file.read( size ).decode()
valueType = file.read( 1 )
if valueType == b'\x05': # msgpack object
size = int.from_bytes( file.read( 4 ), byteorder = 'little' )
serialized = file.read( size )
value = FileInfo( *msgpack.loads( serialized ) )
elif valueType == b'\x01': # dict object
import io
file.seek( -1, io.SEEK_CUR )
value = IndexedTar.load( file )
else:
raise Exception( 'Custom TAR index loader: invalid file format ' +
'(expected msgpack or dict but got' +
str( int.from_bytes( valueType, byteorder = 'little' ) ) + ')' )
result[key] = value
else:
raise Exception( 'Custom TAR index loader: invalid file format ' +
'(expected end-of-dict or key-value pair but got' +
str( int.from_bytes( dictElementType, byteorder = 'little' ) ) + ')' )
dictElementType = file.read( 1 )
return result
else:
raise Exception( 'Custom TAR index loader: invalid file format' )
def getFileInfo( self, path, listDir = False ):
# go down file hierarchy tree along the given path
p = self.fileIndex
for name in os.path.normpath( path ).split( os.sep ):
if not name:
continue
if not name in p:
return
p = p[name]
def repackDeserializedNamedTuple( p ):
if isinstance( p, list ) and len( p ) == len( FileInfo._fields ):
return FileInfo( *p )
elif isinstance( p, dict ) and len( p ) == len( FileInfo._fields ) and \
'uid' in p and isinstance( p['uid'], int ):
# a normal directory dict must only have dict or FileInfo values, so if the value to the 'uid'
# key is an actual int, then it is sure it is a deserialized FileInfo object and not a file named 'uid'
print( "P ===", p )
print( "FileInfo ===", FileInfo( **p ) )
return FileInfo( **p )
return p
p = repackDeserializedNamedTuple( p )
# if the directory contents are not to be printed and it is a directory, return the "file" info of "."
if not listDir and isinstance( p, dict ):
if '.' in p:
p = p['.']
else:
return FileInfo(
offset = 0, # not necessary for directory anyways
size = 1, # might be misleading / non-conform
mtime = 0,
mode = 0o555 | stat.S_IFDIR,
type = tarfile.DIRTYPE,
linkname = "",
uid = 0,
gid = 0,
istar = False
)
return repackDeserializedNamedTuple( p )
def isDir( self, path ):
return True if isinstance( self.getFileInfo( path, listDir = True ), dict ) else False
def exists( self, path ):
path = os.path.normpath( path )
return self.isDir( path ) or isinstance( self.getFileInfo( path ), FileInfo )
def setFileInfo( self, path, fileInfo ):
"""
path: the full path to the file with leading slash (/) for which to set the file info
"""
assert( isinstance( fileInfo, FileInfo ) )
pathHierarchy = os.path.normpath( path ).split( os.sep )
if len( pathHierarchy ) == 0:
return
# go down file hierarchy tree along the given path
p = self.fileIndex
for name in pathHierarchy[:-1]:
if not name:
continue
assert( isinstance( p, dict ) )
p = p.setdefault( name, {} )
# create a new key in the dictionary of the parent folder
p.update( { pathHierarchy[-1] : fileInfo } )
def setDirInfo( self, path, dirInfo, dirContents = {} ):
"""
path: the full path to the file with leading slash (/) for which to set the folder info
"""
assert( isinstance( dirInfo, FileInfo ) )
assert( isinstance( dirContents, dict ) )
pathHierarchy = os.path.normpath( path ).strip( os.sep ).split( os.sep )
if len( pathHierarchy ) == 0:
return
# go down file hierarchy tree along the given path
p = self.fileIndex
for name in pathHierarchy[:-1]:
if not name:
continue
assert( isinstance( p, dict ) )
p = p.setdefault( name, {} )
# create a new key in the dictionary of the parent folder
p.update( { pathHierarchy[-1] : dirContents } )
p[pathHierarchy[-1]].update( { '.' : dirInfo } )
def createIndex( self, fileObject ):
if printDebug >= 1:
print( "Creating offset dictionary for", "<file object>" if self.tarFileName is None else self.tarFileName, "..." )
t0 = timer()
self.fileIndex = {}
try:
loadedTarFile = tarfile.open( fileobj = fileObject, mode = 'r:' )
except tarfile.ReadError as exception:
print( "Archive can't be opened! This might happen for compressed TAR archives, which currently is not supported." )
raise exception
for tarInfo in loadedTarFile:
mode = tarInfo.mode
if tarInfo.isdir() : mode |= stat.S_IFDIR
if tarInfo.isfile(): mode |= stat.S_IFREG
if tarInfo.issym() : mode |= stat.S_IFLNK
if tarInfo.ischr() : mode |= stat.S_IFCHR
if tarInfo.isfifo(): mode |= stat.S_IFIFO
fileInfo = FileInfo(
offset = tarInfo.offset_data,
size = tarInfo.size ,
mtime = tarInfo.mtime ,
mode = mode ,
type = tarInfo.type ,
linkname = tarInfo.linkname ,
uid = tarInfo.uid ,
gid = tarInfo.gid ,
istar = False
)
# open contained tars for recursive mounting
indexedTar = None
if self.mountRecursively and tarInfo.isfile() and tarInfo.name.endswith( ".tar" ):
oldPos = fileObject.tell()
if oldPos != tarInfo.offset_data:
fileObject.seek( tarInfo.offset_data )
indexedTar = IndexedTar( tarInfo.name, fileObject = fileObject, writeIndex = False )
fileObject.seek( fileObject.tell() ) # might be especially necessary if the .tar is not actually a tar!
# Add a leading '/' as a convention where '/' represents the TAR root folder
# Partly, done because fusepy specifies paths in a mounted directory like this
path = os.path.normpath( "/" + tarInfo.name )
# test whether the TAR file could be loaded and if so "mount" it recursively
if indexedTar is not None and indexedTar.indexIsLoaded():
# actually apply the recursive tar mounting
extractedName = re.sub( r"\.tar$", "", path )
if not self.exists( extractedName ):
path = extractedName
mountMode = ( fileInfo.mode & 0o777 ) | stat.S_IFDIR
if mountMode & stat.S_IRUSR != 0: mountMode |= stat.S_IXUSR
if mountMode & stat.S_IRGRP != 0: mountMode |= stat.S_IXGRP
if mountMode & stat.S_IROTH != 0: mountMode |= stat.S_IXOTH
fileInfo = fileInfo._replace( mode = mountMode, istar = True )
if self.exists( path ):
print( "[Warning]", path, "already exists in database and will be overwritten!" )
# merge fileIndex from recursively loaded TAR into our Indexes
self.setDirInfo( path, fileInfo, indexedTar.fileIndex )
elif path != '/':
# just a warning and check for the path already existing
if self.exists( path ):
fileInfo = self.getFileInfo( path, listDir = False )
if fileInfo.istar:
# move recursively mounted TAR directory to original .tar name if there is a name-clash,
# e.g., when foo/ also exists in the TAR but foo.tar would be mounted to foo/.
# In this case, move that mount to foo.tar/
self.setFileInfo( path + ".tar", fileInfo, self.getFileInfo( path, listDir = True ) )
else:
print( "[Warning]", path, "already exists in database and will be overwritten!" )
# simply store the file or directory information from current TAR item
if tarInfo.isdir():
self.setDirInfo( path, fileInfo, {} )
else:
self.setFileInfo( path, fileInfo )
t1 = timer()
if printDebug >= 1:
print( "Creating offset dictionary for", "<file object>" if self.tarFileName is None else self.tarFileName, "took {:.2f}s".format( t1 - t0 ) )
def serializationBackendFromFileName( self, fileName ):
splitName = fileName.split( '.' )
if len( splitName ) > 2 and '.'.join( splitName[-2:] ) in self.supportedIndexExtensions():
return '.'.join( splitName[-2:] )
elif splitName[-1] in self.supportedIndexExtensions():
return splitName[-1]
return None
def indexIsLoaded( self ):
return True if self.fileIndex else False
def writeIndex( self, outFileName ):
"""
outFileName: full file name with backend extension. Depending on the extension the serialization is chosen.
"""
serializationBackend = self.serializationBackendFromFileName( outFileName )
if printDebug >= 1:
print( "Writing out TAR index using", serializationBackend, "to", outFileName, "..." )
t0 = timer()
fileMode = 'wt' if 'json' in serializationBackend else 'wb'
if serializationBackend.endswith( '.lz4' ):
import lz4.frame
wrapperOpen = lambda x : lz4.frame.open( x, fileMode )
elif serializationBackend.endswith( '.gz' ):
import gzip
wrapperOpen = lambda x : gzip.open( x, fileMode )
else:
wrapperOpen = lambda x : open( x, fileMode )
serializationBackend = serializationBackend.split( '.' )[0]
# libraries tested but not working:
# - marshal: can't serialize namedtuples
# - hickle: for some reason, creates files almost 64x larger as pickle!? And also takes similarly longer
# - yaml: almost a 10 times slower and more memory usage and deserializes everything including ints to string
with wrapperOpen( outFileName ) as outFile:
if serializationBackend == 'pickle2':
import pickle
pickle.dump( self.fileIndex, outFile )
pickle.dump( self.fileIndex, outFile, protocol = 2 )
# default serialization because it has the fewest dependencies and because it was legacy default
elif serializationBackend == 'pickle3' or \
serializationBackend == 'pickle' or \
serializationBackend is None:
import pickle
pickle.dump( self.fileIndex, outFile )
pickle.dump( self.fileIndex, outFile, protocol = 3 ) # 3 is default protocol
elif serializationBackend == 'simplejson':
import simplejson
simplejson.dump( self.fileIndex, outFile, namedtuple_as_object = True )
elif serializationBackend == 'custom':
IndexedTar.dump( self.fileIndex, outFile )
elif serializationBackend in [ 'msgpack', 'cbor', 'rapidjson', 'ujson' ]:
import importlib
module = importlib.import_module( serializationBackend )
getattr( module, 'dump' )( self.fileIndex, outFile )
else:
print( "Tried to save index with unsupported extension backend:", serializationBackend, "!" )
t1 = timer()
if printDebug >= 1:
print( "Writing out TAR index to", outFileName, "took {:.2f}s".format( t1 - t0 ),
"and is sized", os.stat( outFileName ).st_size, "B" )
if __name__ == '__main__':
parser = argparse.ArgumentParser(
formatter_class = argparse.ArgumentDefaultsHelpFormatter,
description = '''\
Create index for random access to files inside the tar which will be saved to <path to tar>.index.<backend>[.<compression]. If it can't be saved there, it will be saved in ~/.ratarmount/<path to tar: '/' -> '_'>.index.<backend>[.<compression].
''' )
parser.add_argument( '-d', '--debug', type = int, default = 1,
help = 'sets the debugging level. Higher means more output. Currently 3 is the highest' )
parser.add_argument( '-r', '--recursive', action='store_true', default = False,
help = 'index TAR archives inside the mounted TAR recursively.' )
parser.add_argument( '-s', '--serialization-backend', type = str, default = 'custom',
help = 'specify which library to use for writing out the TAR index. Supported keywords: (' +
','.join( IndexedTar.availableSerializationBackends ) + ')[.(' +
','.join( IndexedTar.availableCompressions ).strip( ',' ) + ')]' )
parser.add_argument( 'tarfilepath', metavar = 'tar-file-path',
type = argparse.FileType( 'r' ), nargs = 1,
help = 'the path to the TAR archive to be mounted' )
args = parser.parse_args()
tarToMount = os.path.abspath( args.tarfilepath[0].name )
try:
tarfile.open( tarToMount, mode = 'r:' )
except tarfile.ReadError:
print( "Archive", tarToMount, "can't be opened!",
"This might happen for compressed TAR archives, which currently is not supported." )
exit( 1 )
printDebug = args.debug
IndexedTar( pathToTar = tarToMount,
writeIndex = True,
recursive = args.recursive,
serializationBackend = args.serialization_backend )