Skip to content

Commit

Permalink
feat: apis for validate and create peer
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Sep 28, 2023
1 parent 6215e53 commit 533f627
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 158 deletions.
97 changes: 97 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
Expand Down Expand Up @@ -275,3 +276,99 @@ func (h *FlowRequestHandler) ListPeers(
Peers: peers,
}, nil
}

func (h *FlowRequestHandler) ValidatePeer(
ctx context.Context,
req *protos.ValidatePeerRequest,
) (*protos.ValidatePeerResponse, error) {
if req.Peer == nil {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: "no peer provided",
}, nil
}

if len(req.Peer.Name) == 0 {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: "no peer name provided",
}, nil
}

conn, err := connectors.GetConnector(ctx, req.Peer)
if err != nil {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("invalid peer: %s", err),
}, nil
}

status := conn.ConnectionActive()
if !status {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("failed to establish connection to peer: %s", err),
}, nil
}

return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_VALID,
Message: "valid peer",
}, nil
}

func (h *FlowRequestHandler) CreatePeer(
ctx context.Context,
req *protos.CreatePeerRequest,
) (*protos.CreatePeerResponse, error) {
status, validateErr := h.ValidatePeer(ctx, &protos.ValidatePeerRequest{Peer: req.Peer})
if validateErr != nil {
return nil, validateErr
}
if status.Status != protos.ValidatePeerStatus_VALID {
return &protos.CreatePeerResponse{
Status: protos.CreatePeerStatus_FAILED,
Message: status.Message,
}, nil
}

config := req.Peer.Config
wrongConfigResponse := &protos.CreatePeerResponse{
Status: protos.CreatePeerStatus_FAILED,
Message: "wrong config for connector",
}
var encodedConfig []byte
var encodingErr error
peerType := req.Peer.Type
switch peerType {
case protos.DBType_POSTGRES:
pgConfig := config.(*protos.Peer_PostgresConfig).PostgresConfig
encodedConfig, encodingErr = proto.Marshal(pgConfig)

case protos.DBType_SNOWFLAKE:
sfConfig := config.(*protos.Peer_SnowflakeConfig).SnowflakeConfig
encodedConfig, encodingErr = proto.Marshal(sfConfig)

default:
return wrongConfigResponse, nil
}
if encodingErr != nil {
log.Errorf("failed to encode peer config: %v", encodingErr)
return nil, encodingErr
}

_, err := h.pool.Exec(ctx, "INSERT INTO peers (name, type, options) VALUES ($1, $2, $3)",
req.Peer.Name, peerType, encodedConfig,
)
if err != nil {
return &protos.CreatePeerResponse{
Status: protos.CreatePeerStatus_FAILED,
Message: err.Error(),
}, nil
}

return &protos.CreatePeerResponse{
Status: protos.CreatePeerStatus_CREATED,
Message: "",
}, nil
}
34 changes: 34 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
)

var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality")
var ErrWrongConfig = errors.New("wrong config for connector")

type Connector interface {
Close() error
Expand Down Expand Up @@ -190,6 +191,39 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon
}
}

func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) {
inner := config.Type
switch inner {
case protos.DBType_POSTGRES:
pgConfig := config.GetPostgresConfig()
if pgConfig == nil {
return nil, ErrWrongConfig
}
return connpostgres.NewPostgresConnector(ctx, pgConfig)
case protos.DBType_BIGQUERY:
bqConfig := config.GetBigqueryConfig()
if bqConfig == nil {
return nil, ErrWrongConfig
}
return connbigquery.NewBigQueryConnector(ctx, bqConfig)

case protos.DBType_SNOWFLAKE:
sfConfig := config.GetSnowflakeConfig()
if sfConfig == nil {
return nil, ErrWrongConfig
}
return connsnowflake.NewSnowflakeConnector(ctx, sfConfig)
// case protos.DBType_S3:
// return conns3.NewS3Connector(ctx, config.GetS3Config())
// case protos.DBType_EVENTHUB:
// return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig())
// case protos.DBType_SQLSERVER:
// return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig())
default:
return nil, ErrUnsupportedFunctionality
}
}

func GetQRepConsolidateConnector(ctx context.Context,
config *protos.Peer) (QRepConsolidateConnector, error) {
inner := config.Config
Expand Down
129 changes: 61 additions & 68 deletions flow/generated/protos/route.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 533f627

Please sign in to comment.