diff --git a/dozer-ingestion/mysql/src/binlog.rs b/dozer-ingestion/mysql/src/binlog.rs index 7e02c9d144..b8831fe404 100644 --- a/dozer-ingestion/mysql/src/binlog.rs +++ b/dozer-ingestion/mysql/src/binlog.rs @@ -1,6 +1,6 @@ use crate::{ connection::is_network_failure, conversion::get_field_type_for_sql_type, schema::SchemaHelper, - MySQLConnectorError, + BreakingSchemaChange, MySQLConnectorError, }; use super::{ @@ -254,7 +254,9 @@ impl BinlogIngestor<'_, '_, '_> { .databases() .contains(&database_name) { - Err(MySQLConnectorError::BreakingSchemaChange(format!("Database \"{}\" was dropped", database_name)))? + Err(BreakingSchemaChange::DatabaseDropped( + database_name, + ))? } } } @@ -267,7 +269,9 @@ impl BinlogIngestor<'_, '_, '_> { if let Some(table) = table_cache .find_table_by_object_name(&name, schema) { - Err(MySQLConnectorError::BreakingSchemaChange(format!("Table \"{}\" was dropped", table)))? + Err(BreakingSchemaChange::TableDropped( + table.to_string(), + ))? } } } @@ -299,7 +303,7 @@ impl BinlogIngestor<'_, '_, '_> { if let Some(column) = find_column(column_name) { - Err(MySQLConnectorError::BreakingSchemaChange(format!("Column \"{}\" from table \"{}\" was dropped", column, table)))? + Err(BreakingSchemaChange::ColumnDropped { table_name: table.to_string(), column_name: column.to_string()})? } schema_change_tracker.column_order_changed_in(table.table_index); } @@ -316,14 +320,21 @@ impl BinlogIngestor<'_, '_, '_> { if let Some(column) = find_column(old_column_name) { - Err(MySQLConnectorError::BreakingSchemaChange(format!("Column \"{}\" from table \"{}\" was renamed to \"{}\"", column, table, new_column_name.value)))? + Err(BreakingSchemaChange::ColumnRenamed{ + table_name: table.to_string(), + old_column_name: column.to_string(), + new_column_name: new_column_name.value.clone() + })? } } } AlterTableOperation::RenameTable { table_name, } => { - Err(MySQLConnectorError::BreakingSchemaChange(format!("Table \"{}\" was renamed to \"{}\"", table, object_name_to_string(table_name))))? + Err(BreakingSchemaChange::TableRenamed{ + old_table_name: table.to_string(), + new_table_name: object_name_to_string(table_name), + })? } AlterTableOperation::ChangeColumn { old_name, @@ -333,11 +344,20 @@ impl BinlogIngestor<'_, '_, '_> { } => { if let Some(column) = find_column(old_name) { if !old_name.value.eq_ignore_ascii_case(&new_name.value) { - Err(MySQLConnectorError::BreakingSchemaChange(format!("Column \"{}\" from table \"{}\" was renamed to \"{}\"", column, table, new_name.value)))? + Err(BreakingSchemaChange::ColumnRenamed{ + table_name: table.to_string(), + old_column_name: column.to_string(), + new_column_name: new_name.value.clone(), + })? } let new_type = get_field_type_for_sql_type(data_type); if new_type != column.typ { - Err(MySQLConnectorError::BreakingSchemaChange(format!("Column \"{}\" from table \"{}\" changed data type from \"{}\" to \"{}\"", column, table, column.typ, new_type)))? + Err(BreakingSchemaChange::ColumnDataTypeChanged{ + table_name: table.to_string(), + column_name: column.to_string(), + old_data_type: column.typ, + new_column_name: new_type, + })? } } } diff --git a/dozer-ingestion/mysql/src/lib.rs b/dozer-ingestion/mysql/src/lib.rs index 4d72130d5c..117fd80cfb 100644 --- a/dozer-ingestion/mysql/src/lib.rs +++ b/dozer-ingestion/mysql/src/lib.rs @@ -1,6 +1,7 @@ use dozer_ingestion_connector::dozer_types::{ errors::types::DeserializationError, thiserror::{self, Error}, + types::FieldType, }; use geozero::error::GeozeroError; @@ -49,5 +50,50 @@ pub enum MySQLConnectorError { QueryResultError(#[source] mysql_async::Error), #[error("Schema had a breaking change: {0}")] - BreakingSchemaChange(String), + BreakingSchemaChange(#[from] BreakingSchemaChange), +} + +#[derive(Error, Debug)] +pub enum BreakingSchemaChange { + #[error("Database \"{0}\" was dropped")] + DatabaseDropped(String), + #[error("Table \"{0}\" was dropped")] + TableDropped(String), + #[error("Table \"{0}\" has been dropped or renamed")] + TableDroppedOrRenamed(String), + #[error("Multiple tables have been dropped or renamed: {}", .0.join(", "))] + MultipleTablesDroppedOrRenamed(Vec), + #[error("Table \"{old_table_name}\" was renamed to \"{new_table_name}\"")] + TableRenamed { + old_table_name: String, + new_table_name: String, + }, + #[error("Column \"{column_name}\" from table \"{table_name}\" was dropped")] + ColumnDropped { + table_name: String, + column_name: String, + }, + #[error("Column \"{old_column_name}\" from table \"{table_name}\" was renamed to \"{new_column_name}\"")] + ColumnRenamed { + table_name: String, + old_column_name: String, + new_column_name: String, + }, + #[error("Column \"{column_name}\" from table \"{table_name}\" has been dropped or renamed")] + ColumnDroppedOrRenamed { + table_name: String, + column_name: String, + }, + #[error("Multiple columns from table \"{table_name}\" have been dropped or renamed: {}", .columns.join(", "))] + MultipleColumnsDroppedOrRenamed { + table_name: String, + columns: Vec, + }, + #[error("Column \"{column_name}\" from table \"{table_name}\" changed data type from \"{old_data_type}\" to \"{new_column_name}\"")] + ColumnDataTypeChanged { + table_name: String, + column_name: String, + old_data_type: FieldType, + new_column_name: FieldType, + }, } diff --git a/dozer-ingestion/mysql/src/schema.rs b/dozer-ingestion/mysql/src/schema.rs index 6af825a985..dbe4de1ab9 100644 --- a/dozer-ingestion/mysql/src/schema.rs +++ b/dozer-ingestion/mysql/src/schema.rs @@ -1,4 +1,4 @@ -use crate::{helpers::escape_identifier, MySQLConnectorError}; +use crate::{helpers::escape_identifier, BreakingSchemaChange, MySQLConnectorError}; use super::{ connection::{Conn, QueryResult}, @@ -277,19 +277,13 @@ impl SchemaHelper<'_> { ) .collect::>(); if missing.len() == 1 { - Err(MySQLConnectorError::BreakingSchemaChange(format!( - "Table \"{}\" has been dropped or renamed", - missing[0] - )))? + Err(BreakingSchemaChange::TableDroppedOrRenamed( + missing[0].to_string(), + ))? } else { - Err(MySQLConnectorError::BreakingSchemaChange(format!( - "Multiple tables have been dropped or renamed: {}", - missing - .iter() - .map(|td| td.to_string()) - .collect::>() - .join(", ") - )))? + Err(BreakingSchemaChange::MultipleTablesDroppedOrRenamed( + missing.iter().map(|td| td.to_string()).collect::>(), + ))? } } @@ -313,20 +307,15 @@ impl SchemaHelper<'_> { ) .collect::>(); if missing.len() == 1 { - Err(MySQLConnectorError::BreakingSchemaChange(format!( - "Column \"{}\" from table \"{}\" has been dropped or renamed", - missing[0], old - )))? + Err(BreakingSchemaChange::ColumnDroppedOrRenamed { + column_name: missing[0].to_string(), + table_name: old.to_string(), + })? } else { - Err(MySQLConnectorError::BreakingSchemaChange(format!( - "Multiple columns from table \"{}\" have been dropped or renamed: {}", - old, - missing - .iter() - .map(|cd| cd.to_string()) - .collect::>() - .join(", ") - )))? + Err(BreakingSchemaChange::MultipleColumnsDroppedOrRenamed { + table_name: old.to_string(), + columns: missing.iter().map(|cd| cd.to_string()).collect::>(), + })? } } @@ -339,10 +328,12 @@ impl SchemaHelper<'_> { .unwrap(); if old_column.typ != new_column.typ { - Err(MySQLConnectorError::BreakingSchemaChange(format!( - "Column \"{}\" from table \"{}\" has changed data type from \"{}\" to \"{}\"", - old_column, old, old_column.typ, new_column.typ - )))? + Err(BreakingSchemaChange::ColumnDataTypeChanged { + table_name: old.to_string(), + column_name: old_column.to_string(), + old_data_type: old_column.typ, + new_column_name: new_column.typ, + })? } }