From 9d74565b2338fa5142471cfe366b07607348428a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 9 May 2024 12:42:29 +0000 Subject: [PATCH 1/6] UI: enable initial load options for non-EH queues (#1683) --- ui/app/mirrors/create/cdc/cdc.tsx | 4 ++-- ui/app/mirrors/create/handlers.ts | 10 ++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/ui/app/mirrors/create/cdc/cdc.tsx b/ui/app/mirrors/create/cdc/cdc.tsx index 353d988d19..495a406cc5 100644 --- a/ui/app/mirrors/create/cdc/cdc.tsx +++ b/ui/app/mirrors/create/cdc/cdc.tsx @@ -83,10 +83,10 @@ export default function CDCConfigForm({ !isQueue) || (label.includes('staging path') && defaultSyncMode(mirrorConfig.destination?.type) !== 'AVRO') || - (isQueue && + (isQueue && label.includes('soft delete')) || + (mirrorConfig.destination?.type === DBType.EVENTHUBS && (label.includes('initial copy') || label.includes('initial load') || - label.includes('soft delete') || label.includes('snapshot'))) || ((mirrorConfig.source?.type !== DBType.POSTGRES || mirrorConfig.destination?.type !== DBType.POSTGRES) && diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index d924dc2588..1c45210c4d 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -25,13 +25,11 @@ import { } from './schema'; export const IsQueuePeer = (peerType?: DBType): boolean => { - if (!peerType) { - return false; - } return ( - peerType === DBType.KAFKA || - peerType === DBType.PUBSUB || - peerType === DBType.EVENTHUBS + !!peerType && + (peerType === DBType.KAFKA || + peerType === DBType.PUBSUB || + peerType === DBType.EVENTHUBS) ); }; From c6d7b282790dd460fba5ab93d1f50a57b37c17c9 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Thu, 9 May 2024 23:46:43 +0530 Subject: [PATCH 2/6] Clickhouse insert-select: account for lower version (#1707) This PR tweaks the insert into select queries of clickhouse qrep to account for lower clickhouse versions where session token as an argument is not supported for the s3() table function. https://github.com/ClickHouse/ClickHouse/issues/61230 functionally tested --- flow/connectors/clickhouse/qrep_avro_sync.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index a0a9fb9a71..b015ef47e4 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -47,10 +47,15 @@ func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a if err != nil { return err } + + sessionTokenPart := "" + if creds.AWS.SessionToken != "" { + sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken) + } //nolint:gosec - query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', '%s', 'Avro')", + query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s'%s, 'Avro')", s.config.DestinationTableIdentifier, avroFileUrl, - creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, creds.AWS.SessionToken) + creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart) _, err = s.connector.database.ExecContext(ctx, query) @@ -137,10 +142,15 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( selector = append(selector, "`"+colName+"`") } selectorStr := strings.Join(selector, ",") + + sessionTokenPart := "" + if creds.AWS.SessionToken != "" { + sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken) + } //nolint:gosec - query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s', '%s', 'Avro')", + query := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM s3('%s','%s','%s'%s, 'Avro')", config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl, - creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, creds.AWS.SessionToken) + creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart) _, err = s.connector.database.ExecContext(ctx, query) if err != nil { From b80d43462126bf2aea23fa9f0966937d2227200e Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Fri, 10 May 2024 00:15:23 +0530 Subject: [PATCH 3/6] Snowflake merge statement: filter out empty peerdb data (#1708) This prevents NormaliseFlow from erroring when we truncate rows > 16MB for PG to Snowflake CDC mirrors --- flow/connectors/snowflake/snowflake.go | 1 + 1 file changed, 1 insertion(+) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 27bb69d07f..a0fcc1754d 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -44,6 +44,7 @@ const ( SELECT _PEERDB_UID,_PEERDB_TIMESTAMP,TO_VARIANT(PARSE_JSON(_PEERDB_DATA)) %s,_PEERDB_RECORD_TYPE, _PEERDB_MATCH_DATA,_PEERDB_BATCH_ID,_PEERDB_UNCHANGED_TOAST_COLUMNS FROM _PEERDB_INTERNAL.%s WHERE _PEERDB_BATCH_ID = %d AND + _PEERDB_DATA != '' AND _PEERDB_DESTINATION_TABLE_NAME = ? ), FLATTENED AS (SELECT _PEERDB_UID,_PEERDB_TIMESTAMP,_PEERDB_RECORD_TYPE,_PEERDB_MATCH_DATA,_PEERDB_BATCH_ID, _PEERDB_UNCHANGED_TOAST_COLUMNS,%s From ffd494f11d0076cb8a9af8a14534680afdb92319 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 10 May 2024 13:38:15 +0000 Subject: [PATCH 4/6] QRep scripting (#1625) Scripts for qrep may define a function `transformRow(row)` which can reassign fields' values (without changing types) --- flow/activities/flowable_core.go | 23 +++- flow/connectors/eventhub/eventhub.go | 13 +- flow/connectors/kafka/kafka.go | 13 +- flow/connectors/pubsub/pubsub.go | 13 +- flow/connectors/utils/lua.go | 13 ++ flow/e2e/postgres/qrep_flow_pg_test.go | 52 ++++++++ flow/pua/peerdb.go | 174 +++++++++++++++++++++++++ flow/pua/stream_adapter.go | 33 +++++ flow/shared/lua.go | 11 +- ui/app/mirrors/create/helpers/qrep.ts | 8 +- 10 files changed, 318 insertions(+), 35 deletions(-) create mode 100644 flow/pua/stream_adapter.go diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 319da34c49..ad3dca378a 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -11,6 +11,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" + "github.com/yuin/gopher-lua" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -23,6 +24,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/peerdbenv" + "github.com/PeerDB-io/peer-flow/pua" "github.com/PeerDB-io/peer-flow/shared" ) @@ -343,10 +345,25 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, }) defer shutdown() - var rowsSynced int bufferSize := shared.FetchAndChannelSize - errGroup, errCtx := errgroup.WithContext(ctx) stream := model.NewQRecordStream(bufferSize) + outstream := stream + if config.Script != "" { + ls, err := utils.LoadScript(ctx, config.Script, utils.LuaPrintFn(func(s string) { + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, s) + })) + if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return err + } + lfn := ls.Env.RawGetString("transformRow") + if fn, ok := lfn.(*lua.LFunction); ok { + outstream = pua.AttachToStream(ls, fn, stream) + } + } + + var rowsSynced int + errGroup, errCtx := errgroup.WithContext(ctx) errGroup.Go(func() error { tmp, err := srcConn.PullQRepRecords(errCtx, config, partition, stream) if err != nil { @@ -363,7 +380,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, }) errGroup.Go(func() error { - rowsSynced, err = dstConn.SyncQRepRecords(errCtx, config, partition, stream) + rowsSynced, err = dstConn.SyncQRepRecords(errCtx, config, partition, outstream) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return fmt.Errorf("failed to sync records: %w", err) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 4f95d1007e..1182f4d413 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log/slog" - "strings" "sync/atomic" "time" @@ -196,15 +195,9 @@ func (c *EventHubConnector) processBatch( var fn *lua.LFunction if req.Script != "" { var err error - ls, err = utils.LoadScript(ctx, req.Script, func(ls *lua.LState) int { - top := ls.GetTop() - ss := make([]string, top) - for i := range top { - ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() - } - _ = c.LogFlowInfo(ctx, req.FlowJobName, strings.Join(ss, "\t")) - return 0 - }) + ls, err = utils.LoadScript(ctx, req.Script, utils.LuaPrintFn(func(s string) { + _ = c.LogFlowInfo(ctx, req.FlowJobName, s) + })) if err != nil { return 0, err } diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index cfe4652598..c58da5e50f 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -5,7 +5,6 @@ import ( "crypto/tls" "fmt" "log/slog" - "strings" "sync/atomic" "time" @@ -178,15 +177,9 @@ func (c *KafkaConnector) createPool( } return utils.LuaPool(func() (*lua.LState, error) { - ls, err := utils.LoadScript(ctx, script, func(ls *lua.LState) int { - top := ls.GetTop() - ss := make([]string, top) - for i := range top { - ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() - } - _ = c.LogFlowInfo(ctx, flowJobName, strings.Join(ss, "\t")) - return 0 - }) + ls, err := utils.LoadScript(ctx, script, utils.LuaPrintFn(func(s string) { + _ = c.LogFlowInfo(ctx, flowJobName, s) + })) if err != nil { return nil, err } diff --git a/flow/connectors/pubsub/pubsub.go b/flow/connectors/pubsub/pubsub.go index 54031f016d..0a8709b3b2 100644 --- a/flow/connectors/pubsub/pubsub.go +++ b/flow/connectors/pubsub/pubsub.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log/slog" - "strings" "sync" "sync/atomic" "time" @@ -130,15 +129,9 @@ func (c *PubSubConnector) createPool( queueErr func(error), ) (*utils.LPool[[]PubSubMessage], error) { return utils.LuaPool(func() (*lua.LState, error) { - ls, err := utils.LoadScript(ctx, script, func(ls *lua.LState) int { - top := ls.GetTop() - ss := make([]string, top) - for i := range top { - ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() - } - _ = c.LogFlowInfo(ctx, flowJobName, strings.Join(ss, "\t")) - return 0 - }) + ls, err := utils.LoadScript(ctx, script, utils.LuaPrintFn(func(s string) { + _ = c.LogFlowInfo(ctx, flowJobName, s) + })) if err != nil { return nil, err } diff --git a/flow/connectors/utils/lua.go b/flow/connectors/utils/lua.go index 47676721b3..f1d82f373f 100644 --- a/flow/connectors/utils/lua.go +++ b/flow/connectors/utils/lua.go @@ -3,6 +3,7 @@ package utils import ( "context" "fmt" + "strings" "github.com/yuin/gopher-lua" @@ -35,6 +36,18 @@ func LVAsStringOrNil(ls *lua.LState, v lua.LValue) (string, error) { } } +func LuaPrintFn(fn func(string)) lua.LGFunction { + return func(ls *lua.LState) int { + top := ls.GetTop() + ss := make([]string, top) + for i := range top { + ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() + } + fn(strings.Join(ss, "\t")) + return 0 + } +} + func LoadScript(ctx context.Context, script string, printfn lua.LGFunction) (*lua.LState, error) { ls := lua.NewState(lua.Options{SkipOpenLibs: true}) ls.SetContext(ctx) diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 63a226ae13..4f2f944a97 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -406,3 +406,55 @@ func (s PeerFlowE2ETestSuitePG) Test_Pause() { env.Cancel() e2e.RequireEnvCanceled(s.t, env) } + +func (s PeerFlowE2ETestSuitePG) TestTransform() { + numRows := 10 + + srcTable := "test_transform" + s.setupSourceTable(srcTable, numRows) + + dstTable := "test_transformdst" + + srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) + dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) + + query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", srcSchemaQualified) + + postgresPeer := e2e.GeneratePostgresPeer() + + _, err := s.Conn().Exec(context.Background(), `insert into public.scripts (name, lang, source) values + ('pgtransform', 'lua', 'function transformRow(row) row.myreal = 1729 end') on conflict do nothing`) + require.NoError(s.t, err) + + qrepConfig, err := e2e.CreateQRepWorkflowConfig( + "test_transform", + srcSchemaQualified, + dstSchemaQualified, + query, + postgresPeer, + "", + true, + "_PEERDB_SYNCED_AT", + "", + ) + require.NoError(s.t, err) + qrepConfig.WriteMode = &protos.QRepWriteMode{ + WriteType: protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE, + } + qrepConfig.InitialCopyOnly = false + qrepConfig.Script = "pgtransform" + + tc := e2e.NewTemporalClient(s.t) + env := e2e.RunQRepFlowWorkflow(tc, qrepConfig) + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "waiting for first sync to complete", func() bool { + err = s.compareCounts(dstSchemaQualified, int64(numRows)) + return err == nil + }) + require.NoError(s.t, env.Error()) + + var exists bool + err = s.Conn().QueryRow(context.Background(), + fmt.Sprintf("select exists(select * from %s where myreal <> 1729)", dstSchemaQualified)).Scan(&exists) + require.NoError(s.t, err) + require.False(s.t, exists) +} diff --git a/flow/pua/peerdb.go b/flow/pua/peerdb.go index 7330cb9d09..c44aeb497a 100644 --- a/flow/pua/peerdb.go +++ b/flow/pua/peerdb.go @@ -3,6 +3,7 @@ package pua import ( "bytes" "fmt" + "math" "math/big" "time" @@ -52,6 +53,7 @@ func RegisterTypes(ls *lua.LState) { mt = LuaRow.NewMetatable(ls) mt.RawSetString("__index", ls.NewFunction(LuaRowIndex)) + mt.RawSetString("__newindex", ls.NewFunction(LuaRowNewIndex)) mt.RawSetString("__len", ls.NewFunction(LuaRowLen)) mt = shared.LuaUuid.NewMetatable(ls) @@ -157,6 +159,178 @@ func LuaRowIndex(ls *lua.LState) int { return 1 } +func LVAsTime(ls *lua.LState, lv lua.LValue) time.Time { + switch v := lv.(type) { + case lua.LNumber: + ipart, fpart := math.Modf(float64(v)) + return time.Unix(int64(ipart), int64(fpart*1e9)) + case *lua.LUserData: + if tm, ok := v.Value.(time.Time); ok { + return tm + } + } + ls.RaiseError("Cannot convert %T to time.Time", lv) + return time.Time{} +} + +func LuaRowNewIndex(ls *lua.LState) int { + _, row := LuaRow.Check(ls, 1) + key := ls.CheckString(2) + val := ls.Get(3) + qv := row.GetColumnValue(key) + kind := qv.Kind() + if val == lua.LNil { + row.AddColumn(key, qvalue.QValueNull(kind)) + } + var newqv qvalue.QValue + switch kind { + case qvalue.QValueKindInvalid: + newqv = qvalue.QValueInvalid{Val: lua.LVAsString(val)} + case qvalue.QValueKindFloat32: + newqv = qvalue.QValueFloat32{Val: float32(lua.LVAsNumber(val))} + case qvalue.QValueKindFloat64: + newqv = qvalue.QValueFloat64{Val: float64(lua.LVAsNumber(val))} + case qvalue.QValueKindInt16: + newqv = qvalue.QValueInt16{Val: int16(lua.LVAsNumber(val))} + case qvalue.QValueKindInt32: + newqv = qvalue.QValueInt32{Val: int32(lua.LVAsNumber(val))} + case qvalue.QValueKindInt64: + switch v := val.(type) { + case lua.LNumber: + newqv = qvalue.QValueInt64{Val: int64(v)} + case *lua.LUserData: + switch i64 := v.Value.(type) { + case int64: + newqv = qvalue.QValueInt64{Val: i64} + case uint64: + newqv = qvalue.QValueInt64{Val: int64(i64)} + } + } + if newqv == nil { + ls.RaiseError("invalid int64") + } + case qvalue.QValueKindBoolean: + newqv = qvalue.QValueBoolean{Val: lua.LVAsBool(val)} + case qvalue.QValueKindQChar: + switch v := val.(type) { + case lua.LNumber: + newqv = qvalue.QValueQChar{Val: uint8(v)} + case lua.LString: + if len(v) > 0 { + newqv = qvalue.QValueQChar{Val: v[0]} + } + default: + ls.RaiseError("invalid \"char\"") + } + case qvalue.QValueKindString: + newqv = qvalue.QValueString{Val: lua.LVAsString(val)} + case qvalue.QValueKindTimestamp: + newqv = qvalue.QValueTimestamp{Val: LVAsTime(ls, val)} + case qvalue.QValueKindTimestampTZ: + newqv = qvalue.QValueTimestampTZ{Val: LVAsTime(ls, val)} + case qvalue.QValueKindDate: + newqv = qvalue.QValueDate{Val: LVAsTime(ls, val)} + case qvalue.QValueKindTime: + newqv = qvalue.QValueTime{Val: LVAsTime(ls, val)} + case qvalue.QValueKindTimeTZ: + newqv = qvalue.QValueTimeTZ{Val: LVAsTime(ls, val)} + case qvalue.QValueKindNumeric: + newqv = qvalue.QValueNumeric{Val: LVAsDecimal(ls, val)} + case qvalue.QValueKindBytes: + newqv = qvalue.QValueBytes{Val: []byte(lua.LVAsString(val))} + case qvalue.QValueKindUUID: + if ud, ok := val.(*lua.LUserData); ok { + if id, ok := ud.Value.(uuid.UUID); ok { + newqv = qvalue.QValueUUID{Val: [16]byte(id)} + } + } + case qvalue.QValueKindJSON: + newqv = qvalue.QValueJSON{Val: lua.LVAsString(val)} + case qvalue.QValueKindBit: + newqv = qvalue.QValueBit{Val: []byte(lua.LVAsString(val))} + case qvalue.QValueKindArrayFloat32: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayFloat32{ + Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) float32 { + return float32(lua.LVAsNumber(v)) + }), + } + } + case qvalue.QValueKindArrayFloat64: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayFloat64{ + Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) float64 { + return float64(lua.LVAsNumber(v)) + }), + } + } + case qvalue.QValueKindArrayInt16: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayFloat64{ + Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) float64 { + return float64(lua.LVAsNumber(v)) + }), + } + } + case qvalue.QValueKindArrayInt32: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayFloat64{ + Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) float64 { + return float64(lua.LVAsNumber(v)) + }), + } + } + case qvalue.QValueKindArrayInt64: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayFloat64{ + Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) float64 { + return float64(lua.LVAsNumber(v)) + }), + } + } + case qvalue.QValueKindArrayString: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayString{ + Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) string { + return lua.LVAsString(v) + }), + } + } + case qvalue.QValueKindArrayDate: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayDate{ + Val: shared.LTableToSlice(ls, tbl, LVAsTime), + } + } + case qvalue.QValueKindArrayTimestamp: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayDate{ + Val: shared.LTableToSlice(ls, tbl, LVAsTime), + } + } + case qvalue.QValueKindArrayTimestampTZ: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayDate{ + Val: shared.LTableToSlice(ls, tbl, LVAsTime), + } + } + case qvalue.QValueKindArrayBoolean: + if tbl, ok := val.(*lua.LTable); ok { + newqv = qvalue.QValueArrayBoolean{ + Val: shared.LTableToSlice(ls, tbl, func(_ *lua.LState, v lua.LValue) bool { + return lua.LVAsBool(v) + }), + } + } + default: + ls.RaiseError(fmt.Sprintf("no support for reassigning %s", kind)) + return 0 + } + + row.AddColumn(key, newqv) + return 1 +} + func LuaRowLen(ls *lua.LState) int { row := LuaRow.StartMethod(ls) ls.Push(lua.LNumber(len(row.ColToVal))) diff --git a/flow/pua/stream_adapter.go b/flow/pua/stream_adapter.go new file mode 100644 index 0000000000..0367134368 --- /dev/null +++ b/flow/pua/stream_adapter.go @@ -0,0 +1,33 @@ +package pua + +import ( + "github.com/yuin/gopher-lua" + + "github.com/PeerDB-io/peer-flow/model" +) + +func AttachToStream(ls *lua.LState, lfn *lua.LFunction, stream *model.QRecordStream) *model.QRecordStream { + output := model.NewQRecordStream(0) + go func() { + schema := stream.Schema() + output.SetSchema(schema) + for record := range stream.Records { + row := model.NewRecordItems(len(record)) + for i, qv := range record { + row.AddColumn(schema.Fields[i].Name, qv) + } + ls.Push(lfn) + ls.Push(LuaRow.New(ls, row)) + if err := ls.PCall(1, 0, nil); err != nil { + output.Close(err) + return + } + for i, field := range schema.Fields { + record[i] = row.GetColumnValue(field.Name) + } + output.Records <- record + } + output.Close(stream.Err()) + }() + return output +} diff --git a/flow/shared/lua.go b/flow/shared/lua.go index 2b95c3464c..26aeb4fe2a 100644 --- a/flow/shared/lua.go +++ b/flow/shared/lua.go @@ -18,7 +18,7 @@ var ( LuaDecimal = glua64.UserDataType[decimal.Decimal]{Name: "peerdb_decimal"} ) -func SliceToLTable[T any](ls *lua.LState, s []T, f func(x T) lua.LValue) *lua.LTable { +func SliceToLTable[T any](ls *lua.LState, s []T, f func(T) lua.LValue) *lua.LTable { tbl := ls.CreateTable(len(s), 0) tbl.Metatable = ls.GetTypeMetatable("Array") for idx, val := range s { @@ -26,3 +26,12 @@ func SliceToLTable[T any](ls *lua.LState, s []T, f func(x T) lua.LValue) *lua.LT } return tbl } + +func LTableToSlice[T any](ls *lua.LState, tbl *lua.LTable, f func(*lua.LState, lua.LValue) T) []T { + tlen := tbl.Len() + slice := make([]T, 0, tlen) + for i := range tlen { + slice = append(slice, f(ls, tbl.RawGetInt(i))) + } + return slice +} diff --git a/ui/app/mirrors/create/helpers/qrep.ts b/ui/app/mirrors/create/helpers/qrep.ts index 392439ee77..aa2b80d462 100644 --- a/ui/app/mirrors/create/helpers/qrep.ts +++ b/ui/app/mirrors/create/helpers/qrep.ts @@ -123,8 +123,14 @@ export const qrepSettings: MirrorSetting[] = [ ...curr, waitBetweenBatchesSeconds: parseInt(value as string, 10) || 30, })), - tips: 'Time to wait (in seconds) between getting partitions to process. The default is 30 seconds', + tips: 'Time to wait (in seconds) between getting partitions to process. The default is 30 seconds.', default: 30, type: 'number', }, + { + label: 'Script', + stateHandler: (value, setter) => + setter((curr: QRepConfig) => ({ ...curr, script: value as string })), + tips: 'Script to use for row transformations. The default is no scripting.', + }, ]; From 0116f5ebf3ee1a7325077a76acfe15e14adfc175 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 10 May 2024 10:40:47 -0400 Subject: [PATCH 5/6] Move to alpine to reduce vulns (#1710) reduces the number of high and medium vulns to zero. --- e2e_cleanup/go.mod | 2 +- flow/go.mod | 2 +- stacks/flow.Dockerfile | 8 ++++---- stacks/peerdb-server.Dockerfile | 12 ++++-------- stacks/peerdb-ui.Dockerfile | 7 +++---- 5 files changed, 13 insertions(+), 18 deletions(-) diff --git a/e2e_cleanup/go.mod b/e2e_cleanup/go.mod index 6fc5bcc146..e83a9ab49b 100644 --- a/e2e_cleanup/go.mod +++ b/e2e_cleanup/go.mod @@ -1,6 +1,6 @@ module github.com/PeerDB-io/peer-flow-cleanup -go 1.22.2 +go 1.22.3 require ( cloud.google.com/go/bigquery v1.59.1 diff --git a/flow/go.mod b/flow/go.mod index d6d94975d4..a96851e6e1 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -1,6 +1,6 @@ module github.com/PeerDB-io/peer-flow -go 1.22.2 +go 1.22.3 require ( cloud.google.com/go v0.112.2 diff --git a/stacks/flow.Dockerfile b/stacks/flow.Dockerfile index f991421c40..a8ac567d86 100644 --- a/stacks/flow.Dockerfile +++ b/stacks/flow.Dockerfile @@ -1,7 +1,7 @@ # syntax=docker/dockerfile:1.2 -FROM golang:1.22-bookworm AS builder -RUN apt-get update && apt-get install -y gcc libgeos-dev +FROM golang:1.22-alpine AS builder +RUN apk add --no-cache gcc geos-dev musl-dev WORKDIR /root/flow # first copy only go.mod and go.sum to cache dependencies @@ -18,8 +18,8 @@ WORKDIR /root/flow ENV CGO_ENABLED=1 RUN go build -ldflags="-s -w" -o /root/peer-flow -FROM debian:bookworm-slim AS flow-base -RUN apt-get update && apt-get install -y ca-certificates libgeos-c1v5 +FROM alpine:3.19 AS flow-base +RUN apk add --no-cache ca-certificates geos WORKDIR /root COPY --from=builder /root/peer-flow . diff --git a/stacks/peerdb-server.Dockerfile b/stacks/peerdb-server.Dockerfile index 1dbfb77b37..63c78624e9 100644 --- a/stacks/peerdb-server.Dockerfile +++ b/stacks/peerdb-server.Dockerfile @@ -1,6 +1,6 @@ # syntax=docker/dockerfile:1 -FROM lukemathwalker/cargo-chef:latest-rust-1.77-slim-bookworm as chef +FROM lukemathwalker/cargo-chef:latest-rust-alpine3.19 as chef WORKDIR /root FROM chef as planner @@ -9,10 +9,7 @@ WORKDIR /root/nexus RUN cargo chef prepare --recipe-path recipe.json FROM chef as builder -RUN apt-get update \ - && DEBIAN_FRONTEND=noninteractive \ - apt-get install --assume-yes --no-install-recommends \ - build-essential pkg-config curl unzip +RUN apk add --no-cache build-base pkgconfig curl unzip WORKDIR /root/nexus COPY scripts /root/scripts RUN /root/scripts/install-protobuf.sh @@ -24,9 +21,8 @@ COPY protos /root/protos WORKDIR /root/nexus RUN CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse cargo build --release --bin peerdb-server -FROM debian:bookworm-slim -RUN apt-get update && \ - apt-get install -y ca-certificates postgresql-client curl iputils-ping && \ +FROM alpine:3.19 +RUN apk add --no-cache ca-certificates postgresql-client curl iputils && \ mkdir -p /var/log/peerdb WORKDIR /root COPY --from=builder /root/nexus/target/release/peerdb-server . diff --git a/stacks/peerdb-ui.Dockerfile b/stacks/peerdb-ui.Dockerfile index 8630beb1dd..6d8ea2c5cc 100644 --- a/stacks/peerdb-ui.Dockerfile +++ b/stacks/peerdb-ui.Dockerfile @@ -1,10 +1,9 @@ # syntax=docker/dockerfile:1.2 # Base stage -FROM node:20-bookworm-slim AS base +FROM node:20-alpine AS base ENV NPM_CONFIG_UPDATE_NOTIFIER=false -RUN apt-get update && \ - apt-get install -y openssl && \ +RUN apk add --no-cache openssl && \ mkdir /app && \ chown -R node:node /app ENV NEXT_TELEMETRY_DISABLED 1 @@ -13,7 +12,7 @@ WORKDIR /app # Dependencies stage FROM base AS builder -COPY --chown=node:node ui/package.json ui/package-lock.json . +COPY --chown=node:node ui/package.json ui/package-lock.json ./ RUN npm ci COPY --chown=node:node ui/ . From 5f904dd4b089885d593ee35c727ad113189a7d96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 10 May 2024 15:02:09 +0000 Subject: [PATCH 6/6] kafka/pubsub: fix LSN potentially being updated too early (#1709) LSN should not be updated before success confirmed, as intermediate value may now be read before error aborts when queue is flushed With parallelism, lsn should be updated in critical section so that if earlier invocation is lagging behind LSN doesn't skip Also fix pubsub needing EnableMessageOrdering explicitly enabled --- flow/connectors/kafka/kafka.go | 55 +++++++++++++++++++++----------- flow/connectors/kafka/qrep.go | 12 +++---- flow/connectors/pubsub/pubsub.go | 51 +++++++++++++++++++++-------- flow/connectors/pubsub/qrep.go | 20 ++++++------ 4 files changed, 90 insertions(+), 48 deletions(-) diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index c58da5e50f..ec7decc50c 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -164,18 +164,18 @@ func lvalueToKafkaRecord(ls *lua.LState, value lua.LValue) (*kgo.Record, error) return kr, nil } +type poolResult struct { + records []*kgo.Record + lsn int64 +} + func (c *KafkaConnector) createPool( ctx context.Context, script string, flowJobName string, + lastSeenLSN *atomic.Int64, queueErr func(error), -) (*utils.LPool[[]*kgo.Record], error) { - produceCb := func(_ *kgo.Record, err error) { - if err != nil { - queueErr(err) - } - } - +) (*utils.LPool[poolResult], error) { return utils.LuaPool(func() (*lua.LState, error) { ls, err := utils.LoadScript(ctx, script, utils.LuaPrintFn(func(s string) { _ = c.LogFlowInfo(ctx, flowJobName, s) @@ -187,25 +187,40 @@ func (c *KafkaConnector) createPool( ls.Env.RawSetString("onRecord", ls.NewFunction(utils.DefaultOnRecord)) } return ls, nil - }, func(krs []*kgo.Record) { - for _, kr := range krs { - c.client.Produce(ctx, kr, produceCb) + }, func(result poolResult) { + lenRecords := int32(len(result.records)) + if lenRecords == 0 { + if lastSeenLSN != nil { + shared.AtomicInt64Max(lastSeenLSN, result.lsn) + } + } else { + recordCounter := atomic.Int32{} + recordCounter.Store(lenRecords) + for _, kr := range result.records { + c.client.Produce(ctx, kr, func(_ *kgo.Record, err error) { + if err != nil { + queueErr(err) + } else if recordCounter.Add(-1) == 0 && lastSeenLSN != nil { + shared.AtomicInt64Max(lastSeenLSN, result.lsn) + } + }) + } } }) } func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) { numRecords := atomic.Int64{} - tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings) + lastSeenLSN := atomic.Int64{} queueCtx, queueErr := context.WithCancelCause(ctx) - pool, err := c.createPool(queueCtx, req.Script, req.FlowJobName, queueErr) + pool, err := c.createPool(queueCtx, req.Script, req.FlowJobName, &lastSeenLSN, queueErr) if err != nil { return nil, err } defer pool.Close() - lastSeenLSN := atomic.Int64{} + tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings) flushLoopDone := make(chan struct{}) go func() { ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds()) @@ -244,12 +259,12 @@ Loop: break Loop } - pool.Run(func(ls *lua.LState) []*kgo.Record { + pool.Run(func(ls *lua.LState) poolResult { lfn := ls.Env.RawGetString("onRecord") fn, ok := lfn.(*lua.LFunction) if !ok { queueErr(fmt.Errorf("script should define `onRecord` as function, not %s", lfn)) - return nil + return poolResult{} } ls.Push(fn) @@ -257,7 +272,7 @@ Loop: err := ls.PCall(1, -1, nil) if err != nil { queueErr(fmt.Errorf("script failed: %w", err)) - return nil + return poolResult{} } args := ls.GetTop() @@ -266,7 +281,7 @@ Loop: kr, err := lvalueToKafkaRecord(ls, ls.Get(i-args)) if err != nil { queueErr(err) - return nil + return poolResult{} } if kr != nil { if kr.Topic == "" { @@ -278,8 +293,10 @@ Loop: } ls.SetTop(0) numRecords.Add(1) - shared.AtomicInt64Max(&lastSeenLSN, record.GetCheckpointID()) - return results + return poolResult{ + records: results, + lsn: record.GetCheckpointID(), + } }) case <-queueCtx.Done(): diff --git a/flow/connectors/kafka/qrep.go b/flow/connectors/kafka/qrep.go index a856ad1ccf..0c472ae0ae 100644 --- a/flow/connectors/kafka/qrep.go +++ b/flow/connectors/kafka/qrep.go @@ -29,7 +29,7 @@ func (c *KafkaConnector) SyncQRepRecords( schema := stream.Schema() queueCtx, queueErr := context.WithCancelCause(ctx) - pool, err := c.createPool(queueCtx, config.Script, config.FlowJobName, queueErr) + pool, err := c.createPool(queueCtx, config.Script, config.FlowJobName, nil, queueErr) if err != nil { return 0, err } @@ -44,7 +44,7 @@ Loop: break Loop } - pool.Run(func(ls *lua.LState) []*kgo.Record { + pool.Run(func(ls *lua.LState) poolResult { items := model.NewRecordItems(len(qrecord)) for i, val := range qrecord { items.AddColumn(schema.Fields[i].Name, val) @@ -61,7 +61,7 @@ Loop: fn, ok := lfn.(*lua.LFunction) if !ok { queueErr(fmt.Errorf("script should define `onRecord` as function, not %s", lfn)) - return nil + return poolResult{} } ls.Push(fn) @@ -69,7 +69,7 @@ Loop: err := ls.PCall(1, -1, nil) if err != nil { queueErr(fmt.Errorf("script failed: %w", err)) - return nil + return poolResult{} } args := ls.GetTop() @@ -78,7 +78,7 @@ Loop: kr, err := lvalueToKafkaRecord(ls, ls.Get(i-args)) if err != nil { queueErr(err) - return nil + return poolResult{} } if kr != nil { if kr.Topic == "" { @@ -89,7 +89,7 @@ Loop: } ls.SetTop(0) numRecords.Add(1) - return results + return poolResult{records: results} }) case <-queueCtx.Done(): diff --git a/flow/connectors/pubsub/pubsub.go b/flow/connectors/pubsub/pubsub.go index 0a8709b3b2..1ad3931432 100644 --- a/flow/connectors/pubsub/pubsub.go +++ b/flow/connectors/pubsub/pubsub.go @@ -76,6 +76,16 @@ type PubSubMessage struct { Topic string } +type poolResult struct { + messages []PubSubMessage + lsn int64 +} + +type publishResult struct { + *pubsub.PublishResult + lsn int64 +} + func lvalueToPubSubMessage(ls *lua.LState, value lua.LValue) (PubSubMessage, error) { var topic string var msg *pubsub.Message @@ -125,9 +135,9 @@ func (c *PubSubConnector) createPool( script string, flowJobName string, topiccache *topicCache, - publish chan<- *pubsub.PublishResult, + publish chan<- publishResult, queueErr func(error), -) (*utils.LPool[[]PubSubMessage], error) { +) (*utils.LPool[poolResult], error) { return utils.LuaPool(func() (*lua.LState, error) { ls, err := utils.LoadScript(ctx, script, utils.LuaPrintFn(func(s string) { _ = c.LogFlowInfo(ctx, flowJobName, s) @@ -139,10 +149,14 @@ func (c *PubSubConnector) createPool( ls.Env.RawSetString("onRecord", ls.NewFunction(utils.DefaultOnRecord)) } return ls, nil - }, func(messages []PubSubMessage) { - for _, message := range messages { + }, func(result poolResult) { + for _, message := range result.messages { topicClient, err := topiccache.GetOrSet(message.Topic, func() (*pubsub.Topic, error) { topicClient := c.client.Topic(message.Topic) + if message.OrderingKey != "" { + topicClient.EnableMessageOrdering = true + } + exists, err := topicClient.Exists(ctx) if err != nil { return nil, fmt.Errorf("error checking if topic exists: %w", err) @@ -160,7 +174,12 @@ func (c *PubSubConnector) createPool( return } - publish <- topicClient.Publish(ctx, message.Message) + publish <- publishResult{ + PublishResult: topicClient.Publish(ctx, message.Message), + } + } + publish <- publishResult{ + lsn: result.lsn, } }) } @@ -216,9 +235,10 @@ func (tc *topicCache) GetOrSet(topic string, f func() (*pubsub.Topic, error)) (* func (c *PubSubConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) { numRecords := atomic.Int64{} + lastSeenLSN := atomic.Int64{} tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings) topiccache := topicCache{cache: make(map[string]*pubsub.Topic)} - publish := make(chan *pubsub.PublishResult, 32) + publish := make(chan publishResult, 32) waitChan := make(chan struct{}) queueCtx, queueErr := context.WithCancelCause(ctx) @@ -230,7 +250,9 @@ func (c *PubSubConnector) SyncRecords(ctx context.Context, req *model.SyncRecord go func() { for curpub := range publish { - if _, err := curpub.Get(ctx); err != nil { + if curpub.PublishResult == nil { + shared.AtomicInt64Max(&lastSeenLSN, curpub.lsn) + } else if _, err := curpub.Get(ctx); err != nil { queueErr(err) break } @@ -238,7 +260,6 @@ func (c *PubSubConnector) SyncRecords(ctx context.Context, req *model.SyncRecord close(waitChan) }() - lastSeenLSN := atomic.Int64{} flushLoopDone := make(chan struct{}) go func() { ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds()) @@ -274,12 +295,12 @@ Loop: break Loop } - pool.Run(func(ls *lua.LState) []PubSubMessage { + pool.Run(func(ls *lua.LState) poolResult { lfn := ls.Env.RawGetString("onRecord") fn, ok := lfn.(*lua.LFunction) if !ok { queueErr(fmt.Errorf("script should define `onRecord` as function, not %s", lfn)) - return nil + return poolResult{} } ls.Push(fn) @@ -287,7 +308,7 @@ Loop: err := ls.PCall(1, -1, nil) if err != nil { queueErr(fmt.Errorf("script failed: %w", err)) - return nil + return poolResult{} } args := ls.GetTop() @@ -296,7 +317,7 @@ Loop: msg, err := lvalueToPubSubMessage(ls, ls.Get(i-args)) if err != nil { queueErr(err) - return nil + return poolResult{} } if msg.Message != nil { if msg.Topic == "" { @@ -308,8 +329,10 @@ Loop: } ls.SetTop(0) numRecords.Add(1) - shared.AtomicInt64Max(&lastSeenLSN, record.GetCheckpointID()) - return results + return poolResult{ + messages: results, + lsn: record.GetCheckpointID(), + } }) case <-queueCtx.Done(): diff --git a/flow/connectors/pubsub/qrep.go b/flow/connectors/pubsub/qrep.go index c1f21edc4a..139b3ca210 100644 --- a/flow/connectors/pubsub/qrep.go +++ b/flow/connectors/pubsub/qrep.go @@ -28,7 +28,7 @@ func (c *PubSubConnector) SyncQRepRecords( numRecords := atomic.Int64{} schema := stream.Schema() topiccache := topicCache{cache: make(map[string]*pubsub.Topic)} - publish := make(chan *pubsub.PublishResult, 32) + publish := make(chan publishResult, 32) waitChan := make(chan struct{}) queueCtx, queueErr := context.WithCancelCause(ctx) @@ -40,9 +40,11 @@ func (c *PubSubConnector) SyncQRepRecords( go func() { for curpub := range publish { - if _, err := curpub.Get(ctx); err != nil { - queueErr(err) - break + if curpub.PublishResult != nil { + if _, err := curpub.Get(ctx); err != nil { + queueErr(err) + break + } } } close(waitChan) @@ -57,7 +59,7 @@ Loop: break Loop } - pool.Run(func(ls *lua.LState) []PubSubMessage { + pool.Run(func(ls *lua.LState) poolResult { items := model.NewRecordItems(len(qrecord)) for i, val := range qrecord { items.AddColumn(schema.Fields[i].Name, val) @@ -74,7 +76,7 @@ Loop: fn, ok := lfn.(*lua.LFunction) if !ok { queueErr(fmt.Errorf("script should define `onRecord` as function, not %s", lfn)) - return nil + return poolResult{} } ls.Push(fn) @@ -82,7 +84,7 @@ Loop: err := ls.PCall(1, -1, nil) if err != nil { queueErr(fmt.Errorf("script failed: %w", err)) - return nil + return poolResult{} } args := ls.GetTop() @@ -91,7 +93,7 @@ Loop: msg, err := lvalueToPubSubMessage(ls, ls.Get(i-args)) if err != nil { queueErr(err) - return nil + return poolResult{} } if msg.Message != nil { if msg.Topic == "" { @@ -102,7 +104,7 @@ Loop: } ls.SetTop(0) numRecords.Add(1) - return results + return poolResult{messages: results} }) case <-queueCtx.Done():