Skip to content
This repository has been archived by the owner on Dec 8, 2024. It is now read-only.

Commit

Permalink
Merge pull request #90 from LimaoC/Obliteration
Browse files Browse the repository at this point in the history
Obliteration
  • Loading branch information
avg-lebesgue-enjoyer authored Oct 11, 2024
2 parents 53d17c2 + e8abcc6 commit 1144771
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 102 deletions.
45 changes: 6 additions & 39 deletions client/drivers/data_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,6 @@ def make_failed(cls) -> "ControlledData":

# SECTION: Getters/Setters

def DEBUG_get_next_posture_graph_value(self) -> int:
"""
Returns next thing to put on the DEBUG graph.
TODO: Remove this method
"""
return_me = self._DEBUG_current_graph_function(
self._DEBUG_current_graph_list_index
)
self._DEBUG_current_graph_list_index += 1
return return_me

def is_failed(self) -> bool:
"""
Returns True iff this ControlledData is failed.
Expand Down Expand Up @@ -183,31 +171,6 @@ def accept_new_posture_data(
for datum in posture_data:
self._posture_data.put_nowait(datum)

# SECTION: Posture data mapping

def get_cushion_posture_data(
self,
) -> "CUSHION_POSTURE_DATA": # TODO: Decide what this type looks like
"""
Returns posture data necessary for cushion feedback.
TODO: Implement this.
"""
print("<!> WARNING: get_cushion_posture_data() not implemented!")
return None

def get_plant_posture_data(
self,
) -> "PLANT_POSTURE_DATA": # TODO: Decide what this type looks like
"""
Returns posture data necessary for plant feedback.
TODO: Implement this.
"""
print("<!> WARNING: get_plant_posture_data() not implemented!")
return None


class HardwareComponents:
"""
Hardware components packaged together into a class.
Expand Down Expand Up @@ -467,15 +430,19 @@ def oled_display_texts(self, texts: List[str], x: int, y: int, colour: int) -> i
)
return display_height_offset

def send_message(self, message: str, message_time: int = 1) -> None:
def send_message(self, messages: List[str], message_time: int = 1) -> None:
"""Clear the screen and display message
Args:
message: Message to send to the user
message_time: Time (seconds) to sleep for after displaying message.
"""
self.display.fill(0)
self.oled_display_text(message, 0, 0, 1)
display_height_offset = 0
for text in messages:
display_height_offset = self.oled_display_text(
text, 0, 0 + display_height_offset, 1
)
self.display.show()
time.sleep(message_time)

Expand Down
32 changes: 16 additions & 16 deletions client/drivers/login_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
Status.NO_MATCH.value: "Could not match face",
Status.ALREADY_REGISTERED.value: "Face already registered",
}
QUIT_INSTRUCTIONS = "Right button to quit"
QUIT_INSTRUCTIONS = "Right: quit"

Action = Callable[[HardwareComponents], int]

Expand All @@ -44,9 +44,9 @@ def handle_authentication(hardware: HardwareComponents) -> int:
while True:
_log_and_send(
hardware,
"Left button to login\n"
"Right button to register\n"
"Double press right button to reset data",
["Left: login",
"Right: register",
"Double press right: reset data"]
)
button = hardware.wait_for_button_press()

Expand Down Expand Up @@ -82,8 +82,8 @@ def _loop_action(hardware: HardwareComponents, action: Action) -> int:

def _attempt_login(hardware: HardwareComponents) -> int:
capturer = RaspCapturer()
message = f"Press left button to take photo\n{QUIT_INSTRUCTIONS}"
_log_and_send(hardware, message, message_time=0)
messages = ["Left: take photo", f"{QUIT_INSTRUCTIONS}"]
_log_and_send(hardware, messages, message_time=0)

button_pressed = hardware.wait_for_button_press()
if button_pressed == LEFT_BUTTON:
Expand All @@ -92,7 +92,7 @@ def _attempt_login(hardware: HardwareComponents) -> int:
if button_pressed == RIGHT_BUTTON:
return QUIT

_log_and_send(hardware, "Trying login...", message_time=0)
_log_and_send(hardware, ["Trying login..."], message_time=0)
status = get_face_match(face)
_handle_status_message(hardware, status)

Expand All @@ -105,11 +105,11 @@ def _attempt_register(hardware: HardwareComponents) -> int:
# Capture NUM_FACES faces
faces: list[np.ndarray] = []
for i in range(NUM_FACES):
message = (
f"Press left button to take photo {i + 1}/{NUM_FACES}\n"
messages = [
f"Left: take photo {i + 1}/{NUM_FACES}",
f"{QUIT_INSTRUCTIONS}"
)
_log_and_send(hardware, message, message_time=0)
]
_log_and_send(hardware, messages, message_time=0)

button_pressed = hardware.wait_for_button_press()
if button_pressed == RIGHT_BUTTON:
Expand All @@ -120,13 +120,13 @@ def _attempt_register(hardware: HardwareComponents) -> int:
faces.append(frame)

# Try register faces
_log_and_send(hardware, "Registering...")
_log_and_send(hardware, ["Registering..."])
user_id = next_user_id()
status = register_faces(user_id, faces)

if status == Status.OK.value:
create_user()
_log_and_send(hardware, "Registration successful!")
_log_and_send(hardware, ["Registration successful!"])
return user_id

_handle_status_message(hardware, status)
Expand All @@ -144,7 +144,7 @@ def _handle_status_message(hardware: HardwareComponents, status: int) -> None:


