-
Notifications
You must be signed in to change notification settings - Fork 105
/
hrp_routines.py
80 lines (65 loc) · 2.51 KB
/
hrp_routines.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import numpy as np
import pandas as pd
from scipy.cluster.hierarchy import dendrogram, linkage
from scipy.cluster.hierarchy import cophenet
from scipy.spatial.distance import pdist
import pylab
# On 20151227 by MLdP <[email protected]>
# Hierarchical Risk Parity
def getIVP(cov, **kargs):
    """Compute the inverse-variance portfolio.

    Parameters
    ----------
    cov : square array-like (e.g. numpy.ndarray or pandas.DataFrame)
        Covariance matrix. Only its main diagonal (the variances) is read.
    **kargs
        Accepted for call-signature compatibility and ignored.

    Returns
    -------
    numpy.ndarray
        One weight per diagonal entry, proportional to 1 / variance and
        normalised so the weights sum to one.
    """
    ivp = 1. / np.diag(cov)  # reciprocal of each variance
    ivp /= ivp.sum()  # normalise to unit sum
    return ivp
def getClusterVar(cov, cItems):
    """Compute the variance of one cluster under inverse-variance weights.

    Parameters
    ----------
    cov : pandas.DataFrame
        Covariance matrix, indexed on both axes by the labels in ``cItems``
        (it is sliced with ``.loc``).
    cItems : list
        Labels of the items that form the cluster.

    Returns
    -------
    scalar
        ``w' * cov_ * w`` where ``cov_`` is the cluster's covariance
        sub-matrix and ``w`` is its inverse-variance weight vector.
    """
    cov_ = cov.loc[cItems, cItems]  # matrix slice
    w_ = getIVP(cov_).reshape(-1, 1)  # column vector of weights
    cVar = np.dot(np.dot(w_.T, cov_), w_)[0, 0]  # 1x1 result -> scalar
    return cVar
def getQuasiDiag(link):
    """Sort clustered items by distance (quasi-diagonalisation order).

    Starting from the last merge in the linkage matrix, every cluster id is
    repeatedly replaced by its two constituents until only original item
    ids remain.

    Parameters
    ----------
    link : array-like of shape (n - 1, 4)
        Linkage matrix laid out as produced by
        ``scipy.cluster.hierarchy.linkage``: columns 0 and 1 hold the ids of
        the merged clusters and column 3 the size of the new cluster. Values
        are cast to ``int``.

    Returns
    -------
    list of int
        Original item ids (``0 .. n - 1``) in the order implied by the
        hierarchy.
    """
    link = np.asarray(link).astype(int)
    sortIx = pd.Series([link[-1, 0], link[-1, 1]])
    numItems = link[-1, 3]  # number of original items
    while sortIx.max() >= numItems:
        # Spread entries over even positions so each expanded cluster can
        # put its second constituent in the odd slot that follows it.
        sortIx.index = range(0, sortIx.shape[0] * 2, 2)  # make space
        clusters = sortIx[sortIx >= numItems]  # find clusters
        i = clusters.index
        j = clusters.values - numItems  # linkage row that created each cluster
        sortIx.loc[i] = link[j, 0]  # item 1
        second = pd.Series(link[j, 1], index=i + 1)
        # Series.append was removed in pandas 2.0; pd.concat is the
        # equivalent that works on old and new versions alike.
        sortIx = pd.concat([sortIx, second])  # item 2
        sortIx = sortIx.sort_index()  # re-sort
        sortIx.index = range(sortIx.shape[0])  # re-index
    return sortIx.tolist()
def getRecBipart(cov, sortIx):
    """Compute the HRP allocation by recursive bisection.

    The ordered item list is repeatedly split in half. At each split the
    weights of the two halves are scaled by ``alpha`` and ``1 - alpha``,
    where ``alpha = 1 - cVar0 / (cVar0 + cVar1)`` and ``cVar0`` / ``cVar1``
    are the halves' cluster variances from ``getClusterVar``.

    Parameters
    ----------
    cov : pandas.DataFrame
        Covariance matrix, indexed on both axes by the labels in ``sortIx``.
    sortIx : list
        Item labels in their sorted order.

    Returns
    -------
    pandas.Series
        Float weights indexed by ``sortIx``.
    """
    # Seed with a float: the weights are scaled in place by float factors,
    # and upcasting an integer Series on assignment is deprecated in pandas.
    w = pd.Series(1.0, index=sortIx)
    cItems = [sortIx]  # initialize all items in one cluster
    while len(cItems) > 0:
        # bi-section: every cluster with more than one item is replaced by
        # its two halves; single-item clusters are dropped.
        cItems = [
            i[j:k]
            for i in cItems
            for j, k in ((0, len(i) // 2), (len(i) // 2, len(i)))
            if len(i) > 1
        ]
        for i in range(0, len(cItems), 2):  # parse in pairs
            cItems0 = cItems[i]  # cluster 1
            cItems1 = cItems[i + 1]  # cluster 2
            cVar0 = getClusterVar(cov, cItems0)
            cVar1 = getClusterVar(cov, cItems1)
            alpha = 1 - cVar0 / (cVar0 + cVar1)
            w.loc[cItems0] *= alpha  # weight 1
            w.loc[cItems1] *= 1 - alpha  # weight 2
    return w
def correlDist(corr):
    """Turn a correlation matrix into a distance matrix.

    Computes ``d = sqrt((1 - corr) / 2)`` element-wise, so that
    ``0 <= d[i, j] <= 1`` for correlations in ``[-1, 1]``.

    Parameters
    ----------
    corr : numpy.ndarray or pandas.DataFrame
        Correlation matrix.

    Returns
    -------
    Same container type as ``corr``
        Distance matrix.
    """
    half = (1 - corr) / 2.
    # Round-off can push an estimated correlation marginally above 1, which
    # makes the radicand slightly negative and the root NaN. Clamp at 0.
    dist = np.maximum(half, 0.) ** .5  # distance matrix
    return dist
def getHRP(cov, corr):
    """Construct a hierarchical (HRP) portfolio.

    Steps: correlation -> distance matrix (``correlDist``), single-linkage
    clustering, quasi-diagonal ordering (``getQuasiDiag``), recursive
    bisection (``getRecBipart``).

    Parameters
    ----------
    cov : pandas.DataFrame
        Covariance matrix.
    corr : pandas.DataFrame
        Correlation matrix; its index supplies the item labels.

    Returns
    -------
    pandas.Series
        Portfolio weights, sorted by label.
    """
    dist = correlDist(corr)
    # dist is passed as a square 2-D matrix, so SciPy treats each row as an
    # observation vector and clusters on the Euclidean distance between
    # rows, rather than using dist as a precomputed condensed matrix.
    link = linkage(dist, 'single')
    sortIx = getQuasiDiag(link)
    sortIx = corr.index[sortIx].tolist()  # positions -> labels
    hrp = getRecBipart(cov, sortIx)
    return hrp.sort_index()