Skip to content

Commit

Permalink
Basic UI for PeerDB (only connectors page is wired) (#377)
Browse files Browse the repository at this point in the history
- This brings the various UI elements to our lib
- Also wires up the gRPC from FE to BE.
- Connectors page is wired, but lots of other work left.
- Serves as a prototype for other pages.
  • Loading branch information
iskakaushik authored Sep 7, 2023
1 parent 40a454a commit c1d153a
Show file tree
Hide file tree
Showing 234 changed files with 25,961 additions and 100 deletions.
5 changes: 5 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ nexus/target
thirdparty/**/*
thirdparty

ui/node_modules/**/*
ui/node_modules
ui/.next/**/*
ui/.next

.git
.gitignore
.github
Expand Down
5 changes: 5 additions & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,8 @@ plugins:
out: nexus/pt/src
opt:
- ignore_unknown_fields=true
- plugin: buf.build/community/stephenh-ts-proto:v1.156.8
out: ui/grpc_generated
opt:
- esModuleInterop=true
- outputServices=grpc-js
18 changes: 15 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ x-flow-worker-env: &flow-worker-env
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-""}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-""}
AWS_REGION: ${AWS_REGION:-""}
# enables worker profiling using Go's pprof
# enables worker profiling using Go's pprof
ENABLE_PROFILING: "true"
# enables exporting of mirror metrics to Prometheus for visualization using Grafana
# enables exporting of mirror metrics to Prometheus for visualization using Grafana
ENABLE_METRICS: "true"
# enables exporting of mirror metrics to Catalog in the PEERDB_STATS schema.
# enables exporting of mirror metrics to Catalog in the PEERDB_STATS schema.
ENABLE_STATS: "true"

services:
Expand Down Expand Up @@ -96,6 +96,7 @@ services:
ports:
- 8112:8112
environment:
<<: [*catalog-config]
TEMPORAL_HOST_PORT: temporal:7233
depends_on:
temporal-admin-tools:
Expand Down Expand Up @@ -211,6 +212,17 @@ services:
- multi-metrics
- metrics

peerdb_ui:
container_name: peerdb_ui
build:
context: .
dockerfile: stacks/ui.Dockerfile
ports:
- 3000:3000
environment:
<<: *catalog-config
PEERDB_FLOW_SERVER_ADDRESS: flow_api:8112

volumes:
pgdata:
prometheusdata:
10 changes: 9 additions & 1 deletion flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net"

utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
Expand Down Expand Up @@ -32,7 +33,14 @@ func APIMain(args *APIServerParams) error {
}

grpcServer := grpc.NewServer()
flowHandler := NewFlowRequestHandler(tc)

catalogConn, err := utils.GetCatalogConnectionPoolFromEnv()
if err != nil {
return fmt.Errorf("unable to get catalog connection pool: %w", err)
}

flowHandler := NewFlowRequestHandler(tc, catalogConn)
defer flowHandler.Close()

protos.RegisterFlowServiceServer(grpcServer, flowHandler)
grpc_health_v1.RegisterHealthServer(grpcServer, health.NewServer())
Expand Down
104 changes: 103 additions & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,30 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/client"
"google.golang.org/protobuf/proto"
)

// grpc server implementation
type FlowRequestHandler struct {
temporalClient client.Client
pool *pgxpool.Pool
protos.UnimplementedFlowServiceServer
}

func NewFlowRequestHandler(temporalClient client.Client) *FlowRequestHandler {
func NewFlowRequestHandler(temporalClient client.Client, pool *pgxpool.Pool) *FlowRequestHandler {
return &FlowRequestHandler{
temporalClient: temporalClient,
pool: pool,
}
}

// Close closes the connection pool
func (h *FlowRequestHandler) Close() {
if h.pool != nil {
h.pool.Close()
}
}

Expand Down Expand Up @@ -130,3 +142,93 @@ func (h *FlowRequestHandler) ShutdownFlow(
Ok: true,
}, nil
}

func (h *FlowRequestHandler) ListPeers(
ctx context.Context,
req *protos.ListPeersRequest,
) (*protos.ListPeersResponse, error) {
rows, err := h.pool.Query(ctx, "SELECT * FROM peers")
if err != nil {
return nil, fmt.Errorf("unable to query peers: %w", err)
}
defer rows.Close()

peers := []*protos.Peer{}
for rows.Next() {
var id int
var name string
var peerType int
var options []byte
if err := rows.Scan(&id, &name, &peerType, &options); err != nil {
return nil, fmt.Errorf("unable to scan peer row: %w", err)
}

dbtype := protos.DBType(peerType)
var peer *protos.Peer
switch dbtype {
case protos.DBType_POSTGRES:
var pgOptions protos.PostgresConfig
err := proto.Unmarshal(options, &pgOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal postgres options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_PostgresConfig{PostgresConfig: &pgOptions},
}
case protos.DBType_BIGQUERY:
var bqOptions protos.BigqueryConfig
err := proto.Unmarshal(options, &bqOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal bigquery options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_BigqueryConfig{BigqueryConfig: &bqOptions},
}
case protos.DBType_SNOWFLAKE:
var sfOptions protos.SnowflakeConfig
err := proto.Unmarshal(options, &sfOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal snowflake options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_SnowflakeConfig{SnowflakeConfig: &sfOptions},
}
case protos.DBType_EVENTHUB:
var ehOptions protos.EventHubConfig
err := proto.Unmarshal(options, &ehOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal eventhub options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_EventhubConfig{EventhubConfig: &ehOptions},
}
case protos.DBType_SQLSERVER:
var ssOptions protos.SqlServerConfig
err := proto.Unmarshal(options, &ssOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal sqlserver options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_SqlserverConfig{SqlserverConfig: &ssOptions},
}
default:
log.Errorf("unsupported peer type for peer '%s': %v", name, dbtype)
}

peers = append(peers, peer)
}

return &protos.ListPeersResponse{
Peers: peers,
}, nil
}
51 changes: 4 additions & 47 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,22 @@
package main

