From 1bdde41875b03a47495c8511dbf2913a1f06f595 Mon Sep 17 00:00:00 2001
From: chubei <914745487@qq.com>
Date: Fri, 22 Dec 2023 10:47:53 +0800
Subject: [PATCH] refactor: Change `OpIdentifier` to `RestartableState`

---
 dozer-cli/src/pipeline/connector_source.rs    |  4 +-
 dozer-core/src/builder_dag.rs                 |  2 +-
 dozer-core/src/checkpoint/mod.rs              | 10 ++--
 dozer-core/src/forwarder.rs                   |  6 +-
 dozer-core/src/node.rs                        |  4 +-
 dozer-core/src/tests/dag_base_errors.rs       |  4 +-
 dozer-core/src/tests/sources.rs               | 22 ++++---
 dozer-ingestion/connector/src/ingestor.rs     |  6 +-
 dozer-ingestion/connector/src/lib.rs          |  8 +--
 dozer-ingestion/deltalake/src/reader.rs       |  4 +-
 dozer-ingestion/dozer/src/connector.rs        | 27 +++++++--
 dozer-ingestion/dozer/src/lib.rs              |  3 +
 dozer-ingestion/ethereum/src/log/sender.rs    |  4 +-
 .../ethereum/src/trace/connector.rs           |  2 +-
 dozer-ingestion/grpc/src/adapter/arrow.rs     |  2 +-
 dozer-ingestion/grpc/src/adapter/default.rs   |  2 +-
 .../javascript/src/js_extension/mod.rs        |  2 +-
 .../kafka/src/debezium/stream_consumer.rs     |  8 +--
 .../kafka/src/stream_consumer_basic.rs        |  4 +-
 dozer-ingestion/mongodb/src/lib.rs            |  4 +-
 dozer-ingestion/mysql/src/binlog.rs           |  2 +-
 dozer-ingestion/mysql/src/connector.rs        | 18 +++---
 dozer-ingestion/object-store/src/connector.rs |  2 +-
 .../object-store/src/table_reader.rs          |  2 +-
 dozer-ingestion/postgres/src/connector.rs     |  2 +-
 dozer-ingestion/postgres/src/replicator.rs    | 26 +++++---
 dozer-ingestion/postgres/src/snapshotter.rs   |  2 +-
 dozer-ingestion/postgres/src/xlog_mapper.rs   | 13 ++--
 .../snowflake/src/connector/snowflake.rs      | 14 +++--
 dozer-ingestion/snowflake/src/lib.rs          |  3 +
 .../snowflake/src/stream_consumer.rs          | 21 ++++++-
 dozer-ingestion/tests/test_suite/basic.rs     | 34 +----------
 dozer-sql/src/tests/builder_test.rs           |  5 +-
 dozer-tests/src/sql_tests/helper/pipeline.rs  |  2 +-
 dozer-types/src/models/ingestion_types.rs     |  6 +-
 dozer-types/src/node.rs                       | 59 +++----------------
 36 files changed, 155 insertions(+), 184 deletions(-)

diff --git a/dozer-cli/src/pipeline/connector_source.rs b/dozer-cli/src/pipeline/connector_source.rs
index 7b2c9d0656..5fd3a3194d 100644
--- a/dozer-cli/src/pipeline/connector_source.rs
+++ b/dozer-cli/src/pipeline/connector_source.rs
@@ -262,12 +262,12 @@ impl Source for ConnectorSource {
             .iter()
             .zip(&self.ports)
             .map(|(table, port)| {
-                let checkpoint = last_checkpoint.get(port).copied().flatten();
+                let state = last_checkpoint.get(port).cloned().flatten();
                 TableToIngest {
                     schema: table.schema.clone(),
                     name: table.name.clone(),
                     column_names: table.column_names.clone(),
-                    checkpoint,
+                    state,
                 }
             })
             .collect::<Vec<_>>();
diff --git a/dozer-core/src/builder_dag.rs b/dozer-core/src/builder_dag.rs
index 0ef79afa33..a4d1d82b19 100644
--- a/dozer-core/src/builder_dag.rs
+++ b/dozer-core/src/builder_dag.rs
@@ -81,7 +81,7 @@ impl BuilderDag {
                 last_checkpoint_by_name
                     .as_mut()
                     .and_then(|last_checkpoint| {
-                        last_checkpoint.remove(&port_name).flatten()
+                        last_checkpoint.remove(&port_name).flatten().cloned()
                     }),
             );
         }
diff --git a/dozer-core/src/checkpoint/mod.rs b/dozer-core/src/checkpoint/mod.rs
index 6d770fb698..be8b5e9731 100644
--- a/dozer-core/src/checkpoint/mod.rs
+++ b/dozer-core/src/checkpoint/mod.rs
@@ -11,7 +11,7 @@ use dozer_types::{
     bincode,
     log::{error, info},
     models::app_config::{DataStorage, RecordStore},
-    node::{NodeHandle, OpIdentifier, SourceStates, TableState},
+    node::{NodeHandle, RestartableState, SourceStates, TableState},
     parking_lot::Mutex,
     types::Field,
 };
@@ -116,7 +116,7 @@ impl OptionCheckpoint {
     pub fn get_source_state(
         &self,
        node_handle: &NodeHandle,
-    ) -> Result<Option<HashMap<String, Option<OpIdentifier>>>, ExecutionError> {
+    ) -> Result<Option<HashMap<String, Option<&RestartableState>>>, ExecutionError> {
         let Some(checkpoint) = self.checkpoint.as_ref() else {
             return Ok(None);
         };
@@ -126,7 +126,7 @@ impl OptionCheckpoint {
         let mut result = HashMap::new();
         for (table_name, state) in source_state {
-            let id = match state {
+            let state = match state {
                 TableState::NotStarted => None,
                 TableState::NonRestartable => {
                     return Err(ExecutionError::SourceCannotRestart {
@@ -134,9 +134,9 @@ impl OptionCheckpoint {
                         table_name: table_name.clone(),
                     });
                 }
-                TableState::Restartable(id) => Some(*id),
+                TableState::Restartable(state) => Some(state),
             };
-            result.insert(table_name.clone(), id);
+            result.insert(table_name.clone(), state);
         }
         Ok(Some(result))
     }
diff --git a/dozer-core/src/forwarder.rs b/dozer-core/src/forwarder.rs
index 200787b9c2..26c8b5c6e3 100644
--- a/dozer-core/src/forwarder.rs
+++ b/dozer-core/src/forwarder.rs
@@ -222,12 +222,12 @@ impl SourceChannelManager {
         request_termination: bool,
     ) -> Result<bool, ExecutionError> {
         match message {
-            IngestionMessage::OperationEvent { op, id, .. } => {
+            IngestionMessage::OperationEvent { op, state, .. } => {
                 let port_name = self.port_names[&port].clone();
                 self.current_op_ids.insert(
                     port_name,
-                    if let Some(id) = id {
-                        TableState::Restartable(id)
+                    if let Some(state) = state {
+                        TableState::Restartable(state)
                     } else {
                         TableState::NonRestartable
                     },
diff --git a/dozer-core/src/node.rs b/dozer-core/src/node.rs
index 7c9510fd08..4f434df51b 100644
--- a/dozer-core/src/node.rs
+++ b/dozer-core/src/node.rs
@@ -5,7 +5,7 @@ use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer};
 use dozer_log::storage::{Object, Queue};
 use dozer_types::errors::internal::BoxedError;
-use dozer_types::node::OpIdentifier;
+use dozer_types::node::RestartableState;
 use dozer_types::serde::{Deserialize, Serialize};
 use dozer_types::tonic::async_trait;
 use dozer_types::types::Schema;
@@ -54,7 +54,7 @@ pub trait SourceFactory: Send + Sync + Debug {
     ) -> Result<Box<dyn Source>, BoxedError>;
 }
 
-pub type SourceState = HashMap<PortHandle, Option<OpIdentifier>>;
+pub type SourceState = HashMap<PortHandle, Option<RestartableState>>;
 
 pub trait Source: Send + Sync + Debug {
     fn start(
diff --git a/dozer-core/src/tests/dag_base_errors.rs b/dozer-core/src/tests/dag_base_errors.rs
index 7fae7b9303..ff3c4ae631 100644
--- a/dozer-core/src/tests/dag_base_errors.rs
+++ b/dozer-core/src/tests/dag_base_errors.rs
@@ -16,7 +16,7 @@ use dozer_log::tokio;
 use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer};
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::models::ingestion_types::IngestionMessage;
-use dozer_types::node::{NodeHandle, OpIdentifier};
+use dozer_types::node::NodeHandle;
 use dozer_types::tonic::async_trait;
 use dozer_types::types::{
     Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
@@ -378,7 +378,7 @@ impl Source for ErrGeneratorSource {
                         Field::String(format!("value_{n}")),
                     ]),
                 },
-                id: Some(OpIdentifier::new(n, 0)),
+                state: Some(n.to_be_bytes().to_vec().into()),
             },
             GENERATOR_SOURCE_OUTPUT_PORT,
         )?;
diff --git a/dozer-core/src/tests/sources.rs b/dozer-core/src/tests/sources.rs
index 70ab205649..449ef66507 100644
--- a/dozer-core/src/tests/sources.rs
+++ b/dozer-core/src/tests/sources.rs
@@ -3,7 +3,6 @@ use crate::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
 use crate::DEFAULT_PORT_HANDLE;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::models::ingestion_types::IngestionMessage;
-use dozer_types::node::OpIdentifier;
 use dozer_types::types::{
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
 };
@@ -96,15 +95,14 @@ impl Source for GeneratorSource {
         fw: &mut dyn SourceChannelForwarder,
         last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
-        let start = last_checkpoint
-            .values()
-            .copied()
-            .next()
-            .flatten()
-            .unwrap_or(OpIdentifier::new(0, 0))
-            .txid;
+        let state = last_checkpoint.values().next().and_then(|state| {
+            state
+                .as_ref()
+                .map(|state| u64::from_be_bytes(state.0.as_slice().try_into().unwrap()))
+        });
+        let start = state.map(|state| state + 1).unwrap_or(0);
 
-        for n in start + 1..(start + self.count + 1) {
+        for n in start..(start + self.count) {
             fw.send(
                 IngestionMessage::OperationEvent {
                     table_index: 0,
@@ -114,7 +112,7 @@ impl Source for GeneratorSource {
                         Field::String(format!("value_{n}")),
                     ]),
                 },
-                id: Some(OpIdentifier::new(n, 0)),
+                state: Some(n.to_be_bytes().to_vec().into()),
             },
             GENERATOR_SOURCE_OUTPUT_PORT,
         )?;
@@ -237,7 +235,7 @@ impl Source for DualPortGeneratorSource {
                         Field::String(format!("value_{n}")),
                     ]),
                 },
-                id: Some(OpIdentifier::new(n, 0)),
+                state: Some(n.to_be_bytes().to_vec().into()),
             },
             DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1,
         )?;
@@ -250,7 +248,7 @@ impl Source for DualPortGeneratorSource {
                         Field::String(format!("value_{n}")),
                     ]),
                 },
-                id: Some(OpIdentifier::new(n, 0)),
+                state: Some(n.to_be_bytes().to_vec().into()),
             },
             DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2,
         )?;
diff --git a/dozer-ingestion/connector/src/ingestor.rs b/dozer-ingestion/connector/src/ingestor.rs
index a01088d16f..140fd240cb 100644
--- a/dozer-ingestion/connector/src/ingestor.rs
+++ b/dozer-ingestion/connector/src/ingestor.rs
@@ -96,7 +96,7 @@ mod tests {
             .handle_message(IngestionMessage::OperationEvent {
                 table_index: 0,
                 op: operation.clone(),
-                id: None,
+                state: None,
             })
             .await
             .unwrap();
@@ -104,7 +104,7 @@ mod tests {
             .handle_message(IngestionMessage::OperationEvent {
                 table_index: 0,
                 op: operation2.clone(),
-                id: None,
+                state: None,
             })
             .await
             .unwrap();
@@ -121,7 +121,7 @@ mod tests {
             IngestionMessage::OperationEvent {
                 table_index: 0,
                 op,
-                id: None
+                state: None
             },
             msg
         );
diff --git a/dozer-ingestion/connector/src/lib.rs b/dozer-ingestion/connector/src/lib.rs
index a1015e847b..71256f6e9b 100644
--- a/dozer-ingestion/connector/src/lib.rs
+++ b/dozer-ingestion/connector/src/lib.rs
@@ -1,7 +1,7 @@
 use std::fmt::Debug;
 
 use dozer_types::errors::internal::BoxedError;
-use dozer_types::node::OpIdentifier;
+use dozer_types::node::RestartableState;
 use dozer_types::serde;
 use dozer_types::serde::{Deserialize, Serialize};
 pub use dozer_types::tonic::async_trait;
@@ -146,8 +146,8 @@ pub struct TableToIngest {
     pub name: String,
     /// The column names to be mapped.
     pub column_names: Vec<String>,
-    /// The checkpoint to start after.
-    pub checkpoint: Option<OpIdentifier>,
+    /// The state to restart after.
+    pub state: Option<RestartableState>,
 }
 
 impl TableToIngest {
@@ -156,7 +156,7 @@ impl TableToIngest {
             schema: table_info.schema,
             name: table_info.name,
             column_names: table_info.column_names,
-            checkpoint: None,
+            state: None,
         }
     }
 }
diff --git a/dozer-ingestion/deltalake/src/reader.rs b/dozer-ingestion/deltalake/src/reader.rs
index 7a9f8d6dc8..3e25e91b1e 100644
--- a/dozer-ingestion/deltalake/src/reader.rs
+++ b/dozer-ingestion/deltalake/src/reader.rs
@@ -40,7 +40,7 @@ impl DeltaLakeReader {
         table: &TableToIngest,
         ingestor: &Ingestor,
     ) -> Result<(), BoxedError> {
-        assert!(table.checkpoint.is_none());
+        assert!(table.state.is_none());
 
         let table_path = table_path(&self.config, &table.name)?;
         let ctx = SessionContext::new();
@@ -75,7 +75,7 @@ impl DeltaLakeReader {
                             lifetime: None,
                         },
                     },
-                    id: None,
+                    state: None,
                 })
                 .await
                 .unwrap();
diff --git a/dozer-ingestion/dozer/src/connector.rs b/dozer-ingestion/dozer/src/connector.rs
index 7a4678c68a..299bde40e5 100644
--- a/dozer-ingestion/dozer/src/connector.rs
+++ b/dozer-ingestion/dozer/src/connector.rs
@@ -12,7 +12,7 @@ use dozer_ingestion_connector::{
         default_buffer_size, default_log_batch_size, default_timeout, IngestionMessage,
         NestedDozerConfig, NestedDozerLogOptions,
     },
-    node::OpIdentifier,
+    node::RestartableState,
     serde_json,
     tonic::{async_trait, transport::Channel},
     types::{FieldType, Operation, Record, Schema},
@@ -208,10 +208,11 @@ async fn read_table(
     reader_builder: LogReaderBuilder,
     sender: Sender<IngestionMessage>,
 ) -> Result<(), NestedDozerConnectorError> {
-    let starting_point = table_info
-        .checkpoint
-        .map(|checkpoint| checkpoint.seq_in_tx + 1)
-        .unwrap_or(0);
+    let state = table_info
+        .state
+        .map(|state| decode_state(&state))
+        .transpose()?;
+    let starting_point = state.map(|pos| pos + 1).unwrap_or(0);
     let mut reader = reader_builder.build(starting_point);
     let schema = reader.schema.schema.clone();
     let map = SchemaMapper::new(schema, &table_info.column_names)?;
@@ -243,12 +244,26 @@ async fn read_table(
         .send(IngestionMessage::OperationEvent {
             table_index,
             op,
-            id: Some(OpIdentifier::new(0, op_and_pos.pos)),
+            state: Some(encode_state(op_and_pos.pos)),
         })
         .await;
     }
 }
 
+fn encode_state(pos: u64) -> RestartableState {
+    pos.to_be_bytes().to_vec().into()
+}
+
+fn decode_state(state: &RestartableState) -> Result<u64, NestedDozerConnectorError> {
+    Ok(u64::from_be_bytes(
+        state
+            .0
+            .as_slice()
+            .try_into()
+            .map_err(|_| NestedDozerConnectorError::CorruptedState)?,
+    ))
+}
+
 struct SchemaMapper {
     source_schema: Schema,
     fields: Vec<usize>,
diff --git a/dozer-ingestion/dozer/src/lib.rs b/dozer-ingestion/dozer/src/lib.rs
index 5620508c91..14a6fe6834 100644
--- a/dozer-ingestion/dozer/src/lib.rs
+++ b/dozer-ingestion/dozer/src/lib.rs
@@ -8,6 +8,9 @@ use dozer_log::errors::{ReaderBuilderError, ReaderError};
 
 #[derive(Error, Debug)]
 enum NestedDozerConnectorError {
+    #[error("Failed to parse checkpoint state")]
+    CorruptedState,
+
     #[error("Failed to connect to upstream dozer at {0}: {1:?}")]
     ConnectionError(String, #[source] dozer_types::tonic::transport::Error),
diff --git a/dozer-ingestion/ethereum/src/log/sender.rs b/dozer-ingestion/ethereum/src/log/sender.rs
index 2e28270d69..b2de12777b 100644
--- a/dozer-ingestion/ethereum/src/log/sender.rs
+++ b/dozer-ingestion/ethereum/src/log/sender.rs
@@ -222,7 +222,7 @@ async fn process_log(details: Arc<EthDetails<'_>>, msg: Log) {
         .handle_message(IngestionMessage::OperationEvent {
             table_index,
             op,
-            id: None,
+            state: None,
         })
         .await
         .is_err()
@@ -245,7 +245,7 @@ async fn process_log(details: Arc<EthDetails<'_>>, msg: Log) {
         .handle_message(IngestionMessage::OperationEvent {
            table_index,
             op,
-            id: None,
+            state: None,
         })
         .await;
     } else {
diff --git a/dozer-ingestion/ethereum/src/trace/connector.rs b/dozer-ingestion/ethereum/src/trace/connector.rs
index 63fd7c7b4e..9ca8525788 100644
--- a/dozer-ingestion/ethereum/src/trace/connector.rs
+++ b/dozer-ingestion/ethereum/src/trace/connector.rs
@@ -155,7 +155,7 @@ pub async fn run(
         .handle_message(IngestionMessage::OperationEvent {
             table_index: 0, // We have only one table
             op,
-            id: None,
+            state: None,
         })
         .await
         .is_err()
diff --git a/dozer-ingestion/grpc/src/adapter/arrow.rs b/dozer-ingestion/grpc/src/adapter/arrow.rs
index 72ff83f150..a42f6b7301 100644
--- a/dozer-ingestion/grpc/src/adapter/arrow.rs
+++ b/dozer-ingestion/grpc/src/adapter/arrow.rs
@@ -115,7 +115,7 @@ pub async fn handle_message(
         .handle_message(IngestionMessage::OperationEvent {
             table_index,
             op,
-            id: None,
+            state: None,
         })
         .await
         .is_err()
diff --git a/dozer-ingestion/grpc/src/adapter/default.rs b/dozer-ingestion/grpc/src/adapter/default.rs
index 1614ffb79b..8bbc1092b5 100644
--- a/dozer-ingestion/grpc/src/adapter/default.rs
+++ b/dozer-ingestion/grpc/src/adapter/default.rs
@@ -88,7 +88,7 @@ pub async fn handle_message(
         .handle_message(IngestionMessage::OperationEvent {
             table_index,
             op,
-            id: None,
+            state: None,
         })
         .await;
     Ok(())
diff --git a/dozer-ingestion/javascript/src/js_extension/mod.rs b/dozer-ingestion/javascript/src/js_extension/mod.rs
index 18ee1a9ee6..e9bd5f5535 100644
--- a/dozer-ingestion/javascript/src/js_extension/mod.rs
+++ b/dozer-ingestion/javascript/src/js_extension/mod.rs
@@ -120,7 +120,7 @@ async fn send(ingestor: Ingestor, val: JsMessage) -> Result<(), Error> {
             IngestionMessage::OperationEvent {
                 table_index: 0,
                 op,
-                id: None,
+                state: None,
             }
         }
     };
diff --git a/dozer-ingestion/kafka/src/debezium/stream_consumer.rs b/dozer-ingestion/kafka/src/debezium/stream_consumer.rs
index ba907de845..1429436910 100644
--- a/dozer-ingestion/kafka/src/debezium/stream_consumer.rs
+++ b/dozer-ingestion/kafka/src/debezium/stream_consumer.rs
@@ -94,7 +94,7 @@ impl StreamConsumer for DebeziumStreamConsumer {
         let topics: Vec<&str> = tables
             .iter()
             .map(|t| {
-                assert!(t.checkpoint.is_none());
+                assert!(t.state.is_none());
                 t.name.as_str()
             })
             .collect();
@@ -154,7 +154,7 @@ impl StreamConsumer for DebeziumStreamConsumer {
                             lifetime: None,
                         },
                     },
-                    id: None,
+                    state: None,
                 })
                 .await
                 .is_err()
@@ -176,7 +176,7 @@ impl StreamConsumer for DebeziumStreamConsumer {
                             lifetime: None,
                         },
                     },
-                    id: None,
+                    state: None,
                 })
                 .await
                 .is_err()
@@ -198,7 +198,7 @@ impl StreamConsumer for DebeziumStreamConsumer {
                             lifetime: None,
                         },
                     },
-                    id: None,
+                    state: None,
                 })
                 .await
                 .is_err()
diff --git a/dozer-ingestion/kafka/src/stream_consumer_basic.rs b/dozer-ingestion/kafka/src/stream_consumer_basic.rs
index 34c28a5ddf..f72f8db69d 100644
--- a/dozer-ingestion/kafka/src/stream_consumer_basic.rs
+++ b/dozer-ingestion/kafka/src/stream_consumer_basic.rs
@@ -88,7 +88,7 @@ impl StreamConsumer for StreamConsumerBasic {
         let mut schemas = HashMap::new();
         for (table_index, table) in tables.into_iter().enumerate() {
-            assert!(table.checkpoint.is_none());
+            assert!(table.state.is_none());
 
             let schema = if let Some(url) = schema_registry_url {
                 SchemaRegistryBasic::get_single_schema(&table.name, url).await?
@@ -158,7 +158,7 @@ impl StreamConsumer for StreamConsumerBasic {
                             lifetime: None,
                         },
                     },
-                    id: None,
+                    state: None,
                 })
                 .await
                 .is_err()
diff --git a/dozer-ingestion/mongodb/src/lib.rs b/dozer-ingestion/mongodb/src/lib.rs
index f734f0eb85..f2441f5632 100644
--- a/dozer-ingestion/mongodb/src/lib.rs
+++ b/dozer-ingestion/mongodb/src/lib.rs
@@ -626,7 +626,7 @@ impl Connector for MongodbConnector {
                     .handle_message(IngestionMessage::OperationEvent {
                         table_index,
                         op,
-                        id: None,
+                        state: None,
                     })
                     .await
                     .is_err()
@@ -673,7 +673,7 @@ impl Connector for MongodbConnector {
                    .handle_message(IngestionMessage::OperationEvent {
                         table_index,
                         op,
-                        id: None,
+                        state: None,
                     })
                     .await
                     .is_err()
diff --git a/dozer-ingestion/mysql/src/binlog.rs b/dozer-ingestion/mysql/src/binlog.rs
index b8831fe404..f1e4202efb 100644
--- a/dozer-ingestion/mysql/src/binlog.rs
+++ b/dozer-ingestion/mysql/src/binlog.rs
@@ -459,7 +459,7 @@ impl BinlogIngestor<'_, '_, '_> {
                 .handle_message(IngestionMessage::OperationEvent {
                     table_index: table.def.table_index,
                     op: op?,
-                    id: None,
+                    state: None,
                 })
                 .await
                 .is_err()
diff --git a/dozer-ingestion/mysql/src/connector.rs b/dozer-ingestion/mysql/src/connector.rs
index 8324788a39..d33a92e12c 100644
--- a/dozer-ingestion/mysql/src/connector.rs
+++ b/dozer-ingestion/mysql/src/connector.rs
@@ -196,7 +196,7 @@ impl Connector for MySQLConnector {
         let table_infos = tables
             .into_iter()
             .map(|table| {
-                assert!(table.checkpoint.is_none());
+                assert!(table.state.is_none());
                 TableInfo {
                     schema: table.schema,
                     name: table.name,
@@ -318,7 +318,7 @@ impl MySQLConnector {
                     .handle_message(IngestionMessage::OperationEvent {
                         table_index,
                         op,
-                        id: None,
+                        state: None,
                     })
                     .await
                     .is_err()
@@ -531,7 +531,7 @@ mod tests {
                     Field::Float(1.0.into()),
                 ]),
             },
-            id: None,
+            state: None,
         },
         IngestionMessage::OperationEvent {
             table_index: 0,
@@ -542,7 +542,7 @@ mod tests {
                     Field::Float(2.0.into()),
                 ]),
             },
-            id: None,
+            state: None,
         },
         IngestionMessage::OperationEvent {
             table_index: 0,
@@ -553,7 +553,7 @@ mod tests {
                     Field::Float(3.0.into()),
                 ]),
             },
-            id: None,
+            state: None,
        },
        IngestionMessage::SnapshottingDone,
    ];
@@ -619,7 +619,7 @@ mod tests {
            op: Insert {
                new: Record::new(vec![Field::Int(4), Field::Float(4.0.into())]),
            },
-            id: None,
+            state: None,
        },
        IngestionMessage::SnapshottingDone,
        IngestionMessage::SnapshottingStarted,
@@ -628,7 +628,7 @@ mod tests {
            op: Insert {
                new: Record::new(vec![Field::Int(1), Field::Json(true.into())]),
            },
-            id: None,
+            state: None,
        },
        IngestionMessage::SnapshottingDone,
    ];
@@ -648,7 +648,7 @@ mod tests {
                old: Record::new(vec![Field::Int(4), Field::Float(4.0.into())]),
                new: Record::new(vec![Field::Int(4), Field::Float(5.0.into())]),
            },
-            id: None,
+            state: None,
        },
        IngestionMessage::SnapshottingDone,
    ];
@@ -667,7 +667,7 @@ mod tests {
            op: Delete {
                old: Record::new(vec![Field::Int(4), Field::Float(5.0.into())]),
            },
-            id: None,
+            state: None,
        },
        IngestionMessage::SnapshottingDone,
    ];
diff --git a/dozer-ingestion/object-store/src/connector.rs b/dozer-ingestion/object-store/src/connector.rs
index cdd1ce1092..f51512df0e 100644
--- a/dozer-ingestion/object-store/src/connector.rs
+++ b/dozer-ingestion/object-store/src/connector.rs
@@ -132,7 +132,7 @@ impl Connector for ObjectStoreConnector {
         let mut handles = vec![];
         for (table_index, table_info) in tables.iter().enumerate() {
-            assert!(table_info.checkpoint.is_none());
+            assert!(table_info.state.is_none());
             let table_info = TableInfo {
                 schema: table_info.schema.clone(),
                 name: table_info.name.clone(),
diff --git a/dozer-ingestion/object-store/src/table_reader.rs b/dozer-ingestion/object-store/src/table_reader.rs
index 846a9c9b5f..d5fa1a94a7 100644
--- a/dozer-ingestion/object-store/src/table_reader.rs
+++ b/dozer-ingestion/object-store/src/table_reader.rs
@@ -114,7 +114,7 @@ pub async fn read(
         .send(Ok(Some(IngestionMessage::OperationEvent {
             table_index,
             op: evt,
-            id: None,
+            state: None,
         })))
         .await
         .is_err()
diff --git a/dozer-ingestion/postgres/src/connector.rs b/dozer-ingestion/postgres/src/connector.rs
index ccac425f81..8270a7fc2c 100644
--- a/dozer-ingestion/postgres/src/connector.rs
+++ b/dozer-ingestion/postgres/src/connector.rs
@@ -186,7 +186,7 @@ impl Connector for PostgresConnector {
         let tables = tables
             .into_iter()
             .map(|table| {
-                assert!(table.checkpoint.is_none());
+                assert!(table.state.is_none());
                 ListOrFilterColumns {
                     schema: table.schema,
                     name: table.name,
diff --git a/dozer-ingestion/postgres/src/replicator.rs b/dozer-ingestion/postgres/src/replicator.rs
index 27b5bbbf61..b0a183ec3d 100644
--- a/dozer-ingestion/postgres/src/replicator.rs
+++ b/dozer-ingestion/postgres/src/replicator.rs
@@ -2,11 +2,12 @@ use dozer_ingestion_connector::dozer_types::bytes;
 use dozer_ingestion_connector::dozer_types::chrono::{TimeZone, Utc};
 use dozer_ingestion_connector::dozer_types::log::{error, info};
 use dozer_ingestion_connector::dozer_types::models::ingestion_types::IngestionMessage;
-use dozer_ingestion_connector::dozer_types::node::OpIdentifier;
+use dozer_ingestion_connector::dozer_types::node::RestartableState;
 use dozer_ingestion_connector::futures::StreamExt;
 use dozer_ingestion_connector::Ingestor;
 use postgres_protocol::message::backend::ReplicationMessage::*;
 use postgres_protocol::message::backend::{LogicalReplicationMessage, ReplicationMessage};
+use postgres_protocol::Lsn;
 use postgres_types::PgLsn;
 use tokio_postgres::Error;
 
@@ -30,9 +31,9 @@ pub struct CDCHandler<'a> {
     pub slot_name: String,
 
     pub start_lsn: PgLsn,
-    pub begin_lsn: u64,
-    pub offset_lsn: u64,
-    pub last_commit_lsn: u64,
+    pub begin_lsn: Lsn,
+    pub offset_lsn: Lsn,
+    pub last_commit_lsn: Lsn,
 
     pub offset: u64,
     pub seq_no: u64,
@@ -59,8 +60,8 @@ impl<'a> CDCHandler<'a> {
             publication_name = self.publication_name
         );
 
-        self.offset_lsn = u64::from(lsn);
-        self.last_commit_lsn = u64::from(lsn);
+        self.offset_lsn = Lsn::from(lsn);
+        self.last_commit_lsn = Lsn::from(lsn);
 
         let mut stream =
             LogicalReplicationStream::new(client, self.slot_name.clone(), lsn, options)
@@ -116,8 +117,8 @@ impl<'a> CDCHandler<'a> {
         let message = mapper.handle_message(body)?;
 
         match message {
-            Some(MappedReplicationMessage::Commit(commit)) => {
-                self.last_commit_lsn = commit.txid;
+            Some(MappedReplicationMessage::Commit(lsn)) => {
+                self.last_commit_lsn = lsn;
             }
             Some(MappedReplicationMessage::Begin) => {
                 self.begin_lsn = lsn;
@@ -131,7 +132,7 @@ impl<'a> CDCHandler<'a> {
                     .handle_message(IngestionMessage::OperationEvent {
                         table_index,
                         op,
-                        id: Some(OpIdentifier::new(self.begin_lsn, self.seq_no)),
+                        state: Some(encode_state(self.begin_lsn, self.seq_no)),
                     })
                     .await
                     .is_err()
@@ -155,6 +156,13 @@ impl<'a> CDCHandler<'a> {
     }
 }
 
+fn encode_state(lsn: Lsn, seq_no: u64) -> RestartableState {
+    let mut state = vec![];
+    state.extend_from_slice(&lsn.to_be_bytes());
+    state.extend_from_slice(&seq_no.to_be_bytes());
+    state.into()
+}
+
 pub struct LogicalReplicationStream {
     client: Client,
     slot_name: String,
diff --git a/dozer-ingestion/postgres/src/snapshotter.rs b/dozer-ingestion/postgres/src/snapshotter.rs
index 76ff91135d..d06775aa2b 100644
--- a/dozer-ingestion/postgres/src/snapshotter.rs
+++ b/dozer-ingestion/postgres/src/snapshotter.rs
@@ -136,7 +136,7 @@ impl<'a> PostgresSnapshotter<'a> {
                 .handle_message(IngestionMessage::OperationEvent {
                     table_index,
                     op: evt,
-                    id: None,
+                    state: None,
                 })
                 .await
                 .is_err()
diff --git a/dozer-ingestion/postgres/src/xlog_mapper.rs b/dozer-ingestion/postgres/src/xlog_mapper.rs
index 08b0303e6d..5996705270 100644
--- a/dozer-ingestion/postgres/src/xlog_mapper.rs
+++ b/dozer-ingestion/postgres/src/xlog_mapper.rs
@@ -1,13 +1,11 @@
-use dozer_ingestion_connector::dozer_types::{
-    node::OpIdentifier,
-    types::{Field, Operation, Record},
-};
+use dozer_ingestion_connector::dozer_types::types::{Field, Operation, Record};
 use postgres_protocol::message::backend::LogicalReplicationMessage::{
     Begin, Commit, Delete, Insert, Relation, Update,
 };
 use postgres_protocol::message::backend::{
     LogicalReplicationMessage, RelationBody, ReplicaIdentity, TupleData, UpdateBody, XLogDataBody,
 };
+use postgres_protocol::Lsn;
 use postgres_types::Type;
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
@@ -34,7 +32,7 @@ pub struct TableColumn {
 #[derive(Debug, Clone)]
 pub enum MappedReplicationMessage {
     Begin,
-    Commit(OpIdentifier),
+    Commit(Lsn),
     Operation { table_index: usize, op: Operation },
 }
@@ -63,10 +61,7 @@ impl XlogMapper {
                 self.ingest_schema(relation)?;
             }
             Commit(commit) => {
-                return Ok(Some(MappedReplicationMessage::Commit(OpIdentifier::new(
-                    commit.end_lsn(),
-                    0,
-                ))));
+                return Ok(Some(MappedReplicationMessage::Commit(commit.end_lsn())));
             }
             Begin(_begin) => {
                 return Ok(Some(MappedReplicationMessage::Begin));
diff --git a/dozer-ingestion/snowflake/src/connector/snowflake.rs b/dozer-ingestion/snowflake/src/connector/snowflake.rs
index dde8c76d28..4cda88d0b1 100644
--- a/dozer-ingestion/snowflake/src/connector/snowflake.rs
+++ b/dozer-ingestion/snowflake/src/connector/snowflake.rs
@@ -4,7 +4,6 @@ use dozer_ingestion_connector::{
         errors::internal::BoxedError,
         log::{info, warn},
         models::ingestion_types::{default_snowflake_poll_interval, SnowflakeConfig},
-        node::OpIdentifier,
         types::FieldType,
     },
     tokio, Connector, Ingestor, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo,
@@ -13,7 +12,9 @@ use dozer_ingestion_connector::{
 use odbc::create_environment_v3;
 
 use crate::{
-    connection::client::Client, schema_helper::SchemaHelper, stream_consumer::StreamConsumer,
+    connection::client::Client,
+    schema_helper::SchemaHelper,
+    stream_consumer::{decode_state, StreamConsumer},
     SnowflakeError, SnowflakeStreamError,
 };
 
@@ -151,16 +152,17 @@ fn run(
     for (idx, table) in tables.iter().enumerate() {
         // We only check stream status on first iteration
         if iteration == 0 {
-            match table.checkpoint {
-                None | Some(OpIdentifier { txid: 0, .. }) => {
+            let state = table.state.as_ref().map(decode_state).transpose()?;
+            match state {
+                None | Some((0, _)) => {
                     info!("[{}][{}] Creating new stream", name, table.name);
                     StreamConsumer::drop_stream(&stream_client, &table.name)?;
                     StreamConsumer::create_stream(&stream_client, &table.name)?;
                 }
-                Some(OpIdentifier { txid, seq_in_tx }) => {
+                Some((iteration, index)) => {
                     info!(
                         "[{}][{}] Continuing ingestion from {}/{}",
-                        name, table.name, txid, seq_in_tx
+                        name, table.name, iteration, index
                     );
                     if let Ok(false) =
                         StreamConsumer::is_stream_created(&stream_client, &table.name)
diff --git a/dozer-ingestion/snowflake/src/lib.rs b/dozer-ingestion/snowflake/src/lib.rs
index 38f38124c8..4a449e05c0 100644
--- a/dozer-ingestion/snowflake/src/lib.rs
+++ b/dozer-ingestion/snowflake/src/lib.rs
@@ -17,6 +17,9 @@ mod tests;
 
 #[derive(Error, Debug)]
 pub enum SnowflakeError {
+    #[error("Failed to parse checkpoint state")]
+    CorruptedState,
+
     #[error("Snowflake query error")]
     QueryError(#[source] Box<dyn std::error::Error>),
diff --git a/dozer-ingestion/snowflake/src/stream_consumer.rs b/dozer-ingestion/snowflake/src/stream_consumer.rs
index b1b4ed4dfd..3272c1ee26 100644
--- a/dozer-ingestion/snowflake/src/stream_consumer.rs
+++ b/dozer-ingestion/snowflake/src/stream_consumer.rs
@@ -1,7 +1,7 @@
 use dozer_ingestion_connector::{
     dozer_types::{
         models::ingestion_types::IngestionMessage,
-        node::OpIdentifier,
+        node::RestartableState,
         types::{Field, Operation, Record},
     },
     Ingestor,
@@ -126,7 +126,7 @@ impl StreamConsumer {
                 .blocking_handle_message(IngestionMessage::OperationEvent {
                     table_index,
                     op,
-                    id: Some(OpIdentifier::new(iteration, idx as u64)),
+                    state: Some(encode_state(iteration, idx as u64)),
                 })
                 .is_err()
             {
@@ -141,3 +141,20 @@ impl StreamConsumer {
         client.exec(&query)
     }
 }
+
+fn encode_state(iteration: u64, index: u64) -> RestartableState {
+    let mut state = vec![];
+    state.extend_from_slice(&iteration.to_be_bytes());
+    state.extend_from_slice(&index.to_be_bytes());
+    state.into()
+}
+
+pub fn decode_state(state: &RestartableState) -> Result<(u64, u64), SnowflakeError> {
+    if state.0.len() != 16 {
+        return Err(SnowflakeError::CorruptedState);
+    }
+    let state = state.0.as_slice();
+    let iteration = u64::from_be_bytes(state[0..8].try_into().unwrap());
+    let index = u64::from_be_bytes(state[8..16].try_into().unwrap());
+    Ok((iteration, index))
+}
diff --git a/dozer-ingestion/tests/test_suite/basic.rs b/dozer-ingestion/tests/test_suite/basic.rs
index 2fd1430e19..e7bfc3e4ac 100644
--- a/dozer-ingestion/tests/test_suite/basic.rs
+++ b/dozer-ingestion/tests/test_suite/basic.rs
@@ -40,21 +40,13 @@ pub async fn run_test_suite_basic_data_ready<T: DataReadyConnectorTest>(runtime:
     let (mut iterator, abort_handle) = spawn_connector(runtime, connector, tables);
 
     // Loop over messages until timeout.
-    let mut last_identifier = None;
     let mut num_operations = 0;
     while let Some(message) = iterator.next_timeout(Duration::from_secs(1)).await {
         // Check message identifier.
         if let IngestionMessage::OperationEvent {
-            table_index,
-            op,
-            id,
+            table_index, op, ..
         } = &message
         {
-            if let Some((last_id, id)) = last_identifier.zip(*id) {
-                assert!(id > last_id);
-            }
-            last_identifier = *id;
-
             num_operations += 1;
             // Check record schema consistency.
             match op {
@@ -147,22 +139,12 @@ pub async fn run_test_suite_basic_insert_only<T: InsertOnlyConnectorTest>(runtim
     let mut record_iter = records.iter();
 
-    let mut last_identifier = None;
     while let Some(message) = iterator.next_timeout(Duration::from_secs(1)).await {
         // Filter out non-operation events.
-        let IngestionMessage::OperationEvent {
-            op: operation, id, ..
-        } = message
-        else {
+        let IngestionMessage::OperationEvent { op: operation, .. } = message else {
             continue;
         };
 
-        // Identifier must be increasing.
-        if let Some((last_id, id)) = last_identifier.zip(id) {
-            assert!(id > last_id);
-        }
-        last_identifier = id;
-
         // Operation must be insert.
         let Operation::Insert { new: actual_record } = operation else {
             panic!("Expected an insert event, but got {:?}", operation);
@@ -230,23 +212,13 @@ pub async fn run_test_suite_basic_cud<T: CudConnectorTest>(runtime: Arc
     let (mut iterator, abort_handle) = spawn_connector(runtime, connector, tables);
 
     // Check data schema consistency.
-    let mut last_identifier = None;
     let mut records = Records::new(actual_primary_index.clone());
     while let Some(message) = iterator.next_timeout(Duration::from_secs(1)).await {
         // Filter out non-operation events.
-        let IngestionMessage::OperationEvent {
-            op: operation, id, ..
-        } = message
-        else {
+        let IngestionMessage::OperationEvent { op: operation, .. } = message else {
             continue;
         };
 
-        // Identifier must be increasing.
-        if let Some((last_id, id)) = last_identifier.zip(id) {
-            assert!(id > last_id);
-        }
-        last_identifier = id;
-
         // Record must match schema.
         match operation {
             Operation::Insert { new } => {
diff --git a/dozer-sql/src/tests/builder_test.rs b/dozer-sql/src/tests/builder_test.rs
index b5aa6efdee..5d16aeec14 100644
--- a/dozer-sql/src/tests/builder_test.rs
+++ b/dozer-sql/src/tests/builder_test.rs
@@ -16,7 +16,6 @@ use dozer_types::chrono::DateTime;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::log::debug;
 use dozer_types::models::ingestion_types::IngestionMessage;
-use dozer_types::node::OpIdentifier;
 use dozer_types::ordered_float::OrderedFloat;
 use dozer_types::types::{
     Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
@@ -112,7 +111,7 @@ impl Source for TestSource {
         fw: &mut dyn SourceChannelForwarder,
         _last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
-        for n in 0..10 {
+        for n in 0..10u64 {
             fw.send(
                 IngestionMessage::OperationEvent {
                     table_index: 0,
@@ -126,7 +125,7 @@ impl Source for TestSource {
                         ),
                     ]),
                 },
-                id: Some(OpIdentifier::new(n, 0)),
+                state: Some(n.to_be_bytes().to_vec().into()),
             },
             DEFAULT_PORT_HANDLE,
         )
diff --git a/dozer-tests/src/sql_tests/helper/pipeline.rs b/dozer-tests/src/sql_tests/helper/pipeline.rs
index e332811107..5514d7a02c 100644
--- a/dozer-tests/src/sql_tests/helper/pipeline.rs
+++ b/dozer-tests/src/sql_tests/helper/pipeline.rs
@@ -133,7 +133,7 @@ impl Source for TestSource {
                     IngestionMessage::OperationEvent {
                         table_index: 0,
                         op,
-                        id: None,
+                        state: None,
                     },
                     *port,
                 )
diff --git a/dozer-types/src/models/ingestion_types.rs b/dozer-types/src/models/ingestion_types.rs
index c0216dd7d0..0ed9eb674b 100644
--- a/dozer-types/src/models/ingestion_types.rs
+++ b/dozer-types/src/models/ingestion_types.rs
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
 use crate::{
     helper::{deserialize_duration_secs_f64, f64_schema, serialize_duration_secs_f64},
     models::connection::SchemaExample,
-    node::OpIdentifier,
+    node::RestartableState,
     types::Operation,
 };
 
@@ -24,8 +24,8 @@ pub enum IngestionMessage {
         table_index: usize,
         /// The CDC event.
         op: Operation,
-        /// If this connector supports restarting from a specific CDC event, it should provide an identifier.
-        id: Option<OpIdentifier>,
+        /// If this connector supports restarting from a specific CDC event, it should provide a `RestartableState`.
+        state: Option<RestartableState>,
     },
     /// A connector uses this message kind to notify Dozer that a initial snapshot of the source tables is started
     SnapshottingStarted,
diff --git a/dozer-types/src/node.rs b/dozer-types/src/node.rs
index d7cfdcf40c..5682247148 100644
--- a/dozer-types/src/node.rs
+++ b/dozer-types/src/node.rs
@@ -63,60 +63,19 @@ impl Display for NodeHandle {
 }
 
 #[derive(
-    Clone,
-    Debug,
-    Copy,
-    PartialEq,
-    Eq,
-    PartialOrd,
-    Ord,
-    Hash,
-    Default,
-    Serialize,
-    Deserialize,
-    bincode::Encode,
-    bincode::Decode,
+    Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, bincode::Encode, bincode::Decode,
 )]
-/// A identifier made of two `u64`s.
-pub struct OpIdentifier {
-    /// High 64 bits of the identifier.
-    pub txid: u64,
-    /// Low 64 bits of the identifier.
-    pub seq_in_tx: u64,
-}
-
-impl OpIdentifier {
-    pub fn new(txid: u64, seq_in_tx: u64) -> Self {
-        Self { txid, seq_in_tx }
-    }
-
-    pub fn to_bytes(&self) -> [u8; 16] {
-        let mut result = [0_u8; 16];
-        result[0..8].copy_from_slice(&self.txid.to_be_bytes());
-        result[8..16].copy_from_slice(&self.seq_in_tx.to_be_bytes());
-        result
-    }
+/// A table's restartable state, any binary data.
+pub struct RestartableState(pub Vec<u8>);
 
-    pub fn from_bytes(bytes: [u8; 16]) -> Self {
-        let txid = u64::from_be_bytes(bytes[0..8].try_into().unwrap());
-        let seq_in_tx = u64::from_be_bytes(bytes[8..16].try_into().unwrap());
-        Self::new(txid, seq_in_tx)
+impl From<Vec<u8>> for RestartableState {
+    fn from(value: Vec<u8>) -> Self {
+        Self(value)
     }
 }
 
 #[derive(
-    Debug,
-    Clone,
-    Copy,
-    Serialize,
-    Deserialize,
-    PartialEq,
-    Eq,
-    PartialOrd,
-    Ord,
-    Hash,
-    bincode::Encode,
-    bincode::Decode,
+    Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, bincode::Encode, bincode::Decode,
 )]
 /// A table's ingestion state.
 pub enum TableState {
@@ -124,8 +83,8 @@ pub enum TableState {
     NotStarted,
     /// This table has some data ingested, and it can't be restarted.
     NonRestartable,
-    /// This table has some data ingested, and it can be restarted using the given identifier.
-    Restartable(OpIdentifier),
+    /// This table has some data ingested, and it can be restarted if it's given the state.
+    Restartable(RestartableState),
 }
 
 /// Map from a `Source` node's handle to its tables' states.
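
The patch leaves the meaning of the state bytes entirely to each connector: the Postgres replicator packs (lsn, seq_no), the Snowflake consumer packs (iteration, index), and the nested-Dozer connector packs a single log position, all as big-endian u64s, and each decoder validates the input before use so truncated or foreign bytes surface as a CorruptedState-style error instead of a panic. A minimal, self-contained sketch of that round trip follows; `RestartableState` mirrors the definition added to dozer-types/src/node.rs above, while `CorruptedState`, `encode_state`, and `decode_state` here are illustrative stand-ins for the per-connector items, not code from the patch.

/// Mirrors `RestartableState` from dozer-types/src/node.rs in this patch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RestartableState(pub Vec<u8>);

impl From<Vec<u8>> for RestartableState {
    fn from(value: Vec<u8>) -> Self {
        Self(value)
    }
}

/// Illustrative stand-in for the `CorruptedState` variants the patch adds to
/// `NestedDozerConnectorError` and `SnowflakeError`.
#[derive(Debug, PartialEq)]
pub struct CorruptedState;

/// Pack an (lsn, seq_no) pair as 16 big-endian bytes, the same layout the
/// Postgres replicator uses above.
fn encode_state(lsn: u64, seq_no: u64) -> RestartableState {
    let mut state = Vec::with_capacity(16);
    state.extend_from_slice(&lsn.to_be_bytes());
    state.extend_from_slice(&seq_no.to_be_bytes());
    state.into()
}

/// Unpack the pair, rejecting byte strings of the wrong length instead of
/// panicking on them.
fn decode_state(state: &RestartableState) -> Result<(u64, u64), CorruptedState> {
    let bytes = state.0.as_slice();
    if bytes.len() != 16 {
        return Err(CorruptedState);
    }
    let lsn = u64::from_be_bytes(bytes[0..8].try_into().unwrap());
    let seq_no = u64::from_be_bytes(bytes[8..16].try_into().unwrap());
    Ok((lsn, seq_no))
}

fn main() {
    // Round trip: the bytes a source emits are the bytes it gets back on restart.
    let state = encode_state(7, 42);
    assert_eq!(decode_state(&state), Ok((7, 42)));
    // Truncated or foreign state is reported as corrupted, not misread.
    assert_eq!(decode_state(&RestartableState(vec![0; 3])), Err(CorruptedState));
}

Because the decoder checks the length first, a connector that later changes its state layout can detect and reject checkpoints written by an older version rather than silently misreading them, which is the failure mode the opaque-bytes design trades the old ordered `OpIdentifier` comparisons away for.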