Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka + lua scripting + flatbuffers #1461

Merged
merged 31 commits into the base branch from the feature branch
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select a commit. Hold Shift + click to select a range.
5d7d15a
Yet another kafka connector
serprex Mar 7, 2024
6b6a8f9
cleanup UI, add missing parts
Amogh-Bharadwaj Mar 11, 2024
046ec57
Add script to ui, script is name, rely on loader to retrieve from cat…
serprex Mar 11, 2024
a282e61
replace big.Rat with decimal.Decimal, add more __tostring
serprex Mar 16, 2024
1c6376a
Fix CreateByteVector, expose more decimal/int properties
serprex Mar 17, 2024
f3c957b
add peerdb.type function to help with debugging
serprex Mar 17, 2024
3e90271
Don't expose QValue directly to scripts
serprex Mar 17, 2024
6107a89
3 tweaks:
serprex Mar 18, 2024
9ef43f2
fix resize not updating data
serprex Mar 18, 2024
2d66ed1
Fix flatbuffers not handling buffer resizing properly
serprex Mar 18, 2024
8f97c68
Kafka: username optional, if not supplied no auth used
serprex Mar 18, 2024
0c97fca
Move flatbuffers into its own package, fix stack not being cleared in…
serprex Mar 18, 2024
93c5ccb
lua: bit32
serprex Mar 19, 2024
b4b57b6
Remove FlatBuffer_ prefixes
serprex Mar 19, 2024
3b2e236
adjust arrays to 1 based offset when row value is an array being conv…
serprex Mar 19, 2024
9ec1c58
Add lo/hi properties to i64/u64 types to help use with bit32 library
serprex Mar 19, 2024
163a1e2
onRecord: support for multiple returns
serprex Mar 19, 2024
5554b51
flatbuffers: license directory as ApacheV2 & add readme explaining
serprex Mar 20, 2024
188a30d
Rename LuaUserDataType to UserDataType
serprex Mar 20, 2024
f5c1668
add some basic unit testing to pua, fix i64/u64 comparisons
serprex Mar 20, 2024
820cd23
first stab at a basic kafka e2e test
serprex Mar 20, 2024
00d40d4
add CommitTime to cdc messages, expose to script
serprex Mar 20, 2024
876a79e
Add Script to mirror details in UI
serprex Mar 20, 2024
53028df
reduce BaseMessage size by 16 bytes only storing unix nano, good unti…
serprex Mar 21, 2024
521d576
clippy
serprex Mar 21, 2024
6bd45b3
moved code to gluaflatbuffers / glua64
serprex Mar 21, 2024
49200e5
kafka: split up SyncRecords, report numrecords as records consumed, n…
serprex Mar 21, 2024
cb96820
loadScript cleanup control flow
serprex Mar 21, 2024
3282859
move glua64 tests to glua64
serprex Mar 21, 2024
19314c0
fix lint
serprex Mar 21, 2024
0a81ade
oops
serprex Mar 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ jobs:
env:
PGPASSWORD: postgres

- name: start redpanda
uses: redpanda-data/[email protected]
with:
version: "latest"

- name: Install Temporal CLI
uses: temporalio/setup-temporal@v0

Expand Down
1 change: 1 addition & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ func (a *FlowableActivity) SyncFlow(
FlowJobName: flowName,
TableMappings: options.TableMappings,
StagingPath: config.CdcStagingPath,
Script: config.Script,
})
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
Expand Down
11 changes: 7 additions & 4 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,9 +533,7 @@ func (h *FlowRequestHandler) CreatePeer(
return wrongConfigResponse, nil
}
pgConfig := pgConfigObject.PostgresConfig

encodedConfig, encodingErr = proto.Marshal(pgConfig)

