Skip to content

Commit

Permalink
Additional workload adjustments
Browse files Browse the repository at this point in the history
- Terminate runners after a timeout
- fsync often to ensure data is written out
  • Loading branch information
geoffxy committed Nov 13, 2023
1 parent 54e1e45 commit 55f7bbe
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 3 deletions.
15 changes: 14 additions & 1 deletion experiments/15-e2e-scenarios-v2/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ function graceful_shutdown() {
wait $brad_pid
}

# Force-stops a process (and its direct children) if it is still running after
# a grace period.
#
# Arguments:
#   $1 - PID of the process to check.
#   $2 - Number of seconds to wait before checking whether the process is
#        still alive.
# Outputs:
#   Writes one status line to stdout stating whether the process had to be
#   killed or had already stopped.
function terminate_process_group() {
  local pid=$1
  local initial_wait_s=$2

  # Give the process a chance to exit on its own first.
  sleep "$initial_wait_s"

  # `kill -0` sends no signal; it only checks that the PID can be signalled.
  if kill -0 "$pid" >/dev/null 2>&1; then
    # Kill direct children first (-P matches on parent PID).
    pkill -KILL -P "$pid"
    # pkill treats a positional argument as a process-name pattern, not a PID,
    # so the process itself must be signalled with kill.
    kill -KILL "$pid" >/dev/null 2>&1
    echo "NOTE: Forced process $pid to stop."
  else
    echo "Process $pid stopped gracefully."
  fi
}

function log_workload_point() {
msg=$1
now=$(date --utc "+%Y-%m-%d %H:%M:%S")
Expand Down Expand Up @@ -117,7 +130,7 @@ function start_repeating_olap_runner() {
mkdir -p $results_dir

log_workload_point $results_name
COND_OUT=$results_dir python3 ../../../workloads/IMDB_extended/run_repeating_analytics.py "${args[@]}" &
COND_OUT=$results_dir python3.11 ../../../workloads/IMDB_extended/run_repeating_analytics.py "${args[@]}" &

# This is a special return value variable that we use.
runner_pid=$!
Expand Down
8 changes: 6 additions & 2 deletions experiments/15-e2e-scenarios-v2/scale_up/run_workload.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ sleep 30
log_workload_point "start_rana_8"
start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
rana_pid=$runner_pid
log_workload_point "started_rana_8_$rana_pid"
sleep 2

# Start with 4 transactional clients; hold for 10 minutes to stabilize.
Expand Down Expand Up @@ -94,24 +95,27 @@ sleep $((20 * 60)) # 32 mins total; 42 mins cumulative
log_workload_point "start_heavy_rana_8"
start_repeating_olap_runner 8 15 1 $heavier_queries "ra_8_heavy" 8
heavy_rana_pid=$runner_pid
log_workload_point "started_heavy_rana_8_$heavy_rana_pid"
sleep $((20 * 60)) # 20 mins total; 62 mins cumulative

log_workload_point "stopping_heavy_rana_8"
kill -INT $heavy_rana_pid
wait $heavy_rana_pid
terminate_process_group $heavy_rana_pid 10

log_workload_point "start_heavy_rana_10"
start_repeating_olap_runner 10 5 1 $heavier_queries "ra_10_heavy" 8
heavy_rana_pid=$runner_pid
log_workload_point "started_heavy_rana_10_$heavy_rana_pid"
sleep $((10 * 60)) # 10 mins total; 72 mins cumulative

log_workload_point "stopping_heavy_rana_10"
kill -INT $heavy_rana_pid
wait $heavy_rana_pid
terminate_process_group $heavy_rana_pid 10

log_workload_point "start_heavy_rana_20"
start_repeating_olap_runner 20 5 1 $heavier_queries "ra_20_heavy" 8
heavy_rana_pid=$runner_pid
log_workload_point "started_heavy_rana_20_$heavy_rana_pid"
sleep $((30 * 60)) # 30 mins total; 102 mins cumulative

log_workload_point "experiment_workload_done"
Expand Down
10 changes: 10 additions & 0 deletions workloads/IMDB_extended/run_repeating_analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def noop(_signal, _frame):
query_frequency = query_frequency[queries]
query_frequency = query_frequency / np.sum(query_frequency)

exec_count = 0
file = open(
out_dir / "repeating_olap_batch_{}.csv".format(runner_idx),
"w",
Expand Down Expand Up @@ -181,8 +182,13 @@ def noop(_signal, _frame):
file=file,
flush=True,
)
exec_count += 1
rand_backoff = None

if exec_count % 20 == 0:
# To avoid data loss if this script crashes.
os.fsync(file.fileno())

except BradClientError as ex:
if ex.is_transient():
print(
Expand Down Expand Up @@ -217,12 +223,16 @@ def noop(_signal, _frame):

try:
_ = stop_queue.get_nowait()
print(f"Runner {runner_idx} is exiting.")
break
except queue.Empty:
pass
finally:
os.fsync(file.fileno())
file.close()
database.close_sync()
# Make sure the queue is drained.
_ = stop_queue.get_nowait()


def simulation_runner(
Expand Down

0 comments on commit 55f7bbe

Please sign in to comment.