Skip to content

Commit

Permalink
Update sqs processor
Browse files Browse the repository at this point in the history
  • Loading branch information
Rub21 committed Feb 6, 2025
1 parent ffd91b1 commit ca74cb4
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 53 deletions.
42 changes: 7 additions & 35 deletions images/tiler-cache/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,44 +1,16 @@
FROM python:3.9

# Install kubectl
ARG KUBECTL_VERSION=v1.30.1
RUN curl -LO https://storage.googleapis.com/kubernetes-release/release/${KUBECTL_VERSION}/bin/linux/amd64/kubectl \
&& chmod +x kubectl \
&& mv kubectl /usr/local/bin/
RUN kubectl version --client
WORKDIR /app
COPY ./requirements.txt ./requirements.txt
RUN pip install --no-cache-dir --upgrade -r ./requirements.txt
COPY *.sh .
COPY *.py .

# CMD ["bash", "/app/start.sh"]


# RUN apk update && apk add --no-cache \
# proj \
# proj-dev \
# gdal \
# gdal-dev \
# geos \
# geos-dev \
# python3-dev \
# py3-pip \
# build-base \
# proj-util \
# git

# # Ensure PROJ_DIR and PATH are set
# ENV PROJ_DIR=/usr
# ENV PATH="/usr/share/proj:${PATH}"
COPY ./requirements.txt .
RUN pip install --no-cache-dir --upgrade -r requirements.txt

# # Upgrade pip
# RUN python3 -m pip install --upgrade pip

# WORKDIR /app

# # Install Python dependencies
# COPY requirements.txt .
# RUN pip install --no-cache-dir -r requirements.txt

# COPY *.py .

# ENTRYPOINT ["python purge.py"]
COPY *.sh .
COPY *.py .
CMD ["python", "sqs_processor.py"]
31 changes: 18 additions & 13 deletions images/tiler-cache/s3_cleanup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import boto3
import re
import click
import logging

def compute_children_tiles(s3_path, zoom_levels):
Expand All @@ -12,9 +11,9 @@ def compute_children_tiles(s3_path, zoom_levels):
zoom_levels (list): List of zoom levels for which to compute children.
Returns:
list: A list of child tile paths in "zoom/x/y" format only for the target zoom levels.
list: A sorted list of unique child tile paths in "zoom/x/y" format only for the target zoom levels.
"""
logging.info(f"Starting computation of child tiles for {s3_path} and zoom levels {zoom_levels}.")
logging.info(f"Starting computation of child tiles for {s3_path} and zoom levels {sorted(set(zoom_levels))}.")

s3_client = boto3.client("s3")
s3_match = re.match(r"s3://([^/]+)/(.+)", s3_path)
Expand All @@ -35,23 +34,24 @@ def compute_children_tiles(s3_path, zoom_levels):
match = re.match(r"(\d+)/(\d+)/(\d+)", tile)
if match:
z, x, y = map(int, match.groups())
for target_zoom in zoom_levels:
for target_zoom in sorted(set(zoom_levels)):
while z < target_zoom:
x *= 2
y *= 2
z += 1
# Add all 4 children tiles only for the target zoom level
if z == target_zoom:
child_tiles.add(f"{z}/{x}/{y}")
child_tiles.add(f"{z}/{x+1}/{y}")
child_tiles.add(f"{z}/{x}/{y+1}")
child_tiles.add(f"{z}/{x+1}/{y+1}")
child_tiles.update([
f"{z}/{x}/{y}",
f"{z}/{x+1}/{y}",
f"{z}/{x}/{y+1}",
f"{z}/{x+1}/{y+1}"
])

except Exception as e:
logging.error(f"Error processing S3 file: {e}")
raise

return list(child_tiles)
return sorted(child_tiles)

def generate_tile_patterns(tiles):
"""
Expand All @@ -64,13 +64,18 @@ def generate_tile_patterns(tiles):
list: List of unique patterns in the format 'zoom/prefix'.
"""
patterns = set()

for tile in tiles:
match = re.match(r"(\d+)/(\d+)/(\d+)", tile)
if match:
zoom, x, _ = match.groups()
prefix = f"{zoom}/{str(x)[:2]}"
x_str = str(x)
# If x has 2 or more digits, take the first 2 digits; otherwise, keep it as is
prefix = f"{zoom}/{x_str[:2]}" if len(x_str) > 1 else f"{zoom}/{x_str}"
patterns.add(prefix)
return list(patterns)

return sorted(patterns)


def delete_folders_by_pattern(bucket_name, patterns, path_file):
"""
Expand All @@ -90,7 +95,7 @@ def delete_folders_by_pattern(bucket_name, patterns, path_file):
for pattern in patterns:
zoom, prefix = pattern.split("/")
folder_prefix = f"{path_file}/{zoom}/{prefix}"
logging.info(f"Looking for objects under folder: {folder_prefix}")
logging.info(f"Looking for objects under folder: {folder_prefix}...")
paginator = s3_client.get_paginator("list_objects_v2")
response_iterator = paginator.paginate(Bucket=bucket_name, Prefix=folder_prefix)

Expand Down
12 changes: 8 additions & 4 deletions images/tiler-cache/sqs_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
from utils import check_tiler_db_postgres_status
from s3_cleanup import compute_children_tiles, generate_tile_patterns, delete_folders_by_pattern
import threading

logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(message)s",
Expand All @@ -23,7 +24,7 @@
"ghcr.io/openhistoricalmap/tiler-server:0.0.1-0.dev.git.1780.h62561a8",
)
NODEGROUP_TYPE = os.getenv("NODEGROUP_TYPE", "job_large")
MAX_ACTIVE_JOBS = int(os.getenv("MAX_ACTIVE_JOBS", 2))
MAX_ACTIVE_JOBS = 1
DELETE_OLD_JOBS_AGE = int(os.getenv("DELETE_OLD_JOBS_AGE", 3600)) # default 1 hour

# Tiler cache purge and seed settings
Expand Down Expand Up @@ -86,7 +87,7 @@ def get_active_jobs_count():
logging.info(f"Total active or pending jobs: {active_jobs_count}")
return active_jobs_count

def get_purge_and_seed_commands(script_path='purge_and_seed.sh'):
def get_purge_and_seed_commands(script_path='purge_seed_tiles.sh'):
try:
with open(script_path, 'r') as file:
commands = file.read()
Expand Down Expand Up @@ -225,7 +226,11 @@ def process_sqs_messages():
create_kubernetes_job(file_url, file_name)

# Remove zoom levels 18,19,20
cleanup_zoom_levels(file_url, ZOOM_LEVELS_TO_DELETE, S3_BUCKET_CACHE_TILER, S3_BUCKET_PATH_FILES)
cleanup_thread = threading.Thread(
target=cleanup_zoom_levels,
args=(file_url, ZOOM_LEVELS_TO_DELETE, S3_BUCKET_CACHE_TILER, S3_BUCKET_PATH_FILES)
)
cleanup_thread.start()

elif "Event" in body and body["Event"] == "s3:TestEvent":
logging.info("Test event detected. Ignoring...")
Expand All @@ -242,7 +247,6 @@ def process_sqs_messages():

time.sleep(10)


if __name__ == "__main__":
logging.info("Starting SQS message processing...")
process_sqs_messages()
2 changes: 1 addition & 1 deletion ohm/templates/tiler-cache-purge/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ spec:
- -c
- |
set -x
python purge.py
python sqs_processor.py
env:
- name: REGION_NAME
value: {{ .Values.ohm.tilerCachePurge.env.REGION_NAME | quote }}
Expand Down

0 comments on commit ca74cb4

Please sign in to comment.