From ad8bba536d3d98292864e0d50ffe643e8ffe1c6c Mon Sep 17 00:00:00 2001 From: Yuri Astrakhan Date: Thu, 28 Sep 2023 00:06:59 -0400 Subject: [PATCH] wip --- martin-mbtiles/src/mbtiles.rs | 108 ++++++++++++++++-------------- martin-mbtiles/src/tile_copier.rs | 67 +++++++++++------- 2 files changed, 100 insertions(+), 75 deletions(-) diff --git a/martin-mbtiles/src/mbtiles.rs b/martin-mbtiles/src/mbtiles.rs index fc878a095..8de3cc78a 100644 --- a/martin-mbtiles/src/mbtiles.rs +++ b/martin-mbtiles/src/mbtiles.rs @@ -15,11 +15,8 @@ use serde::ser::SerializeStruct; use serde::Serialize; use serde_json::{Value as JSONValue, Value}; use sqlite_hashes::register_md5_function; -use sqlite_hashes::rusqlite::{ - Connection as RusqliteConnection, Connection, OpenFlags, OptionalExtension, -}; -use sqlx::sqlite::SqliteRow; -use sqlx::{query, Row, SqliteExecutor}; +use sqlx::sqlite::{SqliteConnectOptions, SqliteRow}; +use sqlx::{query, Connection as _, Row, SqliteConnection, SqliteExecutor}; use tilejson::{tilejson, Bounds, Center, TileJSON}; use crate::errors::{MbtError, MbtResult}; @@ -96,6 +93,24 @@ impl Mbtiles { }) } + pub async fn open_with_hashes(&self, readonly: bool) -> MbtResult { + let opt = SqliteConnectOptions::new() + .filename(self.filepath()) + .read_only(readonly); + let mut conn = SqliteConnection::connect_with(&opt).await?; + self.attach_hash_fn(&mut conn).await?; + Ok(conn) + } + + async fn attach_hash_fn(&self, conn: &mut SqliteConnection) -> MbtResult<()> { + let handle = conn.lock_handle().await?.as_raw_handle().as_ptr(); + // Safety: we know that the handle is a SQLite connection is locked and is not used anywhere else. + // The registered functions will be dropped when SQLX drops DB connection. + let rc = unsafe { sqlite_hashes::rusqlite::Connection::from_handle(handle) }?; + register_md5_function(&rc)?; + Ok(()) + } + #[must_use] pub fn filepath(&self) -> &str { &self.filepath @@ -420,41 +435,6 @@ impl Mbtiles { Err(MbtError::NoUniquenessConstraint(self.filepath.clone())) } - /// Compute the hash of the combined tiles in the mbtiles file tiles table/view. - /// This should work on all mbtiles files perf `MBTiles` specification. - fn calc_agg_tiles_hash(&self) -> MbtResult { - Ok(self.open_with_hashes(true)?.query_row_and_then( - // The md5_concat func will return NULL if there are no rows in the tiles table. - // For our use case, we will treat it as an empty string, and hash that. - "SELECT hex( - coalesce( - md5_concat( - cast(zoom_level AS text), - cast(tile_column AS text), - cast(tile_row AS text), - tile_data - ), - md5('') - ) - ) - FROM tiles - ORDER BY zoom_level, tile_column, tile_row;", - [], - |row| row.get(0), - )?) - } - - pub(crate) fn open_with_hashes(&self, is_readonly: bool) -> MbtResult { - let flags = if is_readonly { - OpenFlags::SQLITE_OPEN_READ_ONLY - } else { - OpenFlags::default() - }; - let rusqlite_conn = RusqliteConnection::open_with_flags(self.filepath(), flags)?; - register_md5_function(&rusqlite_conn)?; - Ok(rusqlite_conn) - } - /// Perform `SQLite` internal integrity check pub async fn check_integrity( &self, @@ -499,7 +479,8 @@ impl Mbtiles { return Err(AggHashValueNotFound(self.filepath().to_string())); }; - let computed = self.calc_agg_tiles_hash()?; + // let conn = self.open_with_hashes(true)?; + let computed = calc_agg_tiles_hash(&mut *conn).await?; if stored != computed { let file = self.filepath().to_string(); return Err(AggHashMismatch(computed, stored, file)); @@ -509,12 +490,12 @@ impl Mbtiles { } /// Compute new aggregate tiles hash and save it to the metadata table (if needed) - pub async fn update_agg_tiles_hash(&self, conn: &mut T) -> MbtResult<()> + pub(crate) async fn update_agg_tiles_hash(&self, conn: &mut T) -> MbtResult<()> where for<'e> &'e mut T: SqliteExecutor<'e>, { let old_hash = self.get_agg_tiles_hash(&mut *conn).await?; - let hash = self.calc_agg_tiles_hash()?; + let hash = calc_agg_tiles_hash(&mut *conn).await?; if old_hash.as_ref() == Some(&hash) { info!( "agg_tiles_hash is already set to the correct value `{hash}` in {}", @@ -570,21 +551,50 @@ impl Mbtiles { } }; - self.open_with_hashes(true)? - .query_row_and_then(sql, [], |r| Ok((r.get(0)?, r.get(1)?))) - .optional()? - .map_or(Ok(()), |v: (String, String)| { - Err(IncorrectTileHash(self.filepath().to_string(), v.0, v.1)) + query(sql) + .fetch_optional(&mut *conn) + .await? + .map_or(Ok(()), |v| { + Err(IncorrectTileHash( + self.filepath().to_string(), + v.get(0), + v.get(1), + )) }) } } +/// Compute the hash of the combined tiles in the mbtiles file tiles table/view. +/// This should work on all mbtiles files perf `MBTiles` specification. +async fn calc_agg_tiles_hash(conn: &mut T) -> MbtResult +where + for<'e> &'e mut T: SqliteExecutor<'e>, +{ + let query = query( + // The md5_concat func will return NULL if there are no rows in the tiles table. + // For our use case, we will treat it as an empty string, and hash that. + "SELECT hex( + coalesce( + md5_concat( + cast(zoom_level AS text), + cast(tile_column AS text), + cast(tile_row AS text), + tile_data + ), + md5('') + ) + ) + FROM tiles + ORDER BY zoom_level, tile_column, tile_row;", + ); + return Ok(query.fetch_one(conn).await?.get::(0)); +} + #[cfg(test)] mod tests { use std::collections::HashMap; use martin_tile_utils::Encoding; - use sqlx::{Connection, SqliteConnection}; use tilejson::VectorLayer; use super::*; diff --git a/martin-mbtiles/src/tile_copier.rs b/martin-mbtiles/src/tile_copier.rs index eae8f5248..05616c4d0 100644 --- a/martin-mbtiles/src/tile_copier.rs +++ b/martin-mbtiles/src/tile_copier.rs @@ -175,21 +175,15 @@ impl TileCopier { let dst_type = if is_empty { let dst_type = self.options.dst_type.unwrap_or(src_type); - self.create_new_mbtiles(&mut conn, dst_type, src_type) - .await?; + self.init_new_mbtiles(&mut conn, src_type, dst_type).await?; dst_type } else if self.options.diff_with_file.is_some() { return Err(MbtError::NonEmptyTargetFile(self.options.dst_file)); } else { - open_and_detect_type(&self.dst_mbtiles).await? + self.dst_mbtiles.detect_type(&mut conn).await? }; - let rusqlite_conn = self.dst_mbtiles.open_with_hashes(false)?; - rusqlite_conn.execute( - "ATTACH DATABASE ? AS sourceDb", - [self.src_mbtiles.filepath()], - )?; - + self.attach_source_db(&mut conn, &self.src_mbtiles).await?; let (on_dupl, sql_cond) = self.get_on_duplicate_sql(dst_type); let (select_from, query_args) = { @@ -197,8 +191,10 @@ impl TileCopier { let diff_with_mbtiles = Mbtiles::new(diff_file)?; let diff_type = open_and_detect_type(&diff_with_mbtiles).await?; - rusqlite_conn - .execute("ATTACH DATABASE ? AS newDb", [diff_with_mbtiles.filepath()])?; + let path = diff_with_mbtiles.filepath(); + query!("ATTACH DATABASE ? AS newDb", path) + .execute(&mut *conn) + .await?; Self::get_select_from_with_diff(dst_type, diff_type) } else { @@ -238,29 +234,31 @@ impl TileCopier { }; if !self.options.skip_agg_tiles_hash { - self.dst_mbtiles.update_agg_tiles_hash(&mut conn).await?; + self.dst_mbtiles + .update_agg_tiles_hash_conn(&mut conn, &rusqlite_conn) + .await?; } Ok(conn) } - async fn create_new_mbtiles( + async fn init_new_mbtiles( &self, conn: &mut SqliteConnection, - dst_type: MbtType, - src_type: MbtType, + src: MbtType, + dst: MbtType, ) -> MbtResult<()> { + query!("PRAGMA page_size = 512").execute(&mut *conn).await?; + query!("VACUUM").execute(&mut *conn).await?; + let path = self.src_mbtiles.filepath(); query!("ATTACH DATABASE ? AS sourceDb", path) .execute(&mut *conn) .await?; - query!("PRAGMA page_size = 512").execute(&mut *conn).await?; - query!("VACUUM").execute(&mut *conn).await?; - - if dst_type == src_type { + if src == dst { // DB objects must be created in a specific order: tables, views, triggers, indexes. - for row in query( + let sql_objects = query( "SELECT sql FROM sourceDb.sqlite_schema WHERE tbl_name IN ('metadata', 'tiles', 'map', 'images', 'tiles_with_hash') @@ -273,19 +271,20 @@ impl TileCopier { ELSE 5 END", ) .fetch_all(&mut *conn) - .await? - { + .await?; + + for row in sql_objects { query(row.get(0)).execute(&mut *conn).await?; } } else { - match dst_type { + match dst { Flat => self.create_flat_tables(&mut *conn).await?, FlatWithHash => self.create_flat_with_hash_tables(&mut *conn).await?, Normalized => self.create_normalized_tables(&mut *conn).await?, }; }; - if dst_type == Normalized { + if dst == Normalized { query( "CREATE VIEW tiles_with_hash AS SELECT @@ -372,7 +371,7 @@ impl TileCopier { }; format!( - "AND NOT EXISTS ( + "AND NOT EXISTS ( SELECT 1 FROM {main_table} WHERE @@ -381,7 +380,7 @@ impl TileCopier { AND {main_table}.tile_row = sourceDb.{main_table}.tile_row AND {main_table}.{tile_identifier} != sourceDb.{main_table}.{tile_identifier} )" - ) + ) }), } } @@ -456,6 +455,22 @@ impl TileCopier { } } +async fn attach_source_db(conn: &mut SqliteConnection, src: &Mbtiles) -> MbtResult<()> { + let path = src.filepath(); + query!("ATTACH DATABASE ? AS sourceDb", path) + .execute(&mut *conn) + .await?; + Ok(()) +} + +async fn attach_other_db(conn: &mut SqliteConnection, other: &Mbtiles) -> MbtResult<()> { + let path = other.filepath(); + query!("ATTACH DATABASE ? AS otherDb", path) + .execute(&mut *conn) + .await?; + Ok(()) +} + async fn open_and_detect_type(mbtiles: &Mbtiles) -> MbtResult { let opt = SqliteConnectOptions::new() .read_only(true)