Merge pull request #398 from OpenHistoricalMap/staging
Tiler Cache - Purge and Seed
Rub21 authored Nov 25, 2024
2 parents 01c7175 + 862bede commit c9951c5
Showing 19 changed files with 926 additions and 28 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/chartpress.yaml
@@ -5,6 +5,7 @@ on:
- 'main'
- 'staging'
- 'development'
- 'tiler-cache'
jobs:
build:
runs-on: ubuntu-20.04
@@ -58,6 +59,7 @@ jobs:
DEVELOPMENT_TILER_DB_PASSWORD: ${{ secrets.STAGING_TILER_DB_PASSWORD }}
DEVELOPMENT_TILER_CACHE_AWS_ACCESS_KEY_ID: ${{ secrets.STAGING_TILER_CACHE_AWS_ACCESS_KEY_ID }}
DEVELOPMENT_TILER_CACHE_AWS_SECRET_ACCESS_KEY: ${{ secrets.STAGING_TILER_CACHE_AWS_SECRET_ACCESS_KEY }}
DEVELOPMENT_SQS_QUEUE_URL: ${{ secrets.STAGING_SQS_QUEUE_URL }}
## tm
DEVELOPMENT_TM_DB_PASSWORD: ${{ secrets.STAGING_TM_DB_PASSWORD }}
DEVELOPMENT_TM_API_SECRET: ${{ secrets.STAGING_TM_API_SECRET }}
@@ -100,6 +102,7 @@ jobs:
STAGING_TILER_DB_PASSWORD: ${{ secrets.STAGING_TILER_DB_PASSWORD }}
STAGING_TILER_CACHE_AWS_ACCESS_KEY_ID: ${{ secrets.STAGING_TILER_CACHE_AWS_ACCESS_KEY_ID }}
STAGING_TILER_CACHE_AWS_SECRET_ACCESS_KEY: ${{ secrets.STAGING_TILER_CACHE_AWS_SECRET_ACCESS_KEY }}
STAGING_SQS_QUEUE_URL: ${{ secrets.STAGING_SQS_QUEUE_URL }}
## tm
STAGING_TM_API_CONSUMER_KEY: ${{ secrets.STAGING_TM_API_CONSUMER_KEY }}
STAGING_TM_API_CONSUMER_SECRET: ${{ secrets.STAGING_TM_API_CONSUMER_SECRET }}
@@ -145,6 +148,7 @@ jobs:
## tiler
PRODUCTION_TILER_DB_PASSWORD: ${{ secrets.PRODUCTION_TILER_DB_PASSWORD }}
PRODUCTION_TILER_CACHE_AWS_ACCESS_KEY_ID: ${{ secrets.PRODUCTION_TILER_CACHE_AWS_ACCESS_KEY_ID }}
PRODUCTION_SQS_QUEUE_URL: ${{ secrets.PRODUCTION_SQS_QUEUE_URL }}
## tm
PRODUCTION_TILER_CACHE_AWS_SECRET_ACCESS_KEY: ${{ secrets.PRODUCTION_TILER_CACHE_AWS_SECRET_ACCESS_KEY }}
PRODUCTION_TM_API_CONSUMER_KEY: ${{ secrets.PRODUCTION_TM_API_CONSUMER_KEY }}
2 changes: 2 additions & 0 deletions chartpress.yaml
@@ -14,3 +14,5 @@ charts:
valuesPath: osm-seed.tilerServer.image
tasking-manager-api:
valuesPath: osm-seed.tmApi.image
tiler-cache:
valuesPath: ohm.tilerCache.image
9 changes: 9 additions & 0 deletions compose/tiler.yml
@@ -38,3 +38,12 @@ services:
env_file:
- ../envs/.env.tiler
restart: always
tilercache:
image: ohm-tiler-cache:v1
build:
context: ../images/tiler-cache
dockerfile: Dockerfile
volumes:
# - ../data/tiler-cache-data:/mnt/data
- ../images/tiler-cache:/app

31 changes: 31 additions & 0 deletions images/tiler-cache/Dockerfile
@@ -0,0 +1,31 @@
FROM ghcr.io/openhistoricalmap/tiler-server:0.0.1-0.dev.git.1734.h5b4d15d

RUN apk update && apk add --no-cache \
proj \
proj-dev \
gdal \
gdal-dev \
geos \
geos-dev \
python3-dev \
py3-pip \
build-base \
proj-util \
git

# Ensure PROJ_DIR and PATH are set
ENV PROJ_DIR=/usr
ENV PATH="/usr/share/proj:${PATH}"

# Upgrade pip
RUN python3 -m pip install --upgrade pip

WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY *.py .

ENTRYPOINT ["python purge.py"]
8 changes: 8 additions & 0 deletions images/tiler-cache/README.md
@@ -0,0 +1,8 @@
# Tiler seed script

Tiler seeding is a set of scripts that generates the tile cache for a range of zoom levels, for example 1 to 7. The seed script receives a GeoJSON describing all the areas where tile cache generation is required for OHM tiles; pre-generating these tiles reduces latency when a user first starts interacting with the map.
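
For illustration only (not part of this PR), a minimal in-process invocation sketch using the option names defined in seed.py further down in this diff; the GeoJSON URL, bucket, and file names are placeholders:

# Sketch: run the seed command in-process via click's test runner.
# Assumes seed.py is importable and the tiler database is reachable.
from click.testing import CliRunner
from seed import main

runner = CliRunner()
result = runner.invoke(
    main,
    [
        "--geojson-url", "https://example.com/ohm-areas.geojson",  # placeholder URL
        "--zoom-levels", "1,2,3,4,5,6,7",
        "--concurrency", "16",
        "--s3-bucket", "osmseed-dev",
        "--log-file", "seed_log.csv",
    ],
)
print(result.output)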


# Tiler purge script

A script that reads an AWS SQS queue and launches a Kubernetes Job to purge and re-seed the tiler cache for each imposm expired-tiles file.
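
For illustration only (not part of this PR), a sketch of the S3 event notification body that purge.py expects on the queue, showing how the s3:// URL handed to each Kubernetes Job is derived; the bucket name and object key are placeholders:

# Sketch: example S3 event body as parsed by purge.py.
import json
import os

example_body = {
    "Records": [
        {
            "eventSource": "aws:s3",
            "s3": {
                "bucket": {"name": "osmseed-dev"},  # placeholder bucket
                "object": {"key": "imposm/expired/20241125/084500.tiles"},  # placeholder key
            },
        }
    ]
}

body = json.loads(json.dumps(example_body))
record = body["Records"][0]
file_url = f"s3://{record['s3']['bucket']['name']}/{record['s3']['object']['key']}"
file_name = os.path.basename(record["s3"]["object"]["key"])
print(file_url, file_name)  # s3://osmseed-dev/imposm/expired/20241125/084500.tiles 084500.tiles
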
Empty file added images/tiler-cache/__init__.py
178 changes: 178 additions & 0 deletions images/tiler-cache/purge.py
@@ -0,0 +1,178 @@
import boto3
import time
from kubernetes import client, config
import os
import json
from datetime import datetime, timezone, timedelta
import logging
from utils import check_tiler_db_postgres_status

logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(message)s",
level=logging.INFO,
)

# Environment variables
ENVIRONMENT = os.getenv("ENVIRONMENT", "development")
NAMESPACE = os.getenv("NAMESPACE", "default")
SQS_QUEUE_URL = os.getenv("SQS_QUEUE_URL", "default-queue-url")
REGION_NAME = os.getenv("REGION_NAME", "us-east-1")
DOCKER_IMAGE = os.getenv(
"DOCKER_IMAGE",
"ghcr.io/openhistoricalmap/tiler-server:0.0.1-0.dev.git.1734.h5b4d15d",
)
NODEGROUP_TYPE = os.getenv("NODEGROUP_TYPE", "job_large")
MAX_ACTIVE_JOBS = int(os.getenv("MAX_ACTIVE_JOBS", 2))
DELETE_OLD_JOBS_AGE = int(os.getenv("DELETE_OLD_JOBS_AGE", 86400)) # default 1 day
MIN_ZOOM = os.getenv("MIN_ZOOM", 8)
MAX_ZOOM = os.getenv("MAX_ZOOM", 16)
JOB_NAME_PREFIX = f"{ENVIRONMENT}-tiler-cache-purge-seed"
POSTGRES_HOST = os.getenv("POSTGRES_HOST", "localhost")
POSTGRES_PORT = int(os.getenv("POSTGRES_PORT", 5432))
POSTGRES_DB = os.getenv("POSTGRES_DB", "postgres")
POSTGRES_USER = os.getenv("POSTGRES_USER", "postgres")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "password")

# Initialize Kubernetes and AWS clients
sqs = boto3.client("sqs", region_name=REGION_NAME)
config.load_incluster_config()
batch_v1 = client.BatchV1Api()
core_v1 = client.CoreV1Api()


def get_active_jobs_count():
"""Returns the number of active jobs in the namespace with names starting with 'tiler-purge-seed-'."""
logging.info("Checking active or pending jobs...")
jobs = batch_v1.list_namespaced_job(namespace=NAMESPACE)
active_jobs_count = 0

for job in jobs.items:
if not job.metadata.name.startswith(JOB_NAME_PREFIX):
continue

label_selector = f"job-name={job.metadata.name}"
pods = core_v1.list_namespaced_pod(namespace=NAMESPACE, label_selector=label_selector)

for pod in pods.items:
if pod.status.phase in [
"Pending",
"PodInitializing",
"ContainerCreating",
"Running",
"Error",
]:
logging.debug(f"Job '{job.metadata.name}' has a pod in {pod.status.phase} state.")
active_jobs_count += 1
break

logging.info(f"Total active or pending jobs: {active_jobs_count}")
return active_jobs_count


def create_kubernetes_job(file_url, file_name):
"""Create a Kubernetes Job to process a file."""
config_map_name = f"{ENVIRONMENT}-tiler-server-cm"
job_name = f"{JOB_NAME_PREFIX}-{file_name}"
job_manifest = {
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {"name": job_name},
"spec": {
"ttlSecondsAfterFinished": DELETE_OLD_JOBS_AGE,
"template": {
"spec": {
"nodeSelector": {"nodegroup_type": NODEGROUP_TYPE},
"containers": [
{
"name": "tiler-purge-seed",
"image": DOCKER_IMAGE,
"command": ["sh", "./purge_and_seed.sh"],
"envFrom": [{"configMapRef": {"name": config_map_name}}],
"env": [
{"name": "IMPOSM_EXPIRED_FILE", "value": file_url},
{"name": "MIN_ZOOM", "value": str(MIN_ZOOM)},
{"name": "MAX_ZOOM", "value": str(MAX_ZOOM)},
],
}
],
"restartPolicy": "Never",
}
},
"backoffLimit": 0,
},
}

try:
batch_v1.create_namespaced_job(namespace=NAMESPACE, body=job_manifest)
logging.info(f"Kubernetes Job '{job_name}' created for file: {file_url}")
except Exception as e:
logging.error(f"Failed to create Kubernetes Job '{job_name}': {e}")


def process_sqs_messages():
"""Process messages from the SQS queue and create Kubernetes Jobs for each file."""
while True:
response = sqs.receive_message(
QueueUrl=SQS_QUEUE_URL,
MaxNumberOfMessages=1,
WaitTimeSeconds=10,
AttributeNames=["All"],
MessageAttributeNames=["All"],
)

messages = response.get("Messages", [])
if not messages:
logging.info("No messages in the queue. Retrying in 5 seconds...")
time.sleep(5)
continue

for message in messages:
try:
# Check PostgreSQL status
if not check_tiler_db_postgres_status():
logging.error("PostgreSQL database is down. Retrying in 1 minute...")
time.sleep(60)
continue

# Check active job count before processing
while get_active_jobs_count() >= MAX_ACTIVE_JOBS:
logging.warning(
f"Max active jobs limit ({MAX_ACTIVE_JOBS}) reached. Waiting 1 minute..."
)
time.sleep(60)

# Parse the SQS message
body = json.loads(message["Body"])

if "Records" in body and body["Records"][0]["eventSource"] == "aws:s3":
record = body["Records"][0]
bucket_name = record["s3"]["bucket"]["name"]
object_key = record["s3"]["object"]["key"]

file_url = f"s3://{bucket_name}/{object_key}"
file_name = os.path.basename(object_key)

logging.info(f"Processing S3 event for file: {file_url}")

# Create a Kubernetes job
create_kubernetes_job(file_url, file_name)

elif "Event" in body and body["Event"] == "s3:TestEvent":
logging.info("Test event detected. Ignoring...")

# Delete the processed message
sqs.delete_message(
QueueUrl=SQS_QUEUE_URL,
ReceiptHandle=message["ReceiptHandle"],
)
logging.info(f"Message processed and deleted: {message['MessageId']}")

except Exception as e:
logging.error(f"Error processing message: {e}")

time.sleep(10)


if __name__ == "__main__":
logging.info("Starting SQS message processing...")
process_sqs_messages()
14 changes: 14 additions & 0 deletions images/tiler-cache/requirements.txt
@@ -0,0 +1,14 @@
requests
smart_open
joblib
tqdm
click
mercantile
aiohttp
boto3
shapely
geopandas
pyproj
kubernetes
psycopg2-binary
93 changes: 93 additions & 0 deletions images/tiler-cache/seed.py
@@ -0,0 +1,93 @@
import os
import logging
from urllib.parse import urlparse
import click
from utils import (
upload_to_s3,
seed_tiles,
save_geojson_boundary,
check_tiler_db_postgres_status,
process_geojson_to_feature_tiles,
)

logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(message)s",
level=logging.INFO,
)


@click.command(short_help="Script to request or seed tiles from a Tiler API.")
@click.option(
"--geojson-url",
required=True,
help="URL to the GeoJSON file defining the area of interest.",
)
@click.option(
"--zoom-levels",
help="Comma-separated list of zoom levels",
default="6,7,8,9",
)
@click.option(
"--concurrency",
help="Number of concurrent processes for seeding",
default=32,
type=int,
)
@click.option(
"--s3-bucket",
help="S3 bucket to upload the result CSV file",
default="osmseed-dev",
)
@click.option(
"--log-file",
help="CSV file to save the logs results",
default="log_file.csv",
)
def main(geojson_url, zoom_levels, concurrency, log_file, s3_bucket):
"""
Main function to process and seed tiles
"""
logging.info("Starting the tile seeding process.")

# Check PostgreSQL status
logging.info("Checking PostgreSQL database status...")
if not check_tiler_db_postgres_status():
logging.error("PostgreSQL database is not running or unreachable. Exiting.")
return
logging.info("PostgreSQL database is running and reachable.")

# Extract base name from the GeoJSON URL
parsed_url = urlparse(geojson_url)
base_name = os.path.splitext(os.path.basename(parsed_url.path))[0]
logging.info(f"Base name extracted from GeoJSON URL: {base_name}")

# Parse zoom levels
zoom_levels = list(map(int, zoom_levels.split(",")))
min_zoom = min(zoom_levels)
max_zoom = max(zoom_levels)
logging.info(f"Zoom levels parsed: Min Zoom: {min_zoom}, Max Zoom: {max_zoom}")

features, tiles = process_geojson_to_feature_tiles(geojson_url, min_zoom)
geojson_file = f"{base_name}_tiles.geojson"
save_geojson_boundary(features, geojson_file)

# Use base name for skipped tiles and log files
skipped_tiles_file = f"{base_name}_skipped_tiles.tiles"
log_file = f"{base_name}_seeding_log.csv"

# Seed the tiles
logging.info("Starting the seeding process...")
seed_tiles(tiles, concurrency, min_zoom, max_zoom, log_file, skipped_tiles_file)
logging.info("Tile seeding complete.")
logging.info(f"Skipped tiles saved to: {skipped_tiles_file}")
logging.info(f"Log of seeding performance saved to: {log_file}")

# Upload log files to S3
upload_to_s3(log_file, s3_bucket, f"tiler/logs/{log_file}")
upload_to_s3(skipped_tiles_file, s3_bucket, f"tiler/logs/{skipped_tiles_file}")
upload_to_s3(skipped_tiles_file, s3_bucket, f"tiler/logs/{geojson_file}")
logging.info("Log files uploaded to S3.")


if __name__ == "__main__":
main()