__title__ = 'search_engines'
__version__ = '0.5'
__author__ = 'Tasos M. Adamopoulos'

# Names exported by `from search_engines import *`.
# 'Brave' is imported and registered in `engines/__init__.py`, so it is
# exported here as well.
__all__ = [
    'Google',
    'Bing',
    'Yahoo',
    'Aol',
    'Duckduckgo',
    'Startpage',
    'Dogpile',
    'Ask',
    'Mojeek',
    'Qwant',
    'Brave',
    'Torch'
]
class SearchEngine(object):
    '''The base class for all Search Engines.

    Concrete engines provide `_selectors`, `_first_page` and `_next_page`;
    `search` then walks the result pages and accumulates the items in
    `results`.
    '''
    def __init__(self, proxy=cfg.PROXY, timeout=cfg.TIMEOUT):
        '''
        :param str proxy: optional, a proxy server
        :param int timeout: optional, the HTTP timeout
        '''
        self._http_client = HttpClient(timeout, proxy)
        # (min, max) seconds slept between two result pages, see `search`.
        self._delay = (1, 4)
        self._query = ''
        # Active search operators, filled by `set_search_operator`.
        self._filters = []

        # The search results.
        self.results = SearchResults()
        # Collects only unique URLs.
        self.ignore_duplicate_urls = False
        # Collects only unique domains.
        self.ignore_duplicate_domains = False
        # Indicates if a ban occurred.
        self.is_banned = False

    def _selectors(self, element):
        '''Returns the appropriate CSS selector.'''
        raise NotImplementedError()

    def _first_page(self):
        '''Returns the initial page URL.'''
        raise NotImplementedError()

    def _next_page(self, tags):
        '''Returns the next page URL and post data.'''
        raise NotImplementedError()

    def _get_url(self, tag, item='href'):
        '''Returns the URL of search results items.'''
        selector = self._selectors('url')
        url = self._get_tag_item(tag.select_one(selector), item)
        return utils.unquote_url(url)

    def _get_title(self, tag, item='text'):
        '''Returns the title of search results items.'''
        selector = self._selectors('title')
        return self._get_tag_item(tag.select_one(selector), item)

    def _get_text(self, tag, item='text'):
        '''Returns the text of search results items.'''
        selector = self._selectors('text')
        return self._get_tag_item(tag.select_one(selector), item)

    def _get_page(self, page, data=None):
        '''Fetches a page, using POST when `data` is given and GET otherwise.'''
        if data:
            return self._http_client.post(page, data)
        return self._http_client.get(page)

    def _get_tag_item(self, tag, item):
        '''Returns the tag text (item == 'text') or the named attribute.

        A missing tag or attribute yields an empty string.
        '''
        if not tag:
            return u''
        return tag.text if item == 'text' else tag.get(item, u'')

    def _item(self, link):
        '''Returns a dictionary of the link data.'''
        # Resolve the URL once; it feeds both the 'host' and 'link' fields.
        url = self._get_url(link)
        return {
            'host': utils.domain(url),
            'link': url,
            'title': self._get_title(link).strip(),
            'text': self._get_text(link).strip()
        }

    def _query_in(self, item):
        '''Checks if query is contained in the item (case-insensitive).'''
        return self._query.lower() in item.lower()

    def _filter_results(self, soup):
        '''Processes and filters the search results.'''
        tags = soup.select(self._selectors('links'))
        results = [self._item(l) for l in tags]

        # operator name -> key of the item field the query must appear in;
        # 'host' is utils.domain(link), as built by `_item`.
        fields = (
            (u'url', 'link'),
            (u'title', 'title'),
            (u'text', 'text'),
            (u'host', 'host'),
        )
        for operator, key in fields:
            if operator in self._filters:
                results = [l for l in results if self._query_in(l[key])]
        return results

    def _collect_results(self, items):
        '''Collects the search results items, skipping invalid and duplicate ones.'''
        for item in items:
            if not utils.is_url(item['link']):
                continue
            if item in self.results:
                continue
            if self.ignore_duplicate_urls and item['link'] in self.results.links():
                continue
            if self.ignore_duplicate_domains and item['host'] in self.results.hosts():
                continue
            self.results.append(item)

    def _is_ok(self, response):
        '''Checks if the HTTP response is 200 OK.

        Also updates `is_banned` for the 403, 429 and 503 status codes.
        '''
        self.is_banned = response.http in [403, 429, 503]

        if response.http == 200:
            return True
        # http == 0 means the request itself failed; html then carries the
        # error description set by the HTTP client.
        msg = ('HTTP ' + str(response.http)) if response.http else response.html
        out.console(msg, level=out.Level.error)
        return False

    def disable_console(self):
        '''Disables console output'''
        # NOTE: replaces the module-level `out.console`, so this affects
        # every engine, not only this instance.
        out.console = lambda msg, end='\n', level=None: None

    def set_headers(self, headers):
        '''Sets HTTP headers.

        :param headers: dict The headers
        '''
        self._http_client.session.headers.update(headers)

    def set_search_operator(self, operator):
        '''Filters search results based on the operator.
        Supported operators: 'url', 'title', 'text', 'host'

        :param operator: str The search operator(s), comma-separated
        '''
        operators = utils.decode_bytes(operator or u'').lower().split(u',')
        supported_operators = [u'url', u'title', u'text', u'host']

        for name in operators:
            name = name.strip()
            if not name:
                # An empty/None operator (or a stray comma) is not an error.
                continue
            if name not in supported_operators:
                msg = u'Ignoring unsupported operator "{}"'.format(name)
                out.console(msg, level=out.Level.warning)
            elif name not in self._filters:
                self._filters.append(name)

    def search(self, query, pages=cfg.SEARCH_ENGINE_RESULTS_PAGES):
        '''Queries the search engine, goes through the pages and collects the results.

        :param query: str The search query
        :param pages: int Optional, the maximum number of results pages to search
        :returns SearchResults object
        '''
        out.console('Searching {}'.format(self.__class__.__name__))
        self._query = utils.decode_bytes(query)
        self.results = SearchResults()
        request = self._first_page()

        for page in range(1, pages + 1):
            try:
                response = self._get_page(request['url'], request['data'])
                if not self._is_ok(response):
                    break
                tags = BeautifulSoup(response.html, "html.parser")
                items = self._filter_results(tags)
                self._collect_results(items)

                msg = 'page: {:<8} links: {}'.format(page, len(self.results))
                out.console(msg, end='')
                request = self._next_page(tags)

                if not request['url']:
                    break
                if page < pages:
                    # Random pause between pages, skipped after the last one.
                    sleep(random_uniform(*self._delay))
            except KeyboardInterrupt:
                # Ctrl-C stops paging but keeps what was collected so far.
                break
        out.console('', end='')
        return self.results

    def output(self, output=out.PRINT, path=None):
        '''Prints search results and/or creates report files.
        Supported output format: html, csv, json.

        :param output: str Optional, the output format
        :param path: str Optional, the file to save the report
        '''
        output = (output or '').lower()
        if not path:
            # Default file name: the query words joined by underscores.
            path = cfg.os_path.join(cfg.OUTPUT_DIR, u'_'.join(self._query.split()))
        out.console('')

        if out.PRINT in output:
            out.print_results([self])
        if out.HTML in output:
            out.write_file(out.create_html_data([self]), path + u'.html')
        if out.CSV in output:
            out.write_file(out.create_csv_data([self]), path + u'.csv')
        if out.JSON in output:
            out.write_file(out.create_json_data([self]), path + u'.json')
