Skip to content

Commit

Permalink
feat: handle breaking and non-breaking schema changes for MySQL conne…
Browse files Browse the repository at this point in the history
…ctor
  • Loading branch information
Solomon committed Dec 11, 2023
1 parent 5233422 commit 2e91bac
Show file tree
Hide file tree
Showing 7 changed files with 788 additions and 133 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dozer-ingestion/mysql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ geozero = { version = "0.11.0", default-features = false, features = [
"with-wkb",
] }
rand = "0.8.5"
sqlparser = "0.40.0"

[dev-dependencies]
serial_test = "1.0.0"
Expand Down
450 changes: 389 additions & 61 deletions dozer-ingestion/mysql/src/binlog.rs

Large diffs are not rendered by default.

13 changes: 6 additions & 7 deletions dozer-ingestion/mysql/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl MySQLConnector {
.map_err(|err| MySQLConnectorError::ConnectionFailure(self.conn_url.clone(), err))
}

fn schema_helper(&self) -> SchemaHelper<'_, '_> {
fn schema_helper(&self) -> SchemaHelper<'_> {
SchemaHelper::new(&self.conn_url, &self.conn_pool)
}

Expand All @@ -226,15 +226,15 @@ impl MySQLConnector {
ingestor: &Ingestor,
table_infos: Vec<TableInfo>,
) -> Result<(), MySQLConnectorError> {
let table_definitions = self
let mut table_definitions = self
.schema_helper()
.get_table_definitions(&table_infos)
.await?;
let binlog_positions = self.replicate_tables(ingestor, &table_definitions).await?;

let binlog_position = self.sync_with_binlog(ingestor, binlog_positions).await?;

self.ingest_binlog(ingestor, &table_definitions, binlog_position, None)
self.ingest_binlog(ingestor, &mut table_definitions, binlog_position, None)
.await?;

Ok(())
Expand Down Expand Up @@ -367,7 +367,7 @@ impl MySQLConnector {

self.ingest_binlog(
ingestor,
&synced_tables,
&mut synced_tables,
start_position,
Some(end_position),
)
Expand All @@ -386,22 +386,21 @@ impl MySQLConnector {
async fn ingest_binlog(
&self,
ingestor: &Ingestor,
tables: &[TableDefinition],
tables: &mut [TableDefinition],
start_position: BinlogPosition,
stop_position: Option<BinlogPosition>,
) -> Result<(), MySQLConnectorError> {
let server_id = self.server_id.unwrap_or_else(|| rand::thread_rng().gen());

let mut binlog_ingestor = BinlogIngestor::new(
ingestor,
tables,
start_position,
stop_position,
server_id,
(&self.conn_pool, &self.conn_url),
);

binlog_ingestor.ingest().await
binlog_ingestor.ingest(tables, self.schema_helper()).await
}
}

Expand Down
65 changes: 65 additions & 0 deletions dozer-ingestion/mysql/src/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,71 @@ pub fn get_field_type_for_mysql_column_type(
Ok(field_type)
}

pub fn get_field_type_for_sql_type(sql_data_type: &sqlparser::ast::DataType) -> FieldType {
use sqlparser::ast::DataType;
match sql_data_type {
DataType::Character(_)
| DataType::Char(_)
| DataType::String(_)
| DataType::Enum(_)
| DataType::Set(_) => FieldType::String,
DataType::CharacterVarying(_)
| DataType::CharVarying(_)
| DataType::Varchar(_)
| DataType::Nvarchar(_)
| DataType::Text => FieldType::Text,
DataType::Uuid
| DataType::CharacterLargeObject(_)
| DataType::CharLargeObject(_)
| DataType::Clob(_)
| DataType::Regclass
| DataType::Custom(_, _)
| DataType::Array(_)
| DataType::Struct(_) => unreachable!("MySQL does not support this type: {sql_data_type}"),
DataType::Binary(_)
| DataType::Varbinary(_)
| DataType::Blob(_)
| DataType::Bytes(_)
| DataType::Bytea => FieldType::Binary,
DataType::Numeric(_)
| DataType::Decimal(_)
| DataType::BigNumeric(_)
| DataType::BigDecimal(_)
| DataType::Dec(_) => FieldType::Decimal,
DataType::Float(_)
| DataType::Float4
| DataType::Float64
| DataType::Real
| DataType::Float8
| DataType::Double
| DataType::DoublePrecision => FieldType::Float,
DataType::TinyInt(_)
| DataType::Int2(_)
| DataType::SmallInt(_)
| DataType::MediumInt(_)
| DataType::Int(_)
| DataType::Int4(_)
| DataType::Int64
| DataType::Integer(_)
| DataType::BigInt(_)
| DataType::Int8(_) => FieldType::Int,
DataType::UnsignedTinyInt(_)
| DataType::UnsignedInt2(_)
| DataType::UnsignedSmallInt(_)
| DataType::UnsignedMediumInt(_)
| DataType::UnsignedInt(_)
| DataType::UnsignedInt4(_)
| DataType::UnsignedInteger(_)
| DataType::UnsignedBigInt(_)
| DataType::UnsignedInt8(_) => FieldType::UInt,
DataType::Bool | DataType::Boolean => FieldType::Boolean,
DataType::Date => FieldType::Date,
DataType::Time(_, _) | DataType::Interval => FieldType::Duration,
DataType::Datetime(_) | DataType::Timestamp(_, _) => FieldType::Timestamp,
DataType::JSON => FieldType::Json,
}
}

pub trait IntoFields<'a> {
type Ctx: 'a;

Expand Down
3 changes: 3 additions & 0 deletions dozer-ingestion/mysql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,7 @@ pub enum MySQLConnectorError {

#[error("Failed to fetch query result. {0}")]
QueryResultError(#[source] mysql_async::Error),

#[error("Schema had a breaking change: {0}")]
BreakingSchemaChange(String),
}
Loading

0 comments on commit 2e91bac

Please sign in to comment.