Skip to content

Commit

Permalink
chore: refactor errors messages to an error enum
Browse files Browse the repository at this point in the history
  • Loading branch information
Solomon committed Dec 11, 2023
1 parent 2e91bac commit 4146917
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 39 deletions.
36 changes: 28 additions & 8 deletions dozer-ingestion/mysql/src/binlog.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
connection::is_network_failure, conversion::get_field_type_for_sql_type, schema::SchemaHelper,
MySQLConnectorError,
BreakingSchemaChange, MySQLConnectorError,
};

use super::{
Expand Down Expand Up @@ -254,7 +254,9 @@ impl BinlogIngestor<'_, '_, '_> {
.databases()
.contains(&database_name)
{
Err(MySQLConnectorError::BreakingSchemaChange(format!("Database \"{}\" was dropped", database_name)))?
Err(BreakingSchemaChange::DatabaseDropped(
database_name,
))?
}
}
}
Expand All @@ -267,7 +269,9 @@ impl BinlogIngestor<'_, '_, '_> {
if let Some(table) = table_cache
.find_table_by_object_name(&name, schema)
{
Err(MySQLConnectorError::BreakingSchemaChange(format!("Table \"{}\" was dropped", table)))?
Err(BreakingSchemaChange::TableDropped(
table.to_string(),
))?
}
}
}
Expand Down Expand Up @@ -299,7 +303,7 @@ impl BinlogIngestor<'_, '_, '_> {
if let Some(column) =
find_column(column_name)
{
Err(MySQLConnectorError::BreakingSchemaChange(format!("Column \"{}\" from table \"{}\" was dropped", column, table)))?
Err(BreakingSchemaChange::ColumnDropped { table_name: table.to_string(), column_name: column.to_string()})?
}
schema_change_tracker.column_order_changed_in(table.table_index);
}
Expand All @@ -316,14 +320,21 @@ impl BinlogIngestor<'_, '_, '_> {
if let Some(column) =
find_column(old_column_name)
{
Err(MySQLConnectorError::BreakingSchemaChange(format!("Column \"{}\" from table \"{}\" was renamed to \"{}\"", column, table, new_column_name.value)))?
Err(BreakingSchemaChange::ColumnRenamed{
table_name: table.to_string(),
old_column_name: column.to_string(),
new_column_name: new_column_name.value.clone()
})?
}
}
}
AlterTableOperation::RenameTable {
table_name,
} => {
Err(MySQLConnectorError::BreakingSchemaChange(format!("Table \"{}\" was renamed to \"{}\"", table, object_name_to_string(table_name))))?
Err(BreakingSchemaChange::TableRenamed{
old_table_name: table.to_string(),
new_table_name: object_name_to_string(table_name),
})?
}
AlterTableOperation::ChangeColumn {
old_name,
Expand All @@ -333,11 +344,20 @@ impl BinlogIngestor<'_, '_, '_> {
} => {
if let Some(column) = find_column(old_name) {
if !old_name.value.eq_ignore_ascii_case(&new_name.value) {
Err(MySQLConnectorError::BreakingSchemaChange(format!("Column \"{}\" from table \"{}\" was renamed to \"{}\"", column, table, new_name.value)))?
Err(BreakingSchemaChange::ColumnRenamed{
table_name: table.to_string(),
old_column_name: column.to_string(),
new_column_name: new_name.value.clone(),
})?
}
let new_type = get_field_type_for_sql_type(data_type);
if new_type != column.typ {
Err(MySQLConnectorError::BreakingSchemaChange(format!("Column \"{}\" from table \"{}\" changed data type from \"{}\" to \"{}\"", column, table, column.typ, new_type)))?
Err(BreakingSchemaChange::ColumnDataTypeChanged{
table_name: table.to_string(),
column_name: column.to_string(),
old_data_type: column.typ,
new_column_name: new_type,
})?
}
}
}
Expand Down
48 changes: 47 additions & 1 deletion dozer-ingestion/mysql/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use dozer_ingestion_connector::dozer_types::{
errors::types::DeserializationError,
thiserror::{self, Error},
types::FieldType,
};
use geozero::error::GeozeroError;

