Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic UI for PeerDB (only connectors page is wired) #377

Merged
merged 14 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ nexus/target
thirdparty/**/*
thirdparty

ui/node_modules/**/*
ui/node_modules
ui/.next/**/*
ui/.next

.git
.gitignore
.github
Expand Down
5 changes: 5 additions & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,8 @@ plugins:
out: nexus/pt/src
opt:
- ignore_unknown_fields=true
- plugin: buf.build/community/stephenh-ts-proto:v1.156.8
out: ui/grpc_generated
opt:
- esModuleInterop=true
- outputServices=grpc-js
18 changes: 15 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ x-flow-worker-env: &flow-worker-env
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-""}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-""}
AWS_REGION: ${AWS_REGION:-""}
# enables worker profiling using Go's pprof
# enables worker profiling using Go's pprof
ENABLE_PROFILING: "true"
# enables exporting of mirror metrics to Prometheus for visualization using Grafana
# enables exporting of mirror metrics to Prometheus for visualization using Grafana
ENABLE_METRICS: "true"
# enables exporting of mirror metrics to Catalog in the PEERDB_STATS schema.
# enables exporting of mirror metrics to Catalog in the PEERDB_STATS schema.
ENABLE_STATS: "true"

services:
Expand Down Expand Up @@ -96,6 +96,7 @@ services:
ports:
- 8112:8112
environment:
<<: [*catalog-config]
TEMPORAL_HOST_PORT: temporal:7233
depends_on:
temporal-admin-tools:
Expand Down Expand Up @@ -211,6 +212,17 @@ services:
- multi-metrics
- metrics

peerdb_ui:
container_name: peerdb_ui
build:
context: .
dockerfile: stacks/ui.Dockerfile
ports:
- 3000:3000
environment:
<<: *catalog-config
PEERDB_FLOW_SERVER_ADDRESS: flow_api:8112

volumes:
pgdata:
prometheusdata:
10 changes: 9 additions & 1 deletion flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net"

utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
Expand Down Expand Up @@ -32,7 +33,14 @@ func APIMain(args *APIServerParams) error {
}

grpcServer := grpc.NewServer()
flowHandler := NewFlowRequestHandler(tc)

catalogConn, err := utils.GetCatalogConnectionPoolFromEnv()
if err != nil {
return fmt.Errorf("unable to get catalog connection pool: %w", err)
}

flowHandler := NewFlowRequestHandler(tc, catalogConn)
defer flowHandler.Close()

protos.RegisterFlowServiceServer(grpcServer, flowHandler)
grpc_health_v1.RegisterHealthServer(grpcServer, health.NewServer())
Expand Down
104 changes: 103 additions & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,30 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/client"
"google.golang.org/protobuf/proto"
)

// grpc server implementation
type FlowRequestHandler struct {
temporalClient client.Client
pool *pgxpool.Pool
protos.UnimplementedFlowServiceServer
}

func NewFlowRequestHandler(temporalClient client.Client) *FlowRequestHandler {
func NewFlowRequestHandler(temporalClient client.Client, pool *pgxpool.Pool) *FlowRequestHandler {
return &FlowRequestHandler{
temporalClient: temporalClient,
pool: pool,
}
}

// Close closes the connection pool
func (h *FlowRequestHandler) Close() {
if h.pool != nil {
h.pool.Close()
}
}

Expand Down Expand Up @@ -130,3 +142,93 @@ func (h *FlowRequestHandler) ShutdownFlow(
Ok: true,
}, nil
}

func (h *FlowRequestHandler) ListPeers(
ctx context.Context,
req *protos.ListPeersRequest,
) (*protos.ListPeersResponse, error) {
rows, err := h.pool.Query(ctx, "SELECT * FROM peers")
if err != nil {
return nil, fmt.Errorf("unable to query peers: %w", err)
}
defer rows.Close()

peers := []*protos.Peer{}
for rows.Next() {
var id int
var name string
var peerType int
var options []byte
if err := rows.Scan(&id, &name, &peerType, &options); err != nil {
return nil, fmt.Errorf("unable to scan peer row: %w", err)
}

dbtype := protos.DBType(peerType)
var peer *protos.Peer
switch dbtype {
case protos.DBType_POSTGRES:
var pgOptions protos.PostgresConfig
err := proto.Unmarshal(options, &pgOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal postgres options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_PostgresConfig{PostgresConfig: &pgOptions},
}
case protos.DBType_BIGQUERY:
var bqOptions protos.BigqueryConfig
err := proto.Unmarshal(options, &bqOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal bigquery options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_BigqueryConfig{BigqueryConfig: &bqOptions},
}
case protos.DBType_SNOWFLAKE:
var sfOptions protos.SnowflakeConfig
err := proto.Unmarshal(options, &sfOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal snowflake options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_SnowflakeConfig{SnowflakeConfig: &sfOptions},
}
case protos.DBType_EVENTHUB:
var ehOptions protos.EventHubConfig
err := proto.Unmarshal(options, &ehOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal eventhub options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_EventhubConfig{EventhubConfig: &ehOptions},
}
case protos.DBType_SQLSERVER:
var ssOptions protos.SqlServerConfig
err := proto.Unmarshal(options, &ssOptions)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal sqlserver options: %w", err)
}
peer = &protos.Peer{
Name: name,
Type: dbtype,
Config: &protos.Peer_SqlserverConfig{SqlserverConfig: &ssOptions},
}
default:
log.Errorf("unsupported peer type for peer '%s': %v", name, dbtype)
}

peers = append(peers, peer)
}

