Skip to content

Commit

Permalink
Implement tile caching
Browse files Browse the repository at this point in the history
  • Loading branch information
nyurik committed Dec 25, 2023
1 parent f184cb0 commit 3ae9301
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 54 deletions.
4 changes: 2 additions & 2 deletions martin/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ impl Source for NullSource {

async fn get_tile(
&self,
_xyz: &TileCoord,
_query: &Option<UrlQuery>,
_xyz: TileCoord,
_url_query: Option<&UrlQuery>,
) -> MartinResult<TileData> {
Ok(Vec::new())
}
Expand Down
6 changes: 6 additions & 0 deletions martin/src/args/srv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ pub struct SrvArgs {
/// Number of web server workers
#[arg(short = 'W', long)]
pub workers: Option<usize>,
/// Cache size (in MB) for the tile cache
#[arg(short = 'c', long)]
pub cache_size: Option<u64>,
}

impl SrvArgs {
Expand All @@ -24,5 +27,8 @@ impl SrvArgs {
if self.workers.is_some() {
srv_config.worker_processes = self.workers;
}
if self.cache_size.is_some() {
srv_config.cache_size_mb = self.cache_size;
}
}
}
3 changes: 2 additions & 1 deletion martin/src/bin/martin-cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ async fn run_tile_copy(args: CopyArgs, state: ServerState) -> MartinCpResult<()>
.try_for_each_concurrent(concurrency, |xyz| {
let tx = tx.clone();
async move {
let tile = get_tile_content(sources, info, &xyz, query, encodings).await?;
let tile =
get_tile_content(sources, info, xyz, query, encodings, None).await?;
let data = tile.data;
tx.send(TileXyz { xyz, data })
.await
Expand Down
4 changes: 2 additions & 2 deletions martin/src/mbtiles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ impl Source for MbtSource {

async fn get_tile(
&self,
xyz: &TileCoord,
_url_query: &Option<UrlQuery>,
xyz: TileCoord,
_url_query: Option<&UrlQuery>,
) -> MartinResult<TileData> {
if let Some(tile) = self
.mbtiles
Expand Down
14 changes: 7 additions & 7 deletions martin/src/pg/configurator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl PgBuilder {
continue;
}
Ok((id, pg_sql, src_inf)) => {
debug!("{id} query: {}", pg_sql.query);
debug!("{id} query: {}", pg_sql.sql_query);
self.add_func_src(&mut res, id.clone(), &src_inf, pg_sql.clone());
info_map.insert(id, src_inf);
}
Expand Down Expand Up @@ -252,7 +252,7 @@ impl PgBuilder {
warn_on_rename(id, &id2, "Function");
let signature = &pg_sql.signature;
info!("Configured {dup}source {id2} from the function {signature}");
debug!("{id2} query: {}", pg_sql.query);
debug!("{id2} query: {}", pg_sql.sql_query);
info_map.insert(id2, merged_inf);
}

Expand Down Expand Up @@ -285,7 +285,7 @@ impl PgBuilder {
let id2 = self.resolve_id(&source_id, &db_inf);
self.add_func_src(&mut res, id2.clone(), &db_inf, pg_sql.clone());
info!("Discovered source {id2} from function {}", pg_sql.signature);
debug!("{id2} query: {}", pg_sql.query);
debug!("{id2} query: {}", pg_sql.sql_query);
info_map.insert(id2, db_inf);
}
}
Expand All @@ -302,11 +302,11 @@ impl PgBuilder {
&self,
sources: &mut TileInfoSources,
id: String,
info: &impl PgInfo,
sql: PgSqlInfo,
pg_info: &impl PgInfo,
sql_info: PgSqlInfo,
) {
let tilejson = info.to_tilejson(id.clone());
let source = PgSource::new(id, sql, tilejson, self.pool.clone());
let tilejson = pg_info.to_tilejson(id.clone());
let source = PgSource::new(id, sql_info, tilejson, self.pool.clone());
sources.push(Box::new(source));
}
}
Expand Down
4 changes: 2 additions & 2 deletions martin/src/pg/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,6 @@ pub enum PgError {
#[error(r#"Unable to get tile {2:#} from {1}: {0}"#)]
GetTileError(#[source] TokioPgError, String, TileCoord),

#[error(r#"Unable to get tile {2:#} with {:?} params from {1}: {0}"#, query_to_json(.3))]
GetTileWithQueryError(#[source] TokioPgError, String, TileCoord, UrlQuery),
#[error(r#"Unable to get tile {2:#} with {:?} params from {1}: {0}"#, query_to_json(.3.as_ref()))]
GetTileWithQueryError(#[source] TokioPgError, String, TileCoord, Option<UrlQuery>),
}
27 changes: 11 additions & 16 deletions martin/src/pg/pg_source.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::collections::HashMap;

use async_trait::async_trait;
use deadpool_postgres::tokio_postgres::types::{ToSql, Type};
use log::debug;
Expand Down Expand Up @@ -58,35 +56,32 @@ impl Source for PgSource {

async fn get_tile(
&self,
xyz: &TileCoord,
url_query: &Option<UrlQuery>,
xyz: TileCoord,
url_query: Option<&UrlQuery>,
) -> MartinResult<TileData> {
let empty_query = HashMap::new();
let url_query = url_query.as_ref().unwrap_or(&empty_query);
let conn = self.pool.get().await?;

let param_types: &[Type] = if self.support_url_query() {
&[Type::INT2, Type::INT8, Type::INT8, Type::JSON]
} else {
&[Type::INT2, Type::INT8, Type::INT8]
};

let query = &self.info.query;
let sql = &self.info.sql_query;
let prep_query = conn
.prepare_typed_cached(query, param_types)
.prepare_typed_cached(sql, param_types)
.await
.map_err(|e| {
PrepareQueryError(
e,
self.id.to_string(),
self.info.signature.to_string(),
self.info.query.to_string(),
self.info.sql_query.to_string(),
)
})?;

let tile = if self.support_url_query() {
let json = query_to_json(url_query);
debug!("SQL: {query} [{xyz}, {json:?}]");
debug!("SQL: {sql} [{xyz}, {json:?}]");
let params: &[&(dyn ToSql + Sync)] = &[
&i16::from(xyz.z),
&i64::from(xyz.x),
Expand All @@ -95,7 +90,7 @@ impl Source for PgSource {
];
conn.query_opt(&prep_query, params).await
} else {
debug!("SQL: {query} [{xyz}]");
debug!("SQL: {sql} [{xyz}]");
conn.query_opt(
&prep_query,
&[&i16::from(xyz.z), &i64::from(xyz.x), &i64::from(xyz.y)],
Expand All @@ -107,9 +102,9 @@ impl Source for PgSource {
.map(|row| row.and_then(|r| r.get::<_, Option<TileData>>(0)))
.map_err(|e| {
if self.support_url_query() {
GetTileWithQueryError(e, self.id.to_string(), *xyz, url_query.clone())
GetTileWithQueryError(e, self.id.to_string(), xyz, url_query.cloned())
} else {
GetTileError(e, self.id.to_string(), *xyz)
GetTileError(e, self.id.to_string(), xyz)
}
})?
.unwrap_or_default();
Expand All @@ -120,7 +115,7 @@ impl Source for PgSource {

#[derive(Clone, Debug)]
pub struct PgSqlInfo {
pub query: String,
pub sql_query: String,
pub use_url_query: bool,
pub signature: String,
}
Expand All @@ -129,7 +124,7 @@ impl PgSqlInfo {
#[must_use]
pub fn new(query: String, has_query_params: bool, signature: String) -> Self {
Self {
query,
sql_query: query,
use_url_query: has_query_params,
signature,
}
Expand Down
12 changes: 7 additions & 5 deletions martin/src/pg/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,15 @@ pub fn patch_json(target: TileJSON, patch: &Option<serde_json::Value>) -> TileJS
}

#[must_use]
pub fn query_to_json(query: &UrlQuery) -> Json<HashMap<String, serde_json::Value>> {
pub fn query_to_json(query: Option<&UrlQuery>) -> Json<HashMap<String, serde_json::Value>> {
let mut query_as_json = HashMap::new();
for (k, v) in query {
let json_value: serde_json::Value =
serde_json::from_str(v).unwrap_or_else(|_| serde_json::Value::String(v.clone()));
if let Some(query) = query {
for (k, v) in query {
let json_value: serde_json::Value =
serde_json::from_str(v).unwrap_or_else(|_| serde_json::Value::String(v.clone()));

query_as_json.insert(k.clone(), json_value);
query_as_json.insert(k.clone(), json_value);
}
}

Json(query_as_json)
Expand Down
4 changes: 2 additions & 2 deletions martin/src/pmtiles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ macro_rules! impl_pmtiles_source {

async fn get_tile(
&self,
xyz: &TileCoord,
_url_query: &Option<UrlQuery>,
xyz: TileCoord,
_url_query: Option<&UrlQuery>,
) -> MartinResult<TileData> {
// TODO: optimize to return Bytes
if let Some(t) = self
Expand Down
6 changes: 5 additions & 1 deletion martin/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ pub trait Source: Send + Debug {
false
}

async fn get_tile(&self, xyz: &TileCoord, query: &Option<UrlQuery>) -> MartinResult<TileData>;
async fn get_tile(
&self,
xyz: TileCoord,
url_query: Option<&UrlQuery>,
) -> MartinResult<TileData>;

fn is_valid_zoom(&self, zoom: u8) -> bool {
let tj = self.get_tilejson();
Expand Down
3 changes: 3 additions & 0 deletions martin/src/srv/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub struct SrvConfig {
pub keep_alive: Option<u64>,
pub listen_addresses: Option<String>,
pub worker_processes: Option<usize>,
pub cache_size_mb: Option<u64>,
}

#[cfg(test)]
Expand All @@ -25,12 +26,14 @@ mod tests {
keep_alive: 75
listen_addresses: '0.0.0.0:3000'
worker_processes: 8
tile_cache_size_mb: 100
"})
.unwrap(),
SrvConfig {
keep_alive: Some(75),
listen_addresses: some("0.0.0.0:3000"),
worker_processes: Some(8),
cache_size_mb: Some(100),
}
);
}
Expand Down
Loading

0 comments on commit 3ae9301

Please sign in to comment.