class Bing(SearchEngine):
    '''Searches bing.com'''
    def __init__(self, proxy=PROXY, timeout=TIMEOUT):
        '''
        :param str proxy: optional, a proxy server
        :param int timeout: optional, the HTTP timeout
        '''
        super(Bing, self).__init__(proxy, timeout)
        self._base_url = u'https://www.bing.com'
        self.set_headers({'User-Agent':FAKE_USER_AGENT})

    def _selectors(self, element):
        '''Returns the appropriate CSS selector.'''
        selectors = {
            'url': 'h2 a',
            'title': 'h2',
            'text': 'p',
            'links': 'ol#b_results > li.b_algo',
            'next': 'div#b_content nav[role="navigation"] a.sb_pagN'
        }
        return selectors[element]

    def _first_page(self):
        '''Returns the initial page and query.'''
        # Load the home page first, then build the actual search URL.
        self._get_page(self._base_url)
        url = u'{}/search?q={}&search=&form=QBLH'.format(self._base_url, self._query)
        return {'url':url, 'data':None}

    def _next_page(self, tags):
        '''Returns the next page URL and post data (if any)'''
        selector = self._selectors('next')
        next_page = self._get_tag_item(tags.select_one(selector), 'href')
        url = None
        if next_page:
            url = (self._base_url + next_page)
        return {'url':url, 'data':None}

    def _get_url(self, tag, item='href'):
        '''Returns the URL of search results items.

        Result links may be redirect URLs that carry the target in a
        base64-encoded `u` query parameter; those are decoded. Anything
        else is returned unchanged.
        '''
        url = super(Bing, self)._get_url(tag, 'href')
        return self._decode_redirect(url)

    @staticmethod
    def _decode_redirect(url):
        '''Returns the target of a redirect URL, or `url` itself if it is not one.'''
        # The first two characters of the `u` value are a prefix, not payload.
        encoded_url = parse_qs(urlparse(url).query).get('u', [''])[0][2:]
        if not encoded_url:
            return url
        # Restore the stripped padding: pad up to the next multiple of 4.
        encoded_url += '=' * (-len(encoded_url) % 4)
        try:
            # altchars also accepts the URL-safe alphabet ('-' and '_').
            decoded_bytes = base64.b64decode(encoded_url, altchars=b'-_')
            return decoded_bytes.decode('utf-8')
        except ValueError:
            # binascii.Error and UnicodeDecodeError are both ValueErrors;
            # fall back to the raw link instead of failing the whole page.
            return url
class Dogpile(SearchEngine):
    '''Searches dogpile.com'''
    def __init__(self, proxy=PROXY, timeout=TIMEOUT):
        '''
        :param str proxy: optional, a proxy server
        :param int timeout: optional, the HTTP timeout
        '''
        super(Dogpile, self).__init__(proxy, timeout)
        self._base_url = 'https://www.dogpile.com'
        self.set_headers({'User-Agent':FAKE_USER_AGENT})

    def _selectors(self, element):
        '''Returns the appropriate CSS selector.'''
        selectors = {
            'url': 'a[class$=title]',
            'title': 'a[class$=title]',
            # The description is the last <span> of a result, see `_get_text`.
            'text': {'tag':'span', 'index':-1},
            'links': 'div[class^=web-] div[class$=__result]',
            'next': 'a.pagination__num--next'
        }
        return selectors[element]

    def _first_page(self):
        '''Returns the initial page and query.'''
        url = u'{}/serp?q={}'.format(self._base_url, self._query)
        return {'url':url, 'data':None}

    def _next_page(self, tags):
        '''Returns the next page URL and post data (if any)'''
        selector = self._selectors('next')
        next_page = self._get_tag_item(tags.select_one(selector), 'href')
        url = (self._base_url + next_page) if next_page else None
        return {'url':url, 'data':None}

    def _get_text(self, tag, item='text'):
        '''Returns the text of search results items.

        Returns an empty string when the result contains no matching tag.
        '''
        selector = self._selectors('text')
        candidates = tag.select(selector['tag'])
        if not candidates:
            # A result without any <span> must not abort the whole search.
            return u''
        return self._get_tag_item(candidates[selector['index']], 'text')
