Commit f28e360

Merge branch 'main' into cdc-parallel-sync-normalize
serprex authored Dec 18, 2023
2 parents b569852 + 52a013c commit f28e360
Showing 98 changed files with 1,555 additions and 1,687 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golang-lint.yml
@@ -13,7 +13,7 @@ jobs:
pull-requests: write
strategy:
matrix:
- runner: [ubicloud-standard-2-ubuntu-2204-arm]
+ runner: [ubicloud-standard-4-ubuntu-2204-arm]
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
2 changes: 1 addition & 1 deletion flow/.golangci.yml
@@ -5,7 +5,7 @@ linters:
enable:
- dogsled
- dupl
- - gofmt
+ - gofumpt
- gosec
- misspell
- nakedret
47 changes: 27 additions & 20 deletions flow/activities/flowable.go
@@ -17,6 +17,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
+ "github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
@@ -87,7 +88,12 @@ func (a *FlowableActivity) GetLastSyncedID(
}
defer connectors.CloseConnector(dstConn)

- return dstConn.GetLastOffset(config.FlowJobName)
+ var lastOffset int64
+ lastOffset, err = dstConn.GetLastOffset(config.FlowJobName)
+ if err != nil {
+ return nil, err
+ }
+ return &protos.LastSyncState{Checkpoint: lastOffset}, nil
}

// EnsurePullability implements EnsurePullability.
@@ -114,7 +120,6 @@ func (a *FlowableActivity) CreateRawTable(
ctx context.Context,
config *protos.CreateRawTableInput,
) (*protos.CreateRawTableOutput, error) {
- ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
@@ -205,10 +210,10 @@ func (a *FlowableActivity) recordSlotSizePeriodically(
}

func (a *FlowableActivity) StartFlow(ctx context.Context,
- input *protos.StartFlowInput) (*model.SyncResponse, error) {
+ input *protos.StartFlowInput,
+ ) (*model.SyncResponse, error) {
activity.RecordHeartbeat(ctx, "starting flow...")
conn := input.FlowConnectionConfigs
- ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool)
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
@@ -226,8 +231,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
}

- idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60)
-
recordBatch := model.NewCDCRecordStream()

startTime := time.Now()
@@ -248,13 +251,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

// start a goroutine to pull records from the source
errGroup.Go(func() error {
- return srcConn.PullRecords(&model.PullRecordsRequest{
+ return srcConn.PullRecords(a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
- LastSyncState: input.LastSyncState,
+ LastOffset: input.LastSyncState.Checkpoint,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
- IdleTimeout: time.Duration(idleTimeout) * time.Second,
+ IdleTimeout: peerdbenv.GetPeerDBCDCIdleTimeoutSeconds(),
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
@@ -273,11 +276,10 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

err = monitoring.AddCDCBatchForFlow(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
monitoring.CDCBatchInfo{
- BatchID: syncBatchID + 1,
- RowsInBatch: 0,
- BatchStartLSN: pglogrepl.LSN(recordBatch.GetFirstCheckpoint()),
- BatchEndlSN: 0,
- StartTime: startTime,
+ BatchID: syncBatchID + 1,
+ RowsInBatch: 0,
+ BatchEndlSN: 0,
+ StartTime: startTime,
})
if err != nil {
return nil, err
@@ -554,6 +556,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}

pullCtx, pullCancel := context.WithCancel(ctx)
+ defer pullCancel()
srcConn, err := connectors.GetQRepPullConnector(pullCtx, config.SourcePeer)
if err != nil {
return fmt.Errorf("failed to get qrep source connector: %w", err)
@@ -628,7 +631,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}

if rowsSynced == 0 {
- pullCancel()
slog.InfoContext(ctx, fmt.Sprintf("no records to push for partition %s\n", partition.PartitionId))
} else {
wg.Wait()
@@ -653,7 +655,8 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}

func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig,
- runUUID string) error {
+ runUUID string,
+ ) error {
dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID)
@@ -797,7 +800,8 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
}

func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
- config *protos.QRepConfig, last *protos.QRepPartition) error {
+ config *protos.QRepConfig, last *protos.QRepPartition,
+ ) error {
if config.SourcePeer.Type != protos.DBType_POSTGRES || last.Range == nil {
return nil
}
@@ -833,7 +837,8 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
}

func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (
- *protos.RenameTablesOutput, error) {
+ *protos.RenameTablesOutput, error,
+ ) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Peer)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
@@ -857,7 +862,8 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
}

func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) (
- *protos.CreateTablesFromExistingOutput, error) {
+ *protos.CreateTablesFromExistingOutput, error,
+ ) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, req.Peer)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
@@ -927,7 +933,8 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
Range: &protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{
IntRange: &protos.IntPartitionRange{Start: 0, End: int64(numRecords)},
- }},
+ },
+ },
}
}
updateErr := monitoring.InitializeQRepRun(ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
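Note on the flowable.go hunks above: the two removed context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogPool) lines and the new PullRecords(a.CatalogPool, ...) call suggest the catalog pool is now handed to the pull connector explicitly instead of being threaded through a context value. A minimal Go sketch of that call boundary; the interface name and exact signature are assumptions, not taken from this diff:

package connectors

import (
	"github.com/PeerDB-io/peer-flow/model"
	"github.com/jackc/pgx/v5/pgxpool"
)

// CDCPullConnector is a hypothetical sketch of the pull-side interface after
// this change: the catalog pool used for monitoring writes arrives as an
// explicit argument rather than via shared.CDCMirrorMonitorKey in the context.
type CDCPullConnector interface {
	PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error
}

Passing the pool explicitly keeps the dependency visible in the signature and avoids an untyped context lookup inside the connector.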
4 changes: 2 additions & 2 deletions flow/cmd/api.go
@@ -45,7 +45,6 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
-
if err != nil {
return nil, fmt.Errorf("unable to dial grpc server: %w", err)
}
@@ -68,7 +67,8 @@ func killExistingHeartbeatFlows(
ctx context.Context,
tc client.Client,
namespace string,
- taskQueue string) error {
+ taskQueue string,
+ ) error {
listRes, err := tc.ListWorkflow(ctx,
&workflowservice.ListWorkflowExecutionsRequest{
Namespace: namespace,
13 changes: 8 additions & 5 deletions flow/cmd/handler.go
@@ -55,7 +55,8 @@ func schemaForTableIdentifier(tableIdentifier string, peerDBType int32) string {
}

func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
- req *protos.CreateCDCFlowRequest, workflowID string) error {
+ req *protos.CreateCDCFlowRequest, workflowID string,
+ ) error {
sourcePeerID, sourePeerType, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.Source.Name)
if srcErr != nil {
return fmt.Errorf("unable to get peer id for source peer %s: %w",
@@ -86,7 +87,8 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
}

func (h *FlowRequestHandler) createQrepJobEntry(ctx context.Context,
- req *protos.CreateQRepFlowRequest, workflowID string) error {
+ req *protos.CreateQRepFlowRequest, workflowID string,
+ ) error {
sourcePeerName := req.QrepConfig.SourcePeer.Name
sourcePeerID, _, srcErr := h.getPeerID(ctx, sourcePeerName)
if srcErr != nil {
@@ -117,7 +119,8 @@ func (h *FlowRequestHandler) createQrepJobEntry(ctx context.Context,
}

func (h *FlowRequestHandler) CreateCDCFlow(
- ctx context.Context, req *protos.CreateCDCFlowRequest) (*protos.CreateCDCFlowResponse, error) {
+ ctx context.Context, req *protos.CreateCDCFlowRequest,
+ ) (*protos.CreateCDCFlowResponse, error) {
cfg := req.ConnectionConfigs
workflowID := fmt.Sprintf("%s-peerflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
@@ -223,7 +226,8 @@ func (h *FlowRequestHandler) removeFlowEntryInCatalog(
}

func (h *FlowRequestHandler) CreateQRepFlow(
- ctx context.Context, req *protos.CreateQRepFlowRequest) (*protos.CreateQRepFlowResponse, error) {
+ ctx context.Context, req *protos.CreateQRepFlowRequest,
+ ) (*protos.CreateQRepFlowResponse, error) {
cfg := req.QrepConfig
workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
@@ -696,5 +700,4 @@ func (h *FlowRequestHandler) DropPeer(
return &protos.DropPeerResponse{
Ok: true,
}, nil
-
}
4 changes: 2 additions & 2 deletions flow/cmd/version.go
@@ -3,14 +3,14 @@ package main
import (
"context"

- "github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
+ "github.com/PeerDB-io/peer-flow/peerdbenv"
)

func (h *FlowRequestHandler) GetVersion(
ctx context.Context,
req *protos.PeerDBVersionRequest,
) (*protos.PeerDBVersionResponse, error) {
- version := utils.GetEnvString("PEERDB_VERSION_SHA_SHORT", "unknown")
+ version := peerdbenv.GetPeerDBVersionShaShort()
return &protos.PeerDBVersionResponse{Version: version}, nil
}
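Both call sites touched so far (the CDC idle timeout in flowable.go and the version string here) now read configuration through the new peerdbenv package instead of ad-hoc utils.GetEnv* lookups. The package itself is not part of this commit; a rough sketch of what the two helpers might look like, assuming they keep the defaults carried by the removed calls (60 seconds and "unknown"):

// Package peerdbenv sketch — hypothetical implementations, assuming the
// helpers simply wrap environment lookups with the defaults that the removed
// utils.GetEnvInt / utils.GetEnvString calls used.
package peerdbenv

import (
	"os"
	"strconv"
	"time"
)

// GetPeerDBCDCIdleTimeoutSeconds returns the CDC idle timeout as a duration;
// the 60s fallback mirrors GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60).
func GetPeerDBCDCIdleTimeoutSeconds() time.Duration {
	seconds := 60
	if val, ok := os.LookupEnv("PEERDB_CDC_IDLE_TIMEOUT_SECONDS"); ok {
		if parsed, err := strconv.Atoi(val); err == nil {
			seconds = parsed
		}
	}
	return time.Duration(seconds) * time.Second
}

// GetPeerDBVersionShaShort returns the short commit SHA baked into the build,
// falling back to "unknown" like the removed GetEnvString call.
func GetPeerDBVersionShaShort() string {
	if val, ok := os.LookupEnv("PEERDB_VERSION_SHA_SHORT"); ok {
		return val
	}
	return "unknown"
}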
3 changes: 1 addition & 2 deletions flow/cmd/worker.go
@@ -45,7 +45,7 @@ func setupPyroscope(opts *WorkerOptions) {
ServerAddress: opts.PyroscopeServer,

// you can disable logging by setting this to nil
- Logger: pyroscope.StandardLogger,
+ Logger: nil,

// you can provide static tags via a map:
Tags: map[string]string{"hostname": os.Getenv("HOSTNAME")},
@@ -66,7 +66,6 @@ func setupPyroscope(opts *WorkerOptions) {
pyroscope.ProfileBlockDuration,
},
})
-
if err != nil {
log.Fatal(err)
}
2 changes: 1 addition & 1 deletion flow/concurrency/bound_selector.go
@@ -13,7 +13,7 @@ type BoundSelector struct {
ferrors []error
}

- func NewBoundSelector(limit int, total int, ctx workflow.Context) *BoundSelector {
+ func NewBoundSelector(limit int, ctx workflow.Context) *BoundSelector {
return &BoundSelector{
ctx: ctx,
limit: limit,
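NewBoundSelector now takes only the concurrency limit and the workflow context; dropping the total count suggests the selector tracks futures as they are spawned rather than pre-sizing anything. A usage sketch under that assumption; SyncTableWorkflow, SpawnChild, and Wait are illustrative names, not confirmed by this diff:

package example

import (
	"github.com/PeerDB-io/peer-flow/concurrency"
	"go.temporal.io/sdk/workflow"
)

// SyncTableWorkflow is a stand-in child workflow used only for illustration.
func SyncTableWorkflow(ctx workflow.Context, table string) error {
	return nil
}

// syncTablesBounded caps the number of concurrently running child workflows
// at the limit passed to NewBoundSelector, then waits for the stragglers.
func syncTablesBounded(ctx workflow.Context, tables []string) error {
	selector := concurrency.NewBoundSelector(8, ctx)
	for _, table := range tables {
		selector.SpawnChild(ctx, SyncTableWorkflow, table)
	}
	return selector.Wait()
}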
(Diff listing truncated; the remaining changed files in this commit are not shown.)