Skip to content

Commit

Permalink
Merge pull request #449 from OpenHistoricalMap/images/tiler
Browse files Browse the repository at this point in the history
Update script for materialized views to work with indexes
  • Loading branch information
Rub21 authored Dec 27, 2024
2 parents e10dd0b + cad2541 commit 928ccef
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 40 deletions.
77 changes: 40 additions & 37 deletions images/tiler-imposm/materialized_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@
import subprocess
import time

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

DB_CONFIG = {
"dbname": os.getenv("POSTGRES_DB"),
"user": os.getenv("POSTGRES_USER"),
"password": os.getenv("POSTGRES_PASSWORD"),
"host": os.getenv("POSTGRES_HOST"),
"port": int(os.getenv("POSTGRES_PORT", 5432))
"port": int(os.getenv("POSTGRES_PORT", 5432)),
}

REQUIRED_ENV_VARS = ["POSTGRES_DB", "POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_HOST"]
Expand All @@ -26,6 +23,7 @@

PSQL_CONN = f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['dbname']}"


# ------------------------------------------------------------------------------
# HELPER FUNCTIONS
# ------------------------------------------------------------------------------
Expand All @@ -34,31 +32,25 @@ def execute_psql_query(query: str):
Executes an SQL query using the psql command-line tool.
"""
try:
result = subprocess.run(
["psql", PSQL_CONN, "-c", query],
text=True,
capture_output=True
)
result = subprocess.run(["psql", PSQL_CONN, "-c", query], text=True, capture_output=True)
if result.returncode != 0:
logger.error(f"Error executing the query: {result.stderr.strip()}")
else:
logger.info(f"Query executed successfully:\n{result.stdout.strip()}")
except Exception as e:
logger.error(f"Error executing the query with psql: {e}")


def object_exists(object_name: str) -> bool:
"""
Checks if a table or materialized view exists in the public schema.
"""
query = f"SELECT to_regclass('public.{object_name}');"
result = subprocess.run(
["psql", PSQL_CONN, "-t", "-c", query],
text=True,
capture_output=True
)
result = subprocess.run(["psql", PSQL_CONN, "-t", "-c", query], text=True, capture_output=True)
output = result.stdout.strip()
return output not in ("", "-")


def get_columns_of_table(table_name: str) -> list:
"""
Returns a list of columns for the given table in the 'public' schema.
Expand All @@ -70,29 +62,33 @@ def get_columns_of_table(table_name: str) -> list:
AND table_name = '{table_name}'
ORDER BY ordinal_position;
"""
result = subprocess.run(
["psql", PSQL_CONN, "-t", "-c", query],
text=True,
capture_output=True
)
result = subprocess.run(["psql", PSQL_CONN, "-t", "-c", query], text=True, capture_output=True)
if result.returncode != 0:
logger.error(f"Error retrieving columns for {table_name}: {result.stderr.strip()}")
return []
columns = [col.strip() for col in result.stdout.strip().split('\n') if col.strip()]

columns = [col.strip() for col in result.stdout.strip().split("\n") if col.strip()]
return columns

def combine_sql_filters(filter_a: str, filter_b: str) -> str:

def create_indexes_for_mview(mview_name: str, columns: list):
"""
Combines two SQL filters with an AND. Handles cases where one or both might be None/empty.
Creates indexes for 'osm_id' (B-Tree) and 'geometry' (GiST) in the specified materialized view,
only if those columns exist.
"""
if filter_a and filter_b:
return f"({filter_a}) AND ({filter_b})"
elif filter_a:
return filter_a
elif filter_b:
return filter_b
return ""
# Check if the columns exist
if "osm_id" in columns:
create_idx_osm = f"CREATE INDEX idx_{mview_name}_osm_id ON {mview_name} (osm_id);"
logger.info(f"Creating index for osm_id in {mview_name}")
execute_psql_query(create_idx_osm)

if "geometry" in columns:
create_idx_geom = (
f"CREATE INDEX idx_{mview_name}_geom ON {mview_name} USING GIST (geometry);"
)
logger.info(f"Creating index for geometry in {mview_name}")
execute_psql_query(create_idx_geom)


def create_materialized_view(
mview_name: str,
Expand Down Expand Up @@ -132,11 +128,16 @@ def create_materialized_view(
logger.info(f"Creating materialized view: {mview_name}")
logger.info(f"{create_query}")
execute_psql_query(create_query)
create_indexes_for_mview(mview_name, columns)
else:
logger.info(f"Materialized view {mview_name} already exists and force_recreate=False. Skipping CREATE.")
logger.info(
f"Materialized view {mview_name} already exists and force_recreate=False. Skipping CREATE."
)
else:
logger.info(f"Materialized view {mview_name} does not exist. Creating it...")
execute_psql_query(create_query)
create_indexes_for_mview(mview_name, columns)


# ------------------------------------------------------------------------------
# CREATION/UPDATE OF VIEWS FROM LOADED CONFIG (NO RELOADING THE JSON)
Expand All @@ -154,7 +155,7 @@ def apply_materialized_views(config_dict: dict, force_recreate: bool = True):
for gtable_name, gtable_info in generalized_tables.items():

imposm_base_table = f"osm_{gtable_name}"
logger.info("-"*80)
logger.info("-" * 80)
logger.info(f"Imposm base table name: {imposm_base_table}")

materialized_views = gtable_info.get("materialized_views", [])
Expand All @@ -171,7 +172,7 @@ def apply_materialized_views(config_dict: dict, force_recreate: bool = True):
mview_name = "osm_" + mt_view.get("view")
geometry_transform = mt_view.get("geometry_transform", "geometry")
sql_filter = mt_view.get("sql_filter", "")
logger.info("-"*40)
logger.info("-" * 40)
logger.info(f"Processing view {mview_name} | {geometry_transform} | {sql_filter}")

logger.info(f" -> Creating/Updating materialized view: {mview_name}")
Expand All @@ -182,9 +183,10 @@ def apply_materialized_views(config_dict: dict, force_recreate: bool = True):
columns=columns,
geometry_transform=geometry_transform,
sql_filter=sql_filter,
force_recreate=force_recreate
force_recreate=force_recreate,
)


# ------------------------------------------------------------------------------
# REFRESH OF ALL VIEWS
# ------------------------------------------------------------------------------
Expand All @@ -207,12 +209,13 @@ def refresh_all_materialized_views(config_dict: dict):
else:
logger.warning(f"Materialized view {mview_name} not found. Skipping refresh.")


# ------------------------------------------------------------------------------
# MAIN: LOADS CONFIG ONCE, CREATES/UPDATES VIEWS, THEN REFRESHES THEM IN A LOOP
# ------------------------------------------------------------------------------
def main():
config_path = "config/imposm3.json"

# 1) Load the config
logger.info(f"Loading configuration from: {config_path}")
with open(config_path, "r") as f:
Expand All @@ -230,6 +233,6 @@ def main():
logger.info("All materialized views refreshed. Sleeping 60 seconds...")
time.sleep(60)


if __name__ == "__main__":
main()

6 changes: 3 additions & 3 deletions values.production.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -939,14 +939,14 @@ ohm:
EXECUTE_PURGE: true
PURGE_CONCURRENCY: 128
PURGE_MIN_ZOOM: 3
PURGE_MAX_ZOOM: 12 # Purging zoom 13,14,15,16,17,18,19,20 takes hours to complete,we are going to remove direct from s3 the tiles for zoom 19-20
PURGE_MAX_ZOOM: 14 # Purging zoom 15,16,17,18,19,20 takes hours to complete,we are going to remove direct from s3 the tiles for zoom 19-20
## Execute seeding
EXECUTE_SEED: true
SEED_CONCURRENCY: 128
SEED_MIN_ZOOM: 0
SEED_MAX_ZOOM: 12
SEED_MAX_ZOOM: 14
## Remove tiles from s3 for zoom levels
ZOOM_LEVELS_TO_DELETE: 13,14,15,16,17,18,19,20
ZOOM_LEVELS_TO_DELETE: 15,16,17,18,19,20
S3_BUCKET_CACHE_TILER: tiler-cache-production
S3_BUCKET_PATH_FILES: mnt/data/osm
resources:
Expand Down

0 comments on commit 928ccef

Please sign in to comment.