class Google(SearchEngine):
    '''Searches google.com'''
    def __init__(self, proxy=PROXY, timeout=TIMEOUT, before=None, after=None):
        '''
        :param str proxy: optional, a proxy server
        :param int timeout: optional, the HTTP timeout
        :param before: optional, value for the `before:` search operator
        :param after: optional, value for the `after:` search operator
        '''
        super(Google, self).__init__(proxy, timeout)
        self._base_url = 'https://www.google.com'
        self._delay = (2, 6)
        self.before = before
        self.after = after

        self.set_headers({'User-Agent':FAKE_USER_AGENT})

    def _selectors(self, element):
        '''Returns the appropriate CSS selector.'''
        selectors = {
            'url': 'a[href]',
            'title': 'a h3',
            'text': 'div',
            'links': 'div#main > div',
            'next': 'footer a[href][aria-label="Next page"]'
        }
        return selectors[element]

    def _date_operators(self):
        '''Returns the `after:`/`before:` operators to append to the query.

        Only the bounds that were actually given are included; the result is
        an empty string when neither is set.
        '''
        operators = u''
        if self.after:
            operators += u'+after:{}'.format(self.after)
        if self.before:
            operators += u'+before:{}'.format(self.before)
        return operators

    def _first_page(self):
        '''Returns the initial page and query.'''
        url = u'{}/search?q={}'.format(self._base_url, quote_url(self._query, ''))
        response = self._get_page(url)
        bs = BeautifulSoup(response.html, "html.parser")

        noscript_link = bs.select_one('noscript a')
        if noscript_link and 'href' in noscript_link.attrs:
            url = noscript_link['href']
            url = u'{}/search?{}'.format(self._base_url, url)
        else:
            # Look for any 'a' tag with a 'data-ved' attribute
            data_ved_link = bs.select_one('a[data-ved]')
            if data_ved_link and 'href' in data_ved_link.attrs:
                url = data_ved_link['href']
                if url.startswith('/url?'):
                    # Extract the actual URL from Google's redirect URL
                    query_params = parse_qs(urlparse(url).query)
                    if 'q' in query_params:
                        url = query_params['q'][0]
                if url.startswith('/'):
                    # Still a site-relative link: make it absolute.
                    url = u'{}{}'.format(self._base_url, url)
            else:
                msg = "Warning: Could not find expected 'noscript a' element or any 'a' tag with 'data-ved'. Using original URL."
                out.console(msg, level=out.Level.error)

        response = self._get_page(url)
        bs = BeautifulSoup(response.html, "html.parser")

        # Re-submit the page's search form fields (minus the "btnI" button)
        # with our own query, plus the optional date operators.
        inputs = {i['name']:i.get('value') for i in bs.select('form input[name]') if i['name'] != 'btnI'}
        inputs['q'] = quote_url(self._query, '') + self._date_operators()
        url = u'{}/search?{}'.format(self._base_url, '&'.join([k + '=' + (v or '') for k,v in inputs.items()]))

        return {'url':url, 'data':None}

    def _next_page(self, tags):
        '''Returns the next page URL and post data (if any)'''
        tag = tags.select_one(self._selectors('next'))
        next_page = self._get_tag_item(tag, 'href')

        url = None
        if next_page:
            url = self._base_url + next_page
        return {'url':url, 'data':None}

    def _get_url(self, tag, item='href'):
        '''Returns the URL of search results item.'''
        selector = self._selectors('url')
        url = self._get_tag_item(tag.select_one(selector), item)

        # Strip the "/url?q=<target>&sa=..." redirect wrapper, if present.
        if url.startswith(u'/url?q='):
            url = url.replace(u'/url?q=', u'').split(u'&sa=')[0]
        return unquote_url(url)

    def _get_text(self, tag, item='text'):
        '''Returns the text of search results items.'''
        tag = tag.select_one(self._selectors('text'))
        # The first two strings of the block are skipped; the rest is the text.
        return '\n'.join(list(tag.stripped_strings)[2:]) if tag else ''

    def _check_consent(self, page):
        '''Checks if cookies consent is required.

        When the page contains the consent form, the form is submitted and
        the resulting page is returned; otherwise `page` is returned as is.
        '''
        url = 'https://consent.google.com/save'
        bs = BeautifulSoup(page.html, "html.parser")
        consent_form = bs.select('form[action="{}"] input[name]'.format(url))
        if consent_form:
            data = {i['name']:i.get('value') for i in consent_form if i['name'] not in ['set_sc', 'set_aps']}
            page = self._get_page(url, data)
        return page

    def _get_page(self, page, data=None):
        '''Fetches a page and transparently handles the cookies consent form.'''
        page = super(Google, self)._get_page(page, data)
        page = self._check_consent(page)
        return page
