From b18b16e14198da5f6b75a4775806363037101983 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kaan=20=C3=87ayl=C4=B1?= Date: Thu, 15 Feb 2024 01:10:38 +0100 Subject: [PATCH] Add pipeline base class and a simple pipeline --- app/pipeline/__init__.py | 1 + app/pipeline/pipeline.py | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 app/pipeline/__init__.py create mode 100644 app/pipeline/pipeline.py diff --git a/app/pipeline/__init__.py b/app/pipeline/__init__.py new file mode 100644 index 00000000..7f74c8b4 --- /dev/null +++ b/app/pipeline/__init__.py @@ -0,0 +1 @@ +from pipeline.pipeline import SimplePipeline diff --git a/app/pipeline/pipeline.py b/app/pipeline/pipeline.py new file mode 100644 index 00000000..1379b990 --- /dev/null +++ b/app/pipeline/pipeline.py @@ -0,0 +1,37 @@ +from abc import ABCMeta, abstractmethod +from operator import itemgetter + +from langchain_core.output_parsers import StrOutputParser +from domain import IrisMessage, IrisMessageRole + + +class BasePipeline(metaclass=ABCMeta): + """ Abstract class for all pipelines """ + def __init__(self, name=None): + self.name = name + + def __repr__(self): + return f'{self.__class__.__name__} {self.name if self.name is not None else id(self)}' + + def __str__(self): + return f'{self.__class__.__name__} {self.name if self.name is not None else id(self)}' + + @abstractmethod + def run(self, *args, **kwargs) -> IrisMessage: + """ Run the pipeline """ + raise NotImplementedError + + +class SimplePipeline(BasePipeline): + def __init__(self, llm, name=None): + super().__init__(name=name) + self.llm = llm + self.pipeline = {"query": itemgetter("query")} | llm | StrOutputParser() + + def run(self, *args, query: IrisMessage, **kwargs) -> IrisMessage: + """ A simple pipeline that does not have any memory etc.""" + if query is None: + raise ValueError("IrisMessage must not be None") + message = query.text + response = self.pipeline.invoke({"query": message}) + return IrisMessage(role=IrisMessageRole.ASSISTANT, text=response)