Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Navigator tools documentation #1056

Merged
merged 5 commits into from
Sep 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
"""Use the DBHelper class to interface with the Database without having to deal with ROS things."""
import asyncio
import sys
import time
Expand All @@ -14,10 +13,29 @@


class DBHelper:
"""DBHelper class."""
"""
Use the DBHelper class to interface with the Database without having to deal with ROS things.

Attributes:
found (bool): checks whether the object is what the user has been looking for.
nh (axros.NodeHandle): processes database requests.
position (int): the current position of the object.
rot (): one of the dimensional values.
new_object_subscriber (): an object that is being called from the database.
ensuring_objects (bool): a variable state for the object.
ensuring_object_dep (): a list of objects being used for storage.
ensuring_object_cb (): this is considered when it is an "ensuring_objects".
looking_for (): a setter variable to which a name is being assigned.
is_found (bool): determines whether the object is found or not.
"""

def __init__(self, nh):
"""Initialize the DB helper class."""
"""
Initialize the DB helper class.

Args:
nh(NodeHandle): NodeHandle object
"""
self.found = set()
self.nh = nh
self.position = None
Expand All @@ -30,7 +48,12 @@ def __init__(self, nh):
self.is_found = False

async def init_(self, navigator=None):
"""Initialize the axros parts of the DBHelper."""
"""
Initialize the axros parts of the DBHelper.

Args:
navigator (NaviGator | None): Base NaviGator object
"""
# self._sub_database = yield self.nh.subscribe('/database/objects', PerceptionObjectArray, self.object_cb)
self._database = self.nh.get_service_client("/database/requests", ObjectDBQuery)
self.navigator = navigator
Expand All @@ -43,9 +66,24 @@ async def init_(self, navigator=None):
return self

def _odom_cb(self, odom):
"""
Sets the position and the rot values.

Args:
odom (object): Odom
"""
self.position, self.rot = nt.odometry_to_numpy(odom)[0]

async def get_object_by_id(self, my_id):
"""
Retrieves the object with the associated "my_id".

Args:
my_id (int): an "id" of the object is passed in.

Returns:
An individual object is being returned with a unique id.
"""
print(my_id)
req = ObjectDBQueryRequest()
req.name = "all"
Expand All @@ -57,7 +95,8 @@ async def begin_observing(self, cb):
"""
Get notified when a new object is added to the database.

cb : The callback that is called when a new object is added to the database
Args:
cb : The callback that is called when a new object is added to the database.
"""
self.new_object_subscriber = cb
req = ObjectDBQueryRequest()
Expand All @@ -78,6 +117,12 @@ async def begin_observing(self, cb):
self.new_object_subscriber(o)

async def get_unknown_and_low_conf(self):
"""
Gets unknown objects that are of low confidence.

Returns:
Objects that meet certain criteria.
"""
req = ObjectDBQueryRequest()
req.name = "all"
resp = await self._database(req)
Expand All @@ -91,17 +136,34 @@ async def get_unknown_and_low_conf(self):
return m

def set_looking_for(self, name):
"""
Setting to a name that is being looked for.

Args:
name (string): name of the object.
"""
self.looking_for = name

def is_found_func(self):
def is_found_func(self) -> bool:
"""
Determines whether the object is being found or not.

Returns:
The existence of the object is returned.
"""
if self.is_found:
self.looking_for = None
self.is_found = False
return True
return False

def object_cb(self, perception_objects):
"""Callback for the object database."""
"""
Callback for the object database.

Args:
perception_objects (object): a list of objects that are passed.
"""
self.total_num = len(perception_objects.objects)
for o in perception_objects.objects:
if o.name == self.looking_for:
Expand All @@ -120,23 +182,45 @@ def object_cb(self, perception_objects):
self.ensuring_object_cb(missings_objs)

