-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create Peer APIs #445
Create Peer APIs #445
Changes from 3 commits
7e5639a
58fae1b
bbe7c0c
adbb3e8
46b0cb9
beda19b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ import ( | |
"fmt" | ||
"time" | ||
|
||
"github.com/PeerDB-io/peer-flow/connectors" | ||
"github.com/PeerDB-io/peer-flow/generated/protos" | ||
"github.com/PeerDB-io/peer-flow/shared" | ||
peerflow "github.com/PeerDB-io/peer-flow/workflows" | ||
|
@@ -275,3 +276,99 @@ func (h *FlowRequestHandler) ListPeers( | |
Peers: peers, | ||
}, nil | ||
} | ||
|
||
func (h *FlowRequestHandler) ValidatePeer( | ||
ctx context.Context, | ||
req *protos.ValidatePeerRequest, | ||
) (*protos.ValidatePeerResponse, error) { | ||
if req.Peer == nil { | ||
return &protos.ValidatePeerResponse{ | ||
Status: protos.ValidatePeerStatus_INVALID, | ||
Message: "no peer provided", | ||
}, nil | ||
} | ||
|
||
if len(req.Peer.Name) == 0 { | ||
return &protos.ValidatePeerResponse{ | ||
Status: protos.ValidatePeerStatus_INVALID, | ||
Message: "no peer name provided", | ||
}, nil | ||
} | ||
|
||
conn, err := connectors.GetConnector(ctx, req.Peer) | ||
if err != nil { | ||
return &protos.ValidatePeerResponse{ | ||
Status: protos.ValidatePeerStatus_INVALID, | ||
Message: fmt.Sprintf("invalid peer: %s", err), | ||
}, nil | ||
} | ||
|
||
status := conn.ConnectionActive() | ||
if !status { | ||
return &protos.ValidatePeerResponse{ | ||
Status: protos.ValidatePeerStatus_INVALID, | ||
Message: fmt.Sprintf("failed to establish connection to peer: %s", err), | ||
}, nil | ||
} | ||
|
||
return &protos.ValidatePeerResponse{ | ||
Status: protos.ValidatePeerStatus_VALID, | ||
Message: "valid peer", | ||
}, nil | ||
} | ||
|
||
func (h *FlowRequestHandler) CreatePeer( | ||
ctx context.Context, | ||
req *protos.CreatePeerRequest, | ||
) (*protos.CreatePeerResponse, error) { | ||
status, validateErr := h.ValidatePeer(ctx, &protos.ValidatePeerRequest{Peer: req.Peer}) | ||
if validateErr != nil { | ||
return nil, validateErr | ||
} | ||
if status.Status != protos.ValidatePeerStatus_VALID { | ||
return &protos.CreatePeerResponse{ | ||
Status: protos.CreatePeerStatus_FAILED, | ||
Message: status.Message, | ||
}, nil | ||
} | ||
|
||
config := req.Peer.Config | ||
wrongConfigResponse := &protos.CreatePeerResponse{ | ||
Status: protos.CreatePeerStatus_FAILED, | ||
Message: "wrong config for connector", | ||
} | ||
var encodedConfig []byte | ||
var encodingErr error | ||
peerType := req.Peer.Type | ||
switch peerType { | ||
case protos.DBType_POSTGRES: | ||
pgConfig := config.(*protos.Peer_PostgresConfig).PostgresConfig | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lets do |
||
encodedConfig, encodingErr = proto.Marshal(pgConfig) | ||
|
||
case protos.DBType_SNOWFLAKE: | ||
sfConfig := config.(*protos.Peer_SnowflakeConfig).SnowflakeConfig | ||
encodedConfig, encodingErr = proto.Marshal(sfConfig) | ||
|
||
default: | ||
return wrongConfigResponse, nil | ||
} | ||
if encodingErr != nil { | ||
log.Errorf("failed to encode peer config: %v", encodingErr) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for every error, add some info about the request that resulted in this response |
||
return nil, encodingErr | ||
} | ||
|
||
_, err := h.pool.Exec(ctx, "INSERT INTO peers (name, type, options) VALUES ($1, $2, $3)", | ||
req.Peer.Name, peerType, encodedConfig, | ||
) | ||
if err != nil { | ||
return &protos.CreatePeerResponse{ | ||
Status: protos.CreatePeerStatus_FAILED, | ||
Message: err.Error(), | ||
}, nil | ||
} | ||
|
||
return &protos.CreatePeerResponse{ | ||
Status: protos.CreatePeerStatus_CREATED, | ||
Message: "", | ||
}, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ import ( | |
) | ||
|
||
var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality") | ||
var ErrWrongConfig = errors.New("wrong config for connector") | ||
|
||
type Connector interface { | ||
Close() error | ||
|
@@ -190,6 +191,39 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon | |
} | ||
} | ||
|
||
func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) { | ||
inner := config.Type | ||
switch inner { | ||
case protos.DBType_POSTGRES: | ||
pgConfig := config.GetPostgresConfig() | ||
if pgConfig == nil { | ||
return nil, ErrWrongConfig | ||
} | ||
return connpostgres.NewPostgresConnector(ctx, pgConfig) | ||
case protos.DBType_BIGQUERY: | ||
bqConfig := config.GetBigqueryConfig() | ||
if bqConfig == nil { | ||
return nil, ErrWrongConfig | ||
} | ||
return connbigquery.NewBigQueryConnector(ctx, bqConfig) | ||
|
||
case protos.DBType_SNOWFLAKE: | ||
sfConfig := config.GetSnowflakeConfig() | ||
if sfConfig == nil { | ||
return nil, ErrWrongConfig | ||
} | ||
return connsnowflake.NewSnowflakeConnector(ctx, sfConfig) | ||
// case protos.DBType_S3: | ||
// return conns3.NewS3Connector(ctx, config.GetS3Config()) | ||
// case protos.DBType_EVENTHUB: | ||
// return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig()) | ||
// case protos.DBType_SQLSERVER: | ||
// return conneventhub.NewEventHubConnector(ctx, config.GetEventhubConfig()) | ||
default: | ||
return nil, ErrUnsupportedFunctionality | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you return a more detailed error with the info of the Peer and that it isn't supported? |
||
} | ||
} | ||
|
||
func GetQRepConsolidateConnector(ctx context.Context, | ||
config *protos.Peer) (QRepConsolidateConnector, error) { | ||
inner := config.Config | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
more detailed response