Skip to content

Commit

Permalink
Either go through gRPC gateway or use prisma
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Oct 16, 2023
1 parent 11cbd26 commit 48f5146
Show file tree
Hide file tree
Showing 25 changed files with 624 additions and 1,024 deletions.
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ services:
- 3001:3000
environment:
<<: *catalog-config
PEERDB_FLOW_SERVER_ADDRESS: flow_api:8112
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113

Expand Down
90 changes: 0 additions & 90 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,96 +346,6 @@ func (h *FlowRequestHandler) handleWorkflowNotClosed(ctx context.Context, workfl
return nil
}

func (h *FlowRequestHandler) ListPeers(
ctx context.Context,
req *protos.ListPeersRequest,
) (*protos.ListPeersResponse, error) {
rows, err := h.pool.Query(ctx, "SELECT * FROM peers")
if err != nil {
return nil, fmt.Errorf("unable to query peers: %w", err)
}
defer rows.Close()

peers := []*protos.Peer{}
for rows.Next() {
var id int
var name string
var peerType int
var options []byte
if err := rows.Scan(&id, &name, &peerType, &options); err != nil {
return nil, fmt.Errorf("unable to scan peer row: %w", err)
}

dbtype := protos.DBType(peerType)
var peer *protos.Peer
switch dbtype {
case protos.DBType_POSTGRES:
var pgOptions protos.PostgresConfig
err := proto.Unmarshal(options, &pgOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal postgres options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_PostgresConfig{PostgresConfig: &pgOptions},
}
case protos.DBType_BIGQUERY:
var bqOptions protos.BigqueryConfig
err := proto.Unmarshal(options, &bqOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal bigquery options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_BigqueryConfig{BigqueryConfig: &bqOptions},
}
case protos.DBType_SNOWFLAKE:
var sfOptions protos.SnowflakeConfig
err := proto.Unmarshal(options, &sfOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal snowflake options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_SnowflakeConfig{SnowflakeConfig: &sfOptions},
}
case protos.DBType_EVENTHUB:
var ehOptions protos.EventHubConfig
err := proto.Unmarshal(options, &ehOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal eventhub options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_EventhubConfig{EventhubConfig: &ehOptions},
}
case protos.DBType_SQLSERVER:
var ssOptions protos.SqlServerConfig
err := proto.Unmarshal(options, &ssOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal sqlserver options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_SqlserverConfig{SqlserverConfig: &ssOptions},
}
default:
log.Errorf("unsupported peer type for peer '%s': %v", name, dbtype)
}

peers = append(peers, peer)
}

return &protos.ListPeersResponse{
Peers: peers,
}, nil
}

func (h *FlowRequestHandler) ValidatePeer(
ctx context.Context,
req *protos.ValidatePeerRequest,
Expand Down
Loading

0 comments on commit 48f5146

Please sign in to comment.