From 0b5ddafadcc5736de28fca3f823d5f8cd04a9d10 Mon Sep 17 00:00:00 2001 From: Eugene Tolbakov Date: Fri, 10 Nov 2023 16:23:51 +0000 Subject: [PATCH] =?UTF-8?q?fix:=20replace=20actor=20heartbeat=20duration?= =?UTF-8?q?=20with=20emit=20timeout=20as=20the=20loop=20i=E2=80=A6=20(#411?= =?UTF-8?q?2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: replace actor heartbeat duration with emit timeout as the loop interval for sources * chore: remove dead code * chore: use Bytesize for BATCH_NUM_BYTES_LIMIT const --- .../src/source/gcp_pubsub_source.rs | 5 ++--- .../src/source/ingest/mod.rs | 5 ++--- .../src/source/kafka_source.rs | 20 +++++-------------- .../src/source/kinesis/kinesis_source.rs | 11 +++++----- quickwit/quickwit-indexing/src/source/mod.rs | 16 +++++++++++++++ .../src/source/pulsar_source.rs | 16 ++------------- 6 files changed, 33 insertions(+), 40 deletions(-) diff --git a/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs b/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs index 389ec96d366..a22e5e2c454 100644 --- a/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs +++ b/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs @@ -37,11 +37,10 @@ use serde_json::{json, Value as JsonValue}; use tokio::time; use tracing::{debug, info, warn}; -use super::SourceActor; +use super::{SourceActor, BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT}; use crate::actors::DocProcessor; use crate::source::{BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory}; -const BATCH_NUM_BYTES_LIMIT: u64 = 5_000_000; const DEFAULT_MAX_MESSAGES_PER_PULL: i32 = 1_000; pub struct GcpPubSubSourceFactory; @@ -168,7 +167,7 @@ impl Source for GcpPubSubSource { ) -> Result { let now = Instant::now(); let mut batch: BatchBuilder = BatchBuilder::default(); - let deadline = time::sleep(*quickwit_actors::HEARTBEAT / 2); + let deadline = time::sleep(EMIT_BATCHES_TIMEOUT); tokio::pin!(deadline); // 
TODO: ensure we ACK the message after being commit: at least once // TODO: ensure we increase_ack_deadline for the items diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index 1d7aee86ed1..53aa50a4b1e 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -45,12 +45,11 @@ use ulid::Ulid; use super::{ Assignment, BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory, + BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT, }; use crate::actors::DocProcessor; use crate::models::{NewPublishLock, NewPublishToken, PublishLock}; -const EMIT_BATCHES_TIMEOUT: Duration = Duration::from_millis(if cfg!(test) { 100 } else { 1_000 }); - pub struct IngestSourceFactory; #[async_trait] @@ -309,7 +308,7 @@ impl Source for IngestSource { Ok(Ok(fetch_payload)) => { self.process_fetch_response(&mut batch_builder, fetch_payload)?; - if batch_builder.num_bytes >= 5 * 1024 * 1024 { + if batch_builder.num_bytes >= BATCH_NUM_BYTES_LIMIT { break; } } diff --git a/quickwit/quickwit-indexing/src/source/kafka_source.rs b/quickwit/quickwit-indexing/src/source/kafka_source.rs index 845a1a59b72..52c97722a47 100644 --- a/quickwit/quickwit-indexing/src/source/kafka_source.rs +++ b/quickwit/quickwit-indexing/src/source/kafka_source.rs @@ -49,20 +49,10 @@ use tracing::{debug, info, warn}; use crate::actors::DocProcessor; use crate::models::{NewPublishLock, PublishLock}; -use crate::source::{BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory}; - -/// Number of bytes after which we cut a new batch. -/// -/// We try to emit chewable batches for the indexer. -/// One batch = one message to the indexer actor. -/// -/// If batches are too large: -/// - we might not be able to observe the state of the indexer for 5 seconds. -/// - we will be needlessly occupying resident memory in the mailbox. 
-/// - we will not have a precise control of the timeout before commit. -/// -/// 5MB seems like a good one size fits all value. -const BATCH_NUM_BYTES_LIMIT: u64 = 5_000_000; +use crate::source::{ + BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory, + BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT, +}; type GroupId = String; @@ -486,7 +476,7 @@ impl Source for KafkaSource { ) -> Result { let now = Instant::now(); let mut batch = BatchBuilder::default(); - let deadline = time::sleep(*quickwit_actors::HEARTBEAT / 2); + let deadline = time::sleep(EMIT_BATCHES_TIMEOUT); tokio::pin!(deadline); loop { diff --git a/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs b/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs index 7b3ba961781..5df7892a39f 100644 --- a/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs +++ b/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs @@ -43,9 +43,10 @@ use super::shard_consumer::{ShardConsumer, ShardConsumerHandle, ShardConsumerMes use crate::actors::DocProcessor; use crate::models::RawDocBatch; use crate::source::kinesis::helpers::get_kinesis_client; -use crate::source::{Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory}; - -const TARGET_BATCH_NUM_BYTES: u64 = 5_000_000; +use crate::source::{ + Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory, BATCH_NUM_BYTES_LIMIT, + EMIT_BATCHES_TIMEOUT, +}; type ShardId = String; @@ -215,7 +216,7 @@ impl Source for KinesisSource { let mut docs = Vec::new(); let mut checkpoint_delta = SourceCheckpointDelta::default(); - let deadline = time::sleep(*quickwit_actors::HEARTBEAT / 2); + let deadline = time::sleep(EMIT_BATCHES_TIMEOUT); tokio::pin!(deadline); loop { @@ -278,7 +279,7 @@ impl Source for KinesisSource { ).context("failed to record partition delta")?; } } - if batch_num_bytes >= TARGET_BATCH_NUM_BYTES { + if batch_num_bytes >= BATCH_NUM_BYTES_LIMIT { break; } } diff --git 
a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index 94bf12fbb94..d3325fc2195 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -77,6 +77,7 @@ use std::time::Duration; use async_trait::async_trait; use bytes::Bytes; +use bytesize::ByteSize; pub use file_source::{FileSource, FileSourceFactory}; #[cfg(feature = "gcp-pubsub")] pub use gcp_pubsub_source::{GcpPubSubSource, GcpPubSubSourceFactory}; @@ -109,6 +110,21 @@ use crate::models::RawDocBatch; use crate::source::ingest::IngestSourceFactory; use crate::source::ingest_api_source::IngestApiSourceFactory; +/// Number of bytes after which we cut a new batch. +/// +/// We try to emit chewable batches for the indexer. +/// One batch = one message to the indexer actor. +/// +/// If batches are too large: +/// - we might not be able to observe the state of the indexer for 5 seconds. +/// - we will be needlessly occupying resident memory in the mailbox. +/// - we will not have a precise control of the timeout before commit. +/// +/// 5 MiB seems like a good one-size-fits-all value. +const BATCH_NUM_BYTES_LIMIT: u64 = ByteSize::mib(5).as_u64(); + +const EMIT_BATCHES_TIMEOUT: Duration = Duration::from_millis(if cfg!(test) { 100 } else { 1_000 }); + /// Runtime configuration used during execution of a source actor. 
pub struct SourceRuntimeArgs { pub pipeline_id: IndexingPipelineId, diff --git a/quickwit/quickwit-indexing/src/source/pulsar_source.rs b/quickwit/quickwit-indexing/src/source/pulsar_source.rs index 4ee7718841e..23063c554ff 100644 --- a/quickwit/quickwit-indexing/src/source/pulsar_source.rs +++ b/quickwit/quickwit-indexing/src/source/pulsar_source.rs @@ -42,21 +42,9 @@ use tracing::{debug, info, warn}; use crate::actors::DocProcessor; use crate::source::{ BatchBuilder, Source, SourceActor, SourceContext, SourceRuntimeArgs, TypedSourceFactory, + BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT, }; -/// Number of bytes after which we cut a new batch. -/// -/// We try to emit chewable batches for the indexer. -/// One batch = one message to the indexer actor. -/// -/// If batches are too large: -/// - we might not be able to observe the state of the indexer for 5 seconds. -/// - we will be needlessly occupying resident memory in the mailbox. -/// - we will not have a precise control of the timeout before commit. -/// -/// 5MB seems like a good one size fits all value. -const BATCH_NUM_BYTES_LIMIT: u64 = 5_000_000; - type PulsarConsumer = Consumer; pub struct PulsarSourceFactory; @@ -225,7 +213,7 @@ impl Source for PulsarSource { ) -> Result { let now = Instant::now(); let mut batch = BatchBuilder::default(); - let deadline = time::sleep(*quickwit_actors::HEARTBEAT / 2); + let deadline = time::sleep(EMIT_BATCHES_TIMEOUT); tokio::pin!(deadline); loop {