class Qwant(SearchEngine):
    '''Searches qwant.com'''
    def __init__(self, proxy=PROXY, timeout=TIMEOUT):
        '''
        :param str proxy: optional, a proxy server
        :param int timeout: optional, the HTTP timeout
        '''
        super(Qwant, self).__init__(proxy, timeout)
        # URL template; the placeholders are the query and the result offset.
        self._base_url = u'https://api.qwant.com/v3/search/web?q={}&count=10&locale=en_US&offset={}&device=desktop&safesearch=1'
        self._offset = 0
        self._max_offset = 50

    def _selectors(self, element):
        '''Returns the JSON key(s) for the given element.

        The API answers with JSON, so these are dictionary keys rather than
        CSS selectors.
        '''
        selectors = {
            'url': 'url',
            'title': 'title',
            'text': 'desc',
            'links': ['data', 'result', 'items', 'mainline']
        }
        return selectors[element]

    def _first_page(self):
        '''Returns the initial page and query.'''
        # Restart paging so that a reused instance begins at the first page.
        self._offset = 0
        url = self._base_url.format(self._query, self._offset)
        return {'url':url, 'data':None}

    def _next_page(self, tags):
        '''Returns the next page URL and post data (if any)'''
        self._offset += 10
        url = None
        status = loads(tags.get_text())['status']
        if status == 'success' and self._offset <= self._max_offset:
            url = self._base_url.format(self._query, self._offset)
        return {'url':url, 'data':None}

    def _get_url(self, tag, item='href'):
        '''Returns the URL of search results item.'''
        return unquote_url(tag.get(self._selectors('url'), u''))

    def _get_title(self, tag, item='text'):
        '''Returns the title of search results items.'''
        return tag.get(self._selectors('title'), u'')

    def _get_text(self, tag, item='text'):
        '''Returns the text of search results items.'''
        return tag.get(self._selectors('text'), u'')

    def _filter_results(self, soup):
        '''Processes and filters the search results.'''
        tags = loads(soup.get_text())['data']['result']['items']['mainline']
        # Flatten the groups into their items, dropping the ads groups.
        tags = [j for i in tags for j in i['items'] if i['type'] != u'ads']
        results = [self._item(l) for l in tags]

        if u'url' in self._filters:
            results = [l for l in results if self._query_in(l['link'])]
        if u'title' in self._filters:
            results = [l for l in results if self._query_in(l['title'])]
        if u'text' in self._filters:
            results = [l for l in results if self._query_in(l['text'])]
        if u'host' in self._filters:
            # `_item` already stored the link's domain under 'host'.
            results = [l for l in results if self._query_in(l['host'])]
        return results
class Startpage(SearchEngine):
    '''Searches startpage.com'''
    def __init__(self, proxy=PROXY, timeout=TIMEOUT):
        '''
        :param str proxy: optional, a proxy server
        :param int timeout: optional, the HTTP timeout
        '''
        super(Startpage, self).__init__(proxy, timeout)
        self._base_url = 'https://www.startpage.com'
        self.set_headers({'User-Agent':FAKE_USER_AGENT})

    def _selectors(self, element):
        '''Returns the appropriate CSS selector.'''
        selectors = {
            'url': 'a.w-gl__result-url',
            'title': 'a.w-gl__result-title h3',
            'text': 'p.w-gl__description',
            'links': 'section.w-gl div.w-gl__result',
            'next': {'form':'form.pagination__form', 'text':'Next'},
            'search_form': 'form#search input[name]',
            'blocked_form': 'form#blocked_feedback_form'
        }
        return selectors[element]

    def _first_page(self):
        '''Returns the initial page and query.'''
        response = self._get_page(self._base_url)
        tags = BeautifulSoup(response.html, "html.parser")
        selector = self._selectors('search_form')

        # Re-submit the home page search form fields along with our query.
        data = {
            i['name']: i.get('value', '')
            for i in tags.select(selector)
        }
        data['query'] = self._query
        url = self._base_url + '/sp/search'
        return {'url':url, 'data':data}

    def _next_page(self, tags):
        '''Returns the next page URL and post data (if any)'''
        selector = self._selectors('next')
        # Keep only the pagination form whose text is exactly "Next".
        forms = [
            form
            for form in tags.select(selector['form'])
            if form.get_text(strip=True) == selector['text']
        ]
        url, data = None, None
        if forms:
            url = self._base_url + forms[0]['action']
            data = {
                i['name']:i.get('value', '')
                for i in forms[0].select('input')
            }
        return {'url':url, 'data':data}

    def _is_ok(self, response):
        '''Checks if the HTTP response is 200 OK and not a "blocked" page.

        Also updates `is_banned`, which is always a bool.
        '''
        soup = BeautifulSoup(response.html, 'html.parser')
        selector = self._selectors('blocked_form')
        is_blocked = soup.select_one(selector) is not None

        self.is_banned = is_blocked or response.http in [403, 429, 503]

        if response.http == 200 and not is_blocked:
            return True
        if is_blocked:
            msg = 'Banned'
        elif response.http:
            msg = 'HTTP ' + str(response.http)
        else:
            # http == 0: the request itself failed, html holds the error text.
            msg = response.html
        out.console(msg, level=out.Level.error)
        return False
class Yahoo(SearchEngine):
    '''Searches yahoo.com'''
    def __init__(self, proxy=PROXY, timeout=TIMEOUT):
        '''
        :param str proxy: optional, a proxy server
        :param int timeout: optional, the HTTP timeout
        '''
        super(Yahoo, self).__init__(proxy, timeout)
        self._base_url = 'https://search.yahoo.com'

    def _selectors(self, element):
        '''Returns the appropriate CSS selector.'''
        selectors = {
            'url': 'div.compTitle h3.title a',
            'title': 'div.compTitle h3.title',
            'text': 'div.compText',
            'links': 'div#web li div.dd.algo.algo-sr',
            'next': 'a.next'
        }
        return selectors[element]

    def _first_page(self):
        '''Returns the initial page and query.'''
        url_str = u'{}/search?p={}&ei=UTF-8&nojs=1'
        url = url_str.format(self._base_url, self._query)
        return {'url':url, 'data':None}

    def _next_page(self, tags):
        '''Returns the next page URL and post data (if any)'''
        selector = self._selectors('next')
        next_page = self._get_tag_item(tags.select_one(selector), 'href')
        url = self._base_url + next_page if next_page else None
        return {'url':url, 'data':None}

    def _get_url(self, link, item='href'):
        '''Returns the URL of search results items.'''
        selector = self._selectors('url')
        url = self._get_tag_item(link.select_one(selector), 'href')
        # Keep what sits between the "/RU=" marker and the next "/R".
        url = url.split(u'/RU=')[-1].split(u'/R')[0]
        return unquote_url(url)

    def _get_title(self, tag, item='text'):
        '''Returns the title of search results items.

        Returns an empty string when the result has no title element.
        '''
        title = tag.select_one(self._selectors('title'))
        if title is None:
            return u''
        # Drop the nested <span> elements before reading the title.
        for span in title.select('span'):
            span.decompose()
        return self._get_tag_item(title, item)
