diff --git a/config/tutorials/otel-trace/index-config.yaml b/config/tutorials/otel-traces/index-config.yaml
similarity index 100%
rename from config/tutorials/otel-trace/index-config.yaml
rename to config/tutorials/otel-traces/index-config.yaml
diff --git a/config/tutorials/otel-traces/kafka-source.yaml b/config/tutorials/otel-traces/kafka-source.yaml
new file mode 100644
index 00000000000..808f3d3b7c7
--- /dev/null
+++ b/config/tutorials/otel-traces/kafka-source.yaml
@@ -0,0 +1,8 @@
+version: 0.6
+source_id: kafka-source
+source_type: kafka
+input_format: otlp_trace_proto
+params:
+  topic: otlp_spans
+  client_params:
+    bootstrap.servers: localhost:9092
diff --git a/docker-compose.yml b/docker-compose.yml
index a33fc4ca166..23b42b48078 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -93,6 +93,7 @@ services:
ports:
- "${MAP_HOST_KAFKA:-127.0.0.1}:9092:9092"
- "${MAP_HOST_KAFKA:-127.0.0.1}:9101:9101"
+ - "${MAP_HOST_KAFKA:-127.0.0.1}:29092:29092"
profiles:
- all
- kafka
@@ -100,7 +101,7 @@ services:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker:29092,PLAINTEXT_HOST://localhost:9092
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka-broker:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
@@ -167,8 +168,6 @@ services:
image: jaegertracing/all-in-one:${JAEGER_VERSION:-1.48.0}
container_name: jaeger
ports:
- - "${MAP_HOST_JAEGER:-127.0.0.1}:4317:4317" # OTLP over gRPC
- - "${MAP_HOST_JAEGER:-127.0.0.1}:4318:4318" # OTLP over HTTP
- "${MAP_HOST_JAEGER:-127.0.0.1}:16686:16686" # Frontend
profiles:
- jaeger
@@ -206,18 +205,19 @@ services:
- "host.docker.internal:host-gateway"
gcp-pubsub-emulator:
- profiles:
- - gcp-pubsub
- - all
# It is not an official docker image
# if we prefer we can build a docker from the official docker image (gcloud cli)
# and install the pubsub emulator https://cloud.google.com/pubsub/docs/emulator
image: thekevjames/gcloud-pubsub-emulator:${GCLOUD_EMULATOR:-7555256f2c}
+ container_name: gcp-pubsub-emulator
ports:
- "${MAP_HOST_GCLOUD_EMULATOR:-127.0.0.1}:8681:8681"
environment:
# create a fake gcp project and a topic / subscription
- PUBSUB_PROJECT1=quickwit-emulator,emulator_topic:emulator_subscription
+ profiles:
+ - all
+ - gcp-pubsub
volumes:
localstack_data:
diff --git a/docs/reference/metrics.md b/docs/reference/metrics.md
index 0364fea173a..315b6e3fc9c 100644
--- a/docs/reference/metrics.md
+++ b/docs/reference/metrics.md
@@ -33,8 +33,8 @@ Currently Quickwit exposes metrics for three caches: `fastfields`, `shortlived`,
| Namespace | Metric Name | Description | Labels | Type |
| --------- | ----------- | ----------- | ------ | ---- |
-| `quickwit_indexing` | `processed_docs_total`| Number of processed docs by index, source and processed status in [`valid`, `missing_field`, `parsing_error`, `transform_error`] | [`index`, `source`, `docs_processed_status`] | `counter` |
-| `quickwit_indexing` | `processed_docs_total`| Number of processed bytes by index, source and processed status in [`valid`, `missing_field`, `parsing_error`, `transform_error`] | [`index`, `source`, `docs_processed_status`] | `counter` |
+| `quickwit_indexing` | `processed_docs_total` | Number of processed docs by index, source and processed status in [`valid`, `schema_error`, `parse_error`, `transform_error`] | [`index`, `source`, `docs_processed_status`] | `counter` |
+| `quickwit_indexing` | `processed_bytes` | Number of processed bytes by index, source and processed status in [`valid`, `schema_error`, `parse_error`, `transform_error`] | [`index`, `source`, `docs_processed_status`] | `counter` |
| `quickwit_indexing` | `available_concurrent_upload_permits`| Number of available concurrent upload permits by component in [`merger`, `indexer`] | [`component`] | `gauge` |
| `quickwit_indexing` | `ongoing_merge_operations`| Number of available concurrent upload permits by component in [`merger`, `indexer`]. | [`index`, `source`] | `gauge` |
diff --git a/monitoring/otel-collector-config.yaml b/monitoring/otel-collector-config.yaml
index 1be656fb4f4..4a1d514b83b 100644
--- a/monitoring/otel-collector-config.yaml
+++ b/monitoring/otel-collector-config.yaml
@@ -20,6 +20,10 @@ exporters:
tls:
insecure: true
+ kafka:
+ brokers:
+ - kafka-broker:29092
+
otlp/qw:
endpoint: host.docker.internal:7281
tls:
@@ -36,7 +40,7 @@ service:
traces:
receivers: [jaeger, otlp]
processors: [batch]
- exporters: [jaeger, otlp/qw]
+ exporters: [jaeger, kafka, otlp/qw]
# metrics:
# receivers: [otlp]
# processors: [batch]
diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 2fb3a4b762f..2f052c4b08d 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -5302,6 +5302,7 @@ dependencies = [
"oneshot",
"openssl",
"proptest",
+ "prost",
"pulsar",
"quickwit-actors",
"quickwit-aws",
@@ -5312,6 +5313,7 @@ dependencies = [
"quickwit-doc-mapper",
"quickwit-ingest",
"quickwit-metastore",
+ "quickwit-opentelemetry",
"quickwit-proto",
"quickwit-query",
"quickwit-storage",
diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs
index 0c632371c31..c7c044006ce 100644
--- a/quickwit/quickwit-config/src/source_config/mod.rs
+++ b/quickwit/quickwit-config/src/source_config/mod.rs
@@ -201,11 +201,14 @@ impl TestableForRegression for SourceConfig {
}
}
-#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
+#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum SourceInputFormat {
#[default]
Json,
+ OtlpTraceJson,
+ #[serde(alias = "otlp_trace_proto")]
+ OtlpTraceProtobuf,
#[serde(alias = "plain")]
PlainText,
}
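For illustration only, a minimal stand-alone sketch of how the `snake_case` renaming and the `otlp_trace_proto` alias interact, which is what lets the kafka-source.yaml above write `input_format: otlp_trace_proto`. It redefines a local copy of the enum rather than using the crate's type, and assumes `serde` (with derive) and `serde_yaml` are available:

use serde::Deserialize;

// Local stand-in for the enum above, only to show the accepted spellings.
#[derive(Debug, Default, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
enum SourceInputFormat {
    #[default]
    Json,
    OtlpTraceJson,
    #[serde(alias = "otlp_trace_proto")]
    OtlpTraceProtobuf,
    #[serde(alias = "plain")]
    PlainText,
}

fn main() {
    // The canonical snake_case name and the shorter alias both map to the same variant.
    let from_alias: SourceInputFormat = serde_yaml::from_str("otlp_trace_proto").unwrap();
    let from_name: SourceInputFormat = serde_yaml::from_str("otlp_trace_protobuf").unwrap();
    assert_eq!(from_alias, SourceInputFormat::OtlpTraceProtobuf);
    assert_eq!(from_name, SourceInputFormat::OtlpTraceProtobuf);
}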
diff --git a/quickwit/quickwit-config/src/source_config/serialize.rs b/quickwit/quickwit-config/src/source_config/serialize.rs
index 75a4b23c2ea..ed68e53707a 100644
--- a/quickwit/quickwit-config/src/source_config/serialize.rs
+++ b/quickwit/quickwit-config/src/source_config/serialize.rs
@@ -108,6 +108,12 @@ impl SourceConfigForSerialization {
}
if let Some(transform_config) = &self.transform {
+ if matches!(
+ self.input_format,
+ SourceInputFormat::OtlpTraceJson | SourceInputFormat::OtlpTraceProtobuf
+ ) {
+ bail!("VRL transforms are not supported for OTLP input formats");
+ }
transform_config.validate_vrl_script()?;
}
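A simplified, stand-alone restatement of the rule added above (using a local enum and `anyhow`, not the crate's actual `SourceConfigForSerialization` method): a VRL transform is rejected as soon as the source uses an OTLP input format, before the VRL script is compiled or validated.

use anyhow::bail;

#[derive(Clone, Copy, PartialEq)]
enum SourceInputFormat { Json, OtlpTraceJson, OtlpTraceProtobuf, PlainText }

// Stand-in for the validation check: OTLP input formats cannot be combined with a transform.
fn validate_transform(input_format: SourceInputFormat, has_transform: bool) -> anyhow::Result<()> {
    if has_transform
        && matches!(
            input_format,
            SourceInputFormat::OtlpTraceJson | SourceInputFormat::OtlpTraceProtobuf
        )
    {
        bail!("VRL transforms are not supported for OTLP input formats");
    }
    Ok(())
}

fn main() {
    assert!(validate_transform(SourceInputFormat::Json, true).is_ok());
    assert!(validate_transform(SourceInputFormat::OtlpTraceProtobuf, true).is_err());
}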
diff --git a/quickwit/quickwit-indexing/Cargo.toml b/quickwit/quickwit-indexing/Cargo.toml
index 26dda1c92fb..c4280731836 100644
--- a/quickwit/quickwit-indexing/Cargo.toml
+++ b/quickwit/quickwit-indexing/Cargo.toml
@@ -61,6 +61,7 @@ quickwit-directories = { workspace = true }
quickwit-doc-mapper = { workspace = true }
quickwit-ingest = { workspace = true }
quickwit-metastore = { workspace = true }
+quickwit-opentelemetry = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-storage = { workspace = true }
@@ -87,9 +88,10 @@ bytes = { workspace = true }
criterion = { workspace = true, features = ["async_tokio"] }
mockall = { workspace = true }
proptest = { workspace = true }
+prost = { workspace = true }
rand = { workspace = true }
-tempfile = { workspace = true }
reqwest = { workspace = true }
+tempfile = { workspace = true }
quickwit-actors = { workspace = true, features = ["testsuite"] }
quickwit-cluster = { workspace = true, features = ["testsuite"] }
diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs
index 279c0624b0a..1b185e4202e 100644
--- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs
+++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs
@@ -18,15 +18,19 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::string::FromUtf8Error;
+use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
-use anyhow::Context;
+use anyhow::{bail, Context};
use async_trait::async_trait;
use bytes::Bytes;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
use quickwit_common::runtimes::RuntimeType;
use quickwit_config::{SourceInputFormat, TransformConfig};
use quickwit_doc_mapper::{DocMapper, DocParsingError, JsonObject};
+use quickwit_opentelemetry::otlp::{
+ parse_otlp_spans_json, parse_otlp_spans_protobuf, JsonSpanIterator, OtlpTraceError,
+};
use serde::Serialize;
use serde_json::Value as JsonValue;
use tantivy::schema::{Field, Value};
@@ -43,71 +47,176 @@ use crate::models::{
const PLAIN_TEXT: &str = "plain_text";
-enum InputDoc {
- Json(Bytes),
- PlainText(Bytes),
+pub(super) struct JsonDoc {
+ json_obj: JsonObject,
+ num_bytes: usize,
}
-impl InputDoc {
- fn from_bytes(input_format: &SourceInputFormat, bytes: Bytes) -> Self {
- match input_format {
- SourceInputFormat::Json => InputDoc::Json(bytes),
- SourceInputFormat::PlainText => InputDoc::PlainText(bytes),
+impl JsonDoc {
+ pub fn new(json_obj: JsonObject, num_bytes: usize) -> Self {
+ Self {
+ json_obj,
+ num_bytes,
}
}
- #[cfg(feature = "vrl")]
- fn try_into_vrl_doc(self) -> Result<VrlValue, DocProcessorError> {
- let vrl_doc = match self {
- InputDoc::Json(bytes) => serde_json::from_slice::<VrlValue>(&bytes)?,
- InputDoc::PlainText(bytes) => {
- use std::collections::BTreeMap;
- let mut map = BTreeMap::new();
- let key = PLAIN_TEXT.to_string();
- let value = VrlValue::Bytes(bytes);
- map.insert(key, value);
- VrlValue::Object(map)
- }
- };
- Ok(vrl_doc)
+ pub fn try_from_json_value(
+ json_value: JsonValue,
+ num_bytes: usize,
+ ) -> Result<Self, DocProcessorError> {
+ match json_value {
+ JsonValue::Object(json_obj) => Ok(Self::new(json_obj, num_bytes)),
+ _ => Err(DocProcessorError::Parse),
+ }
}
- fn try_into_json_doc(self) -> Result<JsonObject, DocProcessorError> {
- let json_doc = match self {
- InputDoc::Json(doc_bytes) => serde_json::from_slice::<JsonObject>(&doc_bytes)?,
- InputDoc::PlainText(doc_bytes) => {
- let mut map = serde_json::Map::with_capacity(1);
- let key = PLAIN_TEXT.to_string();
- let value = String::from_utf8(doc_bytes.to_vec())?;
- map.insert(key, JsonValue::String(value));
- map
- }
- };
- Ok(json_doc)
+ #[cfg(feature = "vrl")]
+ pub fn try_from_vrl_doc(vrl_doc: VrlDoc) -> Result<Self, DocProcessorError> {
+ let json_value = serde_json::to_value(vrl_doc.vrl_value)?;
+ Self::try_from_json_value(json_value, vrl_doc.num_bytes)
}
}
#[derive(Debug)]
pub enum DocProcessorError {
- ParsingError,
- MissingField,
+ Parse,
+ Schema,
#[cfg(feature = "vrl")]
- TransformError(VrlTerminate),
+ Transform(VrlTerminate),
}
impl From<serde_json::Error> for DocProcessorError {
fn from(_error: serde_json::Error) -> Self {
- DocProcessorError::ParsingError
+ DocProcessorError::Parse
}
}
impl From<FromUtf8Error> for DocProcessorError {
fn from(_error: FromUtf8Error) -> Self {
- DocProcessorError::ParsingError
+ DocProcessorError::Parse
+ }
+}
+
+#[cfg(feature = "vrl")]
+fn try_into_vrl_doc(
+ input_format: SourceInputFormat,
+ raw_doc: Bytes,
+ num_bytes: usize,
+) -> Result<VrlDoc, DocProcessorError> {
+ let vrl_value = match input_format {
+ SourceInputFormat::Json => serde_json::from_slice::(&raw_doc)?,
+ SourceInputFormat::PlainText => {
+ let mut map = std::collections::BTreeMap::new();
+ let key = PLAIN_TEXT.to_string();
+ let value = VrlValue::Bytes(raw_doc);
+ map.insert(key, value);
+ VrlValue::Object(map)
+ }
+ SourceInputFormat::OtlpTraceJson | SourceInputFormat::OtlpTraceProtobuf => {
+ panic!("OTP log or trace data does not support VRL transforms")
+ }
+ };
+ let vrl_doc = VrlDoc::new(vrl_value, num_bytes);
+ Ok(vrl_doc)
+}
+
+fn try_into_json_docs(
+ input_format: SourceInputFormat,
+ raw_doc: Bytes,
+ num_bytes: usize,
+) -> JsonDocIterator {
+ match input_format {
+ SourceInputFormat::Json => {
+ let json_doc_result = serde_json::from_slice::<JsonObject>(&raw_doc)
+ .map(|json_obj| JsonDoc::new(json_obj, num_bytes));
+ JsonDocIterator::from(json_doc_result)
+ }
+ SourceInputFormat::OtlpTraceJson => {
+ let spans = parse_otlp_spans_json(&raw_doc);
+ JsonDocIterator::from(spans)
+ }
+ SourceInputFormat::OtlpTraceProtobuf => {
+ let spans = parse_otlp_spans_protobuf(&raw_doc);
+ JsonDocIterator::from(spans)
+ }
+ SourceInputFormat::PlainText => {
+ let json_doc_result = String::from_utf8(raw_doc.to_vec()).map(|value| {
+ let mut json_obj = serde_json::Map::with_capacity(1);
+ let key = PLAIN_TEXT.to_string();
+ json_obj.insert(key, JsonValue::String(value));
+ JsonDoc::new(json_obj, num_bytes)
+ });
+ JsonDocIterator::from(json_doc_result)
+ }
+ }
+}
+
+#[cfg(feature = "vrl")]
+fn parse_raw_doc(
+ input_format: SourceInputFormat,
+ raw_doc: Bytes,
+ num_bytes: usize,
+ vrl_program_opt: Option<&mut VrlProgram>,
+) -> JsonDocIterator {
+ let Some(vrl_program) = vrl_program_opt else {
+ return try_into_json_docs(input_format, raw_doc, num_bytes);
+ };
+ let json_doc_result = try_into_vrl_doc(input_format, raw_doc, num_bytes)
+ .and_then(|vrl_doc| vrl_program.transform_doc(vrl_doc))
+ .and_then(JsonDoc::try_from_vrl_doc);
+
+ JsonDocIterator::from(json_doc_result)
+}
+
+#[cfg(not(feature = "vrl"))]
+fn parse_raw_doc(
+ input_format: SourceInputFormat,
+ raw_doc: Bytes,
+ num_bytes: usize,
+ _vrl_program_opt: Option<&mut VrlProgram>,
+) -> JsonDocIterator {
+ try_into_json_docs(input_format, raw_doc, num_bytes)
+}
+
+enum JsonDocIterator {
+ One(Option<Result<JsonDoc, DocProcessorError>>),
+ Spans(JsonSpanIterator),
+}
+
+impl Iterator for JsonDocIterator {
+ type Item = Result<JsonDoc, DocProcessorError>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ match self {
+ Self::One(opt) => opt.take(),
+ Self::Spans(spans) => spans
+ .next()
+ .map(|(json_value, num_bytes)| JsonDoc::try_from_json_value(json_value, num_bytes)),
+ }
+ }
+}
+
+impl<E> From<Result<JsonDoc, E>> for JsonDocIterator
+where E: Into<DocProcessorError>
+{
+ fn from(result: Result<JsonDoc, E>) -> Self {
+ match result {
+ Ok(json_doc) => Self::One(Some(Ok(json_doc))),
+ Err(error) => Self::One(Some(Err(error.into()))),
+ }
+ }
+}
+
+impl From<Result<JsonSpanIterator, OtlpTraceError>> for JsonDocIterator {
+ fn from(result: Result<JsonSpanIterator, OtlpTraceError>) -> Self {
+ match result {
+ Ok(json_doc) => Self::Spans(json_doc),
+ Err(_) => Self::One(Some(Err(DocProcessorError::Parse))),
+ }
}
}
-#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
+#[derive(Debug, Serialize)]
pub struct DocProcessorCounters {
index_id: String,
source_id: String,
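The `JsonDocIterator` introduced in the hunk above is the core behavioral change: a `json` or `plain_text` record still yields at most one document, while a single OTLP trace payload fans out into one document per span. The following self-contained sketch (using plain `String`s instead of the real `JsonDoc`/`DocProcessorError` types) shows the same one-or-many iterator shape:

enum DocIter {
    One(Option<String>),
    Many(std::vec::IntoIter<String>),
}

impl Iterator for DocIter {
    type Item = String;

    fn next(&mut self) -> Option<String> {
        match self {
            // A single document is handed out once, then the iterator is exhausted.
            DocIter::One(opt) => opt.take(),
            // An OTLP-style payload keeps yielding until every span is consumed.
            DocIter::Many(spans) => spans.next(),
        }
    }
}

fn main() {
    let single = DocIter::One(Some(r#"{"body": "hello"}"#.to_string()));
    assert_eq!(single.count(), 1);

    let spans = vec!["span-1".to_string(), "span-2".to_string(), "span-3".to_string()];
    assert_eq!(DocIter::Many(spans.into_iter()).count(), 3);
}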
@@ -115,19 +224,18 @@ pub struct DocProcessorCounters {
/// into 4 categories:
/// - number of docs that could not be parsed.
/// - number of docs that could not be transformed.
- /// - number of docs without a timestamp (if the index has no timestamp field,
- /// then this counter is equal to zero)
+ /// - number of docs for which the doc mapper returned an error.
/// - number of valid docs.
- pub num_parse_errors: u64,
- pub num_transform_errors: u64,
- pub num_docs_with_missing_fields: u64,
- pub num_valid_docs: u64,
+ pub num_parse_errors: AtomicU64,
+ pub num_transform_errors: AtomicU64,
+ pub num_schema_errors: AtomicU64,
+ pub num_valid_docs: AtomicU64,
/// Number of bytes that went through the indexer
/// during its entire lifetime.
///
/// Includes both valid and invalid documents.
- pub overall_num_bytes: u64,
+ pub num_bytes_total: AtomicU64,
}
impl DocProcessorCounters {
@@ -135,102 +243,71 @@ impl DocProcessorCounters {
Self {
index_id,
source_id,
- num_parse_errors: 0,
- num_transform_errors: 0,
- num_docs_with_missing_fields: 0,
- num_valid_docs: 0,
- overall_num_bytes: 0,
+ num_parse_errors: Default::default(),
+ num_transform_errors: Default::default(),
+ num_schema_errors: Default::default(),
+ num_valid_docs: Default::default(),
+ num_bytes_total: Default::default(),
}
}
/// Returns the overall number of docs that went through the indexer (valid or not).
pub fn num_processed_docs(&self) -> u64 {
- self.num_valid_docs
- + self.num_parse_errors
- + self.num_docs_with_missing_fields
- + self.num_transform_errors
+ self.num_valid_docs.load(Ordering::Relaxed)
+ + self.num_parse_errors.load(Ordering::Relaxed)
+ + self.num_schema_errors.load(Ordering::Relaxed)
+ + self.num_transform_errors.load(Ordering::Relaxed)
}
/// Returns the overall number of docs that were sent to the indexer but were invalid.
/// (For instance, because they were missing a required field or because their
/// format was invalid)
pub fn num_invalid_docs(&self) -> u64 {
- self.num_parse_errors + self.num_docs_with_missing_fields + self.num_transform_errors
+ self.num_parse_errors.load(Ordering::Relaxed)
+ + self.num_schema_errors.load(Ordering::Relaxed)
+ + self.num_transform_errors.load(Ordering::Relaxed)
}
- pub fn record_parsing_error(&mut self, num_bytes: u64) {
- self.num_parse_errors += 1;
- self.overall_num_bytes += num_bytes;
- crate::metrics::INDEXER_METRICS
- .processed_docs_total
- .with_label_values([
- self.index_id.as_str(),
- self.source_id.as_str(),
- "parsing_error",
- ])
- .inc();
- crate::metrics::INDEXER_METRICS
- .processed_bytes
- .with_label_values([
- self.index_id.as_str(),
- self.source_id.as_str(),
- "parsing_error",
- ])
- .inc_by(num_bytes);
- }
+ pub fn record_valid(&self, num_bytes: u64) {
+ self.num_valid_docs.fetch_add(1, Ordering::Relaxed);
+ self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed);
- pub fn record_transform_error(&mut self, num_bytes: u64) {
- self.num_transform_errors += 1;
- self.overall_num_bytes += num_bytes;
crate::metrics::INDEXER_METRICS
.processed_docs_total
- .with_label_values([
- self.index_id.as_str(),
- self.source_id.as_str(),
- "transform_error",
- ])
+ .with_label_values([&self.index_id, &self.source_id, "valid"])
.inc();
crate::metrics::INDEXER_METRICS
.processed_bytes
- .with_label_values([
- self.index_id.as_str(),
- self.source_id.as_str(),
- "transform_error",
- ])
+ .with_label_values([&self.index_id, &self.source_id, "valid"])
.inc_by(num_bytes);
}
- pub fn record_missing_field(&mut self, num_bytes: u64) {
- self.num_docs_with_missing_fields += 1;
- self.overall_num_bytes += num_bytes;
+ pub fn record_error(&self, error: DocProcessorError, num_bytes: u64) {
+ let label = match error {
+ DocProcessorError::Parse => {
+ self.num_parse_errors.fetch_add(1, Ordering::Relaxed);
+ "parse_error"
+ }
+ DocProcessorError::Schema => {
+ self.num_schema_errors.fetch_add(1, Ordering::Relaxed);
+ "schema_error"
+ }
+ #[cfg(feature = "vrl")]
+ DocProcessorError::Transform(_) => {
+ self.num_transform_errors.fetch_add(1, Ordering::Relaxed);
+ "transform_error"
+ }
+ };
crate::metrics::INDEXER_METRICS
.processed_docs_total
- .with_label_values([
- self.index_id.as_str(),
- self.source_id.as_str(),
- "missing_field",
- ])
+ .with_label_values([&self.index_id, &self.source_id, label])
.inc();
- crate::metrics::INDEXER_METRICS
- .processed_bytes
- .with_label_values([
- self.index_id.as_str(),
- self.source_id.as_str(),
- "missing_field",
- ])
- .inc_by(num_bytes);
- }
- pub fn record_valid(&mut self, num_bytes: u64) {
- self.num_valid_docs += 1;
- self.overall_num_bytes += num_bytes;
- crate::metrics::INDEXER_METRICS
- .processed_docs_total
- .with_label_values([self.index_id.as_str(), self.source_id.as_str(), "valid"])
- .inc();
+ self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed);
+
crate::metrics::INDEXER_METRICS
.processed_bytes
- .with_label_values([self.index_id.as_str(), self.source_id.as_str(), "valid"])
+ .with_label_values([&self.index_id, &self.source_id, label])
.inc_by(num_bytes);
}
}
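The counters switch from plain `u64` fields mutated through `&mut self` to `AtomicU64` fields updated through a shared `Arc<DocProcessorCounters>`, presumably so another holder of the same `Arc` (for example the actor's observable state) can read them while `record_valid`/`record_error` above keep incrementing. A stand-alone sketch of that pattern, with a made-up subset of the fields:

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
struct Counters {
    num_valid_docs: AtomicU64,
    num_bytes_total: AtomicU64,
}

fn main() {
    let counters = Arc::new(Counters::default());
    let observer = Arc::clone(&counters);

    // The writer only needs a shared reference to bump the counters.
    counters.num_valid_docs.fetch_add(1, Ordering::Relaxed);
    counters.num_bytes_total.fetch_add(512, Ordering::Relaxed);

    // The observer sees the updates without a lock or a mutable borrow.
    assert_eq!(observer.num_valid_docs.load(Ordering::Relaxed), 1);
    assert_eq!(observer.num_bytes_total.load(Ordering::Relaxed), 512);
}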
@@ -239,7 +316,7 @@ pub struct DocProcessor {
doc_mapper: Arc<dyn DocMapper>,
indexer_mailbox: Mailbox<Indexer>,
timestamp_field_opt: Option<Field>,
- counters: DocProcessorCounters,
+ counters: Arc<DocProcessorCounters>,
publish_lock: PublishLock,
#[cfg(feature = "vrl")]
transform_opt: Option<VrlProgram>,
@@ -255,15 +332,15 @@ impl DocProcessor {
transform_config_opt: Option<TransformConfig>,
input_format: SourceInputFormat,
) -> anyhow::Result<Self> {
- let timestamp_field_opt = extract_timestamp_field(doc_mapper.as_ref())?;
+ let timestamp_field_opt = extract_timestamp_field(&*doc_mapper)?;
if cfg!(not(feature = "vrl")) && transform_config_opt.is_some() {
- anyhow::bail!("VRL is not enabled. please recompile with the `vrl` feature")
+ bail!("VRL is not enabled. please recompile with the `vrl` feature")
}
let doc_processor = Self {
doc_mapper,
indexer_mailbox,
timestamp_field_opt,
- counters: DocProcessorCounters::new(index_id, source_id),
+ counters: Arc::new(DocProcessorCounters::new(index_id, source_id)),
publish_lock: PublishLock::default(),
#[cfg(feature = "vrl")]
transform_opt: transform_config_opt
@@ -288,48 +365,45 @@ impl DocProcessor {
let timestamp = doc
.get_first(timestamp_field)
.and_then(|val| val.as_datetime())
- .ok_or(DocProcessorError::MissingField)?;
+ .ok_or(DocProcessorError::Schema)?;
Ok(Some(timestamp))
}
- #[cfg(feature = "vrl")]
- fn get_json_doc(&mut self, input_doc: InputDoc) -> Result<JsonObject, DocProcessorError> {
- if let Some(vrl_program) = self.transform_opt.as_mut() {
- let vrl_doc = input_doc.try_into_vrl_doc()?;
- let transformed_vrl_doc = vrl_program.transform_doc(vrl_doc)?;
- if let Ok(JsonValue::Object(json_doc)) = serde_json::to_value(transformed_vrl_doc) {
- Ok(json_doc)
- } else {
- Err(DocProcessorError::ParsingError)
+ fn process_raw_doc(&mut self, raw_doc: Bytes, processed_docs: &mut Vec<ProcessedDoc>) {
+ let num_bytes = raw_doc.len();
+
+ #[cfg(feature = "vrl")]
+ let transform_opt = self.transform_opt.as_mut();
+ #[cfg(not(feature = "vrl"))]
+ let transform_opt: Option<&mut VrlProgram> = None;
+
+ for json_doc_result in parse_raw_doc(self.input_format, raw_doc, num_bytes, transform_opt) {
+ let processed_doc_result =
+ json_doc_result.and_then(|json_doc| self.process_json_doc(json_doc));
+
+ match processed_doc_result {
+ Ok(processed_doc) => {
+ self.counters.record_valid(processed_doc.num_bytes as u64);
+ processed_docs.push(processed_doc);
+ }
+ Err(error) => {
+ self.counters.record_error(error, num_bytes as u64);
+ }
}
- } else {
- input_doc.try_into_json_doc()
}
}
- #[cfg(not(feature = "vrl"))]
- fn get_json_doc(&mut self, input_doc: InputDoc) -> Result<JsonObject, DocProcessorError> {
- input_doc.try_into_json_doc()
- }
-
- fn process_document(
- &mut self,
- doc_bytes: Bytes,
- ctx: &ActorContext<Self>,
- ) -> Result<ProcessedDoc, DocProcessorError> {
- let _protect_guard = ctx.protect_zone();
+ fn process_json_doc(&self, json_doc: JsonDoc) -> Result<ProcessedDoc, DocProcessorError> {
+ let num_bytes = json_doc.num_bytes;
- let num_bytes = doc_bytes.len();
- let input_doc = InputDoc::from_bytes(&self.input_format, doc_bytes);
- let json_doc: JsonObject = self.get_json_doc(input_doc)?;
let (partition, doc) = self
.doc_mapper
- .doc_from_json_obj(json_doc)
+ .doc_from_json_obj(json_doc.json_obj)
.map_err(|error| {
warn!(error=?error);
match error {
- DocParsingError::RequiredField(_) => DocProcessorError::MissingField,
- _ => DocProcessorError::ParsingError,
+ DocParsingError::RequiredField(_) => DocProcessorError::Schema,
+ _ => DocProcessorError::Parse,
}
})?;
let timestamp_opt = self.extract_timestamp(&doc)?;
@@ -353,9 +427,12 @@ fn extract_timestamp_field(doc_mapper: &dyn DocMapper) -> anyhow::Result