From 92337946bce2dbcdc2564c2489b1487a50681b80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 17 Apr 2024 01:20:31 +0000 Subject: [PATCH] Initial sketch of qrep scripting, mechanism should work for cdc too Needs to address error handling, maybe run inside errgroup as a long way to cancel incoming stream Also need to implement __newindex on LuaRow to allow mutating value --- flow/activities/flowable_core.go | 34 +++++++++++++++++++++++++++++--- flow/pua/stream_adapter.go | 32 ++++++++++++++++++++++++++++++ protos/flow.proto | 1 + 3 files changed, 64 insertions(+), 3 deletions(-) create mode 100644 flow/pua/stream_adapter.go diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 319da34c49..1935e066f4 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -6,11 +6,13 @@ import ( "fmt" "log/slog" "reflect" + "strings" "sync/atomic" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" + "github.com/yuin/gopher-lua" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -23,6 +25,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/peerdbenv" + "github.com/PeerDB-io/peer-flow/pua" "github.com/PeerDB-io/peer-flow/shared" ) @@ -343,10 +346,35 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, }) defer shutdown() - var rowsSynced int bufferSize := shared.FetchAndChannelSize - errGroup, errCtx := errgroup.WithContext(ctx) stream := model.NewQRecordStream(bufferSize) + outstream := stream + if config.Script != "" { + ls, err := utils.LoadScript(ctx, config.Script, func(ls *lua.LState) int { + top := ls.GetTop() + ss := make([]string, top) + for i := range top { + ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() + } + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, strings.Join(ss, "\t")) + return 0 + }) + if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return err + } + lfn := ls.Env.RawGetString("transformRow") + fn, ok := lfn.(*lua.LFunction) + if !ok { + err := fmt.Errorf("script should define `transformRow` as function, not %s", lfn) + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return err + } + outstream = pua.AttachToStream(ls, fn, stream) + } + + var rowsSynced int + errGroup, errCtx := errgroup.WithContext(ctx) errGroup.Go(func() error { tmp, err := srcConn.PullQRepRecords(errCtx, config, partition, stream) if err != nil { @@ -363,7 +391,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, }) errGroup.Go(func() error { - rowsSynced, err = dstConn.SyncQRepRecords(errCtx, config, partition, stream) + rowsSynced, err = dstConn.SyncQRepRecords(errCtx, config, partition, outstream) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return fmt.Errorf("failed to sync records: %w", err) diff --git a/flow/pua/stream_adapter.go b/flow/pua/stream_adapter.go new file mode 100644 index 0000000000..05e0371d14 --- /dev/null +++ b/flow/pua/stream_adapter.go @@ -0,0 +1,32 @@ +package pua + +import ( + "github.com/yuin/gopher-lua" + + "github.com/PeerDB-io/peer-flow/model" +) + +func AttachToStream(ls *lua.LState, lfn *lua.LFunction, stream *model.QRecordStream) *model.QRecordStream { + output := model.NewQRecordStream(0) + go func() { + schema := stream.Schema() + output.SetSchema(schema) + for record := range stream.Records { + row := model.NewRecordItems(len(record)) + for i, qv := range record { + row.AddColumn(schema.Fields[i].Name, qv) + } + ls.Push(lfn) + ls.Push(LuaRow.New(ls, row)) + if err := ls.PCall(1, 0, nil); err != nil { + panic(err.Error()) // TODO error handling + } + for i, field := range schema.Fields { + record[i] = row.GetColumnValue(field.Name) + } + output.Records <- record + } + output.Close(stream.Err()) + }() + return output +} diff --git a/protos/flow.proto b/protos/flow.proto index 46516fea29..4d4343f71a 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -297,6 +297,7 @@ message QRepConfig { string soft_delete_col_name = 17; TypeSystem system = 18; + string script = 19; } message QRepPartition {