Merge pull request #383 from barrycarey/dev
Dev
barrycarey authored Dec 10, 2024
2 parents bc37475 + 0fc5a11 commit b01d871
Showing 21 changed files with 324 additions and 158 deletions.
5 changes: 1 addition & 4 deletions alembic/env.py
@@ -40,10 +40,7 @@
target_metadata = Base.metadata

def get_conn_string():
conn_str = r'mysql+pymysql://barry:[email protected]/reddit_dev'
return conn_str
#return f'mysql+pymysql://{bot_config.db_user}:{quote_plus(bot_config.db_password)}@{bot_config.db_host}/{bot_config.db_name}'

return f'mysql+pymysql://{bot_config.db_user}:{quote_plus(bot_config.db_password)}@{bot_config.db_host}/{bot_config.db_name}'

# other values from the config, defined by the needs of env.py,
# can be acquired:
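The rebuilt connection string now runs the password through quote_plus, so characters such as @ or / in db_password cannot break the URL. A minimal sketch of why (hypothetical password, not from this repo):

    from urllib.parse import quote_plus

    password = 'p@ss/word'  # '@' and '/' would otherwise corrupt the URL's auth/host split
    print(quote_plus(password))  # 'p%40ss%2Fword', safe to embed in the mysql+pymysql URL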
18 changes: 17 additions & 1 deletion docker-compose.yml
@@ -31,6 +31,22 @@ services:
- CELERY_IMPORTS=redditrepostsleuth.core.celery.admin_tasks,redditrepostsleuth.core.celery.tasks.scheduled_tasks
entrypoint: celery -A redditrepostsleuth.core.celery worker -Q scheduled_tasks -n scheduled_task_worker --autoscale=15,2

subreddit_update_worker:
container_name: subreddit_update_worker
restart: unless-stopped
user: "1001"
build:
context: .
dockerfile: docker/WorkerDockerFile
env_file:
- .env
environment:
- RUN_ENV=production
- db_user=maintenance_task
- LOG_LEVEL=DEBUG
- CELERY_IMPORTS=redditrepostsleuth.core.celery.tasks.maintenance_tasks
entrypoint: celery -A redditrepostsleuth.core.celery worker -Q update_subreddit_data -n subreddit_update_worker --autoscale=1,4
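This worker subscribes only to the new update_subreddit_data queue, keeping subreddit-refresh traffic off the existing workers. A hedged sketch of how a task reaches it, with the task name and queue taken from the celeryconfig.py routing changes below ('pics' is a hypothetical subreddit):

    from redditrepostsleuth.core.celery import celery

    # task_routes in celeryconfig.py maps this task to the update_subreddit_data
    # queue, so only the worker defined above consumes it.
    celery.send_task(
        'redditrepostsleuth.core.celery.tasks.maintenance_tasks.update_subreddit_data',
        args=['pics'],
    )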

scheduler:
container_name: beat_scheduler
restart: unless-stopped
@@ -120,7 +136,7 @@ services:
- db_user=sub_monitor
- LOG_LEVEL=INFO
- CELERY_IMPORTS=redditrepostsleuth.core.celery.tasks.monitored_sub_tasks
entrypoint: celery -A redditrepostsleuth.core.celery worker -Q submonitor -n submonitor_worker --autoscale=6,2
entrypoint: celery -A redditrepostsleuth.core.celery worker -Q submonitor -n submonitor_worker --autoscale=10,2

reddit_actions_worker:
container_name: reddit-actions-worker
1 change: 0 additions & 1 deletion redditrepostsleuth/adminsvc/inbox_monitor.py
@@ -36,7 +36,6 @@ def __init__(
self.failed_checks = []

def check_inbox(self):
print('[Scheduled Job] Checking Inbox Start')
for msg in self.reddit.inbox.messages(limit=500):
if msg.author != 'RepostSleuthBot' and msg.subject.lower() in ['false negative', 'false positive']:
#self._process_user_report(msg)
93 changes: 1 addition & 92 deletions redditrepostsleuth/core/celery/admin_tasks.py
@@ -5,21 +5,13 @@

import pymysql
from celery import Task
from prawcore import TooManyRequests
from requests.exceptions import ConnectionError
from sqlalchemy import func
from sqlalchemy.exc import IntegrityError

from redditrepostsleuth.core.celery import celery
from redditrepostsleuth.core.celery.basetasks import AdminTask
from redditrepostsleuth.core.config import Config
from redditrepostsleuth.core.db.databasemodels import MonitoredSub, Post, UserReview
from redditrepostsleuth.core.exception import UtilApiException, UserNotFound
from redditrepostsleuth.core.db.databasemodels import MonitoredSub, Post
from redditrepostsleuth.core.logfilters import ContextFilter
from redditrepostsleuth.core.logging import log, configure_logger
from redditrepostsleuth.core.util.helpers import chunk_list
from redditrepostsleuth.core.util.onlyfans_handling import check_user_for_promoter_links, \
check_user_comments_for_promoter_links

log = configure_logger(
name='redditrepostsleuth',
@@ -126,88 +118,5 @@ def update_subreddit_config_from_database(self, monitored_sub: MonitoredSub, use
)


@celery.task(bind=True, base=AdminTask, autoretry_for=(UtilApiException,ConnectionError,TooManyRequests), retry_kwards={'max_retries': 3, 'retry_backoff': True})
def check_user_for_only_fans(self, username: str) -> None:
skip_names = ['[deleted]', 'AutoModerator']

if username in skip_names:
log.info('Skipping name %s', username)
return

try:
with self.uowm.start() as uow:
user = uow.user_review.get_by_username(username)

if user:
delta = datetime.utcnow() - user.last_checked
if delta.days < 30:
log.info('Skipping existing user %s, last check was %s days ago', username, delta.days)
return
user.content_links_found = False
user.notes = None
user.last_checked = func.utc_timestamp()

log.info('Checking user %s', username)
if not user:
user = UserReview(username=username)
try:
result = check_user_for_promoter_links(username)
except UserNotFound as e:
log.warning(e)
return

if result:
log.info('Promoter found: %s - %s', username, str(result))
user.content_links_found = True
user.notes = str(result)
uow.user_review.add(user)
uow.commit()
except (UtilApiException, ConnectionError, TooManyRequests) as e:
raise e
except IntegrityError:
pass
except Exception as e:
log.exception('')


@celery.task(bind=True, base=AdminTask, autoretry_for=(UtilApiException,ConnectionError,TooManyRequests), retry_kwards={'max_retries': 3})
def check_user_comments_for_only_fans(self, username: str) -> None:
"""
This should be run after the profile check so we don't do any timeframe checking
:param self:
:param username:
:return:
"""
skip_names = ['[deleted]', 'AutoModerator']

if username in skip_names:
log.info('Skipping name %s', username)
return

try:
with self.uowm.start() as uow:
user = uow.user_review.get_by_username(username)

if not user:
log.error('User not found: %s', username)

try:
result = check_user_comments_for_promoter_links(username)
except UserNotFound as e:
log.warning(e)
return

if result:
log.info('Promoter found: %s - %s', username, str(result))
user.content_links_found = True
user.notes = str(result)
uow.user_review.add(user)
uow.commit()
except (UtilApiException, ConnectionError, TooManyRequests) as e:
raise e
except IntegrityError:
pass
except Exception as e:
log.exception('')


2 changes: 2 additions & 0 deletions redditrepostsleuth/core/celery/basetasks.py
@@ -1,3 +1,5 @@
import datetime

from celery import Task

from redditrepostsleuth.core.config import Config
5 changes: 4 additions & 1 deletion redditrepostsleuth/core/celery/celeryconfig.py
@@ -16,6 +16,7 @@
task_routes = {
'redditrepostsleuth.core.celery.tasks.ingest_tasks.save_new_post': {'queue': 'post_ingest'},
'redditrepostsleuth.core.celery.tasks.ingest_tasks.save_new_posts': {'queue': 'post_ingest'},
'redditrepostsleuth.core.celery.tasks.ingest_tasks.save_subreddit': {'queue': 'save_subreddit'},
'redditrepostsleuth.core.celery.tasks.ingest_tasks.ingest_repost_check': {'queue': 'repost'},
'redditrepostsleuth.core.celery.tasks.repost_tasks.check_image_repost_save': {'queue': 'repost_image'},
'redditrepostsleuth.core.celery.tasks.repost_tasks.link_repost_check': {'queue': 'repost_link'},
@@ -29,8 +30,10 @@
'redditrepostsleuth.core.celery.admin_tasks.update_proxies_job': {'queue': 'scheduled_tasks'},
'redditrepostsleuth.core.celery.admin_tasks.check_user_for_only_fans': {'queue': 'onlyfans_check'},
'redditrepostsleuth.core.celery.admin_tasks.update_subreddit_config_from_database': {'queue': 'update_wiki_from_database'},
'redditrepostsleuth.core.celery.admin_tasks.delete_search_batch': {'queue': 'batch_delete_searches'},
#'redditrepostsleuth.core.celery.admin_tasks.delete_search_batch': {'queue': 'batch_delete_searches'},
'redditrepostsleuth.core.celery.tasks.reddit_action_tasks.*': {'queue': 'reddit_actions'},
'redditrepostsleuth.core.celery.tasks.maintenance_tasks.update_subreddit_data': {'queue': 'update_subreddit_data'},
'redditrepostsleuth.core.celery.tasks.maintenance_tasks.save_subreddit': {'queue': 'update_subreddit_data'}


}
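Celery consults task_routes whenever a task is published without an explicit queue, so callers can stay queue-agnostic; a queue passed at call time still wins. A hedged sketch using the new ingest-side task defined in ingest_tasks.py below:

    from redditrepostsleuth.core.celery.tasks.ingest_tasks import save_subreddit

    save_subreddit.apply_async(('funny',))  # routed to the save_subreddit queue by the table above
    save_subreddit.apply_async(('funny',), queue='update_subreddit_data')  # explicit queue overrides the route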
26 changes: 18 additions & 8 deletions redditrepostsleuth/core/celery/task_logic/scheduled_task_logic.py
@@ -4,6 +4,7 @@
import os
import sys
import time
from datetime import datetime

import jwt
import redis
@@ -62,22 +63,25 @@ def update_proxies(uowm: UnitOfWorkManager) -> None:
def update_top_reposts(uow: UnitOfWork, post_type_id: int, day_range: int = None):
# reddit.info(reddit_ids_to_lookup):
log.info('Getting top repostors for post type %s with range %s', post_type_id, day_range)
range_query = "SELECT repost_of_id, COUNT(*) c FROM repost WHERE detected_at > NOW() - INTERVAL :days DAY AND post_type_id=:posttype GROUP BY repost_of_id HAVING c > 5 ORDER BY c DESC"
all_time_query = "SELECT repost_of_id, COUNT(*) c FROM repost WHERE post_type_id=:posttype GROUP BY repost_of_id HAVING c > 5 ORDER BY c DESC"
range_query = "SELECT repost_of_id, COUNT(*) c FROM repost WHERE detected_at > NOW() - INTERVAL :days DAY AND post_type_id=:posttype GROUP BY repost_of_id HAVING c > 5 ORDER BY c DESC LIMIT 100000"
all_time_query = "SELECT repost_of_id, COUNT(*) c FROM repost WHERE post_type_id=:posttype GROUP BY repost_of_id HAVING c > 5 ORDER BY c DESC LIMIT 100000"
if day_range:
query = range_query
log.debug('Deleting top reposts for day range %s', day_range)
uow.session.execute(text('DELETE FROM stat_top_repost WHERE post_type_id=:posttype AND day_range=:days'),
{'posttype': post_type_id, 'days': day_range})
else:
query = all_time_query
log.debug('Deleting all top reposts for all time')
uow.session.execute(text('DELETE FROM stat_top_repost WHERE post_type_id=:posttype AND day_range IS NULL'),
{'posttype': post_type_id})

uow.commit()



log.debug('Executing query for day range %s', day_range)
result = uow.session.execute(text(query), {'posttype': post_type_id, 'days': day_range})
log.debug('Finished executing query for day range %s', day_range)
for row in result:
stat = StatsTopRepost()
stat.post_id = row[0]
@@ -98,8 +102,8 @@ def run_update_top_reposts(uow: UnitOfWork) -> None:

def update_top_reposters(uow: UnitOfWork, post_type_id: int, day_range: int = None) -> None:
log.info('Getting top repostors for post type %s with range %s', post_type_id, day_range)
range_query = "SELECT author, COUNT(*) c FROM repost WHERE detected_at > NOW() - INTERVAL :days DAY AND post_type_id=:posttype AND author is not NULL AND author!= '[deleted]' GROUP BY author HAVING c > 10 ORDER BY c DESC"
all_time_query = "SELECT author, COUNT(*) c FROM repost WHERE post_type_id=:posttype AND author is not NULL AND author!= '[deleted]' GROUP BY author HAVING c > 10 ORDER BY c DESC"
range_query = "SELECT author, COUNT(*) c FROM repost WHERE detected_at > NOW() - INTERVAL :days DAY AND post_type_id=:posttype AND author is not NULL AND author!= '[deleted]' GROUP BY author HAVING c > 10 ORDER BY c DESC LIMIT 100000"
all_time_query = "SELECT author, COUNT(*) c FROM repost WHERE post_type_id=:posttype AND author is not NULL AND author!= '[deleted]' GROUP BY author HAVING c > 10 ORDER BY c DESC LIMIT 100000"
if day_range:
query = range_query
else:
@@ -123,14 +127,20 @@ def update_top_reposters(uow: UnitOfWork, post_type_id: int, day_range: int = No
stat.repost_count = row[1]
stat.updated_at = func.utc_timestamp()
uow.stat_top_reposter.add(stat)
uow.commit()
uow.commit()

log.info('finished')

def run_update_top_reposters(uow: UnitOfWork):
post_types = [1, 2, 3]
day_ranges = [1, 7, 14, 30, None]
log.warning('Starting update to reposters task')
start_time = datetime.utcnow()
for post_type_id in post_types:
for days in day_ranges:
update_top_reposters(uow, post_type_id, days)
delta = (datetime.utcnow() - start_time)
log.warning('Top reposters: Type=%s days=%s time=%s', post_type_id, days, delta)
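With the lists above this works out to 3 post types × 5 day ranges = 15 aggregate passes per run, which is what the new per-pass timing log measures.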


def token_checker() -> None:
@@ -229,12 +239,12 @@ def update_monitored_sub_data(

monitored_sub.is_private = True if subreddit.subreddit_type == 'private' else False
monitored_sub.nsfw = True if subreddit.over18 else False
log.info('[Subscriber Update] %s: %s subscribers', monitored_sub.name, monitored_sub.subscribers)
log.debug('[Subscriber Update] %s: %s subscribers', monitored_sub.name, monitored_sub.subscribers)

perms = get_bot_permissions(subreddit) if monitored_sub.is_mod else []
monitored_sub.post_permission = True if 'all' in perms or 'posts' in perms else None
monitored_sub.wiki_permission = True if 'all' in perms or 'wiki' in perms else None
log.info('[Mod Check] %s | Post Perm: %s | Wiki Perm: %s', monitored_sub.name, monitored_sub.post_permission,
log.debug('[Mod Check] %s | Post Perm: %s | Wiki Perm: %s', monitored_sub.name, monitored_sub.post_permission,
monitored_sub.wiki_permission)


45 changes: 44 additions & 1 deletion redditrepostsleuth/core/celery/tasks/ingest_tasks.py
@@ -2,7 +2,9 @@
import random
from dataclasses import dataclass
from datetime import datetime, timedelta
from time import perf_counter
from typing import Optional
from urllib.parse import urlparse

import requests
from celery import Task
@@ -13,6 +15,7 @@
from redditrepostsleuth.core.celery.basetasks import SqlAlchemyTask
from redditrepostsleuth.core.celery.task_logic.ingest_task_logic import pre_process_post, get_redgif_image_url
from redditrepostsleuth.core.config import Config
from redditrepostsleuth.core.db.databasemodels import Subreddit
from redditrepostsleuth.core.db.db_utils import get_db_engine
from redditrepostsleuth.core.db.uow.unitofworkmanager import UnitOfWorkManager
from redditrepostsleuth.core.exception import InvalidImageUrlException, GalleryNotProcessed, ImageConversionException, \
@@ -41,9 +44,44 @@ def __init__(self):
self._proxy_manager = ProxyManager(self.uowm, 1000)
self.domains_to_proxy = []

@celery.task(bind=True, base=IngestTask, ignore_results=True, serializer='pickle')
def save_subreddit(self, subreddit_name: str):
try:
with self.uowm.start() as uow:
existing = uow.subreddit.get_by_name(subreddit_name)
if existing:
log.debug('Subreddit %s already exists', subreddit_name)
return
subreddit = Subreddit(name=subreddit_name)
uow.subreddit.add(subreddit)
uow.commit()
log.debug('Saved Subreddit %s', subreddit_name)
celery.send_task('redditrepostsleuth.core.celery.tasks.maintenance_tasks.update_subreddit_data', args=[subreddit_name])
except Exception as e:
log.exception('Failed to save subreddit %s', subreddit_name)
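Because the task checks for an existing row before inserting, it is safe to fire once per ingested post: repeat names are a cheap no-op, and the heavier update_subreddit_data refresh is only scheduled the first time a subreddit is seen.

    save_subreddit.apply_async(('funny',))  # hypothetical name; a second call for 'funny' returns early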

@celery.task(bind=True, base=IngestTask, ignore_results=True, serializer='pickle', autoretry_for=(ConnectionError,ImageConversionException,GalleryNotProcessed, HTTPException), retry_kwargs={'max_retries': 10, 'countdown': 300})
def save_new_post(self, submission: dict, repost_check: bool = True):

start_time = perf_counter()
save_event = {
'measurement': 'Post_Ingest',
#'time': datetime.utcnow().timestamp(),
'fields': {
'run_time': None,
'post_id': submission.get('id', None)
},
'tags': {
'post_type': None,
'domain': None
}
}

# Adding for timing in Grafana
url = submission.get('url', None)
if url:
save_event['tags']['domain'] = urlparse(url).netloc

# TODO: temp fix until I can fix imgur gifs
if 'imgur' in submission['url'] and 'gifv' in submission['url']:
return
@@ -84,6 +122,10 @@ def save_new_post(self, submission: dict, repost_check: bool = True):
log.exception('Database save failed: %s', str(e), exc_info=False)
return

save_event['fields']['run_time'] = perf_counter() - start_time
save_event['tags']['post_type'] = post.post_type_id
self.event_logger.write_raw_points([save_event])
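The save_event dict mirrors an InfluxDB-style point: one measurement name, numeric fields, and low-cardinality tags. Recording run_time as a field and post_type/domain as tags lets ingest latency be graphed in Grafana and sliced per post type or source domain.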

if repost_check:
if post.post_type_id == 1:
pass
@@ -93,7 +135,7 @@ def save_new_post(self, submission: dict, repost_check: bool = True):
elif post.post_type_id == 3:
celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.link_repost_check', args=[post])

#celery.send_task('redditrepostsleuth.core.celery.admin_tasks.check_user_for_only_fans', args=[post.author])
celery.send_task('redditrepostsleuth.core.celery.tasks.maintenance_tasks.save_subreddit', args=[post.subreddit])



Expand All @@ -102,6 +144,7 @@ def save_new_posts(posts: list[dict], repost_check: bool = True) -> None:
for post in posts:
save_new_post.apply_async((post, repost_check))
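save_new_posts is a fan-out shim: a batch arrives as a single message, and each submission is re-published as its own save_new_post task so the ingest workers can scale per post. A hedged caller sketch, assuming the task is registered as the post_ingest route above suggests:

    # Hypothetical two-post batch; each dict is re-queued individually via apply_async.
    save_new_posts.delay(
        [{'id': 'abc123', 'url': 'https://i.redd.it/example.jpg'},
         {'id': 'def456', 'url': 'https://example.com/page'}],
        repost_check=True,
    )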


@celery.task(bind=True, base=SqlAlchemyTask, ignore_results=True)
def save_pushshift_results(self, data):
with self.uowm.start() as uow: