Skip to content

Commit

Permalink
add script to ui, script is name, rely on loader to retrieve from cat…
Browse files Browse the repository at this point in the history
…alog
  • Loading branch information
serprex committed Mar 11, 2024
1 parent 4316dd3 commit a092615
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 7 deletions.
1 change: 1 addition & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ func (a *FlowableActivity) SyncFlow(
FlowJobName: flowName,
TableMappings: options.TableMappings,
StagingPath: config.CdcStagingPath,
Script: config.Script,
})
if err != nil {
logger.Warn("failed to push records", slog.Any("error", err))
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
}
ls.PreloadModule("flatbuffers", pua.FlatBuffers_Loader)
pua.RegisterTypes(ls)
err := ls.DoString(req.Script)
err := ls.GPCall(pua.LoadPeerdbScript, lua.LString(req.Script))
if err != nil {
return nil, fmt.Errorf("error while executing script: %w", err)
return nil, fmt.Errorf("error executing script %s: %w", req.Script, err)
}

var ok bool
Expand Down
19 changes: 14 additions & 5 deletions flow/pua/peerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"time"

"github.com/jackc/pgx/v5"
"github.com/yuin/gopher-lua"

"github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
Expand Down Expand Up @@ -52,15 +53,23 @@ func LoadPeerdbScript(ls *lua.LState) int {
ls.RaiseError("Connection failed loading %s: %s", name, err.Error())
return 0
}

var source []byte
err = pool.QueryRow(ctx, "select source from scripts where lang = 'lua' and name = $1", name).Scan(&source)
if err == nil {
fn, err := ls.Load(bytes.NewReader(source), name)
if err != nil {
ls.RaiseError(err.Error())
if err != nil {
if err == pgx.ErrNoRows {
ls.Push(lua.LString(fmt.Sprintf("Could not find script %s", name)))

Check failure on line 61 in flow/pua/peerdb.go

View workflow job for this annotation

GitHub Actions / lint

fmt.Sprintf can be replaced with string addition (perfsprint)
return 1
}
ls.Push(fn)
ls.RaiseError("Failed to load script %s: %s", name, err.Error())
return 0
}

fn, err := ls.Load(bytes.NewReader(source), name)
if err != nil {
ls.RaiseError(err.Error())
}
ls.Push(fn)
return 1
}

Expand Down
2 changes: 2 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ message FlowConnectionConfigs {
bool soft_delete = 17;
string soft_delete_col_name = 18;
string synced_at_col_name = 19;

string script = 20;
}

message RenameTableOption {
Expand Down
10 changes: 10 additions & 0 deletions ui/app/mirrors/create/helpers/cdc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,14 @@ export const cdcSettings: MirrorSetting[] = [
type: 'switch',
advanced: true,
},
{
label: 'Script',
stateHandler: (value, setter) =>
setter((curr: CDCConfig) => ({
...curr,
script: (value as string) || '',
})),
tips: 'Associate PeerDB script with this mirror.',
advanced: true,
},
];
1 change: 1 addition & 0 deletions ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export const blankCDCSetting: FlowConnectionConfigs = {
syncedAtColName: '',
initialSnapshotOnly: false,
idleTimeoutSeconds: 60,
script: '',
};

export const blankQRepSetting = {
Expand Down

0 comments on commit a092615

Please sign in to comment.