From a15317958a11eb0c6d457395425a2b2349224bb5 Mon Sep 17 00:00:00 2001
From: Rob Savoye
Date: Fri, 20 Oct 2023 11:57:33 -0600
Subject: [PATCH] fix: Make importing parquet files from Overture multi-threaded

---
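Notes:

The new importParquet() splits the Arrow table into one slice per worker,
opens a separate database connection for each worker (a single SQLAlchemy
Connection is not safe to share across threads), and hands each slice to a
ThreadPoolExecutor. Below is a minimal, runnable sketch of that dispatch
pattern, not the patch itself: the hypothetical process_slice() stands in
for parquetThread(), plain integers stand in for the connections, and
math.ceil() is used so the last, partial slice can never index past the
worker list.

    import concurrent.futures
    import math

    def process_slice(rows: list, conn_id: int) -> int:
        # Stand-in for parquetThread(); a real worker would insert each
        # row using connection number conn_id.
        return len(rows)

    def dispatch(data: list, cores: int = 4) -> int:
        # ceil() yields at most `cores` slices, so every slice maps to a
        # valid connection index; `or 1` guards against empty input.
        chunk = math.ceil(len(data) / cores) or 1
        done = 0
        with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
            futures = [
                executor.submit(process_slice, data[block : block + chunk], index)
                for index, block in enumerate(range(0, len(data), chunk))
            ]
            for future in concurrent.futures.as_completed(futures):
                done += future.result()
        return done

    # 10 entries across 4 workers: slices of 3, 3, 3, and 1.
    assert dispatch(list(range(10)), cores=4) == 10
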
 osm_rawdata/importer.py | 194 ++++++++++++++++++++++++++--------------
 1 file changed, 128 insertions(+), 66 deletions(-)

diff --git a/osm_rawdata/importer.py b/osm_rawdata/importer.py
index f35e8ad..c6db989 100755
--- a/osm_rawdata/importer.py
+++ b/osm_rawdata/importer.py
@@ -67,7 +67,7 @@ def importThread(
 
     Args:
         data (list): The list of tiles to download
-        db
+        db (Connection): A database connection
     """
     # log.debug(f"In importThread()")
     # timer = Timer(text="importThread() took {seconds:.0f}s")
@@ -118,6 +118,90 @@ def importThread(
         db.execute(sql)
     # db.commit()
 
+def parquetThread(
+    data: list,
+    db: Connection,
+):
+    """Thread to handle importing
+
+    Args:
+        data (list): The list of tiles to download
+        db (Connection): A database connection
+    """
+    timer = Timer(text="parquetThread() took {seconds:.0f}s")
+    timer.start()
+    ways = table(
+        "ways_poly",
+        column("id"),
+        column("user"),
+        column("geom"),
+        column("tags"),
+    )
+
+    nodes = table(
+        "nodes",
+        column("id"),
+        column("user"),
+        column("geom"),
+        column("tags"),
+    )
+
+    index = -1
+    log.debug(f"There are {len(data)} entries in the data")
+    for i in range(0, len(data)):
+        # spin.next()
+        entry = dict()
+        entry["fixme"] = data[0][i].as_py()
+        # entry['names'] = data[3][i].as_py()
+        # entry['level'] = data[4][i].as_py()
+        if data[5][i].as_py() is not None:
+            entry["height"] = int(data[5][i].as_py())
+        else:
+            entry["height"] = 0
+        if data[6][i].as_py() is not None:
+            entry["levels"] = int(data[6][i].as_py())
+        else:
+            entry["levels"] = 0
+        entry["source"] = data[8][i][0][0][1].as_py()
+        if entry["source"] == "OpenStreetMap" or entry["source"] == "Microsoft ML Buildings":
+            # log.warning("Ignoring OpenStreetMap or MS Building entries as they are out of date")
+            # osm = data[8][i][0][2][1].as_py().split('@')
+            continue
+        entry["id"] = index
+        # LIDAR has no record ID
+        try:
+            entry["record"] = data[8][i][0][2][1].as_py()
+        except Exception:
+            entry["record"] = 0
+        entry["geometry"] = data[10][i].as_py()
+        entry["building"] = "yes"
+        geom = entry["geometry"]
+        type = wkb.loads(entry["geometry"]).geom_type
+        if type != "Polygon":
+            # log.warning("Got Multipolygon")
+            continue
+        # FIXME: This is a hack, for some weird reason the
+        # entry dict doesn't convert to jsonb, it just
+        # becomes bytes
+        tags = {
+            "building": "yes",
+            "source": entry["source"],
+            "levels": entry["levels"],
+            "record": entry["record"],
+            "height": entry["height"],
+        }
+        scalar = select(cast(tags, JSONB))
+        sql = insert(ways).values(
+            # osm_id = entry['osm_id'],
+            geom=geom,
+            tags=scalar,
+        )
+        index -= 1
+        db.execute(sql)
+        # db.commit()
+        # print(f"FIXME2: {entry}")
+    timer.stop()
+
 class MapImporter(object):
     def __init__(
         self,
@@ -213,71 +297,49 @@ def importParquet(
         spin = PixelSpinner(f"Processing {infile}...")
         timer = Timer(text="importParquet() took {seconds:.0f}s")
         timer.start()
-        try:
-            ways = table(
-                "ways_poly",
-                column("id"),
-                column("user"),
-                column("geom"),
-                column("tags"),
-            )
-            pfile = pq.ParquetFile(infile)
-            data = pfile.read()
-            index = -1
-            for i in range(0, len(data) - 1):
-                spin.next()
-                entry = dict()
-                entry["fixme"] = data[0][i].as_py()
-                # entry['names'] = data[3][i].as_py()
-                # entry['level'] = data[4][i].as_py()
-                if data[5][i].as_py() is not None:
-                    entry["height"] = int(data[5][i].as_py())
-                else:
-                    entry["height"] = 0
-                if data[6][i].as_py() is not None:
-                    entry["levels"] = int(data[6][i].as_py())
-                else:
-                    entry["levels"] = 0
-                entry["source"] = data[8][i][0][0][1].as_py()
-                if entry["source"] == "OpenStreetMap":
-                    # log.warning("Ignoring OpenStreetMap entries as they are out of date")
-                    # osm = data[8][i][0][2][1].as_py().split('@')
-                    continue
-                entry["id"] = index
-                # LIDAR has no record ID
-                try:
-                    entry["record"] = data[8][i][0][2][1].as_py()
-                except:
-                    entry["record"] = 0
-                entry["geometry"] = data[10][i].as_py()
-                entry["building"] = "yes"
-                geom = entry["geometry"]
-                type = wkb.loads(entry["geometry"]).geom_type
-                if type != "Polygon":
-                    # log.warning("Got Multipolygon")
-                    continue
-                # FIXME: This is a hack, for some weird reason the
-                # entry dict doesn't convert to jsonb, it just
-                # becomes bytes
-                tags = {
-                    "building": "yes",
-                    "source": entry["source"],
-                    "levels": entry["levels"],
-                    "record": entry["record"],
-                    "height": entry["height"],
-                }
-                scalar = select(cast(tags, JSONB))
-                sql = insert(ways).values(
-                    # osm_id = entry['osm_id'],
-                    geom=geom,
-                    tags=scalar,
-                )
-                index -= 1
-                self.db.execute(sql)
-                self.db.commit()
-                # print(f"FIXME2: {entry}")
-        except Exception as e:
-            log.error(e)
+        ways = table(
+            "ways_poly",
+            column("id"),
+            column("user"),
+            column("geom"),
+            column("tags"),
+        )
+        pfile = pq.ParquetFile(infile)
+        data = pfile.read()
+
+        connections = list()
+        for thread in range(0, cores + 1):
+            engine = create_engine(f"postgresql://{self.dburi}", echo=False)
+            if not database_exists(engine.url):
+                create_database(engine.url)
+            connections.append(engine.connect())
+            sessionmaker(autocommit=False, autoflush=False, bind=engine)
+
+            if thread == 0:
+                meta = MetaData()
+                meta.create_all(engine)
+
+        # A chunk is the range of entries each thread processes
+        entries = len(data)
+        log.debug(f"There are {entries} entries in {infile}")
+        chunk = round(entries / cores)
+
+        # Small files fit in a single chunk, so don't spawn extra threads
+        if entries <= chunk:
+            parquetThread(data, connections[0])
+            timer.stop()
+            return True
+
+        index = 0
+        with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
+            block = 0
+            while block <= entries:
+                log.debug("Dispatching Block %d:%d" % (block, block + chunk))
+                executor.submit(parquetThread, data[block : block + chunk], connections[index])
+                block += chunk
+                index += 1
+            executor.shutdown()
+        timer.stop()
+        return True
 
     def importGeoJson(
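
Illustrative usage only, not part of the patch: the constructor argument is
assumed to be the database URI that importParquet() reads from self.dburi,
and both the URI and file path here are hypothetical.

    from osm_rawdata.importer import MapImporter

    # importParquet() builds f"postgresql://{self.dburi}" from this URI.
    importer = MapImporter("localhost/underpass")
    importer.importParquet("overture-buildings.parquet")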