From 9a857f743124cd258ea9a807ae5b437fd5082d90 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Mon, 6 May 2024 22:57:34 +0530 Subject: [PATCH] swapping out custom error for errors.ErrUnsupported (#1674) turns out reading release notes once in a while helps --- flow/activities/flowable.go | 10 +++++----- flow/connectors/core.go | 6 ++---- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 9c03346734..2291400b03 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -154,7 +154,7 @@ func (a *FlowableActivity) CreateNormalizedTable( ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName) conn, err := connectors.GetAs[connectors.NormalizedTablesConnector](ctx, config.PeerConnectionConfig) if err != nil { - if err == connectors.ErrUnsupportedFunctionality { + if errors.Is(err, errors.ErrUnsupported) { logger.Info("Connector does not implement normalized tables") return nil, nil } @@ -298,7 +298,7 @@ func (a *FlowableActivity) StartNormalize( logger := activity.GetLogger(ctx) dstConn, err := connectors.GetCDCNormalizeConnector(ctx, conn.Destination) - if errors.Is(err, connectors.ErrUnsupportedFunctionality) { + if errors.Is(err, errors.ErrUnsupported) { err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, input.SyncBatchID) return nil, err @@ -441,7 +441,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config ) error { ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer) - if errors.Is(err, connectors.ErrUnsupportedFunctionality) { + if errors.Is(err, errors.ErrUnsupported) { return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID) } else if err != nil { return err @@ -465,7 +465,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error { ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) dst, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer) - if errors.Is(err, connectors.ErrUnsupportedFunctionality) { + if errors.Is(err, errors.ErrUnsupported) { return nil } else if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) @@ -578,7 +578,7 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { func() { srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source) if err != nil { - if err != connectors.ErrUnsupportedFunctionality { + if !errors.Is(err, errors.ErrUnsupported) { logger.Error("Failed to create connector to handle slot info", slog.Any("error", err)) } return diff --git a/flow/connectors/core.go b/flow/connectors/core.go index b6f75c4828..dc7afe0495 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -25,8 +25,6 @@ import ( "github.com/PeerDB-io/peer-flow/otel_metrics" ) -var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality") - type Connector interface { Close() error ConnectionActive(context.Context) error @@ -245,7 +243,7 @@ func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) { case *protos.Peer_ElasticsearchConfig: return connelasticsearch.NewElasticsearchConnector(ctx, inner.ElasticsearchConfig) default: - return nil, ErrUnsupportedFunctionality + return nil, errors.ErrUnsupported } } @@ -259,7 +257,7 @@ func GetAs[T Connector](ctx context.Context, config *protos.Peer) (T, error) { if conn, ok := conn.(T); ok { return conn, nil } else { - return none, ErrUnsupportedFunctionality + return none, errors.ErrUnsupported } }