kafka initial load
serprex committed May 6, 2024
1 parent e8ebd3c commit 3edd459
Showing 5 changed files with 144 additions and 16 deletions.
3 changes: 2 additions & 1 deletion flow/connectors/core.go
@@ -329,8 +329,9 @@ var (
	_ QRepSyncConnector = &connpostgres.PostgresConnector{}
	_ QRepSyncConnector = &connbigquery.BigQueryConnector{}
	_ QRepSyncConnector = &connsnowflake.SnowflakeConnector{}
+	_ QRepSyncConnector = &connclickhouse.ClickhouseConnector{}
+	_ QRepSyncConnector = &connkafka.KafkaConnector{}
	_ QRepSyncConnector = &conns3.S3Connector{}
-	_ QRepSyncConnector = &connclickhouse.ClickhouseConnector{}
	_ QRepSyncConnector = &connelasticsearch.ElasticsearchConnector{}

_ QRepConsolidateConnector = &connsnowflake.SnowflakeConnector{}
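The core.go hunk registers the new connector through Go's compile-time interface assertion idiom: var _ QRepSyncConnector = &connkafka.KafkaConnector{} makes the build fail if the connector ever stops satisfying the interface. A minimal sketch of the idiom, with hypothetical Sink/fileSink names standing in for the PeerDB types:

package main

import "fmt"

// Sink stands in for an interface like QRepSyncConnector.
type Sink interface {
	Write(record string) error
}

type fileSink struct{}

// Write implements Sink by printing the record.
func (fileSink) Write(record string) error {
	fmt.Println("wrote:", record)
	return nil
}

// Compile-time assertion, same idiom as the connector list above: the
// assigned value is discarded, but the build breaks here if *fileSink
// ever stops satisfying Sink (e.g. after an interface change).
var _ Sink = &fileSink{}

func main() {
	var s Sink = fileSink{}
	_ = s.Write("hello")
}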
14 changes: 0 additions & 14 deletions flow/connectors/kafka/kafka.go
@@ -6,7 +6,6 @@ import (
	"fmt"
	"log/slog"
	"strings"
-	"sync"
	"sync/atomic"
	"time"

@@ -167,13 +166,11 @@ func lvalueToKafkaRecord(ls *lua.LState, value lua.LValue) (*kgo.Record, error)
}

func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
-	var wg sync.WaitGroup
	wgCtx, wgErr := context.WithCancelCause(ctx)
	produceCb := func(_ *kgo.Record, err error) {
		if err != nil {
			wgErr(err)
		}
-		wg.Done()
	}

	numRecords := atomic.Int64{}
@@ -197,7 +194,6 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
		}
		return ls, nil
	}, func(krs []*kgo.Record) {
-		wg.Add(len(krs))
		for _, kr := range krs {
			c.client.Produce(wgCtx, kr, produceCb)
		}
@@ -296,16 +292,6 @@ Loop:
	if err := c.client.Flush(wgCtx); err != nil {
		return nil, fmt.Errorf("[kafka] final flush error: %w", err)
	}
-	waitChan := make(chan struct{})
-	go func() {
-		wg.Wait()
-		close(waitChan)
-	}()
-	select {
-	case <-wgCtx.Done():
-		return nil, wgCtx.Err()
-	case <-waitChan:
-	}

	lastCheckpoint := req.Records.GetLastCheckpoint()
	if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint); err != nil {
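The kafka.go deletions drop the manual WaitGroup around produce callbacks: per-record errors already propagate through the cancel-cause context, and the final client.Flush(wgCtx) blocks until buffered records are dealt with, leaving nothing to wait on separately. A rough sketch of the resulting pattern, assuming (as this commit does) that franz-go's Flush only returns once outstanding produce promises have settled:

package sketch

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// produceAll fans records out to franz-go's async Produce, then waits on
// Flush alone; the first callback error cancels the shared context.
func produceAll(ctx context.Context, client *kgo.Client, recs []*kgo.Record) error {
	ctx, cancel := context.WithCancelCause(ctx)
	defer cancel(nil)
	for _, r := range recs {
		client.Produce(ctx, r, func(_ *kgo.Record, err error) {
			if err != nil {
				cancel(err) // surface the first produce error
			}
		})
	}
	// Flush blocks until buffered records are flushed or ctx is canceled,
	// which is what makes the deleted WaitGroup redundant.
	if err := client.Flush(ctx); err != nil {
		return err
	}
	return context.Cause(ctx) // nil unless a callback reported an error
}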
140 changes: 140 additions & 0 deletions flow/connectors/kafka/qrep.go
@@ -0,0 +1,140 @@
package connkafka

import (
	"context"
	"fmt"
	"strings"
	"sync/atomic"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
	lua "github.com/yuin/gopher-lua"

	"github.com/PeerDB-io/peer-flow/connectors/utils"
	"github.com/PeerDB-io/peer-flow/generated/protos"
	"github.com/PeerDB-io/peer-flow/model"
	"github.com/PeerDB-io/peer-flow/pua"
)

func (*KafkaConnector) SetupQRepMetadataTables(_ context.Context, _ *protos.QRepConfig) error {
	return nil
}

func (c *KafkaConnector) SyncQRepRecords(
	ctx context.Context,
	config *protos.QRepConfig,
	partition *protos.QRepPartition,
	stream *model.QRecordStream,
) (int, error) {
	startTime := time.Now()
	schema := stream.Schema()

	wgCtx, wgErr := context.WithCancelCause(ctx)
	produceCb := func(_ *kgo.Record, err error) {
		if err != nil {
			wgErr(err)
		}
	}

	pool, err := utils.LuaPool(func() (*lua.LState, error) {
		ls, err := utils.LoadScript(wgCtx, config.Script, func(ls *lua.LState) int {
			top := ls.GetTop()
			ss := make([]string, top)
			for i := range top {
				ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String()
			}
			_ = c.LogFlowInfo(ctx, config.FlowJobName, strings.Join(ss, "\t"))
			return 0
		})
		if err != nil {
			return nil, err
		}
		if config.Script == "" {
			ls.Env.RawSetString("onRecord", ls.NewFunction(utils.DefaultOnRecord))
		}
		return ls, nil
	}, func(krs []*kgo.Record) {
		for _, kr := range krs {
			c.client.Produce(wgCtx, kr, produceCb)
		}
	})
	if err != nil {
		return 0, err
	}
	defer pool.Close()

	numRecords := atomic.Int64{}
Loop:
	for {
		select {
		case qrecord, ok := <-stream.Records:
			if !ok {
				c.logger.Info("flushing batches because no more records")
				break Loop
			}

			pool.Run(func(ls *lua.LState) []*kgo.Record {
				items := model.NewRecordItems(len(qrecord))
				for i, val := range qrecord {
					items.AddColumn(schema.Fields[i].Name, val)
				}
				record := &model.InsertRecord[model.RecordItems]{
					BaseRecord:           model.BaseRecord{},
					Items:                items,
					SourceTableName:      config.WatermarkTable,
					DestinationTableName: config.DestinationTableIdentifier,
					CommitID:             0,
				}

				lfn := ls.Env.RawGetString("onRecord")
				fn, ok := lfn.(*lua.LFunction)
				if !ok {
					wgErr(fmt.Errorf("script should define `onRecord` as function, not %s", lfn))
					return nil
				}

				ls.Push(fn)
				ls.Push(pua.LuaRecord.New(ls, record))
				err := ls.PCall(1, -1, nil)
				if err != nil {
					wgErr(fmt.Errorf("script failed: %w", err))
					return nil
				}

				args := ls.GetTop()
				results := make([]*kgo.Record, 0, args)
				for i := range args {
					kr, err := lvalueToKafkaRecord(ls, ls.Get(i-args))
					if err != nil {
						wgErr(err)
						return nil
					}
					if kr != nil {
						if kr.Topic == "" {
							kr.Topic = record.GetDestinationTableName()
						}
						results = append(results, kr)
					}
				}
				ls.SetTop(0)
				numRecords.Add(1)
				return results
			})

		case <-wgCtx.Done():
			break Loop
		}
	}

	if err := pool.Wait(wgCtx); err != nil {
		return 0, err
	}
	if err := c.client.Flush(wgCtx); err != nil {
		return 0, fmt.Errorf("[kafka] final flush error: %w", err)
	}

	if err := c.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime); err != nil {
		return 0, err
	}
	return int(numRecords.Load()), nil
}
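The new SyncQRepRecords mirrors the CDC path: each row from the stream becomes an InsertRecord, a pooled Lua state's onRecord function maps it to zero or more Kafka records (defaulting the topic to the destination table), and the labeled Loop drains the stream until it closes or a produce error cancels wgCtx. A stripped-down sketch of that drain pattern, with a plain string channel standing in for stream.Records:

package sketch

import (
	"context"
	"fmt"
)

// drain mirrors the labeled-break loop above: consume a stream until it
// closes or until a produce callback cancels the context, then report
// the cancellation cause (nil when the stream simply ended).
func drain(ctx context.Context, records <-chan string) error {
Loop:
	for {
		select {
		case rec, ok := <-records:
			if !ok {
				break Loop // stream exhausted: caller flushes and finishes
			}
			fmt.Println("handling", rec)
		case <-ctx.Done():
			break Loop // canceled by an async produce error
		}
	}
	return context.Cause(ctx)
}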
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/qrep_avro_sync.go
@@ -125,7 +125,7 @@ func (s *SnowflakeAvroSyncHandler) SyncQRepRecords(

	err = s.connector.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime)
	if err != nil {
-		return -1, err
+		return 0, err
	}

	activity.RecordHeartbeat(ctx, "finished syncing records")
1 change: 1 addition & 0 deletions protos/flow.proto
@@ -297,6 +297,7 @@ message QRepConfig {
  string soft_delete_col_name = 17;

  TypeSystem system = 18;
+  string script = 19;
}

message QRepPartition {
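This script = 19 field is what config.Script reads in the new SyncQRepRecords; when it is empty, the connector falls back to utils.DefaultOnRecord. A hypothetical snippet showing where the field lands on the Go side (all values illustrative, not from this commit):

package sketch

import "github.com/PeerDB-io/peer-flow/generated/protos"

// exampleConfig sketches a QRepConfig carrying the new field.
func exampleConfig() *protos.QRepConfig {
	return &protos.QRepConfig{
		FlowJobName:                "kafka_initial_load", // illustrative job name
		WatermarkTable:             "public.events",      // illustrative source table
		DestinationTableIdentifier: "events",             // also the default Kafka topic
		Script:                     "my_transform",       // new field 19: Lua script for onRecord
	}
}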
