Skip to content

Commit

Permalink
Improve silo import scripts etc
Browse files Browse the repository at this point in the history
  • Loading branch information
corneliusroemer committed Sep 11, 2024
1 parent ae7425e commit fab0ccc
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 31 deletions.
74 changes: 51 additions & 23 deletions kubernetes/loculus/silo_import_job.sh
Original file line number Diff line number Diff line change
@@ -1,27 +1,52 @@
#!/bin/bash

set -e

# Default values
root_dir=""
last_snapshot=""

# Parse command-line arguments
usage() {
echo "Usage: $0 [--last-snapshot=UNIXTIMESTAMP]"
echo "Usage: $0 [--root-dir=PATH] [--last-snapshot=UNIXTIMESTAMP] [--backend-base-url=BACKEND_BASE_URL]"
exit 1
}

for arg in "$@"; do
case $arg in
--root-dir=*)
root_dir="${arg#*=}"
shift
;;
--last-snapshot=*)
last_snapshot="${arg#*=}"
shift
;;
--backend-base-url=*)
backend_base_url="${arg#*=}"
shift
;;
*)
usage
;;
esac
done

input_data_dir="/preprocessing/input"
# Check if backend base URL is provided
if [ -z "$backend_base_url" ]; then
echo "Error: Missing mandatory argument --backend-base-url"
usage
fi

# Example usage of $rootDir:
echo "Root directory: $root_dir"
echo "Last snapshot: $last_snapshot"

preprocessing_dir="${root_dir}/preprocessing"

input_data_dir="${preprocessing_dir}/input"
output_data_dir="${preprocessing_dir}/output"

mkdir -p "$input_data_dir" "$output_data_dir"

