Skip to content

Commit

Permalink
fix endless loop in ingest_v2 (#5252)
Browse files Browse the repository at this point in the history
* fix endless loop in ingest_v2

Fix endless loop by temporarily increasing the buffer size.
Alternative would be to reject the message, but I'm not sure this should be done
here.

Based on proposed solution of @Stool233

Fixes #5240

* Apply suggestions from code review

---------

Co-authored-by: Paul Masurel <[email protected]>
  • Loading branch information
PSeitz and fulmicoton authored Jul 25, 2024
1 parent d89287f commit b6cccd6
Showing 1 changed file with 79 additions and 1 deletion.
80 changes: 79 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ impl FetchStreamTask {
break;
};
for Record { payload, .. } in mrecords {
if mrecord_buffer.len() + payload.len() > mrecord_buffer.capacity() {
// Accept at least one message
if !mrecord_buffer.is_empty()
&& (mrecord_buffer.len() + payload.len() > mrecord_buffer.capacity())
{
has_drained_queue = false;
break;
}
Expand Down Expand Up @@ -1199,6 +1202,81 @@ pub(super) mod tests {
);
}

#[tokio::test]
async fn test_fetch_task_batch_num_bytes_less_than_record_payload() {
let tempdir = tempfile::tempdir().unwrap();
let mrecordlog = Arc::new(RwLock::new(Some(
MultiRecordLogAsync::open(tempdir.path()).await.unwrap(),
)));
let client_id = "test-client".to_string();
let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
let source_id = "test-source".to_string();
let shard_id = ShardId::from(1);
let queue_id = queue_id(&index_uid, &source_id, &shard_id);

let open_fetch_stream_request = OpenFetchStreamRequest {
client_id: client_id.clone(),
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
shard_id: Some(shard_id.clone()),
from_position_exclusive: Some(Position::Beginning),
};
let (shard_status_tx, shard_status_rx) = watch::channel(ShardStatus::default());
let (mut fetch_stream, _fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
mrecordlog.clone(),
shard_status_rx,
10, //< we request batch larger than 10 bytes.
);

let mut mrecordlog_guard = mrecordlog.write().await;

mrecordlog_guard
.as_mut()
.unwrap()
.create_queue(&queue_id)
.await
.unwrap();

mrecordlog_guard
.as_mut()
.unwrap()
.append_records(
&queue_id,
None,
// This doc is longer than 10 bytes.
std::iter::once(MRecord::new_doc("test-doc-foo").encode()),
)
.await
.unwrap();

drop(mrecordlog_guard);

let shard_status = (ShardState::Open, Position::offset(1u64));
shard_status_tx.send(shard_status).unwrap();

let fetch_message = timeout(Duration::from_millis(100), fetch_stream.next())
.await
.unwrap()
.unwrap()
.unwrap();

let fetch_payload = into_fetch_payload(fetch_message);

assert_eq!(
fetch_payload
.mrecord_batch
.as_ref()
.unwrap()
.mrecord_lengths,
[14]
);
assert_eq!(
fetch_payload.mrecord_batch.as_ref().unwrap().mrecord_buffer,
"\0\0test-doc-foo"
);
}

#[test]
fn test_select_preferred_and_failover_ingesters() {
let self_node_id: NodeId = "test-ingester-0".into();
Expand Down

0 comments on commit b6cccd6

Please sign in to comment.