Skip to content

Commit

Permalink
add script to ui, script is name, rely on loader to retrieve from cat…
Browse files Browse the repository at this point in the history
…alog
  • Loading branch information
serprex committed Mar 11, 2024
1 parent 0416134 commit 92ff8f2
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 7 deletions.
1 change: 1 addition & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ func (a *FlowableActivity) SyncFlow(
FlowJobName: flowName,
TableMappings: options.TableMappings,
StagingPath: config.CdcStagingPath,
Script: config.Script,
})
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
}
ls.PreloadModule("flatbuffers", pua.FlatBuffers_Loader)
pua.RegisterTypes(ls)
err := ls.DoString(req.Script)
err := ls.GPCall(pua.LoadPeerdbScript, lua.LString(req.Script))
if err != nil {
return nil, fmt.Errorf("error while executing script: %w", err)
return nil, fmt.Errorf("error executing script %s: %w", req.Script, err)
}

var ok bool
Expand Down
19 changes: 14 additions & 5 deletions flow/pua/peerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"time"

"github.com/jackc/pgx/v5"
"github.com/yuin/gopher-lua"

"github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
Expand Down Expand Up @@ -52,15 +53,23 @@ func LoadPeerdbScript(ls *lua.LState) int {
ls.RaiseError("Connection failed loading %s: %s", name, err.Error())
return 0
}

var source []byte
err = pool.QueryRow(ctx, "select source from scripts where lang = 'lua' and name = $1", name).Scan(&source)
if err == nil {
fn, err := ls.Load(bytes.NewReader(source), name)
if err != nil {
ls.RaiseError(err.Error())
if err != nil {
if err == pgx.ErrNoRows {
ls.Push(lua.LString("Could not find script " + name))
return 1
}
ls.Push(fn)
ls.RaiseError("Failed to load script %s: %s", name, err.Error())
return 0
}

fn, err := ls.Load(bytes.NewReader(source), name)
if err != nil {
ls.RaiseError(err.Error())
}
ls.Push(fn)
return 1
}

Expand Down
2 changes: 2 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ message FlowConnectionConfigs {
bool soft_delete = 17;
string soft_delete_col_name = 18;
string synced_at_col_name = 19;

string script = 20;
}

message RenameTableOption {
Expand Down
10 changes: 10 additions & 0 deletions ui/app/mirrors/create/helpers/cdc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,14 @@ export const cdcSettings: MirrorSetting[] = [
type: 'switch',
advanced: true,
},
{
label: 'Script',
stateHandler: (value, setter) =>
setter((curr: CDCConfig) => ({
...curr,
script: (value as string) || '',
})),
tips: 'Associate PeerDB script with this mirror.',
advanced: true,
},
];
1 change: 1 addition & 0 deletions ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export const blankCDCSetting: FlowConnectionConfigs = {
syncedAtColName: '',
initialSnapshotOnly: false,
idleTimeoutSeconds: 60,
script: '',
};

export const blankQRepSetting = {
Expand Down

0 comments on commit 92ff8f2

Please sign in to comment.