Skip to content

Commit

Permalink
Create Peer APIs (#445)
Browse files Browse the repository at this point in the history
- Introduces two functions on the Flow API side:
ValidatePeer and CreatePeer

---------

Co-authored-by: Kevin K Biju <[email protected]>
  • Loading branch information
Amogh-Bharadwaj and heavycrystal authored Sep 28, 2023
1 parent 8598d15 commit bdba9ae
Show file tree
Hide file tree
Showing 10 changed files with 1,948 additions and 65 deletions.
113 changes: 113 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
Expand Down Expand Up @@ -275,3 +276,115 @@ func (h *FlowRequestHandler) ListPeers(
Peers: peers,
}, nil
}

func (h *FlowRequestHandler) ValidatePeer(
ctx context.Context,
req *protos.ValidatePeerRequest,
) (*protos.ValidatePeerResponse, error) {
if req.Peer == nil {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: "no peer provided",
}, nil
}

if len(req.Peer.Name) == 0 {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: "no peer name provided",
}, nil
}

conn, err := connectors.GetConnector(ctx, req.Peer)
if err != nil {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("peer type is missing or "+
"your requested configuration for %s peer %s was invalidated: %s",
req.Peer.Type, req.Peer.Name, err),
}, nil
}

status := conn.ConnectionActive()
if !status {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("failed to establish active connection to %s peer %s.",
req.Peer.Type, req.Peer.Name),
}, nil
}

return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_VALID,
Message: fmt.Sprintf("%s peer %s is valid",
req.Peer.Type, req.Peer.Name),
}, nil
}

func (h *FlowRequestHandler) CreatePeer(
ctx context.Context,
req *protos.CreatePeerRequest,
) (*protos.CreatePeerResponse, error) {
status, validateErr := h.ValidatePeer(ctx, &protos.ValidatePeerRequest{Peer: req.Peer})
if validateErr != nil {
return nil, validateErr
}
if status.Status != protos.ValidatePeerStatus_VALID {
return &protos.CreatePeerResponse{
Status: protos.CreatePeerStatus_FAILED,
Message: status.Message,
}, nil
}

config := req.Peer.Config
wrongConfigResponse := &protos.CreatePeerResponse{
Status: protos.CreatePeerStatus_FAILED,
Message: fmt.Sprintf("invalid config for %s peer %s",
req.Peer.Type, req.Peer.Name),
}
var encodedConfig []byte
var encodingErr error
peerType := req.Peer.Type
switch peerType {
case protos.DBType_POSTGRES:
pgConfigObject, ok := config.(*protos.Peer_PostgresConfig)
if !ok {
return wrongConfigResponse, nil
}
pgConfig := pgConfigObject.PostgresConfig

encodedConfig, encodingErr = proto.Marshal(pgConfig)

case protos.DBType_SNOWFLAKE:
sfConfigObject, ok := config.(*protos.Peer_SnowflakeConfig)
if !ok {
return wrongConfigResponse, nil
}
sfConfig := sfConfigObject.SnowflakeConfig
encodedConfig, encodingErr = proto.Marshal(sfConfig)

default:
return wrongConfigResponse, nil
}
if encodingErr != nil {
log.Errorf("failed to encode peer configuration for %s peer %s : %v",
req.Peer.Type, req.Peer.Name, encodingErr)
return nil, encodingErr
}

_, err := h.pool.Exec(ctx, "INSERT INTO peers (name, type, options) VALUES ($1, $2, $3)",
req.Peer.Name, peerType, encodedConfig,
)
if err != nil {
return &protos.CreatePeerResponse{
Status: protos.CreatePeerStatus_FAILED,
Message: fmt.Sprintf("failed to insert into peers table for %s peer %s: %s",
req.Peer.Type, req.Peer.Name, err.Error()),
}, nil
}

return &protos.CreatePeerResponse{
Status: protos.CreatePeerStatus_CREATED,
Message: "",
}, nil
}
35 changes: 35 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connectors
import (
"context"
"errors"
"fmt"

connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub"
Expand Down Expand Up @@ -190,6 +191,40 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon
}
}

func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
inner := peer.Type
switch inner {
case protos.DBType_POSTGRES:
pgConfig := peer.GetPostgresConfig()

if pgConfig == nil {
return nil, fmt.Errorf("missing postgres config for %s peer %s", peer.Type.String(), peer.Name)
}
return connpostgres.NewPostgresConnector(ctx, pgConfig)
case protos.DBType_BIGQUERY:
bqConfig := peer.GetBigqueryConfig()
if bqConfig == nil {
return nil, fmt.Errorf("missing bigquery config for %s peer %s", peer.Type.String(), peer.Name)
}
return connbigquery.NewBigQueryConnector(ctx, bqConfig)

case protos.DBType_SNOWFLAKE:
sfConfig := peer.GetSnowflakeConfig()
if sfConfig == nil {
return nil, fmt.Errorf("missing snowflake config for %s peer %s", peer.Type.String(), peer.Name)
}
return connsnowflake.NewSnowflakeConnector(ctx, sfConfig)
// case protos.DBType_S3:
// return conns3.NewS3Connector(ctx, config.GetS3Config())
// case protos.DBType_EVENTHUB:
// return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig())
// case protos.DBType_SQLSERVER:
// return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig())
default:
return nil, fmt.Errorf("unsupported peer type %s", peer.Type.String())
}
}

func GetQRepConsolidateConnector(ctx context.Context,
config *protos.Peer) (QRepConsolidateConnector, error) {
inner := config.Config
Expand Down
Loading

0 comments on commit bdba9ae

Please sign in to comment.