Skip to content

Commit

Permalink
Consider ingesters returning ResourceExhausted temporarily unavailable (
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Jun 27, 2024
1 parent b54d8f2 commit de2e150
Show file tree
Hide file tree
Showing 3 changed files with 292 additions and 111 deletions.
26 changes: 24 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,17 @@ impl Ingester {
if state_guard.status() != IngesterStatus::Ready {
return Err(IngestV2Error::Internal("node decommissioned".to_string()));
}
// If the WAL disk usage is too high, we reject the request.
let wal_usage = state_guard.mrecordlog.resource_usage();
report_wal_usage(wal_usage);

let disk_used = wal_usage.disk_used_bytes as u64;

if disk_used >= self.disk_capacity.as_u64() * 95 / 100 {
return Err(IngestV2Error::Internal(
"WAL disk usage too high".to_string(),
));
}
let mut successes = Vec::with_capacity(init_shards_request.subrequests.len());
let mut failures = Vec::new();
let now = Instant::now();
Expand Down Expand Up @@ -1681,15 +1692,18 @@ mod tests {
doc_mapping_json,
}],
};
let response = ingester.init_shards(init_shards_request).await.unwrap();
let response = ingester
.init_shards(init_shards_request.clone())
.await
.unwrap();
assert_eq!(response.successes.len(), 1);
assert_eq!(response.failures.len(), 0);

let init_shard_success = &response.successes[0];
assert_eq!(init_shard_success.subrequest_id, 0);
assert_eq!(init_shard_success.shard, Some(shard));

let state_guard = ingester.state.lock_fully().await.unwrap();
let mut state_guard = ingester.state.lock_fully().await.unwrap();

let queue_id = queue_id(&index_uid, "test-source", &ShardId::from(1));
let shard = state_guard.shards.get(&queue_id).unwrap();
Expand All @@ -1700,6 +1714,14 @@ mod tests {

assert!(state_guard.rate_trackers.contains_key(&queue_id));
assert!(state_guard.mrecordlog.queue_exists(&queue_id));

state_guard.set_status(IngesterStatus::Decommissioned);
drop(state_guard);

let error = ingester.init_shards(init_shards_request).await.unwrap_err();
assert!(
matches!(error, IngestV2Error::Internal(message) if message.contains("decommissioned"))
);
}

#[tokio::test]
Expand Down
Loading

0 comments on commit de2e150

Please sign in to comment.