From 570ba64afdd5b4f65de7ece21ee516e65a4d8e83 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 10 Jun 2024 16:42:00 +0000 Subject: [PATCH 1/5] Refactor file source --- .../src/source/doc_file_reader.rs | 395 ++++++++++++++++++ .../src/source/file_source.rs | 281 +------------ quickwit/quickwit-indexing/src/source/mod.rs | 5 +- 3 files changed, 421 insertions(+), 260 deletions(-) create mode 100644 quickwit/quickwit-indexing/src/source/doc_file_reader.rs diff --git a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs new file mode 100644 index 00000000000..d7fbfa94f6e --- /dev/null +++ b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs @@ -0,0 +1,395 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::borrow::Borrow; +use std::ffi::OsStr; +use std::io; +use std::path::{Path, PathBuf}; + +use anyhow::Context; +use async_compression::tokio::bufread::GzipDecoder; +use bytes::Bytes; +use quickwit_common::uri::Uri; +use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint}; +use quickwit_proto::metastore::SourceType; +use quickwit_proto::types::Position; +use quickwit_storage::StorageResolver; +use serde::Serialize; +use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader}; + +use super::{BatchBuilder, SourceContext}; +use crate::models::RawDocBatch; + +/// Number of bytes after which a new batch is cut. +pub(crate) const BATCH_NUM_BYTES_LIMIT: u64 = 500_000u64; + +#[derive(Default, Clone, Debug, Eq, PartialEq, Serialize)] +pub struct DocFileCounters { + pub previous_offset: u64, + pub current_offset: u64, + pub num_lines_processed: u64, +} + +/// A helper wrapper that lets you skip bytes in compressed files where you +/// cannot seek. +struct SkipReader { + reader: BufReader>, + num_bytes_to_skip: usize, +} + +impl SkipReader { + fn new(reader: Box, num_bytes_to_skip: usize) -> Self { + Self { + reader: BufReader::new(reader), + num_bytes_to_skip, + } + } + + // This function is only called for GZIP file. + // Because they cannot be seeked into, we have to scan them to the right initial position. + async fn skip(&mut self) -> io::Result<()> { + // Allocate once a 64kb buffer. 
+ let mut buf = [0u8; 64000]; + while self.num_bytes_to_skip > 0 { + let num_bytes_to_read = self.num_bytes_to_skip.min(buf.len()); + let num_bytes_read = self + .reader + .read_exact(&mut buf[..num_bytes_to_read]) + .await?; + self.num_bytes_to_skip -= num_bytes_read; + } + Ok(()) + } + + async fn read_line<'a>(&mut self, buf: &'a mut String) -> io::Result { + if self.num_bytes_to_skip > 0 { + self.skip().await?; + } + self.reader.read_line(buf).await + } +} + +pub struct DocFileReader { + reader: SkipReader, + counters: DocFileCounters, + partition_id: Option, +} + +impl DocFileReader { + pub async fn from_path( + checkpoint: &SourceCheckpoint, + filepath: &PathBuf, + storage_resolver: &StorageResolver, + ) -> anyhow::Result { + let partition_id = PartitionId::from(filepath.to_string_lossy().borrow()); + let offset = checkpoint + .position_for_partition(&partition_id) + .map(|position| { + position + .as_usize() + .expect("file offset should be stored as usize") + }) + .unwrap_or(0); + let (dir_uri, file_name) = dir_and_filename(filepath)?; + let storage = storage_resolver.resolve(&dir_uri).await?; + let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap(); + // If it's a gzip file, we can't seek to a specific offset, we need to start from the + // beginning of the file, decompress and skip the first `offset` bytes. + let reader = if filepath.extension() == Some(OsStr::new("gz")) { + let stream = storage.get_slice_stream(file_name, 0..file_size).await?; + DocFileReader::new( + Some(partition_id), + Box::new(GzipDecoder::new(BufReader::new(stream))), + offset, + offset, + ) + } else { + let stream = storage + .get_slice_stream(file_name, offset..file_size) + .await?; + DocFileReader::new(Some(partition_id), stream, 0, offset) + }; + Ok(reader) + } + + pub fn from_stdin() -> Self { + DocFileReader::new(None, Box::new(tokio::io::stdin()), 0, 0) + } + + fn new( + partition_id: Option, + reader: Box, + num_bytes_to_skip: usize, + offset: usize, + ) -> Self { + Self { + reader: SkipReader::new(reader, num_bytes_to_skip), + counters: DocFileCounters { + num_lines_processed: 0, + current_offset: offset as u64, + previous_offset: offset as u64, + }, + partition_id, + } + } + + // Return true if the end of the file has been reached. + pub async fn read_batch( + &mut self, + ctx: &SourceContext, + ) -> anyhow::Result<(Option, bool)> { + // We collect batches of documents before sending them to the indexer. 
+ let limit_num_bytes = self.counters.previous_offset + BATCH_NUM_BYTES_LIMIT; + let mut reached_eof = false; + let mut batch_builder = BatchBuilder::new(SourceType::File); + + while self.counters.current_offset < limit_num_bytes { + let mut doc_line = String::new(); + // guard the zone in case of slow read, such as reading from someone + // typing to stdin + let num_bytes = ctx + .protect_future(self.reader.read_line(&mut doc_line)) + .await + .map_err(anyhow::Error::from)?; + if num_bytes == 0 { + reached_eof = true; + break; + } + batch_builder.add_doc(Bytes::from(doc_line)); + self.counters.current_offset += num_bytes as u64; + self.counters.num_lines_processed += 1; + } + if !batch_builder.docs.is_empty() { + if let Some(partition_id) = &self.partition_id { + batch_builder + .checkpoint_delta + .record_partition_delta( + partition_id.clone(), + Position::offset(self.counters.previous_offset), + Position::offset(self.counters.current_offset), + ) + .unwrap(); + } + self.counters.previous_offset = self.counters.current_offset; + Ok((Some(batch_builder.build()), reached_eof)) + } else { + Ok((None, reached_eof)) + } + } + + pub fn counters(&self) -> &DocFileCounters { + &self.counters + } +} + +pub(crate) fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> { + let dir_uri: Uri = filepath + .parent() + .context("Parent directory could not be resolved")? + .to_str() + .context("Path cannot be turned to string")? + .parse()?; + let file_name = filepath + .file_name() + .context("Path does not appear to be a file")?; + Ok((dir_uri, file_name.as_ref())) +} + +#[cfg(any(test))] +pub mod file_test_helpers { + use std::io::Write; + + use async_compression::tokio::write::GzipEncoder; + use tempfile::NamedTempFile; + + pub const DUMMY_DOC: &[u8] = r#"{"body": "hello happy tax payer!"}"#.as_bytes(); + + async fn gzip_bytes(bytes: &[u8]) -> Vec { + let mut gzip_documents = Vec::new(); + let mut encoder = GzipEncoder::new(&mut gzip_documents); + tokio::io::AsyncWriteExt::write_all(&mut encoder, bytes) + .await + .unwrap(); + // flush is not sufficient here and reading the file will raise a unexpected end of file + // error. 
+ tokio::io::AsyncWriteExt::shutdown(&mut encoder) + .await + .unwrap(); + gzip_documents + } + + async fn write_to_tmp(data: Vec, gzip: bool) -> NamedTempFile { + let mut temp_file: tempfile::NamedTempFile = if gzip { + tempfile::Builder::new().suffix(".gz").tempfile().unwrap() + } else { + tempfile::NamedTempFile::new().unwrap() + }; + if gzip { + let gzip_documents = gzip_bytes(&data).await; + temp_file.write_all(&gzip_documents).unwrap(); + } else { + temp_file.write_all(&data).unwrap(); + } + temp_file.flush().unwrap(); + temp_file + } + + pub async fn generate_dummy_doc_file(gzip: bool, lines: usize) -> NamedTempFile { + let mut documents_bytes = Vec::with_capacity(DUMMY_DOC.len() * lines); + for _ in 0..lines { + documents_bytes.write_all(DUMMY_DOC).unwrap(); + documents_bytes.write_all("\n".as_bytes()).unwrap(); + } + write_to_tmp(documents_bytes, gzip).await + } + + pub async fn generate_index_doc_file(gzip: bool, lines: usize) -> NamedTempFile { + let mut documents_bytes = Vec::new(); + for i in 0..lines { + documents_bytes + .write_all(format!("{i}\n").as_bytes()) + .unwrap(); + } + write_to_tmp(documents_bytes, gzip).await + } + + // let temp_file_path = temp_file.path().canonicalize().unwrap(); +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use file_test_helpers::{generate_dummy_doc_file, DUMMY_DOC}; + use quickwit_actors::{ActorContext, Universe}; + use serde_json::json; + use tokio::sync::watch; + + use super::*; + + #[tokio::test] + async fn test_skip_reader() { + { + // Skip 0 bytes. + let mut reader = SkipReader::new(Box::new("hello".as_bytes()), 0); + let mut buf = String::new(); + reader.read_line(&mut buf).await.unwrap(); + assert_eq!(buf, "hello"); + } + { + // Skip 2 bytes. + let mut reader = SkipReader::new(Box::new("hello".as_bytes()), 2); + let mut buf = String::new(); + reader.read_line(&mut buf).await.unwrap(); + assert_eq!(buf, "llo"); + } + { + let input = "hello"; + let cursor = Cursor::new(input); + let mut reader = SkipReader::new(Box::new(cursor), 5); + let mut buf = String::new(); + assert!(reader.read_line(&mut buf).await.is_ok()); + } + { + let input = "hello"; + let cursor = Cursor::new(input); + let mut reader = SkipReader::new(Box::new(cursor), 10); + let mut buf = String::new(); + assert!(reader.read_line(&mut buf).await.is_err()); + } + { + let input = "hello world".repeat(10000); + let cursor = Cursor::new(input.clone()); + let mut reader = SkipReader::new(Box::new(cursor), 64000); + let mut buf = String::new(); + reader.read_line(&mut buf).await.unwrap(); + assert_eq!(buf, input[64000..]); + } + { + let input = "hello world".repeat(10000); + let cursor = Cursor::new(input.clone()); + let mut reader = SkipReader::new(Box::new(cursor), 64001); + let mut buf = String::new(); + reader.read_line(&mut buf).await.unwrap(); + assert_eq!(buf, input[64001..]); + } + } + + fn setup_test_source_ctx() -> SourceContext { + let universe = Universe::with_accelerated_time(); + let (source_mailbox, _source_inbox) = universe.create_test_mailbox(); + let (observable_state_tx, _observable_state_rx) = watch::channel(json!({})); + ActorContext::for_test(&universe, source_mailbox, observable_state_tx) + } + + async fn aux_test_batch_reader( + file: impl Into, + expected_lines: usize, + expected_batches: usize, + ) { + let checkpoint = SourceCheckpoint::default(); + let storage_resolver = StorageResolver::for_test(); + let file_path = file.into(); + let mut doc_batch_reader = + DocFileReader::from_path(&checkpoint, &file_path, &storage_resolver) + .await + 
.unwrap(); + let ctx = setup_test_source_ctx(); + let mut parsed_lines = 0; + let mut parsed_batches = 0; + loop { + let (batch_opt, is_eof) = doc_batch_reader.read_batch(&ctx).await.unwrap(); + if let Some(batch) = batch_opt { + parsed_lines += batch.docs.len(); + parsed_batches += 1; + assert!(parsed_lines > 0); + assert!(parsed_lines <= expected_lines); + } else { + assert!(is_eof); + break; + } + } + assert_eq!(parsed_lines, expected_lines); + assert_eq!(parsed_batches, expected_batches); + } + + #[tokio::test] + async fn test_batch_reader_small() { + aux_test_batch_reader("data/test_corpus.json", 4, 1).await; + } + + #[tokio::test] + async fn test_batch_reader_small_gz() { + aux_test_batch_reader("data/test_corpus.json.gz", 4, 1).await; + } + + #[tokio::test] + async fn test_read_multiple_batches() { + let lines = BATCH_NUM_BYTES_LIMIT as usize / DUMMY_DOC.len() + 10; + let dummy_doc_file = generate_dummy_doc_file(false, lines).await; + aux_test_batch_reader(dummy_doc_file.path(), lines, 2).await; + } + + #[tokio::test] + async fn test_read_multiple_batches_gz() { + let lines = BATCH_NUM_BYTES_LIMIT as usize / DUMMY_DOC.len() + 10; + let dummy_doc_file_gz = generate_dummy_doc_file(true, lines).await; + aux_test_batch_reader(dummy_doc_file_gz.path(), lines, 2).await; + } +} diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs index 9ad169f701f..2f427b49d04 100644 --- a/quickwit/quickwit-indexing/src/source/file_source.rs +++ b/quickwit/quickwit-indexing/src/source/file_source.rs @@ -17,45 +17,22 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::borrow::Borrow; -use std::ffi::OsStr; -use std::path::Path; +use std::fmt; use std::time::Duration; -use std::{fmt, io}; -use anyhow::Context; -use async_compression::tokio::bufread::GzipDecoder; use async_trait::async_trait; -use bytes::Bytes; use quickwit_actors::{ActorExitStatus, Mailbox}; -use quickwit_common::uri::Uri; use quickwit_config::FileSourceParams; -use quickwit_metastore::checkpoint::PartitionId; -use quickwit_proto::metastore::SourceType; -use quickwit_proto::types::{Position, SourceId}; -use serde::Serialize; -use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader}; +use quickwit_proto::types::SourceId; use tracing::info; -use super::BatchBuilder; +use super::doc_file_reader::DocFileReader; use crate::actors::DocProcessor; use crate::source::{Source, SourceContext, SourceRuntime, TypedSourceFactory}; -/// Number of bytes after which a new batch is cut. -pub(crate) const BATCH_NUM_BYTES_LIMIT: u64 = 500_000u64; - -#[derive(Default, Clone, Debug, Eq, PartialEq, Serialize)] -pub struct FileSourceCounters { - pub previous_offset: u64, - pub current_offset: u64, - pub num_lines_processed: u64, -} - pub struct FileSource { source_id: SourceId, - params: FileSourceParams, - counters: FileSourceCounters, - reader: FileSourceReader, + reader: DocFileReader, } impl fmt::Debug for FileSource { @@ -71,46 +48,9 @@ impl Source for FileSource { doc_processor_mailbox: &Mailbox, ctx: &SourceContext, ) -> Result { - // We collect batches of documents before sending them to the indexer. 
- let limit_num_bytes = self.counters.previous_offset + BATCH_NUM_BYTES_LIMIT; - let mut reached_eof = false; - let mut batch_builder = BatchBuilder::new(SourceType::File); - - while self.counters.current_offset < limit_num_bytes { - let mut doc_line = String::new(); - // guard the zone in case of slow read, such as reading from someone - // typing to stdin - let num_bytes = ctx - .protect_future(self.reader.read_line(&mut doc_line)) - .await - .map_err(anyhow::Error::from)?; - if num_bytes == 0 { - reached_eof = true; - break; - } - batch_builder.add_doc(Bytes::from(doc_line)); - self.counters.current_offset += num_bytes as u64; - self.counters.num_lines_processed += 1; - } - if !batch_builder.docs.is_empty() { - if let Some(filepath) = &self.params.filepath { - let filepath_str = filepath - .to_str() - .context("path is invalid utf-8")? - .to_string(); - let partition_id = PartitionId::from(filepath_str); - batch_builder - .checkpoint_delta - .record_partition_delta( - partition_id, - Position::offset(self.counters.previous_offset), - Position::offset(self.counters.current_offset), - ) - .unwrap(); - } - self.counters.previous_offset = self.counters.current_offset; - ctx.send_message(doc_processor_mailbox, batch_builder.build()) - .await?; + let (batch_opt, reached_eof) = self.reader.read_batch(ctx).await?; + if let Some(batch) = batch_opt { + ctx.send_message(doc_processor_mailbox, batch).await?; } if reached_eof { info!("reached end of file"); @@ -125,7 +65,7 @@ impl Source for FileSource { } fn observable_state(&self) -> serde_json::Value { - serde_json::to_value(&self.counters).unwrap() + serde_json::to_value(self.reader.counters()).unwrap() } } @@ -140,114 +80,37 @@ impl TypedSourceFactory for FileSourceFactory { source_runtime: SourceRuntime, params: FileSourceParams, ) -> anyhow::Result { - let checkpoint = source_runtime.fetch_checkpoint().await?; - let mut offset = 0; - - let reader: FileSourceReader = if let Some(filepath) = ¶ms.filepath { - let partition_id = PartitionId::from(filepath.to_string_lossy().borrow()); - offset = checkpoint - .position_for_partition(&partition_id) - .map(|position| { - position - .as_usize() - .expect("file offset should be stored as usize") - }) - .unwrap_or(0); - let (dir_uri, file_name) = dir_and_filename(filepath)?; - let storage = source_runtime.storage_resolver.resolve(&dir_uri).await?; - let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap(); - // If it's a gzip file, we can't seek to a specific offset, we need to start from the - // beginning of the file, decompress and skip the first `offset` bytes. - if filepath.extension() == Some(OsStr::new("gz")) { - let stream = storage.get_slice_stream(file_name, 0..file_size).await?; - FileSourceReader::new(Box::new(GzipDecoder::new(BufReader::new(stream))), offset) - } else { - let stream = storage - .get_slice_stream(file_name, offset..file_size) - .await?; - FileSourceReader::new(stream, 0) - } + let reader: DocFileReader = if let Some(filepath) = ¶ms.filepath { + let checkpoint = source_runtime.fetch_checkpoint().await?; + DocFileReader::from_path(&checkpoint, filepath, &source_runtime.storage_resolver) + .await? } else { // We cannot use the checkpoint. 
- FileSourceReader::new(Box::new(tokio::io::stdin()), 0) + DocFileReader::from_stdin() }; let file_source = FileSource { source_id: source_runtime.source_id().to_string(), - counters: FileSourceCounters { - previous_offset: offset as u64, - current_offset: offset as u64, - num_lines_processed: 0, - }, reader, - params, }; Ok(file_source) } } -struct FileSourceReader { - reader: BufReader>, - num_bytes_to_skip: usize, -} - -impl FileSourceReader { - fn new(reader: Box, num_bytes_to_skip: usize) -> Self { - Self { - reader: BufReader::new(reader), - num_bytes_to_skip, - } - } - - // This function is only called for GZIP file. - // Because they cannot be seeked into, we have to scan them to the right initial position. - async fn skip(&mut self) -> io::Result<()> { - // Allocate once a 64kb buffer. - let mut buf = [0u8; 64000]; - while self.num_bytes_to_skip > 0 { - let num_bytes_to_read = self.num_bytes_to_skip.min(buf.len()); - let num_bytes_read = self - .reader - .read_exact(&mut buf[..num_bytes_to_read]) - .await?; - self.num_bytes_to_skip -= num_bytes_read; - } - Ok(()) - } - - async fn read_line<'a>(&mut self, buf: &'a mut String) -> io::Result { - if self.num_bytes_to_skip > 0 { - self.skip().await?; - } - self.reader.read_line(buf).await - } -} - -pub(crate) fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> { - let dir_uri: Uri = filepath - .parent() - .context("Parent directory could not be resolved")? - .to_str() - .context("Path cannot be turned to string")? - .parse()?; - let file_name = filepath - .file_name() - .context("Path does not appear to be a file")?; - Ok((dir_uri, file_name.as_ref())) -} - #[cfg(test)] mod tests { - use std::io::{Cursor, Write}; + use std::borrow::Borrow; use std::num::NonZeroUsize; - use async_compression::tokio::write::GzipEncoder; use quickwit_actors::{Command, Universe}; use quickwit_config::{SourceConfig, SourceInputFormat, SourceParams}; - use quickwit_metastore::checkpoint::SourceCheckpointDelta; - use quickwit_proto::types::IndexUid; + use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpointDelta}; + use quickwit_proto::types::{IndexUid, Position}; use super::*; use crate::models::RawDocBatch; + use crate::source::doc_file_reader::file_test_helpers::{ + generate_dummy_doc_file, generate_index_doc_file, + }; use crate::source::tests::SourceRuntimeBuilder; use crate::source::SourceActor; @@ -312,25 +175,7 @@ mod tests { quickwit_common::setup_logging_for_tests(); let universe = Universe::with_accelerated_time(); let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox(); - let mut documents_bytes = Vec::new(); - for _ in 0..20_000 { - documents_bytes - .write_all(r#"{"body": "hello happy tax payer!"}"#.as_bytes()) - .unwrap(); - documents_bytes.write_all("\n".as_bytes()).unwrap(); - } - let mut temp_file: tempfile::NamedTempFile = if gzip { - tempfile::Builder::new().suffix(".gz").tempfile().unwrap() - } else { - tempfile::NamedTempFile::new().unwrap() - }; - if gzip { - let gzip_documents = gzip_bytes(&documents_bytes).await; - temp_file.write_all(&gzip_documents).unwrap(); - } else { - temp_file.write_all(&documents_bytes).unwrap(); - } - temp_file.flush().unwrap(); + let temp_file = generate_dummy_doc_file(gzip, 20_000).await; let params = FileSourceParams::file(temp_file.path()); let filepath = params .filepath @@ -408,27 +253,9 @@ mod tests { quickwit_common::setup_logging_for_tests(); let universe = Universe::with_accelerated_time(); let (doc_processor_mailbox, doc_processor_inbox) = 
universe.create_test_mailbox(); - let mut documents_bytes = Vec::new(); - for i in 0..100 { - documents_bytes - .write_all(format!("{i}\n").as_bytes()) - .unwrap(); - } - let mut temp_file: tempfile::NamedTempFile = if gzip { - tempfile::Builder::new().suffix(".gz").tempfile().unwrap() - } else { - tempfile::NamedTempFile::new().unwrap() - }; - let temp_file_path = temp_file.path().canonicalize().unwrap(); - if gzip { - let gzipped_documents = gzip_bytes(&documents_bytes).await; - temp_file.write_all(&gzipped_documents).unwrap(); - } else { - temp_file.write_all(&documents_bytes).unwrap(); - } - temp_file.flush().unwrap(); - - let params = FileSourceParams::file(&temp_file_path); + let temp_file = generate_index_doc_file(gzip, 100).await; + let temp_file_path = temp_file.path(); + let params = FileSourceParams::file(temp_file_path); let source_config = SourceConfig { source_id: "test-file-source".to_string(), num_pipelines: NonZeroUsize::new(1).unwrap(), @@ -473,66 +300,4 @@ mod tests { let indexer_messages: Vec = doc_processor_inbox.drain_for_test_typed(); assert!(&indexer_messages[0].docs[0].starts_with(b"2\n")); } - - async fn gzip_bytes(bytes: &[u8]) -> Vec { - let mut gzip_documents = Vec::new(); - let mut encoder = GzipEncoder::new(&mut gzip_documents); - tokio::io::AsyncWriteExt::write_all(&mut encoder, bytes) - .await - .unwrap(); - // flush is not sufficient here and reading the file will raise a unexpected end of file - // error. - tokio::io::AsyncWriteExt::shutdown(&mut encoder) - .await - .unwrap(); - gzip_documents - } - - #[tokio::test] - async fn test_skip_reader() { - { - // Skip 0 bytes. - let mut reader = FileSourceReader::new(Box::new("hello".as_bytes()), 0); - let mut buf = String::new(); - reader.read_line(&mut buf).await.unwrap(); - assert_eq!(buf, "hello"); - } - { - // Skip 2 bytes. - let mut reader = FileSourceReader::new(Box::new("hello".as_bytes()), 2); - let mut buf = String::new(); - reader.read_line(&mut buf).await.unwrap(); - assert_eq!(buf, "llo"); - } - { - let input = "hello"; - let cursor = Cursor::new(input); - let mut reader = FileSourceReader::new(Box::new(cursor), 5); - let mut buf = String::new(); - assert!(reader.read_line(&mut buf).await.is_ok()); - } - { - let input = "hello"; - let cursor = Cursor::new(input); - let mut reader = FileSourceReader::new(Box::new(cursor), 10); - let mut buf = String::new(); - assert!(reader.read_line(&mut buf).await.is_err()); - } - { - let input = "hello world".repeat(10000); - let cursor = Cursor::new(input.clone()); - let mut reader = FileSourceReader::new(Box::new(cursor), 64000); - let mut buf = String::new(); - reader.read_line(&mut buf).await.unwrap(); - assert_eq!(buf, input[64000..]); - } - { - let input = "hello world".repeat(10000); - let cursor = Cursor::new(input.clone()); - let mut reader = FileSourceReader::new(Box::new(cursor), 64001); - let mut buf = String::new(); - reader.read_line(&mut buf).await.unwrap(); - assert_eq!(buf, input[64001..]); - } - } } diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index db2583d0e95..ddd4b00aff7 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -57,6 +57,7 @@ //! that file. //! - the kafka source: the partition id is a kafka topic partition id, and the position is a kafka //! offset. 
+mod doc_file_reader; mod file_source; #[cfg(feature = "gcp-pubsub")] mod gcp_pubsub_source; @@ -111,7 +112,7 @@ use tracing::error; pub use vec_source::{VecSource, VecSourceFactory}; pub use void_source::{VoidSource, VoidSourceFactory}; -use self::file_source::dir_and_filename; +use self::doc_file_reader::dir_and_filename; use crate::actors::DocProcessor; use crate::models::RawDocBatch; use crate::source::ingest::IngestSourceFactory; @@ -232,7 +233,7 @@ pub trait Source: Send + 'static { /// In that case, `batch_sink` will block. /// /// It returns an optional duration specifying how long the batch requester - /// should wait before pooling gain. + /// should wait before polling again. async fn emit_batches( &mut self, doc_processor_mailbox: &Mailbox, From f96e61ee684691ca17152cac922083fb5ea21188 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 10 Jun 2024 16:42:00 +0000 Subject: [PATCH 2/5] More explicit reponse for read_batch --- .../src/source/doc_file_reader.rs | 36 +++++++++++++------ .../src/source/file_source.rs | 6 ++-- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs index d7fbfa94f6e..80d9e662d46 100644 --- a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs +++ b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs @@ -91,6 +91,12 @@ pub struct DocFileReader { partition_id: Option, } +/// Result of a read attempt on a `DocFileReader` +pub struct ReadBatchResponse { + pub batch_opt: Option, + pub is_eof: bool, +} + impl DocFileReader { pub async fn from_path( checkpoint: &SourceCheckpoint, @@ -149,12 +155,13 @@ impl DocFileReader { } } - // Return true if the end of the file has been reached. - pub async fn read_batch( - &mut self, - ctx: &SourceContext, - ) -> anyhow::Result<(Option, bool)> { - // We collect batches of documents before sending them to the indexer. + /// Builds a new record batch from the reader. + /// + /// Sets `is_eof` to true when the underlying reader reaches EOF while + /// building the batch. This helps detecting early that we are done reading + /// the file. The returned `batch_opt` is `None` if the reader is already + /// EOF. 
+ pub async fn read_batch(&mut self, ctx: &SourceContext) -> anyhow::Result { let limit_num_bytes = self.counters.previous_offset + BATCH_NUM_BYTES_LIMIT; let mut reached_eof = false; let mut batch_builder = BatchBuilder::new(SourceType::File); @@ -187,9 +194,15 @@ impl DocFileReader { .unwrap(); } self.counters.previous_offset = self.counters.current_offset; - Ok((Some(batch_builder.build()), reached_eof)) + Ok(ReadBatchResponse { + batch_opt: Some(batch_builder.build()), + is_eof: reached_eof, + }) } else { - Ok((None, reached_eof)) + Ok(ReadBatchResponse { + batch_opt: None, + is_eof: reached_eof, + }) } } @@ -268,8 +281,6 @@ pub mod file_test_helpers { } write_to_tmp(documents_bytes, gzip).await } - - // let temp_file_path = temp_file.path().canonicalize().unwrap(); } #[cfg(test)] @@ -354,7 +365,8 @@ mod tests { let mut parsed_lines = 0; let mut parsed_batches = 0; loop { - let (batch_opt, is_eof) = doc_batch_reader.read_batch(&ctx).await.unwrap(); + let ReadBatchResponse { batch_opt, is_eof } = + doc_batch_reader.read_batch(&ctx).await.unwrap(); if let Some(batch) = batch_opt { parsed_lines += batch.docs.len(); parsed_batches += 1; @@ -362,6 +374,8 @@ mod tests { assert!(parsed_lines <= expected_lines); } else { assert!(is_eof); + } + if is_eof { break; } } diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs index 2f427b49d04..b008b427759 100644 --- a/quickwit/quickwit-indexing/src/source/file_source.rs +++ b/quickwit/quickwit-indexing/src/source/file_source.rs @@ -26,7 +26,7 @@ use quickwit_config::FileSourceParams; use quickwit_proto::types::SourceId; use tracing::info; -use super::doc_file_reader::DocFileReader; +use super::doc_file_reader::{DocFileReader, ReadBatchResponse}; use crate::actors::DocProcessor; use crate::source::{Source, SourceContext, SourceRuntime, TypedSourceFactory}; @@ -48,11 +48,11 @@ impl Source for FileSource { doc_processor_mailbox: &Mailbox, ctx: &SourceContext, ) -> Result { - let (batch_opt, reached_eof) = self.reader.read_batch(ctx).await?; + let ReadBatchResponse { batch_opt, is_eof } = self.reader.read_batch(ctx).await?; if let Some(batch) = batch_opt { ctx.send_message(doc_processor_mailbox, batch).await?; } - if reached_eof { + if is_eof { info!("reached end of file"); ctx.send_exit_with_success(doc_processor_mailbox).await?; return Err(ActorExitStatus::Success); From d8eba166e3fde6dcdcce93e93dbb6fdb1b2746b2 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 11 Jun 2024 07:47:01 +0000 Subject: [PATCH 3/5] Fix clippy --- quickwit/quickwit-indexing/src/source/doc_file_reader.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs index 80d9e662d46..491f940bc8b 100644 --- a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs +++ b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs @@ -20,7 +20,7 @@ use std::borrow::Borrow; use std::ffi::OsStr; use std::io; -use std::path::{Path, PathBuf}; +use std::path::Path; use anyhow::Context; use async_compression::tokio::bufread::GzipDecoder; @@ -100,7 +100,7 @@ pub struct ReadBatchResponse { impl DocFileReader { pub async fn from_path( checkpoint: &SourceCheckpoint, - filepath: &PathBuf, + filepath: &Path, storage_resolver: &StorageResolver, ) -> anyhow::Result { let partition_id = PartitionId::from(filepath.to_string_lossy().borrow()); @@ -224,7 +224,7 @@ pub(crate) fn dir_and_filename(filepath: 
&Path) -> anyhow::Result<(Uri, &Path)> Ok((dir_uri, file_name.as_ref())) } -#[cfg(any(test))] +#[cfg(test)] pub mod file_test_helpers { use std::io::Write; @@ -286,6 +286,7 @@ pub mod file_test_helpers { #[cfg(test)] mod tests { use std::io::Cursor; + use std::path::PathBuf; use file_test_helpers::{generate_dummy_doc_file, DUMMY_DOC}; use quickwit_actors::{ActorContext, Universe}; From 1e4ae6e7deb7ea200c5d274a44dff984646abc63 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 12 Jun 2024 21:05:58 +0000 Subject: [PATCH 4/5] Remove batch logic from DocFileReader --- .../src/source/doc_file_reader.rs | 259 +++++++----------- .../src/source/file_source.rs | 101 +++++-- 2 files changed, 174 insertions(+), 186 deletions(-) diff --git a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs index 491f940bc8b..a6eb686fc94 100644 --- a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs +++ b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs @@ -17,7 +17,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::borrow::Borrow; use std::ffi::OsStr; use std::io; use std::path::Path; @@ -26,28 +25,16 @@ use anyhow::Context; use async_compression::tokio::bufread::GzipDecoder; use bytes::Bytes; use quickwit_common::uri::Uri; -use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint}; -use quickwit_proto::metastore::SourceType; -use quickwit_proto::types::Position; use quickwit_storage::StorageResolver; -use serde::Serialize; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader}; -use super::{BatchBuilder, SourceContext}; -use crate::models::RawDocBatch; - -/// Number of bytes after which a new batch is cut. -pub(crate) const BATCH_NUM_BYTES_LIMIT: u64 = 500_000u64; - -#[derive(Default, Clone, Debug, Eq, PartialEq, Serialize)] -pub struct DocFileCounters { - pub previous_offset: u64, - pub current_offset: u64, - pub num_lines_processed: u64, +pub struct FileRecord { + pub next_offset: u64, + pub doc: Bytes, } /// A helper wrapper that lets you skip bytes in compressed files where you -/// cannot seek. +/// cannot seek (e.g. gzip files). struct SkipReader { reader: BufReader>, num_bytes_to_skip: usize, @@ -61,10 +48,8 @@ impl SkipReader { } } - // This function is only called for GZIP file. - // Because they cannot be seeked into, we have to scan them to the right initial position. async fn skip(&mut self) -> io::Result<()> { - // Allocate once a 64kb buffer. 
+ // Allocating 64KB once on the stack should be fine (<1% of the Linux stack size) let mut buf = [0u8; 64000]; while self.num_bytes_to_skip > 0 { let num_bytes_to_read = self.num_bytes_to_skip.min(buf.len()); @@ -87,128 +72,62 @@ impl SkipReader { pub struct DocFileReader { reader: SkipReader, - counters: DocFileCounters, - partition_id: Option, -} - -/// Result of a read attempt on a `DocFileReader` -pub struct ReadBatchResponse { - pub batch_opt: Option, - pub is_eof: bool, + next_offset: u64, } impl DocFileReader { pub async fn from_path( - checkpoint: &SourceCheckpoint, - filepath: &Path, storage_resolver: &StorageResolver, + filepath: &Path, + offset: usize, ) -> anyhow::Result { - let partition_id = PartitionId::from(filepath.to_string_lossy().borrow()); - let offset = checkpoint - .position_for_partition(&partition_id) - .map(|position| { - position - .as_usize() - .expect("file offset should be stored as usize") - }) - .unwrap_or(0); let (dir_uri, file_name) = dir_and_filename(filepath)?; let storage = storage_resolver.resolve(&dir_uri).await?; let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap(); - // If it's a gzip file, we can't seek to a specific offset, we need to start from the - // beginning of the file, decompress and skip the first `offset` bytes. + // If it's a gzip file, we can't seek to a specific offset. `SkipReader` + // starts from the beginning of the file, decompresses and skips the + // first `offset` bytes. let reader = if filepath.extension() == Some(OsStr::new("gz")) { let stream = storage.get_slice_stream(file_name, 0..file_size).await?; - DocFileReader::new( - Some(partition_id), - Box::new(GzipDecoder::new(BufReader::new(stream))), - offset, - offset, - ) + let decompressed_stream = Box::new(GzipDecoder::new(BufReader::new(stream))); + DocFileReader { + reader: SkipReader::new(decompressed_stream, offset), + next_offset: offset as u64, + } } else { let stream = storage .get_slice_stream(file_name, offset..file_size) .await?; - DocFileReader::new(Some(partition_id), stream, 0, offset) + DocFileReader { + reader: SkipReader::new(stream, 0), + next_offset: offset as u64, + } }; Ok(reader) } pub fn from_stdin() -> Self { - DocFileReader::new(None, Box::new(tokio::io::stdin()), 0, 0) - } - - fn new( - partition_id: Option, - reader: Box, - num_bytes_to_skip: usize, - offset: usize, - ) -> Self { - Self { - reader: SkipReader::new(reader, num_bytes_to_skip), - counters: DocFileCounters { - num_lines_processed: 0, - current_offset: offset as u64, - previous_offset: offset as u64, - }, - partition_id, + DocFileReader { + reader: SkipReader::new(Box::new(tokio::io::stdin()), 0), + next_offset: 0, } } - /// Builds a new record batch from the reader. - /// - /// Sets `is_eof` to true when the underlying reader reaches EOF while - /// building the batch. This helps detecting early that we are done reading - /// the file. The returned `batch_opt` is `None` if the reader is already - /// EOF. 
- pub async fn read_batch(&mut self, ctx: &SourceContext) -> anyhow::Result { - let limit_num_bytes = self.counters.previous_offset + BATCH_NUM_BYTES_LIMIT; - let mut reached_eof = false; - let mut batch_builder = BatchBuilder::new(SourceType::File); - - while self.counters.current_offset < limit_num_bytes { - let mut doc_line = String::new(); - // guard the zone in case of slow read, such as reading from someone - // typing to stdin - let num_bytes = ctx - .protect_future(self.reader.read_line(&mut doc_line)) - .await - .map_err(anyhow::Error::from)?; - if num_bytes == 0 { - reached_eof = true; - break; - } - batch_builder.add_doc(Bytes::from(doc_line)); - self.counters.current_offset += num_bytes as u64; - self.counters.num_lines_processed += 1; - } - if !batch_builder.docs.is_empty() { - if let Some(partition_id) = &self.partition_id { - batch_builder - .checkpoint_delta - .record_partition_delta( - partition_id.clone(), - Position::offset(self.counters.previous_offset), - Position::offset(self.counters.current_offset), - ) - .unwrap(); - } - self.counters.previous_offset = self.counters.current_offset; - Ok(ReadBatchResponse { - batch_opt: Some(batch_builder.build()), - is_eof: reached_eof, - }) + /// Reads the next record from the underlying file. Returns `None` when EOF + /// is reached. + pub async fn next_record(&mut self) -> anyhow::Result> { + let mut buf = String::new(); + let res = self.reader.read_line(&mut buf).await?; + if res == 0 { + Ok(None) } else { - Ok(ReadBatchResponse { - batch_opt: None, - is_eof: reached_eof, - }) + self.next_offset += res as u64; + Ok(Some(FileRecord { + next_offset: self.next_offset, + doc: Bytes::from(buf), + })) } } - - pub fn counters(&self) -> &DocFileCounters { - &self.counters - } } pub(crate) fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> { @@ -288,10 +207,7 @@ mod tests { use std::io::Cursor; use std::path::PathBuf; - use file_test_helpers::{generate_dummy_doc_file, DUMMY_DOC}; - use quickwit_actors::{ActorContext, Universe}; - use serde_json::json; - use tokio::sync::watch; + use file_test_helpers::generate_index_doc_file; use super::*; @@ -343,68 +259,79 @@ mod tests { } } - fn setup_test_source_ctx() -> SourceContext { - let universe = Universe::with_accelerated_time(); - let (source_mailbox, _source_inbox) = universe.create_test_mailbox(); - let (observable_state_tx, _observable_state_rx) = watch::channel(json!({})); - ActorContext::for_test(&universe, source_mailbox, observable_state_tx) - } - - async fn aux_test_batch_reader( - file: impl Into, - expected_lines: usize, - expected_batches: usize, - ) { - let checkpoint = SourceCheckpoint::default(); + async fn aux_test_full_read(file: impl Into, expected_lines: usize) { let storage_resolver = StorageResolver::for_test(); let file_path = file.into(); - let mut doc_batch_reader = - DocFileReader::from_path(&checkpoint, &file_path, &storage_resolver) - .await - .unwrap(); - let ctx = setup_test_source_ctx(); + let mut doc_reader = DocFileReader::from_path(&storage_resolver, &file_path, 0) + .await + .unwrap(); let mut parsed_lines = 0; - let mut parsed_batches = 0; - loop { - let ReadBatchResponse { batch_opt, is_eof } = - doc_batch_reader.read_batch(&ctx).await.unwrap(); - if let Some(batch) = batch_opt { - parsed_lines += batch.docs.len(); - parsed_batches += 1; - assert!(parsed_lines > 0); - assert!(parsed_lines <= expected_lines); - } else { - assert!(is_eof); - } - if is_eof { - break; - } + while doc_reader.next_record().await.unwrap().is_some() { + 
parsed_lines += 1; } assert_eq!(parsed_lines, expected_lines); - assert_eq!(parsed_batches, expected_batches); } #[tokio::test] - async fn test_batch_reader_small() { - aux_test_batch_reader("data/test_corpus.json", 4, 1).await; + async fn test_full_read() { + aux_test_full_read("data/test_corpus.json", 4).await; } #[tokio::test] - async fn test_batch_reader_small_gz() { - aux_test_batch_reader("data/test_corpus.json.gz", 4, 1).await; + async fn test_full_read_gz() { + aux_test_full_read("data/test_corpus.json.gz", 4).await; + } + + async fn aux_test_resumed_read( + file: impl Into, + expected_lines: usize, + stop_at_line: usize, + ) { + let storage_resolver = StorageResolver::for_test(); + let file_path = file.into(); + // read the first part of the file + let mut first_part_reader = DocFileReader::from_path(&storage_resolver, &file_path, 0) + .await + .unwrap(); + let mut resume_offset = 0; + let mut parsed_lines = 0; + for _ in 0..stop_at_line { + let rec = first_part_reader + .next_record() + .await + .unwrap() + .expect("EOF happened before stop_at_line"); + resume_offset = rec.next_offset as usize; + assert_eq!(Bytes::from(format!("{parsed_lines}\n")), rec.doc); + parsed_lines += 1; + } + // read the second part of the file + let mut second_part_reader = + DocFileReader::from_path(&storage_resolver, &file_path, resume_offset) + .await + .unwrap(); + while let Some(rec) = second_part_reader.next_record().await.unwrap() { + assert_eq!(Bytes::from(format!("{parsed_lines}\n")), rec.doc); + parsed_lines += 1; + } + assert_eq!(parsed_lines, expected_lines); } #[tokio::test] - async fn test_read_multiple_batches() { - let lines = BATCH_NUM_BYTES_LIMIT as usize / DUMMY_DOC.len() + 10; - let dummy_doc_file = generate_dummy_doc_file(false, lines).await; - aux_test_batch_reader(dummy_doc_file.path(), lines, 2).await; + async fn test_resume_read() { + let dummy_doc_file = generate_index_doc_file(false, 1000).await; + aux_test_resumed_read(dummy_doc_file.path(), 1000, 1).await; + aux_test_resumed_read(dummy_doc_file.path(), 1000, 40).await; + aux_test_resumed_read(dummy_doc_file.path(), 1000, 999).await; + aux_test_resumed_read(dummy_doc_file.path(), 1000, 1000).await; } #[tokio::test] - async fn test_read_multiple_batches_gz() { - let lines = BATCH_NUM_BYTES_LIMIT as usize / DUMMY_DOC.len() + 10; - let dummy_doc_file_gz = generate_dummy_doc_file(true, lines).await; - aux_test_batch_reader(dummy_doc_file_gz.path(), lines, 2).await; + async fn test_resume_read_gz() { + let dummy_doc_file = generate_index_doc_file(true, 1000).await; + aux_test_resumed_read(dummy_doc_file.path(), 1000, 1).await; + aux_test_resumed_read(dummy_doc_file.path(), 1000, 40).await; + aux_test_resumed_read(dummy_doc_file.path(), 1000, 999).await; + aux_test_resumed_read(dummy_doc_file.path(), 1000, 1000).await; } } diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs index b008b427759..b739317c1af 100644 --- a/quickwit/quickwit-indexing/src/source/file_source.rs +++ b/quickwit/quickwit-indexing/src/source/file_source.rs @@ -17,22 +17,38 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
+use std::borrow::Borrow; use std::fmt; use std::time::Duration; use async_trait::async_trait; use quickwit_actors::{ActorExitStatus, Mailbox}; use quickwit_config::FileSourceParams; -use quickwit_proto::types::SourceId; +use quickwit_metastore::checkpoint::PartitionId; +use quickwit_proto::metastore::SourceType; +use quickwit_proto::types::{Position, SourceId}; +use serde::Serialize; use tracing::info; -use super::doc_file_reader::{DocFileReader, ReadBatchResponse}; +use super::doc_file_reader::DocFileReader; +use super::BatchBuilder; use crate::actors::DocProcessor; use crate::source::{Source, SourceContext, SourceRuntime, TypedSourceFactory}; +/// Number of bytes after which a new batch is cut. +pub(crate) const BATCH_NUM_BYTES_LIMIT: u64 = 500_000; + +#[derive(Default, Clone, Debug, Eq, PartialEq, Serialize)] +pub struct FileSourceCounters { + pub offset: u64, + pub num_lines_processed: usize, +} + pub struct FileSource { source_id: SourceId, reader: DocFileReader, + partition_id: Option, + counters: FileSourceCounters, } impl fmt::Debug for FileSource { @@ -48,9 +64,35 @@ impl Source for FileSource { doc_processor_mailbox: &Mailbox, ctx: &SourceContext, ) -> Result { - let ReadBatchResponse { batch_opt, is_eof } = self.reader.read_batch(ctx).await?; - if let Some(batch) = batch_opt { - ctx.send_message(doc_processor_mailbox, batch).await?; + let limit_num_bytes = self.counters.offset + BATCH_NUM_BYTES_LIMIT; + let mut new_offset = self.counters.offset; + let mut batch_builder = BatchBuilder::new(SourceType::File); + let is_eof = loop { + if new_offset >= limit_num_bytes { + break false; + } + if let Some(record) = ctx.protect_future(self.reader.next_record()).await? { + new_offset = record.next_offset; + self.counters.num_lines_processed += 1; + batch_builder.add_doc(record.doc); + } else { + break true; + } + }; + if new_offset > self.counters.offset { + if let Some(partition_id) = &self.partition_id { + batch_builder + .checkpoint_delta + .record_partition_delta( + partition_id.clone(), + Position::offset(self.counters.offset), + Position::offset(new_offset), + ) + .unwrap(); + } + ctx.send_message(doc_processor_mailbox, batch_builder.build()) + .await?; + self.counters.offset = new_offset; } if is_eof { info!("reached end of file"); @@ -65,7 +107,7 @@ impl Source for FileSource { } fn observable_state(&self) -> serde_json::Value { - serde_json::to_value(self.reader.counters()).unwrap() + serde_json::to_value(&self.counters).unwrap() } } @@ -80,17 +122,39 @@ impl TypedSourceFactory for FileSourceFactory { source_runtime: SourceRuntime, params: FileSourceParams, ) -> anyhow::Result { - let reader: DocFileReader = if let Some(filepath) = ¶ms.filepath { + let file_source = if let Some(filepath) = ¶ms.filepath { + let partition_id = PartitionId::from(filepath.to_string_lossy().borrow()); let checkpoint = source_runtime.fetch_checkpoint().await?; - DocFileReader::from_path(&checkpoint, filepath, &source_runtime.storage_resolver) - .await? + let offset = checkpoint + .position_for_partition(&partition_id) + .map(|position| { + position + .as_usize() + .expect("file offset should be stored as usize") + }) + .unwrap_or(0); + + let reader = + DocFileReader::from_path(&source_runtime.storage_resolver, filepath, offset) + .await?; + + FileSource { + source_id: source_runtime.source_id().to_string(), + reader, + partition_id: Some(partition_id), + counters: FileSourceCounters { + offset: offset as u64, + ..Default::default() + }, + } } else { // We cannot use the checkpoint. 
- DocFileReader::from_stdin() - }; - let file_source = FileSource { - source_id: source_runtime.source_id().to_string(), - reader, + FileSource { + source_id: source_runtime.source_id().to_string(), + reader: DocFileReader::from_stdin(), + partition_id: None, + counters: FileSourceCounters::default(), + } }; Ok(file_source) } @@ -152,8 +216,7 @@ mod tests { assert_eq!( counters, serde_json::json!({ - "previous_offset": 1030u64, - "current_offset": 1030u64, + "offset": 1030u64, "num_lines_processed": 4u32 }) ); @@ -208,8 +271,7 @@ mod tests { assert_eq!( counters, serde_json::json!({ - "previous_offset": 700_000u64, - "current_offset": 700_000u64, + "offset": 700_000u64, "num_lines_processed": 20_000u64 }) ); @@ -292,8 +354,7 @@ mod tests { assert_eq!( counters, serde_json::json!({ - "previous_offset": 290u64, - "current_offset": 290u64, + "offset": 290u64, "num_lines_processed": 98u64 }) ); From 6d6fdb1c9b2abf3546e1c51c2f80f18493ab398f Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Fri, 14 Jun 2024 07:29:07 +0000 Subject: [PATCH 5/5] Address styling comments --- .../src/source/file_source.rs | 70 +++++++++---------- 1 file changed, 34 insertions(+), 36 deletions(-) diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs index b739317c1af..ac00147bb29 100644 --- a/quickwit/quickwit-indexing/src/source/file_source.rs +++ b/quickwit/quickwit-indexing/src/source/file_source.rs @@ -21,6 +21,7 @@ use std::borrow::Borrow; use std::fmt; use std::time::Duration; +use anyhow::Context; use async_trait::async_trait; use quickwit_actors::{ActorExitStatus, Mailbox}; use quickwit_config::FileSourceParams; @@ -67,18 +68,17 @@ impl Source for FileSource { let limit_num_bytes = self.counters.offset + BATCH_NUM_BYTES_LIMIT; let mut new_offset = self.counters.offset; let mut batch_builder = BatchBuilder::new(SourceType::File); - let is_eof = loop { - if new_offset >= limit_num_bytes { - break false; - } + let mut is_eof = false; + while new_offset < limit_num_bytes { if let Some(record) = ctx.protect_future(self.reader.next_record()).await? { new_offset = record.next_offset; self.counters.num_lines_processed += 1; batch_builder.add_doc(record.doc); } else { - break true; + is_eof = true; + break; } - }; + } if new_offset > self.counters.offset { if let Some(partition_id) = &self.partition_id { batch_builder @@ -122,41 +122,39 @@ impl TypedSourceFactory for FileSourceFactory { source_runtime: SourceRuntime, params: FileSourceParams, ) -> anyhow::Result { - let file_source = if let Some(filepath) = ¶ms.filepath { - let partition_id = PartitionId::from(filepath.to_string_lossy().borrow()); - let checkpoint = source_runtime.fetch_checkpoint().await?; - let offset = checkpoint - .position_for_partition(&partition_id) - .map(|position| { - position - .as_usize() - .expect("file offset should be stored as usize") - }) - .unwrap_or(0); - - let reader = - DocFileReader::from_path(&source_runtime.storage_resolver, filepath, offset) - .await?; - - FileSource { - source_id: source_runtime.source_id().to_string(), - reader, - partition_id: Some(partition_id), - counters: FileSourceCounters { - offset: offset as u64, - ..Default::default() - }, - } - } else { - // We cannot use the checkpoint. 
- FileSource { + let Some(filepath) = ¶ms.filepath else { + return Ok(FileSource { source_id: source_runtime.source_id().to_string(), reader: DocFileReader::from_stdin(), partition_id: None, counters: FileSourceCounters::default(), - } + }); }; - Ok(file_source) + + let partition_id = PartitionId::from(filepath.to_string_lossy().borrow()); + let checkpoint = source_runtime.fetch_checkpoint().await?; + let offset = checkpoint + .position_for_partition(&partition_id) + .map(|position| { + position + .as_usize() + .context("file offset should be stored as usize") + }) + .transpose()? + .unwrap_or(0); + + let reader = + DocFileReader::from_path(&source_runtime.storage_resolver, filepath, offset).await?; + + Ok(FileSource { + source_id: source_runtime.source_id().to_string(), + reader, + partition_id: Some(partition_id), + counters: FileSourceCounters { + offset: offset as u64, + ..Default::default() + }, + }) } }
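
A quick illustration of how the refactored pieces fit together. This is a minimal sketch written against the API as it stands at the end of this series (DocFileReader::from_path, next_record, FileRecord); it is not part of the patch. The helper name, the file path, and the use of StorageResolver::for_test() (borrowed from the unit tests above) are illustrative assumptions.

    use std::path::Path;

    use quickwit_storage::StorageResolver;

    use crate::source::doc_file_reader::DocFileReader;

    /// Replays a local NDJSON file starting at `start_offset` and returns the
    /// byte offset reached at EOF (the value a checkpoint would record).
    async fn replay_from_offset(path: &Path, start_offset: usize) -> anyhow::Result<u64> {
        // The test resolver handles local files; FileSourceFactory uses the
        // resolver carried by the SourceRuntime instead.
        let storage_resolver = StorageResolver::for_test();
        let mut reader = DocFileReader::from_path(&storage_resolver, path, start_offset).await?;
        let mut next_offset = start_offset as u64;
        // next_record() returns Ok(None) once the underlying reader hits EOF.
        while let Some(record) = reader.next_record().await? {
            // record.doc is a Bytes buffer holding one newline-terminated document.
            next_offset = record.next_offset;
            drop(record.doc);
        }
        Ok(next_offset)
    }

Batching, checkpointing and the BATCH_NUM_BYTES_LIMIT cutoff now live entirely in FileSource::emit_batches, so a caller like this sketch only has to persist next_offset in order to resume later, which is what the resumed-read unit tests in doc_file_reader.rs exercise.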