Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nyurik committed Sep 28, 2023
1 parent 9a0c3f3 commit ad8bba5
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 75 deletions.
108 changes: 59 additions & 49 deletions martin-mbtiles/src/mbtiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@ use serde::ser::SerializeStruct;
use serde::Serialize;
use serde_json::{Value as JSONValue, Value};
use sqlite_hashes::register_md5_function;
use sqlite_hashes::rusqlite::{
Connection as RusqliteConnection, Connection, OpenFlags, OptionalExtension,
};
use sqlx::sqlite::SqliteRow;
use sqlx::{query, Row, SqliteExecutor};
use sqlx::sqlite::{SqliteConnectOptions, SqliteRow};
use sqlx::{query, Connection as _, Row, SqliteConnection, SqliteExecutor};
use tilejson::{tilejson, Bounds, Center, TileJSON};

use crate::errors::{MbtError, MbtResult};
Expand Down Expand Up @@ -96,6 +93,24 @@ impl Mbtiles {
})
}

/// Open a new `SQLx` connection to this mbtiles file and pass it through `attach_hash_fn`
/// before handing it back to the caller.
///
/// When `readonly` is `true`, the file is opened in read-only mode.
///
/// # Errors
/// Fails if the connection cannot be established or if `attach_hash_fn` fails.
pub async fn open_with_hashes(&self, readonly: bool) -> MbtResult<SqliteConnection> {
    let options = SqliteConnectOptions::new()
        .read_only(readonly)
        .filename(self.filepath());
    let mut connection = SqliteConnection::connect_with(&options).await?;
    self.attach_hash_fn(&mut connection).await?;
    Ok(connection)
}

/// Register the MD5 function (via `register_md5_function`) on an existing `SQLx` connection.
///
/// The raw SQLite handle of `conn` is temporarily wrapped in a `rusqlite` connection
/// so that the `sqlite_hashes` registration code can be applied to it.
///
/// # Errors
/// Fails if the connection handle cannot be locked, if the handle cannot be wrapped,
/// or if the function registration fails.
async fn attach_hash_fn(&self, conn: &mut SqliteConnection) -> MbtResult<()> {
    // Keep the lock guard in a named binding so the handle stays locked for as long as
    // the raw pointer is in use. As a temporary, the guard would be released at the end
    // of the statement that obtains the pointer, i.e. before the registration below runs.
    let mut locked = conn.lock_handle().await?;
    let handle = locked.as_raw_handle().as_ptr();
    // SAFETY: `handle` points to the SQLite connection owned by `conn`, which stays locked
    // through `locked` until this function returns, so it is not used anywhere else.
    // The registered functions will be dropped when SQLX drops the DB connection.
    let rc = unsafe { sqlite_hashes::rusqlite::Connection::from_handle(handle) }?;
    register_md5_function(&rc)?;
    Ok(())
}

#[must_use]
pub fn filepath(&self) -> &str {
&self.filepath
Expand Down Expand Up @@ -420,41 +435,6 @@ impl Mbtiles {
Err(MbtError::NoUniquenessConstraint(self.filepath.clone()))
}

/// Calculate the aggregate hash over all tiles in this file's `tiles` table/view.
/// This should work on all mbtiles files per the `MBTiles` specification.
fn calc_agg_tiles_hash(&self) -> MbtResult<String> {
    let conn = self.open_with_hashes(true)?;
    // The md5_concat func will return NULL if there are no rows in the tiles table.
    // For our use case, we will treat it as an empty string, and hash that.
    let sql = "SELECT hex(
coalesce(
md5_concat(
cast(zoom_level AS text),
cast(tile_column AS text),
cast(tile_row AS text),
tile_data
),
md5('')
)
)
FROM tiles
ORDER BY zoom_level, tile_column, tile_row;";
    // The single result row holds the hex-encoded hash in its first column.
    let hash = conn.query_row_and_then(sql, [], |row| row.get(0))?;
    Ok(hash)
}

pub(crate) fn open_with_hashes(&self, is_readonly: bool) -> MbtResult<Connection> {
let flags = if is_readonly {
OpenFlags::SQLITE_OPEN_READ_ONLY
} else {
OpenFlags::default()
};
let rusqlite_conn = RusqliteConnection::open_with_flags(self.filepath(), flags)?;
register_md5_function(&rusqlite_conn)?;
Ok(rusqlite_conn)
}

