Skip to content

Commit d6403fd

Browse files
Gaurav SahilGaurav Sahil
Gaurav Sahil
authored and
Gaurav Sahil
committed
merge base branch
2 parents e419e31 + a9abb1b commit d6403fd

File tree

11 files changed

+138
-51
lines changed

11 files changed

+138
-51
lines changed

.github/workflows/buf-update.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
make install-tools proto-update
2020
2121
- name: Create pull request
22-
uses: peter-evans/[email protected].6
22+
uses: peter-evans/[email protected].7
2323
with:
2424
title: Update Buf dependencies
2525
body: Automatic pull request for updating Buf dependencies

.github/workflows/release.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ jobs:
6969

7070
- name: Extract metadata (tags, labels) for Docker
7171
id: meta
72-
uses: docker/metadata-action@v5.6.1
72+
uses: docker/metadata-action@v5.7.0
7373
with:
7474
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
7575
# Makes no sense to have an image with tag v0, hence the check in the last line.
@@ -85,7 +85,7 @@ jobs:
8585
org.opencontainers.image.vendor=ConduitIO
8686
8787
- name: Build and push Docker image
88-
uses: docker/build-push-action@v6.13.0
88+
uses: docker/build-push-action@v6.15.0
8989
with:
9090
context: .
9191
push: true

