Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wait_for commit on ingest V2 #5359

Merged
merged 2 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions quickwit/quickwit-ingest/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, thiserror::Error, Serialize, Deserialize)]
pub enum IngestServiceError {
#[error("unimplemented: {0}")]
Unimplemented(String),
#[error("data corruption: {0}")]
Corruption(String),
#[error("index `{index_id}` already exists")]
Expand Down Expand Up @@ -143,7 +141,6 @@ impl From<IngestFailure> for IngestServiceError {
impl ServiceError for IngestServiceError {
fn error_code(&self) -> ServiceErrorCode {
match self {
Self::Unimplemented(_) => ServiceErrorCode::BadRequest,
Self::Corruption(err_msg) => {
rate_limited_error!(
limit_per_min = 6,
Expand Down Expand Up @@ -199,7 +196,6 @@ impl From<CorruptedKey> for IngestServiceError {
impl From<IngestServiceError> for tonic::Status {
fn from(error: IngestServiceError) -> tonic::Status {
let code = match &error {
IngestServiceError::Unimplemented(_) => tonic::Code::InvalidArgument,
IngestServiceError::Corruption { .. } => tonic::Code::DataLoss,
IngestServiceError::IndexAlreadyExists { .. } => tonic::Code::AlreadyExists,
IngestServiceError::IndexNotFound { .. } => tonic::Code::NotFound,
Expand Down
38 changes: 31 additions & 7 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ pub struct IngestRouter {
replication_factor: usize,
// Limits the number of ingest requests in-flight to some capacity in bytes.
ingest_semaphore: Arc<Semaphore>,
event_broker: EventBroker,
}

struct RouterState {
Expand All @@ -125,6 +126,7 @@ impl IngestRouter {
control_plane: ControlPlaneServiceClient,
ingester_pool: IngesterPool,
replication_factor: usize,
event_broker: EventBroker,
) -> Self {
let state = Arc::new(Mutex::new(RouterState {
debouncer: GetOrCreateOpenShardsRequestDebouncer::default(),
Expand All @@ -143,15 +145,16 @@ impl IngestRouter {
state,
replication_factor,
ingest_semaphore,
event_broker,
}
}

pub fn subscribe(&self, event_broker: &EventBroker) {
pub fn subscribe(&self) {
let weak_router_state = WeakRouterState(Arc::downgrade(&self.state));
event_broker
self.event_broker
.subscribe::<LocalShardsUpdate>(weak_router_state.clone())
.forever();
event_broker
self.event_broker
.subscribe::<ShardPositionsUpdate>(weak_router_state)
.forever();
}
Expand Down Expand Up @@ -454,12 +457,20 @@ impl IngestRouter {
max_num_attempts: usize,
) -> IngestResponseV2 {
let commit_type = ingest_request.commit_type();
let mut workbench = IngestWorkbench::new(ingest_request.subrequests, max_num_attempts);
let mut workbench = if matches!(commit_type, CommitTypeV2::Force | CommitTypeV2::WaitFor) {
IngestWorkbench::new_with_publish_tracking(
ingest_request.subrequests,
max_num_attempts,
self.event_broker.clone(),
)
} else {
IngestWorkbench::new(ingest_request.subrequests, max_num_attempts)
};
while !workbench.is_complete() {
workbench.new_attempt();
self.batch_persist(&mut workbench, commit_type).await;
}
workbench.into_ingest_result()
workbench.into_ingest_result().await
}

async fn ingest_timeout(
Expand Down Expand Up @@ -712,6 +723,7 @@ mod tests {
control_plane,
ingester_pool.clone(),
replication_factor,
EventBroker::default(),
);
let mut workbench = IngestWorkbench::default();
let (get_or_create_open_shard_request_opt, rendezvous) = router
Expand Down Expand Up @@ -948,6 +960,7 @@ mod tests {
control_plane,
ingester_pool.clone(),
replication_factor,
EventBroker::default(),
);
let ingest_subrequests = vec![
IngestSubrequest {
Expand Down Expand Up @@ -1062,6 +1075,7 @@ mod tests {
control_plane,
ingester_pool.clone(),
replication_factor,
EventBroker::default(),
);
let ingest_subrequests = vec![IngestSubrequest {
subrequest_id: 0,
Expand Down Expand Up @@ -1120,6 +1134,7 @@ mod tests {
control_plane,
ingester_pool.clone(),
replication_factor,
EventBroker::default(),
);
let ingest_subrequests = vec![IngestSubrequest {
subrequest_id: 0,
Expand Down Expand Up @@ -1149,6 +1164,7 @@ mod tests {
control_plane,
ingester_pool.clone(),
replication_factor,
EventBroker::default(),
);
let ingest_subrequests = vec![IngestSubrequest {
subrequest_id: 0,
Expand Down Expand Up @@ -1200,6 +1216,7 @@ mod tests {
control_plane,
ingester_pool.clone(),
replication_factor,
EventBroker::default(),
);
let ingest_subrequests = vec![IngestSubrequest {
subrequest_id: 0,
Expand Down Expand Up @@ -1251,6 +1268,7 @@ mod tests {
control_plane,
ingester_pool.clone(),
replication_factor,
EventBroker::default(),
);
let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0);
let mut state_guard = router.state.lock().await;
Expand Down Expand Up @@ -1337,6 +1355,7 @@ mod tests {
control_plane,
ingester_pool.clone(),
replication_factor,
EventBroker::default(),
);
let ingest_subrequests = vec![
IngestSubrequest {
Expand Down Expand Up @@ -1416,6 +1435,7 @@ mod tests {
control_plane,
ingester_pool.clone(),
replication_factor,
EventBroker::default(),
);
let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0);
let index_uid2: IndexUid = IndexUid::for_test("test-index-1", 0);
Expand Down Expand Up @@ -1653,6 +1673,7 @@ mod tests {
control_plane,
ingester_pool.clone(),
replication_factor,
EventBroker::default(),
);
let mut state_guard = router.state.lock().await;
let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0);
Expand Down Expand Up @@ -1757,14 +1778,15 @@ mod tests {
let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new());
let ingester_pool = IngesterPool::default();
let replication_factor = 1;
let event_broker = EventBroker::default();
let router = IngestRouter::new(
self_node_id,
control_plane,
ingester_pool.clone(),
replication_factor,
event_broker.clone(),
);
let event_broker = EventBroker::default();
router.subscribe(&event_broker);
router.subscribe();
let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0);

let mut state_guard = router.state.lock().await;
Expand Down Expand Up @@ -1854,6 +1876,7 @@ mod tests {
control_plane,
ingester_pool.clone(),
replication_factor,
EventBroker::default(),
);
let index_uid_0: IndexUid = IndexUid::for_test("test-index-0", 0);
let index_uid_1: IndexUid = IndexUid::for_test("test-index-1", 0);
Expand Down Expand Up @@ -1903,6 +1926,7 @@ mod tests {
control_plane,
ingester_pool.clone(),
replication_factor,
EventBroker::default(),
);
let mut state_guard = router.state.lock().await;
let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0);
Expand Down
Loading
Loading