diff --git a/python/hyperon/atoms.py b/python/hyperon/atoms.py index 1dcbf24fd..81ce3ed3b 100644 --- a/python/hyperon/atoms.py +++ b/python/hyperon/atoms.py @@ -311,6 +311,35 @@ class MettaError(Exception): , but we don't want to output Python error stack.""" pass +def unwrap_args(atoms): + args = [] + kwargs = {} + for a in atoms: + if isinstance(a, ExpressionAtom): + ch = a.get_children() + if len(ch) > 0 and repr(ch[0]) == "Kwargs": + for c in ch[1:]: + try: + kwarg = c.get_children() + assert len(kwarg) == 2 + except: + raise RuntimeError(f"Incorrect kwarg format {kwarg}") + try: + kwargs[get_string_value(kwarg[0])] = kwarg[1].get_object().content + except: + raise NoReduceError() + continue + try: + args.append(a.get_object().content) + except: + # NOTE: + # Currently, applying grounded operations to pure atoms is not reduced. + # If we want, we can raise an exception, or form an error expression instead, + # so a MeTTa program can catch and analyze it. + # raise RuntimeError("Grounded operation " + self.name + " with unwrap=True expects only grounded arguments") + raise NoReduceError() + return args, kwargs + class OperationObject(GroundedObject): """ An OperationObject represents an operation as a grounded object, allowing for more @@ -385,32 +414,7 @@ def execute(self, *atoms, res_typ=AtomType.UNDEFINED): """ # type-check? if self.unwrap: - args = [] - kwargs = {} - for a in atoms: - if isinstance(a, ExpressionAtom): - ch = a.get_children() - if len(ch) > 0 and repr(ch[0]) == "Kwargs": - for c in ch[1:]: - try: - kwarg = c.get_children() - assert len(kwarg) == 2 - except: - raise RuntimeError(f"Incorrect kwarg format {kwarg}") - try: - kwargs[get_string_value(kwarg[0])] = kwarg[1].get_object().content - except: - raise NoReduceError() - continue - try: - args.append(a.get_object().content) - except: - # NOTE: - # Currently, applying grounded operations to pure atoms is not reduced. - # If we want, we can raise an exception, or form an error expression instead, - # so a MeTTa program can catch and analyze it. - # raise RuntimeError("Grounded operation " + self.name + " with unwrap=True expects only grounded arguments") - raise NoReduceError() + args, kwargs = unwrap_args(atoms) try: result = self.op(*args, **kwargs) except MettaError as e: @@ -422,7 +426,9 @@ def execute(self, *atoms, res_typ=AtomType.UNDEFINED): return [ValueAtom(result, res_typ)] else: result = self.op(*atoms) - if not isinstance(result, list): + try: + iter(result) + except TypeError: raise RuntimeError("Grounded operation `" + self.name + "` should return list") return result diff --git a/python/hyperon/exts/agents/agent_base.py b/python/hyperon/exts/agents/agent_base.py index 13b0e7f08..9891559ce 100644 --- a/python/hyperon/exts/agents/agent_base.py +++ b/python/hyperon/exts/agents/agent_base.py @@ -4,8 +4,42 @@ ''' This is very preliminary and incomplete PoC version. However, it is put to exts, because metta-motto depends on it. +Reagrding threading: +- Generic threading for metta can be introduced with + parallel and sequential composition, for-comprehension, etc. + Agents could be built on top of this functionality. However, + this piece of code was driven by metta-motto demands. +- Two main cases for agents are: + -- Immediate call with inputs to get outputs + -- Asynchronous events and responses + Supporting both cases in one implementation is more convenient, + because both of them can be needed simultaneously in certain + domains (e.g. metta-motto) +- Implementation can be quite different. + -- Agents could be started explicitly + -- They could inherint from StreamMethod + -- Other methods could be called directly without StreamMethod wrapper + All these nuances are to be fleshed out ''' +import threading +from queue import Queue +class StreamMethod(threading.Thread): + def __init__(self, method, args): + super().__init__() #daemon=True + self._result = Queue() + self.method = method + self.args = args + def run(self): + for r in self.method(*self.args): + self._result.put(r) + def __iter__(self): + return self + def __next__(self): + if self._result.empty() and not self.is_alive(): + raise StopIteration + return self._result.get() + class AgentObject: ''' @@ -58,6 +92,9 @@ def _try_unwrap(self, val): return repr(val) def __init__(self, path=None, atoms={}, include_paths=None, code=None): + if path is None and code is None: + # purely Python agent + return # The first argument is either path or code when called from MeTTa if isinstance(path, ExpressionAtom):# and path != E(): code = path @@ -106,16 +143,28 @@ def __call__(self, atom): ) return self._metta.evaluate_atom(atom) + def is_daemon(self): + return hasattr(self, 'daemon') and self.daemon is True + def __metta_call__(self, *args): + call = True method = self.__call__ if len(args) > 0 and isinstance(args[0], SymbolAtom): n = args[0].get_name() if n[0] == '.' and hasattr(self, n[1:]): method = getattr(self, n[1:]) args = args[1:] + call = False if self._unwrap: - return OperationObject(f"{method}", method).execute(*args) - return method(*args) + method = OperationObject(f"{method}", method).execute + st = StreamMethod(method, args) + st.start() + # We don't return the stream here; otherwise it will be consumed immediately. + # If the agent itself would be StreamMethod, its results could be accessbile. + # Here, they are lost (TODO?). + if call and self.is_daemon(): + return [E()] + return st @register_atoms(pass_metta=True) diff --git a/python/hyperon/exts/agents/tests/test_agents.py b/python/hyperon/exts/agents/tests/test_agents.py index a0c28fa82..bbc058352 100644 --- a/python/hyperon/exts/agents/tests/test_agents.py +++ b/python/hyperon/exts/agents/tests/test_agents.py @@ -1,5 +1,7 @@ from hyperon import MeTTa from hyperon.exts.agents import AgentObject +from queue import Queue +from time import sleep class MyAgent(AgentObject): ''' @@ -20,3 +22,70 @@ def __call__(self, a, b): ! (&agent 7 8) ! (&agent .subs 4) ''')) + +# ================================= + +class Agent1(AgentObject): + def __call__(self): + for x in range(10): + yield x + sleep(1) +class Agent2(AgentObject): + def __call__(self, xs): + for x in xs: + yield x*2 +class Agent3(AgentObject): + def __call__(self, xs): + for x in xs: + print("Result: ", x) + +m = MeTTa() +m.register_atom('new-agent-1', Agent1.agent_creator_atom()) +m.register_atom('new-agent-2', Agent2.agent_creator_atom()) +m.register_atom('new-agent-3', Agent3.agent_creator_atom()) +print(m.run(''' + ! ((new-agent-3) ((new-agent-2) ((new-agent-1)))) +''')) + +# ================================= + +class Agnt(AgentObject): + def __init__(self): + self.messages = Queue() + self.running = False + self.output = Queue() + self.daemon = True + def __call__(self): + self.running = True + cnt = 0 + while self.running: + if self.messages.empty(): + self.output.put(f"Waiting {cnt}") + sleep(2) + cnt += 1 + else: + m = self.messages.get() + self.output.put(m[::-1]) + def stop(self): + self.running = False + def input(self, msg): + self.messages.put(msg) + def response(self): + if self.output.empty(): + return None + return self.output.get() + +m = MeTTa() +m.register_atom('agnt', Agnt.agent_creator_atom()) +print(m.run(''' + ! (bind! &a1 (agnt)) + ! (&a1) + ! (println! "Agent is running") + ! ((py-atom time.sleep) 1) + ! ("First response:" (&a1 .response)) + ! (&a1 .input "Hello") + ! (println! "Agent is receiving messages") + ! ((py-atom time.sleep) 2) + ! ("Second response:" (&a1 .response)) + ! (&a1 .stop) +'''))