Created repo
This commit is contained in:
commit
ad4c22c171
|
|
@ -0,0 +1,183 @@
|
|||
#!/bin/bash
# Abort on use of unset variables.
set -u

#######################################
# Print the help text for this script.
# Globals:   none
# Arguments: none
# Outputs:   writes the usage message to stdout
#######################################
usage() {
  # ${0##*/} strips the directory part of the script path without forking
  # basename and without word-splitting paths that contain spaces.
  cat << EOF
A wrapper around rsync which partitions the input files to blocks of roughly
equal sizes and transfers them in multiple streams in parallel.

Usage: ${0##*/} [rsync flags] <num_streams> <src>... <dest>

Options:
  <num_streams>      Number of blocks/transfer streams.
  <src>              Source files (potentially multiple files).
  <dest>             Destination of transfer.
  -a, --archive      Activate rsync's archive mode.
  -h, --help         Print this message.
  -L, --copy-links   Include symlinked sources (sized via their referent).
  -n, --dry-run      Do not transfer any data.
  -r, --recursive    Copy directories recursively.
EOF
}
|
||||
|
||||
# Parse arguments
declare -a positional flags_rsync flags_du
flag_copy_links=0
flag_recursive=0

#######################################
# Split the command line into option flags and positional arguments.
# Globals (all written):
#   positional       - non-option arguments, in order
#   flags_rsync      - flags forwarded to rsync
#   flags_du         - flags forwarded to du (always starts with -b)
#   flag_copy_links  - 1 if -L/--copy-links was given, else 0
#   flag_recursive   - 1 if -r/--recursive was given, else 0
# Arguments: the script's command-line arguments
# Outputs:   usage text on -h/--help; error message on stderr for
#            unknown options
# Exits:     0 after printing help, 1 on an unknown option
#######################################
parse_args() {
  positional=()
  flags_rsync=()
  flags_du=('-b')
  flag_copy_links=0
  flag_recursive=0

  while [[ $# -gt 0 ]]; do
    case "$1" in
      -h | --help)
        usage
        exit 0
        ;;
      -a | --archive)
        # Was `flags_rsync=+=('-a')`, which is a syntax error.
        flags_rsync+=('-a')
        shift
        ;;
      -L | --copy-links)
        # NOTE(review): -L is forwarded to du only, not to rsync — confirm
        # whether rsync should receive it as well.
        flag_copy_links=1
        flags_du+=('-L')
        shift
        ;;
      -n | --dry-run)
        flags_rsync+=('-n')
        shift
        ;;
      -r | --recursive)
        flag_recursive=1
        shift
        ;;
      -*)
        # Also covers long options: anything starting with "--" matches "-*".
        echo "Invalid option: $1" >&2
        exit 1
        ;;
      *)
        positional+=("$1")
        shift
        ;;
    esac
  done
}

