Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert to connect-go based psdb driver #46

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,28 @@ GO ?= go
GO_ENV ?= PS_LOG_LEVEL=debug PS_DEV_MODE=1 CGO_ENABLED=0
GO_RUN := env $(GO_ENV) $(GO) run

OS := $(shell uname)
PROTOC_VERSION=3.20.1
PROTOC_ARCH=x86_64
ifeq ($(OS),Linux)
PROTOC_PLATFORM := linux
UNAME_OS := $(shell uname -s)
UNAME_ARCH := $(shell uname -m)

PROTOC_VERSION=21.3

ifeq ($(UNAME_OS),Darwin)
PROTOC_OS := osx
ifeq ($(UNAME_ARCH),arm64)
PROTOC_ARCH := aarch_64
else
PROTOC_ARCH := x86_64
endif
endif
ifeq ($(UNAME_OS),Linux)
PROTOC_OS = linux
ifeq ($(UNAME_ARCH),aarch64)
PROTOC_ARCH := aarch_64
else
PROTOC_ARCH := $(UNAME_ARCH)
endif
ifeq ($(OS),Darwin)
PROTOC_PLATFORM := osx
endif

PSDBCONNECT_PROTO_OUT := proto/psdbconnect

.PHONY: all
Expand Down Expand Up @@ -75,39 +88,35 @@ $(BIN):
$(BIN)/protoc-gen-go: | $(BIN)
go install google.golang.org/protobuf/cmd/protoc-gen-go

$(BIN)/protoc-gen-go-grpc: | $(BIN)
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc
$(BIN)/protoc-gen-connect-go: | $(BIN)
go install github.com/bufbuild/connect-go/cmd/protoc-gen-connect-go

$(BIN)/protoc-gen-go-vtproto: | $(BIN)
go install github.com/planetscale/vtprotobuf/cmd/protoc-gen-go-vtproto

$(BIN)/protoc-gen-twirp: | $(BIN)
go install github.com/twitchtv/twirp/protoc-gen-twirp

$(BIN)/protoc: | $(BIN)
rm -rf tmp-protoc
mkdir -p tmp-protoc
wget -O tmp-protoc/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v$(PROTOC_VERSION)/protoc-$(PROTOC_VERSION)-$(PROTOC_PLATFORM)-$(PROTOC_ARCH).zip
wget -O tmp-protoc/protoc.zip https://github.com/protocolbuffers/protobuf/releases/download/v$(PROTOC_VERSION)/protoc-$(PROTOC_VERSION)-$(PROTOC_OS)-$(PROTOC_ARCH).zip
unzip -d tmp-protoc tmp-protoc/protoc.zip
mv tmp-protoc/bin/protoc $(BIN)/
rm -rf thirdparty/google/
mv tmp-protoc/include/google/ thirdparty/
rm -rf tmp-protoc

PROTO_TOOLS := $(BIN)/protoc $(BIN)/protoc-gen-go $(BIN)/protoc-gen-go-grpc $(BIN)/protoc-gen-go-vtproto $(BIN)/protoc-gen-twirp
PROTO_TOOLS := $(BIN)/protoc $(BIN)/protoc-gen-go $(BIN)/protoc-gen-connect-go $(BIN)/protoc-gen-go-vtproto

$(PSDBCONNECT_PROTO_OUT)/v1alpha1/psdbconnect.v1alpha1.pb.go: $(PROTO_TOOLS) proto/psdbconnect.v1alpha1.proto
mkdir -p $(PSDBCONNECT_PROTO_OUT)/v1alpha1
$(BIN)/protoc \
--plugin=protoc-gen-go=$(BIN)/protoc-gen-go \
--plugin=protoc-gen-go-grpc=$(BIN)/protoc-gen-go-grpc \
--plugin=protoc-gen-connect-go=$(BIN)/protoc-gen-connect-go \
--plugin=protoc-gen-go-vtproto=$(BIN)/protoc-gen-go-vtproto \
--go_out=$(PSDBCONNECT_PROTO_OUT)/v1alpha1 \
--go-grpc_out=$(PSDBCONNECT_PROTO_OUT)/v1alpha1 \
--connect-go_out=$(PSDBCONNECT_PROTO_OUT)/v1alpha1 \
--go-vtproto_out=$(PSDBCONNECT_PROTO_OUT)/v1alpha1 \
--go_opt=paths=source_relative \
--go-grpc_opt=paths=source_relative \
--go-grpc_opt=require_unimplemented_servers=false \
--connect-go_opt=paths=source_relative \
--go-vtproto_opt=features=marshal+unmarshal+size \
--go-vtproto_opt=paths=source_relative \
-I thirdparty \
Expand Down
29 changes: 18 additions & 11 deletions cmd/internal/mock_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package internal
import (
"context"
"database/sql"
"github.com/bufbuild/connect-go"
psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1"
"google.golang.org/grpc"
"io"
)

