Skip to content

Commit 7b3087e

Browse files
harisolovromazgonmaha-hajja
authored
BP: WASM: Load available WASM processors (#1322)
Co-authored-by: Maha Hajja <[email protected]> --------- Co-authored-by: Lovro Mažgon <[email protected]> Co-authored-by: Maha Hajja <[email protected]>
1 parent 7782b08 commit 7b3087e

28 files changed

+2234
-40
lines changed

.github/workflows/validate-generated-files.yml

+1-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,5 @@ jobs:
2525
- name: Check generated files
2626
run: |
2727
export PATH=$PATH:$(go env GOPATH)/bin
28-
make install-tools
29-
make generate
30-
make proto-generate
28+
make install-tools generate proto-generate
3129
git diff --exit-code --numstat

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,6 @@ escape_analysis.txt
8686

8787
# Profiles
8888
*.prof
89+
90+
# Compiled test wasm processors
91+
pkg/plugin/processor/standalone/test/wasm_processors/*/processor.wasm

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ install-tools: download
6565
@go mod tidy
6666

6767
generate:
68-
go generate ./...
68+
go generate -x ./...
6969

7070
pkg/web/ui/dist:
7171
make ui-dist

go.mod

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/conduitio/conduit
22

3-
go 1.21.1
3+
go 1.21.5
44

55
require (
66
buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.32.0-20231027202514-3f42134f4c56.1
@@ -17,6 +17,7 @@ require (
1717
github.com/conduitio/conduit-connector-protocol v0.5.1-0.20240104160905-e9e61586fb8d
1818
github.com/conduitio/conduit-connector-s3 v0.5.1
1919
github.com/conduitio/conduit-connector-sdk v0.8.0
20+
github.com/conduitio/conduit-processor-sdk v0.0.0-20240118151737-a75ea9e86bb8
2021
github.com/conduitio/yaml/v3 v3.3.0
2122
github.com/dgraph-io/badger/v4 v4.2.0
2223
github.com/dop251/goja v0.0.0-20230531210528-d7324b2d74f7
@@ -42,6 +43,8 @@ require (
4243
github.com/prometheus/client_model v0.5.0
4344
github.com/prometheus/common v0.45.0
4445
github.com/rs/zerolog v1.31.0
46+
github.com/stealthrocket/wazergo v0.19.1
47+
github.com/tetratelabs/wazero v1.5.0
4548
github.com/twmb/go-cache v1.2.0
4649
go.uber.org/goleak v1.3.0
4750
go.uber.org/mock v0.4.0
@@ -296,7 +299,6 @@ require (
296299
github.com/t-yuki/gocover-cobertura v0.0.0-20180217150009-aaee18c8195c // indirect
297300
github.com/tdakkota/asciicheck v0.2.0 // indirect
298301
github.com/tetafro/godot v1.4.15 // indirect
299-
github.com/tetratelabs/wazero v1.5.0 // indirect
300302
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 // indirect
301303
github.com/timonwong/loggercheck v0.9.4 // indirect
302304
github.com/tomarrell/wrapcheck/v2 v2.8.1 // indirect

go.sum

+4
Original file line numberDiff line numberDiff line change
@@ -1101,6 +1101,8 @@ github.com/conduitio/conduit-connector-s3 v0.5.1 h1:yRo8004ryCIZc/S3iWQ1rN6pm6bj
11011101
github.com/conduitio/conduit-connector-s3 v0.5.1/go.mod h1:nbxzsyS95gbFJ28Job9vFFB+byRFINSv70/13Yi4mKQ=
11021102
github.com/conduitio/conduit-connector-sdk v0.8.0 h1:gvchqoj5d3AQsBoIosx4i32L8Ex9+5BuAyHi/IM9VD4=
11031103
github.com/conduitio/conduit-connector-sdk v0.8.0/go.mod h1:nOz4K3X6fD8YMe5CPbULwSEE18Eu02ZrpT6o6KwQfxs=
1104+
github.com/conduitio/conduit-processor-sdk v0.0.0-20240118151737-a75ea9e86bb8 h1:H6Px/c38KiId1XDsb4agp25wOlMsZM2rp4p2kxlHDKM=
1105+
github.com/conduitio/conduit-processor-sdk v0.0.0-20240118151737-a75ea9e86bb8/go.mod h1:k0rpE3kOAyDcIsBbS5vMO035XzDGW9FJsC4sgEXCH8Y=
11041106
github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI=
11051107
github.com/conduitio/yaml/v3 v3.3.0/go.mod h1:JNgFMOX1t8W4YJuRZOh6GggVtSMsgP9XgTw+7dIenpc=
11061108
github.com/containerd/stargz-snapshotter/estargz v0.15.1 h1:eXJjw9RbkLFgioVaTG+G/ZW/0kEe2oEKCdS/ZxIyoCU=
@@ -1957,6 +1959,8 @@ github.com/ssgreg/nlreturn/v2 v2.2.1 h1:X4XDI7jstt3ySqGU86YGAURbxw3oTDPK9sPEi6YE
19571959
github.com/ssgreg/nlreturn/v2 v2.2.1/go.mod h1:E/iiPB78hV7Szg2YfRgyIrk1AD6JVMTRkkxBiELzh2I=
19581960
github.com/stbenjam/no-sprintf-host-port v0.1.1 h1:tYugd/yrm1O0dV+ThCbaKZh195Dfm07ysF0U6JQXczc=
19591961
github.com/stbenjam/no-sprintf-host-port v0.1.1/go.mod h1:TLhvtIvONRzdmkFiio4O8LHsN9N74I+PhRquPsxpL0I=
1962+
github.com/stealthrocket/wazergo v0.19.1 h1:BPrITETPgSFwiytwmToO0MbUC/+RGC39JScz1JmmG6c=
1963+
github.com/stealthrocket/wazergo v0.19.1/go.mod h1:riI0hxw4ndZA5e6z7PesHg2BtTftcZaMxRcoiGGipTs=
19601964
github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs=
19611965
github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
19621966
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

pkg/foundation/cerrors/cerrors.go

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ var (
3737
Is = errors.Is
3838
As = errors.As
3939
Unwrap = errors.Unwrap
40+
Join = errors.Join
4041
)
4142

4243
type Frame struct {

pkg/foundation/log/ctxlogger.go

+22
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ package log
1616

1717
import (
1818
"context"
19+
"reflect"
20+
"strings"
21+
"testing"
1922

2023
"github.com/conduitio/conduit/pkg/foundation/cerrors"
2124
"github.com/rs/zerolog"
@@ -46,6 +49,11 @@ func Nop() CtxLogger {
4649
return CtxLogger{Logger: zerolog.Nop()}
4750
}
4851

52+
// Test returns a test logger that writes to the supplied testing.TB.
53+
func Test(t testing.TB) CtxLogger {
54+
return CtxLogger{Logger: zerolog.New(zerolog.NewTestWriter(t))}
55+
}
56+
4957
// InitLogger returns a logger initialized with the wanted level and format
5058
func InitLogger(level zerolog.Level, f Format) CtxLogger {
5159
var w = GetWriter(f)
@@ -67,6 +75,20 @@ func (l CtxLogger) WithComponent(component string) CtxLogger {
6775
return l
6876
}
6977

78+
func (l CtxLogger) WithComponentFromType(c any) CtxLogger {
79+
cType := reflect.TypeOf(c)
80+
for cType.Kind() == reflect.Ptr || cType.Kind() == reflect.Interface {
81+
cType = cType.Elem()
82+
}
83+
84+
pkgPath := cType.PkgPath()
85+
pkgPath = strings.TrimPrefix(pkgPath, "github.com/conduitio/conduit/pkg/")
86+
pkgPath = strings.ReplaceAll(pkgPath, "/", ".")
87+
typeName := cType.Name()
88+
l.component = pkgPath + "." + typeName
89+
return l
90+
}
91+
7092
func (l CtxLogger) Component() string {
7193
return l.component
7294
}

pkg/foundation/log/ctxlogger_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,20 @@ func TestCtxLoggerComponent(t *testing.T) {
4141
is.Equal(`{"level":"info","component":"test","message":"testing component"}`+"\n", got)
4242
}
4343

44+
type testComponent struct{}
45+
46+
func TestCtxLoggerComponentFromType(t *testing.T) {
47+
is := is.New(t)
48+
49+
logger := New(zerolog.New(zerolog.NewTestWriter(t)))
50+
51+
logger = logger.WithComponentFromType(testComponent{})
52+
is.Equal("foundation.log.testComponent", logger.Component())
53+
54+
logger = logger.WithComponentFromType(&testComponent{})
55+
is.Equal("foundation.log.testComponent", logger.Component())
56+
}
57+
4458
func TestCtxLoggerWithoutHooks(t *testing.T) {
4559
ctx := context.Background()
4660

pkg/foundation/log/fields.go

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ const (
2222
NodeIDField = "node_id"
2323
ParallelWorkerIDField = "parallel_worker_id"
2424
PipelineIDField = "pipeline_id"
25+
ProcessorIDField = "processor_id"
2526
RecordPositionField = "record_position"
2627
RequestIDField = "request_id"
2728
ServerAddressField = "address"

pkg/plugin/connector/builtin/registry.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func NewDispenserFactory(conn sdk.Connector) DispenserFactory {
8585
}
8686

8787
func NewRegistry(logger log.CtxLogger, factories map[string]DispenserFactory) *Registry {
88-
logger = logger.WithComponent("builtin.Registry")
88+
logger = logger.WithComponentFromType(Registry{})
8989
buildInfo, ok := debug.ReadBuildInfo()
9090
if !ok {
9191
// we are using modules, build info should always be available, we are staying on the safe side

pkg/plugin/connector/standalone/registry.go

+32-32
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,14 @@ type blueprint struct {
5050

5151
func NewRegistry(logger log.CtxLogger, pluginDir string) *Registry {
5252
r := &Registry{
53-
logger: logger.WithComponent("standalone.Registry"),
53+
logger: logger.WithComponentFromType(Registry{}),
5454
}
5555

5656
if pluginDir != "" {
5757
// extract absolute path to make it clearer in the logs what directory is used
5858
absPluginDir, err := filepath.Abs(pluginDir)
5959
if err != nil {
60-
r.logger.Warn(context.Background()).Err(err).Msg("could not extract absolute plugins path")
60+
r.logger.Warn(context.Background()).Err(err).Msg("could not extract absolute connector plugins path")
6161
} else {
6262
r.pluginDir = absPluginDir // store plugin dir for hot reloads
6363
r.reloadPlugins()
@@ -67,15 +67,11 @@ func NewRegistry(logger log.CtxLogger, pluginDir string) *Registry {
6767
r.logger.Info(context.Background()).
6868
Str(log.PluginPathField, r.pluginDir).
6969
Int("count", len(r.List())).
70-
Msg("standalone plugins initialized")
70+
Msg("standalone connector plugins initialized")
7171

7272
return r
7373
}
7474

75-
func newFullName(pluginName, pluginVersion string) plugin.FullName {
76-
return plugin.NewFullName(plugin.PluginTypeStandalone, pluginName, pluginVersion)
77-
}
78-
7975
func (r *Registry) reloadPlugins() {
8076
plugins := r.loadPlugins(context.Background(), r.pluginDir)
8177
r.m.Lock()
@@ -84,19 +80,19 @@ func (r *Registry) reloadPlugins() {
8480
}
8581

8682
func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string]map[string]blueprint {
87-
r.logger.Debug(ctx).Msgf("loading plugins from directory %v", pluginDir)
83+
r.logger.Debug(ctx).Msgf("loading connector plugins from directory %v", pluginDir)
8884
plugins := make(map[string]map[string]blueprint)
8985

9086
dirEntries, err := os.ReadDir(pluginDir)
9187
if err != nil {
92-
r.logger.Warn(ctx).Err(err).Msg("could not read plugin directory")
88+
r.logger.Warn(ctx).Err(err).Msg("could not read connector plugin directory")
9389
return plugins // return empty map
9490
}
9591
warn := func(ctx context.Context, err error, pluginPath string) {
9692
r.logger.Warn(ctx).
9793
Err(err).
9894
Str(log.PluginPathField, pluginPath).
99-
Msgf("could not load standalone plugin")
95+
Msgf("could not load standalone connector plugin")
10096
}
10197

10298
for _, dirEntry := range dirEntries {
@@ -107,24 +103,8 @@ func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string
107103

108104
pluginPath := path.Join(pluginDir, dirEntry.Name())
109105

110-
// create dispenser without a logger to not spam logs on refresh
111-
dispenser, err := standalonev1.NewDispenser(zerolog.Nop(), pluginPath)
112-
if err != nil {
113-
err = cerrors.Errorf("failed to create dispenser: %w", err)
114-
warn(ctx, err, pluginPath)
115-
continue
116-
}
117-
118-
specPlugin, err := dispenser.DispenseSpecifier()
119-
if err != nil {
120-
err = cerrors.Errorf("failed to dispense specifier (tip: check if the file is a valid plugin binary and if you have permissions for running it): %w", err)
121-
warn(ctx, err, pluginPath)
122-
continue
123-
}
124-
125-
specs, err := specPlugin.Specify()
106+
specs, err := r.loadSpecifications(pluginPath)
126107
if err != nil {
127-
err = cerrors.Errorf("failed to get specs: %w", err)
128108
warn(ctx, err, pluginPath)
129109
continue
130110
}
@@ -135,9 +115,9 @@ func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string
135115
plugins[specs.Name] = versionMap
136116
}
137117

138-
fullName := newFullName(specs.Name, specs.Version)
118+
fullName := plugin.NewFullName(plugin.PluginTypeStandalone, specs.Name, specs.Version)
139119
if conflict, ok := versionMap[specs.Version]; ok {
140-
err = cerrors.Errorf("conflict detected, plugin %v already registered, please remove either %v or %v, these plugins won't be usable until that happens", fullName, conflict.path, pluginPath)
120+
err = cerrors.Errorf("conflict detected, connector plugin %v already registered, please remove either %v or %v, these plugins won't be usable until that happens", fullName, conflict.path, pluginPath)
141121
warn(ctx, err, pluginPath)
142122
// delete plugin from map at the end so that further duplicates can
143123
// still be found
@@ -163,18 +143,38 @@ func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string
163143
r.logger.Debug(ctx).
164144
Str(log.PluginPathField, pluginPath).
165145
Str(log.PluginNameField, string(bp.fullName)).
166-
Msg("set plugin as latest")
146+
Msg("set connector plugin as latest")
167147
}
168148

169149
r.logger.Debug(ctx).
170150
Str(log.PluginPathField, pluginPath).
171151
Str(log.PluginNameField, string(bp.fullName)).
172-
Msg("loaded standalone plugin")
152+
Msg("loaded standalone connector plugin")
173153
}
174154

175155
return plugins
176156
}
177157

158+
func (r *Registry) loadSpecifications(pluginPath string) (connector.Specification, error) {
159+
// create dispenser without a logger to not spam logs on refresh
160+
dispenser, err := standalonev1.NewDispenser(zerolog.Nop(), pluginPath)
161+
if err != nil {
162+
return connector.Specification{}, cerrors.Errorf("failed to create connector dispenser: %w", err)
163+
}
164+
165+
specPlugin, err := dispenser.DispenseSpecifier()
166+
if err != nil {
167+
return connector.Specification{}, cerrors.Errorf("failed to dispense connector specifier (tip: check if the file is a valid connector plugin binary and if you have permissions for running it): %w", err)
168+
}
169+
170+
specs, err := specPlugin.Specify()
171+
if err != nil {
172+
return connector.Specification{}, cerrors.Errorf("failed to get connector specs: %w", err)
173+
}
174+
175+
return specs, nil
176+
}
177+
178178
func (r *Registry) NewDispenser(logger log.CtxLogger, fullName plugin.FullName) (connector.Dispenser, error) {
179179
r.m.RLock()
180180
defer r.m.RUnlock()
@@ -189,7 +189,7 @@ func (r *Registry) NewDispenser(logger log.CtxLogger, fullName plugin.FullName)
189189
for k := range versionMap {
190190
availableVersions = append(availableVersions, k)
191191
}
192-
return nil, cerrors.Errorf("could not find standalone plugin, only found versions %v: %w", availableVersions, plugin.ErrPluginNotFound)
192+
return nil, cerrors.Errorf("could not find standalone connector plugin, only found versions %v: %w", availableVersions, plugin.ErrPluginNotFound)
193193
}
194194

195195
return standalonev1.NewDispenser(logger.ZerologWithComponent(), bp.path)

0 commit comments

Comments
 (0)