split mpiio ucf files while taring
This commit is contained in:
parent
0284e8f954
commit
ec6c02c7d6
|
|
@ -0,0 +1,128 @@
|
|||
#!/usr/bin/env python3
|
||||
import sys
|
||||
import io
|
||||
import os
|
||||
import tarfile
|
||||
import argparse
|
||||
import numpy as np
|
||||
import ucf
|
||||
import configparser
|
||||
|
||||
# Command-line interface: select the input directory, sequence number and
# file base of the MPI-IO snapshot that is to be packed into a ucf.tar.
parser = argparse.ArgumentParser(
    description='Reads an ucf.tar archive, downsamples it and saves it to a new ucf.tar archive. Can be used as a pipe.')
parser.add_argument('indir', metavar='dirin', help='input directory')
parser.add_argument('iseq', metavar='iseq', help='sequence number')
parser.add_argument('base', metavar='base', help='filebase to be archived: "uvwp" or "scal"')
parser.add_argument("-o", "--outfile", metavar='file', nargs='?', default=None,
                    help="name of the output file [default: snapshot_XXXX.ucf.tar]", action="store")
parser.add_argument("-v", "--verbose", help="activate verbose output", action="store_true")
args = parser.parse_args()

# Unpack the parsed arguments into the module-level names used below.
dir_in = args.indir
iseq = int(args.iseq)
base = args.base
verbose = args.verbose
|
||||
|
||||
# Abort early on an unsupported file base: only the fluid ('uvwp') and
# scalar ('scal') snapshot files are known.
# Fix: the original passed the template and the value as two separate
# ValueError arguments, so the message was never formatted.
if base not in ('uvwp', 'scal'):
    raise ValueError('Invalid base key: {}'.format(base))
|
||||
|
||||
# Derive the archive name from the sequence number unless the caller
# supplied one explicitly via --outfile. By this point base has been
# validated to be exactly 'uvwp' or 'scal'.
if args.outfile is not None:
    file_out = args.outfile
elif base == 'uvwp':
    file_out = 'snapshot_{:04d}.ucf.tar'.format(iseq)
elif base == 'scal':
    file_out = 'snapshot_scalar_{:04d}.ucf.tar'.format(iseq)
|
||||
|
||||
# Check that every input file for this sequence number exists before any
# output is written, so we fail fast instead of producing a partial tar.
for template in ('parameters_{:04d}.asc', 'proc_{:04d}.bin', 'grid_{:04d}.bin',
                 'particles_{:04d}.bin', base + '_{:04d}.bin'):
    candidate = '{}/{}'.format(dir_in, template.format(iseq))
    if not os.path.isfile(candidate):
        raise IOError('File does not exist: {}'.format(candidate))

if verbose:
    # Report each verified snapshot component.
    print("[x] parameters")
    print("[x] grid")
    print("[x] processor grid")
    print("[x] particles")
    if base == 'uvwp':
        print("[x] fluid field")
    elif base == 'scal':
        print("[x] scalar field")
|
||||
|
||||
# Open the output archive for writing.
# Fix: USTAR_FORMAT was passed as pax_headers (which expects a dict of
# PAX key/value pairs, not a format constant); it belongs to the
# 'format' keyword that selects the archive format.
ftar = tarfile.open(name=file_out, mode='w', format=tarfile.USTAR_FORMAT)
|
||||
|
||||
def transform_filename(filename, iseq):
    """Strip the zero-padded sequence suffix from *filename*'s basename.

    E.g. '/run/proc_0012.bin' with iseq=12 becomes 'proc.bin'.

    Fix: the original ignored its *filename* parameter and read the
    module-level ``file_in`` instead; it only worked because every call
    site happened to pass the current value of ``file_in``.
    """
    return os.path.basename(filename).replace('_{:04d}'.format(iseq), '')
|
||||
|
||||
# Read the solver parameter file: it defines the MPI processor grid,
# which determines how many per-rank pieces the field file holds.
file_in = '{}/parameters_{:04d}.asc'.format(dir_in, iseq)
config = configparser.ConfigParser()
config.read(file_in)
nxprocs = int(config['parallel']['nxprocs'])
nyprocs = int(config['parallel']['nyprocs'])
nzprocs = int(config['parallel']['nzprocs'])
nprocs = nxprocs * nyprocs * nzprocs

