Skip to content

Commit

Permalink
Functionality to track multiple students (#118)
Browse files Browse the repository at this point in the history
* Update repo structure

* added the whole frontend

* Update repo structure

* Adding object detection model

* Update repo structure

* Update repo structure

* Update repo structure

* Update repo structure

* Added the functionality to track multiple students

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Tanisha Lalwani <[email protected]>
  • Loading branch information
3 people authored Nov 11, 2024
1 parent 01c0fad commit 43241d2
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 130 deletions.
270 changes: 145 additions & 125 deletions Backend/detection.py
Original file line number Diff line number Diff line change
@@ -1,138 +1,158 @@
import time
import audio
import head_pose
import matplotlib.pyplot as plt
import cv2
import numpy as np
import logging
import time
import matplotlib.pyplot as plt
from typing import Dict, List, Tuple
from .student_tracker import MultiStudentTracker
from .face_recog import FaceRecognition

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

PLOT_LENGTH = 200

# Placeholders
GLOBAL_CHEAT = 0
PERCENTAGE_CHEAT = 0
CHEAT_THRESH = 0.6
XDATA = list(range(200))
YDATA = [0] * 200

# Global flag to check if window is open
is_running = True

def avg(current, previous):
if previous > 1:
return 0.65
if current == 0:
if previous < 0.01:
return 0.01
return previous / 1.01
if previous == 0:
return current
return 1 * previous + 0.1 * current

def process():
global GLOBAL_CHEAT, PERCENTAGE_CHEAT, CHEAT_THRESH

try:
if GLOBAL_CHEAT == 0:
if head_pose.X_AXIS_CHEAT == 0:
if head_pose.Y_AXIS_CHEAT == 0:
if audio.AUDIO_CHEAT == 0:
PERCENTAGE_CHEAT = avg(0, PERCENTAGE_CHEAT)
else:
PERCENTAGE_CHEAT = avg(0.2, PERCENTAGE_CHEAT)
class DetectionSystem:
def __init__(self):
self.face_recognition = FaceRecognition()
self.student_tracker = MultiStudentTracker()
self.active_tracking = False

# Detection parameters
self.PLOT_LENGTH = 200
self.CHEAT_THRESH = 0.6
self.is_running = True

# Initialize tracking data for each student
self.student_data = {} # Dictionary to store per-student metrics

def avg(self, current: float, previous: float) -> float:
"""Calculate weighted average of cheat probability"""
if previous > 1:
return 0.65
if current == 0:
if previous < 0.01:
return 0.01
return previous / 1.01
if previous == 0:
return current
return 1 * previous + 0.1 * current

def calculate_cheat_probability(self, student_metrics: Dict) -> float:
"""
Calculate cheat probability based on multiple metrics
"""
pose_data = student_metrics.get('pose_data', {})
audio_cheat = student_metrics.get('audio_cheat', 0)
previous_cheat = student_metrics.get('previous_cheat', 0)

x_axis_cheat = int(not pose_data.get('looking_straight', True))
y_axis_cheat = int(pose_data.get('movement_detected', False))

# Complex decision tree for cheat probability
base_probability = 0

if x_axis_cheat == 0:
if y_axis_cheat == 0:
if audio_cheat == 0:
base_probability = 0
else:
if audio.AUDIO_CHEAT == 0:
PERCENTAGE_CHEAT = avg(0.2, PERCENTAGE_CHEAT)
else:
PERCENTAGE_CHEAT = avg(0.4, PERCENTAGE_CHEAT)
base_probability = 0.2
else:
if head_pose.Y_AXIS_CHEAT == 0:
if audio.AUDIO_CHEAT == 0:
PERCENTAGE_CHEAT = avg(0.1, PERCENTAGE_CHEAT)
else:
PERCENTAGE_CHEAT = avg(0.4, PERCENTAGE_CHEAT)
if audio_cheat == 0:
base_probability = 0.2
else:
if audio.AUDIO_CHEAT == 0:
PERCENTAGE_CHEAT = avg(0.15, PERCENTAGE_CHEAT)
else:
PERCENTAGE_CHEAT = avg(0.25, PERCENTAGE_CHEAT)
base_probability = 0.4
else:
if head_pose.X_AXIS_CHEAT == 0:
if head_pose.Y_AXIS_CHEAT == 0:
if audio.AUDIO_CHEAT == 0:
PERCENTAGE_CHEAT = avg(0, PERCENTAGE_CHEAT)
else:
PERCENTAGE_CHEAT = avg(0.55, PERCENTAGE_CHEAT)
if y_axis_cheat == 0:
if audio_cheat == 0:
base_probability = 0.1
else:
if audio.AUDIO_CHEAT == 0:
PERCENTAGE_CHEAT = avg(0.55, PERCENTAGE_CHEAT)
else:
PERCENTAGE_CHEAT = avg(0.85, PERCENTAGE_CHEAT)
base_probability = 0.4
else:
if head_pose.Y_AXIS_CHEAT == 0:
if audio.AUDIO_CHEAT == 0:
PERCENTAGE_CHEAT = avg(0.6, PERCENTAGE_CHEAT)
else:
PERCENTAGE_CHEAT = avg(0.85, PERCENTAGE_CHEAT)
if audio_cheat == 0:
base_probability = 0.15
else:
if audio.AUDIO_CHEAT == 0:
PERCENTAGE_CHEAT = avg(0.5, PERCENTAGE_CHEAT)
else:
PERCENTAGE_CHEAT = avg(0.85, PERCENTAGE_CHEAT)

if PERCENTAGE_CHEAT > CHEAT_THRESH:
GLOBAL_CHEAT = 1
print("CHEATING")
else:
GLOBAL_CHEAT = 0
print("Cheat percent: ", PERCENTAGE_CHEAT, GLOBAL_CHEAT)

except Exception as e:
logging.error(f"Error in process: {e}")
print("An error occurred during processing. Please check the logs.")

def on_close(event):
global is_running
is_running = False
# Set flag to False when the window is closed

def run_detection():
global XDATA, YDATA, is_running

try:
fig, axes = plt.subplots()

axes.set_xlim(0, 200)
axes.set_ylim(0, 1)
line, = axes.plot(XDATA, YDATA, 'r-')
plt.title("Suspicious Behaviour Detection")
plt.xlabel("Time")
plt.ylabel("Cheat Probability")

# Connect the close event to the callback
fig.canvas.mpl_connect('close_event', on_close)

while is_running:
YDATA.pop(0)
YDATA.append(PERCENTAGE_CHEAT)
line.set_xdata(XDATA)
line.set_ydata(YDATA)
plt.draw()
plt.pause(1e-17)
process()
time.sleep(1 / 5)

plt.close(fig)

except Exception as e:
logging.error(f"Error in run_detection: {e}")
print("An error occurred while running the detection. Please check the logs.")

if __name__ == "__main__":
try:
run_detection()
except KeyboardInterrupt:
logging.info("Detection interrupted by user.")
print("Terminated detection.")
base_probability = 0.25

return self.avg(base_probability, previous_cheat)

def process_frame(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
"""
Process a single frame with face recognition and student tracking
"""
if frame is None:
return None, []

try:
# Perform face recognition
recognized_frame = self.face_recognition.recognize_faces(frame)

# Perform student tracking
tracked_frame, new_student_data = self.student_tracker.detect_and_track_students(recognized_frame)

# Update student metrics
timestamp = time.time()
for student in new_student_data:
student_id = student['bbox'] # Use bbox as temporary ID

if student_id not in self.student_data:
# Initialize new student data
self.student_data[student_id] = {
'cheat_history': [0] * self.PLOT_LENGTH,
'previous_cheat': 0,
'global_cheat': 0,
'audio_cheat': 0 # Placeholder for audio detection
}

# Calculate cheat probability
cheat_prob = self.calculate_cheat_probability({
'pose_data': student['pose_data'],
'audio_cheat': self.student_data[student_id]['audio_cheat'],
'previous_cheat': self.student_data[student_id]['previous_cheat']
})

# Update student metrics
self.student_data[student_id]['previous_cheat'] = cheat_prob
self.student_data[student_id]['cheat_history'].pop(0)
self.student_data[student_id]['cheat_history'].append(cheat_prob)

# Update global cheat flag
if cheat_prob > self.CHEAT_THRESH:
self.student_data[student_id]['global_cheat'] = 1
logging.info(f"Cheating detected for student at {student['bbox']}")
else:
self.student_data[student_id]['global_cheat'] = 0

# Add metrics to student data
student['cheat_probability'] = cheat_prob
student['global_cheat'] = self.student_data[student_id]['global_cheat']
student['timestamp'] = timestamp

return tracked_frame, new_student_data

except Exception as e:
logging.error(f"Error in process_frame: {e}")
return frame, []

def start_tracking(self):
"""Enable student tracking"""
self.active_tracking = True
self.is_running = True

def stop_tracking(self):
"""Disable student tracking"""
self.active_tracking = False
self.is_running = False

def get_student_plot_data(self, student_id):
"""Get plotting data for a specific student"""
if student_id in self.student_data:
return (
list(range(self.PLOT_LENGTH)),
self.student_data[student_id]['cheat_history']
)
return None, None

def cleanup(self):
"""Cleanup resources"""
self.stop_tracking()
plt.close('all')
3 changes: 2 additions & 1 deletion Backend/proctor_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,5 @@ def not_found(e):

@proctor_api.errorhandler(500)
def internal_error(e):
return jsonify({'error': 'Internal server error'}), 500
return jsonify({'error': 'Internal server error'}), 500

88 changes: 88 additions & 0 deletions Backend/student_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import cv2
import mediapipe as mp
import numpy as np
from typing import List, Dict, Tuple

class MultiStudentTracker:
def __init__(self):
self.mp_face_detection = mp.solutions.face_detection
self.face_detection = self.mp_face_detection.FaceDetection(
min_detection_confidence=0.7
)
self.suspicious_behaviors = {
'looking_away': 0,
'rapid_movement': 0,
'out_of_frame': 0
}

def detect_and_track_students(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Dict]]:
"""
Detect and track multiple students in the frame
Returns annotated frame and list of student tracking data
"""
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = self.face_detection.process(rgb_frame)
student_data = []

if results.detections:
for detection in results.detections:
bbox = detection.location_data.relative_bounding_box
h, w, _ = frame.shape

x = int(bbox.xmin * w)
y = int(bbox.ymin * h)
width = int(bbox.width * w)
height = int(bbox.height * h)

# Track head pose and movement
pose_data = self.analyze_head_pose(frame[y:y+height, x:x+width])

# Calculate suspicion metrics
suspicion_score = self.calculate_suspicion_score(pose_data)

student_info = {
'bbox': (x, y, width, height),
'pose_data': pose_data,
'suspicion_score': suspicion_score,
'timestamp': cv2.getTickCount() / cv2.getTickFrequency()
}
student_data.append(student_info)

# Draw bounding box and suspicion score
color = self.get_alert_color(suspicion_score)
cv2.rectangle(frame, (x, y), (x + width, y + height), color, 2)
cv2.putText(frame, f"Score: {suspicion_score:.2f}",
(x, y - 10), cv2.FONT_HERSHEY_SIMPLEX,
0.5, color, 2)

return frame, student_data

def analyze_head_pose(self, face_region: np.ndarray) -> Dict:
"""
Analyze head pose and movement patterns
"""
return {
'looking_straight': True,
'movement_detected': False
}

def calculate_suspicion_score(self, pose_data: Dict) -> float:
"""
Calculate suspicion score based on pose analysis
"""
score = 0.0
if not pose_data['looking_straight']:
score += 0.3
if pose_data['movement_detected']:
score += 0.2
return min(score, 1.0)

def get_alert_color(self, score: float) -> Tuple[int, int, int]:
"""
Return color based on suspicion score
"""
if score < 0.3:
return (0, 255, 0) # Green
elif score < 0.7:
return (0, 255, 255) # Yellow
return (0, 0, 255) # Red
Loading

0 comments on commit 43241d2

Please sign in to comment.