/// Perform `SQLite` internal integrity check
pub async fn check_integrity<T>(
&self,
Expand Down Expand Up @@ -499,7 +479,8 @@ impl Mbtiles {
return Err(AggHashValueNotFound(self.filepath().to_string()));
};

let computed = self.calc_agg_tiles_hash()?;
// let conn = self.open_with_hashes(true)?;
let computed = calc_agg_tiles_hash(&mut *conn).await?;
if stored != computed {
let file = self.filepath().to_string();
return Err(AggHashMismatch(computed, stored, file));
Expand All @@ -509,12 +490,12 @@ impl Mbtiles {
}

/// Compute new aggregate tiles hash and save it to the metadata table (if needed)
pub async fn update_agg_tiles_hash<T>(&self, conn: &mut T) -> MbtResult<()>
pub(crate) async fn update_agg_tiles_hash<T>(&self, conn: &mut T) -> MbtResult<()>
where
for<'e> &'e mut T: SqliteExecutor<'e>,
{
let old_hash = self.get_agg_tiles_hash(&mut *conn).await?;
let hash = self.calc_agg_tiles_hash()?;
let hash = calc_agg_tiles_hash(&mut *conn).await?;
if old_hash.as_ref() == Some(&hash) {
info!(
"agg_tiles_hash is already set to the correct value `{hash}` in {}",
Expand Down Expand Up @@ -570,21 +551,50 @@ impl Mbtiles {
}
};

self.open_with_hashes(true)?
.query_row_and_then(sql, [], |r| Ok((r.get(0)?, r.get(1)?)))
.optional()?
.map_or(Ok(()), |v: (String, String)| {
Err(IncorrectTileHash(self.filepath().to_string(), v.0, v.1))
query(sql)
.fetch_optional(&mut *conn)
.await?
.map_or(Ok(()), |v| {
Err(IncorrectTileHash(
self.filepath().to_string(),
v.get(0),
v.get(1),
))
})
}
}

/// Compute the hash of the combined tiles in the mbtiles file tiles table/view.
/// This should work on all mbtiles files per the `MBTiles` specification.
///
/// The SQL uses the `md5` and `md5_concat` functions, so `conn` must be a connection
/// on which those functions are available.
///
/// # Errors
/// Fails if the query cannot be executed, or if the first column of the result
/// cannot be decoded as a string.
async fn calc_agg_tiles_hash<T>(conn: &mut T) -> MbtResult<String>
where
    for<'e> &'e mut T: SqliteExecutor<'e>,
{
    // NOTE(review): `ORDER BY` on a bare aggregate query applies to the single result row,
    // not to the order in which rows are fed to `md5_concat` — confirm the input order
    // is deterministic.
    let hash_query = query(
        // The md5_concat func will return NULL if there are no rows in the tiles table.
        // For our use case, we will treat it as an empty string, and hash that.
        "SELECT hex(
coalesce(
md5_concat(
cast(zoom_level AS text),
cast(tile_column AS text),
cast(tile_row AS text),
tile_data
),
md5('')
)
)
FROM tiles
ORDER BY zoom_level, tile_column, tile_row;",
    );
    let row = hash_query.fetch_one(conn).await?;
    // `try_get` reports a decode failure as an error instead of panicking like `get`.
    Ok(row.try_get::<String, _>(0)?)
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use martin_tile_utils::Encoding;
use sqlx::{Connection, SqliteConnection};
use tilejson::VectorLayer;

use super::*;
Expand Down
67 changes: 41 additions & 26 deletions martin-mbtiles/src/tile_copier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,30 +175,26 @@ impl TileCopier {

let dst_type = if is_empty {
let dst_type = self.options.dst_type.unwrap_or(src_type);
self.create_new_mbtiles(&mut conn, dst_type, src_type)
.await?;
self.init_new_mbtiles(&mut conn, src_type, dst_type).await?;
dst_type
} else if self.options.diff_with_file.is_some() {
return Err(MbtError::NonEmptyTargetFile(self.options.dst_file));
} else {
open_and_detect_type(&self.dst_mbtiles).await?
self.dst_mbtiles.detect_type(&mut conn).await?
};

let rusqlite_conn = self.dst_mbtiles.open_with_hashes(false)?;
rusqlite_conn.execute(
"ATTACH DATABASE ? AS sourceDb",
[self.src_mbtiles.filepath()],
)?;

self.attach_source_db(&mut conn, &self.src_mbtiles).await?;
let (on_dupl, sql_cond) = self.get_on_duplicate_sql(dst_type);

let (select_from, query_args) = {
let select_from = if let Some(diff_file) = &self.options.diff_with_file {
let diff_with_mbtiles = Mbtiles::new(diff_file)?;
let diff_type = open_and_detect_type(&diff_with_mbtiles).await?;

rusqlite_conn
.execute("ATTACH DATABASE ? AS newDb", [diff_with_mbtiles.filepath()])?;
let path = diff_with_mbtiles.filepath();
query!("ATTACH DATABASE ? AS newDb", path)
.execute(&mut *conn)
.await?;

Self::get_select_from_with_diff(dst_type, diff_type)
} else {
Expand Down Expand Up @@ -238,29 +234,31 @@ impl TileCopier {
};

if !self.options.skip_agg_tiles_hash {
self.dst_mbtiles.update_agg_tiles_hash(&mut conn).await?;
self.dst_mbtiles
.update_agg_tiles_hash_conn(&mut conn, &rusqlite_conn)
.await?;
}

Ok(conn)
}

async fn create_new_mbtiles(
async fn init_new_mbtiles(
&self,
conn: &mut SqliteConnection,
dst_type: MbtType,
src_type: MbtType,
src: MbtType,
dst: MbtType,
) -> MbtResult<()> {
query!("PRAGMA page_size = 512").execute(&mut *conn).await?;
query!("VACUUM").execute(&mut *conn).await?;

let path = self.src_mbtiles.filepath();
query!("ATTACH DATABASE ? AS sourceDb", path)
.execute(&mut *conn)
.await?;

query!("PRAGMA page_size = 512").execute(&mut *conn).await?;
query!("VACUUM").execute(&mut *conn).await?;

if dst_type == src_type {
if src == dst {
// DB objects must be created in a specific order: tables, views, triggers, indexes.
for row in query(
let sql_objects = query(
"SELECT sql
FROM sourceDb.sqlite_schema
WHERE tbl_name IN ('metadata', 'tiles', 'map', 'images', 'tiles_with_hash')
Expand All @@ -273,19 +271,20 @@ impl TileCopier {
ELSE 5 END",
)
.fetch_all(&mut *conn)
.await?
{
.await?;

for row in sql_objects {
query(row.get(0)).execute(&mut *conn).await?;
}
} else {
match dst_type {
match dst {
Flat => self.create_flat_tables(&mut *conn).await?,
FlatWithHash => self.create_flat_with_hash_tables(&mut *conn).await?,
Normalized => self.create_normalized_tables(&mut *conn).await?,
};
};

if dst_type == Normalized {
if dst == Normalized {
query(
"CREATE VIEW tiles_with_hash AS
SELECT
Expand Down Expand Up @@ -372,7 +371,7 @@ impl TileCopier {
};

format!(
"AND NOT EXISTS (
"AND NOT EXISTS (
SELECT 1
FROM {main_table}
WHERE
Expand All @@ -381,7 +380,7 @@ impl TileCopier {
AND {main_table}.tile_row = sourceDb.{main_table}.tile_row
AND {main_table}.{tile_identifier} != sourceDb.{main_table}.{tile_identifier}
)"
)
)
}),
}
}
Expand Down Expand Up @@ -456,6 +455,22 @@ impl TileCopier {
}
}

/// Attach the database file of `src` to `conn` under the schema name `sourceDb`.
///
/// NOTE(review): the type parameter `T` is not used by the parameters or the body,
/// so it cannot be inferred at call sites — consider removing it.
///
/// # Errors
/// Fails if the `ATTACH DATABASE` statement cannot be executed.
async fn attach_source_db<T>(conn: &mut SqliteConnection, src: &Mbtiles) -> MbtResult<()> {
    let src_path = src.filepath();
    let attach = query!("ATTACH DATABASE ? AS sourceDb", src_path);
    attach.execute(&mut *conn).await?;
    Ok(())
}

/// Attach the database file of `other` to `conn` under the schema name `otherDb`.
///
/// NOTE(review): the type parameter `T` is not used by the parameters or the body,
/// so it cannot be inferred at call sites — consider removing it.
///
/// # Errors
/// Fails if the `ATTACH DATABASE` statement cannot be executed.
async fn attach_other_db<T>(conn: &mut SqliteConnection, other: &Mbtiles) -> MbtResult<()> {
    let other_path = other.filepath();
    let attach = query!("ATTACH DATABASE ? AS otherDb", other_path);
    attach.execute(&mut *conn).await?;
    Ok(())
}

async fn open_and_detect_type(mbtiles: &Mbtiles) -> MbtResult<MbtType> {
let opt = SqliteConnectOptions::new()
.read_only(true)
Expand Down

0 comments on commit ad8bba5

Please sign in to comment.