Skip to content

Commit

Permalink
Switch the transaction runner to semaphores too
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffxy committed Nov 17, 2023
1 parent eb50764 commit 4cb74d6
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 36 deletions.
18 changes: 6 additions & 12 deletions workloads/IMDB_extended/run_repeating_analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def simulation_runner(
all_query_runtime: npt.NDArray,
runner_idx: int,
start_queue: mp.Queue,
stop_queue: mp.Queue,
control_semaphore: mp.Semaphore,
args,
queries: List[int],
query_frequency_original: Optional[npt.NDArray] = None,
Expand Down Expand Up @@ -307,13 +307,13 @@ def noop(_signal, _frame):

# Signal that we're ready to start and wait for the controller.
start_queue.put_nowait("")
msg = stop_queue.get()

if msg == RUNNER_EXIT:
print(f"Simulation runner {runner_idx} is stopping without having started.")
return
control_semaphore.acquire()

while True:
should_exit = control_semaphore.acquire(False)
if should_exit:
break

if execution_gap_dist is not None:
now = datetime.now().astimezone(pytz.utc)
time_unsimulated = get_time_of_the_day_unsimulated(
Expand Down Expand Up @@ -368,12 +368,6 @@ def noop(_signal, _frame):
flush=True,
)

try:
_ = stop_queue.get_nowait()
break
except queue.Empty:
pass


def run_warmup(args, query_bank: List[str], queries: List[int]):
if args.engine is not None:
Expand Down
73 changes: 49 additions & 24 deletions workloads/IMDB_extended/run_transactions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import argparse
import pathlib
import queue
import random
import signal
import sys
Expand All @@ -21,13 +20,15 @@
from workload_utils.connect import connect_to_db
from workload_utils.transaction_worker import TransactionWorker

STARTUP_FAILED = "startup_failed"


def runner(
args,
worker_idx: int,
directory: Optional[Directory],
start_queue: mp.Queue,
stop_queue: mp.Queue,
control_semaphore: mp.Semaphore, # type: ignore
) -> None:
"""
Meant to be launched as a subprocess with multiprocessing.
Expand Down Expand Up @@ -59,12 +60,17 @@ def noop_handler(_signal, _frame):
aborts = [0 for _ in range(len(transactions))]

# Connect and set the isolation level.
db = connect_to_db(
args, worker_idx, direct_engine=Engine.Aurora, directory=directory
)
db.execute_sync(
f"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL {args.isolation_level}"
)
try:
db = connect_to_db(
args, worker_idx, direct_engine=Engine.Aurora, directory=directory
)
db.execute_sync(
f"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL {args.isolation_level}"
)
except BradClientError as ex:
print(f"[T {worker_idx}] Failed to connect to BRAD:", str(ex))
start_queue.put_nowait(STARTUP_FAILED)
return

# For printing out results.
if "COND_OUT" in os.environ:
Expand All @@ -77,7 +83,7 @@ def noop_handler(_signal, _frame):

# Signal that we are ready to start and wait for other clients.
start_queue.put("")
_ = stop_queue.get()
control_semaphore.acquire()

rand_backoff = None
overall_start = time.time()
Expand All @@ -88,6 +94,12 @@ def noop_handler(_signal, _frame):
print("txn_idx,timestamp,run_time_s", file=latency_file)

while True:
# Note that `False` means to not block.
should_exit = control_semaphore.acquire(False) # type: ignore
if should_exit:
print(f"T Runner {worker_idx} is exiting.")
break

txn_idx = txn_prng.choices(txn_indexes, weights=transaction_weights, k=1)[0]
txn = transactions[txn_idx]

Expand Down Expand Up @@ -183,12 +195,6 @@ def noop_handler(_signal, _frame):
f"[T {worker_idx}] Abort rate is higher than expected ({abort_rate:.4f})."
)

try:
_ = stop_queue.get_nowait()
break
except queue.Empty:
pass

finally:
overall_end = time.time()
print(f"[{worker_idx}] Done running transactions.", flush=True, file=sys.stderr)
Expand Down Expand Up @@ -279,8 +285,8 @@ def main():
args = parser.parse_args()

mgr = mp.Manager()
start_queue = mgr.Queue()
stop_queue = mgr.Queue()
start_queue = [mgr.Queue() for _ in range(args.num_clients)]
control_semaphore = [mgr.Semaphore(value=0) for _ in range(args.num_clients)]

if args.brad_direct:
assert args.config_file is not None
Expand All @@ -294,22 +300,40 @@ def main():
clients = []
for idx in range(args.num_clients):
p = mp.Process(
target=runner, args=(args, idx, directory, start_queue, stop_queue)
target=runner,
args=(args, idx, directory, start_queue[idx], control_semaphore[idx]),
)
p.start()
clients.append(p)

print("Waiting for startup...", file=sys.stderr, flush=True)
for _ in range(args.num_clients):
start_queue.get()
one_startup_failed = False
for i in range(args.num_clients):
msg = start_queue[i].get()
if msg == STARTUP_FAILED:
one_startup_failed = True

if one_startup_failed:
print(
"At least one transactional runner failed to start up. Aborting the experiment.",
flush=True,
file=sys.stderr,
)
for i in range(args.num_clients):
control_semaphore[i].release()
control_semaphore[i].release()
for p in clients:
p.join()
print("Transactional client abort complete.", file=sys.stderr, flush=True)
return

print(
"Telling {} clients to start.".format(args.num_clients),
file=sys.stderr,
flush=True,
)
for _ in range(args.num_clients):
stop_queue.put("")
for idx in range(args.num_clients):
control_semaphore[idx].release()

if args.run_for_s is not None:
print(
Expand All @@ -336,12 +360,13 @@ def signal_handler(_signal, _frame):
should_shutdown.wait()

print("Stopping clients...", flush=True, file=sys.stderr)
for _ in range(args.num_clients):
stop_queue.put("")
for idx in range(args.num_clients):
control_semaphore[idx].release()

print("Waiting for clients to terminate...", flush=True, file=sys.stderr)
for c in clients:
c.join()
print("Done transactions!", flush=True, file=sys.stderr)


if __name__ == "__main__":
Expand Down

0 comments on commit 4cb74d6

Please sign in to comment.