From f2c2901cdb7d456b1c58e35866ff901c8f8c4892 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 12 Dec 2023 09:38:10 -0500 Subject: [PATCH] fix per things --- flow/cmd/peer_data.go | 43 ++++++++++++++------ flow/connectors/postgres/postgres.go | 5 +++ flow/connectors/postgres/ssh_wrapped_pool.go | 5 +++ 3 files changed, 40 insertions(+), 13 deletions(-) diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index 31ef2a298..1b9ced3a3 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -10,6 +10,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" + "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" ) @@ -146,36 +147,36 @@ func (h *FlowRequestHandler) GetColumns( defer peerPool.Close() rows, err := peerPool.Query(ctx, ` - SELECT + SELECT cols.column_name, cols.data_type, - CASE + CASE WHEN constraint_type = 'PRIMARY KEY' THEN true ELSE false END AS is_primary_key - FROM + FROM information_schema.columns cols - LEFT JOIN + LEFT JOIN ( - SELECT + SELECT kcu.column_name, tc.constraint_type - FROM + FROM information_schema.key_column_usage kcu - JOIN + JOIN information_schema.table_constraints tc - ON + ON kcu.constraint_name = tc.constraint_name AND kcu.constraint_schema = tc.constraint_schema AND kcu.constraint_name = tc.constraint_name - WHERE + WHERE tc.constraint_type = 'PRIMARY KEY' AND kcu.table_schema = $1 AND kcu.table_name = $2 ) AS pk - ON + ON cols.column_name = pk.column_name - WHERE + WHERE cols.table_schema = $3 AND cols.table_name = $4; `, req.SchemaName, req.TableName, req.SchemaName, req.TableName) @@ -210,14 +211,17 @@ func (h *FlowRequestHandler) GetSlotInfo( pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig) if err != nil { + logrus.Errorf("Failed to create postgres connector: %v", err) return &protos.PeerSlotResponse{SlotData: nil}, err } defer pgConnector.Close() slotInfo, err := pgConnector.GetSlotInfo("") if err != nil { + logrus.Errorf("Failed to get slot info: %v", err) return &protos.PeerSlotResponse{SlotData: nil}, err } + return &protos.PeerSlotResponse{ SlotData: slotInfo, }, nil @@ -227,16 +231,27 @@ func (h *FlowRequestHandler) GetStatInfo( ctx context.Context, req *protos.PostgresPeerActivityInfoRequest, ) (*protos.PeerStatResponse, error) { - peerPool, peerUser, err := h.getPoolForPGPeer(ctx, req.PeerName) + pgConfig, err := h.getPGPeerConfig(ctx, req.PeerName) if err != nil { return &protos.PeerStatResponse{StatData: nil}, err } - defer peerPool.Close() + + pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig) + if err != nil { + logrus.Errorf("Failed to create postgres connector: %v", err) + return &protos.PeerStatResponse{StatData: nil}, err + } + defer pgConnector.Close() + + peerPool := pgConnector.GetPool() + peerUser := pgConfig.User + rows, err := peerPool.Query(ctx, "SELECT pid, wait_event, wait_event_type, query_start::text, query,"+ "EXTRACT(epoch FROM(now()-query_start)) AS dur"+ " FROM pg_stat_activity WHERE "+ "usename=$1 AND state != 'idle';", peerUser) if err != nil { + logrus.Errorf("Failed to get stat info: %v", err) return &protos.PeerStatResponse{StatData: nil}, err } defer rows.Close() @@ -251,6 +266,7 @@ func (h *FlowRequestHandler) GetStatInfo( err := rows.Scan(&pid, &waitEvent, &waitEventType, &queryStart, &query, &duration) if err != nil { + logrus.Errorf("Failed to scan row: %v", err) return &protos.PeerStatResponse{StatData: nil}, err } @@ -288,6 +304,7 @@ func (h *FlowRequestHandler) GetStatInfo( Duration: float32(d), }) } + return &protos.PeerStatResponse{ StatData: statInfoRows, }, nil diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index e1c0a111c..2bf7fb7d0 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -94,6 +94,11 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) }, nil } +// GetPool returns the connection pool. +func (c *PostgresConnector) GetPool() *SSHWrappedPostgresPool { + return c.pool +} + // Close closes all connections. func (c *PostgresConnector) Close() error { if c.pool != nil { diff --git a/flow/connectors/postgres/ssh_wrapped_pool.go b/flow/connectors/postgres/ssh_wrapped_pool.go index d9fbde5bf..ef19f78b3 100644 --- a/flow/connectors/postgres/ssh_wrapped_pool.go +++ b/flow/connectors/postgres/ssh_wrapped_pool.go @@ -97,6 +97,11 @@ func (swpp *SSHWrappedPostgresPool) connect() error { logrus.Errorf("Failed to create pool: %v", err) } }) + + if err == nil { + logrus.Info("Successfully connected to Postgres") + } + return err }