parse_args "$@"
|
||||
|
||||
# At least <num_streams>, one <src> and <dest> are required.
declare -i num_args
num_args=${#positional[@]}
if ((num_args < 3)); then
  usage >&2
  exit 1
fi

# Parse <num_streams>
declare -i num_streams
# Validate the raw string BEFORE assigning it to the integer variable: with
# `declare -i`, the assignment evaluates its value as an arithmetic
# expression, so "foo" would abort with an unbound-variable error under
# `set -u`, "4x" with a syntax error, and "-3" would be silently accepted.
if [[ ! ${positional[0]} =~ ^[0-9]+$ ]]; then
  echo "Error: invalid value for <num_streams> (${positional[0]})" >&2
  exit 1
fi
# Force base 10 so that values with leading zeros are not read as octal.
num_streams=$((10#${positional[0]}))
if ((num_streams == 0)); then
  echo "Error: invalid value for <num_streams> (${positional[0]})" >&2
  exit 1
fi
|
||||
|
||||
# Parse source
|
||||
#######################################
# Recursively list every non-directory entry below a directory.
# Arguments: $1 - directory to walk
# Outputs:   one path per line on stdout, each prefixed with $1
# Notes:     enables the nullglob and dotglob shell options in the calling
#            shell, so empty directories yield nothing and dotfiles are
#            included.
#######################################
function walk_dir {
  # Keep the loop variable local so the recursion does not overwrite a
  # caller's variable of the same name.
  local entry
  shopt -s nullglob dotglob
  for entry in "$1"/*; do
    if [[ -d "${entry}" ]]; then
      walk_dir "${entry}"
    else
      printf '%s\n' "${entry}"
    fi
  done
}
|
||||
# Collect the list of files to transfer from the <src> arguments, i.e. all
# positional arguments except the first (<num_streams>) and last (<dest>).
declare -a src=()
for path in "${positional[@]:1:num_args-2}"; do
  if [[ -L ${path} && ${flag_copy_links} -eq 0 ]]; then
    # Symlinks are only considered when -L/--copy-links was given.
    continue
  elif [[ -f ${path} ]]; then
    # Quoted so that names with spaces or glob characters stay one entry.
    src+=("${path}")
  elif [[ -d ${path} && ${flag_recursive} -ne 0 ]]; then
    if [[ ${path:0:1} == '/' ]]; then
      echo "Error: -r/--recursive does not support absolute paths" >&2
      exit 1
    fi
    # Read walk_dir's output line by line instead of word-splitting it, so
    # file names containing whitespace are preserved.
    while IFS= read -r found; do
      src+=("${found}")
    done < <(walk_dir "${path}")
  fi
done
# Nothing to transfer.
[[ ${#src[@]} -eq 0 ]] && exit 0
|
||||
|
||||
# The transfer destination is the final positional argument.
dest="${positional[${#positional[@]} - 1]}"
|
||||
|
||||
# Order the source files by size (largest first) and sum up the total size.
declare -a filesize=()
declare -i totalsize=0
declare -a src_sorted=()
# du prints "<size><TAB><path>" per file. Splitting on the tab only (rather
# than awk's whitespace fields) keeps paths that contain spaces intact.
# --count-links: without it du reports hard links to the same inode only
# once, which would silently drop those files from the transfer list.
while IFS=$'\t' read -r du_size du_path; do
  filesize+=("${du_size}")
  src_sorted+=("${du_path}")
  totalsize=$((totalsize + du_size))
done < <(du --count-links "${flags_du[@]}" -- "${src[@]}" | sort -r -nk1)
# The ${arr[@]+...} form keeps `set -u` quiet if du produced no output.
src=(${src_sorted[@]+"${src_sorted[@]}"})
|
||||
|
||||
# Create file lists
# A fresh, uniquely named directory holds the per-stream file lists (and
# later the per-stream logs). It is intentionally not removed on exit so the
# logs stay available after the run.
dir_temp=$(mktemp -d "${TMPDIR:-/tmp}/rsync.XXXXXX") || {
  echo "Error: could not create temporary directory" >&2
  exit 1
}
# Deal the size-sorted files out round-robin: stream i receives entries
# i, i+num_streams, i+2*num_streams, ...
for ((istream = 0; istream < num_streams; istream++)); do
  file_temp="${dir_temp}/rsync-stream-${istream}.files"
  : > "${file_temp}"
  for ((ii = istream; ii < ${#src[@]}; ii += num_streams)); do
    printf '%s\n' "${src[ii]}" >> "${file_temp}"
  done
done
|
||||
|
||||
# Transfer data
|
||||
declare -a rsync_pids
declare -i timer dt_transfer

#######################################
# Forward SIGINT to every recorded rsync stream.
# Globals:   rsync_pids (read) - PIDs of the started streams
# Arguments: none
# Outputs:   a status line on stdout
#######################################
function rsync_sigint {
  local pid
  echo "Sending SIGINT to remaining streams..."
  # The ${arr[@]+...} form expands to nothing for an empty/unset array
  # without tripping `set -u`.
  for pid in ${rsync_pids[@]+"${rsync_pids[@]}"}; do
    # Streams that already finished no longer exist; ignore kill's
    # "No such process" failure for those.
    kill -s SIGINT "${pid}" 2> /dev/null || true
  done
}
|
||||
# Start one background rsync per file list, then wait for all of them.
rsync_pids=()
timer=$(date +%s%N)
trap rsync_sigint SIGINT
for ((istream = 0; istream < num_streams; istream++)); do
  file_temp="${dir_temp}/rsync-stream-${istream}.files"
  file_log="${dir_temp}/rsync-stream-${istream}.log"
  # Read the list line by line into an array so that file names containing
  # whitespace or glob characters reach rsync as single arguments.
  mapfile -t stream_files < "${file_temp}"
  if [[ ${#stream_files[@]} -eq 0 ]]; then
    # More streams than files: running rsync with only <dest> would merely
    # list the destination, so skip the stream instead.
    echo "Skipping stream #${istream}: no files assigned"
    continue
  fi
  # ${flags_rsync[@]+...} expands to nothing when no rsync flags were given,
  # without tripping `set -u`.
  rsync -v ${flags_rsync[@]+"${flags_rsync[@]}"} \
    "${stream_files[@]}" "${dest}" > "${file_log}" &
  rsync_pids+=($!)
  # Use $! directly: rsync_pids is no longer indexed by stream number once
  # an empty stream has been skipped.
  echo "Started stream #${istream}... pid=$!, log=${file_log}"
done
for pid in ${rsync_pids[@]+"${rsync_pids[@]}"}; do
  wait "${pid}"
  echo "Process ${pid} finished with exit code $?"
done
|
||||
|
||||
# Work out how long the transfer took and the resulting throughput.
# `timer` holds the start time in nanoseconds since the epoch.
dt_transfer=$(($(date +%s%N) - timer))
# Whole seconds elapsed.
transfer_time=$(bc <<< "$dt_transfer / 1000000000")
# Bytes per second; computed in bc, whose integers are arbitrary precision.
transfer_rate=$(bc <<< "($totalsize * 1000000000) / $dt_transfer")
|
||||
#######################################
# Format a byte count using the largest fitting binary unit.
# Arguments: $1 - non-negative integer number of bytes
# Outputs:   e.g. "512 B", "1.50 KiB", "2.00 GiB" on stdout, without a
#            trailing newline; scaled values are truncated (not rounded) to
#            two decimals
#######################################
function scale_bytes {
  local -r one_kib=1024
  local -r one_mib=1048576
  local -r one_gib=1073741824
  local -r one_tib=1099511627776
  local bytes=$1
  local unit suffix

  # ">=" rather than ">" so that exact multiples (e.g. 1024) are shown in
  # the larger unit ("1.00 KiB") instead of the smaller one ("1024 B").
  if ((bytes >= one_tib)); then
    unit=${one_tib}
    suffix="TiB"
  elif ((bytes >= one_gib)); then
    unit=${one_gib}
    suffix="GiB"
  elif ((bytes >= one_mib)); then
    unit=${one_mib}
    suffix="MiB"
  elif ((bytes >= one_kib)); then
    unit=${one_kib}
    suffix="KiB"
  else
    printf '%s B' "${bytes}"
    return
  fi

  # Integer arithmetic: whole part plus the first two (truncated) decimals.
  # (bytes % unit) * 100 stays far below 2^63 for every unit used here.
  printf '%d.%02d %s' \
    "$((bytes / unit))" "$(((bytes % unit) * 100 / unit))" "${suffix}"
}
|
||||
#######################################
# Format a duration as hours, minutes and seconds.
# Arguments: $1 - non-negative integer number of seconds
# Outputs:   e.g. "1h 2m 5s" on stdout, without a trailing newline; the
#            hour and minute parts are omitted when they are zero, the
#            seconds part is always printed
#######################################
function scale_time {
  local total=$1
  local hours minutes seconds
  # Plain shell arithmetic; no need to fork bc for integer division.
  hours=$((total / 3600))
  minutes=$(((total % 3600) / 60))
  seconds=$((total % 60))

  if ((hours > 0)); then
    printf '%dh ' "${hours}"
  fi
  if ((minutes > 0)); then
    printf '%dm ' "${minutes}"
  fi
  printf '%ds' "${seconds}"
}
|
||||
# Final one-line summary: amount transferred, elapsed time and average rate.
summary_size=$(scale_bytes $totalsize)
summary_time=$(scale_time $transfer_time)
summary_rate=$(scale_bytes $transfer_rate)
echo "Transferred ${summary_size} in ${summary_time} (${summary_rate}/s)."
|
||||
Loading…
Reference in New Issue