-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathtest_bugs.py
149 lines (130 loc) · 5.14 KB
/
test_bugs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python3
# encoding: utf-8
"""
tests.test_bugs
~~~~~~~~~~~~~~~
:author: Wannes Meert
:copyright: Copyright 2018 DTAI, KU Leuven and Sirris.
:license: Apache License, Version 2.0, see LICENSE for details.
"""
import os
import sys
import logging
from pathlib import Path
import csv
import leuvenmapmatching as mm
from leuvenmapmatching.map.inmem import InMemMap
from leuvenmapmatching.map.sqlite import SqliteMap
from leuvenmapmatching.matcher.simple import SimpleMatcher
from leuvenmapmatching.matcher.distance import DistanceMatcher
import leuvenmapmatching.visualization as mm_viz
MYPY = False
if MYPY:
from typing import List, Tuple
logger = mm.logger
directory = None
def test_bug1():
dist = 10
nb_steps = 20
map_con = InMemMap("map", graph={
"A": ((1, dist), ["B"]),
"B": ((2, dist), ["A", "C", "CC"]),
"C": ((3, 0), ["B", "D"]),
"D": ((4 + dist, 0), ["C", "E"]),
"CC": ((3, 2 * dist), ["B", "DD"]),
"DD": ((4 + dist, 2 * dist), ["CC", "E"]),
"E": ((5 + dist, dist), ["F", "D", "DD"]),
"F": ((6 + dist, dist), ["E", ]),
}, use_latlon=False)
i = 10
path = [(1.1, 2*dist*i/nb_steps),
(2.1, 2*dist*i/nb_steps),
(5.1+dist, 2*dist*i/nb_steps),
(6.1+dist, 2*dist*i/nb_steps)
# (1, len*i/nb_steps),
# (2, len*i/nb_steps),
# (3, len*i/nb_steps)
]
matcher = SimpleMatcher(map_con, max_dist=dist + 1, obs_noise=dist + 1, min_prob_norm=None,
non_emitting_states=True)
nodes = matcher.match(path, unique=False)
print("Solution: ", nodes)
if directory:
import leuvenmapmatching.visualization as mm_vis
matcher.print_lattice()
matcher.print_lattice_stats()
mm_vis.plot_map(map_con, path=path, nodes=nodes, counts=matcher.node_counts(),
show_labels=True, filename=str(directory / "test_bugs_1.png"))
def test_bug2():
this_path = Path(os.path.realpath(__file__)).parent / "rsrc" / "bug2"
edges_fn = this_path / "edgesrl.csv"
nodes_fn = this_path / "nodesrl.csv"
path_fn = this_path / "path.csv"
zip_fn = this_path / "leuvenmapmatching_testdata.zip"
if not (edges_fn.exists() and nodes_fn.exists() and path_fn.exists()):
import requests
url = 'https://people.cs.kuleuven.be/wannes.meert/leuvenmapmatching/leuvenmapmatching_testdata.zip'
logger.debug("Download testfiles from kuleuven.be")
r = requests.get(url, stream=True)
with zip_fn.open('wb') as ofile:
for chunk in r.iter_content(chunk_size=1024):
if chunk:
ofile.write(chunk)
import zipfile
logger.debug("Unzipping leuvenmapmatching_testdata.zip")
with zipfile.ZipFile(str(zip_fn), "r") as zip_ref:
zip_ref.extractall(str(zip_fn.parent))
logger.debug(f"Reading map ...")
mmap = SqliteMap("road_network", use_latlon=True, dir=this_path)
path = []
with path_fn.open("r") as path_f:
reader = csv.reader(path_f, delimiter=',')
for row in reader:
lat, lon = [float(coord) for coord in row]
path.append((lat, lon))
node_cnt = 0
with nodes_fn.open("r") as nodes_f:
reader = csv.reader(nodes_f, delimiter=',')
for row in reader:
nid, lonlat, _ = row
nid = int(nid)
lon, lat = [float(coord) for coord in lonlat[1:-1].split(",")]
mmap.add_node(nid, (lat, lon), ignore_doubles=True, no_index=True, no_commit=True)
node_cnt += 1
edge_cnt = 0
with edges_fn.open("r") as edges_f:
reader = csv.reader(edges_f, delimiter=',')
for row in reader:
_eid, nid1, nid2, pid = [int(val) for val in row]
mmap.add_edge(nid1, nid2, edge_type=0, path=pid, no_index=True, no_commit=True)
edge_cnt += 1
logger.debug(f"... done: {node_cnt} nodes and {edge_cnt} edges")
logger.debug("Indexing ...")
mmap.reindex_nodes()
mmap.reindex_edges()
logger.debug("... done")
matcher = DistanceMatcher(mmap, min_prob_norm=0.001,
max_dist=200, obs_noise=4.07,
non_emitting_states=True)
# path = path[:2]
nodes, idx = matcher.match(path, unique=True)
path_pred = matcher.path_pred
if directory:
import matplotlib.pyplot as plt
matcher.print_lattice_stats()
logger.debug("Plotting post map ...")
fig = plt.figure(figsize=(100, 100))
ax = fig.get_axes()
mm_viz.plot_map(mmap, matcher=matcher, use_osm=True, ax=ax,
show_lattice=False, show_labels=True, show_graph=False, zoom_path=True,
show_matching=True)
plt.savefig(str(directory / "test_bug1.png"))
plt.close(fig)
logger.debug("... done")
if __name__ == "__main__":
mm.logger.setLevel(logging.DEBUG)
mm.logger.addHandler(logging.StreamHandler(sys.stdout))
directory = Path(os.environ.get('TESTDIR', Path(__file__).parent))
print(f"Saving files to {directory}")
# test_bug1()
test_bug2()