Expand Down Expand Up @@ -49,5 +50,50 @@ pub enum MySQLConnectorError {
QueryResultError(#[source] mysql_async::Error),

#[error("Schema had a breaking change: {0}")]
BreakingSchemaChange(String),
BreakingSchemaChange(#[from] BreakingSchemaChange),
}

#[derive(Error, Debug)]
pub enum BreakingSchemaChange {
#[error("Database \"{0}\" was dropped")]
DatabaseDropped(String),
#[error("Table \"{0}\" was dropped")]
TableDropped(String),
#[error("Table \"{0}\" has been dropped or renamed")]
TableDroppedOrRenamed(String),
#[error("Multiple tables have been dropped or renamed: {}", .0.join(", "))]
MultipleTablesDroppedOrRenamed(Vec<String>),
#[error("Table \"{old_table_name}\" was renamed to \"{new_table_name}\"")]
TableRenamed {
old_table_name: String,
new_table_name: String,
},
#[error("Column \"{column_name}\" from table \"{table_name}\" was dropped")]
ColumnDropped {
table_name: String,
column_name: String,
},
#[error("Column \"{old_column_name}\" from table \"{table_name}\" was renamed to \"{new_column_name}\"")]
ColumnRenamed {
table_name: String,
old_column_name: String,
new_column_name: String,
},
#[error("Column \"{column_name}\" from table \"{table_name}\" has been dropped or renamed")]
ColumnDroppedOrRenamed {
table_name: String,
column_name: String,
},
#[error("Multiple columns from table \"{table_name}\" have been dropped or renamed: {}", .columns.join(", "))]
MultipleColumnsDroppedOrRenamed {
table_name: String,
columns: Vec<String>,
},
#[error("Column \"{column_name}\" from table \"{table_name}\" changed data type from \"{old_data_type}\" to \"{new_column_name}\"")]
ColumnDataTypeChanged {
table_name: String,
column_name: String,
old_data_type: FieldType,
new_column_name: FieldType,
},
}
51 changes: 21 additions & 30 deletions dozer-ingestion/mysql/src/schema.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{helpers::escape_identifier, MySQLConnectorError};
use crate::{helpers::escape_identifier, BreakingSchemaChange, MySQLConnectorError};

use super::{
connection::{Conn, QueryResult},
Expand Down Expand Up @@ -277,19 +277,13 @@ impl SchemaHelper<'_> {
)
.collect::<Vec<_>>();
if missing.len() == 1 {
Err(MySQLConnectorError::BreakingSchemaChange(format!(
"Table \"{}\" has been dropped or renamed",
missing[0]
)))?
Err(BreakingSchemaChange::TableDroppedOrRenamed(
missing[0].to_string(),
))?
} else {
Err(MySQLConnectorError::BreakingSchemaChange(format!(
"Multiple tables have been dropped or renamed: {}",
missing
.iter()
.map(|td| td.to_string())
.collect::<Vec<_>>()
.join(", ")
)))?
Err(BreakingSchemaChange::MultipleTablesDroppedOrRenamed(
missing.iter().map(|td| td.to_string()).collect::<Vec<_>>(),
))?
}
}

Expand All @@ -313,20 +307,15 @@ impl SchemaHelper<'_> {
)
.collect::<Vec<_>>();
if missing.len() == 1 {
Err(MySQLConnectorError::BreakingSchemaChange(format!(
"Column \"{}\" from table \"{}\" has been dropped or renamed",
missing[0], old
)))?
Err(BreakingSchemaChange::ColumnDroppedOrRenamed {
column_name: missing[0].to_string(),
table_name: old.to_string(),
})?
} else {
Err(MySQLConnectorError::BreakingSchemaChange(format!(
"Multiple columns from table \"{}\" have been dropped or renamed: {}",
old,
missing
.iter()
.map(|cd| cd.to_string())
.collect::<Vec<_>>()
.join(", ")
)))?
Err(BreakingSchemaChange::MultipleColumnsDroppedOrRenamed {
table_name: old.to_string(),
columns: missing.iter().map(|cd| cd.to_string()).collect::<Vec<_>>(),
})?
}
}

Expand All @@ -339,10 +328,12 @@ impl SchemaHelper<'_> {
.unwrap();

if old_column.typ != new_column.typ {
Err(MySQLConnectorError::BreakingSchemaChange(format!(
"Column \"{}\" from table \"{}\" has changed data type from \"{}\" to \"{}\"",
old_column, old, old_column.typ, new_column.typ
)))?
Err(BreakingSchemaChange::ColumnDataTypeChanged {
table_name: old.to_string(),
column_name: old_column.to_string(),
old_data_type: old_column.typ,
new_column_name: new_column.typ,
})?
}
}

Expand Down

0 comments on commit 4146917

Please sign in to comment.