
Stop deleting queue on EOF truncate (#4096)
guilload authored Nov 8, 2023
1 parent 49d2a39 commit d5264f0
Showing 4 changed files with 60 additions and 32 deletions.
@@ -188,7 +188,7 @@ impl IngestController {
             index_id=%index_uid.index_id(),
             source_id=%source_id,
             shard_ids=?PrettySample::new(&closed_shard_ids, 5),
-            "closed {} shards reported by router",
+            "closed {} shard(s) reported by router",
             closed_shard_ids.len()
         );
     }
44 changes: 27 additions & 17 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -28,7 +28,7 @@ use std::time::Duration;
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
-use mrecordlog::error::{CreateQueueError, DeleteQueueError, TruncateError};
+use mrecordlog::error::{CreateQueueError, TruncateError};
use mrecordlog::MultiRecordLog;
use quickwit_common::tower::Pool;
use quickwit_common::ServiceStream;
@@ -134,6 +134,11 @@ impl Ingester {
             .map(|queue_id| queue_id.to_string())
             .collect();

+        if queue_ids.is_empty() {
+            return Ok(());
+        }
+        info!("closing {} shard(s)", queue_ids.len());
+
         for queue_id in queue_ids {
             append_eof_record_if_necessary(&mut state_guard.mrecordlog, &queue_id).await;

@@ -171,7 +176,8 @@ impl Ingester {
             let primary_shard = PrimaryShard::new(follower_id.clone());
             IngesterShard::Primary(primary_shard)
         } else {
-            IngesterShard::Solo(SoloShard::default())
+            let solo_shard = SoloShard::new(ShardState::Open, Position::Beginning);
+            IngesterShard::Solo(solo_shard)
         };
         let entry = state.shards.entry(queue_id.clone());
         Ok(entry.or_insert(shard))
@@ -505,28 +511,28 @@ impl IngesterService for Ingester {

         for subrequest in truncate_request.subrequests {
             let queue_id = subrequest.queue_id();
-            let to_position_inclusive = subrequest.to_position_inclusive();

-            if let Some(to_offset_inclusive) = to_position_inclusive.as_u64() {
+            let truncate_position_opt = match subrequest.to_position_inclusive() {
+                Position::Beginning => None,
+                Position::Offset(offset) => offset.as_u64(),
+                Position::Eof => state_guard
+                    .mrecordlog
+                    .current_position(&queue_id)
+                    .ok()
+                    .flatten(),
+            };
+            if let Some(truncate_position) = truncate_position_opt {
                 match state_guard
                     .mrecordlog
-                    .truncate(&queue_id, to_offset_inclusive)
+                    .truncate(&queue_id, truncate_position)
                     .await
                 {
                     Ok(_) | Err(TruncateError::MissingQueue(_)) => {}
                     Err(error) => {
                         error!("failed to truncate queue `{}`: {}", queue_id, error);
                     }
                 }
-            } else if to_position_inclusive == Position::Eof {
-                match state_guard.mrecordlog.delete_queue(&queue_id).await {
-                    Ok(_) | Err(DeleteQueueError::MissingQueue(_)) => {}
-                    Err(error) => {
-                        error!("failed to delete queue `{}`: {}", queue_id, error);
-                    }
-                }
-                state_guard.shards.remove(&queue_id);
-            };
+            }
         }
         let truncate_response = TruncateResponse {};
         Ok(truncate_response)
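This hunk carries the fix named in the commit title: truncating to `Position::Eof` no longer deletes the queue (and with it the shard entry); instead, EOF is resolved to the queue's current position and the queue is truncated up to it, so the shard survives with an empty record set. A minimal, self-contained sketch of that resolution step, using simplified stand-ins for Quickwit's `Position` type and an mrecordlog-style queue (`Queue` and `resolve_truncate_position` are hypothetical names used only here):

// Simplified stand-ins for Quickwit's `Position` type and an
// mrecordlog-style queue; not the real API.
#[derive(Debug, Clone, PartialEq)]
enum Position {
    Beginning,
    Offset(u64),
    Eof,
}

struct Queue {
    /// Offset of the last record appended to the queue, if any.
    current_position: Option<u64>,
}

/// Mirrors the new logic: `Beginning` truncates nothing, `Offset`
/// truncates up to that offset, and `Eof` truncates up to the queue's
/// current position instead of deleting the queue outright.
fn resolve_truncate_position(queue: &Queue, to_position_inclusive: &Position) -> Option<u64> {
    match to_position_inclusive {
        Position::Beginning => None,
        Position::Offset(offset) => Some(*offset),
        Position::Eof => queue.current_position,
    }
}

fn main() {
    let queue = Queue { current_position: Some(41) };
    // An EOF truncate now maps to "truncate up to offset 41" rather than
    // "delete the queue and drop the shard from the ingester's shard map".
    assert_eq!(resolve_truncate_position(&queue, &Position::Eof), Some(41));
    assert_eq!(resolve_truncate_position(&queue, &Position::Beginning), None);
    assert_eq!(resolve_truncate_position(&queue, &Position::Offset(7)), Some(7));
    println!("EOF resolves to the queue's current position");
}

The test hunk below pins down the new behavior: after an EOF truncate, both shards remain registered and the EOF-truncated queue is simply empty.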
@@ -1230,12 +1236,16 @@ mod tests {
         ingester.truncate(truncate_request).await.unwrap();

         let state_guard = ingester.state.read().await;
-        assert_eq!(state_guard.shards.len(), 1);
-        assert!(state_guard.shards.contains_key(&queue_id_01));
+        assert_eq!(state_guard.shards.len(), 2);
+
+        assert!(state_guard.shards.contains_key(&queue_id_01));
         state_guard
             .mrecordlog
             .assert_records_eq(&queue_id_01, .., &[(1, "\0\0test-doc-011")]);
-        assert!(!state_guard.shards.contains_key(&queue_id_02));
+
+        assert!(state_guard.shards.contains_key(&queue_id_02));
+        state_guard
+            .mrecordlog
+            .assert_records_eq(&queue_id_02, .., &[]);
     }
 }
32 changes: 19 additions & 13 deletions quickwit/quickwit-ingest/src/ingest_v2/models.rs
@@ -105,28 +105,19 @@ impl fmt::Debug for SoloShard {
     }
 }

-impl Default for SoloShard {
-    fn default() -> Self {
-        let (new_records_tx, new_records_rx) = watch::channel(());
-        Self {
-            shard_state: ShardState::Open,
-            replication_position_inclusive: Position::Beginning,
-            new_records_tx,
-            new_records_rx,
-        }
-    }
-}
-
 impl SoloShard {
     pub fn new(shard_state: ShardState, replication_position_inclusive: Position) -> Self {
+        let (new_records_tx, new_records_rx) = watch::channel(());
         Self {
             shard_state,
             replication_position_inclusive,
-            ..Default::default()
+            new_records_tx,
+            new_records_rx,
         }
     }
 }

+#[derive(Debug)]
 pub(super) enum IngesterShard {
     /// A primary shard hosted on a leader and replicated on a follower.
     Primary(PrimaryShard),
@@ -192,3 +183,18 @@ impl IngesterShard {
             .expect("channel should be open");
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_new_solo_shard() {
+        let solo_shard = SoloShard::new(ShardState::Closed, Position::from(42u64));
+        assert_eq!(solo_shard.shard_state, ShardState::Closed);
+        assert_eq!(
+            solo_shard.replication_position_inclusive,
+            Position::from(42u64)
+        );
+    }
+}
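With the `Default` impl removed, `SoloShard::new` is the single construction path and wires up the new-records watch channel itself rather than reaching through `..Default::default()`. A runnable sketch of the same constructor pattern, assuming `tokio`; `MiniSoloShard` and its fields are simplified stand-ins for the real struct:

// Sketch of the constructor pattern adopted here, assuming `tokio`.
// `MiniSoloShard` is a hypothetical, simplified stand-in.
use tokio::sync::watch;

#[derive(Debug, Clone, Copy)]
enum ShardState {
    Open,
}

struct MiniSoloShard {
    shard_state: ShardState,
    replication_position_inclusive: u64,
    new_records_tx: watch::Sender<()>,
    new_records_rx: watch::Receiver<()>,
}

impl MiniSoloShard {
    fn new(shard_state: ShardState, replication_position_inclusive: u64) -> Self {
        // Build the watch channel pair inside `new` instead of relying on
        // a `Default` impl: writers `send` to signal "new records", and
        // cloned receivers wake up on `changed()`.
        let (new_records_tx, new_records_rx) = watch::channel(());
        Self {
            shard_state,
            replication_position_inclusive,
            new_records_tx,
            new_records_rx,
        }
    }
}

#[tokio::main]
async fn main() {
    let shard = MiniSoloShard::new(ShardState::Open, 0);
    let mut new_records_rx = shard.new_records_rx.clone();
    // Signal that new records were appended; the cloned receiver wakes up.
    shard.new_records_tx.send(()).unwrap();
    new_records_rx.changed().await.unwrap();
    println!(
        "shard {:?} at position {} notified",
        shard.shard_state, shard.replication_position_inclusive
    );
}

Keeping the channel creation in `new` means every shard, however constructed, owns a live sender/receiver pair, which is what the added unit test exercises.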
14 changes: 13 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -38,7 +38,7 @@ use quickwit_proto::ingest::router::{
use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result};
use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SubrequestId};
use tokio::sync::RwLock;
-use tracing::{error, warn};
+use tracing::{error, info, warn};

use super::ingester::PERSIST_REQUEST_TIMEOUT;
use super::shard_table::ShardTable;
@@ -133,6 +133,18 @@ impl IngestRouter {
                 get_open_shards_subrequests.push(subrequest);
             }
         }
+        if !closed_shards.is_empty() {
+            info!(
+                "reporting {} closed shard(s) to control-plane",
+                closed_shards.len()
+            )
+        }
+        if !unavailable_leaders.is_empty() {
+            info!(
+                "reporting {} unavailable leader(s) to control-plane",
+                unavailable_leaders.len()
+            );
+        }
         GetOrCreateOpenShardsRequest {
             subrequests: get_open_shards_subrequests,
             closed_shards,
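The new logging is guarded so quiet paths emit nothing, while the closed-shard and unavailable-leader lists themselves ride along on the next control-plane request. A dependency-free sketch of that guard-then-report flow, with stand-in types and `println!` in place of `tracing::info!`:

// All types below are simplified stand-ins for the quickwit_proto ones.
#[derive(Debug)]
struct GetOrCreateOpenShardsRequest {
    subrequests: Vec<String>,
    closed_shards: Vec<u64>,
    unavailable_leaders: Vec<String>,
}

fn make_request(
    subrequests: Vec<String>,
    closed_shards: Vec<u64>,
    unavailable_leaders: Vec<String>,
) -> GetOrCreateOpenShardsRequest {
    // Guard each summary so quiet paths emit no log lines at all.
    if !closed_shards.is_empty() {
        println!(
            "reporting {} closed shard(s) to control-plane",
            closed_shards.len()
        );
    }
    if !unavailable_leaders.is_empty() {
        println!(
            "reporting {} unavailable leader(s) to control-plane",
            unavailable_leaders.len()
        );
    }
    // The lists ride along on the request itself; the logging is purely
    // observability, not the reporting mechanism.
    GetOrCreateOpenShardsRequest {
        subrequests,
        closed_shards,
        unavailable_leaders,
    }
}

fn main() {
    let request = make_request(vec!["subrequest-0".into()], vec![17], vec![]);
    println!("{request:?}");
}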
