refactor: Change OpIdentifier to RestartableState
chubei committed Dec 22, 2023
1 parent 3b5db3d commit 1bdde41
Showing 36 changed files with 155 additions and 184 deletions.
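
For orientation while reading the diffs: the commit replaces the fixed-shape OpIdentifier checkpoint (a transaction-id/sequence pair) with an opaque, connector-defined byte blob. A minimal sketch of the two types as they are assumed to look, inferred only from their usage in this diff (the real definitions live in dozer-types and may differ):

// Old checkpoint type: a small Copy struct identifying an operation.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct OpIdentifier {
    pub txid: u64,
    pub seq_in_tx: u64,
}

impl OpIdentifier {
    pub fn new(txid: u64, seq_in_tx: u64) -> Self {
        Self { txid, seq_in_tx }
    }
}

// New checkpoint type: opaque bytes each connector encodes and decodes itself
// (Clone but not Copy, since it owns a heap buffer).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RestartableState(pub Vec<u8>);

impl From<Vec<u8>> for RestartableState {
    fn from(bytes: Vec<u8>) -> Self {
        Self(bytes)
    }
}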
4 changes: 2 additions & 2 deletions dozer-cli/src/pipeline/connector_source.rs
@@ -262,12 +262,12 @@ impl Source for ConnectorSource {
.iter()
.zip(&self.ports)
.map(|(table, port)| {
let checkpoint = last_checkpoint.get(port).copied().flatten();
let state = last_checkpoint.get(port).cloned().flatten();
TableToIngest {
schema: table.schema.clone(),
name: table.name.clone(),
column_names: table.column_names.clone(),
checkpoint,
state,
}
})
.collect::<Vec<_>>();
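
The copied-to-cloned switch above is forced by the new type: OpIdentifier is Copy, so its Option could be copied out of the borrowed map entry, while RestartableState owns a Vec<u8> and only implements Clone. A small sketch of the two lookups, using the sketched types above and a generic key standing in for PortHandle:

use std::collections::HashMap;
use std::hash::Hash;

// Old lookup: bitwise copy of the Copy struct.
fn old_state<K: Eq + Hash>(map: &HashMap<K, Option<OpIdentifier>>, port: &K) -> Option<OpIdentifier> {
    map.get(port).copied().flatten()
}

// New lookup: clone the owned byte buffer instead.
fn new_state<K: Eq + Hash>(map: &HashMap<K, Option<RestartableState>>, port: &K) -> Option<RestartableState> {
    map.get(port).cloned().flatten()
}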
2 changes: 1 addition & 1 deletion dozer-core/src/builder_dag.rs
@@ -81,7 +81,7 @@ impl BuilderDag {
last_checkpoint_by_name
.as_mut()
.and_then(|last_checkpoint| {
last_checkpoint.remove(&port_name).flatten()
last_checkpoint.remove(&port_name).flatten().cloned()
}),
);
}
10 changes: 5 additions & 5 deletions dozer-core/src/checkpoint/mod.rs
@@ -11,7 +11,7 @@ use dozer_types::{
bincode,
log::{error, info},
models::app_config::{DataStorage, RecordStore},
node::{NodeHandle, OpIdentifier, SourceStates, TableState},
node::{NodeHandle, RestartableState, SourceStates, TableState},
parking_lot::Mutex,
types::Field,
};
@@ -116,7 +116,7 @@ impl OptionCheckpoint {
pub fn get_source_state(
&self,
node_handle: &NodeHandle,
) -> Result<Option<HashMap<String, Option<OpIdentifier>>>, ExecutionError> {
) -> Result<Option<HashMap<String, Option<&RestartableState>>>, ExecutionError> {
let Some(checkpoint) = self.checkpoint.as_ref() else {
return Ok(None);
};
@@ -126,17 +126,17 @@ impl OptionCheckpoint {

let mut result = HashMap::new();
for (table_name, state) in source_state {
let id = match state {
let state = match state {
TableState::NotStarted => None,
TableState::NonRestartable => {
return Err(ExecutionError::SourceCannotRestart {
source_name: node_handle.clone(),
table_name: table_name.clone(),
});
}
TableState::Restartable(id) => Some(*id),
TableState::Restartable(state) => Some(state),
};
result.insert(table_name.clone(), id);
result.insert(table_name.clone(), state);
}
Ok(Some(result))
}
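
For reference, the TableState matched above is assumed to have roughly this shape (a sketch inferred from the match arms, not the actual dozer-types definition). Because the Restartable variant now owns a byte buffer rather than a Copy pair, get_source_state hands back borrowed Option<&RestartableState> values instead of copied OpIdentifiers:

// Assumed per-table source state, inferred from this hunk.
pub enum TableState {
    // The source has not emitted anything for this table yet.
    NotStarted,
    // The source emitted data but cannot resume from a checkpoint.
    NonRestartable,
    // Opaque state the connector can use to resume this table.
    Restartable(RestartableState),
}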
6 changes: 3 additions & 3 deletions dozer-core/src/forwarder.rs
@@ -222,12 +222,12 @@ impl SourceChannelManager {
request_termination: bool,
) -> Result<bool, ExecutionError> {
match message {
IngestionMessage::OperationEvent { op, id, .. } => {
IngestionMessage::OperationEvent { op, state, .. } => {
let port_name = self.port_names[&port].clone();
self.current_op_ids.insert(
port_name,
if let Some(id) = id {
TableState::Restartable(id)
if let Some(state) = state {
TableState::Restartable(state)
} else {
TableState::NonRestartable
},
4 changes: 2 additions & 2 deletions dozer-core/src/node.rs
@@ -5,7 +5,7 @@ use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer};

use dozer_log::storage::{Object, Queue};
use dozer_types::errors::internal::BoxedError;
use dozer_types::node::OpIdentifier;
use dozer_types::node::RestartableState;
use dozer_types::serde::{Deserialize, Serialize};
use dozer_types::tonic::async_trait;
use dozer_types::types::Schema;
@@ -54,7 +54,7 @@ pub trait SourceFactory: Send + Sync + Debug {
) -> Result<Box<dyn Source>, BoxedError>;
}

pub type SourceState = HashMap<PortHandle, Option<OpIdentifier>>;
pub type SourceState = HashMap<PortHandle, Option<RestartableState>>;

pub trait Source: Send + Sync + Debug {
fn start(
4 changes: 2 additions & 2 deletions dozer-core/src/tests/dag_base_errors.rs
@@ -16,7 +16,7 @@ use dozer_log::tokio;
use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer};
use dozer_types::errors::internal::BoxedError;
use dozer_types::models::ingestion_types::IngestionMessage;
use dozer_types::node::{NodeHandle, OpIdentifier};
use dozer_types::node::NodeHandle;
use dozer_types::tonic::async_trait;
use dozer_types::types::{
Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
@@ -378,7 +378,7 @@ impl Source for ErrGeneratorSource {
Field::String(format!("value_{n}")),
]),
},
id: Some(OpIdentifier::new(n, 0)),
state: Some(n.to_be_bytes().to_vec().into()),
},
GENERATOR_SOURCE_OUTPUT_PORT,
)?;
22 changes: 10 additions & 12 deletions dozer-core/src/tests/sources.rs
@@ -3,7 +3,6 @@ use crate::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFacto
use crate::DEFAULT_PORT_HANDLE;
use dozer_types::errors::internal::BoxedError;
use dozer_types::models::ingestion_types::IngestionMessage;
use dozer_types::node::OpIdentifier;
use dozer_types::types::{
Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
};
@@ -96,15 +95,14 @@ impl Source for GeneratorSource {
fw: &mut dyn SourceChannelForwarder,
last_checkpoint: SourceState,
) -> Result<(), BoxedError> {
let start = last_checkpoint
.values()
.copied()
.next()
.flatten()
.unwrap_or(OpIdentifier::new(0, 0))
.txid;
let state = last_checkpoint.values().next().and_then(|state| {
state
.as_ref()
.map(|state| u64::from_be_bytes(state.0.as_slice().try_into().unwrap()))
});
let start = state.map(|state| state + 1).unwrap_or(0);

for n in start + 1..(start + self.count + 1) {
for n in start..(start + self.count) {
fw.send(
IngestionMessage::OperationEvent {
table_index: 0,
@@ -114,7 +112,7 @@
Field::String(format!("value_{n}")),
]),
},
id: Some(OpIdentifier::new(n, 0)),
state: Some(n.to_be_bytes().to_vec().into()),
},
GENERATOR_SOURCE_OUTPUT_PORT,
)?;
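
The test source above encodes the last emitted index as big-endian u64 bytes and, on restart, resumes one past the decoded value. The same convention in isolation (function names here are illustrative, not part of the codebase):

fn encode_position(n: u64) -> RestartableState {
    n.to_be_bytes().to_vec().into()
}

fn decode_position(state: &RestartableState) -> u64 {
    // The test can unwrap because it always writes exactly 8 bytes.
    u64::from_be_bytes(state.0.as_slice().try_into().unwrap())
}

fn starting_index(last: Option<&RestartableState>) -> u64 {
    // Resume one past the checkpointed index, or start at 0 on a fresh run.
    last.map(|state| decode_position(state) + 1).unwrap_or(0)
}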
@@ -237,7 +235,7 @@ impl Source for DualPortGeneratorSource {
Field::String(format!("value_{n}")),
]),
},
id: Some(OpIdentifier::new(n, 0)),
state: Some(n.to_be_bytes().to_vec().into()),
},
DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1,
)?;
@@ -250,7 +248,7 @@
Field::String(format!("value_{n}")),
]),
},
id: Some(OpIdentifier::new(n, 0)),
state: Some(n.to_be_bytes().to_vec().into()),
},
DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2,
)?;
6 changes: 3 additions & 3 deletions dozer-ingestion/connector/src/ingestor.rs
@@ -96,15 +96,15 @@ mod tests {
.handle_message(IngestionMessage::OperationEvent {
table_index: 0,
op: operation.clone(),
id: None,
state: None,
})
.await
.unwrap();
ingestor
.handle_message(IngestionMessage::OperationEvent {
table_index: 0,
op: operation2.clone(),
id: None,
state: None,
})
.await
.unwrap();
@@ -121,7 +121,7 @@
IngestionMessage::OperationEvent {
table_index: 0,
op,
id: None
state: None
},
msg
);
8 changes: 4 additions & 4 deletions dozer-ingestion/connector/src/lib.rs
@@ -1,7 +1,7 @@
use std::fmt::Debug;

use dozer_types::errors::internal::BoxedError;
use dozer_types::node::OpIdentifier;
use dozer_types::node::RestartableState;
use dozer_types::serde;
use dozer_types::serde::{Deserialize, Serialize};
pub use dozer_types::tonic::async_trait;
@@ -146,8 +146,8 @@ pub struct TableToIngest {
pub name: String,
/// The column names to be mapped.
pub column_names: Vec<String>,
/// The checkpoint to start after.
pub checkpoint: Option<OpIdentifier>,
/// The state to restart after.
pub state: Option<RestartableState>,
}

impl TableToIngest {
@@ -156,7 +156,7 @@ impl TableToIngest {
schema: table_info.schema,
name: table_info.name,
column_names: table_info.column_names,
checkpoint: None,
state: None,
}
}
}
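
A connector receiving a TableToIngest is expected to branch on state: snapshot from scratch when it is None, otherwise decode its own state format and resume. A hypothetical sketch of that contract (snapshot_table and resume_table are made-up helpers, not part of the crate):

async fn ingest_table(table: &TableToIngest) -> Result<(), BoxedError> {
    match &table.state {
        // Fresh run: no prior state recorded for this table.
        None => snapshot_table(table).await,
        // Restart: interpret the connector-specific bytes and continue from there.
        Some(state) => resume_table(table, state).await,
    }
}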
4 changes: 2 additions & 2 deletions dozer-ingestion/deltalake/src/reader.rs
@@ -40,7 +40,7 @@ impl DeltaLakeReader {
table: &TableToIngest,
ingestor: &Ingestor,
) -> Result<(), BoxedError> {
assert!(table.checkpoint.is_none());
assert!(table.state.is_none());

let table_path = table_path(&self.config, &table.name)?;
let ctx = SessionContext::new();
@@ -75,7 +75,7 @@ impl DeltaLakeReader {
lifetime: None,
},
},
id: None,
state: None,
})
.await
.unwrap();
27 changes: 21 additions & 6 deletions dozer-ingestion/dozer/src/connector.rs
@@ -12,7 +12,7 @@ use dozer_ingestion_connector::{
default_buffer_size, default_log_batch_size, default_timeout, IngestionMessage,
NestedDozerConfig, NestedDozerLogOptions,
},
node::OpIdentifier,
node::RestartableState,
serde_json,
tonic::{async_trait, transport::Channel},
types::{FieldType, Operation, Record, Schema},
@@ -208,10 +208,11 @@ async fn read_table(
reader_builder: LogReaderBuilder,
sender: Sender<IngestionMessage>,
) -> Result<(), NestedDozerConnectorError> {
let starting_point = table_info
.checkpoint
.map(|checkpoint| checkpoint.seq_in_tx + 1)
.unwrap_or(0);
let state = table_info
.state
.map(|state| decode_state(&state))
.transpose()?;
let starting_point = state.map(|pos| pos + 1).unwrap_or(0);
let mut reader = reader_builder.build(starting_point);
let schema = reader.schema.schema.clone();
let map = SchemaMapper::new(schema, &table_info.column_names)?;
@@ -243,12 +244,26 @@ async fn read_table(
.send(IngestionMessage::OperationEvent {
table_index,
op,
id: Some(OpIdentifier::new(0, op_and_pos.pos)),
state: Some(encode_state(op_and_pos.pos)),
})
.await;
}
}

fn encode_state(pos: u64) -> RestartableState {
pos.to_be_bytes().to_vec().into()
}

fn decode_state(state: &RestartableState) -> Result<u64, NestedDozerConnectorError> {
Ok(u64::from_be_bytes(
state
.0
.as_slice()
.try_into()
.map_err(|_| NestedDozerConnectorError::CorruptedState)?,
))
}
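
A quick sanity check of the round trip above and of the failure mode the new CorruptedState variant covers (this relies on the RestartableState sketch near the top of the page; unlike the test sources, this connector returns an error rather than panicking on a malformed blob):

fn round_trip_example() {
    // 42 encodes to 8 big-endian bytes and decodes back to 42.
    let state = encode_state(42);
    assert_eq!(decode_state(&state).unwrap(), 42);

    // Anything that is not exactly 8 bytes is rejected as CorruptedState.
    let bad = RestartableState(vec![1, 2, 3]);
    assert!(decode_state(&bad).is_err());
}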

struct SchemaMapper {
source_schema: Schema,
fields: Vec<usize>,
3 changes: 3 additions & 0 deletions dozer-ingestion/dozer/src/lib.rs
@@ -8,6 +8,9 @@ use dozer_log::errors::{ReaderBuilderError, ReaderError};

#[derive(Error, Debug)]
enum NestedDozerConnectorError {
#[error("Failed to parse checkpoint state")]
CorruptedState,

#[error("Failed to connect to upstream dozer at {0}: {1:?}")]
ConnectionError(String, #[source] dozer_types::tonic::transport::Error),

4 changes: 2 additions & 2 deletions dozer-ingestion/ethereum/src/log/sender.rs
@@ -222,7 +222,7 @@ async fn process_log(details: Arc<EthDetails<'_>>, msg: Log) {
.handle_message(IngestionMessage::OperationEvent {
table_index,
op,
id: None,
state: None,
})
.await
.is_err()
@@ -245,7 +245,7 @@ async fn process_log(details: Arc<EthDetails<'_>>, msg: Log) {
.handle_message(IngestionMessage::OperationEvent {
table_index,
op,
id: None,
state: None,
})
.await;
} else {
2 changes: 1 addition & 1 deletion dozer-ingestion/ethereum/src/trace/connector.rs
@@ -155,7 +155,7 @@ pub async fn run(
.handle_message(IngestionMessage::OperationEvent {
table_index: 0, // We have only one table
op,
id: None,
state: None,
})
.await
.is_err()
2 changes: 1 addition & 1 deletion dozer-ingestion/grpc/src/adapter/arrow.rs
@@ -115,7 +115,7 @@ pub async fn handle_message(
.handle_message(IngestionMessage::OperationEvent {
table_index,
op,
id: None,
state: None,
})
.await
.is_err()
2 changes: 1 addition & 1 deletion dozer-ingestion/grpc/src/adapter/default.rs
@@ -88,7 +88,7 @@ pub async fn handle_message(
.handle_message(IngestionMessage::OperationEvent {
table_index,
op,
id: None,
state: None,
})
.await;
Ok(())
2 changes: 1 addition & 1 deletion dozer-ingestion/javascript/src/js_extension/mod.rs
@@ -120,7 +120,7 @@ async fn send(ingestor: Ingestor, val: JsMessage) -> Result<(), Error> {
IngestionMessage::OperationEvent {
table_index: 0,
op,
id: None,
state: None,
}
}
};
8 changes: 4 additions & 4 deletions dozer-ingestion/kafka/src/debezium/stream_consumer.rs
@@ -94,7 +94,7 @@ impl StreamConsumer for DebeziumStreamConsumer {
let topics: Vec<&str> = tables
.iter()
.map(|t| {
assert!(t.checkpoint.is_none());
assert!(t.state.is_none());
t.name.as_str()
})
.collect();
@@ -154,7 +154,7 @@ impl StreamConsumer for DebeziumStreamConsumer {
lifetime: None,
},
},
id: None,
state: None,
})
.await
.is_err()
@@ -176,7 +176,7 @@ impl StreamConsumer for DebeziumStreamConsumer {
lifetime: None,
},
},
id: None,
state: None,
})
.await
.is_err()
@@ -198,7 +198,7 @@ impl StreamConsumer for DebeziumStreamConsumer {
lifetime: None,
},
},
id: None,
state: None,
})
.await
.is_err()
(Diffs for the remaining changed files were not loaded.)
