Skip to content

Commit

Permalink
track wal_status in slot info
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 9, 2023
1 parent 77cd516 commit debb3f1
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 207 deletions.
7 changes: 5 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er
if slotName != "" {
specificSlotClause = fmt.Sprintf(" WHERE slot_name = '%s'", slotName)
}
rows, err := c.pool.Query(c.ctx, "SELECT slot_name, redo_lsn::Text,restart_lsn::text,confirmed_flush_lsn::text,active,"+
rows, err := c.pool.Query(c.ctx, "SELECT slot_name, redo_lsn::Text,restart_lsn::text,wal_status,"+
"confirmed_flush_lsn::text,active,"+
"round((pg_current_wal_lsn() - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind"+
" FROM pg_control_checkpoint(), pg_replication_slots"+specificSlotClause+";")
if err != nil {
Expand All @@ -217,17 +218,19 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er
var redoLSN string
var slotName string
var restartLSN string
var walStatus string
var confirmedFlushLSN string
var active bool
var lagInMB float32
err := rows.Scan(&slotName, &redoLSN, &restartLSN, &confirmedFlushLSN, &active, &lagInMB)
err := rows.Scan(&slotName, &redoLSN, &restartLSN, &walStatus, &confirmedFlushLSN, &active, &lagInMB)
if err != nil {
return nil, err
}

slotInfoRows = append(slotInfoRows, &protos.SlotInfo{
RedoLSN: redoLSN,
RestartLSN: restartLSN,
WalStatus: walStatus,
ConfirmedFlushLSN: confirmedFlushLSN,
SlotName: slotName,
Active: active,
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/utils/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,15 @@ func (c *CatalogMirrorMonitor) AppendSlotSizeInfo(

_, err := c.catalogConn.Exec(ctx,
"INSERT INTO peerdb_stats.peer_slot_size"+
"(peer_name, slot_name, restart_lsn, redo_lsn, confirmed_flush_lsn, slot_size) "+
"(peer_name, slot_name, restart_lsn, redo_lsn, confirmed_flush_lsn, slot_size, wal_status) "+
"VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING;",
peerName,
slotInfo.SlotName,
slotInfo.RestartLSN,
slotInfo.RedoLSN,
slotInfo.ConfirmedFlushLSN,
slotInfo.LagInMb,
slotInfo.WalStatus,
)
if err != nil {
return fmt.Errorf("error while upserting row for slot_size: %w", err)
Expand Down
416 changes: 213 additions & 203 deletions flow/generated/protos/route.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions nexus/catalog/migrations/V13__wal_status.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE peerdb_stats.peer_slot_size
ADD COLUMN wal_status TEXT;
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ pub struct SlotInfo {
pub lag_in_mb: f32,
#[prost(string, tag="6")]
pub confirmed_flush_l_sn: ::prost::alloc::string::String,
#[prost(string, tag="7")]
pub wal_status: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_route.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3213,6 +3213,9 @@ impl serde::Serialize for SlotInfo {
if !self.confirmed_flush_l_sn.is_empty() {
len += 1;
}
if !self.wal_status.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_route.SlotInfo", len)?;
if !self.slot_name.is_empty() {
struct_ser.serialize_field("slotName", &self.slot_name)?;
Expand All @@ -3232,6 +3235,9 @@ impl serde::Serialize for SlotInfo {
if !self.confirmed_flush_l_sn.is_empty() {
struct_ser.serialize_field("confirmedFlushLSN", &self.confirmed_flush_l_sn)?;
}
if !self.wal_status.is_empty() {
struct_ser.serialize_field("walStatus", &self.wal_status)?;
}
struct_ser.end()
}
}
Expand All @@ -3253,6 +3259,8 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
"lagInMb",
"confirmed_flush_lSN",
"confirmedFlushLSN",
"wal_status",
"walStatus",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -3263,6 +3271,7 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
Active,
LagInMb,
ConfirmedFlushLSn,
WalStatus,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand Down Expand Up @@ -3291,6 +3300,7 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
"active" => Ok(GeneratedField::Active),
"lagInMb" | "lag_in_mb" => Ok(GeneratedField::LagInMb),
"confirmedFlushLSN" | "confirmed_flush_lSN" => Ok(GeneratedField::ConfirmedFlushLSn),
"walStatus" | "wal_status" => Ok(GeneratedField::WalStatus),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -3316,6 +3326,7 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
let mut active__ = None;
let mut lag_in_mb__ = None;
let mut confirmed_flush_l_sn__ = None;
let mut wal_status__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::SlotName => {
Expand Down Expand Up @@ -3356,6 +3367,12 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
}
confirmed_flush_l_sn__ = Some(map.next_value()?);
}
GeneratedField::WalStatus => {
if wal_status__.is_some() {
return Err(serde::de::Error::duplicate_field("walStatus"));
}
wal_status__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -3368,6 +3385,7 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
active: active__.unwrap_or_default(),
lag_in_mb: lag_in_mb__.unwrap_or_default(),
confirmed_flush_l_sn: confirmed_flush_l_sn__.unwrap_or_default(),
wal_status: wal_status__.unwrap_or_default(),
})
}
}
Expand Down
1 change: 1 addition & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ message SlotInfo {
bool active = 4;
float lag_in_mb = 5;
string confirmed_flush_lSN = 6;
string wal_status = 7;
}

message StatInfo {
Expand Down
18 changes: 17 additions & 1 deletion ui/grpc_generated/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ export interface SlotInfo {
active: boolean;
lagInMb: number;
confirmedFlushLSN: string;
walStatus: string;
}

export interface StatInfo {
Expand Down Expand Up @@ -1984,7 +1985,7 @@ export const PostgresPeerActivityInfoRequest = {
};

function createBaseSlotInfo(): SlotInfo {
return { slotName: "", redoLSN: "", restartLSN: "", active: false, lagInMb: 0, confirmedFlushLSN: "" };
return { slotName: "", redoLSN: "", restartLSN: "", active: false, lagInMb: 0, confirmedFlushLSN: "", walStatus: "" };
}

export const SlotInfo = {
Expand All @@ -2007,6 +2008,9 @@ export const SlotInfo = {
if (message.confirmedFlushLSN !== "") {
writer.uint32(50).string(message.confirmedFlushLSN);
}
if (message.walStatus !== "") {
writer.uint32(58).string(message.walStatus);
}
return writer;
},

Expand Down Expand Up @@ -2059,6 +2063,13 @@ export const SlotInfo = {

message.confirmedFlushLSN = reader.string();
continue;
case 7:
if (tag !== 58) {
break;
}

message.walStatus = reader.string();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand All @@ -2076,6 +2087,7 @@ export const SlotInfo = {
active: isSet(object.active) ? Boolean(object.active) : false,
lagInMb: isSet(object.lagInMb) ? Number(object.lagInMb) : 0,
confirmedFlushLSN: isSet(object.confirmedFlushLSN) ? String(object.confirmedFlushLSN) : "",
walStatus: isSet(object.walStatus) ? String(object.walStatus) : "",
};
},

Expand All @@ -2099,6 +2111,9 @@ export const SlotInfo = {
if (message.confirmedFlushLSN !== "") {
obj.confirmedFlushLSN = message.confirmedFlushLSN;
}
if (message.walStatus !== "") {
obj.walStatus = message.walStatus;
}
return obj;
},

Expand All @@ -2113,6 +2128,7 @@ export const SlotInfo = {
message.active = object.active ?? false;
message.lagInMb = object.lagInMb ?? 0;
message.confirmedFlushLSN = object.confirmedFlushLSN ?? "";
message.walStatus = object.walStatus ?? "";
return message;
},
};
Expand Down

0 comments on commit debb3f1

Please sign in to comment.