Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/store subreddit data #382

Merged
merged 3 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@
target_metadata = Base.metadata

def get_conn_string():
    """Build the SQLAlchemy connection string for Alembic from bot_config.

    Credentials come from configuration only — the previously hardcoded
    dev credential string (and the unreachable second return it shadowed)
    has been removed.
    """
    return f'mysql+pymysql://{bot_config.db_user}:{quote_plus(bot_config.db_password)}@{bot_config.db_host}/{bot_config.db_name}'

# other values from the config, defined by the needs of env.py,
# can be acquired:
Expand Down
16 changes: 16 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ services:
- CELERY_IMPORTS=redditrepostsleuth.core.celery.admin_tasks,redditrepostsleuth.core.celery.tasks.scheduled_tasks
entrypoint: celery -A redditrepostsleuth.core.celery worker -Q scheduled_tasks -n scheduled_task_worker --autoscale=15,2

subreddit_update_worker:
container_name: subreddit_update_worker
restart: unless-stopped
user: "1001"
build:
context: .
dockerfile: docker/WorkerDockerFile
env_file:
- .env
environment:
- RUN_ENV=production
- db_user=maintenance_task
- LOG_LEVEL=DEBUG
- CELERY_IMPORTS=redditrepostsleuth.core.celery.tasks.maintenance_tasks
entrypoint: celery -A redditrepostsleuth.core.celery worker -Q update_subreddit_data -n subreddit_update_worker --autoscale=1,4

scheduler:
container_name: beat_scheduler
restart: unless-stopped
Expand Down
3 changes: 3 additions & 0 deletions redditrepostsleuth/core/celery/celeryconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
task_routes = {
'redditrepostsleuth.core.celery.tasks.ingest_tasks.save_new_post': {'queue': 'post_ingest'},
'redditrepostsleuth.core.celery.tasks.ingest_tasks.save_new_posts': {'queue': 'post_ingest'},
'redditrepostsleuth.core.celery.tasks.ingest_tasks.save_subreddit': {'queue': 'save_subreddit'},
'redditrepostsleuth.core.celery.tasks.ingest_tasks.ingest_repost_check': {'queue': 'repost'},
'redditrepostsleuth.core.celery.tasks.repost_tasks.check_image_repost_save': {'queue': 'repost_image'},
'redditrepostsleuth.core.celery.tasks.repost_tasks.link_repost_check': {'queue': 'repost_link'},
Expand All @@ -31,6 +32,8 @@
'redditrepostsleuth.core.celery.admin_tasks.update_subreddit_config_from_database': {'queue': 'update_wiki_from_database'},
#'redditrepostsleuth.core.celery.admin_tasks.delete_search_batch': {'queue': 'batch_delete_searches'},
'redditrepostsleuth.core.celery.tasks.reddit_action_tasks.*': {'queue': 'reddit_actions'},
'redditrepostsleuth.core.celery.tasks.maintenance_tasks.update_subreddit_data': {'queue': 'update_subreddit_data'},
'redditrepostsleuth.core.celery.tasks.maintenance_tasks.save_subreddit': {'queue': 'update_subreddit_data'}


}
Expand Down
30 changes: 28 additions & 2 deletions redditrepostsleuth/core/celery/tasks/ingest_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime, timedelta
from time import perf_counter
from typing import Optional
from urllib.parse import urlparse

import requests
from celery import Task
Expand All @@ -14,6 +15,7 @@
from redditrepostsleuth.core.celery.basetasks import SqlAlchemyTask
from redditrepostsleuth.core.celery.task_logic.ingest_task_logic import pre_process_post, get_redgif_image_url
from redditrepostsleuth.core.config import Config
from redditrepostsleuth.core.db.databasemodels import Subreddit
from redditrepostsleuth.core.db.db_utils import get_db_engine
from redditrepostsleuth.core.db.uow.unitofworkmanager import UnitOfWorkManager
from redditrepostsleuth.core.exception import InvalidImageUrlException, GalleryNotProcessed, ImageConversionException, \
Expand Down Expand Up @@ -42,6 +44,22 @@ def __init__(self):
self._proxy_manager = ProxyManager(self.uowm, 1000)
self.domains_to_proxy = []

