diff --git a/README.md b/README.md index fde260c..e5ee7c8 100644 --- a/README.md +++ b/README.md @@ -75,3 +75,21 @@ Console output contains information about center allocation run. Total remaining capacity across all centers: 190 Students not assigned: 29 + +## Usage + +To run the `school_center.py` script, follow these steps: + +1. Make sure you have Python installed on your system. + +2. Navigate to the directory containing the script in your terminal or command prompt. + +3. Run the script with the following command: + + python school_center.py sample_data/schools_grade12_2081.tsv sample_data/centers_grade12_2081.tsv sample_data/prefs.tsv -o output_file_name.tsv + + - Replace `sample_data/schools_grade12_2081.tsv`, `sample_data/centers_grade12_2081.tsv`, and `sample_data/prefs.tsv` with the paths to your input data files. + - Replace `output_file_name.tsv` with the desired name for the output file. + +4. After the script finishes execution, you will find the output file in the `results/` directory (created automatically if it does not exist), named according to the `-o` parameter you provided. 
+ diff --git a/school_center.py b/school_center.py index c68cdf7..524ac65 100644 --- a/school_center.py +++ b/school_center.py @@ -1,91 +1,102 @@ -OUTPUT_DIR = 'results/' - -PREF_DISTANCE_THRESHOLD = 2 # Preferred threshold distance in kilometers -ABS_DISTANCE_THRESHOLD = 7 # Absolute threshold distance in kilometers -MIN_STUDENT_IN_CENTER = 10 # minimum number of students from a school to be assigned to a center in normal circumstances -STRETCH_CAPACITY_FACTOR = 0.02 # how much can center capacity be streched if need arises -PREF_CUTOFF = -4 # Do not allocate students with pref score less than cutoff - -import math -import csv -import random -import logging -import argparse -import os -from typing import Dict, List - +# Constants for distance thresholds, minimum student count, and capacity factors from utils.custom_logger import configure_logging +from typing import Dict, List +import os +import argparse +import logging +import random +import csv +import math +OUTPUT_DIR = 'results/' +PREF_DISTANCE_THRESHOLD = 2 # Preferred distance threshold in kilometers +ABS_DISTANCE_THRESHOLD = 7 # Absolute distance threshold in kilometers +# Minimum number of students from a school to be assigned to a center in normal circumstances +MIN_STUDENT_IN_CENTER = 10 +STRETCH_CAPACITY_FACTOR = 0.02 # Capacity stretching factor +PREF_CUTOFF = -4 # Preference score cutoff +# Configure logging configure_logging() - logger = logging.getLogger(__name__) -def create_dir(dirPath:str): +# Function to create directory if it doesn't exist + + +def create_directory(dir_path: str): """ - Create the given directory if it doesn't exists - - Creates all the directories needed to resolve to the provided directory path + Create the given directory if it doesn't exist. + Creates all the directories needed to resolve to the provided directory path. 
""" - if not os.path.exists(dirPath): - os.makedirs(dirPath) + if not os.path.exists(dir_path): + os.makedirs(dir_path) + +# Function to calculate haversine distance between two points + def haversine_distance(lat1, lon1, lat2, lon2): """ Calculate the great circle distance between two points - on the earth specified in decimal degrees + on the earth specified in decimal degrees. """ # Convert decimal degrees to radians lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2]) - + # Haversine formula dlon = lon2 - lon1 dlat = lat2 - lat1 - a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2 + a = math.sin(dlat/2)**2 + math.cos(lat1) * \ + math.cos(lat2) * math.sin(dlon/2)**2 c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) radius_earth = 6371 # Radius of Earth in kilometers distance = radius_earth * c return distance -def centers_within_distance(school: Dict[str, str], centers: Dict[str, str], distance_threshold: float) -> List[Dict[str, any]]: +# Function to find centers within a certain distance from a school + + +def find_centers_within_distance(school: Dict[str, str], centers: Dict[str, str], distance_threshold: float) -> List[Dict[str, any]]: """ - Return List of centers that are within given distance from school. - If there are no centers within given distance return one that is closest - Returned params : + Return a list of centers that are within a given distance from a school. + If there are no centers within the given distance, return the nearest one. 
+ Returned parameters: {'cscode', 'name', 'address', 'capacity', 'lat', 'long', 'distance_km'} - """ def center_to_dict(c, distance): return {'cscode': c['cscode'], 'name': c['name'], 'address': c['address'], 'capacity': c['capacity'], 'lat': c['lat'], 'long': c['long'], 'distance_km': distance} - + def sort_key(c): - # intent: sort by preference score DESC then by distance_km ASC - # leaky abstraction - sorted requires a single numberic value for each element - return c['distance_km'] * random.uniform(1,5) - get_pref(school['scode'], c['cscode'])*100 - + # Sort by preference score descending, then by distance_km ascending + return c['distance_km'] * random.uniform(1, 5) - get_preference_score(school['scode'], c['cscode']) * 100 + school_lat = school.get('lat') school_long = school.get('long') if len(school_lat) == 0 or len(school_long) == 0: return [] - + within_distance = [] - nearest_distance = None; + nearest_distance = None nearest_center = None - for c in centers: - distance = haversine_distance(float(school_lat), float(school_long), float(c.get('lat')), float(c.get('long'))) + for c in centers: + distance = haversine_distance(float(school_lat), float( + school_long), float(c.get('lat')), float(c.get('long'))) if school['scode'] == c['cscode']: continue if nearest_center == None or distance < nearest_distance: nearest_center = c nearest_distance = distance - if distance <= distance_threshold and get_pref(school['scode'], c['cscode']) > PREF_CUTOFF: + if distance <= distance_threshold and get_preference_score(school['scode'], c['cscode']) > PREF_CUTOFF: within_distance.append(center_to_dict(c, distance)) - + if len(within_distance) > 0: - return sorted(within_distance, key=sort_key) - else: # if there are no centers within given threshold, return one that is closest + return sorted(within_distance, key=sort_key) + else: # If there are no centers within the given threshold, return the nearest one return [center_to_dict(nearest_center, nearest_distance)] +# 
Function to read data from a TSV file + + def read_tsv(file_path: str) -> List[Dict[str, str]]: data = [] with open(file_path, 'r', newline='', encoding='utf-8') as file: @@ -94,7 +105,10 @@ def read_tsv(file_path: str) -> List[Dict[str, str]]: data.append(dict(row)) return data -def read_prefs(file_path: str) -> Dict[str, Dict[str, int]]: +# Function to read preference scores from a TSV file + + +def read_preference_scores(file_path: str) -> Dict[str, Dict[str, int]]: prefs = {} with open(file_path, 'r', newline='', encoding='utf-8') as file: reader = csv.DictReader(file, delimiter='\t') @@ -106,30 +120,39 @@ def read_prefs(file_path: str) -> Dict[str, Dict[str, int]]: prefs[row['scode']][row['cscode']] = int(row['pref']) else: prefs[row['scode']] = {row['cscode']: int(row['pref'])} - return prefs -def get_pref(scode, cscode) -> int: +# Function to get preference score + + +def get_preference_score(scode, cscode) -> int: if prefs.get(scode): if prefs[scode].get(cscode): return prefs[scode][cscode] else: return 0 else: - return 0 + return 0 -def calc_per_center(count: int) -> int: +# Function to calculate per center count + + +def calculate_per_center(count: int) -> int: if count <= 400: return 100 - # elif count <= 900: - # return 200 - else: + else: return 200 +# Function to sort schools + + def school_sort_key(s): - return (-1 if int(s['count']) > 500 else 1 ) * random.uniform(1, 100) + return (-1 if int(s['count']) > 500 else 1) * random.uniform(1, 100) + +# Function to allocate students to centers + -def allocate(scode:str, cscode:str, count: int): +def allocate_students(scode: str, cscode: str, count: int): if allocations.get(scode) == None: allocations[scode] = {cscode: count} elif allocations[scode].get(cscode) == None: @@ -137,85 +160,109 @@ def allocate(scode:str, cscode:str, count: int): else: allocations[scode][cscode] += count -def is_allocated(scode1: str, scode2:str) -> bool: +# Function to check if a school is allocated to a center + + +def 
is_allocated_to_center(scode1: str, scode2: str) -> bool: if allocations.get(scode1): return allocations[scode1].get(scode2) != None else: return False + +# Argument parser for command line interface parser = argparse.ArgumentParser( - prog='center randomizer', - description='Assigns centers to exam centers to students') -parser.add_argument('schools_tsv', default='schools.tsv', help="Tab separated (TSV) file containing school details") -parser.add_argument('centers_tsv', default='centers.tsv', help="Tab separated (TSV) file containing center details") -parser.add_argument('prefs_tsv', default='prefs.tsv', help="Tab separated (TSV) file containing preference scores") -parser.add_argument('-o', '--output', default='school-center.tsv', help='Output file') -parser.add_argument('-s', '--seed', action='store', metavar='SEEDVALUE', default=None, type=float, help='Initialization seed for Random Number Generator') + prog='center randomizer', + description='Assigns centers to exam centers to students') +parser.add_argument('schools_tsv', default='schools.tsv', + help="Tab separated (TSV) file containing school details") +parser.add_argument('centers_tsv', default='centers.tsv', + help="Tab separated (TSV) file containing center details") +parser.add_argument('prefs_tsv', default='prefs.tsv', + help="Tab separated (TSV) file containing preference scores") +parser.add_argument( + '-o', '--output', default='school-center.tsv', help='Output file') +parser.add_argument('-s', '--seed', action='store', metavar='SEEDVALUE', default=None, + type=float, help='Initialization seed for Random Number Generator') args = parser.parse_args() -random = random.Random(args.seed) #overwrites the random module to use seeded rng +random.seed(args.seed) # Seed the random number generator -schools = sorted(read_tsv(args.schools_tsv), key= school_sort_key) +# Read data from TSV files +schools = sorted(read_tsv(args.schools_tsv), key=school_sort_key) centers = read_tsv(args.centers_tsv) 
-centers_remaining_cap = {c['cscode']:int(c['capacity']) for c in centers} -prefs = read_prefs(args.prefs_tsv) +centers_remaining_capacity = {c['cscode']: int(c['capacity']) for c in centers} +prefs = read_preference_scores(args.prefs_tsv) -remaining = 0 # stores count of non allocated students -allocations = {} # to track mutual allocations +remaining_students = 0 # Count of non-allocated students +allocations = {} # Dictionary to track allocations -create_dir(OUTPUT_DIR) # Create the output directory if not exists +create_directory(OUTPUT_DIR) # Create the output directory if it doesn't exist + +# Open output files with open('{}school-center-distance.tsv'.format(OUTPUT_DIR), 'w', encoding='utf-8') as intermediate_file, \ -open(OUTPUT_DIR + args.output, 'w', encoding='utf-8') as a_file: + open(OUTPUT_DIR + args.output, 'w', encoding='utf-8') as allocation_file: writer = csv.writer(intermediate_file, delimiter="\t") - writer.writerow(["scode", "s_count", "school_name", "school_lat", "school_long", "cscode", "center_name", "center_address", "center_capacity", "distance_km"]) - - allocation_file = csv.writer(a_file, delimiter='\t') - allocation_file.writerow(["scode", "school", "cscode", "center", "center_address", "allocation", "distance_km"]) - - for s in schools: - centers_for_school = centers_within_distance(s, centers, PREF_DISTANCE_THRESHOLD) - to_allot = int(s['count']) - per_center = calc_per_center(to_allot) + writer.writerow(["scode", "s_count", "school_name", "school_lat", "school_long", + "cscode", "center_name", "center_address", "center_capacity", "distance_km"]) + + allocation_writer = csv.writer(allocation_file, delimiter='\t') + allocation_writer.writerow( + ["scode", "school", "cscode", "center", "center_address", "allocation", "distance_km"]) + + for school in schools: + centers_for_school = find_centers_within_distance( + school, centers, PREF_DISTANCE_THRESHOLD) + to_allocate = int(school['count']) + per_center_count = 
calculate_per_center(to_allocate) allocated_centers = {} - # per_center = math.ceil(to_allot / min(calc_num_centers(to_allot), len(centers_for_school))) - for c in centers_for_school: - writer.writerow([s['scode'], s['count'], s['name-address'], s['lat'], s['long'], c['cscode'], c['name'], c['address'], c['capacity'], c['distance_km'] ]) - if is_allocated(c['cscode'], s['scode']): + for center in centers_for_school: + writer.writerow([school['scode'], school['count'], school['name-address'], school['lat'], school['long'], + center['cscode'], center['name'], center['address'], center['capacity'], center['distance_km']]) + if is_allocated_to_center(center['cscode'], school['scode']): continue - next_allot = min(to_allot, per_center, max(centers_remaining_cap[c['cscode']], MIN_STUDENT_IN_CENTER)) - if to_allot > 0 and next_allot > 0 and centers_remaining_cap[c['cscode']] >= next_allot: - allocated_centers[c['cscode']] = c - allocate(s['scode'], c['cscode'], next_allot) - # allocation.writerow([s['scode'], s['name-address'], c['cscode'], c['name'], c['address'], next_allot, c['distance_km']]) - to_allot -= next_allot - centers_remaining_cap[c['cscode']] -= next_allot - - if to_allot > 0: # try again with relaxed constraints and more capacity at centers - expanded_centers = centers_within_distance(s, centers, ABS_DISTANCE_THRESHOLD) - for c in expanded_centers: - if is_allocated(c['cscode'], s['scode']): + next_allocation = min(to_allocate, per_center_count, max( + centers_remaining_capacity[center['cscode']], MIN_STUDENT_IN_CENTER)) + if to_allocate > 0 and next_allocation > 0 and centers_remaining_capacity[center['cscode']] >= next_allocation: + allocated_centers[center['cscode']] = center + allocate_students( + school['scode'], center['cscode'], next_allocation) + to_allocate -= next_allocation + centers_remaining_capacity[center['cscode']] -= next_allocation + + if to_allocate > 0: # Try again with relaxed constraints and more capacity at centers + expanded_centers 
= find_centers_within_distance( + school, centers, ABS_DISTANCE_THRESHOLD) + for center in expanded_centers: + if is_allocated_to_center(center['cscode'], school['scode']): continue - stretched_capacity = math.floor(int(c['capacity']) * STRETCH_CAPACITY_FACTOR + centers_remaining_cap[c['cscode']]) - next_allot = min(to_allot, max(stretched_capacity, MIN_STUDENT_IN_CENTER)) - if to_allot > 0 and next_allot > 0 and stretched_capacity >= next_allot: - allocated_centers[c['cscode']] = c - allocate(s['scode'], c['cscode'], next_allot) - # allocation.writerow([s['scode'], s['name-address'], c['cscode'], c['name'], c['address'], next_allot, c['distance_km']]) - to_allot -= next_allot - centers_remaining_cap[c['cscode']] -= next_allot - - for c in allocated_centers.values(): - allocation_file.writerow([s['scode'], s['name-address'], c['cscode'], c['name'], c['address'], allocations[s['scode']][c['cscode']], c['distance_km']]) - - if to_allot > 0: - remaining+=to_allot - logger.warn(f"{to_allot}/{s['count']} left for {s['scode']} {s['name-address']} centers: {len(centers_for_school)}") - + stretched_capacity = math.floor(int( + center['capacity']) * STRETCH_CAPACITY_FACTOR + centers_remaining_capacity[center['cscode']]) + next_allocation = min(to_allocate, max( + stretched_capacity, MIN_STUDENT_IN_CENTER)) + if to_allocate > 0 and next_allocation > 0 and stretched_capacity >= next_allocation: + allocated_centers[center['cscode']] = center + allocate_students( + school['scode'], center['cscode'], next_allocation) + to_allocate -= next_allocation + centers_remaining_capacity[center['cscode'] + ] -= next_allocation + + for center in allocated_centers.values(): + allocation_writer.writerow([school['scode'], school['name-address'], center['cscode'], center['name'], + center['address'], allocations[school['scode']][center['cscode']], center['distance_km']]) + + if to_allocate > 0: + remaining_students += to_allocate + logger.warning(f"{to_allocate}/{school['count']} students left 
for { + school['scode']} {school['name-address']} centers: {len(centers_for_school)}") logger.info("Remaining capacity at each center (remaining_capacity cscode):") - logger.info(sorted([(v,k) for k, v in centers_remaining_cap.items() if v != 0])) - logger.info(f"Total remaining capacity across all centers: {sum({k:v for k, v in centers_remaining_cap.items() if v != 0}.values())}") - logger.info(f"Students not assigned: {remaining}") + logger.info( + sorted([(v, k) for k, v in centers_remaining_capacity.items() if v != 0])) + logger.info(f"Total remaining capacity across all centers: { + sum({k: v for k, v in centers_remaining_capacity.items() if v != 0}.values())}") + logger.info(f"Students not assigned: {remaining_students}")