
Commit ef3f082
Merge branch 'main' into staging
Rub21 committed Dec 30, 2024
2 parents 8b42cb7 + 46f7f10 commit ef3f082
Showing 3 changed files with 59 additions and 59 deletions.
82 changes: 41 additions & 41 deletions images/tiler-imposm/materialized_views.py
@@ -4,18 +4,15 @@
 import subprocess
 import time
 
-logging.basicConfig(
-    level=logging.INFO,
-    format="%(asctime)s - %(levelname)s - %(message)s"
-)
+logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
 logger = logging.getLogger(__name__)
 
 DB_CONFIG = {
     "dbname": os.getenv("POSTGRES_DB"),
     "user": os.getenv("POSTGRES_USER"),
     "password": os.getenv("POSTGRES_PASSWORD"),
     "host": os.getenv("POSTGRES_HOST"),
-    "port": int(os.getenv("POSTGRES_PORT", 5432))
+    "port": int(os.getenv("POSTGRES_PORT", 5432)),
 }
 
 REQUIRED_ENV_VARS = ["POSTGRES_DB", "POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_HOST"]
@@ -26,6 +23,7 @@
 
 PSQL_CONN = f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['dbname']}"
 
+REFRESH_MATERIALIZED_VIEWS_TIME = int(os.getenv("REFRESH_MATERIALIZED_VIEWS_TIME", 300))
 # ------------------------------------------------------------------------------
 # HELPER FUNCTIONS
 # ------------------------------------------------------------------------------
@@ -34,31 +32,25 @@ def execute_psql_query(query: str):
     Executes an SQL query using the psql command-line tool.
     """
     try:
-        result = subprocess.run(
-            ["psql", PSQL_CONN, "-c", query],
-            text=True,
-            capture_output=True
-        )
+        result = subprocess.run(["psql", PSQL_CONN, "-c", query], text=True, capture_output=True)
         if result.returncode != 0:
             logger.error(f"Error executing the query: {result.stderr.strip()}")
         else:
             logger.info(f"Query executed successfully:\n{result.stdout.strip()}")
     except Exception as e:
         logger.error(f"Error executing the query with psql: {e}")
 
+
 def object_exists(object_name: str) -> bool:
     """
     Checks if a table or materialized view exists in the public schema.
     """
     query = f"SELECT to_regclass('public.{object_name}');"
-    result = subprocess.run(
-        ["psql", PSQL_CONN, "-t", "-c", query],
-        text=True,
-        capture_output=True
-    )
+    result = subprocess.run(["psql", PSQL_CONN, "-t", "-c", query], text=True, capture_output=True)
     output = result.stdout.strip()
     return output not in ("", "-")
 
+
 def get_columns_of_table(table_name: str) -> list:
     """
     Returns a list of columns for the given table in the 'public' schema.
@@ -70,29 +62,33 @@ def get_columns_of_table(table_name: str) -> list:
         AND table_name = '{table_name}'
         ORDER BY ordinal_position;
     """
-    result = subprocess.run(
-        ["psql", PSQL_CONN, "-t", "-c", query],
-        text=True,
-        capture_output=True
-    )
+    result = subprocess.run(["psql", PSQL_CONN, "-t", "-c", query], text=True, capture_output=True)
     if result.returncode != 0:
         logger.error(f"Error retrieving columns for {table_name}: {result.stderr.strip()}")
         return []
-    columns = [col.strip() for col in result.stdout.strip().split('\n') if col.strip()]
+
+    columns = [col.strip() for col in result.stdout.strip().split("\n") if col.strip()]
     return columns
 
-def combine_sql_filters(filter_a: str, filter_b: str) -> str:
+
+def create_indexes_for_mview(mview_name: str, columns: list):
     """
-    Combines two SQL filters with an AND. Handles cases where one or both might be None/empty.
+    Creates indexes for 'osm_id' (B-Tree) and 'geometry' (GiST) in the specified materialized view,
+    only if those columns exist.
     """
-    if filter_a and filter_b:
-        return f"({filter_a}) AND ({filter_b})"
-    elif filter_a:
-        return filter_a
-    elif filter_b:
-        return filter_b
-    return ""
+    # Check if the columns exist
+    if "osm_id" in columns:
+        create_idx_osm = f"CREATE INDEX idx_{mview_name}_osm_id ON {mview_name} (osm_id);"
+        logger.info(f"Creating index for osm_id in {mview_name}")
+        execute_psql_query(create_idx_osm)
+
+    if "geometry" in columns:
+        create_idx_geom = (
+            f"CREATE INDEX idx_{mview_name}_geom ON {mview_name} USING GIST (geometry);"
+        )
+        logger.info(f"Creating index for geometry in {mview_name}")
+        execute_psql_query(create_idx_geom)
 
 
 def create_materialized_view(
     mview_name: str,
@@ -132,11 +128,16 @@ def create_materialized_view(
             logger.info(f"Creating materialized view: {mview_name}")
             logger.info(f"{create_query}")
             execute_psql_query(create_query)
+            create_indexes_for_mview(mview_name, columns)
         else:
-            logger.info(f"Materialized view {mview_name} already exists and force_recreate=False. Skipping CREATE.")
+            logger.info(
+                f"Materialized view {mview_name} already exists and force_recreate=False. Skipping CREATE."
+            )
     else:
         logger.info(f"Materialized view {mview_name} does not exist. Creating it...")
         execute_psql_query(create_query)
+        create_indexes_for_mview(mview_name, columns)
 
+
 # ------------------------------------------------------------------------------
 # CREATION/UPDATE OF VIEWS FROM LOADED CONFIG (NO RELOADING THE JSON)
@@ -154,7 +155,7 @@ def apply_materialized_views(config_dict: dict, force_recreate: bool = True):
     for gtable_name, gtable_info in generalized_tables.items():
 
         imposm_base_table = f"osm_{gtable_name}"
-        logger.info("-"*80)
+        logger.info("-" * 80)
         logger.info(f"Imposm base table name: {imposm_base_table}")
 
         materialized_views = gtable_info.get("materialized_views", [])
@@ -171,7 +172,7 @@ def apply_materialized_views(config_dict: dict, force_recreate: bool = True):
             mview_name = "osm_" + mt_view.get("view")
             geometry_transform = mt_view.get("geometry_transform", "geometry")
             sql_filter = mt_view.get("sql_filter", "")
-            logger.info("-"*40)
+            logger.info("-" * 40)
             logger.info(f"Processing view {mview_name} | {geometry_transform} | {sql_filter}")
 
             logger.info(f" -> Creating/Updating materialized view: {mview_name}")
@@ -182,9 +183,10 @@ def apply_materialized_views(config_dict: dict, force_recreate: bool = True):
                 columns=columns,
                 geometry_transform=geometry_transform,
                 sql_filter=sql_filter,
-                force_recreate=force_recreate
+                force_recreate=force_recreate,
             )
 
+
 # ------------------------------------------------------------------------------
 # REFRESH OF ALL VIEWS
 # ------------------------------------------------------------------------------
@@ -201,21 +203,19 @@ def refresh_all_materialized_views(config_dict: dict):
                 continue
             mview_name = "osm_" + mt_view.get("view")
             if object_exists(mview_name):
-                logger.info("-"*40)
                 logger.info(f"Refreshing materialized view: {mview_name}")
                 query = f"REFRESH MATERIALIZED VIEW {mview_name};"
                 logger.info(query)
                 execute_psql_query(query)
                 logger.info(f"Refresh completed for {mview_name}")
             else:
                 logger.warning(f"Materialized view {mview_name} not found. Skipping refresh.")
 
+
 # ------------------------------------------------------------------------------
 # MAIN: LOADS CONFIG ONCE, CREATES/UPDATES VIEWS, THEN REFRESHES THEM IN A LOOP
 # ------------------------------------------------------------------------------
 def main():
     config_path = "config/imposm3.json"
 
     # 1) Load the config
     logger.info(f"Loading configuration from: {config_path}")
     with open(config_path, "r") as f:
@@ -231,8 +231,8 @@ def main():
         logger.info("Refreshing all materialized views...")
         refresh_all_materialized_views(config_dict)
         logger.info("All materialized views refreshed. Sleeping 60 seconds...")
-        time.sleep(30)
+        time.sleep(REFRESH_MATERIALIZED_VIEWS_TIME)
 
 
 if __name__ == "__main__":
     main()

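Note: the refresh loop above issues a plain REFRESH MATERIALIZED VIEW, which takes an exclusive lock and blocks readers of the view while it rebuilds. A minimal sketch of a non-blocking alternative, assuming the view carries a UNIQUE index (REFRESH ... CONCURRENTLY requires one; the new idx_*_osm_id indexes are not created UNIQUE, so this would need a schema change). The helper refresh_concurrently is hypothetical and not part of this commit:

    import subprocess

    def refresh_concurrently(mview_name: str, psql_conn: str) -> bool:
        """Attempt a non-blocking refresh; returns False when it fails,
        e.g. because the view has no UNIQUE index."""
        query = f"REFRESH MATERIALIZED VIEW CONCURRENTLY {mview_name};"
        result = subprocess.run(["psql", psql_conn, "-c", query], text=True, capture_output=True)
        return result.returncode == 0

    # Fall back to the blocking refresh used in materialized_views.py:
    # if not refresh_concurrently(mview_name, PSQL_CONN):
    #     execute_psql_query(f"REFRESH MATERIALIZED VIEW {mview_name};")
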
12 changes: 6 additions & 6 deletions images/tiler-imposm/start.sh
@@ -144,13 +144,13 @@ function updateData() {
     echo "Create views"
     python materialized_views.py &
 
-    local s3_last_state_path="${AWS_S3_BUCKET}/${BUCKET_IMPOSM_FOLDER}/last.state.txt"
+    # local s3_last_state_path="${AWS_S3_BUCKET}/${BUCKET_IMPOSM_FOLDER}/last.state.txt"
     local local_last_state_path="$DIFF_DIR/last.state.txt"
-    echo "Checking if $s3_last_state_path exists in S3..."
-    if aws s3 ls "$s3_last_state_path" > /dev/null 2>&1; then
-        echo "Found $s3_last_state_path. Downloading..."
-        aws s3 cp "$s3_last_state_path" "$local_last_state_path"
-    fi
+    # echo "Checking if $s3_last_state_path exists in S3..."
+    # if aws s3 ls "$s3_last_state_path" > /dev/null 2>&1; then
+    #     echo "Found $s3_last_state_path. Downloading..."
+    #     aws s3 cp "$s3_last_state_path" "$local_last_state_path"
+    # fi
 
     ### Update the DB with the new data from minute replication
     if [ "$OVERWRITE_STATE" = "true" ]; then
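Note: the change above disables the S3 bootstrap of last.state.txt, so replication state now comes only from the local $DIFF_DIR. If the bootstrap is restored later, the same check-and-download could live next to the Python tooling; a sketch using boto3, where the bucket and key names are illustrative rather than taken from this repo:

    import boto3
    from botocore.exceptions import ClientError

    def fetch_last_state(bucket: str, key: str, dest: str) -> bool:
        """Download the replication state file from S3 if it exists."""
        s3 = boto3.client("s3")
        try:
            s3.download_file(bucket, key, dest)  # e.g. key = "imposm/last.state.txt"
            return True
        except ClientError:
            return False  # object missing or inaccessible; keep the local state
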
24 changes: 12 additions & 12 deletions values.production.template.yaml
@@ -355,14 +355,14 @@ osm-seed:
   tilerDb:
     enabled: true
     useExternalHost: # When useExternalHost.enabled=true, the other variables are going to be disabled and the external host config is used
-      enabled: true
+      enabled: false
     nodeSelector:
       enabled: true
       label_key: nodegroup_type
       label_value: api_db
     env:
       POSTGRES_HOST: {{PRODUCTION_TILER_DB_HOST}}
-      POSTGRES_DB: tiler_osm_prod_v4
+      POSTGRES_DB: tiler_osm_prod_v6
       POSTGRES_USER: postgres
       POSTGRES_PASSWORD: {{PRODUCTION_TILER_DB_PASSWORD}}
       POSTGRES_PORT: 5432
@@ -373,7 +373,7 @@ osm-seed:
         mountPath: /var/lib/postgresql/data
         subPath: postgresql-d
       # In case cloudProvider: aws
-      AWS_ElasticBlockStore_volumeID: vol-07b5a7a8e85a6caee
+      AWS_ElasticBlockStore_volumeID: vol-07773a4c5de68397f
       AWS_ElasticBlockStore_size: 200Gi
     resources:
       enabled: true
@@ -455,19 +455,19 @@ osm-seed:
       label_value: web_large
     env:
       TILER_IMPORT_FROM: osm
-      TILER_IMPORT_PBF_URL: https://s3.amazonaws.com/planet.openhistoricalmap.org/planet/planet-241214_1202.osm.pbf
+      TILER_IMPORT_PBF_URL: https://s3.amazonaws.com/planet.openhistoricalmap.org/planet/planet-241228_1119.osm.pbf
       REPLICATION_URL: http://s3.amazonaws.com/planet.openhistoricalmap.org/replication/minute/
-      SEQUENCE_NUMBER: '1677612'
+      SEQUENCE_NUMBER: "1682000"
       OVERWRITE_STATE: false
       UPLOAD_EXPIRED_FILES: true
       IMPORT_NATURAL_EARTH: true
       IMPORT_OSM_LAND: true
     persistenceDisk:
-      enabled: false
+      enabled: true
       accessMode: ReadWriteOnce
       mountPath: /mnt/data
       # In case cloudProvider: aws
-      AWS_ElasticBlockStore_volumeID: vol-05d06ac388569461f
+      AWS_ElasticBlockStore_volumeID: vol-0f13ca01ffd4a14d2
       AWS_ElasticBlockStore_size: 50Gi
     resources:
       enabled: true
@@ -929,20 +929,20 @@ ohm:
     env:
       REGION_NAME: us-east-1
       NAMESPACE: default # Namespace to run the job
-      DOCKER_IMAGE: ghcr.io/openhistoricalmap/tiler-server:0.0.1-0.dev.git.1964.h8703c77 # TODO, this should be automatically updated from the tiler-server image
+      DOCKER_IMAGE: ghcr.io/openhistoricalmap/tiler-server:0.0.1-0.dev.git.1967.h8492956 # TODO, this should be automatically updated from the tiler-server image
       SQS_QUEUE_URL: {{PRODUCTION_SQS_QUEUE_URL}}
       NODEGROUP_TYPE: web_large # Nodegroup type to run the purge and seed job
       # Maximum number of active jobs in the high-concurrency queue
       MAX_ACTIVE_JOBS: 10
       DELETE_OLD_JOBS_AGE: 3600 # 1 hour
       ## Execute purging
       EXECUTE_PURGE: true
-      PURGE_CONCURRENCY: 128
+      PURGE_CONCURRENCY: 64
       PURGE_MIN_ZOOM: 3
-      PURGE_MAX_ZOOM: 12 # Purging zoom 13,14,15,16,17,18,19,20 takes hours to complete; we remove the zoom 19-20 tiles directly from S3
+      PURGE_MAX_ZOOM: 12 # Purging zoom 15,16,17,18,19,20 takes hours to complete; we remove the zoom 19-20 tiles directly from S3
       ## Execute seeding
       EXECUTE_SEED: true
-      SEED_CONCURRENCY: 128
+      SEED_CONCURRENCY: 64
       SEED_MIN_ZOOM: 0
       SEED_MAX_ZOOM: 12
       ## Remove tiles from s3 for zoom levels
@@ -969,7 +969,7 @@ ohm:
     env:
       GEOJSON_URL: https://osmseed-dev.s3.us-east-1.amazonaws.com/tiler/usa-eu.geojson
       ZOOM_LEVELS: '8,9,10'
-      CONCURRENCY: 256
+      CONCURRENCY: 64
       S3_BUCKET: osmseed-dev
       OUTPUT_FILE: /logs/tiler_benchmark.log
     resources:
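Note: the zoom caps above follow from tile arithmetic: an XYZ pyramid has 4**z tiles at zoom z, so each extra zoom level quadruples the work. A quick worldwide count (actual purge jobs only touch expired tiles, but the per-zoom growth rate is the same) shows why zooms above 12 are deleted straight from S3 rather than purged tile-by-tile:

    tiles_z0_12 = sum(4**z for z in range(13))       # 22,369,621 tiles (zooms 0-12)
    tiles_z13_20 = sum(4**z for z in range(13, 21))  # 1,465,993,134,080 tiles (zooms 13-20)
    print(f"{tiles_z13_20 / tiles_z0_12:,.0f}x more tiles above zoom 12")  # ~65,536x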