# ignore_result (not the misspelled "ignore_reseults") is the Celery option name;
# the typo was silently accepted and results were being stored anyway.
@celery.task(bind=True, base=IngestTask, ignore_result=True, serializer='pickle')
def save_subreddit(self, subreddit_name: str) -> None:
    """Persist a Subreddit row if one does not already exist, then queue a data refresh.

    Idempotent: returns early when the subreddit is already stored.
    """
    try:
        with self.uowm.start() as uow:
            existing = uow.subreddit.get_by_name(subreddit_name)
            if existing:
                log.debug('Subreddit %s already exists', subreddit_name)
                return
            subreddit = Subreddit(name=subreddit_name)
            uow.subreddit.add(subreddit)
            uow.commit()
            log.debug('Saved Subreddit %s', subreddit_name)
            # Fetch subscriber count / NSFW flag asynchronously for the new row.
            celery.send_task('redditrepostsleuth.core.celery.tasks.maintenance_tasks.update_subreddit_data', args=[subreddit_name])
    except Exception:
        # Logger.exception requires a msg argument; the original bare call
        # would itself raise TypeError inside the handler.
        log.exception('Failed to save subreddit %s', subreddit_name)

@celery.task(bind=True, base=IngestTask, ignore_reseults=True, serializer='pickle', autoretry_for=(ConnectionError,ImageConversionException,GalleryNotProcessed, HTTPException), retry_kwargs={'max_retries': 10, 'countdown': 300})
def save_new_post(self, submission: dict, repost_check: bool = True):

Expand All @@ -50,13 +68,20 @@ def save_new_post(self, submission: dict, repost_check: bool = True):
'measurement': 'Post_Ingest',
#'time': datetime.utcnow().timestamp(),
'fields': {
'run_time': None
'run_time': None,
'post_id': submission.get('id', None)
},
'tags': {
'post_type': None,
'domain': None
}
}

# Adding for timing in Grafana
url = submission.get('url', None)
if url:
save_event['tags']['domain'] = urlparse(url).netloc

# TODO: temp fix until I can fix imgur gifs
if 'imgur' in submission['url'] and 'gifv' in submission['url']:
return
Expand Down Expand Up @@ -110,7 +135,7 @@ def save_new_post(self, submission: dict, repost_check: bool = True):
elif post.post_type_id == 3:
celery.send_task('redditrepostsleuth.core.celery.tasks.repost_tasks.link_repost_check', args=[post])

#celery.send_task('redditrepostsleuth.core.celery.admin_tasks.check_user_for_only_fans', args=[post.author])
celery.send_task('redditrepostsleuth.core.celery.tasks.maintenance_tasks.save_subreddit', args=[post.subreddit])



Expand All @@ -119,6 +144,7 @@ def save_new_posts(posts: list[dict], repost_check: bool = True) -> None:
for post in posts:
save_new_post.apply_async((post, repost_check))


@celery.task(bind=True, base=SqlAlchemyTask, ignore_results=True)
def save_pushshift_results(self, data):
with self.uowm.start() as uow:
Expand Down
52 changes: 52 additions & 0 deletions redditrepostsleuth/core/celery/tasks/maintenance_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import datetime

import requests

from redditrepostsleuth.core.celery import celery
from redditrepostsleuth.core.celery.basetasks import SqlAlchemyTask
from redditrepostsleuth.core.db.databasemodels import Subreddit
from redditrepostsleuth.core.exception import UtilApiException
from redditrepostsleuth.core.logging import configure_logger

log = configure_logger(
name='redditrepostsleuth',
)


# "retry_kwards" was a typo — Celery ignored it, so the 50-retry/600s-countdown
# policy was never applied and the default retry behavior was used instead.
@celery.task(bind=True, base=SqlAlchemyTask, autoretry_for=(UtilApiException,), retry_kwargs={'max_retries': 50, 'countdown': 600})
def update_subreddit_data(self, subreddit_name) -> None:
    """Fetch subscriber count and NSFW flag for a subreddit from the util API and store them.

    Raises UtilApiException on a non-200 response so Celery's autoretry_for
    reschedules the task.
    """
    try:
        with self.uowm.start() as uow:
            subreddit = uow.subreddit.get_by_name(subreddit_name)
            if not subreddit:
                # Guard: the original dereferenced subreddit.name unconditionally,
                # raising AttributeError (swallowed below) when the row was missing.
                log.error('Subreddit %s not found in database, skipping update', subreddit_name)
                return
            url_to_fetch = f'{self.config.util_api}/reddit/subreddit?name={subreddit.name}'
            # Timeout so a hung util API cannot block the worker indefinitely.
            res = requests.get(url_to_fetch, timeout=30)
            if res.status_code != 200:
                log.error('Bad status %s from util API when checking subreddit %s', res.status_code, subreddit.name)
                raise UtilApiException(f'Bad status {res.status_code} checking {subreddit_name}')

            subreddit_data = res.json()['data']
            # API may return null for these fields; coalesce to safe defaults.
            subreddit.subscribers = subreddit_data['subscribers'] or 0
            subreddit.nsfw = subreddit_data['over18'] or False
            subreddit.last_checked = datetime.datetime.now(datetime.UTC)
            uow.commit()
            log.debug('Update subreddit data for %s. NSFW: %s - Subscribers: %s', subreddit.name, subreddit.nsfw, subreddit.subscribers)
    except UtilApiException:
        # Re-raise so autoretry_for schedules the retry.
        raise
    except Exception:
        log.exception('Failed to update subreddit data for %s', subreddit_name)

