Skip to content

Commit

Permalink
Optimize oracle DML parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse-Bakker committed Apr 8, 2024
1 parent a10dd4a commit 223cae8
Show file tree
Hide file tree
Showing 15 changed files with 677 additions and 490 deletions.
17 changes: 14 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions dozer-ingestion/oracle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ license = "AGPL-3.0-or-later"

[dependencies]
dozer-ingestion-connector = { path = "../connector" }
fxhash = "0.2.1"
memchr = "2.7.2"
oracle = { version = "0.5.7", features = ["chrono", "stmt_without_lifetime"] }
regex = "1.10.3"

Expand Down
30 changes: 22 additions & 8 deletions dozer-ingestion/oracle/src/connector/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::{
collections::{HashMap, HashSet},
num::ParseFloatError,
sync::Arc,
time::Duration,
sync::{mpsc::channel, Arc},
time::{Duration, Instant},
};

use dozer_ingestion_connector::{
dozer_types::{
chrono,
chrono::{self, DateTime},
epoch::SourceTime,
log::{debug, error},
log::{debug, error, info},
models::ingestion_types::{IngestionMessage, OracleReplicator, TransactionInfo},
node::OpIdentifier,
rust_decimal::{self, Decimal},
Expand All @@ -32,6 +32,14 @@ pub struct Connector {
replicator: OracleReplicator,
}

/// Reason a date/time value could not be parsed.
///
/// Carried as the first field of `Error::ParseDateTime`, next to the
/// offending input string.
#[derive(Debug, thiserror::Error)]
enum ParseDateError {
/// Parsing through `chrono` failed; the underlying `chrono::ParseError`
/// is kept and converted automatically via `#[from]`.
#[error("Invalid date format: {0}")]
Chrono(#[from] chrono::ParseError),
/// The input did not match the expected Oracle date layout.
/// NOTE(review): the exact layout checked is not visible here — confirm
/// against the parsing code that constructs this variant.
#[error("Invalid oracle format")]
Oracle,
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("oracle error: {0:?}")]
Expand Down Expand Up @@ -65,13 +73,13 @@ pub enum Error {
#[error("cannot parse float: {0}")]
ParseFloat(#[from] ParseFloatError),
#[error("cannot parse date time from {1}: {0}")]
ParseDateTime(#[source] chrono::ParseError, String),
ParseDateTime(ParseDateError, String),
#[error("got overflow float number {0}")]
FloatOverflow(Decimal),
#[error("got error when parsing uint {0}")]
ParseUIntFailed(Decimal),
ParseUIntFailed(String),
#[error("got error when parsing int {0}")]
ParseIntFailed(Decimal),
ParseIntFailed(String),
#[error("type mismatch for {field}, expected {expected:?}, actual {actual:?}")]
TypeMismatch {
field: String,
Expand Down Expand Up @@ -377,7 +385,13 @@ impl Connector {
})
};

for transaction in processor.process(receiver) {
let mut recv = receiver.into_iter();
let first = processor
.process(recv.by_ref())
.find(|op| !op.as_ref().unwrap().operations.is_empty())
.unwrap()
.unwrap();
for transaction in recv.map(|_| Ok::<_, Error>(first.clone())) {
let transaction = match transaction {
Ok(transaction) => transaction,
Err(e) => {
Expand Down
2 changes: 2 additions & 0 deletions dozer-ingestion/oracle/src/connector/replicate/log/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::time::Instant;
use std::{sync::mpsc::SyncSender, time::Duration};

use dozer_ingestion_connector::dozer_types::log::debug;
Expand Down Expand Up @@ -31,6 +32,7 @@ pub struct LogManagerContent {
pub rbasqn: u32,
pub sql_redo: Option<String>,
pub csf: u8,
pub received: Instant,
}

/// `ingestor` is only used for checking if ingestion has ended so we can break the loop.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::env;
use std::{env, time::Instant};

use dozer_ingestion_connector::dozer_types::{
chrono::{DateTime, Utc},
Expand Down Expand Up @@ -62,7 +62,6 @@ impl RedoReader for LogMiner {
STARTSCN => :start_scn,
OPTIONS =>
DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG +
DBMS_LOGMNR.PRINT_PRETTY_SQL +
DBMS_LOGMNR.NO_ROWID_IN_STMT
);
END;";
Expand All @@ -74,7 +73,6 @@ impl RedoReader for LogMiner {
DBMS_LOGMNR.START_LOGMNR(
OPTIONS =>
DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG +
DBMS_LOGMNR.PRINT_PRETTY_SQL +
DBMS_LOGMNR.NO_ROWID_IN_STMT
);
END;";
Expand Down Expand Up @@ -162,6 +160,7 @@ impl RowValue for LogManagerContent {
rbasqn,
sql_redo,
csf,
received: Instant::now(),
})
}
}
2 changes: 1 addition & 1 deletion dozer-ingestion/oracle/src/connector/replicate/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod log;
mod transaction;
pub mod transaction;

pub use log::log_miner_loop;
pub use transaction::Processor;
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use crate::connector::{
pub struct Transaction {
pub commit_scn: Scn,
pub commit_timestamp: DateTime<Utc>,
pub operations: Vec<Operation>,
pub operations: Vec<RawOperation>,
}

#[derive(Debug, Clone)]
pub struct Operation {
pub struct RawOperation {
pub seg_owner: String,
pub table_name: String,
pub kind: OperationKind,
Expand Down Expand Up @@ -52,7 +52,7 @@ impl Aggregator {
}
}

type TransactionForest = forest::Forest<TransactionId, Vec<Operation>>;
type TransactionForest = forest::Forest<TransactionId, Vec<RawOperation>>;

#[derive(Debug)]
struct Processor<I: Iterator<Item = LogManagerContent>> {
Expand Down Expand Up @@ -120,7 +120,7 @@ impl<I: Iterator<Item = LogManagerContent>> Iterator for Processor<I> {
op::process_operation(
content.xid,
content.pxid,
Operation {
RawOperation {
seg_owner,
table_name,
kind,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use dozer_ingestion_connector::dozer_types::log::warn;

use crate::connector::replicate::log::TransactionId;

use super::{Operation, TransactionForest};
use super::{RawOperation, TransactionForest};

pub fn process_operation(
xid: TransactionId,
pxid: TransactionId,
operation: Operation,
operation: RawOperation,
transaction_forest: &mut TransactionForest,
) {
if xid == pxid {
Expand Down
Loading

0 comments on commit 223cae8

Please sign in to comment.