Skip to content

Commit

Permalink
Merge branch 'main' into ui/queue-sync-interval
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj authored May 6, 2024
2 parents 857626d + 9a857f7 commit 4e1feb1
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 9 deletions.
10 changes: 5 additions & 5 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (a *FlowableActivity) CreateNormalizedTable(
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
conn, err := connectors.GetAs[connectors.NormalizedTablesConnector](ctx, config.PeerConnectionConfig)
if err != nil {
if err == connectors.ErrUnsupportedFunctionality {
if errors.Is(err, errors.ErrUnsupported) {
logger.Info("Connector does not implement normalized tables")
return nil, nil
}
Expand Down Expand Up @@ -298,7 +298,7 @@ func (a *FlowableActivity) StartNormalize(
logger := activity.GetLogger(ctx)

dstConn, err := connectors.GetCDCNormalizeConnector(ctx, conn.Destination)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
if errors.Is(err, errors.ErrUnsupported) {
err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
input.SyncBatchID)
return nil, err
Expand Down Expand Up @@ -441,7 +441,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
if errors.Is(err, errors.ErrUnsupported) {
return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID)
} else if err != nil {
return err
Expand All @@ -465,7 +465,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
dst, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
if errors.Is(err, errors.ErrUnsupported) {
return nil
} else if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
Expand Down Expand Up @@ -578,7 +578,7 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
func() {
srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source)
if err != nil {
if err != connectors.ErrUnsupportedFunctionality {
if !errors.Is(err, errors.ErrUnsupported) {
logger.Error("Failed to create connector to handle slot info", slog.Any("error", err))
}
return
Expand Down
6 changes: 2 additions & 4 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"github.com/PeerDB-io/peer-flow/otel_metrics"
)

var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality")

type Connector interface {
Close() error
ConnectionActive(context.Context) error
Expand Down Expand Up @@ -245,7 +243,7 @@ func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) {
case *protos.Peer_ElasticsearchConfig:
return connelasticsearch.NewElasticsearchConnector(ctx, inner.ElasticsearchConfig)
default:
return nil, ErrUnsupportedFunctionality
return nil, errors.ErrUnsupported
}
}

Expand All @@ -259,7 +257,7 @@ func GetAs[T Connector](ctx context.Context, config *protos.Peer) (T, error) {
if conn, ok := conn.(T); ok {
return conn, nil
} else {
return none, ErrUnsupportedFunctionality
return none, errors.ErrUnsupported
}
}

Expand Down

0 comments on commit 4e1feb1

Please sign in to comment.