Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Human Detection Migration to Superthin Template #2689

Open
wants to merge 4 commits into
base: humble-devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import os.path
from typing import Callable

from src.manager.libs.applications.compatibility.exercise_wrapper_ros2 import CompatibilityExerciseWrapperRos2


class Exercise(CompatibilityExerciseWrapperRos2):
    """Human-detection exercise: wires the shared ROS2 Humble python template
    (exercise + GUI scripts) into the compatibility wrapper."""

    def __init__(self, circuit: str, update_callback: Callable):
        """Build the command lines for the template scripts and delegate to the wrapper."""
        template_dir = f"{os.path.dirname(__file__)}/../../python_template/ros2_humble"
        super().__init__(
            exercise_command=f"{template_dir}/exercise.py 0.0.0.0",
            gui_command=f"{template_dir}/gui.py 0.0.0.0 {circuit}",
            update_callback=update_callback,
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import json
import os
import rclpy
import cv2
import sys
import base64
import threading
import time
import numpy as np
from datetime import datetime
import websocket
import subprocess
import logging

from hal_interfaces.general.odometry import OdometryNode
from console_interfaces.general.console import start_console

# Graphical User Interface Class
class GUI:
    """Bridge between the exercise code and the browser GUI over a websocket.

    Student code publishes frames via showImage(); a separate thread
    (ThreadGUI.run) periodically calls update_gui() to serialise the latest
    frame and send it to the connected client.
    """

    def __init__(self, host):
        """Initialise state and start the websocket reconnect thread.

        host -- websocket URL of the GUI client (e.g. "ws://127.0.0.1:2303").
        """
        # Outgoing message template: base64 JPEG plus its shape.
        self.payload = {'image': '', 'shape': []}

        # ROS2 init (guarded so repeated construction is safe)
        if not rclpy.ok():
            rclpy.init(args=None)

        # Image hand-off between the student thread and the GUI thread.
        self.image_to_be_shown = None
        self.image_to_be_shown_updated = False
        self.image_show_lock = threading.Lock()
        self.host = host
        self.client = None

        # Acknowledge flag, set when the client sends "#ack".
        self.ack = False
        self.ack_lock = threading.Lock()

        # Background thread keeps the websocket connection (re)established.
        self.client_thread = threading.Thread(target=self.run_websocket)
        self.client_thread.start()

    def run_websocket(self):
        """Reconnect loop: (re)open the websocket to the GUI client forever."""
        while True:
            print("GUI WEBSOCKET CONNECTED")
            self.client = websocket.WebSocketApp(self.host, on_message=self.on_message)
            self.client.run_forever(ping_timeout=None, ping_interval=0)

    # Function to prepare image payload
    # Encodes the image as a JSON string and sends through the WS
    def payloadImage(self):
        """Encode the pending frame as a base64 JPEG payload.

        Returns a dict with empty 'image'/'shape' when no new frame is pending.
        """
        # BUGFIX: read the frame and clear the "updated" flag in a single
        # critical section. The previous code cleared the flag in a second
        # lock acquisition after encoding, so a frame published by
        # showImage() between the two critical sections was silently dropped.
        with self.image_show_lock:
            image_to_be_shown_updated = self.image_to_be_shown_updated
            image_to_be_shown = self.image_to_be_shown
            self.image_to_be_shown_updated = False

        image = image_to_be_shown
        payload = {'image': '', 'shape': ''}

        if not image_to_be_shown_updated:
            return payload

        # Encode outside the lock: JPEG compression is the expensive part.
        shape = image.shape
        frame = cv2.imencode('.JPEG', image)[1]
        encoded_image = base64.b64encode(frame)

        payload['image'] = encoded_image.decode('utf-8')
        payload['shape'] = shape

        return payload

    # Function for student to call
    def showImage(self, image):
        """Publish a frame to be sent to the GUI on the next update cycle."""
        with self.image_show_lock:
            self.image_to_be_shown = image
            self.image_to_be_shown_updated = True

    # Update the gui
    def update_gui(self):
        """Serialise the pending frame and push it over the websocket."""
        # Payload Image Message
        payload = self.payloadImage()
        # NOTE(review): the image payload is JSON-encoded twice (here and in
        # the outer dumps below) -- this matches the existing client protocol.
        self.payload["image"] = json.dumps(payload)

        message = json.dumps(self.payload)
        if self.client:
            try:
                self.client.send(message)
            except Exception as e:
                # Best effort: the reconnect loop will restore the connection.
                print(f"Error sending message: {e}")

    def on_message(self, ws, message):
        """Handles incoming messages from the websocket client."""
        if message.startswith("#ack"):
            self.set_acknowledge(True)

    def get_acknowledge(self):
        """Gets the acknowledge status."""
        with self.ack_lock:
            ack = self.ack

        return ack

    def set_acknowledge(self, value):
        """Sets the acknowledge status."""
        with self.ack_lock:
            self.ack = value


class ThreadGUI:
    """Class to manage GUI updates and frequency measurements in separate threads."""

    def __init__(self, gui):
        """Initializes the ThreadGUI with a reference to the GUI instance."""
        self.gui = gui
        self.ideal_cycle = 80
        self.real_time_factor = 0
        self.frequency_message = {'brain': '', 'gui': ''}
        self.iteration_counter = 0
        self.running = True

    def start(self):
        """Starts the GUI, frequency measurement, and real-time factor threads."""
        self.frequency_thread = threading.Thread(target=self.measure_and_send_frequency)
        self.gui_thread = threading.Thread(target=self.run)
        for worker in (self.frequency_thread, self.gui_thread):
            worker.start()
        print("GUI Thread Started!")

    def measure_and_send_frequency(self):
        """Measures and sends the frequency of GUI updates and brain cycles."""
        last_stamp = datetime.now()
        while self.running:
            time.sleep(2)

            now = datetime.now()
            delta = now - last_stamp
            elapsed_ms = (delta.days * 24 * 60 * 60 + delta.seconds) * 1000 + delta.microseconds / 1000.0
            last_stamp = now

            # Average brain-cycle duration over the 2 s window just elapsed.
            if self.iteration_counter > 0:
                measured_cycle = elapsed_ms / self.iteration_counter
            else:
                measured_cycle = 0
            self.iteration_counter = 0

            if measured_cycle != 0:
                brain_frequency = round(1000 / measured_cycle, 1)
            else:
                brain_frequency = 0
            gui_frequency = round(1000 / self.ideal_cycle, 1)
            self.frequency_message = {'brain': brain_frequency, 'gui': gui_frequency}

            if self.gui.client:
                try:
                    self.gui.client.send(json.dumps(self.frequency_message))
                except Exception as err:
                    print(f"Error sending frequency message: {err}")

    def run(self):
        """Main loop to update the GUI at regular intervals."""
        while self.running:
            cycle_start = datetime.now()

            self.gui.update_gui()
            self.iteration_counter += 1

            delta = datetime.now() - cycle_start
            elapsed_ms = (delta.days * 24 * 60 * 60 + delta.seconds) * 1000 + delta.microseconds / 1000.0
            # Target a 50 ms (20 Hz) refresh cycle; never sleep a negative amount.
            time.sleep(max(0, (50 - elapsed_ms) / 1000.0))


# Create a GUI interface
# NOTE(review): constructing GUI starts the websocket reconnect thread
# immediately; the client is assumed to listen on 127.0.0.1:2303 -- confirm.
host = "ws://127.0.0.1:2303"
gui_interface = GUI(host)

start_console()

# Spin a thread to keep the interface updated
thread_gui = ThreadGUI(gui_interface)
thread_gui.start()

def showImage(image):
    """Module-level convenience wrapper: forward a frame to the GUI instance."""
    gui_interface.showImage(image)

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import threading
import cv2

current_frame = None  # Global variable to store the frame; written by the subscriber callback, read by getImage()

class WebcamSubscriber(Node):
    """ROS2 node that keeps the most recent /image_raw frame in the module
    global `current_frame` (BGR, converted via cv_bridge).

    BUGFIX: stray GitHub review-UI text ("MihirGore23 marked this conversation
    as resolved. / Show resolved Hide resolved") had been pasted into the class
    body; it was not valid Python and broke the module at import time. Removed.
    """

    def __init__(self):
        super().__init__('webcam_subscriber')
        # Subscribe to the raw camera topic with a queue depth of 10.
        self.subscription = self.create_subscription(
            Image,
            '/image_raw',
            self.listener_callback,
            10)
        self.subscription  # prevent unused variable warning
        self.bridge = CvBridge()

    def listener_callback(self, msg):
        """Convert the incoming ROS Image message to a BGR array and store it."""
        global current_frame
        self.get_logger().info('Receiving video frame')
        current_frame = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')

def run_webcam_node():
    """Spin a WebcamSubscriber until shutdown, then destroy the node.

    NOTE(review): rclpy.init() is not called here -- this assumes rclpy has
    already been initialised elsewhere (e.g. by the GUI module); confirm.
    """

    webcam_subscriber = WebcamSubscriber()

    rclpy.spin(webcam_subscriber)
    webcam_subscriber.destroy_node()


# Start the ROS2 node in a separate thread
# (non-daemon, so this thread keeps the process alive while spinning)
thread = threading.Thread(target=run_webcam_node)
thread.start()

def getImage():
    """Return the most recently received camera frame (None until one arrives)."""
    # Reading a module global needs no `global` declaration.
    return current_frame

# Demo video used as an offline input source (see getVid below).
uploaded_vid_capture = cv2.VideoCapture('/workspace/code/demo_video/video.mp4')
# Frame cursor for getVid(); advanced externally via `HAL.frame_number += 1`.
frame_number = 0
def getVid():
    """Seek the demo video to `frame_number` and return (success, frame)."""
    # After the frame has been inferred on, the frame number gets incremented in exercise.py
    uploaded_vid_capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
    success, img = uploaded_vid_capture.read()
    return success, img

Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import GUI
import HAL
import base64
from datetime import datetime
import os
import sys
import cv2
import numpy as np
import onnxruntime as rt

# Path to the pre-trained ONNX detection model shipped with the exercise.
raw_dl_model = '/workspace/code/demo_model/Demo_Model.onnx'

def display_output_detection(img, detections, scores):
    """Draw a red box and a 'Human' label on *img* for every detection.

    `scores` is accepted for interface parity but is not rendered.
    Boxes are normalised [top, left, bottom, right]; coordinates are
    rounded and clamped to the image bounds.
    """
    height, width = img.shape[:2]
    for box in detections:
        top = int(max(0, np.floor(box[0] * height + 0.5)))
        left = int(max(0, np.floor(box[1] * width + 0.5)))
        bottom = int(min(height, np.floor(box[2] * height + 0.5)))
        right = int(min(width, np.floor(box[3] * width + 0.5)))
        cv2.rectangle(img, (left, top), (right, bottom), (0, 0, 255), 2)
        cv2.putText(img, 'Human', (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 1)

def display_gt_detection(img, gt_detections):
    """Draw each ground-truth box (elements 1..4: left, top, right, bottom) in green."""
    for gt in gt_detections:
        left, top, right, bottom = (int(v) for v in gt[1:5])
        cv2.rectangle(img, (left, top), (right, bottom), (0, 255, 0), 2)

# Load ONNX model
sess = rt.InferenceSession(raw_dl_model)
input_layer_name = sess.get_inputs()[0].name
output_layers_names = [output.name for output in sess.get_outputs()]

# Main loop over the demo video: read a frame, run the detector,
# draw human detections, then hand the annotated frame to the GUI.
while True:
    # Get input webcam image
    success, img = HAL.getVid()
    if not success:
        break  # end of video

    # Resize to 300x300 and batch as (1, H, W, C) for the model input.
    img_resized = cv2.resize(img, (300, 300))
    img_data = np.reshape(
        img_resized, (1, img_resized.shape[0], img_resized.shape[1], img_resized.shape[2]))

    # Inference
    result = sess.run(output_layers_names, {input_layer_name: img_data})
    detection_boxes, detection_classes, detection_scores, num_detections = result
    count = 0  # number of humans found (currently informational only)
    detections = []
    scores = []
    batch_size = num_detections.shape[0]

    for batch in range(0, batch_size):
        for detection in range(0, int(num_detections[batch])):
            c = detection_classes[batch][detection]
            # Skip if not human class
            if c != 1:
                # NOTE(review): this shows the still-unannotated frame once
                # per non-human detection; the frame is shown again after
                # annotation below, so this early show looks redundant --
                # confirm whether it is intentional.
                GUI.showImage(img)
                continue

            count += 1
            d = detection_boxes[batch][detection]
            detections.append(d)
            score = detection_scores[batch][detection]
            scores.append(score)

    # Draw all accumulated human detections on the original-resolution frame.
    display_output_detection(img, detections, scores)
    # Advance HAL's video cursor so the next getVid() returns the next frame.
    HAL.frame_number += 1

    # To print FPS after each frame has been fully processed and displayed
    GUI.showImage(img)


Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import GUI
import HAL
import base64
from datetime import datetime
import os
import sys
import cv2
import numpy as np
import onnxruntime as rt
# Path to the pre-trained ONNX detection model shipped with the exercise.
raw_dl_model = '/workspace/code/demo_model/Demo_Model.onnx'

def display_output_detection(img, detections, scores):
    """Annotate *img* in place with a red rectangle and 'Human' label per detection.

    Detections are normalised [top, left, bottom, right] fractions of the
    image size; they are rounded and clamped before drawing. The `scores`
    argument is kept for interface parity but is not drawn.
    """
    h, w = img.shape[:2]
    for det in detections:
        y0 = int(max(0, np.floor(det[0] * h + 0.5)))
        x0 = int(max(0, np.floor(det[1] * w + 0.5)))
        y1 = int(min(h, np.floor(det[2] * h + 0.5)))
        x1 = int(min(w, np.floor(det[3] * w + 0.5)))
        cv2.rectangle(img, (x0, y0), (x1, y1), (0, 0, 255), 2)
        cv2.putText(img, 'Human', (x0, y0 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 1)

def display_gt_detection(img, gt_detections):
    """Draw each ground-truth bounding box on *img* in green (2 px)."""
    for gt in gt_detections:
        coords = [int(c) for c in gt[1:5]]  # left, top, right, bottom
        left, top, right, bottom = coords
        cv2.rectangle(img, (left, top), (right, bottom), (0, 255, 0), 2)

# Load ONNX model
sess = rt.InferenceSession(raw_dl_model)
input_layer_name = sess.get_inputs()[0].name
output_layers_names = [output.name for output in sess.get_outputs()]

# Live loop: poll the webcam HAL for the latest frame, run the detector,
# and push the annotated frame to the GUI.
# NOTE(review): when no frame is available yet (img is None) this loop
# busy-spins without sleeping -- confirm whether a short sleep is wanted.
while True:
    # Get input webcam image
    img = HAL.getImage()
    if img is not None:
        # Resize to the 300x300 model input and batch as (1, 300, 300, 3).
        img_resized = cv2.resize(img, (300, 300))
        img_data = np.reshape(img_resized, (1, 300, 300, 3))

        # Inference
        result = sess.run(output_layers_names, {input_layer_name: img_data})
        detection_boxes, detection_classes, detection_scores, num_detections = result

        detections = []
        scores = []
        batch_size = num_detections.shape[0]
        for batch in range(batch_size):
            for detection in range(int(num_detections[batch])):
                c = detection_classes[batch][detection]
                # Skip if not human class
                if c != 1:
                    # NOTE(review): pushing the unannotated frame once per
                    # non-human detection seems redundant (it is shown again
                    # after annotation below); confirm intent.
                    GUI.showImage(img)
                    continue
                detections.append(detection_boxes[batch][detection])
                scores.append(detection_scores[batch][detection])

        # Draw accumulated human boxes and show the annotated frame.
        display_output_detection(img, detections, scores)
        GUI.showImage(img)

Binary file not shown.
Binary file not shown.
Loading