diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 287082dcdd..f3cab90bef 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -389,6 +389,7 @@ func (a *FlowableActivity) SyncFlow( FlowJobName: flowName, TableMappings: options.TableMappings, StagingPath: config.CdcStagingPath, + Script: config.Script, }) if err != nil { logger.Warn("failed to push records", slog.Any("error", err)) diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index 3ccb17d31f..8e7a9fda49 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -152,9 +152,9 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords } ls.PreloadModule("flatbuffers", pua.FlatBuffers_Loader) pua.RegisterTypes(ls) - err := ls.DoString(req.Script) + err := ls.GPCall(pua.LoadPeerdbScript, lua.LString(req.Script)) if err != nil { - return nil, fmt.Errorf("error while executing script: %w", err) + return nil, fmt.Errorf("error executing script %s: %w", req.Script, err) } var ok bool diff --git a/flow/pua/peerdb.go b/flow/pua/peerdb.go index a42eb8dd48..44007913a6 100644 --- a/flow/pua/peerdb.go +++ b/flow/pua/peerdb.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/jackc/pgx/v5" "github.com/yuin/gopher-lua" "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" @@ -52,15 +53,23 @@ func LoadPeerdbScript(ls *lua.LState) int { ls.RaiseError("Connection failed loading %s: %s", name, err.Error()) return 0 } + var source []byte err = pool.QueryRow(ctx, "select source from scripts where lang = 'lua' and name = $1", name).Scan(&source) - if err == nil { - fn, err := ls.Load(bytes.NewReader(source), name) - if err != nil { - ls.RaiseError(err.Error()) + if err != nil { + if err == pgx.ErrNoRows { + ls.Push(lua.LString("Could not find script " + name)) + return 1 } - ls.Push(fn) + ls.RaiseError("Failed to load script %s: %s", name, err.Error()) + return 0 + } + + fn, err := ls.Load(bytes.NewReader(source), name) + if err != nil { + ls.RaiseError(err.Error()) } + ls.Push(fn) return 1 } diff --git a/protos/flow.proto b/protos/flow.proto index e872b0fa17..1e2b7b71f3 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -69,6 +69,8 @@ message FlowConnectionConfigs { bool soft_delete = 17; string soft_delete_col_name = 18; string synced_at_col_name = 19; + + string script = 20; } message RenameTableOption { diff --git a/ui/app/mirrors/create/helpers/cdc.ts b/ui/app/mirrors/create/helpers/cdc.ts index 10ab07a951..f5112b465d 100644 --- a/ui/app/mirrors/create/helpers/cdc.ts +++ b/ui/app/mirrors/create/helpers/cdc.ts @@ -127,4 +127,14 @@ export const cdcSettings: MirrorSetting[] = [ type: 'switch', advanced: true, }, + { + label: 'Script', + stateHandler: (value, setter) => + setter((curr: CDCConfig) => ({ + ...curr, + script: (value as string) || '', + })), + tips: 'Associate PeerDB script with this mirror.', + advanced: true, + }, ]; diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index 5d1b8d9a93..07eb3d33f6 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -35,6 +35,7 @@ export const blankCDCSetting: FlowConnectionConfigs = { syncedAtColName: '', initialSnapshotOnly: false, idleTimeoutSeconds: 60, + script: '', }; export const blankQRepSetting = {