diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index a0a3bea8598..1fadf51bb17 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -3797,7 +3797,7 @@ dependencies = [ [[package]] name = "mrecordlog" version = "0.4.0" -source = "git+https://github.com/quickwit-oss/mrecordlog?rev=0d1a7aa#0d1a7aa2bdf11aec832919503cefaf282d70e0d8" +source = "git+https://github.com/quickwit-oss/mrecordlog?rev=ebee0fd#ebee0fdb6556a18f7cb73eca32e40bf44b0c4c6c" dependencies = [ "async-trait", "bytes", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 13f78cc72dd..c559f5fde78 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -50,7 +50,10 @@ bytes = { version = "1", features = ["serde"] } bytesize = { version = "1.3.0", features = ["serde"] } bytestring = "1.3.0" chitchat = "0.7" -chrono = { version = "0.4.23", default-features = false, features = ["clock", "std"] } +chrono = { version = "0.4.23", default-features = false, features = [ + "clock", + "std", +] } clap = { version = "4.4.1", features = ["env", "string"] } colored = "2.0.0" console-subscriber = "0.1.8" @@ -96,12 +99,19 @@ libz-sys = "1.1.8" lru = "0.12" lindera-core = "0.27.0" lindera-dictionary = "0.27.0" -lindera-tokenizer = { version = "0.27.0", features = ["ipadic", "ipadic-compress", "cc-cedict", "cc-cedict-compress", "ko-dic", "ko-dic-compress"] } +lindera-tokenizer = { version = "0.27.0", features = [ + "cc-cedict-compress", + "cc-cedict", + "ipadic-compress", + "ipadic", + "ko-dic-compress", + "ko-dic", +] } matches = "0.1.9" md5 = "0.7" mime_guess = "2.0.4" mockall = "0.11" -mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "0d1a7aa" } +mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "ebee0fd" } new_string_template = "1.4.0" nom = "7.1.3" num_cpus = "1" @@ -113,7 +123,9 @@ opentelemetry = { version = "0.19", features = ["rt-tokio"] } opentelemetry-otlp = "0.12.0" pin-project = "1.1.0" pnet = { version = "0.33.0", features = ["std"] } -postcard = { version = "1.0.4", features = ["use-std"], default-features = false} +postcard = { version = "1.0.4", features = [ + "use-std", +], default-features = false } predicates = "3" prettyplease = "0.2.0" proc-macro2 = "1.0.50" @@ -124,7 +136,11 @@ prost = { version = "0.11.6", default-features = false, features = [ ] } prost-build = "0.11.6" prost-types = "0.11.6" -pulsar = { git = "https://github.com/quickwit-oss/pulsar-rs.git", rev = "f9eff04", default-features = false, features = ["compression", "tokio-runtime", "auth-oauth2"] } +pulsar = { git = "https://github.com/quickwit-oss/pulsar-rs.git", rev = "f9eff04", default-features = false, features = [ + "auth-oauth2", + "compression", + "tokio-runtime", +] } quote = "1.0.23" rand = "0.8" rand_distr = "0.4" @@ -143,7 +159,10 @@ reqwest = { version = "0.11", default-features = false, features = [ ] } rust-embed = "6.8.1" sea-query = { version = "0" } -sea-query-binder = { version = "0", features = ["sqlx-postgres", "runtime-tokio-rustls",] } +sea-query-binder = { version = "0", features = [ + "runtime-tokio-rustls", + "sqlx-postgres", +] } # ^1.0.184 due to serde-rs/serde#2538 serde = { version = "1.0.184", features = ["derive", "rc"] } serde_json = "1.0" @@ -152,12 +171,12 @@ serde_with = "3.4.0" serde_yaml = "0.9" siphasher = "0.3" sqlx = { version = "0.7", features = [ - "runtime-tokio-rustls", - "postgres", "migrate", + "postgres", + "runtime-tokio-rustls", "time", ] } -syn = { version = "2.0.11", features = [ "extra-traits", "full", "parsing" ]} +syn = { version = 
"2.0.11", features = ["extra-traits", "full", "parsing"] } sync_wrapper = "0.1.2" tabled = { version = "0.8", features = ["color"] } tempfile = "3" @@ -173,14 +192,20 @@ tokio-util = { version = "0.7", features = ["full"] } toml = "0.7.6" tonic = { version = "0.9.0", features = ["gzip"] } tonic-build = "0.9.0" -tower = { version = "0.4.13", features = ["balance", "buffer", "load", "retry", "util"] } +tower = { version = "0.4.13", features = [ + "balance", + "buffer", + "load", + "retry", + "util", +] } tower-http = { version = "0.4.0", features = ["compression-gzip", "cors"] } tracing = "0.1.37" tracing-opentelemetry = "0.19.0" tracing-subscriber = { version = "0.3.16", features = [ - "time", - "std", "env-filter", + "std", + "time", ] } ttl_cache = "0.5" typetag = "0.2" @@ -199,7 +224,9 @@ whichlang = { git = "https://github.com/quickwit-oss/whichlang", rev = "fe406416 wiremock = "0.5" aws-config = "0.55.0" -aws-credential-types = { version = "0.55.0", features = ["hardcoded-credentials"] } +aws-credential-types = { version = "0.55.0", features = [ + "hardcoded-credentials", +] } aws-sdk-kinesis = "0.28.0" aws-sdk-s3 = "0.28.0" aws-smithy-async = "0.55.0" @@ -209,8 +236,12 @@ aws-smithy-types = "0.55.0" aws-types = "0.55.0" azure_core = { version = "0.13.0", features = ["enable_reqwest_rustls"] } -azure_storage = { version = "0.13.0", default-features = false, features = ["enable_reqwest_rustls"] } -azure_storage_blobs = { version = "0.13.0", default-features = false, features = ["enable_reqwest_rustls"] } +azure_storage = { version = "0.13.0", default-features = false, features = [ + "enable_reqwest_rustls", +] } +azure_storage_blobs = { version = "0.13.0", default-features = false, features = [ + "enable_reqwest_rustls", +] } quickwit-actors = { version = "0.6.3", path = "./quickwit-actors" } quickwit-aws = { version = "0.6.3", path = "./quickwit-aws" } @@ -242,10 +273,10 @@ quickwit-storage = { version = "0.6.3", path = "./quickwit-storage" } quickwit-telemetry = { version = "0.6.3", path = "./quickwit-telemetry" } tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "927b443", default-features = false, features = [ - "mmap", "lz4-compression", - "zstd-compression", + "mmap", "quickwit", + "zstd-compression", ] } # This is actually not used directly the goal is to fix the version diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 53bc51d908b..c381d396b14 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -188,7 +188,7 @@ pub struct IngestApiConfig { pub max_queue_memory_usage: ByteSize, pub max_queue_disk_usage: ByteSize, pub replication_factor: usize, - pub content_length_limit: u64, + pub content_length_limit: ByteSize, } impl Default for IngestApiConfig { @@ -197,7 +197,7 @@ impl Default for IngestApiConfig { max_queue_memory_usage: ByteSize::gib(2), // TODO maybe we want more? max_queue_disk_usage: ByteSize::gib(4), // TODO maybe we want more? replication_factor: 1, - content_length_limit: ByteSize::mib(10).as_u64(), + content_length_limit: ByteSize::mib(10), } } } diff --git a/quickwit/quickwit-ingest/Cargo.toml b/quickwit/quickwit-ingest/Cargo.toml index 48d95f92664..99f2c3438d9 100644 --- a/quickwit/quickwit-ingest/Cargo.toml +++ b/quickwit/quickwit-ingest/Cargo.toml @@ -9,6 +9,7 @@ description = "Quickwit is a cost-efficient search engine." 
anyhow = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } +bytesize = { workspace = true } dyn-clone = { workspace = true } flume = { workspace = true } futures = { workspace = true } @@ -35,7 +36,6 @@ quickwit-config = { workspace = true } quickwit-proto = { workspace = true } [dev-dependencies] -bytesize = { workspace = true } itertools = { workspace = true } mockall = { workspace = true } rand = { workspace = true } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index a92755d3c2f..5d2465a5f01 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use bytesize::ByteSize; use futures::stream::FuturesUnordered; use futures::StreamExt; use mrecordlog::error::{CreateQueueError, TruncateError}; @@ -37,17 +38,17 @@ use quickwit_proto::ingest::ingester::{ IngesterService, IngesterServiceClient, IngesterServiceStream, OpenFetchStreamRequest, OpenReplicationStreamRequest, OpenReplicationStreamResponse, PersistFailure, PersistFailureReason, PersistRequest, PersistResponse, PersistSuccess, PingRequest, - PingResponse, ReplicateRequest, ReplicateSubrequest, SynReplicationMessage, TruncateRequest, - TruncateResponse, + PingResponse, ReplicateFailureReason, ReplicateRequest, ReplicateSubrequest, + SynReplicationMessage, TruncateRequest, TruncateResponse, }; use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, ShardState}; use quickwit_proto::types::{NodeId, Position, QueueId}; use tokio::sync::RwLock; -use tracing::{error, info}; +use tracing::{error, info, warn}; use super::fetch::FetchTask; use super::models::{IngesterShard, PrimaryShard}; -use super::mrecord::{is_eof_mrecord, MRecord}; +use super::mrecordlog_utils::{append_eof_record_if_necessary, check_enough_capacity}; use super::replication::{ ReplicationStreamTask, ReplicationStreamTaskHandle, ReplicationTask, ReplicationTaskHandle, SYN_REPLICATION_STREAM_CAPACITY, @@ -55,7 +56,7 @@ use super::replication::{ use super::IngesterPool; use crate::ingest_v2::models::SoloShard; use crate::metrics::INGEST_METRICS; -use crate::{FollowerId, LeaderId}; +use crate::{estimate_size, FollowerId, LeaderId, MRecord}; /// Duration after which persist requests time out with /// [`quickwit_proto::ingest::IngestV2Error::Timeout`]. 
@@ -70,6 +71,8 @@ pub struct Ingester { self_node_id: NodeId, ingester_pool: IngesterPool, state: Arc>, + disk_capacity: ByteSize, + memory_capacity: ByteSize, replication_factor: usize, } @@ -95,6 +98,8 @@ impl Ingester { self_node_id: NodeId, ingester_pool: Pool, wal_dir_path: &Path, + disk_capacity: ByteSize, + memory_capacity: ByteSize, replication_factor: usize, ) -> IngestV2Result { let mrecordlog = MultiRecordLog::open_with_prefs( @@ -114,6 +119,8 @@ impl Ingester { self_node_id, ingester_pool, state: Arc::new(RwLock::new(inner)), + disk_capacity, + memory_capacity, replication_factor, }; info!( @@ -245,8 +252,6 @@ impl IngesterService for Ingester { self.self_node_id, persist_request.leader_id, ))); } - let mut state_guard = self.state.write().await; - let mut persist_successes = Vec::with_capacity(persist_request.subrequests.len()); let mut persist_failures = Vec::new(); let mut replicate_subrequests: HashMap> = HashMap::new(); @@ -255,6 +260,8 @@ impl IngesterService for Ingester { let force_commit = commit_type == CommitTypeV2::Force; let leader_id: NodeId = persist_request.leader_id.into(); + let mut state_guard = self.state.write().await; + for subrequest in persist_request.subrequests { let queue_id = subrequest.queue_id(); let follower_id_opt: Option = subrequest.follower_id.map(Into::into); @@ -283,10 +290,44 @@ impl IngesterService for Ingester { persist_failures.push(persist_failure); continue; } - let doc_batch = subrequest - .doc_batch - .expect("router should not send empty persist subrequests"); + let doc_batch = match subrequest.doc_batch { + Some(doc_batch) if !doc_batch.is_empty() => doc_batch, + _ => { + warn!("received empty persist request"); + + let persist_success = PersistSuccess { + subrequest_id: subrequest.subrequest_id, + index_uid: subrequest.index_uid, + source_id: subrequest.source_id, + shard_id: subrequest.shard_id, + replication_position_inclusive: Some( + shard.replication_position_inclusive(), + ), + }; + persist_successes.push(persist_success); + continue; + } + }; + let requested_capacity = estimate_size(&doc_batch); + if let Err(error) = check_enough_capacity( + &state_guard.mrecordlog, + self.disk_capacity, + self.memory_capacity, + requested_capacity, + ) { + warn!("failed to persist records: {error}"); + + let persist_failure = PersistFailure { + subrequest_id: subrequest.subrequest_id, + index_uid: subrequest.index_uid, + source_id: subrequest.source_id, + shard_id: subrequest.shard_id, + reason: PersistFailureReason::ResourceExhausted as i32, + }; + persist_failures.push(persist_failure); + continue; + } let current_position_inclusive: Position = if force_commit { let encoded_mrecords = doc_batch .docs() @@ -396,12 +437,31 @@ impl IngesterService for Ingester { }; persist_successes.push(persist_success); } + for replicate_failure in replicate_response.failures { + // TODO: If the replica shard is closed, close the primary shard if it is not + // already. 
+ let persist_failure_reason = match replicate_failure.reason() { + ReplicateFailureReason::Unspecified => PersistFailureReason::Unspecified, + ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed, + ReplicateFailureReason::ResourceExhausted => { + PersistFailureReason::ResourceExhausted + } + }; + let persist_failure = PersistFailure { + subrequest_id: replicate_failure.subrequest_id, + index_uid: replicate_failure.index_uid, + source_id: replicate_failure.source_id, + shard_id: replicate_failure.shard_id, + reason: persist_failure_reason as i32, + }; + persist_failures.push(persist_failure); + } } let leader_id = self.self_node_id.to_string(); let persist_response = PersistResponse { leader_id, successes: persist_successes, - failures: Vec::new(), // TODO + failures: persist_failures, }; Ok(persist_response) } @@ -446,6 +506,8 @@ impl IngesterService for Ingester { self.state.clone(), syn_replication_stream, ack_replication_stream_tx, + self.disk_capacity, + self.memory_capacity, ); entry.insert(replication_task_handle); Ok(ack_replication_stream) @@ -513,7 +575,7 @@ impl IngesterService for Ingester { Position::Offset(offset) => offset.as_u64(), Position::Eof => state_guard .mrecordlog - .current_position(&queue_id) + .last_position(&queue_id) .ok() .flatten(), }; @@ -525,7 +587,7 @@ impl IngesterService for Ingester { { Ok(_) | Err(TruncateError::MissingQueue(_)) => {} Err(error) => { - error!("failed to truncate queue `{}`: {}", queue_id, error); + error!("failed to truncate queue `{queue_id}`: {error}"); } } } @@ -558,32 +620,6 @@ impl IngesterService for Ingester { } } -/// Appends an EOF record to the queue if the it is empty or the last record is not an EOF -/// record. -/// -/// # Panics -/// -/// Panics if the queue does not exist. -async fn append_eof_record_if_necessary(mrecordlog: &mut MultiRecordLog, queue_id: &QueueId) { - let mut should_append_eof_record = true; - - if let Some(current_position) = mrecordlog.current_position(queue_id).expect("TODO") { - let mrecords = mrecordlog - .range(queue_id, current_position..) 
- .expect("TODO"); - - if let Some((_, last_mecord)) = mrecords.last() { - should_append_eof_record = !is_eof_mrecord(&last_mecord); - } - } - if should_append_eof_record { - mrecordlog - .append_record(queue_id, None, MRecord::Eof.encode()) - .await - .expect("TODO"); - } -} - #[cfg(test)] mod tests { use std::net::SocketAddr; @@ -599,6 +635,7 @@ mod tests { use super::*; use crate::ingest_v2::fetch::FetchRange; + use crate::ingest_v2::mrecord::is_eof_mrecord; use crate::ingest_v2::test_utils::{IngesterShardTestExt, MultiRecordLogTestExt}; #[tokio::test] @@ -607,11 +644,15 @@ mod tests { let self_node_id: NodeId = "test-ingester-0".into(); let ingester_pool = IngesterPool::default(); let wal_dir_path = tempdir.path(); + let disk_capacity = ByteSize::mb(256); + let memory_capacity = ByteSize::mb(1); let replication_factor = 2; let mut ingester = Ingester::try_new( self_node_id.clone(), ingester_pool, wal_dir_path, + disk_capacity, + memory_capacity, replication_factor, ) .await @@ -708,11 +749,15 @@ mod tests { let self_node_id: NodeId = "test-ingester-0".into(); let ingester_pool = IngesterPool::default(); let wal_dir_path = tempdir.path(); + let disk_capacity = ByteSize::mb(256); + let memory_capacity = ByteSize::mb(1); let replication_factor = 1; let mut ingester = Ingester::try_new( self_node_id.clone(), ingester_pool, wal_dir_path, + disk_capacity, + memory_capacity, replication_factor, ) .await @@ -740,7 +785,30 @@ mod tests { }, ], }; - ingester.persist(persist_request).await.unwrap(); + let persist_response = ingester.persist(persist_request).await.unwrap(); + assert_eq!(persist_response.leader_id, "test-ingester-0"); + assert_eq!(persist_response.successes.len(), 2); + assert_eq!(persist_response.failures.len(), 0); + + let persist_success_0 = &persist_response.successes[0]; + assert_eq!(persist_success_0.subrequest_id, 0); + assert_eq!(persist_success_0.index_uid, "test-index:0"); + assert_eq!(persist_success_0.source_id, "test-source"); + assert_eq!(persist_success_0.shard_id, 1); + assert_eq!( + persist_success_0.replication_position_inclusive, + Some(Position::from(1u64)) + ); + + let persist_success_1 = &persist_response.successes[1]; + assert_eq!(persist_success_1.subrequest_id, 1); + assert_eq!(persist_success_1.index_uid, "test-index:1"); + assert_eq!(persist_success_1.source_id, "test-source"); + assert_eq!(persist_success_1.shard_id, 1); + assert_eq!( + persist_success_1.replication_position_inclusive, + Some(Position::from(2u64)) + ); let state_guard = ingester.state.read().await; assert_eq!(state_guard.shards.len(), 2); @@ -780,11 +848,15 @@ mod tests { let self_node_id: NodeId = "test-follower".into(); let ingester_pool = IngesterPool::default(); let wal_dir_path = tempdir.path(); + let disk_capacity = ByteSize::mb(256); + let memory_capacity = ByteSize::mb(1); let replication_factor = 1; let mut ingester = Ingester::try_new( self_node_id.clone(), ingester_pool, wal_dir_path, + disk_capacity, + memory_capacity, replication_factor, ) .await @@ -821,11 +893,17 @@ mod tests { let leader_id: NodeId = "test-leader".into(); let ingester_pool = IngesterPool::default(); let wal_dir_path = tempdir.path(); + + let disk_capacity = ByteSize::mb(256); + let memory_capacity = ByteSize::mb(1); let replication_factor = 2; + let mut leader = Ingester::try_new( leader_id.clone(), ingester_pool.clone(), wal_dir_path, + disk_capacity, + memory_capacity, replication_factor, ) .await @@ -834,11 +912,13 @@ mod tests { let tempdir = tempfile::tempdir().unwrap(); let follower_id: NodeId = 
"test-follower".into(); let wal_dir_path = tempdir.path(); - let replication_factor = 2; + let follower = Ingester::try_new( follower_id.clone(), ingester_pool.clone(), wal_dir_path, + disk_capacity, + memory_capacity, replication_factor, ) .await @@ -876,6 +956,26 @@ mod tests { assert_eq!(persist_response.successes.len(), 2); assert_eq!(persist_response.failures.len(), 0); + let persist_success_0 = &persist_response.successes[0]; + assert_eq!(persist_success_0.subrequest_id, 0); + assert_eq!(persist_success_0.index_uid, "test-index:0"); + assert_eq!(persist_success_0.source_id, "test-source"); + assert_eq!(persist_success_0.shard_id, 1); + assert_eq!( + persist_success_0.replication_position_inclusive, + Some(Position::from(1u64)) + ); + + let persist_success_1 = &persist_response.successes[1]; + assert_eq!(persist_success_1.subrequest_id, 1); + assert_eq!(persist_success_1.index_uid, "test-index:1"); + assert_eq!(persist_success_1.source_id, "test-source"); + assert_eq!(persist_success_1.shard_id, 1); + assert_eq!( + persist_success_1.replication_position_inclusive, + Some(Position::from(2u64)) + ); + let leader_state_guard = leader.state.read().await; assert_eq!(leader_state_guard.shards.len(), 2); @@ -943,11 +1043,15 @@ mod tests { let leader_id: NodeId = "test-leader".into(); let ingester_pool = IngesterPool::default(); let wal_dir_path = tempdir.path(); + let disk_capacity = ByteSize::mb(256); + let memory_capacity = ByteSize::mb(1); let replication_factor = 2; let mut leader = Ingester::try_new( leader_id.clone(), ingester_pool.clone(), wal_dir_path, + disk_capacity, + memory_capacity, replication_factor, ) .await @@ -970,11 +1074,15 @@ mod tests { let tempdir = tempfile::tempdir().unwrap(); let follower_id: NodeId = "test-follower".into(); let wal_dir_path = tempdir.path(); + let disk_capacity = ByteSize::mb(256); + let memory_capacity = ByteSize::mb(1); let replication_factor = 2; let follower = Ingester::try_new( follower_id.clone(), ingester_pool.clone(), wal_dir_path, + disk_capacity, + memory_capacity, replication_factor, ) .await @@ -1028,6 +1136,26 @@ mod tests { assert_eq!(persist_response.successes.len(), 2); assert_eq!(persist_response.failures.len(), 0); + let persist_success_0 = &persist_response.successes[0]; + assert_eq!(persist_success_0.subrequest_id, 0); + assert_eq!(persist_success_0.index_uid, "test-index:0"); + assert_eq!(persist_success_0.source_id, "test-source"); + assert_eq!(persist_success_0.shard_id, 1); + assert_eq!( + persist_success_0.replication_position_inclusive, + Some(Position::from(0u64)) + ); + + let persist_success_1 = &persist_response.successes[1]; + assert_eq!(persist_success_1.subrequest_id, 1); + assert_eq!(persist_success_1.index_uid, "test-index:1"); + assert_eq!(persist_success_1.source_id, "test-source"); + assert_eq!(persist_success_1.shard_id, 1); + assert_eq!( + persist_success_1.replication_position_inclusive, + Some(Position::from(1u64)) + ); + let leader_state_guard = leader.state.read().await; assert_eq!(leader_state_guard.shards.len(), 2); @@ -1081,17 +1209,147 @@ mod tests { ); } + #[tokio::test] + async fn test_ingester_persist_shard_closed() { + let tempdir = tempfile::tempdir().unwrap(); + let self_node_id: NodeId = "test-ingester-0".into(); + let ingester_pool = IngesterPool::default(); + let wal_dir_path = tempdir.path(); + let disk_capacity = ByteSize::mib(256); + let memory_capacity = ByteSize::mib(1); + let replication_factor = 1; + let mut ingester = Ingester::try_new( + self_node_id.clone(), + ingester_pool, + 
wal_dir_path, + disk_capacity, + memory_capacity, + replication_factor, + ) + .await + .unwrap(); + + let queue_id_01 = queue_id("test-index:0", "test-source", 1); + + let solo_shard = SoloShard::new(ShardState::Closed, Position::Beginning); + let shard = IngesterShard::Solo(solo_shard); + + ingester + .state + .write() + .await + .shards + .insert(queue_id_01.clone(), shard); + + let persist_request = PersistRequest { + leader_id: self_node_id.to_string(), + commit_type: CommitTypeV2::Force as i32, + subrequests: vec![PersistSubrequest { + subrequest_id: 0, + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + follower_id: None, + doc_batch: Some(DocBatchV2::for_test(["test-doc-010"])), + }], + }; + let persist_response = ingester.persist(persist_request).await.unwrap(); + assert_eq!(persist_response.leader_id, "test-ingester-0"); + assert_eq!(persist_response.successes.len(), 0); + assert_eq!(persist_response.failures.len(), 1); + + let persist_failure = &persist_response.failures[0]; + assert_eq!(persist_failure.subrequest_id, 0); + assert_eq!(persist_failure.index_uid, "test-index:0"); + assert_eq!(persist_failure.source_id, "test-source"); + assert_eq!(persist_failure.shard_id, 1); + assert_eq!(persist_failure.reason(), PersistFailureReason::ShardClosed); + + let state_guard = ingester.state.read().await; + assert_eq!(state_guard.shards.len(), 1); + + let solo_shard_01 = state_guard.shards.get(&queue_id_01).unwrap(); + solo_shard_01.assert_is_solo(); + solo_shard_01.assert_is_closed(); + solo_shard_01.assert_replication_position(Position::Beginning); + } + + #[tokio::test] + async fn test_ingester_persist_resource_exhausted() { + let tempdir = tempfile::tempdir().unwrap(); + let self_node_id: NodeId = "test-ingester-0".into(); + let ingester_pool = IngesterPool::default(); + let wal_dir_path = tempdir.path(); + let disk_capacity = ByteSize(0); + let memory_capacity = ByteSize(0); + let replication_factor = 1; + let mut ingester = Ingester::try_new( + self_node_id.clone(), + ingester_pool, + wal_dir_path, + disk_capacity, + memory_capacity, + replication_factor, + ) + .await + .unwrap(); + + let persist_request = PersistRequest { + leader_id: self_node_id.to_string(), + commit_type: CommitTypeV2::Force as i32, + subrequests: vec![PersistSubrequest { + subrequest_id: 0, + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + follower_id: None, + doc_batch: Some(DocBatchV2::for_test(["test-doc-010"])), + }], + }; + let persist_response = ingester.persist(persist_request).await.unwrap(); + assert_eq!(persist_response.leader_id, "test-ingester-0"); + assert_eq!(persist_response.successes.len(), 0); + assert_eq!(persist_response.failures.len(), 1); + + let persist_failure = &persist_response.failures[0]; + assert_eq!(persist_failure.subrequest_id, 0); + assert_eq!(persist_failure.index_uid, "test-index:0"); + assert_eq!(persist_failure.source_id, "test-source"); + assert_eq!(persist_failure.shard_id, 1); + assert_eq!( + persist_failure.reason(), + PersistFailureReason::ResourceExhausted + ); + + let state_guard = ingester.state.read().await; + assert_eq!(state_guard.shards.len(), 1); + + let queue_id_01 = queue_id("test-index:0", "test-source", 1); + let solo_shard_01 = state_guard.shards.get(&queue_id_01).unwrap(); + solo_shard_01.assert_is_solo(); + solo_shard_01.assert_is_open(); + solo_shard_01.assert_replication_position(Position::Beginning); + + state_guard + .mrecordlog + 
.assert_records_eq(&queue_id_01, .., &[]); + } + #[tokio::test] async fn test_ingester_open_fetch_stream() { let tempdir = tempfile::tempdir().unwrap(); let self_node_id: NodeId = "test-ingester-0".into(); let ingester_pool = IngesterPool::default(); let wal_dir_path = tempdir.path(); + let disk_capacity = ByteSize::mb(256); + let memory_capacity = ByteSize::mb(1); let replication_factor = 1; let mut ingester = Ingester::try_new( self_node_id.clone(), ingester_pool, wal_dir_path, + disk_capacity, + memory_capacity, replication_factor, ) .await @@ -1185,11 +1443,15 @@ mod tests { let self_node_id: NodeId = "test-ingester-0".into(); let ingester_pool = IngesterPool::default(); let wal_dir_path = tempdir.path(); + let disk_capacity = ByteSize::mb(256); + let memory_capacity = ByteSize::mb(1); let replication_factor = 1; let mut ingester = Ingester::try_new( self_node_id.clone(), ingester_pool, wal_dir_path, + disk_capacity, + memory_capacity, replication_factor, ) .await @@ -1275,11 +1537,15 @@ mod tests { let self_node_id: NodeId = "test-ingester-0".into(); let ingester_pool = IngesterPool::default(); let wal_dir_path = tempdir.path(); + let disk_capacity = ByteSize::mb(256); + let memory_capacity = ByteSize::mb(1); let replication_factor = 1; let mut ingester = Ingester::try_new( self_node_id.clone(), ingester_pool, wal_dir_path, + disk_capacity, + memory_capacity, replication_factor, ) .await diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index 16e4317ab70..ec950ada275 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -21,6 +21,7 @@ mod fetch; mod ingester; mod models; mod mrecord; +mod mrecordlog_utils; mod replication; mod router; mod shard_table; @@ -28,12 +29,15 @@ mod shard_table; mod test_utils; mod workbench; +use bytesize::ByteSize; use quickwit_common::tower::Pool; use quickwit_proto::ingest::ingester::IngesterServiceClient; +use quickwit_proto::ingest::DocBatchV2; use quickwit_proto::types::NodeId; pub use self::fetch::{FetchStreamError, MultiFetchStream}; pub use self::ingester::Ingester; +use self::mrecord::MRECORD_HEADER_LEN; pub use self::mrecord::{decoded_mrecords, MRecord}; pub use self::router::IngestRouter; @@ -45,3 +49,28 @@ pub type ClientId = String; pub type LeaderId = NodeId; pub type FollowerId = NodeId; + +pub(super) fn estimate_size(doc_batch: &DocBatchV2) -> ByteSize { + let estimate = doc_batch.num_bytes() + doc_batch.num_docs() * MRECORD_HEADER_LEN; + ByteSize(estimate as u64) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_estimate_size() { + let doc_batch = DocBatchV2 { + doc_buffer: Vec::new().into(), + doc_lengths: Vec::new(), + }; + assert_eq!(estimate_size(&doc_batch), ByteSize(0)); + + let doc_batch = DocBatchV2 { + doc_buffer: vec![0u8; 100].into(), + doc_lengths: vec![10, 20, 30], + }; + assert_eq!(estimate_size(&doc_batch), ByteSize(106)); + } +} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mrecord.rs b/quickwit/quickwit-ingest/src/ingest_v2/mrecord.rs index 0d32ba4b52d..d73b7b83dc8 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mrecord.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mrecord.rs @@ -28,14 +28,17 @@ pub enum HeaderVersion { V0 = 0, } +/// Length of the header of a [`MRecord`] in bytes. +pub(super) const MRECORD_HEADER_LEN: usize = 2; + /// `Doc` header v0 composed of the header version and the `Doc = 0` record type. 
-const DOC_HEADER_V0: &[u8; 2] = &[HeaderVersion::V0 as u8, 0]; +const DOC_HEADER_V0: &[u8; MRECORD_HEADER_LEN] = &[HeaderVersion::V0 as u8, 0]; /// `Commit` header v0 composed of the header version and the `Commit = 1` record type. -const COMMIT_HEADER_V0: &[u8; 2] = &[HeaderVersion::V0 as u8, 1]; +const COMMIT_HEADER_V0: &[u8; MRECORD_HEADER_LEN] = &[HeaderVersion::V0 as u8, 1]; /// `Eof` header v0 composed of the header version and the `Eof = 2` record type. -const EOF_HEADER_V0: &[u8; 2] = &[HeaderVersion::V0 as u8, 2]; +const EOF_HEADER_V0: &[u8; MRECORD_HEADER_LEN] = &[HeaderVersion::V0 as u8, 2]; #[derive(Debug, Clone, Eq, PartialEq)] pub enum MRecord { diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs new file mode 100644 index 00000000000..15cf05a0465 --- /dev/null +++ b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs @@ -0,0 +1,159 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +use bytesize::ByteSize; +use mrecordlog::error::{AppendError, MissingQueue}; +use mrecordlog::MultiRecordLog; +use quickwit_proto::types::QueueId; +use tracing::warn; + +use super::mrecord::is_eof_mrecord; +use crate::MRecord; + +/// Appends an EOF record to the queue if it is empty or the last record is not an EOF +/// record. +pub(super) async fn append_eof_record_if_necessary( + mrecordlog: &mut MultiRecordLog, + queue_id: &QueueId, +) { + let should_append_eof_record = match mrecordlog.last_record(queue_id) { + Ok(Some((_, last_mrecord))) => !is_eof_mrecord(&last_mrecord), + Ok(None) => true, + Err(MissingQueue(_)) => { + warn!("failed to append EOF record to queue `{queue_id}`: queue does not exist"); + return; + } + }; + if should_append_eof_record { + match mrecordlog + .append_record(queue_id, None, MRecord::Eof.encode()) + .await + { + Ok(_) | Err(AppendError::MissingQueue(_)) => {} + Err(error) => { + warn!("failed to append EOF record to queue `{queue_id}`: {error}"); + } + } + } +} + +/// Error returned when the mrecordlog does not have enough capacity to store some records. +#[derive(Debug, Clone, Copy, thiserror::Error)] +pub(super) enum NotEnoughCapacityError { + #[error( + "write-ahead log is full, usage: {usage}, capacity: {capacity}, requested: \ + {requested}" + )] + Disk { + usage: ByteSize, + capacity: ByteSize, + requested: ByteSize, + }, + #[error( + "write-ahead log memory buffer is full, usage: {usage}, capacity: {capacity}, requested: \ + {requested}" + )] + Memory { + usage: ByteSize, + capacity: ByteSize, + requested: ByteSize, + }, +} + +/// Checks whether the log has enough capacity to store some records.
+pub(super) fn check_enough_capacity( + mrecordlog: &MultiRecordLog, + disk_capacity: ByteSize, + memory_capacity: ByteSize, + requested_capacity: ByteSize, +) -> Result<(), NotEnoughCapacityError> { + let disk_usage = ByteSize(mrecordlog.disk_usage() as u64); + + if disk_usage + requested_capacity > disk_capacity { + return Err(NotEnoughCapacityError::Disk { + usage: disk_usage, + capacity: disk_capacity, + requested: requested_capacity, + }); + } + let memory_usage = ByteSize(mrecordlog.memory_usage() as u64); + + if memory_usage + requested_capacity > memory_capacity { + return Err(NotEnoughCapacityError::Memory { + usage: memory_usage, + capacity: memory_capacity, + requested: requested_capacity, + }); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_append_eof_record_if_necessary() { + let tempdir = tempfile::tempdir().unwrap(); + let mut mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap(); + + append_eof_record_if_necessary(&mut mrecordlog, &"queue-not-found".to_string()).await; + + mrecordlog.create_queue("test-queue").await.unwrap(); + append_eof_record_if_necessary(&mut mrecordlog, &"test-queue".to_string()).await; + + let (last_position, last_record) = mrecordlog.last_record("test-queue").unwrap().unwrap(); + assert_eq!(last_position, 0); + assert!(is_eof_mrecord(&last_record)); + + append_eof_record_if_necessary(&mut mrecordlog, &"test-queue".to_string()).await; + let (last_position, last_record) = mrecordlog.last_record("test-queue").unwrap().unwrap(); + assert_eq!(last_position, 0); + assert!(is_eof_mrecord(&last_record)); + + mrecordlog.truncate("test-queue", 0).await.unwrap(); + + append_eof_record_if_necessary(&mut mrecordlog, &"test-queue".to_string()).await; + let (last_position, last_record) = mrecordlog.last_record("test-queue").unwrap().unwrap(); + assert_eq!(last_position, 1); + assert!(is_eof_mrecord(&last_record)); + } + + #[tokio::test] + async fn test_check_enough_capacity() { + let tempdir = tempfile::tempdir().unwrap(); + let mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap(); + + let disk_error = + check_enough_capacity(&mrecordlog, ByteSize(0), ByteSize(0), ByteSize(12)).unwrap_err(); + + assert!(matches!(disk_error, NotEnoughCapacityError::Disk { .. })); + + let memory_error = + check_enough_capacity(&mrecordlog, ByteSize::mb(256), ByteSize(11), ByteSize(12)) + .unwrap_err(); + + assert!(matches!( + memory_error, + NotEnoughCapacityError::Memory { .. 
} + )); + + check_enough_capacity(&mrecordlog, ByteSize::mb(256), ByteSize(12), ByteSize(12)).unwrap(); + } +} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index 1feb4a5e1a0..67d19cdb7b1 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -22,22 +22,26 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; +use bytesize::ByteSize; use futures::{Future, StreamExt}; use quickwit_common::ServiceStream; use quickwit_proto::ingest::ingester::{ - ack_replication_message, syn_replication_message, AckReplicationMessage, ReplicateRequest, - ReplicateResponse, ReplicateSuccess, SynReplicationMessage, + ack_replication_message, syn_replication_message, AckReplicationMessage, ReplicateFailure, + ReplicateFailureReason, ReplicateRequest, ReplicateResponse, ReplicateSuccess, + SynReplicationMessage, }; use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result}; use quickwit_proto::types::{NodeId, Position}; use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::task::JoinHandle; -use tracing::error; +use tracing::{error, warn}; use super::ingester::IngesterState; use super::models::{IngesterShard, ReplicaShard}; use super::mrecord::MRecord; +use super::mrecordlog_utils::check_enough_capacity; +use crate::estimate_size; use crate::metrics::INGEST_METRICS; pub(super) const SYN_REPLICATION_STREAM_CAPACITY: usize = 5; @@ -265,6 +269,8 @@ pub(super) struct ReplicationTask { syn_replication_stream: ServiceStream, ack_replication_stream_tx: mpsc::UnboundedSender>, current_replication_seqno: ReplicationSeqNo, + disk_capacity: ByteSize, + memory_capacity: ByteSize, } impl ReplicationTask { @@ -274,6 +280,8 @@ impl ReplicationTask { state: Arc>, syn_replication_stream: ServiceStream, ack_replication_stream_tx: mpsc::UnboundedSender>, + disk_capacity: ByteSize, + memory_capacity: ByteSize, ) -> ReplicationTaskHandle { let mut replication_task = Self { leader_id, @@ -282,6 +290,8 @@ impl ReplicationTask { syn_replication_stream, ack_replication_stream_tx, current_replication_seqno: 0, + disk_capacity, + memory_capacity, }; let join_handle = tokio::spawn(async move { replication_task.run().await }); ReplicationTaskHandle { join_handle } @@ -314,7 +324,9 @@ impl ReplicationTask { let commit_type = replicate_request.commit_type(); let force_commit = commit_type == CommitTypeV2::Force; + let mut replicate_successes = Vec::with_capacity(replicate_request.subrequests.len()); + let mut replicate_failures = Vec::new(); let mut state_guard = self.state.write().await; @@ -341,15 +353,57 @@ impl ReplicationTask { .expect("replica shard should be initialized") }; if replica_shard.is_closed() { - // TODO + let replicate_failure = ReplicateFailure { + subrequest_id: subrequest.subrequest_id, + index_uid: subrequest.index_uid, + source_id: subrequest.source_id, + shard_id: subrequest.shard_id, + reason: ReplicateFailureReason::ShardClosed as i32, + }; + replicate_failures.push(replicate_failure); + continue; } if replica_shard.replication_position_inclusive() != from_position_exclusive { // TODO } - let doc_batch = subrequest - .doc_batch - .expect("leader should not send empty replicate subrequests"); + let doc_batch = match subrequest.doc_batch { + Some(doc_batch) if !doc_batch.is_empty() => doc_batch, + _ => { + warn!("received empty replicate request"); + let replicate_success = 
ReplicateSuccess { + subrequest_id: subrequest.subrequest_id, + index_uid: subrequest.index_uid, + source_id: subrequest.source_id, + shard_id: subrequest.shard_id, + replication_position_inclusive: Some( + replica_shard.replication_position_inclusive(), + ), + }; + replicate_successes.push(replicate_success); + continue; + } + }; + let requested_capacity = estimate_size(&doc_batch); + + if let Err(error) = check_enough_capacity( + &state_guard.mrecordlog, + self.disk_capacity, + self.memory_capacity, + requested_capacity, + ) { + warn!("failed to replicate records: {error}"); + + let replicate_failure = ReplicateFailure { + subrequest_id: subrequest.subrequest_id, + index_uid: subrequest.index_uid, + source_id: subrequest.source_id, + shard_id: subrequest.shard_id, + reason: ReplicateFailureReason::ResourceExhausted as i32, + }; + replicate_failures.push(replicate_failure); + continue; + } let current_position_inclusive: Position = if force_commit { let encoded_mrecords = doc_batch .docs() @@ -404,7 +458,7 @@ impl ReplicationTask { let replicate_response = ReplicateResponse { follower_id, successes: replicate_successes, - failures: Vec::new(), + failures: replicate_failures, replication_seqno: replicate_request.replication_seqno, }; Ok(replicate_response) @@ -463,7 +517,7 @@ mod tests { use mrecordlog::MultiRecordLog; use quickwit_proto::ingest::ingester::{ReplicateSubrequest, ReplicateSuccess}; - use quickwit_proto::ingest::DocBatchV2; + use quickwit_proto::ingest::{DocBatchV2, ShardState}; use quickwit_proto::types::queue_id; use super::*; @@ -627,12 +681,18 @@ mod tests { ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY); let (ack_replication_stream_tx, mut ack_replication_stream) = ServiceStream::new_unbounded(); + + let disk_capacity = ByteSize::mb(256); + let memory_capacity = ByteSize::mb(1); + let _replication_task_handle = ReplicationTask::spawn( leader_id, follower_id, state.clone(), syn_replication_stream, ack_replication_stream_tx, + disk_capacity, + memory_capacity, ); let replicate_request = ReplicateRequest { leader_id: "test-leader".to_string(), @@ -767,4 +827,151 @@ mod tests { &[(0, "\0\0test-doc-foo"), (1, "\0\0test-doc-moo")], ); } + + #[tokio::test] + async fn test_replication_task_shard_closed() { + let leader_id: NodeId = "test-leader".into(); + let follower_id: NodeId = "test-follower".into(); + let tempdir = tempfile::tempdir().unwrap(); + let mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap(); + let state = Arc::new(RwLock::new(IngesterState { + mrecordlog, + shards: HashMap::new(), + replication_streams: HashMap::new(), + replication_tasks: HashMap::new(), + })); + let (syn_replication_stream_tx, syn_replication_stream) = + ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY); + let (ack_replication_stream_tx, mut ack_replication_stream) = + ServiceStream::new_unbounded(); + + let disk_capacity = ByteSize::mb(256); + let memory_capacity = ByteSize::mb(1); + + let _replication_task_handle = ReplicationTask::spawn( + leader_id.clone(), + follower_id, + state.clone(), + syn_replication_stream, + ack_replication_stream_tx, + disk_capacity, + memory_capacity, + ); + + let queue_id_01 = queue_id("test-index:0", "test-source", 1); + + let mut replica_shard = ReplicaShard::new(leader_id); + replica_shard.shard_state = ShardState::Closed; + let shard = IngesterShard::Replica(replica_shard); + + state + .write() + .await + .shards + .insert(queue_id_01.clone(), shard); + + let replicate_request = ReplicateRequest { + leader_id: 
"test-leader".to_string(), + follower_id: "test-follower".to_string(), + commit_type: CommitTypeV2::Auto as i32, + subrequests: vec![ReplicateSubrequest { + subrequest_id: 0, + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])), + from_position_exclusive: Position::from(0u64).into(), + to_position_inclusive: Some(Position::from(1u64)), + }], + replication_seqno: 0, + }; + let syn_replication_message = + SynReplicationMessage::new_replicate_request(replicate_request); + syn_replication_stream_tx + .send(syn_replication_message) + .await + .unwrap(); + let ack_replication_message = ack_replication_stream.next().await.unwrap().unwrap(); + let replicate_response = into_replicate_response(ack_replication_message); + + assert_eq!(replicate_response.follower_id, "test-follower"); + assert_eq!(replicate_response.successes.len(), 0); + assert_eq!(replicate_response.failures.len(), 1); + + let replicate_failure = &replicate_response.failures[0]; + assert_eq!(replicate_failure.index_uid, "test-index:0"); + assert_eq!(replicate_failure.source_id, "test-source"); + assert_eq!(replicate_failure.shard_id, 1); + assert_eq!( + replicate_failure.reason(), + ReplicateFailureReason::ShardClosed + ); + } + + #[tokio::test] + async fn test_replication_task_resource_exhausted() { + let leader_id: NodeId = "test-leader".into(); + let follower_id: NodeId = "test-follower".into(); + let tempdir = tempfile::tempdir().unwrap(); + let mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap(); + let state = Arc::new(RwLock::new(IngesterState { + mrecordlog, + shards: HashMap::new(), + replication_streams: HashMap::new(), + replication_tasks: HashMap::new(), + })); + let (syn_replication_stream_tx, syn_replication_stream) = + ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY); + let (ack_replication_stream_tx, mut ack_replication_stream) = + ServiceStream::new_unbounded(); + + let disk_capacity = ByteSize(0); + let memory_capacity = ByteSize(0); + + let _replication_task_handle = ReplicationTask::spawn( + leader_id, + follower_id, + state.clone(), + syn_replication_stream, + ack_replication_stream_tx, + disk_capacity, + memory_capacity, + ); + let replicate_request = ReplicateRequest { + leader_id: "test-leader".to_string(), + follower_id: "test-follower".to_string(), + commit_type: CommitTypeV2::Auto as i32, + subrequests: vec![ReplicateSubrequest { + subrequest_id: 0, + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])), + from_position_exclusive: None, + to_position_inclusive: Some(Position::from(0u64)), + }], + replication_seqno: 0, + }; + let syn_replication_message = + SynReplicationMessage::new_replicate_request(replicate_request); + syn_replication_stream_tx + .send(syn_replication_message) + .await + .unwrap(); + let ack_replication_message = ack_replication_stream.next().await.unwrap().unwrap(); + let replicate_response = into_replicate_response(ack_replication_message); + + assert_eq!(replicate_response.follower_id, "test-follower"); + assert_eq!(replicate_response.successes.len(), 0); + assert_eq!(replicate_response.failures.len(), 1); + + let replicate_failure_0 = &replicate_response.failures[0]; + assert_eq!(replicate_failure_0.index_uid, "test-index:0"); + assert_eq!(replicate_failure_0.source_id, "test-source"); + assert_eq!(replicate_failure_0.shard_id, 1); + assert_eq!( + 
replicate_failure_0.reason(), + ReplicateFailureReason::ResourceExhausted + ); + } } diff --git a/quickwit/quickwit-ingest/src/queue.rs b/quickwit/quickwit-ingest/src/queue.rs index df5e80e593c..cb3e7e9eacb 100644 --- a/quickwit/quickwit-ingest/src/queue.rs +++ b/quickwit/quickwit-ingest/src/queue.rs @@ -209,11 +209,11 @@ impl Queues { } pub(crate) fn disk_usage(&self) -> usize { - self.record_log.on_disk_size() + self.record_log.disk_usage() } pub(crate) fn memory_usage(&self) -> usize { - self.record_log.in_memory_size() + self.record_log.memory_usage() } } diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto index be924bc950b..109888c60f8 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto @@ -145,13 +145,19 @@ message ReplicateSuccess { quickwit.ingest.Position replication_position_inclusive = 5; } +enum ReplicateFailureReason { + REPLICATE_FAILURE_REASON_UNSPECIFIED = 0; + REPLICATE_FAILURE_REASON_SHARD_CLOSED = 1; + reserved 2; // REPLICATE_FAILURE_REASON_RATE_LIMITED = 2; + REPLICATE_FAILURE_REASON_RESOURCE_EXHAUSTED = 3; +} + message ReplicateFailure { uint32 subrequest_id = 1; string index_uid = 2; string source_id = 3; uint64 shard_id = 4; - // ingest.DocBatchV2 doc_batch = 4; - // ingest.IngestError error = 5; + ReplicateFailureReason reason = 5; } message TruncateRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index 26322f37715..53857e8ac79 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -194,10 +194,10 @@ pub struct ReplicateFailure { pub index_uid: ::prost::alloc::string::String, #[prost(string, tag = "3")] pub source_id: ::prost::alloc::string::String, - /// ingest.DocBatchV2 doc_batch = 4; - /// ingest.IngestError error = 5; #[prost(uint64, tag = "4")] pub shard_id: u64, + #[prost(enumeration = "ReplicateFailureReason", tag = "5")] + pub reason: i32, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -320,6 +320,43 @@ impl PersistFailureReason { } } } +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "snake_case")] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ReplicateFailureReason { + Unspecified = 0, + ShardClosed = 1, + ResourceExhausted = 3, +} +impl ReplicateFailureReason { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + ReplicateFailureReason::Unspecified => "REPLICATE_FAILURE_REASON_UNSPECIFIED", + ReplicateFailureReason::ShardClosed => { + "REPLICATE_FAILURE_REASON_SHARD_CLOSED" + } + ReplicateFailureReason::ResourceExhausted => { + "REPLICATE_FAILURE_REASON_RESOURCE_EXHAUSTED" + } + } + } + /// Creates an enum from field names used in the ProtoBuf definition. 
+ pub fn from_str_name(value: &str) -> ::core::option::Option<Self> { + match value { + "REPLICATE_FAILURE_REASON_UNSPECIFIED" => Some(Self::Unspecified), + "REPLICATE_FAILURE_REASON_SHARD_CLOSED" => Some(Self::ShardClosed), + "REPLICATE_FAILURE_REASON_RESOURCE_EXHAUSTED" => { + Some(Self::ResourceExhausted) + } + _ => None, + } + } +} /// BEGIN quickwit-codegen #[allow(unused_imports)] use std::str::FromStr; diff --git a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs index d698143d0ec..5a65411d262 100644 --- a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs @@ -79,7 +79,7 @@ fn ingest_filter( warp::path!(String / "ingest") .and(warp::post()) .and(warp::body::content_length_limit( - config.content_length_limit, + config.content_length_limit.as_u64(), )) .and(warp::body::bytes()) .and(serde_qs::warp::query::( @@ -103,7 +103,7 @@ fn ingest_v2_filter( warp::path!(String / "ingest-v2") .and(warp::post()) .and(warp::body::content_length_limit( - config.content_length_limit, + config.content_length_limit.as_u64(), )) .and(warp::body::bytes()) .and(serde_qs::warp::query::( @@ -355,7 +355,7 @@ pub(crate) mod tests { #[tokio::test] async fn test_ingest_api_return_413_if_above_content_limit() { let config = IngestApiConfig { - content_length_limit: 1, + content_length_limit: ByteSize(1), ..Default::default() }; let (universe, _temp_dir, ingest_service, _) = diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index e903211a632..fcd3618df9e 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -572,6 +572,8 @@ async fn setup_ingest_v2( self_node_id.clone(), ingester_pool.clone(), &wal_dir_path, + config.ingest_api_config.max_queue_disk_usage, + config.ingest_api_config.max_queue_memory_usage, replication_factor, ) .await?;