Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BP: WASM: Implement Configure, Open, Process #1327

Closed
wants to merge 15 commits into from
4 changes: 1 addition & 3 deletions .github/workflows/validate-generated-files.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,5 @@ jobs:
- name: Check generated files
run: |
export PATH=$PATH:$(go env GOPATH)/bin
make install-tools
make generate
make proto-generate
make install-tools generate proto-generate
git diff --exit-code --numstat
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,6 @@ dist/
/conduit.yaml

# Escape Analysis Report
escape_analysis.txt
escape_analysis.txt

pkg/plugin/processor/standalone/test/wasm_processors/*/processor.wasm
2 changes: 1 addition & 1 deletion docs/connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Once you have chosen a connector to be built-in, you can:

- Download the new package and its dependencies: `go get "github.com/foo/conduit-connector-new"`
- Import the Go module defining the connector
into the [builtin registry](https://github.com/ConduitIO/conduit/blob/main/pkg/plugin/builtin/registry.go)
into the [builtin registry](https://github.com/ConduitIO/conduit/blob/main/pkg/plugin/connector/builtin/registry.go)
and add a new key to `DefaultDispenserFactories`:

```diff
Expand Down
22 changes: 11 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
module github.com/conduitio/conduit

go 1.21

toolchain go1.21.1
go 1.21.5

require (
buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.31.0-20231027202514-3f42134f4c56.2
github.com/Masterminds/semver/v3 v3.2.1
github.com/NYTimes/gziphandler v1.1.1
github.com/antchfx/jsonquery v1.3.3
github.com/bufbuild/buf v1.28.1
github.com/conduitio/conduit-commons v0.0.0-20231222154604-f6dfae573b45
github.com/conduitio/conduit-connector-file v0.6.0
github.com/conduitio/conduit-connector-generator v0.5.0
github.com/conduitio/conduit-connector-kafka v0.7.0
Expand All @@ -18,6 +17,7 @@ require (
github.com/conduitio/conduit-connector-protocol v0.5.0
github.com/conduitio/conduit-connector-s3 v0.5.0
github.com/conduitio/conduit-connector-sdk v0.8.0
github.com/conduitio/conduit-processor-sdk v0.0.0-20231228195815-5a4e9b618cc8
github.com/conduitio/yaml/v3 v3.3.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/dop251/goja v0.0.0-20230531210528-d7324b2d74f7
Expand All @@ -27,7 +27,7 @@ require (
github.com/golang/mock v1.6.0
github.com/golangci/golangci-lint v1.55.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.4.0
github.com/google/uuid v1.5.0
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1
github.com/hamba/avro/v2 v2.18.0
Expand All @@ -44,15 +44,16 @@ require (
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.45.0
github.com/rs/zerolog v1.31.0
github.com/tetratelabs/wazero v1.5.0
github.com/twmb/go-cache v1.2.0
go.uber.org/goleak v1.3.0
go.uber.org/mock v0.3.0
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa
golang.org/x/tools v0.16.0
golang.org/x/tools v0.16.1
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2
google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.1-0.20231027082548-f4a6c1f6e5c1
google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0
google.golang.org/grpc v1.60.0
google.golang.org/protobuf v1.32.0
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
)

Expand Down Expand Up @@ -298,7 +299,6 @@ require (
github.com/t-yuki/gocover-cobertura v0.0.0-20180217150009-aaee18c8195c // indirect
github.com/tdakkota/asciicheck v0.2.0 // indirect
github.com/tetafro/godot v1.4.15 // indirect
github.com/tetratelabs/wazero v1.5.0 // indirect
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 // indirect
github.com/timonwong/loggercheck v0.9.4 // indirect
github.com/tomarrell/wrapcheck/v2 v2.8.1 // indirect
Expand Down Expand Up @@ -335,8 +335,8 @@ require (
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.4.0 // indirect
google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/genproto v0.0.0-20231211222908-989df2bf70f3 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231211222908-989df2bf70f3 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
32 changes: 18 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,8 @@ github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWH
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/conduitio/conduit-commons v0.0.0-20231222154604-f6dfae573b45 h1:GJhyDjoyswfJdqKyszQvevOw/xAz6ilNgN8q6T0lDeQ=
github.com/conduitio/conduit-commons v0.0.0-20231222154604-f6dfae573b45/go.mod h1:+bKjNjWYkbc/MMkBFSzND09FxpPD3GqjmPRQMndkvZ0=
github.com/conduitio/conduit-connector-file v0.6.0 h1:8tsGeGhKvFwYQZztOOL5/tmOhVShsfo9lQ3b/0fX8kQ=
github.com/conduitio/conduit-connector-file v0.6.0/go.mod h1:ju7PiB4kTJgqng4KVXDt/Gvw/53kFwSzi5Ez9EDXxNI=
github.com/conduitio/conduit-connector-generator v0.5.0 h1:zpXHif89DCJ13nftKLv31uI2AJGicpY5H1V7SwldRNo=
Expand All @@ -1036,6 +1038,8 @@ github.com/conduitio/conduit-connector-s3 v0.5.0 h1:nqLcf/foYnDLkXWYcWJX/5UHzTjW
github.com/conduitio/conduit-connector-s3 v0.5.0/go.mod h1:I6oE37zz25RTjnQiUBz2rOASwXNqfaMW7gSlsKX6z8E=
github.com/conduitio/conduit-connector-sdk v0.8.0 h1:gvchqoj5d3AQsBoIosx4i32L8Ex9+5BuAyHi/IM9VD4=
github.com/conduitio/conduit-connector-sdk v0.8.0/go.mod h1:nOz4K3X6fD8YMe5CPbULwSEE18Eu02ZrpT6o6KwQfxs=
github.com/conduitio/conduit-processor-sdk v0.0.0-20231228195815-5a4e9b618cc8 h1:JaaDBQWmB/APPuJ3OQvvN8c8UHaNYCPvUXDs+w1NfUw=
github.com/conduitio/conduit-processor-sdk v0.0.0-20231228195815-5a4e9b618cc8/go.mod h1:vzR6okyqvGBHlnFvBMkeZ1oMj8VaGSPk7mE3eJnndKE=
github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI=
github.com/conduitio/yaml/v3 v3.3.0/go.mod h1:JNgFMOX1t8W4YJuRZOh6GggVtSMsgP9XgTw+7dIenpc=
github.com/containerd/stargz-snapshotter/estargz v0.15.1 h1:eXJjw9RbkLFgioVaTG+G/ZW/0kEe2oEKCdS/ZxIyoCU=
Expand Down Expand Up @@ -1340,8 +1344,8 @@ github.com/google/s2a-go v0.1.4/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkj
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.0.0-20220520183353-fd19c99a87aa/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8=
github.com/googleapis/enterprise-certificate-proxy v0.1.0/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8=
github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg=
Expand Down Expand Up @@ -2423,8 +2427,8 @@ golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
golang.org/x/tools v0.8.0/go.mod h1:JxBZ99ISMI5ViVkT1tr6tdNmXeTrcpVSD3vZ1RsRdN4=
golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
golang.org/x/tools v0.10.0/go.mod h1:UJwyiVBsOA2uwvK/e5OY3GTpDUJriEd+/YlqAwLPmyM=
golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM=
golang.org/x/tools v0.16.0/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA=
golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down Expand Up @@ -2661,8 +2665,8 @@ google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5/go.mod h1:oH/ZOT02
google.golang.org/genproto v0.0.0-20230821184602-ccc8af3d0e93/go.mod h1:yZTlhN0tQnXo3h00fuXNCxJdLdIdnVFVBaRJ5LWBbw4=
google.golang.org/genproto v0.0.0-20230913181813-007df8e322eb/go.mod h1:yZTlhN0tQnXo3h00fuXNCxJdLdIdnVFVBaRJ5LWBbw4=
google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97/go.mod h1:t1VqOqqvce95G3hIDCT5FeO3YUc6Q4Oe24L/+rNMxRk=
google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405 h1:I6WNifs6pF9tNdSob2W24JtyxIYjzFB9qDlpUC76q+U=
google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405/go.mod h1:3WDQMjmJk36UQhjQ89emUzb1mdaHcPeeAh4SCBKznB4=
google.golang.org/genproto v0.0.0-20231211222908-989df2bf70f3 h1:1hfbdAfFbkmpg41000wDVqr7jUpK/Yo+LPnIxxGzmkg=
google.golang.org/genproto v0.0.0-20231211222908-989df2bf70f3/go.mod h1:5RBcpGRxr25RbDzY5w+dmaqpSEvl8Gwl1x2CICf60ic=
google.golang.org/genproto/googleapis/api v0.0.0-20230525234020-1aefcd67740a/go.mod h1:ts19tUU+Z0ZShN1y3aPyq2+O3d5FUNNgT6FtOzmrNn8=
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig=
google.golang.org/genproto/googleapis/api v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig=
Expand All @@ -2673,8 +2677,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98/go.
google.golang.org/genproto/googleapis/api v0.0.0-20230726155614-23370e0ffb3e/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ=
google.golang.org/genproto/googleapis/api v0.0.0-20230803162519-f966b187b2e5/go.mod h1:5DZzOUPCLYL3mNkQ0ms0F3EuUNZ7py1Bqeq6sxzI7/Q=
google.golang.org/genproto/googleapis/api v0.0.0-20230920204549-e6e6cdab5c13/go.mod h1:RdyHbowztCGQySiCvQPgWQWgWhGnouTdCflKoDBt32U=
google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 h1:JpwMPBpFN3uKhdaekDpiNlImDdkUAyiJ6ez/uxGaUSo=
google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:0xJLfVdJqpAPl8tDg1ujOCGzx6LFLttXT5NhllGOXY4=
google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0 h1:s1w3X6gQxwrLEpxnLd/qXTVLgQE2yXwaOaoa6IlY/+o=
google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0/go.mod h1:CAny0tYF+0/9rmDB9fahA9YLzX3+AEVl1qXbv5hhj6c=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:ylj+BE99M198VPbBh6A8d9n3w8fChvyLK3wwBOjXBFA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234015-3fc162c6f38a/go.mod h1:xURIpW9ES5+/GZhnV6beoEtxQrnkRGIfP5VQG2tCBLc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
Expand All @@ -2688,8 +2692,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20230803162519-f966b187b2e5/go.
google.golang.org/genproto/googleapis/rpc v0.0.0-20230920183334-c177e329c48b/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13/go.mod h1:KSqppvjFjtoCI+KGd4PELB0qLNxdJHRGqRI09mB6pQA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a/go.mod h1:4cYg8o5yUbm77w8ZX00LhMVNl/YVBFJRYWDc0uYWMs0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231211222908-989df2bf70f3 h1:kzJAXnzZoFbe5bhZd4zjUuHos/I31yH4thfMb/13oVY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231211222908-989df2bf70f3/go.mod h1:eJVxU6o+4G1PSczBr85xmyvSNYAKvAYgkub40YGomFM=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
Expand Down Expand Up @@ -2736,8 +2740,8 @@ google.golang.org/grpc v1.56.1/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpX
google.golang.org/grpc v1.56.2/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/grpc v1.60.0 h1:6FQAR0kM31P6MRdeluor2w2gPaS4SVNrD/DNTxrQ15k=
google.golang.org/grpc v1.60.0/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
Expand All @@ -2757,8 +2761,8 @@ google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
google.golang.org/protobuf v1.29.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.31.1-0.20231027082548-f4a6c1f6e5c1 h1:fk72uXZyuZiTtW5tgd63jyVK6582lF61nRC/kGv6vCA=
google.golang.org/protobuf v1.31.1-0.20231027082548-f4a6c1f6e5c1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
2 changes: 1 addition & 1 deletion pkg/conduit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/database"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin/builtin"
"github.com/conduitio/conduit/pkg/plugin/connector/builtin"
"github.com/conduitio/conduit/pkg/processor"
"github.com/rs/zerolog"
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ import (
"github.com/conduitio/conduit/pkg/orchestrator"
"github.com/conduitio/conduit/pkg/pipeline"
"github.com/conduitio/conduit/pkg/plugin"
"github.com/conduitio/conduit/pkg/plugin/builtin"
"github.com/conduitio/conduit/pkg/plugin/standalone"
"github.com/conduitio/conduit/pkg/plugin/connector/builtin"
"github.com/conduitio/conduit/pkg/plugin/connector/standalone"
"github.com/conduitio/conduit/pkg/processor"
"github.com/conduitio/conduit/pkg/provisioning"
"github.com/conduitio/conduit/pkg/web/api"
Expand Down
7 changes: 4 additions & 3 deletions pkg/connector/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin"
connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
"github.com/conduitio/conduit/pkg/record"
)

type Destination struct {
Instance *Instance

dispenser plugin.Dispenser
plugin plugin.DestinationPlugin
dispenser connectorPlugin.Dispenser
plugin connectorPlugin.DestinationPlugin

// errs is used to signal the node that the connector experienced an error
// when it was processing something asynchronously (e.g. persisting state).
Expand All @@ -37,7 +38,7 @@ type Destination struct {
// stopStream is a function that closes the context of the stream
stopStream context.CancelFunc

// wg tracks the number of in flight calls to the plugin.
// wg tracks the number of in flight calls to the connectorPlugin.
wg sync.WaitGroup
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/database/inmemory"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin"
"github.com/conduitio/conduit/pkg/plugin/mock"
"github.com/conduitio/conduit/pkg/plugin/connector/mock"
"github.com/matryer/is"
"go.uber.org/mock/gomock"
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/connector/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/inspector"
"github.com/conduitio/conduit/pkg/plugin"
connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
)

const (
Expand Down Expand Up @@ -86,7 +86,7 @@ type Connector interface {

// PluginDispenserFetcher can fetch a plugin dispenser.
type PluginDispenserFetcher interface {
NewDispenser(logger log.CtxLogger, name string) (plugin.Dispenser, error)
NewDispenser(logger log.CtxLogger, name string) (connectorPlugin.Dispenser, error)
}

func (i *Instance) Init(logger log.CtxLogger, persister *Persister) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/connector/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ package connector
import (
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin"
connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
)

// fakePluginFetcher fulfills the PluginFetcher interface.
type fakePluginFetcher map[string]plugin.Dispenser
type fakePluginFetcher map[string]connectorPlugin.Dispenser

func (fpf fakePluginFetcher) NewDispenser(_ log.CtxLogger, name string) (plugin.Dispenser, error) {
func (fpf fakePluginFetcher) NewDispenser(_ log.CtxLogger, name string) (connectorPlugin.Dispenser, error) {
plug, ok := fpf[name]
if !ok {
return nil, plugin.ErrPluginNotFound
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/database/inmemory"
"github.com/conduitio/conduit/pkg/foundation/database/mock"
"github.com/conduitio/conduit/pkg/foundation/log"
pmock "github.com/conduitio/conduit/pkg/plugin/mock"
pmock "github.com/conduitio/conduit/pkg/plugin/connector/mock"
"github.com/conduitio/conduit/pkg/record"
"github.com/google/uuid"
"github.com/matryer/is"
Expand Down
7 changes: 4 additions & 3 deletions pkg/connector/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin"
connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector"
"github.com/conduitio/conduit/pkg/record"
)

type Source struct {
Instance *Instance

dispenser plugin.Dispenser
plugin plugin.SourcePlugin
dispenser connectorPlugin.Dispenser
plugin connectorPlugin.SourcePlugin

// errs is used to signal the node that the connector experienced an error
// when it was processing something asynchronously (e.g. persisting state).
Expand All @@ -37,7 +38,7 @@ type Source struct {
// stopStream is a function that closes the context of the stream
stopStream context.CancelFunc

// wg tracks the number of in flight calls to the plugin.
// wg tracks the number of in flight calls to the connectorPlugin.
wg sync.WaitGroup
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/database/inmemory"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin"
"github.com/conduitio/conduit/pkg/plugin/mock"
"github.com/conduitio/conduit/pkg/plugin/connector/mock"
"github.com/conduitio/conduit/pkg/record"
"github.com/matryer/is"
"go.uber.org/mock/gomock"
Expand Down
10 changes: 3 additions & 7 deletions pkg/orchestrator/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,12 @@ func (c *ConnectorOrchestrator) Validate(
plugin string,
config connector.Config,
) error {
d, err := c.plugins.NewDispenser(c.logger, plugin)
if err != nil {
return cerrors.Errorf("couldn't get dispenser: %w", err)
}

var err error
switch t {
case connector.TypeSource:
err = c.plugins.ValidateSourceConfig(ctx, d, config.Settings)
err = c.plugins.ValidateSourceConfig(ctx, plugin, config.Settings)
case connector.TypeDestination:
err = c.plugins.ValidateDestinationConfig(ctx, d, config.Settings)
err = c.plugins.ValidateDestinationConfig(ctx, plugin, config.Settings)
default:
return cerrors.New("invalid connector type")
}
Expand Down
Loading