Skip to content

Commit

Permalink
fix per things
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Dec 12, 2023
1 parent 952970a commit f2c2901
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 13 deletions.
43 changes: 30 additions & 13 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -146,36 +147,36 @@ func (h *FlowRequestHandler) GetColumns(

defer peerPool.Close()
rows, err := peerPool.Query(ctx, `
SELECT
SELECT
cols.column_name,
cols.data_type,
CASE
CASE
WHEN constraint_type = 'PRIMARY KEY' THEN true
ELSE false
END AS is_primary_key
FROM
FROM
information_schema.columns cols
LEFT JOIN
LEFT JOIN
(
SELECT
SELECT
kcu.column_name,
tc.constraint_type
FROM
FROM
information_schema.key_column_usage kcu
JOIN
JOIN
information_schema.table_constraints tc
ON
ON
kcu.constraint_name = tc.constraint_name
AND kcu.constraint_schema = tc.constraint_schema
AND kcu.constraint_name = tc.constraint_name
WHERE
WHERE
tc.constraint_type = 'PRIMARY KEY'
AND kcu.table_schema = $1
AND kcu.table_name = $2
) AS pk
ON
ON
cols.column_name = pk.column_name
WHERE
WHERE
cols.table_schema = $3
AND cols.table_name = $4;
`, req.SchemaName, req.TableName, req.SchemaName, req.TableName)
Expand Down Expand Up @@ -210,14 +211,17 @@ func (h *FlowRequestHandler) GetSlotInfo(

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
if err != nil {
logrus.Errorf("Failed to create postgres connector: %v", err)
return &protos.PeerSlotResponse{SlotData: nil}, err
}
defer pgConnector.Close()

slotInfo, err := pgConnector.GetSlotInfo("")
if err != nil {
logrus.Errorf("Failed to get slot info: %v", err)
return &protos.PeerSlotResponse{SlotData: nil}, err
}

return &protos.PeerSlotResponse{
SlotData: slotInfo,
}, nil
Expand All @@ -227,16 +231,27 @@ func (h *FlowRequestHandler) GetStatInfo(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerStatResponse, error) {
peerPool, peerUser, err := h.getPoolForPGPeer(ctx, req.PeerName)
pgConfig, err := h.getPGPeerConfig(ctx, req.PeerName)
if err != nil {
return &protos.PeerStatResponse{StatData: nil}, err
}
defer peerPool.Close()

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
if err != nil {
logrus.Errorf("Failed to create postgres connector: %v", err)
return &protos.PeerStatResponse{StatData: nil}, err
}
defer pgConnector.Close()

peerPool := pgConnector.GetPool()
peerUser := pgConfig.User

rows, err := peerPool.Query(ctx, "SELECT pid, wait_event, wait_event_type, query_start::text, query,"+
"EXTRACT(epoch FROM(now()-query_start)) AS dur"+
" FROM pg_stat_activity WHERE "+
"usename=$1 AND state != 'idle';", peerUser)
if err != nil {
logrus.Errorf("Failed to get stat info: %v", err)
return &protos.PeerStatResponse{StatData: nil}, err
}
defer rows.Close()
Expand All @@ -251,6 +266,7 @@ func (h *FlowRequestHandler) GetStatInfo(

err := rows.Scan(&pid, &waitEvent, &waitEventType, &queryStart, &query, &duration)
if err != nil {
logrus.Errorf("Failed to scan row: %v", err)
return &protos.PeerStatResponse{StatData: nil}, err
}

Expand Down Expand Up @@ -288,6 +304,7 @@ func (h *FlowRequestHandler) GetStatInfo(
Duration: float32(d),
})
}

return &protos.PeerStatResponse{
StatData: statInfoRows,
}, nil
Expand Down
5 changes: 5 additions & 0 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
}, nil
}

// GetPool returns the connection pool.
func (c *PostgresConnector) GetPool() *SSHWrappedPostgresPool {
return c.pool
}

// Close closes all connections.
func (c *PostgresConnector) Close() error {
if c.pool != nil {
Expand Down
5 changes: 5 additions & 0 deletions flow/connectors/postgres/ssh_wrapped_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ func (swpp *SSHWrappedPostgresPool) connect() error {
logrus.Errorf("Failed to create pool: %v", err)
}
})

if err == nil {
logrus.Info("Successfully connected to Postgres")
}

return err
}

Expand Down

0 comments on commit f2c2901

Please sign in to comment.