fix: Make importing parquet files from Overture multi-threaded
rsavoye committed Oct 20, 2023
1 parent b5628ac commit a153179
Showing 1 changed file with 128 additions and 66 deletions.
194 changes: 128 additions & 66 deletions osm_rawdata/importer.py
@@ -67,7 +67,7 @@ def importThread(
Args:
data (list): The list of tiles to download
db
db (Connection): A database connection
"""
# log.debug(f"In importThread()")
#timer = Timer(text="importThread() took {seconds:.0f}s")
@@ -118,6 +118,90 @@ def importThread(
db.execute(sql)
# db.commit()

def parquetThread(
data: list,
db: Connection,
):
"""Thread to handle importing
Args:
data (list): The list of tiles to download
db (Connection): A database connection
"""
timer = Timer(text="parquetThread() took {seconds:.0f}s")
timer.start()
ways = table(
"ways_poly",
column("id"),
column("user"),
column("geom"),
column("tags"),
)

# NOTE: nodes is defined but never used below; only building polygons
# are inserted into ways_poly.
nodes = table(
"nodes",
column("id"),
column("user"),
column("geom"),
column("tags"),
)

index = -1
log.debug(f"There are {len(data)} entries in the data")
for i in range(0, len(data)):
# spin.next()
entry = dict()
entry["fixme"] = data[0][i].as_py()
# entry['names'] = data[3][i].as_py()
# entry['level'] = data[4][i].as_py()
if data[5][i].as_py() is not None:
entry["height"] = int(data[5][i].as_py())
else:
entry["height"] = 0
if data[6][i].as_py() is not None:
entry["levels"] = int(data[6][i].as_py())
else:
entry["levels"] = 0
entry["source"] = data[8][i][0][0][1].as_py()
if entry["source"] == "OpenStreetMap" or entry["source"] == "Microsoft ML Buildings":
# log.warning("Ignoring OpenStreetMap or MS Building entries as they are out of date")
# osm = data[8][i][0][2][1].as_py().split('@')
continue
entry["id"] = index
# LIDAR has no record ID
try:
entry["record"] = data[8][i][0][2][1].as_py()
except Exception:
entry["record"] = 0
entry["geometry"] = data[10][i].as_py()
entry["building"] = "yes"
geom = entry["geometry"]
type = wkb.loads(entry["geometry"]).geom_type
if type != "Polygon":
# log.warning("Got Multipolygon")
continue
# FIXME: This is a hack; for some reason the entry dict
# doesn't convert to JSONB on its own, it just becomes bytes
tags = {
"building": "yes",
"source": entry["source"],
"levels": entry["levels"],
"record": entry["record"],
"height": entry["height"],
}
scalar = select(cast(tags, JSONB))
sql = insert(ways).values(
# osm_id = entry['osm_id'],
geom=geom,
tags=scalar,
)
index -= 1
db.execute(sql)
# db.commit()
# print(f"FIXME2: {entry}")
timer.stop()
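
The select(cast(tags, JSONB)) construct above works around the FIXME noted in the loop: handing the Python dict straight to values() reaches PostgreSQL as bytes rather than jsonb. A minimal standalone sketch of the same workaround, with an illustrative table, tag set, and connection URI that are not part of this commit:

from shapely import wkb
from shapely.geometry import Polygon
from sqlalchemy import cast, column, create_engine, insert, select, table
from sqlalchemy.dialects.postgresql import JSONB

ways = table("ways_poly", column("geom"), column("tags"))
tags = {"building": "yes", "levels": 2}
# A trivial square footprint, hex-encoded WKB like the importer stores
geom = wkb.dumps(Polygon([(0, 0), (1, 0), (1, 1), (0, 1)]), hex=True)

# Passing the dict directly to values() can arrive as raw bytes; a scalar
# subquery with an explicit cast makes PostgreSQL coerce it to jsonb.
sql = insert(ways).values(geom=geom, tags=select(cast(tags, JSONB)))

engine = create_engine("postgresql://localhost/underpass")  # hypothetical URI
with engine.begin() as conn:  # begin() commits automatically on success
    conn.execute(sql)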

class MapImporter(object):
def __init__(
self,
@@ -213,71 +297,49 @@ def importParquet(
spin = PixelSpinner(f"Processing {infile}...")
timer = Timer(text="importParquet() took {seconds:.0f}s")
timer.start()
try:
ways = table(
"ways_poly",
column("id"),
column("user"),
column("geom"),
column("tags"),
)
pfile = pq.ParquetFile(infile)
data = pfile.read()
index = -1
for i in range(0, len(data) - 1):
spin.next()
entry = dict()
entry["fixme"] = data[0][i].as_py()
# entry['names'] = data[3][i].as_py()
# entry['level'] = data[4][i].as_py()
if data[5][i].as_py() is not None:
entry["height"] = int(data[5][i].as_py())
else:
entry["height"] = 0
if data[6][i].as_py() is not None:
entry["levels"] = int(data[6][i].as_py())
else:
entry["levels"] = 0
entry["source"] = data[8][i][0][0][1].as_py()
if entry["source"] == "OpenStreetMap":
# log.warning("Ignoring OpenStreetMap entries as they are out of date")
# osm = data[8][i][0][2][1].as_py().split('@')
continue
entry["id"] = index
# LIDAR has no record ID
try:
entry["record"] = data[8][i][0][2][1].as_py()
except:
entry["record"] = 0
entry["geometry"] = data[10][i].as_py()
entry["building"] = "yes"
geom = entry["geometry"]
type = wkb.loads(entry["geometry"]).geom_type
if type != "Polygon":
# log.warning("Got Multipolygon")
continue
# FIXME: This is a hack, for some weird reason the
# entry dict doesn't convert to jsonb, it just
# becomes bytes
tags = {
"building": "yes",
"source": entry["source"],
"levels": entry["levels"],
"record": entry["record"],
"height": entry["height"],
}
scalar = select(cast(tags, JSONB))
sql = insert(ways).values(
# osm_id = entry['osm_id'],
geom=geom,
tags=scalar,
)
index -= 1
self.db.execute(sql)
self.db.commit()
# print(f"FIXME2: {entry}")
except Exception as e:
log.error(e)
ways = table(
"ways_poly",
column("id"),
column("user"),
column("geom"),
column("tags"),
)
pfile = pq.ParquetFile(infile)
data = pfile.read()

connections = list()
for thread in range(0, cores + 1):
engine = create_engine(f"postgresql://{self.dburi}", echo=False)
if not database_exists(engine.url):
create_database(engine.url)
connections.append(engine.connect())
# NOTE: the sessionmaker() result is unused; the workers execute
# directly on the raw connections collected above
sessionmaker(autocommit=False, autoflush=False, bind=engine)

if thread == 0:
meta = MetaData()
meta.create_all(engine)

# A chunk is the slice of rows handed to each thread
entries = len(data)
log.debug(f"There are {entries} entries in {infile}")
chunk = round(entries / cores)

if entries <= chunk:
parquetThread(data, connections[0])
timer.stop()
return True

index = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
block = 0
while block <= entries:
log.debug("Dispatching Block %d:%d" % (block, block + chunk))
executor.submit(parquetThread, data[block : block + chunk], connections[index])
block += chunk
index += 1
executor.shutdown()
timer.stop()

def importGeoJson(
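
For reference, the new dispatch loop in importParquet() follows a common fan-out pattern: slice the dataset into one block per core and pair each block with its own database connection. A condensed sketch of that pattern, where the dispatch() wrapper is illustrative rather than part of the commit:

import concurrent.futures

def dispatch(data, connections, cores):
    """Fan a dataset out to one worker thread per core."""
    # One block of rows per worker; connections must hold at least one
    # Connection per block (the commit allocates cores + 1 of them).
    chunk = max(1, round(len(data) / cores))
    with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
        futures = [
            # For a pyarrow Table, data.slice(block, chunk) is the equivalent
            executor.submit(parquetThread, data[block : block + chunk],
                            connections[index])
            for index, block in enumerate(range(0, len(data), chunk))
        ]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # surfaces any exception raised in a worker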