import utils as utl + + +class HttpClient(object): + '''Performs HTTP requests. A `requests` wrapper, essentialy''' + def __init__(self, timeout=TIMEOUT, proxy=PROXY): + self.session = requests.session() + self.session.proxies = self._set_proxy(proxy) + self.session.headers['User-Agent'] = USER_AGENT + self.session.headers['Accept-Language'] = 'en-GB,en;q=0.5' + + self.timeout = timeout + self.response = namedtuple('response', ['http', 'html']) + + def get(self, page): + '''Submits a HTTP GET request.''' + page = self._quote(page) + try: + req = self.session.get(page, timeout=self.timeout) + self.session.headers['Referer'] = page + except requests.exceptions.RequestException as e: + return self.response(http=0, html=e.__doc__) + return self.response(http=req.status_code, html=req.text) + + def post(self, page, data): + '''Submits a HTTP POST request.''' + page = self._quote(page) + try: + req = self.session.post(page, data, timeout=self.timeout) + self.session.headers['Referer'] = page + except requests.exceptions.RequestException as e: + return self.response(http=0, html=e.__doc__) + return self.response(http=req.status_code, html=req.text) + + def _quote(self, url): + '''URL-encodes URLs.''' + if utl.decode_bytes(utl.unquote_url(url)) == utl.decode_bytes(url): + url = utl.quote_url(url) + return url + + def _set_proxy(self, proxy): + '''Returns HTTP or SOCKS proxies dictionary.''' + if proxy: + if not utl.is_url(proxy): + raise ValueError('Invalid proxy format!') + proxy = {'http':proxy, 'https':proxy} + return proxy + diff --git a/build/lib/search_engines/libs/__init__.py b/build/lib/search_engines/libs/__init__.py new file mode 100644 index 0000000..9cad736 --- /dev/null +++ b/build/lib/search_engines/libs/__init__.py @@ -0,0 +1 @@ +'''''' diff --git a/build/lib/search_engines/libs/get_terminal_size.py b/build/lib/search_engines/libs/get_terminal_size.py new file mode 100644 index 0000000..6b71f88 --- /dev/null +++ 
b/build/lib/search_engines/libs/get_terminal_size.py @@ -0,0 +1,98 @@ +## Code taken from https://github.com/chrippa/backports.shutil_get_terminal_size/blob/master/backports/shutil_get_terminal_size/get_terminal_size.py + +"""This is a backport of shutil.get_terminal_size from Python 3.3. +The original implementation is in C, but here we use the ctypes and +fcntl modules to create a pure Python version of os.get_terminal_size. +""" + +import os +import struct +import sys + +from collections import namedtuple + +__all__ = ["get_terminal_size"] + + +terminal_size = namedtuple("terminal_size", "columns lines") + +try: + from ctypes import windll, create_string_buffer, WinError + + _handle_ids = { + 0: -10, + 1: -11, + 2: -12, + } + + def _get_terminal_size(fd): + handle = windll.kernel32.GetStdHandle(_handle_ids[fd]) + if handle == 0: + raise OSError('handle cannot be retrieved') + if handle == -1: + raise WinError() + csbi = create_string_buffer(22) + res = windll.kernel32.GetConsoleScreenBufferInfo(handle, csbi) + if res: + res = struct.unpack("hhhhHhhhhhh", csbi.raw) + left, top, right, bottom = res[5:9] + columns = right - left + 1 + lines = bottom - top + 1 + return terminal_size(columns, lines) + else: + raise WinError() + +except ImportError: + import fcntl + import termios + + def _get_terminal_size(fd): + try: + res = fcntl.ioctl(fd, termios.TIOCGWINSZ, b"\x00" * 4) + except IOError as e: + raise OSError(e) + lines, columns = struct.unpack("hh", res) + + return terminal_size(columns, lines) + + +def get_terminal_size(fallback=(80, 24)): + """Get the size of the terminal window. + For each of the two dimensions, the environment variable, COLUMNS + and LINES respectively, is checked. If the variable is defined and + the value is a positive integer, it is used. + When COLUMNS or LINES is not defined, which is the common case, + the terminal connected to sys.__stdout__ is queried + by invoking os.get_terminal_size. 
+ If the terminal size cannot be successfully queried, either because + the system doesn't support querying, or because we are not + connected to a terminal, the value given in fallback parameter + is used. Fallback defaults to (80, 24) which is the default + size used by many terminal emulators. + The value returned is a named tuple of type os.terminal_size. + """ + # Try the environment first + try: + columns = int(os.environ["COLUMNS"]) + except (KeyError, ValueError): + columns = 0 + + try: + lines = int(os.environ["LINES"]) + except (KeyError, ValueError): + lines = 0 + + # Only query if necessary + if columns <= 0 or lines <= 0: + try: + size = _get_terminal_size(sys.__stdout__.fileno()) + except (NameError, OSError): + size = terminal_size(*fallback) + + if columns <= 0: + columns = size.columns + if lines <= 0: + lines = size.lines + + return terminal_size(columns, lines) + diff --git a/build/lib/search_engines/libs/windows_cmd_encoding.py b/build/lib/search_engines/libs/windows_cmd_encoding.py new file mode 100644 index 0000000..2bcd43d --- /dev/null +++ b/build/lib/search_engines/libs/windows_cmd_encoding.py @@ -0,0 +1,194 @@ +# Code taken from this great answer on stackoverflow.com: +# https://stackoverflow.com/questions/878972/windows-cmd-encoding-change-causes-python-crash/3259271#3259271 +# Many thanks to the author, Daira Hopwood https://stackoverflow.com/users/393146/daira-hopwood + + +import sys + +if sys.platform == "win32" and sys.version_info.major == 2: + import codecs + from ctypes import WINFUNCTYPE, windll, POINTER, byref, c_int + from ctypes.wintypes import BOOL, HANDLE, DWORD, LPWSTR, LPCWSTR, LPVOID + + original_stderr = sys.stderr + + # If any exception occurs in this code, we'll probably try to print it on stderr, + # which makes for frustrating debugging if stderr is directed to our wrapper. + # So be paranoid about catching errors and reporting them to original_stderr, + # so that we can at least see them. 
+ def _complain(message): + print >>original_stderr, message if isinstance(message, str) else repr(message) + + # Work around . + codecs.register(lambda name: codecs.lookup('utf-8') if name == 'cp65001' else None) + + # Make Unicode console output work independently of the current code page. + # This also fixes . + # Credit to Michael Kaplan + # and TZOmegaTZIOY + # . + try: + # + # HANDLE WINAPI GetStdHandle(DWORD nStdHandle); + # returns INVALID_HANDLE_VALUE, NULL, or a valid handle + # + # + # DWORD WINAPI GetFileType(DWORD hFile); + # + # + # BOOL WINAPI GetConsoleMode(HANDLE hConsole, LPDWORD lpMode); + + GetStdHandle = WINFUNCTYPE(HANDLE, DWORD)(("GetStdHandle", windll.kernel32)) + STD_OUTPUT_HANDLE = DWORD(-11) + STD_ERROR_HANDLE = DWORD(-12) + GetFileType = WINFUNCTYPE(DWORD, DWORD)(("GetFileType", windll.kernel32)) + FILE_TYPE_CHAR = 0x0002 + FILE_TYPE_REMOTE = 0x8000 + GetConsoleMode = WINFUNCTYPE(BOOL, HANDLE, POINTER(DWORD))(("GetConsoleMode", windll.kernel32)) + INVALID_HANDLE_VALUE = DWORD(-1).value + + def not_a_console(handle): + if handle == INVALID_HANDLE_VALUE or handle is None: + return True + return ((GetFileType(handle) & ~FILE_TYPE_REMOTE) != FILE_TYPE_CHAR + or GetConsoleMode(handle, byref(DWORD())) == 0) + + old_stdout_fileno = None + old_stderr_fileno = None + if hasattr(sys.stdout, 'fileno'): + old_stdout_fileno = sys.stdout.fileno() + if hasattr(sys.stderr, 'fileno'): + old_stderr_fileno = sys.stderr.fileno() + + STDOUT_FILENO = 1 + STDERR_FILENO = 2 + real_stdout = (old_stdout_fileno == STDOUT_FILENO) + real_stderr = (old_stderr_fileno == STDERR_FILENO) + + if real_stdout: + hStdout = GetStdHandle(STD_OUTPUT_HANDLE) + if not_a_console(hStdout): + real_stdout = False + + if real_stderr: + hStderr = GetStdHandle(STD_ERROR_HANDLE) + if not_a_console(hStderr): + real_stderr = False + + if real_stdout or real_stderr: + # BOOL WINAPI WriteConsoleW(HANDLE hOutput, LPWSTR lpBuffer, DWORD nChars, + # LPDWORD lpCharsWritten, LPVOID lpReserved); + 
+ WriteConsoleW = WINFUNCTYPE(BOOL, HANDLE, LPWSTR, DWORD, POINTER(DWORD), LPVOID)(("WriteConsoleW", windll.kernel32)) + + class UnicodeOutput: + def __init__(self, hConsole, stream, fileno, name): + self._hConsole = hConsole + self._stream = stream + self._fileno = fileno + self.closed = False + self.softspace = False + self.mode = 'w' + self.encoding = 'utf-8' + self.name = name + self.flush() + + def isatty(self): + return False + + def close(self): + # don't really close the handle, that would only cause problems + self.closed = True + + def fileno(self): + return self._fileno + + def flush(self): + if self._hConsole is None: + try: + self._stream.flush() + except Exception as e: + _complain("%s.flush: %r from %r" % (self.name, e, self._stream)) + raise + + def write(self, text): + try: + if self._hConsole is None: + if isinstance(text, unicode): + text = text.encode('utf-8') + self._stream.write(text) + else: + if not isinstance(text, unicode): + text = str(text).decode('utf-8') + remaining = len(text) + while remaining: + n = DWORD(0) + # There is a shorter-than-documented limitation on the + # length of the string passed to WriteConsoleW (see + # . 
+ retval = WriteConsoleW(self._hConsole, text, min(remaining, 10000), byref(n), None) + if retval == 0 or n.value == 0: + raise IOError("WriteConsoleW returned %r, n.value = %r" % (retval, n.value)) + remaining -= n.value + if not remaining: + break + text = text[n.value:] + except Exception as e: + _complain("%s.write: %r" % (self.name, e)) + raise + + def writelines(self, lines): + try: + for line in lines: + self.write(line) + except Exception as e: + _complain("%s.writelines: %r" % (self.name, e)) + raise + + if real_stdout: + sys.stdout = UnicodeOutput(hStdout, None, STDOUT_FILENO, '') + else: + sys.stdout = UnicodeOutput(None, sys.stdout, old_stdout_fileno, '') + + if real_stderr: + sys.stderr = UnicodeOutput(hStderr, None, STDERR_FILENO, '') + else: + sys.stderr = UnicodeOutput(None, sys.stderr, old_stderr_fileno, '') + except Exception as e: + _complain("exception %r while fixing up sys.stdout and sys.stderr" % (e,)) + + + # While we're at it, let's unmangle the command-line arguments: + + # This works around . + GetCommandLineW = WINFUNCTYPE(LPWSTR)(("GetCommandLineW", windll.kernel32)) + CommandLineToArgvW = WINFUNCTYPE(POINTER(LPWSTR), LPCWSTR, POINTER(c_int))(("CommandLineToArgvW", windll.shell32)) + + argc = c_int(0) + argv_unicode = CommandLineToArgvW(GetCommandLineW(), byref(argc)) + + argv = [argv_unicode[i].encode('utf-8') for i in xrange(0, argc.value)] + + if not hasattr(sys, 'frozen'): + # If this is an executable produced by py2exe or bbfreeze, then it will + # have been invoked directly. Otherwise, unicode_argv[0] is the Python + # interpreter, so skip that. + argv = argv[1:] + + # Also skip option arguments to the Python interpreter. 
+ while len(argv) > 0: + arg = argv[0] + if not arg.startswith(u"-") or arg == u"-": + break + argv = argv[1:] + if arg == u'-m': + # sys.argv[0] should really be the absolute path of the module source, + # but never mind + break + if arg == u'-c': + argv[0] = u'-c' + break + + # if you like: + sys.argv = argv + diff --git a/build/lib/search_engines/multiple_search_engines.py b/build/lib/search_engines/multiple_search_engines.py new file mode 100644 index 0000000..b47b9c3 --- /dev/null +++ b/build/lib/search_engines/multiple_search_engines.py @@ -0,0 +1,80 @@ +from .results import SearchResults +from .engines import search_engines_dict +from . import output as out +from . import config as cfg + + +class MultipleSearchEngines(object): + '''Uses multiple search engines.''' + def __init__(self, engines, proxy=cfg.PROXY, timeout=cfg.TIMEOUT): + self._engines = [ + se(proxy, timeout) + for se in search_engines_dict.values() + if se.__name__.lower() in engines + ] + self._filter = None + + self.ignore_duplicate_urls = False + self.ignore_duplicate_domains = False + self.results = SearchResults() + self.banned_engines = [] + + def disable_console(self): + '''Disables console output''' + out.console = lambda msg, end='\n', level=None: None + + def set_search_operator(self, operator): + '''Filters search results based on the operator.''' + self._filter = operator + + def search(self, query, pages=cfg.SEARCH_ENGINE_RESULTS_PAGES): + '''Searches multiples engines and collects the results.''' + self.results = SearchResults() + for engine in self._engines: + engine.ignore_duplicate_urls = self.ignore_duplicate_urls + engine.ignore_duplicate_domains = self.ignore_duplicate_domains + if self._filter: + engine.set_search_operator(self._filter) + + engine_results = engine.search(query, pages) + if engine.ignore_duplicate_urls: + engine_results._results = [ + item for item in engine_results._results + if item['link'] not in self.results.links() + ] + if 
self.ignore_duplicate_domains: + engine_results._results = [ + item for item in engine_results._results + if item['host'] not in self.results.hosts() + ] + self.results._results += engine_results._results + + if engine.is_banned: + self.banned_engines.append(engine.__class__.__name__) + return self.results + + def output(self, output=out.PRINT, path=None): + '''Prints search results and/or creates report files.''' + output = (output or '').lower() + query = self._engines[0]._query if self._engines else u'' + if not path: + path = cfg.OUTPUT_DIR + u'_'.join(query.split()) + out.console('') + + if out.PRINT in output: + out.print_results(self._engines) + if out.HTML in output: + out.write_file(out.create_html_data(self._engines), path + u'.html') + if out.CSV in output: + out.write_file(out.create_csv_data(self._engines), path + u'.csv') + if out.JSON in output: + out.write_file(out.create_json_data(self._engines), path + u'.json') + + +class AllSearchEngines(MultipleSearchEngines): + '''Uses all search engines.''' + def __init__(self, proxy=cfg.PROXY, timeout=cfg.TIMEOUT): + super(AllSearchEngines, self).__init__( + list(search_engines_dict), proxy, timeout + ) + diff --git a/build/lib/search_engines/output.py b/build/lib/search_engines/output.py new file mode 100644 index 0000000..9855e32 --- /dev/null +++ b/build/lib/search_engines/output.py @@ -0,0 +1,157 @@ +from __future__ import print_function + +import csv +import json +import io +import re +from collections import namedtuple + +try: + from shutil import get_terminal_size +except ImportError: + from .libs.get_terminal_size import get_terminal_size + +from .utils import encode_str, decode_bytes +from .libs import windows_cmd_encoding +from .config import PYTHON_VERSION + + +def print_results(search_engines): + '''Prints the search results.''' + for engine in search_engines: + console(engine.__class__.__name__ + u' results') + + for i, v in enumerate(engine.results, 1): + console(u'{:<4}{}'.format(i, 
v['link'])) + console(u'') + +def create_csv_data(search_engines): + '''CSV formats the search results.''' + encoder = decode_bytes if PYTHON_VERSION == 3 else encode_str + data = [['query', 'engine', 'domain', 'URL', 'title', 'text']] + + for engine in search_engines: + for i in engine.results: + row = [ + engine._query, engine.__class__.__name__, + i['host'], i['link'], i['title'], i['text'] + ] + row = [encoder(i) for i in row] + data.append(row) + return data + +def create_json_data(search_engines): + '''JSON formats the search results.''' + jobj = { + u'query': search_engines[0]._query, + u'results': { + se.__class__.__name__: [i for i in se.results] + for se in search_engines + } + } + return json.dumps(jobj) + +def create_html_data(search_engines): + '''HTML formats the search results.''' + query = decode_bytes(search_engines[0]._query) if search_engines else u'' + tables = u'' + + for engine in search_engines: + rows = u'' + for i, v in enumerate(engine.results, 1): + data = u'' + if u'title' in engine._filters: + data += HtmlTemplate.data.format(_replace_with_bold(query, v['title'])) + if u'text' in engine._filters: + data += HtmlTemplate.data.format(_replace_with_bold(query, v['text'])) + link = _replace_with_bold(query, v['link']) if u'url' in engine._filters else v['link'] + rows += HtmlTemplate.row.format(number=i, href=v['link'], link=link, data=data) + + engine_name = engine.__class__.__name__ + tables += HtmlTemplate.table.format(engine=engine_name, rows=rows) + return HtmlTemplate.html.format(query=query, table=tables) + +def _replace_with_bold(query, data): + '''Places the query in tags.''' + for match in re.findall(query, data, re.I): + data = data.replace(match, u'{}'.format(match)) + return data + + +def write_file(data, path, encoding='utf-8'): + '''Writes search results data to file.''' + try: + if PYTHON_VERSION == 2 and type(data) in (list, str): + f = io.open(path, 'wb') + else: + f = io.open(path, 'w', encoding=encoding, newline='') + + 
if type(data) is list: + writer = csv.writer(f) + writer.writerows(data) + else: + f.write(data) + f.close() + console(u'Output file: ' + path) + except IOError as e: + console(e, level=Level.error) + + +def console(msg, end='\n', level=None): + '''Prints data on the console.''' + console_len = get_terminal_size().columns + clear_line = u'\r{}\r'.format(u' ' * (console_len - 1)) + msg = clear_line + (level or u'') + msg + print(msg, end=end) + +Level = namedtuple('Level', ['info', 'warning', 'error'])( + info = u'INFO ', + warning = u'WARNING ', + error = u'ERROR ' +) + +PRINT = 'print' +HTML = 'html' +JSON = 'json' +CSV = 'csv' + + +class HtmlTemplate: + '''HTML template.''' + html = u''' + + + Search Results + + + + + + +
Query: '{query}'
+ {table} + + + ''' + table = u''' + +
{engine} search results
+ + {rows} +
+
+ ''' + row = u''' + {number}) + {link} + {data} + + ''' + data = u'''{}''' + diff --git a/build/lib/search_engines/results.py b/build/lib/search_engines/results.py new file mode 100644 index 0000000..1306aaa --- /dev/null +++ b/build/lib/search_engines/results.py @@ -0,0 +1,41 @@ +class SearchResults(object): + '''Stores the search results''' + def __init__(self, items=None): + self._results = items or [] + + def links(self): + '''Returns the links found in search results''' + return [row.get('link') for row in self._results] + + def titles(self): + '''Returns the titles found in search results''' + return [row.get('title') for row in self._results] + + def text(self): + '''Returns the text found in search results''' + return [row.get('text') for row in self._results] + + def hosts(self): + '''Returns the domains found in search results''' + return [row.get('host') for row in self._results] + + def results(self): + '''Returns all data found in search results''' + return self._results + + def __getitem__(self, index): + return self._results[index] + + def __len__(self): + return len(self._results) + + def __str__(self): + return ''.format(len(self._results)) + + def append(self, item): + '''appends an item to the results list.''' + self._results.append(item) + + def extend(self, items): + '''appends items to the results list.''' + self._results.extend(items) diff --git a/build/lib/search_engines/search_results/__init__.py b/build/lib/search_engines/search_results/__init__.py new file mode 100644 index 0000000..9cad736 --- /dev/null +++ b/build/lib/search_engines/search_results/__init__.py @@ -0,0 +1 @@ +'''''' diff --git a/build/lib/search_engines/utils.py b/build/lib/search_engines/utils.py new file mode 100644 index 0000000..98f438f --- /dev/null +++ b/build/lib/search_engines/utils.py @@ -0,0 +1,34 @@ +import requests +from .config import PYTHON_VERSION + + +def quote_url(url, safe=';/?:@&=+$,#'): + '''encodes URLs.''' + if PYTHON_VERSION == 2: + url = 
encode_str(url) + return requests.utils.quote(url, safe=safe) + +def unquote_url(url): + '''decodes URLs.''' + if PYTHON_VERSION == 2: + url = encode_str(url) + return decode_bytes(requests.utils.unquote(url)) + +def is_url(link): + '''Checks if link is URL''' + parts = requests.utils.urlparse(link) + return bool(parts.scheme and parts.netloc) + +def domain(url): + '''Returns domain form URL''' + host = requests.utils.urlparse(url).netloc + return host.lower().split(':')[0].replace('www.', '') + +def encode_str(s, encoding='utf-8', errors='replace'): + '''Encodes unicode to str, str to bytes.''' + return s if type(s) is bytes else s.encode(encoding, errors=errors) + +def decode_bytes(s, encoding='utf-8', errors='replace'): + '''Decodes bytes to str, str to unicode.''' + return s.decode(encoding, errors=errors) if type(s) is bytes else s + diff --git a/search_engines/engines/google.py b/search_engines/engines/google.py index e390979..7599f16 100644 --- a/search_engines/engines/google.py +++ b/search_engines/engines/google.py @@ -8,10 +8,12 @@ class Google(SearchEngine): '''Searches google.com''' - def __init__(self, proxy=PROXY, timeout=TIMEOUT): + def __init__(self, proxy=PROXY, timeout=TIMEOUT, before=None, after=None): super(Google, self).__init__(proxy, timeout) self._base_url = 'https://www.google.com' self._delay = (2, 6) + self.before = before + self.after = after self.set_headers({'User-Agent':FAKE_USER_AGENT}) @@ -58,7 +60,12 @@ def _first_page(self): inputs = {i['name']:i.get('value') for i in bs.select('form input[name]') if i['name'] != 'btnI'} inputs['q'] = quote_url(self._query, '') + inputs['q'] = inputs['q']+f'+after:{self.after}+before:{self.before}' url = u'{}/search?{}'.format(self._base_url, '&'.join([k + '=' + (v or '') for k,v in inputs.items()])) + print("!!! NEW URL", url) + # if self.after and self.before: + # url = url+f'+after:{self.after}+before:{self.before}' + # print("!!! NEW URL", url) return {'url':url, 'data':None}