Skip to content

Commit

Permalink
Adds LangChain translation example
Browse files Browse the repository at this point in the history
  • Loading branch information
rachfop committed Dec 15, 2023
1 parent c6cf076 commit 7327a65
Show file tree
Hide file tree
Showing 8 changed files with 1,679 additions and 825 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [dsl](dsl) - DSL workflow that executes steps defined in a YAML file.
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
* [gevent_async](gevent_async) - Combine gevent and Temporal.
* [langchain](langchain) - Orchestrate workflows for LangChain.
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
Expand Down
28 changes: 28 additions & 0 deletions langchain/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# LangChain Sample

This sample shows you how you can use Temporal to orchestrate workflows for [LangChain](https://www.langchain.com).

For this sample, the optional `langchain` dependency group must be included. To include, run:

poetry install --with langchain

Export your [OpenAI API key](https://platform.openai.com/api-keys) as an environment variable. Replace `YOUR_API_KEY` with your actual OpenAI API key.

export OPENAI_API_KEY='...'

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
worker:

poetry run python worker.py

This will start the worker. Then, in another terminal, run the following to execute a workflow:

poetry run python starter.py

Then, in another terminal, run the following command to translate a phrase:

curl -X POST "http://localhost:8000/translate?phrase=hello%20world&language=Spanish"

Which should produce some output like:

{"translation":"Hola mundo"}
29 changes: 29 additions & 0 deletions langchain/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from dataclasses import dataclass

from temporalio import activity

from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate


@dataclass
class TranslateParams:
phrase: str
language: str


@activity.defn
async def translate_phrase(params: TranslateParams) -> dict:
# LangChain setup
template = """You are a helpful assistant who translates between languages.
Translate the following phrase into the specified language: {phrase}
Language: {language}"""
chat_prompt = ChatPromptTemplate.from_messages(
[
("system", template),
("human", "Translate"),
]
)
chain = chat_prompt | ChatOpenAI()
# Use the asynchronous invoke method
return await chain.ainvoke({"phrase": params.phrase, "language": params.language})
37 changes: 37 additions & 0 deletions langchain/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from contextlib import asynccontextmanager

import uvicorn
from activities import TranslateParams
from fastapi import FastAPI, HTTPException
from temporalio.client import Client
from workflow import LangChainWorkflow


@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.temporal_client = await Client.connect("localhost:7233")
yield


app = FastAPI(lifespan=lifespan)


@app.post("/translate")
async def translate(phrase: str, language: str):
client = app.state.temporal_client
try:
result = await client.execute_workflow(
LangChainWorkflow.run,
TranslateParams(phrase, language),
id=f"langchain-translation-{language.lower()}-{phrase.replace(' ', '-')}",
task_queue="langchain-task-queue",
)
translation_content = result.get("content", "Translation not available")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

return {"translation": translation_content}


if __name__ == "__main__":
uvicorn.run(app, host="localhost", port=8000)
37 changes: 37 additions & 0 deletions langchain/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import asyncio

from activities import translate_phrase
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import LangChainWorkflow

interrupt_event = asyncio.Event()


async def main():
client = await Client.connect("localhost:7233")
worker = Worker(
client,
task_queue="langchain-task-queue",
workflows=[LangChainWorkflow],
activities=[translate_phrase],
)

print("\nWorker started, ctrl+c to exit\n")
await worker.run()
try:
# Wait indefinitely until the interrupt event is set
await interrupt_event.wait()
finally:
# The worker will be shutdown gracefully due to the async context manager
print("\nShutting down the worker\n")


if __name__ == "__main__":
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print("\nInterrupt received, shutting down...\n")
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
17 changes: 17 additions & 0 deletions langchain/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from activities import TranslateParams, translate_phrase


@workflow.defn
class LangChainWorkflow:
@workflow.run
async def run(self, params: TranslateParams) -> dict:
return await workflow.execute_activity(
translate_phrase,
params,
schedule_to_close_timeout=timedelta(seconds=30),
)
Loading

0 comments on commit 7327a65

Please sign in to comment.