Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dummy threads in agents #784

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 33 additions & 27 deletions python/hyperon/atoms.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,35 @@ class MettaError(Exception):
, but we don't want to output Python error stack."""
pass

def unwrap_args(atoms):
args = []
kwargs = {}
for a in atoms:
if isinstance(a, ExpressionAtom):
ch = a.get_children()
if len(ch) > 0 and repr(ch[0]) == "Kwargs":
for c in ch[1:]:
try:
kwarg = c.get_children()
assert len(kwarg) == 2
except:
raise RuntimeError(f"Incorrect kwarg format {kwarg}")
try:
kwargs[get_string_value(kwarg[0])] = kwarg[1].get_object().content
except:
raise NoReduceError()
continue
try:
args.append(a.get_object().content)
except:
# NOTE:
# Currently, applying grounded operations to pure atoms is not reduced.
# If we want, we can raise an exception, or form an error expression instead,
# so a MeTTa program can catch and analyze it.
# raise RuntimeError("Grounded operation " + self.name + " with unwrap=True expects only grounded arguments")
raise NoReduceError()
return args, kwargs

class OperationObject(GroundedObject):
"""
An OperationObject represents an operation as a grounded object, allowing for more
Expand Down Expand Up @@ -385,32 +414,7 @@ def execute(self, *atoms, res_typ=AtomType.UNDEFINED):
"""
# type-check?
if self.unwrap:
args = []
kwargs = {}
for a in atoms:
if isinstance(a, ExpressionAtom):
ch = a.get_children()
if len(ch) > 0 and repr(ch[0]) == "Kwargs":
for c in ch[1:]:
try:
kwarg = c.get_children()
assert len(kwarg) == 2
except:
raise RuntimeError(f"Incorrect kwarg format {kwarg}")
try:
kwargs[get_string_value(kwarg[0])] = kwarg[1].get_object().content
except:
raise NoReduceError()
continue
try:
args.append(a.get_object().content)
except:
# NOTE:
# Currently, applying grounded operations to pure atoms is not reduced.
# If we want, we can raise an exception, or form an error expression instead,
# so a MeTTa program can catch and analyze it.
# raise RuntimeError("Grounded operation " + self.name + " with unwrap=True expects only grounded arguments")
raise NoReduceError()
args, kwargs = unwrap_args(atoms)
try:
result = self.op(*args, **kwargs)
except MettaError as e:
Expand All @@ -422,7 +426,9 @@ def execute(self, *atoms, res_typ=AtomType.UNDEFINED):
return [ValueAtom(result, res_typ)]
else:
result = self.op(*atoms)
if not isinstance(result, list):
try:
iter(result)
except TypeError:
raise RuntimeError("Grounded operation `" + self.name + "` should return list")
return result

Expand Down
53 changes: 51 additions & 2 deletions python/hyperon/exts/agents/agent_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,42 @@
'''
This is very preliminary and incomplete PoC version.
However, it is put to exts, because metta-motto depends on it.
Reagrding threading:
- Generic threading for metta can be introduced with
parallel and sequential composition, for-comprehension, etc.
Agents could be built on top of this functionality. However,
this piece of code was driven by metta-motto demands.
- Two main cases for agents are:
-- Immediate call with inputs to get outputs
-- Asynchronous events and responses
Supporting both cases in one implementation is more convenient,
because both of them can be needed simultaneously in certain
domains (e.g. metta-motto)
- Implementation can be quite different.
-- Agents could be started explicitly
-- They could inherint from StreamMethod
-- Other methods could be called directly without StreamMethod wrapper
All these nuances are to be fleshed out
'''

import threading
from queue import Queue
class StreamMethod(threading.Thread):
def __init__(self, method, args):
super().__init__() #daemon=True
self._result = Queue()
self.method = method
self.args = args
def run(self):
for r in self.method(*self.args):
self._result.put(r)
def __iter__(self):
return self
def __next__(self):
if self._result.empty() and not self.is_alive():
raise StopIteration
return self._result.get()

class AgentObject:

'''
Expand Down Expand Up @@ -58,6 +92,9 @@ def _try_unwrap(self, val):
return repr(val)

def __init__(self, path=None, atoms={}, include_paths=None, code=None):
if path is None and code is None:
# purely Python agent
return
# The first argument is either path or code when called from MeTTa
if isinstance(path, ExpressionAtom):# and path != E():
code = path
Expand Down Expand Up @@ -106,16 +143,28 @@ def __call__(self, atom):
)
return self._metta.evaluate_atom(atom)

def is_daemon(self):
return hasattr(self, 'daemon') and self.daemon is True

def __metta_call__(self, *args):
call = True
method = self.__call__
if len(args) > 0 and isinstance(args[0], SymbolAtom):
n = args[0].get_name()
if n[0] == '.' and hasattr(self, n[1:]):
method = getattr(self, n[1:])
args = args[1:]
call = False
if self._unwrap:
return OperationObject(f"{method}", method).execute(*args)
return method(*args)
method = OperationObject(f"{method}", method).execute
st = StreamMethod(method, args)
st.start()
# We don't return the stream here; otherwise it will be consumed immediately.
# If the agent itself would be StreamMethod, its results could be accessbile.
# Here, they are lost (TODO?).
if call and self.is_daemon():
return [E()]
return st


@register_atoms(pass_metta=True)
Expand Down
69 changes: 69 additions & 0 deletions python/hyperon/exts/agents/tests/test_agents.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from hyperon import MeTTa
from hyperon.exts.agents import AgentObject
from queue import Queue
from time import sleep

class MyAgent(AgentObject):
'''
Expand All @@ -20,3 +22,70 @@ def __call__(self, a, b):
! (&agent 7 8)
! (&agent .subs 4)
'''))

# =================================

class Agent1(AgentObject):
def __call__(self):
for x in range(10):
yield x
sleep(1)
class Agent2(AgentObject):
def __call__(self, xs):
for x in xs:
yield x*2
class Agent3(AgentObject):
def __call__(self, xs):
for x in xs:
print("Result: ", x)

m = MeTTa()
m.register_atom('new-agent-1', Agent1.agent_creator_atom())
m.register_atom('new-agent-2', Agent2.agent_creator_atom())
m.register_atom('new-agent-3', Agent3.agent_creator_atom())
print(m.run('''
! ((new-agent-3) ((new-agent-2) ((new-agent-1))))
'''))

# =================================

class Agnt(AgentObject):
def __init__(self):
self.messages = Queue()
self.running = False
self.output = Queue()
self.daemon = True
def __call__(self):
self.running = True
cnt = 0
while self.running:
if self.messages.empty():
self.output.put(f"Waiting {cnt}")
sleep(2)
cnt += 1
else:
m = self.messages.get()
self.output.put(m[::-1])
def stop(self):
self.running = False
def input(self, msg):
self.messages.put(msg)
def response(self):
if self.output.empty():
return None
return self.output.get()

m = MeTTa()
m.register_atom('agnt', Agnt.agent_creator_atom())
print(m.run('''
! (bind! &a1 (agnt))
! (&a1)
! (println! "Agent is running")
! ((py-atom time.sleep) 1)
! ("First response:" (&a1 .response))
! (&a1 .input "Hello")
! (println! "Agent is receiving messages")
! ((py-atom time.sleep) 2)
! ("Second response:" (&a1 .response))
! (&a1 .stop)
'''))
Loading