# Store the parameter file in the archive under its suffix-free name.
print(transform_filename(file_in, iseq))
member = tarfile.TarInfo(name=transform_filename(file_in, iseq))
member.size = os.path.getsize(file_in)
with open(file_in, 'rb') as fid:
    ftar.addfile(member, fileobj=fid)
|
||||
|
||||
# Archive the processor-layout file under its suffix-free name ('proc.bin').
file_in = '{}/proc_{:04d}.bin'.format(dir_in, iseq)
print(transform_filename(file_in, iseq))
member = tarfile.TarInfo(name=transform_filename(file_in, iseq))
member.size = os.path.getsize(file_in)
with open(file_in, 'rb') as fid:
    ftar.addfile(member, fileobj=fid)
|
||||
|
||||
# Archive the grid file under its suffix-free name ('grid.bin').
file_in = '{}/grid_{:04d}.bin'.format(dir_in, iseq)
print(transform_filename(file_in, iseq))
member = tarfile.TarInfo(name=transform_filename(file_in, iseq))
member.size = os.path.getsize(file_in)
with open(file_in, 'rb') as fid:
    ftar.addfile(member, fileobj=fid)
|
||||
|
||||
# Archive the particle file under its suffix-free name ('particles.bin').
file_in = '{}/particles_{:04d}.bin'.format(dir_in, iseq)
print(transform_filename(file_in, iseq))
member = tarfile.TarInfo(name=transform_filename(file_in, iseq))
member.size = os.path.getsize(file_in)
with open(file_in, 'rb') as fid:
    ftar.addfile(member, fileobj=fid)
|
||||
|
||||
# Split uvwp/scal files and add them
|
||||
def positionFromRank(rank, nxp, nyp, nzp):
    """Map a linear MPI rank to its (ip, jp, kp) position on the
    nxp x nyp x nzp processor grid (k varies fastest, i slowest).

    Note: *nxp* is accepted for interface symmetry but is not needed to
    invert the row-major rank ordering.
    """
    rest, kp = divmod(rank, nzp)
    ip, jp = divmod(rest, nyp)
    return (ip, jp, kp)
|
||||
|
||||
# Open the combined MPI-IO field file. It stores one "timestep" record
# per MPI rank, so the record count must match the processor grid size.
file_in = '{}/{}_{:04d}.bin'.format(dir_in, base, iseq)
ucf_data = ucf.UCF(file=file_in)
if nprocs != ucf_data.NumTimestep:
    # Fix: the original passed the template and the values as separate
    # ValueError arguments, so the message was never formatted.
    raise ValueError('Number of steps does not equal number of procs: {}, {}'.format(
        ucf_data.NumTimestep, nprocs))
|
||||
|
||||
# Split the monolithic MPI-IO field file into one UCF file per MPI rank
# and append each piece to the archive as '<base>.NNNNN'.
for iproc in range(0,nprocs):
    print(base+'.{:05d}'.format(iproc))
    # Construct a new UCF file: per-rank file header first, then the
    # rank's data record (record indices appear to be 1-based here —
    # TODO confirm against the ucf module).
    ucf_data.addFileHeaderToBuffer(
        rank=iproc,
        rankijk=positionFromRank(iproc,nxprocs,nyprocs,nzprocs),
        # Name-mangled access to UCF's private __typeID; presumably the
        # file-type tag of the source file — verify in ucf.py.
        ftype=ucf_data._UCF__typeID
    )
    ucf_data.copyStepToBuffer(iproc+1,recursive=True)
    ucf_bytes = ucf_data.flushBuffer()
    # Write it to tar; TarInfo.size must be set before addfile so the
    # member header records the correct length.
    info = tarfile.TarInfo(name=base+'.{:05d}'.format(iproc))
    info.size = len(ucf_bytes)
    ftar.addfile(info,fileobj=io.BytesIO(ucf_bytes))

# Close tar file
ftar.close()
|
||||
Loading…
Reference in New Issue