# yahooscraper.py (forked from vinodvidhole/yahoo-finance-scraper)
#1: market news scraping (requests + BeautifulSoup)
import requests
from bs4 import BeautifulSoup
import pandas as pd

#2: cryptocurrency table scraping (Selenium)
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import os

#3: calendar scraping (embedded JSON)
import re
import json
from io import StringIO
from IPython.display import display

#4: email notification (imported, not yet used below)
import smtplib

BASE_URL = 'https://finance.yahoo.com'  # global variable

#1: Yahoo Finance market news scraper
def get_page(url):
    """Download a webpage and return a BeautifulSoup document."""
    response = requests.get(url)
    if not response.ok:
        print('Status code:', response.status_code)
        raise Exception('Failed to load page {}'.format(url))
    doc = BeautifulSoup(response.text, 'html.parser')
    return doc
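
# Note: Yahoo sometimes rejects plain requests that lack a browser-like
# User-Agent header (get_event_page below sends one for exactly that reason).
# If get_page starts returning non-200 responses, passing a similar header is
# a minimal fix, e.g.:
#   response = requests.get(url, headers={'User-Agent': 'Mozilla/5.0 ...'})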

def get_news_tags(doc):
    """Get the list of tags containing news information."""
    # Class name of the news div tags; these auto-generated class names tend to
    # change whenever Yahoo updates its page layout.
    news_class = "Ov(h) Pend(44px) Pstart(25px)"
    news_list = doc.find_all('div', {'class': news_class})
    return news_list

def parse_news(news_tag):
    """Parse one news tag and return its data points as a dictionary."""
    news_source = news_tag.find('div').text                  # source
    news_headline = news_tag.find('a').text                  # headline
    news_url = news_tag.find('a')['href']                    # relative link
    news_content = news_tag.find('p').text                   # content summary
    news_image = news_tag.find_parent().find('img')['src']   # thumbnail image
    return {'source': news_source,
            'headline': news_headline,
            'url': BASE_URL + news_url,
            'content': news_content,
            'image': news_image}
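
# For reference, each parsed record has the following shape. The keys come from
# parse_news above; the values here are made up purely for illustration:
#   {'source': 'Reuters',
#    'headline': 'Stocks edge higher ahead of Fed meeting',
#    'url': 'https://finance.yahoo.com/news/...',
#    'content': 'Wall Street opened higher on ...',
#    'image': 'https://s.yimg.com/...'}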

def scrape_yahoo_news(url, path=None):
    """Get the Yahoo Finance market news and write it to a CSV file."""
    if path is None:
        path = 'stock-market-news.csv'
    print('Requesting html page')
    doc = get_page(url)
    print('Extracting news tags')
    news_list = get_news_tags(doc)
    print('Parsing news tags')
    news_data = [parse_news(news_tag) for news_tag in news_list]
    print('Saving the data to a CSV')
    news_df = pd.DataFrame(news_data)
    news_df.to_csv(path, index=False)
    display(news_df.head())
    # The return statement is optional; the DataFrame is returned only so the final output can be inspected.
    return news_df


#2: Yahoo Finance cryptocurrency table scraper (Selenium)
def get_driver(url):
    """Create a headless Chrome driver, open the URL, and return the driver."""
    chrome_options = Options()
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    chrome_options.add_argument('--headless')
    chrome_options.add_argument('--start-maximized')
    chrome_options.add_argument('--start-fullscreen')
    chrome_options.add_argument('--single-process')
    serv = Service(os.getcwd() + '/chromedriver')  # expects a chromedriver binary in the working directory
    driver = webdriver.Chrome(options=chrome_options, service=serv)
    driver.get(url)
    return driver
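
# The Service() call above assumes a chromedriver binary in the current working
# directory. One alternative (assuming the third-party webdriver-manager package
# is installed) is to let it download a matching driver automatically:
#   from webdriver_manager.chrome import ChromeDriverManager
#   serv = Service(ChromeDriverManager().install())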

def get_table_header(driver):
    """Return the table columns (first 10 only) as a list."""
    header = driver.find_elements(By.TAG_NAME, value='th')
    header_list = [item.text for index, item in enumerate(header) if index < 10]
    return header_list

def get_table_rows(driver):
    """Get the number of rows available on the current page."""
    tablerows = len(driver.find_elements(By.XPATH, value='//*[@id="scr-res-table"]/div[1]/table/tbody/tr'))
    return tablerows

def parse_table_rows(rownum, driver, header_list):
    """Get the data for one row and return the column values as a dictionary."""
    row_dictionary = {}
    for index, item in enumerate(header_list):
        time.sleep(1/20)  # brief pause between cell lookups
        column_xpath = '//*[@id="scr-res-table"]/div[1]/table/tbody/tr[{}]/td[{}]'.format(rownum, index + 1)
        row_dictionary[item] = driver.find_element(By.XPATH, value=column_xpath).text
    return row_dictionary
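
# Example of the XPath built above: for rownum=1 and the first header column
# (index 0), column_xpath resolves to
#   //*[@id="scr-res-table"]/div[1]/table/tbody/tr[1]/td[1]
# because table rows and cells are 1-based in XPath while enumerate() is 0-based.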

def parse_multiple_pages(driver, total_crypto):
    """Loop through each row, clicking the Next button at the end of each page,
    and return at least total_crypto rows.
    """
    table_data = []
    page_num = 1
    is_scraping = True
    header_list = get_table_header(driver)
    while is_scraping:
        table_rows = get_table_rows(driver)
        print('Found {} rows on Page : {}'.format(table_rows, page_num))
        print('Parsing Page : {}'.format(page_num))
        table_data += [parse_table_rows(i, driver, header_list) for i in range(1, table_rows + 1)]
        total_count = len(table_data)
        print('Total rows scraped : {}'.format(total_count))
        if total_count >= total_crypto:
            print('Done Parsing..')
            is_scraping = False
        else:
            print('Clicking Next Button')
            element = WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.XPATH, '//*[@id="scr-res-table"]/div[2]/button[3]')))
            element.click()
            page_num += 1
    return table_data

def scrape_yahoo_crypto(url, total_crypto, path=None):
    """Get the list of Yahoo Finance cryptocurrencies and write it to a CSV file."""
    if path is None:
        path = 'crypto-currencies.csv'
    print('Creating driver')
    driver = get_driver(url)
    table_data = parse_multiple_pages(driver, total_crypto)
    driver.close()
    driver.quit()
    print('Saving the data to a CSV')
    table_df = pd.DataFrame(table_data)
    table_df.to_csv(path, index=False)
    # The return statement is optional; the DataFrame is returned only so the final output can be inspected.
    display(table_df.head())
    return table_df


#3: Yahoo Finance calendar scraper (embedded JSON)
def get_event_page(scraper_url):
    """Download a webpage and return a BeautifulSoup document."""
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                      "(KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19582"
    }
    response = requests.get(scraper_url, headers=headers)
    if not response.ok:
        print('Status code:', response.status_code)
        raise Exception('Failed to fetch web page ' + scraper_url)
    # Construct a BeautifulSoup document
    doc = BeautifulSoup(response.text, 'html.parser')
    return doc

def get_json_dictionary(doc):
    """Get the JSON data embedded in the page as a Python dictionary."""
    pattern = re.compile(r'\s--\sData\s--\s')
    script_data = doc.find('script', text=pattern).contents[0]
    # Trim the JavaScript wrapper so that only the JSON payload remains.
    start = script_data.find('context') - 2
    json_text = script_data[start:-12]
    parsed_dictionary = json.loads(json_text)
    return parsed_dictionary
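
# The matched <script> tag holds the page state as JSON. The keys this script
# relies on (see get_total_rows / get_page_rows below) are nested roughly like:
#   {'context': {'dispatcher': {'stores': {'ScreenerResultsStore':
#       {'results': {'total': <row count>, 'rows': [<one dict per event>, ...]}}}}}}
# Everything outside this path is ignored.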

def get_total_rows(parsed_dictionary):
    """Get the total number of rows available for the search criteria."""
    total_rows = parsed_dictionary['context']['dispatcher']['stores']['ScreenerResultsStore']['results']['total']
    return total_rows

def get_page_rows(parsed_dictionary):
    """Get the rows contained in the current page."""
    data_dictionary = parsed_dictionary['context']['dispatcher']['stores']['ScreenerResultsStore']['results']['rows']
    return data_dictionary

def scrape_all_pages(event_type, date):
    """Loop through each results page and return a list of data dictionaries."""
    YAHOO_CAL_URL = BASE_URL + '/calendar/{}?day={}&offset={}&size={}'
    max_rows_per_page = '100'  # maximum rows Yahoo returns per page
    page_number = 1
    final_data_dictionary = []
    while page_number > 0:
        print("Processing page # {}".format(page_number))
        page_offset = str((page_number - 1) * int(max_rows_per_page))
        scrape_url = YAHOO_CAL_URL.format(event_type, date, page_offset, max_rows_per_page)
        print("Scrape url for page {} is {}".format(page_number, scrape_url))
        page_doc = get_event_page(scrape_url)
        parse_dict = get_json_dictionary(page_doc)
        if page_number == 1:
            total_rows = get_total_rows(parse_dict)
        final_data_dictionary += get_page_rows(parse_dict)
        if len(final_data_dictionary) >= total_rows:
            page_number = 0
            return final_data_dictionary
        page_number += 1
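
# Pagination example: with max_rows_per_page = '100', page 1 requests offset 0,
# page 2 requests offset 100, and so on. For instance, page 2 of the earnings
# calendar for 2022-02-28 would be fetched from
#   https://finance.yahoo.com/calendar/earnings?day=2022-02-28&offset=100&size=100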

def scrape_yahoo_calendar(event_types, date_param):
    """Get each Yahoo Finance calendar event list and write it to a CSV file."""
    for event in event_types:
        print('Web scraping for', event)
        data_dict = scrape_all_pages(event, date_param)
        if len(data_dict) > 0:
            scraped_df = pd.DataFrame(data_dict)
            scraped_df.to_csv(event + '_' + date_param + '.csv', index=False)
            print("Checking a few rows.. for event : {} & date : {}".format(event, date_param))
            display(scraped_df.head())
        else:
            print("No data found for event : {} & date : {}".format(event, date_param))


if __name__ == "__main__":
    #1: stock market news
    YAHOO_NEWS_URL = BASE_URL + '/topic/stock-market-news/'
    news_df = scrape_yahoo_news(YAHOO_NEWS_URL)

    #2: cryptocurrencies
    YAHOO_FINANCE_URL = BASE_URL + '/cryptocurrencies'
    TOTAL_CRYPTO = 50
    crypto_df = scrape_yahoo_crypto(YAHOO_FINANCE_URL, TOTAL_CRYPTO, 'crypto-currencies.csv')

    #3: calendar events
    #date_param = '2022-03-18'  # example of a date with no data
    date_param = '2022-02-28'
    event_types = ['splits', 'economic', 'ipo', 'earnings']
    scrape_yahoo_calendar(event_types, date_param)

    print("Processing Done")