From 4f58e488282822f53084106348ac4416a88f3f88 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Sun, 29 Dec 2024 13:13:22 +1000 Subject: [PATCH 1/2] fix: Don't silence disk full in SQLite --- src/sqlite.rs | 20 +++++++++++++++----- src/sqlite/write.rs | 19 ++++++++++++++++++- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/src/sqlite.rs b/src/sqlite.rs index 2f22e57f..ed9beefb 100644 --- a/src/sqlite.rs +++ b/src/sqlite.rs @@ -77,6 +77,9 @@ pub enum Error { #[snafu(display("Unable to insert data into the Sqlite table: {source}"))] UnableToInsertIntoTableAsync { source: tokio_rusqlite::Error }, + #[snafu(display("Unable to insert data into the Sqlite table. The disk is full."))] + DiskFull {}, + #[snafu(display("Unable to deleta all table data in Sqlite: {source}"))] UnableToDeleteAllTableData { source: rusqlite::Error }, @@ -103,7 +106,9 @@ pub enum Error { ))] UnableToParseBusyTimeoutParameter { source: fundu::ParseError }, - #[snafu(display("Failed to create '{table_name}': creating a table with a schema is not supported"))] + #[snafu(display( + "Failed to create '{table_name}': creating a table with a schema is not supported" + ))] TableWithSchemaCreationNotSupported { table_name: String }, } @@ -219,12 +224,12 @@ impl TableProviderFactory for SqliteTableProviderFactory { _state: &dyn Session, cmd: &CreateExternalTable, ) -> DataFusionResult> { - if cmd.name.schema().is_some() { TableWithSchemaCreationNotSupportedSnafu { table_name: cmd.name.to_string(), } - .fail().map_err(to_datafusion_error)?; + .fail() + .map_err(to_datafusion_error)?; } let name = cmd.name.clone(); @@ -504,7 +509,10 @@ impl Sqlite { } fn delete_all_table_data(&self, transaction: &Transaction<'_>) -> rusqlite::Result<()> { - transaction.execute(format!(r#"DELETE FROM {}"#, self.table.to_quoted_string()).as_str(), [])?; + transaction.execute( + format!(r#"DELETE FROM {}"#, self.table.to_quoted_string()).as_str(), + [], + )?; Ok(()) } @@ -625,7 +633,9 @@ impl Sqlite { ) -> DataFusionResult { let expected_indexes_str_map: HashSet = indexes .iter() - .map(|(col, _)| IndexBuilder::new(self.table.table(), col.iter().collect()).index_name()) + .map(|(col, _)| { + IndexBuilder::new(self.table.table(), col.iter().collect()).index_name() + }) .collect(); let actual_indexes_str_map = self.get_indexes(sqlite_conn).await?; diff --git a/src/sqlite/write.rs b/src/sqlite/write.rs index 3aa8fe0e..8b5ef0aa 100644 --- a/src/sqlite/write.rs +++ b/src/sqlite/write.rs @@ -198,7 +198,24 @@ impl DataSink for SqliteDataSink { }) .await .context(super::UnableToInsertIntoTableAsyncSnafu) - .map_err(to_retriable_data_write_error)?; + .map_err(|e| { + if let super::Error::UnableToInsertIntoTableAsync { + source: + tokio_rusqlite::Error::Rusqlite(rusqlite::Error::SqliteFailure( + rusqlite::ffi::Error { + code: rusqlite::ffi::ErrorCode::DiskFull, + .. + }, + _, + )), + } = e + { + DataFusionError::External(super::Error::DiskFull {}.into()) + } else { + to_retriable_data_write_error(e) + } + })?; + // .map_err(to_retriable_data_write_error)?; let num_rows = task.await.map_err(to_retriable_data_write_error)??; From 4a528d430306faf5585ee5045fecb378f755dc82 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Sun, 29 Dec 2024 13:32:54 +1000 Subject: [PATCH 2/2] chore: Remove commented out line --- src/sqlite/write.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sqlite/write.rs b/src/sqlite/write.rs index 8b5ef0aa..492be2d9 100644 --- a/src/sqlite/write.rs +++ b/src/sqlite/write.rs @@ -215,7 +215,6 @@ impl DataSink for SqliteDataSink { to_retriable_data_write_error(e) } })?; - // .map_err(to_retriable_data_write_error)?; let num_rows = task.await.map_err(to_retriable_data_write_error)??;