Skip to content

Commit

Permalink
attempt to skip a table to sync
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Mar 6, 2024
1 parent 0a32419 commit 625202e
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 4 deletions.
44 changes: 44 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -23,6 +24,7 @@ import (
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
Expand Down Expand Up @@ -528,13 +530,48 @@ func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *
return nil
}

// skipTableForQRep reports whether tableName is configured to be skipped for
// the mirror named flowName.
//
// The skip configuration comes from dynamicconf.PeerDBMirrorTablesToSkip and
// maps a flow-name prefix to a list of table names. The table is skipped when
// flowName starts with one of those prefixes and tableName is exactly equal to
// one of the tables listed under it.
func (a *FlowableActivity) skipTableForQRep(
	ctx context.Context,
	flowName string,
	tableName string,
) bool {
	skipConfig := dynamicconf.PeerDBMirrorTablesToSkip(ctx)

	// Ranging over a nil or empty map runs zero iterations, so a missing
	// configuration falls straight through to "do not skip".
	for prefix, skippedTables := range skipConfig {
		if strings.HasPrefix(flowName, prefix) {
			for i := range skippedTables {
				if skippedTables[i] == tableName {
					return true
				}
			}
		}
	}

	return false
}

// GetQRepPartitions returns the partitions for a given QRepConfig.
func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
config *protos.QRepConfig,
last *protos.QRepPartition,
runUUID string,
) (*protos.QRepParitionResult, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)

// check if the table can be skipped.
if a.skipTableForQRep(ctx, config.FlowJobName, config.WatermarkTable) {
logger := activity.GetLogger(ctx)
logger.Info("skipping table for qrep get partitions: " + config.WatermarkTable)
return &protos.QRepParitionResult{
Partitions: nil,
}, nil
}

srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return nil, fmt.Errorf("failed to get qrep pull connector: %w", err)
Expand Down Expand Up @@ -716,6 +753,13 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig,
runUUID string,
) error {
// check if the table can be skipped.
if a.skipTableForQRep(ctx, config.FlowJobName, config.WatermarkTable) {
logger := activity.GetLogger(ctx)
logger.Info("skipping table for qrep in consolidate: " + config.WatermarkTable)
return nil
}

dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID)
Expand Down
46 changes: 42 additions & 4 deletions flow/dynamicconf/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package dynamicconf

import (
"context"
"encoding/json"
"errors"
"strconv"
"time"

Expand All @@ -24,22 +26,32 @@ func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) b
return exists.Bool
}

func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uint32 {
// dynamicConfString looks up key in the alerting_settings catalog table and
// returns its config_value column as a pgtype.Text.
//
// An error is returned (after being logged) when the catalog connection pool
// cannot be obtained, when dynamicConfKeyExists reports the key as absent, or
// when the query/scan fails.
//
// NOTE(review): this span appears to come from a rendered diff, so a few
// pre-change lines of the old dynamicConfUint32 body (the bare
// `return defaultValue` statements and the second `var value pgtype.Text`)
// seem to be interleaved with their replacements — confirm against the real file.
func dynamicConfString(ctx context.Context, key string) (pgtype.Text, error) {
var value pgtype.Text
conn, err := utils.GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
// NOTE(review): "%v" is part of the message argument here; confirm the logger
// actually formats it rather than treating err as a key/value argument.
logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err)
return defaultValue
return value, err
}

// A missing key is reported as an error so each caller can pick its own fallback.
if !dynamicConfKeyExists(ctx, conn, key) {
return defaultValue
logger.LoggerFromCtx(ctx).Error("Key does not exist")
return value, errors.New("Key does not exist")
}

var value pgtype.Text
// The key is bound as a query parameter ($1), not concatenated into the SQL text.
query := "SELECT config_value FROM alerting_settings WHERE config_name = $1"
err = conn.QueryRow(ctx, query, key).Scan(&value)
if err != nil {
logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err)
return value, err
}

return value, nil
}

func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uint32 {
value, err := dynamicConfString(ctx, key)
if err != nil {
return defaultValue
}

Expand Down Expand Up @@ -67,3 +79,29 @@ func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) time.Duration {
// PeerDBOpenConnectionsAlertThreshold returns the uint32 value of the
// PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD dynamic setting, passing 5 to
// dynamicConfUint32 as the default to use when the setting cannot be read.
func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 {
	const (
		settingKey        = "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD"
		fallback   uint32 = 5
	)

	threshold := dynamicConfUint32(ctx, settingKey, fallback)
	return threshold
}

// PeerDBMirrorTablesToSkip reads the PEERDB_MIRROR_TABLES_TO_SKIP dynamic
// setting and returns it as a map from mirror-name prefix to the tables that
// should be skipped for mirrors whose name starts with that prefix.
//
// The setting is a JSON array of objects shaped like
//
//	{ "mirror_name_prefix": "<prefix>", "tables_to_skip": ["<table>", ...] }
//
// Entries that repeat the same prefix are merged instead of overwriting one
// another. An empty, non-nil map is returned when the setting cannot be read,
// is NULL, or is not valid JSON, so callers can always range over the result.
//
// NOTE(review): an entry with an empty or missing mirror_name_prefix is stored
// under the "" key; a caller that matches with strings.HasPrefix will treat it
// as applying to every mirror — confirm that this is intended.
func PeerDBMirrorTablesToSkip(ctx context.Context) map[string][]string {
	value, err := dynamicConfString(ctx, "PEERDB_MIRROR_TABLES_TO_SKIP")
	if err != nil || !value.Valid {
		return make(map[string][]string)
	}

	result, err := parseMirrorTablesToSkip(value.String)
	if err != nil {
		// The error is passed as a key/value pair: the logger takes a message
		// followed by keyvals, so a printf verb in the message is never expanded.
		logger.LoggerFromCtx(ctx).Error("Failed to unmarshal mirror tables to skip", "error", err)
		return make(map[string][]string)
	}

	return result
}

// parseMirrorTablesToSkip decodes the raw JSON value of
// PEERDB_MIRROR_TABLES_TO_SKIP into a prefix -> tables map. It returns the
// json.Unmarshal error unchanged when raw is not a valid JSON array of entries.
func parseMirrorTablesToSkip(raw string) (map[string][]string, error) {
	type mirrorTablesToSkip struct {
		MirrorName   string   `json:"mirror_name_prefix"`
		TablesToSkip []string `json:"tables_to_skip"`
	}

	var configs []mirrorTablesToSkip
	if err := json.Unmarshal([]byte(raw), &configs); err != nil {
		return nil, err
	}

	result := make(map[string][]string, len(configs))
	for _, config := range configs {
		// Append rather than assign so that two entries sharing a prefix
		// contribute all of their tables instead of the last one winning.
		result[config.MirrorName] = append(result[config.MirrorName], config.TablesToSkip...)
	}

	return result, nil
}

0 comments on commit 625202e

Please sign in to comment.