Merge branch 'main' into api/custom-resume-endpoint
Amogh-Bharadwaj authored May 13, 2024
2 parents 7b5ee53 + 5f904dd commit a4a15e9
Showing 21 changed files with 442 additions and 113 deletions.
2 changes: 1 addition & 1 deletion e2e_cleanup/go.mod
@@ -1,6 +1,6 @@
module github.com/PeerDB-io/peer-flow-cleanup

go 1.22.2
go 1.22.3

require (
cloud.google.com/go/bigquery v1.59.1
23 changes: 20 additions & 3 deletions flow/activities/flowable_core.go
@@ -11,6 +11,7 @@ import (

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/yuin/gopher-lua"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
@@ -23,6 +24,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/pua"
"github.com/PeerDB-io/peer-flow/shared"
)

@@ -343,10 +345,25 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
})
defer shutdown()

var rowsSynced int
bufferSize := shared.FetchAndChannelSize
errGroup, errCtx := errgroup.WithContext(ctx)
stream := model.NewQRecordStream(bufferSize)
outstream := stream
if config.Script != "" {
ls, err := utils.LoadScript(ctx, config.Script, utils.LuaPrintFn(func(s string) {
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, s)
}))
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}
lfn := ls.Env.RawGetString("transformRow")
if fn, ok := lfn.(*lua.LFunction); ok {
outstream = pua.AttachToStream(ls, fn, stream)
}
}

var rowsSynced int
errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
tmp, err := srcConn.PullQRepRecords(errCtx, config, partition, stream)
if err != nil {
@@ -363,7 +380,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
})

errGroup.Go(func() error {
rowsSynced, err = dstConn.SyncQRepRecords(errCtx, config, partition, stream)
rowsSynced, err = dstConn.SyncQRepRecords(errCtx, config, partition, outstream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to sync records: %w", err)
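The block added to replicateQRepPartition loads the flow's Lua script and, when the script defines a transformRow function, swaps the destination's input from stream to a wrapped outstream, so every pulled row passes through the script before SyncQRepRecords consumes it. The following is a minimal sketch of that stream-wrapping idea using plain channels; it only stands in for pua.AttachToStream and model.QRecordStream, whose actual APIs are not shown in this diff.

package main

import "fmt"

// row stands in for a pulled record; the real code moves model.QRecordStream rows.
type row []any

// attachTransform mirrors the idea behind pua.AttachToStream: read each row from
// the source stream, apply the script's transform, and forward the result to a
// new stream that the destination connector consumes instead of the original.
func attachTransform(in <-chan row, transform func(row) row) <-chan row {
	out := make(chan row)
	go func() {
		defer close(out)
		for r := range in {
			out <- transform(r)
		}
	}()
	return out
}

func main() {
	src := make(chan row, 2)
	src <- row{1, "alice"}
	src <- row{2, "bob"}
	close(src)

	// Stand-in for a Lua transformRow function: rewrite the second column.
	dst := attachTransform(src, func(r row) row {
		out := append(row{}, r...)
		out[1] = fmt.Sprintf("user:%v", out[1])
		return out
	})

	for r := range dst {
		fmt.Println(r) // [1 user:alice] then [2 user:bob]
	}
}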
18 changes: 14 additions & 4 deletions flow/connectors/clickhouse/qrep_avro_sync.go
@@ -47,10 +47,15 @@ func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a
if err != nil {
return err
}

sessionTokenPart := ""
if creds.AWS.SessionToken != "" {
sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken)
}
//nolint:gosec
query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', '%s', 'Avro')",
query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s'%s, 'Avro')",
s.config.DestinationTableIdentifier, avroFileUrl,
creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, creds.AWS.SessionToken)
creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart)

_, err = s.connector.database.ExecContext(ctx, query)

@@ -137,10 +142,15 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
selector = append(selector, "`"+colName+"`")
}
selectorStr := strings.Join(selector, ",")

sessionTokenPart := ""
if creds.AWS.SessionToken != "" {
sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken)
}
//nolint:gosec
query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s', '%s', 'Avro')",
query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s'%s, 'Avro')",
config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl,
creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, creds.AWS.SessionToken)
creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart)

_, err = s.connector.database.ExecContext(ctx, query)
if err != nil {
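In both ClickHouse hunks the s3() table function previously always received creds.AWS.SessionToken as a fourth credential argument; the query now appends that argument only when temporary credentials actually carry a token, so static access keys no longer leave an empty string in the argument list. Below is a small self-contained sketch of the same string assembly; the helper name and parameters are illustrative, not the connector's own.

package main

import "fmt"

// buildS3Insert mirrors how the connector now assembles the ClickHouse s3()
// arguments: the session token is emitted as an extra argument only when the
// credentials include one.
func buildS3Insert(destTable, avroFileURL, accessKeyID, secretAccessKey, sessionToken string) string {
	sessionTokenPart := ""
	if sessionToken != "" {
		sessionTokenPart = fmt.Sprintf(", '%s'", sessionToken)
	}
	return fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s'%s, 'Avro')",
		destTable, avroFileURL, accessKeyID, secretAccessKey, sessionTokenPart)
}

func main() {
	// Static keys: no session token argument appears in the generated SQL.
	fmt.Println(buildS3Insert("events", "https://bucket.s3.amazonaws.com/part.avro", "AKIAEXAMPLE", "secret", ""))
	// Temporary (STS) credentials: the token is appended before the format argument.
	fmt.Println(buildS3Insert("events", "https://bucket.s3.amazonaws.com/part.avro", "ASIAEXAMPLE", "secret", "session-token"))
}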
13 changes: 3 additions & 10 deletions flow/connectors/eventhub/eventhub.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log/slog"
"strings"
"sync/atomic"
"time"

@@ -196,15 +195,9 @@ func (c *EventHubConnector) processBatch(
var fn *lua.LFunction
if req.Script != "" {
var err error
ls, err = utils.LoadScript(ctx, req.Script, func(ls *lua.LState) int {
top := ls.GetTop()
ss := make([]string, top)
for i := range top {
ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String()
}
_ = c.LogFlowInfo(ctx, req.FlowJobName, strings.Join(ss, "\t"))
return 0
})
ls, err = utils.LoadScript(ctx, req.Script, utils.LuaPrintFn(func(s string) {
_ = c.LogFlowInfo(ctx, req.FlowJobName, s)
}))
if err != nil {
return 0, err
}
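Here (and in the kafka connector below) the hand-rolled Lua print handler is replaced by utils.LuaPrintFn, which takes a per-line callback. Judging from the closure being removed, the helper presumably wraps that callback roughly as follows; the exact signature in PeerDB's utils package is inferred from the call sites, not copied from the repo.

package utils // placement is illustrative; the real helper lives in PeerDB's shared utils package

import (
	"strings"

	lua "github.com/yuin/gopher-lua"
)

// LuaPrintFn, as sketched from the closure the connectors used to inline:
// it adapts a per-line callback into a gopher-lua print function that
// stringifies every argument on the Lua stack and joins them with tabs.
func LuaPrintFn(print func(string)) lua.LGFunction {
	return func(ls *lua.LState) int {
		top := ls.GetTop()
		ss := make([]string, top)
		for i := range ss {
			ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String()
		}
		print(strings.Join(ss, "\t"))
		return 0 // Lua's print returns no values
	}
}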
68 changes: 39 additions & 29 deletions flow/connectors/kafka/kafka.go
@@ -5,7 +5,6 @@ import (
"crypto/tls"
"fmt"
"log/slog"
"strings"
"sync/atomic"
"time"

@@ -165,54 +164,63 @@ func lvalueToKafkaRecord(ls *lua.LState, value lua.LValue) (*kgo.Record, error)
return kr, nil
}

type poolResult struct {
records []*kgo.Record
lsn int64
}

func (c *KafkaConnector) createPool(
ctx context.Context,
script string,
flowJobName string,
lastSeenLSN *atomic.Int64,
queueErr func(error),
) (*utils.LPool[[]*kgo.Record], error) {
produceCb := func(_ *kgo.Record, err error) {
if err != nil {
queueErr(err)
}
}

) (*utils.LPool[poolResult], error) {
return utils.LuaPool(func() (*lua.LState, error) {
ls, err := utils.LoadScript(ctx, script, func(ls *lua.LState) int {
top := ls.GetTop()
ss := make([]string, top)
for i := range top {
ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String()
}
_ = c.LogFlowInfo(ctx, flowJobName, strings.Join(ss, "\t"))
return 0
})
ls, err := utils.LoadScript(ctx, script, utils.LuaPrintFn(func(s string) {
_ = c.LogFlowInfo(ctx, flowJobName, s)
}))
if err != nil {
return nil, err
}
if script == "" {
ls.Env.RawSetString("onRecord", ls.NewFunction(utils.DefaultOnRecord))
}
return ls, nil
}, func(krs []*kgo.Record) {
for _, kr := range krs {
c.client.Produce(ctx, kr, produceCb)
}, func(result poolResult) {
lenRecords := int32(len(result.records))
if lenRecords == 0 {
if lastSeenLSN != nil {
shared.AtomicInt64Max(lastSeenLSN, result.lsn)
}
} else {
recordCounter := atomic.Int32{}
recordCounter.Store(lenRecords)
for _, kr := range result.records {
c.client.Produce(ctx, kr, func(_ *kgo.Record, err error) {
if err != nil {
queueErr(err)
} else if recordCounter.Add(-1) == 0 && lastSeenLSN != nil {
shared.AtomicInt64Max(lastSeenLSN, result.lsn)
}
})
}
}
})
}

func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
numRecords := atomic.Int64{}
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
lastSeenLSN := atomic.Int64{}

queueCtx, queueErr := context.WithCancelCause(ctx)
pool, err := c.createPool(queueCtx, req.Script, req.FlowJobName, queueErr)
pool, err := c.createPool(queueCtx, req.Script, req.FlowJobName, &lastSeenLSN, queueErr)
if err != nil {
return nil, err
}
defer pool.Close()

lastSeenLSN := atomic.Int64{}
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
flushLoopDone := make(chan struct{})
go func() {
ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds())
@@ -251,20 +259,20 @@ Loop:
break Loop
}

pool.Run(func(ls *lua.LState) []*kgo.Record {
pool.Run(func(ls *lua.LState) poolResult {
lfn := ls.Env.RawGetString("onRecord")
fn, ok := lfn.(*lua.LFunction)
if !ok {
queueErr(fmt.Errorf("script should define `onRecord` as function, not %s", lfn))
return nil
return poolResult{}
}

ls.Push(fn)
ls.Push(pua.LuaRecord.New(ls, record))
err := ls.PCall(1, -1, nil)
if err != nil {
queueErr(fmt.Errorf("script failed: %w", err))
return nil
return poolResult{}
}

args := ls.GetTop()
@@ -273,7 +281,7 @@ Loop:
kr, err := lvalueToKafkaRecord(ls, ls.Get(i-args))
if err != nil {
queueErr(err)
return nil
return poolResult{}
}
if kr != nil {
if kr.Topic == "" {
@@ -285,8 +293,10 @@ }
}
ls.SetTop(0)
numRecords.Add(1)
shared.AtomicInt64Max(&lastSeenLSN, record.GetCheckpointID())
return results
return poolResult{
records: results,
lsn: record.GetCheckpointID(),
}
})

case <-queueCtx.Done():
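createPool now returns each batch as a poolResult carrying the produced records together with the source record's LSN, and lastSeenLSN is only advanced once every record in the batch has been acknowledged by the producer (empty batches advance it immediately). The earlier code bumped the LSN inside pool.Run as soon as the Lua transform returned, before Kafka confirmed the writes. Below is a minimal sketch of that acknowledgement-counting pattern, with a fake asynchronous producer standing in for the kgo client.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// atomicInt64Max mirrors shared.AtomicInt64Max: only move the value forward.
func atomicInt64Max(a *atomic.Int64, v int64) {
	for {
		cur := a.Load()
		if v <= cur || a.CompareAndSwap(cur, v) {
			return
		}
	}
}

func main() {
	var lastSeenLSN atomic.Int64
	var wg sync.WaitGroup

	// Stand-in for the Kafka client's asynchronous Produce: the callback runs
	// once the record is "acknowledged".
	produce := func(cb func(err error)) {
		wg.Add(1)
		go func() { defer wg.Done(); cb(nil) }()
	}

	// One transformed batch plus the checkpoint LSN of its source record.
	records := []string{"r1", "r2", "r3"}
	batchLSN := int64(42)

	// Seed a counter with the batch size; the checkpoint only advances when the
	// final acknowledgement arrives, which is the pattern the produce callback
	// in createPool now follows.
	var remaining atomic.Int32
	remaining.Store(int32(len(records)))
	for range records {
		produce(func(err error) {
			if err == nil && remaining.Add(-1) == 0 {
				atomicInt64Max(&lastSeenLSN, batchLSN)
			}
		})
	}

	wg.Wait()
	fmt.Println("lastSeenLSN:", lastSeenLSN.Load()) // prints 42
}

The qrep path in flow/connectors/kafka/qrep.go below passes nil for lastSeenLSN, so the same pool simply skips the checkpoint update there.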
12 changes: 6 additions & 6 deletions flow/connectors/kafka/qrep.go
@@ -29,7 +29,7 @@ func (c *KafkaConnector) SyncQRepRecords(
schema := stream.Schema()

queueCtx, queueErr := context.WithCancelCause(ctx)
pool, err := c.createPool(queueCtx, config.Script, config.FlowJobName, queueErr)
pool, err := c.createPool(queueCtx, config.Script, config.FlowJobName, nil, queueErr)
if err != nil {
return 0, err
}
@@ -44,7 +44,7 @@
break Loop
}

pool.Run(func(ls *lua.LState) []*kgo.Record {
pool.Run(func(ls *lua.LState) poolResult {
items := model.NewRecordItems(len(qrecord))
for i, val := range qrecord {
items.AddColumn(schema.Fields[i].Name, val)
@@ -61,15 +61,15 @@
fn, ok := lfn.(*lua.LFunction)
if !ok {
queueErr(fmt.Errorf("script should define `onRecord` as function, not %s", lfn))
return nil
return poolResult{}
}

ls.Push(fn)
ls.Push(pua.LuaRecord.New(ls, record))
err := ls.PCall(1, -1, nil)
if err != nil {
queueErr(fmt.Errorf("script failed: %w", err))
return nil
return poolResult{}
}

args := ls.GetTop()
@@ -78,7 +78,7 @@
kr, err := lvalueToKafkaRecord(ls, ls.Get(i-args))
if err != nil {
queueErr(err)
return nil
return poolResult{}
}
if kr != nil {
if kr.Topic == "" {
@@ -89,7 +89,7 @@
}
ls.SetTop(0)
numRecords.Add(1)
return results
return poolResult{records: results}
})

case <-queueCtx.Done():
