-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathutils.py
231 lines (193 loc) · 8.96 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# Testing the Alpaca API
import alpaca_trade_api as tradeapi
import pandas as pd
import os
import tools
import tqdm
def set_API(ID=None, key=None, paperMode=True, ask=False):
    '''Create an authenticated Alpaca REST client and verify the credentials.

    Args:
        ID (str): Alpaca account ID; prompted for interactively if None.
        key (str): Alpaca account key; prompted for interactively if None.
        paperMode (bool): must be True; live trading is deliberately disabled.
        ask (bool): force the interactive prompt even when ID/key are given.

    Returns:
        tradeapi.REST: client whose credentials passed an account lookup.

    Raises:
        ConnectionRefusedError: if paperMode is False (live mode disabled).
        ConnectionError: if the keys fail the account lookup.
    '''
    # Fall back to interactive entry when either credential is missing.
    if ID is None or key is None:
        ask = True
    if ask:
        ID = str(input("Enter Alpaca Account ID: "))
        key = str(input("Enter Alpaca Account Key: "))
    if paperMode:
        url = 'https://paper-api.alpaca.markets'
    else:
        raise ConnectionRefusedError("Live mode not Enabled!")
    api = tradeapi.REST(ID, key, base_url=url)  # or use ENV Vars shown below
    # Verify the keys actually work before handing the client back; chain the
    # original error so the root cause is visible (original discarded it).
    try:
        api.get_account()
    except Exception as e:
        raise ConnectionError("Unable to get account for keys.") from e
    return api
def check_for_data(stocks, start, end, timeframe):
    '''Checks if stock data is already in the data directory by using the runs.txt file.

    Args:
        stocks (list): list of stocks to check for
        start (str): string representation of the start date
        end (str): string representation of the end date
        timeframe (str): timeframe for which the data should be checked

    Returns:
        returnStocks (list): list of (upper-cased) stocks for which data has
        not been found and therefore still needs to be fetched

    Raises:
        TypeError: if stocks is not a list or start/end are not strings
    '''
    if not isinstance(stocks, list):
        raise TypeError("stocks must be a list!")
    if not isinstance(start, str):
        raise TypeError("start must be a string formatted YYYY-MM-DD!")
    if not isinstance(end, str):
        raise TypeError("end must be a string formatted YYYY-MM-DD!")
    returnStocks = list()
    # os.path.join keeps the path portable (original used Windows-only "\\").
    runs_path = os.path.join("bin", "runs.txt")
    with open(runs_path, "a+") as f:
        f.seek(0)  # "a+" opens positioned at EOF; rewind before reading
        lines = f.readlines()
        # Tolerate a missing data directory instead of crashing.
        all_files = os.listdir("data") if os.path.isdir("data") else []
        for stock in stocks:
            # One record per (stock, date range, timeframe) run, newline-terminated.
            search_text = stock.upper() + start + end + timeframe + "\n"
            file_text = stock.upper() + "_" + timeframe + "_data.csv"
            if search_text in lines and file_text in all_files:
                continue  # run recorded and csv still present -- nothing to do
            if search_text in lines:
                # Recorded run whose csv was deleted: re-fetch, but do not write
                # the runs.txt entry again (original appended a duplicate line;
                # it also called f.seek(2) -- absolute offset 2 -- though append
                # mode writes at EOF regardless of the seek position).
                print(stock + " data has been deleted, re-getting data.")
            else:
                f.write(search_text)
            returnStocks.append(stock.upper())
    return returnStocks
