script now supports push/pull and archive splitting

This commit is contained in:
Michael Krayer 2023-07-26 16:40:10 +02:00
parent 2066c85270
commit 335cd792de
1 changed files with 230 additions and 62 deletions

292
bwdatar
View File

@ -10,18 +10,25 @@ source $HOME/.bwda
# Usage function # Usage function
usage(){ usage(){
echo "Usage: $(basename $0) [opt] [dir]" echo "Usage: $(basename $0) [opt] {push,pull} [dir]"
echo " {push,pull} script mode: push directory to bwda, or pull it from there."
echo " dir path to local directory on LSDF" echo " dir path to local directory on LSDF"
echo " --include-size-mismatch include files which mismatch in size to sync list" echo " --include-size-mismatch include files which mismatch in size to sync list"
echo " -n | --dry-run print commands to stdout instead of executing them" echo " -c | --channel N use N channels per file (pull only)"
echo " -d | --tempdir dir Path to the temporary directory where tar-file is created (default: \$PWD)" echo " -d | --workdir dir Path to the temporary directory where tar-file is created (default: \$PWD)"
echo " -h | --help display this message" echo " -h | --help display this message"
echo " -n | --dry-run print commands to stdout instead of executing them"
echo " -P | --parallel N transfer N files in parallel (pull only)"
echo " -s | --split bytes chunk size of tar-archive in bytes. Default 500G. Expands K,M,G,KB,MB,GB. K=1024, KB=1000,..."
} }
# Get command line argument # Get command line argument
flag_sizemismatch=false flag_sizemismatch=false
flag_dryrun=false flag_dryrun=false
tempdir=$PWD nparallel=1
nchannel=1
work_dir=$PWD
bytes_split=500G
POSITIONAL=() POSITIONAL=()
while [[ $# -gt 0 ]] while [[ $# -gt 0 ]]
@ -32,12 +39,27 @@ case $key in
flag_sizemismatch=true flag_sizemismatch=true
shift shift
;; ;;
-c|--channel)
nchannel=$2
shift
shift
;;
# Working-directory option. The usage text advertises "--workdir", so that
# spelling is accepted in addition to the original "--work_dir".
-d|--workdir|--work_dir)
work_dir="$2"
# Two single shifts on purpose: "shift 2" would be a no-op (and loop forever)
# if the option is given without a value as the last argument.
shift
shift
;;
-n|--dry-run) -n|--dry-run)
flag_dryrun=true flag_dryrun=true
shift shift
;; ;;
-d|--tempdir) -P|--parallel)
tempdir="$2" nparallel=$2
shift
shift
;;
-s|--split)
bytes_split=$2
shift shift
shift shift
;; ;;
@ -54,86 +76,232 @@ esac
done done
set -- "${POSITIONAL[@]}" # restore positional parameters set -- "${POSITIONAL[@]}" # restore positional parameters
if [ $# -ne 1 ]; then if [ $# -ne 2 ]; then
usage usage
exit -1 exit -1
fi fi
ldir_target="$(realpath $1)" # Set mode
if [ ! -d ${ldir_target} ]; then scriptmode=$1
>&2 echo "Not a directory: $1" if [[ ! ${scriptmode} == 'push' ]] && [[ ! ${scriptmode} == 'pull' ]]; then
>&2 echo "Invalid mode: '$scriptmode'. (Valid: 'push','pull')"
exit -5
fi
# Set input directory and expand to absolute path
input_dir="$(realpath $2)"
if [[ $scriptmode == 'push' ]] && [[ ! -d ${input_dir} ]]; then
>&2 echo "Input is not a directory: $input_dir"
exit -3 exit -3
fi fi
# Get current path relative to base # Set working directory where tar file will be stored temporarily
if [[ $ldir_target == "${ldir_base}"* ]]; then work_dir=$(realpath ${work_dir})
dir_target=${ldir_target#"$ldir_base"} if [ ! -d ${work_dir} ]; then
else >&2 echo "Workdir is not a directory: $work_dir"
exit -4
fi
# Validate nchannel and nparallel
nchannel=$(printf %d $nchannel)
[[ $? -ne 0 ]] && >&2 echo "nchannel is not numeric." && exit 20
nparallel=$(printf %d $nparallel)
[[ $? -ne 0 ]] && >&2 echo "nparallel is not numeric." && exit 20
# Expand bytes_split
bytes_split=${bytes_split/KB/*1000}
bytes_split=${bytes_split/MB/*1000000}
bytes_split=${bytes_split/GB/*1000000000}
bytes_split=${bytes_split/K/*1024}
bytes_split=${bytes_split/M/*1024**2}
bytes_split=${bytes_split/G/*1024**3}
if ! [[ $bytes_split =~ ^[0-9\*]+$ ]]; then
>&2 echo "bytes_split is not numeric."
exit 20
fi
bytes_split=$(( $bytes_split ))
[[ $? -ne 0 ]] && >&2 echo "bytes_split is invalid." && exit 20
# Set target directory relative to LSDF base directory
if [[ ! $input_dir == "${ldir_base}"* ]]; then
echo "Target directory is not located on LSDF!" echo "Target directory is not located on LSDF!"
echo "Is the base directory setting correct?" echo "Is the base directory setting correct?"
echo "ldir_base: $ldir_base" echo "ldir_base: $ldir_base"
exit -2 exit -2
fi fi
input_dir_rel=${input_dir#"$ldir_base"} # path of input_dir relative to LSDF base directory
target_dir_rel="$(dirname $input_dir_rel)" # path of parent directory in which tar of input_dir will be located
# echo $input_dir
# echo $target_dir_rel
# Construct name of tar file # Construct base name of tar file: a number might be added if the file is split
tar_exec_dir="$(dirname $ldir_target)" tar_exec_dir="$(dirname $input_dir)" # directory from which tar is executed
tar_target_dir="$(basename $ldir_target)" tar_target_dir="$(basename $input_dir)" # input directory for tar command
tar_filename="${tar_target_dir}.tar" tar_filename="${tar_target_dir}.tar" # file name of tar archive (trailing .XXXX may be added if archive needs to be split)
# tar -cf - Ga_010 | split --numeric-suffixes --suffix-length=4 --bytes=100MB - chunk.split.
# Construct paths of tar file # Construct paths of tar file
rdir_target="$(dirname ${rdir_base}/${dir_target})" rdir_target="${rdir_base}/${target_dir_rel}"
rfile_target="${rdir_target}/${tar_filename}" # rfile_target="${rdir_target}/${tar_filenam`e}"
lfile_target="${work_dir}/${tar_filename}"
tempdir=$(realpath ${tempdir})
ltmpfile="${tempdir}/${tar_filename}"
# Receive list of files on SFTP server, omit directories # Receive list of files on SFTP server, omit directories
rfilelist=$(lftp sftp://${bwda_acc}@${bwda_url} -e "ls -l ${rdir_target}; bye" | grep -v '^d') rfilelist=$(lftp sftp://${bwda_acc}@${bwda_url} -e "ls -l ${rdir_target}; bye" | grep -v '^d')
rfilename=($(echo "$rfilelist" | awk '{print $9}')) rfilename=($(echo "$rfilelist" | awk '{print $9}'))
rfilesize=($(echo "$rfilelist" | awk '{print $5}')) rfilesize=($(echo "$rfilelist" | awk '{print $5}'))
# Check if remote tar file already exists # Extract the relevant tar files. It should be a single file if its size is smaller than $bytes_split.
ipos=-1 # Otherwise the archive is split in several chunks
nchunk=0
chunkname=()
chunksize=()
for ir in ${!rfilename[@]}; do for ir in ${!rfilename[@]}; do
if [[ ${rfilename[${ir}]} == ${tar_filename} ]]; then if [[ ${rfilename[${ir}]} == "${tar_filename}"* ]]; then
ipos=$ir chunkname[${nchunk}]=${rfilename[${ir}]}
break chunksize[${nchunk}]=${rfilesize[${ir}]}
let nchunk=nchunk+1
fi fi
done done
# If file already exists, check if filesize matches ## MODE
if [[ $ipos -ge 0 ]]; then if [[ ${scriptmode} == 'pull' ]]; then
filesize=$(cd ${tar_exec_dir}; \ # Check if there is data to pull
tar cf /dev/null --totals ${tar_target_dir} 2>&1 | \ if [[ $nchunk -lt 1 ]]; then
awk -F: '{print $2}' | awk '{print $1}') echo "Archive not available on bwda."
if [[ ${rfilesize[${ipos}]} != ${filesize} ]]; then exit 51
>&2 echo "Filesize mismatch: ${rfilename[${ipos}]}, HPSS=${rfilesize[${ipos}]}, estimate=${filesize}" fi
if [[ ! "$flag_sizemismatch" == true ]]; then # Construct lftp command
exit 10 cmd=""
cmd+="set cmd:parallel ${nparallel}; "
cmd+="cd ${rdir_target}; "
cmd+="lcd ${work_dir}; "
for chunk in "${chunkname[@]}"; do
cmd+="pget -n ${nchannel} $chunk; "
done
cmd+="bye"
# Get files from bwda
lftp sftp://${bwda_acc}@${bwda_url} -e "$cmd"
ec=$?
if [[ $ec -ne 0 ]]; then
>&2 echo "lftp failed with status $ec."
exit 101
fi
# Extract archive
cat_args=""
for chunk in "${chunkname[@]}"; do
cat_args+="${work_dir}/${chunk} "
done
cat ${cat_args} | tar -C ${tar_exec_dir} -xkf -
ec=$?
if [[ $ec -ne 0 ]]; then
>&2 echo "tar failed with status $ec."
exit 100
fi
rm ${cat_args}
elif [[ ${scriptmode} == 'push' ]]; then
# Estimate file size of archive
filesize_est=$(cd ${tar_exec_dir}; \
tar cf /dev/null --totals ${tar_target_dir} 2>&1 | \
awk -F: '{print $2}' | awk '{print $1}')
filesize_est=$(printf %d ${filesize_est})
# Get file size on bwda (if there are files already)
if [[ ${nchunk} -gt 0 ]]; then
filesize_bwda=0
for ((ii=0;ii<${nchunk};ii++)); do
let filesize_bwda=filesize_bwda+chunksize[ii]
done
filesize_bwda=$(printf %d ${filesize_bwda})
fi
# Check if file size matches
if [[ ${nchunk} -gt 0 ]]; then
if [[ ${filesize_est} != ${filesize_bwda} ]]; then
>&2 echo "Filesize mismatch: HPSS=${filesize_bwda}, estimate=${filesize_est}"
if [[ ! "$flag_sizemismatch" == true ]]; then
exit 10
fi
else
echo "File exists on BWDA and matches expected size. Not doing anything."
exit 0
fi
fi
# Estimate the number of chunks
nchunk_est=$(( ($filesize_est + $bytes_split - 1) / $bytes_split )) # ceil()
#
if [ ${nchunk_est} -gt 1 ]; then
# nchunk>1: Asynchronously tar and upload available chunks
# trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT
# trap 'kill $(jobs -pr)' SIGINT SIGTERM EXIT
# Define upload function to work asynchronously while archives are written
#######################################
# Upload one chunk of the split archive via lftp once it is fully written.
# The function polls the chunk file every 10 seconds until it exists and has
# exactly the expected size, then transfers it with "put".
# Globals:   bwda_acc, bwda_url, rdir_target, work_dir (read)
# Arguments: $1 - path to the chunk file (does not need to exist yet)
#            $2 - expected final size of the chunk in bytes
# Returns:   exit status of lftp, or of realpath if the path cannot be resolved
#######################################
upload_to_bwda() {
  local chunk_file
  local chunk_size=$2
  # Declaration and assignment are split so that a realpath failure is not
  # hidden behind the (always successful) exit status of "local".
  chunk_file=$(realpath -- "$1") || return

  # Wait until the writer has completed this chunk. The size is only queried
  # when the file already exists ([[ ]] short-circuits on &&).
  until [[ -f "${chunk_file}" \
    && "$(stat --printf=%s -- "${chunk_file}")" -eq "${chunk_size}" ]]; do
    sleep 10
  done

  # Paths are quoted inside the lftp command string so that names containing
  # whitespace are passed as single arguments. lftp's status is the return value.
  lftp "sftp://${bwda_acc}@${bwda_url}" \
    -e "cd \"${rdir_target}\"; lcd \"${work_dir}\"; put \"${chunk_file}\"; bye"
}
# Create lists of subarchives and check if already exists
file_upload=()
for ((ii=0;ii<${nchunk_est};ii++)); do
file_upload[${ii}]=${lfile_target}.$(printf %04d $ii)
if [ -f ${file_upload[${ii}]} ]; then
>&2 echo "Error: temporary file ${file_upload[${ii}]} exists. Refusing to overwrite."
exit 90
fi
done
# Create tar archive
tar -C ${tar_exec_dir} -cf - ${tar_target_dir} | split --numeric-suffixes --suffix-length=4 --bytes=${bytes_split} - ${lfile_target}. &
pid_tar=$!
# Start upload tasks: one background uploader per expected chunk. Each uploader
# waits until its chunk has reached the size passed as second argument.
pid_upload=()
echo $nchunk_est
# All chunks except the last one are exactly ${bytes_split} bytes long.
for (( ii=0;ii<${nchunk_est}-1;ii++ )); do
  upload_to_bwda "${file_upload[${ii}]}" "${bytes_split}" &
  pid_upload[${ii}]=$!
done
# The last chunk holds the remainder. If the archive size is an exact multiple
# of the chunk size the remainder is 0, but the last chunk is then a full-sized
# one; waiting for a 0-byte file would upload an incomplete chunk or never finish.
last_chunk_size=$(( filesize_est % bytes_split ))
if (( last_chunk_size == 0 )); then
  last_chunk_size=${bytes_split}
fi
upload_to_bwda "${file_upload[${ii}]}" "${last_chunk_size}" &
pid_upload[${ii}]=$!
# Wait for tar process
wait $pid_tar
ec=$?
if [[ $ec -ne 0 ]]; then
>&2 echo "tar failed with status $ec."
exit 100
fi
# Wait for upload processes
for ((ii=0;ii<${nchunk_est};ii++)); do
wait ${pid_upload[${ii}]}
ec=$?
if [[ $ec -ne 0 ]]; then
>&2 echo "lftp failed with status $ec."
exit 101
fi
rm ${file_upload[${ii}]}
done
else
# nchunk=1: tar and upload are done sequentially
# Create tar archive
tar -C ${tar_exec_dir} -cf ${lfile_target} ${tar_target_dir}
ec=$?
if [[ $ec -ne 0 ]]; then
>&2 echo "tar failed with status $ec."
exit 100
fi
# Upload archive
lftp sftp://${bwda_acc}@${bwda_url} -e "cd ${rdir_target}; lcd ${work_dir}; put ${tar_filename}; bye"
ec=$?
if [[ $ec -ne 0 ]]; then
>&2 echo "lftp failed with status $ec."
exit 101
fi
# Remove temporary archive
rm ${lfile_target}
fi fi
else
echo "Nothing to sync."
exit 0
fi
fi
# Write a lftp batch script for syncing
>&2 echo "Uploading ${rdir_target}/${tar_filename}"
cmd="open sftp://${bwda_acc}@${bwda_url}\n"
cmd+="set cmd:parallel 1\n"
cmd+="cd ${rdir_target}\n"
cmd+="lcd ${tempdir}\n"
cmd+="put -c ${tar_filename}\n"
cmd+="bye\n"
# Execute
if [[ "$flag_dryrun" == true ]]; then
printf "$cmd"
else
(cd ${tar_exec_dir}; tar cf ${ltmpfile} ${tar_target_dir})
tmpfile=$(mktemp)
printf "$cmd" > $tmpfile
lftp -f $tmpfile
rm $tmpfile
rm ${ltmpfile}
fi fi