From 53f1e7de43346d34a63202ad9a70691d9d0fc7f9 Mon Sep 17 00:00:00 2001 From: multiflexi Date: Thu, 20 Feb 2025 16:43:37 +0100 Subject: [PATCH 1/3] async rss --- src/collectors/collectors/base_collector.py | 37 +-- src/collectors/collectors/rss_collector.py | 290 +++++++++----------- src/collectors/requirements.txt | 1 + 3 files changed, 144 insertions(+), 184 deletions(-) diff --git a/src/collectors/collectors/base_collector.py b/src/collectors/collectors/base_collector.py index 34766cb4..f7f7f7c5 100644 --- a/src/collectors/collectors/base_collector.py +++ b/src/collectors/collectors/base_collector.py @@ -1,16 +1,14 @@ """Module for Base collector.""" +import aiohttp import bleach import datetime import hashlib import pytz import re -import socks import time -import urllib.request import uuid from functools import wraps -from sockshandler import SocksiPyHandler from urllib.parse import urlparse from dateutil.parser import parse as date_parse @@ -299,27 +297,6 @@ def refresh(self): logger.exception(f"Refreshing of sources failed: {error}") pass - @staticmethod - def get_proxy_handler(parsed_proxy): - """Get the proxy handler for the collector. - - Parameters: - parsed_proxy (urlparse object): The parsed proxy URL. - collector_source (string): Collector readable name - Returns: - (object): The proxy handler for the collector. - """ - if parsed_proxy.scheme in ["http", "https"]: - return urllib.request.ProxyHandler( - { - "http": f"{parsed_proxy.scheme}://{parsed_proxy.hostname}:{parsed_proxy.port}", - "https": f"{parsed_proxy.scheme}://{parsed_proxy.hostname}:{parsed_proxy.port}", - } - ) - elif parsed_proxy.scheme in ["socks4", "socks5"]: - socks_type = socks.SOCKS5 if parsed_proxy.scheme == "socks5" else socks.SOCKS4 - return SocksiPyHandler(socks_type, parsed_proxy.hostname, int(parsed_proxy.port)) - @staticmethod def get_parsed_proxy(proxy_string, collector_source): """Get the parsed proxy URL for the collector. @@ -342,7 +319,7 @@ def get_parsed_proxy(proxy_string, collector_source): return None @staticmethod - def not_modified(collector_source, url, last_collected, opener, user_agent=None): + async def not_modified(collector_source, url, last_collected, session, user_agent=None): """Check if the content has been modified since the given date using the If-Modified-Since and Last-Modified headers. 
Arguments: @@ -363,11 +340,9 @@ def not_modified(collector_source, url, last_collected, opener, user_agent=None) if user_agent: headers["User-Agent"] = user_agent - request = urllib.request.Request(url, method="HEAD", headers=headers) - last_collected_str = last_collected.strftime("%Y-%m-%d %H:%M") try: - with opener(request) as response: + async with session.head(url, headers=headers) as response: last_modified = response.headers.get("Last-Modified") if response.status == 304: logger.debug(f"{collector_source} Content has not been modified since {last_collected_str}") @@ -389,11 +364,11 @@ def not_modified(collector_source, url, last_collected, opener, user_agent=None) return False else: logger.debug( - f"{collector_source} Content has been modified since {last_collected_str} " f"(Last-Modified: header not received)" + f"{collector_source} Content has been modified since {last_collected_str} (Last-Modified: header not received)" ) return False - except urllib.error.HTTPError as e: - if e.code == 304: + except aiohttp.ClientResponseError as e: + if e.status == 304: logger.debug(f"{collector_source} Content has not been modified since {last_collected_str}") return True else: diff --git a/src/collectors/collectors/rss_collector.py b/src/collectors/collectors/rss_collector.py index 6239ae69..59bff0c0 100644 --- a/src/collectors/collectors/rss_collector.py +++ b/src/collectors/collectors/rss_collector.py @@ -3,9 +3,11 @@ import datetime import feedparser import hashlib -import urllib.request import uuid from bs4 import BeautifulSoup +import aiohttp +import aiohttp_socks +import asyncio from .base_collector import BaseCollector from managers.log_manager import logger @@ -14,11 +16,7 @@ class RSSCollector(BaseCollector): - """RSS collector class. - - Arguments: - BaseCollector -- Base collector class. - """ + """RSS collector class.""" type = "RSS_COLLECTOR" config = ConfigCollector().get_config_by_type(type) @@ -30,173 +28,159 @@ class RSSCollector(BaseCollector): @BaseCollector.ignore_exceptions def collect(self, source): - """Collect data from RSS or Atom feed. - - Arguments: - source: Source object. - """ + """Collect data from RSS or Atom feed.""" def strip_html_tags(html_string): - """Strip HTML tags from the given string. - - Arguments: - html_string (string): The HTML string. - - Returns: - string: The string without HTML tags. - """ + """Strip HTML tags from the given string.""" soup = BeautifulSoup(html_string, "html.parser") return soup.get_text(separator=" ", strip=True) - def get_feed(feed_url, last_collected=None, user_agent=None, proxy_handler=None): - """Fetch the feed data, using proxy if provided, and check modification status. - - Arguments: - feed_url (string): The URL of the feed. - last_collected (string): The datetime of the last collection. - proxy_handler (SocksiPyHandler): The proxy handler to use for the request (default: None). - - Returns: - dict: The parsed feed data or an empty dictionary if not modified. 
- """ - - def fetch_feed(url, handler=None): - """Fetch the feed using feedparser with optional handler.""" - if user_agent: - feedparser.USER_AGENT = user_agent - if handler: - return feedparser.parse(url, handlers=[handler]) - return feedparser.parse(url) - - # Determine the opener function based on the proxy handler - opener = urllib.request.build_opener(proxy_handler).open if proxy_handler else urllib.request.urlopen - - # Check if the feed has been modified since the last collection - if last_collected: - if BaseCollector.not_modified(self.collector_source, feed_url, last_collected, opener, user_agent): - return None - - logger.debug(f"{self.collector_source} Fetching feed from URL: {feed_url}") - return fetch_feed(feed_url, proxy_handler) + async def fetch_feed(session, url): + """Fetch the feed using feedparser.""" + async with session.get(url) as response: + content = await response.text() + return feedparser.parse(content) + + async def get_feed(feed_url, last_collected=None, user_agent=None): + """Fetch the feed data, using proxy if provided, and check modification status.""" + headers = {} + if user_agent: + headers["User-Agent"] = user_agent + + connector = aiohttp_socks.ProxyConnector.from_url(proxy) if proxy else None + + async with aiohttp.ClientSession(headers=headers, connector=connector) as session: + # Check if the feed has been modified since the last collection + if last_collected: + if await BaseCollector.not_modified(self.collector_source, feed_url, last_collected, session, user_agent): + return None + + logger.debug(f"{self.collector_source} Fetching feed from URL: {feed_url}") + return await fetch_feed(session, feed_url) + + async def process_feed_entry(feed_entry, user_agent, proxy): + """Process a single feed entry asynchronously.""" + author = feed_entry.get("author", "") + title = feed_entry.get("title", "") + published = feed_entry.get("published", "") + published_parsed = feed_entry.get("published_parsed", "") + updated = feed_entry.get("updated", "") + updated_parsed = feed_entry.get("updated_parsed", "") + summary = feed_entry.get("summary", "") + content = feed_entry.get("content", "") + date = "" + review = "" + article = "" + link_for_article = feed_entry.get("link", "") + if summary: + review = strip_html_tags(summary[:500]) + if content: + article = strip_html_tags(content[0].get("value", "")) + + if not link_for_article: + logger.debug(f"{self.collector_source} Skipping an empty link in feed entry '{title}'.") + return None + elif not article: + logger.info(f"{self.collector_source} Visiting an article: {link_for_article}") + html_article = "" + try: + headers = {"User-Agent": user_agent} if user_agent else {} + connector = aiohttp_socks.ProxyConnector.from_url(proxy) if proxy else None + async with aiohttp.ClientSession(headers=headers, connector=connector) as session: + async with session.get(link_for_article) as response: + html_article = await response.text() + + soup = BeautifulSoup(html_article, features="html.parser") + + if html_article: + article_text = [p.text.strip() for p in soup.findAll("p")] + replaced_str = "\xa0" + article_sanit = [w.replace(replaced_str, " ") for w in article_text] + article_sanit = " ".join(article_sanit) + # use HTML article if it is longer than summary + if len(article_sanit) > len(summary): + article = article_sanit + logger.debug(f"{self.collector_source} Got an article: {link_for_article}") + except Exception as error: + logger.exception(f"{self.collector_source} Fetch article failed: {error}") + + # use summary 
if article is empty + if summary and not article: + article = strip_html_tags(summary) + logger.debug(f"{self.collector_source} Using summary for article: {article}") + # use first 500 characters of article if summary is empty + elif not summary and article: + review = article[:500] + logger.debug(f"{self.collector_source} Using first 500 characters of article for summary: {review}") + + # use published date if available, otherwise use updated date + if published_parsed: + date = datetime.datetime(*published_parsed[:6]).strftime("%d.%m.%Y - %H:%M") + logger.debug(f"{self.collector_source} Using parsed 'published' date") + elif updated_parsed: + date = datetime.datetime(*updated_parsed[:6]).strftime("%d.%m.%Y - %H:%M") + logger.debug(f"{self.collector_source} Using parsed 'updated' date") + elif published: + date = published + logger.debug(f"{self.collector_source} Using 'published' date") + elif updated: + date = updated + logger.debug(f"{self.collector_source} Using 'updated' date") + + logger.debug(f"{self.collector_source} ... Title : {title}") + logger.debug(f"{self.collector_source} ... Review : {review.replace('\r', '').replace('\n', ' ').strip()[:100]}") + logger.debug(f"{self.collector_source} ... Content : {article.replace('\r', '').replace('\n', ' ').strip()[:100]}") + logger.debug(f"{self.collector_source} ... Published: {date}") + + for_hash = author + title + link_for_article + + news_item = NewsItemData( + uuid.uuid4(), + hashlib.sha256(for_hash.encode()).hexdigest(), + title, + review, + feed_url, + link_for_article, + date, + author, + datetime.datetime.now(), + article, + source.id, + [], + ) + + return news_item feed_url = source.parameter_values["FEED_URL"] links_limit = BaseCollector.read_int_parameter("LINKS_LIMIT", 0, source) last_collected = source.last_collected user_agent = source.parameter_values["USER_AGENT"] parsed_proxy = BaseCollector.get_parsed_proxy(source.parameter_values["PROXY_SERVER"], self.collector_source) - if parsed_proxy: - proxy_handler = BaseCollector.get_proxy_handler(parsed_proxy) - else: - proxy_handler = None - opener = urllib.request.build_opener(proxy_handler).open if proxy_handler else urllib.request.urlopen + proxy = parsed_proxy.geturl() if parsed_proxy else None + if user_agent: logger.info(f"{self.collector_source} Requesting feed URL: {feed_url} (User-Agent: {user_agent})") else: logger.info(f"{self.collector_source} Requesting feed URL: {feed_url}") - feed = get_feed(feed_url, last_collected, user_agent, proxy_handler) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + feed = loop.run_until_complete(get_feed(feed_url, last_collected, user_agent)) if feed: try: logger.debug(f"{self.collector_source} Feed returned {len(feed['entries'])} entries.") - news_items = [] - - count = 0 + tasks = [] for feed_entry in feed["entries"]: - count += 1 - author = feed_entry.get("author", "") - title = feed_entry.get("title", "") - published = feed_entry.get("published", "") - published_parsed = feed_entry.get("published_parsed", "") - updated = feed_entry.get("updated", "") - updated_parsed = feed_entry.get("updated_parsed", "") - summary = feed_entry.get("summary", "") - content = feed_entry.get("content", "") - date = "" - review = "" - article = "" - link_for_article = feed_entry.get("link", "") - if summary: - review = strip_html_tags(summary[:500]) - if content: - article = strip_html_tags(content[0].get("value", "")) - - if not link_for_article: - logger.debug(f"{self.collector_source} Skipping an empty link in feed entry 
'{title}'.") - continue - elif not article: - logger.info(f"{self.collector_source} Visiting an article {count}/{len(feed['entries'])}: {link_for_article}") - html_article = "" - try: - request = urllib.request.Request(link_for_article) - request.add_header("User-Agent", user_agent) - - with opener(request) as response: - html_article = response.read() - - soup = BeautifulSoup(html_article, features="html.parser") - - if html_article: - article_text = [p.text.strip() for p in soup.findAll("p")] - replaced_str = "\xa0" - article_sanit = [w.replace(replaced_str, " ") for w in article_text] - article_sanit = " ".join(article_sanit) - # use HTML article if it is longer than summary - if len(article_sanit) > len(summary): - article = article_sanit - logger.debug(f"{self.collector_source} Got an article: {link_for_article}") - except Exception as error: - logger.exception(f"{self.collector_source} Fetch article failed: {error}") - - # use summary if article is empty - if summary and not article: - article = strip_html_tags(summary) - logger.debug(f"{self.collector_source} Using summary for article: {article}") - # use first 500 characters of article if summary is empty - elif not summary and article: - review = article[:500] - logger.debug(f"{self.collector_source} Using first 500 characters of article for summary: {review}") - - # use published date if available, otherwise use updated date - if published_parsed: - date = datetime.datetime(*published_parsed[:6]).strftime("%d.%m.%Y - %H:%M") - logger.debug(f"{self.collector_source} Using parsed 'published' date") - elif updated_parsed: - date = datetime.datetime(*updated_parsed[:6]).strftime("%d.%m.%Y - %H:%M") - logger.debug(f"{self.collector_source} Using parsed 'updated' date") - elif published: - date = published - logger.debug(f"{self.collector_source} Using 'published' date") - elif updated: - date = updated - logger.debug(f"{self.collector_source} Using 'updated' date") - - logger.debug(f"{self.collector_source} ... Title : {title}") - logger.debug(f"{self.collector_source} ... Review : {review.replace('\r', '').replace('\n', ' ').strip()[:100]}") - logger.debug(f"{self.collector_source} ... Content : {article.replace('\r', '').replace('\n', ' ').strip()[:100]}") - logger.debug(f"{self.collector_source} ... 
Published: {date}") - - for_hash = author + title + link_for_article - - news_item = NewsItemData( - uuid.uuid4(), - hashlib.sha256(for_hash.encode()).hexdigest(), - title, - review, - feed_url, - link_for_article, - date, - author, - datetime.datetime.now(), - article, - source.id, - [], - ) - - news_items.append(news_item) - - if count >= links_limit & links_limit > 0: - logger.debug(f"{self.collector_source} Limit for article links ({links_limit}) has been reached.") - break + tasks.append(process_feed_entry(feed_entry, user_agent, proxy)) + + news_items = loop.run_until_complete(asyncio.gather(*tasks)) + + # Filter out None values + news_items = [item for item in news_items if item is not None] + + if links_limit > 0: + news_items = news_items[:links_limit] BaseCollector.publish(news_items, source, self.collector_source) diff --git a/src/collectors/requirements.txt b/src/collectors/requirements.txt index b815d68e..7991657d 100644 --- a/src/collectors/requirements.txt +++ b/src/collectors/requirements.txt @@ -1,3 +1,4 @@ +aiohttp-socks==0.10.1 beautifulsoup4==4.13.3 bleach==6.2.0 dateparser==1.2.1 From 4799c3167bbc0c82a5c6d92c970e96822abb1296 Mon Sep 17 00:00:00 2001 From: multiflexi Date: Thu, 20 Feb 2025 16:49:25 +0100 Subject: [PATCH 2/3] update requirements --- src/collectors/requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/src/collectors/requirements.txt b/src/collectors/requirements.txt index 7991657d..4a95b72d 100644 --- a/src/collectors/requirements.txt +++ b/src/collectors/requirements.txt @@ -9,7 +9,6 @@ Flask-RESTful==0.3.10 gevent==24.11.1 gunicorn==23.0.0 marshmallow==3.26.1 -PySocks==1.7.1 python-dateutil==2.9.0.post0 python-dotenv==1.0.1 requests==2.32.3 From 523decf1ab18017ef466c9d77e2a891cf6a8340b Mon Sep 17 00:00:00 2001 From: multiflexi Date: Thu, 20 Feb 2025 17:03:14 +0100 Subject: [PATCH 3/3] add semaphore --- src/collectors/collectors/rss_collector.py | 186 +++++++++++---------- 1 file changed, 94 insertions(+), 92 deletions(-) diff --git a/src/collectors/collectors/rss_collector.py b/src/collectors/collectors/rss_collector.py index 59bff0c0..ec3306a6 100644 --- a/src/collectors/collectors/rss_collector.py +++ b/src/collectors/collectors/rss_collector.py @@ -58,98 +58,99 @@ async def get_feed(feed_url, last_collected=None, user_agent=None): logger.debug(f"{self.collector_source} Fetching feed from URL: {feed_url}") return await fetch_feed(session, feed_url) - async def process_feed_entry(feed_entry, user_agent, proxy): + async def process_feed_entry(feed_entry, user_agent, proxy, semaphore): """Process a single feed entry asynchronously.""" - author = feed_entry.get("author", "") - title = feed_entry.get("title", "") - published = feed_entry.get("published", "") - published_parsed = feed_entry.get("published_parsed", "") - updated = feed_entry.get("updated", "") - updated_parsed = feed_entry.get("updated_parsed", "") - summary = feed_entry.get("summary", "") - content = feed_entry.get("content", "") - date = "" - review = "" - article = "" - link_for_article = feed_entry.get("link", "") - if summary: - review = strip_html_tags(summary[:500]) - if content: - article = strip_html_tags(content[0].get("value", "")) - - if not link_for_article: - logger.debug(f"{self.collector_source} Skipping an empty link in feed entry '{title}'.") - return None - elif not article: - logger.info(f"{self.collector_source} Visiting an article: {link_for_article}") - html_article = "" - try: - headers = {"User-Agent": user_agent} if user_agent else {} - connector = 
aiohttp_socks.ProxyConnector.from_url(proxy) if proxy else None - async with aiohttp.ClientSession(headers=headers, connector=connector) as session: - async with session.get(link_for_article) as response: - html_article = await response.text() - - soup = BeautifulSoup(html_article, features="html.parser") - - if html_article: - article_text = [p.text.strip() for p in soup.findAll("p")] - replaced_str = "\xa0" - article_sanit = [w.replace(replaced_str, " ") for w in article_text] - article_sanit = " ".join(article_sanit) - # use HTML article if it is longer than summary - if len(article_sanit) > len(summary): - article = article_sanit - logger.debug(f"{self.collector_source} Got an article: {link_for_article}") - except Exception as error: - logger.exception(f"{self.collector_source} Fetch article failed: {error}") - - # use summary if article is empty - if summary and not article: - article = strip_html_tags(summary) - logger.debug(f"{self.collector_source} Using summary for article: {article}") - # use first 500 characters of article if summary is empty - elif not summary and article: - review = article[:500] - logger.debug(f"{self.collector_source} Using first 500 characters of article for summary: {review}") - - # use published date if available, otherwise use updated date - if published_parsed: - date = datetime.datetime(*published_parsed[:6]).strftime("%d.%m.%Y - %H:%M") - logger.debug(f"{self.collector_source} Using parsed 'published' date") - elif updated_parsed: - date = datetime.datetime(*updated_parsed[:6]).strftime("%d.%m.%Y - %H:%M") - logger.debug(f"{self.collector_source} Using parsed 'updated' date") - elif published: - date = published - logger.debug(f"{self.collector_source} Using 'published' date") - elif updated: - date = updated - logger.debug(f"{self.collector_source} Using 'updated' date") - - logger.debug(f"{self.collector_source} ... Title : {title}") - logger.debug(f"{self.collector_source} ... Review : {review.replace('\r', '').replace('\n', ' ').strip()[:100]}") - logger.debug(f"{self.collector_source} ... Content : {article.replace('\r', '').replace('\n', ' ').strip()[:100]}") - logger.debug(f"{self.collector_source} ... 
Published: {date}") - - for_hash = author + title + link_for_article - - news_item = NewsItemData( - uuid.uuid4(), - hashlib.sha256(for_hash.encode()).hexdigest(), - title, - review, - feed_url, - link_for_article, - date, - author, - datetime.datetime.now(), - article, - source.id, - [], - ) - - return news_item + async with semaphore: + author = feed_entry.get("author", "") + title = feed_entry.get("title", "") + published = feed_entry.get("published", "") + published_parsed = feed_entry.get("published_parsed", "") + updated = feed_entry.get("updated", "") + updated_parsed = feed_entry.get("updated_parsed", "") + summary = feed_entry.get("summary", "") + content = feed_entry.get("content", "") + date = "" + review = "" + article = "" + link_for_article = feed_entry.get("link", "") + if summary: + review = strip_html_tags(summary[:500]) + if content: + article = strip_html_tags(content[0].get("value", "")) + + if not link_for_article: + logger.debug(f"{self.collector_source} Skipping an empty link in feed entry '{title}'.") + return None + elif not article: + logger.info(f"{self.collector_source} Visiting an article: {link_for_article}") + html_article = "" + try: + headers = {"User-Agent": user_agent} if user_agent else {} + connector = aiohttp_socks.ProxyConnector.from_url(proxy) if proxy else None + async with aiohttp.ClientSession(headers=headers, connector=connector) as session: + async with session.get(link_for_article) as response: + html_article = await response.text() + + soup = BeautifulSoup(html_article, features="html.parser") + + if html_article: + article_text = [p.text.strip() for p in soup.findAll("p")] + replaced_str = "\xa0" + article_sanit = [w.replace(replaced_str, " ") for w in article_text] + article_sanit = " ".join(article_sanit) + # use HTML article if it is longer than summary + if len(article_sanit) > len(summary): + article = article_sanit + logger.debug(f"{self.collector_source} Got an article: {link_for_article}") + except Exception as error: + logger.exception(f"{self.collector_source} Fetch article failed: {error}") + + # use summary if article is empty + if summary and not article: + article = strip_html_tags(summary) + logger.debug(f"{self.collector_source} Using summary for article: {article}") + # use first 500 characters of article if summary is empty + elif not summary and article: + review = article[:500] + logger.debug(f"{self.collector_source} Using first 500 characters of article for summary: {review}") + + # use published date if available, otherwise use updated date + if published_parsed: + date = datetime.datetime(*published_parsed[:6]).strftime("%d.%m.%Y - %H:%M") + logger.debug(f"{self.collector_source} Using parsed 'published' date") + elif updated_parsed: + date = datetime.datetime(*updated_parsed[:6]).strftime("%d.%m.%Y - %H:%M") + logger.debug(f"{self.collector_source} Using parsed 'updated' date") + elif published: + date = published + logger.debug(f"{self.collector_source} Using 'published' date") + elif updated: + date = updated + logger.debug(f"{self.collector_source} Using 'updated' date") + + logger.debug(f"{self.collector_source} ... Title : {title}") + logger.debug(f"{self.collector_source} ... Review : {review.replace('\r', '').replace('\n', ' ').strip()[:100]}") + logger.debug(f"{self.collector_source} ... Content : {article.replace('\r', '').replace('\n', ' ').strip()[:100]}") + logger.debug(f"{self.collector_source} ... 
Published: {date}") + + for_hash = author + title + link_for_article + + news_item = NewsItemData( + uuid.uuid4(), + hashlib.sha256(for_hash.encode()).hexdigest(), + title, + review, + feed_url, + link_for_article, + date, + author, + datetime.datetime.now(), + article, + source.id, + [], + ) + + return news_item feed_url = source.parameter_values["FEED_URL"] links_limit = BaseCollector.read_int_parameter("LINKS_LIMIT", 0, source) @@ -170,9 +171,10 @@ async def process_feed_entry(feed_entry, user_agent, proxy): try: logger.debug(f"{self.collector_source} Feed returned {len(feed['entries'])} entries.") + semaphore = asyncio.Semaphore(5) # Limit the number of concurrent tasks to 5 tasks = [] for feed_entry in feed["entries"]: - tasks.append(process_feed_entry(feed_entry, user_agent, proxy)) + tasks.append(process_feed_entry(feed_entry, user_agent, proxy, semaphore)) news_items = loop.run_until_complete(asyncio.gather(*tasks))