fix(torture): use the right rr
This PR fixes an issue where the workload could error out with a
Broken Pipe error. The issue was unmasked by the previous PR in the stack.

The error is that the rr was not reloaded before sampling/checking
whether the snapshot was applied after a crash; instead, the rr of the
dead process was used.

The fix comes with a refactor: we store the rr in a field and keep it
synchronized with the agent. This seems easier than trying to remember
to reload it at the right time.
pepyakin committed Feb 21, 2025
1 parent dd6d283 commit 72ba3da
Showing 1 changed file with 47 additions and 49 deletions.
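For context, here is a condensed, self-contained sketch of the invariant the refactor establishes: the request/response handle lives in a field next to the agent, is refreshed whenever a new agent is spawned, and is dropped on teardown. The `Agent` and `RequestResponse` types below are simplified stand-ins, not the crate's real `SpawnedAgentController` and `comms::RequestResponse`.

```rust
/// Stand-in for `comms::RequestResponse` (for illustration only).
#[derive(Clone)]
struct RequestResponse;

/// Stand-in for `SpawnedAgentController`.
struct Agent {
    rr: RequestResponse,
}

struct Workload {
    agent: Option<Agent>,
    /// Only `Some` while `agent` is `Some`; always the handle of that same agent.
    rr: Option<RequestResponse>,
}

impl Workload {
    fn spawn_new_agent(&mut self) {
        let agent = Agent { rr: RequestResponse };
        // Refresh the cached handle together with the agent, so callers can
        // never end up talking to a dead process.
        self.rr = Some(agent.rr.clone());
        self.agent = Some(agent);
    }

    fn teardown_agent(&mut self) {
        // Dropping the agent also drops the cached handle.
        self.agent = None;
        self.rr = None;
    }

    fn rr(&self) -> &RequestResponse {
        // Panics if called between teardown and the next spawn.
        self.rr.as_ref().expect("no agent")
    }
}

fn main() {
    let mut w = Workload { agent: None, rr: None };
    w.spawn_new_agent();
    let _live_handle = w.rr(); // always the current agent's handle
    w.teardown_agent();
}
```

The actual diff below follows the same shape: `spawn_new_agent` and `teardown` keep `self.rr` in sync with `self.agent`, and all call sites go through `self.rr()` instead of cloning the handle once per iteration.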
96 changes: 47 additions & 49 deletions torture/src/supervisor/workload.rs
@@ -250,6 +250,12 @@ pub struct Workload {
///
/// Initially `None`.
agent: Option<SpawnedAgentController>,
/// The communication channel to the agent.
///
/// Only `Some` if the agent is some.
///
/// This must be the same agent as the one in `self.agent`.
rr: Option<comms::RequestResponse>,
/// How many iterations the workload should perform.
iterations: usize,
/// The current state of the workload.
@@ -329,6 +335,7 @@ impl Workload {
workdir,
trick_handle,
agent: None,
rr: None,
iterations: workload_params.iterations,
state,
workload_id,
@@ -377,16 +384,14 @@ impl Workload {
}

async fn run_iteration(&mut self) -> Result<()> {
let agent = self.agent.as_ref().unwrap();
let rr = agent.rr().clone();
trace!("run_iteration");

if self.scheduled_rollback.as_ref().map_or(false, |(r, _)| {
r.sync_seqn == self.state.committed.sync_seqn
}) {
// UNWRAP: scheduled_rollback has just been checked to be `Some`
let (scheduled_rollback, should_crash) = self.scheduled_rollback.take().unwrap();
self.exercise_rollback(&rr, scheduled_rollback, should_crash)
self.exercise_rollback(scheduled_rollback, should_crash)
.await?;
return Ok(());
}
@@ -420,17 +425,13 @@ impl Workload {
}

let should_crash = self.state.rng.gen_bool(self.state.biases.commit_crash);
self.exercise_commit(&rr, should_crash).await?;
self.exercise_commit(should_crash).await?;

Ok(())
}

/// Commit a changeset.
async fn exercise_commit(
&mut self,
rr: &comms::RequestResponse,
should_crash: bool,
) -> anyhow::Result<()> {
async fn exercise_commit(&mut self, should_crash: bool) -> anyhow::Result<()> {
let should_crash = if should_crash {
trace!("exercising commit crash");
Some(self.get_crash_delay())
@@ -441,7 +442,8 @@

// Generate a changeset and the associated snapshot
let (snapshot, changeset) = self.state.gen_commit();
let commit_response = rr
let commit_response = self
.rr()
.send_request(crate::message::ToAgent::Commit(
crate::message::CommitPayload {
changeset: changeset.clone(),
@@ -461,11 +463,9 @@
// However the agent will be respawned, so it will just
// make sure the changeset was correctly applied or reverted.
self.spawn_new_agent().await?;
let agent = self.agent.as_ref().unwrap();
let rr = agent.rr().clone();

// Sample the agent to make sure the changeset was correctly applied or reverted.
let agent_sync_seqn = rr.send_query_sync_seqn().await?;
let agent_sync_seqn = self.rr().send_query_sync_seqn().await?;
if snapshot.sync_seqn == agent_sync_seqn {
true
} else if self.state.committed.sync_seqn == agent_sync_seqn {
@@ -480,15 +480,15 @@

// Keep track of ENOSPC because the flag could be erased during the agent's respawn
let was_enospc_enabled = self.enabled_enospc;
self.ensure_outcome_validity(rr, &outcome).await?;
self.ensure_outcome_validity(&outcome).await?;

if matches!(outcome, crate::message::Outcome::Success) {
self.n_successfull_commit += 1;
self.tot_commit_time += elapsed;
}

// Sample the agent to make sure the changeset was correctly applied or reverted.
let agent_sync_seqn = rr.send_query_sync_seqn().await?;
let agent_sync_seqn = self.rr().send_query_sync_seqn().await?;
if was_enospc_enabled {
false
} else if snapshot.sync_seqn == agent_sync_seqn {
@@ -499,10 +499,10 @@
};

if is_applied {
self.ensure_changeset_applied(&rr, &changeset).await?;
self.ensure_changeset_applied(&changeset).await?;
self.state.commit(snapshot);
} else {
self.ensure_changeset_reverted(&rr, &changeset).await?;
self.ensure_changeset_reverted(&changeset).await?;
}

Ok(())
@@ -522,11 +522,7 @@
Duration::from_millis(crash_delay_millis)
}

async fn ensure_outcome_validity(
&mut self,
rr: &comms::RequestResponse,
outcome: &crate::message::Outcome,
) -> Result<()> {
async fn ensure_outcome_validity(&mut self, outcome: &crate::message::Outcome) -> Result<()> {
match outcome {
crate::message::Outcome::Success => {
// The operation was successful.
@@ -542,7 +538,7 @@
// At this point, we expect the agent will have its NOMT instance poisoned.
//
// But we still should be able to make the sync_seqn and the kv queries.
let agent_sync_seqn = rr.send_query_sync_seqn().await?;
let agent_sync_seqn = self.rr().send_query_sync_seqn().await?;
if self.state.committed.sync_seqn != agent_sync_seqn {
return Err(anyhow::anyhow!(
"Unexpected sync_seqn after failed operation with ENOSPC"
@@ -554,7 +550,7 @@
self.ensure_agent_open_db().await?;

// Verify that the sync_seqn is still the same for the second time.
let agent_sync_seqn = rr.send_query_sync_seqn().await?;
let agent_sync_seqn = self.rr().send_query_sync_seqn().await?;
if self.state.committed.sync_seqn != agent_sync_seqn {
return Err(anyhow::anyhow!(
"Unexpected sync_seqn after failed operation with ENOSPC"
@@ -603,7 +599,6 @@ impl Workload {

async fn exercise_rollback(
&mut self,
rr: &comms::RequestResponse,
scheduled_rollback: ScheduledRollback,
should_crash: Option<Duration>,
) -> anyhow::Result<()> {
@@ -620,7 +615,8 @@
n_commits_to_rollback
);

let rollback_outcome = rr
let rollback_outcome = self
.rr()
.send_request(crate::message::ToAgent::Rollback(
crate::message::RollbackPayload {
n_commits: n_commits_to_rollback,
@@ -642,10 +638,8 @@
// However the agent will be respawned, so it will just
// make sure the rollback was correctly applied or not.
self.spawn_new_agent().await?;
let agent = self.agent.as_ref().unwrap();
let rr = agent.rr().clone();

let agent_sync_seqn = rr.send_query_sync_seqn().await?;
let agent_sync_seqn = self.rr().send_query_sync_seqn().await?;
let last_sync_seqn = self.state.committed.sync_seqn;
if agent_sync_seqn == last_sync_seqn + 1 {
// sync_seqn has increased, so the rollback is expected to be applied correctly
@@ -667,9 +661,9 @@
};

let was_enospc_enabled = self.enabled_enospc;
self.ensure_outcome_validity(rr, &outcome).await?;
self.ensure_outcome_validity(&outcome).await?;

let agent_sync_seqn = rr.send_query_sync_seqn().await?;
let agent_sync_seqn = self.rr().send_query_sync_seqn().await?;
let last_sync_seqn = self.state.committed.sync_seqn;
if agent_sync_seqn == last_sync_seqn + 1 {
// sync_seqn has increased, so the rollback is expected to be applied correctly
@@ -690,7 +684,7 @@
}
}

self.ensure_snapshot_validity(&rr).await?;
self.ensure_snapshot_validity().await?;
Ok(())
}

@@ -703,6 +697,7 @@
let agent = self.agent.as_mut().unwrap();
let agent_died_or_timeout = timeout(TOLERANCE, agent.died()).await;
self.agent.take().unwrap().teardown().await;
self.rr = None;
if let Err(Elapsed { .. }) = agent_died_or_timeout {
return Err(anyhow::anyhow!("agent did not die"));
}
@@ -712,7 +707,6 @@

async fn ensure_changeset_applied(
&self,
rr: &comms::RequestResponse,
changeset: &Vec<KeyValueChange>,
) -> anyhow::Result<()> {
if !self.ensure_changeset {
@@ -722,11 +716,13 @@
for change in changeset {
match change {
KeyValueChange::Insert(key, value)
if rr.send_request_query(*key).await?.as_ref() != Some(&value) =>
if self.rr().send_request_query(*key).await?.as_ref() != Some(&value) =>
{
return Err(anyhow::anyhow!("Inserted item not present after commit"));
}
KeyValueChange::Delete(key) if rr.send_request_query(*key).await?.is_some() => {
KeyValueChange::Delete(key)
if self.rr().send_request_query(*key).await?.is_some() =>
{
return Err(anyhow::anyhow!("Deleted item still present after commit"));
}
_ => (),
@@ -737,7 +733,6 @@

async fn ensure_changeset_reverted(
&self,
rr: &comms::RequestResponse,
changeset: &Vec<KeyValueChange>,
) -> anyhow::Result<()> {
if !self.ensure_changeset {
@@ -750,7 +745,7 @@
match change {
KeyValueChange::Insert(key, _value) => {
// The current value must be equal to the previous one.
let current_value = rr.send_request_query(*key).await?;
let current_value = self.rr().send_request_query(*key).await?;
match self.state.committed.state.get(key) {
None | Some(None) if current_value.is_some() => {
return Err(anyhow::anyhow!("New inserted item should not be present"));
@@ -767,7 +762,7 @@
// UNWRAP: Non existing keys are not deleted.
let prev_value = self.state.committed.state.get(key).unwrap();
assert!(prev_value.is_some());
if rr.send_request_query(*key).await?.as_ref() != prev_value.as_ref() {
if self.rr().send_request_query(*key).await?.as_ref() != prev_value.as_ref() {
return Err(anyhow::anyhow!(
"Deleted item should be reverted to previous state"
));
@@ -778,16 +773,13 @@
Ok(())
}

async fn ensure_snapshot_validity(
&mut self,
rr: &comms::RequestResponse,
) -> anyhow::Result<()> {
async fn ensure_snapshot_validity(&mut self) -> anyhow::Result<()> {
if !self.ensure_snapshot && !self.sample_snapshot {
return Ok(());
}

let expected_sync_seqn = self.state.committed.sync_seqn;
let sync_seqn = rr.send_query_sync_seqn().await?;
let sync_seqn = self.rr().send_query_sync_seqn().await?;
if expected_sync_seqn != sync_seqn {
return Err(anyhow::anyhow!(
"Unexpected sync_seqn while ensuring snapshot validity, expected: {}, found: {}",
@@ -797,15 +789,15 @@
}

if self.ensure_snapshot {
return self.check_entire_snapshot(rr).await;
return self.check_entire_snapshot().await;
}

self.check_sampled_snapshot(rr).await
self.check_sampled_snapshot().await
}

async fn check_entire_snapshot(&self, rr: &comms::RequestResponse) -> anyhow::Result<()> {
async fn check_entire_snapshot(&self) -> anyhow::Result<()> {
for (i, (key, expected_value)) in (self.state.committed.state.iter()).enumerate() {
let value = rr.send_request_query(*key).await?;
let value = self.rr().send_request_query(*key).await?;
if &value != expected_value {
return Err(anyhow::anyhow!(
"Wrong {}ith key in snapshot,\n key: {:?},\n expected value: {:?},\n found value: {:?}",
@@ -819,7 +811,7 @@
Ok(())
}

async fn check_sampled_snapshot(&mut self, rr: &comms::RequestResponse) -> anyhow::Result<()> {
async fn check_sampled_snapshot(&mut self) -> anyhow::Result<()> {
let mut key = [0; 32];
// The amount of items randomly sampled is equal to 5% of the entire state size.
let sample_check_size = (self.state.committed.state.len() as f64 * 0.05) as usize;
@@ -833,7 +825,7 @@
}
};

let value = rr.send_request_query(*key).await?;
let value = self.rr().send_request_query(*key).await?;
if value.as_ref().map_or(true, |v| v != expected_value) {
return Err(anyhow::anyhow!(
"Wrong key in snapshot,\n key: {:?},\n expected value: {:?},\n found value: {:?}",
Expand All @@ -849,6 +841,7 @@ impl Workload {
async fn spawn_new_agent(&mut self) -> anyhow::Result<()> {
assert!(self.agent.is_none());
controller::spawn_agent_into(&mut self.agent).await?;
self.rr = Some(self.agent.as_ref().unwrap().rr().clone());
let workdir = self.workdir.path().display().to_string();
let outcome = self
.agent
Expand Down Expand Up @@ -914,6 +907,10 @@ impl Workload {
Ok(())
}

fn rr(&self) -> &comms::RequestResponse {
self.rr.as_ref().expect("no agent")
}

/// Collects the stack trace of the agent and prints it if available.
async fn collect_and_display_backtrace(&self) {
if let Some(agent) = self.agent.as_ref() {
@@ -935,6 +932,7 @@ impl Workload {
async fn teardown(&mut self) {
if let Some(agent) = self.agent.take() {
agent.teardown().await;
let _ = self.rr.take();
}
if let Some(trick_handle) = self.trick_handle.take() {
tokio::task::block_in_place(move || {
