From 0785381df96a4750e5169cd8b4949133a5256402 Mon Sep 17 00:00:00 2001 From: Sam <78538841+spwoodcock@users.noreply.github.com> Date: Thu, 8 Feb 2024 19:43:44 +0000 Subject: [PATCH] feat: allow passing extra param to execQuery (for direct URL access) (#14) * refactor: update the json config parsing for clarity * fix: handle data extract zip entirely in memory * feat: allow passing extra params to queryExec, return URL if fgb * test: for generating data extracts, zip and fgb formats * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix: add extra error handling for extract respone json * test: add conftest to init logger * refactor: update error handling for data extract download * feat: add optional support for auth token with remote query * refactor: extra debug logging for raw data api polling * fix: handle raw-data-api status PENDING and STARTED --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- osm_rawdata/config.py | 85 +++++++------ osm_rawdata/pgasync.py | 65 +++++----- osm_rawdata/postgres.py | 257 ++++++++++++++++++++++++++-------------- tests/conftest.py | 39 ++++++ tests/test_postgres.py | 42 ++++++- 5 files changed, 323 insertions(+), 165 deletions(-) create mode 100644 tests/conftest.py diff --git a/osm_rawdata/config.py b/osm_rawdata/config.py index 4c67f31..027d653 100755 --- a/osm_rawdata/config.py +++ b/osm_rawdata/config.py @@ -185,8 +185,7 @@ def _yaml_parse_select_and_keep(self, data): self.config["select"][table].append({tag: []}) def parseJson(self, config: Union[str, BytesIO]): - """Parse the JSON format config file used by the raw-data-api - and export tool. + """Parse the JSON format config file using the Underpass schema. Args: config (str, BytesIO): the file or BytesIO object to read. @@ -194,6 +193,7 @@ def parseJson(self, config: Union[str, BytesIO]): Returns: config (dict): the config data """ + # Check the type of config and load data accordingly if isinstance(config, str): with open(config, "r") as config_file: data = json.load(config_file) @@ -203,51 +203,58 @@ def parseJson(self, config: Union[str, BytesIO]): log.error(f"Unsupported config format: {config}") raise ValueError(f"Invalid config {config}") - # Get the geometry + # Helper function to convert geometry names + def convert_geometry(geom): + if geom == "point": + return "nodes" + elif geom == "line": + return "ways_line" + elif geom == "polygon": + return "ways_poly" + return geom + + # Extract geometry self.geometry = shape(data["geometry"]) + + # Iterate through each key-value pair in the flattened dictionary for key, value in flatdict.FlatDict(data).items(): keys = key.split(":") - # print(keys) - # print(f"\t{value}") - # We already have the geometry - if key[:8] == "geometry": + # Skip the keys related to geometry + if key.startswith("geometry"): continue + # If it's a top-level key, directly update self.config if len(keys) == 1: - self.config.update({key: value}) + self.config[key] = value continue - # keys[0] is currently always 'filters' - # keys[1] is currently 'tags' for the WHERE clause, - # of attributes for the SELECT - geom = keys[2] - # tag = keys[4] - # Get the geometry - if geom == "point": - geom = "nodes" - elif geom == "line": - geom = "ways_line" - elif geom == "polygon": - geom = "ways_poly" - if keys[1] == "attributes": - for v1 in value: - if geom == "all_geometry": - self.config["select"]["nodes"].append({v1: {}}) - self.config["select"]["ways_line"].append({v1: {}}) - self.config["select"]["ways_poly"].append({v1: {}}) - self.config["tables"].append("nodes") - self.config["tables"].append("ways_poly") - self.config["tables"].append("ways_line") + + # Extract meaningful parts from the key + section, subsection = keys[:2] + geom_type = keys[2] if len(keys) > 2 else None + tag_type = keys[3] if len(keys) > 3 else None + tag_name = keys[4] if len(keys) > 4 else None + + # Convert geometry type to meaningful names + geom_type = convert_geometry(geom_type) + + if subsection == "attributes": + # For attributes, update select fields and tables + for attribute_name in value: + if geom_type == "all_geometry": + for geometry_type in ["nodes", "ways_line", "ways_poly"]: + self.config["select"][geometry_type].append({attribute_name: {}}) + self.config["tables"].append(geometry_type) else: - self.config["tables"].append(geom) - self.config["select"][geom].append({v1: {}}) - if keys[1] == "tags": - newtag = {keys[4]: value} - newtag["op"] = keys[3][5:] - if geom == "all_geometry": - self.config["where"]["nodes"].append(newtag) - self.config["where"]["ways_poly"].append(newtag) - self.config["where"]["ways_line"].append(newtag) + self.config["select"][geom_type].append({attribute_name: {}}) + self.config["tables"].append(geom_type) + elif subsection == "tags": + # For tags, update where fields + option = tag_type[5:] if tag_type else None + new_tag = {tag_name: value, "op": option} + if geom_type == "all_geometry": + for geometry_type in ["nodes", "ways_line", "ways_poly"]: + self.config["where"][geometry_type].append(new_tag) else: - self.config["where"][geom].append(newtag) + self.config["where"][geom_type].append(new_tag) return self.config diff --git a/osm_rawdata/pgasync.py b/osm_rawdata/pgasync.py index 210e93b..596d7f4 100755 --- a/osm_rawdata/pgasync.py +++ b/osm_rawdata/pgasync.py @@ -20,6 +20,7 @@ # import argparse +import asyncio import json import logging import os @@ -28,13 +29,11 @@ import zipfile from io import BytesIO from pathlib import Path -from sys import argv from urllib.parse import urlparse + +import asyncpg import geojson import requests -import asyncio -import asyncpg -from asyncpg import exceptions from geojson import Feature, FeatureCollection, Polygon from shapely import wkt from shapely.geometry import Polygon, shape @@ -48,6 +47,7 @@ # Instantiate logger log = logging.getLogger(__name__) + class DatabaseAccess(object): def __init__(self): """This is a class to setup a database connection.""" @@ -55,32 +55,33 @@ def __init__(self): self.dburi = None self.qc = None - async def connect(self, - dburi: str, - ): + async def connect( + self, + dburi: str, + ): self.dburi = dict() uri = urlparse(dburi) if not uri.username: - self.dburi['dbuser'] = os.getenv("PGUSER", default=None) - if not self.dburi['dbuser']: - log.error(f"You must specify the user name in the database URI, or set PGUSER") + self.dburi["dbuser"] = os.getenv("PGUSER", default=None) + if not self.dburi["dbuser"]: + log.error("You must specify the user name in the database URI, or set PGUSER") else: - self.dburi['dbuser'] = uri.username + self.dburi["dbuser"] = uri.username if not uri.password: - self.dburi['dbpass'] = os.getenv("PGPASSWORD", default=None) - if not self.dburi['dbpass']: - log.error(f"You must specify the user password in the database URI, or set PGPASSWORD") + self.dburi["dbpass"] = os.getenv("PGPASSWORD", default=None) + if not self.dburi["dbpass"]: + log.error("You must specify the user password in the database URI, or set PGPASSWORD") else: - self.dburi['dbpass'] = uri.password + self.dburi["dbpass"] = uri.password if not uri.hostname: - self.dburi['dbhost'] = os.getenv("PGHOST", default="localhost") + self.dburi["dbhost"] = os.getenv("PGHOST", default="localhost") else: - self.dburi['dbhost'] = uri.hostname + self.dburi["dbhost"] = uri.hostname - slash = uri.path.find('/') - self.dburi['dbname'] = uri.path[slash + 1:] + slash = uri.path.find("/") + self.dburi["dbname"] = uri.path[slash + 1 :] connect = f"postgres://{self.dburi['dbuser']}:{ self.dburi['dbpass']}@{self.dburi['dbhost']}/{self.dburi['dbname']}" - + if self.dburi["dbname"] == "underpass": # Authentication data # self.auth = HTTPBasicAuth(self.user, self.passwd) @@ -292,11 +293,11 @@ async def createTable( return True - async def execute(self, - sql: str, - ): - """ - Execute a raw SQL query and return the results. + async def execute( + self, + sql: str, + ): + """Execute a raw SQL query and return the results. Args: sql (str): The SQL to execute @@ -441,17 +442,18 @@ def __init__( # output: str = None ): """This is a client for a postgres database. + Returns: (PostgresClient): An instance of this class """ super().__init__() self.qc = None - async def loadConfig(self, - config: str, - ): - """ - Load the JSON or YAML config file that defines the SQL query + async def loadConfig( + self, + config: str, + ): + """Load the JSON or YAML config file that defines the SQL query Args: config (str): The filespec for the query config file @@ -534,6 +536,7 @@ async def execQuery( collection = await self.queryRemote(request) return collection + async def main(): """This main function lets this class be run standalone by a bash script.""" parser = argparse.ArgumentParser( @@ -601,9 +604,9 @@ async def main(): log.debug(f"Wrote {args.outfile}") + if __name__ == "__main__": """This is just a hook so this file can be run standalone during development.""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(main()) - diff --git a/osm_rawdata/postgres.py b/osm_rawdata/postgres.py index 4cde6b0..e491122 100755 --- a/osm_rawdata/postgres.py +++ b/osm_rawdata/postgres.py @@ -34,9 +34,11 @@ import geojson import psycopg2 import requests -from geojson import Feature, FeatureCollection, Polygon -from shapely import wkt +from geojson import Feature, FeatureCollection +from geojson import Polygon as GeojsonPolygon +from shapely import to_geojson, wkt from shapely.geometry import Polygon, shape +from shapely.ops import unary_union # Find the other files for this project import osm_rawdata as rw @@ -164,82 +166,105 @@ def __del__(self): def createJson( self, config: QueryConfig, - boundary: Polygon, + boundary: GeojsonPolygon, allgeom: bool = False, - ): - """This class generates a JSON file, which is used for remote access - to an OSM raw database using the Underpass schema. + extra_params: dict = {}, + ) -> str: + """Generate a JSON file used for remote access to raw-data-api. + + Uses the Underpass schema. Args: config (QueryConfig): The config data from the query config file - boundary (Polygon): The boundary polygon + boundary (GeojsonPolygon): The boundary polygon allgeom (bool): Whether to return centroids or all the full geometry + TODO this is not implemented. + extra_params (dict): Extra parameters to include in JSON config root. Returns: - (FeatureCollection): the json data + str: The stringified JSON data. """ - feature = dict() - feature["geometry"] = boundary - - filters = dict() - filters["tags"] = dict() - # filters["tags"]["all_geometry"] = dict() - - # This only effects the output file - geometrytype = list() - # for table in config.config['tables']: - if len(config.config["select"]["nodes"]) > 0 or len(config.config["where"]["nodes"]) > 0: - geometrytype.append("point") - if len(config.config["select"]["ways_line"]) > 0 or len(config.config["where"]["ways_line"]) > 0: - geometrytype.append("line") - if len(config.config["select"]["ways_poly"]) > 0 or len(config.config["where"]["ways_poly"]) > 0: - geometrytype.append("polygon") - feature["geometryType"] = geometrytype - - tables = {"nodes": "point", "ways_poly": "polygon", "ways_line": "line"} - # The database tables to query - # if tags exists, then only query those fields - join_or = { - "point": [], - "polygon": [], - "line": [], + json_data = { + "geometry": boundary, + "geometryType": self._get_geometry_types(config), + "filters": self._get_filters(config), + "centroid": config.config.get("centroid", False), + "attributes": self._get_attributes(config), + **extra_params, } - join_and = { - "point": [], - "polygon": [], - "line": [], - } - filters["tags"] = { - "point": {"join_or": {}, "join_and": {}}, - "polygon": {"join_or": {}, "join_and": {}}, - "line": {"join_or": {}, "join_and": {}}, + return json.dumps(json_data) + + def _get_geometry_types(self, config: QueryConfig) -> Union[list, None]: + """Get the geometry types based on the QueryConfig. + + Args: + config (QueryConfig): The query configuration. + + Returns: + Union[list, None]: A list of geometry types or None if empty. + """ + geometry_types = [] + for table, geometry_type in {"nodes": "point", "ways_line": "line", "ways_poly": "polygon"}.items(): + if config.config["select"][table] or config.config["where"][table]: + geometry_types.append(geometry_type) + return geometry_types or None + + def _get_filters(self, config: QueryConfig) -> dict: + """Get the filters based on the QueryConfig. + + Args: + config (QueryConfig): The query configuration. + + Returns: + dict: The filters. + """ + # Initialize the filters dictionary with empty join_or and join_and + # dictionaries for point, line, and polygon + filters = { + "tags": { + "point": {"join_or": {}, "join_and": {}}, + "line": {"join_or": {}, "join_and": {}}, + "polygon": {"join_or": {}, "join_and": {}}, + } } - for table in config.config["where"].keys(): - for item in config.config["where"][table]: - key = list(item.keys())[0] - if item["op"] == "or": - join_or[tables[table]].append(key) - if item["op"] == "and": - join_and[tables[table]].append(key) - if "not null" in item.get(key, []): + # Mapping between database table names and geometry types + tables = {"nodes": "point", "ways_poly": "polygon", "ways_line": "line"} + + # Iterate through the conditions in the 'where' clause of the query configuration + for table, conditions in config.config["where"].items(): + for condition in conditions: + # Access the 'op' field in the condition + key, option = list(condition.items())[0] + if option == "or" or option == "and": + # If the option is 'or' or 'and', add the condition to the respective join dictionary + filters["tags"][tables[table]][f"join_{option}"][key] = [] + elif "not null" in option: + # If the condition indicates 'not null', add it to both join_or and join_and with empty values filters["tags"][tables[table]]["join_or"][key] = [] filters["tags"][tables[table]]["join_and"][key] = [] else: - filters["tags"][tables[table]]["join_or"][key] = item[key] - filters["tags"][tables[table]]["join_and"][key] = item[key] - feature.update({"filters": filters}) + # Otherwise, set the condition value in both join_or and join_and dictionaries + filters["tags"][tables[table]]["join_or"][key] = option + filters["tags"][tables[table]]["join_and"][key] = option - attributes = list() + return filters + + def _get_attributes(self, config: QueryConfig) -> list: + """Get the attributes based on the QueryConfig. + + Args: + config (QueryConfig): The query configuration. + + Returns: + list: The list of attributes. + """ + attributes = [] for table, data in config.config["select"].items(): for value in data: [[k, v]] = value.items() if k not in attributes: attributes.append(k) - - # Whether to dump centroids or polygons - if "centroid" in config.config: - feature["centroid"] = true - return json.dumps(feature) + return attributes def createSQL( self, @@ -441,15 +466,16 @@ def queryLocal( def queryRemote( self, query: str, - ): + ) -> Optional[Union[str, dict]]: """This queries a remote postgres database using the FastAPI backend to the HOT Export Tool. Args: - query (str): The JSON query to execute + query (str): The JSON query to execute. Returns: - (FeatureCollection): the results of the query + (str, FeatureCollection): either the data URL, or extracted geojson. + Returns None on failure. """ # Send the request to raw data api result = None @@ -485,10 +511,13 @@ def queryRemote( elapsed_time = 0 while elapsed_time < max_polling_duration: - result = self.session.get(task_query_url, headers=self.headers) - result_json = result.json() + response = self.session.get(task_query_url, headers=self.headers) + response_json = response.json() + response_status = response_json.get("status") - if result_json.get("status") == "PENDING": + log.debug(f"Current status: {response_status}") + + if response_status == "STARTED" or response_status == "PENDING": # Adjust polling frequency after the first minute if elapsed_time > 60: polling_interval = 10 # Poll every 10 seconds after the first minute @@ -498,7 +527,7 @@ def queryRemote( time.sleep(polling_interval) elapsed_time += polling_interval - elif result_json.get("status") == "SUCCESS": + elif response_status == "SUCCESS": break else: @@ -506,15 +535,28 @@ def queryRemote( log.error(f"{max_polling_duration} second elapsed. Aborting data extract.") return None - zip_url = result_json["result"]["download_url"] - result = self.session.get(zip_url, headers=self.headers) - fp = BytesIO(result.content) - zfp = zipfile.ZipFile(fp, "r") - zfp.extract("Export.geojson", "/tmp/") - # Now take that taskid and hit /tasks/status url with get - data = zfp.read("Export.geojson") - os.remove("/tmp/Export.geojson") - return json.loads(data) + if not isinstance(response_json, dict): + log.error(f"Raw data api response in wrong format: {response_json}") + return None + + info = response_json.get("result", {}) + log.debug(f"Raw Data API Response: {info}") + + data_url = info.get("download_url") + + if not data_url: + log.error("Raw data api no download_url returned. Skipping.") + return None + + if not data_url.endswith(".zip"): + return data_url + + with self.session.get(data_url, headers=self.headers) as response: + buffer = BytesIO(response.content) + with zipfile.ZipFile(buffer, "r") as zipped_file: + with zipped_file.open("Export.geojson") as geojson_file: + geojson_data = json.load(geojson_file) + return geojson_data class PostgresClient(DatabaseAccess): @@ -524,6 +566,7 @@ def __init__( self, uri: str, config: Optional[Union[str, BytesIO]] = None, + auth_token: Optional[str] = None, # output: str = None ): """This is a client for a postgres database. @@ -539,6 +582,10 @@ def __init__( super().__init__(uri) self.qc = QueryConfig() + # Optional authentication + if auth_token: + self.headers["access-token"] = auth_token + if config: # filespec string passed if isinstance(config, str): @@ -594,31 +641,41 @@ def createDB(self, dburi: uriParser): def execQuery( self, - boundary: FeatureCollection, + boundary: Union[FeatureCollection, Feature, dict], customsql: str = None, allgeom: bool = True, + clip_to_aoi: bool = False, + extra_params: dict = {}, ): """This class generates executes the query using a local postgres database, or a remote one that uses the Underpass schema. Args: - boundary (FeatureCollection): The boundary polygon - customsql (str): Don't create the SQL, use the one supplied - allgeom (bool): Whether to return centroids or all the full geometry + boundary (FeatureCollection, Feature, dict): The boundary polygon. + customsql (str): Don't create the SQL, use the one supplied. + allgeom (bool): Whether to return centroids or all the full geometry. + clip_to_aoi (bool): Remove polygons with centroids outside AOI. Returns: query (FeatureCollection): the json """ - log.info("Extracting features from Postgres...") - - if "features" in boundary: - # FIXME: ideally this should support multipolygons - poly = boundary["features"][0]["geometry"] + log.info("Parsing AOI geojson for data extract") + + if (geom_type := boundary.get("type")) == "FeatureCollection": + # Convert each feature into a Shapely geometry + geometries = [shape(feature.get("geometry")) for feature in boundary.get("features", [])] + merged_geom = unary_union(geometries) + elif geom_type == "Feature": + merged_geom = shape(boundary.get("geometry")) else: - poly = boundary["geometry"] - wkt = shape(poly) + merged_geom = shape(boundary) + + # Convert the merged geoms to a single Polygon GeoJSON using convex hull + aoi_polygon = json.loads(to_geojson(merged_geom.convex_hull)) + aoi_shape = shape(aoi_polygon) if self.dbshell: + log.info("Extracting features from Postgres...") if not customsql: sql = self.createSQL(self.qc, allgeom) else: @@ -626,14 +683,36 @@ def execQuery( alldata = list() for query in sql: # print(query) - result = self.queryLocal(query, allgeom, wkt) + result = self.queryLocal(query, allgeom, aoi_shape) if len(result) > 0: alldata += result["features"] collection = FeatureCollection(alldata) else: - json_config = self.createJson(self.qc, poly, allgeom) + log.info("Extracting features via remote call...") + json_config = self.createJson(self.qc, aoi_polygon, allgeom, extra_params) collection = self.queryRemote(json_config) - return collection + # bind_zip=False, data is not zipped, return URL directly + if not json.loads(json_config).get("bind_zip", True): + return collection + + if not collection: + log.warning("No data returned for data extract") + return collection + + if not clip_to_aoi: + return collection + + # TODO this may be implemented in raw-data-api directly + # TODO https://github.com/hotosm/raw-data-api/issues/207 + # TODO remove code here if complete + # Only return polygons with centroids inside AOI + filtered_features = [] + for feature in collection["features"]: + if (geom := feature.get("geometry")).get("type") == "Polygon": + if aoi_shape.contains(shape(shape(geom).centroid)): + filtered_features.append(feature) + + return FeatureCollection(filtered_features) def main(): diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..c6a5b30 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,39 @@ +# Copyright (c) 2022, 2023 Humanitarian OpenStreetMap Team +# +# This file is part of osm-rawdata. +# +# osm-rawdata is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# osm-rawdata is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with osm-rawdata. If not, see . +# +"""Configuration and fixtures for PyTest.""" + +import logging +import sys + +import psycopg2 +import pytest + +logging.basicConfig( + level="DEBUG", + format=("%(asctime)s.%(msecs)03d [%(levelname)s] " "%(name)s | %(funcName)s:%(lineno)d | %(message)s"), + datefmt="%y-%m-%d %H:%M:%S", + stream=sys.stdout, +) + +log = logging.getLogger(__name__) + + +@pytest.fixture(scope="session") +def db(): + """Existing psycopg2 connection.""" + return psycopg2.connect("postgresql://fmtm:testpass@db:5432/underpass") diff --git a/tests/test_postgres.py b/tests/test_postgres.py index 8a0fd89..996280c 100644 --- a/tests/test_postgres.py +++ b/tests/test_postgres.py @@ -20,8 +20,10 @@ import logging import os +import time import geojson +import requests import osm_rawdata as rw from osm_rawdata.postgres import PostgresClient @@ -41,9 +43,37 @@ def test_data_extract(): assert len(data_extract.get("features")) == 16 -# def test_data_extract_flatgeobuf(): -# pg = PostgresClient("underpass", f"{rootdir}/buildings.yaml") -# aoi_file = open(f"{rootdir}/AOI_small.geojson", "r") -# boundary = geojson.load(aoi_file) -# data_extract = pg.execQuery(boundary) -# assert len(data_extract.get("features")) == 16 +def test_data_extract_with_clipping(): + # Sleep 5 seconds to reduce API load + time.sleep(5) + + pg = PostgresClient("underpass", f"{rootdir}/buildings.yaml") + aoi_file = open(f"{rootdir}/AOI_small.geojson", "r") + boundary = geojson.load(aoi_file) + data_extract = pg.execQuery(boundary, clip_to_aoi=True) + print(data_extract) + assert len(data_extract.get("features")) == 13 + + +def test_data_extract_flatgeobuf(): + # Sleep 5 seconds to reduce API load + time.sleep(5) + + pg = PostgresClient("underpass", f"{rootdir}/buildings.yaml") + aoi_file = open(f"{rootdir}/AOI_small.geojson", "r") + boundary = geojson.load(aoi_file) + extract_url = pg.execQuery( + boundary, + extra_params={ + "fileName": "osm-rawdata-test-extract", + "outputType": "fgb", + "bind_zip": False, + }, + # param options: https://hotosm.github.io/raw-data-api/api/endpoints/#rawdatacurrentparams + ) + assert extract_url.startswith("http") + + with requests.head(extract_url) as response: + assert response.status_code == 200 + assert response.headers["Content-Type"] == "binary/octet-stream" + assert response.headers["Content-Length"] == "8376"