diff --git a/examples/echo_bot.py b/examples/echo_bot.py index 7a2384d..2ccceb8 100755 --- a/examples/echo_bot.py +++ b/examples/echo_bot.py @@ -41,7 +41,7 @@ def __init__(self, creds: dict): android_id = creds['android_id'] node = creds.get('node') # If you don't know it, set it to None - self.client = KikClient(self, username, password, node, device_id=device_id, android_id=android_id, logging=True) + self.client = KikClient(self, username, str(password), node, device_id=device_id, android_id=android_id, enable_logging=True) self.client.wait_for_messages() # Initialization and Authentication diff --git a/kik_unofficial/callbacks.py b/kik_unofficial/callbacks.py index 5109212..6ef5e89 100644 --- a/kik_unofficial/callbacks.py +++ b/kik_unofficial/callbacks.py @@ -3,12 +3,12 @@ from kik_unofficial.datatypes.xmpp.account import GetMyProfileResponse from kik_unofficial.datatypes.xmpp.chatting import * from kik_unofficial.datatypes.xmpp.errors import LoginError, SignUpError -from kik_unofficial.datatypes.xmpp.login import LoginResponse, ConnectionFailedResponse, CaptchaElement +from kik_unofficial.datatypes.xmpp.login import LoginResponse, ConnectionFailedResponse, CaptchaElement, TempBanElement from kik_unofficial.datatypes.xmpp.roster import FetchRosterResponse, PeersInfoResponse, GroupSearchResponse from kik_unofficial.datatypes.xmpp.sign_up import RegisterResponse, UsernameUniquenessResponse from kik_unofficial.datatypes.xmpp.xiphias import UsersResponse, UsersByAliasResponse from kik_unofficial.datatypes.xmpp.history import HistoryResponse - +from kik_unofficial.datatypes.xmpp.chatting import KikPongResponse class KikClientCallback: @@ -147,3 +147,17 @@ def on_xiphias_get_users_response(self, response: Union[UsersResponse, UsersByAl def on_card_received(self, response: IncomingCardMessage): pass + + def on_pong(self, response: KikPongResponse): + """ + Gets called when the kik server sends a pong response to a ping response. + :return: + """ + pass + + def on_temp_ban_received(self, response: TempBanElement): + """ + Gets called when kik servers send a temp ban message after successful authentication + When received, you will be unable to send or receive any stanzas until the current time is greater than the ban end time. + """ + pass diff --git a/kik_unofficial/client.py b/kik_unofficial/client.py index 762fd78..9436799 100644 --- a/kik_unofficial/client.py +++ b/kik_unofficial/client.py @@ -1,5 +1,4 @@ import asyncio -import logging import time from threading import Thread, Event from typing import Union, List, Tuple @@ -21,18 +20,19 @@ from kik_unofficial.datatypes.xmpp.base_elements import XMPPElement from kik_unofficial.http import profile_pictures, content from kik_unofficial.utilities.credential_utilities import random_device_id, random_android_id -from kik_unofficial.utilities.logging_utils import set_up_basic_logging - +from kik_unofficial.utilities.logging_utils import set_up_basic_logging HOST, PORT = "talk1110an.kik.com", 5223 -log = logging.getLogger('kik_unofficial') + class KikClient: """ The main kik class with which you're managing a kik connection and sending commands """ + def __init__(self, callback: callbacks.KikClientCallback, kik_username: str, kik_password: str, - kik_node: str = None, device_id: str = None, android_id: str = random_android_id(), logging: bool = False) -> None: + kik_node: str = None, device_id: str = None, android_id: str = random_android_id(), log_level: int = 1, + enable_logging: bool = False, log_file_path: str = None) -> None: """ Initializes a connection to Kik servers. If you want to automatically login too, use the username and password parameters. @@ -46,12 +46,13 @@ def __init__(self, callback: callbacks.KikClientCallback, kik_username: str, kik authentication will happen faster and without a login. otherwise supply None. :param device_id: a unique device ID. If you don't supply one, a random one will be generated. (generated at _on_connection_made) :param android_id: a unique android ID. If you don't supply one, a random one will be generated. - :param logging: If true, turns on logging to stdout (default: False) + :param enable_logging: If true, turns on logging to stdout (default: False) + ;param log_file_path: If set will create a daily rotated log file and archive for 7 days. """ # turn on logging with basic configuration - if logging: - set_up_basic_logging() - + if enable_logging: + self.log = set_up_basic_logging(log_level=log_level, logger_name="kik_unofficial", log_file_path=log_file_path) + self.username = kik_username self.password = kik_password self.kik_node = kik_node @@ -71,7 +72,7 @@ def __init__(self, callback: callbacks.KikClientCallback, kik_username: str, kik self._known_users_information = set() self._new_user_added_event = Event() - + self.should_login_on_connection = kik_username is not None and kik_password is not None self._connect() @@ -86,10 +87,10 @@ def _connect(self): def wait_for_messages(self): for _ in range(5): self.kik_connection_thread.join() - log.info("[+] Connection has disconnected, trying again...") + self.log.info("Connection has disconnected, trying again...") time.sleep(1) - - log.info("[+] Failed to reconnect, exiting...") + + self.log.info("Failed to reconnect, exiting...") def _on_connection_made(self): """ @@ -98,13 +99,15 @@ def _on_connection_made(self): """ if self.username and self.password and self.kik_node and self.device_id: # we have all required credentials, we can authenticate - log.info(f"[+] Establishing authenticated connection using kik node '{self.kik_node}', device id '{self.device_id}' and android id '{self.android_id}'...") + self.log.info( + f"Establishing authenticated connection using kik node '{self.kik_node}', device id '{self.device_id}' and android id '{self.android_id}'...") - message = login.EstablishAuthenticatedSessionRequest(self.kik_node, self.username, self.password, self.device_id) + message = login.EstablishAuthenticatedSessionRequest(self.kik_node, self.username, self.password, + self.device_id) else: # if device id is not known, we generate a random one self.device_id = random_device_id() - message = login.MakeAnonymousStreamInitTag(self.device_id, n = 1) + message = login.MakeAnonymousStreamInitTag(self.device_id, n=1) self.initial_connection_payload = message.serialize() self.connection.send_raw_data(self.initial_connection_payload) @@ -117,7 +120,7 @@ def _establish_authenticated_session(self, kik_node): :param kik_node: The user's kik node (everything before '@' in JID). """ self.kik_node = kik_node - log.info("[+] Closing current connection and creating a new authenticated one.") + self.log.info("Closing current connection and creating a new authenticated one.") self.disconnect() self._connect() @@ -135,25 +138,27 @@ def login(self, username: str, password: str, captcha_result: str = None): self.password = password login_request = login.LoginRequest(username, password, captcha_result, self.device_id, self.android_id) login_type = "email" if '@' in self.username else "username" - log.info(f"[+] Logging in with {login_type} '{username}' and a given password {'*' * len(password)}...") + self.log.info(f"Logging in with {login_type} '{username}' and a given password {'*' * len(password)}...") return self._send_xmpp_element(login_request) - def register(self, email, username, password, first_name, last_name, birthday="1974-11-20", captcha_result: str = None): + def register(self, email, username, password, first_name, last_name, birthday="1974-11-20", + captcha_result: str = None): """ Sends a register request to sign up a new user to kik with the given details. """ self.username = username self.password = password - register_message = sign_up.RegisterRequest(email, username, password, first_name, last_name, birthday, captcha_result, + register_message = sign_up.RegisterRequest(email, username, password, first_name, last_name, birthday, + captcha_result, self.device_id, self.android_id) - log.info(f"[+] Sending sign up request (name: {first_name} {last_name}, email: {email})...") + self.log.info(f"Sending sign up request (name: {first_name} {last_name}, email: {email})...") return self._send_xmpp_element(register_message) def request_roster(self, is_big=True, timestamp=None): """ Requests the list of chat partners (people and groups). This is called roster in XMPP terms. """ - log.info("[+] Requesting roster (list of chat partners)...") + self.log.info("Requesting roster (list of chat partners)...") return self._send_xmpp_element(roster.FetchRosterRequest(is_big=is_big, timestamp=timestamp)) # ------------------------------- @@ -173,10 +178,10 @@ def send_chat_message(self, peer_jid: str, message: str, bot_mention_jid=None): peer_jid = self.get_jid(peer_jid) if self.is_group_jid(peer_jid): - log.info(f"[+] Sending chat message '{message}' to group '{peer_jid}'...") + self.log.info(f"Sending chat message '{message}' to group '{peer_jid}'...") return self._send_xmpp_element(chatting.OutgoingGroupChatMessage(peer_jid, message, bot_mention_jid)) else: - log.info(f"[+] Sending chat message '{message}' to user '{peer_jid}'...") + self.log.info(f"Sending chat message '{message}' to user '{peer_jid}'...") return self._send_xmpp_element(chatting.OutgoingChatMessage(peer_jid, message, False, bot_mention_jid)) def send_chat_image(self, peer_jid: str, file, forward=True): @@ -189,10 +194,10 @@ def send_chat_image(self, peer_jid: str, file, forward=True): peer_jid = self.get_jid(peer_jid) if self.is_group_jid(peer_jid): - log.info(f"[+] Sending chat image to group '{peer_jid}'...") + self.log.info(f"Sending chat image to group '{peer_jid}'...") imageRequest = chatting.OutgoingGroupChatImage(peer_jid, file, forward) else: - log.info(f"[+] Sending chat image to user '{peer_jid}'...") + self.log.info(f"Sending chat image to user '{peer_jid}'...") imageRequest = chatting.OutgoingChatImage(peer_jid, file, False, forward) content.upload_gallery_image( imageRequest, @@ -211,7 +216,7 @@ def send_read_receipt(self, peer_jid: str, receipt_message_id: str, group_jid=No :param group_jid If the receipt is sent for a message that was sent in a group, this parameter should contain the group's JID """ - log.info(f"[+] Sending read receipt to JID {peer_jid} for message ID {receipt_message_id}") + self.log.info(f"Sending read receipt to JID {peer_jid} for message ID {receipt_message_id}") return self._send_xmpp_element(chatting.OutgoingReadReceipt(peer_jid, receipt_message_id, group_jid)) def send_delivered_receipt(self, peer_jid: str, receipt_message_id: str, group_jid: str = None): @@ -222,7 +227,7 @@ def send_delivered_receipt(self, peer_jid: str, receipt_message_id: str, group_j :param receipt_message_id: The message ID for which to generate the receipt :param group_jid: The group's JID, in case the receipt is sent in a group (None otherwise) """ - log.info(f"[+] Sending delivered receipt to JID {peer_jid} for message ID {receipt_message_id}") + self.log.info(f"Sending delivered receipt to JID {peer_jid} for message ID {receipt_message_id}") return self._send_xmpp_element(chatting.OutgoingDeliveredReceipt(peer_jid, receipt_message_id, group_jid)) def send_is_typing(self, peer_jid: str, is_typing: bool): @@ -248,10 +253,10 @@ def send_gif_image(self, peer_jid: str, search_term: str, API_key: str): :param API_key: The API key for tenor (Get one from https://developers.google.com/tenor/) """ if self.is_group_jid(peer_jid): - log.info(f"[+] Sending a GIF message to group '{peer_jid}'...") + self.log.info(f"Sending a GIF message to group '{peer_jid}'...") return self._send_xmpp_element(chatting.OutgoingGIFMessage(peer_jid, search_term, API_key, True)) else: - log.info(f"[+] Sending a GIF message to user '{peer_jid}'...") + self.log.info(f"Sending a GIF message to user '{peer_jid}'...") return self._send_xmpp_element(chatting.OutgoingGIFMessage(peer_jid, search_term, API_key, False)) def request_info_of_users(self, peer_jids: Union[str, List[str]]): @@ -302,7 +307,7 @@ def change_group_name(self, group_jid: str, new_name: str): :param group_jid: The JID of the group whose name should be changed :param new_name: The new name to give to the group """ - log.info(f"[+] Requesting a group name change for JID {group_jid} to '{new_name}'") + self.log.info(f"Requesting a group name change for JID {group_jid} to '{new_name}'") return self._send_xmpp_element(group_adminship.ChangeGroupNameRequest(group_jid, new_name)) def add_peer_to_group(self, group_jid, peer_jid): @@ -312,7 +317,7 @@ def add_peer_to_group(self, group_jid, peer_jid): :param group_jid: The JID of the group into which to add a user :param peer_jid: The JID of the user to add """ - log.info(f"[+] Requesting to add user {peer_jid} into the group {group_jid}") + self.log.info(f"Requesting to add user {peer_jid} into the group {group_jid}") return self._send_xmpp_element(group_adminship.AddToGroupRequest(group_jid, peer_jid)) def remove_peer_from_group(self, group_jid, peer_jid): @@ -322,7 +327,7 @@ def remove_peer_from_group(self, group_jid, peer_jid): :param group_jid: The group JID from which to remove the user :param peer_jid: The JID of the user to remove """ - log.info(f"[+] Requesting removal of user {peer_jid} from group {group_jid}") + self.log.info(f"Requesting removal of user {peer_jid} from group {group_jid}") return self._send_xmpp_element(group_adminship.RemoveFromGroupRequest(group_jid, peer_jid)) def ban_member_from_group(self, group_jid, peer_jid): @@ -332,7 +337,7 @@ def ban_member_from_group(self, group_jid, peer_jid): :param group_jid: The JID of the relevant group :param peer_jid: The JID of the user to ban """ - log.info(f"[+] Requesting ban of user {peer_jid} from group {group_jid}") + self.log.info(f"Requesting ban of user {peer_jid} from group {group_jid}") return self._send_xmpp_element(group_adminship.BanMemberRequest(group_jid, peer_jid)) def unban_member_from_group(self, group_jid, peer_jid): @@ -342,7 +347,7 @@ def unban_member_from_group(self, group_jid, peer_jid): :param group_jid: The JID of the relevant group :param peer_jid: The JID of the user to un-ban from the gorup """ - log.info(f"[+] Requesting un-banning of user {peer_jid} from the group {group_jid}") + self.log.info(f"Requesting un-banning of user {peer_jid} from the group {group_jid}") return self._send_xmpp_element(group_adminship.UnbanRequest(group_jid, peer_jid)) def join_group_with_token(self, group_hashtag, group_jid, join_token): @@ -354,7 +359,7 @@ def join_group_with_token(self, group_hashtag, group_jid, join_token): :param join_token: a token that can be extracted in the callback on_group_search_response, after calling search_group() """ - log.info(f"[+] Trying to join the group '{group_hashtag}' with JID {group_jid}") + self.log.info(f"Trying to join the group '{group_hashtag}' with JID {group_jid}") return self._send_xmpp_element(roster.GroupJoinRequest(group_hashtag, join_token, group_jid)) def leave_group(self, group_jid): @@ -363,7 +368,7 @@ def leave_group(self, group_jid): :param group_jid: The JID of the group to leave """ - log.info(f"[+] Leaving group {group_jid}") + self.log.info(f"Leaving group {group_jid}") return self._send_xmpp_element(group_adminship.LeaveGroupRequest(group_jid)) def promote_to_admin(self, group_jid, peer_jid): @@ -373,7 +378,7 @@ def promote_to_admin(self, group_jid, peer_jid): :param group_jid: The group JID for which the member will become an admin :param peer_jid: The JID of user to turn into an admin """ - log.info(f"[+] Promoting user {peer_jid} to admin in group {group_jid}") + self.log.info(f"Promoting user {peer_jid} to admin in group {group_jid}") return self._send_xmpp_element(group_adminship.PromoteToAdminRequest(group_jid, peer_jid)) def demote_admin(self, group_jid, peer_jid): @@ -384,7 +389,7 @@ def demote_admin(self, group_jid, peer_jid): :param peer_jid: The admin user to demote :return: """ - log.info(f"[+] Demoting user {peer_jid} to a regular member in group {group_jid}") + self.log.info(f"Demoting user {peer_jid} to a regular member in group {group_jid}") return self._send_xmpp_element(group_adminship.DemoteAdminRequest(group_jid, peer_jid)) def add_members(self, group_jid, peer_jids: Union[str, List[str]]): @@ -394,18 +399,18 @@ def add_members(self, group_jid, peer_jids: Union[str, List[str]]): :param group_jid: The group into which to join the users :param peer_jids: a list (or a single string) of JIDs to add to the group """ - log.info(f"[+] Adding some members to the group {group_jid}") + self.log.info(f"Adding some members to the group {group_jid}") return self._send_xmpp_element(group_adminship.AddMembersRequest(group_jid, peer_jids)) # ---------------------- # Other Operations # ---------------------- - def send_ack(self, sender_jid, is_receipt: bool, message_id, group_jid = None): + def send_ack(self, sender_jid, is_receipt: bool, message_id, group_jid=None): """ Sends an acknowledgement for a provided message ID """ - log.info(f"[+] Sending acknowledgement for message ID {message_id}") + self.log.info(f"Sending acknowledgement for message ID {message_id}") return self._send_xmpp_element(history.OutgoingAcknowledgement(sender_jid, is_receipt, message_id, group_jid)) def request_messaging_history(self): @@ -413,7 +418,7 @@ def request_messaging_history(self): Requests the account's messaging history. Results will be returned using the on_message_history_response() callback """ - log.info("[+] Requesting messaging history") + self.log.info("Requesting messaging history") return self._send_xmpp_element(history.OutgoingHistoryRequest()) def search_group(self, search_query): @@ -423,7 +428,7 @@ def search_group(self, search_query): :param search_query: The query that contains some of the desired groups' name. """ - log.info(f"[+] Initiating a search for groups using the query '{search_query}'") + self.log.info(f"Initiating a search for groups using the query '{search_query}'") return self._send_xmpp_element(roster.GroupSearchRequest(search_query)) def check_username_uniqueness(self, username): @@ -433,7 +438,7 @@ def check_username_uniqueness(self, username): :param username: The username to check for its existence """ - log.info(f"[+] Checking for Uniqueness of username '{username}'") + self.log.info(f"Checking for Uniqueness of username '{username}'") return self._send_xmpp_element(sign_up.CheckUsernameUniquenessRequest(username)) def set_profile_picture(self, filename): @@ -442,7 +447,7 @@ def set_profile_picture(self, filename): :param filename: The path to the file OR its bytes OR an IOBase object to set """ - log.info(f"[+] Setting the profile picture to file '{filename}'") + self.log.info(f"Setting the profile picture to file '{filename}'") profile_pictures.set_profile_picture( filename, f'{self.kik_node}@talk.kik.com', self.username, self.password ) @@ -453,11 +458,19 @@ def set_background_picture(self, filename): :param filename: The path to the image file OR its bytes OR an IOBase object to set """ - log.info(f"[+] Setting the background picture to filename '{filename}'") + self.log.info(f"Setting the background picture to filename '{filename}'") profile_pictures.set_background_picture( filename, f'{self.kik_node}@talk.kik.com', self.username, self.password ) + def send_ping(self): + """ + Sends Ping Stanza A response Pong Should return from the server + Pong Object will include round trip time + """ + self.log.info(f'Sending KIK servers a ping.') + return self._send_xmpp_element(chatting.KikPingRequest()) + def send_captcha_result(self, stc_id, captcha_result): """ In case a captcha was encountered, solves it using an element ID and a response parameter. @@ -467,14 +480,14 @@ def send_captcha_result(self, stc_id, captcha_result): :param stc_id: The stc_id from the CaptchaElement that was encountered :param captcha_result: The answer to the captcha (which was generated after solved by a human) """ - log.info(f"[+] Trying to solve a captcha with result: '{captcha_result}'") + self.log.info(f"Trying to solve a captcha with result: '{captcha_result}'") return self._send_xmpp_element(login.CaptchaSolveRequest(stc_id, captcha_result)) def get_my_profile(self): """ Fetches your own profile details """ - log.info("[+] Requesting self profile") + self.log.info("Requesting self profile") return self._send_xmpp_element(account.GetMyProfileRequest()) def change_display_name(self, first_name, last_name): @@ -484,7 +497,7 @@ def change_display_name(self, first_name, last_name): :param first_name: The first name :param last_name: The last name """ - log.info(f"[+] Changing the display name to '{first_name} {last_name}'") + self.log.info(f"Changing the display name to '{first_name} {last_name}'") return self._send_xmpp_element(account.ChangeNameRequest(first_name, last_name)) def change_password(self, new_password, email): @@ -494,7 +507,7 @@ def change_password(self, new_password, email): :param new_password: The new login password to set for the account :param email: The current email of the account """ - log.info("[+] Changing the password of the account") + self.log.info("Changing the password of the account") return self._send_xmpp_element(account.ChangePasswordRequest(self.password, new_password, email, self.username)) def change_email(self, old_email, new_email): @@ -504,14 +517,14 @@ def change_email(self, old_email, new_email): :param old_email: The current email :param new_email: The new email to set """ - log.info(f"[+] Changing account email to '{new_email}'") + self.log.info(f"Changing account email to '{new_email}'") return self._send_xmpp_element(account.ChangeEmailRequest(self.password, old_email, new_email)) def disconnect(self): """ Closes the connection to kik's servers. """ - log.info("[!] Disconnecting.") + self.log.warning("Disconnecting.") self.connection.close() self.is_expecting_connection_reset = True @@ -528,10 +541,10 @@ def _send_xmpp_element(self, message: XMPPElement): :return: The UUID of the element that was sent """ while not self.connected: - log.debug("[!] Waiting for connection.") + self.log.debug("Waiting for connection.") time.sleep(0.1) if type(message.serialize()) is list: - log.debug("[!] Sending multi packet data.") + self.log.debug("Sending multi packet data.") packets = message.serialize() for p in packets: self.loop.call_soon_threadsafe(self.connection.send_raw_data, p) @@ -558,12 +571,23 @@ def _on_new_data_received(self, data: bytes): if xml_element.name == "k": self._handle_received_k_element(xml_element) - if xml_element.name == "iq": + elif xml_element.name == "iq": self._handle_received_iq_element(xml_element) elif xml_element.name == "message": self._handle_xmpp_message(xml_element) elif xml_element.name == 'stc': - self.callback.on_captcha_received(login.CaptchaElement(xml_element)) + if xml_element.stp['type'] == 'ca': + self.callback.on_captcha_received(login.CaptchaElement(xml_element)) + elif xml_element.stp['type'] == 'bn': + self.callback.on_temp_ban_received(login.TempBanElement(xml_element)) + else: + self.log.warning(f'Unknown stc element type: {xml_element["type"]}') + elif xml_element.name == 'ack': + pass + elif xml_element.name == 'pong': + self.callback.on_pong(chatting.KikPongResponse(xml_element)) + else: + self.log.warning(f'Unknown element type: {xml_element.name}') def _handle_received_k_element(self, k_element: BeautifulSoup): """ @@ -577,7 +601,7 @@ def _handle_received_k_element(self, k_element: BeautifulSoup): if 'ts' in k_element.attrs: # authenticated! - log.info("[+] Authenticated successfully.") + self.log.info("Authenticated successfully.") self.authenticated = True self.authenticator.send_stanza() self.callback.on_authenticated() @@ -597,8 +621,10 @@ def _handle_received_iq_element(self, iq_element: BeautifulSoup): :param iq_element: The iq XML element we just received from kik. """ if iq_element.error and "bad-request" in dir(iq_element.error): - # TODO: specify error type - raise Exception(f"Received a Bad Request error for stanza with ID {iq_element.attrs['id']}") + if "bad-request" in dir(iq_element.error): + raise Exception(f'Received a Bad Request error for stanza with ID {iq_element.attrs["id"]}') + if iq_element.error.find("service-unavailable"): + raise Exception(f'Received a service Unavailable error for stanza with ID {iq_element.attrs["id"]}') query = iq_element.query xml_namespace = query['xmlns'] if 'xmlns' in query.attrs else query['xmlns:'] @@ -656,6 +682,14 @@ def _handle_kik_event(self, xmpp_element): self.callback.on_group_receipts_received(chatting.IncomingGroupReceiptsEvent(xmpp_element)) else: xmlns_handlers.XMPPMessageHandler(self.callback, self).handle(xmpp_element) + elif xmpp_element['type'] == "is-typing": + # some clients send is-typing this way? + xmlns_handlers.XMPPMessageHandler(self.callback, self).handle(xmpp_element) + elif xmpp_element['type'] == "error": + # error type happens on KIK jail (Restricted Group Access) + pass + else: + self.log.warning(f'Received unknown XMPP element type: {xmpp_element}') else: # iPads send messages without xmlns, try to handle it as jabber:client xmlns_handlers.XMPPMessageHandler(self.callback, self).handle(xmpp_element) @@ -668,7 +702,7 @@ def _on_connection_lost(self): """ self.connected = False if not self.is_expecting_connection_reset: - log.info("[-] The connection was unexpectedly lost") + self.log.warning("The connection was unexpectedly lost") self.is_expecting_connection_reset = False @@ -680,21 +714,21 @@ def _kik_connection_thread_function(self): # If there is already a connection going, than wait for it to stop if self.loop and self.loop.is_running(): self.loop.call_soon_threadsafe(self.connection.close) - log.debug("[!] Waiting for the previous connection to stop.") + self.log.debug("Waiting for the previous connection to stop.") while self.loop.is_running(): - log.debug("[!] Still Waiting for the previous connection to stop.") + self.log.debug("Still Waiting for the previous connection to stop.") time.sleep(1) - log.info("[+] Initiating the Kik Connection thread and connecting to kik server...") + self.log.info("Initiating the Kik Connection thread and connecting to kik server...") # create the connection and launch the asyncio loop self.connection = KikConnection(self.loop, self) connection_coroutine = self.loop.create_connection(lambda: self.connection, HOST, PORT, ssl=True) self.loop.run_until_complete(connection_coroutine) - log.debug("[!] Running main loop") + self.log.debug("Running main loop") self.loop.run_forever() - log.debug("[!] Main loop ended.") + self.log.debug("Main loop ended.") self.callback.on_disconnected() def get_jid(self, username_or_jid): @@ -703,7 +737,7 @@ def get_jid(self, username_or_jid): return username_or_jid username = username_or_jid - # first search if we already have it + # first search if we already have it if self.get_jid_from_cache(username) is None: # go request for it @@ -714,7 +748,6 @@ def get_jid(self, username_or_jid): return self.get_jid_from_cache(username) - def get_jid_from_cache(self, username): for user in self._known_users_information: if user.username.lower() == username.lower(): @@ -739,19 +772,20 @@ def __init__(self, loop, api: KikClient): self.partial_data = None # type: bytes self.partial_data_start_tag = None # type: str self.transport = None # type: Transport + self.log = api.log def connection_made(self, transport: Transport): self.transport = transport - log.info("[!] Connected.") + self.log.info("Connected.") self.api._on_connection_made() def data_received(self, data: bytes): - log.debug("[+] Received raw data: %s", data) + self.log.debug("Received raw data: %s", data) if self.partial_data is None: if len(data) < 16384: self.loop.call_soon_threadsafe(self.api._on_new_data_received, data) else: - log.debug("Multi-packet data, waiting for next packet.") + self.log.debug("Multi-packet data, waiting for next packet.") start_tag, is_closing = self.parse_start_tag(data) self.partial_data_start_tag = start_tag self.partial_data = data @@ -760,7 +794,7 @@ def data_received(self, data: bytes): self.partial_data = None self.partial_data_start_tag = None else: - log.debug(f"[!] Waiting for another packet, size={len(self.partial_data)}") + self.log.debug(f"Waiting for another packet, size={len(self.partial_data)}") self.partial_data += data @staticmethod @@ -782,7 +816,7 @@ def connection_lost(self, exception): self.loop.stop() def send_raw_data(self, data: bytes): - log.debug("[+] Sending raw data: %s", data) + self.log.debug("Sending raw data: %s", data) self.transport.write(data) def close(self): diff --git a/kik_unofficial/datatypes/xmpp/account.py b/kik_unofficial/datatypes/xmpp/account.py index a3d58e1..78d216f 100644 --- a/kik_unofficial/datatypes/xmpp/account.py +++ b/kik_unofficial/datatypes/xmpp/account.py @@ -1,4 +1,5 @@ import datetime +import time from bs4 import BeautifulSoup @@ -124,3 +125,6 @@ def serialize(self): f'' f'') return data.encode() + + + diff --git a/kik_unofficial/datatypes/xmpp/auth_stanza.py b/kik_unofficial/datatypes/xmpp/auth_stanza.py index c6cba85..0640aa4 100644 --- a/kik_unofficial/datatypes/xmpp/auth_stanza.py +++ b/kik_unofficial/datatypes/xmpp/auth_stanza.py @@ -33,7 +33,7 @@ def send_stanza(self) -> None: Send the outgoing auth stanza """ stanza = self.searlize() - log.info('[+] Sending authentication certificate') + log.info('Sending authentication certificate') self.client.loop.call_soon_threadsafe(self.client.connection.send_raw_data, stanza) def revalidate(self) -> None: @@ -43,7 +43,7 @@ def revalidate(self) -> None: if time.time() < self.revalidate_time: return stanza = self.searlize() - log.info('[+] Revalidating the authentication certificate') + log.info('Revalidating the authentication certificate') self.client.loop.call_soon_threadsafe(self.client.connection.send_raw_data, stanza) def searlize(self) -> bytes: @@ -158,11 +158,11 @@ def handle(self, data: bs4.BeautifulSoup): Handles the auth response (result/error) sent by Kik """ if data.error: - log.error('[!] kik:auth:cert [' + data.error.get('code') + '] ' + data.error.get_text()) + log.error('kik:auth:cert [' + data.error.get('code') + '] ' + data.error.get_text()) log.debug(str(data)) return if data.find_all('regenerate-key', recursive=True): - log.info('[!] Regenerating the keys for certificate authentication') + log.info('Regenerating the keys for certificate authentication') self.teardown() self.send_stanza() return @@ -171,7 +171,7 @@ def handle(self, data: bs4.BeautifulSoup): self.cert_url = data.certificate.url.text self.revalidate_time = current + (revalidate * 1000) self.client.loop.call_later(revalidate, self.revalidate) - log.info('[+] Successfully validated the authentication certificate') + log.info('Successfully validated the authentication certificate') def teardown(self): """ diff --git a/kik_unofficial/datatypes/xmpp/chatting.py b/kik_unofficial/datatypes/xmpp/chatting.py index b67f1dc..2f23739 100644 --- a/kik_unofficial/datatypes/xmpp/chatting.py +++ b/kik_unofficial/datatypes/xmpp/chatting.py @@ -19,6 +19,7 @@ class OutgoingChatMessage(XMPPElement): """ Represents an outgoing text chat message to another kik entity (member or group) """ + def __init__(self, peer_jid, body, is_group=False, bot_mention_jid=None): super().__init__() self.peer_jid = peer_jid @@ -49,6 +50,7 @@ class OutgoingGroupChatMessage(OutgoingChatMessage): """ Represents an outgoing text chat message to a group """ + def __init__(self, group_jid, body, bot_mention_jid): super().__init__(group_jid, body, is_group=True, bot_mention_jid=bot_mention_jid) @@ -57,6 +59,7 @@ class OutgoingChatImage(XMPPElement): """ Represents an outgoing image chat message to another kik entity (member or group) """ + def __init__(self, peer_jid, file_location, is_group=False, forward=True): super().__init__() self.peer_jid = peer_jid @@ -96,8 +99,7 @@ def serialize(self): '' ) - - packets = [data[s:s+16384].encode() for s in range(0, len(data), 16384)] + packets = [data[s:s + 16384].encode() for s in range(0, len(data), 16384)] return list(packets) @@ -105,6 +107,7 @@ class OutgoingGroupChatImage(OutgoingChatImage): """ Represents an outgoing image chat message to a group """ + def __init__(self, group_jid, file_location, forward): super().__init__(group_jid, file_location, is_group=True, forward=forward) @@ -113,6 +116,7 @@ class IncomingChatMessage(XMPPResponse): """ Represents an incoming text chat message from another user """ + def __init__(self, data: BeautifulSoup): super().__init__(data) self.request_delivered_receipt = data.request['d'] == 'true' if 'd' in data.request.attrs else False @@ -131,6 +135,7 @@ class IncomingGroupChatMessage(IncomingChatMessage): """ Represents an incoming text chat message from a group """ + def __init__(self, data: BeautifulSoup): super().__init__(data) self.group_jid = data.g['jid'] @@ -142,6 +147,7 @@ class OutgoingReadReceipt(XMPPElement): """ Represents an outgoing read receipt to a specific user, for one or more messages """ + def __init__(self, peer_jid, receipt_message_id, group_jid=None): super().__init__() self.peer_jid = peer_jid @@ -234,7 +240,7 @@ def __init__(self, peer_jid, link, title, text, app_name): def serialize(self): message_type = 'type="groupchat" xmlns="kik:groups"' if 'group' in self.peer_jid else 'type="chat"' timestamp = str(int(round(time.time() * 1000))) - + data = (f'' '' f'' @@ -258,7 +264,7 @@ def serialize(self): '' '' '') - + return data.encode() @@ -298,7 +304,8 @@ class IncomingGroupStatus(XMPPResponse): def __init__(self, data: BeautifulSoup): super().__init__(data) - self.request_delivered_receipt = data.request['d'] == 'true' if data.request and 'd' in data.request.attrs else False + self.request_delivered_receipt = data.request[ + 'd'] == 'true' if data.request and 'd' in data.request.attrs else False self.requets_read_receipt = data.request['r'] == 'true' if data.request and 'r' in data.request.attrs else False self.group_jid = data['from'] self.to_jid = data['to'] @@ -312,7 +319,8 @@ class IncomingGroupSysmsg(XMPPResponse): def __init__(self, data: BeautifulSoup): super().__init__(data) - self.request_delivered_receipt = data.request['d'] == 'true' if data.request and 'd' in data.request.attrs else False + self.request_delivered_receipt = data.request[ + 'd'] == 'true' if data.request and 'd' in data.request.attrs else False self.requets_read_receipt = data.request['r'] == 'true' if data.request and 'r' in data.request.attrs else False self.group_jid = data['from'] self.to_jid = data['to'] @@ -354,7 +362,8 @@ def __init__(self, data: BeautifulSoup): class IncomingImageMessage(XMPPResponse): def __init__(self, data: BeautifulSoup): super().__init__(data) - self.request_delivered_receipt = data.request['d'] == 'true' if data.request and 'd' in data.request.attrs else False + self.request_delivered_receipt = data.request[ + 'd'] == 'true' if data.request and 'd' in data.request.attrs else False self.requets_read_receipt = data.request['r'] == 'true' if data.request and 'r' in data.request.attrs else False self.image_url = data.find('file-url').get_text() if data.find('file-url') else None self.status = data.status.text if data.status else None @@ -392,9 +401,11 @@ class IncomingGifMessage(XMPPResponse): """ Represents an incoming GIF message from another kik entity, sent as a URL """ + def __init__(self, data: BeautifulSoup): super().__init__(data) - self.request_delivered_receipt = data.request['d'] == 'true' if data.request and 'd' in data.request.attrs else False + self.request_delivered_receipt = data.request[ + 'd'] == 'true' if data.request and 'd' in data.request.attrs else False self.requets_read_receipt = data.request['r'] == 'true' if data.request and 'r' in data.request.attrs else False self.status = data.status.text if data.status else None self.from_jid = data['from'] if data else None @@ -413,6 +424,7 @@ class OutgoingGIFMessage(XMPPElement): """ Represents an outgoing GIF message to another kik entity (member or group) """ + def __init__(self, peer_jid, search_term, API_key, is_group=True): super().__init__() self.peer_jid = peer_jid @@ -454,11 +466,10 @@ def serialize(self): '' ) - packets = [data[s:s + 16384].encode() for s in range(0, len(data), 16384)] return list(packets) - def get_gif_data(self, search_term, API_key): + def get_gif_data(self, search_term, API_key): if not API_key: raise Exception("A tendor.com API key is required to search for GIFs images. please get one and change it") @@ -479,7 +490,8 @@ def get_gif_data(self, search_term, API_key): class IncomingVideoMessage(XMPPResponse): def __init__(self, data: BeautifulSoup): super().__init__(data) - self.request_delivered_receipt = data.request['d'] == 'true' if data.request and 'd' in data.request.attrs else False + self.request_delivered_receipt = data.request[ + 'd'] == 'true' if data.request and 'd' in data.request.attrs else False self.requets_read_receipt = data.request['r'] == 'true' if data.request and 'r' in data.request.attrs else False self.video_url = data.find('file-url').text self.file_content_type = data.find('file-content-type').text if data.find('file-content-type') else None @@ -493,7 +505,8 @@ def __init__(self, data: BeautifulSoup): class IncomingCardMessage(XMPPResponse): def __init__(self, data: BeautifulSoup): super().__init__(data) - self.request_delivered_receipt = data.request['d'] == 'true' if data.request and 'd' in data.request.attrs else False + self.request_delivered_receipt = data.request[ + 'd'] == 'true' if data.request and 'd' in data.request.attrs else False self.request_read_receipt = data.request['r'] == 'true' if data.request and 'r' in data.request.attrs else False self.from_jid = data['from'] self.to_jid = data['to'] @@ -508,3 +521,20 @@ def __init__(self, data: BeautifulSoup): self.uri = data.find('uri').text if data.find('uri') else None +class KikPingRequest(XMPPElement): + def __init__(self): + super().__init__() + + def serialize(self) -> bytes: + data = ('') + + return data.encode() + + +class KikPongResponse: + """ + Response to a request to kik servers + """ + + def __init__(self, data: BeautifulSoup): + self.recieved_time = time.time() diff --git a/kik_unofficial/datatypes/xmpp/login.py b/kik_unofficial/datatypes/xmpp/login.py index a9d6401..990d3bb 100644 --- a/kik_unofficial/datatypes/xmpp/login.py +++ b/kik_unofficial/datatypes/xmpp/login.py @@ -13,11 +13,11 @@ kik_version = kik_version_info["kik_version"] private_key_pem = "-----BEGIN RSA PRIVATE KEY-----\nMIIBPAIBAAJBANEWUEINqV1KNG7Yie9GSM8t75ZvdTeqT7kOF40kvDHIp" \ - "/C3tX2bcNgLTnGFs8yA2m2p7hKoFLoxh64vZx5fZykCAwEAAQJAT" \ - "/hC1iC3iHDbQRIdH6E4M9WT72vN326Kc3MKWveT603sUAWFlaEa5T80GBiP/qXt9PaDoJWcdKHr7RqDq" \ - "+8noQIhAPh5haTSGu0MFs0YiLRLqirJWXa4QPm4W5nz5VGKXaKtAiEA12tpUlkyxJBuuKCykIQbiUXHEwzFYbMHK5E" \ - "/uGkFoe0CIQC6uYgHPqVhcm5IHqHM6/erQ7jpkLmzcCnWXgT87ABF2QIhAIzrfyKXp1ZfBY9R0H4pbboHI4uatySKc" \ - "Q5XHlAMo9qhAiEA43zuIMknJSGwa2zLt/3FmVnuCInD6Oun5dbcYnqraJo=\n-----END RSA PRIVATE KEY----- " + "/C3tX2bcNgLTnGFs8yA2m2p7hKoFLoxh64vZx5fZykCAwEAAQJAT" \ + "/hC1iC3iHDbQRIdH6E4M9WT72vN326Kc3MKWveT603sUAWFlaEa5T80GBiP/qXt9PaDoJWcdKHr7RqDq" \ + "+8noQIhAPh5haTSGu0MFs0YiLRLqirJWXa4QPm4W5nz5VGKXaKtAiEA12tpUlkyxJBuuKCykIQbiUXHEwzFYbMHK5E" \ + "/uGkFoe0CIQC6uYgHPqVhcm5IHqHM6/erQ7jpkLmzcCnWXgT87ABF2QIhAIzrfyKXp1ZfBY9R0H4pbboHI4uatySKc" \ + "Q5XHlAMo9qhAiEA43zuIMknJSGwa2zLt/3FmVnuCInD6Oun5dbcYnqraJo=\n-----END RSA PRIVATE KEY----- " private_key = rsa.PrivateKey.load_pkcs1(private_key_pem, format='PEM') @@ -25,6 +25,7 @@ class LoginRequest(XMPPElement): """ Represents a Kik Login request. """ + def __init__(self, username, password, captcha_result=None, device_id=None, android_id=None): super().__init__() self.username = username @@ -43,25 +44,25 @@ def serialize(self) -> bytes: tag = ('{}' '{}') - data = (f'' - f'' - f'{tag.format(self.username, password_key)}' - f'{self.device_id}' - 'utm_source=google-play&utm_medium=organic' - 'unknown' - 'unknown' - 'android' - 'generic' - '1' - f'{kik_version}' - 'en_US' - '19' - '0' + data = (f'' + f'' + f'{tag.format(self.username, password_key)}' + f'{self.device_id}' + 'utm_source=google-play&utm_medium=organic' + 'unknown' + 'unknown' + 'android' + 'generic' + '1' + f'{kik_version}' + 'en_US' + '19' + '0' 'CAN' \ - f'{self.android_id}' - 'Samsung Galaxy S5 - 4.4.4 - API 19 - 1080x1920' + f'{self.android_id}' + 'Samsung Galaxy S5 - 4.4.4 - API 19 - 1080x1920' f'{captcha}' \ - '' + '' '') return data.encode() @@ -71,6 +72,7 @@ class LoginResponse: """ Represents a Kik Login response that is received after a log-in attempt. """ + def __init__(self, data: BeautifulSoup): self.kik_node = data.query.node.text self.email = data.query.email.text @@ -79,6 +81,7 @@ def __init__(self, data: BeautifulSoup): self.first_name = data.query.first.text self.last_name = data.query.last.text + class MakeAnonymousStreamInitTag(XMPPElement): def __init__(self, device_id=None, n=1): super().__init__() @@ -92,7 +95,7 @@ def serialize(self): timestamp = str(CryptographicUtils.make_kik_timestamp()) sid = CryptographicUtils.make_kik_uuid() - signature = rsa.sign(f"{can + device}:{kik_version}:{timestamp}:{sid}".encode(), private_key,'SHA-256') + signature = rsa.sign(f"{can + device}:{kik_version}:{timestamp}:{sid}".encode(), private_key, 'SHA-256') signature = base64.b64encode(signature, '-_'.encode()).decode().rstrip('=') hmac_data = f"{timestamp}:{can}{device}" @@ -104,12 +107,12 @@ def serialize(self): 'lang': 'en_US', 'sid': sid, 'anon': '1', - 'ts': timestamp, - 'v': kik_version, - 'cv': cv, - 'conn': 'WIFI', - 'dev': can+device, - } + 'ts': timestamp, + 'v': kik_version, + 'cv': cv, + 'conn': 'WIFI', + 'dev': can + device, + } # Test data to confirm the sort_kik_map function returns the correct result. # the_map = { @@ -130,11 +133,13 @@ def serialize(self): packet = CryptographicUtils.make_connection_payload(*CryptographicUtils.sort_kik_map(the_map)) return packet.encode() + class EstablishAuthenticatedSessionRequest(XMPPElement): """ a request sent on the begging of the connection to establish an authenticated session. That is, on the behalf of a specific kik user, with his credentials. """ + def __init__(self, node, username, password, device_id=None): super().__init__() self.node = node @@ -150,7 +155,7 @@ def serialize(self): # some super secret cryptographic stuff - signature = rsa.sign(f"{jid}:{kik_version}:{timestamp}:{sid}".encode(), private_key,'SHA-256') + signature = rsa.sign(f"{jid}:{kik_version}:{timestamp}:{sid}".encode(), private_key, 'SHA-256') signature = base64.b64encode(signature, '-_'.encode()).decode().rstrip('=') hmac_data = f"{timestamp}:{jid}" hmac_secret_key = CryptographicUtils.build_hmac_key() @@ -174,6 +179,7 @@ class CaptchaElement: The 'stc' element is received when Kik requires a captcha to be filled in, it's followed up by a 'hold' element after which the connection is paused. """ + def __init__(self, data: BeautifulSoup): self.type = data.stp['type'] self.captcha_url = f"{data.stp.text}&callback_url=https://kik.com/captcha-url" @@ -184,6 +190,7 @@ class CaptchaSolveRequest(XMPPElement): """ Response to the 'stc' element. Given the result of the captcha, the connection will resume. """ + def __init__(self, stc_id: str, captcha_result: str): super().__init__() self.captcha_result = captcha_result @@ -192,3 +199,16 @@ def __init__(self, stc_id: str, captcha_result: str): def serialize(self) -> bytes: data = f'{self.captcha_result}' return data.encode() + + +class TempBanElement: + """ + When this is received, you will not be able to send or receive any stanzas until after the ban time + """ + + def __init__(self, data: BeautifulSoup): + self.type = data.stp['type'] + self.stc_id = data['id'] + self.ban_title = data.dialog.find('dialog-title').text + self.ban_message = data.dialog.find('dialog-body').text + self.ban_end_time = int(data.dialog.find('ban-end').text) diff --git a/kik_unofficial/http/content.py b/kik_unofficial/http/content.py index 6bf16b7..384e107 100644 --- a/kik_unofficial/http/content.py +++ b/kik_unofficial/http/content.py @@ -53,7 +53,7 @@ def send(url, image, jid, username, password): def content_upload_thread(url, image, headers): - log.debug('Uploading content') + log.debug('Uploading Image') r = requests.put(url, data=image, headers=headers) if r.status_code != 200: raise KikUploadError(r.status_code, r.reason) diff --git a/kik_unofficial/utilities/credential_utilities.py b/kik_unofficial/utilities/credential_utilities.py index 2ff5fee..d8bd1f8 100644 --- a/kik_unofficial/utilities/credential_utilities.py +++ b/kik_unofficial/utilities/credential_utilities.py @@ -4,12 +4,15 @@ from kik_unofficial.configuration import env from typing import Tuple, Union + def random_device_id(): return os.urandom(16).hex() + def random_android_id(): return os.urandom(8).hex() + def get_credentials_from_env_or_prompt() -> Union[Tuple[str, str, str], None]: # /// ENVIRONMENT VARIABLES /// # # Create your own `.env` file to store the environment variables if running with Docker. diff --git a/kik_unofficial/utilities/cryptographic_utilities.py b/kik_unofficial/utilities/cryptographic_utilities.py index b8553ec..0361700 100644 --- a/kik_unofficial/utilities/cryptographic_utilities.py +++ b/kik_unofficial/utilities/cryptographic_utilities.py @@ -14,6 +14,7 @@ class CryptographicUtils: A class for generating various cryptographic values needed to establish an authenticated session and sending messages. """ + def __init__(self): pass @@ -93,13 +94,13 @@ def make_kik_uuid(): i3 = iArr[i2][0] i2 = iArr[i2][1] j = (((-16777216 & most_significant_bits) >> 22) ^ ((16711680 & most_significant_bits) >> 16)) ^ ( - (65280 & most_significant_bits) >> 8) + (65280 & most_significant_bits) >> 8) i2 = (CryptographicUtils.kik_uuid_sub_func(most_significant_bits, i2) + 1) | ( - CryptographicUtils.kik_uuid_sub_func(most_significant_bits, i3) << 1) + CryptographicUtils.kik_uuid_sub_func(most_significant_bits, i3) << 1) for i4 in range(6): i = (i + (i2 * 7)) % 60 least_significant_bits = (least_significant_bits & ((1 << (i + 2)) ^ -1)) | ( - (CryptographicUtils.kik_uuid_sub_func(j, i4)) << (i + 2)) + (CryptographicUtils.kik_uuid_sub_func(j, i4)) << (i + 2)) mstb = binascii.hexlify( (most_significant_bits.to_bytes((most_significant_bits.bit_length() + 7) // 8, 'big') or b'\0')) lstb = binascii.hexlify( @@ -152,7 +153,7 @@ def sort_kik_map(original_dictionary): new_map[selected_key] = dictionary[selected_key] del dictionary[selected_key] - return new_map, ' '*hash_code_for_spaces + return new_map, ' ' * hash_code_for_spaces @staticmethod def kik_map_hash_code(dictionary, hash_code_base, hash_code_offset): @@ -184,7 +185,22 @@ def kik_hash_code_sub_func(hash_id, bytes_array): for i in range(0, len(digest), 4): j ^= ((((ParsingUtilities.byte_to_signed_int(digest[i + 3])) << 24) | ( - (ParsingUtilities.byte_to_signed_int(digest[i + 2])) << 16)) | ( - (ParsingUtilities.byte_to_signed_int(digest[i + 1])) << 8)) | (ParsingUtilities.byte_to_signed_int(digest[i])) + (ParsingUtilities.byte_to_signed_int(digest[i + 2])) << 16)) | ( + (ParsingUtilities.byte_to_signed_int(digest[i + 1])) << 8)) | ( + ParsingUtilities.byte_to_signed_int(digest[i])) return j + + @staticmethod + def get_kik_host_name(): + # The android APK determines the host name for the XMPP domain + # by using the minor and major version numbers + # talk(major)(minor)0an.kik.com + + split = kik_version_info['kik_version'].split('.') + ret = 'talk' + + for i in range(0, 2): + ret += split[i] + + return ret + '0an.kik.com' diff --git a/kik_unofficial/utilities/logging_utils.py b/kik_unofficial/utilities/logging_utils.py index 10cbe69..c0a75e8 100644 --- a/kik_unofficial/utilities/logging_utils.py +++ b/kik_unofficial/utilities/logging_utils.py @@ -1,13 +1,149 @@ import logging +import os.path import sys +from logging.handlers import TimedRotatingFileHandler + +from colorama import Fore, Style +import datetime + # turn on logging with basic configuration -def set_up_basic_logging(): - logger = logging.getLogger() - logger.setLevel(logging.INFO) - stream_handler = logging.StreamHandler(sys.stdout) - stream_handler.setFormatter(logging.Formatter(log_format())) - logger.addHandler(stream_handler) - -def log_format(): - return '[%(asctime)-15s] %(levelname)-6s (thread %(threadName)-10s): %(message)s' \ No newline at end of file +def set_up_basic_logging(log_level, logger_name, log_file_path): + """ + Set up basic logging using CustomLogger. + + Args: + log_level (int): The logging level (1=DEBUG, 2=INFO, etc.). + logger_name (str): The name of the logger. + log_file_path (str): If a path is given a TimeRotated log file will be created. + """ + # Initialize your custom logger + return CustomLogger(log_level, logger_name, log_file_path) + + +class ColoredFormatter(logging.Formatter): + """ + Custom formatter for logging messages with colors. + + Attributes: + COLOR_CODES (dict): Mapping of logging levels to their respective color codes. + """ + + COLOR_CODES = { + logging.DEBUG: Fore.CYAN, + logging.INFO: Fore.GREEN, + logging.WARNING: Fore.YELLOW, + logging.ERROR: Fore.RED, + logging.CRITICAL: Fore.MAGENTA, + } + + def format(self, record): + """ + Format the specified record as text. + + Args: + record (logging.LogRecord): The record to be formatted. + + Returns: + str: The formatted record with colors and icons. + """ + level_color = self.COLOR_CODES.get(record.levelno, '') + time = datetime.datetime.fromtimestamp(record.created).strftime('%Y-%m-%d %H:%M:%S') + reset = Style.RESET_ALL + if record.levelno == logging.DEBUG: + level_icon = f'[{Style.BRIGHT}{Fore.LIGHTCYAN_EX}^{reset}]' + level_name = 'DEBUG' + highlight_color = f'{Style.BRIGHT}{Fore.LIGHTCYAN_EX}' + elif record.levelno == logging.INFO: + level_icon = f'[{Style.BRIGHT}{Fore.LIGHTGREEN_EX}+{reset}]' + level_name = 'INFO' + highlight_color = f'{Style.BRIGHT}{Fore.LIGHTGREEN_EX}' + elif record.levelno == logging.WARNING: + level_icon = f'[{Style.BRIGHT}{Fore.LIGHTYELLOW_EX}!{reset}]' + level_name = 'WARNING' + highlight_color = f'{Style.BRIGHT}{Fore.LIGHTYELLOW_EX}' + elif record.levelno == logging.ERROR: + level_icon = f'[{Style.BRIGHT}{Fore.LIGHTRED_EX}#{reset}]' + level_name = 'ERROR' + highlight_color = f'{Style.BRIGHT}{Fore.LIGHTRED_EX}' + elif record.levelno == logging.CRITICAL: + level_icon = f'[{Style.BRIGHT}{Fore.LIGHTMAGENTA_EX}*{reset}]' + level_name = 'CRITICAL' + highlight_color = f'{Style.BRIGHT}{Fore.LIGHTMAGENTA_EX}' + else: + level_icon = '' + level_name = '' + highlight_color = '' + message = super().format(record) + for word in message.split(): + if word.startswith('<<') and word.endswith('>>'): + message = message.replace(word, f'{highlight_color}{word[2:-2]}{reset}') + return f'{time} {level_color}{level_name}:{reset} {level_icon} [Thread-{record.thread}:{record.threadName}] {message.replace(level_name + ": ", "")}' + + +class CustomLogger: + """ + Custom logger with colored console output and file logging. + + Attributes: + logger (logging.Logger): The underlying logger instance. + """ + + def __init__(self, log_level, logger_name, log_file_path): + """ + Initialize the custom logger. + + Args: + log_level (int): The logging level (1=DEBUG, 2=INFO, etc.). + logger_name (str): The name of the logger. + log_file_path (str): Full path to log file. + """ + self.logger = logging.getLogger(logger_name) + level_mapping = { + 1: logging.DEBUG, + 2: logging.INFO, + 3: logging.WARNING, + 4: logging.ERROR, + 5: logging.CRITICAL + } + + self.logger.setLevel(level_mapping.get(log_level, logging.INFO)) + + console_handler = logging.StreamHandler() + console_handler.setLevel(level_mapping.get(log_level, logging.INFO)) + + formatter = ColoredFormatter('%(message)s') + console_handler.setFormatter(formatter) + + if log_file_path: + log_dir = os.path.dirname(log_file_path) + if not os.path.exists(log_dir): + try: + os.makedirs(log_dir) + except OSError as e: + print(f"Could not create log directory: {str(e)}") + + file_handler = TimedRotatingFileHandler( + log_file_path, when="midnight", backupCount=7 + ) + + file_handler.setLevel(level_mapping.get(log_level, logging.INFO)) + file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s [%(thread)d/%(threadName)s]: %(message)s')) + self.logger.addHandler(file_handler) + + self.logger.addHandler(console_handler) + + def info(self, msg, *args, **kwargs): + self.logger.info(msg, *args, **kwargs) + + def debug(self, msg, *args, **kwargs): + self.logger.debug(msg, *args, **kwargs) + + def warning(self, msg, *args, **kwargs): + self.logger.warning(msg, *args, **kwargs) + + def error(self, msg, *args, **kwargs): + self.logger.error(msg, *args, **kwargs) + + def critical(self, msg, *args, **kwargs): + self.logger.critical(msg, *args, **kwargs) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c67bcaa --- /dev/null +++ b/requirements.txt @@ -0,0 +1,11 @@ +setuptools~=68.2.0 +requests~=2.31.0 +bs4~=0.0.1 +beautifulsoup4~=4.12.2 +rsa~=4.9 +Pillow~=10.0.1 +pyDes~=2.0.1 +colorama~=0.4.6 +pbkdf2~=1.3 +python-dotenv~=1.0.0 +PyYAML~=6.0.1 \ No newline at end of file