diff --git a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs
new file mode 100644
index 00000000000..a6eb686fc94
--- /dev/null
+++ b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs
@@ -0,0 +1,337 @@
+// Copyright (C) 2024 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use std::ffi::OsStr;
+use std::io;
+use std::path::Path;
+
+use anyhow::Context;
+use async_compression::tokio::bufread::GzipDecoder;
+use bytes::Bytes;
+use quickwit_common::uri::Uri;
+use quickwit_storage::StorageResolver;
+use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
+
+pub struct FileRecord {
+ pub next_offset: u64,
+ pub doc: Bytes,
+}
+
+/// A helper wrapper that lets you skip bytes in compressed files where you
+/// cannot seek (e.g. gzip files).
+struct SkipReader {
+ reader: BufReader<Box<dyn AsyncRead + Send + Unpin>>,
+ num_bytes_to_skip: usize,
+}
+
+impl SkipReader {
+ fn new(reader: Box<dyn AsyncRead + Send + Unpin>, num_bytes_to_skip: usize) -> Self {
+ Self {
+ reader: BufReader::new(reader),
+ num_bytes_to_skip,
+ }
+ }
+
+ async fn skip(&mut self) -> io::Result<()> {
+ // Allocating 64KB once on the stack should be fine (<1% of the Linux stack size)
+ let mut buf = [0u8; 64000];
+ while self.num_bytes_to_skip > 0 {
+ let num_bytes_to_read = self.num_bytes_to_skip.min(buf.len());
+ let num_bytes_read = self
+ .reader
+ .read_exact(&mut buf[..num_bytes_to_read])
+ .await?;
+ self.num_bytes_to_skip -= num_bytes_read;
+ }
+ Ok(())
+ }
+
+ async fn read_line<'a>(&mut self, buf: &'a mut String) -> io::Result<usize> {
+ if self.num_bytes_to_skip > 0 {
+ self.skip().await?;
+ }
+ self.reader.read_line(buf).await
+ }
+}
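+
+// Illustrative sketch (not part of the patch): how `SkipReader` can wrap a
+// gzip stream so that reading effectively starts mid-file, assuming some
+// `stream` that implements `AsyncRead + Send + Unpin`:
+//
+//     let decoder = Box::new(GzipDecoder::new(BufReader::new(stream)));
+//     let mut reader = SkipReader::new(decoder, 1024);
+//     // The first `read_line` call discards the first 1024 decompressed
+//     // bytes before returning any data.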
+
+pub struct DocFileReader {
+ reader: SkipReader,
+ next_offset: u64,
+}
+
+impl DocFileReader {
+ pub async fn from_path(
+ storage_resolver: &StorageResolver,
+ filepath: &Path,
+ offset: usize,
+ ) -> anyhow::Result<Self> {
+ let (dir_uri, file_name) = dir_and_filename(filepath)?;
+ let storage = storage_resolver.resolve(&dir_uri).await?;
+ let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap();
+ // If it's a gzip file, we can't seek to a specific offset. `SkipReader`
+ // starts from the beginning of the file, decompresses and skips the
+ // first `offset` bytes.
+ let reader = if filepath.extension() == Some(OsStr::new("gz")) {
+ let stream = storage.get_slice_stream(file_name, 0..file_size).await?;
+ let decompressed_stream = Box::new(GzipDecoder::new(BufReader::new(stream)));
+ DocFileReader {
+ reader: SkipReader::new(decompressed_stream, offset),
+ next_offset: offset as u64,
+ }
+ } else {
+ let stream = storage
+ .get_slice_stream(file_name, offset..file_size)
+ .await?;
+ DocFileReader {
+ reader: SkipReader::new(stream, 0),
+ next_offset: offset as u64,
+ }
+ };
+ Ok(reader)
+ }
+
+ pub fn from_stdin() -> Self {
+ DocFileReader {
+ reader: SkipReader::new(Box::new(tokio::io::stdin()), 0),
+ next_offset: 0,
+ }
+ }
+
+ /// Reads the next record from the underlying file. Returns `None` when EOF
+ /// is reached.
+ pub async fn next_record(&mut self) -> anyhow::Result<Option<FileRecord>> {
+ let mut buf = String::new();
+ let res = self.reader.read_line(&mut buf).await?;
+ if res == 0 {
+ Ok(None)
+ } else {
+ self.next_offset += res as u64;
+ Ok(Some(FileRecord {
+ next_offset: self.next_offset,
+ doc: Bytes::from(buf),
+ }))
+ }
+ }
+}
+
+pub(crate) fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> {
+ let dir_uri: Uri = filepath
+ .parent()
+ .context("Parent directory could not be resolved")?
+ .to_str()
+ .context("Path cannot be turned to string")?
+ .parse()?;
+ let file_name = filepath
+ .file_name()
+ .context("Path does not appear to be a file")?;
+ Ok((dir_uri, file_name.as_ref()))
+}
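+
+// For illustration (hedged): for a filepath like "/data/docs.json", the
+// parent "/data" is parsed into a `Uri` (a bare path is expected to resolve
+// to file-backed storage) and "docs.json" is returned as the file name to
+// look up within that storage.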
+
+#[cfg(test)]
+pub mod file_test_helpers {
+ use std::io::Write;
+
+ use async_compression::tokio::write::GzipEncoder;
+ use tempfile::NamedTempFile;
+
+ pub const DUMMY_DOC: &[u8] = r#"{"body": "hello happy tax payer!"}"#.as_bytes();
+
+ async fn gzip_bytes(bytes: &[u8]) -> Vec<u8> {
+ let mut gzip_documents = Vec::new();
+ let mut encoder = GzipEncoder::new(&mut gzip_documents);
+ tokio::io::AsyncWriteExt::write_all(&mut encoder, bytes)
+ .await
+ .unwrap();
+ // flush is not sufficient here: without a proper shutdown, reading the file
+ // back raises an unexpected end-of-file error.
+ tokio::io::AsyncWriteExt::shutdown(&mut encoder)
+ .await
+ .unwrap();
+ gzip_documents
+ }
+
+ async fn write_to_tmp(data: Vec<u8>, gzip: bool) -> NamedTempFile {
+ let mut temp_file: tempfile::NamedTempFile = if gzip {
+ tempfile::Builder::new().suffix(".gz").tempfile().unwrap()
+ } else {
+ tempfile::NamedTempFile::new().unwrap()
+ };
+ if gzip {
+ let gzip_documents = gzip_bytes(&data).await;
+ temp_file.write_all(&gzip_documents).unwrap();
+ } else {
+ temp_file.write_all(&data).unwrap();
+ }
+ temp_file.flush().unwrap();
+ temp_file
+ }
+
+ pub async fn generate_dummy_doc_file(gzip: bool, lines: usize) -> NamedTempFile {
+ let mut documents_bytes = Vec::with_capacity(DUMMY_DOC.len() * lines);
+ for _ in 0..lines {
+ documents_bytes.write_all(DUMMY_DOC).unwrap();
+ documents_bytes.write_all("\n".as_bytes()).unwrap();
+ }
+ write_to_tmp(documents_bytes, gzip).await
+ }
+
+ pub async fn generate_index_doc_file(gzip: bool, lines: usize) -> NamedTempFile {
+ let mut documents_bytes = Vec::new();
+ for i in 0..lines {
+ documents_bytes
+ .write_all(format!("{i}\n").as_bytes())
+ .unwrap();
+ }
+ write_to_tmp(documents_bytes, gzip).await
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::io::Cursor;
+ use std::path::PathBuf;
+
+ use file_test_helpers::generate_index_doc_file;
+
+ use super::*;
+
+ #[tokio::test]
+ async fn test_skip_reader() {
+ {
+ // Skip 0 bytes.
+ let mut reader = SkipReader::new(Box::new("hello".as_bytes()), 0);
+ let mut buf = String::new();
+ reader.read_line(&mut buf).await.unwrap();
+ assert_eq!(buf, "hello");
+ }
+ {
+ // Skip 2 bytes.
+ let mut reader = SkipReader::new(Box::new("hello".as_bytes()), 2);
+ let mut buf = String::new();
+ reader.read_line(&mut buf).await.unwrap();
+ assert_eq!(buf, "llo");
+ }
+ {
+ let input = "hello";
+ let cursor = Cursor::new(input);
+ let mut reader = SkipReader::new(Box::new(cursor), 5);
+ let mut buf = String::new();
+ assert!(reader.read_line(&mut buf).await.is_ok());
+ }
+ {
+ let input = "hello";
+ let cursor = Cursor::new(input);
+ let mut reader = SkipReader::new(Box::new(cursor), 10);
+ let mut buf = String::new();
+ assert!(reader.read_line(&mut buf).await.is_err());
+ }
+ {
+ let input = "hello world".repeat(10000);
+ let cursor = Cursor::new(input.clone());
+ let mut reader = SkipReader::new(Box::new(cursor), 64000);
+ let mut buf = String::new();
+ reader.read_line(&mut buf).await.unwrap();
+ assert_eq!(buf, input[64000..]);
+ }
+ {
+ let input = "hello world".repeat(10000);
+ let cursor = Cursor::new(input.clone());
+ let mut reader = SkipReader::new(Box::new(cursor), 64001);
+ let mut buf = String::new();
+ reader.read_line(&mut buf).await.unwrap();
+ assert_eq!(buf, input[64001..]);
+ }
+ }
+
+ async fn aux_test_full_read(file: impl Into<PathBuf>, expected_lines: usize) {
+ let storage_resolver = StorageResolver::for_test();
+ let file_path = file.into();
+ let mut doc_reader = DocFileReader::from_path(&storage_resolver, &file_path, 0)
+ .await
+ .unwrap();
+ let mut parsed_lines = 0;
+ while doc_reader.next_record().await.unwrap().is_some() {
+ parsed_lines += 1;
+ }
+ assert_eq!(parsed_lines, expected_lines);
+ }
+
+ #[tokio::test]
+ async fn test_full_read() {
+ aux_test_full_read("data/test_corpus.json", 4).await;
+ }
+
+ #[tokio::test]
+ async fn test_full_read_gz() {
+ aux_test_full_read("data/test_corpus.json.gz", 4).await;
+ }
+
+ async fn aux_test_resumed_read(
+ file: impl Into<PathBuf>,
+ expected_lines: usize,
+ stop_at_line: usize,
+ ) {
+ let storage_resolver = StorageResolver::for_test();
+ let file_path = file.into();
+ // read the first part of the file
+ let mut first_part_reader = DocFileReader::from_path(&storage_resolver, &file_path, 0)
+ .await
+ .unwrap();
+ let mut resume_offset = 0;
+ let mut parsed_lines = 0;
+ for _ in 0..stop_at_line {
+ let rec = first_part_reader
+ .next_record()
+ .await
+ .unwrap()
+ .expect("EOF happened before stop_at_line");
+ resume_offset = rec.next_offset as usize;
+ assert_eq!(Bytes::from(format!("{parsed_lines}\n")), rec.doc);
+ parsed_lines += 1;
+ }
+ // read the second part of the file
+ let mut second_part_reader =
+ DocFileReader::from_path(&storage_resolver, &file_path, resume_offset)
+ .await
+ .unwrap();
+ while let Some(rec) = second_part_reader.next_record().await.unwrap() {
+ assert_eq!(Bytes::from(format!("{parsed_lines}\n")), rec.doc);
+ parsed_lines += 1;
+ }
+ assert_eq!(parsed_lines, expected_lines);
+ }
+
+ #[tokio::test]
+ async fn test_resume_read() {
+ let dummy_doc_file = generate_index_doc_file(false, 1000).await;
+ aux_test_resumed_read(dummy_doc_file.path(), 1000, 1).await;
+ aux_test_resumed_read(dummy_doc_file.path(), 1000, 40).await;
+ aux_test_resumed_read(dummy_doc_file.path(), 1000, 999).await;
+ aux_test_resumed_read(dummy_doc_file.path(), 1000, 1000).await;
+ }
+
+ #[tokio::test]
+ async fn test_resume_read_gz() {
+ let dummy_doc_file = generate_index_doc_file(true, 1000).await;
+ aux_test_resumed_read(dummy_doc_file.path(), 1000, 1).await;
+ aux_test_resumed_read(dummy_doc_file.path(), 1000, 40).await;
+ aux_test_resumed_read(dummy_doc_file.path(), 1000, 999).await;
+ aux_test_resumed_read(dummy_doc_file.path(), 1000, 1000).await;
+ }
+}
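
Taken together, `DocFileReader` supports the resume pattern exercised by
`aux_test_resumed_read` above: persist `next_offset` after each record, then
reopen the file at that offset later. A minimal sketch, assuming a caller
inside the crate with access to a `StorageResolver`
(`StorageResolver::for_test()` below is the test-only resolver used in the
tests above):

```rust
use std::path::Path;

use quickwit_storage::StorageResolver;

// Read every record from `path`, starting at `start_offset`, and return the
// offset to resume from. For .gz files, offsets count decompressed bytes.
async fn read_all_from(path: &Path, start_offset: usize) -> anyhow::Result<u64> {
    let storage_resolver = StorageResolver::for_test();
    let mut reader = DocFileReader::from_path(&storage_resolver, path, start_offset).await?;
    let mut resume_offset = start_offset as u64;
    while let Some(record) = reader.next_record().await? {
        // `record.doc` holds one newline-terminated document.
        resume_offset = record.next_offset;
    }
    Ok(resume_offset)
}
```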
diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs
index 9ad169f701f..ac00147bb29 100644
--- a/quickwit/quickwit-indexing/src/source/file_source.rs
+++ b/quickwit/quickwit-indexing/src/source/file_source.rs
@@ -18,44 +18,38 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::borrow::Borrow;
-use std::ffi::OsStr;
-use std::path::Path;
+use std::fmt;
use std::time::Duration;
-use std::{fmt, io};
use anyhow::Context;
-use async_compression::tokio::bufread::GzipDecoder;
use async_trait::async_trait;
-use bytes::Bytes;
use quickwit_actors::{ActorExitStatus, Mailbox};
-use quickwit_common::uri::Uri;
use quickwit_config::FileSourceParams;
use quickwit_metastore::checkpoint::PartitionId;
use quickwit_proto::metastore::SourceType;
use quickwit_proto::types::{Position, SourceId};
use serde::Serialize;
-use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
use tracing::info;
+use super::doc_file_reader::DocFileReader;
use super::BatchBuilder;
use crate::actors::DocProcessor;
use crate::source::{Source, SourceContext, SourceRuntime, TypedSourceFactory};
/// Number of bytes after which a new batch is cut.
-pub(crate) const BATCH_NUM_BYTES_LIMIT: u64 = 500_000u64;
+pub(crate) const BATCH_NUM_BYTES_LIMIT: u64 = 500_000;
#[derive(Default, Clone, Debug, Eq, PartialEq, Serialize)]
pub struct FileSourceCounters {
- pub previous_offset: u64,
- pub current_offset: u64,
- pub num_lines_processed: u64,
+ pub offset: u64,
+ pub num_lines_processed: usize,
}
pub struct FileSource {
source_id: SourceId,
- params: FileSourceParams,
+ reader: DocFileReader,
+ partition_id: Option<PartitionId>,
counters: FileSourceCounters,
- reader: FileSourceReader,
}
impl fmt::Debug for FileSource {
@@ -71,48 +65,36 @@ impl Source for FileSource {
doc_processor_mailbox: &Mailbox<DocProcessor>,
ctx: &SourceContext,
) -> Result<Duration, ActorExitStatus> {
- // We collect batches of documents before sending them to the indexer.
- let limit_num_bytes = self.counters.previous_offset + BATCH_NUM_BYTES_LIMIT;
- let mut reached_eof = false;
+ let limit_num_bytes = self.counters.offset + BATCH_NUM_BYTES_LIMIT;
+ let mut new_offset = self.counters.offset;
let mut batch_builder = BatchBuilder::new(SourceType::File);
-
- while self.counters.current_offset < limit_num_bytes {
- let mut doc_line = String::new();
- // guard the zone in case of slow read, such as reading from someone
- // typing to stdin
- let num_bytes = ctx
- .protect_future(self.reader.read_line(&mut doc_line))
- .await
- .map_err(anyhow::Error::from)?;
- if num_bytes == 0 {
- reached_eof = true;
+ let mut is_eof = false;
+ while new_offset < limit_num_bytes {
+ if let Some(record) = ctx.protect_future(self.reader.next_record()).await? {
+ new_offset = record.next_offset;
+ self.counters.num_lines_processed += 1;
+ batch_builder.add_doc(record.doc);
+ } else {
+ is_eof = true;
break;
}
- batch_builder.add_doc(Bytes::from(doc_line));
- self.counters.current_offset += num_bytes as u64;
- self.counters.num_lines_processed += 1;
}
- if !batch_builder.docs.is_empty() {
- if let Some(filepath) = &self.params.filepath {
- let filepath_str = filepath
- .to_str()
- .context("path is invalid utf-8")?
- .to_string();
- let partition_id = PartitionId::from(filepath_str);
+ if new_offset > self.counters.offset {
+ if let Some(partition_id) = &self.partition_id {
batch_builder
.checkpoint_delta
.record_partition_delta(
- partition_id,
- Position::offset(self.counters.previous_offset),
- Position::offset(self.counters.current_offset),
+ partition_id.clone(),
+ Position::offset(self.counters.offset),
+ Position::offset(new_offset),
)
.unwrap();
}
- self.counters.previous_offset = self.counters.current_offset;
ctx.send_message(doc_processor_mailbox, batch_builder.build())
.await?;
+ self.counters.offset = new_offset;
}
- if reached_eof {
+ if is_eof {
info!("reached end of file");
ctx.send_exit_with_success(doc_processor_mailbox).await?;
return Err(ActorExitStatus::Success);
@@ -140,114 +122,57 @@ impl TypedSourceFactory for FileSourceFactory {
source_runtime: SourceRuntime,
params: FileSourceParams,
) -> anyhow::Result<FileSource> {
+ let Some(filepath) = &params.filepath else {
+ return Ok(FileSource {
+ source_id: source_runtime.source_id().to_string(),
+ reader: DocFileReader::from_stdin(),
+ partition_id: None,
+ counters: FileSourceCounters::default(),
+ });
+ };
+
+ let partition_id = PartitionId::from(filepath.to_string_lossy().borrow());
let checkpoint = source_runtime.fetch_checkpoint().await?;
- let mut offset = 0;
+ let offset = checkpoint
+ .position_for_partition(&partition_id)
+ .map(|position| {
+ position
+ .as_usize()
+ .context("file offset should be stored as usize")
+ })
+ .transpose()?
+ .unwrap_or(0);
- let reader: FileSourceReader = if let Some(filepath) = &params.filepath {
- let partition_id = PartitionId::from(filepath.to_string_lossy().borrow());
- offset = checkpoint
- .position_for_partition(&partition_id)
- .map(|position| {
- position
- .as_usize()
- .expect("file offset should be stored as usize")
- })
- .unwrap_or(0);
- let (dir_uri, file_name) = dir_and_filename(filepath)?;
- let storage = source_runtime.storage_resolver.resolve(&dir_uri).await?;
- let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap();
- // If it's a gzip file, we can't seek to a specific offset, we need to start from the
- // beginning of the file, decompress and skip the first `offset` bytes.
- if filepath.extension() == Some(OsStr::new("gz")) {
- let stream = storage.get_slice_stream(file_name, 0..file_size).await?;
- FileSourceReader::new(Box::new(GzipDecoder::new(BufReader::new(stream))), offset)
- } else {
- let stream = storage
- .get_slice_stream(file_name, offset..file_size)
- .await?;
- FileSourceReader::new(stream, 0)
- }
- } else {
- // We cannot use the checkpoint.
- FileSourceReader::new(Box::new(tokio::io::stdin()), 0)
- };
- let file_source = FileSource {
+ let reader =
+ DocFileReader::from_path(&source_runtime.storage_resolver, filepath, offset).await?;
+
+ Ok(FileSource {
source_id: source_runtime.source_id().to_string(),
+ reader,
+ partition_id: Some(partition_id),
counters: FileSourceCounters {
- previous_offset: offset as u64,
- current_offset: offset as u64,
- num_lines_processed: 0,
+ offset: offset as u64,
+ ..Default::default()
},
- reader,
- params,
- };
- Ok(file_source)
+ })
}
}
-struct FileSourceReader {
- reader: BufReader<Box<dyn AsyncRead + Send + Unpin>>,
- num_bytes_to_skip: usize,
-}
-
-impl FileSourceReader {
- fn new(reader: Box<dyn AsyncRead + Send + Unpin>, num_bytes_to_skip: usize) -> Self {
- Self {
- reader: BufReader::new(reader),
- num_bytes_to_skip,
- }
- }
-
- // This function is only called for GZIP file.
- // Because they cannot be seeked into, we have to scan them to the right initial position.
- async fn skip(&mut self) -> io::Result<()> {
- // Allocate once a 64kb buffer.
- let mut buf = [0u8; 64000];
- while self.num_bytes_to_skip > 0 {
- let num_bytes_to_read = self.num_bytes_to_skip.min(buf.len());
- let num_bytes_read = self
- .reader
- .read_exact(&mut buf[..num_bytes_to_read])
- .await?;
- self.num_bytes_to_skip -= num_bytes_read;
- }
- Ok(())
- }
-
- async fn read_line<'a>(&mut self, buf: &'a mut String) -> io::Result<usize> {
- if self.num_bytes_to_skip > 0 {
- self.skip().await?;
- }
- self.reader.read_line(buf).await
- }
-}
-
-pub(crate) fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> {
- let dir_uri: Uri = filepath
- .parent()
- .context("Parent directory could not be resolved")?
- .to_str()
- .context("Path cannot be turned to string")?
- .parse()?;
- let file_name = filepath
- .file_name()
- .context("Path does not appear to be a file")?;
- Ok((dir_uri, file_name.as_ref()))
-}
-
#[cfg(test)]
mod tests {
- use std::io::{Cursor, Write};
+ use std::borrow::Borrow;
use std::num::NonZeroUsize;
- use async_compression::tokio::write::GzipEncoder;
use quickwit_actors::{Command, Universe};
use quickwit_config::{SourceConfig, SourceInputFormat, SourceParams};
- use quickwit_metastore::checkpoint::SourceCheckpointDelta;
- use quickwit_proto::types::IndexUid;
+ use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpointDelta};
+ use quickwit_proto::types::{IndexUid, Position};
use super::*;
use crate::models::RawDocBatch;
+ use crate::source::doc_file_reader::file_test_helpers::{
+ generate_dummy_doc_file, generate_index_doc_file,
+ };
use crate::source::tests::SourceRuntimeBuilder;
use crate::source::SourceActor;
@@ -289,8 +214,7 @@ mod tests {
assert_eq!(
counters,
serde_json::json!({
- "previous_offset": 1030u64,
- "current_offset": 1030u64,
+ "offset": 1030u64,
"num_lines_processed": 4u32
})
);
@@ -312,25 +236,7 @@ mod tests {
quickwit_common::setup_logging_for_tests();
let universe = Universe::with_accelerated_time();
let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox();
- let mut documents_bytes = Vec::new();
- for _ in 0..20_000 {
- documents_bytes
- .write_all(r#"{"body": "hello happy tax payer!"}"#.as_bytes())
- .unwrap();
- documents_bytes.write_all("\n".as_bytes()).unwrap();
- }
- let mut temp_file: tempfile::NamedTempFile = if gzip {
- tempfile::Builder::new().suffix(".gz").tempfile().unwrap()
- } else {
- tempfile::NamedTempFile::new().unwrap()
- };
- if gzip {
- let gzip_documents = gzip_bytes(&documents_bytes).await;
- temp_file.write_all(&gzip_documents).unwrap();
- } else {
- temp_file.write_all(&documents_bytes).unwrap();
- }
- temp_file.flush().unwrap();
+ let temp_file = generate_dummy_doc_file(gzip, 20_000).await;
let params = FileSourceParams::file(temp_file.path());
let filepath = params
.filepath
@@ -363,8 +269,7 @@ mod tests {
assert_eq!(
counters,
serde_json::json!({
- "previous_offset": 700_000u64,
- "current_offset": 700_000u64,
+ "offset": 700_000u64,
"num_lines_processed": 20_000u64
})
);
@@ -408,27 +313,9 @@ mod tests {
quickwit_common::setup_logging_for_tests();
let universe = Universe::with_accelerated_time();
let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox();
- let mut documents_bytes = Vec::new();
- for i in 0..100 {
- documents_bytes
- .write_all(format!("{i}\n").as_bytes())
- .unwrap();
- }
- let mut temp_file: tempfile::NamedTempFile = if gzip {
- tempfile::Builder::new().suffix(".gz").tempfile().unwrap()
- } else {
- tempfile::NamedTempFile::new().unwrap()
- };
- let temp_file_path = temp_file.path().canonicalize().unwrap();
- if gzip {
- let gzipped_documents = gzip_bytes(&documents_bytes).await;
- temp_file.write_all(&gzipped_documents).unwrap();
- } else {
- temp_file.write_all(&documents_bytes).unwrap();
- }
- temp_file.flush().unwrap();
-
- let params = FileSourceParams::file(&temp_file_path);
+ let temp_file = generate_index_doc_file(gzip, 100).await;
+ let temp_file_path = temp_file.path();
+ let params = FileSourceParams::file(temp_file_path);
let source_config = SourceConfig {
source_id: "test-file-source".to_string(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
@@ -465,74 +352,11 @@ mod tests {
assert_eq!(
counters,
serde_json::json!({
- "previous_offset": 290u64,
- "current_offset": 290u64,
+ "offset": 290u64,
"num_lines_processed": 98u64
})
);
let indexer_messages: Vec<RawDocBatch> = doc_processor_inbox.drain_for_test_typed();
assert!(&indexer_messages[0].docs[0].starts_with(b"2\n"));
}
-
- async fn gzip_bytes(bytes: &[u8]) -> Vec<u8> {
- let mut gzip_documents = Vec::new();
- let mut encoder = GzipEncoder::new(&mut gzip_documents);
- tokio::io::AsyncWriteExt::write_all(&mut encoder, bytes)
- .await
- .unwrap();
- // flush is not sufficient here and reading the file will raise a unexpected end of file
- // error.
- tokio::io::AsyncWriteExt::shutdown(&mut encoder)
- .await
- .unwrap();
- gzip_documents
- }
-
- #[tokio::test]
- async fn test_skip_reader() {
- {
- // Skip 0 bytes.
- let mut reader = FileSourceReader::new(Box::new("hello".as_bytes()), 0);
- let mut buf = String::new();
- reader.read_line(&mut buf).await.unwrap();
- assert_eq!(buf, "hello");
- }
- {
- // Skip 2 bytes.
- let mut reader = FileSourceReader::new(Box::new("hello".as_bytes()), 2);
- let mut buf = String::new();
- reader.read_line(&mut buf).await.unwrap();
- assert_eq!(buf, "llo");
- }
- {
- let input = "hello";
- let cursor = Cursor::new(input);
- let mut reader = FileSourceReader::new(Box::new(cursor), 5);
- let mut buf = String::new();
- assert!(reader.read_line(&mut buf).await.is_ok());
- }
- {
- let input = "hello";
- let cursor = Cursor::new(input);
- let mut reader = FileSourceReader::new(Box::new(cursor), 10);
- let mut buf = String::new();
- assert!(reader.read_line(&mut buf).await.is_err());
- }
- {
- let input = "hello world".repeat(10000);
- let cursor = Cursor::new(input.clone());
- let mut reader = FileSourceReader::new(Box::new(cursor), 64000);
- let mut buf = String::new();
- reader.read_line(&mut buf).await.unwrap();
- assert_eq!(buf, input[64000..]);
- }
- {
- let input = "hello world".repeat(10000);
- let cursor = Cursor::new(input.clone());
- let mut reader = FileSourceReader::new(Box::new(cursor), 64001);
- let mut buf = String::new();
- reader.read_line(&mut buf).await.unwrap();
- assert_eq!(buf, input[64001..]);
- }
- }
}
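
The rewritten `emit_batches` above records progress as a checkpoint delta from
the previous offset to the new one. A hedged sketch of that bookkeeping in
isolation, assuming `SourceCheckpointDelta` implements `Default` (the
partition id string below is a hypothetical file path):

```rust
use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpointDelta};
use quickwit_proto::types::Position;

// Build a delta stating that the file partition advanced from `old_offset`
// to `new_offset`. `record_partition_delta` rejects deltas that do not chain
// onto the previous position; the unwrap mirrors the source code above.
fn checkpoint_progress(old_offset: u64, new_offset: u64) -> SourceCheckpointDelta {
    let partition_id = PartitionId::from("/data/docs.json");
    let mut delta = SourceCheckpointDelta::default();
    delta
        .record_partition_delta(
            partition_id,
            Position::offset(old_offset),
            Position::offset(new_offset),
        )
        .unwrap();
    delta
}
```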
diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs
index db2583d0e95..ddd4b00aff7 100644
--- a/quickwit/quickwit-indexing/src/source/mod.rs
+++ b/quickwit/quickwit-indexing/src/source/mod.rs
@@ -57,6 +57,7 @@
//! that file.
//! - the kafka source: the partition id is a kafka topic partition id, and the position is a kafka
//! offset.
+mod doc_file_reader;
mod file_source;
#[cfg(feature = "gcp-pubsub")]
mod gcp_pubsub_source;
@@ -111,7 +112,7 @@ use tracing::error;
pub use vec_source::{VecSource, VecSourceFactory};
pub use void_source::{VoidSource, VoidSourceFactory};
-use self::file_source::dir_and_filename;
+use self::doc_file_reader::dir_and_filename;
use crate::actors::DocProcessor;
use crate::models::RawDocBatch;
use crate::source::ingest::IngestSourceFactory;
@@ -232,7 +233,7 @@ pub trait Source: Send + 'static {
/// In that case, `batch_sink` will block.
///
/// It returns an optional duration specifying how long the batch requester
- /// should wait before pooling gain.
+ /// should wait before polling again.
async fn emit_batches(
&mut self,
doc_processor_mailbox: &Mailbox<DocProcessor>,