Skip to content

Commit

Permalink
dummy threads in agents
Browse files Browse the repository at this point in the history
  • Loading branch information
Necr0x0Der committed Oct 21, 2024
1 parent 0476b7d commit 37c239c
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 29 deletions.
60 changes: 33 additions & 27 deletions python/hyperon/atoms.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,35 @@ class MettaError(Exception):
, but we don't want to output Python error stack."""
pass

def unwrap_args(atoms):
args = []
kwargs = {}
for a in atoms:
if isinstance(a, ExpressionAtom):
ch = a.get_children()
if len(ch) > 0 and repr(ch[0]) == "Kwargs":
for c in ch[1:]:
try:
kwarg = c.get_children()
assert len(kwarg) == 2
except:
raise RuntimeError(f"Incorrect kwarg format {kwarg}")
try:
kwargs[get_string_value(kwarg[0])] = kwarg[1].get_object().content
except:
raise NoReduceError()
continue
try:
args.append(a.get_object().content)
except:
# NOTE:
# Currently, applying grounded operations to pure atoms is not reduced.
# If we want, we can raise an exception, or form an error expression instead,
# so a MeTTa program can catch and analyze it.
# raise RuntimeError("Grounded operation " + self.name + " with unwrap=True expects only grounded arguments")
raise NoReduceError()
return args, kwargs

class OperationObject(GroundedObject):
"""
An OperationObject represents an operation as a grounded object, allowing for more
Expand Down Expand Up @@ -385,32 +414,7 @@ def execute(self, *atoms, res_typ=AtomType.UNDEFINED):
"""
# type-check?
if self.unwrap:
args = []
kwargs = {}
for a in atoms:
if isinstance(a, ExpressionAtom):
ch = a.get_children()
if len(ch) > 0 and repr(ch[0]) == "Kwargs":
for c in ch[1:]:
try:
kwarg = c.get_children()
assert len(kwarg) == 2
except:
raise RuntimeError(f"Incorrect kwarg format {kwarg}")
try:
kwargs[get_string_value(kwarg[0])] = kwarg[1].get_object().content
except:
raise NoReduceError()
continue
try:
args.append(a.get_object().content)
except:
# NOTE:
# Currently, applying grounded operations to pure atoms is not reduced.
# If we want, we can raise an exception, or form an error expression instead,
# so a MeTTa program can catch and analyze it.
# raise RuntimeError("Grounded operation " + self.name + " with unwrap=True expects only grounded arguments")
raise NoReduceError()
args, kwargs = unwrap_args(atoms)
try:
result = self.op(*args, **kwargs)
except MettaError as e:
Expand All @@ -422,7 +426,9 @@ def execute(self, *atoms, res_typ=AtomType.UNDEFINED):
return [ValueAtom(result, res_typ)]
else:
result = self.op(*atoms)
if not isinstance(result, list):
try:
iter(result)
except TypeError:
raise RuntimeError("Grounded operation `" + self.name + "` should return list")
return result

Expand Down
53 changes: 51 additions & 2 deletions python/hyperon/exts/agents/agent_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,42 @@
'''
This is very preliminary and incomplete PoC version.
However, it is put to exts, because metta-motto depends on it.
Reagrding threading:
- Generic threading for metta can be introduced with
parallel and sequential composition, for-comprehension, etc.
Agents could be built on top of this functionality. However,
this piece of code was driven by metta-motto demands.
- Two main cases for agents are:
-- Immediate call with inputs to get outputs
-- Asynchronous events and responses
Supporting both cases in one implementation is more convenient,
because both of them can be needed simultaneously in certain
domains (e.g. metta-motto)
- Implementation can be quite different.
-- Agents could be started explicitly
-- They could inherint from StreamMethod
-- Other methods could be called directly without StreamMethod wrapper
All these nuances are to be fleshed out
'''

import threading
from queue import Queue
class StreamMethod(threading.Thread):
def __init__(self, method, args):
super().__init__() #daemon=True
self._result = Queue()
self.method = method
self.args = args
def run(self):
for r in self.method(*self.args):
self._result.put(r)
def __iter__(self):
return self
def __next__(self):
if self._result.empty() and not self.is_alive():
raise StopIteration
return self._result.get()

class AgentObject:

'''
Expand Down Expand Up @@ -58,6 +92,9 @@ def _try_unwrap(self, val):
return repr(val)

def __init__(self, path=None, atoms={}, include_paths=None, code=None):
if path is None and code is None:
# purely Python agent
return
# The first argument is either path or code when called from MeTTa
if isinstance(path, ExpressionAtom):# and path != E():
code = path
Expand Down Expand Up @@ -106,16 +143,28 @@ def __call__(self, atom):
)
return self._metta.evaluate_atom(atom)

def is_daemon(self):
return hasattr(self, 'daemon') and self.daemon is True

def __metta_call__(self, *args):
call = True
method = self.__call__
if len(args) > 0 and isinstance(args[0], SymbolAtom):
n = args[0].get_name()
if n[0] == '.' and hasattr(self, n[1:]):
method = getattr(self, n[1:])
args = args[1:]
call = False
if self._unwrap:
return OperationObject(f"{method}", method).execute(*args)
return method(*args)
method = OperationObject(f"{method}", method).execute
st = StreamMethod(method, args)
st.start()
# We don't return the stream here; otherwise it will be consumed immediately.
# If the agent itself would be StreamMethod, its results could be accessbile.
# Here, they are lost (TODO?).
if call and self.is_daemon():
return [E()]
return st


@register_atoms(pass_metta=True)
Expand Down
69 changes: 69 additions & 0 deletions python/hyperon/exts/agents/tests/test_agents.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from hyperon import MeTTa
from hyperon.exts.agents import AgentObject
from queue import Queue
from time import sleep

class MyAgent(AgentObject):
'''
Expand All @@ -20,3 +22,70 @@ def __call__(self, a, b):
! (&agent 7 8)
! (&agent .subs 4)
'''))

# =================================

class Agent1(AgentObject):
def __call__(self):
for x in range(10):
yield x
sleep(1)
class Agent2(AgentObject):
def __call__(self, xs):
for x in xs:
yield x*2
class Agent3(AgentObject):
def __call__(self, xs):
for x in xs:
print("Result: ", x)

m = MeTTa()
m.register_atom('new-agent-1', Agent1.agent_creator_atom())
m.register_atom('new-agent-2', Agent2.agent_creator_atom())
m.register_atom('new-agent-3', Agent3.agent_creator_atom())
print(m.run('''
! ((new-agent-3) ((new-agent-2) ((new-agent-1))))
'''))

# =================================

class Agnt(AgentObject):
def __init__(self):
self.messages = Queue()
self.running = False
self.output = Queue()
self.daemon = True
def __call__(self):
self.running = True
cnt = 0
while self.running:
if self.messages.empty():
self.output.put(f"Waiting {cnt}")
sleep(2)
cnt += 1
else:
m = self.messages.get()
self.output.put(m[::-1])
def stop(self):
self.running = False
def input(self, msg):
self.messages.put(msg)
def response(self):
if self.output.empty():
return None
return self.output.get()

m = MeTTa()
m.register_atom('agnt', Agnt.agent_creator_atom())
print(m.run('''
! (bind! &a1 (agnt))
! (&a1)
! (println! "Agent is running")
! ((py-atom time.sleep) 1)
! ("First response:" (&a1 .response))
! (&a1 .input "Hello")
! (println! "Agent is receiving messages")
! ((py-atom time.sleep) 2)
! ("Second response:" (&a1 .response))
! (&a1 .stop)
'''))

0 comments on commit 37c239c

Please sign in to comment.