Skip to content

Commit

Permalink
Record metrics for remote cache store errors (#15744)
Browse files Browse the repository at this point in the history
The TODO referenced from #12544 was even more wrong than initially suspected: all of the relevant `remote::Store` accessor methods already retry internally, so the only real issue was that we were not recording errors for requests which failed to fetch process outputs.

Fixes #12544.

[ci skip-build-wheels]
  • Loading branch information
stuhood authored Jun 3, 2022
1 parent 67f93a2 commit dac89c4
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 40 deletions.
74 changes: 40 additions & 34 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1337,7 +1337,7 @@ pub async fn check_action_cache(
workunit.increment_counter(Metric::RemoteCacheRequests, 1);

let client = action_cache_client.as_ref().clone();
let action_result_response = retry_call(
let response = retry_call(
client,
move |mut client| {
let request = remexec::GetActionResultRequest {
Expand All @@ -1356,41 +1356,47 @@ pub async fn check_action_cache(
},
status_is_retryable,
)
.and_then(|action_result| async move {
let action_result = action_result.into_inner();
let response = populate_fallible_execution_result(
store.clone(),
context.run_id,
&action_result,
platform,
false,
ProcessResultSource::HitRemotely,
)
.await
.map_err(|e| Status::unavailable(format!("Output roots could not be loaded: {e}")))?;

if eager_fetch {
// NB: `ensure_local_has_file` and `ensure_local_has_recursive_directory` are internally
// retried.
let response = response.clone();
in_workunit!(
"eager_fetch_action_cache",
Level::Trace,
desc = Some("eagerly fetching after action cache hit".to_owned()),
|_workunit| async move {
future::try_join_all(vec![
store.ensure_local_has_file(response.stdout_digest).boxed(),
store.ensure_local_has_file(response.stderr_digest).boxed(),
store
.ensure_local_has_recursive_directory(response.output_directory)
.boxed(),
])
.await
}
)
.await
.map_err(|e| Status::unavailable(format!("Output content could not be loaded: {e}")))?;
}
Ok(response)
})
.await;

match action_result_response {
Ok(action_result) => {
let action_result = action_result.into_inner();
let response = populate_fallible_execution_result(
store.clone(),
context.run_id,
&action_result,
platform,
false,
ProcessResultSource::HitRemotely,
)
.await?;
// TODO: This should move inside the retry_call above, both in order to be retried, and
// to ensure that we increment a miss if we fail to eagerly fetch.
if eager_fetch {
let response = response.clone();
in_workunit!(
"eager_fetch_action_cache",
Level::Trace,
desc = Some("eagerly fetching after action cache hit".to_owned()),
|_workunit| async move {
future::try_join_all(vec![
store.ensure_local_has_file(response.stdout_digest).boxed(),
store.ensure_local_has_file(response.stderr_digest).boxed(),
store
.ensure_local_has_recursive_directory(response.output_directory)
.boxed(),
])
.await
}
)
.await?;
}
match response {
Ok(response) => {
workunit.increment_counter(Metric::RemoteCacheRequestsCached, 1);
Ok(Some(response))
}
Expand Down
56 changes: 50 additions & 6 deletions src/rust/engine/process_execution/src/remote_cache_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,7 @@ fn create_cached_runner(

async fn create_process(store: &Store) -> (Process, Digest) {
let process = Process::new(vec![
testutil::path::find_bash(),
"echo -n hello world".to_string(),
"this process will not execute: see MockLocalCommandRunner".to_string(),
]);
let (action, command, _exec_request) =
make_execute_request(&process, ProcessMetadata::default()).unwrap();
Expand Down Expand Up @@ -202,24 +201,69 @@ async fn cache_read_success() {
assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
}

/// If the cache has any issues during reads, we should gracefully fallback to the local runner.
/// If the cache has any issues during reads from the action cache, we should gracefully fallback
/// to the local runner.
#[tokio::test]
async fn cache_read_skipped_on_errors() {
let (_, mut workunit) = WorkunitStore::setup_for_tests();
async fn cache_read_skipped_on_action_cache_errors() {
let (workunit_store, mut workunit) = WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1, 100);
let (local_runner, local_runner_call_counter) = create_local_runner(1, 500);
let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false);

let (process, action_digest) = create_process(&store_setup.store).await;
insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);
action_cache.always_errors.store(true, Ordering::SeqCst);

assert_eq!(
workunit_store.get_metrics().get("remote_cache_read_errors"),
None
);
assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
let remote_result = cache_runner
.run(Context::default(), &mut workunit, process.clone().into())
.await
.unwrap();
assert_eq!(remote_result.exit_code, 1);
assert_eq!(
workunit_store.get_metrics().get("remote_cache_read_errors"),
Some(&1)
);
assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1);
}

/// A failure to read process outputs from the store during eager_fetch must not fail the run:
/// the read error is counted and the local runner is used as a graceful fallback.
#[tokio::test]
async fn cache_read_skipped_on_store_errors() {
  // Metric key inspected before and after the run.
  const READ_ERRORS: &str = "remote_cache_read_errors";

  let (workunit_store, mut workunit) = WorkunitStore::setup_for_tests();
  let store_setup = StoreSetup::new();
  // NOTE(review): the first argument is presumably the exit code the local runner reports
  // (asserted below) -- confirm against `create_local_runner`.
  let (local_runner, local_runner_call_counter) = create_local_runner(1, 500);
  // NOTE(review): the trailing `true` presumably turns on eager_fetch -- confirm against
  // `create_cached_runner`.
  let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, true);

  // Register an action result whose stdout digest is non-empty but was never persisted.
  let (process, action_digest) = create_process(&store_setup.store).await;
  let unpersisted_stdout = Digest::of_bytes("pigs flying".as_bytes());
  insert_into_action_cache(
    &action_cache,
    &action_digest,
    0,
    unpersisted_stdout,
    EMPTY_DIGEST,
  );

  // Precondition: no read errors recorded and the local runner has not been invoked.
  let metrics_before = workunit_store.get_metrics();
  assert_eq!(metrics_before.get(READ_ERRORS), None);
  assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);

  let run_future = cache_runner.run(Context::default(), &mut workunit, process.clone().into());
  let remote_result = run_future.await.unwrap();

  // Postcondition: the result carries exit code 1, exactly one read error was recorded, and the
  // local runner was invoked exactly once.
  assert_eq!(remote_result.exit_code, 1);
  let metrics_after = workunit_store.get_metrics();
  assert_eq!(metrics_after.get(READ_ERRORS), Some(&1));
  assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1);
}

Expand Down

0 comments on commit dac89c4

Please sign in to comment.