return &protos.ListPeersResponse{
Peers: peers,
}, nil
}
51 changes: 4 additions & 47 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,22 @@
package main

import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"runtime"
"strconv"
"syscall"
"time"

//nolint:gosec
_ "net/http/pprof"

"github.com/PeerDB-io/peer-flow/activities"
"github.com/PeerDB-io/peer-flow/connectors/utils"
utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/uber-go/tally/v4"
"github.com/uber-go/tally/v4/prometheus"

Expand Down Expand Up @@ -88,15 +84,11 @@ func WorkerMain(opts *WorkerOptions) error {

catalogMirrorMonitor := monitoring.NewCatalogMirrorMonitor(nil)
if opts.EnableMonitoring {
catalogConnectionString, err := genCatalogConnectionString()
conn, err := utils.GetCatalogConnectionPoolFromEnv()
if err != nil {
log.Fatal(err)
return fmt.Errorf("unable to create catalog connection pool: %w", err)
}
catalogConn, err := pgxpool.New(context.Background(), catalogConnectionString)
if err != nil {
return fmt.Errorf("unable to establish connection with catalog: %w", err)
}
catalogMirrorMonitor = monitoring.NewCatalogMirrorMonitor(catalogConn)
catalogMirrorMonitor = monitoring.NewCatalogMirrorMonitor(conn)
}
defer catalogMirrorMonitor.Close()

Expand Down Expand Up @@ -153,38 +145,3 @@ func newPrometheusScope(c prometheus.Configuration) tally.Scope {
log.Println("prometheus metrics scope created")
return scope
}

func genCatalogConnectionString() (string, error) {
host, ok := os.LookupEnv("PEERDB_CATALOG_HOST")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_HOST is not set")
}
portStr, ok := os.LookupEnv("PEERDB_CATALOG_PORT")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_PORT is not set")
}
port, err := strconv.ParseUint(portStr, 10, 32)
if err != nil {
return "", fmt.Errorf("unable to parse PEERDB_CATALOG_PORT as unsigned integer")
}
user, ok := os.LookupEnv("PEERDB_CATALOG_USER")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_USER is not set")
}
password, ok := os.LookupEnv("PEERDB_CATALOG_PASSWORD")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_PASSWORD is not set")
}
database, ok := os.LookupEnv("PEERDB_CATALOG_DATABASE")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_DATABASE is not set")
}

return utils.GetPGConnectionString(&protos.PostgresConfig{
Host: host,
Port: uint32(port),
User: user,
Password: password,
Database: database,
}), nil
}
61 changes: 61 additions & 0 deletions flow/connectors/utils/catalog/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package utils

import (
"context"
"fmt"
"os"
"strconv"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/jackc/pgx/v5/pgxpool"
)

func GetCatalogConnectionPoolFromEnv() (*pgxpool.Pool, error) {
catalogConnectionString, err := genCatalogConnectionString()
if err != nil {
return nil, fmt.Errorf("unable to generate catalog connection string: %w", err)
}

catalogConn, err := pgxpool.New(context.Background(), catalogConnectionString)
if err != nil {
return nil, fmt.Errorf("unable to establish connection with catalog: %w", err)
}

return catalogConn, nil
}

func genCatalogConnectionString() (string, error) {
host, ok := os.LookupEnv("PEERDB_CATALOG_HOST")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_HOST is not set")
}
portStr, ok := os.LookupEnv("PEERDB_CATALOG_PORT")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_PORT is not set")
}
port, err := strconv.ParseUint(portStr, 10, 32)
if err != nil {
return "", fmt.Errorf("unable to parse PEERDB_CATALOG_PORT as unsigned integer")
}
user, ok := os.LookupEnv("PEERDB_CATALOG_USER")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_USER is not set")
}
password, ok := os.LookupEnv("PEERDB_CATALOG_PASSWORD")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_PASSWORD is not set")
}
database, ok := os.LookupEnv("PEERDB_CATALOG_DATABASE")
if !ok {
return "", fmt.Errorf("PEERDB_CATALOG_DATABASE is not set")
}

return utils.GetPGConnectionString(&protos.PostgresConfig{
Host: host,
Port: uint32(port),
User: user,
Password: password,
Database: database,
}), nil
}
Loading
Loading