diff --git a/redditrepostsleuth/core/celery/tasks/ingest_tasks.py b/redditrepostsleuth/core/celery/tasks/ingest_tasks.py index 8d52b6a..176e8dc 100644 --- a/redditrepostsleuth/core/celery/tasks/ingest_tasks.py +++ b/redditrepostsleuth/core/celery/tasks/ingest_tasks.py @@ -2,6 +2,7 @@ import random from dataclasses import dataclass from datetime import datetime, timedelta +from time import perf_counter from typing import Optional import requests @@ -44,6 +45,18 @@ def __init__(self): @celery.task(bind=True, base=IngestTask, ignore_reseults=True, serializer='pickle', autoretry_for=(ConnectionError,ImageConversionException,GalleryNotProcessed, HTTPException), retry_kwargs={'max_retries': 10, 'countdown': 300}) def save_new_post(self, submission: dict, repost_check: bool = True): + start_time = perf_counter() + save_event = { + 'measurement': 'Post_Ingest', + #'time': datetime.utcnow().timestamp(), + 'fields': { + 'run_time': None + }, + 'tags': { + 'post_type': None, + } + } + # TODO: temp fix until I can fix imgur gifs if 'imgur' in submission['url'] and 'gifv' in submission['url']: return @@ -84,6 +97,10 @@ def save_new_post(self, submission: dict, repost_check: bool = True): log.exception('Database save failed: %s', str(e), exc_info=False) return + save_event['fields']['run_time'] = perf_counter() - start_time + save_event['tags']['post_type'] = post.post_type_id + self.event_logger.write_raw_points([save_event]) + if repost_check: if post.post_type_id == 1: pass diff --git a/redditrepostsleuth/core/services/eventlogging.py b/redditrepostsleuth/core/services/eventlogging.py index 4b72cdb..04a40db 100644 --- a/redditrepostsleuth/core/services/eventlogging.py +++ b/redditrepostsleuth/core/services/eventlogging.py @@ -22,7 +22,7 @@ def __init__(self, config: Config = None): client = InfluxDBClient( url=f'http://{self._config.influx_host}:{self._config.influx_port}', token=self._config.influx_token, - org=self._config.influx_org, + org=self._config.influx_org ) self._influx_client = client.write_api(write_options=SYNCHRONOUS) @@ -90,4 +90,4 @@ def write_raw_points(self, points: list[dict]): except Exception as e: log.exception('Failed to write to Influx') - log.info('Wrote Influx: %s', points) \ No newline at end of file + #log.info('Wrote Influx: %s', points) \ No newline at end of file diff --git a/redditrepostsleuth/queue_monitor_svc/queue_monitor.py b/redditrepostsleuth/queue_monitor_svc/queue_monitor.py index 05a6410..b187290 100644 --- a/redditrepostsleuth/queue_monitor_svc/queue_monitor.py +++ b/redditrepostsleuth/queue_monitor_svc/queue_monitor.py @@ -19,7 +19,7 @@ def log_queue_size(event_logger): while True: try: client = redis.Redis(host=config.redis_host, port=config.redis_port, db=config.redis_database, password=config.redis_password) - + session_client = redis.Redis(host=config.redis_host, port=config.redis_port, db=2, password=config.redis_password) for queue in client.scan_iter(): queue_name = queue.decode('utf-8').replace('_kombu.binding.', '') if len(queue_name) > 30 or queue_name in skip_keys or 'celery' in queue_name: @@ -30,6 +30,15 @@ def log_queue_size(event_logger): continue event_logger.save_event( CeleryQueueSize(queue_name, queue_length, event_type='queue_update', env=os.getenv('RUN_ENV', 'dev'))) + + session_event = { + 'measurement': 'Session_Count', + # 'time': datetime.utcnow().timestamp(), + 'fields': { + 'count': session_client.dbsize() + }, + } + event_logger.write_raw_points([session_event]) time.sleep(2) except ConnectionError as e: log.error('Failed to connect to Redis')