diff --git a/alembic/env.py b/alembic/env.py
index ba064ef..1b8714f 100644
--- a/alembic/env.py
+++ b/alembic/env.py
@@ -40,10 +40,7 @@ target_metadata = Base.metadata


 def get_conn_string():
-    conn_str = r'mysql+pymysql://barry:Tovalu88!@192.168.1.194/reddit_dev'
-    return conn_str
-    #return f'mysql+pymysql://{bot_config.db_user}:{quote_plus(bot_config.db_password)}@{bot_config.db_host}/{bot_config.db_name}'
-
+    return f'mysql+pymysql://{bot_config.db_user}:{quote_plus(bot_config.db_password)}@{bot_config.db_host}/{bot_config.db_name}'

 # other values from the config, defined by the needs of env.py,
 # can be acquired:
diff --git a/docker-compose.yml b/docker-compose.yml
index 62dc26e..11a78dd 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -31,6 +31,22 @@ services:
       - CELERY_IMPORTS=redditrepostsleuth.core.celery.admin_tasks,redditrepostsleuth.core.celery.tasks.scheduled_tasks
     entrypoint: celery -A redditrepostsleuth.core.celery worker -Q scheduled_tasks -n scheduled_task_worker --autoscale=15,2

+  subreddit_update_worker:
+    container_name: subreddit_update_worker
+    restart: unless-stopped
+    user: "1001"
+    build:
+      context: .
+      dockerfile: docker/WorkerDockerFile
+    env_file:
+      - .env
+    environment:
+      - RUN_ENV=production
+      - db_user=maintenance_task
+      - LOG_LEVEL=DEBUG
+      - CELERY_IMPORTS=redditrepostsleuth.core.celery.tasks.maintenance_tasks
+    entrypoint: celery -A redditrepostsleuth.core.celery worker -Q update_subreddit_data -n subreddit_update_worker --autoscale=4,1
+
   scheduler:
     container_name: beat_scheduler
     restart: unless-stopped
@@ -120,7 +136,7 @@ services:
       - db_user=sub_monitor
       - LOG_LEVEL=INFO
       - CELERY_IMPORTS=redditrepostsleuth.core.celery.tasks.monitored_sub_tasks
-    entrypoint: celery -A redditrepostsleuth.core.celery worker -Q submonitor -n submonitor_worker --autoscale=6,2
+    entrypoint: celery -A redditrepostsleuth.core.celery worker -Q submonitor -n submonitor_worker --autoscale=10,2

   reddit_actions_worker:
     container_name: reddit-actions-worker
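A note on the worker/queue pairing added above: the `-Q update_subreddit_data` flag subscribes the new worker to exactly one queue, and Celery's `--autoscale` option takes `max,min` concurrency in that order (as the other workers' `15,2` and `10,2` values show). A minimal sketch of the same wiring — only the queue, task, and worker names are taken from this diff; the broker URL and app setup are illustrative:

```python
# Sketch: how a dedicated queue worker pairs with task routing.
from celery import Celery

app = Celery('redditrepostsleuth', broker='redis://localhost:6379/0')
app.conf.task_routes = {
    'redditrepostsleuth.core.celery.tasks.maintenance_tasks.update_subreddit_data': {
        'queue': 'update_subreddit_data',
    },
}
# Started as: celery -A redditrepostsleuth.core.celery worker \
#   -Q update_subreddit_data -n subreddit_update_worker --autoscale=4,1
# Only this worker consumes the queue; autoscale keeps 1-4 processes alive.
```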
diff --git a/redditrepostsleuth/adminsvc/inbox_monitor.py b/redditrepostsleuth/adminsvc/inbox_monitor.py
index b4c505b..32f043e 100644
--- a/redditrepostsleuth/adminsvc/inbox_monitor.py
+++ b/redditrepostsleuth/adminsvc/inbox_monitor.py
@@ -36,7 +36,6 @@ def __init__(
         self.failed_checks = []

     def check_inbox(self):
-        print('[Scheduled Job] Checking Inbox Start')
         for msg in self.reddit.inbox.messages(limit=500):
             if msg.author != 'RepostSleuthBot' and msg.subject.lower() in ['false negative', 'false positive']:
                 #self._process_user_report(msg)
diff --git a/redditrepostsleuth/core/celery/admin_tasks.py b/redditrepostsleuth/core/celery/admin_tasks.py
index 7f5f82a..6630660 100644
--- a/redditrepostsleuth/core/celery/admin_tasks.py
+++ b/redditrepostsleuth/core/celery/admin_tasks.py
@@ -5,21 +5,13 @@
 import pymysql
 from celery import Task
-from prawcore import TooManyRequests
-from requests.exceptions import ConnectionError
-from sqlalchemy import func
-from sqlalchemy.exc import IntegrityError

 from redditrepostsleuth.core.celery import celery
 from redditrepostsleuth.core.celery.basetasks import AdminTask
 from redditrepostsleuth.core.config import Config
-from redditrepostsleuth.core.db.databasemodels import MonitoredSub, Post, UserReview
-from redditrepostsleuth.core.exception import UtilApiException, UserNotFound
+from redditrepostsleuth.core.db.databasemodels import MonitoredSub, Post
 from redditrepostsleuth.core.logfilters import ContextFilter
 from redditrepostsleuth.core.logging import log, configure_logger
-from redditrepostsleuth.core.util.helpers import chunk_list
-from redditrepostsleuth.core.util.onlyfans_handling import check_user_for_promoter_links, \
-    check_user_comments_for_promoter_links

 log = configure_logger(
     name='redditrepostsleuth',
@@ -126,88 +118,5 @@ def update_subreddit_config_from_database(self, monitored_sub: MonitoredSub, use
     )


-@celery.task(bind=True, base=AdminTask, autoretry_for=(UtilApiException,ConnectionError,TooManyRequests), retry_kwards={'max_retries': 3, 'retry_backoff': True})
-def check_user_for_only_fans(self, username: str) -> None:
-    skip_names = ['[deleted]', 'AutoModerator']
-
-    if username in skip_names:
-        log.info('Skipping name %s', username)
-        return
-
-    try:
-        with self.uowm.start() as uow:
-            user = uow.user_review.get_by_username(username)
-
-            if user:
-                delta = datetime.utcnow() - user.last_checked
-                if delta.days < 30:
-                    log.info('Skipping existing user %s, last check was %s days ago', username, delta.days)
-                    return
-                user.content_links_found = False
-                user.notes = None
-                user.last_checked = func.utc_timestamp()
-
-            log.info('Checking user %s', username)
-            if not user:
-                user = UserReview(username=username)
-            try:
-                result = check_user_for_promoter_links(username)
-            except UserNotFound as e:
-                log.warning(e)
-                return
-
-            if result:
-                log.info('Promoter found: %s - %s', username, str(result))
-                user.content_links_found = True
-                user.notes = str(result)
-            uow.user_review.add(user)
-            uow.commit()
-    except (UtilApiException, ConnectionError, TooManyRequests) as e:
-        raise e
-    except IntegrityError:
-        pass
-    except Exception as e:
-        log.exception('')
-
-
-@celery.task(bind=True, base=AdminTask, autoretry_for=(UtilApiException,ConnectionError,TooManyRequests), retry_kwards={'max_retries': 3})
-def check_user_comments_for_only_fans(self, username: str) -> None:
-    """
-    This should be run after the profile check so we don't do any timeframe checking
-    :param self:
-    :param username:
-    :return:
-    """
-    skip_names = ['[deleted]', 'AutoModerator']
-
-    if username in skip_names:
-        log.info('Skipping name %s', username)
-        return
-
-    try:
-        with self.uowm.start() as uow:
-            user = uow.user_review.get_by_username(username)
-
-            if not user:
-                log.error('User not found: %s', username)
-
-            try:
-                result = check_user_comments_for_promoter_links(username)
-            except UserNotFound as e:
-                log.warning(e)
-                return
-
-            if result:
-                log.info('Promoter found: %s - %s', username, str(result))
-                user.content_links_found = True
-                user.notes = str(result)
-            uow.user_review.add(user)
-            uow.commit()
-    except (UtilApiException, ConnectionError, TooManyRequests) as e:
-        raise e
-    except IntegrityError:
-        pass
-    except Exception as e:
-        log.exception('')
diff --git a/redditrepostsleuth/core/celery/basetasks.py b/redditrepostsleuth/core/celery/basetasks.py
index fed1eeb..e1e3a4a 100644
--- a/redditrepostsleuth/core/celery/basetasks.py
+++ b/redditrepostsleuth/core/celery/basetasks.py
@@ -1,3 +1,5 @@
+import datetime
+
 from celery import Task

 from redditrepostsleuth.core.config import Config
diff --git a/redditrepostsleuth/core/celery/celeryconfig.py b/redditrepostsleuth/core/celery/celeryconfig.py
index c1db7d5..843c198 100644
--- a/redditrepostsleuth/core/celery/celeryconfig.py
+++ b/redditrepostsleuth/core/celery/celeryconfig.py
@@ -16,6 +16,7 @@
 task_routes = {
     'redditrepostsleuth.core.celery.tasks.ingest_tasks.save_new_post': {'queue': 'post_ingest'},
     'redditrepostsleuth.core.celery.tasks.ingest_tasks.save_new_posts': {'queue': 'post_ingest'},
+    'redditrepostsleuth.core.celery.tasks.ingest_tasks.save_subreddit': {'queue': 'save_subreddit'},
     'redditrepostsleuth.core.celery.tasks.ingest_tasks.ingest_repost_check': {'queue': 'repost'},
     'redditrepostsleuth.core.celery.tasks.repost_tasks.check_image_repost_save': {'queue': 'repost_image'},
     'redditrepostsleuth.core.celery.tasks.repost_tasks.link_repost_check': {'queue': 'repost_link'},
@@ -29,8 +30,10 @@
     'redditrepostsleuth.core.celery.admin_tasks.update_proxies_job': {'queue': 'scheduled_tasks'},
     'redditrepostsleuth.core.celery.admin_tasks.check_user_for_only_fans': {'queue': 'onlyfans_check'},
     'redditrepostsleuth.core.celery.admin_tasks.update_subreddit_config_from_database': {'queue': 'update_wiki_from_database'},
-    'redditrepostsleuth.core.celery.admin_tasks.delete_search_batch': {'queue': 'batch_delete_searches'},
+    #'redditrepostsleuth.core.celery.admin_tasks.delete_search_batch': {'queue': 'batch_delete_searches'},
     'redditrepostsleuth.core.celery.tasks.reddit_action_tasks.*': {'queue': 'reddit_actions'},
+    'redditrepostsleuth.core.celery.tasks.maintenance_tasks.update_subreddit_data': {'queue': 'update_subreddit_data'},
+    'redditrepostsleuth.core.celery.tasks.maintenance_tasks.save_subreddit': {'queue': 'update_subreddit_data'}
 }
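The routes above matter because the ingest worker dispatches these tasks by dotted name with `celery.send_task`, which needs no import of the target module — Celery matches the name against `task_routes` when publishing to pick the queue. An illustrative call (the subreddit name is made up):

```python
from redditrepostsleuth.core.celery import celery

# send_task() serializes the call by name; maintenance_tasks only needs to be
# importable in the worker started with CELERY_IMPORTS=...maintenance_tasks,
# not in the sending process.
celery.send_task(
    'redditrepostsleuth.core.celery.tasks.maintenance_tasks.save_subreddit',
    args=['pics'],  # illustrative subreddit name
)
```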
diff --git a/redditrepostsleuth/core/celery/task_logic/scheduled_task_logic.py b/redditrepostsleuth/core/celery/task_logic/scheduled_task_logic.py
index 8bbabc0..043aafe 100644
--- a/redditrepostsleuth/core/celery/task_logic/scheduled_task_logic.py
+++ b/redditrepostsleuth/core/celery/task_logic/scheduled_task_logic.py
@@ -4,6 +4,7 @@
 import os
 import sys
 import time
+from datetime import datetime

 import jwt
 import redis
@@ -62,22 +63,25 @@ def update_proxies(uowm: UnitOfWorkManager) -> None:
 def update_top_reposts(uow: UnitOfWork, post_type_id: int, day_range: int = None):
     # reddit.info(reddit_ids_to_lookup):
     log.info('Getting top repostors for post type %s with range %s', post_type_id, day_range)
-    range_query = "SELECT repost_of_id, COUNT(*) c FROM repost WHERE detected_at > NOW() - INTERVAL :days DAY AND post_type_id=:posttype GROUP BY repost_of_id HAVING c > 5 ORDER BY c DESC"
-    all_time_query = "SELECT repost_of_id, COUNT(*) c FROM repost WHERE post_type_id=:posttype GROUP BY repost_of_id HAVING c > 5 ORDER BY c DESC"
+    range_query = "SELECT repost_of_id, COUNT(*) c FROM repost WHERE detected_at > NOW() - INTERVAL :days DAY AND post_type_id=:posttype GROUP BY repost_of_id HAVING c > 5 ORDER BY c DESC LIMIT 100000"
+    all_time_query = "SELECT repost_of_id, COUNT(*) c FROM repost WHERE post_type_id=:posttype GROUP BY repost_of_id HAVING c > 5 ORDER BY c DESC LIMIT 100000"
     if day_range:
         query = range_query
+        log.debug('Deleting top reposts for day range %s', day_range)
         uow.session.execute(text('DELETE FROM stat_top_repost WHERE post_type_id=:posttype AND day_range=:days'),
                             {'posttype': post_type_id, 'days': day_range})
     else:
         query = all_time_query
+        log.debug('Deleting top reposts for all time')
         uow.session.execute(text('DELETE FROM stat_top_repost WHERE post_type_id=:posttype AND day_range IS NULL'),
                             {'posttype': post_type_id})
     uow.commit()
-
+    log.debug('Executing query for day range %s', day_range)
     result = uow.session.execute(text(query), {'posttype': post_type_id, 'days': day_range})
+    log.debug('Finished executing query for day range %s', day_range)
     for row in result:
         stat = StatsTopRepost()
         stat.post_id = row[0]
@@ -98,8 +102,8 @@ def run_update_top_reposts(uow: UnitOfWork) -> None:

 def update_top_reposters(uow: UnitOfWork, post_type_id: int, day_range: int = None) -> None:
     log.info('Getting top repostors for post type %s with range %s', post_type_id, day_range)
-    range_query = "SELECT author, COUNT(*) c FROM repost WHERE detected_at > NOW() - INTERVAL :days DAY AND post_type_id=:posttype AND author is not NULL AND author!= '[deleted]' GROUP BY author HAVING c > 10 ORDER BY c DESC"
-    all_time_query = "SELECT author, COUNT(*) c FROM repost WHERE post_type_id=:posttype AND author is not NULL AND author!= '[deleted]' GROUP BY author HAVING c > 10 ORDER BY c DESC"
+    range_query = "SELECT author, COUNT(*) c FROM repost WHERE detected_at > NOW() - INTERVAL :days DAY AND post_type_id=:posttype AND author is not NULL AND author!= '[deleted]' GROUP BY author HAVING c > 10 ORDER BY c DESC LIMIT 100000"
+    all_time_query = "SELECT author, COUNT(*) c FROM repost WHERE post_type_id=:posttype AND author is not NULL AND author!= '[deleted]' GROUP BY author HAVING c > 10 ORDER BY c DESC LIMIT 100000"
     if day_range:
         query = range_query
     else:
@@ -123,14 +127,20 @@ def update_top_reposters(uow: UnitOfWork, post_type_id: int, day_range: int = No
         stat.repost_count = row[1]
         stat.updated_at = func.utc_timestamp()
         uow.stat_top_reposter.add(stat)
-        uow.commit()
+    uow.commit()
+
+    log.info('Finished top reposters update for post type %s, day range %s', post_type_id, day_range)


 def run_update_top_reposters(uow: UnitOfWork):
     post_types = [1, 2, 3]
     day_ranges = [1, 7, 14, 30, None]
+    log.warning('Starting top reposters update task')
     for post_type_id in post_types:
         for days in day_ranges:
+            start_time = datetime.utcnow()
             update_top_reposters(uow, post_type_id, days)
+            delta = datetime.utcnow() - start_time
+            log.warning('Top reposters: Type=%s days=%s time=%s', post_type_id, days, delta)


 def token_checker() -> None:
@@ -229,12 +239,12 @@ def update_monitored_sub_data(
     monitored_sub.is_private = True if subreddit.subreddit_type == 'private' else False
     monitored_sub.nsfw = True if subreddit.over18 else False

-    log.info('[Subscriber Update] %s: %s subscribers', monitored_sub.name, monitored_sub.subscribers)
+    log.debug('[Subscriber Update] %s: %s subscribers', monitored_sub.name, monitored_sub.subscribers)

     perms = get_bot_permissions(subreddit) if monitored_sub.is_mod else []
     monitored_sub.post_permission = True if 'all' in perms or 'posts' in perms else None
     monitored_sub.wiki_permission = True if 'all' in perms or 'wiki' in perms else None
-    log.info('[Mod Check] %s | Post Perm: %s | Wiki Perm: %s', monitored_sub.name, monitored_sub.post_permission,
+    log.debug('[Mod Check] %s | Post Perm: %s | Wiki Perm: %s', monitored_sub.name, monitored_sub.post_permission,
              monitored_sub.wiki_permission)
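The raw SQL above uses SQLAlchemy's `text()` construct with named bind parameters (`:days`, `:posttype`), so values reach the driver as parameters rather than being interpolated into the string; the new `LIMIT 100000` caps the result set the loop then walks. A self-contained sketch of the pattern, using SQLite as a stand-in for the bot's MySQL:

```python
from sqlalchemy import create_engine, text

engine = create_engine('sqlite://')  # in the bot this is the MySQL engine
with engine.connect() as conn:
    conn.execute(text('CREATE TABLE repost (author TEXT, post_type_id INT)'))
    conn.execute(text("INSERT INTO repost VALUES ('demo_user', 2)"))
    # Bind params are substituted by the driver, never by string formatting,
    # so values like "[deleted]" need no escaping.
    rows = conn.execute(
        text('SELECT author, COUNT(*) c FROM repost WHERE post_type_id=:posttype '
             'GROUP BY author ORDER BY c DESC LIMIT 100'),
        {'posttype': 2},
    ).fetchall()
```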
diff --git a/redditrepostsleuth/core/celery/tasks/ingest_tasks.py b/redditrepostsleuth/core/celery/tasks/ingest_tasks.py
index 8d52b6a..e2cf736 100644
--- a/redditrepostsleuth/core/celery/tasks/ingest_tasks.py
+++ b/redditrepostsleuth/core/celery/tasks/ingest_tasks.py
@@ -2,7 +2,9 @@
 import random
 from dataclasses import dataclass
 from datetime import datetime, timedelta
+from time import perf_counter
 from typing import Optional
+from urllib.parse import urlparse

 import requests
 from celery import Task
@@ -13,6 +15,7 @@
 from redditrepostsleuth.core.celery.basetasks import SqlAlchemyTask
 from redditrepostsleuth.core.celery.task_logic.ingest_task_logic import pre_process_post, get_redgif_image_url
 from redditrepostsleuth.core.config import Config
+from redditrepostsleuth.core.db.databasemodels import Subreddit
 from redditrepostsleuth.core.db.db_utils import get_db_engine
 from redditrepostsleuth.core.db.uow.unitofworkmanager import UnitOfWorkManager
 from redditrepostsleuth.core.exception import InvalidImageUrlException, GalleryNotProcessed, ImageConversionException, \
@@ -41,9 +44,44 @@ def __init__(self):
         self._proxy_manager = ProxyManager(self.uowm, 1000)
         self.domains_to_proxy = []

+@celery.task(bind=True, base=IngestTask, ignore_results=True, serializer='pickle')
+def save_subreddit(self, subreddit_name: str):
+    try:
+        with self.uowm.start() as uow:
+            existing = uow.subreddit.get_by_name(subreddit_name)
+            if existing:
+                log.debug('Subreddit %s already exists', subreddit_name)
+                return
+            subreddit = Subreddit(name=subreddit_name)
+            uow.subreddit.add(subreddit)
+            uow.commit()
+            log.debug('Saved Subreddit %s', subreddit_name)
+            celery.send_task('redditrepostsleuth.core.celery.tasks.maintenance_tasks.update_subreddit_data', args=[subreddit_name])
+    except Exception as e:
+        log.exception('Failed to save subreddit %s', subreddit_name)
+

 @celery.task(bind=True, base=IngestTask, ignore_reseults=True, serializer='pickle', autoretry_for=(ConnectionError,ImageConversionException,GalleryNotProcessed, HTTPException), retry_kwargs={'max_retries': 10, 'countdown': 300})
 def save_new_post(self, submission: dict, repost_check: bool = True):
+    start_time = perf_counter()
+    save_event = {
+        'measurement': 'Post_Ingest',
+        #'time': datetime.utcnow().timestamp(),
+        'fields': {
+            'run_time': None,
+            'post_id': submission.get('id', None)
+        },
+        'tags': {
+            'post_type': None,
+            'domain': None
+        }
+    }
+
+    # Adding for timing in Grafana
+    url = submission.get('url', None)
+    if url:
+        save_event['tags']['domain'] = urlparse(url).netloc
+
     # TODO: temp fix until I can fix imgur gifs
     if 'imgur' in submission['url'] and 'gifv' in submission['url']:
         return
@@ -84,6 +122,10 @@ def save_new_post(self, submission: dict, repost_check: bool = True):
             log.exception('Database save failed: %s', str(e), exc_info=False)
             return

+    save_event['fields']['run_time'] = perf_counter() - start_time
+    save_event['tags']['post_type'] = post.post_type_id
+    self.event_logger.write_raw_points([save_event])
+
     if repost_check:
         if post.post_type_id == 1:
             pass
@@ -93,7 +135,7 @@ def save_new_post(self, submission: dict, repost_check: bool = True):
         elif post.post_type_id == 3:
             celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.link_repost_check', args=[post])

-    #celery.send_task('redditrepostsleuth.core.celery.admin_tasks.check_user_for_only_fans', args=[post.author])
+    celery.send_task('redditrepostsleuth.core.celery.tasks.maintenance_tasks.save_subreddit', args=[post.subreddit])


@@ -102,6 +144,7 @@ def save_new_posts(posts: list[dict], repost_check: bool = True) -> None:
     for post in posts:
         save_new_post.apply_async((post, repost_check))

+
 @celery.task(bind=True, base=SqlAlchemyTask, ignore_results=True)
 def save_pushshift_results(self, data):
     with self.uowm.start() as uow:
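`save_new_post` now times itself with `time.perf_counter()`, a monotonic clock that is safe for measuring durations (unlike wall-clock `datetime.utcnow()`, which the commented-out `'time'` key would have used), and ships the measurement as an InfluxDB dict point. The same pattern in miniature, with names assumed from this diff:

```python
from time import perf_counter

def timed_save(submission: dict, event_logger) -> None:
    start_time = perf_counter()
    # ... ingest work happens here ...
    event_logger.write_raw_points([{
        'measurement': 'Post_Ingest',
        'fields': {
            'run_time': perf_counter() - start_time,  # seconds as float
            'post_id': submission.get('id'),
        },
        'tags': {'post_type': 2, 'domain': 'i.redd.it'},  # illustrative values
    }])
```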
diff --git a/redditrepostsleuth/core/celery/tasks/maintenance_tasks.py b/redditrepostsleuth/core/celery/tasks/maintenance_tasks.py
new file mode 100644
index 0000000..2ffb501
--- /dev/null
+++ b/redditrepostsleuth/core/celery/tasks/maintenance_tasks.py
@@ -0,0 +1,55 @@
+import datetime
+
+import requests
+
+from redditrepostsleuth.core.celery import celery
+from redditrepostsleuth.core.celery.basetasks import SqlAlchemyTask
+from redditrepostsleuth.core.db.databasemodels import Subreddit
+from redditrepostsleuth.core.exception import UtilApiException
+from redditrepostsleuth.core.logging import configure_logger
+
+log = configure_logger(
+    name='redditrepostsleuth',
+)
+
+
+@celery.task(bind=True, base=SqlAlchemyTask, autoretry_for=(UtilApiException,), retry_kwargs={'max_retries': 50, 'countdown': 600})
+def update_subreddit_data(self, subreddit_name) -> None:
+    try:
+        with self.uowm.start() as uow:
+            subreddit = uow.subreddit.get_by_name(subreddit_name)
+            if not subreddit:
+                log.error('Subreddit %s not found in database, cannot update', subreddit_name)
+                return
+            url_to_fetch = f'{self.config.util_api}/reddit/subreddit?name={subreddit.name}'
+            res = requests.get(url_to_fetch)
+            if res.status_code != 200:
+                log.error('Bad status %s from util API when checking subreddit %s', res.status_code, subreddit.name)
+                raise UtilApiException(f'Bad status {res.status_code} checking {subreddit_name}')
+
+            subreddit_data = res.json()['data']
+            subreddit.subscribers = subreddit_data['subscribers'] or 0
+            subreddit.nsfw = subreddit_data['over18'] or False
+            subreddit.last_checked = datetime.datetime.now(datetime.UTC)
+            uow.commit()
+            log.debug('Updated subreddit data for %s. NSFW: %s - Subscribers: %s', subreddit.name, subreddit.nsfw, subreddit.subscribers)
+    except UtilApiException as e:
+        raise e
+    except Exception as e:
+        log.exception('')
+
+@celery.task(bind=True, base=SqlAlchemyTask, ignore_results=True, serializer='pickle')
+def save_subreddit(self, subreddit_name: str):
+    try:
+        with self.uowm.start() as uow:
+            existing = uow.subreddit.get_by_name(subreddit_name)
+            if existing:
+                log.debug('Subreddit %s already exists', subreddit_name)
+                return
+            subreddit = Subreddit(name=subreddit_name)
+            uow.subreddit.add(subreddit)
+            uow.commit()
+            log.debug('Saved Subreddit %s', subreddit_name)
+            update_subreddit_data.apply_async((subreddit_name,))
+    except Exception as e:
+        log.exception('')
\ No newline at end of file
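`save_subreddit` above is written to be idempotent: the existing-row check makes repeat calls no-ops, so it is safe to queue one per ingested post, and only genuinely new names fall through to `update_subreddit_data`. Note that the util API response shape (`data.subscribers`, `data.over18`) is assumed from this diff, not a documented contract. Illustrative dispatch:

```python
from redditrepostsleuth.core.celery.tasks.maintenance_tasks import save_subreddit

# Safe to fire for every post; duplicates short-circuit on the existing-row
# check, so only the first sighting of a subreddit triggers a data fetch.
save_subreddit.apply_async(('pics',))  # illustrative subreddit name
```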
diff --git a/redditrepostsleuth/core/celery/tasks/repost_tasks.py b/redditrepostsleuth/core/celery/tasks/repost_tasks.py
index 4f9716c..810555f 100644
--- a/redditrepostsleuth/core/celery/tasks/repost_tasks.py
+++ b/redditrepostsleuth/core/celery/tasks/repost_tasks.py
@@ -120,4 +120,4 @@ def notify_watch(self, watches: list[dict[SearchMatch, RepostWatch]], repost: Po
         try:
             uow.commit()
         except Exception as e:
-            log.exception('Failed to save repost watch %s', w['watch'].id, exc_info=True)
\ No newline at end of file
+            log.exception('Failed to save repost watch %s', w['watch'].id, exc_info=True)
diff --git a/redditrepostsleuth/core/celery/tasks/scheduled_tasks.py b/redditrepostsleuth/core/celery/tasks/scheduled_tasks.py
index 8cd9f7f..bf77ed8 100644
--- a/redditrepostsleuth/core/celery/tasks/scheduled_tasks.py
+++ b/redditrepostsleuth/core/celery/tasks/scheduled_tasks.py
@@ -1,4 +1,8 @@
-from praw.exceptions import PRAWException
+import datetime
+from functools import wraps
+from time import perf_counter
+
+import requests
 from prawcore import TooManyRequests, Redirect, ServerError, NotFound

 from redditrepostsleuth.adminsvc.bot_comment_monitor import BotCommentMonitor
@@ -8,24 +12,71 @@
 from redditrepostsleuth.adminsvc.new_activation_monitor import NewActivationMonitor
 from redditrepostsleuth.core.celery import celery
 from redditrepostsleuth.core.celery.basetasks import RedditTask, SqlAlchemyTask, AdminTask
-from redditrepostsleuth.core.celery.task_logic.scheduled_task_logic import update_proxies, update_top_reposts, \
-    token_checker, run_update_top_reposters, update_top_reposters, update_monitored_sub_data, run_update_top_reposts
-from redditrepostsleuth.core.db.databasemodels import MonitoredSub, StatsDailyCount
+from redditrepostsleuth.core.celery.task_logic.scheduled_task_logic import update_proxies, token_checker, \
+    run_update_top_reposters, update_top_reposters, update_monitored_sub_data, run_update_top_reposts
+from redditrepostsleuth.core.db.databasemodels import StatsDailyCount
+from redditrepostsleuth.core.exception import UtilApiException
 from redditrepostsleuth.core.logging import configure_logger
 from redditrepostsleuth.core.util.helpers import chunk_list
-from redditrepostsleuth.core.util.reddithelpers import is_sub_mod_praw, get_bot_permissions
-from redditrepostsleuth.core.util.replytemplates import MONITORED_SUB_MOD_REMOVED_CONTENT, \
-    MONITORED_SUB_MOD_REMOVED_SUBJECT

 log = configure_logger(
     name='redditrepostsleuth',
 )

+def get_task_influx_points(task_name: str, task_status: str, task_runtime: float):
+    return [
+        {
+            'measurement': 'Scheduled_Task_Updates',
+            'time': datetime.datetime.now(datetime.UTC),
+            'fields': {
+                'run_time': task_runtime
+            },
+            'tags': {
+                'task_name': task_name,
+                'task_status': task_status
+            }
+        }
+    ]
+
+def record_task_status(func):
+    @wraps(func)
+    def dec(self, *args, **kwargs):
+        if not hasattr(self, 'event_logger'):
+            log.warning('Task class %s does not have an event logger, cannot record task time', type(self).__name__)
+            return func(self, *args, **kwargs)
+
+        task_start = perf_counter()
+        self.event_logger.write_raw_points(
+            get_task_influx_points(
+                func.__name__,
+                'started',
+                0.0,
+            )
+        )
+        try:
+            func(self, *args, **kwargs)
+            task_status = 'finished'
+        except Exception as e:
+            log.exception('')
+            task_status = 'failed'
+
+        self.event_logger.write_raw_points(
+            get_task_influx_points(
+                func.__name__,
+                task_status,
+                perf_counter() - task_start,
+            )
+        )
+    return dec
+
+
 @celery.task(bind=True, base=RedditTask)
+@record_task_status
 def check_inbox_task(self) -> None:
-    print("Checking inbox")
     log.info('Scheduled Task: Check Inbox')
+
     inbox_monitor = InboxMonitor(self.uowm, self.reddit, self.response_handler)
     try:
         inbox_monitor.check_inbox()
@@ -35,9 +86,11 @@ def check_inbox_task(self) -> None:
         log.exception('Failed to update subreddit stats')


+
 @celery.task(bind=True, base=RedditTask)
+@record_task_status
 def check_new_activations_task(self) -> None:
-    log.info('Scheduled Task: Checking For Activations')
+    log.debug('Scheduled Task: Checking For Activations')
     activation_monitor = NewActivationMonitor(
         self.uowm,
         self.reddit,
@@ -53,7 +106,9 @@ def check_new_activations_task(self) -> None:


 @celery.task(bind=True, base=RedditTask)
+@record_task_status
 def check_comments_for_downvotes_task(self) -> None:
+    # TODO: Remove, no longer used
     log.info('Scheduled Task: Check Comment Downvotes')
     comment_monitor = BotCommentMonitor(self.reddit, self.uowm, self.config, notification_svc=self.notification_svc)
     try:
@@ -62,6 +117,7 @@ def check_comments_for_downvotes_task(self) -> None:
         log.exception('Failed to update subreddit stats')

 @celery.task(bind=True, base=RedditTask)
+@record_task_status
 def update_ban_list_task(self) -> None:
     """
     Go through banned subs and see if we're still banned
@@ -74,8 +130,9 @@ def update_ban_list_task(self) -> None:


 @celery.task(bind=True, base=RedditTask)
+@record_task_status
 def update_monitored_sub_data_task(self) -> None:
-    log.info('Starting Job: Update Subreddit Data')
+    log.debug('Starting Job: Update Subreddit Data')
     try:
         with self.uowm.start() as uow:
             subs = uow.monitored_sub.get_all()
@@ -86,6 +143,7 @@ def update_monitored_sub_data_task(self) -> None:


 @celery.task(bind=True, base=RedditTask)
+@record_task_status
 def remove_expired_bans_task(self) -> None:
     log.info('Starting Job: Remove Expired Bans')
     try:
@@ -93,18 +151,8 @@ def remove_expired_bans_task(self) -> None:
     except Exception as e:
         log.exception('Scheduled Task Failed: Update Mod Status')

-
-@celery.task(bind=True, base=RedditTask)
-def update_top_image_reposts_task(self) -> None:
-    # TODO: Remove
-    log.info('Starting Job: Remove Expired Bans')
-    try:
-        update_stat_top_image_repost(self.uowm, self.reddit)
-    except Exception as e:
-        log.exception('Problem in scheduled task')
-
-
 @celery.task(bind=True, base=RedditTask)
+@record_task_status
 def send_reports_to_meme_voting_task(self):
     log.info('Starting Job: Reports to meme voting')
     try:
@@ -114,6 +162,7 @@ def send_reports_to_meme_voting_task(self):


 @celery.task(bind=True, base=RedditTask)
+@record_task_status
 def check_meme_template_potential_votes_task(self):
     log.info('Starting Job: Meme Template Vote')
     try:
@@ -121,7 +170,9 @@ def check_meme_template_potential_votes_task(self):
     except Exception as e:
         log.exception('Problem in scheduled task')

+
 @celery.task(bind=True, base=AdminTask, autoretry_for=(TooManyRequests,), retry_kwards={'max_retries': 3})
+@record_task_status
 def check_for_subreddit_config_update_task(self, subreddit_name: str) -> None:

     with self.uowm.start() as uow:
@@ -142,7 +193,12 @@ def check_for_subreddit_config_update_task(self, subreddit_name: str) -> None:
         log.exception('')

 @celery.task(bind=True, base=RedditTask)
+@record_task_status
 def queue_config_updates_task(self):
+    """
+    Goes through each registered subreddit and queues a job to check its wiki config for updates
+    :param self:
+    """
     log.info('Starting Job: Config Update Check')
     try:
         print('[Scheduled Job] Queue config update check')
@@ -158,6 +214,7 @@ def queue_config_updates_task(self):


 @celery.task(bind=True, base=SqlAlchemyTask)
+@record_task_status
 def update_daily_stats(self):
     log.info('[Daily Stat Update] Started')
     daily_stats = StatsDailyCount()
@@ -180,6 +237,7 @@ def update_daily_stats(self):


 @celery.task(bind=True, base=SqlAlchemyTask)
+@record_task_status
 def update_all_top_reposts_task(self):
     try:
         with self.uowm.start() as uow:
@@ -188,6 +246,7 @@ def update_all_top_reposts_task(self):
         log.exception('Unknown task error')

 @celery.task(bind=True, base=SqlAlchemyTask)
+@record_task_status
 def update_all_top_reposters_task(self):
     try:
         with self.uowm.start() as uow:
@@ -196,6 +255,7 @@ def update_all_top_reposters_task(self):
         log.exception('Unknown task error')

 @celery.task(bind=True, base=SqlAlchemyTask)
+@record_task_status
 def update_daily_top_reposters_task(self):
     post_types = [1, 2, 3]
     try:
@@ -207,6 +267,7 @@ def update_daily_top_reposters_task(self):


 @celery.task(bind=True, base=RedditTask, autoretry_for=(TooManyRequests,), retry_kwards={'max_retries': 3})
+@record_task_status
 def update_monitored_sub_stats_task(self, sub_name: str) -> None:
     try:
         with self.uowm.start() as uow:
@@ -225,6 +286,7 @@ def update_monitored_sub_stats_task(self, sub_name: str) -> None:
         log.exception('')

 @celery.task(bind=True, base=SqlAlchemyTask)
+@record_task_status
 def update_proxies_task(self) -> None:
     log.info('Starting proxy update')
     try:
@@ -233,12 +295,9 @@ def update_proxies_task(self) -> None:
     except Exception as e:
         log.exception('Failed to update proxies')

-@celery.task
-def update_profile_token_task():
-    print('Staring token checker')
-    token_checker()

 @celery.task(bind=True, base=SqlAlchemyTask)
+@record_task_status
 def delete_search_batch(self, ids: list[int]):
     try:
         with self.uowm.start() as uow:
@@ -254,6 +313,7 @@ def delete_search_batch(self, ids: list[int]):
         log.exception('')

 @celery.task(bind=True, base=SqlAlchemyTask)
+@record_task_status
 def queue_search_history_cleanup(self):
     with self.uowm.start() as uow:
         searches = uow.repost_search.get_all_ids_older_than_days(120, limit=100000000)
@@ -263,4 +323,13 @@ def queue_search_history_cleanup(self):
         log.info('Queuing Search History Cleanup. Range: ID Range: %s:%s', searches[0].id, searches[-1].id)
     ids = [x[0] for x in searches]
     for chunk in chunk_list(ids, 5000):
-        delete_search_batch.apply_async((chunk,), queue='batch_delete_searches')
\ No newline at end of file
+        delete_search_batch.apply_async((chunk,))
+
+@celery.task(bind=True, base=RedditTask, autoretry_for=(UtilApiException,), retry_kwargs={'max_retries': 5})
+@record_task_status
+def queue_subreddit_data_updates(self) -> None:
+    with self.uowm.start() as uow:
+        subreddits_to_update = uow.subreddit.get_subreddits_to_update()
+        for subreddit in subreddits_to_update:
+            celery.send_task('redditrepostsleuth.core.celery.tasks.maintenance_tasks.update_subreddit_data',
+                             args=[subreddit.name])
\ No newline at end of file
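In `record_task_status` above, `functools.wraps` is load-bearing: Celery derives task names from the wrapped function's `__name__` and module, so without it every decorated task would register as `dec`. Order matters too — `@record_task_status` sits closest to the function so that `@celery.task` wraps the instrumented version. A stripped-down sketch of the shape, with `print` standing in for the Influx write:

```python
from functools import wraps
from time import perf_counter

def record_status(func):
    @wraps(func)  # preserves __name__/__module__ for Celery's task registry
    def dec(self, *args, **kwargs):
        start = perf_counter()
        status = 'finished'
        try:
            func(self, *args, **kwargs)
        except Exception:
            # Mirrors the decorator above: failures are recorded, not re-raised,
            # which also means autoretry_for on the task never sees them.
            status = 'failed'
        print(f'{func.__name__}: {status} in {perf_counter() - start:.3f}s')
    return dec
```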
diff --git a/redditrepostsleuth/core/config.py b/redditrepostsleuth/core/config.py
index 9d39502..7974ab8 100644
--- a/redditrepostsleuth/core/config.py
+++ b/redditrepostsleuth/core/config.py
@@ -176,6 +176,7 @@ def _initialize_attributes(self):
             'wiki_config_name',
             'index_api',
             'util_api',
+            'embedding_api',
             'live_responses',
             'top_post_offer_watch',
             'ocr_east_model',
diff --git a/redditrepostsleuth/core/db/databasemodels.py b/redditrepostsleuth/core/db/databasemodels.py
index 6adb105..72d55cd 100644
--- a/redditrepostsleuth/core/db/databasemodels.py
+++ b/redditrepostsleuth/core/db/databasemodels.py
@@ -262,6 +262,7 @@ class Repost(Base):
         Index('idx_repost_by_type', 'post_type_id', 'detected_at', unique=False),
         Index('idx_repost_of_date', 'author', 'detected_at',unique=False),
         Index('idx_repost_by_subreddit', 'subreddit', 'post_type_id', 'detected_at', unique=False),
+        Index('idx_repost_by_author', 'author', unique=False),
     )
     id = Column(Integer, primary_key=True)
     post_id = Column(Integer, ForeignKey('post.id'))
@@ -758,4 +759,19 @@ class UserReview(Base):
     content_links_found = Column(Boolean, default=False)
     added_at = Column(DateTime, default=func.utc_timestamp(), nullable=False)
     notes = Column(String(150))
-    last_checked = Column(DateTime, default=func.utc_timestamp())
\ No newline at end of file
+    last_checked = Column(DateTime, default=func.utc_timestamp())
+
+class Subreddit(Base):
+    __tablename__ = 'subreddit'
+    __table_args__ = (
+        Index('idx_subreddit_name', 'name'),
+    )
+    id = Column(Integer, primary_key=True)
+    name = Column(String(25), nullable=False, unique=True)
+    subscribers = Column(Integer, nullable=False, default=0)
+    nsfw = Column(Boolean, nullable=False, default=False)
+    added_at = Column(DateTime, default=func.utc_timestamp(), nullable=False)
+    bot_banned = Column(Boolean, nullable=False, default=False)
+    bot_banned_at = Column(DateTime)
+    last_ban_check = Column(DateTime)
+    last_checked = Column(DateTime)
\ No newline at end of file
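The new `Subreddit` model will need a matching Alembic revision (the repo manages schema with Alembic, per the `alembic/env.py` change above). A hypothetical migration sketch — the revision IDs and the choice of server defaults are illustrative, not part of this diff:

```python
# Hypothetical Alembic revision for the new subreddit table.
import sqlalchemy as sa
from alembic import op

revision = 'aaaa00000000'       # placeholder
down_revision = 'bbbb11111111'  # placeholder

def upgrade():
    op.create_table(
        'subreddit',
        sa.Column('id', sa.Integer, primary_key=True),
        sa.Column('name', sa.String(25), nullable=False, unique=True),
        sa.Column('subscribers', sa.Integer, nullable=False, server_default='0'),
        sa.Column('nsfw', sa.Boolean, nullable=False, server_default=sa.text('0')),
        sa.Column('added_at', sa.DateTime, nullable=False),
        sa.Column('bot_banned', sa.Boolean, nullable=False, server_default=sa.text('0')),
        sa.Column('bot_banned_at', sa.DateTime),
        sa.Column('last_ban_check', sa.DateTime),
        sa.Column('last_checked', sa.DateTime),
    )
    op.create_index('idx_subreddit_name', 'subreddit', ['name'])

def downgrade():
    op.drop_index('idx_subreddit_name', table_name='subreddit')
    op.drop_table('subreddit')
```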
diff --git a/redditrepostsleuth/core/db/repository/repost_repo.py b/redditrepostsleuth/core/db/repository/repost_repo.py
index cf465f1..f691be2 100644
--- a/redditrepostsleuth/core/db/repository/repost_repo.py
+++ b/redditrepostsleuth/core/db/repository/repost_repo.py
@@ -19,6 +19,9 @@ def get_all(self, limit: int = None, offset: int = None) -> List[Repost]:
     def get_all_by_type(self, post_type_id: int, limit: None, offset: None) -> list[Repost]:
         return self.db_session.query(Repost).filter(Repost.post_type_id == post_type_id).order_by(Repost.id.desc()).offset(offset).limit(limit).all()

+    def get_by_author(self, author: str) -> List[Repost]:
+        return self.db_session.query(Repost).filter(Repost.author == author).all()
+
     def get_all_without_author(self, limit: int = None, offset: int = None):
         return self.db_session.query(Repost).filter(Repost.author == None).order_by(Repost.id.desc()).offset(offset).limit(limit).all()
diff --git a/redditrepostsleuth/core/db/repository/subreddit_repo.py b/redditrepostsleuth/core/db/repository/subreddit_repo.py
new file mode 100644
index 0000000..fe885fa
--- /dev/null
+++ b/redditrepostsleuth/core/db/repository/subreddit_repo.py
@@ -0,0 +1,20 @@
+import datetime
+
+from sqlalchemy import or_
+
+from redditrepostsleuth.core.db.databasemodels import Subreddit
+
+
+class SubredditRepo:
+    def __init__(self, db_session):
+        self.db_session = db_session
+
+    def add(self, item):
+        self.db_session.add(item)
+
+    def get_by_name(self, name: str):
+        return self.db_session.query(Subreddit).filter(Subreddit.name == name).first()
+
+    def get_subreddits_to_update(self, limit: int = None, offset: int = None) -> list[Subreddit]:
+        delta = datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=3)
+        return self.db_session.query(Subreddit).filter(or_(Subreddit.last_checked < delta, Subreddit.last_checked == None)).limit(limit).offset(offset).all()
\ No newline at end of file
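`SubredditRepo` follows the repository pattern used throughout the codebase and is exposed through the unit of work below. Typical read-modify-write usage, assuming a configured `UnitOfWorkManager` (`uowm`) and an illustrative subreddit name:

```python
# Illustrative use of the new repo through the unit of work.
with uowm.start() as uow:
    sub = uow.subreddit.get_by_name('pics')
    if sub and not sub.nsfw:
        sub.subscribers = 123456
        uow.commit()
```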
diff --git a/redditrepostsleuth/core/db/uow/unitofwork.py b/redditrepostsleuth/core/db/uow/unitofwork.py
index 0aac0ed..dbdb571 100644
--- a/redditrepostsleuth/core/db/uow/unitofwork.py
+++ b/redditrepostsleuth/core/db/uow/unitofwork.py
@@ -1,5 +1,6 @@
 from sqlalchemy.orm import scoped_session

+from redditrepostsleuth.core.db.databasemodels import Subreddit
 from redditrepostsleuth.core.db.repository.banned_subreddit_repo import BannedSubredditRepo
 from redditrepostsleuth.core.db.repository.banned_user_repo import BannedUserRepo
 from redditrepostsleuth.core.db.repository.bot_private_message_repo import BotPrivateMessageRepo
@@ -27,6 +28,7 @@
 from redditrepostsleuth.core.db.repository.stat_daily_count_repo import StatDailyCountRepo
 from redditrepostsleuth.core.db.repository.stat_top_repost_repo import StatTopRepostRepo
 from redditrepostsleuth.core.db.repository.stats_top_reposter_repo import StatTopReposterRepo
+from redditrepostsleuth.core.db.repository.subreddit_repo import SubredditRepo
 from redditrepostsleuth.core.db.repository.summonsrepository import SummonsRepository
 from redditrepostsleuth.core.db.repository.user_report_repo import UserReportRepo
 from redditrepostsleuth.core.db.repository.user_review_repo import UserReviewRepo
@@ -175,4 +177,8 @@ def post_type(self) -> PostTypeRepo:

     @property
     def user_whitelist(self) -> UserWhitelistRepo:
-        return UserWhitelistRepo(self.session)
\ No newline at end of file
+        return UserWhitelistRepo(self.session)
+
+    @property
+    def subreddit(self) -> SubredditRepo:
+        return SubredditRepo(self.session)
\ No newline at end of file
diff --git a/redditrepostsleuth/core/services/eventlogging.py b/redditrepostsleuth/core/services/eventlogging.py
index 2f179d7..04a40db 100644
--- a/redditrepostsleuth/core/services/eventlogging.py
+++ b/redditrepostsleuth/core/services/eventlogging.py
@@ -22,7 +22,7 @@ def __init__(self, config: Config = None):
         client = InfluxDBClient(
             url=f'http://{self._config.influx_host}:{self._config.influx_port}',
             token=self._config.influx_token,
-            org=self._config.influx_org,
+            org=self._config.influx_org
         )
         self._influx_client = client.write_api(write_options=SYNCHRONOUS)
@@ -82,4 +82,12 @@ def _write_to_influx(self, event: InfluxEvent) -> bool:
             self._unsaved_events.append(event)
             log.error('Failed To Write To InfluxDB', exc_info=True)
             log.error(event.get_influx_event())
-            return False
\ No newline at end of file
+            return False
+
+    def write_raw_points(self, points: list[dict]):
+        try:
+            self._influx_client.write(bucket=self._config.influx_bucket, record=points)
+        except Exception as e:
+            log.exception('Failed to write to Influx')
+
+        #log.info('Wrote Influx: %s', points)
\ No newline at end of file
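`write_raw_points` relies on influxdb-client accepting plain dicts as records; with `SYNCHRONOUS` write options the call blocks until the server accepts the batch, so failures land in the `except` immediately rather than in a background queue. A minimal standalone sketch (connection values illustrative):

```python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org')
write_api = client.write_api(write_options=SYNCHRONOUS)
# Dict records need 'measurement' and 'fields'; 'tags' and 'time' are optional.
write_api.write(bucket='my-bucket', record=[{
    'measurement': 'Scheduled_Task_Updates',
    'fields': {'run_time': 1.23},
    'tags': {'task_name': 'update_daily_stats', 'task_status': 'finished'},
}])
```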
diff --git a/redditrepostsleuth/core/util/onlyfans_handling.py b/redditrepostsleuth/core/util/onlyfans_handling.py
index fe5faf0..d599907 100644
--- a/redditrepostsleuth/core/util/onlyfans_handling.py
+++ b/redditrepostsleuth/core/util/onlyfans_handling.py
@@ -143,8 +143,8 @@ def get_profile_links(username: str) -> list[str]:
             log.info('No token to cehck user with')
             return []
     else:
-        log.warning('Non 200 return code %s from Util API', response.status_code)
-        raise UtilApiException(f'Unexpected status {response.status_code} from util API')
+        log.warning('Non 200 return code %s from Util API: %s', response.status_code, response.text)
+        return []


 def check_user_for_promoter_links(username: str, reddit: Reddit) -> Optional[LinkCheckResult]:
@@ -206,6 +206,9 @@ def get_links_from_comments(username: str) -> list[str]:
         case 429:
             log.warning('Rate limited')
             raise UtilApiException(f'Rate limited')
+        case 500:
+            log.warning('Got a 500 from util API: %s', response.text)
+            raise UtilApiException('No Sessions')
         case 200:
             response_json = json.loads(response.text)
             all_urls = []
@@ -281,7 +284,6 @@ def check_user_for_only_fans(uow: UnitOfWork, username: str, reddit: Reddit) ->
             uow.commit()
         return user
     except (UtilApiException, ConnectionError, TooManyRequests) as e:
-        log.exception('')
         raise e
     except IntegrityError:
         pass
diff --git a/redditrepostsleuth/ingestsvc/ingestsvc.py b/redditrepostsleuth/ingestsvc/ingestsvc.py
index e4657ac..e259f41 100644
--- a/redditrepostsleuth/ingestsvc/ingestsvc.py
+++ b/redditrepostsleuth/ingestsvc/ingestsvc.py
@@ -63,8 +63,9 @@ async def fetch_page(url: str, session: ClientSession) -> Optional[str]:
                 raise RateLimitException('Data API rate limit')
             elif resp.status == 401:
                 raise RedditTokenExpiredException('Token expired')
-            log.info('Unexpected request status %s - %s', resp.status, url)
-            return
+            else:
+                log.info('Unexpected request status %s - %s', resp.status, url)
+                return
     except (ClientOSError, TimeoutError):
         log.exception('')

@@ -225,7 +226,7 @@ def get_auth_headers(reddit: Reddit) -> dict:
     :param reddit:
     :return:
     """
-    reddit.user.me()
+    list(reddit.subreddit('all').new(limit=1))  # Force praw to make a req so we can steal the token
     return {**HEADERS, **{'Authorization': f'Bearer {reddit.auth._reddit._core._authorizer.access_token}'}}

 async def main() -> None:
@@ -261,7 +262,7 @@ async def main() -> None:
         try:
             log.debug('Sending fetch request')
             results = await fetch_page(url, session)
-        except (ServerDisconnectedError, ClientConnectorError, ClientOSError, TimeoutError, CancelledError, UtilApiException):
+        except (ServerDisconnectedError, ClientConnectorError, ClientOSError, TimeoutError, CancelledError, UtilApiException) as e:
             log.warning('Error during fetch')
             await asyncio.sleep(2)
             continue
@@ -299,16 +300,17 @@ async def main() -> None:

         newest_id = res_data['data']['children'][-1]['data']['id']

-
-        saved_ids = [x['id'] for x in posts_to_save]
-        missing_ids_in_this_req = list(set(ids_to_get).difference(saved_ids))
-        missed_ids += [base36decode(x) for x in missing_ids_in_this_req]

         time.sleep(request_delay)

-        log.info('Missed IDs: %s', len(missed_ids))
-        if len(missed_ids) > missed_id_retry_count:
-            await ingest_sequence(missed_ids, alt_headers=auth_headers)
-            missed_ids = []
+        # saved_ids = [x['id'] for x in posts_to_save]
+        # missing_ids_in_this_req = list(set(ids_to_get).difference(saved_ids))
+        # missed_ids += [base36decode(x) for x in missing_ids_in_this_req]
+
+
+        # log.info('Missed IDs: %s', len(missed_ids))
+        # if len(missed_ids) > missed_id_retry_count:
+        #     await ingest_sequence(missed_ids, alt_headers=auth_headers)
+        #     missed_ids = []

 if __name__ == '__main__':
     run(main())
\ No newline at end of file
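`get_auth_headers` leans on praw's lazy authentication: the OAuth token is only fetched once a real API call is made, so the one-item listing forces that fetch, after which the raw `aiohttp` requests can reuse the bearer token. The attribute chain goes through praw/prawcore internals and can break between releases — a hedged sketch:

```python
import praw

reddit = praw.Reddit(client_id='...', client_secret='...', user_agent='...')
list(reddit.subreddit('all').new(limit=1))  # any real request populates the token
# Private internals (underscore attributes): version-sensitive, as in the diff.
token = reddit.auth._reddit._core._authorizer.access_token
headers = {'Authorization': f'Bearer {token}'}
```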
diff --git a/redditrepostsleuth/queue_monitor_svc/queue_monitor.py b/redditrepostsleuth/queue_monitor_svc/queue_monitor.py
index 05a6410..b187290 100644
--- a/redditrepostsleuth/queue_monitor_svc/queue_monitor.py
+++ b/redditrepostsleuth/queue_monitor_svc/queue_monitor.py
@@ -19,7 +19,7 @@ def log_queue_size(event_logger):
     while True:
         try:
             client = redis.Redis(host=config.redis_host, port=config.redis_port, db=config.redis_database, password=config.redis_password)
-
+            session_client = redis.Redis(host=config.redis_host, port=config.redis_port, db=2, password=config.redis_password)
             for queue in client.scan_iter():
                 queue_name = queue.decode('utf-8').replace('_kombu.binding.', '')
                 if len(queue_name) > 30 or queue_name in skip_keys or 'celery' in queue_name:
@@ -30,6 +30,15 @@ def log_queue_size(event_logger):
                     continue
                 event_logger.save_event(
                     CeleryQueueSize(queue_name, queue_length, event_type='queue_update', env=os.getenv('RUN_ENV', 'dev')))
+
+            session_event = {
+                'measurement': 'Session_Count',
+                # 'time': datetime.utcnow().timestamp(),
+                'fields': {
+                    'count': session_client.dbsize()
+                },
+            }
+            event_logger.write_raw_points([session_event])
             time.sleep(2)
         except ConnectionError as e:
             log.error('Failed to connect to Redis')
diff --git a/tests/core/util/test_onlyfans_handling.py b/tests/core/util/test_onlyfans_handling.py
index a2bbc97..4afa284 100644
--- a/tests/core/util/test_onlyfans_handling.py
+++ b/tests/core/util/test_onlyfans_handling.py
@@ -44,10 +44,9 @@ def test_get_profile_links_api_connect_fail(self, mock_requests):
             get_profile_links('testuser')

     @patch('redditrepostsleuth.core.util.onlyfans_handling.requests.get')
-    def test_get_profile_links_api_bad_status(self, mock_requests):
+    def test_get_profile_links_api_bad_status_return_empty(self, mock_requests):
         mock_requests.return_value = Mock(status_code=500)
-        with self.assertRaises(UtilApiException):
-            get_profile_links('testuser')
+        self.assertEqual([], get_profile_links('testuser'))

     @patch('redditrepostsleuth.core.util.onlyfans_handling.requests.get')
     def test_get_profile_links_get_links(self, mock_requests):
         expected = ['facebook.com', 'google.com']