current_timestamp=$(date +%s)
new_input_data_dir="$input_data_dir/$current_timestamp"
Expand All @@ -30,8 +55,8 @@ old_input_data_dir="$input_data_dir"/$(ls -1 "$input_data_dir" | sort -n | grep

new_input_data="$new_input_data_dir/data.ndjson.zst"
new_input_header="$new_input_data_dir/header.txt"
new_snapshot_time="$new_input_data_dir/snapshot_time.txt"
current_snapshot_time="$input_data_dir/snapshot_time.txt"
new_snapshot_time_path="$new_input_data_dir/snapshot_time.txt"
current_snapshot_time_path="$input_data_dir/snapshot_time.txt"

old_input_data="$old_input_data_dir/data.ndjson.zst"
new_input_touchfile="$new_input_data_dir/processing"
Expand All @@ -52,15 +77,15 @@ download_data() {
# Set flag to be cleared when processing succeeds to avoid getting stuck with no output data
touch "$new_input_touchfile"

released_data_endpoint="$BACKEND_BASE_URL/get-released-data?compression=zstd"
released_data_endpoint="$backend_base_url/get-released-data?compression=zstd"
echo "calling $released_data_endpoint"

set +e
response=$(curl -o "$new_input_data" --fail-with-body "$released_data_endpoint" -H "If-Modified-Since: $last_snapshot" -D "$new_input_header" -w "%{http_code}")
exit_code=$?
set -e
http_code="${response: -3}"
echo "http code: $http_code"
echo "Release data request returned with http status code: $http_code"
if [ "$http_code" -eq 304 ]; then
echo "State in Loculus backend has not changed: HTTP 304 Not Modified."
rm -rf "$new_input_data_dir"
Expand All @@ -74,11 +99,10 @@ download_data() {

last_modified=$(grep '^last-modified:' "$new_input_header" | awk '{print $2}')
echo "Last-modified from header: $last_modified"
echo "$last_modified" >> "$new_snapshot_time"
echo "$last_modified" > "$new_snapshot_time_path"

echo "downloaded sequences"
ls -l "$new_input_data_dir"
echo

echo "checking for old input data dir $old_input_data_dir"
if [[ -f "$old_input_data" ]]; then
Expand All @@ -96,7 +120,7 @@ download_data() {
echo "Hashes are equal, skipping preprocessing"
echo "Deleting new input data dir $new_input_data_dir"
rm -rf "$new_input_data_dir"
exit 0
update_snapshot_time
else
echo "Hashes are unequal, deleting old input data dir"
rm -rf "$old_input_data_dir:?}"
Expand Down Expand Up @@ -126,24 +150,28 @@ preprocessing() {
echo "SiloApi command failed with exit code $exit_code, cleaning up and exiting."
delete_all_input # Delete input so that we don't skip preprocessing next time due to hash equality
exit $exit_code
else
echo "SiloApi command succeeded"
echo "Removing touchfile $new_input_touchfile to indicate successful processing"
rm -f "$new_input_touchfile"
fi

# Only set new snapshot time when siloApi also succeeds
cp "$new_snapshot_time" "$current_snapshot_time"
echo "preprocessing for $current_timestamp done"
echo
echo "SiloApi command succeeded"
echo "Removing touchfile $new_input_touchfile to indicate successful processing"
rm "$new_input_touchfile"

update_snapshot_time
}

update_snapshot_time() {
new_snapshot_time=$(cat "$new_snapshot_time_path")
old_snapshot_time=$(cat "$current_snapshot_time_path")
echo "Updating snapshot time from $old_snapshot_time to $new_snapshot_time"
cp "$new_snapshot_time_path" "$current_snapshot_time_path"
}

# Potential race condition: silo might not release non-current dir if it's still being used
cleanup_output_data() {
for dir_type in "input" "output"; do
dir="/preprocessing/$dir_type"
dir="$preprocessing_dir/$dir_type"
echo "Removing all but the most recent $dir_type directory in $dir"
cd $dir || exit
cd "$dir" || exit

if [ -n "$(ls -d -- */ 2>/dev/null)" ]; then
directories=$(ls -dt -- */)
Expand All @@ -170,9 +198,9 @@ main() {
echo "Script started at: $(date)"

echo "Current content of input data dir: $input_data_dir"
ls -l $input_data_dir
echo "Current content of output data dir: /preprocessing/output"
ls -l /preprocessing/output
ls -l "$input_data_dir"
echo "Current content of output data dir: $output_data_dir"
ls -l "$output_data_dir"
echo

# cleanup at start in case we fail later
Expand Down
31 changes: 23 additions & 8 deletions kubernetes/loculus/silo_import_wrapper.sh
Original file line number Diff line number Diff line change
@@ -1,23 +1,38 @@
#!/bin/bash

input_data_dir="/preprocessing/input"
current_snapshot_time="$input_data_dir/snapshot_time.txt"
current_snapshot_time_path="$input_data_dir/snapshot_time.txt"
last_hard_refresh_time_path="$input_data_dir/last_hard_refresh_time.txt"

while true
do
if [ -f "$current_snapshot_time" ]; then
last_snapshot_time=$(cat "$current_snapshot_time" | tr -d '[:space:]')
get_time_from_file() {
if [ -f "$1" ]; then
cat "$1" | tr -d '[:space:]'
else
last_snapshot_time=0
echo 0
fi
}

while true
do
last_snapshot_time=$(get_time_from_file "$current_snapshot_time_path")
echo "Data in SILO corresponds to data in Loculus at time: $last_snapshot_time"

last_hard_refresh_time=$(get_time_from_file "$last_hard_refresh_time_path")

# Check if the difference is greater than or equal to 3600 seconds (1 hour)
# We only use cache
current_time=$(date +%s)
time_diff=$((current_time - last_snapshot_time))
time_diff=$((current_time - last_hard_refresh_time))
if [ "$time_diff" -ge 3600 ]; then
echo "Data in SILO is over 1h older than Loculus, ask for all data regardless of last-modified-since tag."
echo "Last hard refresh was more than 1 hour ago. Performing hard refresh."
bash /silo_import_job.sh --last-snapshot=0
exit_code=$?
if [ "$exit_code" -ne 0 ]; then
echo "Error: Hard refresh failed with exit code $exit_code"
else
echo "Hard refresh completed successfully"
echo "$current_time" > "$last_hard_refresh_time_path"
fi
else
bash /silo_import_job.sh --last-snapshot="$last_snapshot_time"
fi
Expand Down

0 comments on commit fab0ccc

Please sign in to comment.