Skip to content

Commit

Permalink
Fix actor timeout in ingest integration tests (#5389)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai authored Sep 5, 2024
1 parent 1900197 commit f49b253
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 8 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl Source for GcpPubSubSource {
) -> Result<Duration, ActorExitStatus> {
let now = Instant::now();
let mut batch_builder = BatchBuilder::new(SourceType::PubSub);
let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
let deadline = time::sleep(*EMIT_BATCHES_TIMEOUT);
tokio::pin!(deadline);
// TODO: ensure we ACK the message after being commit: at least once
// TODO: ensure we increase_ack_deadline for the items
Expand Down
3 changes: 1 addition & 2 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,7 @@ impl Source for IngestSource {
let mut batch_builder = BatchBuilder::new(SourceType::IngestV2);

let now = time::Instant::now();
let deadline = now + EMIT_BATCHES_TIMEOUT;

let deadline = now + *EMIT_BATCHES_TIMEOUT;
loop {
match time::timeout_at(deadline, self.fetch_stream.next()).await {
Ok(Ok(fetch_message)) => match fetch_message.message {
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/source/kafka_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ impl Source for KafkaSource {
) -> Result<Duration, ActorExitStatus> {
let now = Instant::now();
let mut batch_builder = BatchBuilder::new(SourceType::Kafka);
let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
let deadline = time::sleep(*EMIT_BATCHES_TIMEOUT);
tokio::pin!(deadline);

loop {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl Source for KinesisSource {
ctx: &SourceContext,
) -> Result<Duration, ActorExitStatus> {
let mut batch_builder = BatchBuilder::new(SourceType::Kinesis);
let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
let deadline = time::sleep(*EMIT_BATCHES_TIMEOUT);
tokio::pin!(deadline);

loop {
Expand Down
16 changes: 14 additions & 2 deletions quickwit/quickwit-indexing/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub use gcp_pubsub_source::{GcpPubSubSource, GcpPubSubSourceFactory};
pub use kafka_source::{KafkaSource, KafkaSourceFactory};
#[cfg(feature = "kinesis")]
pub use kinesis::kinesis_source::{KinesisSource, KinesisSourceFactory};
use once_cell::sync::OnceCell;
use once_cell::sync::{Lazy, OnceCell};
#[cfg(feature = "pulsar")]
pub use pulsar_source::{PulsarSource, PulsarSourceFactory};
#[cfg(feature = "sqs")]
Expand Down Expand Up @@ -138,7 +138,19 @@ use crate::source::ingest_api_source::IngestApiSourceFactory;
/// 5MB seems like a good one size fits all value.
const BATCH_NUM_BYTES_LIMIT: u64 = ByteSize::mib(5).as_u64();

const EMIT_BATCHES_TIMEOUT: Duration = Duration::from_millis(if cfg!(test) { 100 } else { 1_000 });
static EMIT_BATCHES_TIMEOUT: Lazy<Duration> = Lazy::new(|| {
if cfg!(any(test, feature = "testsuite")) {
let timeout = Duration::from_millis(100);
assert!(timeout < *quickwit_actors::HEARTBEAT);
timeout
} else {
let timeout = Duration::from_millis(1_000);
if *quickwit_actors::HEARTBEAT < timeout {
error!("QW_ACTOR_HEARTBEAT_SECS smaller than batch timeout");
}
timeout
}
});

/// Runtime configuration used during execution of a source actor.
#[derive(Clone)]
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/source/pulsar_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ impl Source for PulsarSource {
) -> Result<Duration, ActorExitStatus> {
let now = Instant::now();
let mut batch_builder = BatchBuilder::new(SourceType::Pulsar);
let deadline = time::sleep(EMIT_BATCHES_TIMEOUT);
let deadline = time::sleep(*EMIT_BATCHES_TIMEOUT);
tokio::pin!(deadline);

loop {
Expand Down

0 comments on commit f49b253

Please sign in to comment.