diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 38bc636f51..54919f0bde 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -86,6 +86,11 @@ jobs: env: PGPASSWORD: postgres + - name: start redpanda + uses: redpanda-data/github-action@v0.1.4 + with: + version: "latest" + - name: Install Temporal CLI uses: temporalio/setup-temporal@v0 diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 52ab1a8ca7..3d6e7815b7 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -400,6 +400,7 @@ func (a *FlowableActivity) SyncFlow( FlowJobName: flowName, TableMappings: options.TableMappings, StagingPath: config.CdcStagingPath, + Script: config.Script, }) if err != nil { a.Alerter.LogFlowError(ctx, flowName, err) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 554e177dc1..f0264608bb 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -533,9 +533,7 @@ func (h *FlowRequestHandler) CreatePeer( return wrongConfigResponse, nil } pgConfig := pgConfigObject.PostgresConfig - encodedConfig, encodingErr = proto.Marshal(pgConfig) - case protos.DBType_SNOWFLAKE: sfConfigObject, ok := config.(*protos.Peer_SnowflakeConfig) if !ok { @@ -566,13 +564,18 @@ func (h *FlowRequestHandler) CreatePeer( encodedConfig, encodingErr = proto.Marshal(s3Config) case protos.DBType_CLICKHOUSE: chConfigObject, ok := config.(*protos.Peer_ClickhouseConfig) - if !ok { return wrongConfigResponse, nil } - chConfig := chConfigObject.ClickhouseConfig encodedConfig, encodingErr = proto.Marshal(chConfig) + case protos.DBType_KAFKA: + kaConfigObject, ok := config.(*protos.Peer_KafkaConfig) + if !ok { + return wrongConfigResponse, nil + } + kaConfig := kaConfigObject.KafkaConfig + encodedConfig, encodingErr = proto.Marshal(kaConfig) default: return wrongConfigResponse, nil } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 39e31f8171..4251c1726a 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -11,6 +11,7 @@ import ( connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery" connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse" conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub" + connkafka "github.com/PeerDB-io/peer-flow/connectors/kafka" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" conns3 "github.com/PeerDB-io/peer-flow/connectors/s3" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" @@ -202,6 +203,8 @@ func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) { return connsqlserver.NewSQLServerConnector(ctx, inner.SqlserverConfig) case *protos.Peer_ClickhouseConfig: return connclickhouse.NewClickhouseConnector(ctx, inner.ClickhouseConfig) + case *protos.Peer_KafkaConfig: + return connkafka.NewKafkaConnector(ctx, inner.KafkaConfig) default: return nil, ErrUnsupportedFunctionality } @@ -260,6 +263,7 @@ var ( _ CDCSyncConnector = &connbigquery.BigQueryConnector{} _ CDCSyncConnector = &connsnowflake.SnowflakeConnector{} _ CDCSyncConnector = &conneventhub.EventHubConnector{} + _ CDCSyncConnector = &connkafka.KafkaConnector{} _ CDCSyncConnector = &conns3.S3Connector{} _ CDCSyncConnector = &connclickhouse.ClickhouseConnector{} diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index d34c1d9274..7d75298777 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -88,13 +88,7 @@ func (c *EventHubConnector) GetLastOffset(ctx 
context.Context, jobName string) ( } func (c *EventHubConnector) SetLastOffset(ctx context.Context, jobName string, offset int64) error { - err := c.pgMetadata.UpdateLastOffset(ctx, jobName, offset) - if err != nil { - c.logger.Error("failed to update last offset", slog.Any("error", err)) - return err - } - - return nil + return c.pgMetadata.UpdateLastOffset(ctx, jobName, offset) } // returns the number of records synced @@ -204,9 +198,7 @@ func (c *EventHubConnector) processBatch( } func (c *EventHubConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) { - batch := req.Records - - numRecords, err := c.processBatch(ctx, req.FlowJobName, batch) + numRecords, err := c.processBatch(ctx, req.FlowJobName, req.Records) if err != nil { c.logger.Error("failed to process batch", slog.Any("error", err)) return nil, err diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 5fffec0f69..bbe986b497 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -55,6 +55,13 @@ func (p *PostgresMetadataStore) Ping(ctx context.Context) error { return nil } +func (p *PostgresMetadataStore) LogFlowInfo(ctx context.Context, flowName string, info string) error { + _, err := p.pool.Exec(ctx, + "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)", + flowName, info, "info") + return err +} + func (p *PostgresMetadataStore) FetchLastOffset(ctx context.Context, jobName string) (int64, error) { row := p.pool.QueryRow(ctx, `SELECT last_offset FROM `+ diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go new file mode 100644 index 0000000000..b879e31def --- /dev/null +++ b/flow/connectors/kafka/kafka.go @@ -0,0 +1,338 @@ +package connkafka + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "log/slog" + "strings" + "sync" + "unsafe" + + "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/sasl/plain" + "github.com/twmb/franz-go/pkg/sasl/scram" + "github.com/twmb/franz-go/plugin/kslog" + "github.com/yuin/gopher-lua" + "go.temporal.io/sdk/log" + + "github.com/PeerDB-io/gluaflatbuffers" + metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/logger" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/peerdbenv" + "github.com/PeerDB-io/peer-flow/pua" +) + +type KafkaConnector struct { + client *kgo.Client + pgMetadata *metadataStore.PostgresMetadataStore + logger log.Logger +} + +func unsafeFastStringToReadOnlyBytes(s string) []byte { + return unsafe.Slice(unsafe.StringData(s), len(s)) +} + +func LVAsReadOnlyBytes(ls *lua.LState, v lua.LValue) ([]byte, error) { + str, err := LVAsStringOrNil(ls, v) + if err != nil { + return nil, err + } else if str == "" { + return nil, nil + } else { + return unsafeFastStringToReadOnlyBytes(str), nil + } +} + +func LVAsStringOrNil(ls *lua.LState, v lua.LValue) (string, error) { + if lstr, ok := v.(lua.LString); ok { + return string(lstr), nil + } else if v == lua.LNil { + return "", nil + } else { + return "", fmt.Errorf("invalid bytes, must be nil or string: %s", v) + } +} + +func NewKafkaConnector( + ctx context.Context, + config *protos.KafkaConfig, +) (*KafkaConnector, error) { + optionalOpts := append( + make([]kgo.Opt, 0, 7), + kgo.SeedBrokers(config.Servers...), + kgo.AllowAutoTopicCreation(), + 
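+ // base options; TLS, partitioner, and SASL settings are appended below from config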
kgo.WithLogger(kslog.New(slog.Default())), // TODO use logger.LoggerFromCtx + kgo.SoftwareNameAndVersion("peerdb", peerdbenv.PeerDBVersionShaShort()), + ) + if !config.DisableTls { + optionalOpts = append(optionalOpts, kgo.DialTLSConfig(&tls.Config{MinVersion: tls.VersionTLS13})) + } + switch config.Partitioner { + case "LeastBackup": + optionalOpts = append(optionalOpts, kgo.RecordPartitioner(kgo.LeastBackupPartitioner())) + case "Manual": + optionalOpts = append(optionalOpts, kgo.RecordPartitioner(kgo.ManualPartitioner())) + case "RoundRobin": + optionalOpts = append(optionalOpts, kgo.RecordPartitioner(kgo.RoundRobinPartitioner())) + case "StickyKey": + optionalOpts = append(optionalOpts, kgo.RecordPartitioner(kgo.StickyKeyPartitioner(nil))) + case "Sticky": + optionalOpts = append(optionalOpts, kgo.RecordPartitioner(kgo.StickyPartitioner())) + } + if config.Username != "" { + switch config.Sasl { + case "PLAIN": + auth := plain.Auth{User: config.Username, Pass: config.Password} + optionalOpts = append(optionalOpts, kgo.SASL(auth.AsMechanism())) + case "SCRAM-SHA-256": + auth := scram.Auth{User: config.Username, Pass: config.Password} + optionalOpts = append(optionalOpts, kgo.SASL(auth.AsSha256Mechanism())) + case "SCRAM-SHA-512": + auth := scram.Auth{User: config.Username, Pass: config.Password} + optionalOpts = append(optionalOpts, kgo.SASL(auth.AsSha512Mechanism())) + default: + return nil, fmt.Errorf("unsupported SASL mechanism: %s", config.Sasl) + } + } + client, err := kgo.NewClient(optionalOpts...) + if err != nil { + return nil, fmt.Errorf("failed to create kafka client: %w", err) + } + + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx) + if err != nil { + return nil, err + } + + return &KafkaConnector{ + client: client, + pgMetadata: pgMetadata, + logger: logger.LoggerFromCtx(ctx), + }, nil +} + +func (c *KafkaConnector) Close() error { + if c != nil { + c.client.Close() + } + return nil +} + +func (c *KafkaConnector) ConnectionActive(ctx context.Context) error { + return c.client.Ping(ctx) +} + +func (c *KafkaConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { + return &protos.CreateRawTableOutput{TableIdentifier: "n/a"}, nil +} + +func (c *KafkaConnector) GetLastSyncBatchID(ctx context.Context, jobName string) (int64, error) { + return c.pgMetadata.GetLastBatchID(ctx, jobName) +} + +func (c *KafkaConnector) GetLastOffset(ctx context.Context, jobName string) (int64, error) { + return c.pgMetadata.FetchLastOffset(ctx, jobName) +} + +func (c *KafkaConnector) SetLastOffset(ctx context.Context, jobName string, offset int64) error { + return c.pgMetadata.UpdateLastOffset(ctx, jobName, offset) +} + +func (c *KafkaConnector) NeedsSetupMetadataTables(_ context.Context) bool { + return false +} + +func (c *KafkaConnector) SetupMetadataTables(_ context.Context) error { + return nil +} + +func (c *KafkaConnector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error { + return nil +} + +func (c *KafkaConnector) SyncFlowCleanup(ctx context.Context, jobName string) error { + return c.pgMetadata.DropMetadata(ctx, jobName) +} + +func loadScript(ctx context.Context, script string, printfn lua.LGFunction) (*lua.LState, error) { + if script == "" { + return nil, errors.New("kafka mirror must have script") + } + + ls := lua.NewState(lua.Options{SkipOpenLibs: true}) + ls.SetContext(ctx) + for _, pair := range []struct { + n string + f lua.LGFunction + }{ + 
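+ // restricted set of stdlib modules: io/os are never opened, and loadfile/dofile are removed in RegisterTypes, keeping scripts sandboxed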
{lua.LoadLibName, lua.OpenPackage}, // Must be first + {lua.BaseLibName, lua.OpenBase}, + {lua.TabLibName, lua.OpenTable}, + {lua.StringLibName, lua.OpenString}, + {lua.MathLibName, lua.OpenMath}, + } { + ls.Push(ls.NewFunction(pair.f)) + ls.Push(lua.LString(pair.n)) + err := ls.PCall(1, 0, nil) + if err != nil { + return nil, fmt.Errorf("failed to initialize Lua runtime: %w", err) + } + } + ls.PreloadModule("flatbuffers", gluaflatbuffers.Loader) + pua.RegisterTypes(ls) + ls.Env.RawSetString("print", ls.NewFunction(printfn)) + err := ls.GPCall(pua.LoadPeerdbScript, lua.LString(script)) + if err != nil { + return nil, fmt.Errorf("error loading script %s: %w", script, err) + } + err = ls.PCall(0, 0, nil) + if err != nil { + return nil, fmt.Errorf("error executing script %s: %w", script, err) + } + return ls, nil +} + +func lvalueToKafkaRecord(ls *lua.LState, value lua.LValue) (*kgo.Record, error) { + var kr *kgo.Record + switch v := value.(type) { + case lua.LString: + kr = kgo.StringRecord(string(v)) + case *lua.LTable: + key, err := LVAsReadOnlyBytes(ls, ls.GetField(v, "key")) + if err != nil { + return nil, fmt.Errorf("invalid key, %w", err) + } + value, err := LVAsReadOnlyBytes(ls, ls.GetField(v, "value")) + if err != nil { + return nil, fmt.Errorf("invalid value, %w", err) + } + topic, err := LVAsStringOrNil(ls, ls.GetField(v, "topic")) + if err != nil { + return nil, fmt.Errorf("invalid topic, %w", err) + } + partition := int32(lua.LVAsNumber(ls.GetField(v, "partition"))) + kr = &kgo.Record{ + Key: key, + Value: value, + Topic: topic, + Partition: partition, + } + lheaders := ls.GetField(v, "headers") + if headers, ok := lheaders.(*lua.LTable); ok { + headers.ForEach(func(k, v lua.LValue) { + kstr := k.String() + vbytes, err := LVAsReadOnlyBytes(ls, v) + if err != nil { + vbytes = unsafeFastStringToReadOnlyBytes(err.Error()) + } + kr.Headers = append(kr.Headers, kgo.RecordHeader{ + Key: kstr, + Value: vbytes, + }) + }) + } else if lua.LVAsBool(lheaders) { + return nil, fmt.Errorf("invalid headers, must be nil or table: %s", lheaders) + } + case *lua.LNilType: + default: + return nil, fmt.Errorf("script returned invalid value: %s", value) + } + return kr, nil +} + +func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) { + var wg sync.WaitGroup + wgCtx, wgErr := context.WithCancelCause(ctx) + produceCb := func(r *kgo.Record, err error) { + if err != nil { + wgErr(err) + } + wg.Done() + } + + numRecords := int64(0) + tableNameRowsMapping := make(map[string]uint32) + + ls, err := loadScript(wgCtx, req.Script, func(ls *lua.LState) int { + top := ls.GetTop() + ss := make([]string, top) + for i := range top { + ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() + } + _ = c.pgMetadata.LogFlowInfo(ctx, req.FlowJobName, strings.Join(ss, "\t")) + return 0 + }) + if err != nil { + return nil, err + } + defer ls.Close() + + lfn := ls.Env.RawGetString("onRecord") + fn, ok := lfn.(*lua.LFunction) + if !ok { + return nil, fmt.Errorf("script should define `onRecord` as function, not %s", lfn) + } + + for record := range req.Records.GetRecords() { + if err := wgCtx.Err(); err != nil { + return nil, err + } + ls.Push(fn) + ls.Push(pua.LuaRecord.New(ls, record)) + err := ls.PCall(1, -1, nil) + if err != nil { + return nil, fmt.Errorf("script failed: %w", err) + } + args := ls.GetTop() + for i := range args { + kr, err := lvalueToKafkaRecord(ls, ls.Get(i-args)) + if err != nil { + return nil, err + } + if kr != nil { + if kr.Topic == "" { + 
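+ // script did not set a topic: default to the record's destination table name from the table mapping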
kr.Topic = record.GetDestinationTableName() + } + + wg.Add(1) + c.client.Produce(wgCtx, kr, produceCb) + tableNameRowsMapping[kr.Topic] += 1 + } + } + numRecords += 1 + ls.SetTop(0) + } + + waitChan := make(chan struct{}) + go func() { + wg.Wait() + close(waitChan) + }() + select { + case <-wgCtx.Done(): + return nil, wgCtx.Err() + case <-waitChan: + } + + if err := c.client.Flush(ctx); err != nil { + return nil, fmt.Errorf("could not flush transaction: %w", err) + } + + lastCheckpoint := req.Records.GetLastCheckpoint() + err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint) + if err != nil { + return nil, err + } + + return &model.SyncResponse{ + CurrentSyncBatchID: req.SyncBatchID, + LastSyncedCheckpointID: lastCheckpoint, + NumRecordsSynced: numRecords, + TableNameRowsMapping: tableNameRowsMapping, + TableSchemaDeltas: req.Records.SchemaDeltas, + }, nil +} diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index acf3ba7a40..e5bad73d62 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -35,7 +35,7 @@ type PostgresCDCSource struct { slot string publication string typeMap *pgtype.Map - commitLock bool + commitLock *pglogrepl.BeginMessage // for partitioned tables, maps child relid to parent relid childToParentRelIDMapping map[uint32]uint32 @@ -75,7 +75,7 @@ func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) * publication: cdcConfig.Publication, childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap, typeMap: pgtype.NewMap(), - commitLock: false, + commitLock: nil, catalogPool: cdcConfig.CatalogPool, flowJobName: cdcConfig.FlowJobName, } @@ -188,7 +188,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco } } - if !p.commitLock { + if p.commitLock == nil { if cdcRecordsStorage.Len() >= int(req.MaxBatchSize) { return nil } @@ -208,7 +208,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco if !cdcRecordsStorage.IsEmpty() { p.logger.Info(fmt.Sprintf("standby deadline reached, have %d records", cdcRecordsStorage.Len())) - if !p.commitLock { + if p.commitLock == nil { p.logger.Info( fmt.Sprintf("no commit lock, returning currently accumulated records - %d", cdcRecordsStorage.Len())) @@ -241,7 +241,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco return fmt.Errorf("consumeStream preempted: %w", ctxErr) } - if err != nil && !p.commitLock { + if err != nil && p.commitLock == nil { if pgconn.Timeout(err) { p.logger.Info(fmt.Sprintf("Stand-by deadline reached, returning currently accumulated records - %d", cdcRecordsStorage.Len())) @@ -408,6 +408,17 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco } } +func (p *PostgresCDCSource) baseRecord(lsn pglogrepl.LSN) model.BaseRecord { + var nano int64 + if p.commitLock != nil { + nano = p.commitLock.CommitTime.UnixNano() + } + return model.BaseRecord{ + CheckpointID: int64(lsn), + CommitTimeNano: nano, + } +} + func (p *PostgresCDCSource) processMessage( ctx context.Context, batch *model.CDCRecordStream, @@ -423,7 +434,7 @@ func (p *PostgresCDCSource) processMessage( case *pglogrepl.BeginMessage: p.logger.Debug(fmt.Sprintf("BeginMessage => FinalLSN: %v, XID: %v", msg.FinalLSN, msg.Xid)) p.logger.Debug("Locking PullRecords at BeginMessage, awaiting CommitMessage") - p.commitLock = true + p.commitLock = msg case *pglogrepl.InsertMessage: return p.processInsertMessage(xld.WALStart, msg) case 
*pglogrepl.UpdateMessage: @@ -435,7 +446,7 @@ func (p *PostgresCDCSource) processMessage( p.logger.Debug(fmt.Sprintf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v", msg.CommitLSN, msg.TransactionEndLSN)) batch.UpdateLatestCheckpoint(int64(msg.CommitLSN)) - p.commitLock = false + p.commitLock = nil case *pglogrepl.RelationMessage: // treat all relation messages as corresponding to parent if partitioned. msg.RelationID = p.getParentRelIDIfPartitioned(msg.RelationID) @@ -483,7 +494,7 @@ func (p *PostgresCDCSource) processInsertMessage( } return &model.InsertRecord{ - CheckpointID: int64(lsn), + BaseRecord: p.baseRecord(lsn), Items: items, DestinationTableName: p.tableNameMapping[tableName].Name, SourceTableName: tableName, @@ -524,7 +535,7 @@ func (p *PostgresCDCSource) processUpdateMessage( } return &model.UpdateRecord{ - CheckpointID: int64(lsn), + BaseRecord: p.baseRecord(lsn), OldItems: oldItems, NewItems: newItems, DestinationTableName: p.tableNameMapping[tableName].Name, @@ -561,7 +572,7 @@ func (p *PostgresCDCSource) processDeleteMessage( } return &model.DeleteRecord{ - CheckpointID: int64(lsn), + BaseRecord: p.baseRecord(lsn), Items: items, DestinationTableName: p.tableNameMapping[tableName].Name, SourceTableName: tableName, @@ -765,8 +776,8 @@ func (p *PostgresCDCSource) processRelationMessage( // only log audit if there is actionable delta if len(schemaDelta.AddedColumns) > 0 { rec := &model.RelationRecord{ + BaseRecord: p.baseRecord(lsn), TableSchemaDelta: schemaDelta, - CheckpointID: int64(lsn), } return rec, p.auditSchemaDelta(ctx, p.flowJobName, rec) } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index bd6721c076..c57154bd7c 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -404,7 +404,7 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco rawTableIdentifier := getRawTableIdentifier(req.FlowJobName) c.logger.Info(fmt.Sprintf("pushing records to Postgres table %s via COPY", rawTableIdentifier)) - numRecords := 0 + numRecords := int64(0) tableNameRowsMapping := make(map[string]uint32) streamReadFunc := func() ([]any, error) { @@ -509,7 +509,7 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco if err != nil { return nil, fmt.Errorf("error syncing records: %w", err) } - if syncedRecordsCount != int64(numRecords) { + if syncedRecordsCount != numRecords { return nil, fmt.Errorf("error syncing records: expected %d records to be synced, but %d were synced", numRecords, syncedRecordsCount) } @@ -536,7 +536,7 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco return &model.SyncResponse{ LastSyncedCheckpointID: lastCP, - NumRecordsSynced: int64(numRecords), + NumRecordsSynced: numRecords, CurrentSyncBatchID: req.SyncBatchID, TableNameRowsMapping: tableNameRowsMapping, TableSchemaDeltas: req.Records.SchemaDeltas, diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go index 50796109f3..5acbc5962a 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go @@ -47,9 +47,12 @@ func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) { PkeyColVal: [32]byte(pkeyColVal), } rec := &model.InsertRecord{ + BaseRecord: model.BaseRecord{ + CheckpointID: 1, + CommitTimeNano: time.Now().UnixNano(), + }, SourceTableName: "test_src_tbl", 
DestinationTableName: "test_dst_tbl", - CheckpointID: 1, CommitID: 2, Items: &model.RecordItems{ ColToValIdx: map[string]int{ diff --git a/flow/e2e/kafka/kafka_test.go b/flow/e2e/kafka/kafka_test.go new file mode 100644 index 0000000000..f35e0f568a --- /dev/null +++ b/flow/e2e/kafka/kafka_test.go @@ -0,0 +1,137 @@ +package e2e_kafka + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/jackc/pgx/v5" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kgo" + + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" + "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" + peerflow "github.com/PeerDB-io/peer-flow/workflows" +) + +type KafkaSuite struct { + t *testing.T + conn *connpostgres.PostgresConnector + suffix string +} + +func (s KafkaSuite) T() *testing.T { + return s.t +} + +func (s KafkaSuite) Connector() *connpostgres.PostgresConnector { + return s.conn +} + +func (s KafkaSuite) Conn() *pgx.Conn { + return s.Connector().Conn() +} + +func (s KafkaSuite) Suffix() string { + return s.suffix +} + +func (s KafkaSuite) Peer() *protos.Peer { + return &protos.Peer{ + Name: e2e.AddSuffix(s, "kasimple"), + Type: protos.DBType_KAFKA, + Config: &protos.Peer_KafkaConfig{ + KafkaConfig: &protos.KafkaConfig{ + Servers: []string{"localhost:9092"}, + DisableTls: true, + }, + }, + } +} + +func (s KafkaSuite) DestinationTable(table string) string { + return table +} + +func (s KafkaSuite) Teardown() { + e2e.TearDownPostgres(s) +} + +func SetupSuite(t *testing.T) KafkaSuite { + t.Helper() + + suffix := "ka_" + strings.ToLower(shared.RandomString(8)) + conn, err := e2e.SetupPostgres(t, suffix) + require.NoError(t, err, "failed to setup postgres") + + return KafkaSuite{ + t: t, + conn: conn, + suffix: suffix, + } +} + +func Test_Kafka(t *testing.T) { + e2eshared.RunSuite(t, SetupSuite) +} + +func (s KafkaSuite) TestSimple() { + srcTableName := e2e.AttachSchema(s, "kasimple") + + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + val text + ); + `, srcTableName)) + require.NoError(s.t, err) + + _, err = s.Conn().Exec(context.Background(), `insert into public.scripts (name, lang, source) values + ('e2e_kasimple', 'lua', 'function onRecord(r) return r.row and r.row.val end') on conflict do nothing`) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: e2e.AddSuffix(s, "kasimple"), + TableNameMapping: map[string]string{srcTableName: "katest"}, + Destination: s.Peer(), + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() + flowConnConfig.Script = "e2e_kasimple" + + tc := e2e.NewTemporalClient(s.t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s (id, val) VALUES (1, 'testval') + `, srcTableName)) + require.NoError(s.t, err) + + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize insert", func() bool { + kafka, err := kgo.NewClient( + kgo.SeedBrokers("localhost:9092"), + kgo.ConsumeTopics("katest"), + ) + if err != nil { + return false + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + fetches := kafka.PollFetches(ctx) + fetches.EachTopic(func(ft kgo.FetchTopic) { + require.Equal(s.t, "katest", 
ft.Topic) + ft.EachRecord(func(r *kgo.Record) { + require.Equal(s.t, "testval", string(r.Value)) + }) + }) + return true + }) + env.Cancel() + e2e.RequireEnvCanceled(s.t, env) +} diff --git a/flow/go.mod b/flow/go.mod index 154a068d20..5a5fb83094 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -10,6 +10,9 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.0.4 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0 github.com/ClickHouse/clickhouse-go/v2 v2.21.1 + github.com/PeerDB-io/glua64 v1.0.1 + github.com/PeerDB-io/gluabit32 v1.0.0 + github.com/PeerDB-io/gluaflatbuffers v1.0.1 github.com/aws/aws-sdk-go-v2 v1.25.3 github.com/aws/aws-sdk-go-v2/config v1.27.7 github.com/aws/aws-sdk-go-v2/credentials v1.17.7 @@ -35,9 +38,12 @@ require ( github.com/slack-go/slack v0.12.5 github.com/snowflakedb/gosnowflake v1.8.0 github.com/stretchr/testify v1.9.0 + github.com/twmb/franz-go v1.16.1 + github.com/twmb/franz-go/plugin/kslog v1.0.0 github.com/twpayne/go-geos v0.17.0 github.com/urfave/cli/v3 v3.0.0-alpha9 github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a + github.com/yuin/gopher-lua v1.1.1 go.temporal.io/api v1.29.1 go.temporal.io/sdk v1.26.0 go.uber.org/automaxprocs v1.5.3 @@ -91,6 +97,7 @@ require ( github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect + github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect go.opentelemetry.io/otel v1.24.0 // indirect diff --git a/flow/go.sum b/flow/go.sum index 377b2c2a16..aacfa2da10 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -54,6 +54,12 @@ github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ= github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= +github.com/PeerDB-io/glua64 v1.0.1 h1:biXLlFF/L5pnJCwDon7hkWkuQPozC8NjKS3J7Wzi69I= +github.com/PeerDB-io/glua64 v1.0.1/go.mod h1:UHmAhniv61bJPMhQvxkpC7jXbn353dSbQviu83bgQVg= +github.com/PeerDB-io/gluabit32 v1.0.0 h1:jn88j22LqiqDoS47LUnvk29hsFQE6yW0imSsHHiYsV8= +github.com/PeerDB-io/gluabit32 v1.0.0/go.mod h1:tsHStN1XG5uGVWEA8d/RameB7el3PE3sVkvk8e3+FJg= +github.com/PeerDB-io/gluaflatbuffers v1.0.1 h1:Oxlv0VlMYoQ05Q5n/k4hXAsvtDnuVNC99JBUf927br0= +github.com/PeerDB-io/gluaflatbuffers v1.0.1/go.mod h1:unZOM4Mm2Sn+aAFuVjoJDZ2Dji7jlDWrt4Hvq79as2g= github.com/alecthomas/assert/v2 v2.6.0 h1:o3WJwILtexrEUk3cUVal3oiQY2tfgr/FHWiz/v2n4FU= github.com/alecthomas/assert/v2 v2.6.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= @@ -367,6 +373,12 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/twmb/franz-go v1.16.1 h1:rpWc7fB9jd7TgmCyfxzenBI+QbgS8ZfJOUQE+tzPtbE= +github.com/twmb/franz-go v1.16.1/go.mod 
h1:/pER254UPPGp/4WfGqRi+SIRGE50RSQzVubQp6+N4FA= +github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E= +github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw= +github.com/twmb/franz-go/plugin/kslog v1.0.0 h1:I64oEmF+0PDvmyLgwrlOtg4mfpSE9GwlcLxM4af2t60= +github.com/twmb/franz-go/plugin/kslog v1.0.0/go.mod h1:8pMjK3OJJJNNYddBSbnXZkIK5dCKFIk9GcVVCDgvnQc= github.com/twpayne/go-geos v0.17.0 h1:158IwlZxA5Q1qWpBrP90dG3B8mQ5yb5RA4SPhR2CJ2E= github.com/twpayne/go-geos v0.17.0/go.mod h1:OgP9eXBQBbU1Qi2IR3kF608WTd9YtRXZP0FugQ0POi0= github.com/urfave/cli/v3 v3.0.0-alpha9 h1:P0RMy5fQm1AslQS+XCmy9UknDXctOmG/q/FZkUFnJSo= @@ -383,6 +395,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= +github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= diff --git a/flow/model/model.go b/flow/model/model.go index 9e38e237de..1561e66544 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -14,9 +14,12 @@ type NameAndExclude struct { } func NewNameAndExclude(name string, exclude []string) NameAndExclude { - exset := make(map[string]struct{}, len(exclude)) - for _, col := range exclude { - exset[col] = struct{}{} + var exset map[string]struct{} + if len(exclude) != 0 { + exset = make(map[string]struct{}, len(exclude)) + for _, col := range exclude { + exset[col] = struct{}{} + } } return NameAndExclude{Name: name, Exclude: exset} } @@ -45,10 +48,10 @@ type PullRecordsRequest struct { } type Record interface { - // GetCheckpointID returns the ID of the record. GetCheckpointID() int64 - // get table name + GetCommitTime() time.Time GetDestinationTableName() string + GetSourceTableName() string // get columns and values for the record GetItems() *RecordItems } @@ -59,9 +62,12 @@ type ToJSONOptions struct { } func NewToJSONOptions(unnestCols []string, hstoreAsJSON bool) *ToJSONOptions { - unnestColumns := make(map[string]struct{}, len(unnestCols)) - for _, col := range unnestCols { - unnestColumns[col] = struct{}{} + var unnestColumns map[string]struct{} + if len(unnestCols) != 0 { + unnestColumns = make(map[string]struct{}, len(unnestCols)) + for _, col := range unnestCols { + unnestColumns[col] = struct{}{} + } } return &ToJSONOptions{ UnnestColumns: unnestColumns, @@ -69,37 +75,49 @@ func NewToJSONOptions(unnestCols []string, hstoreAsJSON bool) *ToJSONOptions { } } +type BaseRecord struct { + // CheckpointID is the ID of the record. 
+ CheckpointID int64 `json:"checkpointId"` + // BeginMessage.CommitTime.UnixNano(), 16 bytes smaller than time.Time + CommitTimeNano int64 `json:"commitTimeNano"` +} + +func (r *BaseRecord) GetCheckpointID() int64 { + return r.CheckpointID +} + +func (r *BaseRecord) GetCommitTime() time.Time { + return time.Unix(0, r.CommitTimeNano) +} + type InsertRecord struct { + BaseRecord // Name of the source table SourceTableName string // Name of the destination table DestinationTableName string - // CheckpointID is the ID of the record. - CheckpointID int64 // CommitID is the ID of the commit corresponding to this record. CommitID int64 // Items is a map of column name to value. Items *RecordItems } -// Implement Record interface for InsertRecord. -func (r *InsertRecord) GetCheckpointID() int64 { - return r.CheckpointID -} - func (r *InsertRecord) GetDestinationTableName() string { return r.DestinationTableName } +func (r *InsertRecord) GetSourceTableName() string { + return r.SourceTableName +} + func (r *InsertRecord) GetItems() *RecordItems { return r.Items } type UpdateRecord struct { + BaseRecord // Name of the source table SourceTableName string - // CheckpointID is the ID of the record. - CheckpointID int64 // Name of the destination table DestinationTableName string // OldItems is a map of column name to value. @@ -110,42 +128,38 @@ type UpdateRecord struct { UnchangedToastColumns map[string]struct{} } -// Implement Record interface for UpdateRecord. -func (r *UpdateRecord) GetCheckpointID() int64 { - return r.CheckpointID -} - -// Implement Record interface for UpdateRecord. func (r *UpdateRecord) GetDestinationTableName() string { return r.DestinationTableName } +func (r *UpdateRecord) GetSourceTableName() string { + return r.SourceTableName +} + func (r *UpdateRecord) GetItems() *RecordItems { return r.NewItems } type DeleteRecord struct { + BaseRecord // Name of the source table SourceTableName string // Name of the destination table DestinationTableName string - // CheckpointID is the ID of the record. - CheckpointID int64 // Items is a map of column name to value. Items *RecordItems // unchanged toast columns, filled from latest UpdateRecord UnchangedToastColumns map[string]struct{} } -// Implement Record interface for DeleteRecord. -func (r *DeleteRecord) GetCheckpointID() int64 { - return r.CheckpointID -} - func (r *DeleteRecord) GetDestinationTableName() string { return r.DestinationTableName } +func (r *DeleteRecord) GetSourceTableName() string { + return r.SourceTableName +} + func (r *DeleteRecord) GetItems() *RecordItems { return r.Items } @@ -165,6 +179,8 @@ type SyncRecordsRequest struct { TableMappings []*protos.TableMapping // Staging path for AVRO files in CDC StagingPath string + // Lua script + Script string } type NormalizeRecordsRequest struct { @@ -204,19 +220,18 @@ type NormalizeResponse struct { // being clever and passing the delta back as a regular record instead of heavy CDC refactoring. type RelationRecord struct { - CheckpointID int64 `json:"checkpointId"` + BaseRecord TableSchemaDelta *protos.TableSchemaDelta `json:"tableSchemaDelta"` } -// Implement Record interface for RelationRecord. 
-func (r *RelationRecord) GetCheckpointID() int64 { - return r.CheckpointID -} - func (r *RelationRecord) GetDestinationTableName() string { return r.TableSchemaDelta.DstTableName } +func (r *RelationRecord) GetSourceTableName() string { + return r.TableSchemaDelta.SrcTableName +} + func (r *RelationRecord) GetItems() *RecordItems { return nil } diff --git a/flow/model/qrecord_test.go b/flow/model/qrecord_test.go index 12b59c67bb..e6c769fd69 100644 --- a/flow/model/qrecord_test.go +++ b/flow/model/qrecord_test.go @@ -12,8 +12,8 @@ import ( ) func TestEquals(t *testing.T) { - uuidVal1, _ := uuid.NewRandom() - uuidVal2, _ := uuid.NewRandom() + uuidVal1 := uuid.New() + uuidVal2 := uuid.New() tests := []struct { name string diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 4a495a31cc..bedc3decec 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -606,10 +606,7 @@ func getUUID(v interface{}) (uuid.UUID, bool) { return parsed, true } case [16]byte: - parsed, err := uuid.FromBytes(value[:]) - if err == nil { - return parsed, true - } + return uuid.UUID(value), true } return uuid.UUID{}, false diff --git a/flow/model/record_items.go b/flow/model/record_items.go index fe9da58af9..a258f78be5 100644 --- a/flow/model/record_items.go +++ b/flow/model/record_items.go @@ -236,8 +236,7 @@ func (r *RecordItems) ToJSONWithOptions(options *ToJSONOptions) (string, error) } func (r *RecordItems) ToJSON() (string, error) { - unnestCols := make([]string, 0) - return r.ToJSONWithOpts(NewToJSONOptions(unnestCols, true)) + return r.ToJSONWithOpts(NewToJSONOptions(nil, true)) } func (r *RecordItems) ToJSONWithOpts(opts *ToJSONOptions) (string, error) { diff --git a/flow/pua/peerdb.go b/flow/pua/peerdb.go new file mode 100644 index 0000000000..c8f9f8884c --- /dev/null +++ b/flow/pua/peerdb.go @@ -0,0 +1,442 @@ +package pua + +import ( + "bytes" + "fmt" + "math/big" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/shopspring/decimal" + "github.com/yuin/gopher-lua" + + "github.com/PeerDB-io/glua64" + "github.com/PeerDB-io/gluabit32" + "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" +) + +var ( + LuaRecord = glua64.UserDataType[model.Record]{Name: "peerdb_record"} + LuaRow = glua64.UserDataType[*model.RecordItems]{Name: "peerdb_row"} + LuaTime = glua64.UserDataType[time.Time]{Name: "peerdb_time"} + LuaUuid = glua64.UserDataType[uuid.UUID]{Name: "peerdb_uuid"} + LuaBigInt = glua64.UserDataType[*big.Int]{Name: "peerdb_bigint"} + LuaDecimal = glua64.UserDataType[decimal.Decimal]{Name: "peerdb_bigrat"} +) + +func RegisterTypes(ls *lua.LState) { + glua64.Loader(ls) + ls.Env.RawSetString("loadfile", lua.LNil) + ls.Env.RawSetString("dofile", lua.LNil) + + // gopher-lua provides 2 loaders {preload, file} + // overwrite file loader with one retrieving scripts from database + loaders := ls.G.Registry.RawGetString("_LOADERS").(*lua.LTable) + loaders.RawSetInt(2, ls.NewFunction(LoadPeerdbScript)) + + ls.PreloadModule("bit32", bit32.Loader) + + mt := LuaRecord.NewMetatable(ls) + mt.RawSetString("__index", ls.NewFunction(LuaRecordIndex)) + + mt = LuaRow.NewMetatable(ls) + mt.RawSetString("__index", ls.NewFunction(LuaRowIndex)) + mt.RawSetString("__len", ls.NewFunction(LuaRowLen)) + + mt = LuaUuid.NewMetatable(ls) + mt.RawSetString("__index", ls.NewFunction(LuaUuidIndex)) + mt.RawSetString("__tostring", ls.NewFunction(LuaUuidString)) + + mt = 
LuaTime.NewMetatable(ls) + mt.RawSetString("__index", ls.NewFunction(LuaTimeIndex)) + mt.RawSetString("__tostring", ls.NewFunction(LuaTimeString)) + + mt = LuaBigInt.NewMetatable(ls) + mt.RawSetString("__index", ls.NewFunction(LuaBigIntIndex)) + mt.RawSetString("__tostring", ls.NewFunction(LuaBigIntString)) + mt.RawSetString("__len", ls.NewFunction(LuaBigIntLen)) + + mt = LuaDecimal.NewMetatable(ls) + mt.RawSetString("__index", ls.NewFunction(LuaDecimalIndex)) + mt.RawSetString("__tostring", ls.NewFunction(LuaDecimalString)) + + peerdb := ls.NewTable() + peerdb.RawSetString("RowToJSON", ls.NewFunction(LuaRowToJSON)) + peerdb.RawSetString("RowColumns", ls.NewFunction(LuaRowColumns)) + peerdb.RawSetString("RowColumnKind", ls.NewFunction(LuaRowColumnKind)) + peerdb.RawSetString("Now", ls.NewFunction(LuaNow)) + peerdb.RawSetString("UUID", ls.NewFunction(LuaUUID)) + peerdb.RawSetString("type", ls.NewFunction(LuaType)) + peerdb.RawSetString("tostring", ls.NewFunction(LuaToString)) + ls.Env.RawSetString("peerdb", peerdb) +} + +func LoadPeerdbScript(ls *lua.LState) int { + ctx := ls.Context() + name := ls.CheckString(1) + pool, err := utils.GetCatalogConnectionPoolFromEnv(ctx) + if err != nil { + ls.RaiseError("Connection failed loading %s: %s", name, err.Error()) + return 0 + } + + var source []byte + err = pool.QueryRow(ctx, "select source from scripts where lang = 'lua' and name = $1", name).Scan(&source) + if err != nil { + if err == pgx.ErrNoRows { + ls.Push(lua.LString("Could not find script " + name)) + return 1 + } + ls.RaiseError("Failed to load script %s: %s", name, err.Error()) + return 0 + } + + fn, err := ls.Load(bytes.NewReader(source), name) + if err != nil { + ls.RaiseError(err.Error()) + } + ls.Push(fn) + return 1 +} + +func GetRowQ(ls *lua.LState, row *model.RecordItems, col string) qvalue.QValue { + qv, err := row.GetValueByColName(col) + if err != nil { + ls.RaiseError(err.Error()) + return qvalue.QValue{} + } + return qv +} + +func LuaRowIndex(ls *lua.LState) int { + row, key := LuaRow.StartIndex(ls) + ls.Push(LuaQValue(ls, GetRowQ(ls, row, key))) + return 1 +} + +func LuaRowLen(ls *lua.LState) int { + _, row := LuaRow.Check(ls, 1) + ls.Push(lua.LNumber(len(row.Values))) + return 1 +} + +func LuaRowToJSON(ls *lua.LState) int { + _, row := LuaRow.Check(ls, 1) + json, err := row.ToJSON() + if err != nil { + ls.RaiseError("failed to serialize json: %s", err.Error()) + return 0 + } + ls.Push(lua.LString(json)) + return 1 +} + +func LuaRowColumns(ls *lua.LState) int { + _, row := LuaRow.Check(ls, 1) + tbl := ls.CreateTable(len(row.ColToValIdx), 0) + for col, idx := range row.ColToValIdx { + tbl.RawSetInt(idx+1, lua.LString(col)) + } + ls.Push(tbl) + return 1 +} + +func LuaRowColumnKind(ls *lua.LState) int { + row, key := LuaRow.StartIndex(ls) + ls.Push(lua.LString(GetRowQ(ls, row, key).Kind)) + return 1 +} + +func LuaRecordIndex(ls *lua.LState) int { + record, key := LuaRecord.StartIndex(ls) + switch key { + case "kind": + switch record.(type) { + case *model.InsertRecord: + ls.Push(lua.LString("insert")) + case *model.UpdateRecord: + ls.Push(lua.LString("update")) + case *model.DeleteRecord: + ls.Push(lua.LString("delete")) + case *model.RelationRecord: + ls.Push(lua.LString("relation")) + } + case "row": + items := record.GetItems() + if items != nil { + ls.Push(LuaRow.New(ls, items)) + } else { + ls.Push(lua.LNil) + } + case "old": + var items *model.RecordItems + switch rec := record.(type) { + case *model.UpdateRecord: + items = rec.OldItems + case *model.DeleteRecord: + items 
= rec.Items + } + if items != nil { + ls.Push(LuaRow.New(ls, items)) + } else { + ls.Push(lua.LNil) + } + case "new": + var items *model.RecordItems + switch rec := record.(type) { + case *model.InsertRecord: + items = rec.Items + case *model.UpdateRecord: + items = rec.NewItems + } + if items != nil { + ls.Push(LuaRow.New(ls, items)) + } else { + ls.Push(lua.LNil) + } + case "checkpoint": + ls.Push(glua64.I64.New(ls, record.GetCheckpointID())) + case "commit_time": + ls.Push(LuaTime.New(ls, record.GetCommitTime())) + case "target": + ls.Push(lua.LString(record.GetDestinationTableName())) + case "source": + ls.Push(lua.LString(record.GetSourceTableName())) + default: + return 0 + } + return 1 +} + +func qvToLTable[T any](ls *lua.LState, s []T, f func(x T) lua.LValue) *lua.LTable { + tbl := ls.CreateTable(len(s), 0) + for idx, val := range s { + tbl.RawSetInt(idx+1, f(val)) + } + return tbl +} + +func LuaQValue(ls *lua.LState, qv qvalue.QValue) lua.LValue { + switch v := qv.Value.(type) { + case nil: + return lua.LNil + case bool: + return lua.LBool(v) + case uint8: + if qv.Kind == qvalue.QValueKindQChar { + return lua.LString(rune(v)) + } else { + return lua.LNumber(v) + } + case int16: + return lua.LNumber(v) + case int32: + return lua.LNumber(v) + case int64: + return glua64.I64.New(ls, v) + case float32: + return lua.LNumber(v) + case float64: + return lua.LNumber(v) + case string: + if qv.Kind == qvalue.QValueKindUUID { + u, err := uuid.Parse(v) + if err != nil { + return LuaUuid.New(ls, u) + } + } + return lua.LString(v) + case time.Time: + return LuaTime.New(ls, v) + case decimal.Decimal: + return LuaDecimal.New(ls, v) + case [16]byte: + return LuaUuid.New(ls, uuid.UUID(v)) + case []byte: + return lua.LString(v) + case []float32: + return qvToLTable(ls, v, func(f float32) lua.LValue { + return lua.LNumber(f) + }) + case []float64: + return qvToLTable(ls, v, func(f float64) lua.LValue { + return lua.LNumber(f) + }) + case []int16: + return qvToLTable(ls, v, func(x int16) lua.LValue { + return lua.LNumber(x) + }) + case []int32: + return qvToLTable(ls, v, func(x int32) lua.LValue { + return lua.LNumber(x) + }) + case []int64: + return qvToLTable(ls, v, func(x int64) lua.LValue { + return glua64.I64.New(ls, x) + }) + case []string: + return qvToLTable(ls, v, func(x string) lua.LValue { + return lua.LString(x) + }) + case []time.Time: + return qvToLTable(ls, v, func(x time.Time) lua.LValue { + return LuaTime.New(ls, x) + }) + case []bool: + return qvToLTable(ls, v, func(x bool) lua.LValue { + return lua.LBool(x) + }) + default: + return lua.LString(fmt.Sprint(qv.Value)) + } +} + +func LuaUuidIndex(ls *lua.LState) int { + _, val := LuaUuid.Check(ls, 1) + key := ls.CheckNumber(2) + ki := int(key) + if ki >= 0 && ki < 16 { + ls.Push(lua.LNumber(val[ki])) + return 1 + } + return 0 +} + +func LuaUuidString(ls *lua.LState) int { + val := LuaUuid.StartMethod(ls) + ls.Push(lua.LString(val.String())) + return 1 +} + +func LuaNow(ls *lua.LState) int { + ls.Push(LuaTime.New(ls, time.Now())) + return 1 +} + +func LuaUUID(ls *lua.LState) int { + ls.Push(LuaUuid.New(ls, uuid.New())) + return 1 +} + +func LuaType(ls *lua.LState) int { + val := ls.Get(1) + if ud, ok := val.(*lua.LUserData); ok { + ls.Push(lua.LString(fmt.Sprintf("%T", ud.Value))) + return 1 + } + return 0 +} + +func LuaToString(ls *lua.LState) int { + val := ls.Get(1) + if ud, ok := val.(*lua.LUserData); ok { + ls.Push(lua.LString(fmt.Sprint(ud.Value))) + return 1 + } + return 0 +} + +func LuaTimeIndex(ls *lua.LState) int { + tm, key := 
LuaTime.StartIndex(ls) + switch key { + case "unix_nano": + ls.Push(glua64.I64.New(ls, tm.UnixNano())) + case "unix_micro": + ls.Push(glua64.I64.New(ls, tm.UnixMicro())) + case "unix_milli": + ls.Push(glua64.I64.New(ls, tm.UnixMilli())) + case "unix_second": + ls.Push(glua64.I64.New(ls, tm.Unix())) + case "unix": + ls.Push(lua.LNumber(float64(tm.Unix()) + float64(tm.Nanosecond())/1e9)) + case "year": + ls.Push(lua.LNumber(tm.Year())) + case "month": + ls.Push(lua.LNumber(tm.Month())) + case "day": + ls.Push(lua.LNumber(tm.Day())) + case "yearday": + ls.Push(lua.LNumber(tm.YearDay())) + case "hour": + ls.Push(lua.LNumber(tm.Hour())) + case "minute": + ls.Push(lua.LNumber(tm.Minute())) + case "second": + ls.Push(lua.LNumber(tm.Second())) + case "nanosecond": + ls.Push(lua.LNumber(tm.Nanosecond())) + default: + return 0 + } + return 1 +} + +func LuaTimeString(ls *lua.LState) int { + tm := LuaTime.StartMethod(ls) + ls.Push(lua.LString(tm.String())) + return 1 +} + +func LuaBigIntIndex(ls *lua.LState) int { + _, bi := LuaBigInt.Check(ls, 1) + switch key := ls.Get(2).(type) { + case lua.LNumber: + ls.Push(lua.LNumber(bi.Bytes()[int(key)])) + case lua.LString: + switch string(key) { + case "sign": + ls.Push(lua.LNumber(bi.Sign())) + case "bytes": + ls.Push(lua.LString(bi.Bytes())) + case "int64": + ls.Push(glua64.I64.New(ls, bi.Int64())) + case "is64": + ls.Push(lua.LBool(bi.IsInt64())) + } + default: + ls.RaiseError("BigInt accessed with non number/string") + } + return 1 +} + +func LuaBigIntString(ls *lua.LState) int { + bi := LuaBigInt.StartMethod(ls) + ls.Push(lua.LString(bi.String())) + return 1 +} + +func LuaBigIntLen(ls *lua.LState) int { + bi := LuaBigInt.StartMethod(ls) + ls.Push(lua.LNumber(len(bi.Bytes()))) + return 1 +} + +func LuaDecimalIndex(ls *lua.LState) int { + num, key := LuaDecimal.StartIndex(ls) + switch key { + case "coefficient": + ls.Push(LuaBigInt.New(ls, num.Coefficient())) + case "coefficient64": + ls.Push(glua64.I64.New(ls, num.CoefficientInt64())) + case "exponent": + ls.Push(lua.LNumber(num.Exponent())) + case "bigint": + ls.Push(LuaBigInt.New(ls, num.BigInt())) + case "int64": + ls.Push(glua64.I64.New(ls, num.IntPart())) + case "float64": + ls.Push(lua.LNumber(num.InexactFloat64())) + default: + return 0 + } + return 1 +} + +func LuaDecimalString(ls *lua.LState) int { + num := LuaDecimal.StartMethod(ls) + ls.Push(lua.LString(num.String())) + return 1 +} diff --git a/flow/pua/peerdb_test.go b/flow/pua/peerdb_test.go new file mode 100644 index 0000000000..b2a752a16b --- /dev/null +++ b/flow/pua/peerdb_test.go @@ -0,0 +1,50 @@ +package pua + +import ( + "testing" + + "github.com/google/uuid" + "github.com/yuin/gopher-lua" +) + +func assert(t *testing.T, ls *lua.LState, source string) { + t.Helper() + err := ls.DoString(source) + if err != nil { + t.Log(err) + t.FailNow() + } +} + +func Test_Lua(t *testing.T) { + t.Parallel() + + ls := lua.NewState(lua.Options{}) + RegisterTypes(ls) + + id := uuid.UUID([16]byte{2, 3, 5, 7, 11, 13, 17, 19, 127, 131, 137, 139, 149, 151, 241, 251}) + ls.Env.RawSetString("uuid", LuaUuid.New(ls, id)) + + assert(t, ls, ` +assert(require('bit32').band(173, 21) == 5) +assert(dofile == nil) +assert(loadfile == nil) + +assert(uuid[0] == 2) +assert(uuid[1] == 3) +assert(uuid[2] == 5) +assert(uuid[3] == 7) +assert(uuid[4] == 11) +assert(uuid[5] == 13) +assert(uuid[6] == 17) +assert(uuid[7] == 19) +assert(uuid[8] == 127) +assert(uuid[9] == 131) +assert(uuid[10] == 137) +assert(uuid[11] == 139) +assert(uuid[12] == 149) +assert(uuid[13] == 151) 
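+-- indexing a peerdb_uuid userdata with 0..15 returns that byte of the UUID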
+assert(uuid[14] == 241) +assert(uuid[15] == 251) +`) +} diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 81468018a8..da8cab94d4 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -10,8 +10,8 @@ use anyhow::Context; use pt::{ flow_model::{FlowJob, FlowJobTableMapping, QRepFlowJob}, peerdb_peers::{ - peer::Config, BigqueryConfig, ClickhouseConfig, DbType, EventHubConfig, MongoConfig, Peer, - PostgresConfig, S3Config, SnowflakeConfig, SqlServerConfig, + peer::Config, BigqueryConfig, ClickhouseConfig, DbType, EventHubConfig, KafkaConfig, + MongoConfig, Peer, PostgresConfig, S3Config, SnowflakeConfig, SqlServerConfig, }, }; use qrep::process_options; @@ -251,8 +251,8 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> { let snapshot_staging_path = match raw_options .remove("snapshot_staging_path") { - Some(sqlparser::ast::Value::SingleQuotedString(s)) => Some(s.clone()), - _ => Some("".to_string()), + Some(sqlparser::ast::Value::SingleQuotedString(s)) => s.clone(), + _ => String::new(), }; let snapshot_max_parallel_workers: Option<u32> = match raw_options @@ -311,6 +311,11 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> { _ => false, }; + let script = match raw_options.remove("script") { + Some(sqlparser::ast::Value::SingleQuotedString(s)) => s.clone(), + _ => String::new(), + }; + let flow_job = FlowJob { name: cdc.mirror_name.to_string().to_lowercase(), source_peer: cdc.source_peer.to_string().to_lowercase(), @@ -333,6 +338,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> { soft_delete_col_name, synced_at_col_name, initial_snapshot_only: initial_copy_only, + script, }; if initial_copy_only && !do_initial_copy { @@ -813,5 +819,40 @@ fn parse_db_options( }; Config::ClickhouseConfig(clickhouse_config) } + DbType::Kafka => { + let kafka_config = KafkaConfig { + servers: opts + .get("servers") + .context("no servers specified")?
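+ // "servers" is parsed as a comma-separated list of bootstrap brokers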
+ .split(',') + .map(String::from) + .collect::<Vec<String>>(), + username: opts + .get("user") + .cloned() + .unwrap_or_default() + .to_string(), + password: opts + .get("password") + .cloned() + .unwrap_or_default() + .to_string(), + sasl: opts + .get("sasl_mechanism") + .cloned() + .unwrap_or_default() + .to_string(), + partitioner: opts + .get("partitioner") + .cloned() + .unwrap_or_default() + .to_string(), + disable_tls: opts + .get("disable_tls") + .and_then(|s| s.parse::<bool>().ok()) + .unwrap_or_default(), + }; + Config::KafkaConfig(kafka_config) + } })) } diff --git a/nexus/catalog/migrations/V23__scripts.sql b/nexus/catalog/migrations/V23__scripts.sql new file mode 100644 index 0000000000..d79c897510 --- /dev/null +++ b/nexus/catalog/migrations/V23__scripts.sql @@ -0,0 +1,8 @@ +CREATE TYPE script_lang AS ENUM ('lua'); + +CREATE TABLE scripts ( + id SERIAL PRIMARY KEY, + lang script_lang NOT NULL, + name TEXT NOT NULL UNIQUE, + source BYTEA NOT NULL +); diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index ce3eb7e778..0d8b7ed6a6 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -100,6 +100,7 @@ impl Catalog { eventhub_group_config.encode_to_vec() } Config::ClickhouseConfig(clickhouse_config) => clickhouse_config.encode_to_vec(), + Config::KafkaConfig(kafka_config) => kafka_config.encode_to_vec(), } }; @@ -306,6 +307,11 @@ impl Catalog { pt::peerdb_peers::ClickhouseConfig::decode(options).with_context(err)?; Config::ClickhouseConfig(clickhouse_config) } + DbType::Kafka => { + let kafka_config = + pt::peerdb_peers::KafkaConfig::decode(options).with_context(err)?; + Config::KafkaConfig(kafka_config) + } }) } else { None @@ -319,12 +325,11 @@ impl Catalog { ) -> anyhow::Result<String> { let peer_dbtype = self.get_peer_type_for_id(peer_id).await?; - let mut table_identifier_parts = table_identifier.split('.').collect::<Vec<&str>>(); - if table_identifier_parts.len() == 1 && (peer_dbtype != DbType::Bigquery) { - table_identifier_parts.insert(0, "public"); + if !table_identifier.contains('.') && peer_dbtype != DbType::Bigquery { + Ok(format!("public.{}", table_identifier)) + } else { + Ok(String::from(table_identifier)) } - - Ok(table_identifier_parts.join(".")) } pub async fn create_cdc_flow_job_entry(&self, job: &FlowJob) -> anyhow::Result<()> { diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index a962c4ec4f..b7569dbe45 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -161,7 +161,7 @@ impl FlowGrpcClient { snapshot_num_rows_per_partition: snapshot_num_rows_per_partition.unwrap_or(0), snapshot_max_parallel_workers: snapshot_max_parallel_workers.unwrap_or(0), snapshot_num_tables_in_parallel: snapshot_num_tables_in_parallel.unwrap_or(0), - snapshot_staging_path: job.snapshot_staging_path.clone().unwrap_or_default(), + snapshot_staging_path: job.snapshot_staging_path.clone(), cdc_staging_path: job.cdc_staging_path.clone().unwrap_or_default(), soft_delete: job.soft_delete, replication_slot_name: replication_slot_name.unwrap_or_default(), @@ -170,6 +170,7 @@ impl FlowGrpcClient { soft_delete_col_name: job.soft_delete_col_name.clone().unwrap_or_default(), synced_at_col_name: job.synced_at_col_name.clone().unwrap_or_default(), initial_snapshot_only: job.initial_snapshot_only, + script: job.script.clone(), ..Default::default() }; diff --git a/nexus/pt/src/flow_model.rs b/nexus/pt/src/flow_model.rs index 021af81bbf..2946dd1d30 100644 --- a/nexus/pt/src/flow_model.rs +++ b/nexus/pt/src/flow_model.rs @@ -23,7 +23,7 @@ pub struct FlowJob { pub
snapshot_num_rows_per_partition: Option<u32>, pub snapshot_max_parallel_workers: Option<u32>, pub snapshot_num_tables_in_parallel: Option<u32>, - pub snapshot_staging_path: Option<String>, + pub snapshot_staging_path: String, pub cdc_staging_path: Option<String>, pub soft_delete: bool, pub replication_slot_name: Option<String>, @@ -34,6 +34,7 @@ pub struct FlowJob { pub soft_delete_col_name: Option<String>, pub synced_at_col_name: Option<String>, pub initial_snapshot_only: bool, + pub script: String, } #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] diff --git a/nexus/pt/src/lib.rs b/nexus/pt/src/lib.rs index 41b0566b2a..33da1e6382 100644 --- a/nexus/pt/src/lib.rs +++ b/nexus/pt/src/lib.rs @@ -25,7 +25,7 @@ impl From<PeerType> for DbType { PeerType::S3 => DbType::S3, PeerType::SQLServer => DbType::Sqlserver, PeerType::EventHubGroup => DbType::EventhubGroup, - PeerType::Kafka => todo!("Add Kafka support"), + PeerType::Kafka => DbType::Kafka, } } } diff --git a/protos/flow.proto b/protos/flow.proto index f33b0696db..502520e1ea 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -69,6 +69,8 @@ message FlowConnectionConfigs { bool soft_delete = 17; string soft_delete_col_name = 18; string synced_at_col_name = 19; + + string script = 20; } message RenameTableOption { diff --git a/protos/peers.proto b/protos/peers.proto index 063effc4da..12fc88bfb3 100644 --- a/protos/peers.proto +++ b/protos/peers.proto @@ -108,6 +108,15 @@ message SqlServerConfig { string database = 5; } +message KafkaConfig { + repeated string servers = 1; + string username = 2; + string password = 3; + string sasl = 4; + bool disable_tls = 5; + string partitioner = 6; +} + enum DBType { BIGQUERY = 0; SNOWFLAKE = 1; @@ -118,6 +127,7 @@ enum DBType { SQLSERVER = 6; EVENTHUB_GROUP = 7; CLICKHOUSE = 8; + KAFKA = 9; } message Peer { @@ -133,5 +143,6 @@ message Peer { SqlServerConfig sqlserver_config = 9; EventHubGroupConfig eventhub_group_config = 10; ClickhouseConfig clickhouse_config = 11; + KafkaConfig kafka_config = 12; } } diff --git a/ui/README.md b/ui/README.md index 1d3fd5e120..6947d3f03d 100644 --- a/ui/README.md +++ b/ui/README.md @@ -2,7 +2,7 @@ PeerDB Cloud Template ## Prerequisites -- [NodeJS](https://nodejs.org/en): `18.17.1` +- [NodeJS](https://nodejs.org/en): `LTS` ## Getting Started diff --git a/ui/app/api/peers/getTruePeer.ts b/ui/app/api/peers/getTruePeer.ts index 3cef249e48..93da206854 100644 --- a/ui/app/api/peers/getTruePeer.ts +++ b/ui/app/api/peers/getTruePeer.ts @@ -4,6 +4,7 @@ import { ClickhouseConfig, EventHubConfig, EventHubGroupConfig, + KafkaConfig, Peer, PostgresConfig, S3Config, @@ -19,13 +20,14 @@ export const getTruePeer = (peer: CatalogPeer) => { const options = peer.options; let config: | BigqueryConfig - | SnowflakeConfig - | PostgresConfig + | ClickhouseConfig | EventHubConfig - | S3Config - | SqlServerConfig | EventHubGroupConfig - | ClickhouseConfig; + | KafkaConfig + | PostgresConfig + | S3Config + | SnowflakeConfig + | SqlServerConfig; switch (peer.type) { case 0: config = BigqueryConfig.decode(options); @@ -59,6 +61,10 @@ export const getTruePeer = (peer: CatalogPeer) => { config = ClickhouseConfig.decode(options); newPeer.clickhouseConfig = config; break; + case 9: + config = KafkaConfig.decode(options); + newPeer.kafkaConfig = config; + break; default: return newPeer; } diff --git a/ui/app/api/peers/info/[peerName]/route.ts b/ui/app/api/peers/info/[peerName]/route.ts index 93b0c00647..61dc033efb 100644 --- a/ui/app/api/peers/info/[peerName]/route.ts +++ b/ui/app/api/peers/info/[peerName]/route.ts @@ -1,5 +1,6 @@ import prisma
diff --git a/ui/app/api/peers/info/[peerName]/route.ts b/ui/app/api/peers/info/[peerName]/route.ts
index 93b0c00647..61dc033efb 100644
--- a/ui/app/api/peers/info/[peerName]/route.ts
+++ b/ui/app/api/peers/info/[peerName]/route.ts
@@ -1,5 +1,6 @@
 import prisma from '@/app/utils/prisma';
 import { NextRequest, NextResponse } from 'next/server';
+
 import { getTruePeer } from '../../getTruePeer';

 export async function GET(
@@ -20,6 +21,7 @@ export async function GET(
   const sfConfig = peerConfig.snowflakeConfig;
   const ehConfig = peerConfig.eventhubConfig;
   const chConfig = peerConfig.clickhouseConfig;
+  const kaConfig = peerConfig.kafkaConfig;
   if (pgConfig) {
     pgConfig.password = '********';
     pgConfig.transactionSnapshot = '********';
@@ -42,6 +44,9 @@ export async function GET(
     chConfig.password = '********';
     chConfig.secretAccessKey = '********';
   }
+  if (kaConfig) {
+    kaConfig.password = '********';
+  }

   return NextResponse.json(peerConfig);
 }
diff --git a/ui/app/api/peers/route.ts b/ui/app/api/peers/route.ts
index 5d43442bd8..0691212af8 100644
--- a/ui/app/api/peers/route.ts
+++ b/ui/app/api/peers/route.ts
@@ -10,6 +10,7 @@ import {
   BigqueryConfig,
   ClickhouseConfig,
   DBType,
+  KafkaConfig,
   Peer,
   PostgresConfig,
   S3Config,
@@ -63,6 +64,12 @@ const constructPeer = (
         type: DBType.S3,
         s3Config: config as S3Config,
       };
+    case 'KAFKA':
+      return {
+        name,
+        type: DBType.KAFKA,
+        kafkaConfig: config as KafkaConfig,
+      };
     default:
       return;
   }
diff --git a/ui/app/dto/PeersDTO.ts b/ui/app/dto/PeersDTO.ts
index bbf5fbccde..3e7f5c0b41 100644
--- a/ui/app/dto/PeersDTO.ts
+++ b/ui/app/dto/PeersDTO.ts
@@ -1,6 +1,7 @@
 import {
   BigqueryConfig,
   ClickhouseConfig,
+  KafkaConfig,
   PostgresConfig,
   S3Config,
   SnowflakeConfig,
@@ -43,7 +44,8 @@ export type PeerConfig =
   | SnowflakeConfig
   | BigqueryConfig
   | ClickhouseConfig
-  | S3Config;
+  | S3Config
+  | KafkaConfig;
 export type CatalogPeer = {
   id: number;
   name: string;
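Note: the peer info route masks secrets in place before the config is serialized; for a Kafka peer only the password is sensitive. A small sketch of that pattern, with a hypothetical helper name that is not part of this PR:

```typescript
import { KafkaConfig } from '@/grpc_generated/peers';

const REDACTED = '********';

// Hypothetical helper mirroring the in-place masking done in the route above.
function maskKafkaSecrets(config: KafkaConfig | undefined): KafkaConfig | undefined {
  if (!config) return config;
  // Servers, username, SASL mechanism and partitioner are safe to show in the UI.
  return { ...config, password: REDACTED };
}
```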
diff --git a/ui/app/mirrors/[mirrorId]/configValues.ts b/ui/app/mirrors/[mirrorId]/configValues.ts
index 8d49f3090f..55b5a4c0b3 100644
--- a/ui/app/mirrors/[mirrorId]/configValues.ts
+++ b/ui/app/mirrors/[mirrorId]/configValues.ts
@@ -19,21 +19,17 @@ const MirrorValues = (mirrorConfig: FlowConnectionConfigs | undefined) => {
       label: 'Snapshot Parallel Tables',
     },
     {
-      value: `${
-        mirrorConfig?.cdcStagingPath?.length
-          ? mirrorConfig?.cdcStagingPath
-          : 'Local'
-      }`,
+      value: mirrorConfig?.cdcStagingPath || 'Local',
       label: 'CDC Staging Path',
     },
     {
-      value: `${
-        mirrorConfig?.snapshotStagingPath?.length
-          ? mirrorConfig?.snapshotStagingPath
-          : 'Local'
-      }`,
+      value: mirrorConfig?.snapshotStagingPath || 'Local',
       label: 'Snapshot Staging Path',
     },
+    {
+      value: mirrorConfig?.script,
+      label: 'Script',
+    },
   ];
 };
 export default MirrorValues;
diff --git a/ui/app/mirrors/create/helpers/cdc.ts b/ui/app/mirrors/create/helpers/cdc.ts
index 0d70e3fac1..5d63cdd69a 100644
--- a/ui/app/mirrors/create/helpers/cdc.ts
+++ b/ui/app/mirrors/create/helpers/cdc.ts
@@ -139,4 +139,14 @@ export const cdcSettings: MirrorSetting[] = [
     type: 'switch',
     advanced: true,
   },
+  {
+    label: 'Script',
+    stateHandler: (value, setter) =>
+      setter((curr: CDCConfig) => ({
+        ...curr,
+        script: (value as string) || '',
+      })),
+    tips: 'Associate PeerDB script with this mirror.',
+    advanced: true,
+  },
 ];
diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts
index 34233f24fc..63dea81815 100644
--- a/ui/app/mirrors/create/helpers/common.ts
+++ b/ui/app/mirrors/create/helpers/common.ts
@@ -36,6 +36,7 @@ export const blankCDCSetting: FlowConnectionConfigs = {
   syncedAtColName: '',
   initialSnapshotOnly: false,
   idleTimeoutSeconds: 60,
+  script: '',
 };

 export const blankQRepSetting = {
diff --git a/ui/app/peers/create/[peerType]/handlers.ts b/ui/app/peers/create/[peerType]/handlers.ts
index 423b996aec..42f84a492a 100644
--- a/ui/app/peers/create/[peerType]/handlers.ts
+++ b/ui/app/peers/create/[peerType]/handlers.ts
@@ -8,6 +8,7 @@ import { Dispatch, SetStateAction } from 'react';
 import {
   bqSchema,
   chSchema,
+  kaSchema,
   peerNameSchema,
   pgSchema,
   s3Schema,
@@ -57,6 +58,10 @@ const validateFields = (
       const s3Config = s3Schema.safeParse(config);
       if (!s3Config.success) validationErr = s3Config.error.issues[0].message;
       break;
+    case 'KAFKA':
+      const kaConfig = kaSchema.safeParse(config);
+      if (!kaConfig.success) validationErr = kaConfig.error.issues[0].message;
+      break;
     default:
       validationErr = 'Unsupported peer type ' + type;
   }
diff --git a/ui/app/peers/create/[peerType]/helpers/ch.ts b/ui/app/peers/create/[peerType]/helpers/ch.ts
index bda5b8eb1e..29147e0620 100644
--- a/ui/app/peers/create/[peerType]/helpers/ch.ts
+++ b/ui/app/peers/create/[peerType]/helpers/ch.ts
@@ -45,6 +45,7 @@ export const clickhouseSetting: PeerSetting[] = [
       setter((curr) => ({ ...curr, disableTls: value as boolean })),
     type: 'switch',
     tips: 'If you are using a non-TLS connection for Clickhouse server, check this box.',
+    optional: true,
   },
   {
     label: 'S3 Path',
diff --git a/ui/app/peers/create/[peerType]/helpers/common.ts b/ui/app/peers/create/[peerType]/helpers/common.ts
index ba14dafb2f..e418fd9266 100644
--- a/ui/app/peers/create/[peerType]/helpers/common.ts
+++ b/ui/app/peers/create/[peerType]/helpers/common.ts
@@ -13,6 +13,8 @@ export interface PeerSetting {
   tips?: string;
   helpfulLink?: string;
   default?: string | number;
+  placeholder?: string;
+  options?: { value: string; label: string }[];
 }

 export const getBlankSetting = (dbType: string): PeerConfig => {
diff --git a/ui/app/peers/create/[peerType]/helpers/ka.ts b/ui/app/peers/create/[peerType]/helpers/ka.ts
new file mode 100644
index 0000000000..5091d29e7e
--- /dev/null
+++ b/ui/app/peers/create/[peerType]/helpers/ka.ts
@@ -0,0 +1,73 @@
+import { KafkaConfig } from '@/grpc_generated/peers';
+import { PeerSetting } from './common';
+
+export const kaSetting: PeerSetting[] = [
+  {
+    label: 'Servers',
+    stateHandler: (value, setter) =>
+      setter((curr) => ({ ...curr, servers:
(value as string).split(',') })), + tips: 'Brokers', + helpfulLink: + 'https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#SeedBrokers', + }, + { + label: 'Username', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, username: value as string })), + optional: true, + }, + { + label: 'Password', + type: 'password', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, password: value as string })), + optional: true, + }, + { + label: 'SASL Mechanism', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, sasl: value as string })), + type: 'select', + placeholder: 'Select a mechanism', + helpfulLink: + 'https://docs.redpanda.com/current/manage/security/authentication/#scram', + options: [ + { value: 'PLAIN', label: 'PLAIN' }, + { value: 'SCRAM-SHA-256', label: 'SCRAM-SHA-256' }, + { value: 'SCRAM-SHA-512', label: 'SCRAM-SHA-512' }, + ], + }, + { + label: 'Partitioner', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, partitioner: value as string })), + type: 'select', + placeholder: 'Select a partitioner', + helpfulLink: + 'https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#Partitioner', + options: [ + { value: 'LeastBackup', label: 'Least Backup' }, + { value: 'Manual', label: 'Manual' }, + { value: 'RoundRobin', label: 'Round Robin' }, + { value: 'StickyKey', label: 'Sticky Key' }, + { value: 'Sticky', label: 'Sticky' }, + ], + }, + { + label: 'Disable TLS?', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, disableTls: value as boolean })), + type: 'switch', + tips: 'If you are using a non-TLS connection for Kafka server, check this box.', + optional: true, + }, +]; + +export const blankKaSetting: KafkaConfig = { + servers: [], + username: '', + password: '', + sasl: 'PLAIN', + partitioner: '', + disableTls: false, +}; diff --git a/ui/app/peers/create/[peerType]/page.tsx b/ui/app/peers/create/[peerType]/page.tsx index 4aa958756a..bbb25b31ad 100644 --- a/ui/app/peers/create/[peerType]/page.tsx +++ b/ui/app/peers/create/[peerType]/page.tsx @@ -3,6 +3,7 @@ import { PeerConfig } from '@/app/dto/PeersDTO'; import GuideForDestinationSetup from '@/app/mirrors/create/cdc/guide'; import BigqueryForm from '@/components/PeerForms/BigqueryConfig'; import ClickhouseForm from '@/components/PeerForms/ClickhouseConfig'; +import KafkaForm from '@/components/PeerForms/KafkaConfig'; import PostgresForm from '@/components/PeerForms/PostgresForm'; import S3Form from '@/components/PeerForms/S3Form'; import SnowflakeForm from '@/components/PeerForms/SnowflakeForm'; @@ -81,6 +82,8 @@ export default function CreateConfig({ ); case 'S3': return ; + case 'KAFKA': + return ; default: return <>; } diff --git a/ui/app/peers/create/[peerType]/schema.ts b/ui/app/peers/create/[peerType]/schema.ts index 211acd69a7..117b8738fe 100644 --- a/ui/app/peers/create/[peerType]/schema.ts +++ b/ui/app/peers/create/[peerType]/schema.ts @@ -278,6 +278,33 @@ export const chSchema = z.object({ region: z .string({ invalid_type_error: 'Region must be a string' }) .optional(), + disableTls: z.boolean(), +}); + +export const kaSchema = z.object({ + servers: z + .array(z.string({ required_error: 'Server address must not be empty' })) + .min(1, { message: 'At least 1 server required' }), + username: z.string().optional(), + password: z.string().optional(), + sasl: z + .union([ + z.literal('PLAIN'), + z.literal('SCRAM-SHA-256'), + z.literal('SCRAM-SHA-512'), + ]) + .optional(), + partitioner: z + .union([ + z.literal('Default'), + z.literal('LeastBackup'), + 
z.literal('Manual'), + z.literal('RoundRobin'), + z.literal('StickyKey'), + z.literal('Sticky'), + ]) + .optional(), + disableTls: z.boolean().optional(), }); const urlSchema = z diff --git a/ui/components/PeerComponent.tsx b/ui/components/PeerComponent.tsx index 66851820ed..8505655c72 100644 --- a/ui/components/PeerComponent.tsx +++ b/ui/components/PeerComponent.tsx @@ -32,6 +32,9 @@ export const DBTypeToImageMapping = (peerType: DBType | string) => { case DBType.EVENTHUB_GROUP: case DBType.EVENTHUB: return '/svgs/ms.svg'; + case DBType.KAFKA: + case 'KAFKA': + return '/svgs/kafka.svg'; default: return '/svgs/pg.svg'; } diff --git a/ui/components/PeerForms/KafkaConfig.tsx b/ui/components/PeerForms/KafkaConfig.tsx new file mode 100644 index 0000000000..65df972ba5 --- /dev/null +++ b/ui/components/PeerForms/KafkaConfig.tsx @@ -0,0 +1,116 @@ +'use client'; +import { PeerSetter } from '@/app/dto/PeersDTO'; +import { kaSetting } from '@/app/peers/create/[peerType]/helpers/ka'; +import SelectTheme from '@/app/styles/select'; +import { Label } from '@/lib/Label'; +import { RowWithSelect, RowWithSwitch, RowWithTextField } from '@/lib/Layout'; +import { Switch } from '@/lib/Switch/Switch'; +import { TextField } from '@/lib/TextField'; +import { Tooltip } from '@/lib/Tooltip'; +import ReactSelect from 'react-select'; +import { InfoPopover } from '../InfoPopover'; +interface KafkaProps { + setter: PeerSetter; +} + +const KafkaForm = ({ setter }: KafkaProps) => { + return ( +
+ {kaSetting.map((setting, index) => { + return setting.type === 'switch' ? ( + + {setting.label}{' '} + {!setting.optional && ( + + + + )} + + } + action={ +
+ + setting.stateHandler(state, setter) + } + /> + {setting.tips && ( + + )} +
+ } + /> + ) : setting.type === 'select' ? ( + {setting.label}} + action={ + + val && setting.stateHandler(val.value, setter) + } + options={setting.options} + theme={SelectTheme} + /> + } + /> + ) : ( + + {setting.label}{' '} + {!setting.optional && ( + + + + )} + + } + action={ +
+ ) => + setting.stateHandler(e.target.value, setter) + } + /> + {setting.tips && ( + + )} +
+ } + /> + ); + })} +
+ ); +}; + +export default KafkaForm; diff --git a/ui/components/PeerTypeComponent.tsx b/ui/components/PeerTypeComponent.tsx index d315499a6d..2db4be9dd2 100644 --- a/ui/components/PeerTypeComponent.tsx +++ b/ui/components/PeerTypeComponent.tsx @@ -24,6 +24,8 @@ export const DBTypeToGoodText = (ptype: DBType) => { return 'MongoDB'; case DBType.CLICKHOUSE: return 'Clickhouse'; + case DBType.KAFKA: + return 'Kafka'; case DBType.UNRECOGNIZED: return 'Unrecognised'; } diff --git a/ui/components/SelectSource.tsx b/ui/components/SelectSource.tsx index e77d1b5e4d..eca7c37e61 100644 --- a/ui/components/SelectSource.tsx +++ b/ui/components/SelectSource.tsx @@ -34,7 +34,8 @@ export default function SelectSource({ value === 'SNOWFLAKE' || value === 'BIGQUERY' || value === 'S3' || - value === 'CLICKHOUSE') + value === 'CLICKHOUSE' || + value === 'KAFKA') ) .map((value) => ({ label: value, value })); diff --git a/ui/public/svgs/kafka.svg b/ui/public/svgs/kafka.svg new file mode 100644 index 0000000000..305d876447 --- /dev/null +++ b/ui/public/svgs/kafka.svg @@ -0,0 +1 @@ +
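Note: peer creation for Kafka is gated by the new `kaSchema` zod schema, and `validateFields` surfaces the first issue message to the user. A hedged usage sketch with illustrative values only:

```typescript
import { kaSchema } from '@/app/peers/create/[peerType]/schema';

// Fails: at least one seed broker is required.
const invalid = kaSchema.safeParse({ servers: [], disableTls: false });
// invalid.success === false; the first issue message is 'At least 1 server required',
// which validateFields() reports back to the form.

// Passes: username/password stay optional, e.g. for a local non-SASL cluster.
const valid = kaSchema.safeParse({
  servers: ['localhost:9092'],
  sasl: 'SCRAM-SHA-256',
  partitioner: 'LeastBackup',
  disableTls: true,
});
// valid.success === true
```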