Skip to content

Commit

Permalink
Remove triggers before apply transformation
Browse files Browse the repository at this point in the history
  • Loading branch information
Rub21 committed Dec 3, 2024
1 parent e1019be commit 37f39fa
Showing 1 changed file with 103 additions and 96 deletions.
199 changes: 103 additions & 96 deletions images/tiler-imposm/update_tables.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import os
import json
import psycopg2
from psycopg2 import sql
import logging
import subprocess
import time

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
Expand All @@ -15,15 +15,16 @@
"port": int(os.getenv("POSTGRES_PORT", 5432))
}

# Verify that all required environment variables are defined
PSQL_CONN = f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['dbname']}"

REQUIRED_ENV_VARS = ["POSTGRES_DB", "POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_HOST"]
for var in REQUIRED_ENV_VARS:
if not os.getenv(var):
logger.error(f"Environment variable {var} is not defined. Exiting.")
raise EnvironmentError(f"Environment variable {var} is not defined.")

def load_imposm_config(filepath):
"""Load the imposm3.json configuration file"""
"""Load the imposm3.json configuration file."""
logger.info(f"Loading configuration from {filepath}")
try:
with open(filepath, "r") as f:
Expand All @@ -35,117 +36,123 @@ def load_imposm_config(filepath):
logger.error(f"Error parsing JSON from {filepath}.")
raise

def apply_geometry_transformations(conn, generalized_tables):
"""Apply geometry transformations to the specified tables"""
def execute_psql_query(query):
"""Execute a query using psql and print the output."""
try:
logger.info(f"Executing query:\t{query}")
result = subprocess.run(
["psql", PSQL_CONN, "-c", query],
text=True,
capture_output=True
)
if result.returncode != 0:
logger.error(f"Error executing query: {result.stderr}")
else:
logger.info(f"Query executed successfully:\n{result.stdout}")
except Exception as e:
logger.error(f"Error while executing query with psql: {e}")

def delete_existing_triggers(generalized_tables):
"""Delete existing triggers before applying transformations."""
logger.info("Deleting existing triggers...")
for table_name in generalized_tables.keys():
fixed_table_name = f"osm_{table_name}"
trigger_name = f"{fixed_table_name}_before_insert_update"

drop_trigger_query = f"""
DROP TRIGGER IF EXISTS {trigger_name} ON {fixed_table_name};
DROP FUNCTION IF EXISTS {fixed_table_name}_transform_trigger();
"""
execute_psql_query(drop_trigger_query)

def apply_geometry_transformations(generalized_tables):
"""Apply geometry transformations using psql."""
logger.info("Starting geometry transformations...")
with conn.cursor() as cur:
for table_name, table_info in generalized_tables.items():
fixed_table_name = f"osm_{table_name}"
geometry_transform = table_info.get("geometry_transform")
geometry_transform_types = table_info.get("geometry_transform_types")

# Skip if transform or types are not defined
if not geometry_transform or not geometry_transform_types:
logger.warning(
f"Skipping transformations for {fixed_table_name}: "
"'geometry_transform' or 'geometry_transform_types' not defined."
)
continue

