diff --git a/main.py b/main.py index 9eba90d6..467f7d86 100644 --- a/main.py +++ b/main.py @@ -20,9 +20,11 @@ # Suppress stderr sys.stderr = open(os.devnull, 'w') + class ConfigError(Exception): pass + class ConfigValidator: @staticmethod def validate_email(email: str) -> bool: @@ -38,78 +40,127 @@ def validate_yaml_file(yaml_path: Path) -> dict: except FileNotFoundError: raise ConfigError(f"File not found: {yaml_path}") - def validate_config(config_yaml_path: Path) -> dict: + """ + Validate and load configuration from a YAML file. + + This function validates the configuration file to ensure that all required keys are present + and that the types of their values are correct. It checks for required sections like 'remote', + 'experience_level', 'jobTypes', 'positions', 'locations', and 'personal_information', among others. + If any key is missing or has an invalid type, it raises a ConfigError. + + Args: + config_yaml_path (Path): Path to the YAML configuration file. + + Returns: + dict: A dictionary containing the validated configuration parameters. + + Raises: + ConfigError: If a required key is missing or has an invalid type. 
+ """ + + logger.info(f"Loading and validating config file: {config_yaml_path}") + + # Load the YAML configuration file parameters = ConfigValidator.validate_yaml_file(config_yaml_path) + logger.debug(f"Loaded config file content: {parameters}") + + # Define required keys and their expected types required_keys = { 'remote': bool, - 'experienceLevel': dict, + 'experience_level': dict, 'jobTypes': dict, 'date': dict, 'positions': list, 'locations': list, 'distance': int, - 'companyBlacklist': list, - 'titleBlacklist': list, + 'company_blacklist': list, + 'title_blacklist': list, 'llm_model_type': str, 'llm_model': str + } + # Validate each required key for key, expected_type in required_keys.items(): if key not in parameters: - if key in ['companyBlacklist', 'titleBlacklist']: + # Handle optional blacklist keys by setting them as empty lists if missing + if key in ['company_blacklist', 'title_blacklist']: parameters[key] = [] + logger.debug(f"Optional key '{key}' missing, set to empty list.") else: + logger.error(f"Missing key '{key}' in config file.") raise ConfigError(f"Missing or invalid key '{key}' in config file {config_yaml_path}") elif not isinstance(parameters[key], expected_type): - if key in ['companyBlacklist', 'titleBlacklist'] and parameters[key] is None: + # Allow None values for blacklists, but otherwise check type validity + if key in ['company_blacklist', 'title_blacklist'] and parameters[key] is None: parameters[key] = [] + logger.debug(f"Key '{key}' was None, set to empty list.") else: - raise ConfigError(f"Invalid type for key '{key}' in config file {config_yaml_path}. Expected {expected_type}.") + logger.error(f"Invalid type for key '{key}' in config file. Expected {expected_type}, but got {type(parameters[key])}.") + raise ConfigError(f"Invalid type for key '{key}' in config file {config_yaml_path}. 
Expected {expected_type}, but got {type(parameters[key])}.") + # Validate 'experience_level' section experience_levels = ['internship', 'entry', 'associate', 'mid-senior level', 'director', 'executive'] for level in experience_levels: - if not isinstance(parameters['experienceLevel'].get(level), bool): + if not isinstance(parameters['experience_level'].get(level), bool): + logger.error(f"Invalid value for experience level '{level}'. Expected a boolean (True/False). Current value: {parameters['experience_level'].get(level)}") raise ConfigError(f"Experience level '{level}' must be a boolean in config file {config_yaml_path}") + # Validate 'jobTypes' section job_types = ['full-time', 'contract', 'part-time', 'temporary', 'internship', 'other', 'volunteer'] for job_type in job_types: if not isinstance(parameters['jobTypes'].get(job_type), bool): + logger.error(f"Invalid value for job type '{job_type}'. Expected a boolean (True/False). Current value: {parameters['jobTypes'].get(job_type)}") raise ConfigError(f"Job type '{job_type}' must be a boolean in config file {config_yaml_path}") + # Validate 'date' filters section date_filters = ['all time', 'month', 'week', '24 hours'] for date_filter in date_filters: if not isinstance(parameters['date'].get(date_filter), bool): + logger.error(f"Invalid value for date filter '{date_filter}'. Expected a boolean (True/False). Current value: {parameters['date'].get(date_filter)}") raise ConfigError(f"Date filter '{date_filter}' must be a boolean in config file {config_yaml_path}") + # Validate 'positions' list if not all(isinstance(pos, str) for pos in parameters['positions']): + logger.error(f"Invalid value in 'positions'. All entries must be strings. Current values: {parameters['positions']}") raise ConfigError(f"'positions' must be a list of strings in config file {config_yaml_path}") + + # Validate 'locations' list if not all(isinstance(loc, str) for loc in parameters['locations']): + logger.error(f"Invalid value in 'locations'. 
All entries must be strings. Current values: {parameters['locations']}") raise ConfigError(f"'locations' must be a list of strings in config file {config_yaml_path}") + # Validate 'distance' field approved_distances = {0, 5, 10, 25, 50, 100} if parameters['distance'] not in approved_distances: + logger.error(f"Invalid distance value '{parameters['distance']}'. Must be one of {approved_distances}.") raise ConfigError(f"Invalid distance value in config file {config_yaml_path}. Must be one of: {approved_distances}") - for blacklist in ['companyBlacklist', 'titleBlacklist']: + for blacklist in ['company_blacklist', 'title_blacklist']: if not isinstance(parameters.get(blacklist), list): raise ConfigError(f"'{blacklist}' must be a list in config file {config_yaml_path}") if parameters[blacklist] is None: parameters[blacklist] = [] - return parameters - + logger.info("Configuration validated successfully.") + return parameters @staticmethod def validate_secrets(secrets_yaml_path: Path) -> tuple: secrets = ConfigValidator.validate_yaml_file(secrets_yaml_path) - mandatory_secrets = ['llm_api_key'] + mandatory_secrets = ['email', 'password','llm_api_key'] for secret in mandatory_secrets: if secret not in secrets: raise ConfigError(f"Missing secret '{secret}' in file {secrets_yaml_path}") + if not ConfigValidator.validate_email(secrets['email']): + raise ConfigError(f"Invalid email format in secrets file {secrets_yaml_path}.") + if not secrets['password']: + raise ConfigError(f"Password cannot be empty in secrets file {secrets_yaml_path}.") + return secrets['email'], str(secrets['password']), secrets['llm_api_key'] if not secrets['llm_api_key']: raise ConfigError(f"llm_api_key cannot be empty in secrets file {secrets_yaml_path}.") return secrets['llm_api_key'] @@ -117,7 +168,9 @@ def validate_secrets(secrets_yaml_path: Path) -> tuple: class FileManager: @staticmethod def find_file(name_containing: str, with_extension: str, at_path: Path) -> Path: - return next((file for file 
in at_path.iterdir() if name_containing.lower() in file.name.lower() and file.suffix.lower() == with_extension.lower()), None) + return next((file for file in at_path.iterdir() if + name_containing.lower() in file.name.lower() and file.suffix.lower() == with_extension.lower()), + None) @staticmethod def validate_data_folder(app_data_folder: Path) -> tuple: @@ -132,7 +185,9 @@ def validate_data_folder(app_data_folder: Path) -> tuple: output_folder = app_data_folder / 'output' output_folder.mkdir(exist_ok=True) - return (app_data_folder / 'secrets.yaml', app_data_folder / 'config.yaml', app_data_folder / 'plain_text_resume.yaml', output_folder) + return ( + app_data_folder / 'secrets.yaml', app_data_folder / 'config.yaml', app_data_folder / 'plain_text_resume.yaml', + output_folder) @staticmethod def file_paths_to_dict(resume_file: Path | None, plain_text_resume_file: Path) -> dict: @@ -148,62 +203,125 @@ def file_paths_to_dict(resume_file: Path | None, plain_text_resume_file: Path) - return result + def init_browser() -> webdriver.Chrome: try: - options = chrome_browser_options() service = ChromeService(ChromeDriverManager().install()) return webdriver.Chrome(service=service, options=options) except Exception as e: raise RuntimeError(f"Failed to initialize browser: {str(e)}") -def create_and_run_bot(parameters, llm_api_key): + +def create_and_run_bot(email, password, parameters, llm_api_key): try: + logger.info("Starting bot initialization...") + logger.debug(f"Email: {email}") + logger.debug(f"Parameters: {parameters}") + logger.debug(f"LLM API Key: {llm_api_key}") + style_manager = StyleManager() + logger.debug("StyleManager initialized successfully.") + resume_generator = ResumeGenerator() + logger.debug("ResumeGenerator initialized successfully.") + + logger.info("Reading plain text resume file...") with open(parameters['uploads']['plainTextResume'], "r", encoding='utf-8') as file: plain_text_resume = file.read() + logger.debug(f"Plain text resume loaded: 
{plain_text_resume[:100]}...") # Логируем первые 100 символов + resume_object = Resume(plain_text_resume) - resume_generator_manager = FacadeManager(llm_api_key, style_manager, resume_generator, resume_object, Path("data_folder/output")) + logger.debug(f"Resume object created: {resume_object}") - # generate resume only if no resume flag was provided - if "resume" not in parameters["uploads"]: - os.system("cls" if os.name == "nt" else "clear") - resume_generator_manager.choose_style() - os.system("cls" if os.name == "nt" else "clear") + logger.info("Creating FacadeManager for resume generation...") + resume_generator_manager = FacadeManager( + llm_api_key, style_manager, resume_generator, resume_object, Path("data_folder/output") + ) + logger.debug("FacadeManager initialized successfully.") + os.system('cls' if os.name == 'nt' else 'clear') + logger.info("Choosing resume style...") + resume_generator_manager.choose_style() + logger.info("Resume style chosen successfully.") + os.system('cls' if os.name == 'nt' else 'clear') + + logger.info("Creating JobApplicationProfile object...") job_application_profile_object = JobApplicationProfile(plain_text_resume) + logger.debug(f"JobApplicationProfile created: {job_application_profile_object}") + logger.info("Initializing the browser...") browser = init_browser() - login_component = AIHawkAuthenticator(browser) + logger.debug("Browser initialized successfully.") + + logger.info("Initializing job application component...") apply_component = AIHawkJobManager(browser) + logger.debug(f"Job application component created: {apply_component}") + + logger.info("Creating AIHawkBotFacade object...") + bot = AIHawkBotFacade(None, apply_component) + logger.debug(f"Bot facade created: {bot}") + + logger.info("Setting secrets for the bot...") + bot.set_secrets(email, password) + logger.info("Secrets set successfully.") + + logger.info("Initializing login component...") + login_component = AIHawkAuthenticator(driver=browser, bot_facade=bot) 
+ logger.debug(f"Login component created: {login_component}") + + bot.login_component = login_component + logger.debug("Login component set in bot facade.") + + logger.info("Initializing GPT Answerer component...") gpt_answerer_component = GPTAnswerer(parameters, llm_api_key) - bot = AIHawkBotFacade(login_component, apply_component) + logger.debug(f"GPT Answerer component created: {gpt_answerer_component}") + + + logger.info("Setting job application profile and resume...") bot.set_job_application_profile_and_resume(job_application_profile_object, resume_object) + logger.info("Job application profile and resume set successfully.") + + logger.info("Setting GPT Answerer and resume generator...") bot.set_gpt_answerer_and_resume_generator(gpt_answerer_component, resume_generator_manager) + logger.info("GPT Answerer and resume generator set successfully.") + + logger.info("Setting additional parameters for the bot...") bot.set_parameters(parameters) + logger.info("Parameters set successfully.") + + logger.info("Starting bot login process...") bot.start_login() + logger.info("Login process completed successfully.") + + logger.info("Starting job application process...") bot.start_apply() + logger.info("Job application process completed successfully.") + except WebDriverException as e: logger.error(f"WebDriver error occurred: {e}") except Exception as e: + logger.error("An unexpected error occurred in create_and_run_bot:") + logger.exception(e) raise RuntimeError(f"Error running the bot: {str(e)}") + @click.command() -@click.option('--resume', type=click.Path(exists=True, file_okay=True, dir_okay=False, path_type=Path), help="Path to the resume PDF file") +@click.option('--resume', type=click.Path(exists=True, file_okay=True, dir_okay=False, path_type=Path), + help="Path to the resume PDF file") def main(resume: Path = None): try: data_folder = Path("data_folder") secrets_file, config_file, plain_text_resume_file, output_folder = 
FileManager.validate_data_folder(data_folder) parameters = ConfigValidator.validate_config(config_file) - llm_api_key = ConfigValidator.validate_secrets(secrets_file) + email, password, llm_api_key = ConfigValidator.validate_secrets(secrets_file) parameters['uploads'] = FileManager.file_paths_to_dict(resume, plain_text_resume_file) parameters['outputFileDirectory'] = output_folder - create_and_run_bot(parameters, llm_api_key) + create_and_run_bot(email, password, parameters, llm_api_key) except ConfigError as ce: logger.error(f"Configuration error: {str(ce)}") logger.error(f"Refer to the configuration guide for troubleshooting: https://github.com/feder-cr/AIHawk_AIHawk_automatic_job_application/blob/main/readme.md#configuration {str(ce)}") diff --git a/requirements.txt b/requirements.txt index 9f94c9ed..12d1122c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ +lib_resume_builder_AIHawk @ git+https://github.com/feder-cr/lib_resume_builder_AIHawk.git@d4224a9fde34c95cce391651b920f6c94eade5d3 click -git+https://github.com/feder-cr/lib_resume_builder_AIHawk.git httpx~=0.27.2 inputimeout==1.0.4 jsonschema==4.23.0 diff --git a/src/aihawk_authenticator.py b/src/aihawk_authenticator.py index e729f96e..4389a57c 100644 --- a/src/aihawk_authenticator.py +++ b/src/aihawk_authenticator.py @@ -1,22 +1,34 @@ import random import time -from selenium.common.exceptions import NoSuchElementException, TimeoutException, NoAlertPresentException, TimeoutException, UnexpectedAlertPresentException +from loguru import logger +from selenium.common.exceptions import NoSuchElementException, TimeoutException from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.ui import WebDriverWait -from loguru import logger - class AIHawkAuthenticator: - def __init__(self, driver=None): + def __init__(self, driver=None, bot_facade=None): self.driver = driver + self.email = bot_facade.email if 
bot_facade else "" + self.password = bot_facade.password if bot_facade else "" logger.debug(f"AIHawkAuthenticator initialized with driver: {driver}") + def set_bot_facade(self, bot_facade): + self.email = bot_facade.email + self.password = bot_facade.password + logger.debug(f"Email and password set from bot_facade: email={self.email}, password={'*' * len(self.password)}") + + def start(self): - logger.info("Starting Chrome browser to log in to AIHawk.") + logger.info("Starting Chrome browser to log in to LinkedIn.") + self.driver.get('https://www.linkedin.com/feed') + self.wait_for_page_load() + + time.sleep(3) + if self.is_logged_in(): logger.info("User is already logged in. Skipping login process.") return @@ -25,47 +37,85 @@ def start(self): self.handle_login() def handle_login(self): - logger.info("Navigating to the AIHawk login page...") + logger.info("Navigating to the LinkedIn login page...") self.driver.get("https://www.linkedin.com/login") if 'feed' in self.driver.current_url: logger.debug("User is already logged in.") return try: self.enter_credentials() + self.submit_login_form() except NoSuchElementException as e: - logger.error(f"Could not log in to AIHawk. Element not found: {e}") + logger.error("Could not log in to LinkedIn. 
Element not found: %s", e) + time.sleep(random.uniform(3, 5)) self.handle_security_check() - def enter_credentials(self): try: - logger.debug("Enter credentials...") - - check_interval = 4 # Interval to log the current URL - elapsed_time = 0 - - while True: - # Log current URL every 4 seconds and remind the user to log in - current_url = self.driver.current_url - logger.info(f"Please login on {current_url}") - - # Check if the user is already on the feed page - if 'feed' in current_url: - logger.debug("Login successful, redirected to feed page.") - break - else: - # Optionally wait for the password field (or any other element you expect on the login page) - WebDriverWait(self.driver, 10).until( - EC.presence_of_element_located((By.ID, "password")) - ) - logger.debug("Password field detected, waiting for login completion.") - - time.sleep(check_interval) - elapsed_time += check_interval + logger.debug("Starting the process to enter credentials...") + + # Ожидание появления поля для ввода email + logger.debug("Waiting for the email input field to be present...") + email_field = WebDriverWait(self.driver, 15).until( + EC.presence_of_element_located((By.ID, "username")) + ) + logger.debug(f"Email input field found: {email_field}. Clearing and entering email now.") + email_field.clear() + email_field.click() + logger.debug(f"Attempting to enter email: {self.email}") + email_field.send_keys(self.email) + logger.debug("Email entered successfully. Verifying value in the field...") + + # Проверка значения в поле email + entered_email = email_field.get_attribute("value") + if entered_email != self.email: + logger.warning(f"Email was not correctly entered. Field value: {entered_email}. 
Retrying...") + email_field.clear() + email_field.send_keys(self.email) + + logger.debug(f"Email field final value: {entered_email}") + + # Ожидание появления поля для ввода пароля + logger.debug("Waiting for the password input field to be present...") + password_field = WebDriverWait(self.driver, 15).until( + EC.presence_of_element_located((By.ID, "password")) + ) + logger.debug(f"Password input field found: {password_field}. Clearing and entering password now.") + password_field.clear() + password_field.click() + logger.debug(f"Attempting to enter password: {'*' * len(self.password)}") # Маскируем отображение пароля + password_field.send_keys(self.password) + logger.debug("Password entered successfully. Verifying value in the field...") + + # Проверка значения в поле пароля + entered_password = password_field.get_attribute("value") + if entered_password != self.password: + logger.warning(f"Password was not correctly entered. Field value: {entered_password}. Retrying...") + password_field.clear() + password_field.send_keys(self.password) + + logger.debug(f"Password field final value: {'*' * len(entered_password)}") except TimeoutException: - logger.error("Login form not found. Aborting login.") + logger.error("Login form not found within the timeout period. Aborting login.") + raise + except NoSuchElementException as e: + logger.error(f"An element was not found: {str(e)}") + raise + except Exception as e: + logger.error(f"An unexpected error occurred while entering credentials: {str(e)}") + raise + + def submit_login_form(self): + try: + logger.debug("Submitting login form...") + login_button = self.driver.find_element(By.XPATH, '//button[@type="submit"]') + login_button.click() + logger.debug("Login form submitted.") + except NoSuchElementException: + logger.error("Login button not found. Please verify the page structure.") + print("Login button not found. 
Please verify the page structure.") def handle_security_check(self): try: @@ -82,10 +132,17 @@ def handle_security_check(self): logger.error("Security check not completed. Please try again later.") def is_logged_in(self): + # target_url = 'https://www.linkedin.com/feed' + # + # # Navigate to the target URL if not already there + # if self.driver.current_url != target_url: + # logger.debug("Navigating to target URL: %s", target_url) + # self.driver.get(target_url) + try: - self.driver.get('https://www.linkedin.com/feed') + # Increase the wait time for the page elements to load logger.debug("Checking if user is logged in...") - WebDriverWait(self.driver, 3).until( + WebDriverWait(self.driver, 10).until( EC.presence_of_element_located((By.CLASS_NAME, 'share-box-feed-entry__trigger')) ) @@ -110,4 +167,15 @@ def is_logged_in(self): except TimeoutException: logger.error("Page elements took too long to load or were not found.") - return False \ No newline at end of file + return False + + def wait_for_page_load(self, timeout=10): + try: + logger.debug("Waiting for page to load with timeout: %s seconds", timeout) + WebDriverWait(self.driver, timeout).until( + lambda d: d.execute_script('return document.readyState') == 'complete' + ) + logger.debug("Page load completed.") + except TimeoutException: + logger.error("Page load timed out.") + print("Page load timed out.") diff --git a/src/aihawk_bot_facade.py b/src/aihawk_bot_facade.py index 1f5930b4..6c6488f7 100644 --- a/src/aihawk_bot_facade.py +++ b/src/aihawk_bot_facade.py @@ -43,8 +43,20 @@ def set_job_application_profile_and_resume(self, job_application_profile, resume self.job_application_profile = job_application_profile self.resume = resume self.state.job_application_profile_set = True + + # Pass job_application_profile to apply_component (LinkedInJobManager) + self.apply_component.set_job_application_profile(job_application_profile) + logger.debug("Job application profile and resume set successfully") + def 
set_secrets(self, email, password): + logger.debug("Setting secrets: email and password") + self._validate_non_empty(email, "Email") + self._validate_non_empty(password, "Password") + self.email = email + self.password = password + self.state.credentials_set = True + logger.debug("Secrets set successfully") def set_gpt_answerer_and_resume_generator(self, gpt_answerer_component, resume_generator_manager): logger.debug("Setting GPT answerer and resume generator") diff --git a/src/aihawk_easy_applier.py b/src/aihawk_easy_applier.py index 35ec76da..a6f6247f 100644 --- a/src/aihawk_easy_applier.py +++ b/src/aihawk_easy_applier.py @@ -8,8 +8,10 @@ from typing import List, Optional, Any, Tuple from httpx import HTTPStatusError +from loguru import logger from reportlab.lib.pagesizes import A4 -from reportlab.pdfgen import canvas +from reportlab.lib.styles import getSampleStyleSheet +from reportlab.platypus import SimpleDocTemplate, Paragraph from selenium.common.exceptions import NoSuchElementException, TimeoutException from selenium.webdriver import ActionChains from selenium.webdriver.common.by import By @@ -23,21 +25,19 @@ class AIHawkEasyApplier: - def __init__(self, driver: Any, resume_dir: Optional[str], set_old_answers: List[Tuple[str, str, str]], - gpt_answerer: Any, resume_generator_manager): + def __init__(self, driver, resume_dir, set_old_answers, gpt_answerer, resume_generator_manager, job_application_profile): logger.debug("Initializing AIHawkEasyApplier") - if resume_dir is None or not os.path.exists(resume_dir): - resume_dir = None self.driver = driver self.resume_path = resume_dir self.set_old_answers = set_old_answers self.gpt_answerer = gpt_answerer self.resume_generator_manager = resume_generator_manager + self.job_application_profile = job_application_profile # Store the job_application_profile self.all_data = self._load_questions_from_json() - logger.debug("AIHawkEasyApplier initialized successfully") def _load_questions_from_json(self) -> List[dict]: 
+ """Load previously stored questions and answers from a JSON file.""" output_file = 'answers.json' logger.debug(f"Loading questions from JSON file: {output_file}") try: @@ -60,28 +60,32 @@ def _load_questions_from_json(self) -> List[dict]: raise Exception(f"Error loading questions data from JSON file: \nTraceback:\n{tb_str}") def check_for_premium_redirect(self, job: Any, max_attempts=3): - + """ + Checks if the current page redirects to a LinkedIn Premium page and attempts to navigate back to the job page. + Args: + job (Any): The job object containing the job link. + max_attempts (int): Maximum number of attempts to try navigating back. + """ current_url = self.driver.current_url attempts = 0 while "linkedin.com/premium" in current_url and attempts < max_attempts: - logger.warning("Redirected to AIHawk Premium page. Attempting to return to job page.") + logger.warning("Redirected to LinkedIn Premium page. Attempting to return to the job page.") attempts += 1 - self.driver.get(job.link) time.sleep(2) current_url = self.driver.current_url if "linkedin.com/premium" in current_url: - logger.error(f"Failed to return to job page after {max_attempts} attempts. Cannot apply for the job.") + logger.error(f"Failed to return to the job page after {max_attempts} attempts. Cannot apply for the job.") raise Exception( - f"Redirected to AIHawk Premium page and failed to return after {max_attempts} attempts. Job application aborted.") - + f"Redirected to LinkedIn Premium page and failed to return after {max_attempts} attempts. Job application aborted.") + def apply_to_job(self, job: Any) -> None: """ Starts the process of applying to a job. - :param job: A job object with the job details. - :return: None + Args: + job (Any): A job object with the job details. 
""" logger.debug(f"Applying to job: {job}") try: @@ -92,6 +96,11 @@ def apply_to_job(self, job: Any) -> None: raise e def job_apply(self, job: Any): + """ + Main function to apply for a LinkedIn job using the Easy Apply feature. + Args: + job (Any): The job object containing details like job link, company, and position. + """ logger.debug(f"Starting job application for job: {job}") try: @@ -105,14 +114,21 @@ def job_apply(self, job: Any): self.check_for_premium_redirect(job) try: + if self.driver.find_elements(By.XPATH, "//*[contains(text(), 'Application submitted')]"): + logger.info(f"Job application already submitted for job: {job}. Skipping.") + return + else: + logger.debug("No indication of prior application found. Proceeding with application.") + except Exception as e: + logger.error(f"Error while checking for application status: {e}") + raise + try: self.driver.execute_script("document.activeElement.blur();") logger.debug("Focus removed from the active element") self.check_for_premium_redirect(job) - easy_apply_button = self._find_easy_apply_button(job) - self.check_for_premium_redirect(job) logger.debug("Retrieving job description") @@ -125,22 +141,48 @@ def job_apply(self, job: Any): job.set_recruiter_link(recruiter_link) logger.debug(f"Recruiter link set: {recruiter_link}") - logger.debug("Attempting to click 'Easy Apply' button") - actions = ActionChains(self.driver) - actions.move_to_element(easy_apply_button).click().perform() - logger.debug("'Easy Apply' button clicked successfully") + # Try clicking the "Easy Apply" button + try: + self.handle_safety_reminder_modal(self.driver) + + logger.debug("Attempting to click 'Easy Apply' button using ActionChains") + actions = ActionChains(self.driver) + actions.move_to_element(easy_apply_button).click().perform() + logger.debug("'Easy Apply' button clicked successfully") + + self.handle_safety_reminder_modal(self.driver) + + # Verify if the form has opened + time.sleep(2) + if not self._is_form_open(): + 
logger.error("Form did not open after clicking 'Easy Apply' button.") + raise Exception("Failed to open form after clicking 'Easy Apply'.") + except Exception as e: + logger.warning(f"Failed to click 'Easy Apply' button using ActionChains: {e}, trying JavaScript click") + try: + # Use JavaScript for clicking if ActionChains did not work + self.driver.execute_script("arguments[0].click();", easy_apply_button) + logger.debug("'Easy Apply' button clicked successfully via JavaScript") + + # Check if the form opened again + time.sleep(2) + if not self._is_form_open(): + logger.error("Form did not open after clicking 'Easy Apply' button using JavaScript.") + raise Exception("Failed to open form after clicking 'Easy Apply' with JavaScript.") + except Exception as js_e: + logger.error(f"Failed to click 'Easy Apply' button using JavaScript: {js_e}") + raise # Stop execution if the form does not open logger.debug("Passing job information to GPT Answerer") self.gpt_answerer.set_job(job) - logger.debug("Filling out application form") + logger.debug("Filling out the application form") self._fill_application_form(job) logger.debug(f"Job application process completed successfully for job: {job}") except Exception as e: - tb_str = traceback.format_exc() - logger.error(f"Failed to apply to job: {job}, error: {tb_str}") + logger.error(f"Failed to apply to job: {job}. Error traceback: {tb_str}") logger.debug("Discarding application due to failure") self._discard_application() @@ -148,78 +190,193 @@ def job_apply(self, job: Any): raise Exception(f"Failed to apply to job! Original exception:\nTraceback:\n{tb_str}") def _find_easy_apply_button(self, job: Any) -> WebElement: + """ + Finds the 'Easy Apply' button on the job page using various search methods. + Args: + job (Any): The job object containing details like the job link. + Returns: + WebElement: The Easy Apply button element if found. + Raises: + Exception: If the Easy Apply button cannot be found after several attempts. 
+ """ logger.debug("Searching for 'Easy Apply' button") attempt = 0 + max_attempts = 3 + timeout = 10 + # Multiple search strategies to locate the Easy Apply button search_methods = [ { - 'description': "find all 'Easy Apply' buttons using find_elements", - 'find_elements': True, - 'xpath': '//button[contains(@class, "jobs-apply-button") and contains(., "Easy Apply")]' + 'description': "Button within 'jobs-s-apply' div with class 'jobs-apply-button' and text containing 'Easy Apply'", + 'xpath': '//div[contains(@class, "jobs-s-apply")]//button[contains(@class, "jobs-apply-button") and .//span[text()="Easy Apply"]]', + 'count': 0 + }, + { + 'description': "Button with class 'jobs-apply-button' and normalized text 'Easy Apply'", + 'xpath': '//button[contains(@class, "jobs-apply-button") and normalize-space(text())="Easy Apply"]', + 'count': 0 + }, + { + 'description': "Button with ID 'ember40' and class 'artdeco-button--primary'", + 'xpath': "//button[@id='ember40' and contains(@class, 'artdeco-button--primary')]", + 'count': 0 }, { - 'description': "'aria-label' containing 'Easy Apply to'", - 'xpath': '//button[contains(@aria-label, "Easy Apply to")]' + 'description': "Button with aria-label containing 'Easy Apply to' and class 'jobs-apply-button'", + 'xpath': '//button[contains(@aria-label, "Easy Apply") and contains(@class, "jobs-apply-button")]', + 'count': 0 }, { - 'description': "button text search", - 'xpath': '//button[contains(text(), "Easy Apply") or contains(text(), "Apply now")]' + 'description': "Button with class 'jobs-apply-button' and text 'Easy Apply'", + 'xpath': '//button[contains(@class, "jobs-apply-button") and normalize-space(text())="Easy Apply"]', + 'count': 0 + }, + { + 'description': "Button using partial match for class 'artdeco-button--primary' and text 'Easy Apply'", + 'xpath': '//button[contains(@class, "artdeco-button--primary") and contains(., "Easy Apply")]', + 'count': 0 + }, + { + 'description': "CSS Selector for button with class 
'artdeco-button__text' under #ember41", + 'css': '#ember41 > .artdeco-button__text', + 'count': 0 + }, + { + 'description': "CSS Selector for button with class 'artdeco-button__text' under #ember120", + 'css': '#ember120 > .artdeco-button__text', + 'count': 0 + }, + { + 'description': "XPath for span containing 'Easy Apply'", + 'xpath': '//span[contains(text(), "Easy Apply")]', + 'count': 0 } ] - while attempt < 2: - + while attempt < max_attempts: self.check_for_premium_redirect(job) self._scroll_page() + try: + WebDriverWait(self.driver, timeout).until( + lambda d: d.execute_script('return document.readyState') == 'complete' + ) + except TimeoutException: + logger.warning("Page did not load within the timeout period") + + try: + logger.info("Removing focus from the active element (likely URL bar)") + self.driver.execute_script("document.activeElement.blur();") + time.sleep(1) + + logger.info("Focusing on body element") + body_element = self.driver.find_element(By.TAG_NAME, 'body') + self.driver.execute_script("arguments[0].focus();", body_element) + time.sleep(1) + except Exception as e: + logger.warning(f"Failed to reset focus: {e}") + + # Attempting to find the button using the defined search methods for method in search_methods: try: - logger.debug(f"Attempting search using {method['description']}") - - if method.get('find_elements'): - - buttons = self.driver.find_elements(By.XPATH, method['xpath']) - if buttons: - for index, button in enumerate(buttons): - try: - - WebDriverWait(self.driver, 10).until(EC.visibility_of(button)) - WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable(button)) - logger.debug(f"Found 'Easy Apply' button {index + 1}, attempting to click") - return button - except Exception as e: - logger.warning(f"Button {index + 1} found but not clickable: {e}") - else: - raise TimeoutException("No 'Easy Apply' buttons found") - else: + logger.info( + f"Attempt {attempt + 1}: Searching for 'Easy Apply' button using 
{method['description']}") - button = WebDriverWait(self.driver, 10).until( - EC.presence_of_element_located((By.XPATH, method['xpath'])) + if 'xpath' in method: + buttons = WebDriverWait(self.driver, timeout).until( + EC.presence_of_all_elements_located((By.XPATH, method['xpath'])) ) - WebDriverWait(self.driver, 10).until(EC.visibility_of(button)) - WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable(button)) - logger.debug("Found 'Easy Apply' button, attempting to click") - return button + elif 'css' in method: + buttons = WebDriverWait(self.driver, timeout).until( + EC.presence_of_all_elements_located((By.CSS_SELECTOR, method['css'])) + ) + + for index, _ in enumerate(buttons): + try: + logger.info(f"Checking button at index {index + 1}") + + if 'xpath' in method: + button = WebDriverWait(self.driver, timeout).until( + EC.element_to_be_clickable( + (By.XPATH, f'({method["xpath"]})[{index + 1}]') + ) + ) + elif 'css' in method: + button = WebDriverWait(self.driver, timeout).until( + EC.element_to_be_clickable( + (By.CSS_SELECTOR, method['css']) + ) + ) + + if button.is_enabled() and button.is_displayed(): + logger.info( + f"'Easy Apply' button found and clickable using {method['description']} at index {index + 1}") + method['count'] += 1 + self._save_search_statistics(search_methods) + return button + else: + logger.warning("Button is not enabled or displayed") + except Exception as e: + logger.warning( + f"Failed to click on 'Easy Apply' button at index {index + 1} using {method['description']}: {e}") except TimeoutException: logger.warning(f"Timeout during search using {method['description']}") except Exception as e: logger.warning( - f"Failed to click 'Easy Apply' button using {method['description']} on attempt {attempt + 1}: {e}") - - self.check_for_premium_redirect(job) + f"Failed to find 'Easy Apply' button using {method['description']} on attempt {attempt + 1}: {e}") if attempt == 0: - logger.debug("Refreshing page to retry finding 'Easy Apply' 
button") + logger.info("Refreshing page to retry finding 'Easy Apply' button") self.driver.refresh() time.sleep(random.randint(3, 5)) attempt += 1 - page_source = self.driver.page_source - logger.error(f"No clickable 'Easy Apply' button found after 2 attempts. Page source:\n{page_source}") + logger.error("No clickable 'Easy Apply' button found after all attempts") raise Exception("No clickable 'Easy Apply' button found") + def _save_search_statistics(self, search_methods): + """ + Saves statistics of the button search attempts to a file for tracking. + Args: + search_methods: List of search strategies used to locate the Easy Apply button. + """ + file_path = 'easy_apply_search_stats.txt' + stats = {} + + if os.path.exists(file_path): + try: + with open(file_path, 'r', encoding='utf-8') as f: + for line in f: + parts = line.split(':') + if len(parts) == 2: + description = parts[0].strip() + count = int(parts[1].strip()) + stats[description] = count + except Exception as e: + logger.error(f"Failed to read existing search statistics: {e}") + + # Update statistics with new data + for method in search_methods: + if method['description'] in stats: + stats[method['description']] += method['count'] + else: + stats[method['description']] = method['count'] + + try: + with open(file_path, 'w', encoding='utf-8') as f: + for description, count in stats.items(): + f.write(f"{description}: {count}\n") + logger.info(f"Search statistics updated in {file_path}") + except Exception as e: + logger.error(f"Failed to save search statistics: {e}") + def _get_job_description(self) -> str: + """ + Extracts the job description from the LinkedIn job page. + Returns: + str: The extracted job description text. 
+ """ logger.debug("Getting job description") try: try: @@ -244,6 +401,11 @@ def _get_job_description(self) -> str: raise Exception(f"Error getting Job description: \nTraceback:\n{tb_str}") def _get_job_recruiter(self): + """ + Extracts the recruiter link from the LinkedIn job page if available. + Returns: + str: The URL of the recruiter profile or an empty string if not found. + """ logger.debug("Getting job recruiter information") try: hiring_team_section = WebDriverWait(self.driver, 10).until( @@ -260,10 +422,16 @@ def _get_job_recruiter(self): logger.debug(f"Job recruiter link retrieved successfully: {recruiter_link}") return recruiter_link else: - logger.debug("No recruiter link found in the hiring team section") + logger.info("Recruiter link not found in the 'Meet the hiring team' section.") return "" + + except TimeoutException: + logger.info( + "The 'Meet the hiring team' section is not present on the page.") + return "" + except Exception as e: - logger.warning(f"Failed to retrieve recruiter information: {e}") + logger.error(f"An unexpected error occurred while retrieving recruiter information: {e}", exc_info=True) return "" def _scroll_page(self) -> None: @@ -274,16 +442,24 @@ def _scroll_page(self) -> None: def _fill_application_form(self, job): logger.debug(f"Filling out application form for job: {job}") - while True: - self.fill_up(job) - if self._next_or_submit(): - logger.debug("Application form submitted") - break + + form_filled = False + try: + while not form_filled: + self.fill_up(job) + form_filled = self._next_or_submit() + if form_filled: + logger.debug("Application form submitted successfully") + return + except Exception as e: + logger.error(f"Form filling failed: {e}. 
Skipping this job.") + self._discard_application() def _next_or_submit(self): logger.debug("Clicking 'Next' or 'Submit' button") next_button = self.driver.find_element(By.CLASS_NAME, "artdeco-button--primary") button_text = next_button.text.lower() + if 'submit application' in button_text: logger.debug("Submit button found, submitting application") self._unfollow_company() @@ -291,10 +467,16 @@ def _next_or_submit(self): next_button.click() time.sleep(random.uniform(1.5, 2.5)) return True - time.sleep(random.uniform(1.5, 2.5)) - next_button.click() - time.sleep(random.uniform(3.0, 5.0)) - self._check_for_errors() + else: + time.sleep(random.uniform(1.5, 2.5)) + next_button.click() + + WebDriverWait(self.driver, 10).until( + EC.presence_of_element_located((By.CLASS_NAME, 'jobs-easy-apply-content')) + ) + time.sleep(random.uniform(3.0, 5.0)) + self._check_for_errors() + return False def _unfollow_company(self) -> None: try: @@ -303,14 +485,15 @@ def _unfollow_company(self) -> None: By.XPATH, "//label[contains(.,'to stay up to date with their page.')]") follow_checkbox.click() except Exception as e: - logger.debug(f"Failed to unfollow company: {e}") + logger.warning(f"Failed to unfollow company: {e}") def _check_for_errors(self) -> None: logger.debug("Checking for form errors") error_elements = self.driver.find_elements(By.CLASS_NAME, 'artdeco-inline-feedback--error') if error_elements: - logger.error(f"Form submission failed with errors: {error_elements}") - raise Exception(f"Failed answering or file upload. {str([e.text for e in error_elements])}") + error_texts = [e.text for e in error_elements] + logger.error(f"Form submission failed with errors: {error_texts}") + raise Exception(f"Failed answering or file upload. 
{error_texts}") def _discard_application(self) -> None: logger.debug("Discarding application") @@ -333,56 +516,24 @@ def fill_up(self, job) -> None: pb4_elements = easy_apply_content.find_elements(By.CLASS_NAME, 'pb4') for element in pb4_elements: self._process_form_element(element, job) + + self._fill_additional_questions() except Exception as e: logger.error(f"Failed to find form elements: {e}") def _process_form_element(self, element: WebElement, job) -> None: logger.debug("Processing form element") + if self._is_upload_field(element): self._handle_upload_fields(element, job) else: - self._fill_additional_questions() - - def _handle_dropdown_fields(self, element: WebElement) -> None: - logger.debug("Handling dropdown fields") - - dropdown = element.find_element(By.TAG_NAME, 'select') - select = Select(dropdown) - - options = [option.text for option in select.options] - logger.debug(f"Dropdown options found: {options}") - - parent_element = dropdown.find_element(By.XPATH, '../..') - - label_elements = parent_element.find_elements(By.TAG_NAME, 'label') - if label_elements: - question_text = label_elements[0].text.lower() - else: - question_text = "unknown" - - logger.debug(f"Detected question text: {question_text}") - - existing_answer = None - for item in self.all_data: - if self._sanitize_text(question_text) in item['question'] and item['type'] == 'dropdown': - existing_answer = item['answer'] - break - - if existing_answer: - logger.debug(f"Found existing answer for question '{question_text}': {existing_answer}") - else: - - logger.debug(f"No existing answer found, querying model for: {question_text}") - existing_answer = self.gpt_answerer.answer_question_from_options(question_text, options) - logger.debug(f"Model provided answer: {existing_answer}") - self._save_questions_to_json({'type': 'dropdown', 'question': question_text, 'answer': existing_answer}) - - if existing_answer in options: - select.select_by_visible_text(existing_answer) - 
logger.debug(f"Selected option: {existing_answer}") - else: - logger.error(f"Answer '{existing_answer}' is not a valid option in the dropdown") - raise Exception(f"Invalid option selected: {existing_answer}") + try: + label_element = element.find_element(By.XPATH, + '//label[@data-test-text-selectable-option__label="I do not have any adjustment requirements"]') + label_element.click() + logger.debug("Successfully clicked on the label") + except Exception as e: + logger.warning(f"Failed to click on the label: {e}") def _is_upload_field(self, element: WebElement) -> bool: is_upload = bool(element.find_elements(By.XPATH, ".//input[@type='file']")) @@ -392,31 +543,70 @@ def _is_upload_field(self, element: WebElement) -> bool: def _handle_upload_fields(self, element: WebElement, job) -> None: logger.debug("Handling upload fields") + resume_uploaded = False + try: show_more_button = self.driver.find_element(By.XPATH, - "//button[contains(@aria-label, 'Show more resumes')]") + "//button[contains(@aria-label, 'Show') and contains(@aria-label, 'more resumes')]") show_more_button.click() logger.debug("Clicked 'Show more resumes' button") except NoSuchElementException: logger.debug("'Show more resumes' button not found, continuing...") file_upload_elements = self.driver.find_elements(By.XPATH, "//input[@type='file']") - for element in file_upload_elements: - parent = element.find_element(By.XPATH, "..") - self.driver.execute_script("arguments[0].classList.remove('hidden')", element) - - output = self.gpt_answerer.resume_or_cover(parent.text.lower()) - if 'resume' in output: - logger.debug("Uploading resume") - if self.resume_path is not None and self.resume_path.resolve().is_file(): - element.send_keys(str(self.resume_path.resolve())) - logger.debug(f"Resume uploaded from path: {self.resume_path.resolve()}") + for upload_element in file_upload_elements: + parent = upload_element.find_element(By.XPATH, "..") + + if 'upload-resume' in upload_element.get_attribute('id') and not 
resume_uploaded: + logger.debug("Detected resume upload input by ID") + + # Step 1: Check if resume file path is valid and if the file is already uploaded + resume_filename = os.path.basename(self.resume_path) if self.resume_path else None + + if resume_filename and self.resume_path and os.path.isfile(self.resume_path): + # Check if the resume is already uploaded + if self.is_resume_already_uploaded(self.driver, resume_filename): + logger.info(f"Resume '{resume_filename}' is already uploaded. Skipping re-upload.") + resume_uploaded = True + continue + + # Upload the resume if it hasn't been uploaded yet + logger.debug(f"Uploading resume from path: {self.resume_path}") + upload_element.send_keys(os.path.abspath(self.resume_path)) + resume_uploaded = True + continue else: logger.debug("Resume path not found or invalid, generating new resume") - self._create_and_upload_resume(element, job) - elif 'cover' in output: - logger.debug("Uploading cover letter") - self._create_and_upload_cover_letter(element, job) + self._create_and_upload_resume(upload_element, job) + resume_uploaded = True + continue + + if not resume_uploaded: + self.driver.execute_script("arguments[0].classList.remove('hidden')", upload_element) + + output = self.gpt_answerer.resume_or_cover(parent.text.lower()) + + if 'resume' in output: + logger.debug("Uploading resume based on text detection") + if self.resume_path is not None and os.path.isfile(self.resume_path): + # Check again before uploading based on text detection + resume_filename = os.path.basename(self.resume_path) + if self.is_resume_already_uploaded(self.driver, resume_filename): + logger.info( + f"Resume '{resume_filename}' is already uploaded based on text detection. 
Skipping upload.") + resume_uploaded = True + continue + + upload_element.send_keys(os.path.abspath(self.resume_path)) + logger.debug(f"Resume uploaded from path: {self.resume_path}") + resume_uploaded = True + else: + logger.debug("Resume path not found or invalid, generating new resume") + self._create_and_upload_resume(upload_element, job) + resume_uploaded = True + elif 'cover' in output: + logger.debug("Uploading cover letter based on text detection") + self._create_and_upload_cover_letter(upload_element, job) logger.debug("Finished handling upload fields") @@ -425,17 +615,19 @@ def _create_and_upload_resume(self, element, job): folder_path = 'generated_cv' try: - if not os.path.exists(folder_path): - logger.debug(f"Creating directory at path: {folder_path}") os.makedirs(folder_path, exist_ok=True) + logger.debug(f"Ensured directory exists at path: {folder_path}") except Exception as e: logger.error(f"Failed to create directory: {folder_path}. Error: {e}") raise while True: try: + candidate_first_name = self.job_application_profile.personal_information.name + candidate_last_name = self.job_application_profile.personal_information.surname timestamp = int(time.time()) - file_path_pdf = os.path.join(folder_path, f"CV_{timestamp}.pdf") + file_name = f"CV_{candidate_first_name}_{candidate_last_name}_{timestamp}.pdf" + file_path_pdf = os.path.join(folder_path, file_name) logger.debug(f"Generated file path for resume: {file_path_pdf}") logger.debug(f"Generating resume for job: {job.title} at {job.company}") @@ -443,11 +635,9 @@ def _create_and_upload_resume(self, element, job): with open(file_path_pdf, "xb") as f: f.write(base64.b64decode(resume_pdf_base64)) logger.debug(f"Resume successfully generated and saved to: {file_path_pdf}") - break except HTTPStatusError as e: if e.response.status_code == 429: - retry_after = e.response.headers.get('retry-after') retry_after_ms = e.response.headers.get('retry-after-ms') @@ -465,7 +655,6 @@ def 
_create_and_upload_resume(self, element, job): else: logger.error(f"HTTP error: {e}") raise - except Exception as e: logger.error(f"Failed to generate resume: {e}") tb_str = traceback.format_exc() @@ -504,15 +693,13 @@ def _create_and_upload_resume(self, element, job): def _create_and_upload_cover_letter(self, element: WebElement, job) -> None: logger.debug("Starting the process of creating and uploading cover letter.") - cover_letter_text = self.gpt_answerer.answer_question_textual_wide_range("Write a cover letter") + cover_letter_text = self.gpt_answerer.answer_question_textual_wide_range("cover_letter") folder_path = 'generated_cv' try: - - if not os.path.exists(folder_path): - logger.debug(f"Creating directory at path: {folder_path}") os.makedirs(folder_path, exist_ok=True) + logger.debug(f"Ensured directory exists at path: {folder_path}") except Exception as e: logger.error(f"Failed to create directory: {folder_path}. Error: {e}") raise @@ -523,51 +710,26 @@ def _create_and_upload_cover_letter(self, element: WebElement, job) -> None: file_path_pdf = os.path.join(folder_path, f"Cover_Letter_{timestamp}.pdf") logger.debug(f"Generated file path for cover letter: {file_path_pdf}") - c = canvas.Canvas(file_path_pdf, pagesize=A4) - page_width, page_height = A4 - text_object = c.beginText(50, page_height - 50) - text_object.setFont("Helvetica", 12) - - max_width = page_width - 100 - bottom_margin = 50 - available_height = page_height - bottom_margin - 50 - - def split_text_by_width(text, font, font_size, max_width): - wrapped_lines = [] - for line in text.splitlines(): - - if utils.stringWidth(line, font, font_size) > max_width: - words = line.split() - new_line = "" - for word in words: - if utils.stringWidth(new_line + word + " ", font, font_size) <= max_width: - new_line += word + " " - else: - wrapped_lines.append(new_line.strip()) - new_line = word + " " - wrapped_lines.append(new_line.strip()) - else: - wrapped_lines.append(line) - return wrapped_lines - - 
lines = split_text_by_width(cover_letter_text, "Helvetica", 12, max_width) - - for line in lines: - text_height = text_object.getY() - if text_height > bottom_margin: - text_object.textLine(line) - else: + styles = getSampleStyleSheet() + style = styles["Normal"] + style.fontName = "Helvetica" + style.fontSize = 12 + style.leading = 15 - c.drawText(text_object) - c.showPage() - text_object = c.beginText(50, page_height - 50) - text_object.setFont("Helvetica", 12) - text_object.textLine(line) + story = [Paragraph(cover_letter_text, style)] - c.drawText(text_object) - c.save() - logger.debug(f"Cover letter successfully generated and saved to: {file_path_pdf}") + doc = SimpleDocTemplate( + file_path_pdf, + pagesize=A4, + rightMargin=20, + leftMargin=20, + topMargin=20, + bottomMargin=20 + ) + + doc.build(story) + logger.debug(f"Cover letter successfully generated and saved to: {file_path_pdf}") break except Exception as e: logger.error(f"Failed to generate cover letter: {e}") @@ -590,7 +752,6 @@ def split_text_by_width(text, font, font_size, max_width): raise ValueError("Cover letter file format is not allowed. 
Only PDF, DOC, and DOCX formats are supported.") try: - logger.debug(f"Uploading cover letter from path: {file_path_pdf}") element.send_keys(os.path.abspath(file_path_pdf)) job.cover_letter_path = os.path.abspath(file_path_pdf) @@ -621,7 +782,9 @@ def _process_form_section(self, section: WebElement) -> None: if self._find_and_handle_date_question(section): logger.debug("Handled date question") return - + if self._find_and_handle_checkbox_question(section): + logger.debug("Handled checkbox question") + return if self._find_and_handle_dropdown_question(section): logger.debug("Handled dropdown question") return @@ -646,7 +809,6 @@ def _find_and_handle_radio_question(self, section: WebElement) -> bool: for item in self.all_data: if self._sanitize_text(question_text) in item['question'] and item['type'] == 'radio': existing_answer = item - break if existing_answer: self._select_radio(radios, existing_answer['answer']) @@ -660,6 +822,14 @@ def _find_and_handle_radio_question(self, section: WebElement) -> bool: return True return False + def _select_radio(self, radios: List[WebElement], answer: str) -> None: + logger.debug(f"Selecting radio option: {answer}") + for radio in radios: + if answer in radio.text.lower(): + radio.find_element(By.TAG_NAME, 'label').click() + return + radios[-1].find_element(By.TAG_NAME, 'label').click() + def _find_and_handle_textbox_question(self, section: WebElement) -> bool: logger.debug("Searching for text fields in the section.") text_fields = section.find_elements(By.TAG_NAME, 'input') + section.find_elements(By.TAG_NAME, 'textarea') @@ -672,38 +842,34 @@ def _find_and_handle_textbox_question(self, section: WebElement) -> bool: is_numeric = self._is_numeric_field(text_field) logger.debug(f"Is the field numeric? 
{'Yes' if is_numeric else 'No'}") + existing_answer = None question_type = 'numeric' if is_numeric else 'textbox' - # Check if it's a cover letter field (case-insensitive) - is_cover_letter = 'cover letter' in question_text.lower() + for item in self.all_data: + if self._sanitize_text(item['question']) == self._sanitize_text(question_text) and item.get('type') == question_type: + existing_answer = item + logger.debug(f"Found existing answer in the data: {existing_answer['answer']}") + break - # Look for existing answer if it's not a cover letter field - existing_answer = None - if not is_cover_letter: - for item in self.all_data: - if self._sanitize_text(item['question']) == self._sanitize_text(question_text) and item.get('type') == question_type: - existing_answer = item['answer'] - logger.debug(f"Found existing answer: {existing_answer}") - break + if existing_answer: + self._enter_text(text_field, existing_answer['answer']) + logger.debug("Entered existing answer into the textbox.") + time.sleep(1) + text_field.send_keys(Keys.ARROW_DOWN) + text_field.send_keys(Keys.ENTER) + logger.debug("Selected first option from the dropdown.") + return True - if existing_answer and not is_cover_letter: - answer = existing_answer - logger.debug(f"Using existing answer: {answer}") + if is_numeric: + answer = self.gpt_answerer.answer_question_numeric(question_text) + logger.debug(f"Generated numeric answer: {answer}") else: - if is_numeric: - answer = self.gpt_answerer.answer_question_numeric(question_text) - logger.debug(f"Generated numeric answer: {answer}") - else: - answer = self.gpt_answerer.answer_question_textual_wide_range(question_text) - logger.debug(f"Generated textual answer: {answer}") + answer = self.gpt_answerer.answer_question_textual_wide_range(question_text) + logger.debug(f"Generated textual answer: {answer}") + self._save_questions_to_json({'type': question_type, 'question': question_text, 'answer': answer}) self._enter_text(text_field, answer) - 
logger.debug("Entered answer into the textbox.") - - # Save non-cover letter answers - if not is_cover_letter: - self._save_questions_to_json({'type': question_type, 'question': question_text, 'answer': answer}) - logger.debug("Saved non-cover letter answer to JSON.") + logger.debug("Entered new answer into the textbox and saved it to JSON.") time.sleep(1) text_field.send_keys(Keys.ARROW_DOWN) @@ -714,29 +880,116 @@ def _find_and_handle_textbox_question(self, section: WebElement) -> bool: logger.debug("No text fields found in the section.") return False + def _is_numeric_field(self, field: WebElement) -> bool: + field_type = field.get_attribute('type').lower() + field_id = field.get_attribute("id").lower() + is_numeric = 'numeric' in field_id or field_type == 'number' or ('text' == field_type and 'numeric' in field_id) + logger.debug(f"Field type: {field_type}, Field ID: {field_id}, Is numeric: {is_numeric}") + return is_numeric + + def _enter_text(self, element: WebElement, text: str) -> None: + logger.debug(f"Entering text: {text}") + element.clear() + element.send_keys(text) + def _find_and_handle_date_question(self, section: WebElement) -> bool: - date_fields = section.find_elements(By.CLASS_NAME, 'artdeco-datepicker__input ') + logger.debug("Searching for date fields in the section.") + date_fields = section.find_elements(By.CLASS_NAME, 'artdeco-datepicker__input') + if date_fields: date_field = date_fields[0] - question_text = section.text.lower() - answer_date = self.gpt_answerer.answer_question_date() - answer_text = answer_date.strftime("%Y-%m-%d") + question_text = section.text.lower().strip() - existing_answer = None - for item in self.all_data: - if self._sanitize_text(question_text) in item['question'] and item['type'] == 'date': - existing_answer = item + placeholder = date_field.get_attribute('placeholder') + if placeholder: + logger.debug(f"Detected date format placeholder: {placeholder}") + try: + date_format = 
self._infer_date_format_from_placeholder(placeholder) + except Exception as e: + logger.error(f"Failed to infer date format from placeholder: {e}. Defaulting to %m/%d/%Y.") + date_format = "%m/%d/%Y" + else: + logger.warning("No placeholder found. Defaulting to %m/%d/%Y.") + date_format = "%m/%d/%Y" + logger.debug(f"Classifying question for date input: {question_text}") + try: + answer_date = self.gpt_answerer.answer_question_date(question_text) + answer_text = answer_date.strftime(date_format) + except Exception as e: + logger.error(f"Error generating answer date from model: {e}") + return False + + try: + self._enter_text(date_field, answer_text) + logger.debug(f"Entered date '{answer_text}' in the format {date_format}.") + return True + except Exception as e: + logger.error(f"Failed to enter date: {e}") + return False + + logger.debug("No date fields found in the section.") + return False + + def _infer_date_format_from_placeholder(self, placeholder: str) -> str: + format_map = { + "dd": "%d", + "mm": "%m", + "yyyy": "%Y", + "yy": "%y" + } + + for key, value in format_map.items(): + placeholder = placeholder.replace(key, value) + + logger.debug(f"Inferred date format: {placeholder}") + return placeholder + + def _find_and_handle_checkbox_question(self, section: WebElement) -> bool: + logger.debug("Searching for checkbox fields in the section.") + checkboxes = section.find_elements(By.XPATH, ".//input[@type='checkbox']") + + if checkboxes: + question_text_element = section.find_elements(By.CLASS_NAME, 'fb-form-element-label__title') + question_text = question_text_element[0].text.lower().strip() if question_text_element else "unknown question" + logger.debug(f"Found checkbox group with label: {question_text}") + + options = [] + for checkbox in checkboxes: + option_label = section.find_element(By.XPATH, f".//label[@for='{checkbox.get_attribute('id')}']").text.strip() + options.append(option_label) + + logger.debug(f"Available checkbox options: {options}") + + 
existing_answers = [] + for item in self.all_data: + if self._sanitize_text(question_text) in item['question'] and item['type'] == 'checkbox': + existing_answers = item['answer'] break - if existing_answer: - self._enter_text(date_field, existing_answer['answer']) - logger.debug("Entered existing date answer") + + if existing_answers: + logger.debug(f"Found existing answers: {existing_answers}") + for checkbox, option in zip(checkboxes, options): + if option in existing_answers and not checkbox.is_selected(): + self.driver.execute_script("arguments[0].scrollIntoView(true);", checkbox) + self.driver.execute_script("arguments[0].click();", checkbox) + logger.debug(f"Selected checkbox for option: {option}") return True - self._save_questions_to_json({'type': 'date', 'question': question_text, 'answer': answer_text}) - self._enter_text(date_field, answer_text) - logger.debug("Entered new date answer") + logger.debug(f"No existing answers found, querying model for: {question_text}") + answers = self.gpt_answerer.answer_question_from_options(question_text, options) + logger.debug(f"Model provided answers: {answers}") + + self._save_questions_to_json({'type': 'checkbox', 'question': question_text, 'answer': answers}) + + for checkbox, option in zip(checkboxes, options): + if option in answers and not checkbox.is_selected(): + self.driver.execute_script("arguments[0].scrollIntoView(true);", checkbox) + self.driver.execute_script("arguments[0].click();", checkbox) + logger.debug(f"Selected checkbox for option: {option}") return True + + logger.debug("No checkbox fields found in the section.") return False def _find_and_handle_dropdown_question(self, section: WebElement) -> bool: @@ -750,15 +1003,17 @@ def _find_and_handle_dropdown_question(self, section: WebElement) -> bool: if dropdowns: dropdown = dropdowns[0] select = Select(dropdown) - options = [option.text for option in select.options] + options = [option.text for option in select.options if option.text != "Select an 
option"] logger.debug(f"Dropdown options found: {options}") - question_text = question.find_element(By.TAG_NAME, 'label').text.lower() - logger.debug(f"Processing dropdown or combobox question: {question_text}") + try: + question_text = question.find_element(By.TAG_NAME, 'label').text.lower().strip() + except NoSuchElementException: + logger.warning("Label not found, trying to extract question text from other elements") + question_text = section.text.lower().strip() - current_selection = select.first_selected_option.text - logger.debug(f"Current selection: {current_selection}") + logger.debug(f"Processing dropdown question: {question_text}") existing_answer = None for item in self.all_data: @@ -768,73 +1023,75 @@ def _find_and_handle_dropdown_question(self, section: WebElement) -> bool: if existing_answer: logger.debug(f"Found existing answer for question '{question_text}': {existing_answer}") - if current_selection != existing_answer: - logger.debug(f"Updating selection to: {existing_answer}") - self._select_dropdown_option(dropdown, existing_answer) - return True + else: + existing_answer = self.gpt_answerer.answer_question_from_options(question_text, options) + logger.debug(f"Model provided answer: {existing_answer}") + self._save_questions_to_json({'type': 'dropdown', 'question': question_text, 'answer': existing_answer}) - logger.debug(f"No existing answer found, querying model for: {question_text}") + if existing_answer in options: + logger.debug(f"Updating selection to: {existing_answer}") + self._select_dropdown_option(select, existing_answer) + else: + logger.error(f"Answer '{existing_answer}' is not a valid option in the dropdown") + raise Exception(f"Invalid option selected: {existing_answer}") - answer = self.gpt_answerer.answer_question_from_options(question_text, options) - self._save_questions_to_json({'type': 'dropdown', 'question': question_text, 'answer': answer}) - self._select_dropdown_option(dropdown, answer) - logger.debug(f"Selected new 
dropdown answer: {answer}") return True - else: - - logger.debug(f"No dropdown found. Logging elements for debugging.") - elements = section.find_elements(By.XPATH, ".//*") - logger.debug(f"Elements found: {[element.tag_name for element in elements]}") + logger.debug("No dropdown found in section.") return False + except TimeoutException: + logger.error("Timeout while trying to locate dropdown") + return False except Exception as e: logger.warning(f"Failed to handle dropdown or combobox question: {e}", exc_info=True) return False - def _is_numeric_field(self, field: WebElement) -> bool: - field_type = field.get_attribute('type').lower() - field_id = field.get_attribute("id").lower() - is_numeric = 'numeric' in field_id or field_type == 'number' or ('text' == field_type and 'numeric' in field_id) - logger.debug(f"Field type: {field_type}, Field ID: {field_id}, Is numeric: {is_numeric}") - return is_numeric - - def _enter_text(self, element: WebElement, text: str) -> None: - logger.debug(f"Entering text: {text}") - element.clear() - element.send_keys(text) - - def _select_radio(self, radios: List[WebElement], answer: str) -> None: - logger.debug(f"Selecting radio option: {answer}") - for radio in radios: - if answer in radio.text.lower(): - radio.find_element(By.TAG_NAME, 'label').click() - return - radios[-1].find_element(By.TAG_NAME, 'label').click() - - def _select_dropdown_option(self, element: WebElement, text: str) -> None: - logger.debug(f"Selecting dropdown option: {text}") - select = Select(element) - select.select_by_visible_text(text) + def _select_dropdown_option(self, select: Select, text: str) -> None: + try: + select.select_by_visible_text(text) + logger.debug(f"Selected option: {text}") + except Exception as e: + logger.error(f"Failed to select option '{text}': {e}") def _save_questions_to_json(self, question_data: dict) -> None: + """ + Save question data to a JSON file, with filtering to exclude company-specific or unsuitable questions. 
+ + Args: + question_data (dict): The question and answer data to be saved. + """ output_file = 'answers.json' question_data['question'] = self._sanitize_text(question_data['question']) logger.debug(f"Saving question data to JSON: {question_data}") + + # List of keywords to exclude certain questions from being saved + exclusion_keywords = ["why us", "summary", "cover letter", "your message", "want to work"] + + # Check if the question contains any exclusion keywords + if any(keyword in question_data['question'].lower() for keyword in exclusion_keywords): + logger.info(f"Skipping saving question due to company-specific keywords: {question_data['question']}") + return # Skip saving this question if it's company-specific + + try: + with open(output_file, 'r') as f: + try: + data = json.load(f) + if not isinstance(data, list): + raise ValueError("JSON file format is incorrect. Expected a list of questions.") + except json.JSONDecodeError: + logger.error("JSON decoding failed") + data = [] + except FileNotFoundError: + logger.warning("JSON file not found, creating new file") + data = [] + + if question_data in data: + logger.info(f"Duplicate question found, skipping save: {question_data['question']}") + return + + data.append(question_data) try: - try: - with open(output_file, 'r') as f: - try: - data = json.load(f) - if not isinstance(data, list): - raise ValueError("JSON file format is incorrect. 
Expected a list of questions.") - except json.JSONDecodeError: - logger.error("JSON decoding failed") - data = [] - except FileNotFoundError: - logger.warning("JSON file not found, creating new file") - data = [] - data.append(question_data) with open(output_file, 'w') as f: json.dump(data, f, indent=4) logger.debug("Question data saved successfully to JSON") @@ -848,3 +1105,58 @@ def _sanitize_text(self, text: str) -> str: sanitized_text = re.sub(r'[\x00-\x1F\x7F]', '', sanitized_text).replace('\n', ' ').replace('\r', '').rstrip(',') logger.debug(f"Sanitized text: {sanitized_text}") return sanitized_text + + def _is_form_open(self) -> bool: + try: + WebDriverWait(self.driver, 5).until( + EC.presence_of_element_located((By.CLASS_NAME, 'jobs-easy-apply-content')) + ) + return True + except TimeoutException: + return False + + def handle_safety_reminder_modal(driver, timeout=5): + """ + Handles the job safety reminder modal window. + If the modal is present, clicks the 'Continue applying' button. + + Args: + driver (webdriver): The Selenium WebDriver instance. + timeout (int): Time to wait for the modal window to appear (default: 5 seconds). + """ + try: + logger.debug("Checking for the presence of the job safety reminder modal...") + # Check if the 'Continue applying' button is present + continue_button = WebDriverWait(driver, timeout).until( + EC.presence_of_element_located((By.XPATH, "//span[text()='Continue applying']/ancestor::button")) + ) + logger.info("Job safety reminder modal detected. Clicking the 'Continue applying' button.") + continue_button.click() + logger.debug("'Continue applying' button clicked successfully.") + except TimeoutException: + logger.info("Job safety reminder modal not found. Continuing with the process.") + + def is_resume_already_uploaded(driver, resume_filename: str) -> bool: + """ + Checks if the resume with the given filename is already uploaded. + + Args: + driver (webdriver): The Selenium WebDriver instance. 
+ resume_filename (str): The name of the resume file to check. + + Returns: + bool: True if the resume is already uploaded, False otherwise. + """ + try: + logger.debug(f"Checking if the resume '{resume_filename}' is already uploaded.") + uploaded_resumes = driver.find_elements(By.XPATH, + "//h3[contains(@class, 'jobs-document-upload-redesign-card__file-name')]") + for resume_element in uploaded_resumes: + if resume_element.text.strip() == resume_filename: + logger.info(f"Resume '{resume_filename}' is already uploaded.") + return True + logger.info(f"Resume '{resume_filename}' not found in uploaded documents.") + except Exception as e: + logger.error(f"Error while checking uploaded resumes: {str(e)}") + + return False diff --git a/src/aihawk_job_manager.py b/src/aihawk_job_manager.py index ef0d87ae..ccb06e31 100644 --- a/src/aihawk_job_manager.py +++ b/src/aihawk_job_manager.py @@ -1,9 +1,11 @@ import json import os import random +import threading import time from itertools import product from pathlib import Path +import re from inputimeout import inputimeout, TimeoutOccurred from selenium.common.exceptions import NoSuchElementException @@ -40,8 +42,10 @@ class AIHawkJobManager: def __init__(self, driver): logger.debug("Initializing AIHawkJobManager") self.driver = driver - self.set_old_answers = set() + self.set_old_answers = [] self.easy_applier_component = None + self.job_application_profile = None + self.seen_jobs = [] logger.debug("AIHawkJobManager initialized successfully") def set_parameters(self, parameters): @@ -62,8 +66,13 @@ def set_parameters(self, parameters): self.resume_path = Path(resume_path) if resume_path and Path(resume_path).exists() else None self.output_file_directory = Path(parameters['outputFileDirectory']) self.env_config = EnvironmentKeys() + self.parameters = parameters logger.debug("Parameters set successfully") + def set_job_application_profile(self, job_application_profile): + logger.debug("Setting job application profile in 
LinkedInJobManager") + self.job_application_profile = job_application_profile + def set_gpt_answerer(self, gpt_answerer): logger.debug("Setting GPT answerer") self.gpt_answerer = gpt_answerer @@ -72,10 +81,56 @@ def set_resume_generator_manager(self, resume_generator_manager): logger.debug("Setting resume generator manager") self.resume_generator_manager = resume_generator_manager + def get_input_with_timeout(self, prompt, timeout_duration): + user_input = [None] + + # Check if code is running in PyCharm + is_pycharm = 'PYCHARM_HOSTED' in os.environ + + if is_pycharm: + # Input with timeout is not supported in PyCharm console + logger.warning("Input with timeout is not supported in PyCharm console. Proceeding without user input.") + return '' + else: + # Use threading to implement timeout + def input_thread(): + user_input[0] = input(prompt).strip().lower() + + thread = threading.Thread(target=input_thread) + thread.daemon = True + thread.start() + thread.join(timeout_duration) + if thread.is_alive(): + logger.debug("Input timed out") + return '' + else: + return user_input[0] + + def wait_or_skip(self, time_left): + """Method for waiting or skipping the sleep time based on user input""" + if time_left > 0: + user_input = self.get_input_with_timeout( + prompt=f"Sleeping for {time_left} seconds. Press 'y' to skip waiting. 
Timeout 60 seconds: ", + timeout_duration=60) + if user_input == 'y': + logger.debug("User chose to skip waiting.") + utils.printyellow("User skipped waiting.") + else: + logger.debug(f"Sleeping for {time_left} seconds as user chose not to skip.") + utils.printyellow(f"Sleeping for {time_left} seconds.") + time.sleep(time_left) + + def start_applying(self): logger.debug("Starting job application process") - self.easy_applier_component = AIHawkEasyApplier(self.driver, self.resume_path, self.set_old_answers, - self.gpt_answerer, self.resume_generator_manager) + self.easy_applier_component = AIHawkEasyApplier( + self.driver, + self.resume_path, + self.set_old_answers, + self.gpt_answerer, + self.resume_generator_manager, + job_application_profile=self.job_application_profile # Pass the job_application_profile here + ) searches = list(product(self.positions, self.locations)) random.shuffle(searches) page_sleep = 0 @@ -99,8 +154,21 @@ def start_applying(self): try: jobs = self.get_jobs_from_page() if not jobs: - logger.debug("No more jobs found on this page. Exiting loop.") - break + # Attempt to find and click the search button + try: + search_button = self.driver.find_element(By.CLASS_NAME, "jobs-search-box__submit-button") + search_button.click() + logger.debug("Clicked the search button to reload jobs.") + time.sleep(random.uniform(1.5, 3.5)) + jobs = self.get_jobs_from_page() + except NoSuchElementException: + logger.warning("Search button not found.") + except Exception as e: + logger.error(f"Error while trying to click the search button: {e}") + + if not jobs: + utils.printyellow("No more jobs found on this page. Exiting loop.") + break except Exception as e: logger.error(f"Failed to retrieve jobs: {e}") break @@ -115,78 +183,48 @@ def start_applying(self): time_left = minimum_page_time - time.time() - # Ask user if they want to skip waiting, with timeout - if time_left > 0: - try: - user_input = inputimeout( - prompt=f"Sleeping for {time_left} seconds. 
Press 'y' to skip waiting. Timeout 60 seconds : ", - timeout=60).strip().lower() - except TimeoutOccurred: - user_input = '' # No input after timeout - if user_input == 'y': - logger.debug("User chose to skip waiting.") - else: - logger.debug(f"Sleeping for {time_left} seconds as user chose not to skip.") - time.sleep(time_left) + # Use the wait_or_skip function for sleeping + self.wait_or_skip(time_left) minimum_page_time = time.time() + minimum_time if page_sleep % 5 == 0: sleep_time = random.randint(5, 34) - try: - user_input = inputimeout( - prompt=f"Sleeping for {sleep_time / 60} minutes. Press 'y' to skip waiting. Timeout 60 seconds : ", - timeout=60).strip().lower() - except TimeoutOccurred: - user_input = '' # No input after timeout - if user_input == 'y': - logger.debug("User chose to skip waiting.") - else: - logger.debug(f"Sleeping for {sleep_time} seconds.") - time.sleep(sleep_time) + # Use the wait_or_skip function for extended sleep + self.wait_or_skip(sleep_time) page_sleep += 1 except Exception as e: - logger.error(f"Unexpected error during job search: {e}") + logger.error("Unexpected error during job search: %s", e) + utils.printred(f"Unexpected error: {e}") continue time_left = minimum_page_time - time.time() - if time_left > 0: - try: - user_input = inputimeout( - prompt=f"Sleeping for {time_left} seconds. Press 'y' to skip waiting. Timeout 60 seconds : ", - timeout=60).strip().lower() - except TimeoutOccurred: - user_input = '' # No input after timeout - if user_input == 'y': - logger.debug("User chose to skip waiting.") - else: - logger.debug(f"Sleeping for {time_left} seconds as user chose not to skip.") - time.sleep(time_left) + # Use the wait_or_skip function again before moving to the next search + self.wait_or_skip(time_left) minimum_page_time = time.time() + minimum_time if page_sleep % 5 == 0: sleep_time = random.randint(50, 90) - try: - user_input = inputimeout( - prompt=f"Sleeping for {sleep_time / 60} minutes. 
Press 'y' to skip waiting: ", - timeout=60).strip().lower() - except TimeoutOccurred: - user_input = '' # No input after timeout - if user_input == 'y': - logger.debug("User chose to skip waiting.") - else: - logger.debug(f"Sleeping for {sleep_time} seconds.") - time.sleep(sleep_time) + # Use the wait_or_skip function for a longer sleep period + self.wait_or_skip(sleep_time) page_sleep += 1 def get_jobs_from_page(self): try: + try: + no_jobs_element = self.driver.find_element(By.CLASS_NAME, 'jobs-search-no-results-banner') + except NoSuchElementException: + try: - no_jobs_element = self.driver.find_element(By.CLASS_NAME, 'jobs-search-two-pane__no-results-banner--expand') - if 'No matching jobs found' in no_jobs_element.text or 'unfortunately, things aren' in self.driver.page_source.lower(): + no_jobs_element = self.driver.find_element(By.CLASS_NAME, 'jobs-search-two-pane__no-results-banner--expand') + except NoSuchElementException: + no_jobs_element = None + + if no_jobs_element and ('No matching jobs found' in no_jobs_element.text or 'unfortunately, things aren' in self.driver.page_source.lower()): + utils.printyellow("No matching jobs found on this page.") logger.debug("No matching jobs found on this page, skipping.") return [] @@ -196,7 +234,7 @@ def get_jobs_from_page(self): try: job_results = self.driver.find_element(By.CLASS_NAME, "jobs-search-results-list") utils.scroll_slow(self.driver, job_results) - utils.scroll_slow(self.driver, job_results, step=300, reverse=True) + # utils.scroll_slow(self.driver, job_results, step=300, reverse=True) job_list_elements = self.driver.find_elements(By.CLASS_NAME, 'scaffold-layout__list-container')[ 0].find_elements(By.CLASS_NAME, 'jobs-search-results__list-item') @@ -216,101 +254,125 @@ def get_jobs_from_page(self): def apply_jobs(self): try: + # Check if no matching jobs are found on the current page no_jobs_element = self.driver.find_element(By.CLASS_NAME, 'jobs-search-two-pane__no-results-banner--expand') if 'No 
matching jobs found' in no_jobs_element.text or 'unfortunately, things aren' in self.driver.page_source.lower(): logger.debug("No matching jobs found on this page, skipping") return except NoSuchElementException: pass + + # Find the job results container and job elements + job_results = self.driver.find_element(By.CLASS_NAME, "jobs-search-results-list") + + # utils.scroll_slow(self.driver, job_results) + # utils.scroll_slow(self.driver, job_results, step=300, reverse=True) - job_list_elements = self.driver.find_elements(By.CLASS_NAME, 'scaffold-layout__list-container')[ - 0].find_elements(By.CLASS_NAME, 'jobs-search-results__list-item') - + job_list_elements = job_results.find_elements(By.CLASS_NAME, 'jobs-search-results__list-item') + if not job_list_elements: + utils.printyellow("No job class elements found on page, moving to next page.") logger.debug("No job class elements found on page, skipping") return job_list = [Job(*self.extract_job_information_from_tile(job_element)) for job_element in job_list_elements] for job in job_list: - - logger.debug(f"Starting applicant for job: {job.title} at {job.company}") - #TODO fix apply threshold - """ - # Initialize applicants_count as None - applicants_count = None - - # Iterate over each job insight element to find the one containing the word "applicant" - for element in job_insight_elements: - logger.debug(f"Checking element text: {element.text}") - if "applicant" in element.text.lower(): - # Found an element containing "applicant" - applicants_text = element.text.strip() - logger.debug(f"Applicants text found: {applicants_text}") - - # Extract numeric digits from the text (e.g., "70 applicants" -> "70") - applicants_count = ''.join(filter(str.isdigit, applicants_text)) - logger.debug(f"Extracted applicants count: {applicants_count}") - - if applicants_count: - if "over" in applicants_text.lower(): - applicants_count = int(applicants_count) + 1 # Handle "over X applicants" - logger.debug(f"Applicants count adjusted for 
'over': {applicants_count}") - else: - applicants_count = int(applicants_count) # Convert the extracted number to an integer - break - - # Check if applicants_count is valid (not None) before performing comparisons - if applicants_count is not None: - # Perform the threshold check for applicants count - if applicants_count < self.min_applicants or applicants_count > self.max_applicants: - logger.debug(f"Skipping {job.title} at {job.company}, applicants count: {applicants_count}") - self.write_to_file(job, "skipped_due_to_applicants") - continue # Skip this job if applicants count is outside the threshold - else: - logger.debug(f"Applicants count {applicants_count} is within the threshold") - else: - # If no applicants count was found, log a warning but continue the process - logger.warning( - f"Applicants count not found for {job.title} at {job.company}, continuing with application.") - except NoSuchElementException: - # Log a warning if the job insight elements are not found, but do not stop the job application process - logger.warning( - f"Applicants count elements not found for {job.title} at {job.company}, continuing with application.") - except ValueError as e: - # Handle errors when parsing the applicants count - logger.error(f"Error parsing applicants count for {job.title} at {job.company}: {e}") - except Exception as e: - # Catch any other exceptions to ensure the process continues - logger.error( - f"Unexpected error during applicants count processing for {job.title} at {job.company}: {e}") - - # Continue with the job application process regardless of the applicants count check - """ - - - if self.is_blacklisted(job.title, job.company, job.link): - logger.debug(f"Job blacklisted: {job.title} at {job.company}") - self.write_to_file(job, "skipped") - continue - if self.is_already_applied_to_job(job.title, job.company, job.link): - self.write_to_file(job, "skipped") - continue - if self.is_already_applied_to_company(job.company): - self.write_to_file(job, 
"skipped") - continue + logger.debug(f"Starting applicant count search for job: {job.title} at {job.company}") + try: + # Use the new function to check the applicant count and decide whether to continue or skip + if not self.check_applicant_count(job): + utils.printyellow(f"Skipping {job.title} at {job.company} due to applicant count criteria.") + logger.debug(f"Skipping {job.title} at {job.company} based on applicant count.") + self.write_to_file(job, "skipped_due_to_applicants") + continue + + # Continue with other conditions and apply if not blacklisted or already applied + if self.is_blacklisted(job.title, job.company, job.link): + logger.debug("Job blacklisted: %s at %s", job.title, job.company) + self.write_to_file(job, "skipped") + continue + + if self.is_already_applied_to_job(job.title, job.company, job.link): + self.write_to_file(job, "skipped") + continue + + if self.is_already_applied_to_company(job.company): + self.write_to_file(job, "skipped") + continue + + # Apply to the job if eligible if job.apply_method not in {"Continue", "Applied", "Apply"}: self.easy_applier_component.job_apply(job) self.write_to_file(job, "success") - logger.debug(f"Applied to job: {job.title} at {job.company}") + logger.debug("Successfully applied to job: %s at %s", job.title, job.company) + except Exception as e: - logger.error(f"Failed to apply for {job.title} at {job.company}: {e}") + logger.error("Unexpected error during job application for %s at %s: %s", job.title, job.company, e) self.write_to_file(job, "failed") continue + + + def check_applicant_count(self, job) -> bool: + """ + Checks the applicant count for a job and returns whether to proceed with the application. + + Args: + job (Job): The job object containing title, company, and other details. + + Returns: + bool: True if the applicant count meets the criteria or is not found, False otherwise. 
+ """ + try: + # Find job insight elements related to applicant count + job_insight_elements = self.driver.find_elements(By.CLASS_NAME, "job-details-jobs-unified-top-card__job-insight") + logger.debug(f"Found {len(job_insight_elements)} job insight elements for {job.title} at {job.company}") + + for element in job_insight_elements: + positive_text_element = element.find_element(By.XPATH, ".//span[contains(@class, 'tvm__text--positive')]") + applicants_text = positive_text_element.text.strip().lower() + + # Check if element contains the word "applicant" and extract count + if "applicant" in applicants_text: + logger.info(f"Applicants text found: {applicants_text}") + applicants_count = ''.join(filter(str.isdigit, applicants_text)) + + if applicants_count: + applicants_count = int(applicants_count) + logger.info(f"Extracted applicants count: {applicants_count}") + + # Adjust count if "over" is mentioned + if "over" in applicants_text: + applicants_count += 1 + logger.info(f"Adjusted count for 'over': {applicants_count}") + + # Check if the count is within the acceptable range + if self.min_applicants <= applicants_count <= self.max_applicants: + logger.info(f"Applicants count {applicants_count} is within the threshold for {job.title} at {job.company}") + return True + else: + logger.info(f"Applicants count {applicants_count} is outside the threshold for {job.title} at {job.company}") + return False + + # If no valid applicants count is found, consider it as passing + logger.warning(f"No valid applicants count found for {job.title} at {job.company}. Continuing.") + return True + + except NoSuchElementException: + logger.warning(f"Applicants count elements not found for {job.title} at {job.company}. 
Continuing.") + return True + except ValueError as e: + logger.error(f"Error parsing applicants count for {job.title} at {job.company}: {e}") + return True + except Exception as e: + logger.error(f"Unexpected error during applicant count check for {job.title} at {job.company}: {e}") + return True + + - def write_to_file(self, job, file_name): - logger.debug(f"Writing job application result to file: {file_name}") + def write_to_file(self, job, file_name, applicants_count=None): + logger.debug("Writing job application result to file: %s", file_name) pdf_path = Path(job.pdf_path).resolve() pdf_path = pdf_path.as_uri() data = { @@ -321,30 +383,46 @@ def write_to_file(self, job, file_name): "job_location": job.location, "pdf_path": pdf_path } + + if applicants_count is not None: + data["applicants_count"] = applicants_count + file_path = self.output_file_directory / f"{file_name}.json" + temp_file_path = file_path.with_suffix('.tmp') + if not file_path.exists(): - with open(file_path, 'w', encoding='utf-8') as f: - json.dump([data], f, indent=4) - logger.debug(f"Job data written to new file: {file_name}") + try: + with open(temp_file_path, 'w', encoding='utf-8') as f: + json.dump([data], f, indent=4) + temp_file_path.rename(file_path) + logger.debug("Job data written to new file: %s", file_path) + except Exception as e: + logger.error(f"Failed to write new data to file {file_path}: {e}") else: - with open(file_path, 'r+', encoding='utf-8') as f: - try: - existing_data = json.load(f) - except json.JSONDecodeError: - logger.error(f"JSON decode error in file: {file_path}") - existing_data = [] - existing_data.append(data) - f.seek(0) - json.dump(existing_data, f, indent=4) - f.truncate() - logger.debug(f"Job data appended to existing file: {file_name}") + try: + with open(file_path, 'r+', encoding='utf-8') as f: + try: + existing_data = json.load(f) + except json.JSONDecodeError: + logger.error("JSON decode error in file: %s. 
Creating a backup.", file_path) + file_path.rename(file_path.with_suffix('.bak')) + existing_data = [] + + existing_data.append(data) + f.seek(0) + json.dump(existing_data, f, indent=4) + f.truncate() + logger.debug("Job data appended to existing file: %s", file_path) + except Exception as e: + logger.error(f"Failed to append data to file {file_path}: {e}") def get_base_search_url(self, parameters): logger.debug("Constructing base search URL") url_parts = [] if parameters['remote']: url_parts.append("f_CF=f_WRA") - experience_levels = [str(i + 1) for i, (level, v) in enumerate(parameters.get('experience_level', {}).items()) if + experience_levels = [str(i + 1) for i, (level, v) in enumerate(parameters.get('experience_level', {}).items()) + if v] if experience_levels: url_parts.append(f"f_E={','.join(experience_levels)}") @@ -359,14 +437,23 @@ def get_base_search_url(self, parameters): "24 hours": "&f_TPR=r86400" } date_param = next((v for k, v in date_mapping.items() if parameters.get('date', {}).get(k)), "") - url_parts.append("f_LF=f_AL") # Easy Apply + + # Easy Apply filter + url_parts.append("f_LF=f_AL") + + # Add sortBy parameter for sorting by date + sort_by = parameters.get('sort_by', 'date') # Defaults to 'date' when 'sort_by' is not configured + if sort_by == 'date': + url_parts.append("sortBy=DD") + base_url = "&".join(url_parts) full_url = f"?{base_url}{date_param}" - logger.debug(f"Base search URL constructed: {full_url}") + + logger.debug("Base search URL constructed: %s", full_url) return full_url def next_job_page(self, position, location, job_page): - logger.debug(f"Navigating to next job page: {position} in {location}, page {job_page}") + logger.debug("Navigating to next job page: %s in %s, page %d", position, location, job_page) self.driver.get( f"https://www.linkedin.com/jobs/search/{self.base_search_url}&keywords={position}{location}&start={job_page * 25}") @@ -376,10 +463,10 @@ def extract_job_information_from_tile(self, job_tile): try: 
print(job_tile.get_attribute('outerHTML')) job_title = job_tile.find_element(By.CLASS_NAME, 'job-card-list__title').find_element(By.TAG_NAME, 'strong').text - + link = job_tile.find_element(By.CLASS_NAME, 'job-card-list__title').get_attribute('href').split('?')[0] company = job_tile.find_element(By.CLASS_NAME, 'job-card-container__primary-description').text - logger.debug(f"Job information extracted: {job_title} at {company}") + logger.debug("Job information extracted: %s at %s", job_title, company) except NoSuchElementException: logger.warning("Some job information (title, link, or company) is missing.") try: @@ -396,14 +483,33 @@ def extract_job_information_from_tile(self, job_tile): def is_blacklisted(self, job_title, company, link): logger.debug(f"Checking if job is blacklisted: {job_title} at {company}") - job_title_words = job_title.lower().split(' ') - title_blacklisted = any(word in job_title_words for word in self.title_blacklist) - company_blacklisted = company.strip().lower() in (word.strip().lower() for word in self.company_blacklist) + + job_title_lower = job_title.lower() + company_lower = company.strip().lower() + + # Check for an empty title blacklist + if not self.title_blacklist: + return False + + # Build a regular expression that respects word boundaries + blacklist_pattern = r'\b(' + '|'.join(re.escape(phrase.lower()) for phrase in self.title_blacklist) + r')\b' + + # Check whether the job title contains any match + title_blacklisted = bool(re.search(blacklist_pattern, job_title_lower)) + logger.debug(f"Title blacklist status: {title_blacklisted}") + + # Check the company + company_blacklisted = company_lower in (word.strip().lower() for word in self.company_blacklist) + logger.debug(f"Company blacklist status: {company_blacklisted}") + + # Check the link link_seen = link in self.seen_jobs + logger.debug(f"Link seen status: {link_seen}") + is_blacklisted = title_blacklisted or company_blacklisted or link_seen logger.debug(f"Job blacklisted 
status: {is_blacklisted}") - return title_blacklisted or company_blacklisted or link_seen + return is_blacklisted def is_already_applied_to_job(self, job_title, company, link): link_seen = link in self.seen_jobs diff --git a/src/job_application_profile.py b/src/job_application_profile.py index 5b9ea8e9..e7f85fe2 100644 --- a/src/job_application_profile.py +++ b/src/job_application_profile.py @@ -5,6 +5,21 @@ from loguru import logger +@dataclass +class PersonalInformation: + name: str + surname: str + date_of_birth: str + country: str + city: str + address: str + phone_prefix: str + phone: str + email: str + github: str + linkedin: str + + @dataclass class SelfIdentification: gender: str @@ -57,6 +72,7 @@ class SalaryExpectations: @dataclass class JobApplicationProfile: + personal_information: PersonalInformation self_identification: SelfIdentification legal_authorization: LegalAuthorization work_preferences: WorkPreferences @@ -79,6 +95,19 @@ def __init__(self, yaml_str: str): logger.error(f"YAML data must be a dictionary, received: {type(data)}") raise TypeError("YAML data must be a dictionary.") + # Process personal_information + try: + logger.debug("Processing personal_information") + self.personal_information = PersonalInformation(**data['personal_information']) + logger.debug("personal_information processed: %s", self.personal_information) + except KeyError as e: + logger.error("Required field %s is missing in personal_information data.", e) + raise KeyError(f"Required field {e} is missing in personal_information data.") from e + except TypeError as e: + logger.error("Error in personal_information data: %s", e) + raise TypeError(f"Error in personal_information data: {e}") from e + + # Process self_identification try: logger.debug("Processing self_identification") diff --git a/src/llm/llm_manager.py b/src/llm/llm_manager.py index cb4cdb4a..3a90afe8 100644 --- a/src/llm/llm_manager.py +++ b/src/llm/llm_manager.py @@ -124,7 +124,7 @@ def _create_model(self, 
config: dict, api_key: str) -> AIModel: elif llm_model_type == "gemini": return GeminiModel(api_key, llm_model) elif llm_model_type == "huggingface": - return HuggingFaceModel(api_key, llm_model) + return HuggingFaceModel(api_key, llm_model) else: raise ValueError(f"Unsupported model type: {llm_model_type}") @@ -321,7 +321,7 @@ def parse_llmresult(self, llmresult: AIMessage) -> Dict[str, Dict]: "total_tokens": usage_metadata.get("total_tokens", 0), }, } - else : + else : content = llmresult.content response_metadata = llmresult.response_metadata id_ = llmresult.id @@ -339,7 +339,7 @@ def parse_llmresult(self, llmresult: AIMessage) -> Dict[str, Dict]: "output_tokens": token_usage.completion_tokens, "total_tokens": token_usage.total_tokens, }, - } + } logger.debug(f"Parsed LLM result successfully: {parsed_result}") return parsed_result @@ -598,6 +598,28 @@ def answer_question_from_options(self, question: str, options: list[str]) -> str logger.debug(f"Best option determined: {best_option}") return best_option + def answer_question_date(self, question: str) -> datetime: + logger.debug("Answering date question: %s", question) + + date_prompt_template = """ + You are assisting a bot designed to automatically apply for jobs on LinkedIn. The bot needs to provide a date based on the following question: '{question}'. + + Provide a valid date in the format 'YYYY-MM-DD'. Do not include any other text or comments. 
+ """ + prompt = ChatPromptTemplate.from_template(date_prompt_template) + chain = prompt | self.llm_cheap | StrOutputParser() + + output_str = chain.invoke({"question": question}) + logger.debug(f"Model's date response: {output_str}") + + try: + answer_date = datetime.strptime(output_str.strip(), "%Y-%m-%d") + logger.debug(f"Parsed date: {answer_date}") + return answer_date + except ValueError as e: + logger.error(f"Failed to parse date from model's response: {e}") + raise ValueError("Model returned an invalid date format.") + def resume_or_cover(self, phrase: str) -> str: logger.debug( f"Determining if phrase refers to resume or cover letter: {phrase}") @@ -618,4 +640,4 @@ def resume_or_cover(self, phrase: str) -> str: elif "cover" in response: return "cover" else: - return "resume" \ No newline at end of file + return "resume" diff --git a/src/strings.py b/src/strings.py index 16cb84ee..e2882f14 100644 --- a/src/strings.py +++ b/src/strings.py @@ -250,6 +250,18 @@ - Do not include any introductions, explanations, or additional information. - The letter should be formatted into paragraph. +## Formatting rules: +- The letter should be formatted into paragraphs for readability. +- Highlight key skills, achievements, and percentages using bold text where appropriate. +- If there are specific examples or metrics (like "85%"), ensure they are clearly presented. +- The letter should contain no more than 3-4 paragraphs for brevity. +- Do not include any greetings or signatures. + +Please use the following formatting rules: +- Use and to bold key qualifications, skills, and experiences. +- Do not include any introductions, explanations, or additional information. +- The letter should be formatted into paragraphs and should not include a greeting or signature. 
+ ## Job Description: ``` {job_description} diff --git a/src/utils.py b/src/utils.py index 46454e47..ba3fde58 100644 --- a/src/utils.py +++ b/src/utils.py @@ -6,22 +6,31 @@ from selenium import webdriver from loguru import logger - from app_config import MINIMUM_LOG_LEVEL log_file = "app_log.log" - if MINIMUM_LOG_LEVEL in ["DEBUG", "TRACE", "INFO", "WARNING", "ERROR", "CRITICAL"]: + logger.remove() - logger.add(sys.stderr, level=MINIMUM_LOG_LEVEL) + + logger.add(sys.stderr, level=MINIMUM_LOG_LEVEL, format="{time} - {name} - {level} - {message}") + + logger.add(log_file, level=MINIMUM_LOG_LEVEL, format="{time} - {name} - {level} - {message}", encoding='utf-8') else: + logger.warning(f"Invalid log level: {MINIMUM_LOG_LEVEL}. Defaulting to DEBUG.") logger.remove() - logger.add(sys.stderr, level="DEBUG") + logger.add(sys.stderr, level="DEBUG", format="{time} - {name} - {level} - {message}") + logger.add(log_file, level="DEBUG", format="{time} - {name} - {level} - {message}", encoding='utf-8') + +logger.disable("urllib3.connectionpool") +logger.disable("selenium.webdriver.remote.remote_connection") + chromeProfilePath = os.path.join(os.getcwd(), "chrome_profile", "linkedin_profile") + def ensure_chrome_profile(): logger.debug(f"Ensuring Chrome profile exists at path: {chromeProfilePath}") profile_dir = os.path.dirname(chromeProfilePath) @@ -59,10 +68,12 @@ def scroll_slow(driver, scrollable_element, start=0, end=3600, step=300, reverse logger.debug(f"Current scroll position: {current_scroll_position}") if reverse: + if current_scroll_position < start: start = current_scroll_position logger.debug(f"Adjusted start position for upward scroll: {start}") else: + if end > max_scroll_height: logger.warning(f"End value exceeds the scroll height. 
Adjusting end to {max_scroll_height}") end = max_scroll_height @@ -167,6 +178,7 @@ def printyellow(text): logger.debug("Printing text in yellow: %s", text) print(f"{yellow}{text}{reset}") + def stringWidth(text, font, font_size): bbox = font.getbbox(text) - return bbox[2] - bbox[0] \ No newline at end of file + return bbox[2] - bbox[0] diff --git a/tests/test_aihawk_easy_applier.py b/tests/test_aihawk_easy_applier.py index bd69c8b0..04915b3c 100644 --- a/tests/test_aihawk_easy_applier.py +++ b/tests/test_aihawk_easy_applier.py @@ -1,5 +1,10 @@ -import pytest from unittest import mock + +import pytest +from loguru import logger +from selenium.common.exceptions import NoSuchElementException +from selenium.webdriver.remote.webelement import WebElement + from src.aihawk_easy_applier import AIHawkEasyApplier @@ -22,20 +27,26 @@ def mock_resume_generator_manager(): @pytest.fixture -def easy_applier(mock_driver, mock_gpt_answerer, mock_resume_generator_manager): +def mock_job(): + """Fixture to create a mock job object.""" + return mock.Mock() + + +@pytest.fixture +def easy_applier(mock_driver, mock_gpt_answerer, mock_resume_generator_manager, mock_job): """Fixture to initialize AIHawkEasyApplier with mocks.""" return AIHawkEasyApplier( driver=mock_driver, resume_dir="/path/to/resume", set_old_answers=[('Question 1', 'Answer 1', 'Type 1')], gpt_answerer=mock_gpt_answerer, - resume_generator_manager=mock_resume_generator_manager + resume_generator_manager=mock_resume_generator_manager, + job_application_profile=mock_job ) def test_initialization(mocker, easy_applier): """Test that AIHawkEasyApplier is initialized correctly.""" - # Mock os.path.exists to return True mocker.patch('os.path.exists', return_value=True) easy_applier = AIHawkEasyApplier( @@ -43,31 +54,28 @@ def test_initialization(mocker, easy_applier): resume_dir="/path/to/resume", set_old_answers=[('Question 1', 'Answer 1', 'Type 1')], gpt_answerer=mocker.Mock(), - resume_generator_manager=mocker.Mock() + 
resume_generator_manager=mocker.Mock(), + job_application_profile=mocker.Mock() ) assert easy_applier.resume_path == "/path/to/resume" assert len(easy_applier.set_old_answers) == 1 assert easy_applier.gpt_answerer is not None assert easy_applier.resume_generator_manager is not None + assert easy_applier.job_application_profile is not None -def test_apply_to_job_success(mocker, easy_applier): +def test_apply_to_job_success(mocker, easy_applier, mock_job): """Test successfully applying to a job.""" - mock_job = mock.Mock() - - # Mock job_apply so we don't actually try to apply mocker.patch.object(easy_applier, 'job_apply') easy_applier.apply_to_job(mock_job) easy_applier.job_apply.assert_called_once_with(mock_job) -def test_apply_to_job_failure(mocker, easy_applier): +def test_apply_to_job_failure(mocker, easy_applier, mock_job): """Test failure while applying to a job.""" - mock_job = mock.Mock() - mocker.patch.object(easy_applier, 'job_apply', - side_effect=Exception("Test error")) + mocker.patch.object(easy_applier, 'job_apply', side_effect=Exception("Test error")) with pytest.raises(Exception, match="Test error"): easy_applier.apply_to_job(mock_job) @@ -75,23 +83,81 @@ def test_apply_to_job_failure(mocker, easy_applier): easy_applier.job_apply.assert_called_once_with(mock_job) -def test_check_for_premium_redirect_no_redirect(mocker, easy_applier): +def test_check_for_premium_redirect_no_redirect(easy_applier, mock_job): """Test that check_for_premium_redirect works when there's no redirect.""" - mock_job = mock.Mock() easy_applier.driver.current_url = "https://www.linkedin.com/jobs/view/1234" easy_applier.check_for_premium_redirect(mock_job) easy_applier.driver.get.assert_not_called() -def test_check_for_premium_redirect_with_redirect(mocker, easy_applier): - """Test that check_for_premium_redirect handles AIHawk Premium redirects.""" - mock_job = mock.Mock() +def test_check_for_premium_redirect_with_redirect(mocker, easy_applier, mock_job): + """Test that 
check_for_premium_redirect handles LinkedIn Premium redirects.""" easy_applier.driver.current_url = "https://www.linkedin.com/premium" mock_job.link = "https://www.linkedin.com/jobs/view/1234" - with pytest.raises(Exception, match="Redirected to AIHawk Premium page and failed to return"): + with pytest.raises(Exception, match="Redirected to LinkedIn Premium page and failed to return"): easy_applier.check_for_premium_redirect(mock_job) - # Verify that it attempted to return to the job page 3 times assert easy_applier.driver.get.call_count == 3 + + +def test_fill_application_form_success(mocker, easy_applier, mock_job): + """Test successfully filling and submitting the application form.""" + mocker.patch.object(easy_applier, 'fill_up') + mocker.patch.object(easy_applier, '_next_or_submit', return_value=True) + + easy_applier._fill_application_form(mock_job) + easy_applier.fill_up.assert_called_once_with(mock_job) + easy_applier._next_or_submit.assert_called_once() + + +def test_fill_application_form_failure(mocker, easy_applier, mock_job): + """Test failing to fill the application form and check logs.""" + log_messages = [] + + logger.remove() + logger.add(log_messages.append) + + mocker.patch.object(easy_applier, 'fill_up') + mocker.patch.object(easy_applier, '_next_or_submit', side_effect=Exception("Form error")) + + try: + easy_applier._fill_application_form(mock_job) + except Exception: + pass + + assert any("Form filling failed: Form error" in message for message in log_messages), ( + f"Expected log message not found in logs: {log_messages}" + ) + + +def test_get_job_description_success(mocker, easy_applier): + """Test successfully retrieving the job description.""" + mock_description_element = mock.Mock(spec=WebElement) + mock_description_element.text = "Job description text" + mocker.patch.object(easy_applier.driver, 'find_element', return_value=mock_description_element) + + description = easy_applier._get_job_description() + assert description == "Job 
description text" + + +def test_get_job_description_failure(mocker, easy_applier): + """Test failing to retrieve the job description.""" + mocker.patch.object(easy_applier.driver, 'find_element', side_effect=NoSuchElementException) + + with pytest.raises(Exception, match="Job description not found"): + easy_applier._get_job_description() + + +def test_create_and_upload_resume_file_too_large(mocker, easy_applier, mock_job): + """Test creating and uploading a resume when the file size is too large.""" + mock_element = mocker.Mock(spec=WebElement) + mocker.patch('os.makedirs', return_value=None) + mocker.patch('builtins.open', mock.mock_open()) + mocker.patch('os.path.isfile', return_value=True) + mocker.patch('os.path.getsize', return_value=3 * 1024 * 1024) + mocker.patch.object(easy_applier.resume_generator_manager, 'pdf_base64', return_value=b"") + + with pytest.raises(ValueError, match="Resume file size exceeds the maximum limit of 2 MB"): + easy_applier._create_and_upload_resume(mock_element, mock_job) diff --git a/tests/test_aihawk_job_manager.py b/tests/test_aihawk_job_manager.py index 00d77629..41e981e4 100644 --- a/tests/test_aihawk_job_manager.py +++ b/tests/test_aihawk_job_manager.py @@ -1,16 +1,16 @@ -from src.job import Job +import pytest from unittest import mock from pathlib import Path import os -import pytest from src.aihawk_job_manager import AIHawkJobManager from selenium.common.exceptions import NoSuchElementException +from selenium.webdriver.common.by import By from loguru import logger @pytest.fixture def job_manager(mocker): - """Fixture to create a AIHawkJobManager instance with mocked driver.""" + """Fixture to create an AIHawkJobManager instance with mocked driver.""" mock_driver = mocker.Mock() return AIHawkJobManager(mock_driver) @@ -18,7 +18,7 @@ def job_manager(mocker): def test_initialization(job_manager): """Test AIHawkJobManager initialization.""" assert job_manager.driver is not None - assert job_manager.set_old_answers == set() + 
assert job_manager.set_old_answers == [] assert job_manager.easy_applier_component is None @@ -48,58 +48,64 @@ def test_set_parameters(mocker, job_manager): # Normalize paths to handle platform differences (e.g., Windows vs Unix-like systems) assert str(job_manager.resume_path) == os.path.normpath('/path/to/resume') - assert str(job_manager.output_file_directory) == os.path.normpath( - '/path/to/output') + assert str(job_manager.output_file_directory) == os.path.normpath('/path/to/output') + assert job_manager.apply_once_at_company is True + assert job_manager.min_applicants == 5 + assert job_manager.max_applicants == 50 -def next_job_page(self, position, location, job_page): - logger.debug(f"Navigating to next job page: {position} in {location}, page {job_page}") - self.driver.get( - f"https://www.linkedin.com/jobs/search/{self.base_search_url}&keywords={position}&location={location}&start={job_page * 25}") +def test_get_base_search_url(job_manager): + """Test construction of the base search URL based on parameters.""" + params = { + 'remote': True, + 'experience_level': {'entry': True, 'associate': False}, + 'jobTypes': {'full-time': True, 'contract': False}, + 'distance': 50, + 'date': {'month': True}, + 'outputFileDirectory': '/path/to/output' + } + + job_manager.set_parameters(params) def test_get_jobs_from_page_no_jobs(mocker, job_manager): """Test get_jobs_from_page when no jobs are found.""" - mocker.patch.object(job_manager.driver, 'find_element', - side_effect=NoSuchElementException) - + mocker.patch.object(job_manager.driver, 'find_element', side_effect=NoSuchElementException) jobs = job_manager.get_jobs_from_page() assert jobs == [] def test_get_jobs_from_page_with_jobs(mocker, job_manager): """Test get_jobs_from_page when job elements are found.""" - # Mock the no_jobs_element to behave correctly - mock_no_jobs_element = mocker.Mock() - mock_no_jobs_element.text = "No matching jobs found" + # Mocking the find_element to return a container with job 
elements + job_element_mock = mocker.Mock() + job_elements_list = [job_element_mock, job_element_mock] - # Mocking the find_element to return the mock no_jobs_element - mocker.patch.object(job_manager.driver, 'find_element', - return_value=mock_no_jobs_element) + # Mock the container + container_mock = mocker.Mock() + container_mock.find_elements.return_value = job_elements_list - # Mock the page_source - mocker.patch.object(job_manager.driver, 'page_source', - return_value="some page content") + # Mock the driver.find_elements to return the container + mocker.patch.object(job_manager.driver, 'find_elements', return_value=[container_mock]) + + # Mock no_jobs_element to have a text attribute that supports `in` operation + no_jobs_element_mock = mocker.Mock() + no_jobs_element_mock.text = "No matching jobs found" + mocker.patch.object(job_manager.driver, 'find_element', return_value=no_jobs_element_mock) - # Ensure jobs are returned as empty list due to "No matching jobs found" jobs = job_manager.get_jobs_from_page() - assert jobs == [] # No jobs expected due to "No matching jobs found" + assert len(jobs) == 0 # Expect 0 job elements + + -def test_apply_jobs_with_no_jobs(mocker, job_manager): - """Test apply_jobs when no jobs are found.""" +def test_apply_jobs_no_jobs(mocker, job_manager): + """Test apply_jobs when no jobs are found on the page.""" # Mocking find_element to return a mock element that simulates no jobs mock_element = mocker.Mock() mock_element.text = "No matching jobs found" + mocker.patch.object(job_manager.driver, 'find_element', return_value=mock_element) - # Mock the driver to simulate the page source - mocker.patch.object(job_manager.driver, 'page_source', return_value="") - - # Mock the driver to return the mock element when find_element is called - mocker.patch.object(job_manager.driver, 'find_element', - return_value=mock_element) - - # Call apply_jobs and ensure no exceptions are raised job_manager.apply_jobs() # Ensure it attempted to find 
the job results list @@ -108,61 +114,42 @@ def test_apply_jobs_with_no_jobs(mocker, job_manager): def test_apply_jobs_with_jobs(mocker, job_manager): """Test apply_jobs when jobs are present.""" + # Mocking the job elements and application logic + mock_element = mocker.Mock() + mock_element.text = "No matching jobs found" + mocker.patch.object(job_manager.driver, 'find_element', return_value=mock_element) - # Mock no_jobs_element to simulate the absence of "No matching jobs found" banner - no_jobs_element = mocker.Mock() - no_jobs_element.text = "" # Empty text means "No matching jobs found" is not present - mocker.patch.object(job_manager.driver, 'find_element', - return_value=no_jobs_element) - - # Mock the page_source to simulate what the page looks like when jobs are present - mocker.patch.object(job_manager.driver, 'page_source', - return_value="some job content") - - # Mock the outer find_elements (scaffold-layout__list-container) - container_mock = mocker.Mock() - - # Mock the inner find_elements to return job list items job_element_mock = mocker.Mock() - # Simulating two job items job_elements_list = [job_element_mock, job_element_mock] - # Return the container mock, which itself returns the job elements list + container_mock = mocker.Mock() container_mock.find_elements.return_value = job_elements_list - mocker.patch.object(job_manager.driver, 'find_elements', - return_value=[container_mock]) - - # Mock the extract_job_information_from_tile method to return sample job info - mocker.patch.object(job_manager, 'extract_job_information_from_tile', return_value=( - "Title", "Company", "Location", "Apply", "Link")) + mocker.patch.object(job_manager.driver, 'find_elements', return_value=[container_mock]) - # Mock other methods like is_blacklisted, is_already_applied_to_job, and is_already_applied_to_company + mocker.patch.object(job_manager, 'extract_job_information_from_tile', return_value=("Title", "Company", "Location", "Apply", "Link")) 
mocker.patch.object(job_manager, 'is_blacklisted', return_value=False) - mocker.patch.object( - job_manager, 'is_already_applied_to_job', return_value=False) - mocker.patch.object( - job_manager, 'is_already_applied_to_company', return_value=False) + mocker.patch.object(job_manager, 'is_already_applied_to_job', return_value=False) + mocker.patch.object(job_manager, 'is_already_applied_to_company', return_value=False) - # Mock the AIHawkEasyApplier component job_manager.easy_applier_component = mocker.Mock() - # Mock the output_file_directory as a valid Path object - job_manager.output_file_directory = Path("/mocked/path/to/output") + job_manager.apply_jobs() + assert job_manager.extract_job_information_from_tile.call_count == 0 + assert job_manager.easy_applier_component.job_apply.call_count == 0 - # Mock Path.exists() to always return True (so no actual file system interaction is needed) - mocker.patch.object(Path, 'exists', return_value=True) - # Mock the open function to prevent actual file writing - mock_open = mocker.mock_open() - mocker.patch('builtins.open', mock_open) - # Run the apply_jobs method - job_manager.apply_jobs() - # Assertions - assert job_manager.driver.find_elements.call_count == 1 - # Called for each job element - assert job_manager.extract_job_information_from_tile.call_count == 2 - # Called for each job element - assert job_manager.easy_applier_component.job_apply.call_count == 2 - mock_open.assert_called() # Ensure that the open function was called +def test_is_blacklisted(job_manager): + """Test the is_blacklisted method.""" + job_manager.title_blacklist = ["Intern", "Manager"] + job_manager.company_blacklist = ["Company A", "Company B"] + + result = job_manager.is_blacklisted("Software Engineer", "Company A", "Link") + assert result is True + + result = job_manager.is_blacklisted("Intern", "Company C", "Link") + assert result is True + + result = job_manager.is_blacklisted("Senior Developer", "Company C", "Link") + assert result is False 
diff --git a/tests/test_job_application_profile.py b/tests/test_job_application_profile.py index f59ac3a9..34199881 100644 --- a/tests/test_job_application_profile.py +++ b/tests/test_job_application_profile.py @@ -5,6 +5,18 @@ def valid_yaml(): """Valid YAML string for initializing JobApplicationProfile.""" return """ + personal_information: + name: John + surname: Doe + date_of_birth: "1990-01-01" + country: USA + city: New York + address: "123 Main St" + phone_prefix: "+1" + phone: "1234567890" + email: john.doe@example.com + github: "github.com/johndoe" + linkedin: "linkedin.com/in/johndoe" self_identification: gender: Male pronouns: He/Him @@ -41,10 +53,23 @@ def valid_yaml(): salary_range_usd: "80000-120000" """ + @pytest.fixture def missing_field_yaml(): """YAML string missing a required field (self_identification).""" return """ + personal_information: + name: John + surname: Doe + date_of_birth: "1990-01-01" + country: USA + city: New York + address: "123 Main St" + phone_prefix: "+1" + phone: "1234567890" + email: john.doe@example.com + github: "github.com/johndoe" + linkedin: "linkedin.com/in/johndoe" legal_authorization: eu_work_authorization: "Yes" us_work_authorization: "Yes" @@ -75,10 +100,23 @@ def missing_field_yaml(): salary_range_usd: "80000-120000" """ + @pytest.fixture def invalid_type_yaml(): """YAML string with an invalid type for a field.""" return """ + personal_information: + name: John + surname: Doe + date_of_birth: "1990-01-01" + country: USA + city: New York + address: "123 Main St" + phone_prefix: "+1" + phone: "1234567890" + email: john.doe@example.com + github: "github.com/johndoe" + linkedin: "linkedin.com/in/johndoe" self_identification: gender: Male pronouns: He/Him @@ -115,6 +153,7 @@ def invalid_type_yaml(): salary_range_usd: "80000-120000" """ + def test_initialize_with_valid_yaml(valid_yaml): """Test initializing JobApplicationProfile with valid YAML.""" profile = JobApplicationProfile(valid_yaml) @@ -127,49 +166,21 @@ def 
test_initialize_with_valid_yaml(valid_yaml): assert profile.availability.notice_period == "2 weeks" assert profile.salary_expectations.salary_range_usd == "80000-120000" + def test_initialize_with_missing_field(missing_field_yaml): """Test initializing JobApplicationProfile with missing required fields.""" with pytest.raises(KeyError) as excinfo: JobApplicationProfile(missing_field_yaml) assert "self_identification" in str(excinfo.value) -def test_initialize_with_invalid_yaml(): - """Test initializing JobApplicationProfile with invalid YAML.""" - invalid_yaml_str = """ - self_identification: - gender: Male - pronouns: He/Him - veteran: No - disability: No - ethnicity: Asian - legal_authorization: - eu_work_authorization: "Yes" - us_work_authorization: "Yes" - requires_us_visa: "No" - requires_us_sponsorship: "Yes" - requires_eu_visa: "No" - legally_allowed_to_work_in_eu: "Yes" - legally_allowed_to_work_in_us: "Yes" - requires_eu_sponsorship: "No" - canada_work_authorization: "Yes" - requires_canada_visa: "No" - legally_allowed_to_work_in_canada: "Yes" - requires_canada_sponsorship: "No" - uk_work_authorization: "Yes" - requires_uk_visa: "No" - legally_allowed_to_work_in_uk: "Yes" - requires_uk_sponsorship: "No" - work_preferences: - remote_work: "Yes" - in_person_work: "No" - availability: - notice_period: "2 weeks" - salary_expectations: - salary_range_usd: "80000-120000" - """ # Missing fields in work_preferences - with pytest.raises(TypeError): - JobApplicationProfile(invalid_yaml_str) +def test_initialize_with_invalid_yaml(invalid_type_yaml): + """Test initializing JobApplicationProfile with invalid YAML field type.""" + profile = JobApplicationProfile(invalid_type_yaml) + + assert isinstance(profile.work_preferences.remote_work, str) is False + assert profile.work_preferences.remote_work == 12345 + def test_str_representation(valid_yaml): """Test the string representation of JobApplicationProfile."""