Skip to content

Commit

Permalink
rebase for ssh tunnel
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 12, 2023
1 parent ecb7660 commit 76bf54e
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 18 deletions.
12 changes: 6 additions & 6 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"database/sql"
"fmt"
"log/slog"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -211,14 +211,14 @@ func (h *FlowRequestHandler) GetSlotInfo(

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
if err != nil {
logrus.Errorf("Failed to create postgres connector: %v", err)
slog.Error("Failed to create postgres connector", slog.Any("error", err))
return &protos.PeerSlotResponse{SlotData: nil}, err
}
defer pgConnector.Close()

slotInfo, err := pgConnector.GetSlotInfo("")
if err != nil {
logrus.Errorf("Failed to get slot info: %v", err)
slog.Error("Failed to get slot info", slog.Any("error", err))
return &protos.PeerSlotResponse{SlotData: nil}, err
}

Expand All @@ -238,7 +238,7 @@ func (h *FlowRequestHandler) GetStatInfo(

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
if err != nil {
logrus.Errorf("Failed to create postgres connector: %v", err)
slog.Error("Failed to create postgres connector", slog.Any("error", err))
return &protos.PeerStatResponse{StatData: nil}, err
}
defer pgConnector.Close()
Expand All @@ -251,7 +251,7 @@ func (h *FlowRequestHandler) GetStatInfo(
" FROM pg_stat_activity WHERE "+
"usename=$1 AND state != 'idle';", peerUser)
if err != nil {
logrus.Errorf("Failed to get stat info: %v", err)
slog.Error("Failed to get stat info", slog.Any("error", err))
return &protos.PeerStatResponse{StatData: nil}, err
}
defer rows.Close()
Expand All @@ -266,7 +266,7 @@ func (h *FlowRequestHandler) GetStatInfo(

err := rows.Scan(&pid, &waitEvent, &waitEventType, &queryStart, &query, &duration)
if err != nil {
logrus.Errorf("Failed to scan row: %v", err)
slog.Error("Failed to scan row", slog.Any("error", err))
return &protos.PeerStatResponse{StatData: nil}, err
}

Expand Down
22 changes: 11 additions & 11 deletions flow/connectors/postgres/ssh_wrapped_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package connpostgres
import (
"context"
"fmt"
"log/slog"
"net"
"sync"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
)

Expand Down Expand Up @@ -45,7 +45,7 @@ func NewSSHWrappedPostgresPool(
sshConfig.PrivateKey,
)
if err != nil {
logrus.Error("Failed to get SSH client config: ", err)
slog.Error("Failed to get SSH client config", slog.Any("error", err))
cancel()
return nil, err
}
Expand Down Expand Up @@ -77,41 +77,41 @@ func (swpp *SSHWrappedPostgresPool) connect() error {

swpp.Pool, err = pgxpool.NewWithConfig(swpp.ctx, swpp.poolConfig)
if err != nil {
logrus.Errorf("Failed to create pool: %v", err)
slog.Error("Failed to create pool:", slog.Any("error", err))
return
}

logrus.Infof("Established pool to %s:%d",
swpp.poolConfig.ConnConfig.Host, swpp.poolConfig.ConnConfig.Port)
slog.Info(fmt.Sprintf("Established pool to %s:%d",
swpp.poolConfig.ConnConfig.Host, swpp.poolConfig.ConnConfig.Port))

err = retryWithBackoff(func() error {
err = swpp.Ping(swpp.ctx)
if err != nil {
logrus.Errorf("Failed to ping pool: %v", err)
slog.Error("Failed to ping pool", slog.Any("error", err))
return err
}
return nil
}, 5, 5*time.Second)

if err != nil {
logrus.Errorf("Failed to create pool: %v", err)
slog.Error("Failed to create pool", slog.Any("error", err))
}
})

if err == nil {
logrus.Info("Successfully connected to Postgres")
slog.Info("Successfully connected to Postgres")
}

return err
}

func (swpp *SSHWrappedPostgresPool) setupSSH() error {
if swpp.sshConfig == nil {
logrus.Info("SSH config is nil, skipping SSH setup")
slog.Info("SSH config is nil, skipping SSH setup")
return nil
}

logrus.Info("Setting up SSH connection to ", swpp.sshServer)
slog.Info("Setting up SSH connection to " + swpp.sshServer)

var err error
swpp.sshClient, err = ssh.Dial("tcp", swpp.sshServer, swpp.sshConfig)
Expand Down Expand Up @@ -151,7 +151,7 @@ func retryWithBackoff(fn retryFunc, maxRetries int, backoff time.Duration) (err
return nil
}
if i < maxRetries-1 {
logrus.Infof("Attempt #%d failed, retrying in %s", i+1, backoff)
slog.Info(fmt.Sprintf("Attempt #%d failed, retrying in %s", i+1, backoff))
time.Sleep(backoff)
}
}
Expand Down
2 changes: 1 addition & 1 deletion flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ require (
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/crypto v0.16.0
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.19.0 // indirect
Expand Down

0 comments on commit 76bf54e

Please sign in to comment.