def make_data_csv(stocks, start, end, api=None, timeframe="day", reset=False):
    '''Find data for the specified stocks and create csv's in the data directory.

    Args:
        stocks (list): list of stocks to find data for
        start (str): string representation of the start date
        end (str): string representation of the end date
        api: authenticated Alpaca client; one is created interactively if None
        timeframe (str): timeframe for getting data; can be minute, 1Min, 5Min, 15Min, day or 1D
        reset (bool): if True, create new csv files regardless if data is already present
                      if False, create csv files for only missing stocks

    Returns:
        None

    Raises:
        TypeError: bad argument types, or an unexpected column count from the API
        ValueError: empty stock list or unknown timeframe
    '''
    if api is None:
        api = set_API(ask=True)
    timeframe_list = ["minute", "1Min", "5Min", "15Min", "day", "1D"]
    # Basic Error Checking (messages had "musbt" typos -- fixed)
    if not isinstance(stocks, list):
        raise TypeError("stocks must be of type list with string values!")
    if len(stocks) == 0:
        raise ValueError("list of stocks must be greater than 0!")
    if not isinstance(start, str):
        raise TypeError("start must be a string formatted YYYY-MM-DD!")
    if not isinstance(end, str):
        raise TypeError("end must be a string formatted YYYY-MM-DD!")
    if not isinstance(timeframe, str):
        raise TypeError("timeframe must be of type str and be minute, 1Min, 5Min, 15Min, day or 1D")
    if timeframe not in timeframe_list:
        timeframe_str = ", ".join(timeframe_list)
        raise ValueError("timeframe must be of: " + timeframe_str + ", not " + timeframe)
    # Check for previous runs
    if reset:
        # Wipe the data directory and rewrite runs.txt from scratch.
        for nfile in os.listdir("data"):
            os.remove(os.path.join("data", nfile))
        with open(os.path.join("bin", "runs.txt"), "w+") as new_runs:
            for stock in stocks:
                new_runs.write(stock.upper() + start + end + timeframe + "\n")
    else:
        # Narrow the list down to stocks whose data is actually missing.
        stocks = check_for_data(stocks, start, end, timeframe)
        if len(stocks) == 0:
            print("Data saved from previous runs, new csv('s) not needed.\nPlease check that old csv's have the correct time ranges.\n")
            return
    # Get Data from API
    NY = 'America/New_York'
    start = pd.Timestamp(start, tz=NY).isoformat()
    end = pd.Timestamp(end, tz=NY).isoformat()
    try:
        data = api.get_barset(stocks, timeframe, start=start, end=end).df
    except Exception as e:
        print("Alpaca API Error: Unable to get data for: ", stocks, "\n", e)
        return
    # Save Data in CSV for later use; the API returns 5 columns per stock.
    stop_index = len(data.columns)
    if stop_index % 5 != 0:
        raise TypeError("API did not return expected number of columns")
    # Slice out each stock's 5-column group and save it.
    for start_index in range(0, stop_index, 5):
        stock_data = data[data.columns[start_index:start_index + 5]]
        # Column labels are (symbol, field) pairs; [0][0] is the symbol.
        stock_name = stock_data.columns[0][0].upper()
        filename = os.path.join("data", stock_name + "_" + timeframe + "_data.csv")
        stock_data.to_csv(filename)
        print("Saved file: " + filename)
    if not reset:
        print("\nPlease ensure previous files in \\data has the correct time ranges.\nOtherwise, delete all files in \\data and run this program again.\n")
def get_data(stocks, timeframe):
    '''Get data from the data directory for the stocks given.

    Args:
        stocks (list): list of stocks for which to get data for
        timeframe (str): timeframe for stock data; can be minute, 1Min, 5Min, 15Min, day or 1D

    Returns:
        stock_dict (dict): dictionary mapping upper-cased stock symbols to
        their DataFrames; stocks whose csv is missing are silently skipped

    Raises:
        TypeError: if stocks is not a list or timeframe is not a string
        ValueError: if timeframe is not one of the allowed values
    '''
    if not isinstance(stocks, list):
        raise TypeError("get_data must have an input of type list!")
    if not isinstance(timeframe, str):
        raise TypeError("timeframe must be of type str and be minute, 1Min, 5Min, 15Min, day or 1D")
    timeframe_list = ["minute", "1Min", "5Min", "15Min", "day", "1D"]
    if timeframe not in timeframe_list:
        timeframe_str = ", ".join(timeframe_list)
        raise ValueError("timeframe must be of: " + timeframe_str + ", not " + timeframe)
    stock_dict = dict()
    for stock in stocks:
        stock = stock.upper()
        # os.path.join keeps the path portable (original used Windows-only "\\").
        filename = os.path.join("data", stock + "_" + timeframe + "_data.csv")
        # Best-effort per stock: skip missing/unreadable csv's instead of
        # failing the whole batch. Original used a bare except; narrowed to
        # OSError (missing file) and ValueError (pandas parse/empty errors).
        try:
            stock_data = pd.read_csv(filename).dropna()
        except (OSError, ValueError):
            print("Unable to find data for: ", stock, " skipping for now.")
            continue
        # Prefix each column with the symbol so frames can be combined later.
        stock_data.columns = ["date", stock + "_open", stock + "_high",
                              stock + "_low", stock + "_close", stock + "_volume"]
        stock_dict[stock] = stock_data  # already upper-cased above
    return stock_dict
