Skip to content

Commit

Permalink
Load checkpoint lazily in sources
Browse files — browse the repository at this point in the history
  • Loading branch information
guilload committed Apr 26, 2024
1 parent 22fae74 commit 686ea05
Show file tree
Hide file tree
Showing 16 changed files with 710 additions and 762 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ services:
- "${MAP_HOST_PULSAR:-127.0.0.1}:6650:6650"
- "${MAP_HOST_PULSAR:-127.0.0.1}:8081:8080"
environment:
PULSAR_MEM: "-Xms256M -Xmx256M"
PULSAR_MEM: "-Xms384M -Xmx384M"
profiles:
- all
- pulsar
Expand Down
235 changes: 118 additions & 117 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs

Large diffs are not rendered by default.

32 changes: 17 additions & 15 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,7 @@ mod tests {
// Test `spawn_pipeline`.
let source_config_0 = SourceConfig {
source_id: "test-indexing-service--source-0".to_string(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params: SourceParams::void(),
transform_config: None,
Expand Down Expand Up @@ -1050,19 +1050,9 @@ mod tests {
let index_uri = format!("ram:///indexes/{index_id}");
let index_config = IndexConfig::for_test(&index_id, &index_uri);

let create_index_request =
CreateIndexRequest::try_from_index_config(&index_config).unwrap();
metastore.create_index(create_index_request).await.unwrap();

let universe = Universe::new();
let temp_dir = tempfile::tempdir().unwrap();
let (indexing_service, indexing_server_handle) =
spawn_indexing_service_for_test(temp_dir.path(), &universe, metastore, cluster).await;

// Test `supervise_pipelines`
let source_config = SourceConfig {
source_id: "test-indexing-service--source".to_string(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params: SourceParams::Vec(VecSourceParams {
docs: Vec::new(),
Expand All @@ -1072,6 +1062,18 @@ mod tests {
transform_config: None,
input_format: SourceInputFormat::Json,
};
let create_index_request = CreateIndexRequest::try_from_index_and_source_configs(
&index_config,
&[source_config.clone()],
)
.unwrap();
metastore.create_index(create_index_request).await.unwrap();

let universe = Universe::new();
let temp_dir = tempfile::tempdir().unwrap();
let (indexing_service, indexing_server_handle) =
spawn_indexing_service_for_test(temp_dir.path(), &universe, metastore, cluster).await;

indexing_service
.ask_for_res(SpawnPipeline {
index_id: index_id.clone(),
Expand Down Expand Up @@ -1132,7 +1134,7 @@ mod tests {
// Test `apply plan`.
let source_config_1 = SourceConfig {
source_id: "test-indexing-service--source-1".to_string(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params: SourceParams::void(),
transform_config: None,
Expand Down Expand Up @@ -1337,7 +1339,7 @@ mod tests {

let source_config = SourceConfig {
source_id: "test-indexing-service--source".to_string(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params: SourceParams::void(),
transform_config: None,
Expand Down Expand Up @@ -1466,7 +1468,7 @@ mod tests {
let mut index_metadata = IndexMetadata::for_test(&index_id, &index_uri);
let source_config = SourceConfig {
source_id: "test-indexing-service--source".to_string(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params: SourceParams::void(),
transform_config: None,
Expand Down
103 changes: 40 additions & 63 deletions quickwit/quickwit-indexing/src/source/file_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::borrow::Borrow;
use std::ffi::OsStr;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use std::{fmt, io};

Expand All @@ -30,7 +30,7 @@ use bytes::Bytes;
use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_common::uri::Uri;
use quickwit_config::FileSourceParams;
use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
use quickwit_metastore::checkpoint::PartitionId;
use quickwit_proto::metastore::SourceType;
use quickwit_proto::types::Position;
use serde::Serialize;
Expand All @@ -39,7 +39,7 @@ use tracing::info;

use super::BatchBuilder;
use crate::actors::DocProcessor;
use crate::source::{Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory};
use crate::source::{Source, SourceContext, SourceRuntime, TypedSourceFactory};

/// Number of bytes after which a new batch is cut.
pub(crate) const BATCH_NUM_BYTES_LIMIT: u64 = 500_000u64;
Expand Down Expand Up @@ -113,15 +113,15 @@ impl Source for FileSource {
.await?;
}
if reached_eof {
info!("EOF");
info!("reached end of file");
ctx.send_exit_with_success(doc_processor_mailbox).await?;
return Err(ActorExitStatus::Success);
}
Ok(Duration::default())
}

fn name(&self) -> String {
format!("FileSource{{source_id={}}}", self.source_id)
format!("{:?}", self)
}

fn observable_state(&self) -> serde_json::Value {
Expand All @@ -136,15 +136,15 @@ impl TypedSourceFactory for FileSourceFactory {
type Source = FileSource;
type Params = FileSourceParams;

// TODO handle checkpoint for files.
async fn typed_create_source(
ctx: Arc<SourceRuntimeArgs>,
source_runtime: SourceRuntime,
params: FileSourceParams,
checkpoint: SourceCheckpoint,
) -> anyhow::Result<FileSource> {
let checkpoint = source_runtime.fetch_checkpoint().await?;
let mut offset = 0;

let reader: FileSourceReader = if let Some(filepath) = &params.filepath {
let partition_id = PartitionId::from(filepath.to_string_lossy().to_string());
let partition_id = PartitionId::from(filepath.to_string_lossy().borrow());
offset = checkpoint
.position_for_partition(&partition_id)
.map(|position| {
Expand All @@ -154,7 +154,7 @@ impl TypedSourceFactory for FileSourceFactory {
})
.unwrap_or(0);
let (dir_uri, file_name) = dir_and_filename(filepath)?;
let storage = ctx.storage_resolver.resolve(&dir_uri).await?;
let storage = source_runtime.storage_resolver.resolve(&dir_uri).await?;
let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap();
// If it's a gzip file, we can't seek to a specific offset, we need to start from the
// beginning of the file, decompress and skip the first `offset` bytes.
Expand All @@ -172,7 +172,7 @@ impl TypedSourceFactory for FileSourceFactory {
FileSourceReader::new(Box::new(tokio::io::stdin()), 0)
};
let file_source = FileSource {
source_id: ctx.source_id().to_string(),
source_id: source_runtime.source_id().to_string(),
counters: FileSourceCounters {
previous_offset: offset as u64,
current_offset: offset as u64,
Expand Down Expand Up @@ -239,17 +239,16 @@ pub(crate) fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)>
mod tests {
use std::io::{Cursor, Write};
use std::num::NonZeroUsize;
use std::path::PathBuf;

use async_compression::tokio::write::GzipEncoder;
use quickwit_actors::{Command, Universe};
use quickwit_config::{SourceConfig, SourceInputFormat, SourceParams};
use quickwit_metastore::checkpoint::{SourceCheckpoint, SourceCheckpointDelta};
use quickwit_metastore::metastore_for_test;
use quickwit_metastore::checkpoint::SourceCheckpointDelta;
use quickwit_proto::types::IndexUid;

use super::*;
use crate::models::RawDocBatch;
use crate::source::tests::SourceRuntimeBuilder;
use crate::source::SourceActor;

#[tokio::test]
Expand All @@ -274,19 +273,11 @@ mod tests {
transform_config: None,
input_format: SourceInputFormat::Json,
};
let metastore = metastore_for_test();
let file_source = FileSourceFactory::typed_create_source(
SourceRuntimeArgs::for_test(
IndexUid::new_with_random_ulid("test-index"),
source_config,
metastore,
PathBuf::from("./queues"),
),
params,
SourceCheckpoint::default(),
)
.await
.unwrap();
let index_uid = IndexUid::new_with_random_ulid("test-index");
let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build();
let file_source = FileSourceFactory::typed_create_source(source_runtime, params)
.await
.unwrap();
let file_source_actor = SourceActor {
source: Box::new(file_source),
doc_processor_mailbox,
Expand Down Expand Up @@ -356,21 +347,13 @@ mod tests {
transform_config: None,
input_format: SourceInputFormat::Json,
};
let metastore = metastore_for_test();
let source = FileSourceFactory::typed_create_source(
SourceRuntimeArgs::for_test(
IndexUid::new_with_random_ulid("test-index"),
source_config,
metastore,
PathBuf::from("./queues"),
),
params,
SourceCheckpoint::default(),
)
.await
.unwrap();
let index_uid = IndexUid::new_with_random_ulid("test-index");
let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build();
let file_source = FileSourceFactory::typed_create_source(source_runtime, params)
.await
.unwrap();
let file_source_actor = SourceActor {
source: Box::new(source),
source: Box::new(file_source),
doc_processor_mailbox,
};
let (_file_source_mailbox, file_source_handle) =
Expand Down Expand Up @@ -446,16 +429,6 @@ mod tests {
temp_file.flush().unwrap();

let params = FileSourceParams::file(&temp_file_path);
let mut checkpoint = SourceCheckpoint::default();
let partition_id = PartitionId::from(temp_file_path.to_string_lossy().to_string());
let checkpoint_delta = SourceCheckpointDelta::from_partition_delta(
partition_id,
Position::offset(0u64),
Position::offset(4u64),
)
.unwrap();
checkpoint.try_apply_delta(checkpoint_delta).unwrap();

let source_config = SourceConfig {
source_id: "test-file-source".to_string(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
Expand All @@ -464,21 +437,25 @@ mod tests {
transform_config: None,
input_format: SourceInputFormat::Json,
};
let metastore = metastore_for_test();
let source = FileSourceFactory::typed_create_source(
SourceRuntimeArgs::for_test(
IndexUid::new_with_random_ulid("test-index"),
source_config,
metastore,
PathBuf::from("./queues"),
),
params,
checkpoint,
let partition_id = PartitionId::from(temp_file_path.to_string_lossy().borrow());
let source_checkpoint_delta = SourceCheckpointDelta::from_partition_delta(
partition_id,
Position::Beginning,
Position::offset(4u64),
)
.await
.unwrap();

let index_uid = IndexUid::new_with_random_ulid("test-index");
let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config)
.with_mock_metastore(Some(source_checkpoint_delta))
.with_queues_dir(temp_file_path)
.build();

let file_source = FileSourceFactory::typed_create_source(source_runtime, params)
.await
.unwrap();
let file_source_actor = SourceActor {
source: Box::new(source),
source: Box::new(file_source),
doc_processor_mailbox,
};
let (_file_source_mailbox, file_source_handle) =
Expand Down
Loading

0 comments on commit 686ea05

Please sign in to comment.