-
Notifications
You must be signed in to change notification settings - Fork 8
/
dtw.py
106 lines (89 loc) · 3.29 KB
/
dtw.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import numbers
import numpy as np
from collections import defaultdict
def __difference(a, b):
return abs(a - b)
def __norm(p):
return lambda a, b: np.linalg.norm(np.atleast_1d(a) - np.atleast_1d(b), p)
def __prep_inputs(x, y, dist):
x = np.asanyarray(x, dtype='float')
y = np.asanyarray(y, dtype='float')
if x.ndim == y.ndim > 1 and x.shape[1] != y.shape[1]:
raise ValueError('second dimension of x and y must be the same')
if isinstance(dist, numbers.Number) and dist <= 0:
raise ValueError('dist cannot be a negative integer')
if dist is None:
if x.ndim == 1:
dist = __difference
else:
dist = __norm(p=1)
elif isinstance(dist, numbers.Number):
dist = __norm(p=dist)
return x, y, dist
def dtw(x, y, dist=None):
''' return the distance between 2 time series without approximation
Parameters
----------
x : array_like
input array 1
y : array_like
input array 2
dist : function or int
The method for calculating the distance between x[i] and y[j]. If
dist is an int of value p > 0, then the p-norm will be used. If
dist is a function then dist(x[i], y[j]) will be used. If dist is
None then abs(x[i] - y[j]) will be used.
Returns
-------
distance : float
the approximate distance between the 2 time series
path : list
list of indexes for the inputs x and y
'''
x, y, dist = __prep_inputs(x, y, dist)
return __dtw(x, y, None, dist)
def __dtw(x, y, window, dist):
len_x, len_y = len(x), len(y)
if window is None:
window = [(i, j) for i in range(len_x) for j in range(len_y)]
window = ((i + 1, j + 1) for i, j in window)
D = defaultdict(lambda: (float('inf'),))
D[0, 0] = (0, 0, 0)
for i, j in window:
dt = dist(x[i-1], y[j-1])
D[i, j] = min((D[i-1, j][0]+dt, i-1, j), (D[i, j-1][0]+dt, i, j-1),
(D[i-1, j-1][0]+dt, i-1, j-1), key=lambda a: a[0])
path = []
i, j = len_x, len_y
while not (i == j == 0):
path.append((i-1, j-1))
i, j = D[i, j][1], D[i, j][2]
path.reverse()
return (D[len_x, len_y][0], path)
def __reduce_by_half(x):
return [(x[i] + x[1+i]) / 2 for i in range(0, len(x) - len(x) % 2, 2)]
def __expand_window(path, len_x, len_y, radius):
path_ = set(path)
for i, j in path:
for a, b in ((i + a, j + b)
for a in range(-radius, radius+1)
for b in range(-radius, radius+1)):
path_.add((a, b))
window_ = set()
for i, j in path_:
for a, b in ((i * 2, j * 2), (i * 2, j * 2 + 1),
(i * 2 + 1, j * 2), (i * 2 + 1, j * 2 + 1)):
window_.add((a, b))
window = []
start_j = 0
for i in range(0, len_x):
new_start_j = None
for j in range(start_j, len_y):
if (i, j) in window_:
window.append((i, j))
if new_start_j is None:
new_start_j = j
elif new_start_j is not None:
break
start_j = new_start_j
return window