def analyze(stocks, start, end, api=None, impulseN=None, lookback=3, valueZoneN=None,
            MACDN=None, ADXN=14, reset=True, timeframe="day", progress_bar=True):
    '''Pseudo-main routine for testing: fetch data and build a signal table per stock.

    Args:
        stocks (list): stocks to analyze
        start (str): start date, formatted YYYY-MM-DD
        end (str): end date, formatted YYYY-MM-DD
        api: authenticated Alpaca client; created interactively if None
        impulseN (list): short/long Impulse window lengths (default [15, 30])
        lookback (int): lookback passed to the Impulse and ADX transforms
        valueZoneN (list): value-zone window lengths (default [5, 10])
        MACDN (list): MACD fast/slow window lengths (default [12, 26])
        ADXN (int): ADX window length
        reset (bool): passed through to make_data_csv
        timeframe (str): bar timeframe for the data
        progress_bar (bool): show a tqdm progress bar while analyzing

    Returns:
        analyzed (dict): stock symbol -> DataFrame of transformed signal columns
    '''
    # Replace the original mutable default arguments (shared across calls)
    # with None sentinels; the effective defaults are unchanged.
    if impulseN is None:
        impulseN = [15, 30]
    if valueZoneN is None:
        valueZoneN = [5, 10]
    if MACDN is None:
        MACDN = [12, 26]
    # Create data and dictionaries
    make_data_csv(stocks, start, end, api=api, reset=reset, timeframe=timeframe)
    stock_dict = get_data(stocks, timeframe)
    analyzed = dict()
    # For each stock, create a new signal table
    for key in tqdm.tqdm(stock_dict.keys(), ncols=100, desc="Analyzing",
                         disable=not progress_bar):
        data = stock_dict[key]
        to_analyze = pd.DataFrame()
        # Transform data for analysis
        # NOTE(review): value_zone, shortImpulse, longImpulse and ADX are
        # computed but never added to to_analyze below -- presumably meant to
        # be wired into the signal table; confirm before removing the calls.
        value_zone = tools.get_ValueZone(stock=key, data=data, method="average", n=valueZoneN)
        MACD = tools.get_MACD(stock=key, data=data, n=MACDN, method="average")
        supertrend = tools.get_SuperTrend(stock=key, data=data)
        shortImpulse = tools.get_Impulse(stock=key, data=data, n=impulseN[0], method="average", lookback=lookback)
        longImpulse = tools.get_Impulse(stock=key, data=data, n=impulseN[1], method="average", lookback=lookback)
        # Bug fix: the ADXN parameter was ignored (n was hard-coded to 10).
        ADX = tools.get_ADX(stock=key, data=data, n=ADXN, lookback=lookback)
        # Assign transformed data
        to_analyze[key + "_date"] = data["date"]
        to_analyze[key + "_average"] = tools.make_average(key, data)[key + "_average"]
        to_analyze[key + "_MACD"] = MACD["MACD"].apply(float)
        to_analyze[key + "_MACD_Signal"] = MACD["MACD_Signal"].apply(float)
        to_analyze[key + "_ST"] = supertrend["ST"].apply(float)
        to_analyze[key + "_ST_BUYSELL"] = supertrend["ST_BUY_SELL"]
        analyzed[key] = to_analyze
    return analyzed