-
Notifications
You must be signed in to change notification settings - Fork 51
/
ranking_client.py
389 lines (319 loc) · 13.9 KB
/
ranking_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
from polygon import RESTClient
from config import POLYGON_API_KEY, FINANCIAL_PREP_API_KEY, MONGO_DB_USER, MONGO_DB_PASS, API_KEY, API_SECRET, BASE_URL, mongo_url
import threading
from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen
from zoneinfo import ZoneInfo
from pymongo import MongoClient
import time
from datetime import datetime, timedelta
import alpaca
from alpaca.trading.client import TradingClient
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit
from alpaca.data.historical.stock import StockHistoricalDataClient
from alpaca.trading.stream import TradingStream
from alpaca.data.live.stock import StockDataStream
from alpaca.data.requests import (
StockBarsRequest,
StockTradesRequest,
StockQuotesRequest
)
from alpaca.trading.requests import (
GetAssetsRequest,
MarketOrderRequest,
LimitOrderRequest,
StopOrderRequest,
StopLimitOrderRequest,
TakeProfitRequest,
StopLossRequest,
TrailingStopOrderRequest,
GetOrdersRequest,
ClosePositionRequest
)
from alpaca.trading.enums import (
AssetStatus,
AssetExchange,
OrderSide,
OrderType,
TimeInForce,
OrderClass,
QueryOrderStatus
)
from alpaca.common.exceptions import APIError
from strategies.talib_indicators import *
import math
import yfinance as yf
import logging
from collections import Counter
from trading_client import market_status
from helper_files.client_helper import strategies, get_latest_price, get_ndaq_tickers, dynamic_period_selector
import time
from datetime import datetime
import heapq
import certifi
ca = certifi.where()
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.FileHandler('rank_system.log'), # Log messages to a file
logging.StreamHandler() # Log messages to the console
]
)
def process_ticker(ticker, mongo_client):
try:
current_price = None
historical_data = None
while current_price is None:
try:
current_price = get_latest_price(ticker)
except Exception as fetch_error:
logging.warning(f"Error fetching price for {ticker}. Retrying... {fetch_error}")
time.sleep(10)
while historical_data is None:
try:
historical_data = get_data(ticker)
except Exception as fetch_error:
logging.warning(f"Error fetching historical data for {ticker}. Retrying... {fetch_error}")
time.sleep(10)
for strategy in strategies:
db = mongo_client.trading_simulator
holdings_collection = db.algorithm_holdings
print(f"Processing {strategy.__name__} for {ticker}")
strategy_doc = holdings_collection.find_one({"strategy": strategy.__name__})
if not strategy_doc:
logging.warning(f"Strategy {strategy.__name__} not found in database. Skipping.")
continue
account_cash = strategy_doc["amount_cash"]
total_portfolio_value = strategy_doc["portfolio_value"]
portfolio_qty = strategy_doc["holdings"].get(ticker, {}).get("quantity", 0)
simulate_trade(ticker, strategy, historical_data, current_price,
account_cash, portfolio_qty, total_portfolio_value, mongo_client)
print(f"{ticker} processing completed.")
except Exception as e:
logging.error(f"Error in thread for {ticker}: {e}")
def simulate_trade(ticker, strategy, historical_data, current_price, account_cash, portfolio_qty, total_portfolio_value, mongo_client):
"""
Simulates a trade based on the given strategy and updates MongoDB.
"""
# Simulate trading action from strategy
print(f"Simulating trade for {ticker} with strategy {strategy.__name__} and quantity of {portfolio_qty}")
action, quantity = simulate_strategy(strategy, ticker, current_price, historical_data, account_cash, portfolio_qty, total_portfolio_value)
# MongoDB setup
db = mongo_client.trading_simulator
holdings_collection = db.algorithm_holdings
points_collection = db.points_tally
# Find the strategy document in MongoDB
strategy_doc = holdings_collection.find_one({"strategy": strategy.__name__})
holdings_doc = strategy_doc.get("holdings", {})
time_delta = db.time_delta.find_one({})['time_delta']
# Update holdings and cash based on trade action
if action in ["buy"] and strategy_doc["amount_cash"] - quantity * current_price > 15000 and quantity > 0 and ((portfolio_qty + quantity) * current_price) / total_portfolio_value < 0.10:
logging.info(f"Action: {action} | Ticker: {ticker} | Quantity: {quantity} | Price: {current_price}")
# Calculate average price if already holding some shares of the ticker
if ticker in holdings_doc:
current_qty = holdings_doc[ticker]["quantity"]
new_qty = current_qty + quantity
average_price = (holdings_doc[ticker]["price"] * current_qty + current_price * quantity) / new_qty
else:
new_qty = quantity
average_price = current_price
# Update the holdings document for the ticker.
holdings_doc[ticker] = {
"quantity": new_qty,
"price": average_price
}
# Deduct the cash used for buying and increment total trades
holdings_collection.update_one(
{"strategy": strategy.__name__},
{
"$set": {
"holdings": holdings_doc,
"amount_cash": strategy_doc["amount_cash"] - quantity * current_price,
"last_updated": datetime.now()
},
"$inc": {"total_trades": 1}
},
upsert=True
)
elif action in ["sell"] and str(ticker) in holdings_doc and holdings_doc[str(ticker)]["quantity"] > 0:
logging.info(f"Action: {action} | Ticker: {ticker} | Quantity: {quantity} | Price: {current_price}")
current_qty = holdings_doc[ticker]["quantity"]
# Ensure we do not sell more than we have
sell_qty = min(quantity, current_qty)
holdings_doc[ticker]["quantity"] = current_qty - sell_qty
price_change_ratio = current_price / holdings_doc[ticker]["price"] if ticker in holdings_doc else 1
if current_price > holdings_doc[ticker]["price"]:
#increment successful trades
holdings_collection.update_one(
{"strategy": strategy.__name__},
{"$inc": {"successful_trades": 1}},
upsert=True
)
# Calculate points to add if the current price is higher than the purchase price
if price_change_ratio < 1.05:
points = time_delta * 1
elif price_change_ratio < 1.1:
points = time_delta * 1.5
else:
points = time_delta * 2
else:
# Calculate points to deduct if the current price is lower than the purchase price
if holdings_doc[ticker]["price"] == current_price:
holdings_collection.update_one(
{"strategy": strategy.__name__},
{"$inc": {"neutral_trades": 1}}
)
else:
holdings_collection.update_one(
{"strategy": strategy.__name__},
{"$inc": {"failed_trades": 1}},
upsert=True
)
if price_change_ratio > 0.975:
points = -time_delta * 1
elif price_change_ratio > 0.95:
points = -time_delta * 1.5
else:
points = -time_delta * 2
# Update the points tally
points_collection.update_one(
{"strategy": strategy.__name__},
{
"$set" : {
"last_updated": datetime.now()
},
"$inc": {"total_points": points}
},
upsert=True
)
if holdings_doc[ticker]["quantity"] == 0:
del holdings_doc[ticker]
# Update cash after selling
holdings_collection.update_one(
{"strategy": strategy.__name__},
{
"$set": {
"holdings": holdings_doc,
"amount_cash": strategy_doc["amount_cash"] + sell_qty * current_price,
"last_updated": datetime.now()
},
"$inc": {"total_trades": 1}
},
upsert=True
)
# Remove the ticker if quantity reaches zero
if holdings_doc[ticker]["quantity"] == 0:
del holdings_doc[ticker]
else:
logging.info(f"Action: {action} | Ticker: {ticker} | Quantity: {quantity} | Price: {current_price}")
print(f"Action: {action} | Ticker: {ticker} | Quantity: {quantity} | Price: {current_price}")
# Close the MongoDB connection
def update_portfolio_values(client):
"""
still need to implement.
we go through each strategy and update portfolio value buy cash + summation(holding * current price)
"""
db = client.trading_simulator
holdings_collection = db.algorithm_holdings
# Update portfolio values
for strategy_doc in holdings_collection.find({}):
# Calculate the portfolio value for the strategy
portfolio_value = strategy_doc["amount_cash"]
for ticker, holding in strategy_doc["holdings"].items():
# Get the current price of the ticker from the Polygon API
current_price = None
while current_price is None:
try:
current_price = get_latest_price(ticker)
except:
print(f"Error fetching price for {ticker}. Retrying...")
print(f"Current price of {ticker}: {current_price}")
# Calculate the value of the holding
holding_value = holding["quantity"] * current_price
# Add the holding value to the portfolio value
portfolio_value += holding_value
# Update the portfolio value in the strategy document
holdings_collection.update_one({"strategy": strategy_doc["strategy"]}, {"$set": {"portfolio_value": portfolio_value}}, upsert=True)
# Update MongoDB with the modified strategy documents
def update_ranks(client):
""""
based on portfolio values, rank the strategies to use for actual trading_simulator
"""
db = client.trading_simulator
points_collection = db.points_tally
rank_collection = db.rank
algo_holdings = db.algorithm_holdings
"""
delete all documents in rank collection first
"""
rank_collection.delete_many({})
"""
now update rank based on successful_trades - failed
"""
q = []
for strategy_doc in algo_holdings.find({}):
"""
based on (points_tally (less points pops first), failed-successful(more negtive pops first), portfolio value (less value pops first), and then strategy_name), we add to heapq.
"""
strategy_name = strategy_doc["strategy"]
if strategy_name == "test" or strategy_name == "test_strategy":
continue
if points_collection.find_one({"strategy": strategy_name})["total_points"] > 0:
heapq.heappush(q, (points_collection.find_one({"strategy": strategy_name})["total_points"] * 2 + (strategy_doc["portfolio_value"]), strategy_doc["successful_trades"] - strategy_doc["failed_trades"], strategy_doc["amount_cash"], strategy_doc["strategy"]))
else:
heapq.heappush(q, (strategy_doc["portfolio_value"], strategy_doc["successful_trades"] - strategy_doc["failed_trades"], strategy_doc["amount_cash"], strategy_doc["strategy"]))
rank = 1
while q:
_, _, _, strategy_name = heapq.heappop(q)
rank_collection.insert_one({"strategy": strategy_name, "rank": rank})
rank+=1
def main():
"""
Main function to control the workflow based on the market's status.
"""
ndaq_tickers = []
early_hour_first_iteration = True
post_market_hour_first_iteration = True
while True:
mongo_client = MongoClient(mongo_url, tlsCAFile=ca)
status = mongo_client.market_data.market_status.find_one({})["market_status"]
if status == "open":
if not ndaq_tickers:
logging.info("Market is open. Processing strategies.")
ndaq_tickers = get_ndaq_tickers(mongo_url, FINANCIAL_PREP_API_KEY)
threads = []
for ticker in ndaq_tickers:
thread = threading.Thread(target=process_ticker, args=(ticker, mongo_client))
threads.append(thread)
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
logging.info("Finished processing all strategies. Waiting for 60 seconds.")
time.sleep(60)
elif status == "early_hours":
if early_hour_first_iteration is True:
ndaq_tickers = get_ndaq_tickers(mongo_url, FINANCIAL_PREP_API_KEY)
early_hour_first_iteration = False
post_market_hour_first_iteration = True
logging.info("Market is in early hours. Waiting for 60 seconds.")
time.sleep(60)
elif status == "closed":
if post_market_hour_first_iteration is True:
early_hour_first_iteration = True
logging.info("Market is closed. Performing post-market analysis.")
post_market_hour_first_iteration = False
#increment time_delta in database by 0.01
mongo_client.trading_simulator.time_delta.update_one({}, {"$inc": {"time_delta": 0.01}})
#Update ranks
update_portfolio_values(mongo_client)
update_ranks(mongo_client)
time.sleep(60)
else:
logging.error("An error occurred while checking market status.")
time.sleep(60)
mongo_client.close()
if __name__ == "__main__":
main()