diff --git a/backend/samfundet/automatic_interview_allocation/__init__.py b/backend/samfundet/automatic_interview_allocation/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/samfundet/automatic_interview_allocation/allocate_interviews_for_position.py b/backend/samfundet/automatic_interview_allocation/allocate_interviews_for_position.py new file mode 100644 index 000000000..add17872f --- /dev/null +++ b/backend/samfundet/automatic_interview_allocation/allocate_interviews_for_position.py @@ -0,0 +1,167 @@ +from __future__ import annotations + +from datetime import datetime, timedelta + +from django.utils import timezone + +from samfundet.models.general import User +from samfundet.models.recruitment import Interview, RecruitmentPosition, RecruitmentApplication +from samfundet.automatic_interview_allocation.exceptions import ( + NoFutureTimeSlotsError, + NoTimeBlocksAvailableError, + InsufficientTimeBlocksError, + AllApplicantsUnavailableError, + NoApplicationsWithoutInterviewsError, +) +from samfundet.automatic_interview_allocation.generate_interview_timeblocks import ( + FinalizedTimeBlock, + generate_and_sort_timeblocks, +) + +from .utils import ( + is_applicant_available, + get_available_interviewers_for_timeslot, +) + + +def allocate_interviews_for_position(position: RecruitmentPosition) -> int: + """ + Allocates interviews for applicants of a given recruitment position based on available time blocks. + + Args: + position: The recruitment position for which interviews are being allocated. + allocation_limit: If set, limits the number of interviews to allocate. If None, allocates for all applicants. + + Returns: + The number of interviews allocated. + + Raises: + NoTimeBlocksAvailableError: If no timeblocks are available. + NoApplicationsWithoutInterviewsError: If no applications without interviews exist. + AllApplicantsUnavailableError: If all applicants are unavailable for the remaining time blocks. + NoAvailableInterviewersError: If no interviewers are available for any time slot. + InsufficientTimeBlocksError: If there are not enough time blocks for all applications. + """ + interview_duration = timedelta(minutes=30) # Each interview lasts 30 minutes + + timeblocks = generate_and_sort_timeblocks(position) + applications = get_applications_without_interview(position) + check_timeblocks_and_applications(timeblocks, applications, position) + + interview_count = allocate_all_interviews( + timeblocks, + applications, + position, + interview_duration, + ) + + check_allocation_completeness(interview_count, applications, position) + + return interview_count + + +def get_applications_without_interview(position: RecruitmentPosition) -> list[RecruitmentApplication]: + """Fetch all applications without assigned interviews.""" + return list(RecruitmentApplication.objects.filter(recruitment_position=position, withdrawn=False, interview__isnull=True)) + + +def check_timeblocks_and_applications(timeblocks: list[FinalizedTimeBlock], applications: list[RecruitmentApplication], position: RecruitmentPosition) -> None: + """Validate that there are available time blocks and applications.""" + if not timeblocks: + raise NoTimeBlocksAvailableError(f'No available time blocks for position: {position.name_en}') + if not applications: + raise NoApplicationsWithoutInterviewsError(f'No applications without interviews for position: {position.name_en}') + + +def allocate_all_interviews( + timeblocks: list[FinalizedTimeBlock], + applications: list[RecruitmentApplication], + position: RecruitmentPosition, + interview_duration: timedelta, +) -> int: + """Allocate interviews within available future time blocks.""" + interview_count = 0 + current_time = timezone.now() + timedelta(hours=24) # Only consider time slots 24 hours or more in the future + + future_blocks = [block for block in timeblocks if block['end'] > current_time] + if not future_blocks: + raise NoFutureTimeSlotsError(f'No time slots available at least 24 hours in the future for position: {position.name_en}') + + for block in future_blocks: + block_interview_count = place_interviews_in_block(block, applications, position, interview_duration, current_time) + interview_count += block_interview_count + + return interview_count + + +def place_interviews_in_block( + block: FinalizedTimeBlock, + applications: list[RecruitmentApplication], + position: RecruitmentPosition, + interview_duration: timedelta, + current_time: datetime, +) -> int: + """Allocate interviews within a single time block.""" + block_interview_count = 0 + block_start = max(block['start'], current_time) + current_time = block_start + + while current_time + interview_duration <= block['end'] and applications: + application = applications[0] # Get the next application to process + if allocate_interview(current_time, block, application, position, interview_duration): + applications.pop(0) # Remove the application that was just allocated an interview + block_interview_count += 1 + current_time += interview_duration + + return block_interview_count + + +def allocate_interview( + current_time: datetime, + block: FinalizedTimeBlock, + application: RecruitmentApplication, + position: RecruitmentPosition, + interview_duration: timedelta, +) -> bool: + """Attempt to allocate a single interview at the current time.""" + interview_end_time = current_time + interview_duration + + # Check for existing interviews + if Interview.objects.filter(applications__recruitment_position__recruitment=position.recruitment, interview_time=current_time).exists(): + return False + + available_interviewers = get_available_interviewers_for_timeslot( + list(block['available_interviewers']), current_time, interview_end_time, position.recruitment + ) + + if not available_interviewers: + return False + + if is_applicant_available(application.user, current_time, interview_end_time, position.recruitment): + create_interview(application, current_time, position, available_interviewers) + return True + + return False + + +def create_interview(application: RecruitmentApplication, interview_time: datetime, position: RecruitmentPosition, available_interviewers: list[User]) -> None: + """Create and save an interview for the given application.""" + interview = Interview.objects.create( + interview_time=interview_time, + interview_location=f'Location for {position.name_en}', + room=None, + ) + interview.interviewers.set(available_interviewers) + interview.save() + application.interview = interview # Assign the interview to the application + application.save() + + +def check_allocation_completeness(interview_count: int, applications: list[RecruitmentApplication], position: RecruitmentPosition) -> None: + """Handle the results of the interview allocation process.""" + if interview_count == 0: + raise AllApplicantsUnavailableError(f'All applicants are unavailable for the remaining time slots for position: {position.name_en}') + if applications: + raise InsufficientTimeBlocksError( + f'Not enough time blocks to accommodate all applications for position: {position.name_en}. Allocated {interview_count} interviews.' + ) diff --git a/backend/samfundet/automatic_interview_allocation/exceptions.py b/backend/samfundet/automatic_interview_allocation/exceptions.py new file mode 100644 index 000000000..5c5b215bf --- /dev/null +++ b/backend/samfundet/automatic_interview_allocation/exceptions.py @@ -0,0 +1,43 @@ +from __future__ import annotations + + +class InterviewAllocationError(Exception): + """Base exception for interview allocation errors.""" + + pass + + +class NoTimeBlocksAvailableError(InterviewAllocationError): + """Raised when there are no available time blocks for interviews.""" + + pass + + +class NoApplicationsWithoutInterviewsError(InterviewAllocationError): + """Raised when there are no applications without interviews.""" + + pass + + +class NoAvailableInterviewersError(InterviewAllocationError): + """Raised when there are no available interviewers for a given time slot.""" + + pass + + +class AllApplicantsUnavailableError(InterviewAllocationError): + """Raised when all applicants are unavailable for the remaining time slots.""" + + pass + + +class InsufficientTimeBlocksError(InterviewAllocationError): + """Raised when there are not enough time blocks to accommodate all applications.""" + + pass + + +class NoFutureTimeSlotsError(InterviewAllocationError): + """Raised when there are no time slots available at least 24 hours in the future.""" + + pass diff --git a/backend/samfundet/automatic_interview_allocation/generate_interview_timeblocks.py b/backend/samfundet/automatic_interview_allocation/generate_interview_timeblocks.py new file mode 100644 index 000000000..b524d2924 --- /dev/null +++ b/backend/samfundet/automatic_interview_allocation/generate_interview_timeblocks.py @@ -0,0 +1,264 @@ +from __future__ import annotations + +from typing import TypedDict +from datetime import date, time, datetime, timedelta + +from django.utils import timezone + +from samfundet.models.general import User +from samfundet.models.recruitment import ( + Recruitment, + OccupiedTimeslot, + RecruitmentPosition, +) +from samfundet.automatic_interview_allocation.utils import get_interviewers_grouped_by_section + +# TODO: optimize block rating and interview allocation strategy +# TODO: implement room allocation, based on rooms available at the time the interview has been set + + +class FinalizedTimeBlock(TypedDict): + """ + Represents a finalized time block for interviews during the recruitment process. + + InterviewBlocks are created by processing InterviewTimeBlocks and adding recruitment-specific + context and prioritization. These blocks are used for scheduling across multiple days. + + Each block is characterized by a unique combination of available interviewers and is + associated with a specific recruitment position. Blocks are rated to help prioritize + optimal interview slots during the allocation process. + + For example, given interviewer availability from 08:00 to 16:00: + 1. 08:00 - 12:00: 4 interviewers available (Block I) + 2. 12:00 - 14:00: 3 interviewers available (Block II) + 3. 14:00 - 16:00: 5 interviewers available (Block III) + + Attributes: + start (datetime): Start time of the block + end (datetime): End time of the block + available_interviewers (set[User]): Set of available interviewers for this block + recruitment_position (RecruitmentPosition): The position being recruited for + date (date): The date of the interview block + rating (float): A calculated rating based on block duration, interviewer availability, + and other recruitment-specific factors + """ + + start: datetime + end: datetime + available_interviewers: set[User] + recruitment_position: RecruitmentPosition + date: date + rating: float + + +class IntermediateTimeBlock(TypedDict): + """ + Represents an intermediate time block used for calculations within a single day. + + InterviewTimeBlocks are created during the initial phase of interview scheduling, + focusing on interviewer availability within a specific day. These blocks are later + processed to create finalized InterviewBlocks. + + InterviewTimeBlocks are simpler than InterviewBlocks, containing only the essential + time and availability information without recruitment-specific context or ratings. + + Attributes: + start (datetime): Start time of the block + end (datetime): End time of the block + available_interviewers (set[User]): Set of available interviewers for this block + """ + + start: datetime + end: datetime + available_interviewers: set[User] + + +def get_occupied_timeslots(recruitment: Recruitment) -> OccupiedTimeslot: + """ + Retrieves unavailable timeslots for a given recruitment. + + Args: + recruitment: The recruitment for which unavailable timeslots are fetched. + + Returns: + A queryset of OccupiedTimeslot objects ordered by start time. + """ + return OccupiedTimeslot.objects.filter(recruitment=recruitment).order_by('start_dt') + + +def generate_intermediate_blocks( + position: RecruitmentPosition, start_dt: datetime, end_dt: datetime, unavailability: OccupiedTimeslot, interval: timedelta +) -> list[IntermediateTimeBlock]: + """ + Generates intermediate time within a given time range, accounting for interviewer unavailability. + + Args: + position: Recruitment position. + start_dt: Start datetime of the interview range. + end_dt: End datetime of the interview range. + unavailability: List of unavailable timeslots for interviewers. + interval: Time interval for each block. + + Returns: + A list of time blocks with available interviewers for each block. + """ + all_interviewers = set(position.interviewers.all()) + intermediate_blocks: list[IntermediateTimeBlock] = [] + current_dt = start_dt + + while current_dt < end_dt: + block_end = min(current_dt + interval, end_dt) + available_interviewers = filter_available_interviewers_for_block(all_interviewers, current_dt, block_end, unavailability) + + if available_interviewers: + append_or_extend_last_block(intermediate_blocks, current_dt, block_end, available_interviewers) + + current_dt = block_end + + return intermediate_blocks + + +def create_final_interview_blocks( + position: RecruitmentPosition, start_time: time = time(8, 0), end_time: time = time(23, 0), interval: timedelta = timedelta(minutes=30) +) -> list[FinalizedTimeBlock]: + """ + Generates time blocks for interviews based on the recruitment's time range, + the availability of interviewers, and their unavailability. The blocks are divided + into 30-minute intervals for each day between the start and end dates. + + Args: + position: Recruitment position for which interview time blocks are generated. + + Returns: + List of time blocks with available interviewers, start and end times, and ratings. + """ + recruitment = position.recruitment + + # Determine the time range: current data, start and end dates for interview slots + current_date = timezone.now().date() + start_date = max(recruitment.visible_from.date(), current_date) + end_date = recruitment.actual_application_deadline.date() + + final_blocks: list[FinalizedTimeBlock] = [] + + # Loop through each day in the range to generate time blocks + current_date = start_date + while current_date <= end_date: + # Create datetime objects for the start and end of the day + current_datetime = timezone.make_aware(datetime.combine(current_date, start_time)) + end_datetime = timezone.make_aware(datetime.combine(current_date, end_time)) + + # Fetch unavailability slots and generate blocks for the current day + unavailability = get_occupied_timeslots(recruitment) + intermediate_blocks = generate_intermediate_blocks(position, current_datetime, end_datetime, unavailability, interval) + + # Use list comprehension to create and add InterviewBlock objects + final_blocks.extend( + [ + FinalizedTimeBlock( + start=block['start'], + end=block['end'], + available_interviewers=set(block['available_interviewers']), + recruitment_position=position, + date=current_date, + rating=calculate_block_rating( + block['start'], + block['end'], + set(block['available_interviewers']), + position, + ), + ) + for block in intermediate_blocks + ] + ) + + current_date += timedelta(days=1) + + return final_blocks + + +def filter_available_interviewers_for_block(all_interviewers: set, start: datetime, end: datetime, unavailability: OccupiedTimeslot) -> set: + """ + Determines which interviewers are available for a given time block. + + Args: + all_interviewers: Set of all interviewers. + start: Start time of the block. + end: End time of the block. + unavailability: List of unavailable timeslots. + + Returns: + Set of available interviewers for the block. + """ + available_interviewers = all_interviewers.copy() + for slot in unavailability: + if slot.start_dt < end and slot.end_dt > start: + available_interviewers.discard(slot.user) + return available_interviewers + + +def append_or_extend_last_block(blocks: list, start: datetime, end: datetime, available_interviewers: set) -> None: + """ + Updates an existing block or creates a new one based on available interviewers. + + Args: + blocks: List of existing blocks. + start: Start time of the current block. + end: End time of the current block. + available_interviewers: Set of available interviewers for the current block. + """ + if not blocks or len(blocks[-1]['available_interviewers']) != len(available_interviewers): + blocks.append({'start': start, 'end': end, 'available_interviewers': available_interviewers}) + else: + blocks[-1]['end'] = end + + +def calculate_block_rating(start_dt: datetime, end_dt: datetime, available_interviewers: set[User], position: RecruitmentPosition) -> int: + """ + Calculates a rating for a time block based on interviewer availability, block length, and section diversity. + + For shared interviews (multiple sections), the rating considers: + 1. The number of available interviewers + 2. The length of the time block + 3. The diversity of sections represented by available interviewers + 4. The average number of available interviewers per section + + For non-shared interviews or single-section positions, only the number of + available interviewers and block length are considered. + + Args: + start_dt: Start datetime of the block. + end_dt: End datetime of the block. + available_interviewers: Set of interviewers available for the block. + position: The RecruitmentPosition for which the rating is calculated. + + Returns: + An integer rating for the time block. Higher values indicate more + favorable blocks for scheduling interviews. + """ + + block_length = (end_dt - start_dt).total_seconds() / 3600 + interviewers_grouped_by_section = get_interviewers_grouped_by_section(position) + + if len(interviewers_grouped_by_section) > 1: + represented_sections = sum(1 for section_interviewers in interviewers_grouped_by_section.values() if set(section_interviewers) & available_interviewers) + section_diversity_factor = represented_sections / len(interviewers_grouped_by_section) + + # Calculate the average number of interviewers available per section + avg_interviewers_per_section = sum( + len(set(section_interviewers) & available_interviewers) for section_interviewers in interviewers_grouped_by_section.values() + ) / len(interviewers_grouped_by_section) + + rating = (len(available_interviewers) * 2) + (block_length * 0.5) + (section_diversity_factor * 5) + (avg_interviewers_per_section * 3) + else: + # For non-shared interviews or single section, use the original rating calculation + rating = (len(available_interviewers) * 2) + (block_length * 0.5) + + return max(0, int(rating)) + + +def generate_and_sort_timeblocks(position: RecruitmentPosition) -> list[FinalizedTimeBlock]: + """Generate and sort time blocks by rating (higher rating first).""" + timeblocks = create_final_interview_blocks(position) + timeblocks.sort(key=lambda block: (-block['rating'], block['start'])) + return timeblocks diff --git a/backend/samfundet/automatic_interview_allocation/utils.py b/backend/samfundet/automatic_interview_allocation/utils.py new file mode 100644 index 000000000..50f79ac41 --- /dev/null +++ b/backend/samfundet/automatic_interview_allocation/utils.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +from datetime import datetime +from collections import defaultdict + +from django.db.models import Q, QuerySet + +from samfundet.models.general import User, GangSection +from samfundet.models.recruitment import Interview, Recruitment, OccupiedTimeslot, RecruitmentPosition + +UserIdType = int +TimeSlotType = tuple[datetime, datetime] + +UnavailabilityTypeDict = dict[UserIdType, list[TimeSlotType]] + + +def get_available_interviewers_for_timeslot(interviewers: list[User], start_dt: datetime, end_dt: datetime, recruitment: Recruitment) -> list[User]: + """ + Filters interviewers who are available for a specific time slot. + Args: + interviewers: List of interviewers to check availability for. + start_dt: The start datetime of the interview slot. + end_dt: The end datetime of the interview slot. + recruitment: The recruitment for which to check availability. + Returns: + A list of available interviewers. + """ + unavailable_interviewer_ids = set( + OccupiedTimeslot.objects.filter(user__in=interviewers, recruitment=recruitment) + .filter( + Q(start_dt__lt=end_dt, end_dt__gt=start_dt) # Overlaps with the start + | Q(start_dt__lt=end_dt, end_dt__gt=end_dt) # Overlaps with the end + | Q(start_dt__gte=start_dt, end_dt__lte=end_dt) # Fully within the interval + ) + .values_list('user_id', flat=True) + ) + + return [interviewer for interviewer in interviewers if interviewer.id not in unavailable_interviewer_ids] + + +def is_applicant_available(applicant: User, start_dt: datetime, end_dt: datetime, recruitment: Recruitment) -> bool: + """ + Checks if an applicant is available for an interview during a given time range. + Args: + applicant: The applicant to check availability for. + start_dt: The start datetime of the interview slot. + end_dt: The end datetime of the interview slot. + recruitment: The recruitment to which the applicant has applied. + Returns: + A boolean indicating whether the applicant is available for the given time range. + """ + # Check for existing interviews + existing_interview = Interview.objects.filter( + applications__user=applicant, applications__recruitment=recruitment, interview_time__lt=end_dt, interview_time__gte=start_dt + ).exists() + + if existing_interview: + return False + + # Check applicant's unavailability + applicant_unavailable = ( + OccupiedTimeslot.objects.filter(user=applicant, recruitment=recruitment) + .filter( + Q(start_dt__lt=end_dt, end_dt__gt=start_dt) # Overlaps with the start + | Q(start_dt__lt=end_dt, end_dt__gt=end_dt) # Overlaps with the end + | Q(start_dt__gte=start_dt, end_dt__lte=end_dt) # Fully within the interval + ) + .exists() + ) + + return not applicant_unavailable + + +def get_interviewers_grouped_by_section(recruitment_position: RecruitmentPosition) -> dict[GangSection, QuerySet[User]]: + interviewers_by_section = defaultdict(set) + + positions = recruitment_position.shared_interview_group.positions.all() if recruitment_position.shared_interview_group else [recruitment_position] + + for position in positions: + if position.section: + interviewers_by_section[position.section].update(position.interviewers.all()) + + return {section: User.objects.filter(id__in=[u.id for u in interviewers]) for section, interviewers in interviewers_by_section.items()} diff --git a/backend/samfundet/tests/test_interview_allocation.py b/backend/samfundet/tests/test_interview_allocation.py new file mode 100644 index 000000000..8f8523f73 --- /dev/null +++ b/backend/samfundet/tests/test_interview_allocation.py @@ -0,0 +1,416 @@ +from __future__ import annotations + +from datetime import timedelta +from unittest.mock import patch + +import pytest + +from django.utils import timezone + +from samfundet.models.general import Gang, User, GangSection, Organization +from samfundet.models.recruitment import Recruitment, OccupiedTimeslot, RecruitmentPosition, RecruitmentPositionSharedInterviewGroup +from samfundet.automatic_interview_allocation.utils import is_applicant_available, get_interviewers_grouped_by_section, get_available_interviewers_for_timeslot + + +@pytest.fixture +def setup_recruitment(): + organization = Organization.objects.create(name='Test Org') + now = timezone.now() + recruitment = Recruitment.objects.create( + name_nb='Test Recruitment', + name_en='Test Recruitment', + organization=organization, + visible_from=now, + actual_application_deadline=now + timedelta(days=30), + shown_application_deadline=now + timedelta(days=28), + reprioritization_deadline_for_applicant=now + timedelta(days=35), + reprioritization_deadline_for_groups=now + timedelta(days=40), + ) + return organization, recruitment + + +@pytest.fixture +def setup_users(): + user1 = User.objects.create(username='user1', email='user1@example.com') + user2 = User.objects.create(username='user2', email='user2@example.com') + user3 = User.objects.create(username='user3', email='user3@example.com') + return user1, user2, user3 + + +@pytest.mark.django_db +def test_get_available_interviewers_for_timeslot(setup_recruitment, setup_users): + recruitment = setup_recruitment + user1, user2, user3 = setup_users + + with patch('samfundet.models.recruitment.OccupiedTimeslot.objects.filter') as mock_filter: + mock_filter.return_value.filter.return_value.values_list.return_value = [user1.id] + + start_dt = timezone.now() + end_dt = start_dt + timedelta(hours=1) + + available_interviewers = get_available_interviewers_for_timeslot([user1, user2, user3], start_dt, end_dt, recruitment) + + assert len(available_interviewers) == 2 + assert user1 not in available_interviewers + assert user2 in available_interviewers + assert user3 in available_interviewers + + +@pytest.mark.django_db +def test_is_applicant_available(setup_recruitment, setup_users): + recruitment = setup_recruitment + user1 = setup_users + + with ( + patch('samfundet.models.recruitment.Interview.objects.filter') as mock_interview_filter, + patch('samfundet.models.recruitment.OccupiedTimeslot.objects.filter') as mock_occupied_filter, + ): + # Test when applicant is available + mock_interview_filter.return_value.exists.return_value = False + mock_occupied_filter.return_value.filter.return_value.exists.return_value = False + + start_dt = timezone.now() + end_dt = start_dt + timedelta(hours=1) + + is_available = is_applicant_available(user1, start_dt, end_dt, recruitment) + assert is_available is True + + # Test when applicant has an interview + mock_interview_filter.return_value.exists.return_value = True + + is_available = is_applicant_available(user1, start_dt, end_dt, recruitment) + assert is_available is False + + # Test when applicant has an occupied timeslot + mock_interview_filter.return_value.exists.return_value = False + mock_occupied_filter.return_value.filter.return_value.exists.return_value = True + + is_available = is_applicant_available(user1, start_dt, end_dt, recruitment) + assert is_available is False + + +@pytest.mark.django_db +def test_get_interviewers_grouped_by_section(setup_recruitment, setup_users): + organization, recruitment = setup_recruitment + user1, user2, user3 = setup_users + + gang1 = Gang.objects.create(name_nb='Gang 1', name_en='Gang 1', organization=organization) + gang2 = Gang.objects.create(name_nb='Gang 2', name_en='Gang 2', organization=organization) + + section1 = GangSection.objects.create(name_nb='Section 1', name_en='Section 1', gang=gang1) + section2 = GangSection.objects.create(name_nb='Section 2', name_en='Section 2', gang=gang2) + + position1 = RecruitmentPosition.objects.create( + name_nb='Position 1', + name_en='Position 1', + section=section1, + recruitment=recruitment, + short_description_nb='Short description 1', + short_description_en='Short description 1 EN', + long_description_nb='Long description 1', + long_description_en='Long description 1 EN', + is_funksjonaer_position=False, + default_application_letter_nb='Default application letter 1', + default_application_letter_en='Default application letter 1 EN', + tags='tag1,tag2', + ) + position2 = RecruitmentPosition.objects.create( + name_nb='Position 2', + name_en='Position 2', + section=section2, + recruitment=recruitment, + short_description_nb='Short description 2', + short_description_en='Short description 2 EN', + long_description_nb='Long description 2', + long_description_en='Long description 2 EN', + is_funksjonaer_position=False, + default_application_letter_nb='Default application letter 2', + default_application_letter_en='Default application letter 2 EN', + tags='tag2,tag3', + ) + + position1.interviewers.add(user1, user2) + position2.interviewers.add(user2, user3) + + shared_group = RecruitmentPositionSharedInterviewGroup.objects.create(recruitment=recruitment) + shared_group.positions.add(position1, position2) + + recruitment_position = RecruitmentPosition.objects.create( + name_nb='Shared Position', + name_en='Shared Position', + recruitment=recruitment, + short_description_nb='Short description shared', + short_description_en='Short description shared EN', + long_description_nb='Long description shared', + long_description_en='Long description shared EN', + is_funksjonaer_position=False, + default_application_letter_nb='Default application letter shared', + default_application_letter_en='Default application letter shared EN', + tags='tag1,tag2,tag3', + gang=gang1, + ) + recruitment_position.shared_interview_group = shared_group + recruitment_position.save() + + result = get_interviewers_grouped_by_section(recruitment_position) + + assert len(result) == 2 + assert section1 in result + assert section2 in result + assert result[section1].count() == 2 + assert result[section2].count() == 2 + assert user1 in result[section1] + assert user2 in result[section1] + assert user2 in result[section2] + assert user3 in result[section2] + + +# New tests start here + + +@pytest.mark.django_db +def test_get_available_interviewers_no_interviewers(setup_recruitment): + _, recruitment = setup_recruitment + start_dt = timezone.now() + end_dt = start_dt + timedelta(hours=1) + + available_interviewers = get_available_interviewers_for_timeslot([], start_dt, end_dt, recruitment) + assert len(available_interviewers) == 0 + + +@pytest.mark.django_db +def test_get_available_interviewers_all_occupied(setup_recruitment, setup_users): + recruitment = setup_recruitment + user1, user2, user3 = setup_users + start_dt = timezone.now() + end_dt = start_dt + timedelta(hours=1) + + with patch('samfundet.models.recruitment.OccupiedTimeslot.objects.filter') as mock_filter: + mock_filter.return_value.filter.return_value.values_list.return_value = {user1.id, user2.id, user3.id} + available_interviewers = get_available_interviewers_for_timeslot([user1, user2, user3], start_dt, end_dt, recruitment) + assert len(available_interviewers) == 0 + + +@pytest.mark.django_db +def test_get_available_interviewers_complex_overlap(setup_recruitment, setup_users): + recruitment = setup_recruitment + user1, user2, user3 = setup_users + start_dt = timezone.now() + mid_dt = start_dt + timedelta(minutes=30) + end_dt = start_dt + timedelta(hours=1) + + with patch('samfundet.models.recruitment.OccupiedTimeslot.objects.filter') as mock_filter: + mock_filter.return_value.filter.return_value.values_list.return_value = { + user1.id, # user1 is occupied for the entire period + user2.id, # user2 is occupied for the first half + } + available_interviewers = get_available_interviewers_for_timeslot([user1, user2, user3], start_dt, end_dt, recruitment) + assert len(available_interviewers) == 1 + assert user3 in available_interviewers + + # Now check for the second half of the time period + mock_filter.return_value.filter.return_value.values_list.return_value = { + user1.id, # user1 is still occupied + user3.id, # user3 is now occupied + } + available_interviewers = get_available_interviewers_for_timeslot([user1, user2, user3], mid_dt, end_dt, recruitment) + assert len(available_interviewers) == 1 + assert user2 in available_interviewers + + +@pytest.mark.django_db +def test_is_applicant_available_multiple_conflicts(setup_recruitment, setup_users): + recruitment = setup_recruitment + user1, _, _ = setup_users + start_dt = timezone.now() + end_dt = start_dt + timedelta(hours=2) + + with ( + patch('samfundet.models.recruitment.Interview.objects.filter') as mock_interview_filter, + patch('samfundet.models.recruitment.OccupiedTimeslot.objects.filter') as mock_occupied_filter, + ): + mock_interview_filter.return_value.exists.return_value = True + mock_occupied_filter.return_value.filter.return_value.exists.return_value = True + + is_available = is_applicant_available(user1, start_dt, end_dt, recruitment) + assert is_available is False + + +@pytest.mark.django_db +def test_is_applicant_available_edge_cases(setup_recruitment, setup_users): + recruitment = setup_recruitment + user1, _, _ = setup_users + start_dt = timezone.now() + end_dt = start_dt + timedelta(microseconds=1) + + with ( + patch('samfundet.models.recruitment.Interview.objects.filter') as mock_interview_filter, + patch('samfundet.models.recruitment.OccupiedTimeslot.objects.filter') as mock_occupied_filter, + ): + mock_interview_filter.return_value.exists.return_value = False + mock_occupied_filter.return_value.filter.return_value.exists.return_value = False + + is_available = is_applicant_available(user1, start_dt, end_dt, recruitment) + assert is_available is True + + +@pytest.mark.django_db +def test_get_interviewers_grouped_by_section_complex(setup_recruitment, setup_users): + organization, recruitment = setup_recruitment + user1, user2, user3 = setup_users + + gang1 = Gang.objects.create(name_nb='Gang 1', name_en='Gang 1', organization=organization) + gang2 = Gang.objects.create(name_nb='Gang 2', name_en='Gang 2', organization=organization) + + section1 = GangSection.objects.create(name_nb='Section 1', name_en='Section 1', gang=gang1) + section2 = GangSection.objects.create(name_nb='Section 2', name_en='Section 2', gang=gang2) + section3 = GangSection.objects.create(name_nb='Section 3', name_en='Section 3', gang=gang1) + + position1 = RecruitmentPosition.objects.create( + name_nb='Position 1', + name_en='Position 1', + section=section1, + recruitment=recruitment, + short_description_nb='Short 1', + short_description_en='Short 1 EN', + long_description_nb='Long 1', + long_description_en='Long 1 EN', + is_funksjonaer_position=False, + default_application_letter_nb='Default 1', + default_application_letter_en='Default 1 EN', + tags='tag1', + ) + position2 = RecruitmentPosition.objects.create( + name_nb='Position 2', + name_en='Position 2', + section=section2, + recruitment=recruitment, + short_description_nb='Short 2', + short_description_en='Short 2 EN', + long_description_nb='Long 2', + long_description_en='Long 2 EN', + is_funksjonaer_position=False, + default_application_letter_nb='Default 2', + default_application_letter_en='Default 2 EN', + tags='tag2', + ) + position3 = RecruitmentPosition.objects.create( + name_nb='Position 3', + name_en='Position 3', + section=section3, + recruitment=recruitment, + short_description_nb='Short 3', + short_description_en='Short 3 EN', + long_description_nb='Long 3', + long_description_en='Long 3 EN', + is_funksjonaer_position=False, + default_application_letter_nb='Default 3', + default_application_letter_en='Default 3 EN', + tags='tag3', + ) + + position1.interviewers.add(user1, user2) + position2.interviewers.add(user2, user3) + position3.interviewers.add(user1, user3) + + shared_group = RecruitmentPositionSharedInterviewGroup.objects.create(recruitment=recruitment) + shared_group.positions.add(position1, position2, position3) + + recruitment_position = RecruitmentPosition.objects.create( + name_nb='Shared Position', + name_en='Shared Position', + recruitment=recruitment, + short_description_nb='Short shared', + short_description_en='Short shared EN', + long_description_nb='Long shared', + long_description_en='Long shared EN', + is_funksjonaer_position=False, + default_application_letter_nb='Default shared', + default_application_letter_en='Default shared EN', + tags='tag1,tag2,tag3', + gang=gang1, + ) + recruitment_position.shared_interview_group = shared_group + recruitment_position.save() + + result = get_interviewers_grouped_by_section(recruitment_position) + + assert len(result) == 3 + assert section1 in result + assert section2 in result + assert section3 in result + assert result[section1].count() == 2 + assert result[section2].count() == 2 + assert result[section3].count() == 2 + assert user1 in result[section1] + assert user2 in result[section1] + assert user2 in result[section2] + assert user3 in result[section2] + assert user1 in result[section3] + assert user3 in result[section3] + + +@pytest.mark.django_db +def test_get_available_interviewers_for_timeslot_filtering(setup_recruitment, setup_users): + """Test the get_available_interviewers_for_timeslot function with various scenarios.""" + _, recruitment = setup_recruitment + user1, user2, user3 = setup_users + user4, user5, user6 = [User.objects.create(username=f'user{i}', email=f'user{i}@example.com') for i in range(4, 7)] + start_dt = timezone.now() + end_dt = start_dt + timedelta(hours=2) + + def create_occupied_timeslot(user: User, start_offset: timedelta, end_offset: timedelta) -> OccupiedTimeslot: + return OccupiedTimeslot.objects.create(user=user, recruitment=recruitment, start_dt=start_dt + start_offset, end_dt=start_dt + end_offset) + + # Create OccupiedTimeslots for different scenarios + scenarios = [ + (user1, timedelta(hours=-1), timedelta(minutes=30)), # Overlaps start + (user2, timedelta(hours=1, minutes=30), timedelta(hours=3)), # Overlaps end + (user3, timedelta(minutes=30), timedelta(hours=1, minutes=30)), # Fully within + (user4, timedelta(hours=-2), timedelta(hours=-1)), # Before interval + (user5, timedelta(hours=3), timedelta(hours=4)), # After interval + (user6, timedelta(hours=-1), timedelta(hours=3)), # Completely encompasses interval + ] + + for user, start_offset, end_offset in scenarios: + create_occupied_timeslot(user, start_offset, end_offset) + + # Test with the exact interval + available_interviewers = get_available_interviewers_for_timeslot([user1, user2, user3, user4, user5, user6], start_dt, end_dt, recruitment) + assert set(available_interviewers) == {user4, user5} + + # Add detailed checks immediately after the exact interval test + assert user1 not in available_interviewers, 'User1 should be unavailable due to overlap with start' + assert user2 not in available_interviewers, 'User2 should be unavailable due to overlap with end' + assert user3 not in available_interviewers, 'User3 should be unavailable due to being fully within interval' + assert user4 in available_interviewers, 'User4 should be available (occupied time is before interval)' + assert user5 in available_interviewers, 'User5 should be available (occupied time is after interval)' + assert user6 not in available_interviewers, 'User6 should be unavailable (occupied time encompasses interval)' + + # Test with a smaller interval + smaller_end_dt = start_dt + timedelta(hours=1) + available_interviewers_smaller = get_available_interviewers_for_timeslot([user1, user2, user3, user4, user5, user6], start_dt, smaller_end_dt, recruitment) + assert set(available_interviewers_smaller) == {user2, user4, user5} + + # Test with a larger interval + larger_end_dt = start_dt + timedelta(hours=3) + available_interviewers_larger = get_available_interviewers_for_timeslot([user1, user2, user3, user4, user5, user6], start_dt, larger_end_dt, recruitment) + + # Since user5's occupied time is after the requested interval, they should be available + assert set(available_interviewers_larger) == {user4, user5} + + # Detailed assertions + assert user1 not in available_interviewers, 'User1 should be unavailable due to overlap with start' + assert user2 not in available_interviewers, 'User2 should be unavailable due to overlap with end' + assert user3 not in available_interviewers, 'User3 should be unavailable due to being fully within interval' + assert user4 in available_interviewers, 'User4 should be available (occupied time is before interval)' + assert user5 in available_interviewers, 'User5 should be available (occupied time is after interval)' + assert user6 not in available_interviewers, 'User6 should be unavailable (occupied time encompasses interval)' + + # Test with no interviewers + assert len(get_available_interviewers_for_timeslot([], start_dt, end_dt, recruitment)) == 0 + + # Test with no occupied timeslots + OccupiedTimeslot.objects.all().delete() + all_available = get_available_interviewers_for_timeslot([user1, user2, user3, user4, user5, user6], start_dt, end_dt, recruitment) + assert set(all_available) == {user1, user2, user3, user4, user5, user6} diff --git a/backend/samfundet/urls.py b/backend/samfundet/urls.py index a0246ae45..6c7170d52 100644 --- a/backend/samfundet/urls.py +++ b/backend/samfundet/urls.py @@ -140,4 +140,5 @@ path('recruitment//availability/', views.RecruitmentAvailabilityView.as_view(), name='recruitment_availability'), path('feedback/', views.UserFeedbackView.as_view(), name='feedback'), path('purchase-feedback/', views.PurchaseFeedbackView.as_view(), name='purchase_feedback'), + path('allocate-interviews//', views.AutomaticInterviewAllocationView.as_view(), name='allocate_interviews'), ] diff --git a/backend/samfundet/views.py b/backend/samfundet/views.py index f53726a25..7aefb3203 100644 --- a/backend/samfundet/views.py +++ b/backend/samfundet/views.py @@ -38,6 +38,17 @@ REQUESTED_IMPERSONATE_USER, ) +from samfundet.automatic_interview_allocation.exceptions import ( + NoFutureTimeSlotsError, + InterviewAllocationError, + NoTimeBlocksAvailableError, + InsufficientTimeBlocksError, + NoAvailableInterviewersError, + AllApplicantsUnavailableError, + NoApplicationsWithoutInterviewsError, +) +from samfundet.automatic_interview_allocation.allocate_interviews_for_position import allocate_interviews_for_position + from .utils import event_query, generate_timeslots, get_occupied_timeslots_from_request from .homepage import homepage from .models.role import Role @@ -1322,3 +1333,51 @@ def post(self, request: Request) -> Response: form=purchase_model, ) return Response(status=status.HTTP_201_CREATED, data={'message': 'Feedback submitted successfully!'}) + + +class AutomaticInterviewAllocationView(APIView): + permission_classes = [IsAuthenticated] + + def post(self, request: Request, pk: int) -> Response: + position = get_object_or_404(RecruitmentPosition, id=pk) + + try: + interview_count = allocate_interviews_for_position(position) + return self.get_success_response(pk, interview_count) + except InterviewAllocationError as e: + return self.handle_allocation_error(e, interview_count=getattr(e, 'interview_count', 0)) + except Exception as e: + return Response({'error': f'An unexpected error occurred: {str(e)}'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + + def get_success_response(self, pk: int, interview_count: int) -> Response: + if interview_count > 0: + return Response( + { + 'message': f'Interviews allocated successfully for position {pk}.', + 'interviews_allocated': interview_count, + }, + status=status.HTTP_200_OK, + ) + return Response( + { + 'message': f'No interviews were allocated for position {pk}.', + 'interviews_allocated': 0, + }, + status=status.HTTP_204_NO_CONTENT, + ) + + def handle_allocation_error(self, error: InterviewAllocationError, interview_count: int = 0) -> Response: + error_responses = { + NoTimeBlocksAvailableError: ('No available time blocks for interviews.', status.HTTP_400_BAD_REQUEST), + NoApplicationsWithoutInterviewsError: ('No applications without interviews found.', status.HTTP_400_BAD_REQUEST), + NoAvailableInterviewersError: ('No available interviewers for any time slot.', status.HTTP_400_BAD_REQUEST), + AllApplicantsUnavailableError: ('All applicants are unavailable for the remaining time slots.', status.HTTP_400_BAD_REQUEST), + NoFutureTimeSlotsError: ('No time slots available at least 24 hours in the future.', status.HTTP_400_BAD_REQUEST), + InsufficientTimeBlocksError: ('Not enough time blocks to accommodate all applications.', status.HTTP_206_PARTIAL_CONTENT), + } + + message, status_code = error_responses.get(type(error), ('An error occurred during interview allocation.', status.HTTP_400_BAD_REQUEST)) + + response_data = {'error': message, 'interviews_allocated': interview_count} + + return Response(response_data, status=status_code)