
Commit 14ea667: WIP

guilload committed May 29, 2024
1 parent 982a4dd
Showing 33 changed files with 496 additions and 891 deletions.
2 changes: 2 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default.

8 changes: 4 additions & 4 deletions quickwit/quickwit-common/src/thread_pool.rs
@@ -83,7 +83,7 @@ impl ThreadPool {
/// but is not running yet "cancellable".
pub fn run_cpu_intensive<F, R>(
&self,
-        cpu_heavy_task: F,
+        cpu_intensive_fn: F,
) -> impl Future<Output = Result<R, Panicked>>
where
F: FnOnce() -> R + Send + 'static,
@@ -103,7 +103,7 @@ impl ThreadPool {
let _guard = span.enter();
let mut ongoing_task_guard = GaugeGuard::from_gauge(&ongoing_tasks);
ongoing_task_guard.add(1i64);
-            let result = cpu_heavy_task();
+            let result = cpu_intensive_fn();
let _ = tx.send(result);
});
rx.map_err(|_| Panicked)
@@ -118,7 +118,7 @@ impl ThreadPool {
///
/// Disclaimer: The function will not be executed if the Future is dropped.
#[must_use = "run_cpu_intensive will not run if the future it returns is dropped"]
-pub fn run_cpu_intensive<F, R>(cpu_heavy_task: F) -> impl Future<Output = Result<R, Panicked>>
+pub fn run_cpu_intensive<F, R>(cpu_intensive_fn: F) -> impl Future<Output = Result<R, Panicked>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
@@ -129,7 +129,7 @@ where
let num_threads: usize = (crate::num_cpus() / 3).max(2);
ThreadPool::new("small_tasks", Some(num_threads))
})
-    .run_cpu_intensive(cpu_heavy_task)
+    .run_cpu_intensive(cpu_intensive_fn)
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
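As a side note, here is a minimal sketch of how a caller might invoke `run_cpu_intensive` from an async context. This is hypothetical caller code, not part of this commit; it only assumes the free-function signature shown in the hunk above and that `Panicked` is exported from the same module:

```rust
use quickwit_common::thread_pool::{run_cpu_intensive, Panicked};

async fn checksum_docs(docs: Vec<Vec<u8>>) -> Result<u64, Panicked> {
    // The closure is shipped to the dedicated thread pool; the returned
    // future resolves to Err(Panicked) if the closure panics, and the
    // closure never runs at all if the future is dropped first.
    run_cpu_intensive(move || {
        docs.iter()
            .flatten()
            .fold(0u64, |acc, byte| acc.wrapping_add(*byte as u64))
    })
    .await
}
```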
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/doc_processor.rs
@@ -279,7 +279,7 @@ pub struct DocProcessorCounters {
/// into 4 categories:
/// - number of docs that could not be parsed.
/// - number of docs that could not be transformed.
-/// - number of docs for which the doc mapper returnd an error.
+/// - number of docs for which the doc mapper returned an error.
/// - number of valid docs.
pub num_doc_parse_errors: AtomicU64,
pub num_transform_errors: AtomicU64,
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/Cargo.toml
@@ -39,6 +39,7 @@ quickwit-actors = { workspace = true }
quickwit-cluster = { workspace = true }
quickwit-common = { workspace = true }
quickwit-config = { workspace = true }
+quickwit-doc-mapper = { workspace = true, features = ["testsuite"] }
quickwit-proto = { workspace = true }

[dev-dependencies]
93 changes: 77 additions & 16 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -36,8 +36,11 @@ use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
use quickwit_common::pretty::PrettyDisplay;
use quickwit_common::pubsub::{EventBroker, EventSubscriber};
use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
+use quickwit_common::thread_pool::run_cpu_intensive;
use quickwit_common::tower::Pool;
use quickwit_common::{rate_limited_warn, ServiceStream};
+use quickwit_config::{build_doc_mapper, DocMapping, SearchSettings};
+use quickwit_doc_mapper::DocMapper;
use quickwit_proto::control_plane::{
AdviseResetShardsRequest, ControlPlaneService, ControlPlaneServiceClient,
};
@@ -47,10 +50,11 @@ use quickwit_proto::ingest::ingester::{
DecommissionResponse, FetchMessage, IngesterService, IngesterServiceClient,
IngesterServiceStream, IngesterStatus, InitShardFailure, InitShardSuccess, InitShardsRequest,
InitShardsResponse, ObservationMessage, OpenFetchStreamRequest, OpenObservationStreamRequest,
-    OpenReplicationStreamRequest, OpenReplicationStreamResponse, PersistFailure,
-    PersistFailureReason, PersistRequest, PersistResponse, PersistSuccess, ReplicateFailureReason,
-    ReplicateSubrequest, RetainShardsForSource, RetainShardsRequest, RetainShardsResponse,
-    SynReplicationMessage, TruncateShardsRequest, TruncateShardsResponse,
+    OpenReplicationStreamRequest, OpenReplicationStreamResponse, ParseFailure, ParseFailureKind,
+    ParseFailureReason, PersistFailure, PersistFailureReason, PersistRequest, PersistResponse,
+    PersistSuccess, ReplicateFailureReason, ReplicateSubrequest, RetainShardsForSource,
+    RetainShardsRequest, RetainShardsResponse, SynReplicationMessage, TruncateShardsRequest,
+    TruncateShardsResponse,
};
use quickwit_proto::ingest::{
CommitTypeV2, IngestV2Error, IngestV2Result, Shard, ShardIds, ShardState,
@@ -222,6 +226,7 @@ impl Ingester {
let primary_shard = if let Some(follower_id) = &shard.follower_id {
let leader_id: NodeId = shard.leader_id.clone().into();
let follower_id: NodeId = follower_id.clone().into();
+            let doc_mapper = try_build_doc_mapper(&shard.doc_mapping_json)?;

let replication_client = self
.init_replication_stream(
@@ -233,14 +238,14 @@

if let Err(error) = replication_client.init_replica(shard).await {
// TODO: Remove dangling queue from the WAL.
error!("failed to initialize replica shard: {error}",);
return Err(IngestV2Error::Internal(format!(
"failed to initialize replica shard: {error}"
)));
error!("failed to initialize replica shard: {error}");
let message = format!("failed to initialize replica shard: {error}");
return Err(IngestV2Error::Internal(message));
}
IngesterShard::new_primary(
follower_id,
ShardState::Open,
+                doc_mapper,
Position::Beginning,
Position::Beginning,
now,
@@ -475,7 +480,7 @@ impl Ingester {

// first verify if we would locally accept each subrequest
{
-            let mut total_requested_capacity = bytesize::ByteSize::b(0);
+            let mut total_requested_capacity = ByteSize::b(0);

for subrequest in persist_request.subrequests {
let queue_id = subrequest.queue_id();
@@ -502,7 +507,9 @@
persist_failures.push(persist_failure);
continue;
}

+                let doc_mapper = shard
+                    .doc_mapper_opt()
+                    .expect("shard should be primary shard");
let follower_id_opt = shard.follower_id_opt().cloned();
let from_position_exclusive = shard.replication_position_inclusive.clone();

@@ -512,14 +519,17 @@
_ => {
warn!("received empty persist request");

+                        let replication_position_inclusive =
+                            shard.replication_position_inclusive.clone();

let persist_success = PersistSuccess {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
-                            replication_position_inclusive: Some(
-                                shard.replication_position_inclusive.clone(),
-                            ),
+                            replication_position_inclusive: Some(replication_position_inclusive),
+                            num_persisted_docs: 0,
+                            parse_failures: Vec::new(),
};
persist_successes.push(persist_success);
continue;
@@ -566,6 +576,42 @@
persist_failures.push(persist_failure);
continue;
}
+                    let doc_batch_clone = doc_batch.clone();
+                    let parse_doc_batch_fn = move || {
+                        let mut parse_failures: Vec<ParseFailure> = Vec::new();
+
+                        for (doc_uid, doc) in doc_batch_clone.into_docs() {
+                            let Ok(json_doc) = serde_json::from_slice::<JsonValue>(&doc) else {
+                                let parse_failure = ParseFailure {
+                                    doc_uid: Some(doc_uid),
+                                    reason: ParseFailureReason::InvalidJson as i32,
+                                    message: "failed to parse JSON document".to_string(),
+                                };
+                                parse_failures.push(parse_failure);
+                                continue;
+                            };
+                            let JsonValue::Object(json_obj) = json_doc else {
+                                let parse_failure = ParseFailure {
+                                    doc_uid: Some(doc_uid),
+                                    reason: ParseFailureReason::InvalidJson as i32,
+                                    message: "JSON document is not an object".to_string(),
+                                };
+                                parse_failures.push(parse_failure);
+                                continue;
+                            };
+                            if let Err(error) =
+                                doc_mapper.doc_from_json_obj(json_obj, doc.len() as u64)
+                            {
+                                let parse_failure = ParseFailure {
+                                    doc_uid: Some(doc_uid),
+                                    reason: ParseFailureReason::InvalidSchema as i32,
+                                    message: error.to_string(),
+                                };
+                                parse_failures.push(parse_failure);
+                            }
+                        }
+                        parse_failures
+                    };
+                    let parse_failures = run_cpu_intensive(parse_doc_batch_fn).await.expect("TODO");

let batch_num_bytes = doc_batch.num_bytes() as u64;
rate_meter.update(batch_num_bytes);
@@ -592,6 +638,7 @@
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
doc_batch,
+                        parse_failures,
expected_position_inclusive: None,
})
}
@@ -646,6 +693,7 @@
source_id: replicate_success.source_id,
shard_id: replicate_success.shard_id,
doc_batch,
+                            parse_failures,
expected_position_inclusive: replicate_success
.replication_position_inclusive,
};
@@ -748,6 +796,8 @@ impl Ingester {
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
replication_position_inclusive: Some(current_position_inclusive),
+                num_persisted_docs: subrequest.num_persisted_docs,
+                parse_failures: subrequest.parse_failures,
};
persist_successes.push(persist_success);
}
@@ -1027,7 +1077,7 @@ impl Ingester {
"truncation_position_inclusive": shard.truncation_position_inclusive,
});
match &shard.shard_type {
-                IngesterShardType::Primary { follower_id } => {
+                IngesterShardType::Primary { follower_id, .. } => {
shard_json["type"] = json!("primary");
shard_json["leader_id"] = json!(self.self_node_id.to_string());
shard_json["follower_id"] = json!(follower_id.to_string());
@@ -1232,11 +1282,22 @@ struct LocalPersistSubrequest {
subrequest_id: u32,
index_uid: IndexUid,
source_id: SourceId,
-    shard_id: Option<quickwit_proto::types::ShardId>,
-    doc_batch: quickwit_proto::ingest::DocBatchV2,
+    shard_id: Option<ShardId>,
+    doc_batch: DocBatchV2,
+    parse_failures: Vec<ParseFailure>,
expected_position_inclusive: Option<Position>,
}

+fn try_build_doc_mapper(doc_mapping_json: &str) -> IngestV2Result<Arc<dyn DocMapper>> {
+    let doc_mapping: DocMapping = serde_json::from_str(doc_mapping_json).map_err(|error| {
+        IngestV2Error::Internal(format!("failed to parse doc mapping: {error}"))
+    })?;
+    let search_settings = SearchSettings::default();
+    let doc_mapper = build_doc_mapper(&doc_mapping, &search_settings)
+        .map_err(|error| IngestV2Error::Internal(format!("failed to build doc mapper: {error}")))?;
+    Ok(doc_mapper)
+}

#[cfg(test)]
mod tests {
#![allow(clippy::mutable_key_type)]
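To make the new persist-path validation easier to follow, here is a standalone sketch of the per-document classification that `parse_doc_batch_fn` implements. It uses plain Rust with only `serde_json`; the `map_obj` callback stands in for `DocMapper::doc_from_json_obj`, so this is an illustration of the logic, not Quickwit's API:

```rust
use serde_json::{Map, Value as JsonValue};

#[derive(Debug, PartialEq)]
enum ParseOutcome {
    Valid,
    InvalidJson,   // bytes are not JSON, or the JSON value is not an object
    InvalidSchema, // the doc mapper rejected a syntactically valid object
}

fn classify<F>(doc: &[u8], map_obj: F) -> ParseOutcome
where
    F: Fn(Map<String, JsonValue>) -> Result<(), String>,
{
    // Step 1: the bytes must parse as JSON at all.
    let Ok(json_doc) = serde_json::from_slice::<JsonValue>(doc) else {
        return ParseOutcome::InvalidJson;
    };
    // Step 2: the top-level JSON value must be an object.
    let JsonValue::Object(json_obj) = json_doc else {
        return ParseOutcome::InvalidJson;
    };
    // Step 3: the object must pass the doc mapper (schema check).
    match map_obj(json_obj) {
        Ok(()) => ParseOutcome::Valid,
        Err(_) => ParseOutcome::InvalidSchema,
    }
}

fn main() {
    let accept_all = |_obj: Map<String, JsonValue>| Ok(());
    assert_eq!(classify(b"not json", accept_all), ParseOutcome::InvalidJson);
    assert_eq!(classify(b"[1, 2, 3]", accept_all), ParseOutcome::InvalidJson);
    assert_eq!(classify(br#"{"body": "hello"}"#, accept_all), ParseOutcome::Valid);

    let reject_all = |_obj: Map<String, JsonValue>| Err("unknown field".to_string());
    assert_eq!(classify(br#"{"body": "hello"}"#, reject_all), ParseOutcome::InvalidSchema);
}
```

Note that both "not JSON" and "JSON but not an object" map to the same `InvalidJson` reason, while only doc-mapper rejections surface as `InvalidSchema`.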
44 changes: 35 additions & 9 deletions quickwit/quickwit-ingest/src/ingest_v2/mod.rs
@@ -37,7 +37,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::ops::{Add, AddAssign};
use std::time::Duration;
-use std::{env, fmt};
+use std::{env, fmt, mem};

pub use broadcast::{setup_local_shards_update_listener, LocalShardsUpdate, ShardInfo, ShardInfos};
use bytes::{BufMut, BytesMut};
@@ -48,6 +48,7 @@ use quickwit_proto::ingest::router::{IngestRequestV2, IngestSubrequest};
use quickwit_proto::ingest::{CommitTypeV2, DocBatchV2};
use quickwit_proto::types::{IndexId, NodeId};
use tracing::{error, info};
+use ulid::Ulid;

pub use self::fetch::{FetchStreamError, MultiFetchStream};
pub use self::ingester::{wait_for_ingester_decommission, wait_for_ingester_status, Ingester};
@@ -110,27 +111,50 @@ pub(crate) fn get_ingest_router_buffer_size() -> ByteSize {
}

/// Helper struct to build a [`DocBatchV2`].
-#[derive(Debug, Default)]
+#[derive(Debug)]
pub struct DocBatchV2Builder {
doc_buffer: BytesMut,
doc_lengths: Vec<u32>,
+    doc_uids: Vec<Ulid>,
+    current_ulid: Ulid,
}

+impl Default for DocBatchV2Builder {
+    fn default() -> Self {
+        DocBatchV2Builder {
+            doc_buffer: BytesMut::default(),
+            doc_lengths: Vec::default(),
+            doc_uids: Vec::default(),
+            current_ulid: Ulid::new(),
+        }
+    }
+}

impl DocBatchV2Builder {
-    /// Adds a document to the batch.
-    pub fn add_doc(&mut self, doc: &[u8]) {
+    /// Adds a document to the batch, returning its [`Ulid`].
+    pub fn add_doc(&mut self, doc: &[u8]) -> Ulid {
self.doc_lengths.push(doc.len() as u32);
self.doc_buffer.put(doc);
+        self.doc_uids.push(self.current_ulid);

+        let next_ulid = self.current_ulid.increment().unwrap_or_else(Ulid::new);
+        mem::replace(&mut self.current_ulid, next_ulid)
}

/// Builds the [`DocBatchV2`], returning `None` if the batch is empty.
pub fn build(self) -> Option<DocBatchV2> {
if self.doc_lengths.is_empty() {
return None;
}
+        let doc_uids = self
+            .doc_uids
+            .into_iter()
+            .flat_map(|ulid| ulid.to_bytes())
+            .collect();
let doc_batch = DocBatchV2 {
doc_buffer: self.doc_buffer.freeze(),
doc_lengths: self.doc_lengths,
+            doc_uids,
};
Some(doc_batch)
}
@@ -145,20 +169,20 @@ pub struct IngestRequestV2Builder {

impl IngestRequestV2Builder {
/// Adds a document to the request.
-    pub fn add_doc(&mut self, index_id: IndexId, doc: &[u8]) -> u32 {
+    pub fn add_doc(&mut self, index_id: IndexId, doc: &[u8]) -> (u32, Ulid) {
match self.per_index_id_doc_batch_builders.entry(index_id) {
Entry::Occupied(mut entry) => {
let (subrequest_id, doc_batch_builder) = entry.get_mut();
-                doc_batch_builder.add_doc(doc);
-                *subrequest_id
+                let doc_uid = doc_batch_builder.add_doc(doc);
+                (*subrequest_id, doc_uid)
}
Entry::Vacant(entry) => {
let subrequest_id = self.subrequest_id_sequence;
self.subrequest_id_sequence += 1;
let mut doc_batch_builder = DocBatchV2Builder::default();
-                doc_batch_builder.add_doc(doc);
+                let doc_uid = doc_batch_builder.add_doc(doc);
entry.insert((subrequest_id, doc_batch_builder));
-                subrequest_id
+                (subrequest_id, doc_uid)
}
}
}
@@ -347,12 +371,14 @@ mod tests {
let doc_batch = DocBatchV2 {
doc_buffer: Vec::new().into(),
doc_lengths: Vec::new(),
+            doc_uids: Vec::new(),
};
assert_eq!(estimate_size(&doc_batch), ByteSize(0));

let doc_batch = DocBatchV2 {
doc_buffer: vec![0u8; 100].into(),
doc_lengths: vec![10, 20, 30],
+            doc_uids: Vec::new(),
};
assert_eq!(estimate_size(&doc_batch), ByteSize(118));
}
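Finally, a quick sketch of the doc UID scheme introduced here, using the `ulid` crate directly. This is a standalone illustration mirroring `DocBatchV2Builder::add_doc`; it relies only on `Ulid::new` and `Ulid::increment` from the crate:

```rust
use ulid::Ulid;

fn main() {
    // Each batch starts from one fresh ULID; every added doc takes the
    // current value, then the counter advances by one.
    let mut current_ulid = Ulid::new();
    for doc in ["doc-0", "doc-1", "doc-2"] {
        let doc_uid = current_ulid;
        // increment() returns None if the ULID's random component overflows;
        // the builder falls back to a brand-new ULID in that case.
        current_ulid = current_ulid.increment().unwrap_or_else(Ulid::new);
        println!("{doc} -> {doc_uid}");
    }
}
```

Deriving consecutive UIDs from a single starting ULID keeps doc UIDs unique and ordered within a batch while paying the random-generation cost only once per batch.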