diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..2a7449d
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,57 @@
+# Copyright (c) 2022, 2023 Humanitarian OpenStreetMap Team
+# This file is part of osm-rawdata.
+#
+# osm-rawdata is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# osm-rawdata is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with osm-rawdata. If not, see <https://www.gnu.org/licenses/>.
+#
+
+version: "3"
+
+networks:
+ net:
+ name: osm-rawdata
+
+services:
+ rawdata:
+ image: "ghcr.io/hotosm/osm-rawdata:ci"
+ build:
+ target: ci
+ container_name: osm-rawdata
+ volumes:
+ # Mount local package
+ - ./osm_rawdata:/root/.local/lib/python3.10/site-packages/osm_rawdata
+ # Mount local tests
+ - ./tests:/data/tests
+ depends_on:
+ db:
+ condition: service_healthy
+ networks:
+ - net
+ restart: "unless-stopped"
+ command: "pytest"
+
+ db:
+ image: "postgis/postgis:14-3.4-alpine"
+ environment:
+ - POSTGRES_USER=fmtm
+ - POSTGRES_PASSWORD=testpass
+ - POSTGRES_DB=underpass
+ networks:
+ - net
+ restart: "unless-stopped"
+ healthcheck:
+ test: pg_isready -U fmtm -d underpass
+ start_period: 5s
+ interval: 10s
+ timeout: 5s
+ retries: 3
diff --git a/osm_rawdata/config.py b/osm_rawdata/config.py
index ab49424..027d653 100755
--- a/osm_rawdata/config.py
+++ b/osm_rawdata/config.py
@@ -23,10 +23,12 @@
import json
import logging
import sys
+from io import BytesIO
# import time
from pathlib import Path
from sys import argv
+from typing import Union
import flatdict
import yaml
@@ -68,16 +70,16 @@ def __init__(self, boundary: Polygon = None):
# for polygon extracts, sometimes we just want the center point
self.centroid = False
- def parseYaml(self, filespec: str):
+ def parseYaml(self, config: Union[str, BytesIO]):
"""Parse the YAML config file format into the internal data structure.
Args:
- filespec (str): The file to read.
+ config (str, BytesIO): the file or BytesIO object to read.
Returns:
config (dict): The config data.
"""
- yaml_data = self.load_yaml(filespec)
+ yaml_data = self.load_yaml(config)
self._yaml_parse_tables(yaml_data)
self._yaml_parse_where(yaml_data)
@@ -87,7 +89,7 @@ def parseYaml(self, filespec: str):
return self.config
@staticmethod
- def load_yaml(filespec: str):
+ def load_yaml(config: Union[str, BytesIO]):
"""Private method to load YAML data from a file.
Args:
@@ -96,8 +98,14 @@ def load_yaml(filespec: str):
Returns:
data (dict): The loaded YAML data.
"""
- with open(filespec, "r") as file:
- return yaml.safe_load(file)
+ if isinstance(config, str):
+ with open(config, "r") as file:
+ return yaml.safe_load(file)
+ elif isinstance(config, BytesIO):
+ return yaml.safe_load(config.getvalue())
+ else:
+ log.error(f"Unsupported config format: {config}")
+ raise ValueError(f"Invalid config {config}")
def _yaml_parse_tables(self, data):
"""Private method to parse 'from' data.
@@ -176,63 +184,77 @@ def _yaml_parse_select_and_keep(self, data):
for tag in data["keep"]:
self.config["select"][table].append({tag: []})
- def parseJson(self, filespec: str):
- """Parse the JSON format config file used by the raw-data-api
- and export tool.
+ def parseJson(self, config: Union[str, BytesIO]):
+ """Parse the JSON format config file using the Underpass schema.
Args:
- filespec (str): the file to read
+ config (str, BytesIO): the file or BytesIO object to read.
Returns:
config (dict): the config data
"""
- file = open(filespec, "r")
- data = json.load(file)
- # Get the geometry
+ # Check the type of config and load data accordingly
+ if isinstance(config, str):
+ with open(config, "r") as config_file:
+ data = json.load(config_file)
+ elif isinstance(config, BytesIO):
+ data = json.load(config)
+ else:
+ log.error(f"Unsupported config format: {config}")
+ raise ValueError(f"Invalid config {config}")
+
+ # Helper function to convert geometry names
+ def convert_geometry(geom):
+ if geom == "point":
+ return "nodes"
+ elif geom == "line":
+ return "ways_line"
+ elif geom == "polygon":
+ return "ways_poly"
+ return geom
+
+ # Extract geometry
self.geometry = shape(data["geometry"])
+
+ # Iterate through each key-value pair in the flattened dictionary
for key, value in flatdict.FlatDict(data).items():
keys = key.split(":")
- # print(keys)
- # print(f"\t{value}")
- # We already have the geometry
- if key[:8] == "geometry":
+ # Skip the keys related to geometry
+ if key.startswith("geometry"):
continue
+ # If it's a top-level key, directly update self.config
if len(keys) == 1:
- self.config.update({key: value})
+ self.config[key] = value
continue
- # keys[0] is currently always 'filters'
- # keys[1] is currently 'tags' for the WHERE clause,
- # of attributes for the SELECT
- geom = keys[2]
- # tag = keys[4]
- # Get the geometry
- if geom == "point":
- geom = "nodes"
- elif geom == "line":
- geom = "ways_line"
- elif geom == "polygon":
- geom = "ways_poly"
- if keys[1] == "attributes":
- for v1 in value:
- if geom == "all_geometry":
- self.config["select"]["nodes"].append({v1: {}})
- self.config["select"]["ways_line"].append({v1: {}})
- self.config["select"]["ways_poly"].append({v1: {}})
- self.config["tables"].append("nodes")
- self.config["tables"].append("ways_poly")
- self.config["tables"].append("ways_line")
+
+ # Extract meaningful parts from the key
+ section, subsection = keys[:2]
+ geom_type = keys[2] if len(keys) > 2 else None
+ tag_type = keys[3] if len(keys) > 3 else None
+ tag_name = keys[4] if len(keys) > 4 else None
+
+ # Convert geometry type to meaningful names
+ geom_type = convert_geometry(geom_type)
+
+ if subsection == "attributes":
+ # For attributes, update select fields and tables
+ for attribute_name in value:
+ if geom_type == "all_geometry":
+ for geometry_type in ["nodes", "ways_line", "ways_poly"]:
+ self.config["select"][geometry_type].append({attribute_name: {}})
+ self.config["tables"].append(geometry_type)
else:
- self.config["tables"].append(geom)
- self.config["select"][geom].append({v1: {}})
- if keys[1] == "tags":
- newtag = {keys[4]: value}
- newtag["op"] = keys[3][5:]
- if geom == "all_geometry":
- self.config["where"]["nodes"].append(newtag)
- self.config["where"]["ways_poly"].append(newtag)
- self.config["where"]["ways_line"].append(newtag)
+ self.config["select"][geom_type].append({attribute_name: {}})
+ self.config["tables"].append(geom_type)
+ elif subsection == "tags":
+ # For tags, update where fields
+ option = tag_type[5:] if tag_type else None
+ new_tag = {tag_name: value, "op": option}
+ if geom_type == "all_geometry":
+ for geometry_type in ["nodes", "ways_line", "ways_poly"]:
+ self.config["where"][geometry_type].append(new_tag)
else:
- self.config["where"][geom].append(newtag)
+ self.config["where"][geom_type].append(new_tag)
return self.config
diff --git a/osm_rawdata/pgasync.py b/osm_rawdata/pgasync.py
index 05c02c4..ea93bf5 100755
--- a/osm_rawdata/pgasync.py
+++ b/osm_rawdata/pgasync.py
@@ -20,6 +20,7 @@
#
import argparse
+import asyncio
import json
import logging
import os
@@ -28,13 +29,11 @@
import zipfile
from io import BytesIO
from pathlib import Path
-from sys import argv
from urllib.parse import urlparse
+
+import asyncpg
import geojson
import requests
-import asyncio
-import asyncpg
-from asyncpg import exceptions
from geojson import Feature, FeatureCollection, Polygon
from shapely import wkt
from shapely.geometry import Polygon, shape
@@ -48,6 +47,7 @@
# Instantiate logger
log = logging.getLogger(__name__)
+
class DatabaseAccess(object):
def __init__(self):
"""This is a class to setup a database connection."""
@@ -55,32 +55,33 @@ def __init__(self):
self.dburi = None
self.qc = None
- async def connect(self,
- dburi: str,
- ):
+ async def connect(
+ self,
+ dburi: str,
+ ):
self.dburi = dict()
uri = urlparse(dburi)
if not uri.username:
- self.dburi['dbuser'] = os.getenv("PGUSER", default=None)
- if not self.dburi['dbuser']:
- log.error(f"You must specify the user name in the database URI, or set PGUSER")
+ self.dburi["dbuser"] = os.getenv("PGUSER", default=None)
+ if not self.dburi["dbuser"]:
+ log.error("You must specify the user name in the database URI, or set PGUSER")
else:
- self.dburi['dbuser'] = uri.username
+ self.dburi["dbuser"] = uri.username
if not uri.password:
- self.dburi['dbpass'] = os.getenv("PGPASSWORD", default=None)
- if not self.dburi['dbpass']:
- log.error(f"You must specify the user password in the database URI, or set PGPASSWORD")
+ self.dburi["dbpass"] = os.getenv("PGPASSWORD", default=None)
+ if not self.dburi["dbpass"]:
+ log.error("You must specify the user password in the database URI, or set PGPASSWORD")
else:
- self.dburi['dbpass'] = uri.password
+ self.dburi["dbpass"] = uri.password
if not uri.hostname:
- self.dburi['dbhost'] = os.getenv("PGHOST", default="localhost")
+ self.dburi["dbhost"] = os.getenv("PGHOST", default="localhost")
else:
- self.dburi['dbhost'] = uri.hostname
+ self.dburi["dbhost"] = uri.hostname
- slash = uri.path.find('/')
- self.dburi['dbname'] = uri.path[slash + 1:]
+ slash = uri.path.find("/")
+ self.dburi["dbname"] = uri.path[slash + 1 :]
connect = f"postgres://{self.dburi['dbuser']}:{ self.dburi['dbpass']}@{self.dburi['dbhost']}/{self.dburi['dbname']}"
-
+
if self.dburi["dbname"] == "underpass":
# Authentication data
# self.auth = HTTPBasicAuth(self.user, self.passwd)
@@ -292,11 +293,11 @@ async def createTable(
return True
- async def execute(self,
- sql: str,
- ):
- """
- Execute a raw SQL query and return the results.
+ async def execute(
+ self,
+ sql: str,
+ ):
+ """Execute a raw SQL query and return the results.
Args:
sql (str): The SQL to execute
@@ -442,17 +443,18 @@ def __init__(
# output: str = None
):
"""This is a client for a postgres database.
+
Returns:
(PostgresClient): An instance of this class
"""
super().__init__()
self.qc = None
- async def loadConfig(self,
- config: str,
- ):
- """
- Load the JSON or YAML config file that defines the SQL query
+ async def loadConfig(
+ self,
+ config: str,
+ ):
+ """Load the JSON or YAML config file that defines the SQL query
Args:
config (str): The filespec for the query config file
@@ -535,6 +537,7 @@ async def execQuery(
collection = await self.queryRemote(request)
return collection
+
async def main():
"""This main function lets this class be run standalone by a bash script."""
parser = argparse.ArgumentParser(
@@ -602,9 +605,9 @@ async def main():
log.debug(f"Wrote {args.outfile}")
+
if __name__ == "__main__":
"""This is just a hook so this file can be run standalone during development."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main())
-
diff --git a/osm_rawdata/postgres.py b/osm_rawdata/postgres.py
index 4e2494a..3b03e7a 100755
--- a/osm_rawdata/postgres.py
+++ b/osm_rawdata/postgres.py
@@ -29,13 +29,16 @@
from io import BytesIO
from pathlib import Path
from sys import argv
+from typing import Optional, Union
import geojson
import psycopg2
import requests
-from geojson import Feature, FeatureCollection, Polygon
-from shapely import wkt
+from geojson import Feature, FeatureCollection
+from geojson import Polygon as GeojsonPolygon
+from shapely import to_geojson, wkt
from shapely.geometry import Polygon, shape
+from shapely.ops import unary_union
# Find the other files for this project
import osm_rawdata as rw
@@ -132,7 +135,7 @@ def __init__(
# Use a persistant connect, better for multiple requests
self.session = requests.Session()
- self.url = os.getenv("UNDERPASS_API_URL", "https://raw-data-api0.hotosm.org/v1")
+ self.url = os.getenv("UNDERPASS_API_URL", "https://api-prod.raw-data.hotosm.org/v1")
self.headers = {"accept": "application/json", "Content-Type": "application/json"}
else:
log.info(f"Opening database connection to: {self.uri['dbname']}")
@@ -156,90 +159,112 @@ def __init__(
log.error(f"Couldn't connect to database: {e}")
def __del__(self):
- """
- Close any open connections to Postgres.
- """
- self.dbshell.close()
+ """Close any open connections to Postgres."""
+ if self.dbshell:
+ self.dbshell.close()
def createJson(
self,
config: QueryConfig,
- boundary: Polygon,
+ boundary: GeojsonPolygon,
allgeom: bool = False,
- ):
- """This class generates a JSON file, which is used for remote access
- to an OSM raw database using the Underpass schema.
+ extra_params: dict = {},
+ ) -> str:
+ """Generate a JSON file used for remote access to raw-data-api.
+
+ Uses the Underpass schema.
Args:
config (QueryConfig): The config data from the query config file
- boundary (Polygon): The boundary polygon
+ boundary (GeojsonPolygon): The boundary polygon
allgeom (bool): Whether to return centroids or all the full geometry
+ TODO this is not implemented.
+ extra_params (dict): Extra parameters to include in JSON config root.
Returns:
- (FeatureCollection): the json data
+ str: The stringified JSON data.
"""
- feature = dict()
- feature["geometry"] = boundary
-
- filters = dict()
- filters["tags"] = dict()
- # filters["tags"]["all_geometry"] = dict()
-
- # This only effects the output file
- geometrytype = list()
- # for table in config.config['tables']:
- if len(config.config["select"]["nodes"]) > 0 or len(config.config["where"]["nodes"]) > 0:
- geometrytype.append("point")
- if len(config.config["select"]["ways_line"]) > 0 or len(config.config["where"]["ways_line"]) > 0:
- geometrytype.append("line")
- if len(config.config["select"]["ways_poly"]) > 0 or len(config.config["where"]["ways_poly"]) > 0:
- geometrytype.append("polygon")
- feature["geometryType"] = geometrytype
-
- tables = {"nodes": "point", "ways_poly": "polygon", "ways_line": "line"}
- # The database tables to query
- # if tags exists, then only query those fields
- join_or = {
- "point": [],
- "polygon": [],
- "line": [],
- }
- join_and = {
- "point": [],
- "polygon": [],
- "line": [],
+ json_data = {
+ "geometry": boundary,
+ "geometryType": self._get_geometry_types(config),
+ "filters": self._get_filters(config),
+ "centroid": config.config.get("centroid", False),
+ "attributes": self._get_attributes(config),
+ **extra_params,
}
- filters["tags"] = {
- "point": {"join_or": {}, "join_and": {}},
- "polygon": {"join_or": {}, "join_and": {}},
- "line": {"join_or": {}, "join_and": {}},
+ return json.dumps(json_data)
+
+ def _get_geometry_types(self, config: QueryConfig) -> Union[list, None]:
+ """Get the geometry types based on the QueryConfig.
+
+ Args:
+ config (QueryConfig): The query configuration.
+
+ Returns:
+ Union[list, None]: A list of geometry types or None if empty.
+ """
+ geometry_types = []
+ for table, geometry_type in {"nodes": "point", "ways_line": "line", "ways_poly": "polygon"}.items():
+ if config.config["select"][table] or config.config["where"][table]:
+ geometry_types.append(geometry_type)
+ return geometry_types or None
+
+ def _get_filters(self, config: QueryConfig) -> dict:
+ """Get the filters based on the QueryConfig.
+
+ Args:
+ config (QueryConfig): The query configuration.
+
+ Returns:
+ dict: The filters.
+ """
+ # Initialize the filters dictionary with empty join_or and join_and
+ # dictionaries for point, line, and polygon
+ filters = {
+ "tags": {
+ "point": {"join_or": {}, "join_and": {}},
+ "line": {"join_or": {}, "join_and": {}},
+ "polygon": {"join_or": {}, "join_and": {}},
+ }
}
- for table in config.config["where"].keys():
- for item in config.config["where"][table]:
- key = list(item.keys())[0]
- if item["op"] == "or":
- join_or[tables[table]].append(key)
- if item["op"] == "and":
- join_and[tables[table]].append(key)
- if "not null" in item.get(key, []):
+ # Mapping between database table names and geometry types
+ tables = {"nodes": "point", "ways_poly": "polygon", "ways_line": "line"}
+
+ # Iterate through the conditions in the 'where' clause of the query configuration
+ for table, conditions in config.config["where"].items():
+ for condition in conditions:
+ # Access the 'op' field in the condition
+ key, option = list(condition.items())[0]
+ if option == "or" or option == "and":
+ # If the option is 'or' or 'and', add the condition to the respective join dictionary
+ filters["tags"][tables[table]][f"join_{option}"][key] = []
+ elif "not null" in option:
+ # If the condition indicates 'not null', add it to both join_or and join_and with empty values
filters["tags"][tables[table]]["join_or"][key] = []
filters["tags"][tables[table]]["join_and"][key] = []
else:
- filters["tags"][tables[table]]["join_or"][key] = item[key]
- filters["tags"][tables[table]]["join_and"][key] = item[key]
- feature.update({"filters": filters})
+ # Otherwise, set the condition value in both join_or and join_and dictionaries
+ filters["tags"][tables[table]]["join_or"][key] = option
+ filters["tags"][tables[table]]["join_and"][key] = option
+
+ return filters
+
+ def _get_attributes(self, config: QueryConfig) -> list:
+ """Get the attributes based on the QueryConfig.
- attributes = list()
+ Args:
+ config (QueryConfig): The query configuration.
+
+ Returns:
+ list: The list of attributes.
+ """
+ attributes = []
for table, data in config.config["select"].items():
for value in data:
[[k, v]] = value.items()
if k not in attributes:
attributes.append(k)
-
- # Whether to dump centroids or polygons
- if "centroid" in config.config:
- feature["centroid"] = true
- return json.dumps(feature)
+ return attributes
def createSQL(
self,
@@ -289,12 +314,14 @@ def createSQL(
jor = ""
for entry in join_or:
for k, v in entry.items():
- if type(v[0]) == list:
- # It's an array of values
- value = str(v[0])
- any = f"ANY(ARRAY{value})"
- jor += f"tags->>'{k}'={any} OR "
- continue
+ # Check if v is a non-empty list
+ if isinstance(v, list) and v:
+ if isinstance(v[0], list):
+ # It's an array of values
+ value = str(v[0])
+ any = f"ANY(ARRAY{value})"
+ jor += f"tags->>'{k}'={any} OR "
+ continue
if k == "op":
continue
if len(v) == 1:
@@ -458,42 +485,98 @@ def queryLocal(
def queryRemote(
self,
- query: str = None,
- ):
+ query: str,
+ ) -> Optional[Union[str, dict]]:
"""This queries a remote postgres database using the FastAPI
backend to the HOT Export Tool.
Args:
- query (str): The JSON query to execute
+ query (str): The JSON query to execute.
Returns:
- (FeatureCollection): the results of the query
+ (str, FeatureCollection): either the data URL, or extracted geojson.
+ Returns None on failure.
"""
+ # Send the request to raw data api
+ result = None
+
url = f"{self.url}/snapshot/"
- result = self.session.post(url, data=query, headers=self.headers)
+ try:
+ result = self.session.post(url, data=query, headers=self.headers)
+ result.raise_for_status()
+ except requests.exceptions.HTTPError:
+ if result is not None:
+ error_dict = result.json()
+ error_dict["status_code"] = result.status_code
+ log.error(f"Failed to get extract from Raw Data API: {error_dict}")
+ return error_dict
+ else:
+ log.error("Failed to make request to raw data API")
+
+ if result is None:
+ log.error("Raw Data API did not return a response. Skipping.")
+ return None
+
if result.status_code != 200:
- log.error(f"{result.json()['detail'][0]['msg']}")
+ error_message = result.json().get("detail")[0].get("msg")
+ log.error(f"{error_message}")
return None
- task_id = result.json()["task_id"]
- newurl = f"{self.url}/tasks/status/{task_id}"
- while True:
- result = self.session.get(newurl, headers=self.headers)
- if result.json()["status"] == "PENDING":
- log.debug("Retrying...")
- time.sleep(1)
- elif result.json()["status"] == "SUCCESS":
+
+ task_id = result.json().get("task_id")
+ task_query_url = f"{self.url}/tasks/status/{task_id}"
+ log.debug(f"Raw Data API Query URL: {task_query_url}")
+
+ polling_interval = 2 # Initial polling interval in seconds
+ max_polling_duration = 600 # Maximum duration for polling in seconds (10 minutes)
+ elapsed_time = 0
+
+ while elapsed_time < max_polling_duration:
+ response = self.session.get(task_query_url, headers=self.headers)
+ response_json = response.json()
+ response_status = response_json.get("status")
+
+ log.debug(f"Current status: {response_status}")
+
+ if response_status == "STARTED" or response_status == "PENDING":
+ # Adjust polling frequency after the first minute
+ if elapsed_time > 60:
+ polling_interval = 10 # Poll every 10 seconds after the first minute
+
+ # Wait before polling again
+ log.debug(f"Waiting {polling_interval} seconds before polling API again...")
+ time.sleep(polling_interval)
+ elapsed_time += polling_interval
+
+ elif response_status == "SUCCESS":
break
- zip = result.json()["result"]["download_url"]
- result = self.session.get(zip, headers=self.headers)
- fp = BytesIO(result.content)
- zfp = zipfile.ZipFile(fp, "r")
- zfp.extract("Export.geojson", "/tmp/")
- # Now take that taskid and hit /tasks/status url with get
- data = zfp.read("Export.geojson")
- os.remove("/tmp/Export.geojson")
- return json.loads(data)
- # return zfp.read("Export.geojson")
+ else:
+ # Maximum polling duration reached
+ log.error(f"{max_polling_duration} second elapsed. Aborting data extract.")
+ return None
+
+ if not isinstance(response_json, dict):
+ log.error(f"Raw data api response in wrong format: {response_json}")
+ return None
+
+ info = response_json.get("result", {})
+ log.debug(f"Raw Data API Response: {info}")
+
+ data_url = info.get("download_url")
+
+ if not data_url:
+ log.error("Raw data api no download_url returned. Skipping.")
+ return None
+
+ if not data_url.endswith(".zip"):
+ return data_url
+
+ with self.session.get(data_url, headers=self.headers) as response:
+ buffer = BytesIO(response.content)
+ with zipfile.ZipFile(buffer, "r") as zipped_file:
+ with zipped_file.open("Export.geojson") as geojson_file:
+ geojson_data = json.load(geojson_file)
+ return geojson_data
class PostgresClient(DatabaseAccess):
@@ -502,30 +585,57 @@ class PostgresClient(DatabaseAccess):
def __init__(
self,
uri: str,
- config: str = None,
+ config: Optional[Union[str, BytesIO]] = None,
+ auth_token: Optional[str] = None,
# output: str = None
):
"""This is a client for a postgres database.
Args:
- uri (str): The URI string for the database connection
- config (str): The filespec for the query config file
+ uri (str): The URI string for the database connection.
+ config (str, BytesIO): The query config file path or BytesIO object.
+ Currently only YAML format is accepted if BytesIO is passed.
Returns:
(bool): Whether the data base connection was sucessful
"""
super().__init__(uri)
self.qc = QueryConfig()
+
+ # Optional authentication
+ if auth_token:
+ self.headers["access-token"] = auth_token
+
if config:
- # Load the config file for the SQL query
- path = Path(config)
- if path.suffix == ".json":
- self.qc.parseJson(config)
- elif path.suffix == ".yaml":
- self.qc.parseYaml(config)
+ # filespec string passed
+ if isinstance(config, str):
+ path = Path(config)
+ if not path.exists():
+ raise FileNotFoundError(f"Config file does not exist {config}")
+ with open(config, "rb") as config_file:
+ config_data = BytesIO(config_file.read())
+ if path.suffix == ".json":
+ config_type = "json"
+ elif path.suffix == ".yaml":
+ config_type = "yaml"
+ else:
+ log.error(f"Unsupported file format: {config}")
+ raise ValueError(f"Invalid config {config}")
+
+ # BytesIO object passed
+ elif isinstance(config, BytesIO):
+ config_data = config
+ config_type = "yaml"
+
else:
- log.error(f"{path} is an unsupported file format!")
- quit()
+ log.warning(f"Config input is invalid for PostgresClient: {config}")
+ raise ValueError(f"Invalid config {config}")
+
+ # Parse the config
+ if config_type == "json":
+ self.qc.parseJson(config_data)
+ elif config_type == "yaml":
+ self.qc.parseYaml(config_data)
def createDB(self, dburi: uriParser):
"""Setup the postgres database connection.
@@ -551,31 +661,41 @@ def createDB(self, dburi: uriParser):
def execQuery(
self,
- boundary: FeatureCollection,
+ boundary: Union[FeatureCollection, Feature, dict],
customsql: str = None,
allgeom: bool = True,
+ clip_to_aoi: bool = False,
+ extra_params: dict = {},
):
"""This class generates executes the query using a local postgres
database, or a remote one that uses the Underpass schema.
Args:
- boundary (FeatureCollection): The boundary polygon
- customsql (str): Don't create the SQL, use the one supplied
- allgeom (bool): Whether to return centroids or all the full geometry
+ boundary (FeatureCollection, Feature, dict): The boundary polygon.
+ customsql (str): Don't create the SQL, use the one supplied.
+ allgeom (bool): Whether to return centroids or all the full geometry.
+ clip_to_aoi (bool): Remove polygons with centroids outside AOI.
Returns:
query (FeatureCollection): the json
"""
- log.info("Extracting features from Postgres...")
-
- if "features" in boundary:
- # FIXME: ideally this should support multipolygons
- poly = boundary["features"][0]["geometry"]
+ log.info("Parsing AOI geojson for data extract")
+
+ if (geom_type := boundary.get("type")) == "FeatureCollection":
+ # Convert each feature into a Shapely geometry
+ geometries = [shape(feature.get("geometry")) for feature in boundary.get("features", [])]
+ merged_geom = unary_union(geometries)
+ elif geom_type == "Feature":
+ merged_geom = shape(boundary.get("geometry"))
else:
- poly = boundary["geometry"]
- wkt = shape(poly)
+ merged_geom = shape(boundary)
+
+ # Convert the merged geoms to a single Polygon GeoJSON using convex hull
+ aoi_polygon = json.loads(to_geojson(merged_geom.convex_hull))
+ aoi_shape = shape(aoi_polygon)
if self.dbshell:
+ log.info("Extracting features from Postgres...")
if not customsql:
sql = self.createSQL(self.qc, allgeom)
else:
@@ -583,14 +703,36 @@ def execQuery(
alldata = list()
for query in sql:
# print(query)
- result = self.queryLocal(query, allgeom, wkt)
+ result = self.queryLocal(query, allgeom, aoi_shape)
if len(result) > 0:
alldata += result["features"]
collection = FeatureCollection(alldata)
else:
- request = self.createJson(self.qc, poly, allgeom)
- collection = self.queryRemote(request)
- return collection
+ log.info("Extracting features via remote call...")
+ json_config = self.createJson(self.qc, aoi_polygon, allgeom, extra_params)
+ collection = self.queryRemote(json_config)
+ # bind_zip=False, data is not zipped, return URL directly
+ if not json.loads(json_config).get("bind_zip", True):
+ return collection
+
+ if not collection:
+ log.warning("No data returned for data extract")
+ return collection
+
+ if not clip_to_aoi:
+ return collection
+
+ # TODO this may be implemented in raw-data-api directly
+ # TODO https://github.com/hotosm/raw-data-api/issues/207
+ # TODO remove code here if complete
+ # Only return polygons with centroids inside AOI
+ filtered_features = []
+ for feature in collection["features"]:
+ if (geom := feature.get("geometry")).get("type") == "Polygon":
+ if aoi_shape.contains(shape(shape(geom).centroid)):
+ filtered_features.append(feature)
+
+ return FeatureCollection(filtered_features)
def main():
diff --git a/pyproject.toml b/pyproject.toml
index fc21654..f6a53ab 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -45,6 +45,8 @@ testpaths = [
"tests",
]
pythonpath = "osm_rawdata"
+log_cli = true
+log_cli_level = "DEBUG"
[tool.commitizen]
name = "cz_conventional_commits"
diff --git a/tests/AOI_small.geojson b/tests/AOI_small.geojson
new file mode 100644
index 0000000..3157b8d
--- /dev/null
+++ b/tests/AOI_small.geojson
@@ -0,0 +1,40 @@
+{
+ "type": "FeatureCollection",
+ "features": [
+ {
+ "type": "Feature",
+ "properties": {},
+ "geometry": {
+ "coordinates": [
+ [
+ [
+ 8.535546937565442,
+ 47.382230798083725
+ ],
+ [
+ 8.535174262357543,
+ 47.38195055440562
+ ],
+ [
+ 8.535883778618114,
+ 47.38146042768176
+ ],
+ [
+ 8.536713339298046,
+ 47.381921438094025
+ ],
+ [
+ 8.535987697666258,
+ 47.38250133494034
+ ],
+ [
+ 8.535546937565442,
+ 47.382230798083725
+ ]
+ ]
+ ],
+ "type": "Polygon"
+ }
+ }
+ ]
+}
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..c6a5b30
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,39 @@
+# Copyright (c) 2022, 2023 Humanitarian OpenStreetMap Team
+#
+# This file is part of osm-rawdata.
+#
+# osm-rawdata is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# osm-rawdata is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with osm-rawdata. If not, see <https://www.gnu.org/licenses/>.
+#
+"""Configuration and fixtures for PyTest."""
+
+import logging
+import sys
+
+import psycopg2
+import pytest
+
+logging.basicConfig(
+ level="DEBUG",
+ format=("%(asctime)s.%(msecs)03d [%(levelname)s] " "%(name)s | %(funcName)s:%(lineno)d | %(message)s"),
+ datefmt="%y-%m-%d %H:%M:%S",
+ stream=sys.stdout,
+)
+
+log = logging.getLogger(__name__)
+
+
+@pytest.fixture(scope="session")
+def db():
+ """Existing psycopg2 connection."""
+ return psycopg2.connect("postgresql://fmtm:testpass@db:5432/underpass")
diff --git a/tests/test_config.py b/tests/test_config.py
index 417df59..ee86b6a 100755
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -2,7 +2,7 @@
# Copyright (c) 2023 Humanitarian OpenStreetMap Team
#
-# This file is part of osm_fieldwork.
+# This file is part of osm_rawdata.
#
# This is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -15,10 +15,11 @@
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
-# along with osm_fieldwork. If not, see .
+# along with osm_rawdata. If not, see <https://www.gnu.org/licenses/>.
#
import os
+from io import BytesIO
#
# The JSON data files came from the raw-data-api project, and are currently
@@ -81,6 +82,14 @@ def test_formats():
assert qc.config["outputType"] == "shp" and qc.config["fileName"] == "Pokhara_all_features"
+def test_bytesio():
+ qc = QueryConfig()
+ with open(f"{rootdir}/formats.json", "rb") as file:
+ json_obj = BytesIO(file.read())
+ qc.parseJson(json_obj)
+ assert qc.config["outputType"] == "shp" and qc.config["fileName"] == "Pokhara_all_features"
+
+
def test_yaml_no_joins():
qc = QueryConfig()
qc.parseYaml(f"{rootdir}/buildings_no_join.yaml")
@@ -102,6 +111,29 @@ def test_yaml_no_joins():
assert op == "or"
+def test_yaml_no_joins_bytesio():
+ qc = QueryConfig()
+ with open(f"{rootdir}/buildings_no_join.yaml", "rb") as file:
+ yaml_obj = BytesIO(file.read())
+ qc.parseYaml(yaml_obj)
+
+ selected = qc.config["select"]
+ assert len(selected.keys()) == 3
+ assert len(list(selected.values())[0]) == 4
+
+ where = qc.config["where"]
+ assert len(where.keys()) == 3
+
+ nodes = list(where.values())[0]
+ assert len(nodes) == 4
+
+ building = nodes[0]["building"]
+ assert building == ["yes"]
+
+ op = nodes[0]["op"]
+ assert op == "or"
+
+
def test_everything():
# this query contains only the geometry, we want everything within this polygon
qc = QueryConfig()
diff --git a/tests/test_output.py b/tests/test_output.py
index d16ecf6..9e4a021 100755
--- a/tests/test_output.py
+++ b/tests/test_output.py
@@ -2,7 +2,7 @@
# Copyright (c) 2023 Humanitarian OpenStreetMap Team
#
-# This file is part of osm_fieldwork.
+# This file is part of osm_rawdata.
#
# This is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -15,7 +15,7 @@
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
-# along with osm_fieldwork. If not, see .
+# along with osm_rawdata. If not, see <https://www.gnu.org/licenses/>.
#
import os
diff --git a/tests/test_postgres.py b/tests/test_postgres.py
new file mode 100644
index 0000000..996280c
--- /dev/null
+++ b/tests/test_postgres.py
@@ -0,0 +1,79 @@
+#!/usr/bin/python3
+
+# Copyright (c) 2023 Humanitarian OpenStreetMap Team
+#
+# This file is part of osm_rawdata.
+#
+# This is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# osm_rawdata is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with osm_rawdata. If not, see <https://www.gnu.org/licenses/>.
+#
+
+import logging
+import os
+import time
+
+import geojson
+import requests
+
+import osm_rawdata as rw
+from osm_rawdata.postgres import PostgresClient
+
+log = logging.getLogger(__name__)
+
+rootdir = rw.__path__[0]
+if os.path.basename(rootdir) == "osm_rawdata":
+ rootdir = "./tests/"
+
+
+def test_data_extract():
+ pg = PostgresClient("underpass", f"{rootdir}/buildings.yaml")
+ aoi_file = open(f"{rootdir}/AOI_small.geojson", "r")
+ boundary = geojson.load(aoi_file)
+ data_extract = pg.execQuery(boundary)
+ assert len(data_extract.get("features")) == 16
+
+
+def test_data_extract_with_clipping():
+ # Sleep 5 seconds to reduce API load
+ time.sleep(5)
+
+ pg = PostgresClient("underpass", f"{rootdir}/buildings.yaml")
+ aoi_file = open(f"{rootdir}/AOI_small.geojson", "r")
+ boundary = geojson.load(aoi_file)
+ data_extract = pg.execQuery(boundary, clip_to_aoi=True)
+ print(data_extract)
+ assert len(data_extract.get("features")) == 13
+
+
+def test_data_extract_flatgeobuf():
+ # Sleep 5 seconds to reduce API load
+ time.sleep(5)
+
+ pg = PostgresClient("underpass", f"{rootdir}/buildings.yaml")
+ aoi_file = open(f"{rootdir}/AOI_small.geojson", "r")
+ boundary = geojson.load(aoi_file)
+ extract_url = pg.execQuery(
+ boundary,
+ extra_params={
+ "fileName": "osm-rawdata-test-extract",
+ "outputType": "fgb",
+ "bind_zip": False,
+ },
+ # param options: https://hotosm.github.io/raw-data-api/api/endpoints/#rawdatacurrentparams
+ )
+ assert extract_url.startswith("http")
+
+ with requests.head(extract_url) as response:
+ assert response.status_code == 200
+ assert response.headers["Content-Type"] == "binary/octet-stream"
+ assert response.headers["Content-Length"] == "8376"