Add an S3 processor agent #755

Closed
Changes from 4 commits
28 commits
012e520
S3 processor agent (#1)
cdbartholomew Jan 13, 2024
7d32049
Updating OpenAI embedding example to use gateways to work better with…
cdbartholomew Jan 21, 2024
dd2f41a
Add Enrico's fix for docker run missing class paths (#3)
cdbartholomew Jan 27, 2024
b2ab54b
Collection name from record (#4)
cdbartholomew Jan 27, 2024
058c918
Allow proxy to any any agent, not just service type (#5)
cdbartholomew Jan 29, 2024
e5e229a
Bump openai python version
cdbartholomew Jan 29, 2024
32a8ac5
Update S3 processor to emit errors
cdbartholomew Jan 29, 2024
c15c8c4
Make Pulsar connections work on non-default public/default tenant and…
cdbartholomew Mar 18, 2024
799ab3f
Bumping the Kafka version since the old version is no longer available
cdbartholomew Mar 18, 2024
e692602
Add support for Pulsar in docker run
cdbartholomew Mar 20, 2024
2d693d3
Fix for duplicate tenant/namespace in topic
cdbartholomew Mar 20, 2024
9a85746
Allow resource size and parallelism to be specified as variables; fix…
cdbartholomew Mar 21, 2024
1ada050
Update to latest Pinecone SDK that works with servless indexes
cdbartholomew Mar 26, 2024
b72bf70
Add similarity score to Pinecone query output
cdbartholomew Mar 26, 2024
e1ecbd9
Add fn:sha256 function
cdbartholomew Mar 26, 2024
1b1f1df
Reduce the receiver queue size on Pulsar clients to limit the number …
cdbartholomew Mar 27, 2024
8ed2ee7
Trying an even small receiver queue size
cdbartholomew Mar 27, 2024
b233806
Log an error message on a exception from OpenAI with non-streaming ch…
cdbartholomew Mar 27, 2024
6d31d37
Update GitHub actions to run unit tests on a push to main and push ta…
cdbartholomew Mar 28, 2024
a8f038c
Adding retry logic to Pulsar producer creation to avoid null pointer …
cdbartholomew Apr 1, 2024
45d808a
Updating poetry.lock file
cdbartholomew Apr 1, 2024
8b7b710
Adding lock around the creation of producer in Pulsar write
cdbartholomew Apr 1, 2024
5badc7e
Make Pulsar producer and schema volatile
cdbartholomew Apr 1, 2024
45ae49e
Additional logging around intermittently failing API gateway test
cdbartholomew Apr 2, 2024
f88630b
Add support for Voyage AI embeddings
cdbartholomew Apr 3, 2024
78948b7
Update intermittently failing test to not depend on order of messages…
cdbartholomew Apr 3, 2024
8ccd7b2
Separating the lazy initialization of the schema from the initializa…
cdbartholomew Apr 3, 2024
c194ba1
Switch Voyage HTTP client to 1.1
cdbartholomew Apr 3, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -34,3 +34,4 @@ dist
 .tox
 build/
 venv/
+.vscode
81 changes: 68 additions & 13 deletions dev/s3_upload.sh
@@ -26,16 +26,71 @@ file=$4
 s3_key=minioadmin
 s3_secret=minioadmin

-fileWithoutPath=$(basename $file)
-resource="/${bucket}/${fileWithoutPath}"
-content_type="application/octet-stream"
-date=`date -R`
-_signature="PUT\n\n${content_type}\n${date}\n${resource}"
-signature=`echo -en ${_signature} | openssl sha1 -hmac ${s3_secret} -binary | base64`
-set -x
-curl -X PUT -T "${file}" \
-  -H "Host: ${host}" \
-  -H "Date: ${date}" \
-  -H "Content-Type: ${content_type}" \
-  -H "Authorization: AWS ${s3_key}:${signature}" \
-  ${url}${resource}
+# Function to generate a signature
+generate_signature() {
+    local method=$1
+    local content_type=$2
+    local date=$3
+    local resource=$4
+
+    local string_to_sign="${method}\n\n${content_type}\n${date}\n${resource}"
+    echo -en ${string_to_sign} | openssl sha1 -hmac ${s3_secret} -binary | base64
+}
+
+# Check if the bucket exists
+check_bucket_exists() {
+    local check_resource="/${bucket}/"
+    local check_date=`date -R`
+    local check_signature=$(generate_signature "HEAD" "" "${check_date}" "${check_resource}")
+
+    local status_code=$(curl -s -o /dev/null -w "%{http_code}" -X HEAD \
+        -H "Host: ${host}" \
+        -H "Date: ${check_date}" \
+        -H "Authorization: AWS ${s3_key}:${check_signature}" \
+        ${url}${check_resource})
+
+    if [ "$status_code" == "404" ]; then
+        return 1 # Bucket does not exist
+    else
+        return 0 # Bucket exists
+    fi
+}
+
+# Create the bucket
+create_bucket() {
+    local create_resource="/${bucket}/"
+    local create_date=`date -R`
+    local create_signature=$(generate_signature "PUT" "" "${create_date}" "${create_resource}")
+
+    curl -X PUT \
+        -H "Host: ${host}" \
+        -H "Date: ${create_date}" \
+        -H "Authorization: AWS ${s3_key}:${create_signature}" \
+        ${url}${create_resource}
+}
+
+# Upload the file
+upload_file() {
+    local file_without_path=$(basename $file)
+    local upload_resource="/${bucket}/${file_without_path}"
+    local content_type="application/octet-stream"
+    local upload_date=`date -R`
+    local upload_signature=$(generate_signature "PUT" "${content_type}" "${upload_date}" "${upload_resource}")
+
+    curl -X PUT -T "${file}" \
+        -H "Host: ${host}" \
+        -H "Date: ${upload_date}" \
+        -H "Content-Type: ${content_type}" \
+        -H "Authorization: AWS ${s3_key}:${upload_signature}" \
+        ${url}${upload_resource}
+}
+
+# Main script
+if ! check_bucket_exists; then
+    echo "Bucket does not exist. Creating bucket: ${bucket}"
+    create_bucket
+fi
+
+echo "Uploading file: ${file}"
+upload_file
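
Two details of this script may be worth a closer look in review: the string-to-sign is expanded with an unquoted `echo -en`, and the bucket check treats every status other than 404 (including a failed connection, which curl reports as `000`) as "bucket exists". The following is a minimal sketch of how both steps could be written more defensively. The function names `sign_v2` and `bucket_state_from_status` are hypothetical and not part of the PR; the signing scheme itself (HMAC-SHA1 over method, empty Content-MD5, content type, date, and resource) is the one the script already uses.

```shell
#!/usr/bin/env bash
# Sketch only: sign_v2 and bucket_state_from_status are hypothetical names,
# not functions from dev/s3_upload.sh.

# Build the string-to-sign (METHOD, empty Content-MD5, Content-Type, Date,
# resource, separated by newlines) and sign it with HMAC-SHA1, base64-encoded.
# printf with %s keeps each field literal; an unquoted `echo -en` is subject
# to word splitting and behaves differently across shells.
sign_v2() {
    local method=$1
    local content_type=$2
    local date=$3
    local resource=$4
    local secret=$5

    printf '%s\n\n%s\n%s\n%s' "$method" "$content_type" "$date" "$resource" |
        openssl sha1 -hmac "$secret" -binary | base64
}

# Map the HTTP status of a HEAD request on the bucket to a state, so that
# only a 2xx answer is read as "exists" and anything unexpected is surfaced.
bucket_state_from_status() {
    case "$1" in
        2??) echo "exists" ;;
        404) echo "missing" ;;
        *)   echo "unknown" ;;
    esac
}

# Example usage with the MinIO default secret used by the script.
sign_v2 "PUT" "application/octet-stream" "$(date -R)" "/documents/simple.pdf" "minioadmin"
bucket_state_from_status 404   # prints "missing"
```

With a helper like this, the main script could create the bucket only on `missing` and abort on `unknown` instead of attempting the upload.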

25 changes: 25 additions & 0 deletions examples/applications/compute-openai-embeddings/README.md
@@ -0,0 +1,25 @@
# Computing text embeddings with OpenAI

This sample application shows how to compute text embeddings by calling the OpenAI API.

## Configure your OpenAI API key

Export your OpenAI access key as an environment variable:

```
export OPEN_AI_ACCESS_KEY=...
```

## Deploy the LangStream application

```
./bin/langstream docker run test -app examples/applications/compute-openai-embeddings -s examples/secrets/secrets.yaml
```

## Talk with the Chat bot using the CLI
Since the application opens a gateway, we can use the gateway API to send and consume messages.

```
./bin/langstream gateway chat test -cg output -pg input -p sessionId=$(uuidgen)
```

21 changes: 21 additions & 0 deletions examples/applications/compute-openai-embeddings/gateways.yaml
@@ -0,0 +1,21 @@
gateways:
  - id: "input"
    type: produce
    topic: "input-topic"
    parameters:
      - sessionId
    produceOptions:
      headers:
        - key: langstream-client-session-id
          valueFromParameters: sessionId

  - id: "output"
    type: consume
    topic: "output-topic"
    parameters:
      - sessionId
    consumeOptions:
      filters:
        headers:
          - key: langstream-client-session-id
            valueFromParameters: sessionId
8 changes: 6 additions & 2 deletions examples/applications/compute-openai-embeddings/pipeline.yaml
@@ -25,15 +25,19 @@ topics:
 errors:
   on-failure: "skip"
 pipeline:
+  - name: "convert-to-structure"
+    input: "input-topic"
+    type: "document-to-json"
+    configuration:
+      text-field: "text"
   - name: "compute-embeddings"
     id: "step1"
     type: "compute-ai-embeddings"
-    input: "input-topic"
     output: "output-topic"
     configuration:
       model: "${secrets.open-ai.embeddings-model}" # This needs to match the name of the model deployment, not the base model
       embeddings-field: "value.embeddings"
-      text: "{{ value.name }} {{ value.description }}"
+      text: "{{ value.text }}"
       batch-size: 10
       # this is in milliseconds. It is important to take this value into consideration when using this agent in the chat response pipeline
       # in fact this value impacts the latency of the response
2 changes: 1 addition & 1 deletion examples/applications/openai-text-completions/README.md
@@ -2,7 +2,7 @@

 This sample application shows how to use the `gpt-3.5-turbo-instruct` Open AI model.

-## Configure you OpenAI
+## Configure your OpenAI key


 ```
1 change: 1 addition & 0 deletions examples/applications/s3-processor/.gitignore
@@ -0,0 +1 @@
java/lib/*
36 changes: 36 additions & 0 deletions examples/applications/s3-processor/README.md
@@ -0,0 +1,36 @@
# Preprocessing Text

This sample application shows how to read a single file from an S3 bucket and process it in a pipeline.

The pipeline will:

- Read a file from an S3 bucket; the file name is taken from the value of the message sent to the input topic
- Extract text from document files (PDF, Word...)
- Split the text into chunks
- Write the chunks to the output topic

## Prerequisites

Prepare some PDF files and upload them to a bucket in S3.

## Deploy the LangStream application

```
./bin/langstream docker run test -app examples/applications/s3-processor -s examples/secrets/secrets.yaml --docker-args="-p9900:9000"
```

Note that `--docker-args="-p9900:9000"` is added here to expose the S3 API on port 9900.


## Write some documents in the S3 bucket

```
# Upload a document to the S3 bucket
dev/s3_upload.sh localhost http://localhost:9900 documents README.md
dev/s3_upload.sh localhost http://localhost:9900 documents examples/applications/s3-source/simple.pdf
```

## Interact with the pipeline

Now you can use the developer UI to specify which document to read from S3 and process. To process the simple.pdf file, simply send `simple.pdf` as the message.
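
The README points at the developer UI, but the `gateways.yaml` added for this example also declares a `request` produce gateway and a `reply` consume gateway, so the CLI is likely an alternative. A minimal sketch, assuming the `gateway chat` subcommand accepts the same flags as in the compute-openai-embeddings README; the function name `chat_with_s3_processor` and the `DRY_RUN` switch are hypothetical and only there for illustration.

```shell
#!/usr/bin/env bash
# Hypothetical helper, not part of the PR: opens a gateway chat session against
# the s3-processor example. The gateway ids "request" and "reply" come from the
# example's gateways.yaml; the CLI flags mirror the compute-openai-embeddings README.
chat_with_s3_processor() {
    app_id=${1:-test}
    session_id=${2:-$(uuidgen)}
    set -- ./bin/langstream gateway chat "$app_id" \
        -cg reply -pg request -p "sessionId=${session_id}"
    if [ "${DRY_RUN:-0}" = "1" ]; then
        echo "$*"   # show the command without running it
    else
        "$@"        # run the CLI; requires a deployed application
    fi
}

# Dry run: prints the command that would be executed.
DRY_RUN=1 chat_with_s3_processor test demo-session
```

Once the session is open, sending `simple.pdf` as the message should have the pipeline fetch that object and return its chunks on the `reply` gateway.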
20 changes: 20 additions & 0 deletions examples/applications/s3-processor/configuration.yaml
@@ -0,0 +1,20 @@
#
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

configuration:
  resources:
  dependencies:
70 changes: 70 additions & 0 deletions examples/applications/s3-processor/extract-text.yaml
@@ -0,0 +1,70 @@
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name: "Extract and manipulate text"
topics:
  - name: "request-topic"
    creation-mode: create-if-not-exists
  - name: "response-topic"
    creation-mode: create-if-not-exists
pipeline:
  - name: "convert-to-structure"
    type: "document-to-json"
    input: "request-topic"
    configuration:
      text-field: "filename"
  - name: "Process file from S3"
    type: "s3-processor"
    configuration:
      bucketName: "${secrets.s3.bucket-name}"
      endpoint: "${secrets.s3.endpoint}"
      access-key: "${secrets.s3.access-key}"
      secret-key: "${secrets.s3.secret}"
      region: "${secrets.s3.region}"
      objectName: "{{ value.filename }}"
  - name: "Extract text"
    type: "text-extractor"
  - name: "Split into chunks"
    type: "text-splitter"
    configuration:
      splitter_type: "RecursiveCharacterTextSplitter"
      chunk_size: 400
      separators: ["\n\n", "\n", " ", ""]
      keep_separator: false
      chunk_overlap: 100
      length_function: "cl100k_base"
  - name: "Convert to structured data"
    type: "document-to-json"
    configuration:
      text-field: text
      copy-properties: true
  - name: "prepare-structure"
    type: "compute"
    output: "response-topic"
    configuration:
      fields:
        - name: "value.filename"
          expression: "properties.name"
          type: STRING
        - name: "value.chunk_id"
          expression: "properties.chunk_id"
          type: STRING
        - name: "value.language"
          expression: "properties.language"
          type: STRING
        - name: "value.chunk_num_tokens"
          expression: "properties.chunk_num_tokens"
          type: STRING
38 changes: 38 additions & 0 deletions examples/applications/s3-processor/gateways.yaml
@@ -0,0 +1,38 @@
#
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

gateways:
  - id: "request"
    type: produce
    topic: "request-topic"
    parameters:
      - sessionId
    produceOptions:
      headers:
        - key: langstream-client-session-id
          valueFromParameters: sessionId

  - id: "reply"
    type: consume
    topic: "response-topic"
    parameters:
      - sessionId
    consumeOptions:
      filters:
        headers:
          - key: langstream-client-session-id
            valueFromParameters: sessionId
Binary file added examples/applications/s3-processor/simple.pdf
Binary file not shown.
7 changes: 7 additions & 0 deletions examples/applications/s3-source/README.md
@@ -16,6 +16,13 @@ The extract-text.yaml file defines a pipeline that will:

 Prepare some PDF files and upload them to a bucket in S3.

+## Configure your OpenAI key
+
+
+```
+export OPEN_AI_ACCESS_KEY=...
+```
+
 ## Deploy the LangStream application

 ```
9 changes: 9 additions & 0 deletions langstream-agents/langstream-agent-s3/pom.xml
@@ -46,6 +46,15 @@
       <artifactId>lombok</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>langstream-agents-commons</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.samskivert</groupId>
+      <artifactId>jmustache</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
@@ -18,14 +18,21 @@
 import ai.langstream.api.runner.code.AgentCode;
 import ai.langstream.api.runner.code.AgentCodeProvider;

-public class S3SourceAgentCodeProvider implements AgentCodeProvider {
+public class S3AgentCodeProvider implements AgentCodeProvider {
     @Override
     public boolean supports(String agentType) {
-        return "s3-source".equals(agentType);
+        return switch (agentType) {
+            case "s3-source", "s3-processor" -> true;
+            default -> false;
+        };
     }

     @Override
     public AgentCode createInstance(String agentType) {
-        return new S3Source();
+        return switch (agentType) {
+            case "s3-source" -> new S3Source();
+            case "s3-processor" -> new S3Processor();
+            default -> throw new IllegalStateException();
+        };
     }
 }