Skip to content

Commit

Permalink
uses peer suer
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Oct 27, 2023
1 parent 461f6f1 commit 00d206a
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,33 @@ import (
"google.golang.org/protobuf/proto"
)

func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, error) {
func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, string, error) {
var pgPeerOptions sql.RawBytes
var pgPeerConfig protos.PostgresConfig
err := h.pool.QueryRow(ctx,
"SELECT options FROM peers WHERE name = $1 AND type=3", peerName).Scan(&pgPeerOptions)
if err != nil {
return nil, err
return nil, "", err
}

unmarshalErr := proto.Unmarshal(pgPeerOptions, &pgPeerConfig)
if err != nil {
return nil, unmarshalErr
return nil, "", unmarshalErr
}

connStr := utils.GetPGConnectionString(&pgPeerConfig)
peerPool, err := pgxpool.New(ctx, connStr)
if err != nil {
return nil, unmarshalErr
return nil, "", unmarshalErr
}
return peerPool, nil
return peerPool, pgPeerConfig.User, nil
}

func (h *FlowRequestHandler) GetSlotInfo(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerSlotResponse, error) {
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerSlotResponse{SlotData: nil}, err
}
Expand Down Expand Up @@ -77,14 +77,14 @@ func (h *FlowRequestHandler) GetStatInfo(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerStatResponse, error) {
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
peerPool, peerUser, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerStatResponse{StatData: nil}, err
}
defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT pid, wait_event, wait_event_type, query_start::text, query,"+
"EXTRACT(epoch FROM(now()-query_start)) AS dur"+
" FROM pg_stat_activity WHERE usename='peerdb_user' AND state != 'idle' AND query_start IS NOT NULL;")
" FROM pg_stat_activity WHERE usename=$1 AND state != 'idle' AND query_start IS NOT NULL;", peerUser)
if err != nil {
return &protos.PeerStatResponse{StatData: nil}, err
}
Expand Down

0 comments on commit 00d206a

Please sign in to comment.