# Execute transformation query
sql_query = sql.SQL("""
UPDATE {table}
SET geometry = {transform}
WHERE {types_condition};
""").format(
table=sql.Identifier(fixed_table_name),
transform=sql.SQL(geometry_transform),
types_condition=sql.SQL(geometry_transform_types)
for table_name, table_info in generalized_tables.items():
fixed_table_name = f"osm_{table_name}"
geometry_transform = table_info.get("geometry_transform")
geometry_transform_types = table_info.get("geometry_transform_types")

# Skip if transform or types are not defined
if not geometry_transform or not geometry_transform_types:
logger.warning(
f"Skipping transformations for {fixed_table_name}: "
"'geometry_transform' or 'geometry_transform_types' not defined."
)
try:
logger.info(f"Applying transformation '{geometry_transform}' to table {fixed_table_name}")
cur.execute(sql_query)
logger.info(f"Transformation completed successfully for {fixed_table_name}")
except Exception as e:
logger.error(f"Error applying transformation to {fixed_table_name}: {e}")
conn.commit()

def create_triggers(conn, generalized_tables):
"""Create triggers for future inserts/updates"""
continue

# Build the SQL query
sql_query = f"""UPDATE {fixed_table_name} SET geometry = {geometry_transform} WHERE {geometry_transform_types};"""
start_time = time.time()
execute_psql_query(sql_query)
elapsed_time = time.time() - start_time

logger.info(f"Transformation for table {fixed_table_name} completed in {elapsed_time:.2f} seconds.")

def create_triggers(generalized_tables):
"""Create triggers for future updates using psql."""
logger.info("Creating triggers for future geometry transformations...")
with conn.cursor() as cur:
for table_name, table_info in generalized_tables.items():
fixed_table_name = f"osm_{table_name}"
geometry_transform = table_info.get("geometry_transform")
geometry_transform_types = table_info.get("geometry_transform_types")

# Skip if transform or types are not defined
if not geometry_transform or not geometry_transform_types:
logger.warning(
f"Skipping trigger creation for {fixed_table_name}: "
"'geometry_transform' or 'geometry_transform_types' not defined."
)
continue

# Create trigger function
trigger_function = f"""
CREATE OR REPLACE FUNCTION {fixed_table_name}_transform_trigger()
RETURNS TRIGGER AS $$
BEGIN
IF {geometry_transform_types} THEN
NEW.geometry = {geometry_transform};
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""
try:
logger.info(f"Creating trigger function for {fixed_table_name}")
cur.execute(trigger_function)

# Create the trigger
trigger = f"""
CREATE TRIGGER {fixed_table_name}_before_insert_update
BEFORE INSERT OR UPDATE ON {fixed_table_name}
FOR EACH ROW
EXECUTE FUNCTION {fixed_table_name}_transform_trigger();
"""
cur.execute(trigger)
logger.info(f"Trigger created successfully for {fixed_table_name}")
except Exception as e:
logger.error(f"Error creating trigger for {fixed_table_name}: {e}")
conn.commit()
for table_name, table_info in generalized_tables.items():
fixed_table_name = f"osm_{table_name}"
geometry_transform = table_info.get("geometry_transform")
geometry_transform_types = table_info.get("geometry_transform_types")

# Skip if transform or types are not defined
if not geometry_transform or not geometry_transform_types:
logger.warning(
f"Skipping trigger creation for {fixed_table_name}: "
"'geometry_transform' or 'geometry_transform_types' not defined."
)
continue

# Create the trigger function SQL
trigger_function = f"""
CREATE OR REPLACE FUNCTION {fixed_table_name}_transform_trigger()
RETURNS TRIGGER AS $$
BEGIN
IF {geometry_transform_types} THEN
NEW.geometry = {geometry_transform};
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""
execute_psql_query(trigger_function)

# Create the trigger SQL
trigger = f"""
CREATE TRIGGER {fixed_table_name}_before_insert_update
BEFORE INSERT OR UPDATE ON {fixed_table_name}
FOR EACH ROW
EXECUTE FUNCTION {fixed_table_name}_transform_trigger();
"""
execute_psql_query(trigger)

def main(imposm3_config_path):
logger.info("Connecting to the PostgreSQL database...")
try:
conn = psycopg2.connect(**DB_CONFIG)
logger.info("Connection established successfully.")
except Exception as e:
logger.error(f"Error connecting to the database: {e}")
raise

"""Main execution flow."""
try:
# Load the imposm3.json configuration
config = load_imposm_config(imposm3_config_path)
generalized_tables = config.get("generalized_tables", {})

# Delete existing triggers
logger.info("Deleting existing triggers...")
delete_existing_triggers(generalized_tables)

# Apply initial geometry transformations
logger.info("Starting initial geometry transformations...")
apply_geometry_transformations(conn, generalized_tables)
apply_geometry_transformations(generalized_tables)

# Create triggers for future transformations
logger.info("Setting up triggers for future updates...")
create_triggers(conn, generalized_tables)
# Recreate triggers for future transformations
logger.info("Recreating triggers for future updates...")
create_triggers(generalized_tables)

logger.info("All transformations and triggers completed successfully.")
except Exception as e:
logger.error(f"An error occurred during execution: {e}")
raise
finally:
conn.close()
logger.info("Database connection closed.")

if __name__ == "__main__":
imposm3_config_path = "config/imposm3.json"
main(imposm3_config_path)

0 comments on commit 37f39fa

Please sign in to comment.