Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Peer APIs #445

Merged
merged 6 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
Expand Down Expand Up @@ -275,3 +276,99 @@ func (h *FlowRequestHandler) ListPeers(
Peers: peers,
}, nil
}

func (h *FlowRequestHandler) ValidatePeer(
ctx context.Context,
req *protos.ValidatePeerRequest,
) (*protos.ValidatePeerResponse, error) {
if req.Peer == nil {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: "no peer provided",
}, nil
}

if len(req.Peer.Name) == 0 {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: "no peer name provided",
}, nil
}

conn, err := connectors.GetConnector(ctx, req.Peer)
if err != nil {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("invalid peer: %s", err),
}, nil
}

status := conn.ConnectionActive()
if !status {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("failed to establish connection to peer: %s", err),
}, nil
}

return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_VALID,
Message: "valid peer",
}, nil
}

func (h *FlowRequestHandler) CreatePeer(
ctx context.Context,
req *protos.CreatePeerRequest,
) (*protos.CreatePeerResponse, error) {
status, validateErr := h.ValidatePeer(ctx, &protos.ValidatePeerRequest{Peer: req.Peer})
if validateErr != nil {
return nil, validateErr
}
if status.Status != protos.ValidatePeerStatus_VALID {
return &protos.CreatePeerResponse{
Status: protos.CreatePeerStatus_FAILED,
Message: status.Message,
}, nil
}

config := req.Peer.Config
wrongConfigResponse := &protos.CreatePeerResponse{
Status: protos.CreatePeerStatus_FAILED,
Message: "wrong config for connector",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more detailed response

}
var encodedConfig []byte
var encodingErr error
peerType := req.Peer.Type
switch peerType {
case protos.DBType_POSTGRES:
pgConfig := config.(*protos.Peer_PostgresConfig).PostgresConfig
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets do pgConfig, ok and check if the cast is Ok, do this else where too

encodedConfig, encodingErr = proto.Marshal(pgConfig)

case protos.DBType_SNOWFLAKE:
sfConfig := config.(*protos.Peer_SnowflakeConfig).SnowflakeConfig
encodedConfig, encodingErr = proto.Marshal(sfConfig)

default:
return wrongConfigResponse, nil
}
if encodingErr != nil {
log.Errorf("failed to encode peer config: %v", encodingErr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for every error, add some info about the request that resulted in this response

return nil, encodingErr
}

_, err := h.pool.Exec(ctx, "INSERT INTO peers (name, type, options) VALUES ($1, $2, $3)",
req.Peer.Name, peerType, encodedConfig,
)
if err != nil {
return &protos.CreatePeerResponse{
Status: protos.CreatePeerStatus_FAILED,
Message: err.Error(),
}, nil
}

return &protos.CreatePeerResponse{
Status: protos.CreatePeerStatus_CREATED,
Message: "",
}, nil
}
34 changes: 34 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
)

var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality")
var ErrWrongConfig = errors.New("wrong config for connector")

type Connector interface {
Close() error
Expand Down Expand Up @@ -190,6 +191,39 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon
}
}

func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) {
inner := config.Type
switch inner {
case protos.DBType_POSTGRES:
pgConfig := config.GetPostgresConfig()
if pgConfig == nil {
return nil, ErrWrongConfig
}
return connpostgres.NewPostgresConnector(ctx, pgConfig)
case protos.DBType_BIGQUERY:
bqConfig := config.GetBigqueryConfig()
if bqConfig == nil {
return nil, ErrWrongConfig
}
return connbigquery.NewBigQueryConnector(ctx, bqConfig)

case protos.DBType_SNOWFLAKE:
sfConfig := config.GetSnowflakeConfig()
if sfConfig == nil {
return nil, ErrWrongConfig
}
return connsnowflake.NewSnowflakeConnector(ctx, sfConfig)
// case protos.DBType_S3:
// return conns3.NewS3Connector(ctx, config.GetS3Config())
// case protos.DBType_EVENTHUB:
// return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig())
// case protos.DBType_SQLSERVER:
// return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig())
default:
return nil, ErrUnsupportedFunctionality
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you return a more detailed error with the info of the Peer and that it isn't supported?

}
}

func GetQRepConsolidateConnector(ctx context.Context,
config *protos.Peer) (QRepConsolidateConnector, error) {
inner := config.Config
Expand Down
Loading
Loading