diff --git a/bot/legends/track.py b/bot/legends/track.py index cf9510a..ec300ba 100644 --- a/bot/legends/track.py +++ b/bot/legends/track.py @@ -15,16 +15,73 @@ from .utils import get_player_responses from collections import defaultdict from utility.utils import gen_season_date, gen_raid_date, gen_games_season, gen_legend_date +from msgspec.json import decode +from msgspec import Struct +from typing import Optional, List + +class Clan(Struct): + name: str + tag: str + + def to_dict(self): + return {f: getattr(self, f) for f in self.__struct_fields__} + +class Equipment(Struct): + name: str + level: int + + def to_dict(self): + return {f: getattr(self, f) for f in self.__struct_fields__} + +class Heroes(Struct): + name: str + equipment: Optional[List[Equipment]] = list() + + def to_dict(self): + result = {f: getattr(self, f) for f in self.__struct_fields__} + for field, value in result.items(): + if isinstance(value, Struct): + result[field] = value.to_dict() # Recursively convert nested structs + elif isinstance(value, list) and all(isinstance(item, Struct) for item in value): + result[field] = [item.to_dict() for item in value] # Convert lists of structs + + return result + +class League(Struct): + name: str + + def to_dict(self): + return {f: getattr(self, f) for f in self.__struct_fields__} + +class Player(Struct): + name: str + tag: str + trophies: int + attackWins: int + defenseWins: int + heroes: List[Heroes] + equipment: Optional[List[Equipment]] = list() + clan: Optional[Clan] = None + league: Optional[League] = None + + def to_dict(self): + result = {f: getattr(self, f) for f in self.__struct_fields__} + for field, value in result.items(): + if isinstance(value, Struct): + result[field] = value.to_dict() # Recursively convert nested structs + elif isinstance(value, list) and all(isinstance(item, Struct) for item in value): + result[field] = [item.to_dict() for item in value] # Convert lists of structs + + return result async def main(): + LEGENDS_CACHE = {} config = LegendTrackingConfig() producer = KafkaProducer(bootstrap_servers=["85.10.200.219:9092"], api_version=(3, 6, 0)) db_client = MongoDatabase(stats_db_connection=config.stats_mongodb, static_db_connection=config.static_mongodb) - cache = redis.Redis(host=config.redis_ip, port=6379, db=10, password=config.redis_pw, decode_responses=False, max_connections=50, - health_check_interval=10, socket_connect_timeout=5, retry_on_timeout=True, socket_keepalive=True) keys: deque = await create_keys([config.coc_email.format(x=x) for x in range(config.min_coc_email, config.max_coc_email + 1)], [config.coc_password] * config.max_coc_email) logger.info(f"{len(keys)} keys created") @@ -47,54 +104,50 @@ async def main(): # update last updated for all the members we are checking this go around logger.info(f"LOOP {loop_spot} | Group {count}/{len(split_tags)}: {len(group)} tags") - # pull previous responses from cache + map to a dict so we can easily pull - previous_player_responses = await cache.mget(keys=group) - previous_player_responses = {tag: response for tag, response in zip(group, previous_player_responses)} # pull current responses from the api, returns (tag: str, response: bytes) # response can be bytes, "delete", and None current_player_responses = await get_player_responses(keys=keys, tags=group) logger.info(f"LOOP {loop_spot} | Group {count}: Entering Changes Loop/Pulled Responses") - pipe = cache.pipeline() legend_date = gen_legend_date() for tag, response in current_player_responses: - previous_response = previous_player_responses.get(tag) if response is None: continue if response == "delete": await db_client.player_stats.delete_one({"tag" : tag}) - await pipe.getdel(tag) + LEGENDS_CACHE.pop(tag, "gone") continue + player = decode(response, type=Player) # if None, update cache and move on - if previous_response is None: - await pipe.set(tag, response, ex=2_592_000) + previous_player: Player = LEGENDS_CACHE.get(tag) + if previous_player is None: + LEGENDS_CACHE[tag] = player continue # if the responses don't match: # - update cache - # - turn responses into dicts - # - use function to find changes & update lists of changes - if previous_response != response: - await pipe.set(tag, response, ex=2_592_000) - response = orjson.loads(response) - previous_response = orjson.loads(previous_response) - - tag = response.get("tag") - league = response.get("league", {}).get("name", "Unranked") - - if response["trophies"] != previous_response["trophies"] and response["trophies"] >= 4900 and league == "Legend League": - diff_trophies = response["trophies"] - previous_response["trophies"] - diff_attacks = response["attackWins"] - previous_response["attackWins"] - - json_data = {"types": ["legends"], "old_player": previous_response, "new_player": response, "timestamp": int(pend.now(tz=pend.UTC).timestamp())} - if response.get("clan", {}).get("tag", "None") in clan_tags: + if previous_player.trophies != player.trophies: + LEGENDS_CACHE[tag] = player + + if player.league is None: + continue + + if player.trophies >= 4900 and player.league.name == "Legend League": + json_data = {"types": ["legends"], + "old_data" : previous_player.to_dict(), + "new_data" : player.to_dict(), + "timestamp": int(pend.now(tz=pend.UTC).timestamp())} + if player.clan is not None and player.clan.tag in clan_tags: producer.send(topic="player", value=orjson.dumps(json_data), timestamp_ms=int(pend.now(tz=pend.UTC).timestamp()) * 1000) + diff_trophies = player.trophies - previous_player.trophies + diff_attacks = player.attackWins - previous_player.attackWins + if diff_trophies <= - 1: diff_trophies = abs(diff_trophies) if diff_trophies <= 100: @@ -109,10 +162,9 @@ async def main(): }}}, upsert=True)) elif diff_trophies >= 1: - heroes = response.get("heroes", []) equipment = [] - for hero in heroes: - for gear in hero.get("equipment", []): + for hero in player.heroes: + for gear in hero.equipment: equipment.append({"name": gear.get("name"), "level": gear.get("level")}) legend_changes.append( @@ -172,8 +224,8 @@ async def main(): legend_changes.append( UpdateOne({"tag": tag}, {"$set": {f"legends.streak": 0}}, upsert=True)) - if response["defenseWins"] != previous_response["defenseWins"]: - diff_defenses = response["defenseWins"] - previous_response["defenseWins"] + if player.defenseWins != previous_player.defenseWins: + diff_defenses = player.defenseWins - previous_player.defenseWins for x in range(0, diff_defenses): legend_changes.append( UpdateOne({"tag": tag}, {"$push": {f"legends.{legend_date}.defenses": 0}}, @@ -185,7 +237,6 @@ async def main(): "trophies": response["trophies"] }}}, upsert=True)) - await pipe.execute() logger.info(f"LOOP {loop_spot}: Changes Found")