Skip to content

Commit

Permalink
Navigator tools documentation (#1056)
Browse files Browse the repository at this point in the history
* Types and Docstring update

* Update navigator_emergency.py

* documentation added to navigator-tools

* small updates

---------

Co-authored-by: Cameron Brown <[email protected]>
  • Loading branch information
PRANETALLU and cbrxyz authored Sep 4, 2023
1 parent fa48768 commit a8bb524
Showing 1 changed file with 159 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
"""Use the DBHelper class to interface with the Database without having to deal with ROS things."""
import asyncio
import sys
import time
Expand All @@ -14,10 +13,29 @@


class DBHelper:
"""DBHelper class."""
"""
Use the DBHelper class to interface with the Database without having to deal with ROS things.
Attributes:
found (bool): checks whether the object is what the user has been looking for.
nh (axros.NodeHandle): processes database requests.
position (int): the current position of the object.
rot (): one of the dimensional values.
new_object_subscriber (): an object that is being called from the database.
ensuring_objects (bool): a variable state for the object.
ensuring_object_dep (): a list of objects being used for storage.
ensuring_object_cb (): this is considered when it is an "ensuring_objects".
looking_for (): a setter variable to which a name is being assigned.
is_found (bool): determines whether the object is found or not.
"""

def __init__(self, nh):
"""Initialize the DB helper class."""
"""
Initialize the DB helper class.
Args:
nh(NodeHandle): NodeHandle object
"""
self.found = set()
self.nh = nh
self.position = None
Expand All @@ -30,7 +48,12 @@ def __init__(self, nh):
self.is_found = False

async def init_(self, navigator=None):
"""Initialize the axros parts of the DBHelper."""
"""
Initialize the axros parts of the DBHelper.
Args:
navigator (NaviGator | None): Base NaviGator object
"""
# self._sub_database = yield self.nh.subscribe('/database/objects', PerceptionObjectArray, self.object_cb)
self._database = self.nh.get_service_client("/database/requests", ObjectDBQuery)
self.navigator = navigator
Expand All @@ -43,9 +66,24 @@ async def init_(self, navigator=None):
return self

def _odom_cb(self, odom):
"""
Sets the position and the rot values.
Args:
odom (object): Odom
"""
self.position, self.rot = nt.odometry_to_numpy(odom)[0]

async def get_object_by_id(self, my_id):
"""
Retrieves the object with the associated "my_id".
Args:
my_id (int): an "id" of the object is passed in.
Returns:
An individual object is being returned with a unique id.
"""
print(my_id)
req = ObjectDBQueryRequest()
req.name = "all"
Expand All @@ -57,7 +95,8 @@ async def begin_observing(self, cb):
"""
Get notified when a new object is added to the database.
cb : The callback that is called when a new object is added to the database
Args:
cb : The callback that is called when a new object is added to the database.
"""
self.new_object_subscriber = cb
req = ObjectDBQueryRequest()
Expand All @@ -78,6 +117,12 @@ async def begin_observing(self, cb):
self.new_object_subscriber(o)

async def get_unknown_and_low_conf(self):
"""
Gets unknown objects that are of low confidence.
Returns:
Objects that meet certain criteria.
"""
req = ObjectDBQueryRequest()
req.name = "all"
resp = await self._database(req)
Expand All @@ -91,17 +136,34 @@ async def get_unknown_and_low_conf(self):
return m

def set_looking_for(self, name):
"""
Setting to a name that is being looked for.
Args:
name (string): name of the object.
"""
self.looking_for = name

def is_found_func(self):
def is_found_func(self) -> bool:
"""
Determines whether the object is being found or not.
Returns:
The existence of the object is returned.
"""
if self.is_found:
self.looking_for = None
self.is_found = False
return True
return False

def object_cb(self, perception_objects):
"""Callback for the object database."""
"""
Callback for the object database.
Args:
perception_objects (object): a list of objects that are passed.
"""
self.total_num = len(perception_objects.objects)
for o in perception_objects.objects:
if o.name == self.looking_for:
Expand All @@ -120,23 +182,45 @@ def object_cb(self, perception_objects):
self.ensuring_object_cb(missings_objs)

def remove_found(self, name):
"""Remove an object that has been listed as found."""
"""
Remove an object that has been listed as found.
Args:
name (string): a name of the object is being passed in.
"""
self.found.remove(name)

def ensure_object_permanence(self, object_dep, cb):
"""Ensure that all the objects in the object_dep list remain in the database.
Call the callback if this isn't true."""
"""
Ensure that all the objects in the object_dep list remain in the database.
Call the callback if this isn't true.
Args:
object_dep: a new object is passed in order to be set.
cb : The callback that is called when a new object is added to the database.
"""
if object_dep is None or cb is None:
return
self.ensuring_objects = True
self.ensuring_object_cb = cb
self.ensuring_object_dep = object_dep

def stop_ensuring_object_permanence(self):
"""Stop ensuring that objects remain in the database."""
"""
Stop ensuring that objects remain in the database.
"""
self.ensuring_objects = False

def _wait_for_position(self, timeout=10):
def _wait_for_position(self, timeout=10) -> bool:
"""
A possible position is being stored in a variable.
Args:
timeout (int): time limit to retrieve something.
Returns:
bool: Determines whether the position is found within the time limit.
"""
count = 0
while self.position is None:
if self.navigator is not None:
Expand All @@ -148,7 +232,15 @@ def _wait_for_position(self, timeout=10):
return True

async def get_closest_object(self, objects):
"""Get the closest mission."""
"""
Get the closest mission.
Args:
objects (object): a list of objects are being passed in.
Returns:
A object with the closest mission / distance is returned.
"""
pobjs = []
for obj in objects:
req = ObjectDBQueryRequest()
Expand All @@ -171,6 +263,15 @@ async def get_closest_object(self, objects):
return min_obj

async def _dist(self, x):
"""
Finds the distance of the object keeping into account its current position.
Args:
x (object): a specific object is being passed in order to retrieve its position.
Returns:
the distance between the two objects is being returned.
"""
if self.position is None:
success = asyncio.create_task(self._wait_for_position)
if not success:
Expand All @@ -188,7 +289,18 @@ async def get_object(
thresh=50,
thresh_strict=50,
):
"""Get an object from the database."""
"""
Get an object from the database.
Args:
object_name (string): the name of the object is passed in.
volume_only (bool): determines whether it is volume-based or not.
thresh (int): a maximum distance is passed in.
thresh_strict(int): a more strict maximum distance is passed in.
Returns:
closest_potential_object (object): returns the object that is within the closest range.
"""
if volume_only:
req = ObjectDBQueryRequest()
req.name = object_name
Expand Down Expand Up @@ -233,6 +345,15 @@ async def get_object(
return closest_potential_object

def wait_for_additional_objects(self, timeout=60):
"""
Returns true/false whether additional objects are being passed in.
Args:
timeout (int): maximum time limit of 60 seconds to run the program.
Returns:
(bool): determines whether additional objects are added.
"""
num_items = self.num_items
start = time.time()
while timeout < time.time() - start:
Expand All @@ -241,14 +362,36 @@ def wait_for_additional_objects(self, timeout=60):
return False

def set_color(self, color, name):
"""Set the color of an object in the database."""
"""
Sets the color of an object in the database.
Args:
color: the new color of the object is passed in.
name: the name of the object is passed in.
"""
raise NotImplementedError

def set_fake_position(self, pos):
"""Set the position of a fake perception object."""
"""
Sets the position of a fake perception object.
Args:
pos (int): the new position of the object is passed in.
"""
raise NotImplementedError

async def get_objects_in_radius(self, pos, radius, objects="all"):
"""
Retrieves the objects that are present within the radius.
Args:
pos (int): the position of the object is passed in.
radius (int): the radius of the object is passed in.
objects (object): this is a list of all the objects that needs to be compared.
Returns:
ans (object): returns the objects that are present within the radius.
"""
req = ObjectDBQueryRequest()
req.name = "all"
resp = await self._database(req)
Expand Down

0 comments on commit a8bb524

Please sign in to comment.