-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasset.py
99 lines (83 loc) · 3.31 KB
/
asset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""An asset that is from China A-share stock market."""
from enum import Enum
import datetime
import math
import numpy as np
class Duration(Enum):
"""Frequency of the quote of an asset."""
DAY = 1
WEEK = 2
MONTH = 3
YEAR = 4
class Asset(object):
"""An asset that is from China A-share stock market.
Attributes:
id: Identifier of the asset.
name: Name of this asset.
"""
def __init__(self, id: str, name: str) -> None:
self.id = id
self.name = name
self._return_cache: 'dict[Duration, float]' = {}
self._risk_cache: 'dict[Duration, float]' = {}
self._corr_cache: 'dict[str, float]' = {}
def add_quote_history(self, prices: 'list[float]',
dates: 'list[datetime.date]') -> None:
if len(prices) != len(dates):
raise ValueError(
'Length of prices ({}) is not equal to dates ({})'.format(
len(prices), len(dates)))
self._prices = prices.copy()
self._dates = dates.copy()
def correlation_coefficient(self, asset: 'Asset') -> float:
"""Computes the correlation coefficient of two assets.
"""
if asset.id in self._corr_cache:
return self._corr_cache[asset.id]
date_to_price_l = {
d: self._prices[i]
for i, d in enumerate(self._dates)
}
date_to_price_r = {
d: asset._prices[i]
for i, d in enumerate(asset._dates)
}
common_dates = sorted(
set(date_to_price_l.keys()).intersection(date_to_price_r.keys()))
prices_l = [date_to_price_l[d] for d in common_dates]
prices_r = [date_to_price_r[d] for d in common_dates]
cov_matrix = np.cov(np.stack([prices_l, prices_r]))
self._corr_cache[asset.id] = cov_matrix[0][1] / math.sqrt(
cov_matrix[0][0] * cov_matrix[1][1])
return self._corr_cache[asset.id]
def expected_return(self, duration: Duration) -> float:
if duration in self._return_cache:
return self._return_cache[duration]
self._return_cache[duration] = np.array(
self._price_changes(duration)).mean()
return self._return_cache[duration]
def risk(self, duration: Duration) -> float:
if duration in self._risk_cache:
return self._risk_cache[duration]
self._risk_cache[duration] = np.array(
self._price_changes(duration)).std()
return self._risk_cache[duration]
def _price_changes(self, duration: Duration) -> 'list[float]':
date_to_prices = {
self._get_date_string(self._dates[i], duration): p
for i, p in enumerate(self._prices)
}
dates = sorted(date_to_prices.keys())
changes = []
for i in range(1, len(dates)):
changes.append(date_to_prices[dates[i]] /
date_to_prices[dates[i - 1]] - 1)
return changes
def _get_date_string(self, date: datetime.date, duration: Duration) -> str:
if duration == Duration.DAY:
return date.strftime("%Y%m%d")
elif duration == Duration.WEEK:
return f'{date.year}{date.month}-{date.day // 7}'
elif duration == Duration.MONTH:
return f'{date.year}{date.month}'
return f'{date.year}'