
Commit

Merge remote-tracking branch 'origin/main' into cdc-parallel-sync-normalize
serprex committed Dec 12, 2023
2 parents d66c98e + 8a0480f commit ab02ec3
Showing 62 changed files with 769 additions and 992 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dev-docker.yml
@@ -10,7 +10,7 @@ jobs:
docker-build:
strategy:
matrix:
runner: [ubicloud-standard-2-ubuntu-2204-arm]
runner: [ubuntu-latest]
runs-on: ${{ matrix.runner }}
permissions:
contents: read
98 changes: 36 additions & 62 deletions flow/activities/flowable.go
@@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"log/slog"
"sync"
"time"

@@ -21,7 +22,6 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
@@ -169,7 +169,7 @@ func (a *FlowableActivity) handleSlotInfo(
) error {
slotInfo, err := srcConn.GetSlotInfo(slotName)
if err != nil {
log.Warnf("warning: failed to get slot info: %v", err)
slog.Warn("warning: failed to get slot info", slog.Any("error", err))
return err
}

@@ -214,26 +214,19 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return nil, fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(dstConn)

log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("initializing table schema...")
slog.InfoContext(ctx, "initializing table schema...")
err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
if err != nil {
return nil, fmt.Errorf("failed to initialize table schema: %w", err)
}
activity.RecordHeartbeat(ctx, "initialized table schema")

log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Info("pulling records...")

slog.InfoContext(ctx, "pulling records...")
tblNameMapping := make(map[string]model.NameAndExclude)
for _, v := range input.FlowConnectionConfigs.TableMappings {
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 10)
idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60)

recordBatch := model.NewCDCRecordStream()

@@ -271,10 +264,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
})

hasRecords := !recordBatch.WaitAndCheckEmpty()
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("the current sync flow has records: %v", hasRecords)

slog.InfoContext(ctx, fmt.Sprintf("the current sync flow has records: %v", hasRecords))
if a.CatalogPool != nil && hasRecords {
syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
@@ -300,8 +290,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
if err != nil {
return nil, fmt.Errorf("failed to pull records: %w", err)
}

log.WithFields(log.Fields{"flowName": input.FlowConnectionConfigs.FlowJobName}).Info("no records to push")
slog.InfoContext(ctx, "no records to push")
syncResponse := &model.SyncResponse{}
syncResponse.RelationMessageMapping = <-recordBatch.RelationMessageMapping
syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings)
@@ -326,7 +315,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
PushParallelism: input.FlowConnectionConfigs.PushParallelism,
})
if err != nil {
log.Warnf("failed to push records: %v", err)
slog.Warn("failed to push records", slog.Any("error", err))
return nil, fmt.Errorf("failed to push records: %w", err)
}

@@ -337,9 +326,10 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

numRecords := res.NumRecordsSynced
syncDuration := time.Since(syncStartTime)
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("pushed %d records in %d seconds\n", numRecords, int(syncDuration.Seconds()))

slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds\n",
numRecords, int(syncDuration.Seconds())),
)

lastCheckpoint, err := recordBatch.GetLastCheckpoint()
if err != nil {
@@ -416,7 +406,7 @@ func (a *FlowableActivity) StartNormalize(
shutdown <- struct{}{}
}()

log.Info("initializing table schema...")
slog.InfoContext(ctx, "initializing table schema...")
err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
if err != nil {
return nil, fmt.Errorf("failed to initialize table schema: %w", err)
@@ -448,7 +438,8 @@ func (a *FlowableActivity) StartNormalize(

// log the number of batches normalized
if res != nil {
log.Infof("normalized records from batch %d to batch %d\n", res.StartBatchID, res.EndBatchID)
slog.InfoContext(ctx, fmt.Sprintf("normalized records from batch %d to batch %d\n",
res.StartBatchID, res.EndBatchID))
}

return res, nil
@@ -534,10 +525,12 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
}

numPartitions := len(partitions.Partitions)
log.Infof("replicating partitions for job - %s - batch %d - size: %d\n",
config.FlowJobName, partitions.BatchId, numPartitions)

slog.InfoContext(ctx, fmt.Sprintf("replicating partitions for batch %d - size: %d\n",
partitions.BatchId, numPartitions),
)
for i, p := range partitions.Partitions {
log.Infof("batch-%d - replicating partition - %s\n", partitions.BatchId, p.PartitionId)
slog.InfoContext(ctx, fmt.Sprintf("batch-%d - replicating partition - %s\n", partitions.BatchId, p.PartitionId))
err := a.replicateQRepPartition(ctx, config, i+1, numPartitions, p, runUUID)
if err != nil {
return err
@@ -573,7 +566,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
defer connectors.CloseConnector(dstConn)

log.Infof("replicating partition %s\n", partition.PartitionId)
slog.InfoContext(ctx, fmt.Sprintf("replicating partition %s\n", partition.PartitionId))

var stream *model.QRecordStream
bufferSize := shared.FetchAndChannelSize
@@ -589,14 +582,12 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
tmp, err := pgConn.PullQRepRecordStream(config, partition, stream)
numRecords := int64(tmp)
if err != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Errorf("failed to pull records: %v", err)
slog.Error("failed to pull records", slog.Any("error", err))
goroutineErr = err
} else {
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords)
if err != nil {
log.Errorf("%v", err)
slog.Error(fmt.Sprintf("%v", err))
goroutineErr = err
}
}
@@ -610,9 +601,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return fmt.Errorf("failed to pull records: %w", err)
}
numRecords := int64(recordBatch.NumRecords)
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("pulled %d records\n", len(recordBatch.Records))
slog.InfoContext(ctx, fmt.Sprintf("pulled %d records\n", len(recordBatch.Records)))

err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords)
if err != nil {
@@ -640,9 +629,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,

if rowsSynced == 0 {
pullCancel()
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("no records to push for partition %s\n", partition.PartitionId)
slog.InfoContext(ctx, fmt.Sprintf("no records to push for partition %s\n", partition.PartitionId))
} else {
wg.Wait()
if goroutineErr != nil {
@@ -654,9 +641,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return err
}

log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("pushed %d records\n", rowsSynced)
slog.InfoContext(ctx, fmt.Sprintf("pushed %d records\n", rowsSynced))
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
@@ -767,12 +752,12 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
for {
select {
case <-ctx.Done():
log.Info("context is done, exiting wal heartbeat send loop")
slog.InfoContext(ctx, "context is done, exiting wal heartbeat send loop")
return nil
case <-ticker.C:
pgPeers, err := a.getPostgresPeerConfigs(ctx)
if err != nil {
log.Warn("[sendwalheartbeat]: warning: unable to fetch peers." +
slog.Warn("[sendwalheartbeat]: warning: unable to fetch peers." +
"Skipping walheartbeat send. error encountered: " + err.Error())
continue
}
@@ -795,15 +780,15 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {

_, err := peerConn.Exec(ctx, command)
if err != nil {
log.Warnf("warning: could not send walheartbeat to peer %v: %v", pgPeer.Name, err)
slog.Warn(fmt.Sprintf("warning: could not send walheartbeat to peer %v: %v", pgPeer.Name, err))
}

closeErr := peerConn.Close(ctx)
if closeErr != nil {
return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, closeErr)
}
log.Infof("sent walheartbeat to peer %v", pgPeer.Name)
slog.InfoContext(ctx, fmt.Sprintf("sent walheartbeat to peer %v", pgPeer.Name))
}
}
ticker.Stop()
@@ -827,10 +812,7 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
}
defer connectors.CloseConnector(srcConn)
pgSrcConn := srcConn.(*connpostgres.PostgresConnector)

log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("current last partition value is %v\n", last)
slog.InfoContext(ctx, fmt.Sprintf("current last partition value is %v\n", last))
attemptCount := 1
for {
activity.RecordHeartbeat(ctx, fmt.Sprintf("no new rows yet, attempt #%d", attemptCount))
@@ -917,9 +899,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
}
defer connectors.CloseConnector(dstConn)

log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Info("replicating xmin\n")
slog.InfoContext(ctx, "replicating xmin\n")

bufferSize := shared.FetchAndChannelSize
errGroup, errCtx := errgroup.WithContext(ctx)
@@ -933,9 +913,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
var numRecords int
numRecords, currentSnapshotXmin, pullErr = pgConn.PullXminRecordStream(config, partition, stream)
if pullErr != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Errorf("failed to pull records: %v", err)
slog.InfoContext(ctx, fmt.Sprintf("failed to pull records: %v", err))
return err
}

@@ -964,7 +942,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,

err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
if err != nil {
log.Errorf("%v", err)
slog.Error(fmt.Sprintf("%v", err))
return err
}

@@ -985,9 +963,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
}

if rowsSynced == 0 {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Info("no records to push for xmin\n")
slog.InfoContext(ctx, "no records to push for xmin\n")
} else {
err := errGroup.Wait()
if err != nil {
@@ -999,9 +975,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
return 0, err
}

log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("pushed %d records\n", rowsSynced)
slog.InfoContext(ctx, fmt.Sprintf("pushed %d records\n", rowsSynced))
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
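The recurring change through flowable.go is the migration from logrus (log.WithFields(...).Infof/Warnf/Errorf) to the standard library's log/slog, with messages pre-formatted via fmt.Sprintf and the activity context passed through InfoContext. A minimal sketch of the before/after pattern, using an illustrative flowName value rather than anything taken from the repo:

	package main

	import (
		"context"
		"fmt"
		"log/slog"
	)

	func main() {
		ctx := context.Background()
		flowName := "example-flow" // illustrative only

		// Old pattern (logrus), as removed in this diff:
		//   log.WithFields(log.Fields{"flowName": flowName}).Infof("pushed %d records", 42)

		// New pattern (log/slog), as added in this diff: the message is formatted up
		// front and the context is threaded through InfoContext.
		slog.InfoContext(ctx, fmt.Sprintf("pushed %d records", 42))

		// Errors become structured attributes instead of format verbs.
		err := fmt.Errorf("connection reset by peer")
		slog.Warn("failed to push records", slog.Any("error", err), slog.String("flowName", flowName))
	}

Note that most call sites in the diff drop the per-call flowName field entirely; attaching it with slog.String (or building a logger once with slog.With) is shown here only as one possible way to keep that context if it is still wanted.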
16 changes: 5 additions & 11 deletions flow/activities/snapshot_activity.go
@@ -3,11 +3,11 @@ package activities
import (
"context"
"fmt"
"log/slog"

"github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/generated/protos"
log "github.com/sirupsen/logrus"
)

type SnapshotActivity struct {
@@ -34,7 +34,7 @@ func (a *SnapshotActivity) SetupReplication(
) (*protos.SetupReplicationOutput, error) {
dbType := config.PeerConnectionConfig.Type
if dbType != protos.DBType_POSTGRES {
log.Infof("setup replication is no-op for %s", dbType)
slog.InfoContext(ctx, fmt.Sprintf("setup replication is no-op for %s", dbType))
return nil, nil
}

@@ -53,23 +53,17 @@ func (a *SnapshotActivity) SetupReplication(
pgConn := conn.(*connpostgres.PostgresConnector)
err = pgConn.SetupReplication(slotSignal, config)
if err != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Errorf("failed to setup replication: %v", err)
slog.ErrorContext(ctx, "failed to setup replication", slog.Any("error", err))
replicationErr <- err
return
}
}()

log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Info("waiting for slot to be created...")
slog.InfoContext(ctx, "waiting for slot to be created...")
var slotInfo *connpostgres.SlotCreationResult
select {
case slotInfo = <-slotSignal.SlotCreated:
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("slot '%s' created", slotInfo.SlotName)
slog.InfoContext(ctx, fmt.Sprintf("slot '%s' created", slotInfo.SlotName))
case err := <-replicationErr:
return nil, fmt.Errorf("failed to setup replication: %w", err)
}
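For context, SetupReplication in snapshot_activity.go hands the actual slot creation to a goroutine and then blocks on a select over two channels: one signalling the created slot, the other a setup error. A stripped-down sketch of that channel pattern, with a placeholder result type standing in for connpostgres.SlotCreationResult and the replication setup simulated instead of calling a real connector:

	package main

	import (
		"errors"
		"fmt"
		"log/slog"
	)

	// Placeholder for the connector's slot creation result; the field name is illustrative.
	type slotCreationResult struct {
		SlotName string
	}

	func main() {
		slotCreated := make(chan slotCreationResult, 1)
		replicationErr := make(chan error, 1)

		// The real activity runs pgConn.SetupReplication in this goroutine; this stub
		// simulates a successful slot creation (flip `fail` to exercise the error path).
		go func() {
			fail := false
			if fail {
				replicationErr <- errors.New("could not create replication slot")
				return
			}
			slotCreated <- slotCreationResult{SlotName: "peerflow_slot_example"}
		}()

		slog.Info("waiting for slot to be created...")
		select {
		case slotInfo := <-slotCreated:
			slog.Info(fmt.Sprintf("slot '%s' created", slotInfo.SlotName))
		case err := <-replicationErr:
			slog.Error(fmt.Sprintf("failed to setup replication: %v", err))
		}
	}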