# ignore_result is the correct Celery option name; the misspelled
# "ignore_reseults" was silently accepted and did nothing.
@celery.task(bind=True, base=SqlAlchemyTask, ignore_result=True, serializer='pickle')
def save_subreddit(self, subreddit_name: str) -> None:
    """Persist a Subreddit row if one does not already exist, then queue a data refresh.

    Idempotent: returns early when the subreddit is already stored.
    """
    try:
        with self.uowm.start() as uow:
            existing = uow.subreddit.get_by_name(subreddit_name)
            if existing:
                log.debug('Subreddit %s already exists', subreddit_name)
                return
            subreddit = Subreddit(name=subreddit_name)
            uow.subreddit.add(subreddit)
            uow.commit()
            log.debug('Saved Subreddit %s', subreddit_name)
            # Populate subscriber count / NSFW flag for the freshly created row.
            update_subreddit_data.apply_async((subreddit_name,))
    except Exception:
        # Give log.exception a real message instead of the empty string.
        log.exception('Failed to save subreddit %s', subreddit_name)
13 changes: 12 additions & 1 deletion redditrepostsleuth/core/celery/tasks/scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from functools import wraps
from time import perf_counter

import requests
from prawcore import TooManyRequests, Redirect, ServerError, NotFound

from redditrepostsleuth.adminsvc.bot_comment_monitor import BotCommentMonitor
Expand All @@ -14,6 +15,7 @@
from redditrepostsleuth.core.celery.task_logic.scheduled_task_logic import update_proxies, token_checker, \
run_update_top_reposters, update_top_reposters, update_monitored_sub_data, run_update_top_reposts
from redditrepostsleuth.core.db.databasemodels import StatsDailyCount
from redditrepostsleuth.core.exception import UtilApiException
from redditrepostsleuth.core.logging import configure_logger
from redditrepostsleuth.core.util.helpers import chunk_list

Expand Down Expand Up @@ -321,4 +323,13 @@ def queue_search_history_cleanup(self):
log.info('Queuing Search History Cleanup. Range: ID Range: %s:%s', searches[0].id, searches[-1].id)
ids = [x[0] for x in searches]
for chunk in chunk_list(ids, 5000):
delete_search_batch.apply_async((chunk,))
delete_search_batch.apply_async((chunk,))

# "retry_kwards" typo fixed — Celery ignored the misspelled option, so
# max_retries=5 was never applied.
@celery.task(bind=True, base=RedditTask, autoretry_for=(UtilApiException,), retry_kwargs={'max_retries': 5})
@record_task_status
def queue_subreddit_data_updates(self) -> None:
    """Queue a metadata refresh for every subreddit that is due for an update.

    Sends update_subreddit_data directly: the original queued save_subreddit,
    which returns early for rows that already exist, so stale subreddits were
    never actually refreshed by this scheduled task.
    """
    with self.uowm.start() as uow:
        subreddits_to_update = uow.subreddit.get_subreddits_to_update()
        for subreddit in subreddits_to_update:
            celery.send_task('redditrepostsleuth.core.celery.tasks.maintenance_tasks.update_subreddit_data',
                             args=[subreddit.name])
18 changes: 17 additions & 1 deletion redditrepostsleuth/core/db/databasemodels.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ class Repost(Base):
Index('idx_repost_by_type', 'post_type_id', 'detected_at', unique=False),
Index('idx_repost_of_date', 'author', 'detected_at',unique=False),
Index('idx_repost_by_subreddit', 'subreddit', 'post_type_id', 'detected_at', unique=False),
Index('idx_repost_by_author', 'author', unique=False),
)
id = Column(Integer, primary_key=True)
post_id = Column(Integer, ForeignKey('post.id'))
Expand Down Expand Up @@ -758,4 +759,19 @@ class UserReview(Base):
content_links_found = Column(Boolean, default=False)
added_at = Column(DateTime, default=func.utc_timestamp(), nullable=False)
notes = Column(String(150))
last_checked = Column(DateTime, default=func.utc_timestamp())
last_checked = Column(DateTime, default=func.utc_timestamp())

