Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Either go through gRPC gateway or use prisma #519

Merged
merged 1 commit into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ services:
- 3001:3000
environment:
<<: *catalog-config
PEERDB_FLOW_SERVER_ADDRESS: flow_api:8112
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113

Expand Down
90 changes: 0 additions & 90 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,96 +346,6 @@ func (h *FlowRequestHandler) handleWorkflowNotClosed(ctx context.Context, workfl
return nil
}

func (h *FlowRequestHandler) ListPeers(
ctx context.Context,
req *protos.ListPeersRequest,
) (*protos.ListPeersResponse, error) {
rows, err := h.pool.Query(ctx, "SELECT * FROM peers")
if err != nil {
return nil, fmt.Errorf("unable to query peers: %w", err)
}
defer rows.Close()

peers := []*protos.Peer{}
for rows.Next() {
var id int
var name string
var peerType int
var options []byte
if err := rows.Scan(&id, &name, &peerType, &options); err != nil {
return nil, fmt.Errorf("unable to scan peer row: %w", err)
}

dbtype := protos.DBType(peerType)
var peer *protos.Peer
switch dbtype {
case protos.DBType_POSTGRES:
var pgOptions protos.PostgresConfig
err := proto.Unmarshal(options, &pgOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal postgres options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_PostgresConfig{PostgresConfig: &pgOptions},
}
case protos.DBType_BIGQUERY:
var bqOptions protos.BigqueryConfig
err := proto.Unmarshal(options, &bqOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal bigquery options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_BigqueryConfig{BigqueryConfig: &bqOptions},
}
case protos.DBType_SNOWFLAKE:
var sfOptions protos.SnowflakeConfig
err := proto.Unmarshal(options, &sfOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal snowflake options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_SnowflakeConfig{SnowflakeConfig: &sfOptions},
}
case protos.DBType_EVENTHUB:
var ehOptions protos.EventHubConfig
err := proto.Unmarshal(options, &ehOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal eventhub options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_EventhubConfig{EventhubConfig: &ehOptions},
}
case protos.DBType_SQLSERVER:
var ssOptions protos.SqlServerConfig
err := proto.Unmarshal(options, &ssOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal sqlserver options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_SqlserverConfig{SqlserverConfig: &ssOptions},
}
default:
log.Errorf("unsupported peer type for peer '%s': %v", name, dbtype)
}

peers = append(peers, peer)
}

return &protos.ListPeersResponse{
Peers: peers,
}, nil
}

func (h *FlowRequestHandler) ValidatePeer(
ctx context.Context,
req *protos.ValidatePeerRequest,
Expand Down
Loading
Loading