Skip to content

Commit

Permalink
Initial sketch of qrep scripting, mechanism should work for cdc too
Browse files Browse the repository at this point in the history
Needs to address error handling; maybe run inside an errgroup as a way to cancel the incoming stream

Also need to implement __newindex on LuaRow to allow mutating value
  • Loading branch information
serprex committed May 5, 2024
1 parent bdbdfca commit 9233794
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 3 deletions.
34 changes: 31 additions & 3 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"fmt"
"log/slog"
"reflect"
"strings"
"sync/atomic"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/yuin/gopher-lua"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
Expand All @@ -23,6 +25,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/pua"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -343,10 +346,35 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
})
defer shutdown()

var rowsSynced int
bufferSize := shared.FetchAndChannelSize
errGroup, errCtx := errgroup.WithContext(ctx)
stream := model.NewQRecordStream(bufferSize)
outstream := stream
if config.Script != "" {
ls, err := utils.LoadScript(ctx, config.Script, func(ls *lua.LState) int {
top := ls.GetTop()
ss := make([]string, top)
for i := range top {
ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String()
}
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, strings.Join(ss, "\t"))
return 0
})
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}
lfn := ls.Env.RawGetString("transformRow")
fn, ok := lfn.(*lua.LFunction)
if !ok {
err := fmt.Errorf("script should define `transformRow` as function, not %s", lfn)
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}
outstream = pua.AttachToStream(ls, fn, stream)
}

var rowsSynced int
errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
tmp, err := srcConn.PullQRepRecords(errCtx, config, partition, stream)
if err != nil {
Expand All @@ -363,7 +391,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
})

errGroup.Go(func() error {
rowsSynced, err = dstConn.SyncQRepRecords(errCtx, config, partition, stream)
rowsSynced, err = dstConn.SyncQRepRecords(errCtx, config, partition, outstream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to sync records: %w", err)
Expand Down
32 changes: 32 additions & 0 deletions flow/pua/stream_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package pua

import (
"github.com/yuin/gopher-lua"

"github.com/PeerDB-io/peer-flow/model"
)

// AttachToStream returns a new stream whose records are produced by running
// each record of stream through the Lua function lfn. Each record is copied
// into a LuaRow, lfn is invoked with it, and the (possibly mutated) column
// values are read back before the record is forwarded.
//
// A goroutine drives the transformation; it terminates when the source stream
// is exhausted or when the script fails. NOTE(review): ls must not be used
// concurrently by the caller while the returned stream is being consumed —
// gopher-lua states are not thread-safe.
func AttachToStream(ls *lua.LState, lfn *lua.LFunction, stream *model.QRecordStream) *model.QRecordStream {
	output := model.NewQRecordStream(0)
	go func() {
		schema := stream.Schema()
		output.SetSchema(schema)
		for record := range stream.Records {
			row := model.NewRecordItems(len(record))
			for i, qv := range record {
				row.AddColumn(schema.Fields[i].Name, qv)
			}
			ls.Push(lfn)
			ls.Push(LuaRow.New(ls, row))
			if err := ls.PCall(1, 0, nil); err != nil {
				// Propagate the script error to the consumer instead of
				// panicking, which would crash the whole worker process.
				output.Close(err)
				// Drain the source so its producer is not left blocked
				// on a send into stream.Records.
				for range stream.Records {
				}
				return
			}
			for i, field := range schema.Fields {
				record[i] = row.GetColumnValue(field.Name)
			}
			output.Records <- record
		}
		// Forward the source stream's terminal error (nil on clean EOF).
		output.Close(stream.Err())
	}()
	return output
}
1 change: 1 addition & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ message QRepConfig {
string soft_delete_col_name = 17;

TypeSystem system = 18;
string script = 19;
}

message QRepPartition {
Expand Down

0 comments on commit 9233794

Please sign in to comment.