Skip to content

Commit

Permalink
Switch OTLP endpoints to ingest v2 (#5283)
Browse files Browse the repository at this point in the history
* Use ingest V2 for OTLP

* Integration test cleanup

* Assert OTLP searchable

* Disable Jaeger tests for now

* Fix clippy

* Fix integration tests with orderly shutdown

* Migrate Jaeger integ tests to integ tests

* Use correct nano timestamps in tests

* Simplify JsonDocBatchV2Builder

* Refactor some common code

* Rename ingest helper method
  • Loading branch information
rdettai authored Aug 23, 2024
1 parent d9577ba commit d43f013
Show file tree
Hide file tree
Showing 17 changed files with 765 additions and 655 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions quickwit/quickwit-ingest/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use quickwit_actors::AskError;
use quickwit_common::rate_limited_error;
use quickwit_common::tower::BufferError;
pub(crate) use quickwit_proto::error::{grpc_error_to_grpc_status, grpc_status_to_service_error};
use quickwit_proto::ingest::router::{IngestFailure, IngestFailureReason};
use quickwit_proto::ingest::{IngestV2Error, RateLimitingCause};
use quickwit_proto::types::IndexId;
use quickwit_proto::{tonic, GrpcServiceError, ServiceError, ServiceErrorCode};
Expand Down Expand Up @@ -96,6 +97,47 @@ impl From<IngestV2Error> for IngestServiceError {
}
}

impl From<IngestFailure> for IngestServiceError {
fn from(ingest_failure: IngestFailure) -> Self {
match ingest_failure.reason() {
IngestFailureReason::Unspecified => {
IngestServiceError::Internal("unknown error".to_string())
}
IngestFailureReason::IndexNotFound => IngestServiceError::IndexNotFound {
index_id: ingest_failure.index_id,
},
IngestFailureReason::SourceNotFound => IngestServiceError::Internal(format!(
"Ingest v2 source not found for index {}",
ingest_failure.index_id
)),
IngestFailureReason::Internal => {
IngestServiceError::Internal("internal error".to_string())
}
IngestFailureReason::NoShardsAvailable => {
IngestServiceError::Unavailable("no shards available".to_string())
}
IngestFailureReason::ShardRateLimited => {
IngestServiceError::RateLimited(RateLimitingCause::ShardRateLimiting)
}
IngestFailureReason::WalFull => {
IngestServiceError::RateLimited(RateLimitingCause::WalFull)
}
IngestFailureReason::Timeout => {
IngestServiceError::Internal("request timed out".to_string())
}
IngestFailureReason::RouterLoadShedding => {
IngestServiceError::RateLimited(RateLimitingCause::RouterLoadShedding)
}
IngestFailureReason::LoadShedding => {
IngestServiceError::RateLimited(RateLimitingCause::LoadShedding)
}
IngestFailureReason::CircuitBreaker => {
IngestServiceError::RateLimited(RateLimitingCause::CircuitBreaker)
}
}
}
}

impl ServiceError for IngestServiceError {
fn error_code(&self) -> ServiceErrorCode {
match self {
Expand Down
39 changes: 39 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ use std::time::Duration;
use std::{env, fmt};

pub use broadcast::{setup_local_shards_update_listener, LocalShardsUpdate, ShardInfo, ShardInfos};
use bytes::buf::Writer;
use bytes::{BufMut, BytesMut};
use bytesize::ByteSize;
use quickwit_common::tower::Pool;
use quickwit_proto::ingest::ingester::IngesterServiceClient;
use quickwit_proto::ingest::router::{IngestRequestV2, IngestSubrequest};
use quickwit_proto::ingest::{CommitTypeV2, DocBatchV2};
use quickwit_proto::types::{DocUid, DocUidGenerator, IndexId, NodeId, SubrequestId};
use serde::Serialize;
use tracing::{error, info};
use workbench::pending_subrequests;

Expand Down Expand Up @@ -141,6 +143,43 @@ impl DocBatchV2Builder {
}
}

/// Batch builder that can append [`Serialize`] structs without an extra copy
pub struct JsonDocBatchV2Builder {
doc_uids: Vec<DocUid>,
doc_buffer: Writer<BytesMut>,
doc_lengths: Vec<u32>,
}

impl Default for JsonDocBatchV2Builder {
fn default() -> Self {
Self {
doc_uids: Vec::new(),
doc_buffer: BytesMut::new().writer(),
doc_lengths: Vec::new(),
}
}
}

impl JsonDocBatchV2Builder {
pub fn add_doc(&mut self, doc_uid: DocUid, payload: impl Serialize) -> serde_json::Result<()> {
let old_len = self.doc_buffer.get_ref().len();
serde_json::to_writer(&mut self.doc_buffer, &payload)?;
let new_len = self.doc_buffer.get_ref().len();
let written_len = new_len - old_len;
self.doc_uids.push(doc_uid);
self.doc_lengths.push(written_len as u32);
Ok(())
}

pub fn build(self) -> DocBatchV2 {
DocBatchV2 {
doc_uids: self.doc_uids,
doc_buffer: self.doc_buffer.into_inner().freeze(),
doc_lengths: self.doc_lengths,
}
}
}

/// Helper struct to build an [`IngestRequestV2`].
#[derive(Debug, Default)]
pub struct IngestRequestV2Builder {
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-integration-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-config = { workspace = true, features = ["testsuite"] }
quickwit-indexing = { workspace = true, features = ["testsuite"] }
quickwit-metastore = { workspace = true, features = ["testsuite"] }
quickwit-opentelemetry = { workspace = true, features = ["testsuite"] }
quickwit-proto = { workspace = true, features = ["testsuite"] }
quickwit-rest-client = { workspace = true }
quickwit-serve = { workspace = true }
Expand Down
Loading

0 comments on commit d43f013

Please sign in to comment.