diff --git a/osm_rawdata/config.py b/osm_rawdata/config.py index e861ae4..8de7488 100755 --- a/osm_rawdata/config.py +++ b/osm_rawdata/config.py @@ -60,12 +60,14 @@ def __init__(self, boundary: Polygon = None): "nodes": [], "ways_poly": [], "ways_line": [], + "relations": [], }, "tables": [], "where": { "nodes": [], "ways_poly": [], "ways_line": [], + "relations": [], }, "keep": [], } @@ -280,6 +282,25 @@ def convert_geometry(geom_type): return self.config + def getKeys(self): + """ """ + keys = list() + # The first column returned is always the geometry + keys.append("geometry") + for key, value in self.config["select"].items(): + if isinstance(value, list): + for v in value: + if isinstance(v, str): + # print(f"\tSelecting table '{key}' has value '{v}'") + keys.append(v) + continue + for k1, v1 in v.items(): + keys.append(k1) + # print(f"\tSelecting table '{key}' tag '{k1}'") + # else: + # print(f"\tSelecting tag '{key}'") + return keys + def dump(self): """Dump the contents of the internal data strucute for debugging purposes.""" print("Dumping QueryConfig class") diff --git a/osm_rawdata/pgasync.py b/osm_rawdata/pgasync.py index e4928b5..677e665 100755 --- a/osm_rawdata/pgasync.py +++ b/osm_rawdata/pgasync.py @@ -54,6 +54,7 @@ def __init__(self): self.pg = None self.dburi = None self.qc = None + self.clipped = False async def connect( self, @@ -199,6 +200,35 @@ async def createJson( feature["centroid"] = true return json.dumps(feature) + async def recordsToFeatures( + self, + records: list, + ) -> list: + """Convert an asyncpg.Record to a GeoJson FeatureCollection. + + Args: + records (list): The records from an SQL query + + Returns: + (list): The converted data + """ + data = list() + keys = self.qc.getKeys() + for entry in records: + i = 0 + geom = None + last = len(entry) - 1 + props = dict() + while i <= last: + if keys[i] == "geometry": + geom = wkt.loads(entry[i]) + elif entry[i] is not None: + props[keys[i]] = entry[i] + i += 1 + data.append(Feature(geometry=geom, properties=props)) + + return data + async def createSQL( self, config: QueryConfig, @@ -218,12 +248,18 @@ async def createSQL( for table in config.config["tables"]: select = "SELECT " if allgeom: - select += "ST_AsText(geom)" + select += "ST_AsText(geom) AS geometry" else: - select += "ST_AsText(ST_Centroid(geom))" + select += "ST_AsText(ST_Centroid(geom)) AS geometry" + # FIXME: This part is OSM specific, and should be made more + # general. these two columns are OSM attributes, so each + # have their own column in the database. All the other + # values are in a single JSON column. select += ", osm_id, version, " for entry in config.config["select"][table]: for k1, v1 in entry.items(): + if k1 == "osm_id" or k1 == "version": + continue select += f"tags->>'{k1}', " select = select[:-2] @@ -368,7 +404,7 @@ async def getPage( async def execute( self, sql: str, - ): + ) -> list: """Execute a raw SQL query and return the results. Args: @@ -378,13 +414,31 @@ async def execute( (list): The results of the query """ # print(sql) + data = list() + if sql.find(";") <= 0: + queries = [sql] + async with self.pg.transaction(): - try: - result = await self.pg.fetch(sql) - return result - except Exception as e: - log.error(f"Couldn't execute query! {e}\n{sql}") - return list() + queries = list() + # If using an SRID, we have to hide the sem-colon so the string + # doesn't split in the wrong place. + cmds = sql.replace("SRID=4326;P", "SRID=4326@P").split(";") + for sub in cmds: + queries.append(sub.replace("@", ";")) + continue + + for query in queries: + try: + # print(query) + result = await self.pg.fetch(query) + if len(result) > 0: + data += result + + except Exception as e: + log.error(f"Couldn't execute query! {e}\n{query}") + return list() + + return data async def queryLocal( self, @@ -404,15 +458,16 @@ async def queryLocal( """ features = list() # if no boundary, it's already been setup + # if boundary and not self.clipped: if boundary: sql = f"DROP VIEW IF EXISTS ways_view;CREATE VIEW ways_view AS SELECT * FROM ways_poly WHERE ST_CONTAINS(ST_GeomFromEWKT('SRID=4326;{boundary.wkt}'), geom)" - await self.execute(sql) + await self.pg.execute(sql) sql = f"DROP VIEW IF EXISTS nodes_view;CREATE VIEW nodes_view AS SELECT * FROM nodes WHERE ST_CONTAINS(ST_GeomFromEWKT('SRID=4326;{boundary.wkt}'), geom)" - await self.execute(sql) + await self.pg.execute(sql) sql = f"DROP VIEW IF EXISTS lines_view;CREATE VIEW lines_view AS SELECT * FROM ways_line WHERE ST_CONTAINS(ST_GeomFromEWKT('SRID=4326;{boundary.wkt}'), geom)" - await self.execute(sql) - sql = f"DROP VIEW IF EXISTS relations_view;CREATE TEMP VIEW relations_view AS SELECT * FROM nodes WHERE ST_CONTAINS(ST_GeomFromEWKT('SRID=4326;{boundary.wkt}'), geom)" - await self.execute(sql) + await self.pg.execute(sql) + sql = f"DROP VIEW IF EXISTS relations_view;CREATE VIEW relations_view AS SELECT * FROM nodes WHERE ST_CONTAINS(ST_GeomFromEWKT('SRID=4326;{boundary.wkt}'), geom)" + await self.pg.execute(sql) if query.find(" ways_poly ") > 0: query = query.replace("ways_poly", "ways_view") @@ -467,6 +522,7 @@ async def queryLocal( # This should be the version tags[res[3][:-1]] = item[2] features.append(Feature(geometry=geom, properties=tags)) + return FeatureCollection(features) # return features @@ -588,28 +644,47 @@ async def execQuery( """ log.info("Extracting features from Postgres...") - if "features" in boundary: - # FIXME: ideally this should support multipolygons - poly = boundary["features"][0]["geometry"] - else: - poly = boundary["geometry"] - wkt = shape(poly) - - if not self.pg.is_closed(): - if not customsql: - sql = await self.createSQL(self.qc, allgeom) + polygons = list() + if "geometry" in boundary: + polygons.append(boundary["geometry"]) + + if boundary["type"] == "MultiPolygon": + # poly = boundary["features"][0]["geometry"] + points = list() + for coords in boundary["features"]["coordinates"]: + for pt in coords: + for xy in pt: + points.append([float(xy[0]), float(xy[1])]) + poly = Polygon(points) + polygons.append(poly) + + for poly in polygons: + wkt = shape(poly) + + if not self.pg.is_closed(): + if not customsql: + sql = await self.createSQL(self.qc, allgeom) + else: + sql = [customsql] + alldata = list() + queries = list() + if type(sql) != list: + queries = sql.split(";") + else: + queries = sql + + for query in queries: + # print(query) + result = await self.queryLocal(query, allgeom, wkt) + if len(result) > 0: + # Some queries don't return any data, for example + # when creating a VIEW. + alldata += await self.recordsToFeatures(result) + collection = FeatureCollection(alldata) else: - sql = [customsql] - alldata = list() - for query in sql: - # print(query) - result = await self.queryLocal(query, allgeom, wkt) - if len(result) > 0: - alldata += result["features"] - collection = FeatureCollection(alldata) - else: - request = await self.createJson(self.qc, poly, allgeom) - collection = await self.queryRemote(request) + request = await self.createJson(self.qc, poly, allgeom) + collection = await self.queryRemote(request) + return collection @@ -664,15 +739,21 @@ async def main(): stream=sys.stdout, ) - infile = open(args.boundary, "r") - poly = geojson.load(infile) + if args.boundary: + infile = open(args.boundary, "r") + inpoly = geojson.load(infile) + if inpoly["type"] == "MultiPolygon": + poly = FeatureCollection(inpoly) + else: + log.error("A boundary file is needed!") + if args.uri is not None: log.info("Using a Postgres database for the data source") db = DatabaseAccess() await db.connect(args.uri) - result = await db.execute("SELECT * FROM nodes LIMIT 10;") - print(result) - quit() + # result = await db.execute("SELECT * FROM nodes LIMIT 10;") + # print(result) + # quit() # await db.connect(args.uri) # data = await db.pg.fetch("SELECT * FROM schemas LIMIT 10;") # print(data) @@ -700,3 +781,5 @@ async def main(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(main()) + +# ./pgasync.py -u localhost/colorado -b /play/MapData/States/Colorado/Boundaries/NationalForest/MedicineBowNationalForest.geojson -c /usr/local/lib/python3.12/site-packages/osm_fieldwork/data_models/highways.yaml