type testAirbyteLogger struct {
Expand Down Expand Up @@ -52,28 +51,36 @@ func (testAirbyteLogger) Error(error string) {
}

type clientConnectionMock struct {
syncFn func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error)
syncFn func(ctx context.Context, in *connect.Request[psdbconnect.SyncRequest]) (*connect.ServerStreamForClient[psdbconnect.SyncResponse], error)
syncFnInvoked bool
syncFnInvokedCount int
}

type connectSyncClientMock struct {
lastResponseSent int
syncResponses []*psdbconnect.SyncResponse
grpc.ClientStream
}

func (x *connectSyncClientMock) Recv() (*psdbconnect.SyncResponse, error) {
if x.lastResponseSent >= len(x.syncResponses) {
return nil, io.EOF
func (c *connectSyncClientMock) Receive() bool {
if c.lastResponseSent >= len(c.syncResponses) {
return false
}
x.lastResponseSent += 1
return x.syncResponses[x.lastResponseSent-1], nil
c.lastResponseSent += 1
return true
}
func (c *clientConnectionMock) Sync(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {

func (c *connectSyncClientMock) Msg() *psdbconnect.SyncResponse {
return c.syncResponses[c.lastResponseSent-1]
}

func (c *connectSyncClientMock) Err() error {
return nil
}

func (c *clientConnectionMock) Sync(ctx context.Context, in *connect.Request[psdbconnect.SyncRequest]) (*connect.ServerStreamForClient[psdbconnect.SyncResponse], error) {
c.syncFnInvoked = true
c.syncFnInvokedCount += 1
return c.syncFn(ctx, in, opts...)
return c.syncFn(ctx, in)
}

type mysqlAccessMock struct {
Expand Down
65 changes: 22 additions & 43 deletions cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
"strings"
"time"

"github.com/bufbuild/connect-go"
"github.com/pkg/errors"
psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1"
psdbconnectdriver "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1/psdbconnectv1alpha1connect"
"github.com/planetscale/psdb/auth"
grpcclient "github.com/planetscale/psdb/core/pool"
clientoptions "github.com/planetscale/psdb/core/pool/options"
psdbclient "github.com/planetscale/psdb/core/client"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"vitess.io/vitess/go/sqltypes"
Expand All @@ -35,7 +36,7 @@ type PlanetScaleDatabase interface {
type PlanetScaleEdgeDatabase struct {
Logger AirbyteLogger
Mysql PlanetScaleEdgeMysqlAccess
clientFn func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error)
clientFn func(ctx context.Context, ps PlanetScaleSource) (psdbconnectdriver.ConnectClient, error)
}

func (p PlanetScaleEdgeDatabase) CanConnect(ctx context.Context, psc PlanetScaleSource) error {
Expand Down Expand Up @@ -210,23 +211,15 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table

var (
err error
client psdbconnect.ConnectClient
client psdbconnectdriver.ConnectClient
)

if p.clientFn == nil {
conn, err := grpcclient.Dial(ctx, ps.Host,
clientoptions.WithDefaultTLSConfig(),
clientoptions.WithCompression(true),
clientoptions.WithConnectionPool(1),
clientoptions.WithExtraCallOption(
auth.NewBasicAuth(ps.Username, ps.Password).CallOption(),
),
client = psdbclient.New(
ps.Host,
psdbconnectdriver.NewConnectClient,
auth.NewBasicAuth(ps.Username, ps.Password),
)
if err != nil {
return tc, err
}
defer conn.Close()
client = psdbconnect.NewConnectClient(conn)
} else {
client, err = p.clientFn(ctx, ps)
if err != nil {
Expand All @@ -246,7 +239,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
TabletType: tabletType,
}

c, err := client.Sync(ctx, sReq)
c, err := client.Sync(ctx, connect.NewRequest(sReq))
if err != nil {
return tc, err
}
Expand All @@ -259,13 +252,8 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
// stop when we've reached the well known stop position for this sync session.
watchForVgGtidChange := false

for {

res, err := c.Recv()
if err != nil {
return tc, err
}

for c.Receive() {
res := c.Msg()
if res.Cursor != nil {
tc = res.Cursor
}
Expand Down Expand Up @@ -295,6 +283,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
return tc, io.EOF
}
}
return tc, c.Err()
}

func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, shard, keyspace string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType) (string, error) {
Expand All @@ -304,23 +293,15 @@ func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, sh
defer cancel()
var (
err error
client psdbconnect.ConnectClient
client psdbconnectdriver.ConnectClient
)

if p.clientFn == nil {
conn, err := grpcclient.Dial(ctx, ps.Host,
clientoptions.WithDefaultTLSConfig(),
clientoptions.WithCompression(true),
clientoptions.WithConnectionPool(1),
clientoptions.WithExtraCallOption(
auth.NewBasicAuth(ps.Username, ps.Password).CallOption(),
),
client = psdbclient.New(
ps.Host,
psdbconnectdriver.NewConnectClient,
auth.NewBasicAuth(ps.Username, ps.Password),
)
if err != nil {
return "", err
}
defer conn.Close()
client = psdbconnect.NewConnectClient(conn)
} else {
client, err = p.clientFn(ctx, ps)
if err != nil {
Expand All @@ -338,21 +319,19 @@ func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, sh
TabletType: tabletType,
}

c, err := client.Sync(ctx, sReq)
c, err := client.Sync(ctx, connect.NewRequest(sReq))
if err != nil {
return "", nil
}

for {
res, err := c.Recv()
if err != nil {
return "", err
}
for c.Receive() {
res := c.Msg()

if res.Cursor != nil {
return res.Cursor.Position, nil
}
}
return "", c.Err()
}

// printQueryResult will pretty-print an AirbyteRecordMessage to the logger.
Expand Down
9 changes: 5 additions & 4 deletions cmd/internal/planetscale_edge_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/bufbuild/connect-go"
psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
Expand Down Expand Up @@ -38,7 +39,7 @@ func TestRead_CanPeekBeforeRead(t *testing.T) {
}

cc := clientConnectionMock{
syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {
syncFn: func(ctx context.Context, in *connect.Request[psdbconnect.SyncRequest]) (*connect.ServerStreamForClient[psdbconnect.SyncResponse], error) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got real lost in the sauce here and gave up with the mocks. With grpc-go, the Connect_SyncClient was an interface, so it was simpler to mock, with connect-go these are a concrete type *connect.ServerStreamForClient[T any], so I'm not sure how to mock this.

return syncClient, nil
},
}
Expand Down Expand Up @@ -82,7 +83,7 @@ func TestRead_CanEarlyExitIfNoNewVGtidInPeek(t *testing.T) {
}

cc := clientConnectionMock{
syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {
syncFn: func(ctx context.Context, in *connect.Request[psdbconnect.SyncRequest]) (*connect.ServerStreamForClient[psdbconnect.SyncResponse], error) {
return syncClient, nil
},
}
Expand Down Expand Up @@ -124,8 +125,8 @@ func TestRead_CanPickPrimaryForShardedKeyspaces(t *testing.T) {
}

cc := clientConnectionMock{
syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {
assert.Equal(t, psdbconnect.TabletType_primary, in.TabletType)
syncFn: func(ctx context.Context, in *connect.Request[psdbconnect.SyncRequest]) (*connect.ServerStreamForClient[psdbconnect.SyncResponse], error) {
assert.Equal(t, psdbconnect.TabletType_primary, in.Msg().TabletType)
return syncClient, nil
},
}
Expand Down
16 changes: 9 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/planetscale/airbyte-source

go 1.17
go 1.18

require (
github.com/go-sql-driver/mysql v1.6.0
Expand All @@ -10,9 +10,10 @@ require (
)

require (
github.com/bufbuild/connect-go v0.2.0
github.com/pkg/errors v0.9.1
github.com/planetscale/psdb v0.0.0-20220429000526-e2a0e798aaf3
google.golang.org/grpc v1.46.0
github.com/planetscale/psdb v0.0.0-20220726233456-0cde790cdd14
google.golang.org/grpc v1.48.0
google.golang.org/protobuf v1.28.0
)

Expand All @@ -33,7 +34,6 @@ require (
github.com/golang/glog v1.0.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
Expand All @@ -42,6 +42,8 @@ require (
github.com/hashicorp/serf v0.9.6 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/connect-compress v0.1.1 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
Expand All @@ -62,13 +64,13 @@ require (
github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect
golang.org/x/net v0.0.0-20220725212005-46097bf591d3 // indirect
golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a // indirect
golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect
google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect
google.golang.org/genproto v0.0.0-20220725144611-272f38e5d71b // indirect
gopkg.in/DataDog/dd-trace-go.v1 v1.38.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
Expand Down
Loading