forked from camisatx/pySecMaster
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathquery_data.py
285 lines (234 loc) · 11.8 KB
/
query_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
from datetime import datetime
import numpy as np
import pandas as pd
import psycopg2
import re
import time
__author__ = 'Josh Schertz'
__copyright__ = 'Copyright (C) 2018 Josh Schertz'
__description__ = 'An automated system to store and maintain financial data.'
__email__ = 'josh[AT]joshschertz[DOT]com'
__license__ = 'GNU AGPLv3'
__maintainer__ = 'Josh Schertz'
__status__ = 'Development'
__url__ = 'https://joshschertz.com/'
__version__ = '1.5.0'
'''
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
def calculate_adjusted_prices(df, column):
    """ Calculate split- and dividend-adjusted prices for one price column.

    The rows are walked from the most recent date back to the oldest. The
    most recent adjusted price equals the raw price, and each earlier
    adjusted price is derived from the next (later) one. With row 0 being
    the most recent row, the step is algebraically equivalent to:

        adj[i] = adj[i-1] * (price[i] / split[i-1] - dividend[i-1])
                 / price[i-1]

    Every step is rounded to 4 decimal places. Because of this, the
    computation is a sequential loop rather than a vectorized cumulative
    product. The result is stored in a new column called
    'adj_<column name>'.

    NOTE: This assumes the input split values are direct ratios,
    e.g. a 7-for-1 split = 7.

    NOTE: The provided DataFrame is modified in place. It is re-sorted and
    gains the new column. It is also returned, left sorted by index in
    ascending order. An empty DataFrame simply gains an empty adjusted
    column.

    :param df: DataFrame indexed by date with the raw price column along
        with 'dividend' and 'split' columns
    :param column: String of which price column should have adjusted prices
        created for it
    :return: DataFrame with the addition of the adjusted price column
    """
    adj_column = 'adj_' + column

    # Work newest-to-oldest: the newest raw price anchors the adjusted series
    df.sort_index(ascending=False, inplace=True)

    price_col = df[column].values
    split_col = df['split'].values
    dividend_col = df['dividend'].values

    row_count = len(price_col)
    adj_price_col = np.zeros(row_count)
    # An empty frame has no anchor price; leave the empty array as-is
    if row_count:
        adj_price_col[0] = price_col[0]

    for i in range(1, row_count):
        prev_adj = adj_price_col[i - 1]
        # Relative change from the later raw price to the earlier raw price,
        # after restating the earlier price with the later row's split and
        # removing the later row's dividend.
        adj_price_col[i] = \
            round((prev_adj + prev_adj *
                   (((price_col[i] * (1/split_col[i - 1])) -
                     price_col[i - 1] -
                     dividend_col[i - 1]) / price_col[i - 1])), 4)

    df[adj_column] = adj_price_col

    # Restore chronological (ascending date) order
    df.sort_index(ascending=True, inplace=True)
    return df
def pull_daily_prices(database, user, password, host, port, query_type,
                      data_vendor_id, beg_date, end_date, adjust=True,
                      source='tsid', *args):
    """ Query the daily prices from the database for the tsid provided between
    the start and end dates. Return a DataFrame with the prices.

    Because *args follows the defaulted parameters, a caller supplying a
    tsid must also pass adjust and source positionally, e.g.:
        pull_daily_prices(db, user, pw, host, port, 'ticker', 1,
                          beg, end, True, 'tsid', 'AAPL.Q.0')

    :param database: String of the database name
    :param user: String of the username used to login to the database
    :param password: String of the password used to login to the database
    :param host: String of the database address (localhost, url, ip, etc.)
    :param port: Integer of the database port number (5432)
    :param query_type: String of which query to run; only 'ticker' is
        implemented
    :param data_vendor_id: Integer of which data vendor id to return prices for
    :param beg_date: String of the ISO date to start with (inclusive)
    :param end_date: String of the ISO date to end with (inclusive)
    :param adjust: Boolean of whether to adjust the values or not; default True
    :param source: String of the ticker's source
    :param args: For query_type 'ticker', exactly one tsid string
    :return: DataFrame of the returned prices indexed by date (ascending),
        with an 'adj_close' column when adjust is True. Returns None if a
        psycopg2 error occurs while querying; the error is printed.
    :raises SystemExit: if the query returns no rows
    :raises SystemError: for any other failure, including an unsupported
        query_type or a wrong number of args
    """
    # This call sits outside the try block, so a failure to connect raises
    # psycopg2's own exception to the caller.
    conn = psycopg2.connect(database=database, user=user, password=password,
                            host=host, port=port)
    try:
        # ``with conn`` only scopes the transaction (commit/rollback); the
        # connection itself is closed in the ``finally`` clause below.
        with conn:
            with conn.cursor() as cur:
                if query_type == 'ticker':
                    tsid, = args
                    print('Extracting the daily prices for %s' % (tsid,))
                    cur.execute("""SELECT date, source_id AS tsid, open,
                                       high, low, close, volume, dividend,
                                       split
                                   FROM daily_prices
                                   WHERE source_id=%s AND source=%s
                                   AND data_vendor_id=%s
                                   AND date>=%s AND date<=%s""",
                                (tsid, source, data_vendor_id, beg_date,
                                 end_date))
                else:
                    raise NotImplementedError('Query type %s is not '
                                              'implemented within '
                                              'pull_daily_prices' %
                                              query_type)
                rows = cur.fetchall()

        if not rows:
            raise SystemExit('No data returned from table query. Try '
                             'adjusting the criteria for the query.')

        df = pd.DataFrame(rows,
                          columns=['date', 'tsid', 'open', 'high', 'low',
                                   'close', 'volume', 'dividend', 'split'])
        # set_index also names the resulting index 'date'
        df.set_index('date', inplace=True)
        df.sort_index(inplace=True)

        if adjust:
            # Change the columns from decimal to float so the adjustment
            # arithmetic runs on plain floats
            for col_name in ('dividend', 'split', 'close'):
                df[col_name] = df[col_name].astype(float)
            # Calculate the adjusted prices for the close column
            df = calculate_adjusted_prices(df=df, column='close')

        return df
    except psycopg2.Error as e:
        # psycopg2.OperationalError is a subclass of psycopg2.Error, so
        # operational failures during the query are also reported here.
        print('Error when trying to retrieve price data from the %s database '
              'in pull_daily_prices' % database)
        print(e)
        return None
    except Exception as e:
        print(e)
        raise SystemError('Error: Unknown issue occurred in '
                          'pull_daily_prices') from e
    finally:
        conn.close()
def pull_minute_prices(database, user, password, host, port, query_type,
                       data_vendor_id, beg_date, end_date, source='tsid',
                       *args):
    """ Query the minute prices from the database for the tsid provided between
    the start and end dates. Return a DataFrame with the prices.

    Because *args follows the defaulted parameters, a caller supplying a
    tsid must also pass source positionally, e.g.:
        pull_minute_prices(db, user, pw, host, port, 'ticker', 1,
                           beg, end, 'tsid', 'AAPL.Q.0')

    :param database: String of the database name
    :param user: String of the username used to login to the database
    :param password: String of the password used to login to the database
    :param host: String of the database address (localhost, url, ip, etc.)
    :param port: Integer of the database port number (5432)
    :param query_type: String of which query to run; only 'ticker' is
        implemented
    :param data_vendor_id: Integer of which data vendor id to return prices for
    :param beg_date: String of the ISO date to start with (inclusive)
    :param end_date: String of the ISO date to end with (inclusive)
    :param source: String of the source
    :param args: For query_type 'ticker', exactly one tsid string
    :return: DataFrame of the returned prices indexed by date (ascending).
        Returns None if a psycopg2 error occurs while querying; the error is
        printed.
    :raises SystemExit: if the query returns no rows
    :raises SystemError: for any other failure, including an unsupported
        query_type or a wrong number of args
    """
    # This call sits outside the try block, so a failure to connect raises
    # psycopg2's own exception to the caller.
    conn = psycopg2.connect(database=database, user=user, password=password,
                            host=host, port=port)
    try:
        # ``with conn`` only scopes the transaction (commit/rollback); the
        # connection itself is closed in the ``finally`` clause below.
        with conn:
            with conn.cursor() as cur:
                if query_type == 'ticker':
                    tsid, = args
                    print('Extracting the minute prices for %s' % (tsid,))
                    cur.execute("""SELECT date, source_id AS tsid, open,
                                       high, low, close, volume
                                   FROM minute_prices
                                   WHERE source_id=%s AND source=%s
                                   AND data_vendor_id=%s
                                   AND date>=%s AND date<=%s""",
                                (tsid, source, data_vendor_id, beg_date,
                                 end_date))
                else:
                    raise NotImplementedError('Query type %s is not '
                                              'implemented within '
                                              'pull_minute_prices' %
                                              query_type)
                rows = cur.fetchall()

        if not rows:
            raise SystemExit('No data returned from table query. Try '
                             'adjusting the criteria for the query.')

        df = pd.DataFrame(rows,
                          columns=['date', 'tsid', 'open', 'high', 'low',
                                   'close', 'volume'])
        # set_index names the index 'date'. Do not assign a list to
        # df.index.name: pandas requires a hashable name and raises
        # TypeError for a list.
        df.set_index('date', inplace=True)
        df.sort_index(inplace=True)
        return df
    except psycopg2.Error as e:
        # psycopg2.OperationalError is a subclass of psycopg2.Error, so
        # operational failures during the query are also reported here.
        print('Error when trying to retrieve price data from the %s database '
              'in pull_minute_prices' % database)
        print(e)
        return None
    except Exception as e:
        print(e)
        raise SystemError('Error: Unknown issue occurred in '
                          'pull_minute_prices') from e
    finally:
        conn.close()
if __name__ == '__main__':
    # Manual check: pull one ticker's prices using the PostgreSQL settings
    # returned by the project's user_dir() helper, then print a summary.
    from utilities.user_dir import user_dir

    userdir = user_dir()
    test_database = userdir['postgresql']['pysecmaster_db']
    test_user = userdir['postgresql']['pysecmaster_user']
    test_password = userdir['postgresql']['pysecmaster_password']
    test_host = userdir['postgresql']['pysecmaster_host']
    test_port = userdir['postgresql']['pysecmaster_port']

    test_query_type = 'ticker'      # index, ticker
    test_tsid = 'AAPL.Q.0'
    test_data_vendor_id = 1         # Quandl WIKI
    # test_data_vendor_id = 11      # Quandl EOD
    # test_data_vendor_id = 15      # pySecMaster_Consensus
    # test_data_vendor_id = 12      # Google_Finance
    test_beg_date = '1950-01-01 00:00:00'
    test_end_date = '2018-12-30 00:00:00'
    frequency = 'daily'             # daily, minute

    start_time = time.time()

    if test_query_type == 'ticker':
        if frequency == 'daily':
            prices_df = pull_daily_prices(test_database, test_user,
                                          test_password, test_host, test_port,
                                          test_query_type, test_data_vendor_id,
                                          test_beg_date, test_end_date,
                                          True, 'tsid', test_tsid)
        elif frequency == 'minute':
            prices_df = pull_minute_prices(test_database, test_user,
                                           test_password, test_host, test_port,
                                           test_query_type, test_data_vendor_id,
                                           test_beg_date, test_end_date,
                                           'tsid', test_tsid)
        else:
            raise NotImplementedError('Frequency %s is not implemented within '
                                      'query_data.py' % frequency)
    else:
        raise NotImplementedError('Query type %s is not implemented within '
                                  'query_data.py' % test_query_type)

    if prices_df is None:
        # The pull_* functions print the psycopg2 error and return None
        raise SystemExit('No prices were retrieved for %s; see the database '
                         'error printed above.' % test_tsid)

    # tsid with periods replaced by underscores, for use in file names
    csv_friendly_tsid = test_tsid.replace('.', '_')

    print('Query took %0.2f seconds' % (time.time() - start_time))
    print(prices_df)

    # Optional: uncomment to save the results to CSV
    # prices_df.to_csv('output/%s_%s_%s.csv' %
    #                  (csv_friendly_tsid, frequency,
    #                   datetime.today().strftime('%Y%m%d')))

    unique_codes = pd.unique(prices_df['tsid'].values)
    print('There are %i unique tsid codes' % len(unique_codes))
    print('There are {:,} rows'.format(len(prices_df.index)))
    print(datetime.today().strftime('%Y%m%d'))