class Subreddit(Base):
    """A subreddit tracked by the bot, with cached metadata refreshed periodically."""
    __tablename__ = 'subreddit'
    __table_args__ = (
        # NOTE(review): name already receives a unique index from unique=True
        # below; this secondary index looks redundant — confirm before removing.
        Index('idx_subreddit_name', 'name'),
    )
    id = Column(Integer, primary_key=True)
    # Reddit subreddit names are limited in length; stored without the /r/ prefix
    # elsewhere in this PR (tasks pass bare names) — TODO confirm.
    name = Column(String(25), nullable=False, unique=True)
    subscribers = Column(Integer, nullable=False, default=0)
    nsfw = Column(Boolean, nullable=False, default=False)
    added_at = Column(DateTime, default=func.utc_timestamp(), nullable=False)
    bot_banned = Column(Boolean, nullable=False, default=False)
    bot_banned_at = Column(DateTime)
    last_ban_check = Column(DateTime)
    # Set by update_subreddit_data when metadata is refreshed; NULL until first check.
    last_checked = Column(DateTime)
3 changes: 3 additions & 0 deletions redditrepostsleuth/core/db/repository/repost_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ def get_all(self, limit: int = None, offset: int = None) -> List[Repost]:
def get_all_by_type(self, post_type_id: int, limit: None, offset: None) -> list[Repost]:
return self.db_session.query(Repost).filter(Repost.post_type_id == post_type_id).order_by(Repost.id.desc()).offset(offset).limit(limit).all()

def get_by_author(self, author: str) -> List[Repost]:
    """Fetch every Repost record attributed to the given author."""
    author_filter = Repost.author == author
    matching = self.db_session.query(Repost).filter(author_filter)
    return matching.all()

def get_all_without_author(self, limit: int = None, offset: int = None):
return self.db_session.query(Repost).filter(Repost.author == None).order_by(Repost.id.desc()).offset(offset).limit(limit).all()

Expand Down
20 changes: 20 additions & 0 deletions redditrepostsleuth/core/db/repository/subreddit_repo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import datetime

from sqlalchemy import or_

from redditrepostsleuth.core.db.databasemodels import Subreddit


class SubredditRepo:
    """Repository providing query access to Subreddit rows."""

    def __init__(self, db_session):
        self.db_session = db_session

    def add(self, item):
        """Stage a new Subreddit for insertion; the unit of work commits it."""
        self.db_session.add(item)

    def get_by_name(self, name: str):
        """Return the Subreddit with the given name, or None if not stored."""
        return self.db_session.query(Subreddit).filter(Subreddit.name == name).first()

    def get_subreddits_to_update(self, limit: int = None, offset: int = None) -> list[Subreddit]:
        """Return subreddits never checked, or last checked more than 3 days ago.

        Fix: the original filtered on added_at < cutoff, which returned every
        subreddit older than 3 days on every run regardless of when its data
        was last refreshed; last_checked is the staleness marker the model and
        update task actually maintain.
        """
        cutoff = datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=3)
        # NOTE(review): cutoff is timezone-aware while the column default is a
        # naive utc_timestamp(); MySQL DATETIME comparison works but confirm
        # consistency with the rest of the codebase.
        return self.db_session.query(Subreddit).filter(
            or_(Subreddit.last_checked < cutoff, Subreddit.last_checked == None)
        ).limit(limit).offset(offset).all()
8 changes: 7 additions & 1 deletion redditrepostsleuth/core/db/uow/unitofwork.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from sqlalchemy.orm import scoped_session

from redditrepostsleuth.core.db.databasemodels import Subreddit
from redditrepostsleuth.core.db.repository.banned_subreddit_repo import BannedSubredditRepo
from redditrepostsleuth.core.db.repository.banned_user_repo import BannedUserRepo
from redditrepostsleuth.core.db.repository.bot_private_message_repo import BotPrivateMessageRepo
Expand Down Expand Up @@ -27,6 +28,7 @@
from redditrepostsleuth.core.db.repository.stat_daily_count_repo import StatDailyCountRepo
from redditrepostsleuth.core.db.repository.stat_top_repost_repo import StatTopRepostRepo
from redditrepostsleuth.core.db.repository.stats_top_reposter_repo import StatTopReposterRepo
from redditrepostsleuth.core.db.repository.subreddit_repo import SubredditRepo
from redditrepostsleuth.core.db.repository.summonsrepository import SummonsRepository
from redditrepostsleuth.core.db.repository.user_report_repo import UserReportRepo
from redditrepostsleuth.core.db.repository.user_review_repo import UserReviewRepo
Expand Down Expand Up @@ -175,4 +177,8 @@ def post_type(self) -> PostTypeRepo:

@property
def user_whitelist(self) -> UserWhitelistRepo:
return UserWhitelistRepo(self.session)
return UserWhitelistRepo(self.session)

@property
def subreddit(self) -> SubredditRepo:
    """Subreddit repository bound to this unit of work's session."""
    repo = SubredditRepo(self.session)
    return repo
Loading