import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"runtime"
"strconv"
"syscall"
"time"

//nolint:gosec
_ "net/http/pprof"

"github.com/PeerDB-io/peer-flow/activities"
"github.com/PeerDB-io/peer-flow/connectors/utils"
utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/uber-go/tally/v4"
"github.com/uber-go/tally/v4/prometheus"

Expand Down Expand Up @@ -88,15 +84,11 @@ func WorkerMain(opts *WorkerOptions) error {

catalogMirrorMonitor := monitoring.NewCatalogMirrorMonitor(nil)
if opts.EnableMonitoring {
catalogConnectionString, err := genCatalogConnectionString()
conn, err := utils.GetCatalogConnectionPoolFromEnv()
if err != nil {
log.Fatal(err)
return fmt.Errorf("unable to create catalog connection pool: %w", err)
}
catalogConn, err := pgxpool.New(context.Background(), catalogConnectionString)
if err != nil {
return fmt.Errorf("unable to establish connection with catalog: %w", err)
}
catalogMirrorMonitor = monitoring.NewCatalogMirrorMonitor(catalogConn)
catalogMirrorMonitor = monitoring.NewCatalogMirrorMonitor(conn)
}
defer catalogMirrorMonitor.Close()

Expand Down Expand Up @@ -153,38 +145,3 @@ func newPrometheusScope(c prometheus.Configuration) tally.Scope {
log.Println("prometheus metrics scope created")
return scope
}

func genCatalogConnectionString() (string, error) {
host, ok := os.LookupEnv("PEERDB_CATALOG_HOST")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_HOST is not set")
}
portStr, ok := os.LookupEnv("PEERDB_CATALOG_PORT")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_PORT is not set")
}
port, err := strconv.ParseUint(portStr, 10, 32)
if err != nil {
return "", fmt.Errorf("unable to parse PEERDB_CATALOG_PORT as unsigned integer")
}
user, ok := os.LookupEnv("PEERDB_CATALOG_USER")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_USER is not set")
}
password, ok := os.LookupEnv("PEERDB_CATALOG_PASSWORD")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_PASSWORD is not set")
}
database, ok := os.LookupEnv("PEERDB_CATALOG_DATABASE")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_DATABASE is not set")
}

return utils.GetPGConnectionString(&protos.PostgresConfig{
Host: host,
Port: uint32(port),
User: user,
Password: password,
Database: database,
}), nil
}
61 changes: 61 additions & 0 deletions flow/connectors/utils/catalog/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package utils

import (
"context"
"fmt"
"os"
"strconv"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/jackc/pgx/v5/pgxpool"
)

func GetCatalogConnectionPoolFromEnv() (*pgxpool.Pool, error) {
catalogConnectionString, err := genCatalogConnectionString()
if err != nil {
return nil, fmt.Errorf("unable to generate catalog connection string: %w", err)
}

catalogConn, err := pgxpool.New(context.Background(), catalogConnectionString)
if err != nil {
return nil, fmt.Errorf("unable to establish connection with catalog: %w", err)
}

return catalogConn, nil
}

func genCatalogConnectionString() (string, error) {
host, ok := os.LookupEnv("PEERDB_CATALOG_HOST")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_HOST is not set")
}
portStr, ok := os.LookupEnv("PEERDB_CATALOG_PORT")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_PORT is not set")
}
port, err := strconv.ParseUint(portStr, 10, 32)
if err != nil {
return "", fmt.Errorf("unable to parse PEERDB_CATALOG_PORT as unsigned integer")
}
user, ok := os.LookupEnv("PEERDB_CATALOG_USER")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_USER is not set")
}
password, ok := os.LookupEnv("PEERDB_CATALOG_PASSWORD")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_PASSWORD is not set")
}
database, ok := os.LookupEnv("PEERDB_CATALOG_DATABASE")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_DATABASE is not set")
}

return utils.GetPGConnectionString(&protos.PostgresConfig{
Host: host,
Port: uint32(port),
User: user,
Password: password,
Database: database,
}), nil
}
Loading

0 comments on commit c1d153a

Please sign in to comment.