Skip to content

Commit

Permalink
Merge pull request #10 from cesaregarza/feature/weapon-calcs
Browse files Browse the repository at this point in the history
Feature/weapon calcs
  • Loading branch information
cesaregarza authored May 24, 2024
2 parents b8f9491 + 1a06b91 commit 6df0c30
Show file tree
Hide file tree
Showing 22 changed files with 591 additions and 166 deletions.
2 changes: 1 addition & 1 deletion dockerfiles/dockerfile.celery
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@ RUN poetry version $BUILD_VERSION && \
poetry install --no-dev --without scripts && \
poetry update

CMD ["poetry", "run", "celery", "-A", "celery_app.tasks", "worker", "--loglevel=info"]
CMD ["poetry", "run", "celery", "-A", "celery_app.app", "worker", "--loglevel=info"]
2 changes: 1 addition & 1 deletion k8s/celery-worker/celery-worker-deployment-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ spec:
image: celery-worker:latest
imagePullPolicy: Never
command:
["celery", "-A", "celery_app.tasks", "worker", "--loglevel=info"]
["celery", "-A", "celery_app.app", "worker", "--loglevel=info"]
env:
- name: DB_HOST
valueFrom:
Expand Down
2 changes: 1 addition & 1 deletion k8s/celery-worker/celery-worker-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ spec:
image: registry.digitalocean.com/sendouq/celery:latest
imagePullPolicy: Always
command:
["celery", "-A", "celery_app.tasks", "worker", "--loglevel=info"]
["celery", "-A", "celery_app.app", "worker", "--loglevel=info"]
env:
- name: DB_HOST
valueFrom:
Expand Down
22 changes: 22 additions & 0 deletions src/celery_app/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import logging

from celery import Celery

from celery_app.connections import Session, redis_conn
from celery_app.tasks.front_page import pull_data
from celery_app.tasks.misc import pull_aliases, update_weapon_info
from celery_app.tasks.player_detail import fetch_player_data
from shared_lib.constants import REDIS_URI

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(filename)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)

celery = Celery("tasks", broker=REDIS_URI, backend=REDIS_URI)

celery.task(name="tasks.pull_data")(pull_data)
celery.task(name="tasks.fetch_player_data")(fetch_player_data)
celery.task(name="tasks.update_weapon_info")(update_weapon_info)
celery.task(name="tasks.pull_aliases")(pull_aliases)
5 changes: 5 additions & 0 deletions src/celery_app/database.py → src/celery_app/connections.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import redis
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

from shared_lib.constants import REDIS_HOST, REDIS_PORT
from shared_lib.db import create_uri

engine = create_engine(create_uri().replace("asyncpg", "psycopg2"))
Session = scoped_session(sessionmaker(bind=engine))
redis_conn = redis.Redis(
host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True
)
Empty file.
158 changes: 158 additions & 0 deletions src/celery_app/tasks/front_page.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import logging

import numpy as np
import orjson
import pandas as pd
from sqlalchemy import text

from celery_app.connections import Session, redis_conn
from shared_lib.constants import (
ALIASES_REDIS_KEY,
MODES,
MODES_SNAKE_CASE,
REGIONS,
)
from shared_lib.queries.front_page_queries import LEADERBOARD_MAIN_QUERY
from shared_lib.utils import get_badge_image, get_banner_image, get_weapon_image

logger = logging.getLogger(__name__)


def fetch_and_store_leaderboard_data(mode: str, region_bool: bool) -> list:
"""Fetches leaderboard data from the database and stores it in Redis.
Args:
mode (str): The game mode.
region_bool (bool): Boolean indicating the region (True for 'Takoroka',
False for 'Tentatek').
Returns:
list: A list of player data dictionaries.
"""
logger.info(
"Fetching leaderboard data for mode: %s, region: %s",
mode,
"Takoroka" if region_bool else "Tentatek",
)
query = text(LEADERBOARD_MAIN_QUERY)
with Session() as session:
result = session.execute(
query, {"mode": mode, "region": region_bool}
).fetchall()
players = [{**row._asdict()} for row in result]

for player in players:
player["weapon_image"] = get_weapon_image(int(player["weapon_id"]))
player["badge_left_image"] = get_badge_image(player["badge_left_id"])
player["badge_center_image"] = get_badge_image(
player["badge_center_id"]
)
player["badge_right_image"] = get_badge_image(player["badge_right_id"])
player["nameplate_image"] = get_banner_image(
int(player["nameplate_id"])
)
player["timestamp"] = player["timestamp"].isoformat()
player["rotation_start"] = player["rotation_start"].isoformat()

# Save to Redis with a key that combines mode and region for uniqueness
redis_key = (
f"leaderboard_data:{mode}:{'Takoroka' if region_bool else 'Tentatek'}"
)
redis_conn.set(redis_key, orjson.dumps(players))
logger.info(
"Leaderboard data for mode: %s, region: %s saved to Redis",
mode,
"Takoroka" if region_bool else "Tentatek",
)
return players


