Skip to content

Commit

Permalink
improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Oct 26, 2023
1 parent f72ac27 commit f9251f0
Show file tree
Hide file tree
Showing 14 changed files with 632 additions and 368 deletions.
35 changes: 22 additions & 13 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"google.golang.org/protobuf/proto"
)

func (h *FlowRequestHandler) GetPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, error) {
func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, error) {
var pgPeerOptions sql.RawBytes
var pgPeerConfig protos.PostgresConfig
err := h.pool.QueryRow(ctx,
Expand All @@ -34,14 +34,14 @@ func (h *FlowRequestHandler) GetPoolForPGPeer(ctx context.Context, peerName stri

func (h *FlowRequestHandler) GetSlotInfo(
ctx context.Context,
req *protos.PeerDataRequest,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerSlotResponse, error) {
peerPool, err := h.GetPoolForPGPeer(ctx, req.PeerName)
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerSlotResponse{SlotData: nil}, err
}
defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT slot_name, redo_lsn::Text,restart_lsn::text,"+
rows, err := peerPool.Query(ctx, "SELECT slot_name, redo_lsn::Text,restart_lsn::text,active,"+
"round((redo_lsn-restart_lsn) / 1024 / 1024 , 2) AS MB_Behind"+
" FROM pg_control_checkpoint(), pg_replication_slots;")
if err != nil {
Expand All @@ -53,8 +53,9 @@ func (h *FlowRequestHandler) GetSlotInfo(
var redoLSN string
var slotName string
var restartLSN string
var active bool
var lagInMB float32
err := rows.Scan(&slotName, &redoLSN, &restartLSN, &lagInMB)
err := rows.Scan(&slotName, &redoLSN, &restartLSN, &active, &lagInMB)
if err != nil {
return &protos.PeerSlotResponse{SlotData: nil}, err
}
Expand All @@ -63,6 +64,7 @@ func (h *FlowRequestHandler) GetSlotInfo(
RedoLSN: redoLSN,
RestartLSN: restartLSN,
SlotName: slotName,
Active: active,
LagInMb: lagInMB,
})
}
Expand All @@ -73,34 +75,41 @@ func (h *FlowRequestHandler) GetSlotInfo(

func (h *FlowRequestHandler) GetStatInfo(
ctx context.Context,
req *protos.PeerDataRequest,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerStatResponse, error) {
peerPool, err := h.GetPoolForPGPeer(ctx, req.PeerName)
peerPool, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerStatResponse{StatData: nil}, err
}
defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT pid, query, EXTRACT(epoch FROM(now()-query_start)) AS dur"+
" FROM pg_stat_activity WHERE query_start IS NOT NULL;")
rows, err := peerPool.Query(ctx, "SELECT pid, wait_event, wait_event_type, query_start::text, query,"+
"EXTRACT(epoch FROM(now()-query_start)) AS dur"+
" FROM pg_stat_activity WHERE usename='peerdb_user' AND state != 'idle' AND query_start IS NOT NULL;")
if err != nil {
return &protos.PeerStatResponse{StatData: nil}, err
}
defer rows.Close()
var statInfoRows []*protos.StatInfo
for rows.Next() {
var pid int64
var waitEvent string
var waitEventType string
var queryStart string
var query string
var duration float32

err := rows.Scan(&pid, &query, &duration)
err := rows.Scan(&pid, &waitEvent, &waitEventType, &queryStart, &query, &duration)
if err != nil {
return &protos.PeerStatResponse{StatData: nil}, err
}

statInfoRows = append(statInfoRows, &protos.StatInfo{
Pid: pid,
Query: query,
Duration: duration,
Pid: pid,
WaitEvent: waitEvent,
WaitEventType: waitEventType,
QueryStart: queryStart,
Query: query,
Duration: duration,
})
}
return &protos.PeerStatResponse{
Expand Down
391 changes: 217 additions & 174 deletions flow/generated/protos/route.pb.go

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions flow/generated/protos/route.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 12 additions & 12 deletions flow/generated/protos/route_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions nexus/pt/src/peerdb_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub struct CdcSyncStatus {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PeerDataRequest {
pub struct PostgresPeerActivityInfoRequest {
#[prost(string, tag="1")]
pub peer_name: ::prost::alloc::string::String,
}
Expand All @@ -132,7 +132,9 @@ pub struct SlotInfo {
pub redo_l_sn: ::prost::alloc::string::String,
#[prost(string, tag="3")]
pub restart_l_sn: ::prost::alloc::string::String,
#[prost(float, tag="4")]
#[prost(bool, tag="4")]
pub active: bool,
#[prost(float, tag="5")]
pub lag_in_mb: f32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand All @@ -141,8 +143,14 @@ pub struct StatInfo {
#[prost(int64, tag="1")]
pub pid: i64,
#[prost(string, tag="2")]
pub wait_event: ::prost::alloc::string::String,
#[prost(string, tag="3")]
pub wait_event_type: ::prost::alloc::string::String,
#[prost(string, tag="4")]
pub query_start: ::prost::alloc::string::String,
#[prost(string, tag="5")]
pub query: ::prost::alloc::string::String,
#[prost(float, tag="3")]
#[prost(float, tag="6")]
pub duration: f32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down
Loading

0 comments on commit f9251f0

Please sign in to comment.