diff --git a/python/ucf/tools/ucf_mpiio_tar b/python/ucf/tools/ucf_mpiio_tar new file mode 100755 index 0000000..55b9ba8 --- /dev/null +++ b/python/ucf/tools/ucf_mpiio_tar @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 +import sys +import io +import os +import tarfile +import argparse +import numpy as np +import ucf +import configparser + +parser = argparse.ArgumentParser(description='Reads an ucf.tar archive, downsamples it and saves it to a new ucf.tar archive. Can be used as a pipe.') +parser.add_argument('indir', metavar='dirin', help='input directory') +parser.add_argument('iseq', metavar='iseq', help='sequence number') +parser.add_argument('base', metavar='base', help='filebase to be archived: "uvwp" or "scal"') +parser.add_argument("-o", "--outfile", metavar='file',nargs='?', default=None, help="name of the output file [default: snapshot_XXXX.ucf.tar]", action="store") +parser.add_argument("-v", "--verbose", help="activate verbose output", action="store_true") +args = parser.parse_args() + +dir_in = args.indir +iseq = int(args.iseq) +base = args.base +verbose = args.verbose + +if not base in ('uvwp','scal'): + raise ValueError('Invalid base key: {}',base) + +if args.outfile is None: + if base=='uvwp': + file_out = 'snapshot_{:04d}.ucf.tar'.format(iseq) + elif base=='scal': + file_out = 'snapshot_scalar_{:04d}.ucf.tar'.format(iseq) +else: + file_out = args.outfile + +# Check if all required files are available +files_req = ('parameters_{:04d}.asc','proc_{:04d}.bin','grid_{:04d}.bin','particles_{:04d}.bin',base+'_{:04d}.bin') +for files in files_req: + path_check = '{}/{}'.format(dir_in,files.format(iseq)) + if not os.path.isfile(path_check): + raise IOError('File does not exist: {}'.format(path_check)) +if verbose: + print("[x] parameters") + print("[x] grid") + print("[x] processor grid") + print("[x] particles") + if base=='uvwp': + print("[x] fluid field") + elif base=='scal': + print("[x] scalar field") + +# Open tar file for output +ftar = tarfile.open(name=file_out,mode='w',pax_headers=tarfile.USTAR_FORMAT) + +def transform_filename(filename,iseq): + return os.path.basename(file_in).replace('_{:04d}'.format(iseq),'') + +# Parse parameters to construct file headers, then add it to tar +file_in = '{}/parameters_{:04d}.asc'.format(dir_in,iseq) +config = configparser.ConfigParser() +config.read(file_in) +nxprocs = int(config['parallel']['nxprocs']) +nyprocs = int(config['parallel']['nyprocs']) +nzprocs = int(config['parallel']['nzprocs']) +nprocs = nxprocs*nyprocs*nzprocs + +print(transform_filename(file_in,iseq)) +fid = open(file_in,'rb') +info = tarfile.TarInfo(name=transform_filename(file_in,iseq)) +info.size = os.path.getsize(file_in) +ftar.addfile(info,fileobj=fid) +fid.close() + +# Add proc.bin +file_in = '{}/proc_{:04d}.bin'.format(dir_in,iseq) +print(transform_filename(file_in,iseq)) +fid = open(file_in,'rb') +info = tarfile.TarInfo(name=transform_filename(file_in,iseq)) +info.size = os.path.getsize(file_in) +ftar.addfile(info,fileobj=fid) +fid.close() + +# Add grid.bin +file_in = '{}/grid_{:04d}.bin'.format(dir_in,iseq) +print(transform_filename(file_in,iseq)) +fid = open(file_in,'rb') +info = tarfile.TarInfo(name=transform_filename(file_in,iseq)) +info.size = os.path.getsize(file_in) +ftar.addfile(info,fileobj=fid) +fid.close() + +# Add particles.bin +file_in = '{}/particles_{:04d}.bin'.format(dir_in,iseq) +print(transform_filename(file_in,iseq)) +fid = open(file_in,'rb') +info = tarfile.TarInfo(name=transform_filename(file_in,iseq)) +info.size = os.path.getsize(file_in) +ftar.addfile(info,fileobj=fid) +fid.close() + +# Split uvwp/scal files and add them +def positionFromRank(rank,nxp,nyp,nzp): + ip = rank//(nyp*nzp) + jp = (rank//nzp)%nyp + kp = rank%nzp + return (ip,jp,kp) + +file_in = '{}/{}_{:04d}.bin'.format(dir_in,base,iseq) +ucf_data = ucf.UCF(file=file_in) +if nprocs!=ucf_data.NumTimestep: + raise ValueError('Number of steps does not equal number of procs: {}, {}',ucf_data.NumTimestep,nprocs) + +for iproc in range(0,nprocs): + print(base+'.{:05d}'.format(iproc)) + # Construct a new UCF file + ucf_data.addFileHeaderToBuffer( + rank=iproc, + rankijk=positionFromRank(iproc,nxprocs,nyprocs,nzprocs), + ftype=ucf_data._UCF__typeID + ) + ucf_data.copyStepToBuffer(iproc+1,recursive=True) + ucf_bytes = ucf_data.flushBuffer() + # Write it to tar + info = tarfile.TarInfo(name=base+'.{:05d}'.format(iproc)) + info.size = len(ucf_bytes) + ftar.addfile(info,fileobj=io.BytesIO(ucf_bytes)) + +# Close tar file +ftar.close()