def remove_found(self, name):
"""Remove an object that has been listed as found."""
"""
Remove an object that has been listed as found.

Args:
name (string): a name of the object is being passed in.
"""
self.found.remove(name)

def ensure_object_permanence(self, object_dep, cb):
"""Ensure that all the objects in the object_dep list remain in the database.
Call the callback if this isn't true."""
"""
Ensure that all the objects in the object_dep list remain in the database.
Call the callback if this isn't true.

Args:
object_dep: a new object is passed in order to be set.
cb : The callback that is called when a new object is added to the database.
"""
if object_dep is None or cb is None:
return
self.ensuring_objects = True
self.ensuring_object_cb = cb
self.ensuring_object_dep = object_dep

def stop_ensuring_object_permanence(self):
"""Stop ensuring that objects remain in the database."""
"""
Stop ensuring that objects remain in the database.
"""
self.ensuring_objects = False

def _wait_for_position(self, timeout=10):
def _wait_for_position(self, timeout=10) -> bool:
"""
A possible position is being stored in a variable.

Args:
timeout (int): time limit to retrieve something.

Returns:
bool: Determines whether the position is found within the time limit.
"""
count = 0
while self.position is None:
if self.navigator is not None:
Expand All @@ -148,7 +232,15 @@ def _wait_for_position(self, timeout=10):
return True

async def get_closest_object(self, objects):
"""Get the closest mission."""
"""
Get the closest mission.

Args:
objects (object): a list of objects are being passed in.

Returns:
A object with the closest mission / distance is returned.
"""
pobjs = []
for obj in objects:
req = ObjectDBQueryRequest()
Expand All @@ -171,6 +263,15 @@ async def get_closest_object(self, objects):
return min_obj

async def _dist(self, x):
"""
Finds the distance of the object keeping into account its current position.

Args:
x (object): a specific object is being passed in order to retrieve its position.

Returns:
the distance between the two objects is being returned.
"""
if self.position is None:
success = asyncio.create_task(self._wait_for_position)
if not success:
Expand All @@ -188,7 +289,18 @@ async def get_object(
thresh=50,
thresh_strict=50,
):
"""Get an object from the database."""
"""
Get an object from the database.

Args:
object_name (string): the name of the object is passed in.
volume_only (bool): determines whether it is volume-based or not.
thresh (int): a maximum distance is passed in.
thresh_strict(int): a more strict maximum distance is passed in.

Returns:
closest_potential_object (object): returns the object that is within the closest range.
"""
if volume_only:
req = ObjectDBQueryRequest()
req.name = object_name
Expand Down Expand Up @@ -233,6 +345,15 @@ async def get_object(
return closest_potential_object

def wait_for_additional_objects(self, timeout=60):
"""
Returns true/false whether additional objects are being passed in.

Args:
timeout (int): maximum time limit of 60 seconds to run the program.

Returns:
(bool): determines whether additional objects are added.
"""
num_items = self.num_items
start = time.time()
while timeout < time.time() - start:
Expand All @@ -241,14 +362,36 @@ def wait_for_additional_objects(self, timeout=60):
return False

def set_color(self, color, name):
"""Set the color of an object in the database."""
"""
Sets the color of an object in the database.

Args:
color: the new color of the object is passed in.
name: the name of the object is passed in.
"""
raise NotImplementedError

def set_fake_position(self, pos):
"""Set the position of a fake perception object."""
"""
Sets the position of a fake perception object.

Args:
pos (int): the new position of the object is passed in.
"""
raise NotImplementedError

async def get_objects_in_radius(self, pos, radius, objects="all"):
"""
Retrieves the objects that are present within the radius.

Args:
pos (int): the position of the object is passed in.
radius (int): the radius of the object is passed in.
objects (object): this is a list of all the objects that needs to be compared.

Returns:
ans (object): returns the objects that are present within the radius.
"""
req = ObjectDBQueryRequest()
req.name = "all"
resp = await self._database(req)
Expand Down
Loading