Skip to content

Commit

Permalink
ingest save time tracking by post type
Browse files: browse the repository at this point in the history
  • Loading branch information
barrycarey committed Oct 20, 2024
1 parent d87265c commit 58e284c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 3 deletions.
17 changes: 17 additions & 0 deletions redditrepostsleuth/core/celery/tasks/ingest_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import random
from dataclasses import dataclass
from datetime import datetime, timedelta
from time import perf_counter
from typing import Optional

import requests
Expand Down Expand Up @@ -44,6 +45,18 @@ def __init__(self):
@celery.task(bind=True, base=IngestTask, ignore_reseults=True, serializer='pickle', autoretry_for=(ConnectionError,ImageConversionException,GalleryNotProcessed, HTTPException), retry_kwargs={'max_retries': 10, 'countdown': 300})
def save_new_post(self, submission: dict, repost_check: bool = True):

start_time = perf_counter()
save_event = {
'measurement': 'Post_Ingest',
#'time': datetime.utcnow().timestamp(),
'fields': {
'run_time': None
},
'tags': {
'post_type': None,
}
}

# TODO: temporary workaround - skip imgur gifv URLs until imgur GIF handling is fixed
if 'imgur' in submission['url'] and 'gifv' in submission['url']:
return
Expand Down Expand Up @@ -84,6 +97,10 @@ def save_new_post(self, submission: dict, repost_check: bool = True):
log.exception('Database save failed: %s', str(e), exc_info=False)
return

save_event['fields']['run_time'] = perf_counter() - start_time
save_event['tags']['post_type'] = post.post_type_id
self.event_logger.write_raw_points([save_event])

if repost_check:
if post.post_type_id == 1:
pass
Expand Down
4 changes: 2 additions & 2 deletions redditrepostsleuth/core/services/eventlogging.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(self, config: Config = None):
client = InfluxDBClient(
url=f'http://{self._config.influx_host}:{self._config.influx_port}',
token=self._config.influx_token,
org=self._config.influx_org,
org=self._config.influx_org
)

self._influx_client = client.write_api(write_options=SYNCHRONOUS)
Expand Down Expand Up @@ -90,4 +90,4 @@ def write_raw_points(self, points: list[dict]):
except Exception as e:
log.exception('Failed to write to Influx')

log.info('Wrote Influx: %s', points)
#log.info('Wrote Influx: %s', points)
11 changes: 10 additions & 1 deletion redditrepostsleuth/queue_monitor_svc/queue_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def log_queue_size(event_logger):
while True:
try:
client = redis.Redis(host=config.redis_host, port=config.redis_port, db=config.redis_database, password=config.redis_password)

session_client = redis.Redis(host=config.redis_host, port=config.redis_port, db=2, password=config.redis_password)
for queue in client.scan_iter():
queue_name = queue.decode('utf-8').replace('_kombu.binding.', '')
if len(queue_name) > 30 or queue_name in skip_keys or 'celery' in queue_name:
Expand All @@ -30,6 +30,15 @@ def log_queue_size(event_logger):
continue
event_logger.save_event(
CeleryQueueSize(queue_name, queue_length, event_type='queue_update', env=os.getenv('RUN_ENV', 'dev')))

session_event = {
'measurement': 'Session_Count',
# 'time': datetime.utcnow().timestamp(),
'fields': {
'count': session_client.dbsize()
},
}
event_logger.write_raw_points([session_event])
time.sleep(2)
except ConnectionError as e:
log.error('Failed to connect to Redis')
Expand Down

0 comments on commit 58e284c

Please sign in to comment.