fix: replace actor heartbeat duration with emit timeout as the loop interval for sources (#4112)

* fix: replace actor heartbeat duration with emit timeout as the loop interval for sources

* chore: remove dead code

* chore: use Bytesize for BATCH_NUM_BYTES_LIMIT const
etolbakov authored Nov 10, 2023
1 parent 9e95ec9 commit 0b5ddaf
Showing 6 changed files with 33 additions and 40 deletions.
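The source files below all share the same shape in their `emit_batches` implementation: accumulate documents into a batch until either the batch reaches `BATCH_NUM_BYTES_LIMIT` or a deadline fires, then hand the batch to the doc processor. This commit makes that deadline the shared `EMIT_BATCHES_TIMEOUT` constant instead of half the actor heartbeat (`*quickwit_actors::HEARTBEAT / 2`), and centralizes both constants in `source/mod.rs`. The following is a minimal, self-contained sketch of that loop shape, not Quickwit's actual implementation: the mpsc channel, the byte-vector documents, and the simplified `BatchBuilder` are stand-ins, and the constants are illustrative copies.

use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time;

// Illustrative copies of the constants this commit centralizes in source/mod.rs.
const BATCH_NUM_BYTES_LIMIT: u64 = 5 * 1024 * 1024;
const EMIT_BATCHES_TIMEOUT: Duration = Duration::from_millis(if cfg!(test) { 100 } else { 1_000 });

// Stand-in for Quickwit's BatchBuilder: tracks documents and their total size.
#[derive(Default)]
struct BatchBuilder {
    docs: Vec<Vec<u8>>,
    num_bytes: u64,
}

impl BatchBuilder {
    fn push(&mut self, doc: Vec<u8>) {
        self.num_bytes += doc.len() as u64;
        self.docs.push(doc);
    }
}

// Drain documents from `rx` into one batch until the size limit is hit,
// the emit timeout elapses, or the channel closes, whichever comes first.
async fn emit_one_batch(rx: &mut mpsc::Receiver<Vec<u8>>) -> BatchBuilder {
    let mut batch = BatchBuilder::default();
    // One deadline per batch, pinned so it can be polled repeatedly in select!.
    let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
    tokio::pin!(deadline);

    loop {
        tokio::select! {
            maybe_doc = rx.recv() => {
                match maybe_doc {
                    Some(doc) => {
                        batch.push(doc);
                        if batch.num_bytes >= BATCH_NUM_BYTES_LIMIT {
                            break;
                        }
                    }
                    None => break, // channel closed: emit whatever we have
                }
            }
            // Timeout reached: emit a possibly partial batch so the downstream
            // indexer keeps receiving regular, observable messages.
            _ = &mut deadline => break,
        }
    }
    batch
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(16);
    tx.send(br#"{"msg": "hello"}"#.to_vec()).await.unwrap();
    drop(tx);
    let batch = emit_one_batch(&mut rx).await;
    println!("emitted {} docs ({} bytes)", batch.docs.len(), batch.num_bytes);
}

Each per-source diff below is essentially this substitution of the deadline expression, plus switching the size check to the now-shared BATCH_NUM_BYTES_LIMIT.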
5 changes: 2 additions & 3 deletions quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs
@@ -37,11 +37,10 @@ use serde_json::{json, Value as JsonValue};
 use tokio::time;
 use tracing::{debug, info, warn};

-use super::SourceActor;
+use super::{SourceActor, BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT};
 use crate::actors::DocProcessor;
 use crate::source::{BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory};

-const BATCH_NUM_BYTES_LIMIT: u64 = 5_000_000;
 const DEFAULT_MAX_MESSAGES_PER_PULL: i32 = 1_000;

 pub struct GcpPubSubSourceFactory;
@@ -168,7 +167,7 @@ impl Source for GcpPubSubSource {
     ) -> Result<Duration, ActorExitStatus> {
         let now = Instant::now();
         let mut batch: BatchBuilder = BatchBuilder::default();
-        let deadline = time::sleep(*quickwit_actors::HEARTBEAT / 2);
+        let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
         tokio::pin!(deadline);
         // TODO: ensure we ACK the message after being commit: at least once
         // TODO: ensure we increase_ack_deadline for the items
5 changes: 2 additions & 3 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -45,12 +45,11 @@ use ulid::Ulid;

 use super::{
     Assignment, BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory,
+    BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT,
 };
 use crate::actors::DocProcessor;
 use crate::models::{NewPublishLock, NewPublishToken, PublishLock};

-const EMIT_BATCHES_TIMEOUT: Duration = Duration::from_millis(if cfg!(test) { 100 } else { 1_000 });
-
 pub struct IngestSourceFactory;

 #[async_trait]
@@ -309,7 +308,7 @@ impl Source for IngestSource {
                 Ok(Ok(fetch_payload)) => {
                     self.process_fetch_response(&mut batch_builder, fetch_payload)?;

-                    if batch_builder.num_bytes >= 5 * 1024 * 1024 {
+                    if batch_builder.num_bytes >= BATCH_NUM_BYTES_LIMIT {
                         break;
                     }
                 }
20 changes: 5 additions & 15 deletions quickwit/quickwit-indexing/src/source/kafka_source.rs
@@ -49,20 +49,10 @@ use tracing::{debug, info, warn};

 use crate::actors::DocProcessor;
 use crate::models::{NewPublishLock, PublishLock};
-use crate::source::{BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory};
-
-/// Number of bytes after which we cut a new batch.
-///
-/// We try to emit chewable batches for the indexer.
-/// One batch = one message to the indexer actor.
-///
-/// If batches are too large:
-/// - we might not be able to observe the state of the indexer for 5 seconds.
-/// - we will be needlessly occupying resident memory in the mailbox.
-/// - we will not have a precise control of the timeout before commit.
-///
-/// 5MB seems like a good one size fits all value.
-const BATCH_NUM_BYTES_LIMIT: u64 = 5_000_000;
+use crate::source::{
+    BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory,
+    BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT,
+};

 type GroupId = String;

@@ -486,7 +476,7 @@ impl Source for KafkaSource {
     ) -> Result<Duration, ActorExitStatus> {
         let now = Instant::now();
         let mut batch = BatchBuilder::default();
-        let deadline = time::sleep(*quickwit_actors::HEARTBEAT / 2);
+        let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
         tokio::pin!(deadline);

         loop {
11 changes: 6 additions & 5 deletions quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs
@@ -43,9 +43,10 @@ use super::shard_consumer::{ShardConsumer, ShardConsumerHandle, ShardConsumerMes
 use crate::actors::DocProcessor;
 use crate::models::RawDocBatch;
 use crate::source::kinesis::helpers::get_kinesis_client;
-use crate::source::{Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory};
-
-const TARGET_BATCH_NUM_BYTES: u64 = 5_000_000;
+use crate::source::{
+    Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory, BATCH_NUM_BYTES_LIMIT,
+    EMIT_BATCHES_TIMEOUT,
+};

 type ShardId = String;

@@ -215,7 +216,7 @@ impl Source for KinesisSource {
         let mut docs = Vec::new();
         let mut checkpoint_delta = SourceCheckpointDelta::default();

-        let deadline = time::sleep(*quickwit_actors::HEARTBEAT / 2);
+        let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
         tokio::pin!(deadline);

         loop {
@@ -278,7 +279,7 @@
                     ).context("failed to record partition delta")?;
                 }
             }
-            if batch_num_bytes >= TARGET_BATCH_NUM_BYTES {
+            if batch_num_bytes >= BATCH_NUM_BYTES_LIMIT {
                 break;
             }
         }
16 changes: 16 additions & 0 deletions quickwit/quickwit-indexing/src/source/mod.rs
@@ -77,6 +77,7 @@ use std::time::Duration;

 use async_trait::async_trait;
 use bytes::Bytes;
+use bytesize::ByteSize;
 pub use file_source::{FileSource, FileSourceFactory};
 #[cfg(feature = "gcp-pubsub")]
 pub use gcp_pubsub_source::{GcpPubSubSource, GcpPubSubSourceFactory};
@@ -109,6 +110,21 @@ use crate::models::RawDocBatch;
 use crate::source::ingest::IngestSourceFactory;
 use crate::source::ingest_api_source::IngestApiSourceFactory;

+/// Number of bytes after which we cut a new batch.
+///
+/// We try to emit chewable batches for the indexer.
+/// One batch = one message to the indexer actor.
+///
+/// If batches are too large:
+/// - we might not be able to observe the state of the indexer for 5 seconds.
+/// - we will be needlessly occupying resident memory in the mailbox.
+/// - we will not have a precise control of the timeout before commit.
+///
+/// 5MB seems like a good one size fits all value.
+const BATCH_NUM_BYTES_LIMIT: u64 = ByteSize::mib(5).as_u64();
+
+const EMIT_BATCHES_TIMEOUT: Duration = Duration::from_millis(if cfg!(test) { 100 } else { 1_000 });
+
 /// Runtime configuration used during execution of a source actor.
 pub struct SourceRuntimeArgs {
     pub pipeline_id: IndexingPipelineId,
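A note on the two constants introduced in this file: both are evaluated at compile time, since `Duration::from_millis` is a `const fn`, `cfg!(test)` expands to a boolean literal, and the `bytesize` constructors used here are `const fn` in the release this diff relies on. The standalone snippet below (not Quickwit code) mirrors the same pattern and checks the resulting values.

use std::time::Duration;

use bytesize::ByteSize;

// Same shape as the constants added to quickwit-indexing/src/source/mod.rs.
const BATCH_NUM_BYTES_LIMIT: u64 = ByteSize::mib(5).as_u64();
const EMIT_BATCHES_TIMEOUT: Duration =
    Duration::from_millis(if cfg!(test) { 100 } else { 1_000 });

fn main() {
    // 5 MiB = 5 * 1024 * 1024 bytes.
    assert_eq!(BATCH_NUM_BYTES_LIMIT, 5 * 1024 * 1024);
    // Outside `cargo test`, cfg!(test) is false, so the timeout compiles to 1s;
    // under `cargo test` the same constant compiles to 100ms.
    println!(
        "batch limit: {} bytes, emit timeout: {:?}",
        BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT
    );
}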
16 changes: 2 additions & 14 deletions quickwit/quickwit-indexing/src/source/pulsar_source.rs
@@ -42,21 +42,9 @@ use tracing::{debug, info, warn};
 use crate::actors::DocProcessor;
 use crate::source::{
     BatchBuilder, Source, SourceActor, SourceContext, SourceRuntimeArgs, TypedSourceFactory,
+    BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT,
 };

-/// Number of bytes after which we cut a new batch.
-///
-/// We try to emit chewable batches for the indexer.
-/// One batch = one message to the indexer actor.
-///
-/// If batches are too large:
-/// - we might not be able to observe the state of the indexer for 5 seconds.
-/// - we will be needlessly occupying resident memory in the mailbox.
-/// - we will not have a precise control of the timeout before commit.
-///
-/// 5MB seems like a good one size fits all value.
-const BATCH_NUM_BYTES_LIMIT: u64 = 5_000_000;
-
 type PulsarConsumer = Consumer<PulsarMessage, TokioExecutor>;

 pub struct PulsarSourceFactory;
@@ -225,7 +213,7 @@ impl Source for PulsarSource {
     ) -> Result<Duration, ActorExitStatus> {
         let now = Instant::now();
         let mut batch = BatchBuilder::default();
-        let deadline = time::sleep(*quickwit_actors::HEARTBEAT / 2);
+        let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
         tokio::pin!(deadline);

         loop {