case protos.DBType_SNOWFLAKE:
sfConfigObject, ok := config.(*protos.Peer_SnowflakeConfig)
if !ok {
Expand Down Expand Up @@ -566,13 +564,18 @@ func (h *FlowRequestHandler) CreatePeer(
encodedConfig, encodingErr = proto.Marshal(s3Config)
case protos.DBType_CLICKHOUSE:
chConfigObject, ok := config.(*protos.Peer_ClickhouseConfig)

if !ok {
return wrongConfigResponse, nil
}

chConfig := chConfigObject.ClickhouseConfig
encodedConfig, encodingErr = proto.Marshal(chConfig)
case protos.DBType_KAFKA:
kaConfigObject, ok := config.(*protos.Peer_KafkaConfig)
if !ok {
return wrongConfigResponse, nil
}
kaConfig := kaConfigObject.KafkaConfig
encodedConfig, encodingErr = proto.Marshal(kaConfig)
default:
return wrongConfigResponse, nil
}
Expand Down
4 changes: 4 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse"
conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub"
connkafka "github.com/PeerDB-io/peer-flow/connectors/kafka"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
conns3 "github.com/PeerDB-io/peer-flow/connectors/s3"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
Expand Down Expand Up @@ -202,6 +203,8 @@ func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) {
return connsqlserver.NewSQLServerConnector(ctx, inner.SqlserverConfig)
case *protos.Peer_ClickhouseConfig:
return connclickhouse.NewClickhouseConnector(ctx, inner.ClickhouseConfig)
case *protos.Peer_KafkaConfig:
return connkafka.NewKafkaConnector(ctx, inner.KafkaConfig)
default:
return nil, ErrUnsupportedFunctionality
}
Expand Down Expand Up @@ -260,6 +263,7 @@ var (
_ CDCSyncConnector = &connbigquery.BigQueryConnector{}
_ CDCSyncConnector = &connsnowflake.SnowflakeConnector{}
_ CDCSyncConnector = &conneventhub.EventHubConnector{}
_ CDCSyncConnector = &connkafka.KafkaConnector{}
_ CDCSyncConnector = &conns3.S3Connector{}
_ CDCSyncConnector = &connclickhouse.ClickhouseConnector{}

Expand Down
12 changes: 2 additions & 10 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,7 @@ func (c *EventHubConnector) GetLastOffset(ctx context.Context, jobName string) (
}

// SetLastOffset persists the latest consumed replication offset for jobName
// by delegating to the Postgres-backed metadata store.
//
// NOTE(review): this span is a unified-diff rendering from the PR page — the
// removed error-logging variant and the added one-line delegation both appear
// below. In the merged code only the final
// `return c.pgMetadata.UpdateLastOffset(...)` form remains; the earlier lines
// were deleted by this commit.
func (c *EventHubConnector) SetLastOffset(ctx context.Context, jobName string, offset int64) error {
// (removed by this PR) old variant: log the error before returning it.
err := c.pgMetadata.UpdateLastOffset(ctx, jobName, offset)
if err != nil {
c.logger.Error("failed to update last offset", slog.Any("error", err))
return err
}

return nil
// (added by this PR) new variant: return the store's error directly.
return c.pgMetadata.UpdateLastOffset(ctx, jobName, offset)
}

// returns the number of records synced
Expand Down Expand Up @@ -204,9 +198,7 @@ func (c *EventHubConnector) processBatch(
}

func (c *EventHubConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
batch := req.Records

numRecords, err := c.processBatch(ctx, req.FlowJobName, batch)
numRecords, err := c.processBatch(ctx, req.FlowJobName, req.Records)
if err != nil {
c.logger.Error("failed to process batch", slog.Any("error", err))
return nil, err
Expand Down
7 changes: 7 additions & 0 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ func (p *PostgresMetadataStore) Ping(ctx context.Context) error {
return nil
}

// LogFlowInfo records an informational message for the given flow by inserting
// a row into peerdb_stats.flow_errors, tagging it with error_type "info" so it
// can be distinguished from genuine error rows.
func (p *PostgresMetadataStore) LogFlowInfo(ctx context.Context, flowName string, info string) error {
const insertInfoSQL = "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)"
if _, err := p.pool.Exec(ctx, insertInfoSQL, flowName, info, "info"); err != nil {
return err
}
return nil
}

func (p *PostgresMetadataStore) FetchLastOffset(ctx context.Context, jobName string) (int64, error) {
row := p.pool.QueryRow(ctx,
`SELECT last_offset FROM `+
Expand Down
Loading
Loading