-
Notifications
You must be signed in to change notification settings - Fork 3
/
splitosm.py
70 lines (64 loc) · 2.82 KB
/
splitosm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import logging
import os
import geopandas
from shapely.geometry import Polygon
from osmxml import write_osmxml
from shapely_utils import way_is_inside_or_crossing, shortest_way_inside_or_crossing
_log = logging.getLogger("splitosm")
def sort_polygons(polygons):
out_polygons = []
while len(polygons) > 0:
topmost_p = polygons[0].exterior.coords[0]
topmost_polygon = polygons[0]
found_one = False
for polygon in polygons:
if len(out_polygons) > 0:
found_count = 0
for p in polygon.exterior.coords:
if p in out_polygons[-1].exterior.coords:
found_count += 1
if found_count < 2:
continue
found_one = True
for p in polygon.exterior.coords:
if p[1] > topmost_p[1] or (p[1] == topmost_p[1] and p[0] > topmost_p[0]):
topmost_p = p
topmost_polygon = polygon
if not found_one:
for polygon in polygons:
for p in polygon.exterior.coords:
if p[1] > topmost_p[1] or (p[1] == topmost_p[1] and p[0] > topmost_p[0]):
topmost_p = p
topmost_polygon = polygon
out_polygons.append(topmost_polygon)
polygons.remove(topmost_polygon)
return out_polygons
def read_geojson_with_polygons(filename):
gdf = geopandas.read_file(filename, encoding='utf-8')
gdf.to_crs("epsg:3006", inplace=True)
polygons = []
for _, row in gdf.iterrows():
polygons.append(Polygon(row.geometry))
return sort_polygons(polygons)
def splitosm(way_db, outer_polygons, polygons, output_dir, basename, write_rlid=True):
for idx, polygon in enumerate(polygons):
_log.info(f"Getting all ways and points that is inside or intersects area {idx+1} (of {len(polygons)})")
ways = []
points = []
for segs in way_db.way_db.values():
for seg in segs:
w = shortest_way_inside_or_crossing(polygon, seg.way)
for poly in outer_polygons:
w = shortest_way_inside_or_crossing(poly, w)
if w is not None:
ways.append(seg.make_copy_new_way(w))
break
for segs in way_db.point_db.values():
for point in segs:
if way_is_inside_or_crossing(polygon, point.way):
points.append(point)
_log.info(f"{basename} subarea {idx+1} contains {len(ways)} ways and {len(points)} points")
output_filename = os.path.join(output_dir, f"{basename}-subarea-{idx+1:02d}.osm")
_log.info(f"Writing output to {output_filename}")
write_osmxml(ways, points, output_filename, write_rlid)
_log.info("done writing output")