def process_all_data(df: pd.DataFrame) -> list[tuple[str, pd.DataFrame]]:
"""Processes all data by filtering and aggregating it for each region.
Args:
df (pd.DataFrame): The input DataFrame containing player data.
Returns:
list[tuple[str, pd.DataFrame]]: A list of tuples, each containing a
region and its processed DataFrame.
"""
logger.info("Processing all data")
keys_to_keep = ["player_id", "x_power", "weapon_id", "mode", "region"]
df = df.loc[:, keys_to_keep]
out = []
for region in REGIONS:
logger.info(f"Processing data for region: {region}")
region_df = df.loc[df["region"] == region]
region_df = process_region_data(region_df)
for mode in MODES_SNAKE_CASE.values():
key = f"{mode}_weapon_image"
weapon_key = f"{mode}_weapon_id"
weapon_mask = region_df[weapon_key].notnull()
region_df[key] = ""
region_df.loc[weapon_mask, key] = (
region_df.loc[weapon_mask, weapon_key]
.astype(int)
.apply(get_weapon_image)
)
out.append((region, region_df))
return out


def process_region_data(df: pd.DataFrame) -> pd.DataFrame:
"""Processes data for a specific region by aggregating and sorting it.
Args:
df (pd.DataFrame): The input DataFrame containing player data for a
specific region.
Returns:
pd.DataFrame: The processed DataFrame with aggregated and sorted data.
"""
logger.info("Processing region data")
df.loc[:, "mode"] = df["mode"].map(MODES_SNAKE_CASE)

df = df.set_index(["player_id", "mode"]).unstack()
df.columns = [f"{mode}_{column}" for column, mode in df.columns]
xp_cols = [col for col in df.columns if "x_power" in col]
df["total_x_power"] = df[xp_cols].sum(axis=1)
df = df.sort_values("total_x_power", ascending=False).iloc[:500]
df["rank"] = np.arange(1, 501)

# pull from ALIASES_REDIS_KEY and get the latest alias
# for each player in the top 500
aliases = redis_conn.get(ALIASES_REDIS_KEY)
aliases = orjson.loads(aliases)
aliases_df = pd.DataFrame(aliases)
aliases_df = (
aliases_df.sort_values("last_seen", ascending=False)
.drop_duplicates(subset="player_id")
.set_index("player_id")
.drop(columns=["last_seen"])
)
df = df.join(aliases_df)
logger.info("Region data processed")
return df


def pull_data() -> None:
"""Pulls data for all modes and regions, processes it, and stores it in
Redis.
"""
logger.info("Pulling data")
dfs = []
for mode in MODES:
for region in REGIONS:
region_bool = region == "Takoroka"
players = fetch_and_store_leaderboard_data(mode, region_bool)
df = pd.DataFrame(players)
df["mode"] = mode
df["region"] = region
dfs.append(df)

for region, processed_df in process_all_data(pd.concat(dfs)):
redis_key = f"leaderboard_data:All Modes:{region}"
redis_conn.set(
redis_key, processed_df.reset_index().to_json(orient="records")
)
logger.info(f"All data for region: {region} saved to Redis")
44 changes: 44 additions & 0 deletions src/celery_app/tasks/misc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import logging

import orjson
import requests
from sqlalchemy import text

from celery_app.connections import Session, redis_conn
from shared_lib.constants import (
ALIASES_REDIS_KEY,
GAME_TRANSLATION_BASE_URL,
GAME_TRANSLATION_REDIS_KEY,
LANGUAGES,
WEAPON_INFO_REDIS_KEY,
WEAPON_INFO_URL,
)
from shared_lib.queries.misc_queries import ALIAS_QUERY

logger = logging.getLogger(__name__)


def update_weapon_info() -> None:
logging.info("Running task: update_weapon_info")
response = requests.get(WEAPON_INFO_URL)
weapon_info = orjson.loads(response.text)
language_data = {}
for language in LANGUAGES:
response = requests.get(GAME_TRANSLATION_BASE_URL % language)
language_data[language] = orjson.loads(response.text)

redis_conn.set(WEAPON_INFO_REDIS_KEY, orjson.dumps(weapon_info))
logging.info("Weapon info updated in Redis.")
redis_conn.set(GAME_TRANSLATION_REDIS_KEY, orjson.dumps(language_data))
logging.info("Weapon translations updated in Redis.")


def pull_aliases() -> None:
logging.info("Running task: fetch_aliases")
query = text(ALIAS_QUERY)
with Session() as session:
result = session.execute(query).fetchall()

aliases = [{**row._asdict()} for row in result]
redis_conn.set(ALIASES_REDIS_KEY, orjson.dumps(aliases))
logging.info("Aliases updated in Redis.")
Loading

0 comments on commit 6df0c30

Please sign in to comment.