Skip to content

Commit

Permalink
detailed errors, refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Sep 28, 2023
1 parent bbe7c0c commit adbb3e8
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 99 deletions.
41 changes: 28 additions & 13 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,22 +298,25 @@ func (h *FlowRequestHandler) ValidatePeer(
conn, err := connectors.GetConnector(ctx, req.Peer)
if err != nil {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("invalid peer: %s", err),
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("peer type is missing or your requested configuration for %s peer %s was invalidated: %s",
req.Peer.Type, req.Peer.Name, err),
}, nil
}

status := conn.ConnectionActive()
if !status {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("failed to establish connection to peer: %s", err),
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("failed to establish active connection to %s peer %s.",
req.Peer.Type, req.Peer.Name),
}, nil
}

return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_VALID,
Message: "valid peer",
Status: protos.ValidatePeerStatus_VALID,
Message: fmt.Sprintf("%s peer %s is valid",
req.Peer.Type, req.Peer.Name),
}, nil
}

Expand All @@ -334,26 +337,37 @@ func (h *FlowRequestHandler) CreatePeer(

config := req.Peer.Config
wrongConfigResponse := &protos.CreatePeerResponse{
Status: protos.CreatePeerStatus_FAILED,
Message: "wrong config for connector",
Status: protos.CreatePeerStatus_FAILED,
Message: fmt.Sprintf("invalid config for %s peer %s",
req.Peer.Type, req.Peer.Name),
}
var encodedConfig []byte
var encodingErr error
peerType := req.Peer.Type
switch peerType {
case protos.DBType_POSTGRES:
pgConfig := config.(*protos.Peer_PostgresConfig).PostgresConfig
pgConfigObject, ok := config.(*protos.Peer_PostgresConfig)
if !ok {
return wrongConfigResponse, nil
}
pgConfig := pgConfigObject.PostgresConfig

encodedConfig, encodingErr = proto.Marshal(pgConfig)

case protos.DBType_SNOWFLAKE:
sfConfig := config.(*protos.Peer_SnowflakeConfig).SnowflakeConfig
sfConfigObject, ok := config.(*protos.Peer_SnowflakeConfig)
if !ok {
return wrongConfigResponse, nil
}
sfConfig := sfConfigObject.SnowflakeConfig
encodedConfig, encodingErr = proto.Marshal(sfConfig)

default:
return wrongConfigResponse, nil
}
if encodingErr != nil {
log.Errorf("failed to encode peer config: %v", encodingErr)
log.Errorf("failed to encode peer configuration for %s peer %s : %v",
req.Peer.Type, req.Peer.Name, encodingErr)
return nil, encodingErr
}

Expand All @@ -362,8 +376,9 @@ func (h *FlowRequestHandler) CreatePeer(
)
if err != nil {
return &protos.CreatePeerResponse{
Status: protos.CreatePeerStatus_FAILED,
Message: err.Error(),
Status: protos.CreatePeerStatus_FAILED,
Message: fmt.Sprintf("failed to insert into peers table for %s peer %s: %s",
req.Peer.Type, req.Peer.Name, err.Error()),
}, nil
}

Expand Down
21 changes: 11 additions & 10 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connectors
import (
"context"
"errors"
"fmt"

connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub"
Expand All @@ -15,7 +16,6 @@ import (
)

var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality")
var ErrWrongConfig = errors.New("wrong config for connector")

type Connector interface {
Close() error
Expand Down Expand Up @@ -191,26 +191,27 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon
}
}

func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) {
inner := config.Type
func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
inner := peer.Type
switch inner {
case protos.DBType_POSTGRES:
pgConfig := config.GetPostgresConfig()
pgConfig := peer.GetPostgresConfig()

if pgConfig == nil {
return nil, ErrWrongConfig
return nil, fmt.Errorf("missing postgres config for %s peer %s", peer.Type.String(), peer.Name)
}
return connpostgres.NewPostgresConnector(ctx, pgConfig)
case protos.DBType_BIGQUERY:
bqConfig := config.GetBigqueryConfig()
bqConfig := peer.GetBigqueryConfig()
if bqConfig == nil {
return nil, ErrWrongConfig
return nil, fmt.Errorf("missing bigquery config for %s peer %s", peer.Type.String(), peer.Name)
}
return connbigquery.NewBigQueryConnector(ctx, bqConfig)

case protos.DBType_SNOWFLAKE:
sfConfig := config.GetSnowflakeConfig()
sfConfig := peer.GetSnowflakeConfig()
if sfConfig == nil {
return nil, ErrWrongConfig
return nil, fmt.Errorf("missing snowflake config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsnowflake.NewSnowflakeConnector(ctx, sfConfig)
// case protos.DBType_S3:
Expand All @@ -220,7 +221,7 @@ func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) {
// case protos.DBType_SQLSERVER:
// return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig())
default:
return nil, ErrUnsupportedFunctionality
return nil, fmt.Errorf("unsupported peer type %s", peer.Type.String())
}
}

Expand Down
133 changes: 71 additions & 62 deletions flow/generated/protos/route.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions nexus/pt/src/peerdb_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ pub struct CreatePeerResponse {
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum ValidatePeerStatus {
Valid = 0,
Invalid = 1,
CreationUnknown = 0,
Valid = 1,
Invalid = 2,
}
impl ValidatePeerStatus {
/// String value of the enum field names used in the ProtoBuf definition.
Expand All @@ -94,13 +95,15 @@ impl ValidatePeerStatus {
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
ValidatePeerStatus::CreationUnknown => "CREATION_UNKNOWN",
ValidatePeerStatus::Valid => "VALID",
ValidatePeerStatus::Invalid => "INVALID",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"CREATION_UNKNOWN" => Some(Self::CreationUnknown),
"VALID" => Some(Self::Valid),
"INVALID" => Some(Self::Invalid),
_ => None,
Expand All @@ -110,8 +113,9 @@ impl ValidatePeerStatus {
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum CreatePeerStatus {
Created = 0,
Failed = 1,
ValidationUnknown = 0,
Created = 1,
Failed = 2,
}
impl CreatePeerStatus {
/// String value of the enum field names used in the ProtoBuf definition.
Expand All @@ -120,13 +124,15 @@ impl CreatePeerStatus {
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
CreatePeerStatus::ValidationUnknown => "VALIDATION_UNKNOWN",
CreatePeerStatus::Created => "CREATED",
CreatePeerStatus::Failed => "FAILED",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"VALIDATION_UNKNOWN" => Some(Self::ValidationUnknown),
"CREATED" => Some(Self::Created),
"FAILED" => Some(Self::Failed),
_ => None,
Expand Down
Loading

0 comments on commit adbb3e8

Please sign in to comment.