diff --git a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs
new file mode 100644
index 00000000000..a6eb686fc94
--- /dev/null
+++ b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs
@@ -0,0 +1,337 @@
+// Copyright (C) 2024 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use std::ffi::OsStr;
+use std::io;
+use std::path::Path;
+
+use anyhow::Context;
+use async_compression::tokio::bufread::GzipDecoder;
+use bytes::Bytes;
+use quickwit_common::uri::Uri;
+use quickwit_storage::StorageResolver;
+use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
+
+pub struct FileRecord {
+    pub next_offset: u64,
+    pub doc: Bytes,
+}
+
+/// A helper wrapper that lets you skip bytes in compressed files where you
+/// cannot seek (e.g. gzip files).
+struct SkipReader {
+    reader: BufReader<Box<dyn AsyncRead + Send + Unpin>>,
+    num_bytes_to_skip: usize,
+}
+
+impl SkipReader {
+    fn new(reader: Box<dyn AsyncRead + Send + Unpin>, num_bytes_to_skip: usize) -> Self {
+        Self {
+            reader: BufReader::new(reader),
+            num_bytes_to_skip,
+        }
+    }
+
+    async fn skip(&mut self) -> io::Result<()> {
+        // Allocating 64KB once on the stack should be fine (<1% of the Linux stack size)
+        let mut buf = [0u8; 64000];
+        while self.num_bytes_to_skip > 0 {
+            let num_bytes_to_read = self.num_bytes_to_skip.min(buf.len());
+            let num_bytes_read = self
+                .reader
+                .read_exact(&mut buf[..num_bytes_to_read])
+                .await?;
+            self.num_bytes_to_skip -= num_bytes_read;
+        }
+        Ok(())
+    }
+
+    async fn read_line<'a>(&mut self, buf: &'a mut String) -> io::Result<usize> {
+        if self.num_bytes_to_skip > 0 {
+            self.skip().await?;
+        }
+        self.reader.read_line(buf).await
+    }
+}
+
+pub struct DocFileReader {
+    reader: SkipReader,
+    next_offset: u64,
+}
+
+impl DocFileReader {
+    pub async fn from_path(
+        storage_resolver: &StorageResolver,
+        filepath: &Path,
+        offset: usize,
+    ) -> anyhow::Result<Self> {
+        let (dir_uri, file_name) = dir_and_filename(filepath)?;
+        let storage = storage_resolver.resolve(&dir_uri).await?;
+        let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap();
+        // If it's a gzip file, we can't seek to a specific offset. `SkipReader`
+        // starts from the beginning of the file, decompresses and skips the
+        // first `offset` bytes.
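+        // Plain files, by contrast, are fetched with a ranged read that
+        // starts directly at `offset`, so no bytes need to be skipped.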
+        let reader = if filepath.extension() == Some(OsStr::new("gz")) {
+            let stream = storage.get_slice_stream(file_name, 0..file_size).await?;
+            let decompressed_stream = Box::new(GzipDecoder::new(BufReader::new(stream)));
+            DocFileReader {
+                reader: SkipReader::new(decompressed_stream, offset),
+                next_offset: offset as u64,
+            }
+        } else {
+            let stream = storage
+                .get_slice_stream(file_name, offset..file_size)
+                .await?;
+            DocFileReader {
+                reader: SkipReader::new(stream, 0),
+                next_offset: offset as u64,
+            }
+        };
+        Ok(reader)
+    }
+
+    pub fn from_stdin() -> Self {
+        DocFileReader {
+            reader: SkipReader::new(Box::new(tokio::io::stdin()), 0),
+            next_offset: 0,
+        }
+    }
+
+    /// Reads the next record from the underlying file. Returns `None` when EOF
+    /// is reached.
+    pub async fn next_record(&mut self) -> anyhow::Result<Option<FileRecord>> {
+        let mut buf = String::new();
+        let res = self.reader.read_line(&mut buf).await?;
+        if res == 0 {
+            Ok(None)
+        } else {
+            self.next_offset += res as u64;
+            Ok(Some(FileRecord {
+                next_offset: self.next_offset,
+                doc: Bytes::from(buf),
+            }))
+        }
+    }
+}
+
+pub(crate) fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> {
+    let dir_uri: Uri = filepath
+        .parent()
+        .context("Parent directory could not be resolved")?
+        .to_str()
+        .context("Path cannot be turned to string")?
+        .parse()?;
+    let file_name = filepath
+        .file_name()
+        .context("Path does not appear to be a file")?;
+    Ok((dir_uri, file_name.as_ref()))
+}
+
+#[cfg(test)]
+pub mod file_test_helpers {
+    use std::io::Write;
+
+    use async_compression::tokio::write::GzipEncoder;
+    use tempfile::NamedTempFile;
+
+    pub const DUMMY_DOC: &[u8] = r#"{"body": "hello happy tax payer!"}"#.as_bytes();
+
+    async fn gzip_bytes(bytes: &[u8]) -> Vec<u8> {
+        let mut gzip_documents = Vec::new();
+        let mut encoder = GzipEncoder::new(&mut gzip_documents);
+        tokio::io::AsyncWriteExt::write_all(&mut encoder, bytes)
+            .await
+            .unwrap();
+        // Flushing is not sufficient here: without shutting the encoder down,
+        // reading the file back raises an unexpected end-of-file error.
+        tokio::io::AsyncWriteExt::shutdown(&mut encoder)
+            .await
+            .unwrap();
+        gzip_documents
+    }
+
+    async fn write_to_tmp(data: Vec<u8>, gzip: bool) -> NamedTempFile {
+        let mut temp_file: tempfile::NamedTempFile = if gzip {
+            tempfile::Builder::new().suffix(".gz").tempfile().unwrap()
+        } else {
+            tempfile::NamedTempFile::new().unwrap()
+        };
+        if gzip {
+            let gzip_documents = gzip_bytes(&data).await;
+            temp_file.write_all(&gzip_documents).unwrap();
+        } else {
+            temp_file.write_all(&data).unwrap();
+        }
+        temp_file.flush().unwrap();
+        temp_file
+    }
+
+    pub async fn generate_dummy_doc_file(gzip: bool, lines: usize) -> NamedTempFile {
+        let mut documents_bytes = Vec::with_capacity(DUMMY_DOC.len() * lines);
+        for _ in 0..lines {
+            documents_bytes.write_all(DUMMY_DOC).unwrap();
+            documents_bytes.write_all("\n".as_bytes()).unwrap();
+        }
+        write_to_tmp(documents_bytes, gzip).await
+    }
+
+    pub async fn generate_index_doc_file(gzip: bool, lines: usize) -> NamedTempFile {
+        let mut documents_bytes = Vec::new();
+        for i in 0..lines {
+            documents_bytes
+                .write_all(format!("{i}\n").as_bytes())
+                .unwrap();
+        }
+        write_to_tmp(documents_bytes, gzip).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::io::Cursor;
+    use std::path::PathBuf;
+
+    use file_test_helpers::generate_index_doc_file;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_skip_reader() {
+        {
+            // Skip 0 bytes.
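+            // With num_bytes_to_skip == 0, SkipReader should behave exactly
+            // like the underlying BufReader.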
+            let mut reader = SkipReader::new(Box::new("hello".as_bytes()), 0);
+            let mut buf = String::new();
+            reader.read_line(&mut buf).await.unwrap();
+            assert_eq!(buf, "hello");
+        }
+        {
+            // Skip 2 bytes.
+            let mut reader = SkipReader::new(Box::new("hello".as_bytes()), 2);
+            let mut buf = String::new();
+            reader.read_line(&mut buf).await.unwrap();
+            assert_eq!(buf, "llo");
+        }
+        {
+            let input = "hello";
+            let cursor = Cursor::new(input);
+            let mut reader = SkipReader::new(Box::new(cursor), 5);
+            let mut buf = String::new();
+            assert!(reader.read_line(&mut buf).await.is_ok());
+        }
+        {
+            let input = "hello";
+            let cursor = Cursor::new(input);
+            let mut reader = SkipReader::new(Box::new(cursor), 10);
+            let mut buf = String::new();
+            assert!(reader.read_line(&mut buf).await.is_err());
+        }
+        {
+            let input = "hello world".repeat(10000);
+            let cursor = Cursor::new(input.clone());
+            let mut reader = SkipReader::new(Box::new(cursor), 64000);
+            let mut buf = String::new();
+            reader.read_line(&mut buf).await.unwrap();
+            assert_eq!(buf, input[64000..]);
+        }
+        {
+            let input = "hello world".repeat(10000);
+            let cursor = Cursor::new(input.clone());
+            let mut reader = SkipReader::new(Box::new(cursor), 64001);
+            let mut buf = String::new();
+            reader.read_line(&mut buf).await.unwrap();
+            assert_eq!(buf, input[64001..]);
+        }
+    }
+
+    async fn aux_test_full_read(file: impl Into<PathBuf>, expected_lines: usize) {
+        let storage_resolver = StorageResolver::for_test();
+        let file_path = file.into();
+        let mut doc_reader = DocFileReader::from_path(&storage_resolver, &file_path, 0)
+            .await
+            .unwrap();
+        let mut parsed_lines = 0;
+        while doc_reader.next_record().await.unwrap().is_some() {
+            parsed_lines += 1;
+        }
+        assert_eq!(parsed_lines, expected_lines);
+    }
+
+    #[tokio::test]
+    async fn test_full_read() {
+        aux_test_full_read("data/test_corpus.json", 4).await;
+    }
+
+    #[tokio::test]
+    async fn test_full_read_gz() {
+        aux_test_full_read("data/test_corpus.json.gz", 4).await;
+    }
+
+    async fn aux_test_resumed_read(
+        file: impl Into<PathBuf>,
+        expected_lines: usize,
+        stop_at_line: usize,
+    ) {
+        let storage_resolver = StorageResolver::for_test();
+        let file_path = file.into();
+        // read the first part of the file
+        let mut first_part_reader = DocFileReader::from_path(&storage_resolver, &file_path, 0)
+            .await
+            .unwrap();
+        let mut resume_offset = 0;
+        let mut parsed_lines = 0;
+        for _ in 0..stop_at_line {
+            let rec = first_part_reader
+                .next_record()
+                .await
+                .unwrap()
+                .expect("EOF happened before stop_at_line");
+            resume_offset = rec.next_offset as usize;
+            assert_eq!(Bytes::from(format!("{parsed_lines}\n")), rec.doc);
+            parsed_lines += 1;
+        }
+        // read the second part of the file
+        let mut second_part_reader =
+            DocFileReader::from_path(&storage_resolver, &file_path, resume_offset)
+                .await
+                .unwrap();
+        while let Some(rec) = second_part_reader.next_record().await.unwrap() {
+            assert_eq!(Bytes::from(format!("{parsed_lines}\n")), rec.doc);
+            parsed_lines += 1;
+        }
+        assert_eq!(parsed_lines, expected_lines);
+    }
+
+    #[tokio::test]
+    async fn test_resume_read() {
+        let dummy_doc_file = generate_index_doc_file(false, 1000).await;
+        aux_test_resumed_read(dummy_doc_file.path(), 1000, 1).await;
+        aux_test_resumed_read(dummy_doc_file.path(), 1000, 40).await;
+        aux_test_resumed_read(dummy_doc_file.path(), 1000, 999).await;
+        aux_test_resumed_read(dummy_doc_file.path(), 1000, 1000).await;
+    }
+
+    #[tokio::test]
+    async fn test_resume_read_gz() {
+        let dummy_doc_file = generate_index_doc_file(true, 1000).await;
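+        // Resuming inside a gzip file exercises SkipReader's
+        // decompress-and-discard path rather than a ranged read.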
+        aux_test_resumed_read(dummy_doc_file.path(), 1000, 1).await;
+        aux_test_resumed_read(dummy_doc_file.path(), 1000, 40).await;
+        aux_test_resumed_read(dummy_doc_file.path(), 1000, 999).await;
+        aux_test_resumed_read(dummy_doc_file.path(), 1000, 1000).await;
+    }
+}
diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs
index 9ad169f701f..ac00147bb29 100644
--- a/quickwit/quickwit-indexing/src/source/file_source.rs
+++ b/quickwit/quickwit-indexing/src/source/file_source.rs
@@ -18,44 +18,38 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 use std::borrow::Borrow;
-use std::ffi::OsStr;
-use std::path::Path;
+use std::fmt;
 use std::time::Duration;
-use std::{fmt, io};
 
 use anyhow::Context;
-use async_compression::tokio::bufread::GzipDecoder;
 use async_trait::async_trait;
-use bytes::Bytes;
 use quickwit_actors::{ActorExitStatus, Mailbox};
-use quickwit_common::uri::Uri;
 use quickwit_config::FileSourceParams;
 use quickwit_metastore::checkpoint::PartitionId;
 use quickwit_proto::metastore::SourceType;
 use quickwit_proto::types::{Position, SourceId};
 use serde::Serialize;
-use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
 use tracing::info;
 
+use super::doc_file_reader::DocFileReader;
 use super::BatchBuilder;
 use crate::actors::DocProcessor;
 use crate::source::{Source, SourceContext, SourceRuntime, TypedSourceFactory};
 
 /// Number of bytes after which a new batch is cut.
-pub(crate) const BATCH_NUM_BYTES_LIMIT: u64 = 500_000u64;
+pub(crate) const BATCH_NUM_BYTES_LIMIT: u64 = 500_000;
 
 #[derive(Default, Clone, Debug, Eq, PartialEq, Serialize)]
 pub struct FileSourceCounters {
-    pub previous_offset: u64,
-    pub current_offset: u64,
-    pub num_lines_processed: u64,
+    pub offset: u64,
+    pub num_lines_processed: usize,
 }
 
 pub struct FileSource {
     source_id: SourceId,
-    params: FileSourceParams,
+    reader: DocFileReader,
+    partition_id: Option<PartitionId>,
     counters: FileSourceCounters,
-    reader: FileSourceReader,
 }
 
 impl fmt::Debug for FileSource {
@@ -71,48 +65,36 @@ impl Source for FileSource {
         doc_processor_mailbox: &Mailbox<DocProcessor>,
         ctx: &SourceContext,
     ) -> Result<Duration, ActorExitStatus> {
-        // We collect batches of documents before sending them to the indexer.
-        let limit_num_bytes = self.counters.previous_offset + BATCH_NUM_BYTES_LIMIT;
-        let mut reached_eof = false;
+        let limit_num_bytes = self.counters.offset + BATCH_NUM_BYTES_LIMIT;
+        let mut new_offset = self.counters.offset;
         let mut batch_builder = BatchBuilder::new(SourceType::File);
-
-        while self.counters.current_offset < limit_num_bytes {
-            let mut doc_line = String::new();
-            // guard the zone in case of slow read, such as reading from someone
-            // typing to stdin
-            let num_bytes = ctx
-                .protect_future(self.reader.read_line(&mut doc_line))
-                .await
-                .map_err(anyhow::Error::from)?;
-            if num_bytes == 0 {
-                reached_eof = true;
+        let mut is_eof = false;
+        while new_offset < limit_num_bytes {
+            if let Some(record) = ctx.protect_future(self.reader.next_record()).await? {
+                new_offset = record.next_offset;
+                self.counters.num_lines_processed += 1;
+                batch_builder.add_doc(record.doc);
+            } else {
+                is_eof = true;
                 break;
             }
-            batch_builder.add_doc(Bytes::from(doc_line));
-            self.counters.current_offset += num_bytes as u64;
-            self.counters.num_lines_processed += 1;
         }
-        if !batch_builder.docs.is_empty() {
-            if let Some(filepath) = &self.params.filepath {
-                let filepath_str = filepath
-                    .to_str()
-                    .context("path is invalid utf-8")?
-                    .to_string();
-                let partition_id = PartitionId::from(filepath_str);
+        if new_offset > self.counters.offset {
+            if let Some(partition_id) = &self.partition_id {
                 batch_builder
                     .checkpoint_delta
                     .record_partition_delta(
-                        partition_id,
-                        Position::offset(self.counters.previous_offset),
-                        Position::offset(self.counters.current_offset),
+                        partition_id.clone(),
+                        Position::offset(self.counters.offset),
+                        Position::offset(new_offset),
                     )
                     .unwrap();
             }
-            self.counters.previous_offset = self.counters.current_offset;
             ctx.send_message(doc_processor_mailbox, batch_builder.build())
                 .await?;
+            self.counters.offset = new_offset;
         }
-        if reached_eof {
+        if is_eof {
             info!("reached end of file");
             ctx.send_exit_with_success(doc_processor_mailbox).await?;
             return Err(ActorExitStatus::Success);
@@ -140,114 +122,57 @@ impl TypedSourceFactory for FileSourceFactory {
         source_runtime: SourceRuntime,
         params: FileSourceParams,
     ) -> anyhow::Result<Self::Source> {
+        let Some(filepath) = &params.filepath else {
+            return Ok(FileSource {
+                source_id: source_runtime.source_id().to_string(),
+                reader: DocFileReader::from_stdin(),
+                partition_id: None,
+                counters: FileSourceCounters::default(),
+            });
+        };
+
+        let partition_id = PartitionId::from(filepath.to_string_lossy().borrow());
         let checkpoint = source_runtime.fetch_checkpoint().await?;
-        let mut offset = 0;
+        let offset = checkpoint
+            .position_for_partition(&partition_id)
+            .map(|position| {
+                position
+                    .as_usize()
+                    .context("file offset should be stored as usize")
+            })
+            .transpose()?
+            .unwrap_or(0);
 
-        let reader: FileSourceReader = if let Some(filepath) = &params.filepath {
-            let partition_id = PartitionId::from(filepath.to_string_lossy().borrow());
-            offset = checkpoint
-                .position_for_partition(&partition_id)
-                .map(|position| {
-                    position
-                        .as_usize()
-                        .expect("file offset should be stored as usize")
-                })
-                .unwrap_or(0);
-            let (dir_uri, file_name) = dir_and_filename(filepath)?;
-            let storage = source_runtime.storage_resolver.resolve(&dir_uri).await?;
-            let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap();
-            // If it's a gzip file, we can't seek to a specific offset, we need to start from the
-            // beginning of the file, decompress and skip the first `offset` bytes.
-            if filepath.extension() == Some(OsStr::new("gz")) {
-                let stream = storage.get_slice_stream(file_name, 0..file_size).await?;
-                FileSourceReader::new(Box::new(GzipDecoder::new(BufReader::new(stream))), offset)
-            } else {
-                let stream = storage
-                    .get_slice_stream(file_name, offset..file_size)
-                    .await?;
-                FileSourceReader::new(stream, 0)
-            }
-        } else {
-            // We cannot use the checkpoint.
-            FileSourceReader::new(Box::new(tokio::io::stdin()), 0)
-        };
-        let file_source = FileSource {
+        let reader =
+            DocFileReader::from_path(&source_runtime.storage_resolver, filepath, offset).await?;
+
+        Ok(FileSource {
             source_id: source_runtime.source_id().to_string(),
+            reader,
+            partition_id: Some(partition_id),
             counters: FileSourceCounters {
-                previous_offset: offset as u64,
-                current_offset: offset as u64,
-                num_lines_processed: 0,
+                offset: offset as u64,
+                ..Default::default()
             },
-            reader,
-            params,
-        };
-        Ok(file_source)
+        })
     }
 }
 
-struct FileSourceReader {
-    reader: BufReader<Box<dyn AsyncRead + Send + Unpin>>,
-    num_bytes_to_skip: usize,
-}
-
-impl FileSourceReader {
-    fn new(reader: Box<dyn AsyncRead + Send + Unpin>, num_bytes_to_skip: usize) -> Self {
-        Self {
-            reader: BufReader::new(reader),
-            num_bytes_to_skip,
-        }
-    }
-
-    // This function is only called for GZIP file.
-    // Because they cannot be seeked into, we have to scan them to the right initial position.
-    async fn skip(&mut self) -> io::Result<()> {
-        // Allocate once a 64kb buffer.
-        let mut buf = [0u8; 64000];
-        while self.num_bytes_to_skip > 0 {
-            let num_bytes_to_read = self.num_bytes_to_skip.min(buf.len());
-            let num_bytes_read = self
-                .reader
-                .read_exact(&mut buf[..num_bytes_to_read])
-                .await?;
-            self.num_bytes_to_skip -= num_bytes_read;
-        }
-        Ok(())
-    }
-
-    async fn read_line<'a>(&mut self, buf: &'a mut String) -> io::Result<usize> {
-        if self.num_bytes_to_skip > 0 {
-            self.skip().await?;
-        }
-        self.reader.read_line(buf).await
-    }
-}
-
-pub(crate) fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> {
-    let dir_uri: Uri = filepath
-        .parent()
-        .context("Parent directory could not be resolved")?
-        .to_str()
-        .context("Path cannot be turned to string")?
-        .parse()?;
-    let file_name = filepath
-        .file_name()
-        .context("Path does not appear to be a file")?;
-    Ok((dir_uri, file_name.as_ref()))
-}
-
 #[cfg(test)]
 mod tests {
-    use std::io::{Cursor, Write};
+    use std::borrow::Borrow;
     use std::num::NonZeroUsize;
 
-    use async_compression::tokio::write::GzipEncoder;
     use quickwit_actors::{Command, Universe};
     use quickwit_config::{SourceConfig, SourceInputFormat, SourceParams};
-    use quickwit_metastore::checkpoint::SourceCheckpointDelta;
-    use quickwit_proto::types::IndexUid;
+    use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpointDelta};
+    use quickwit_proto::types::{IndexUid, Position};
 
     use super::*;
     use crate::models::RawDocBatch;
+    use crate::source::doc_file_reader::file_test_helpers::{
+        generate_dummy_doc_file, generate_index_doc_file,
+    };
     use crate::source::tests::SourceRuntimeBuilder;
    use crate::source::SourceActor;
 
@@ -289,8 +214,7 @@ mod tests {
         assert_eq!(
             counters,
             serde_json::json!({
-                "previous_offset": 1030u64,
-                "current_offset": 1030u64,
+                "offset": 1030u64,
                 "num_lines_processed": 4u32
             })
         );
@@ -312,25 +236,7 @@ mod tests {
         quickwit_common::setup_logging_for_tests();
         let universe = Universe::with_accelerated_time();
         let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox();
-        let mut documents_bytes = Vec::new();
-        for _ in 0..20_000 {
-            documents_bytes
-                .write_all(r#"{"body": "hello happy tax payer!"}"#.as_bytes())
-                .unwrap();
-            documents_bytes.write_all("\n".as_bytes()).unwrap();
-        }
-        let mut temp_file: tempfile::NamedTempFile = if gzip {
-            tempfile::Builder::new().suffix(".gz").tempfile().unwrap()
-        } else {
-            tempfile::NamedTempFile::new().unwrap()
-        };
-        if gzip {
-            let gzip_documents = gzip_bytes(&documents_bytes).await;
-            temp_file.write_all(&gzip_documents).unwrap();
-        } else {
-            temp_file.write_all(&documents_bytes).unwrap();
-        }
-        temp_file.flush().unwrap();
+        let temp_file = generate_dummy_doc_file(gzip, 20_000).await;
         let params = FileSourceParams::file(temp_file.path());
         let filepath = params
             .filepath
@@ -363,8 +269,7 @@ mod tests {
         assert_eq!(
             counters,
             serde_json::json!({
-                "previous_offset": 700_000u64,
-                "current_offset": 700_000u64,
+                "offset": 700_000u64,
                 "num_lines_processed": 20_000u64
             })
         );
@@ -408,27 +313,9 @@ mod tests {
         quickwit_common::setup_logging_for_tests();
         let universe = Universe::with_accelerated_time();
         let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox();
-        let mut documents_bytes = Vec::new();
-        for i in 0..100 {
-            documents_bytes
-                .write_all(format!("{i}\n").as_bytes())
-                .unwrap();
-        }
-        let mut temp_file: tempfile::NamedTempFile = if gzip {
-            tempfile::Builder::new().suffix(".gz").tempfile().unwrap()
-        } else {
-            tempfile::NamedTempFile::new().unwrap()
-        };
-        let temp_file_path = temp_file.path().canonicalize().unwrap();
-        if gzip {
-            let gzipped_documents = gzip_bytes(&documents_bytes).await;
-            temp_file.write_all(&gzipped_documents).unwrap();
-        } else {
-            temp_file.write_all(&documents_bytes).unwrap();
-        }
-        temp_file.flush().unwrap();
-
-        let params = FileSourceParams::file(&temp_file_path);
+        let temp_file = generate_index_doc_file(gzip, 100).await;
+        let temp_file_path = temp_file.path();
+        let params = FileSourceParams::file(temp_file_path);
         let source_config = SourceConfig {
             source_id: "test-file-source".to_string(),
             num_pipelines: NonZeroUsize::new(1).unwrap(),
@@ -465,74 +352,11 @@ mod tests {
         assert_eq!(
             counters,
             serde_json::json!({
-                "previous_offset": 290u64,
-                "current_offset": 290u64,
+                "offset": 290u64,
                 "num_lines_processed": 98u64
             })
         );
         let indexer_messages: Vec<RawDocBatch> = doc_processor_inbox.drain_for_test_typed();
         assert!(&indexer_messages[0].docs[0].starts_with(b"2\n"));
     }
-
-    async fn gzip_bytes(bytes: &[u8]) -> Vec<u8> {
-        let mut gzip_documents = Vec::new();
-        let mut encoder = GzipEncoder::new(&mut gzip_documents);
-        tokio::io::AsyncWriteExt::write_all(&mut encoder, bytes)
-            .await
-            .unwrap();
-        // flush is not sufficient here and reading the file will raise a unexpected end of file
-        // error.
-        tokio::io::AsyncWriteExt::shutdown(&mut encoder)
-            .await
-            .unwrap();
-        gzip_documents
-    }
-
-    #[tokio::test]
-    async fn test_skip_reader() {
-        {
-            // Skip 0 bytes.
-            let mut reader = FileSourceReader::new(Box::new("hello".as_bytes()), 0);
-            let mut buf = String::new();
-            reader.read_line(&mut buf).await.unwrap();
-            assert_eq!(buf, "hello");
-        }
-        {
-            // Skip 2 bytes.
-            let mut reader = FileSourceReader::new(Box::new("hello".as_bytes()), 2);
-            let mut buf = String::new();
-            reader.read_line(&mut buf).await.unwrap();
-            assert_eq!(buf, "llo");
-        }
-        {
-            let input = "hello";
-            let cursor = Cursor::new(input);
-            let mut reader = FileSourceReader::new(Box::new(cursor), 5);
-            let mut buf = String::new();
-            assert!(reader.read_line(&mut buf).await.is_ok());
-        }
-        {
-            let input = "hello";
-            let cursor = Cursor::new(input);
-            let mut reader = FileSourceReader::new(Box::new(cursor), 10);
-            let mut buf = String::new();
-            assert!(reader.read_line(&mut buf).await.is_err());
-        }
-        {
-            let input = "hello world".repeat(10000);
-            let cursor = Cursor::new(input.clone());
-            let mut reader = FileSourceReader::new(Box::new(cursor), 64000);
-            let mut buf = String::new();
-            reader.read_line(&mut buf).await.unwrap();
-            assert_eq!(buf, input[64000..]);
-        }
-        {
-            let input = "hello world".repeat(10000);
-            let cursor = Cursor::new(input.clone());
-            let mut reader = FileSourceReader::new(Box::new(cursor), 64001);
-            let mut buf = String::new();
-            reader.read_line(&mut buf).await.unwrap();
-            assert_eq!(buf, input[64001..]);
-        }
-    }
 }
diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs
index db2583d0e95..ddd4b00aff7 100644
--- a/quickwit/quickwit-indexing/src/source/mod.rs
+++ b/quickwit/quickwit-indexing/src/source/mod.rs
@@ -57,6 +57,7 @@
 //!   that file.
 //! - the kafka source: the partition id is a kafka topic partition id, and the position is a kafka
 //!   offset.
+mod doc_file_reader;
 mod file_source;
 #[cfg(feature = "gcp-pubsub")]
 mod gcp_pubsub_source;
@@ -111,7 +112,7 @@ use tracing::error;
 pub use vec_source::{VecSource, VecSourceFactory};
 pub use void_source::{VoidSource, VoidSourceFactory};
 
-use self::file_source::dir_and_filename;
+use self::doc_file_reader::dir_and_filename;
 use crate::actors::DocProcessor;
 use crate::models::RawDocBatch;
 use crate::source::ingest::IngestSourceFactory;
@@ -232,7 +233,7 @@ pub trait Source: Send + 'static {
     /// In that case, `batch_sink` will block.
     ///
     /// It returns an optional duration specifying how long the batch requester
-    /// should wait before pooling gain.
+    /// should wait before polling again.
     async fn emit_batches(
         &mut self,
         doc_processor_mailbox: &Mailbox<DocProcessor>,
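
Usage sketch (not part of the patch): how crate-internal code might drive the new `DocFileReader` to resume from a previously checkpointed offset. The file path, the `drain_from` helper, and the use of `StorageResolver::unconfigured()` are illustrative assumptions; the `doc_file_reader` module itself is private to `quickwit-indexing`.

    use std::path::Path;

    use quickwit_storage::StorageResolver;

    use crate::source::doc_file_reader::DocFileReader;

    /// Reads every remaining record starting at `offset` and returns the
    /// offset to checkpoint for the next run (hypothetical helper).
    async fn drain_from(offset: usize) -> anyhow::Result<u64> {
        let storage_resolver = StorageResolver::unconfigured();
        let filepath = Path::new("docs.json.gz"); // assumed path
        // For a .gz file this decompresses and discards the first `offset`
        // bytes; for a plain file it issues a ranged read starting at `offset`.
        let mut reader = DocFileReader::from_path(&storage_resolver, filepath, offset).await?;
        let mut next_offset = offset as u64;
        while let Some(record) = reader.next_record().await? {
            // `record.doc` holds one newline-terminated document.
            next_offset = record.next_offset;
        }
        Ok(next_offset)
    }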