Skip to content

Commit

Permalink
Revert "Revert "feat: Add option to use rocksdb record store (#2074)" (#2082)"
Browse files Browse the repository at this point in the history

This reverts commit 66b7e67.
  • Loading branch information
chubei authored Sep 25, 2023
1 parent c2f278b commit 2532585
Show file tree
Hide file tree
Showing 24 changed files with 1,024 additions and 610 deletions.
52 changes: 51 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions dozer-cli/src/simple/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ use dozer_api::grpc::internal::internal_pipeline_server::LogEndpoint;
use dozer_cache::dozer_log::camino::Utf8Path;
use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir};
use dozer_cache::dozer_log::replication::Log;
use dozer_core::checkpoint::OptionCheckpoint;
use dozer_core::checkpoint::{CheckpointOptions, OptionCheckpoint};
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::app_config::DataStorage;
use dozer_types::models::flags::Flags;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -47,7 +46,7 @@ impl<'a> Executor<'a> {
sources: &'a [Source],
sql: Option<&'a str>,
api_endpoints: &'a [ApiEndpoint],
storage_config: DataStorage,
checkpoint_options: CheckpointOptions,
labels: LabelsAndProgress,
udfs: &'a [UdfConfig],
) -> Result<Executor<'a>, OrchestrationError> {
Expand All @@ -59,7 +58,7 @@ impl<'a> Executor<'a> {

// Load pipeline checkpoint.
let checkpoint =
OptionCheckpoint::new(build_path.data_dir.to_string(), storage_config).await?;
OptionCheckpoint::new(build_path.data_dir.to_string(), checkpoint_options).await?;

let mut endpoint_and_logs = vec![];
for endpoint in api_endpoints {
Expand Down
7 changes: 5 additions & 2 deletions dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use crate::pipeline::PipelineBuilder;
use crate::shutdown::ShutdownReceiver;
use crate::simple::build;
use crate::simple::helper::validate_config;
use crate::utils::{get_cache_manager_options, get_default_max_num_records, get_executor_options};
use crate::utils::{
get_cache_manager_options, get_checkpoint_options, get_default_max_num_records,
get_executor_options,
};

use crate::{flatten_join_handle, join_handle_map_err};
use dozer_api::auth::{Access, Authorizer};
Expand Down Expand Up @@ -226,7 +229,7 @@ impl SimpleOrchestrator {
&self.config.sources,
self.config.sql.as_deref(),
&self.config.endpoints,
self.config.app.data_storage.clone(),
get_checkpoint_options(&self.config),
self.labels.clone(),
&self.config.udfs,
))?;
Expand Down
12 changes: 11 additions & 1 deletion dozer-cli/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use dozer_cache::cache::CacheManagerOptions;
use dozer_core::{
checkpoint::CheckpointFactoryOptions, epoch::EpochManagerOptions, executor::ExecutorOptions,
checkpoint::{CheckpointFactoryOptions, CheckpointOptions},
epoch::EpochManagerOptions,
executor::ExecutorOptions,
};
use dozer_types::{
constants::DEFAULT_DEFAULT_MAX_NUM_RECORDS,
Expand Down Expand Up @@ -62,6 +64,14 @@ fn get_max_interval_before_persist_in_seconds(config: &Config) -> u64 {
.unwrap_or_else(default_max_interval_before_persist_in_seconds)
}

/// Assembles the [`CheckpointOptions`] from the `app` section of `config`.
///
/// The data storage setting is cloned and the record store setting is taken
/// by value, so `config` itself is only borrowed.
pub fn get_checkpoint_options(config: &Config) -> CheckpointOptions {
    let record_store = config.app.record_store;
    let data_storage = config.app.data_storage.clone();
    CheckpointOptions {
        data_storage,
        record_store,
    }
}

fn get_checkpoint_factory_options(config: &Config) -> CheckpointFactoryOptions {
CheckpointFactoryOptions {
persist_queue_capacity: config
Expand Down
24 changes: 16 additions & 8 deletions dozer-core/src/checkpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer,
use dozer_types::{
bincode,
log::{error, info},
models::app_config::DataStorage,
models::app_config::{DataStorage, RecordStore},
node::{NodeHandle, OpIdentifier, SourceStates, TableState},
parking_lot::Mutex,
serde::{Deserialize, Serialize},
Expand Down Expand Up @@ -58,14 +58,21 @@ pub struct OptionCheckpoint {
checkpoint: Option<Checkpoint>,
}

/// Options consumed by `OptionCheckpoint::new` when loading a checkpoint.
#[derive(Debug, Clone, Default)]
pub struct CheckpointOptions {
/// Storage configuration handed to `create_data_storage`.
pub data_storage: DataStorage,
/// Record store setting forwarded to `read_record_store_slices`, which
/// passes it on to `ProcessorRecordStoreDeserializer::new`.
pub record_store: RecordStore,
}

impl OptionCheckpoint {
pub async fn new(
checkpoint_dir: String,
storage_config: DataStorage,
options: CheckpointOptions,
) -> Result<Self, ExecutionError> {
let (storage, prefix) =
create_data_storage(storage_config, checkpoint_dir.to_string()).await?;
let (record_store, checkpoint) = read_record_store_slices(&*storage, &prefix).await?;
create_data_storage(options.data_storage, checkpoint_dir.to_string()).await?;
let (record_store, checkpoint) =
read_record_store_slices(&*storage, &prefix, options.record_store).await?;
if let Some(checkpoint) = &checkpoint {
info!(
"Restored record store from {}th checkpoint, last epoch id is {}, processor states are stored in {}",
Expand Down Expand Up @@ -289,8 +296,9 @@ impl Drop for CheckpointWriter {
async fn read_record_store_slices(
storage: &dyn Storage,
factory_prefix: &str,
record_store: RecordStore,
) -> Result<(ProcessorRecordStoreDeserializer, Option<Checkpoint>), ExecutionError> {
let record_store = ProcessorRecordStoreDeserializer::new()?;
let record_store = ProcessorRecordStoreDeserializer::new(record_store)?;
let record_store_prefix = record_store_prefix(factory_prefix);

let mut last_checkpoint: Option<Checkpoint> = None;
Expand Down Expand Up @@ -358,7 +366,7 @@ async fn read_record_store_slices(
pub async fn create_checkpoint_for_test() -> (TempDir, OptionCheckpoint) {
let temp_dir = TempDir::new("create_checkpoint_for_test").unwrap();
let checkpoint_dir = temp_dir.path().to_str().unwrap().to_string();
let checkpoint = OptionCheckpoint::new(checkpoint_dir.clone(), DataStorage::Local)
let checkpoint = OptionCheckpoint::new(checkpoint_dir.clone(), Default::default())
.await
.unwrap();
(temp_dir, checkpoint)
Expand All @@ -371,7 +379,7 @@ pub async fn create_checkpoint_factory_for_test(
// Create empty checkpoint storage.
let temp_dir = TempDir::new("create_checkpoint_factory_for_test").unwrap();
let checkpoint_dir = temp_dir.path().to_str().unwrap().to_string();
let checkpoint = OptionCheckpoint::new(checkpoint_dir.clone(), DataStorage::Local)
let checkpoint = OptionCheckpoint::new(checkpoint_dir.clone(), Default::default())
.await
.unwrap();
let (checkpoint_factory, handle) = CheckpointFactory::new(checkpoint, Default::default())
Expand Down Expand Up @@ -404,7 +412,7 @@ pub async fn create_checkpoint_factory_for_test(
handle.await.unwrap();

// Create a new factory that loads from the checkpoint.
let checkpoint = OptionCheckpoint::new(checkpoint_dir, DataStorage::Local)
let checkpoint = OptionCheckpoint::new(checkpoint_dir, Default::default())
.await
.unwrap();
let last_checkpoint = checkpoint.checkpoint.as_ref().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/executor/receiver_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ mod tests {
#[test]
fn receiver_loop_forwards_op() {
let (mut test_loop, senders) = TestReceiverLoop::new(2);
let record_store = ProcessorRecordStore::new().unwrap();
let record_store = ProcessorRecordStore::new(Default::default()).unwrap();
let record: ProcessorRecord = record_store
.create_record(&Record::new(vec![Field::Int(1)]))
.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions dozer-recordstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ fuzz = ["dozer-types/arbitrary"]

[dependencies]
dozer-types = { path = "../dozer-types" }
dozer-storage = { path = "../dozer-storage" }
slice-dst = { version = "1.5.1", default-features = false }
tempdir = "0.3.7"
Loading

0 comments on commit 2532585

Please sign in to comment.