-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDataDump.py
75 lines (66 loc) · 2.21 KB
/
DataDump.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import os
import sys
import time
from datetime import timedelta

import iso8601
import numpy as np
import pandas as pd
import v20
def date_obj(date_str):
    """Parse an ISO 8601 date string into a date/time object.

    Args:
        date_str: The textual timestamp to parse.

    Returns:
        Whatever ``iso8601.parse_date`` produces for ``date_str``.
    """
    parsed = iso8601.parse_date(date_str)
    return parsed
def max_weighted_bid(all_bids, time_str):
    """Pick the bid with the largest liquidity-weighted price.

    Args:
        all_bids: Sequence of bid objects exposing ``price`` and
            ``liquidity`` attributes.
        time_str: Timestamp string of the price message; parsed with
            ``date_obj``.

    Returns:
        ``[timestamp, price]`` where ``price`` belongs to the bid whose
        ``liquidity * price`` is largest (first one on ties), or ``None``
        when ``all_bids`` is empty.
    """
    # The timestamp is parsed up front, before the emptiness check, so a
    # malformed time string raises regardless of whether bids are present.
    parsed_time = date_obj(time_str)
    if not all_bids:
        return None

    prices = []
    weighted = []
    for bid in all_bids:
        prices.append(bid.price)
        weighted.append(bid.liquidity * bid.price)

    best_idx = np.argmax(weighted)
    return [parsed_time, prices[best_idx]]
class DataDump:
    """Record streamed bid prices per instrument and dump them to CSV.

    Attributes are plain containers filled in while ``start_stream`` runs:
    ``prices`` maps each instrument to parallel ``time`` / ``bid`` lists.
    ``candles`` and the ``candle_*`` attributes are initialised here but are
    not populated by the methods of this class.
    """

    def __init__(self, ctx, accounts):
        """Set up empty per-instrument buffers.

        Args:
            ctx: API context; ``ctx.pricing.stream(instruments=...,
                accountID=...)`` is called by ``start_stream``.
            accounts: Indexable collection of account IDs; the first entry
                is used for the pricing stream.
        """
        self.symbols = "EUR_DKK,XAU_CHF,XPD_USD"
        self.dir = 'prices'
        self.accounts = accounts
        self.ctx = ctx

        instruments = self.symbols.split(',')
        self.prices = {
            sym: {
                'time': [],
                'bid': []
            }
            for sym in instruments
        }
        self.candles = {
            sym: {
                'time_start': [],
                'time_end': [],
                'open': [],
                'high': [],
                'low': [],
                'close': []
            }
            for sym in instruments
        }
        self.candle_start = None
        self.candle_end = None
        # timedelta's first positional argument is *days*, so the previous
        # ``timedelta(60)`` meant 60 days. Spell the unit out: one minute.
        # (Use ``timedelta(seconds=15 * 60)`` for 15-minute candles.)
        self.candle_interval = timedelta(seconds=60)

    def _dump_prices(self):
        """Write each instrument's collected prices to ``<dir>/<instrument>.csv``."""
        # Create the target directory first; otherwise to_csv raises when it
        # is missing and everything collected so far would be lost.
        os.makedirs(self.dir, exist_ok=True)
        for instrument, data in self.prices.items():
            pd.DataFrame(data).to_csv(f"{self.dir}/{instrument}.csv")

    def start_stream(self, evt):
        """Consume the pricing stream until ``evt`` is set, then dump to CSV.

        For every ``pricing.ClientPrice`` message the bid selected by
        ``max_weighted_bid`` is appended to ``self.prices``. The stop flag is
        checked once per received message; when it is set the buffers are
        written out and the method returns.

        Args:
            evt: Stop flag exposing ``is_set()`` (presumably a
                ``threading.Event`` -- confirm against the caller).

        Raises:
            SystemExit: With status 1 if any exception occurs while
                streaming or writing the CSV files.
        """
        try:
            response = self.ctx.pricing.stream(
                instruments=self.symbols,
                accountID=self.accounts[0],
            )
            for msg_type, msg in response.parts():
                if evt.is_set():
                    self._dump_prices()
                    return
                if msg_type == 'pricing.ClientPrice':
                    info = max_weighted_bid(msg.bids, msg.time)
                    if info:
                        series = self.prices[msg.instrument]
                        series['time'].append(info[0])
                        series['bid'].append(info[1])
        except Exception as e:
            print("EXCEPTION", e)
            # Exit with a failure status; 0 would report success on error.
            sys.exit(1)