-
Notifications
You must be signed in to change notification settings - Fork 7
/
strategies.py
381 lines (260 loc) · 13.3 KB
/
strategies.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
import logging
from typing import *
import time
from threading import Timer
import pandas as pd
from models import *
if TYPE_CHECKING: # Import the connector class names only for typing purpose (the classes aren't actually imported)
from connectors.bitmex import BitmexClient
from connectors.binance import BinanceClient
logger = logging.getLogger()
# TF_EQUIV is used in parse_trades() to compare the last candle timestamp to the new trade timestamp
TF_EQUIV = {"1m": 60, "5m": 300, "15m": 900, "30m": 1800, "1h": 3600, "4h": 14400}
class Strategy:
def __init__(self, client: Union["BitmexClient", "BinanceClient"], contract: Contract, exchange: str,
timeframe: str, balance_pct: float, take_profit: float, stop_loss: float, strat_name):
self.client = client
self.contract = contract
self.exchange = exchange
self.tf = timeframe
self.tf_equiv = TF_EQUIV[timeframe] * 1000
self.balance_pct = balance_pct
self.take_profit = take_profit
self.stop_loss = stop_loss
self.strat_name = strat_name
self.ongoing_position = False
self.candles: List[Candle] = []
self.trades: List[Trade] = []
self.logs = []
def _add_log(self, msg: str):
logger.info("%s", msg)
self.logs.append({"log": msg, "displayed": False})
def parse_trades(self, price: float, size: float, timestamp: int) -> str:
"""
Parse new trades coming in from the websocket and update the Candle list based on the timestamp.
:param price: The trade price
:param size: The trade size
:param timestamp: Unix timestamp in milliseconds
:return:
"""
timestamp_diff = int(time.time() * 1000) - timestamp
if timestamp_diff >= 2000:
logger.warning("%s %s: %s milliseconds of difference between the current time and the trade time",
self.exchange, self.contract.symbol, timestamp_diff)
last_candle = self.candles[-1]
# Same Candle
if timestamp < last_candle.timestamp + self.tf_equiv:
last_candle.close = price
last_candle.volume += size
if price > last_candle.high:
last_candle.high = price
elif price < last_candle.low:
last_candle.low = price
# Check Take profit / Stop loss
for trade in self.trades:
if trade.status == "open" and trade.entry_price is not None:
self._check_tp_sl(trade)
return "same_candle"
# Missing Candle(s)
elif timestamp >= last_candle.timestamp + 2 * self.tf_equiv:
missing_candles = int((timestamp - last_candle.timestamp) / self.tf_equiv) - 1
logger.info("%s missing %s candles for %s %s (%s %s)", self.exchange, missing_candles, self.contract.symbol,
self.tf, timestamp, last_candle.timestamp)
for missing in range(missing_candles):
new_ts = last_candle.timestamp + self.tf_equiv
candle_info = {'ts': new_ts, 'open': last_candle.close, 'high': last_candle.close,
'low': last_candle.close, 'close': last_candle.close, 'volume': 0}
new_candle = Candle(candle_info, self.tf, "parse_trade")
self.candles.append(new_candle)
last_candle = new_candle
new_ts = last_candle.timestamp + self.tf_equiv
candle_info = {'ts': new_ts, 'open': price, 'high': price, 'low': price, 'close': price, 'volume': size}
new_candle = Candle(candle_info, self.tf, "parse_trade")
self.candles.append(new_candle)
return "new_candle"
# New Candle
elif timestamp >= last_candle.timestamp + self.tf_equiv:
new_ts = last_candle.timestamp + self.tf_equiv
candle_info = {'ts': new_ts, 'open': price, 'high': price, 'low': price, 'close': price, 'volume': size}
new_candle = Candle(candle_info, self.tf, "parse_trade")
self.candles.append(new_candle)
logger.info("%s New candle for %s %s", self.exchange, self.contract.symbol, self.tf)
return "new_candle"
def _check_order_status(self, order_id):
"""
Called regularly after an order has been placed, until it is filled.
:param order_id: The order id to check.
:return:
"""
order_status = self.client.get_order_status(self.contract, order_id)
if order_status is not None:
logger.info("%s order status: %s", self.exchange, order_status.status)
if order_status.status == "filled":
for trade in self.trades:
if trade.entry_id == order_id:
trade.entry_price = order_status.avg_price
trade.quantity = order_status.executed_qty
break
return
t = Timer(2.0, lambda: self._check_order_status(order_id))
t.start()
def _open_position(self, signal_result: int):
"""
Open Long or Short position based on the signal result.
:param signal_result: 1 (Long) or -1 (Short)
:return:
"""
# Short is not allowed on Spot platforms
if self.client.platform == "binance_spot" and signal_result == -1:
return
trade_size = self.client.get_trade_size(self.contract, self.candles[-1].close, self.balance_pct)
if trade_size is None:
return
order_side = "buy" if signal_result == 1 else "sell"
position_side = "long" if signal_result == 1 else "short"
self._add_log(f"{position_side.capitalize()} signal on {self.contract.symbol} {self.tf}")
order_status = self.client.place_order(self.contract, "MARKET", trade_size, order_side)
if order_status is not None:
self._add_log(f"{order_side.capitalize()} order placed on {self.exchange} | Status: {order_status.status}")
self.ongoing_position = True
avg_fill_price = None
if order_status.status == "filled":
avg_fill_price = order_status.avg_price
else:
t = Timer(2.0, lambda: self._check_order_status(order_status.order_id))
t.start()
new_trade = Trade({"time": int(time.time() * 1000), "entry_price": avg_fill_price,
"contract": self.contract, "strategy": self.strat_name, "side": position_side,
"status": "open", "pnl": 0, "quantity": order_status.executed_qty, "entry_id": order_status.order_id})
self.trades.append(new_trade)
def _check_tp_sl(self, trade: Trade):
"""
Based on the average entry price, calculates whether the defined stop loss or take profit has been reached.
:param trade:
:return:
"""
tp_triggered = False
sl_triggered = False
price = self.candles[-1].close
if trade.side == "long":
if self.stop_loss is not None:
if price <= trade.entry_price * (1 - self.stop_loss / 100):
sl_triggered = True
if self.take_profit is not None:
if price >= trade.entry_price * (1 + self.take_profit / 100):
tp_triggered = True
elif trade.side == "short":
if self.stop_loss is not None:
if price >= trade.entry_price * (1 + self.stop_loss / 100):
sl_triggered = True
if self.take_profit is not None:
if price <= trade.entry_price * (1 - self.take_profit / 100):
tp_triggered = True
if tp_triggered or sl_triggered:
self._add_log(f"{'Stop loss' if sl_triggered else 'Take profit'} for {self.contract.symbol} {self.tf} "
f"| Current Price = {price} (Entry price was {trade.entry_price})")
order_side = "SELL" if trade.side == "long" else "BUY"
if not self.client.futures:
# Make sure we don't sell more than what's in the available balance on Binance Spot
current_balances = self.client.get_balances()
if current_balances is not None:
if order_side == "SELL" and self.contract.base_asset in current_balances:
trade.quantity = min(current_balances[self.contract.base_asset].free, trade.quantity)
order_status = self.client.place_order(self.contract, "MARKET", trade.quantity, order_side)
if order_status is not None:
self._add_log(f"Exit order on {self.contract.symbol} {self.tf} placed successfully")
trade.status = "closed"
self.ongoing_position = False
class TechnicalStrategy(Strategy):
def __init__(self, client, contract: Contract, exchange: str, timeframe: str, balance_pct: float, take_profit: float,
stop_loss: float, other_params: Dict):
super().__init__(client, contract, exchange, timeframe, balance_pct, take_profit, stop_loss, "Technical")
self._ema_fast = other_params['ema_fast']
self._ema_slow = other_params['ema_slow']
self._ema_signal = other_params['ema_signal']
self._rsi_length = other_params['rsi_length']
def _rsi(self) -> float:
"""
Compute the Relative Strength Index.
:return: The RSI value of the previous candlestick
"""
close_list = []
for candle in self.candles:
close_list.append(candle.close)
closes = pd.Series(close_list)
# Calculate the different between the value of one row and the value of the row before
delta = closes.diff().dropna()
up, down = delta.copy(), delta.copy()
up[up < 0] = 0
down[down > 0] = 0 # Keep only the negative change, others are set to 0
avg_gain = up.ewm(com=(self._rsi_length - 1), min_periods=self._rsi_length).mean()
avg_loss = down.abs().ewm(com=(self._rsi_length - 1), min_periods=self._rsi_length).mean()
rs = avg_gain / avg_loss # Relative Strength
rsi = 100 - 100 / (1 + rs)
rsi = rsi.round(2)
return rsi.iloc[-2]
def _macd(self) -> Tuple[float, float]:
"""
Compute the MACD and its Signal line.
:return: The MACD and the MACD Signal value of the previous candlestick
"""
close_list = []
for candle in self.candles:
close_list.append(candle.close) # Use only the close price of each candlestick for the calculations
closes = pd.Series(close_list) # Converts the close prices list to a pandas Series.
ema_fast = closes.ewm(span=self._ema_fast).mean() # Exponential Moving Average method
ema_slow = closes.ewm(span=self._ema_slow).mean()
macd_line = ema_fast - ema_slow
macd_signal = macd_line.ewm(span=self._ema_signal).mean()
return macd_line.iloc[-2], macd_signal.iloc[-2]
def _check_signal(self):
"""
Compute technical indicators and compare their value to some predefined levels to know whether to go Long,
Short, or do nothing.
:return: 1 for a Long signal, -1 for a Short signal, 0 for no signal
"""
macd_line, macd_signal = self._macd()
rsi = self._rsi()
if rsi < 30 and macd_line > macd_signal:
return 1
elif rsi > 70 and macd_line < macd_signal:
return -1
else:
return 0
def check_trade(self, tick_type: str):
"""
To be triggered from the websocket _on_message() methods. Triggered only once per candlestick to avoid
constantly calculating the indicators. A trade can occur only if the is no open position at the moment.
:param tick_type: same_candle or new_candle
:return:
"""
if tick_type == "new_candle" and not self.ongoing_position:
signal_result = self._check_signal()
if signal_result in [1, -1]:
self._open_position(signal_result)
class BreakoutStrategy(Strategy):
def __init__(self, client, contract: Contract, exchange: str, timeframe: str, balance_pct: float, take_profit: float,
stop_loss: float, other_params: Dict):
super().__init__(client, contract, exchange, timeframe, balance_pct, take_profit, stop_loss, "Breakout")
self._min_volume = other_params['min_volume']
def _check_signal(self) -> int:
"""
Use candlesticks OHLC data to define Long or Short patterns.
:return: 1 for a Long signal, -1 for a Short signal, 0 for no signal
"""
if self.candles[-1].close > self.candles[-2].high and self.candles[-1].volume > self._min_volume:
return 1
elif self.candles[-1].close < self.candles[-2].low and self.candles[-1].volume > self._min_volume:
return -1
else:
return 0
def check_trade(self, tick_type: str):
"""
To be triggered from the websocket _on_message() methods
:param tick_type: same_candle or new_candle
:return:
"""
if not self.ongoing_position:
signal_result = self._check_signal()
if signal_result in [1, -1]:
self._open_position(signal_result)