diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs index 08763636beb..29f9f3565f4 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs @@ -115,7 +115,11 @@ impl FetchStreamTask { "spawning fetch task" ); let mut has_drained_queue = false; - let mut to_position_inclusive = Position::Beginning; + let mut to_position_inclusive = if self.from_position_inclusive == 0 { + Position::Beginning + } else { + Position::offset(self.from_position_inclusive - 1) + }; loop { if has_drained_queue && self.shard_status_rx.changed().await.is_err() { @@ -626,11 +630,14 @@ pub(super) mod tests { let client_id = "test-client".to_string(); let index_uid: IndexUid = IndexUid::for_test("test-index", 0); let source_id = "test-source".to_string(); + let shard_id = ShardId::from(1); + let queue_id = queue_id(&index_uid, &source_id, &shard_id); + let open_fetch_stream_request = OpenFetchStreamRequest { client_id: client_id.clone(), index_uid: Some(index_uid.clone()), source_id: source_id.clone(), - shard_id: Some(ShardId::from(1)), + shard_id: Some(shard_id.clone()), from_position_exclusive: Some(Position::Beginning), }; let (shard_status_tx, shard_status_rx) = watch::channel(ShardStatus::default()); @@ -640,8 +647,6 @@ pub(super) mod tests { shard_status_rx, 1024, ); - let queue_id = queue_id(&index_uid, &source_id, &ShardId::from(1)); - let mut mrecordlog_guard = mrecordlog.write().await; mrecordlog_guard @@ -670,8 +675,8 @@ pub(super) mod tests { let fetch_payload = into_fetch_payload(fetch_message); assert_eq!(fetch_payload.index_uid(), &index_uid); - assert_eq!(fetch_payload.source_id, "test-source"); - assert_eq!(fetch_payload.shard_id(), ShardId::from(1)); + assert_eq!(fetch_payload.source_id, source_id); + assert_eq!(fetch_payload.shard_id(), shard_id); assert_eq!(fetch_payload.from_position_exclusive(), Position::Beginning); assert_eq!( fetch_payload.to_position_inclusive(), @@ -805,15 +810,79 @@ pub(super) mod tests { let fetch_eof = into_fetch_eof(fetch_message); assert_eq!(fetch_eof.index_uid(), &index_uid); - assert_eq!(fetch_eof.source_id, "test-source"); - assert_eq!(fetch_eof.shard_id(), ShardId::from(1)); + assert_eq!(fetch_eof.source_id, source_id); + assert_eq!(fetch_eof.shard_id(), shard_id); assert_eq!(fetch_eof.eof_position, Some(Position::eof(3u64))); fetch_task_handle.await.unwrap(); } #[tokio::test] - async fn test_fetch_task_eof_at_beginning() { + async fn test_fetch_task_signals_eof() { + let tempdir = tempfile::tempdir().unwrap(); + let mrecordlog = Arc::new(RwLock::new(Some( + MultiRecordLogAsync::open(tempdir.path()).await.unwrap(), + ))); + let client_id = "test-client".to_string(); + let index_uid: IndexUid = IndexUid::for_test("test-index", 0); + let source_id = "test-source".to_string(); + let shard_id = ShardId::from(1); + let queue_id = queue_id(&index_uid, &source_id, &shard_id); + + let mut mrecordlog_guard = mrecordlog.write().await; + + mrecordlog_guard + .as_mut() + .unwrap() + .create_queue(&queue_id) + .await + .unwrap(); + + mrecordlog_guard + .as_mut() + .unwrap() + .append_records( + &queue_id, + None, + std::iter::once(MRecord::new_doc("test-doc-foo").encode()), + ) + .await + .unwrap(); + drop(mrecordlog_guard); + + let open_fetch_stream_request = OpenFetchStreamRequest { + client_id: client_id.clone(), + index_uid: Some(index_uid.clone()), + source_id: source_id.clone(), + shard_id: Some(shard_id.clone()), + from_position_exclusive: Some(Position::offset(0u64)), + }; + let shard_status = (ShardState::Closed, Position::offset(0u64)); + let (_shard_status_tx, shard_status_rx) = watch::channel(shard_status); + + let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn( + open_fetch_stream_request, + mrecordlog.clone(), + shard_status_rx, + 1024, + ); + let fetch_message = timeout(Duration::from_millis(100), fetch_stream.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + let fetch_eof = into_fetch_eof(fetch_message); + + assert_eq!(fetch_eof.index_uid(), &index_uid); + assert_eq!(fetch_eof.source_id, source_id); + assert_eq!(fetch_eof.shard_id(), shard_id); + assert_eq!(fetch_eof.eof_position, Some(Position::eof(0u64).as_eof())); + + fetch_task_handle.await.unwrap(); + } + + #[tokio::test] + async fn test_fetch_task_signals_eof_at_beginning() { let tempdir = tempfile::tempdir().unwrap(); let mrecordlog = Arc::new(RwLock::new(Some( MultiRecordLogAsync::open(tempdir.path()).await.unwrap(), @@ -821,11 +890,14 @@ pub(super) mod tests { let client_id = "test-client".to_string(); let index_uid: IndexUid = IndexUid::for_test("test-index", 0); let source_id = "test-source".to_string(); + let shard_id = ShardId::from(1); + let queue_id = queue_id(&index_uid, &source_id, &shard_id); + let open_fetch_stream_request = OpenFetchStreamRequest { client_id: client_id.clone(), index_uid: Some(index_uid.clone()), source_id: source_id.clone(), - shard_id: Some(ShardId::from(1)), + shard_id: Some(shard_id.clone()), from_position_exclusive: Some(Position::Beginning), }; let (shard_status_tx, shard_status_rx) = watch::channel(ShardStatus::default()); @@ -835,8 +907,6 @@ pub(super) mod tests { shard_status_rx, 1024, ); - let queue_id = queue_id(&index_uid, &source_id, &ShardId::from(1)); - let mut mrecordlog_guard = mrecordlog.write().await; mrecordlog_guard @@ -858,8 +928,8 @@ pub(super) mod tests { let fetch_eof = into_fetch_eof(fetch_message); assert_eq!(fetch_eof.index_uid(), &index_uid); - assert_eq!(fetch_eof.source_id, "test-source"); - assert_eq!(fetch_eof.shard_id(), ShardId::from(1)); + assert_eq!(fetch_eof.source_id, source_id); + assert_eq!(fetch_eof.shard_id(), shard_id); assert_eq!(fetch_eof.eof_position, Some(Position::Beginning.as_eof())); fetch_task_handle.await.unwrap(); @@ -874,11 +944,14 @@ pub(super) mod tests { let client_id = "test-client".to_string(); let index_uid: IndexUid = IndexUid::for_test("test-index", 0); let source_id = "test-source".to_string(); + let shard_id = ShardId::from(1); + let queue_id = queue_id(&index_uid, &source_id, &shard_id); + let open_fetch_stream_request = OpenFetchStreamRequest { client_id: client_id.clone(), index_uid: Some(index_uid.clone()), source_id: source_id.clone(), - shard_id: Some(ShardId::from(1)), + shard_id: Some(shard_id.clone()), from_position_exclusive: Some(Position::offset(0u64)), }; let (shard_status_tx, shard_status_rx) = watch::channel(ShardStatus::default()); @@ -888,8 +961,6 @@ pub(super) mod tests { shard_status_rx, 1024, ); - let queue_id = queue_id(&index_uid, &source_id, &ShardId::from(1)); - let mut mrecordlog_guard = mrecordlog.write().await; mrecordlog_guard @@ -950,8 +1021,8 @@ pub(super) mod tests { let fetch_payload = into_fetch_payload(fetch_message); assert_eq!(fetch_payload.index_uid(), &index_uid); - assert_eq!(fetch_payload.source_id, "test-source"); - assert_eq!(fetch_payload.shard_id(), ShardId::from(1)); + assert_eq!(fetch_payload.source_id, source_id); + assert_eq!(fetch_payload.shard_id(), shard_id); assert_eq!( fetch_payload.from_position_exclusive(), Position::offset(0u64) @@ -983,11 +1054,13 @@ pub(super) mod tests { let client_id = "test-client".to_string(); let index_uid: IndexUid = IndexUid::for_test("test-index", 0); let source_id = "test-source".to_string(); + let shard_id = ShardId::from(1); + let open_fetch_stream_request = OpenFetchStreamRequest { client_id: client_id.clone(), index_uid: Some(index_uid.clone()), source_id: source_id.clone(), - shard_id: Some(ShardId::from(1)), + shard_id: Some(shard_id.clone()), from_position_exclusive: Some(Position::Beginning), }; let (_shard_status_tx, shard_status_rx) = watch::channel(ShardStatus::default()); @@ -1016,11 +1089,14 @@ pub(super) mod tests { let client_id = "test-client".to_string(); let index_uid: IndexUid = IndexUid::for_test("test-index", 0); let source_id = "test-source".to_string(); + let shard_id = ShardId::from(1); + let queue_id = queue_id(&index_uid, &source_id, &shard_id); + let open_fetch_stream_request = OpenFetchStreamRequest { client_id: client_id.clone(), index_uid: Some(index_uid.clone()), source_id: source_id.clone(), - shard_id: Some(ShardId::from(1)), + shard_id: Some(shard_id.clone()), from_position_exclusive: Some(Position::Beginning), }; let (shard_status_tx, shard_status_rx) = watch::channel(ShardStatus::default()); @@ -1030,8 +1106,6 @@ pub(super) mod tests { shard_status_rx, 30, ); - let queue_id = queue_id(&index_uid, &source_id, &ShardId::from(1)); - let mut mrecordlog_guard = mrecordlog.write().await; mrecordlog_guard @@ -1159,8 +1233,8 @@ pub(super) mod tests { let fetch_payload = FetchPayload { index_uid: Some(index_uid.clone()), - source_id: "test-source".into(), - shard_id: Some(ShardId::from(1)), + source_id: source_id.clone(), + shard_id: Some(shard_id.clone()), mrecord_batch: MRecordBatch::for_test(["\0\0test-doc-foo"]), from_position_exclusive: Some(Position::offset(0u64)), to_position_inclusive: Some(Position::offset(1u64)), @@ -1170,8 +1244,8 @@ pub(super) mod tests { let fetch_eof = FetchEof { index_uid: Some(index_uid.clone()), - source_id: "test-source".into(), - shard_id: Some(ShardId::from(1)), + source_id: source_id.clone(), + shard_id: Some(shard_id.clone()), eof_position: Some(Position::eof(1u64)), }; let fetch_message = FetchMessage::new_eof(fetch_eof); @@ -1271,8 +1345,8 @@ pub(super) mod tests { let fetch_payload = FetchPayload { index_uid: Some(index_uid.clone()), - source_id: "test-source".into(), - shard_id: Some(ShardId::from(1)), + source_id: source_id.clone(), + shard_id: Some(shard_id.clone()), mrecord_batch: MRecordBatch::for_test(["\0\0test-doc-foo"]), from_position_exclusive: Some(Position::offset(0u64)), to_position_inclusive: Some(Position::offset(1u64)), @@ -1282,8 +1356,8 @@ pub(super) mod tests { let fetch_eof = FetchEof { index_uid: Some(index_uid.clone()), - source_id: "test-source".into(), - shard_id: Some(ShardId::from(1)), + source_id: source_id.clone(), + shard_id: Some(shard_id.clone()), eof_position: Some(Position::eof(1u64)), }; let fetch_message = FetchMessage::new_eof(fetch_eof); @@ -1382,8 +1456,8 @@ pub(super) mod tests { let fetch_payload = FetchPayload { index_uid: Some(index_uid.clone()), - source_id: "test-source".into(), - shard_id: Some(ShardId::from(1)), + source_id: source_id.clone(), + shard_id: Some(shard_id.clone()), mrecord_batch: MRecordBatch::for_test(["\0\0test-doc-foo"]), from_position_exclusive: Some(Position::offset(0u64)), to_position_inclusive: Some(Position::offset(1u64)), @@ -1396,8 +1470,8 @@ pub(super) mod tests { let fetch_eof = FetchEof { index_uid: Some(index_uid.clone()), - source_id: "test-source".into(), - shard_id: Some(ShardId::from(1)), + source_id: source_id.clone(), + shard_id: Some(shard_id.clone()), eof_position: Some(Position::eof(1u64)), }; let fetch_message = FetchMessage::new_eof(fetch_eof); @@ -1568,8 +1642,8 @@ pub(super) mod tests { let fetch_payload = FetchPayload { index_uid: Some(index_uid.clone()), - source_id: "test-source".into(), - shard_id: Some(ShardId::from(1)), + source_id: source_id.clone(), + shard_id: Some(shard_id.clone()), mrecord_batch: MRecordBatch::for_test(["\0\0test-doc-foo"]), from_position_exclusive: Some(Position::offset(0u64)), to_position_inclusive: Some(Position::offset(1u64)), @@ -1582,8 +1656,8 @@ pub(super) mod tests { let fetch_payload = FetchPayload { index_uid: Some(index_uid.clone()), - source_id: "test-source".into(), - shard_id: Some(ShardId::from(1)), + source_id: source_id.clone(), + shard_id: Some(shard_id.clone()), mrecord_batch: MRecordBatch::for_test(["\0\0test-doc-bar"]), from_position_exclusive: Some(Position::offset(1u64)), to_position_inclusive: Some(Position::offset(2u64)),