Commit abea575

Decommission node gracefully (#4117)

guilload authored Nov 14, 2023
1 parent b850639 commit abea575
Showing 22 changed files with 1,457 additions and 697 deletions.
23 changes: 21 additions & 2 deletions quickwit/quickwit-common/src/stream_utils.rs
@@ -22,8 +22,8 @@ use std::fmt;
use std::pin::Pin;

use futures::{stream, Stream, TryStreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tokio::sync::{mpsc, watch};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream, WatchStream};
use tracing::warn;

pub type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Unpin + 'static>>;
@@ -57,6 +57,15 @@ where T: Send + 'static
}
}

impl<T> ServiceStream<T>
where T: Clone + Send + Sync + 'static
{
pub fn new_watch(init: T) -> (watch::Sender<T>, Self) {
let (sender, receiver) = watch::channel(init);
(sender, receiver.into())
}
}

impl<T, E> ServiceStream<Result<T, E>>
where
T: Send + 'static,
@@ -104,6 +113,16 @@ where T: Send + 'static
}
}

impl<T> From<watch::Receiver<T>> for ServiceStream<T>
where T: Clone + Send + Sync + 'static
{
fn from(receiver: watch::Receiver<T>) -> Self {
Self {
inner: Box::pin(WatchStream::new(receiver)),
}
}
}

/// Adapts a server-side tonic::Streaming into a ServiceStream of `Result<T, tonic::Status>`. Once
/// an error is encountered, the stream will be closed and subsequent calls to `poll_next` will
/// return `None`.
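
Note: as context for the watch-channel additions above, here is a minimal usage sketch (not part of this commit) showing how a watch-backed ServiceStream broadcasts the latest value to a subscriber. The `Status` enum is purely illustrative.

use futures::StreamExt;
use quickwit_common::ServiceStream;

// Illustrative status enum; not a Quickwit type.
#[derive(Clone, Debug, PartialEq)]
enum Status {
    Ready,
    Decommissioning,
}

#[tokio::main]
async fn main() {
    // `new_watch` seeds the channel with an initial value; subscribers always
    // observe the latest value rather than the full history of updates.
    let (status_tx, mut status_stream) = ServiceStream::new_watch(Status::Ready);

    // Updating the sender makes the stream yield the new value.
    status_tx.send(Status::Decommissioning).unwrap();

    while let Some(status) = status_stream.next().await {
        if status == Status::Decommissioning {
            break;
        }
    }
}
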
10 changes: 3 additions & 7 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
@@ -30,7 +30,7 @@ use quickwit_proto::control_plane::{
GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSuccess,
};
use quickwit_proto::ingest::ingester::{IngesterService, PingRequest};
use quickwit_proto::ingest::{ClosedShards, IngestV2Error, ShardState};
use quickwit_proto::ingest::{IngestV2Error, ShardIds, ShardState};
use quickwit_proto::metastore;
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient};
use quickwit_proto::types::{IndexUid, NodeId};
@@ -172,11 +172,7 @@ impl IngestController {
None
}

fn handle_closed_shards(
&self,
closed_shards: Vec<ClosedShards>,
model: &mut ControlPlaneModel,
) {
fn handle_closed_shards(&self, closed_shards: Vec<ShardIds>, model: &mut ControlPlaneModel) {
for closed_shard in closed_shards {
let index_uid: IndexUid = closed_shard.index_uid.into();
let source_id = closed_shard.source_id;
@@ -764,7 +760,7 @@ mod tests {

let request = GetOrCreateOpenShardsRequest {
subrequests: Vec::new(),
closed_shards: vec![ClosedShards {
closed_shards: vec![ShardIds {
index_uid: index_uid.clone().into(),
source_id: source_id.clone(),
shard_ids: vec![1, 2],
63 changes: 43 additions & 20 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -26,12 +26,13 @@ use anyhow::{bail, Context};
use async_trait::async_trait;
use itertools::Itertools;
use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_common::retry::RetryParams;
use quickwit_ingest::{
decoded_mrecords, FetchStreamError, IngesterPool, MRecord, MultiFetchStream,
};
use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
use quickwit_proto::ingest::ingester::{
FetchResponseV2, IngesterService, TruncateRequest, TruncateSubrequest,
FetchResponseV2, IngesterService, TruncateShardsRequest, TruncateShardsSubrequest,
};
use quickwit_proto::metastore::{
AcquireShardsRequest, AcquireShardsSubrequest, AcquireShardsSubresponse, MetastoreService,
@@ -237,7 +238,7 @@ impl IngestSource {
}

async fn truncate(&self, truncation_point: &[(ShardId, Position)]) {
let mut per_ingester_truncate_subrequests: HashMap<&NodeId, Vec<TruncateSubrequest>> =
let mut per_ingester_subrequests: HashMap<&NodeId, Vec<TruncateShardsSubrequest>> =
HashMap::new();

for (shard_id, to_position_exclusive) in truncation_point {
@@ -251,38 +252,60 @@
);
continue;
};
let truncate_subrequest = TruncateSubrequest {
let truncate_shards_subrequest = TruncateShardsSubrequest {
index_uid: self.client_id.index_uid.clone().into(),
source_id: self.client_id.source_id.clone(),
shard_id: *shard_id,
to_position_inclusive: Some(to_position_exclusive.clone()),
};
if let Some(follower_id) = &shard.follower_id_opt {
per_ingester_truncate_subrequests
per_ingester_subrequests
.entry(follower_id)
.or_default()
.push(truncate_subrequest.clone());
.push(truncate_shards_subrequest.clone());
}
per_ingester_truncate_subrequests
per_ingester_subrequests
.entry(&shard.leader_id)
.or_default()
.push(truncate_subrequest);
.push(truncate_shards_subrequest);
}
for (ingester_id, truncate_subrequests) in per_ingester_truncate_subrequests {
for (ingester_id, truncate_subrequests) in per_ingester_subrequests {
let Some(mut ingester) = self.ingester_pool.get(ingester_id) else {
warn!(
"failed to truncate shard: ingester `{}` is unavailable",
ingester_id
);
continue;
};
let truncate_request = TruncateRequest {
let truncate_shards_request = TruncateShardsRequest {
ingester_id: ingester_id.clone().into(),
subrequests: truncate_subrequests,
};
let truncate_future = async move {
if let Err(error) = ingester.truncate(truncate_request).await {
warn!("failed to truncate shard(s): {error}");
let retry_params = RetryParams {
base_delay: Duration::from_secs(1),
max_delay: Duration::from_secs(30),
max_attempts: 5,
};
let mut num_attempts = 0;

while num_attempts < retry_params.max_attempts {
let Err(error) = ingester
.truncate_shards(truncate_shards_request.clone())
.await
else {
return;
};
num_attempts += 1;
let delay = retry_params.compute_delay(num_attempts);
time::sleep(delay).await;

if num_attempts == retry_params.max_attempts {
error!(
ingester_id=%truncate_shards_request.ingester_id,
"failed to truncate shard(s): {error}"
);
}
}
};
// Truncation is best-effort, so fire and forget.
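
Note: the retry loop above delegates the wait time to `RetryParams::compute_delay`. The sketch below is an assumption about what such a helper typically does (capped exponential backoff), not the actual Quickwit implementation.

use std::time::Duration;

// Hypothetical backoff helper mirroring the fields used above; the real
// `RetryParams::compute_delay` may differ.
struct RetryParams {
    base_delay: Duration,
    max_delay: Duration,
    max_attempts: usize,
}

impl RetryParams {
    // attempt 1 -> base, attempt 2 -> 2 * base, attempt 3 -> 4 * base, ...
    // capped at `max_delay`.
    fn compute_delay(&self, num_attempts: usize) -> Duration {
        let exponent = num_attempts.saturating_sub(1).min(31) as u32;
        let delay = self.base_delay.saturating_mul(2u32.saturating_pow(exponent));
        delay.min(self.max_delay)
    }
}

fn main() {
    let retry_params = RetryParams {
        base_delay: Duration::from_secs(1),
        max_delay: Duration::from_secs(30),
        max_attempts: 5,
    };
    for num_attempts in 1..=retry_params.max_attempts {
        println!("attempt {num_attempts}: retry in {:?}", retry_params.compute_delay(num_attempts));
    }
}
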
Expand Down Expand Up @@ -480,7 +503,7 @@ mod tests {
use quickwit_common::ServiceStream;
use quickwit_config::{SourceConfig, SourceParams};
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::ingest::ingester::{IngesterServiceClient, TruncateResponse};
use quickwit_proto::ingest::ingester::{IngesterServiceClient, TruncateShardsResponse};
use quickwit_proto::ingest::{IngestV2Error, MRecordBatch, Shard, ShardState};
use quickwit_proto::metastore::{AcquireShardsResponse, AcquireShardsSubresponse};
use quickwit_storage::StorageResolver;
@@ -552,7 +575,7 @@
Ok(service_stream)
});
ingester_mock_0
.expect_truncate()
.expect_truncate_shards()
.once()
.returning(|request| {
assert_eq!(request.ingester_id, "test-ingester-0");
@@ -564,7 +587,7 @@
assert_eq!(subrequest.shard_id, 1);
assert_eq!(subrequest.to_position_inclusive, Some(11u64.into()));

let response = TruncateResponse {};
let response = TruncateShardsResponse {};
Ok(response)
});

@@ -792,7 +815,7 @@

let mut ingester_mock_0 = IngesterServiceClient::mock();
ingester_mock_0
.expect_truncate()
.expect_truncate_shards()
.once()
.returning(|request| {
assert_eq!(request.ingester_id, "test-ingester-0");
@@ -810,14 +833,14 @@
assert_eq!(subrequest_2.shard_id, 3);
assert_eq!(subrequest_2.to_position_inclusive, Some(33u64.into()));

Ok(TruncateResponse {})
Ok(TruncateShardsResponse {})
});
let ingester_0: IngesterServiceClient = ingester_mock_0.into();
ingester_pool.insert("test-ingester-0".into(), ingester_0.clone());

let mut ingester_mock_1 = IngesterServiceClient::mock();
ingester_mock_1
.expect_truncate()
.expect_truncate_shards()
.once()
.returning(|request| {
assert_eq!(request.ingester_id, "test-ingester-1");
@@ -831,14 +854,14 @@
assert_eq!(subrequest_1.shard_id, 3);
assert_eq!(subrequest_1.to_position_inclusive, Some(33u64.into()));

Ok(TruncateResponse {})
Ok(TruncateShardsResponse {})
});
let ingester_1: IngesterServiceClient = ingester_mock_1.into();
ingester_pool.insert("test-ingester-1".into(), ingester_1.clone());

let mut ingester_mock_3 = IngesterServiceClient::mock();
ingester_mock_3
.expect_truncate()
.expect_truncate_shards()
.once()
.returning(|request| {
assert_eq!(request.ingester_id, "test-ingester-3");
@@ -848,7 +871,7 @@
assert_eq!(subrequest_0.shard_id, 4);
assert_eq!(subrequest_0.to_position_inclusive, Some(44u64.into()));

Ok(TruncateResponse {})
Ok(TruncateShardsResponse {})
});
let ingester_3: IngesterServiceClient = ingester_mock_3.into();
ingester_pool.insert("test-ingester-3".into(), ingester_3.clone());
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/codegen/ingest_service.rs

Some generated files are not rendered by default.

22 changes: 18 additions & 4 deletions quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
@@ -513,7 +513,9 @@ mod tests {

use bytes::Bytes;
use mrecordlog::MultiRecordLog;
use quickwit_proto::ingest::ingester::IngesterServiceClient;
use quickwit_proto::ingest::ingester::{
IngesterServiceClient, IngesterStatus, ObservationMessage,
};
use quickwit_proto::types::queue_id;
use tokio::time::timeout;

@@ -534,14 +536,17 @@
shard_id: 1,
from_position_exclusive: None,
};
let (new_records_tx, new_records_rx) = watch::channel(());
let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default()));
let state = Arc::new(RwLock::new(IngesterState {
mrecordlog,
shards: HashMap::new(),
rate_limiters: HashMap::new(),
replication_streams: HashMap::new(),
replication_tasks: HashMap::new(),
status: IngesterStatus::Ready,
observation_tx,
}));
let (new_records_tx, new_records_rx) = watch::channel(());
let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
state.clone(),
@@ -700,14 +705,17 @@
shard_id: 1,
from_position_exclusive: Some(Position::from(0u64)),
};
let (new_records_tx, new_records_rx) = watch::channel(());
let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default()));
let state = Arc::new(RwLock::new(IngesterState {
mrecordlog,
shards: HashMap::new(),
rate_limiters: HashMap::new(),
replication_streams: HashMap::new(),
replication_tasks: HashMap::new(),
status: IngesterStatus::Ready,
observation_tx,
}));
let (new_records_tx, new_records_rx) = watch::channel(());
let (mut fetch_stream, _fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
state.clone(),
@@ -800,14 +808,17 @@
shard_id: 1,
from_position_exclusive: None,
};
let (_new_records_tx, new_records_rx) = watch::channel(());
let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default()));
let state = Arc::new(RwLock::new(IngesterState {
mrecordlog,
shards: HashMap::new(),
rate_limiters: HashMap::new(),
replication_streams: HashMap::new(),
replication_tasks: HashMap::new(),
status: IngesterStatus::Ready,
observation_tx,
}));
let (_new_records_tx, new_records_rx) = watch::channel(());
let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
state.clone(),
@@ -838,12 +849,15 @@
shard_id: 1,
from_position_exclusive: None,
};
let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default()));
let state = Arc::new(RwLock::new(IngesterState {
mrecordlog,
shards: HashMap::new(),
rate_limiters: HashMap::new(),
replication_streams: HashMap::new(),
replication_tasks: HashMap::new(),
status: IngesterStatus::Ready,
observation_tx,
}));
let (new_records_tx, new_records_rx) = watch::channel(());
let (mut fetch_stream, _fetch_task_handle) =
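
Note: the test setups above now thread a `status` field and an `observation_tx` watch sender through `IngesterState`; that watch channel is the hook a graceful decommission flow can use to publish status changes to observers. A simplified, self-contained sketch of the pattern (the types below are stand-ins, not the quickwit-proto definitions, and the real channel carries a `Result`):

use tokio::sync::watch;

// Illustrative stand-ins for the ingester state fields added in this commit.
#[derive(Clone, Copy, Debug, PartialEq)]
enum IngesterStatus {
    Ready,
    Decommissioning,
}

#[derive(Clone, Debug)]
struct ObservationMessage {
    status: IngesterStatus,
}

struct IngesterState {
    status: IngesterStatus,
    observation_tx: watch::Sender<ObservationMessage>,
}

impl IngesterState {
    // Record the new status and broadcast it so observers (e.g. the control
    // plane or a decommission RPC handler) can watch the node drain.
    fn set_status(&mut self, status: IngesterStatus) {
        self.status = status;
        let _ = self.observation_tx.send(ObservationMessage { status });
    }
}

fn main() {
    let (observation_tx, observation_rx) =
        watch::channel(ObservationMessage { status: IngesterStatus::Ready });
    let mut state = IngesterState {
        status: IngesterStatus::Ready,
        observation_tx,
    };
    state.set_status(IngesterStatus::Decommissioning);
    assert_eq!(state.status, IngesterStatus::Decommissioning);
    assert_eq!(observation_rx.borrow().status, IngesterStatus::Decommissioning);
}
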
(Diffs for the remaining changed files are not shown.)
