diff --git a/bwdatar b/bwdatar index 1f90dba..ccc9db7 100755 --- a/bwdatar +++ b/bwdatar @@ -10,18 +10,25 @@ source $HOME/.bwda # Usage function usage(){ - echo "Usage: $(basename $0) [opt] [dir]" + echo "Usage: $(basename $0) [opt] {push,pull} [dir]" + echo " {push,pull} script mode: push directory to bwda, or pull it from there." echo " dir path to local directory on LSDF" echo " --include-size-mismatch include files which mismatch in size to sync list" - echo " -n | --dry-run print commands to stdout instead of executing them" - echo " -d | --tempdir dir Path to the temporary directory where tar-file is created (default: \$PWD)" + echo " -c | --channel N use N channels per file (pull only)" + echo " -d | --workdir dir Path to the temporary directory where tar-file is created (default: \$PWD)" echo " -h | --help display this message" + echo " -n | --dry-run print commands to stdout instead of executing them" + echo " -P | --parallel N transfer N files in parallel (pull only)" + echo " -s | --split bytes chunk size of tar-archive in bytes. Default 500G. Expands K,M,G,KB,MB,GB. K=1024, KB=1000,..." } # Get command line argument flag_sizemismatch=false flag_dryrun=false -tempdir=$PWD +nparallel=1 +nchannel=1 +work_dir=$PWD +bytes_split=500G POSITIONAL=() while [[ $# -gt 0 ]] @@ -32,12 +39,27 @@ case $key in flag_sizemismatch=true shift ;; + -c|--channel) + nchannel=$2 + shift + shift + ;; + -d|--work_dir) + work_dir="$2" + shift + shift + ;; -n|--dry-run) flag_dryrun=true shift ;; - -d|--tempdir) - tempdir="$2" + -P|--parallel) + nparallel=$2 + shift + shift + ;; + -s|--split) + bytes_split=$2 shift shift ;; @@ -54,86 +76,232 @@ esac done set -- "${POSITIONAL[@]}" # restore positional parameters -if [ $# -ne 1 ]; then +if [ $# -ne 2 ]; then usage exit -1 fi -ldir_target="$(realpath $1)" -if [ ! -d ${ldir_target} ]; then - >&2 echo "Not a directory: $1" +# Set mode +scriptmode=$1 +if [[ ! ${scriptmode} == 'push' ]] && [[ ! 
${scriptmode} == 'pull' ]]; then + >&2 echo "Invalid mode: '$scriptmode'. (Valid: 'push','pull')" + exit -5 +fi + +# Set input directory and expand to absolute path +input_dir="$(realpath $2)" +if [[ $scriptmode == 'push' ]] && [[ ! -d ${input_dir} ]]; then + >&2 echo "Input is not a directory: $input_dir" exit -3 fi -# Get current path relative to base -if [[ $ldir_target == "${ldir_base}"* ]]; then - dir_target=${ldir_target#"$ldir_base"} -else +# Set working directory where tar file will be stored temporarily +work_dir=$(realpath ${work_dir}) +if [ ! -d ${work_dir} ]; then + >&2 echo "Workdir is not a directory: $work_dir" + exit -4 +fi + +# Validate nchannel and nparallel +nchannel=$(printf %d $nchannel) +[[ $? -ne 0 ]] && >&2 echo "nchannel is not numeric." && exit 20 +nparallel=$(printf %d $nparallel) +[[ $? -ne 0 ]] && >&2 echo "nparallel is not numeric." && exit 20 + +# Expand bytes_split +bytes_split=${bytes_split/KB/*1000} +bytes_split=${bytes_split/MB/*1000000} +bytes_split=${bytes_split/GB/*1000000000} +bytes_split=${bytes_split/K/*1024} +bytes_split=${bytes_split/M/*1024**2} +bytes_split=${bytes_split/G/*1024**3} +if ! [[ $bytes_split =~ ^[0-9\*]+$ ]]; then + >&2 echo "bytes_split is not numeric." + exit 20 +fi +bytes_split=$(( $bytes_split )) +[[ $? -ne 0 ]] && >&2 echo "bytes_split is invalid." && exit 20 + +# Set target directory relative to LSDF base directory +if [[ ! $input_dir == "${ldir_base}"* ]]; then echo "Target directory is not located on LSDF!" echo "Is the base directory setting correct?" 
echo "ldir_base: $ldir_base" exit -2 fi +input_dir_rel=${input_dir#"$ldir_base"} # path of input_dir relative to LSDF base directory +target_dir_rel="$(dirname $input_dir_rel)" # path of parent directory in which tar of input_dir will be located +# echo $input_dir +# echo $target_dir_rel -# Construct name of tar file -tar_exec_dir="$(dirname $ldir_target)" -tar_target_dir="$(basename $ldir_target)" -tar_filename="${tar_target_dir}.tar" +# Construct base name of tar file: a number might be added if the file is split +tar_exec_dir="$(dirname $input_dir)" # directory from which tar is executed +tar_target_dir="$(basename $input_dir)" # input directory for tar command +tar_filename="${tar_target_dir}.tar" # file name of tar archive (trailing .XXXX may be added if archive needs to be split) + +# tar -cf - Ga_010 | split --numeric-suffixes --suffix-length=4 --bytes=100MB - chunk.split. # Construct paths of tar file -rdir_target="$(dirname ${rdir_base}/${dir_target})" -rfile_target="${rdir_target}/${tar_filename}" - -tempdir=$(realpath ${tempdir}) -ltmpfile="${tempdir}/${tar_filename}" +rdir_target="${rdir_base}/${target_dir_rel}" +# rfile_target="${rdir_target}/${tar_filename}" +lfile_target="${work_dir}/${tar_filename}" # Receive list of files on SFTP server, omit directories rfilelist=$(lftp sftp://${bwda_acc}@${bwda_url} -e "ls -l ${rdir_target}; bye" | grep -v '^d') rfilename=($(echo "$rfilelist" | awk '{print $9}')) rfilesize=($(echo "$rfilelist" | awk '{print $5}')) -# Check if remote tar file already exists -ipos=-1 +# Extract the relevant tar files. It should be a single file if its size is smaller than $bytes_split. 
+# Otherwise the archive is split in several chunks +nchunk=0 +chunkname=() +chunksize=() for ir in ${!rfilename[@]}; do - if [[ ${rfilename[${ir}]} == ${tar_filename} ]]; then - ipos=$ir - break + if [[ ${rfilename[${ir}]} == "${tar_filename}"* ]]; then + chunkname[${nchunk}]=${rfilename[${ir}]} + chunksize[${nchunk}]=${rfilesize[${ir}]} + let nchunk=nchunk+1 fi done -# If file already exists, check if filesize matches -if [[ $ipos -ge 0 ]]; then - filesize=$(cd ${tar_exec_dir}; \ - tar cf /dev/null --totals ${tar_target_dir} 2>&1 | \ - awk -F: '{print $2}' | awk '{print $1}') - if [[ ${rfilesize[${ipos}]} != ${filesize} ]]; then - >&2 echo "Filesize mismatch: ${rfilename[${ipos}]}, HPSS=${rfilesize[${ipos}]}, estimate=${filesize}" - if [[ ! "$flag_sizemismatch" == true ]]; then - exit 10 +## MODE +if [[ ${scriptmode} == 'pull' ]]; then + # Check if there is data to pull + if [[ $nchunk -lt 1 ]]; then + echo "Archive not available on bwda." + exit 51 + fi + # Construct lftp command + cmd="" + cmd+="set cmd:parallel ${nparallel}; " + cmd+="cd ${rdir_target}; " + cmd+="lcd ${work_dir}; " + for chunk in "${chunkname[@]}"; do + cmd+="pget -n ${nchannel} $chunk; " + done + cmd+="bye" + # Get files from bwda + lftp sftp://${bwda_acc}@${bwda_url} -e "$cmd" + ec=$? + if [[ $ec -ne 0 ]]; then + >&2 echo "lftp failed with status $ec." + exit 101 + fi + # Extract archive + cat_args="" + for chunk in "${chunkname[@]}"; do + cat_args+="${work_dir}/${chunk} " + done + cat ${cat_args} | tar -C ${tar_exec_dir} -xkf - + ec=$? + if [[ $ec -ne 0 ]]; then + >&2 echo "tar failed with status $ec." 
+ exit 100 + fi + rm ${cat_args} +elif [[ ${scriptmode} == 'push' ]]; then + # Estimate file size of archive + filesize_est=$(cd ${tar_exec_dir}; \ + tar cf /dev/null --totals ${tar_target_dir} 2>&1 | \ + awk -F: '{print $2}' | awk '{print $1}') + filesize_est=$(printf %d ${filesize_est}) + # Get file size on bwda (if there are files already) + if [[ ${nchunk} -gt 0 ]]; then + filesize_bwda=0 + for ((ii=0;ii<${nchunk};ii++)); do + let filesize_bwda=filesize_bwda+chunksize[ii] + done + filesize_bwda=$(printf %d ${filesize_bwda}) + fi + # Check if file size matches + if [[ ${nchunk} -gt 0 ]]; then + if [[ ${filesize_est} != ${filesize_bwda} ]]; then + >&2 echo "Filesize mismatch: HPSS=${filesize_bwda}, estimate=${filesize_est}" + if [[ ! "$flag_sizemismatch" == true ]]; then + exit 10 + fi + else + echo "File exists on BWDA and matches expected size. Not doing anything." + exit 0 + fi + fi + # Estimate the number of chunks + nchunk_est=$(( ($filesize_est + $bytes_split - 1) / $bytes_split )) # ceil() + # + if [ ${nchunk_est} -gt 1 ]; then + # nchunk>1: Asynchronously tar and upload available chunks + # trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT + # trap 'kill $(jobs -pr)' SIGINT SIGTERM EXIT + # Define upload function to work asynchronously while archives are written + upload_to_bwda (){ # file, size + local chunk_file=$(realpath $1) + local chunk_size=$2 + while ! ([[ -f ${chunk_file} ]] && [[ $(stat --printf=%s ${chunk_file}) -eq ${chunk_size} ]]); do + sleep 10 + done + # local cur_size=$(stat --printf=%s ${chunk_file}) + # while ; do + # sleep 10 + # cur_size=$(stat --printf=%s ${chunk_file}) + # done + lftp sftp://${bwda_acc}@${bwda_url} -e "cd ${rdir_target}; lcd ${work_dir}; put ${chunk_file}; bye" + return $? 
+ } + # Create lists of subarchives and check if already exists + file_upload=() + for ((ii=0;ii<${nchunk_est};ii++)); do + file_upload[${ii}]=${lfile_target}.$(printf %04d $ii) + if [ -f ${file_upload[${ii}]} ]; then + >&2 echo "Error: temporary file ${file_upload[${ii}]} exists. Refusing to overwrite." + exit 90 + fi + done + # Create tar archive + tar -C ${tar_exec_dir} -cf - ${tar_target_dir} | split --numeric-suffixes --suffix-length=4 --bytes=${bytes_split} - ${lfile_target}. & + pid_tar=$! + # Start upload tasks + pid_upload=() + echo $nchunk_est + for (( ii=0;ii<${nchunk_est}-1;ii++ )); do + upload_to_bwda ${file_upload[${ii}]} ${bytes_split} & + pid_upload[${ii}]=$! + done + upload_to_bwda ${file_upload[${ii}]} $(( $filesize_est % ${bytes_split} )) & + pid_upload[${ii}]=$! + # Wait for tar process + wait $pid_tar + ec=$? + if [[ $ec -ne 0 ]]; then + >&2 echo "tar failed with status $ec." + exit 100 + fi + # Wait for upload processes + for ((ii=0;ii<${nchunk_est};ii++)); do + wait ${pid_upload[${ii}]} + ec=$? + if [[ $ec -ne 0 ]]; then + >&2 echo "lftp failed with status $ec." + exit 101 + fi + rm ${file_upload[${ii}]} + done + else + # nchunk=1: tar and upload are done sequentially + # Create tar archive + tar -C ${tar_exec_dir} -cf ${lfile_target} ${tar_target_dir} + ec=$? + if [[ $ec -ne 0 ]]; then + >&2 echo "tar failed with status $ec." + exit 100 + fi + # Upload archive + lftp sftp://${bwda_acc}@${bwda_url} -e "cd ${rdir_target}; lcd ${work_dir}; put ${tar_filename}; bye" + ec=$? + if [[ $ec -ne 0 ]]; then + >&2 echo "lftp failed with status $ec." + exit 101 + fi + # Remove temporary archive + rm ${lfile_target} fi - else - echo "Nothing to sync." 
- exit 0 - fi -fi - -# Write a lftp batch script for syncing ->&2 echo "Uploading ${rdir_target}/${tar_filename}" -cmd="open sftp://${bwda_acc}@${bwda_url}\n" -cmd+="set cmd:parallel 1\n" -cmd+="cd ${rdir_target}\n" -cmd+="lcd ${tempdir}\n" -cmd+="put -c ${tar_filename}\n" -cmd+="bye\n" - -# Execute -if [[ "$flag_dryrun" == true ]]; then - printf "$cmd" -else - (cd ${tar_exec_dir}; tar cf ${ltmpfile} ${tar_target_dir}) - tmpfile=$(mktemp) - printf "$cmd" > $tmpfile - lftp -f $tmpfile - rm $tmpfile - rm ${ltmpfile} fi