Skip to content

Commit

Permalink
initial load for queues (#1675)
Browse files Browse the repository at this point in the history
The WaitGroup turned out to be unnecessary because kgo's Flush already handles the waiting, so it has been removed.
Meanwhile, a potential send-after-close was found in the pubsub connector, so this commit fixes it.

EventHubs is complicated, so it is skipped here and can be handled in a follow-up.
  • Loading branch information
serprex authored May 7, 2024
1 parent 5104b00 commit eae0f32
Show file tree
Hide file tree
Showing 10 changed files with 459 additions and 86 deletions.
3 changes: 2 additions & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,9 @@ var (
_ QRepSyncConnector = &connpostgres.PostgresConnector{}
_ QRepSyncConnector = &connbigquery.BigQueryConnector{}
_ QRepSyncConnector = &connsnowflake.SnowflakeConnector{}
_ QRepSyncConnector = &connclickhouse.ClickhouseConnector{}
_ QRepSyncConnector = &connkafka.KafkaConnector{}
_ QRepSyncConnector = &conns3.S3Connector{}
_ QRepSyncConnector = &connclickhouse.ClickhouseConnector{}
_ QRepSyncConnector = &connelasticsearch.ElasticsearchConnector{}

_ QRepConsolidateConnector = &connsnowflake.SnowflakeConnector{}
Expand Down
57 changes: 26 additions & 31 deletions flow/connectors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"log/slog"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -166,42 +165,48 @@ func lvalueToKafkaRecord(ls *lua.LState, value lua.LValue) (*kgo.Record, error)
return kr, nil
}

func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
var wg sync.WaitGroup
wgCtx, wgErr := context.WithCancelCause(ctx)
func (c *KafkaConnector) createPool(
ctx context.Context,
script string,
flowJobName string,
queueErr func(error),
) (*utils.LPool[[]*kgo.Record], error) {
produceCb := func(_ *kgo.Record, err error) {
if err != nil {
wgErr(err)
queueErr(err)
}
wg.Done()
}

numRecords := atomic.Int64{}
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)

pool, err := utils.LuaPool(func() (*lua.LState, error) {
ls, err := utils.LoadScript(wgCtx, req.Script, func(ls *lua.LState) int {
return utils.LuaPool(func() (*lua.LState, error) {
ls, err := utils.LoadScript(ctx, script, func(ls *lua.LState) int {
top := ls.GetTop()
ss := make([]string, top)
for i := range top {
ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String()
}
_ = c.LogFlowInfo(ctx, req.FlowJobName, strings.Join(ss, "\t"))
_ = c.LogFlowInfo(ctx, flowJobName, strings.Join(ss, "\t"))
return 0
})
if err != nil {
return nil, err
}
if req.Script == "" {
if script == "" {
ls.Env.RawSetString("onRecord", ls.NewFunction(utils.DefaultOnRecord))
}
return ls, nil
}, func(krs []*kgo.Record) {
wg.Add(len(krs))
for _, kr := range krs {
c.client.Produce(wgCtx, kr, produceCb)
c.client.Produce(ctx, kr, produceCb)
}
})
}

func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
numRecords := atomic.Int64{}
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)

queueCtx, queueErr := context.WithCancelCause(ctx)
pool, err := c.createPool(queueCtx, req.Script, req.FlowJobName, queueErr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -250,15 +255,15 @@ Loop:
lfn := ls.Env.RawGetString("onRecord")
fn, ok := lfn.(*lua.LFunction)
if !ok {
wgErr(fmt.Errorf("script should define `onRecord` as function, not %s", lfn))
queueErr(fmt.Errorf("script should define `onRecord` as function, not %s", lfn))
return nil
}

ls.Push(fn)
ls.Push(pua.LuaRecord.New(ls, record))
err := ls.PCall(1, -1, nil)
if err != nil {
wgErr(fmt.Errorf("script failed: %w", err))
queueErr(fmt.Errorf("script failed: %w", err))
return nil
}

Expand All @@ -267,7 +272,7 @@ Loop:
for i := range args {
kr, err := lvalueToKafkaRecord(ls, ls.Get(i-args))
if err != nil {
wgErr(err)
queueErr(err)
return nil
}
if kr != nil {
Expand All @@ -284,28 +289,18 @@ Loop:
return results
})

case <-wgCtx.Done():
case <-queueCtx.Done():
break Loop
}
}

close(flushLoopDone)
if err := pool.Wait(wgCtx); err != nil {
if err := pool.Wait(queueCtx); err != nil {
return nil, err
}
if err := c.client.Flush(wgCtx); err != nil {
if err := c.client.Flush(queueCtx); err != nil {
return nil, fmt.Errorf("[kafka] final flush error: %w", err)
}
waitChan := make(chan struct{})
go func() {
wg.Wait()
close(waitChan)
}()
select {
case <-wgCtx.Done():
return nil, wgCtx.Err()
case <-waitChan:
}

lastCheckpoint := req.Records.GetLastCheckpoint()
if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint); err != nil {
Expand Down
111 changes: 111 additions & 0 deletions flow/connectors/kafka/qrep.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package connkafka

import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/twmb/franz-go/pkg/kgo"
lua "github.com/yuin/gopher-lua"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/pua"
)

// SetupQRepMetadataTables is a no-op for the Kafka connector: both arguments
// are ignored and the call always succeeds.
func (*KafkaConnector) SetupQRepMetadataTables(context.Context, *protos.QRepConfig) error {
	return nil
}

// SyncQRepRecords reads rows from stream, passes each one to the Lua
// `onRecord` function of config.Script and produces the Kafka records the
// script returns.
//
// Every row is wrapped in an InsertRecord whose source table is
// config.WatermarkTable and whose destination table is
// config.DestinationTableIdentifier. A Kafka record returned by the script
// with an empty topic is sent to the record's destination table name.
//
// Once stream.Records is closed, the pool is waited on, the Kafka client is
// flushed and the partition is marked as finished via FinishQRepPartition.
//
// It returns the number of rows whose script invocation completed and whose
// results were all converted to Kafka records. On any failure it returns 0
// and the error.
func (c *KafkaConnector) SyncQRepRecords(
	ctx context.Context,
	config *protos.QRepConfig,
	partition *protos.QRepPartition,
	stream *model.QRecordStream,
) (int, error) {
	startTime := time.Now()
	numRecords := atomic.Int64{}
	schema := stream.Schema()

	queueCtx, queueErr := context.WithCancelCause(ctx)
	// Release queueCtx on every return path. Without this the derived context
	// stays registered with ctx until ctx itself ends, which leaks one child
	// context per synced partition. Cancelling an already cancelled context is
	// a no-op, so an earlier queueErr(err) keeps its cause. Being the first
	// defer, this runs last, i.e. after pool.Close().
	defer queueErr(nil)

	pool, err := c.createPool(queueCtx, config.Script, config.FlowJobName, queueErr)
	if err != nil {
		return 0, err
	}
	defer pool.Close()

Loop:
	for {
		select {
		case qrecord, ok := <-stream.Records:
			if !ok {
				// Channel closed: the source has no more rows for this partition.
				c.logger.Info("flushing batches because no more records")
				break Loop
			}

			pool.Run(func(ls *lua.LState) []*kgo.Record {
				// Pair each value with the column name at the same position in the schema.
				items := model.NewRecordItems(len(qrecord))
				for i, val := range qrecord {
					items.AddColumn(schema.Fields[i].Name, val)
				}
				record := &model.InsertRecord[model.RecordItems]{
					BaseRecord:           model.BaseRecord{},
					Items:                items,
					SourceTableName:      config.WatermarkTable,
					DestinationTableName: config.DestinationTableIdentifier,
					CommitID:             0,
				}

				lfn := ls.Env.RawGetString("onRecord")
				fn, ok := lfn.(*lua.LFunction)
				if !ok {
					queueErr(fmt.Errorf("script should define `onRecord` as function, not %s", lfn))
					return nil
				}

				ls.Push(fn)
				ls.Push(pua.LuaRecord.New(ls, record))
				// One argument, any number of return values left on the Lua stack.
				err := ls.PCall(1, -1, nil)
				if err != nil {
					queueErr(fmt.Errorf("script failed: %w", err))
					return nil
				}

				args := ls.GetTop()
				results := make([]*kgo.Record, 0, args)
				for i := range args {
					// i-args is a negative (top-relative) stack index, so the
					// return values are visited in the order they were returned.
					kr, err := lvalueToKafkaRecord(ls, ls.Get(i-args))
					if err != nil {
						queueErr(err)
						return nil
					}
					if kr != nil {
						if kr.Topic == "" {
							kr.Topic = record.GetDestinationTableName()
						}
						results = append(results, kr)
					}
				}
				// Clear the stack before this Lua state handles another row.
				ls.SetTop(0)
				numRecords.Add(1)
				return results
			})

		case <-queueCtx.Done():
			// Either ctx was cancelled or queueErr reported a failure.
			break Loop
		}
	}

	if err := pool.Wait(queueCtx); err != nil {
		return 0, err
	}
	if err := c.client.Flush(queueCtx); err != nil {
		return 0, fmt.Errorf("[kafka] final flush error: %w", err)
	}

	// Bookkeeping uses the caller's ctx rather than the queue-scoped one.
	if err := c.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime); err != nil {
		return 0, err
	}
	return int(numRecords.Load()), nil
}
Loading

0 comments on commit eae0f32

Please sign in to comment.