diff --git a/uqcsbot/advent.py b/uqcsbot/advent.py index 2cf237d..c6b0ffb 100644 --- a/uqcsbot/advent.py +++ b/uqcsbot/advent.py @@ -1,10 +1,9 @@ import io import logging import os -from datetime import datetime, timedelta -from pytz import timezone +from datetime import datetime from random import choices -from typing import Any, Callable, Dict, Iterable, List, Optional, Literal +from typing import Callable, Dict, Iterable, List, Optional, Literal import requests from requests.exceptions import RequestException from sqlalchemy.sql.expression import and_ @@ -16,6 +15,7 @@ from uqcsbot.bot import UQCSBot from uqcsbot.models import AOCRegistrations, AOCWinners from uqcsbot.utils.err_log_utils import FatalErrorWithLog +from uqcsbot.utils.advent_utils import Member, Day, Json, InvalidHTTPSCode, ADVENT_DAYS, CACHE_TIME, parse_leaderboard_column_string, print_leaderboard # Leaderboard API URL with placeholders for year and code. LEADERBOARD_URL = "https://adventofcode.com/{year}/leaderboard/private/view/{code}.json" @@ -23,135 +23,9 @@ # UQCS leaderboard ID. UQCS_LEADERBOARD = 989288 -# Days in Advent of Code. List of numbers 1 to 25. -ADVENT_DAYS = list(range(1, 25 + 1)) - -# Puzzles are unlocked at midnight EST. -EST_TIMEZONE = timezone("US/Eastern") - -# The time to cache results to limit requests to adventofcode.com. Note that 15 minutes is the recomended minimum time. -CACHE_TIME = timedelta(minutes=15) - # The maximum time in seconds that a person can complete a challenge in. Used as a maximum value to help with sorting when someone whas not attempted a day. MAXIMUM_TIME_FOR_STAR = 365 * 24 * 60 * 60 -# type aliases for documentation purposes. -Day = int # from 1 to 25 -Star = Literal[1, 2] -Seconds = int -Times = Dict[Star, Seconds] -Delta = Optional[Seconds] -Json = Dict[str, Any] - - -class InvalidHTTPSCode(Exception): - def __init__(self, message, request_code): - super().__init__(message) - self.request_code = request_code - - -class Member: - def __init__(self, id: int, name: str, local: int, star_total: int, global_: int): - # The advent of code id - self.id = id - # The advent of code name - self.name = name - # The score of the user on the local leaderboard - self.local = local - # The total number of stars the user has collected - self.star_total = star_total - # The score of the user on the global leaderboard - self.global_ = global_ - - # All of the Times. If no stars are collected, the Times dictionary is empty. - self.times: Dict[Day, Times] = {d: {} for d in ADVENT_DAYS} - - @classmethod - def from_member_data(cls, data: Json, year: int) -> "Member": - """ - Constructs a Member from the API response. - - Times and delta are calculated for the given year and day. - """ - - member = cls( - data["id"], - data["name"], - data["local_score"], - data["stars"], - data["global_score"], - ) - - for d, day_data in data["completion_day_level"].items(): - day = int(d) - times = member.times[day] - - # timestamp of puzzle unlock, rounded to whole seconds - DAY_START = int(datetime(year, 12, day, tzinfo=EST_TIMEZONE).timestamp()) - - for s, star_data in day_data.items(): - star = int(s) - # assert is for type checking - assert star == 1 or star == 2 - times[star] = int(star_data["get_star_ts"]) - DAY_START - assert times[star] >= 0 - - return member - - def get_time_delta(self, day: Day) -> Optional[Seconds]: - """ - Returns the number of seconds between the completion of the second star from the first, or None if the second star have not been completed. - """ - if len(self.times[day]) == 2: - return self.times[day][2] - self.times[day][1] - return None - - def attempted_day(self, day: Day) -> bool: - """ - Returns if a member completed at least the first star in the day - """ - return len(self.times[day]) >= 1 - - def get_total_star1_time(self, default: int = 0) -> int: - """ - Returns the total time working on just star 1 for all challenges in a year. - The argument default determines the returned value if the total is 0. - """ - total = sum(self.times[day].get(1, 0) for day in ADVENT_DAYS) - return total if total != 0 else default - - def get_total_star2_time(self, default: int = 0) -> int: - """ - Returns the total time working on just star 2 for all challenges in a year. - The argument default determines the returned value if the total is 0. - """ - total = sum(self.times[day].get(2, 0) for day in ADVENT_DAYS) - return total if total != 0 else default - - def get_total_time(self, default: int = 0) -> int: - """ - Returns the total time working on stars 1 and 2 for all challenges in a year. - The argument default determines the returned value if the total is 0. - """ - total = self.get_total_star1_time() + self.get_total_star2_time() - return total if total != 0 else default - - def get_discord_userid(self, bot: UQCSBot) -> Optional[int]: - """ - Return the discord userid of this AOC member if one is registered in the database. - """ - db_session = bot.create_db_session() - registration = ( - db_session.query(AOCRegistrations) - .filter(AOCRegistrations.aoc_userid == self.id) - .one_or_none() - ) - db_session.close() - if registration: - return registration.discord_userid - return None - - # --- Sorting Methods & Related Leaderboards --- # Star 1 Time: Time for just getting star 1. For the monthly leaderboard, this will be the total time spent on star 1 across all problems. @@ -268,6 +142,23 @@ class Advent(commands.Cog): /advent remove-winner - Remove a winner for the database """ + advent_command_group = app_commands.Group( + name="advent", description="Commands for Advent of Code" + ) + + Command = Literal[ + "help", + "leaderboard", + "register", + "register-force", + "unregister", + "unregister-force", + "previous-winners", + "new-winner", + "remove-winner", + "leaderboard_style", + ] + def __init__(self, bot: UQCSBot): self.bot = bot self.bot.schedule_task( @@ -301,22 +192,156 @@ def __init__(self, bot: UQCSBot): bot, "Unable to find AoC session ID. Not loading advent cog." ) - advent_command_group = app_commands.Group( - name="advent", description="Commands for Advent of Code" - ) + def _get_leaderboard_json(self, year: int, code: int) -> Json: + """ + Returns a json dump of the leaderboard + """ + try: + response = requests.get( + LEADERBOARD_URL.format(year=year, code=code), + cookies={"session": self.session_id}, + ) + except RequestException as exception: + raise FatalErrorWithLog( + self.bot, + f"Could not get the leaderboard from Advent of Code. For more information {exception}", + ) + if response.status_code != 200: + raise InvalidHTTPSCode( + "Expected a HTTPS status code of 200.", response.status_code + ) + try: + return response.json() + except ValueError as exception: # json.JSONDecodeError + raise FatalErrorWithLog( + self.bot, + f"Could not interpret the JSON from Advent of Code (AOC). This suggests that AOC no longer provides JSON or something went very wrong. For more information: {exception}", + ) + + def _get_members( + self, year: int, code: int = UQCS_LEADERBOARD, force_refresh: bool = False + ): + """ + Returns the list of members in the leaderboard for the given year and leaderboard code. + It will attempt to retrieve from a cache if 15 minutes has not passed. + This can be overriden by setting force refresh. + """ + if ( + force_refresh + or (datetime.now() - self.last_reload_time >= CACHE_TIME) + or year not in self.members_cache + ): + leaderboard = self._get_leaderboard_json(year, code) + self.members_cache[year] = [ + Member.from_member_data(data, year) + for data in leaderboard["members"].values() + ] + return self.members_cache[year] + + def _get_registrations(self, year: int) -> Iterable[AOCRegistrations]: + """ + Get all registrations linking an AOC id to a discord account. + """ + db_session = self.bot.create_db_session() + registrations = db_session.query(AOCRegistrations).filter( + AOCRegistrations.year == year + ) + db_session.commit() + db_session.close() + return registrations + + async def reminder_fifteen_minutes(self): + """ + The function used within the AOC reminder 15 minutes before each challenge starts. + """ + channel = discord.utils.get( + self.bot.uqcs_server.channels, name=self.bot.AOC_CNAME + ) + if channel is None: + logging.warning(f"Could not find required channel #{self.bot.AOC_CNAME}") + return + if not isinstance(channel, discord.TextChannel): + logging.warning( + f"Channel #{self.bot.AOC_CNAME} was expected to be a text channel, but was not" + ) + return + role = discord.utils.get(self.bot.uqcs_server.roles, name=self.bot.AOC_ROLE) + if role is None: + logging.warning(f"The role @{self.bot.AOC_ROLE} could not be found for an Advent of Code puzzle pre-release ping.") + # Still return a message, as it is better to message and not ping than to not message at all. + ping = "" + else: + ping = f"{role.mention} " + await channel.send(f"{ping}Today's Advent of Code puzzle is released in 15 minutes.") + + async def reminder_released(self): + """ + The function used within the AOC reminder when each challenge starts. + """ + channel = discord.utils.get( + self.bot.uqcs_server.channels, name=self.bot.AOC_CNAME + ) + if channel is None: + logging.warning(f"Could not find required channel #{self.bot.AOC_CNAME}") + return + if not isinstance(channel, discord.TextChannel): + logging.warning( + f"Channel #{self.bot.AOC_CNAME} was expected to be a text channel, but was not" + ) + return + role = discord.utils.get(self.bot.uqcs_server.roles, name=self.bot.AOC_ROLE) + if role is None: + logging.warning(f"The role @{self.bot.AOC_ROLE} could not be found for an Advent of Code puzzle release ping.") + # Still return a message, as it is better to message and not ping than to not message at all. + ping = "" + else: + ping = f"{role.mention} " + await channel.send( + f"{ping}Today's Advent of Code puzzle has been released. Good luck!" + ) + + def _get_previous_winner_aoc_ids(self, year: int) -> List[int]: + """ + Returns a list of all winner aoc ids for a year + """ + db_session = self.bot.create_db_session() + prev_winners = db_session.query(AOCWinners).filter(AOCWinners.year == year) + db_session.commit() + db_session.close() + + return [winner.aoc_userid for winner in prev_winners] + + def _add_winners(self, winners: List[Member], year: int, prize: str): + """ + Add all members within the list to the database + """ + for winner in winners: + db_session = self.bot.create_db_session() + db_session.add( + AOCWinners(aoc_userid=winner.id, year=year, prize=prize) + ) + db_session.commit() + db_session.close() + + def _random_choices_without_repition( + self, population: List[Member], weights: List[int], k: int + ) -> List[Member]: + """ + Selects k people from a list of members, weighted by weights. + The weight of a person is like how many tickets they have for the lottery. + """ + result: List[Member] = [] + for _ in range(k): + if sum(weights) == 0: + return [] + + result.append(choices(population, weights)[0]) + index = population.index(result[-1]) + population.pop(index) + weights.pop(index) + + return result - Command = Literal[ - "help", - "leaderboard", - "register", - "register-force", - "unregister", - "unregister-force", - "previous-winners", - "new-winner", - "remove-winner", - "leaderboard_style", - ] @advent_command_group.command(name="help") @app_commands.describe(command="The command you want to view help about.") @@ -506,8 +531,8 @@ async def leaderboard_command( scoreboard_file = io.BytesIO( bytes( - _print_leaderboard( - _parse_leaderboard_column_string(leaderboard_style, self.bot), + print_leaderboard( + parse_leaderboard_column_string(leaderboard_style, self.bot), members, day, ), @@ -535,7 +560,6 @@ async def register_command(self, interaction: discord.Interaction, aoc_name: str # TODO: Check UQCS membership await interaction.response.defer(thinking=True) - id = self._get_unused_registration_id() db_session = self.bot.create_db_session() year = datetime.now().year @@ -592,7 +616,7 @@ async def register_command(self, interaction: discord.Interaction, aoc_name: str db_session.add( AOCRegistrations( - id=id, aoc_userid=AOC_id, year=year, discord_userid=discord_id + aoc_userid=AOC_id, year=year, discord_userid=discord_id ) ) db_session.commit() @@ -631,7 +655,6 @@ async def register_admin_command( await interaction.response.defer(thinking=True) - id = self._get_unused_registration_id() db_session = self.bot.create_db_session() if aoc_name: @@ -671,7 +694,7 @@ async def register_admin_command( db_session.add( AOCRegistrations( - id=id, aoc_userid=aoc_id, year=year, discord_userid=discord_id + aoc_userid=aoc_id, year=year, discord_userid=discord_id ) ) db_session.commit() @@ -978,422 +1001,6 @@ async def remove_winner_command(self, interaction: discord.Interaction, id: int) content=f"Removed the winners entry with id {id}." ) - def _get_leaderboard_json(self, year: int, code: int) -> Json: - """ - Returns a json dump of the leaderboard - """ - try: - response = requests.get( - LEADERBOARD_URL.format(year=year, code=code), - cookies={"session": self.session_id}, - ) - except RequestException as exception: - raise FatalErrorWithLog( - self.bot, - f"Could not get the leaderboard from Advent of Code. For more information {exception}", - ) - if response.status_code != 200: - raise InvalidHTTPSCode( - "Expected a HTTPS status code of 200.", response.status_code - ) - try: - return response.json() - except ValueError as exception: # json.JSONDecodeError - raise FatalErrorWithLog( - self.bot, - f"Could not interpret the JSON from Advent of Code (AOC). This suggests that AOC no longer provides JSON or something went very wrong. For more information: {exception}", - ) - - def _get_members( - self, year: int, code: int = UQCS_LEADERBOARD, force_refresh: bool = False - ): - """ - Returns the list of members in the leaderboard for the given year and leaderboard code. - It will attempt to retrieve from a cache if 15 minutes has not passed. - This can be overriden by setting force refresh. - """ - if ( - force_refresh - or (datetime.now() - self.last_reload_time >= CACHE_TIME) - or year not in self.members_cache - ): - leaderboard = self._get_leaderboard_json(year, code) - self.members_cache[year] = [ - Member.from_member_data(data, year) - for data in leaderboard["members"].values() - ] - return self.members_cache[year] - - def _get_registrations(self, year: int) -> Iterable[AOCRegistrations]: - """ - Get all registrations linking an AOC id to a discord account. - """ - db_session = self.bot.create_db_session() - registrations = db_session.query(AOCRegistrations).filter( - AOCRegistrations.year == year - ) - db_session.commit() - db_session.close() - return registrations - - async def reminder_fifteen_minutes(self): - """ - The function used within the AOC reminder 15 minutes before each challenge starts. - """ - channel = discord.utils.get( - self.bot.uqcs_server.channels, name=self.bot.AOC_CNAME - ) - if channel is None: - logging.warning(f"Could not find required channel #{self.bot.AOC_CNAME}") - return - if not isinstance(channel, discord.TextChannel): - logging.warning( - f"Channel #{self.bot.AOC_CNAME} was expected to be a text channel, but was not" - ) - return - await channel.send("Today's Advent of Code puzzle is released in 15 minutes.") - - async def reminder_released(self): - """ - The function used within the AOC reminder when each challenge starts. - """ - channel = discord.utils.get( - self.bot.uqcs_server.channels, name=self.bot.AOC_CNAME - ) - if channel is None: - logging.warning(f"Could not find required channel #{self.bot.AOC_CNAME}") - return - if not isinstance(channel, discord.TextChannel): - logging.warning( - f"Channel #{self.bot.AOC_CNAME} was expected to be a text channel, but was not" - ) - return - await channel.send( - "Today's Advent of Code puzzle has been released. Good luck!" - ) - - def _get_previous_winner_aoc_ids(self, year: int) -> List[int]: - """ - Returns a list of all winner aoc ids for a year - """ - db_session = self.bot.create_db_session() - prev_winners = db_session.query(AOCWinners).filter(AOCWinners.year == year) - db_session.commit() - db_session.close() - - return [winner.aoc_userid for winner in prev_winners] - - def _add_winners(self, winners: List[Member], year: int, prize: str): - """ - Add all members within the list to the database - """ - for winner in winners: - id = self._get_unused_winner_id() - db_session = self.bot.create_db_session() - db_session.add( - AOCWinners(id=id, aoc_userid=winner.id, year=year, prize=prize) - ) - db_session.commit() - db_session.close() - - def _random_choices_without_repition( - self, population: List[Member], weights: List[int], k: int - ) -> List[Member]: - """ - Selects k people from a list of members, weighted by weights. - The weight of a person is like how many tickets they have for the lottery. - """ - result: List[Member] = [] - for _ in range(k): - if sum(weights) == 0: - return [] - - result.append(choices(population, weights)[0]) - index = population.index(result[-1]) - population.pop(index) - weights.pop(index) - - return result - - def _get_unused_winner_id(self) -> int: - """Returns a AOCWinner id that is not currently in use""" - db_session = self.bot.create_db_session() - prev_winners = db_session.query(AOCWinners) - db_session.commit() - db_session.close() - winner_ids = [winner.id for winner in prev_winners] - i = 1 - while (id := i) in winner_ids: - i += 1 - return id - - def _get_unused_registration_id(self) -> int: - """Returns a AOCRegistration id that is not currently in use""" - db_session = self.bot.create_db_session() - prev_registrations = db_session.query(AOCRegistrations) - db_session.commit() - db_session.close() - registration_ids = [registration.id for registration in prev_registrations] - i = 1 - while (id := i) in registration_ids: - i += 1 - return id - - -class LeaderboardColumn: - """ - A column in a leaderboard. The title is the name of the column as 2 lines and the calculation is a function that determines what is printed for a given member, index and day. The title and calculation should have the same constant width. - """ - - def __init__( - self, - title: tuple[str, str], - calculation: Callable[[Member, int, Optional[Day]], str], - ): - self.title = title - self.calculation = calculation - - @staticmethod - def ordering_column(): - """ - A column used at the right of leaderboards to indicate the overall order. Of the format "XXX)" where XXX is a left padded number of 3 characters. - """ - return LeaderboardColumn( - title=(" " * 4, " " * 4), # Empty spaces, as this does not need a heading - calculation=lambda _, index, __: f"{index:>3})", - ) - - @staticmethod - def star1_column(): - """ - A column indicating the time taken to achieve the first star. Of the format "hh:mm:ss" or ">24h". Only applicable for particular days. - """ - return LeaderboardColumn( - title=(" " * 8, " Star 1 "), - calculation=lambda member, _, day: f"{_format_seconds(member.times[day].get(1, 0)) if day else '':>8}", - ) - - @staticmethod - def star2_column(): - """ - A column indicating the time taken to achieve only the second star. Of the format "hh:mm:ss" or ">24h". Only applicable for particular days. - """ - return LeaderboardColumn( - title=(" " * 8, " Star 2 "), - calculation=lambda member, _, day: f"{_format_seconds(member.get_time_delta(day)) if day else '':>8}", - ) - - @staticmethod - def star1_and_2_column(): - """ - A column indicating the time taken to achieve both stars. Of the format "hh:mm:ss" or ">24h". Only applicable for particular days. - """ - return LeaderboardColumn( - title=(" " * 10, "Both Stars"), - calculation=lambda member, _, day: f"{_format_seconds(member.times[day].get(2, 0)) if day else '':>10}", - ) - - @staticmethod - def total_time_column(): - """ - A column indicating the total time the user has spent on all stars. Of the format "hhhh:mm:ss" or ">30 days". - """ - return LeaderboardColumn( - title=(" " * 10, "Total Time"), - calculation=lambda member, _, __: f"{_format_seconds_long(member.get_total_time()):>10}", - ) - - @staticmethod - def total_star1_time_column(): - """ - A column indicating the total time the user has spent on first stars. Of the format "hhhh:mm:ss" or ">30 days". - """ - return LeaderboardColumn( - title=("Total Star", " 1 Time "), - calculation=lambda member, _, __: f"{_format_seconds_long(member.get_total_star1_time()):>10}", - ) - - @staticmethod - def total_star2_time_column(): - """ - A column indicating the total time the user has spent on second stars. Of the format "hhhh:mm:ss" or ">30 days". - """ - return LeaderboardColumn( - title=("Total Star", " 2 Time "), - calculation=lambda member, _, __: f"{_format_seconds_long(member.get_total_star2_time()):>10}", - ) - - @staticmethod - def stars_column(): - """ - A column indicating the total number of stars a user has. Of the format of a 5 character right-padded number. - """ - return LeaderboardColumn( - title=("Total", "Stars"), - calculation=lambda member, _, __: f"{member.star_total if member.star_total else '':>5}", - ) - - @staticmethod - def local_rank_column(): - """ - A column indicating the members local rank (of the UQCS leaderboard). Of the format of a 5 character right-padded number. - """ - return LeaderboardColumn( - title=("Local", "Order"), - calculation=lambda member, _, __: f"{member.local if member.local else '':>5}", - ) - - @staticmethod - def global_score_column(): - """ - A column indicating the members global score. Of the format of a 5 character right-padded number. - """ - return LeaderboardColumn( - title=("Global", "Score "), - calculation=lambda member, _, __: f"{member.global_ if member.global_ else '':>6}", - ) - - @staticmethod - def star_bar_column(): - """ - A column with a progressbar of the stars that each person has. - """ - return LeaderboardColumn( - title=(" " * 9 + "1" * 10 + "2" * 6, "1234567890123456789012345"), - calculation=lambda member, _, __: _get_member_star_progress_bar(member), - ) - - @staticmethod - def name_column(bot: UQCSBot): - """ - A column listing each name. - """ - - def format_name(member: Member, _: int, __: Optional[int]) -> str: - if not (discord_userid := member.get_discord_userid(bot)): - return member.name - if not (discord_user := bot.uqcs_server.get_member(discord_userid)): - return member.name - # Don't actually ping as leaderboard is called many times - return f"{member.name} (@{discord_user.display_name})" - - return LeaderboardColumn(title=("", ""), calculation=format_name) - - @staticmethod - def padding_column(): - """ - A column that is of a single space character. - """ - return LeaderboardColumn(title=(" ", " "), calculation=lambda _, __, ___: " ") - - -def _parse_leaderboard_column_string(s: str, bot: UQCSBot) -> List[LeaderboardColumn]: - """ - Create a list of columns corresponding to the given string. The characters in the string can be: - # - Provides a column of the form "XXX)" telling the order for the given leaderboard - 1 - The time for star 1 for the specific day (daily leaderboards only) - 2 - The time for star 2 for the specific day (daily leaderboards only) - 3 - The time for both stars for the specific day (dayly leaderboards only) - ! - The total time spent on first stars for the whole competition - @ - The total time spent on second stars for the whole competition - T - The total time spent overall for the whole competition - * - The total number of stars for the whole competition - L - The local ranking someone has within the UQCS leaderboard - G - The global score someone has - B - A progress bar of the stars each person has - space - A padding column of a single character - All other characters will be ignored - """ - columns: List[LeaderboardColumn] = [] - for c in s: - match c: - case "#": - columns.append(LeaderboardColumn.ordering_column()) - case "1": - columns.append(LeaderboardColumn.star1_column()) - case "2": - columns.append(LeaderboardColumn.star2_column()) - case "3": - columns.append(LeaderboardColumn.star1_and_2_column()) - case "!": - columns.append(LeaderboardColumn.total_star1_time_column()) - case "@": - columns.append(LeaderboardColumn.total_star2_time_column()) - case "T": - columns.append(LeaderboardColumn.total_time_column()) - case "*": - columns.append(LeaderboardColumn.stars_column()) - case "L": - columns.append(LeaderboardColumn.local_rank_column()) - case "G": - columns.append(LeaderboardColumn.global_score_column()) - case "B": - columns.append(LeaderboardColumn.star_bar_column()) - case " ": - columns.append(LeaderboardColumn.padding_column()) - case _: - pass - columns.append(LeaderboardColumn.padding_column()) - columns.append(LeaderboardColumn.name_column(bot)) - return columns - - -def _star_char(num_stars: int): - """ - Given a number of stars (0, 1, or 2), returns its leaderboard - representation. - """ - return " .*"[num_stars] - - -def _format_seconds(seconds: Optional[int]): - """ - Format seconds into the format "hh:mm:ss" or ">24h". - """ - if seconds is None or seconds == 0: - return "" - delta = timedelta(seconds=seconds) - if delta > timedelta(hours=24): - return ">24h" - return str(delta) - - -def _format_seconds_long(seconds: Optional[int]): - """ - Format seconds into the format "hhhh:mm:ss" or ">30 days". - """ - if seconds is None or seconds == 0: - return "-" - hours, remainder = divmod(seconds, 3600) - minutes, seconds = divmod(remainder, 60) - if hours >= 30 * 24: - return ">30 days" - return f"{hours}:{minutes:02}:{seconds:02}" - - -def _get_member_star_progress_bar(member: Member): - return "".join(_star_char(len(member.times[day])) for day in ADVENT_DAYS) - - -def _print_leaderboard( - columns: List[LeaderboardColumn], members: List[Member], day: Optional[Day] -): - """ - Returns a string of the leaderboard of the given format. - """ - leaderboard = "".join(column.title[0] for column in columns) - leaderboard += "\n" - leaderboard += "".join(column.title[1] for column in columns) - - # Note that leaderboards start at 1, not 0 - for id, member in enumerate(members, start=1): - leaderboard += "\n" - leaderboard += "".join( - column.calculation(member, id, day) for column in columns - ) - - return leaderboard - async def setup(bot: UQCSBot): cog = Advent(bot) diff --git a/uqcsbot/bot.py b/uqcsbot/bot.py index 3a1af1c..77887a9 100644 --- a/uqcsbot/bot.py +++ b/uqcsbot/bot.py @@ -30,6 +30,7 @@ def __init__(self, *args: Any, **kwargs: Any): self.ADMIN_ALERTS_CNAME = "admin-alerts" self.GENERAL_CNAME = "general" self.AOC_CNAME = "contests" + self.AOC_ROLE = "CPG" self.BOT_TIMEZONE = timezone("Australia/Brisbane") self.uqcs_server: discord.Guild diff --git a/uqcsbot/models.py b/uqcsbot/models.py index ab8227a..4d7f884 100644 --- a/uqcsbot/models.py +++ b/uqcsbot/models.py @@ -19,7 +19,7 @@ class Base(DeclarativeBase): class AOCWinners(Base): __tablename__ = "aoc_winners" - id: Mapped[int] = mapped_column("id", BigInteger, primary_key=True, nullable=False) + id: Mapped[int] = mapped_column("id", BigInteger, primary_key=True, nullable=False, autoincrement=True) aoc_userid: Mapped[int] = mapped_column("aoc_userid", Integer, nullable=False) year: Mapped[int] = mapped_column("year", Integer, nullable=False) prize: Mapped[str] = mapped_column("prize", String, nullable=True) @@ -28,7 +28,7 @@ class AOCWinners(Base): class AOCRegistrations(Base): __tablename__ = "aoc_registrations" - id: Mapped[int] = mapped_column("id", BigInteger, primary_key=True, nullable=False) + id: Mapped[int] = mapped_column("id", BigInteger, primary_key=True, nullable=False, autoincrement=True) aoc_userid: Mapped[int] = mapped_column("aoc_userid", Integer, nullable=False) year: Mapped[int] = mapped_column("year", Integer, nullable=False) discord_userid: Mapped[int] = mapped_column( diff --git a/uqcsbot/utils/advent_utils.py b/uqcsbot/utils/advent_utils.py new file mode 100644 index 0000000..ee7e081 --- /dev/null +++ b/uqcsbot/utils/advent_utils.py @@ -0,0 +1,384 @@ +from typing import Any, List, Literal, Dict, Optional, Callable +from datetime import datetime, timedelta +from pytz import timezone + +from uqcsbot.bot import UQCSBot +from uqcsbot.models import AOCRegistrations + +# Days in Advent of Code. List of numbers 1 to 25. +ADVENT_DAYS = list(range(1, 25 + 1)) + +# type aliases for documentation purposes. +Day = int # from 1 to 25 +Star = Literal[1, 2] +Seconds = int +Times = Dict[Star, Seconds] +Delta = Optional[Seconds] +Json = Dict[str, Any] + +# Puzzles are unlocked at midnight EST. +EST_TIMEZONE = timezone("US/Eastern") + +# The time to cache results to limit requests to adventofcode.com. Note that 15 minutes is the recomended minimum time. +CACHE_TIME = timedelta(minutes=15) + +class InvalidHTTPSCode(Exception): + def __init__(self, message, request_code): + super().__init__(message) + self.request_code = request_code + + +class Member: + def __init__(self, id: int, name: str, local: int, star_total: int, global_: int): + # The advent of code id + self.id = id + # The advent of code name + self.name = name + # The score of the user on the local leaderboard + self.local = local + # The total number of stars the user has collected + self.star_total = star_total + # The score of the user on the global leaderboard + self.global_ = global_ + + # All of the Times. If no stars are collected, the Times dictionary is empty. + self.times: Dict[Day, Times] = {d: {} for d in ADVENT_DAYS} + + @classmethod + def from_member_data(cls, data: Json, year: int) -> "Member": + """ + Constructs a Member from the API response. + + Times and delta are calculated for the given year and day. + """ + + member = cls( + data["id"], + data["name"], + data["local_score"], + data["stars"], + data["global_score"], + ) + + for d, day_data in data["completion_day_level"].items(): + day = int(d) + times = member.times[day] + + # timestamp of puzzle unlock, rounded to whole seconds + DAY_START = int(datetime(year, 12, day, tzinfo=EST_TIMEZONE).timestamp()) + + for s, star_data in day_data.items(): + star = int(s) + # assert is for type checking + assert star == 1 or star == 2 + times[star] = int(star_data["get_star_ts"]) - DAY_START + assert times[star] >= 0 + + return member + + def get_time_delta(self, day: Day) -> Optional[Seconds]: + """ + Returns the number of seconds between the completion of the second star from the first, or None if the second star have not been completed. + """ + if len(self.times[day]) == 2: + return self.times[day][2] - self.times[day][1] + return None + + def attempted_day(self, day: Day) -> bool: + """ + Returns if a member completed at least the first star in the day + """ + return len(self.times[day]) >= 1 + + def get_total_star1_time(self, default: int = 0) -> int: + """ + Returns the total time working on just star 1 for all challenges in a year. + The argument default determines the returned value if the total is 0. + """ + total = sum(self.times[day].get(1, 0) for day in ADVENT_DAYS) + return total if total != 0 else default + + def get_total_star2_time(self, default: int = 0) -> int: + """ + Returns the total time working on just star 2 for all challenges in a year. + The argument default determines the returned value if the total is 0. + """ + total = sum(self.times[day].get(2, 0) for day in ADVENT_DAYS) + return total if total != 0 else default + + def get_total_time(self, default: int = 0) -> int: + """ + Returns the total time working on stars 1 and 2 for all challenges in a year. + The argument default determines the returned value if the total is 0. + """ + total = self.get_total_star1_time() + self.get_total_star2_time() + return total if total != 0 else default + + def get_discord_userid(self, bot: UQCSBot) -> Optional[int]: + """ + Return the discord userid of this AOC member if one is registered in the database. + """ + db_session = bot.create_db_session() + registration = ( + db_session.query(AOCRegistrations) + .filter(AOCRegistrations.aoc_userid == self.id) + .one_or_none() + ) + db_session.close() + if registration: + return registration.discord_userid + return None + + +def _star_char(num_stars: int): + """ + Given a number of stars (0, 1, or 2), returns its leaderboard + representation. + """ + return " .*"[num_stars] + + +def _format_seconds(seconds: Optional[int]): + """ + Format seconds into the format "hh:mm:ss" or ">24h". + """ + if seconds is None or seconds == 0: + return "" + delta = timedelta(seconds=seconds) + if delta > timedelta(hours=24): + return ">24h" + return str(delta) + + +def _format_seconds_long(seconds: Optional[int]): + """ + Format seconds into the format "hhhh:mm:ss" or ">30 days". + """ + if seconds is None or seconds == 0: + return "-" + hours, remainder = divmod(seconds, 3600) + minutes, seconds = divmod(remainder, 60) + if hours >= 30 * 24: + return ">30 days" + return f"{hours}:{minutes:02}:{seconds:02}" + + +def _get_member_star_progress_bar(member: Member): + return "".join(_star_char(len(member.times[day])) for day in ADVENT_DAYS) + +class LeaderboardColumn: + """ + A column in a leaderboard. The title is the name of the column as 2 lines and the calculation is a function that determines what is printed for a given member, index and day. The title and calculation should have the same constant width. + """ + + def __init__( + self, + title: tuple[str, str], + calculation: Callable[[Member, int, Optional[Day]], str], + ): + self.title = title + self.calculation = calculation + + @staticmethod + def ordering_column(): + """ + A column used at the right of leaderboards to indicate the overall order. Of the format "XXX)" where XXX is a left padded number of 3 characters. + """ + return LeaderboardColumn( + title=(" " * 4, " " * 4), # Empty spaces, as this does not need a heading + calculation=lambda _, index, __: f"{index:>3})", + ) + + @staticmethod + def star1_column(): + """ + A column indicating the time taken to achieve the first star. Of the format "hh:mm:ss" or ">24h". Only applicable for particular days. + """ + return LeaderboardColumn( + title=(" " * 8, " Star 1 "), + calculation=lambda member, _, day: f"{_format_seconds(member.times[day].get(1, 0)) if day else '':>8}", + ) + + @staticmethod + def star2_column(): + """ + A column indicating the time taken to achieve only the second star. Of the format "hh:mm:ss" or ">24h". Only applicable for particular days. + """ + return LeaderboardColumn( + title=(" " * 8, " Star 2 "), + calculation=lambda member, _, day: f"{_format_seconds(member.get_time_delta(day)) if day else '':>8}", + ) + + @staticmethod + def star1_and_2_column(): + """ + A column indicating the time taken to achieve both stars. Of the format "hh:mm:ss" or ">24h". Only applicable for particular days. + """ + return LeaderboardColumn( + title=(" " * 10, "Both Stars"), + calculation=lambda member, _, day: f"{_format_seconds(member.times[day].get(2, 0)) if day else '':>10}", + ) + + @staticmethod + def total_time_column(): + """ + A column indicating the total time the user has spent on all stars. Of the format "hhhh:mm:ss" or ">30 days". + """ + return LeaderboardColumn( + title=(" " * 10, "Total Time"), + calculation=lambda member, _, __: f"{_format_seconds_long(member.get_total_time()):>10}", + ) + + @staticmethod + def total_star1_time_column(): + """ + A column indicating the total time the user has spent on first stars. Of the format "hhhh:mm:ss" or ">30 days". + """ + return LeaderboardColumn( + title=("Total Star", " 1 Time "), + calculation=lambda member, _, __: f"{_format_seconds_long(member.get_total_star1_time()):>10}", + ) + + @staticmethod + def total_star2_time_column(): + """ + A column indicating the total time the user has spent on second stars. Of the format "hhhh:mm:ss" or ">30 days". + """ + return LeaderboardColumn( + title=("Total Star", " 2 Time "), + calculation=lambda member, _, __: f"{_format_seconds_long(member.get_total_star2_time()):>10}", + ) + + @staticmethod + def stars_column(): + """ + A column indicating the total number of stars a user has. Of the format of a 5 character right-padded number. + """ + return LeaderboardColumn( + title=("Total", "Stars"), + calculation=lambda member, _, __: f"{member.star_total if member.star_total else '':>5}", + ) + + @staticmethod + def local_rank_column(): + """ + A column indicating the members local rank (of the UQCS leaderboard). Of the format of a 5 character right-padded number. + """ + return LeaderboardColumn( + title=("Local", "Order"), + calculation=lambda member, _, __: f"{member.local if member.local else '':>5}", + ) + + @staticmethod + def global_score_column(): + """ + A column indicating the members global score. Of the format of a 5 character right-padded number. + """ + return LeaderboardColumn( + title=("Global", "Score "), + calculation=lambda member, _, __: f"{member.global_ if member.global_ else '':>6}", + ) + + @staticmethod + def star_bar_column(): + """ + A column with a progressbar of the stars that each person has. + """ + return LeaderboardColumn( + title=(" " * 9 + "1" * 10 + "2" * 6, "1234567890123456789012345"), + calculation=lambda member, _, __: _get_member_star_progress_bar(member), + ) + + @staticmethod + def name_column(bot: UQCSBot): + """ + A column listing each name. + """ + + def format_name(member: Member, _: int, __: Optional[int]) -> str: + if not (discord_userid := member.get_discord_userid(bot)): + return member.name + if not (discord_user := bot.uqcs_server.get_member(discord_userid)): + return member.name + # Don't actually ping as leaderboard is called many times + return f"{member.name} (@{discord_user.display_name})" + + return LeaderboardColumn(title=("", ""), calculation=format_name) + + @staticmethod + def padding_column(): + """ + A column that is of a single space character. + """ + return LeaderboardColumn(title=(" ", " "), calculation=lambda _, __, ___: " ") + + +def parse_leaderboard_column_string(s: str, bot: UQCSBot) -> List[LeaderboardColumn]: + """ + Create a list of columns corresponding to the given string. The characters in the string can be: + # - Provides a column of the form "XXX)" telling the order for the given leaderboard + 1 - The time for star 1 for the specific day (daily leaderboards only) + 2 - The time for star 2 for the specific day (daily leaderboards only) + 3 - The time for both stars for the specific day (dayly leaderboards only) + ! - The total time spent on first stars for the whole competition + @ - The total time spent on second stars for the whole competition + T - The total time spent overall for the whole competition + * - The total number of stars for the whole competition + L - The local ranking someone has within the UQCS leaderboard + G - The global score someone has + B - A progress bar of the stars each person has + space - A padding column of a single character + All other characters will be ignored + """ + columns: List[LeaderboardColumn] = [] + for c in s: + match c: + case "#": + columns.append(LeaderboardColumn.ordering_column()) + case "1": + columns.append(LeaderboardColumn.star1_column()) + case "2": + columns.append(LeaderboardColumn.star2_column()) + case "3": + columns.append(LeaderboardColumn.star1_and_2_column()) + case "!": + columns.append(LeaderboardColumn.total_star1_time_column()) + case "@": + columns.append(LeaderboardColumn.total_star2_time_column()) + case "T": + columns.append(LeaderboardColumn.total_time_column()) + case "*": + columns.append(LeaderboardColumn.stars_column()) + case "L": + columns.append(LeaderboardColumn.local_rank_column()) + case "G": + columns.append(LeaderboardColumn.global_score_column()) + case "B": + columns.append(LeaderboardColumn.star_bar_column()) + case " ": + columns.append(LeaderboardColumn.padding_column()) + case _: + pass + columns.append(LeaderboardColumn.padding_column()) + columns.append(LeaderboardColumn.name_column(bot)) + return columns + +def print_leaderboard( + columns: List[LeaderboardColumn], members: List[Member], day: Optional[Day] +): + """ + Returns a string of the leaderboard of the given format. + """ + leaderboard = "".join(column.title[0] for column in columns) + leaderboard += "\n" + leaderboard += "".join(column.title[1] for column in columns) + + # Note that leaderboards start at 1, not 0 + for id, member in enumerate(members, start=1): + leaderboard += "\n" + leaderboard += "".join( + column.calculation(member, id, day) for column in columns + ) + + return leaderboard +