-
Notifications
You must be signed in to change notification settings - Fork 0
Task 입출력 파싱 로직
- 사용자가 직접 호출한 경우
- context를 할당
- kwargs를 할당
- 남은 인자에 대해 순서대로 args를 할당
- 이전
Runnable
에서 넘겨받은 경우- context를 할당
- kwargs를 할당 (사실상 무의미)
- 남은 인자가 하나면 이전 output을 그대로 할당
- 남은 인자가 여러 개고, 이전 output이 매핑 타입이면 키워드 매칭 시도
- 남은 인자가 여러 개고, 이전 output이 iterable 타입이면 순서대로 매칭
Task
의 input_config
와 output_config
를 완전히 없애고 기본적으로 type hint로만 모든 정보를 전달하는 방식을 사용한다.
사용자가 실행 인터페이스를 통해 직접 입력한 argument나, 이전 Runnable
의 결과를 넘겨받기 위한 파라미터는 다음과 같이 작성한다.
-
param: Ann[type, key]
형태로 type hint를 작성한다. -
param: type
으로 작성 시param: Ann[type]
와 동일하며,key
는 존재하지 않는 것으로 본다. - 아무런 타입 hint가 없으면
Ann[Any]
와 동일하게 취급한다.
사용자가 run
인터페이스를 통해 직접 호출하는 것은 가장 처음으로 마주하는 Task
의 함수를 직접 호출하는 것과 같다. 이 경우에는 key
가 전혀 사용되지 않는다.
Group
의 경우, 각 멤버의 id
를 키워드로 넘겨진 keyword argument로 해당 멤버에게 입력을 직접 할당한다.
위의 Group
경우나 Runnable
의 결과를 넘겨받는 경우처럼 하나의 함수 인터페이스에 한 객체가 입력으로 할당될 수 있다. 이 경우 다음의 규칙을 적용한다.
- 객체가
[]
연산자를 지원하는 경우-
key
가 같은 파라미터끼리 파싱을 수행한다.key
가 없는 파라미터들은 서로key
가 같다고 본다. -
key
를 통해 객체에 접근하여 값을 가져온다. 만약key
가 같은 파라미터가 하나라면 여기서 멈추고, 여러 개라면 객체가[]
연산자를 지원하지 않는 경우의 규칙을 추가 적용한다.
-
- 객체가
[]
연산자를 지원하지 않는 경우- 해당 객체가
Iterable
이면 파라미터에 순서대로 내부 값을 할당한다. 만약Iterable
이 아니면, 각 파라미터에 해당 객체를 그대로 (혹은 복사하여) 할당한다.
- 해당 객체가
@Task('task1')
def task1(a: int, b: Ann[int], c = 4) -> tuple:
return a, b, c
@Task('task2')
def task2(a: Ann[int, 1]) -> int:
return a ** 2
# 사용자가 직접 호출했으니, 일반적인 함수 호출과 동일한 규칙을 따른다.
task1.run(5, 3) # (5, 3, 4)
task1.run(2, c=9, b=1) # (2, 1, 9)
# add 함수가 *args를 받고 `self`를 반환하도록 수정할 생각..
pipe = Pipeline.add(
Group('group').add(task1),
task2
)
pipe.run(task1=(1,2))
'''
1. 첫번째 인자인 튜플 (1,2)가 'group'의 첫번째 멤버인 task1에 할당된다.
2. 튜플은 [] 연산자를 지원하므로, 객체가 [] 연산자를 지원하는 경우의 규칙을 적용한다.
3. key가 없는 파라미터가 3개이므로 `[]` 연산자를 지원하지 않는 경우의 규칙을 추가 적용한다.
4. (a=1, b=2)를 할당받아서 실행하므로 task1의 결과는 (1,2,4)가 된다.
5. task2의 파라미터 `a`의 key는 1이므로 (1,2,4)[1]가 `a`의 값이 된다.
6. 최종 결과는 4가 된다.
'''
Context
로부터 파라미터 값을 세팅하고자 할 때는 다음과 같이 입력한다.
-
param: Ctx[type, key]
형태로 type hint를 작성한다. -
param: Ctx[type]
형태로 작성 시,param: Ctx[type, 'param']
과 동일하게 취급한다.
Context에서 key
를 통해 값을 조회한다. key
에 해당하는 값이 없고, default 값도 없으면 pydantic error가 반환된다.
만약 key
를 recursive하게 적용하여 값을 가져오고 싶다면 K(key1, key2, ...)
와 같이 사용가능하다. K(key1)
는 key1
과 동일하다. 단, 각 key를 통해 가져오는 객체가 모두 []
연산자를 지원해야 한다.
함수의 반환값 역시 type hint를 통해 관련 정보를 전달한다. 입력 형식은 다음과 같다.
-
def fn(...) -> Ann[type]
혹은def fn(...) -> type
형태로 type hint를 작성한다.
파라미터와 다르게 key
는 사용되지 않으며 설령 입력한다고 해도 무시한다.
@Task('enhance query')
def enhance_query(
query: str,
history: list[Chat]
) -> tuple[str, list[str]]:
"""
사용자 질문을 대화 맥락에 맞게 재구성하고, 관련 문서를 찾기에 적절한 검색어 혹은 질문을 생성.
"""
return new_query, ['quetion or search terms']
@Task('retrive')
def retrieve_documents(
queries: Ann[list[str], 1],
max_document: Ctx[int],
vdb: Ctx[VectorDB]
) -> list[str]:
"""
Vector DB에서 주어진 검색어들을 바탕으로 적절한 문서를 가져옴.
"""
return ['documents']
@Task('action')
def do_action(
documents: list[str],
query: Ctx[str, K('enhance query', 0)]
) -> str | None:
"""
사용자의 요청을 수행하기 위해 단순 답변을 넘어
DB 조작, API 호출 등의 처리가 필요하다 판단되면 해당 처리를 수행하고,
어떤 동작을 수행했는지 안내 메세지를 생성하여 반환한다.
만약 그런 처리가 필요없다면 None을 반환한다.
"""
return '안내 메세지' or None
@Task('qa')
def answer(
documents: list[str],
query: Ctx[str, K('enhance query', 0)]
) -> str | None:
"""
주어진 문서와 사용자 질문을 토대로 적절한 답변을 생성한다.
만약, 적절한 답변을 생성할 수 없으면 None을 반환한다.
"""
return '답변' or None
chatbot = Pipeline('chatbot').add(
enhance_query,
retrieve,
Group('response').add(
qa,
action
)
)
response = chatbot.run(query, history)
if response['action']:
send(response['action'])
elif response['qa']:
send(response['qa'])
else:
send('사과 메세지')