Yet another kafka connector
This time with Lua scripting

Need to implement the flatbuffers library ourselves since string.pack/string.unpack are missing
serprex committed Mar 11, 2024
1 parent 3af43e0 commit deb7189
Showing 32 changed files with 1,964 additions and 46 deletions.
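The heart of the change is flow/connectors/kafka/kafka.go below: every CDC record is handed to a user-supplied Lua script run under gopher-lua. The script must define an onRow function; whatever string onRow returns is produced to the record's destination topic, and returning nil skips the record. gopher-lua implements Lua 5.1, which lacks string.pack/string.unpack, hence the hand-rolled flatbuffers module preloaded into the interpreter. A minimal sketch of such a script, assuming only the onRow contract visible in kafka.go (the fields the row userdata exposes come from pua.RegisterTypes and are not part of this diff):

function onRow(row)
  -- row is the record userdata registered by pua.RegisterTypes; which
  -- fields it exposes is not visible in this commit, so a real script
  -- would inspect row here before building its payload
  return "replicated"
end
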
4 changes: 4 additions & 0 deletions flow/connectors/core.go
@@ -11,6 +11,7 @@ import (
connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse"
conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub"
connkafka "github.com/PeerDB-io/peer-flow/connectors/kafka"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
conns3 "github.com/PeerDB-io/peer-flow/connectors/s3"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
@@ -186,6 +187,8 @@ func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) {
return connsqlserver.NewSQLServerConnector(ctx, inner.SqlserverConfig)
case *protos.Peer_ClickhouseConfig:
return connclickhouse.NewClickhouseConnector(ctx, inner.ClickhouseConfig)
+case *protos.Peer_KafkaConfig:
+return connkafka.NewKafkaConnector(ctx, inner.KafkaConfig)
default:
return nil, ErrUnsupportedFunctionality
}
@@ -244,6 +247,7 @@ var (
_ CDCSyncConnector = &connbigquery.BigQueryConnector{}
_ CDCSyncConnector = &connsnowflake.SnowflakeConnector{}
_ CDCSyncConnector = &conneventhub.EventHubConnector{}
_ CDCSyncConnector = &connkafka.KafkaConnector{}
_ CDCSyncConnector = &conns3.S3Connector{}
_ CDCSyncConnector = &connclickhouse.ClickhouseConnector{}

12 changes: 2 additions & 10 deletions flow/connectors/eventhub/eventhub.go
@@ -88,13 +88,7 @@ func (c *EventHubConnector) GetLastOffset(ctx context.Context, jobName string) (
}

func (c *EventHubConnector) SetLastOffset(ctx context.Context, jobName string, offset int64) error {
-err := c.pgMetadata.UpdateLastOffset(ctx, jobName, offset)
-if err != nil {
-c.logger.Error("failed to update last offset", slog.Any("error", err))
-return err
-}
-
-return nil
+return c.pgMetadata.UpdateLastOffset(ctx, jobName, offset)
}

// returns the number of records synced
@@ -204,9 +198,7 @@ func (c *EventHubConnector) processBatch(
}

func (c *EventHubConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
-batch := req.Records
-
-numRecords, err := c.processBatch(ctx, req.FlowJobName, batch)
+numRecords, err := c.processBatch(ctx, req.FlowJobName, req.Records)
if err != nil {
c.logger.Error("failed to process batch", slog.Any("error", err))
return nil, err
227 changes: 227 additions & 0 deletions flow/connectors/kafka/kafka.go
@@ -0,0 +1,227 @@
package connkafka

import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"log/slog"
"sync"

"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/scram"
"github.com/twmb/franz-go/plugin/kslog"
"github.com/yuin/gopher-lua"
"go.temporal.io/sdk/log"

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/pua"
)

type KafkaConnector struct {
client *kgo.Client
pgMetadata *metadataStore.PostgresMetadataStore
logger log.Logger
}

func NewKafkaConnector(
ctx context.Context,
config *protos.KafkaConfig,
) (*KafkaConnector, error) {
optionalOpts := append(
make([]kgo.Opt, 0, 6),
kgo.SeedBrokers(config.Servers...),
kgo.AllowAutoTopicCreation(),
kgo.WithLogger(kslog.New(slog.Default())), // TODO use logger.LoggerFromCtx
kgo.SoftwareNameAndVersion("peerdb", peerdbenv.PeerDBVersionShaShort()),
)
if !config.DisableTls {
optionalOpts = append(optionalOpts, kgo.DialTLSConfig(&tls.Config{MinVersion: tls.VersionTLS13}))
}
if config.Username != "" {
auth := scram.Auth{User: config.Username, Pass: config.Password}
switch config.Sasl {
case "SCRAM-SHA-256":
optionalOpts = append(optionalOpts, kgo.SASL(auth.AsSha256Mechanism()))
case "SCRAM-SHA-512":
optionalOpts = append(optionalOpts, kgo.SASL(auth.AsSha512Mechanism()))
default:
return nil, fmt.Errorf("unsupported SASL mechanism: %s", config.Sasl)
}
}
client, err := kgo.NewClient(optionalOpts...)
if err != nil {
return nil, fmt.Errorf("failed to create kafka client: %w", err)
}

return &KafkaConnector{
client: client,
logger: logger.LoggerFromCtx(ctx),
}, nil
}

func (c *KafkaConnector) Close() error {
if c != nil {
c.client.Close()
}
return nil
}

func (c *KafkaConnector) ConnectionActive(ctx context.Context) error {
return c.client.Ping(ctx)
}

func (c *KafkaConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
return &protos.CreateRawTableOutput{TableIdentifier: "n/a"}, nil
}

func (c *KafkaConnector) GetLastSyncBatchID(ctx context.Context, jobName string) (int64, error) {
return c.pgMetadata.GetLastBatchID(ctx, jobName)
}

func (c *KafkaConnector) GetLastOffset(ctx context.Context, jobName string) (int64, error) {
return c.pgMetadata.FetchLastOffset(ctx, jobName)
}

func (c *KafkaConnector) SetLastOffset(ctx context.Context, jobName string, offset int64) error {
return c.pgMetadata.UpdateLastOffset(ctx, jobName, offset)
}

func (c *KafkaConnector) NeedsSetupMetadataTables(_ context.Context) bool {
return false
}

func (c *KafkaConnector) SetupMetadataTables(_ context.Context) error {
return nil
}

func (c *KafkaConnector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error {
c.logger.Info("ReplayTableSchemaDeltas for event hub is a no-op")
return nil
}

func (c *KafkaConnector) SyncFlowCleanup(ctx context.Context, jobName string) error {
return c.pgMetadata.DropMetadata(ctx, jobName)
}

func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
err := c.client.BeginTransaction()
if err != nil {
return nil, err
}

// every Produce below adds one to wg; the callback cancels wgCtx on the
// first error so the produce loop can stop early
var wg sync.WaitGroup
wgCtx, wgErr := context.WithCancelCause(ctx)
produceCb := func(r *kgo.Record, err error) {
if err != nil {
wgErr(err)
}
wg.Done()
}

numRecords := int64(0)
tableNameRowsMapping := make(map[string]uint32)

var fn *lua.LFunction
var ls *lua.LState
if req.Script != "" {
ls = lua.NewState(lua.Options{SkipOpenLibs: true})
defer ls.Close()
ls.SetContext(wgCtx)
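// with SkipOpenLibs set above, open a minimal stdlib by hand:
// package, base, table, string and math (io and os stay closed)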
for _, pair := range []struct {
n string
f lua.LGFunction
}{
{lua.LoadLibName, lua.OpenPackage}, // Must be first
{lua.BaseLibName, lua.OpenBase},
{lua.TabLibName, lua.OpenTable},
{lua.StringLibName, lua.OpenString},
{lua.MathLibName, lua.OpenMath},
} {
ls.Push(ls.NewFunction(pair.f))
ls.Push(lua.LString(pair.n))
err := ls.PCall(1, 0, nil)
if err != nil {
return nil, fmt.Errorf("failed to initialize Lua runtime: %w", err)
}
}
ls.PreloadModule("flatbuffers", pua.FlatBuffers_Loader)
pua.RegisterTypes(ls)
err := ls.DoString(req.Script)
if err != nil {
return nil, fmt.Errorf("error while executing script: %w", err)
}

var ok bool
fn, ok = ls.GetGlobal("onRow").(*lua.LFunction)
if !ok {
return nil, errors.New("script should define `onRow` function")
}
} else {
return nil, errors.New("kafka mirror must have script")
}

for record := range req.Records.GetRecords() {
if err := wgCtx.Err(); err != nil {
return nil, err
}
topic := record.GetDestinationTableName()
ls.Push(fn)
ls.Push(pua.LuaRecord.New(ls, record))
err := ls.PCall(1, 1, nil)
if err != nil {
return nil, fmt.Errorf("script failed: %w", err)
}
value := ls.Get(-1)
if value != lua.LNil {
lstr, ok := value.(lua.LString)
if !ok {
return nil, fmt.Errorf("script returned non-nil non-string: %v", value)
}
wg.Add(1)
c.client.Produce(wgCtx, &kgo.Record{Topic: topic, Value: bytes.Clone([]byte(lstr))}, produceCb)

numRecords += 1
tableNameRowsMapping[topic] += 1
}
}

// TODO handle
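// bridge wg.Wait into a channel so it can be raced against wgCtx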
waitChan := make(chan struct{})
go func() {
wg.Wait()
waitChan <- struct{}{}
}()
select {
case <-wgCtx.Done():
return nil, wgCtx.Err()
case <-waitChan:
}

// flush anything still buffered in the client before committing
if err := c.client.Flush(ctx); err != nil {
return nil, fmt.Errorf("could not flush transaction: %w", err)
}

if err := c.client.EndTransaction(ctx, kgo.TryCommit); err != nil {
return nil, fmt.Errorf("could not commit transaction: %w", err)
}

lastCheckpoint := req.Records.GetLastCheckpoint()
err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint)
if err != nil {
return nil, err
}

return &model.SyncResponse{
CurrentSyncBatchID: req.SyncBatchID,
LastSyncedCheckpointID: lastCheckpoint,
NumRecordsSynced: numRecords,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
}, nil
}
6 changes: 3 additions & 3 deletions flow/connectors/postgres/postgres.go
@@ -394,7 +394,7 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)
c.logger.Info(fmt.Sprintf("pushing records to Postgres table %s via COPY", rawTableIdentifier))

-numRecords := 0
+numRecords := int64(0)
tableNameRowsMapping := make(map[string]uint32)

streamReadFunc := func() ([]any, error) {
@@ -499,7 +499,7 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
if err != nil {
return nil, fmt.Errorf("error syncing records: %w", err)
}
-if syncedRecordsCount != int64(numRecords) {
+if syncedRecordsCount != numRecords {
return nil, fmt.Errorf("error syncing records: expected %d records to be synced, but %d were synced",
numRecords, syncedRecordsCount)
}
@@ -526,7 +526,7 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco

return &model.SyncResponse{
LastSyncedCheckpointID: lastCP,
-NumRecordsSynced: int64(numRecords),
+NumRecordsSynced: numRecords,
CurrentSyncBatchID: req.SyncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
4 changes: 4 additions & 0 deletions flow/go.mod
@@ -33,9 +33,12 @@ require (
github.com/slack-go/slack v0.12.4
github.com/snowflakedb/gosnowflake v1.7.2
github.com/stretchr/testify v1.8.4
+github.com/twmb/franz-go v1.16.1
+github.com/twmb/franz-go/plugin/kslog v1.0.0
github.com/twpayne/go-geos v0.16.1
github.com/urfave/cli/v3 v3.0.0-alpha9
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a
+github.com/yuin/gopher-lua v1.1.1
go.temporal.io/api v1.26.0
go.temporal.io/sdk v1.25.1
go.uber.org/automaxprocs v1.5.3
@@ -90,6 +93,7 @@ require (
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
+github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.48.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.48.0 // indirect
go.opentelemetry.io/otel v1.23.1 // indirect
8 changes: 8 additions & 0 deletions flow/go.sum
@@ -388,6 +388,12 @@ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
+github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE=
+github.com/twmb/franz-go v1.16.1/go.mod h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA=
+github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
+github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
+github.com/twmb/franz-go/plugin/kslog v1.0.0 h1:I64oEmF+0PDvmyLgwrlOtg4mfpSE9GwlcLxM4af2t60=
+github.com/twmb/franz-go/plugin/kslog v1.0.0/go.mod h1:8pMjK3OJJJNNYddBSbnXZkIK5dCKFIk9GcVVCDgvnQc=
github.com/twpayne/go-geos v0.16.1 h1:2tqZyAtDFxBqKn7HR2/g2tQkC/GuQQ2D8dxgK9BjdV8=
github.com/twpayne/go-geos v0.16.1/go.mod h1:zmBwZNTaMTB1usptcCl4n7FjIDoBi2IGtm6h6nq9G8c=
github.com/urfave/cli/v3 v3.0.0-alpha9 h1:P0RMy5fQm1AslQS+XCmy9UknDXctOmG/q/FZkUFnJSo=
@@ -404,6 +410,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
+github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
