Added range search into scraper #84

Open
wants to merge 1 commit into master
20 changes: 20 additions & 0 deletions build/lib/search_engines/__init__.py
@@ -0,0 +1,20 @@
from .engines import *


__title__ = 'search_engines'
__version__ = '0.5'
__author__ = 'Tasos M. Adamopoulos'

__all__ = [
'Google',
'Bing',
'Yahoo',
'Aol',
'Duckduckgo',
'Startpage',
'Dogpile',
'Ask',
'Mojeek',
'Qwant',
'Torch'
]
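
For context, a minimal usage sketch of the exports listed in __all__ above; the query string is only an illustrative placeholder:

# Sketch: consuming the package exports declared above.
from search_engines import Duckduckgo

engine = Duckduckgo()
results = engine.search('example query', pages=1)  # placeholder query
print(len(results), 'results collected')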
30 changes: 30 additions & 0 deletions build/lib/search_engines/config.py
@@ -0,0 +1,30 @@
from os import path as os_path, pardir as os_pardir, name as os_name
from sys import version_info


## Python version
PYTHON_VERSION = version_info.major

## Maximum number of pages to search
SEARCH_ENGINE_RESULTS_PAGES = 20

## HTTP request timeout
TIMEOUT = 10

## Default User-Agent string
USER_AGENT = 'search_engines/0.5 Repo: https://github.com/tasos-py/Search-Engines-Scraper'

## Fake User-Agent string - Google doesn't like the default user-agent
FAKE_USER_AGENT = 'Mozilla/5.0 (Windows NT 6.1; rv:84.0) Gecko/20100101 Firefox/84.0'

## Proxy server
PROXY = None

## TOR proxy server
TOR = 'socks5h://127.0.0.1:9050'

_base_dir = os_path.abspath(os_path.dirname(os_path.abspath(__file__)))

## Path to output files
OUTPUT_DIR = os_path.join(_base_dir, 'search_results') + os_path.sep

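A short sketch of wiring the TOR setting above into an engine constructor; this assumes the package's HTTP client forwards the proxy string to requests (which needs PySocks for socks5h URLs):

from search_engines import Torch
from search_engines.config import TOR, TIMEOUT

# Route requests through the local Tor SOCKS proxy defined in config.py.
engine = Torch(proxy=TOR, timeout=TIMEOUT)
results = engine.search('example query', pages=1)  # placeholder query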
205 changes: 205 additions & 0 deletions build/lib/search_engines/engine.py
@@ -0,0 +1,205 @@
from bs4 import BeautifulSoup
from time import sleep
from random import uniform as random_uniform
from collections import namedtuple

from .results import SearchResults
from .http_client import HttpClient
from . import utils
from . import output as out
from . import config as cfg


class SearchEngine(object):
'''The base class for all Search Engines.'''
def __init__(self, proxy=cfg.PROXY, timeout=cfg.TIMEOUT):
'''
:param str proxy: optional, a proxy server
:param int timeout: optional, the HTTP timeout
'''
self._http_client = HttpClient(timeout, proxy)
self._delay = (1, 4)
self._query = ''
self._filters = []

self.results = SearchResults()
'''The search results.'''
self.ignore_duplicate_urls = False
'''Collects only unique URLs.'''
self.ignore_duplicate_domains = False
'''Collects only unique domains.'''
self.is_banned = False
'''Indicates if a ban occurred.'''

def _selectors(self, element):
'''Returns the appropriate CSS selector.'''
raise NotImplementedError()

def _first_page(self):
'''Returns the initial page URL.'''
raise NotImplementedError()

def _next_page(self, tags):
'''Returns the next page URL and post data.'''
raise NotImplementedError()

def _get_url(self, tag, item='href'):
'''Returns the URL of search results items.'''
selector = self._selectors('url')
url = self._get_tag_item(tag.select_one(selector), item)
return utils.unquote_url(url)

def _get_title(self, tag, item='text'):
'''Returns the title of search results items.'''
selector = self._selectors('title')
return self._get_tag_item(tag.select_one(selector), item)

def _get_text(self, tag, item='text'):
'''Returns the text of search results items.'''
selector = self._selectors('text')
return self._get_tag_item(tag.select_one(selector), item)

def _get_page(self, page, data=None):
'''Fetches a results page, via POST if data is given, otherwise GET.'''
if data:
return self._http_client.post(page, data)
return self._http_client.get(page)

def _get_tag_item(self, tag, item):
'''Returns Tag attributes.'''
if not tag:
return u''
return tag.text if item == 'text' else tag.get(item, u'')

def _item(self, link):
'''Returns a dictionary of the link data.'''
return {
'host': utils.domain(self._get_url(link)),
'link': self._get_url(link),
'title': self._get_title(link).strip(),
'text': self._get_text(link).strip()
}

def _query_in(self, item):
'''Checks if query is contained in the item.'''
return self._query.lower() in item.lower()

def _filter_results(self, soup):
'''Processes and filters the search results.'''
tags = soup.select(self._selectors('links'))
results = [self._item(l) for l in tags]

if u'url' in self._filters:
results = [l for l in results if self._query_in(l['link'])]
if u'title' in self._filters:
results = [l for l in results if self._query_in(l['title'])]
if u'text' in self._filters:
results = [l for l in results if self._query_in(l['text'])]
if u'host' in self._filters:
results = [l for l in results if self._query_in(utils.domain(l['link']))]
return results

def _collect_results(self, items):
'''Collects the search results items.'''
for item in items:
if not utils.is_url(item['link']):
continue
if item in self.results:
continue
if self.ignore_duplicate_urls and item['link'] in self.results.links():
continue
if self.ignore_duplicate_domains and item['host'] in self.results.hosts():
continue
self.results.append(item)

def _is_ok(self, response):
'''Checks if the HTTP response is 200 OK.'''
self.is_banned = response.http in [403, 429, 503]

if response.http == 200:
return True
msg = ('HTTP ' + str(response.http)) if response.http else response.html
out.console(msg, level=out.Level.error)
return False

def disable_console(self):
'''Disables console output'''
out.console = lambda msg, end='\n', level=None: None

def set_headers(self, headers):
'''Sets HTTP headers.

:param headers: dict The headers
'''
self._http_client.session.headers.update(headers)

def set_search_operator(self, operator):
'''Filters search results based on the operator.
Supported operators: 'url', 'title', 'text', 'host'

:param operator: str The search operator(s)
'''
operators = utils.decode_bytes(operator or u'').lower().split(u',')
supported_operators = [u'url', u'title', u'text', u'host']

for operator in operators:
if operator not in supported_operators:
msg = u'Ignoring unsupported operator "{}"'.format(operator)
out.console(msg, level=out.Level.warning)
else:
self._filters += [operator]

def search(self, query, pages=cfg.SEARCH_ENGINE_RESULTS_PAGES):
'''Queries the search engine, goes through the pages and collects the results.

:param query: str The search query
:param pages: int Optional, the maximum number of results pages to search
:returns SearchResults object
'''
out.console('Searching {}'.format(self.__class__.__name__))
self._query = utils.decode_bytes(query)
self.results = SearchResults()
request = self._first_page()

for page in range(1, pages + 1):
try:
response = self._get_page(request['url'], request['data'])
if not self._is_ok(response):
break
tags = BeautifulSoup(response.html, "html.parser")
items = self._filter_results(tags)
self._collect_results(items)

msg = 'page: {:<8} links: {}'.format(page, len(self.results))
out.console(msg, end='')
request = self._next_page(tags)

if not request['url']:
break
if page < pages:
sleep(random_uniform(*self._delay))
except KeyboardInterrupt:
break
out.console('', end='')
return self.results

def output(self, output=out.PRINT, path=None):
'''Prints search results and/or creates report files.
Supported output formats: html, csv, json.

:param output: str Optional, the output format
:param path: str Optional, the file to save the report
'''
output = (output or '').lower()
if not path:
path = cfg.os_path.join(cfg.OUTPUT_DIR, u'_'.join(self._query.split()))
out.console('')

if out.PRINT in output:
out.print_results([self])
if out.HTML in output:
out.write_file(out.create_html_data([self]), path + u'.html')
if out.CSV in output:
out.write_file(out.create_csv_data([self]), path + u'.csv')
if out.JSON in output:
out.write_file(out.create_json_data([self]), path + u'.json')
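
A brief sketch of the public API this class defines, as used by the code above; the query and the chosen engine are placeholders:

from search_engines import Google

engine = Google()
engine.ignore_duplicate_urls = True        # collect only unique URLs
engine.set_search_operator('title')        # keep results whose title contains the query
results = engine.search('example query', pages=2)  # placeholder query
engine.output('json')                      # per the docstring: html, csv, json
print(results.links())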
28 changes: 28 additions & 0 deletions build/lib/search_engines/engines/__init__.py
@@ -0,0 +1,28 @@
from .aol import Aol
from .ask import Ask
from .bing import Bing
from .dogpile import Dogpile
from .duckduckgo import Duckduckgo
from .google import Google
from .mojeek import Mojeek
from .startpage import Startpage
from .torch import Torch
from .yahoo import Yahoo
from .qwant import Qwant
from .brave import Brave


search_engines_dict = {
'google': Google,
'bing': Bing,
'yahoo': Yahoo,
'aol': Aol,
'duckduckgo': Duckduckgo,
'startpage': Startpage,
'dogpile': Dogpile,
'ask': Ask,
'mojeek': Mojeek,
'qwant': Qwant,
'brave': Brave,
'torch': Torch
}
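
A small sketch of looking engines up by name through search_engines_dict as defined above; the engine names and query are illustrative:

from search_engines.engines import search_engines_dict

# Instantiate engines by name and run the same query against each.
for name in ('bing', 'mojeek'):
    engine = search_engines_dict[name]()
    engine.search('example query', pages=1)  # placeholder query
    print(name, len(engine.results))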
17 changes: 17 additions & 0 deletions build/lib/search_engines/engines/aol.py
@@ -0,0 +1,17 @@
from .yahoo import Yahoo
from ..config import PROXY, TIMEOUT


class Aol(Yahoo):
'''Searches aol.com'''
def __init__(self, proxy=PROXY, timeout=TIMEOUT):
super(Aol, self).__init__(proxy, timeout)
self._base_url = u'https://search.aol.com'

def _first_page(self):
'''Returns the initial page and query.'''
url_str = u'{}/aol/search?q={}&ei=UTF-8&nojs=1'
url = url_str.format(self._base_url, self._query)
self._http_client.get(self._base_url)
return {'url':url, 'data':None}

35 changes: 35 additions & 0 deletions build/lib/search_engines/engines/ask.py
@@ -0,0 +1,35 @@
from ..engine import SearchEngine
from ..config import PROXY, TIMEOUT


class Ask(SearchEngine):
'''Searches ask.com'''
def __init__(self, proxy=PROXY, timeout=TIMEOUT):
super(Ask, self).__init__(proxy, timeout)
self._base_url = 'https://uk.ask.com'

def _selectors(self, element):
'''Returns the appropriate CSS selector.'''
selectors = {
'url': 'a.PartialSearchResults-item-title-link.result-link',
'title': 'a.PartialSearchResults-item-title-link.result-link',
'text': 'p.PartialSearchResults-item-abstract',
'links': 'div.PartialSearchResults-body div.PartialSearchResults-item',
'next': 'li.PartialWebPagination-next a[href]'
}
return selectors[element]

def _first_page(self):
'''Returns the initial page and query.'''
url_str = u'{}/web?o=0&l=dir&qo=serpSearchTopBox&q={}'
url = url_str.format(self._base_url, self._query)
return {'url':url, 'data':None}

def _next_page(self, tags):
'''Returns the next page URL and post data (if any)'''
next_page = tags.select_one(self._selectors('next'))
url = None
if next_page:
url = self._base_url + next_page['href']
return {'url':url, 'data':None}

57 changes: 57 additions & 0 deletions build/lib/search_engines/engines/bing.py
@@ -0,0 +1,57 @@
import base64
from urllib.parse import urlparse, parse_qs

from ..engine import SearchEngine
from ..config import PROXY, TIMEOUT, FAKE_USER_AGENT


class Bing(SearchEngine):
'''Searches bing.com'''
def __init__(self, proxy=PROXY, timeout=TIMEOUT):
super(Bing, self).__init__(proxy, timeout)
self._base_url = u'https://www.bing.com'
self.set_headers({'User-Agent':FAKE_USER_AGENT})

def _selectors(self, element):
'''Returns the appropriate CSS selector.'''
selectors = {
'url': 'h2 a',
'title': 'h2',
'text': 'p',
'links': 'ol#b_results > li.b_algo',
'next': 'div#b_content nav[role="navigation"] a.sb_pagN'
}
return selectors[element]

def _first_page(self):
'''Returns the initial page and query.'''
self._get_page(self._base_url)
url = u'{}/search?q={}&search=&form=QBLH'.format(self._base_url, self._query)
return {'url':url, 'data':None}

def _next_page(self, tags):
'''Returns the next page URL and post data (if any)'''
selector = self._selectors('next')
next_page = self._get_tag_item(tags.select_one(selector), 'href')
url = None
if next_page:
url = (self._base_url + next_page)
return {'url':url, 'data':None}

def _get_url(self, tag, item='href'):
'''Returns the URL of search results items, decoding Bing's redirect links.'''
url = super(Bing, self)._get_url(tag, 'href')

try:
# Bing wraps the target URL in a base64-encoded "u" query parameter.
parsed_url = urlparse(url)
query_params = parse_qs(parsed_url.query)
encoded_url = query_params["u"][0][2:]
# restore the base64 padding that Bing strips
encoded_url += "=" * ((-len(encoded_url)) % 4)
url = base64.b64decode(encoded_url).decode('utf-8')
except Exception:
# not a redirect link, or decoding failed - fall back to the raw URL
pass

return url