Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BaseListeningAgent #793

Merged
merged 10 commits into from
Nov 20, 2024
1 change: 1 addition & 0 deletions python/hyperon/exts/agents/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .agent_base import AgentObject
from .agent_base import agent_atoms
from .agent_base import BaseListeningAgent
40 changes: 40 additions & 0 deletions python/hyperon/exts/agents/agent_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,20 @@ def __init__(self, method, args):
self._result = Queue()
self.method = method
self.args = args

def run(self):
for r in self.method(*self.args):
self._result.put(r)

def __iter__(self):
return self

def __next__(self):
if self._result.empty() and not self.is_alive():
raise StopIteration
return self._result.get()


class AgentObject:

'''
Expand Down Expand Up @@ -166,6 +170,42 @@ def __metta_call__(self, *args):
return [E()]
return st

class BaseListeningAgent(AgentObject):
def __init__(self, path=None, atoms={}, include_paths=None, code=None):
super().__init__(path, atoms, include_paths, code)
self.messages = Queue()
self.running = False
self._output = []
self.lock = threading.RLock()

def start(self, *args):
if not args:
args = ()
self.running = True
st = StreamMethod(self.messages_processor, args)
st.start()

def message_processor(self, message, *args):
return []

def messages_processor(self, *args):
while self.running:
if not self.messages.empty():
m = self.messages.get()
with self.lock:
self._output = self.message_processor(m, *args)
return []

def stop(self):
self.running = False
return []

def input(self, msg):
self.messages.put(msg)
return []

def get_output(self):
return self._output

@register_atoms(pass_metta=True)
def agent_atoms(metta):
Expand Down
Loading