
Commit

Merge remote-tracking branch 'origin/main' into cdc-parallel-sync-normalize
serprex committed Dec 21, 2023
2 parents ae0f1f8 + 08861aa commit d64288b
Showing 72 changed files with 2,559 additions and 689 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/ci.yml
@@ -4,9 +4,7 @@ on:
   push:
     branches: [main, release/*]
   pull_request:
-    branches:
-      - "main"
-      - "release/*"
+    branches: [main, release/*]

 jobs:
   build:
3 changes: 1 addition & 2 deletions .github/workflows/flow.yml
@@ -2,8 +2,7 @@ name: Flow build and test

 on:
   pull_request:
-    branches:
-      - "main"
+    branches: [main]
   push:
     branches: [main]

4 changes: 2 additions & 2 deletions .github/workflows/golang-lint.yml
@@ -2,8 +2,8 @@ name: GolangCI-Lint

 on:
   pull_request:
-    branches:
-      - "main"
+    branches: [main]
+    paths: [flow/**]

 jobs:
   golangci-lint:
4 changes: 2 additions & 2 deletions .github/workflows/rust-lint.yml
@@ -2,8 +2,8 @@ name: clippy-action

 on:
   pull_request:
-    branches:
-      - "main"
+    branches: [main]
+    paths: [nexus/**]

 jobs:
   clippy:
7 changes: 3 additions & 4 deletions .github/workflows/ui-build.yml
@@ -2,11 +2,10 @@ name: Build & Test UI

 on:
   push:
-    branches:
-      - main
+    branches: [main]
   pull_request:
-    branches:
-      - main
+    branches: [main]
+    paths: [ui/**]

 jobs:
   build-test:
7 changes: 3 additions & 4 deletions .github/workflows/ui-lint.yml
@@ -2,11 +2,10 @@ name: Lint UI

 on:
   push:
-    branches:
-      - main
+    branches: [main]
   pull_request:
-    branches:
-      - main
+    branches: [main]
+    paths: [ui/**]

 permissions:
   checks: write
2 changes: 1 addition & 1 deletion docker-compose-dev.yml
@@ -133,7 +133,7 @@ services:
       dockerfile: stacks/flow.Dockerfile
       target: flow-snapshot-worker
     environment:
-      <<: [*flow-worker-env]
+      <<: [*catalog-config, *flow-worker-env]
     depends_on:
       temporal-admin-tools:
         condition: service_healthy
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -112,7 +112,7 @@ services:
     container_name: flow-snapshot-worker
     image: ghcr.io/peerdb-io/flow-snapshot-worker:latest-dev
     environment:
-      <<: [*flow-worker-env]
+      <<: [*catalog-config, *flow-worker-env]
     depends_on:
       temporal-admin-tools:
         condition: service_healthy
3 changes: 2 additions & 1 deletion flow/.golangci.yml
@@ -7,14 +7,15 @@ linters:
     - dupl
     - gofumpt
     - gosec
-    - gosimple
     - misspell
     - nakedret
     - stylecheck
     - unconvert
     - unparam
     - whitespace
     - errcheck
+    - gosimple
+    - prealloc
     - staticcheck
     - ineffassign
     - unused
43 changes: 41 additions & 2 deletions flow/activities/flowable.go
@@ -19,6 +19,7 @@ import (
     "github.com/PeerDB-io/peer-flow/model"
     "github.com/PeerDB-io/peer-flow/peerdbenv"
     "github.com/PeerDB-io/peer-flow/shared"
+    "github.com/PeerDB-io/peer-flow/shared/alerting"
     "github.com/jackc/pglogrepl"
     "github.com/jackc/pgx/v5"
     "github.com/jackc/pgx/v5/pgtype"
@@ -42,6 +43,7 @@ type SlotSnapshotSignal struct {

 type FlowableActivity struct {
     CatalogPool *pgxpool.Pool
+    Alerter     *alerting.Alerter
 }

 // CheckConnection implements CheckConnection.
@@ -174,10 +176,38 @@ func (a *FlowableActivity) handleSlotInfo(
 ) error {
     slotInfo, err := srcConn.GetSlotInfo(slotName)
     if err != nil {
-        slog.Warn("warning: failed to get slot info", slog.Any("error", err))
+        slog.WarnContext(ctx, "warning: failed to get slot info", slog.Any("error", err))
         return err
     }

+    deploymentUIDPrefix := ""
+    if peerdbenv.GetPeerDBDeploymentUID() != "" {
+        deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.GetPeerDBDeploymentUID())
+    }
+
+    slotLagInMBThreshold := peerdbenv.GetPeerDBSlotLagMBAlertThreshold()
+    if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
+        a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
+            fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
+cc: <!channel>`,
+                deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
+    }
+
+    // Also handles alerts for PeerDB user connections exceeding a given limit here
+    maxOpenConnectionsThreshold := peerdbenv.GetPeerDBOpenConnectionsAlertThreshold()
+    res, err := srcConn.GetOpenConnectionsForUser()
+    if err != nil {
+        slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
+        return err
+    }
+    if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) {
+        a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName),
+            fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
+                ` has exceeded threshold size of %d connections, currently at %d connections!
+cc: <!channel>`,
+                deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections))
+    }
+
     if len(slotInfo) != 0 {
         return monitoring.AppendSlotSizeInfo(ctx, a.CatalogPool, peerName, slotInfo[0])
     }
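
Note: both alerts above are gated on their threshold being greater than zero, so leaving the corresponding setting unset disables them. The peerdbenv getters themselves are not part of this diff; the sketch below is one plausible shape for such an environment-driven getter, not the actual implementation, and the PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD variable name and uint32 return type are assumptions.

    // Hypothetical sketch of an environment-driven threshold getter, in the
    // spirit of peerdbenv.GetPeerDBSlotLagMBAlertThreshold (names assumed).
    package peerdbenvsketch

    import (
        "os"
        "strconv"
    )

    func GetSlotLagMBAlertThreshold() uint32 {
        val, ok := os.LookupEnv("PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD")
        if !ok {
            return 0 // zero disables the alert, matching the > 0 guard above
        }
        parsed, err := strconv.ParseUint(val, 10, 32)
        if err != nil {
            return 0
        }
        return uint32(parsed)
    }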
@@ -190,7 +220,13 @@ func (a *FlowableActivity) recordSlotSizePeriodically(
     slotName string,
     peerName string,
 ) {
-    timeout := 10 * time.Minute
+    // ensures slot info is logged at least once per SyncFlow
+    err := a.handleSlotInfo(ctx, srcConn, slotName, peerName)
+    if err != nil {
+        return
+    }
+
+    timeout := 5 * time.Minute
     ticker := time.NewTicker(timeout)

     defer ticker.Stop()
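
Note: the hunk above makes slot-info collection run once immediately, so every SyncFlow records it at least once even if the flow finishes before the first tick, and it tightens the interval from 10 to 5 minutes. The rest of the loop is collapsed in this diff; a common Go shape for this run-once-then-tick pattern, with a simplified hypothetical signature, is:

    package periodicsketch

    import (
        "context"
        "time"
    )

    // RunPeriodically does the work once up front, then repeats it on a fixed
    // interval until the work fails or the context is cancelled.
    func RunPeriodically(ctx context.Context, interval time.Duration, work func() error) {
        if err := work(); err != nil {
            return
        }
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                if err := work(); err != nil {
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }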
@@ -263,6 +299,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
             OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
             RelationMessageMapping:      input.RelationMessageMapping,
             RecordStream:                recordBatch,
+            SetLastOffset: func(lastOffset int64) error {
+                return dstConn.SetLastOffset(input.FlowConnectionConfigs.FlowJobName, lastOffset)
+            },
         })
     })

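Note: the new SetLastOffset field hands the pull side a callback for persisting progress on the destination while a long sync is still running. The PullRecordsRequest wiring is not shown in this diff; the sketch below illustrates the callback pattern with hypothetical, trimmed-down types.

    package pullsketch

    // PullRequest is a hypothetical stand-in for the real PullRecordsRequest;
    // only the callback field matters for this illustration.
    type PullRequest struct {
        SetLastOffset func(lastOffset int64) error
    }

    // PullLoop checkpoints through the callback so a restarted mirror can
    // resume from the last offset the destination has confirmed.
    func PullLoop(req PullRequest, offsets <-chan int64) error {
        for off := range offsets {
            if err := req.SetLastOffset(off); err != nil {
                return err
            }
        }
        return nil
    }
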
7 changes: 7 additions & 0 deletions flow/cmd/handler.go
@@ -267,6 +267,13 @@ func (h *FlowRequestHandler) CreateQRepFlow(
     } else {
         workflowFn = peerflow.QRepFlowWorkflow
     }
+
+    if req.QrepConfig.SyncedAtColName == "" {
+        cfg.SyncedAtColName = "_PEERDB_SYNCED_AT"
+    } else {
+        // make them all uppercase
+        cfg.SyncedAtColName = strings.ToUpper(req.QrepConfig.SyncedAtColName)
+    }
     _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state)
     if err != nil {
         slog.Error("unable to start QRepFlow workflow",
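Note: the new block gives every QRep mirror a synced-at column, defaulting to _PEERDB_SYNCED_AT when the request leaves the name empty and upper-casing any user-supplied name (presumably so it matches warehouses such as Snowflake that fold unquoted identifiers to upper case). Expressed as a pure helper (hypothetical name, for illustration only):

    package qrepsketch

    import "strings"

    // NormalizeSyncedAtColName mirrors the defaulting logic in CreateQRepFlow.
    func NormalizeSyncedAtColName(requested string) string {
        if requested == "" {
            return "_PEERDB_SYNCED_AT"
        }
        return strings.ToUpper(requested)
    }
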
2 changes: 2 additions & 0 deletions flow/cmd/worker.go
@@ -13,6 +13,7 @@ import (
     "github.com/PeerDB-io/peer-flow/activities"
     utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
     "github.com/PeerDB-io/peer-flow/shared"
+    "github.com/PeerDB-io/peer-flow/shared/alerting"
     peerflow "github.com/PeerDB-io/peer-flow/workflows"

     "github.com/grafana/pyroscope-go"
@@ -133,6 +134,7 @@ func WorkerMain(opts *WorkerOptions) error {
     w.RegisterWorkflow(peerflow.HeartbeatFlowWorkflow)
     w.RegisterActivity(&activities.FlowableActivity{
         CatalogPool: conn,
+        Alerter:     alerting.NewAlerter(conn),
     })

     err = w.Run(worker.InterruptCh())
40 changes: 39 additions & 1 deletion flow/connectors/bigquery/bigquery.go
@@ -340,6 +340,23 @@ func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) {
     }
 }

+func (c *BigQueryConnector) SetLastOffset(jobName string, lastOffset int64) error {
+    query := fmt.Sprintf(
+        "UPDATE %s.%s SET offset = GREATEST(offset, %d) WHERE mirror_job_name = '%s'",
+        c.datasetID,
+        MirrorJobsTable,
+        lastOffset,
+        jobName,
+    )
+    q := c.client.Query(query)
+    _, err := q.Read(c.ctx)
+    if err != nil {
+        return fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
+    }
+
+    return nil
+}
+
 func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
     query := fmt.Sprintf("SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
         c.datasetID, MirrorJobsTable, jobName)
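
Note: the UPDATE uses GREATEST(offset, %d), so the stored offset is monotonic; calling SetLastOffset with a value older than what the metadata table already holds (for example after a retried batch) leaves the row unchanged. The equivalent pure-Go property, as a hypothetical helper for illustration only:

    package offsetsketch

    // MonotonicOffset captures what GREATEST(offset, x) does in the UPDATE:
    // the watermark can only move forward, never backward.
    func MonotonicOffset(stored, incoming int64) int64 {
        if incoming > stored {
            return incoming
        }
        return stored
    }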
@@ -788,6 +805,11 @@ func (c *BigQueryConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
             SyncBatchID:           syncBatchID,
             NormalizeBatchID:      normalizeBatchID,
             UnchangedToastColumns: tableNametoUnchangedToastCols[tableName],
+            peerdbCols: &protos.PeerDBColumns{
+                SoftDeleteColName: req.SoftDeleteColName,
+                SyncedAtColName:   req.SyncedAtColName,
+                SoftDelete:        req.SoftDelete,
+            },
         }
         // normalize anything between last normalized batch id to last sync batchid
         mergeStmts := mergeGen.generateMergeStmts()
@@ -956,7 +978,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
     }

     // convert the column names and types to bigquery types
-    columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns))
+    columns := make([]*bigquery.FieldSchema, len(tableSchema.Columns), len(tableSchema.Columns)+2)
     idx := 0
     for colName, genericColType := range tableSchema.Columns {
         columns[idx] = &bigquery.FieldSchema{
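
Note: the three-argument make above keeps the slice length at len(tableSchema.Columns) but reserves capacity for two more entries, leaving room for the optional soft-delete and synced-at columns appended in the next hunk without a reallocation. A small self-contained illustration of the length/capacity behaviour, using strings instead of *bigquery.FieldSchema:

    package main

    import "fmt"

    func main() {
        base := 3 // stands in for len(tableSchema.Columns)
        cols := make([]string, base, base+2)
        fmt.Println(len(cols), cap(cols)) // 3 5
        // Appending the two optional metadata columns stays within capacity,
        // so no new backing array is allocated.
        cols = append(cols, "soft_delete_col", "synced_at_col")
        fmt.Println(len(cols), cap(cols)) // 5 5
    }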
@@ -967,6 +989,22 @@
         idx++
     }

+    if req.SoftDeleteColName != "" {
+        columns = append(columns, &bigquery.FieldSchema{
+            Name:     req.SoftDeleteColName,
+            Type:     bigquery.BooleanFieldType,
+            Repeated: false,
+        })
+    }
+
+    if req.SyncedAtColName != "" {
+        columns = append(columns, &bigquery.FieldSchema{
+            Name:     req.SyncedAtColName,
+            Type:     bigquery.TimestampFieldType,
+            Repeated: false,
+        })
+    }
+
     // create the table using the columns
     schema := bigquery.Schema(columns)
     err = table.Create(c.ctx, &bigquery.TableMetadata{Schema: schema})
(diff truncated: the remaining changed files are not shown)
