diff --git a/flow/connectors/core.go b/flow/connectors/core.go index dc7afe0495..0814feabd9 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -329,8 +329,9 @@ var ( _ QRepSyncConnector = &connpostgres.PostgresConnector{} _ QRepSyncConnector = &connbigquery.BigQueryConnector{} _ QRepSyncConnector = &connsnowflake.SnowflakeConnector{} - _ QRepSyncConnector = &connclickhouse.ClickhouseConnector{} + _ QRepSyncConnector = &connkafka.KafkaConnector{} _ QRepSyncConnector = &conns3.S3Connector{} + _ QRepSyncConnector = &connclickhouse.ClickhouseConnector{} _ QRepSyncConnector = &connelasticsearch.ElasticsearchConnector{} _ QRepConsolidateConnector = &connsnowflake.SnowflakeConnector{} diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index 30b370e51e..e09b14ab33 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -6,7 +6,6 @@ import ( "fmt" "log/slog" "strings" - "sync" "sync/atomic" "time" @@ -167,13 +166,11 @@ func lvalueToKafkaRecord(ls *lua.LState, value lua.LValue) (*kgo.Record, error) } func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) { - var wg sync.WaitGroup wgCtx, wgErr := context.WithCancelCause(ctx) produceCb := func(_ *kgo.Record, err error) { if err != nil { wgErr(err) } - wg.Done() } numRecords := atomic.Int64{} @@ -197,7 +194,6 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords } return ls, nil }, func(krs []*kgo.Record) { - wg.Add(len(krs)) for _, kr := range krs { c.client.Produce(wgCtx, kr, produceCb) } @@ -296,16 +292,6 @@ Loop: if err := c.client.Flush(wgCtx); err != nil { return nil, fmt.Errorf("[kafka] final flush error: %w", err) } - waitChan := make(chan struct{}) - go func() { - wg.Wait() - close(waitChan) - }() - select { - case <-wgCtx.Done(): - return nil, wgCtx.Err() - case <-waitChan: - } lastCheckpoint := req.Records.GetLastCheckpoint() if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint); err != nil { diff --git a/flow/connectors/kafka/qrep.go b/flow/connectors/kafka/qrep.go new file mode 100644 index 0000000000..0e4867ae02 --- /dev/null +++ b/flow/connectors/kafka/qrep.go @@ -0,0 +1,140 @@ +package connkafka + +import ( + "context" + "fmt" + "strings" + "sync/atomic" + "time" + + "github.com/twmb/franz-go/pkg/kgo" + lua "github.com/yuin/gopher-lua" + + "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/pua" +) + +func (*KafkaConnector) SetupQRepMetadataTables(_ context.Context, _ *protos.QRepConfig) error { + return nil +} + +func (c *KafkaConnector) SyncQRepRecords( + ctx context.Context, + config *protos.QRepConfig, + partition *protos.QRepPartition, + stream *model.QRecordStream, +) (int, error) { + startTime := time.Now() + schema := stream.Schema() + + wgCtx, wgErr := context.WithCancelCause(ctx) + produceCb := func(_ *kgo.Record, err error) { + if err != nil { + wgErr(err) + } + } + + pool, err := utils.LuaPool(func() (*lua.LState, error) { + ls, err := utils.LoadScript(wgCtx, config.Script, func(ls *lua.LState) int { + top := ls.GetTop() + ss := make([]string, top) + for i := range top { + ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() + } + _ = c.LogFlowInfo(ctx, config.FlowJobName, strings.Join(ss, "\t")) + return 0 + }) + if err != nil { + return nil, err + } + if config.Script == "" { + ls.Env.RawSetString("onRecord", ls.NewFunction(utils.DefaultOnRecord)) + } + return ls, nil + }, func(krs []*kgo.Record) { + for _, kr := range krs { + c.client.Produce(wgCtx, kr, produceCb) + } + }) + if err != nil { + return 0, err + } + defer pool.Close() + + numRecords := atomic.Int64{} +Loop: + for { + select { + case qrecord, ok := <-stream.Records: + if !ok { + c.logger.Info("flushing batches because no more records") + break Loop + } + + pool.Run(func(ls *lua.LState) []*kgo.Record { + items := model.NewRecordItems(len(qrecord)) + for i, val := range qrecord { + items.AddColumn(schema.Fields[i].Name, val) + } + record := &model.InsertRecord[model.RecordItems]{ + BaseRecord: model.BaseRecord{}, + Items: items, + SourceTableName: config.WatermarkTable, + DestinationTableName: config.DestinationTableIdentifier, + CommitID: 0, + } + + lfn := ls.Env.RawGetString("onRecord") + fn, ok := lfn.(*lua.LFunction) + if !ok { + wgErr(fmt.Errorf("script should define `onRecord` as function, not %s", lfn)) + return nil + } + + ls.Push(fn) + ls.Push(pua.LuaRecord.New(ls, record)) + err := ls.PCall(1, -1, nil) + if err != nil { + wgErr(fmt.Errorf("script failed: %w", err)) + return nil + } + + args := ls.GetTop() + results := make([]*kgo.Record, 0, args) + for i := range args { + kr, err := lvalueToKafkaRecord(ls, ls.Get(i-args)) + if err != nil { + wgErr(err) + return nil + } + if kr != nil { + if kr.Topic == "" { + kr.Topic = record.GetDestinationTableName() + } + results = append(results, kr) + } + } + ls.SetTop(0) + numRecords.Add(1) + return results + }) + + case <-wgCtx.Done(): + break Loop + } + } + + if err := pool.Wait(wgCtx); err != nil { + return 0, err + } + if err := c.client.Flush(wgCtx); err != nil { + return 0, fmt.Errorf("[kafka] final flush error: %w", err) + } + + if err := c.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime); err != nil { + return 0, err + } + return int(numRecords.Load()), nil +} diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index f42c9f4cd8..fadd09deab 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -125,7 +125,7 @@ func (s *SnowflakeAvroSyncHandler) SyncQRepRecords( err = s.connector.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime) if err != nil { - return -1, err + return 0, err } activity.RecordHeartbeat(ctx, "finished syncing records") diff --git a/protos/flow.proto b/protos/flow.proto index 46516fea29..4d4343f71a 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -297,6 +297,7 @@ message QRepConfig { string soft_delete_col_name = 17; TypeSystem system = 18; + string script = 19; } message QRepPartition {