def _log_and_send(
hardware: HardwareComponents, message: str, message_time: int = 1
hardware: HardwareComponents, messages: list[str], message_time: int = 1
) -> None:
logger.debug(message)
hardware.send_message(message, message_time=message_time)
logger.debug(messages)
hardware.send_message(messages, message_time=message_time)
72 changes: 25 additions & 47 deletions client/drivers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@
#: the plant will move down.
#: FIXME: Fine-tune this value later.
PLANT_PROPORTION_GOOD_THRESHOLD = 0.5
"""
Threshold for I. Jensen Plant Mover 10000 feedback. If the proportion of "good" sitting posture is below this,
the plant will move down.
FIXME: Fine-tune this value later.
"""

#: DEBUG Number of milliseconds between each loop iteration in do_everything().
DEBUG_DO_EVERYTHING_INTERVAL = 1000
Expand Down Expand Up @@ -169,8 +174,6 @@ def do_everything(auspost: ControlledData) -> None:
Requires:
! auspost.is_failed()
"""
print("<!> BEGIN do_everything()")

LOGIN_MESSAGE = "Logged in with user id: " + str(auspost.get_user_id())
LOGOUT_MESSAGE = "Logged out user id " + str(auspost.get_user_id())

Expand All @@ -197,7 +200,6 @@ def do_everything(auspost: ControlledData) -> None:
hardware.display.show()

while True:
# Loop invariant: ! auspost.is_failed()
# Check for user logout
if hardware.button0.was_pressed:
hardware.display.fill(0)
Expand All @@ -208,8 +210,7 @@ def do_everything(auspost: ControlledData) -> None:
return

update_display_screen(auspost)
# handle_posture_monitoring(auspost)
handle_posture_monitoring_new(auspost)
handle_posture_graph(auspost)
handle_feedback(auspost)

sleep_ms(DEBUG_DO_EVERYTHING_INTERVAL)
Expand All @@ -235,8 +236,6 @@ def update_display_screen(auspost: ControlledData) -> bool:
Ensures:
! auspost.is_failed()
"""
print("<!> BEGIN update_display_screen()")

while (
not auspost.get_posture_data().empty()
): # NOTE: This is much more robust than getting a fixed number of things out of the queue
Expand All @@ -249,14 +248,25 @@ def update_display_screen(auspost: ControlledData) -> bool:
)
hardware.display.show()

print("<!> END update_display_screen()")
return True


def handle_posture_monitoring_new(auspost: ControlledData) -> bool:
def handle_posture_graph(auspost: ControlledData) -> bool:
"""
Get a snapshot of the user's posture data.
Use this information to update the data for the posture graph.
print("<!> handle_posture_monitoring_new()")
Args:
(auspost : ControlledData): Data encapsulating the current state of the program.
Returns:
(bool): True, always. If you get a False return value, then something has gone VERY wrong.
Requires:
! auspost.is_failed()
Ensures:
! auspost.is_failed()
TODO: Check this
"""
now = datetime.now()

if now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT:
Expand All @@ -268,14 +278,12 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool:
period_end=now,
)

# Exit if not enough data
# if len(recent_posture_data) <= POSTURE_GRAPH_DATUM_WIDTH:
# Exit if no data
if len(recent_posture_data) == 0:
print("<!> Exiting handle_posture_monitoring_new() early: Not enough data")
# auspost.set_last_snapshot_time(datetime.now())
return True

# Exit if not in frame enough
# Exit if person not in frame enough
average_prop_in_frame = sum(
[posture.prop_in_frame for posture in recent_posture_data]
) / len(recent_posture_data)
Expand All @@ -301,11 +309,12 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool:
# Calculate the interval length
interval = total_time / POSTURE_GRAPH_DATUM_WIDTH

# Setup a sublist each representing 1 pixel on the graph
# Setup sublists, where each sublist is a portion of the overall data
split_posture_lists: list[list[Posture]]
split_posture_lists = [[] for _ in range(POSTURE_GRAPH_DATUM_WIDTH)]

# Sublists will be split by period_start
# What is in each sublist is determined by period_start
# We want an approximately equal amount of data in each sublist
for posture in recent_posture_data:
index = min(
POSTURE_GRAPH_DATUM_WIDTH - 1,
Expand All @@ -322,44 +331,13 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool:
average_prop_good = sum(
[posture.prop_good for posture in posture_list]
) / len(posture_list)
# KILLME:
# print(f"<!> {average_prop_good=}")
# auspost.accept_new_posture_data([average_prop_good] * DEBUG_MULTIPLIER_CONSTANT) # DEBUG: 2024-10-06_20-16 Gabe: Fixed the typing by wrapping into a singleton list
# print(f"<!> Avg prop_good is {average_prop_good}")
new_prop_good_data += [average_prop_good] * POSTURE_GRAPH_DATUM_WIDTH
auspost.accept_new_posture_data(new_prop_good_data)

auspost.set_last_snapshot_time(now)

return True


# SECTION: Feedback handling
def handle_posture_monitoring(auspost: ControlledData) -> bool:
"""
Take a snapshot monitoring the user, and update the given ControlledData if necessary.
Args:
auspost: Data encapsulating the current state of the program.
Returns:
(bool): True, always. If you get a False return value, then something has gone VERY wrong.
Requires:
! auspost.is_failed()
Ensures:
! auspost.is_failed()
"""
print("<!> handle_posture_monitoring()")
now = datetime.now()
if now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT:
auspost.accept_new_posture_data([])
auspost.accept_new_posture_data([auspost.DEBUG_get_next_posture_graph_value()])
auspost.set_last_snapshot_time(now)
return True


def handle_feedback(auspost: ControlledData) -> bool:
"""
Provide feedback to the user if necessary.
Expand Down

0 comments on commit 1144771

Please sign in to comment.