cmd/conduit/root/config/config_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func TestConfigWithFlags(t *testing.T) {
8282
"--pipelines.path", "/var/lib/conduit/pipelines",
8383
"--preview.pipeline-arch-v2=true",
8484
"--processors.path", "/opt/conduit/processors",
85-
"--schema-registry.confluent.connection-string", "http://localhost:8081",
85+
"--schema-registry.confluent.connection-string", "http://localhost:8888",
8686
"--schema-registry.type", "confluent",
8787
"--dev.blockprofile", "/tmp/block.prof",
8888
"--dev.cpuprofile", "/tmp/cpu.prof",
@@ -106,8 +106,8 @@ func TestConfigWithFlags(t *testing.T) {
106106
"pipelines.error-recovery.max-retries: 10",
107107
"pipelines.error-recovery.max-retries-window: 15m0s",
108108
"schema-registry.type: confluent",
109-
"schema-registry.confluent.connection-string: http://localhost:8081",
110-
"preview.pipeline-arch-v2: false",
109+
"schema-registry.confluent.connection-string: http://localhost:8888",
110+
"preview.pipeline-arch-v2: true",
111111
"dev.cpuprofile: /tmp/cpu.prof",
112112
"dev.memprofile: /tmp/mem.prof",
113113
"dev.blockprofile: /tmp/block.prof",

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ require (
2727
github.com/gammazero/deque v1.0.0
2828
github.com/goccy/go-json v0.10.5
2929
github.com/golangci/golangci-lint v1.64.5
30-
github.com/google/go-cmp v0.6.0
30+
github.com/google/go-cmp v0.7.0
3131
github.com/google/uuid v1.6.0
3232
github.com/gorilla/websocket v1.5.3
3333
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -468,8 +468,8 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
468468
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
469469
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
470470
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
471-
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
472-
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
471+
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
472+
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
473473
github.com/google/go-containerregistry v0.20.2 h1:B1wPJ1SN/S7pB+ZAimcciVD+r+yV/l/DSArMxlbwseo=
474474
github.com/google/go-containerregistry v0.20.2/go.mod h1:z38EKdKh4h7IP2gSfUUqEvalZBqs6AoLeWfUy34nQC8=
475475
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=

pkg/conduit/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ type Config struct {
119119

120120
Preview struct {
121121
// PipelineArchV2 enables the new pipeline architecture.
122-
PipelineArchV2 bool `long:"preview.pipeline-arch-v2" usage:"enables experimental pipeline architecture v2 (note that the new architecture currently supports only 1 source and 1 destination per pipeline)"`
122+
PipelineArchV2 bool `long:"preview.pipeline-arch-v2" mapstructure:"pipeline-arch-v2" usage:"enables experimental pipeline architecture v2 (note that the new architecture currently supports only 1 source and 1 destination per pipeline)"`
123123
}
124124

125125
Dev struct {

pkg/plugin/processor/builtin/impl/cohere/command.go

+61-37
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,15 @@ import (
3030
"github.com/jpillora/backoff"
3131
)
3232

33+
type commandModel interface {
34+
Command(ctx context.Context, content string) (string, error)
35+
}
36+
37+
type commandClient struct {
38+
client *cohereClient.Client
39+
config *commandProcessorConfig
40+
}
41+
3342
//go:generate paramgen -output=paramgen_command.go commandProcessorConfig
3443

3544
type commandProcessor struct {
@@ -40,11 +49,11 @@ type commandProcessor struct {
4049
logger log.CtxLogger
4150
config commandProcessorConfig
4251
backoffCfg *backoff.Backoff
43-
client *cohereClient.Client
52+
client commandModel
4453
}
4554

4655
type commandProcessorConfig struct {
47-
// Model is one of the Cohere model (command,embed,rerank).
56+
// Model is one of the name of a compatible command model version.
4857
Model string `json:"model" default:"command"`
4958
// APIKey is the API key for Cohere api calls.
5059
APIKey string `json:"apiKey" validate:"required"`
@@ -91,8 +100,10 @@ func (p *commandProcessor) Configure(ctx context.Context, cfg config.Config) err
91100
}
92101
p.responseBodyRef = &responseBodyRef
93102

94-
// new cohere client
95-
p.client = cohereClient.NewClient()
103+
p.client = &commandClient{
104+
client: cohereClient.NewClient(),
105+
config: &p.config,
106+
}
96107

97108
p.backoffCfg = &backoff.Backoff{
98109
Factor: p.config.BackoffRetryFactor,
@@ -132,22 +143,9 @@ func (p *commandProcessor) Process(ctx context.Context, records []opencdc.Record
132143
return append(out, sdk.ErrorRecord{Error: fmt.Errorf("failed to resolve reference %v: %w", p.config.RequestBodyRef, err)})
133144
}
134145

146+
content := fmt.Sprintf(p.config.Prompt, p.getInput(requestRef.Get()))
135147
for {
136-
resp, err := p.client.V2.Chat(
137-
ctx,
138-
&cohere.V2ChatRequest{
139-
Model: p.config.Model,
140-
Messages: cohere.ChatMessages{
141-
{
142-
Role: "user",
143-
User: &cohere.UserMessage{Content: &cohere.UserMessageContent{
144-
String: fmt.Sprintf(p.config.Prompt, p.getInput(requestRef.Get())),
145-
}},
146-
},
147-
},
148-
},
149-
cohereClient.WithToken(p.config.APIKey),
150-
)
148+
resp, err := p.client.Command(ctx, content)
151149
attempt := p.backoffCfg.Attempt()
152150
duration := p.backoffCfg.Duration()
153151

@@ -182,40 +180,48 @@ func (p *commandProcessor) Process(ctx context.Context, records []opencdc.Record
182180

183181
p.backoffCfg.Reset()
184182

185-
chatResponse, err := unmarshalChatResponse([]byte(resp.String()))
183+
err = p.setField(&record, p.responseBodyRef, resp)
186184
if err != nil {
187-
return append(out, sdk.ErrorRecord{Error: err})
185+
return append(out, sdk.ErrorRecord{Error: fmt.Errorf("failed setting response body: %w", err)})
188186
}
187+
out = append(out, sdk.SingleRecord(record))
189188

190-
if len(chatResponse.Message.Content) == 1 {
191-
err = p.setField(&record, p.responseBodyRef, chatResponse.Message.Content[0].Text)
192-
if err != nil {
193-
return append(out, sdk.ErrorRecord{Error: fmt.Errorf("failed setting response body: %w", err)})
194-
}
195-
out = append(out, sdk.SingleRecord(record))
196-
}
197189
break
198190
}
199191
}
200192
return out
201193
}
202194

203-
func (p *commandProcessor) setField(r *opencdc.Record, refRes *sdk.ReferenceResolver, data any) error {
204-
if refRes == nil {
205-
return nil
195+
func (cc *commandClient) Command(ctx context.Context, content string) (string, error) {
196+
resp, err := cc.client.V2.Chat(
197+
ctx,
198+
&cohere.V2ChatRequest{
199+
Model: cc.config.Model,
200+
Messages: cohere.ChatMessages{
201+
{
202+
Role: "user",
203+
User: &cohere.UserMessage{Content: &cohere.UserMessageContent{
204+
String: content,
205+
}},
206+
},
207+
},
208+
},
209+
cohereClient.WithToken(cc.config.APIKey),
210+
)
211+
if err != nil {
212+
return "", err
206213
}
207214

208-
ref, err := refRes.Resolve(r)
215+
chatResponse, err := unmarshalChatResponse([]byte(resp.String()))
209216
if err != nil {
210-
return fmt.Errorf("error reference resolver: %w", err)
217+
return "", fmt.Errorf("error unmarshalling chat response: %w", err)
211218
}
212219

213-
err = ref.Set(data)
214-
if err != nil {
215-
return fmt.Errorf("error reference set: %w", err)
220+
if len(chatResponse.Message.Content) != 1 {
221+
return "", fmt.Errorf("invalid chat content")
216222
}
217223

218-
return nil
224+
return chatResponse.Message.Content[0].Text, nil
219225
}
220226

221227
type ChatResponse struct {
@@ -249,3 +255,21 @@ func (p *commandProcessor) getInput(val any) string {
249255
return fmt.Sprintf("%v", v)
250256
}
251257
}
258+
259+
func (p *commandProcessor) setField(r *opencdc.Record, refRes *sdk.ReferenceResolver, data any) error {
260+
if refRes == nil {
261+
return nil
262+
}
263+
264+
ref, err := refRes.Resolve(r)
265+
if err != nil {
266+
return fmt.Errorf("error reference resolver: %w", err)
267+
}
268+
269+
err = ref.Set(data)
270+
if err != nil {
271+
return fmt.Errorf("error reference set: %w", err)
272+
}
273+
274+
return nil
275+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright © 2025 Meroxa, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package cohere
16+
17+
import (
18+
"context"
19+
"fmt"
20+
21+
"github.com/conduitio/conduit-commons/config"
22+
"github.com/conduitio/conduit-commons/opencdc"
23+
sdk "github.com/conduitio/conduit-processor-sdk"
24+
)
25+
26+
func ExamplecommandProcessor() {
27+
p := func() sdk.Processor {
28+
proc := &commandProcessor{}
29+
cfg := config.Config{
30+
commandProcessorConfigApiKey: "apikey",
31+
commandProcessorConfigPrompt: "hello",
32+
}
33+
_ = proc.Configure(context.Background(), cfg)
34+
proc.client = &mockClient{}
35+
return proc
36+
}()
37+
38+
records := []opencdc.Record{{
39+
Operation: opencdc.OperationUpdate,
40+
Position: opencdc.Position("pos-1"),
41+
Payload: opencdc.Change{
42+
After: opencdc.RawData("who are you?"),
43+
},
44+
}}
45+
46+
got := p.Process(context.Background(), records)
47+
rec, _ := got[0].(sdk.SingleRecord)
48+
fmt.Println("processor transformed record:")
49+
fmt.Println(string(opencdc.Record(rec).Bytes()))
50+
51+
// Output:
52+
// processor transformed record:
53+
// {"position":"cG9zLTE=","operation":"update","metadata":null,"key":null,"payload":{"before":null,"after":"Y29oZXJlIGNvbW1hbmQgcmVzcG9uc2UgY29udGVudA=="}}
54+
}
55+
56+
type mockClient struct{}
57+
58+
func (m mockClient) Command(ctx context.Context, content string) (string, error) {
59+
if content == "" {
60+
return "", fmt.Errorf("mocked api error")
61+
}
62+
return "cohere command response content", nil
63+
}

pkg/plugin/processor/builtin/impl/cohere/paramgen_command.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/plugin/processor/builtin/impl/cohere/z_examples_exporter_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2024 Meroxa, Inc.
1+
// Copyright © 2025 Meroxa, Inc.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.

proto/buf.lock

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ deps:
99
- remote: buf.build
1010
owner: googleapis
1111
repository: googleapis
12-
commit: 83c0f6c19b2f4ea0b0fd84a80e753659
13-
digest: shake256:20e034fe1562d2ef4137cd7822344d4c29cff8130e112a16e44bc3102be688e9134fae531ec1de7ae18dd660160649db28d160b006c9c70964e3f72a60dc47e3
12+
commit: 546238c53f7340c6a2a6099fb863bc1b
13+
digest: shake256:8d75c12f391e392b24c076d05117b47aeddb090add99c70247a8f4389b906a65f61a933c68e54ed8b73a050b967b6b712ba194348b67c3ab3ee26cc2cb25852c
1414
- remote: buf.build
1515
owner: grpc-ecosystem
1616
repository: grpc-gateway

0 commit comments

Comments
 (0)