diff --git a/quickwit/quickwit-cli/tests/cli.rs b/quickwit/quickwit-cli/tests/cli.rs index 524098537b6..63a7b00d29c 100644 --- a/quickwit/quickwit-cli/tests/cli.rs +++ b/quickwit/quickwit-cli/tests/cli.rs @@ -285,6 +285,45 @@ async fn test_ingest_docs_cli() { )); } +#[tokio::test] +async fn test_reingest_same_file_cli() { + quickwit_common::setup_logging_for_tests(); + let index_id = append_random_suffix("test-index-simple"); + let test_env = create_test_env(index_id.clone(), TestStorageType::LocalFileSystem) + .await + .unwrap(); + test_env.start_server().await.unwrap(); + create_logs_index(&test_env).await.unwrap(); + let index_uid = test_env.index_metadata().await.unwrap().index_uid; + + for _ in 0..2 { + let args = LocalIngestDocsArgs { + config_uri: test_env.resource_files.config.clone(), + index_id: index_id.clone(), + input_path_opt: Some(test_env.resource_files.log_docs.clone()), + input_format: SourceInputFormat::Json, + overwrite: false, + clear_cache: true, + vrl_script: None, + }; + + local_ingest_docs_cli(args).await.unwrap(); + } + + let splits_metadata: Vec<SplitMetadata> = test_env + .metastore() + .await + .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) + .await + .unwrap() + .collect_splits_metadata() + .await + .unwrap(); + + assert_eq!(splits_metadata.len(), 1); + assert_eq!(splits_metadata[0].num_docs, 5); +} + /// Helper function to compare a json payload. 
/// /// It will serialize and deserialize the value in order diff --git a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs index f8fc3c0b8d3..071aa1d6cad 100644 --- a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs +++ b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs @@ -190,9 +190,12 @@ impl ObjectUriBatchReader { source_progress: &Progress, source_type: SourceType, ) -> anyhow::Result<BatchBuilder> { + let mut batch_builder = BatchBuilder::new(source_type); + if self.is_eof { + return Ok(batch_builder); + } let limit_num_bytes = self.current_offset + BATCH_NUM_BYTES_LIMIT as usize; let mut new_offset = self.current_offset; - let mut batch_builder = BatchBuilder::new(source_type); while new_offset < limit_num_bytes { if let Some(record) = source_progress .protect_future(self.reader.next_record())