Functionality to track multiple students #118

Merged
270 changes: 145 additions & 125 deletions Backend/detection.py
@@ -1,138 +1,158 @@
The PR replaces the module-level, single-student pipeline in detection.py with a DetectionSystem class that keeps per-student state.

Removed (previous module-level implementation):

import time
import audio
import head_pose
import logging
import matplotlib.pyplot as plt

PLOT_LENGTH = 200

# Placeholders
GLOBAL_CHEAT = 0
PERCENTAGE_CHEAT = 0
CHEAT_THRESH = 0.6
XDATA = list(range(200))
YDATA = [0] * 200

# Global flag to check if window is open
is_running = True

def avg(current, previous):
    if previous > 1:
        return 0.65
    if current == 0:
        if previous < 0.01:
            return 0.01
        return previous / 1.01
    if previous == 0:
        return current
    return 1 * previous + 0.1 * current

def process():
    global GLOBAL_CHEAT, PERCENTAGE_CHEAT, CHEAT_THRESH

    try:
        if GLOBAL_CHEAT == 0:
            if head_pose.X_AXIS_CHEAT == 0:
                if head_pose.Y_AXIS_CHEAT == 0:
                    if audio.AUDIO_CHEAT == 0:
                        PERCENTAGE_CHEAT = avg(0, PERCENTAGE_CHEAT)
                    else:
                        PERCENTAGE_CHEAT = avg(0.2, PERCENTAGE_CHEAT)
                else:
                    if audio.AUDIO_CHEAT == 0:
                        PERCENTAGE_CHEAT = avg(0.2, PERCENTAGE_CHEAT)
                    else:
                        PERCENTAGE_CHEAT = avg(0.4, PERCENTAGE_CHEAT)
            else:
                if head_pose.Y_AXIS_CHEAT == 0:
                    if audio.AUDIO_CHEAT == 0:
                        PERCENTAGE_CHEAT = avg(0.1, PERCENTAGE_CHEAT)
                    else:
                        PERCENTAGE_CHEAT = avg(0.4, PERCENTAGE_CHEAT)
                else:
                    if audio.AUDIO_CHEAT == 0:
                        PERCENTAGE_CHEAT = avg(0.15, PERCENTAGE_CHEAT)
                    else:
                        PERCENTAGE_CHEAT = avg(0.25, PERCENTAGE_CHEAT)
        else:
            if head_pose.X_AXIS_CHEAT == 0:
                if head_pose.Y_AXIS_CHEAT == 0:
                    if audio.AUDIO_CHEAT == 0:
                        PERCENTAGE_CHEAT = avg(0, PERCENTAGE_CHEAT)
                    else:
                        PERCENTAGE_CHEAT = avg(0.55, PERCENTAGE_CHEAT)
                else:
                    if audio.AUDIO_CHEAT == 0:
                        PERCENTAGE_CHEAT = avg(0.55, PERCENTAGE_CHEAT)
                    else:
                        PERCENTAGE_CHEAT = avg(0.85, PERCENTAGE_CHEAT)
            else:
                if head_pose.Y_AXIS_CHEAT == 0:
                    if audio.AUDIO_CHEAT == 0:
                        PERCENTAGE_CHEAT = avg(0.6, PERCENTAGE_CHEAT)
                    else:
                        PERCENTAGE_CHEAT = avg(0.85, PERCENTAGE_CHEAT)
                else:
                    if audio.AUDIO_CHEAT == 0:
                        PERCENTAGE_CHEAT = avg(0.5, PERCENTAGE_CHEAT)
                    else:
                        PERCENTAGE_CHEAT = avg(0.85, PERCENTAGE_CHEAT)

        if PERCENTAGE_CHEAT > CHEAT_THRESH:
            GLOBAL_CHEAT = 1
            print("CHEATING")
        else:
            GLOBAL_CHEAT = 0
        print("Cheat percent: ", PERCENTAGE_CHEAT, GLOBAL_CHEAT)

    except Exception as e:
        logging.error(f"Error in process: {e}")
        print("An error occurred during processing. Please check the logs.")

def on_close(event):
    # Set flag to False when the window is closed
    global is_running
    is_running = False

def run_detection():
    global XDATA, YDATA, is_running

    try:
        fig, axes = plt.subplots()

        axes.set_xlim(0, 200)
        axes.set_ylim(0, 1)
        line, = axes.plot(XDATA, YDATA, 'r-')
        plt.title("Suspicious Behaviour Detection")
        plt.xlabel("Time")
        plt.ylabel("Cheat Probability")

        # Connect the close event to the callback
        fig.canvas.mpl_connect('close_event', on_close)

        while is_running:
            YDATA.pop(0)
            YDATA.append(PERCENTAGE_CHEAT)
            line.set_xdata(XDATA)
            line.set_ydata(YDATA)
            plt.draw()
            plt.pause(1e-17)
            process()
            time.sleep(1 / 5)

        plt.close(fig)

    except Exception as e:
        logging.error(f"Error in run_detection: {e}")
        print("An error occurred while running the detection. Please check the logs.")

if __name__ == "__main__":
    try:
        run_detection()
    except KeyboardInterrupt:
        logging.info("Detection interrupted by user.")
        print("Terminated detection.")

Added (new DetectionSystem class):

import cv2
import numpy as np
import logging
import time
import matplotlib.pyplot as plt
from typing import Dict, List, Tuple
from .student_tracker import MultiStudentTracker
from .face_recog import FaceRecognition

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class DetectionSystem:
    def __init__(self):
        self.face_recognition = FaceRecognition()
        self.student_tracker = MultiStudentTracker()
        self.active_tracking = False

        # Detection parameters
        self.PLOT_LENGTH = 200
        self.CHEAT_THRESH = 0.6
        self.is_running = True

        # Initialize tracking data for each student
        self.student_data = {}  # Dictionary to store per-student metrics

    def avg(self, current: float, previous: float) -> float:
        """Calculate weighted average of cheat probability"""
        if previous > 1:
            return 0.65
        if current == 0:
            if previous < 0.01:
                return 0.01
            return previous / 1.01
        if previous == 0:
            return current
        return 1 * previous + 0.1 * current

    def calculate_cheat_probability(self, student_metrics: Dict) -> float:
        """
        Calculate cheat probability based on multiple metrics
        """
        pose_data = student_metrics.get('pose_data', {})
        audio_cheat = student_metrics.get('audio_cheat', 0)
        previous_cheat = student_metrics.get('previous_cheat', 0)

        x_axis_cheat = int(not pose_data.get('looking_straight', True))
        y_axis_cheat = int(pose_data.get('movement_detected', False))

        # Decision tree for the base cheat probability
        base_probability = 0

        if x_axis_cheat == 0:
            if y_axis_cheat == 0:
                if audio_cheat == 0:
                    base_probability = 0
                else:
                    base_probability = 0.2
            else:
                if audio_cheat == 0:
                    base_probability = 0.2
                else:
                    base_probability = 0.4
        else:
            if y_axis_cheat == 0:
                if audio_cheat == 0:
                    base_probability = 0.1
                else:
                    base_probability = 0.4
            else:
                if audio_cheat == 0:
                    base_probability = 0.15
                else:
                    base_probability = 0.25

        return self.avg(base_probability, previous_cheat)

    def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
        """
        Process a single frame with face recognition and student tracking
        """
        if frame is None:
            return None, []

        try:
            # Perform face recognition
            recognized_frame = self.face_recognition.recognize_faces(frame)

            # Perform student tracking
            tracked_frame, new_student_data = self.student_tracker.detect_and_track_students(recognized_frame)

            # Update student metrics
            timestamp = time.time()
            for student in new_student_data:
                student_id = student['bbox']  # Use bbox as temporary ID

                if student_id not in self.student_data:
                    # Initialize new student data
                    self.student_data[student_id] = {
                        'cheat_history': [0] * self.PLOT_LENGTH,
                        'previous_cheat': 0,
                        'global_cheat': 0,
                        'audio_cheat': 0  # Placeholder for audio detection
                    }

                # Calculate cheat probability
                cheat_prob = self.calculate_cheat_probability({
                    'pose_data': student['pose_data'],
                    'audio_cheat': self.student_data[student_id]['audio_cheat'],
                    'previous_cheat': self.student_data[student_id]['previous_cheat']
                })

                # Update student metrics
                self.student_data[student_id]['previous_cheat'] = cheat_prob
                self.student_data[student_id]['cheat_history'].pop(0)
                self.student_data[student_id]['cheat_history'].append(cheat_prob)

                # Update global cheat flag
                if cheat_prob > self.CHEAT_THRESH:
                    self.student_data[student_id]['global_cheat'] = 1
                    logging.info(f"Cheating detected for student at {student['bbox']}")
                else:
                    self.student_data[student_id]['global_cheat'] = 0

                # Add metrics to student data
                student['cheat_probability'] = cheat_prob
                student['global_cheat'] = self.student_data[student_id]['global_cheat']
                student['timestamp'] = timestamp

            return tracked_frame, new_student_data

        except Exception as e:
            logging.error(f"Error in process_frame: {e}")
            return frame, []

    def start_tracking(self):
        """Enable student tracking"""
        self.active_tracking = True
        self.is_running = True

    def stop_tracking(self):
        """Disable student tracking"""
        self.active_tracking = False
        self.is_running = False

    def get_student_plot_data(self, student_id):
        """Get plotting data for a specific student"""
        if student_id in self.student_data:
            return (
                list(range(self.PLOT_LENGTH)),
                self.student_data[student_id]['cheat_history']
            )
        return None, None

    def cleanup(self):
        """Cleanup resources"""
        self.stop_tracking()
        plt.close('all')
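For reviewers who want to exercise the new class end to end, here is a minimal driver sketch. It is not part of this PR: the Backend.detection import path and the webcam index are assumptions, and it presumes the package-relative imports (student_tracker, face_recog) resolve, e.g. when run with python -m from the repository root.

# demo_detection.py: hypothetical driver, not included in this PR.
# Feeds webcam frames to DetectionSystem and prints per-student results.
import cv2

from Backend.detection import DetectionSystem  # import path is an assumption

def main() -> None:
    detector = DetectionSystem()
    detector.start_tracking()
    cap = cv2.VideoCapture(0)  # default webcam; adjust the index as needed
    try:
        while detector.is_running and cap.isOpened():
            ok, frame = cap.read()
            if not ok:
                break
            annotated, students = detector.process_frame(frame)
            for s in students:
                # Keys populated by process_frame / detect_and_track_students
                print(s['bbox'], round(s['cheat_probability'], 3), s['global_cheat'])
            cv2.imshow("Multi-student proctoring", annotated)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                detector.stop_tracking()
    finally:
        cap.release()
        cv2.destroyAllWindows()
        detector.cleanup()

if __name__ == "__main__":
    main()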
3 changes: 2 additions & 1 deletion Backend/proctor_api.py
@@ -263,4 +263,5 @@ def not_found(e):

@proctor_api.errorhandler(500)
def internal_error(e):
    return jsonify({'error': 'Internal server error'}), 500
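For context on where these handlers live: proctor_api appears to be a Flask blueprint (it registers error handlers and returns jsonify responses). A hypothetical host application, not part of this PR and with an assumed import path and URL prefix, could mount it like this:

# app.py: hypothetical host application, not part of this PR.
from flask import Flask

from Backend.proctor_api import proctor_api  # assumes proctor_api is a Flask Blueprint

app = Flask(__name__)
app.register_blueprint(proctor_api, url_prefix="/api")  # URL prefix is an assumption

if __name__ == "__main__":
    app.run(debug=True)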

88 changes: 88 additions & 0 deletions Backend/student_tracker.py
@@ -0,0 +1,88 @@
import cv2
import mediapipe as mp
import numpy as np
from typing import List, Dict, Tuple

class MultiStudentTracker:
    def __init__(self):
        self.mp_face_detection = mp.solutions.face_detection
        self.face_detection = self.mp_face_detection.FaceDetection(
            min_detection_confidence=0.7
        )
        self.suspicious_behaviors = {
            'looking_away': 0,
            'rapid_movement': 0,
            'out_of_frame': 0
        }

    def detect_and_track_students(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
        """
        Detect and track multiple students in the frame
        Returns annotated frame and list of student tracking data
        """
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.face_detection.process(rgb_frame)
        student_data = []

        if results.detections:
            for detection in results.detections:
                bbox = detection.location_data.relative_bounding_box
                h, w, _ = frame.shape

                x = int(bbox.xmin * w)
                y = int(bbox.ymin * h)
                width = int(bbox.width * w)
                height = int(bbox.height * h)

                # Track head pose and movement
                pose_data = self.analyze_head_pose(frame[y:y+height, x:x+width])

                # Calculate suspicion metrics
                suspicion_score = self.calculate_suspicion_score(pose_data)

                student_info = {
                    'bbox': (x, y, width, height),
                    'pose_data': pose_data,
                    'suspicion_score': suspicion_score,
                    'timestamp': cv2.getTickCount() / cv2.getTickFrequency()
                }
                student_data.append(student_info)

                # Draw bounding box and suspicion score
                color = self.get_alert_color(suspicion_score)
                cv2.rectangle(frame, (x, y), (x + width, y + height), color, 2)
                cv2.putText(frame, f"Score: {suspicion_score:.2f}",
                            (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX,
                            0.5, color, 2)

        return frame, student_data

    def analyze_head_pose(self, face_region: np.ndarray) -> Dict:
        """
        Analyze head pose and movement patterns
        """
        return {
            'looking_straight': True,
            'movement_detected': False
        }

    def calculate_suspicion_score(self, pose_data: Dict) -> float:
        """
        Calculate suspicion score based on pose analysis
        """
        score = 0.0
        if not pose_data['looking_straight']:
            score += 0.3
        if pose_data['movement_detected']:
            score += 0.2
        return min(score, 1.0)

    def get_alert_color(self, score: float) -> Tuple[int, int, int]:
        """
        Return color based on suspicion score
        """
        if score < 0.3:
            return (0, 255, 0)  # Green
        elif score < 0.7:
            return (0, 255, 255)  # Yellow
        return (0, 0, 255)  # Red
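A quick way to sanity-check the tracker on a still image is sketched below. It is illustrative only: classroom.jpg is a placeholder path and the Backend.student_tracker import path is an assumption. Note that analyze_head_pose currently returns fixed values, so every detected face scores 0.0 until real pose analysis is implemented.

# check_tracker.py: illustrative only, not part of this PR.
import cv2

from Backend.student_tracker import MultiStudentTracker  # import path is an assumption

tracker = MultiStudentTracker()
frame = cv2.imread("classroom.jpg")  # placeholder image path
if frame is None:
    raise SystemExit("Could not read classroom.jpg")

annotated, students = tracker.detect_and_track_students(frame)
print(f"Detected {len(students)} student(s)")
for s in students:
    print(s['bbox'], s['suspicion_score'])

cv2.imwrite("classroom_annotated.jpg", annotated)  # frame with boxes and scores drawn on it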