Skip to content

Commit

Permalink
Berke/alert (#4877)
Browse files Browse the repository at this point in the history
* fix: mkdir if no dir, feat: alert file metadata

* feat: add cola example

* fix: formatting

* feat: working updates but no db edit

* feat: working commit alert 0

* feat: add drive demo

* feat: ordered docs in alert

* fix: edit document for example

* add: experiments

* feat: last edit, prettify meta info

* feat: carry some utils under example_utils

* fix: add init for import

* fix: format

* fix: how to run

* fix: drive example refactor

* fix: adjust prompt

* fix: rm line from json

* fix: remove logs, explain example_utils

* fix: remove drive example from pr

* fix: rm commented out, use logging

* fix: refactor

* fix: remove experimental commits

* fix: typo, add exception for openai requestor

* fix: rm  duplicate key in exception

* fix: gitignore

* fix: remove comment, logging

* fix: change description

* feat: log slack notif to terminal

* fix: openai log shorten

* fix: token to key in example

* Update public/llm-app/examples/pipelines/alert/app.py

Co-authored-by: Jan Chorowski <[email protected]>

* Update public/llm-app/examples/pipelines/alert/app.py

Co-authored-by: Jan Chorowski <[email protected]>

* Update public/llm-app/examples/pipelines/alert/app.py

Co-authored-by: Jan Chorowski <[email protected]>

* Update public/llm-app/examples/pipelines/alert/app.py

Co-authored-by: Jan Chorowski <[email protected]>

* Update public/llm-app/examples/pipelines/alert/app.py

Co-authored-by: Jan Chorowski <[email protected]>

* Update public/llm-app/examples/pipelines/alert/app.py

Co-authored-by: Jan Chorowski <[email protected]>

* fix: long lines

---------

Co-authored-by: Jan Chorowski <[email protected]>
GitOrigin-RevId: a2424f719a2bedb20e988790d9cd19c927de23c3
  • Loading branch information
2 people authored and Manul from Pathway committed Nov 8, 2023
1 parent 034fd97 commit 1aa06f9
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 26 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,6 @@ cython_debug/
#.idea/

pw-env/

# llm debug
examples/ui/data/*
6 changes: 6 additions & 0 deletions examples/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Re-export the example helper utilities so callers can simply do
# `from examples import find_last_modified_file, get_file_info`.
from .example_utils import find_last_modified_file, get_file_info

# Explicit public API of the `examples` package.
__all__ = [
    "find_last_modified_file",
    "get_file_info",
]
3 changes: 3 additions & 0 deletions examples/data/magic-cola/live/documents.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"doc": "We will launch Logitech campaign in July 2023"}
{"doc": "Ohio store opening is delayed until further notice."}
{"doc": "Campaign for Magic Cola is going to start in November 2023."}
1 change: 1 addition & 0 deletions examples/data/magic-cola/staging/documents_extra.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"doc": "Country director had discussion with local managers and they agreed to push start of campaign of Magic Cola to January 1st, 2024. Please plan accordingly."}
70 changes: 70 additions & 0 deletions examples/example_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import os
import pwd
import time


def find_last_modified_file(directory) -> "str | None":
    """Return the path of the most recently added or modified file.

    NOTE: this helper is meant to be replaced with functionality from Pathway.

    Args:
        directory (str, Path): path of the directory to search (recursively).

    Returns:
        Full path of the most recently changed file, or ``None`` when the
        directory does not exist, is not a directory, or contains no files.
    """
    # isdir() already implies existence, so a separate exists() check is redundant.
    if not os.path.isdir(directory):
        return None

    latest_file = None
    latest_time = 0.0
    for root, _dirs, files in os.walk(directory):
        for name in files:
            path = os.path.join(root, name)
            # A file counts as "touched" at the later of its ctime and mtime,
            # so freshly created files compete with recently edited ones.
            file_time = max(os.path.getmtime(path), os.path.getctime(path))
            if file_time > latest_time:
                latest_time = file_time
                latest_file = path
    return latest_file


def get_file_info(file_path) -> dict:
    """Return OS metadata about a given file.

    NOTE: this helper is meant to be replaced with functionality from Pathway.

    Args:
        file_path (str, Path): path of the file.

    Returns:
        dict with the file name, human-readable timestamps and owner name;
        on any failure, a dict with a single ``"Error"`` key describing
        the problem (preserves the original best-effort contract).
    """
    try:
        # Single stat() call: one consistent snapshot instead of three
        # separate syscalls that could race with concurrent modifications.
        st = os.stat(file_path)
        modification_time = st.st_mtime
        create_time = st.st_ctime

        # Treat the later of ctime/mtime as the effective last edit.
        last_edit = max(modification_time, create_time)

        # pwd is Unix-only; on failure (e.g. unknown uid) we fall through
        # to the generic error dict below, matching the original behavior.
        owner_name = pwd.getpwuid(st.st_uid).pw_name

        return {
            # basename() also accepts Path objects, unlike str.split(os.sep).
            "File": os.path.basename(file_path),
            "Modified Time": time.ctime(modification_time),
            "Created Time": time.ctime(create_time),
            "Owner": owner_name,
            "Last Edit": time.ctime(last_edit),
        }
    except Exception as e:
        return {"Error": str(e)}
83 changes: 57 additions & 26 deletions examples/pipelines/alert/app.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,54 @@
"""
Microservice for a context-aware ChatGPT assistant.
Microservice for a context-aware alerting ChatGPT assistant.
The following program reads in a collection of documents,
embeds each document using the OpenAI document embedding model,
then builds an index for fast retrieval of documents relevant to a question,
effectively replacing a vector database.
This demo is very similar to `contextful` example with an additional real time alerting capability.
For the demo, alerts are sent to Slack (you need `slack_alert_channel_id` and `slack_alert_token`); you can
either put these env variables in the .env file under the llm-app directory,
or create env variables in the terminal (i.e. export in bash).
If you don't have Slack, you can leave them empty and the app will print the notifications to standard output instead.
The program then starts a REST API endpoint serving queries about programming in Pathway.
Upon starting, a REST API endpoint is opened by the app to serve queries about input folder `data_dir`.
We can create notifications by sending a query to the API stating that we want to be notified.
Alternatively, the provided Streamlit chat app can be used.
One example would be `Tell me and alert about start date of campaign for Magic Cola`
How It Works?
Each query text is first turned into a vector using OpenAI embedding service,
then relevant documentation pages are found using a Nearest Neighbor index computed
for documents in the corpus. A prompt is built from the relevant documentation pages
and sent to the OpenAI GPT-4 chat service for processing.
and sent to the OpenAI GPT3.5 chat service for processing and answering.
Once you run, Pathway looks for any changes in data sources, and efficiently detects changes to the relevant documents.
When a change of source documents is detected, the LLM is asked to answer the query again,
and if the new answer is sufficiently different, a notification is created.
Usage:
In the root of this repository run:
`poetry run ./run_examples.py alerts`
or, if all dependencies are managed manually rather than using poetry
You can either
`python examples/pipelines/alerts/app.py`
or
`python ./run_examples.py alert`
You can also run this example directly in the environment with llm_app instaslled.
You can also run this example directly in the environment with llm_app installed.
To call the REST API:
To create alerts:
You can call the REST API:
curl --data '{"user": "user", "query": "How to connect to Kafka in Pathway?"}' http://localhost:8080/ | jq
Or start streamlit UI:
First go to examples/ui directory with `cd llm-app/examples/ui/`
run `streamlit run server.py`
"""

import os

import pathway as pw
from pathway.stdlib.ml.index import KNNIndex

from examples.example_utils import find_last_modified_file, get_file_info
from llm_app import deduplicate, send_slack_alerts
from llm_app.model_wrappers import OpenAIChatGPTModel, OpenAIEmbeddingModel

Expand All @@ -46,12 +65,14 @@ class QueryInputSchema(pw.Schema):
# Helper Functions
@pw.udf
def build_prompt(documents, query):
docs_str = "\n".join(documents)
prompt = f"""Please process the documents below:
{docs_str}
docs_str = "\n".join(
[f"Doc-({idx}) -> {doc}" for idx, doc in enumerate(documents[::-1])]
)
prompt = f"""Given a set of documents, answer user query. If answer is not in docs, say it can't be inferred.
Respond to query: '{query}'
"""
Docs: {docs_str}
Query: '{query}'
Final Response:"""
return prompt


Expand Down Expand Up @@ -94,13 +115,15 @@ def make_query_id(user, query) -> str:


@pw.udf
def construct_notification_message(query: str, response: str) -> str:
return f'New response for question "{query}":\n{response}'
def construct_notification_message(query: str, response: str, metainfo=None) -> str:
return f'New response for question "{query}":\n{response}\n{str(metainfo)}'


@pw.udf
def construct_message(response, alert_flag):
def construct_message(response, alert_flag, metainfo=None):
if alert_flag:
if metainfo:
response += "\n" + str(metainfo)
return response + "\n\n🔔 Activated"
return response

Expand All @@ -109,9 +132,18 @@ def decision_to_bool(decision: str) -> bool:
return "yes" in decision.lower()


@pw.udf
def add_meta_info(file_path) -> dict:
fname = find_last_modified_file(file_path)
info_dict = get_file_info(fname)
return f"""\nBased on file: {info_dict['File']} modified by {info_dict['Owner']} on {info_dict['Last Edit']}."""


def run(
*,
data_dir: str = os.environ.get("PATHWAY_DATA_DIR", "./examples/data/pathway-docs/"),
data_dir: str = os.environ.get(
"PATHWAY_DATA_DIR", "./examples/data/magic-cola/live/"
),
api_key: str = os.environ.get("OPENAI_API_KEY", ""),
host: str = "0.0.0.0",
port: int = 8080,
Expand All @@ -130,7 +162,7 @@ def run(
documents = pw.io.jsonlines.read(
data_dir,
schema=DocumentInputSchema,
mode="streaming",
mode="streaming_with_deletions",
autocommit_duration_ms=50,
)

Expand Down Expand Up @@ -162,7 +194,7 @@ def run(
model.apply(
pw.this.prompt,
locator=model_locator,
temperature=0.3,
temperature=temperature,
max_tokens=100,
)
),
Expand Down Expand Up @@ -222,23 +254,22 @@ def acceptor(new: str, old: str) -> bool:
locator=model_locator,
max_tokens=20,
)
return decision_to_bool(decision)

pw.io.jsonlines.write(responses, "./examples/ui/data/new_responses.jsonl")
return decision_to_bool(decision)

deduplicated_responses = deduplicate(
responses,
col=responses.response,
acceptor=acceptor,
instance=responses.query_id,
)
pw.io.jsonlines.write(
deduplicated_responses, "./examples/ui/data/deduped_responses.jsonl"
)

alerts = deduplicated_responses.select(
message=construct_notification_message(pw.this.query, pw.this.response)
message=construct_notification_message(
pw.this.query, pw.this.response, add_meta_info(data_dir)
)
)

send_slack_alerts(alerts.message, slack_alert_channel_id, slack_alert_token)

pw.run()
Expand Down
1 change: 1 addition & 0 deletions llm_app/model_wrappers/api_clients/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(
import openai

openai.api_requestor.TIMEOUT_SECS = 90

openai.api_key = api_key
if api_type:
openai.api_type = api_type
Expand Down
7 changes: 7 additions & 0 deletions llm_app/model_wrappers/openai_wrapper/api_models.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import logging

import pathway as pw

from llm_app.model_wrappers.api_clients.clients import (
Expand All @@ -7,6 +9,8 @@
)
from llm_app.model_wrappers.base import BaseModel

logfun = logging.debug


class MessagePreparer:
@staticmethod
Expand Down Expand Up @@ -43,7 +47,10 @@ def __call__(self, text: str, locator="gpt-3.5-turbo", **kwargs) -> str:
kwargs["model"] = locator

messages = MessagePreparer.prepare_chat_messages(text)

logfun(f"Calling OpenAI API with: {messages}\n")
response = self.api_client.make_request(messages=messages, **kwargs)
logfun(f"API Response: {response.choices[0].message.content}\n")
return response.choices[0].message.content

def apply(
Expand Down
4 changes: 4 additions & 0 deletions llm_app/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import logging
from typing import Any, Callable, TypeVar

import pathway as pw
import requests

logfun = logging.info


def send_slack_alerts(
message: pw.ColumnReference, slack_alert_channel_id, slack_alert_token
Expand All @@ -11,6 +14,7 @@ def send_slack_alert(key, row, time, is_addition):
if not is_addition:
return
alert_message = row[message.name]
logfun(alert_message)
requests.post(
"https://slack.com/api/chat.postMessage",
data="text={}&channel={}".format(alert_message, slack_alert_channel_id),
Expand Down

0 comments on commit 1aa06f9

Please sign in to comment.