Skip to content

Commit

Permalink
feat: in memory BytesIO GeoJSON for boundary param basemapper (#261)
Browse files Browse the repository at this point in the history
* feat: added support for in-memory geojson object in basemapper

* refactor: move classes in handler and factory files to basemapper.py
  • Loading branch information
azharcodeit authored Jun 13, 2024
1 parent 4b7ceb8 commit cd3edc1
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 77 deletions.
201 changes: 126 additions & 75 deletions osm_fieldwork/basemapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,16 @@
import re
import sys
import threading
from io import BytesIO
from pathlib import Path
from typing import Union
from typing import Tuple, Union

import geojson
import mercantile
from cpuinfo import get_cpu_info
from pmtiles.tile import (
Compression as PMTileCompression,
)
from pmtiles.tile import (
TileType as PMTileType,
)
from pmtiles.tile import (
zxy_to_tileid,
)
from pmtiles.tile import Compression as PMTileCompression
from pmtiles.tile import TileType as PMTileType
from pmtiles.tile import zxy_to_tileid
from pmtiles.writer import Writer as PMTileWriter
from pySmartDL import SmartDL
from shapely.geometry import shape
Expand All @@ -53,6 +48,118 @@
# Instantiate logger
log = logging.getLogger(__name__)

BoundingBox = Tuple[float, float, float, float]


class BoundaryHandlerFactory:
"""Factory class for creating boundary handlers based on the type of input boundary provided."""

def __init__(self, boundary: Union[str, BytesIO]):
"""Initialize the BoundaryHandlerFactory with a boundary input.
Args:
boundary (Union[str, BytesIO]): The boundary input, either as a GeoJSON BytesIO object or a BBOX string.
"""
if isinstance(boundary, BytesIO):
self.handler = BytesIOBoundaryHandler(boundary)
elif isinstance(boundary, str):
self.handler = StringBoundaryHandler(boundary)
else:
raise ValueError("Unsupported type for boundary parameter.")

self.boundary_box = self.handler.make_bbox()

def get_bounding_box(self) -> BoundingBox:
"""Get bounding box.
Returns:
BoundingBox: The bounding box as a tuple (min_x, min_y, max_x, max_y).
"""
return self.boundary_box


class BoundaryHandler:
"""A class to extract Bounding Box (BBOX) from various boundary representations."""

def make_bbox(self) -> BoundingBox:
"""Extract and return the bounding box from the boundary representation.
Returns:
BoundingBox: The bounding box as a tuple (min_x, min_y, max_x, max_y).
"""
pass


class BytesIOBoundaryHandler(BoundaryHandler):
"""Extracts BBOX from GeoJSON data stored in a BytesIO object."""

def __init__(self, boundary: BytesIO):
"""Initialize the BytesIOBoundaryHandler with a BytesIO input."""
self.boundary = boundary

def make_bbox(self) -> BoundingBox:
"""Extract and return the bounding box from the GeoJSON data.
Returns:
BoundingBox: The bounding box as a tuple (min_x, min_y, max_x, max_y).
"""
log.debug(f"Reading geojson BytesIO : {self.boundary}")
# Rewind the BytesIO object to the beginning before passing it to geojson.load()
self.boundary.seek(0)
with self.boundary as buffer:
poly = geojson.load(buffer)

if "features" in poly:
geometry = shape(poly["features"][0]["geometry"])
elif "geometry" in poly:
geometry = shape(poly["geometry"])
else:
geometry = shape(poly)

if isinstance(geometry, list):
# Multiple geometries
log.debug("Creating union of multiple bbox geoms")
geometry = unary_union(geometry)

if geometry.is_empty:
msg = f"No bbox extracted from {geometry}"
log.error(msg)
raise ValueError(msg) from None

bbox = geometry.bounds
# left, bottom, right, top
# minX, minY, maxX, maxY
return bbox


class StringBoundaryHandler(BoundaryHandler):
"""Extracts BBOX from string representation."""

def __init__(self, boundary: str):
"""Initialize the StringBoundaryHandler with a BoundaryHandler input."""
self.boundary = boundary

def make_bbox(self) -> BoundingBox:
"""A function to parse BBOX string."""
try:
if "," in self.boundary:
bbox_parts = self.boundary.split(",")
else:
bbox_parts = self.boundary.split(" ")
bbox = tuple(float(x) for x in bbox_parts)
if len(bbox) == 4:
# BBOX valid
return bbox
else:
msg = f"BBOX string malformed: {bbox}"
log.error(msg)
raise ValueError(msg) from None
except Exception as e:
log.error(e)
msg = f"Failed to parse BBOX string: {self.boundary}"
log.error(msg)
raise ValueError(msg) from None


def dlthread(
dest: str,
Expand Down Expand Up @@ -126,15 +233,15 @@ class BaseMapper(object):

def __init__(
self,
boundary: str,
boundary: Union[str, BytesIO],
base: str,
source: str,
xy: bool,
):
"""Create an tile basemap for ODK Collect.
Args:
boundary (str): A BBOX string or GeoJSON file of the AOI.
boundary (Union[str, BytesIO]): A BBOX string or GeoJSON provided as BytesIO object of the AOI.
The GeoJSON can contain multiple geometries.
base (str): The base directory to cache map tile in
source (str): The upstream data source for map tiles
Expand All @@ -143,7 +250,8 @@ def __init__(
Returns:
(BaseMapper): An instance of this class
"""
self.bbox = self.makeBbox(boundary)
bbox_factory = BoundaryHandlerFactory(boundary)
self.bbox = bbox_factory.get_bounding_box()
self.tiles = list()
self.base = base
# sources for imagery
Expand Down Expand Up @@ -271,65 +379,6 @@ def tileExists(
log.debug("%s doesn't exists" % filespec)
return False

def makeBbox(
self,
boundary: str,
) -> tuple[float, float, float, float]:
"""Make a bounding box from a shapely geometry.
Args:
boundary (str): A BBOX string or GeoJSON file of the AOI.
The GeoJSON can contain multiple geometries.
Returns:
(list): The bounding box coordinates
"""
if not boundary.lower().endswith((".json", ".geojson")):
# Is BBOX string
try:
if "," in boundary:
bbox_parts = boundary.split(",")
else:
bbox_parts = boundary.split(" ")
bbox = tuple(float(x) for x in bbox_parts)
if len(bbox) == 4:
# BBOX valid
return bbox
else:
msg = f"BBOX string malformed: {bbox}"
log.error(msg)
raise ValueError(msg) from None
except Exception as e:
log.error(e)
msg = f"Failed to parse BBOX string: {boundary}"
log.error(msg)
raise ValueError(msg) from None

log.debug(f"Reading geojson file: {boundary}")
with open(boundary, "r") as f:
poly = geojson.load(f)
if "features" in poly:
geometry = shape(poly["features"][0]["geometry"])
elif "geometry" in poly:
geometry = shape(poly["geometry"])
else:
geometry = shape(poly)

if isinstance(geometry, list):
# Multiple geometries
log.debug("Creating union of multiple bbox geoms")
geometry = unary_union(geometry)

if geometry.is_empty:
msg = f"No bbox extracted from {geometry}"
log.error(msg)
raise ValueError(msg) from None

bbox = geometry.bounds
# left, bottom, right, top
# minX, minY, maxX, maxY
return bbox


def tileid_from_xyz_dir_path(filepath: Union[Path, str], is_xy: bool = False) -> int:
"""Helper function to get the tile id from a tile in xyz directory structure.
Expand Down Expand Up @@ -444,7 +493,7 @@ def create_basemap_file(
"""Create a basemap with given parameters.
Args:
boundary (str, optional): The boundary for the area you want.
boundary (str | BytesIO, optional): The boundary for the area you want.
tms (str, optional): Custom TMS URL.
xy (bool, optional): Swap the X & Y coordinates when using a
custom TMS if True.
Expand Down Expand Up @@ -472,7 +521,7 @@ def create_basemap_file(

# Validation
if not boundary:
err = "You need to specify a boundary! (file or bbox)"
err = "You need to specify a boundary! (in-memory object or bbox)"
log.error(err)
raise ValueError(err)

Expand Down Expand Up @@ -593,7 +642,9 @@ def main():
log.error("")
parser.print_help()
quit()
boundary_parsed = args.boundary[0]
with open(Path(args.boundary[0]), "rb") as geojson_file:
boundary = geojson_file.read()
boundary_parsed = BytesIO(boundary)
elif len(args.boundary) == 4:
boundary_parsed = ",".join(args.boundary)
else:
Expand Down
10 changes: 8 additions & 2 deletions tests/test_basemap.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import logging
import os
import shutil
from io import BytesIO
from pathlib import Path

import pytest
from pmtiles.reader import MemorySource
from pmtiles.reader import Reader as PMTileReader

Expand All @@ -33,7 +35,10 @@
log = logging.getLogger(__name__)

rootdir = os.path.dirname(os.path.abspath(__file__))
boundary = f"{rootdir}/testdata/Rollinsville.geojson"
string_boundary = "-105.642662 39.917580 -105.631343 39.929250"
with open(Path(f"{rootdir}/testdata/Rollinsville.geojson"), "rb") as geojson_file:
boundary = geojson_file.read()
object_boundary = BytesIO(boundary)
outfile = f"{rootdir}/testdata/rollinsville.mbtiles"
base = "./tiles"
# boundary = open(infile, "r")
Expand All @@ -46,7 +51,8 @@
# geometry = shape(poly)


def test_create():
@pytest.mark.parametrize("boundary", [string_boundary, object_boundary])
def test_create(boundary):
"""See if the file got loaded."""
hits = 0
basemap = BaseMapper(boundary, base, "topo", False)
Expand Down

0 comments on commit cd3edc1

Please sign in to comment.