diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 6b71e6b907..005328e169 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -160,10 +160,46 @@ func (a *FlowableActivity) CreateNormalizedTable(
 	return conn.SetupNormalizedTables(config)
 }
 
+func (a *FlowableActivity) recordSlotSizePeriodically(
+	ctx context.Context,
+	srcConn connectors.CDCPullConnector,
+	slotName string,
+	done <-chan struct{},
+	peerName string,
+) {
+
+	timeout := 10 * time.Minute
+	ticker := time.NewTicker(timeout)
+
+	defer ticker.Stop()
+	for {
+		slotInfo, err := srcConn.GetSlotInfo(slotName)
+		if err != nil {
+			log.Warnf("warning: failed to get slot info: %v", err)
+		}
+
+		if len(slotInfo) == 0 {
+			continue
+		}
+
+		select {
+		case <-ticker.C:
+			a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
+		case <-done:
+			a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0])
+		}
+		ticker.Stop()
+		ticker = time.NewTicker(timeout)
+	}
+
+}
+
 // StartFlow implements StartFlow.
 func (a *FlowableActivity) StartFlow(ctx context.Context,
 	input *protos.StartFlowInput) (*model.SyncResponse, error) {
 	activity.RecordHeartbeat(ctx, "starting flow...")
+	done := make(chan struct{})
+	defer close(done)
 	conn := input.FlowConnectionConfigs
 
 	ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)
@@ -205,6 +241,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	}
 	defer connectors.CloseConnector(srcConn)
 
+	slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", input.FlowConnectionConfigs.FlowJobName)
+	if input.FlowConnectionConfigs.ReplicationSlotName != "" {
+		slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName
+	}
+
+	go a.recordSlotSizePeriodically(ctx, srcConn, slotNameForMetrics, done, input.FlowConnectionConfigs.Source.Name)
 	// start a goroutine to pull records from the source
 	errGroup.Go(func() error {
 		return srcConn.PullRecords(&model.PullRecordsRequest{
@@ -266,7 +308,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	})
 
 	defer func() {
-		shutdown <- true
+		shutdown <- struct{}{}
 	}()
 
 	syncStartTime := time.Now()
@@ -329,6 +371,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
 	activity.RecordHeartbeat(ctx, pushedRecordsWithCount)
 
+	done <- struct{}{}
 	return res, nil
 }
 
@@ -364,7 +407,7 @@ func (a *FlowableActivity) StartNormalize(
 		return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName)
 	})
 	defer func() {
-		shutdown <- true
+		shutdown <- struct{}{}
 	}()
 
 	log.Info("initializing table schema...")
@@ -443,7 +486,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
 	})
 
 	defer func() {
-		shutdown <- true
+		shutdown <- struct{}{}
 	}()
 
 	partitions, err := srcConn.GetQRepPartitions(config, last)
@@ -574,7 +617,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
 	})
 
 	defer func() {
-		shutdown <- true
+		shutdown <- struct{}{}
 	}()
 
 	res, err := dstConn.SyncQRepRecords(config, partition, stream)
@@ -618,7 +661,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
 	})
 
 	defer func() {
-		shutdown <- true
+		shutdown <- struct{}{}
 	}()
 
 	err = dstConn.ConsolidateQRepPartitions(config)
@@ -919,7 +962,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
 	})
 
 	defer func() {
-		shutdown <- true
+		shutdown <- struct{}{}
 	}()
 
 	res, err := dstConn.SyncQRepRecords(config, partition, stream)
diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go
index a490674273..98a0025d56 100644
--- a/flow/activities/snapshot_activity.go
+++ b/flow/activities/snapshot_activity.go
@@ -21,7 +21,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(flowJobName string) error {
 	}
 
 	if s, ok := a.SnapshotConnections[flowJobName]; ok {
-		s.signal.CloneComplete <- true
+		s.signal.CloneComplete <- struct{}{}
 		s.connector.Close()
 	}
 
diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go
index 8aecff6229..db20883053 100644
--- a/flow/cmd/peer_data.go
+++ b/flow/cmd/peer_data.go
@@ -5,27 +5,36 @@ import (
 	"database/sql"
 	"fmt"
 
+	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/jackc/pgx/v5/pgxpool"
 	"google.golang.org/protobuf/proto"
 )
 
-func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, string, error) {
+func (h *FlowRequestHandler) getPGPeerConfig(ctx context.Context, peerName string) (*protos.PostgresConfig, error) {
 	var pgPeerOptions sql.RawBytes
 	var pgPeerConfig protos.PostgresConfig
 	err := h.pool.QueryRow(ctx, "SELECT options FROM peers WHERE name = $1 AND type=3", peerName).Scan(&pgPeerOptions)
 	if err != nil {
-		return nil, "", err
+		return nil, err
 	}
 
 	unmarshalErr := proto.Unmarshal(pgPeerOptions, &pgPeerConfig)
 	if err != nil {
-		return nil, "", unmarshalErr
+		return nil, unmarshalErr
 	}
 
-	connStr := utils.GetPGConnectionString(&pgPeerConfig)
+	return &pgPeerConfig, nil
+}
+
+func (h *FlowRequestHandler) getPoolForPGPeer(ctx context.Context, peerName string) (*pgxpool.Pool, string, error) {
+	pgPeerConfig, err := h.getPGPeerConfig(ctx, peerName)
+	if err != nil {
+		return nil, "", err
+	}
+	connStr := utils.GetPGConnectionString(pgPeerConfig)
 	peerPool, err := pgxpool.New(ctx, connStr)
 	if err != nil {
 		return nil, "", err
 	}
@@ -193,40 +202,21 @@ func (h *FlowRequestHandler) GetSlotInfo(
 	ctx context.Context,
 	req *protos.PostgresPeerActivityInfoRequest,
 ) (*protos.PeerSlotResponse, error) {
-	peerPool, _, err := h.getPoolForPGPeer(ctx, req.PeerName)
+	pgConfig, err := h.getPGPeerConfig(ctx, req.PeerName)
 	if err != nil {
 		return &protos.PeerSlotResponse{SlotData: nil}, err
 	}
-	defer peerPool.Close()
-	rows, err := peerPool.Query(ctx, "SELECT slot_name, redo_lsn::Text,restart_lsn::text,active,"+
-		"round((redo_lsn-restart_lsn) / 1024 / 1024 , 2) AS MB_Behind"+
-		" FROM pg_control_checkpoint(), pg_replication_slots;")
+
+	pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
 	if err != nil {
 		return &protos.PeerSlotResponse{SlotData: nil}, err
 	}
-	defer rows.Close()
-	var slotInfoRows []*protos.SlotInfo
-	for rows.Next() {
-		var redoLSN string
-		var slotName string
-		var restartLSN string
-		var active bool
-		var lagInMB float32
-		err := rows.Scan(&slotName, &redoLSN, &restartLSN, &active, &lagInMB)
-		if err != nil {
-			return &protos.PeerSlotResponse{SlotData: nil}, err
-		}
-
-		slotInfoRows = append(slotInfoRows, &protos.SlotInfo{
-			RedoLSN:    redoLSN,
-			RestartLSN: restartLSN,
-			SlotName:   slotName,
-			Active:     active,
-			LagInMb:    lagInMB,
-		})
+	slotInfo, err := pgConnector.GetSlotInfo("")
+	if err != nil {
+		return &protos.PeerSlotResponse{SlotData: nil}, err
 	}
 
 	return &protos.PeerSlotResponse{
-		SlotData: slotInfoRows,
+		SlotData: slotInfo,
 	}, nil
 }
diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go
index 5f395f3585..c31442ed2f 100644
--- a/flow/connectors/bigquery/qrep_avro_sync.go
+++ b/flow/connectors/bigquery/qrep_avro_sync.go
@@ -325,7 +325,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
 		},
 	)
 	defer func() {
-		shutdown <- true
+		shutdown <- struct{}{}
 	}()
 
 	var avroFile *avro.AvroFile
diff --git a/flow/connectors/core.go b/flow/connectors/core.go
index 6c7fa6c9e8..70fc3d6bb7 100644
--- a/flow/connectors/core.go
+++ b/flow/connectors/core.go
@@ -45,6 +45,9 @@ type CDCPullConnector interface {
 
 	// SendWALHeartbeat allows for activity to progress restart_lsn on postgres.
 	SendWALHeartbeat() error
+
+	// GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector.
+	GetSlotInfo(slotName string) ([]*protos.SlotInfo, error)
 }
 
 type CDCSyncConnector interface {
diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go
index 961fb979ff..896b7fd9a7 100644
--- a/flow/connectors/eventhub/eventhub.go
+++ b/flow/connectors/eventhub/eventhub.go
@@ -227,7 +227,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 		)
 	})
 	defer func() {
-		shutdown <- true
+		shutdown <- struct{}{}
 	}()
 
 	// if env var PEERDB_BETA_EVENTHUB_PUSH_ASYNC=true
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index fc0063c1ab..f0ef60f1b6 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -209,7 +209,7 @@ func (p *PostgresCDCSource) consumeStream(
 	})
 
 	defer func() {
-		shutdown <- true
+		shutdown <- struct{}{}
 	}()
 
 	standbyMessageTimeout := req.IdleTimeout
diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go
index da3096bcb9..92a1b181c9 100644
--- a/flow/connectors/postgres/client.go
+++ b/flow/connectors/postgres/client.go
@@ -197,6 +197,46 @@ func (c *PostgresConnector) checkSlotAndPublication(slot string, publication string
 	}, nil
 }
 
+// GetSlotInfo gets the information about the replication slot size and LSNs
+// If slotName input is empty, all slot info rows are returned - this is for UI.
+// Else, only the row pertaining to that slotName will be returned.
+func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, error) {
+	specificSlotClause := ""
+	if slotName != "" {
+		specificSlotClause = fmt.Sprintf(" WHERE slot_name = '%s'", slotName)
+	}
+	rows, err := c.pool.Query(c.ctx, "SELECT slot_name, redo_lsn::Text,restart_lsn::text,confirmed_flush_lsn::text,active,"+
+		"round((pg_current_wal_lsn() - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind"+
+		" FROM pg_control_checkpoint(), pg_replication_slots"+specificSlotClause+";")
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	var slotInfoRows []*protos.SlotInfo
+	for rows.Next() {
+		var redoLSN string
+		var slotName string
+		var restartLSN string
+		var confirmedFlushLSN string
+		var active bool
+		var lagInMB float32
+		err := rows.Scan(&slotName, &redoLSN, &restartLSN, &confirmedFlushLSN, &active, &lagInMB)
+		if err != nil {
+			return nil, err
+		}
+
+		slotInfoRows = append(slotInfoRows, &protos.SlotInfo{
+			RedoLSN:           redoLSN,
+			RestartLSN:        restartLSN,
+			ConfirmedFlushLSN: confirmedFlushLSN,
+			SlotName:          slotName,
+			Active:            active,
+			LagInMb:           lagInMB,
+		})
+	}
+	return slotInfoRows, nil
+}
+
 // createSlotAndPublication creates the replication slot and publication.
 func (c *PostgresConnector) createSlotAndPublication(
 	signal *SlotSignal,
diff --git a/flow/connectors/postgres/postgres_repl_test.go b/flow/connectors/postgres/postgres_repl_test.go
index 7e10d9a154..d3831fee63 100644
--- a/flow/connectors/postgres/postgres_repl_test.go
+++ b/flow/connectors/postgres/postgres_repl_test.go
@@ -127,7 +127,7 @@ func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() {
 	log.Infof("signaling clone complete for %s after waiting for 2 seconds", flowJobName)
 	time.Sleep(2 * time.Second)
 
-	signal.CloneComplete <- true
+	signal.CloneComplete <- struct{}{}
 
 	log.Infof("successfully setup replication for %s", flowJobName)
 }
diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go
index 85c0fb2cce..c8ea1bcadd 100644
--- a/flow/connectors/postgres/qrep_query_executor.go
+++ b/flow/connectors/postgres/qrep_query_executor.go
@@ -83,7 +83,7 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc
 		})
 
 		defer func() {
-			shutdownCh <- true
+			shutdownCh <- struct{}{}
 		}()
 	}
 
diff --git a/flow/connectors/postgres/slot_signal.go b/flow/connectors/postgres/slot_signal.go
index e557049831..9660575591 100644
--- a/flow/connectors/postgres/slot_signal.go
+++ b/flow/connectors/postgres/slot_signal.go
@@ -11,13 +11,13 @@ type SlotCreationResult struct {
 // 2. CloneComplete - which can be waited on to ensure that the clone has completed.
 type SlotSignal struct {
 	SlotCreated   chan *SlotCreationResult
-	CloneComplete chan bool
+	CloneComplete chan struct{}
 }
 
 // NewSlotSignal returns a new SlotSignal.
 func NewSlotSignal() *SlotSignal {
 	return &SlotSignal{
 		SlotCreated:   make(chan *SlotCreationResult, 1),
-		CloneComplete: make(chan bool, 1),
+		CloneComplete: make(chan struct{}, 1),
 	}
 }
diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go
index 920e01d1ca..59259e3a58 100644
--- a/flow/connectors/snowflake/qrep_avro_sync.go
+++ b/flow/connectors/snowflake/qrep_avro_sync.go
@@ -318,7 +318,7 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage
 	})
 
 	defer func() {
-		shutdown <- true
+		shutdown <- struct{}{}
 	}()
 
 	if _, err := s.connector.database.Exec(putCmd); err != nil {
diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go
index 3c7fa5830c..1bb65d4317 100644
--- a/flow/connectors/utils/avro/avro_writer.go
+++ b/flow/connectors/utils/avro/avro_writer.go
@@ -135,7 +135,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
 		})
 
 		defer func() {
-			shutdown <- true
+			shutdown <- struct{}{}
 		}()
 	}
 
diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go
index b16735b6bb..164c64e3cb 100644
--- a/flow/connectors/utils/heartbeat.go
+++ b/flow/connectors/utils/heartbeat.go
@@ -13,9 +13,9 @@ func HeartbeatRoutine(
 	ctx context.Context,
 	interval time.Duration,
 	message func() string,
-) chan bool {
+) chan struct{} {
 	counter := 1
-	shutdown := make(chan bool)
+	shutdown := make(chan struct{})
 	go func() {
 		for {
 			msg := fmt.Sprintf("heartbeat #%d: %s", counter, message())
diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go
index 1502d04a0f..936aaa909c 100644
--- a/flow/connectors/utils/monitoring/monitoring.go
+++ b/flow/connectors/utils/monitoring/monitoring.go
@@ -246,6 +246,33 @@ func (c *CatalogMirrorMonitor) UpdateEndTimeForQRepRun(ctx context.Context, runU
 	return nil
 }
 
+func (c *CatalogMirrorMonitor) AppendSlotSizeInfo(
+	ctx context.Context,
+	peerName string,
+	slotInfo *protos.SlotInfo,
+) error {
+	if c == nil || c.catalogConn == nil || slotInfo == nil {
+		return nil
+	}
+
+	_, err := c.catalogConn.Exec(ctx,
+		"INSERT INTO peerdb_stats.peer_slot_size"+
+			"(peer_name, slot_name, restart_lsn, redo_lsn, confirmed_flush_lsn, slot_size) "+
+			"VALUES($1,$2,$3,$4,$5,$6) ON CONFLICT DO NOTHING;",
+		peerName,
+		slotInfo.SlotName,
+		slotInfo.RestartLSN,
+		slotInfo.RedoLSN,
+		slotInfo.ConfirmedFlushLSN,
+		slotInfo.LagInMb,
+	)
+	if err != nil {
+		return fmt.Errorf("error while upserting row for slot_size: %w", err)
+	}
+
+	return nil
+}
+
 func (c *CatalogMirrorMonitor) addPartitionToQRepRun(ctx context.Context, flowJobName string, runUUID string,
 	partition *protos.QRepPartition) error {
 	if c == nil || c.catalogConn == nil {
diff --git a/flow/generated/protos/route.pb.go b/flow/generated/protos/route.pb.go
index 48bed94377..4c551f3eec 100644
--- a/flow/generated/protos/route.pb.go
+++ b/flow/generated/protos/route.pb.go
@@ -1424,11 +1424,12 @@ type SlotInfo struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	SlotName   string  `protobuf:"bytes,1,opt,name=slot_name,json=slotName,proto3" json:"slot_name,omitempty"`
-	RedoLSN    string  `protobuf:"bytes,2,opt,name=redo_lSN,json=redoLSN,proto3" json:"redo_lSN,omitempty"`
-	RestartLSN string  `protobuf:"bytes,3,opt,name=restart_lSN,json=restartLSN,proto3" json:"restart_lSN,omitempty"`
-	Active     bool    `protobuf:"varint,4,opt,name=active,proto3" json:"active,omitempty"`
-	LagInMb    float32 `protobuf:"fixed32,5,opt,name=lag_in_mb,json=lagInMb,proto3" json:"lag_in_mb,omitempty"`
+	SlotName          string  `protobuf:"bytes,1,opt,name=slot_name,json=slotName,proto3" json:"slot_name,omitempty"`
+	RedoLSN           string  `protobuf:"bytes,2,opt,name=redo_lSN,json=redoLSN,proto3" json:"redo_lSN,omitempty"`
+	RestartLSN        string  `protobuf:"bytes,3,opt,name=restart_lSN,json=restartLSN,proto3" json:"restart_lSN,omitempty"`
+	Active            bool    `protobuf:"varint,4,opt,name=active,proto3" json:"active,omitempty"`
+	LagInMb           float32 `protobuf:"fixed32,5,opt,name=lag_in_mb,json=lagInMb,proto3" json:"lag_in_mb,omitempty"`
+	ConfirmedFlushLSN string  `protobuf:"bytes,6,opt,name=confirmed_flush_lSN,json=confirmedFlushLSN,proto3" json:"confirmed_flush_lSN,omitempty"`
 }
 
 func (x *SlotInfo) Reset() {
@@ -1498,6 +1499,13 @@ func (x *SlotInfo) GetLagInMb() float32 {
 	return 0
 }
 
+func (x *SlotInfo) GetConfirmedFlushLSN() string {
+	if x != nil {
+		return x.ConfirmedFlushLSN
+	}
+	return ""
+}
+
 type StatInfo struct {
 	state         protoimpl.MessageState
 	sizeCache     protoimpl.SizeCache
@@ -2239,7 +2247,7 @@ var file_route_proto_rawDesc = []byte{
 	0x73, 0x50, 0x65, 0x65, 0x72, 0x41, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66,
 	0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x70, 0x65, 0x65, 0x72,
 	0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x65, 0x65,
-	0x72, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x97, 0x01, 0x0a, 0x08, 0x53, 0x6c, 0x6f, 0x74, 0x49, 0x6e,
+	0x72, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0xc7, 0x01, 0x0a, 0x08, 0x53, 0x6c, 0x6f, 0x74, 0x49, 0x6e,
 	0x66, 0x6f, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x6c, 0x6f, 0x74, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18,
 	0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x6c, 0x6f, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12,
 	0x19, 0x0a, 0x08, 0x72, 0x65, 0x64, 0x6f, 0x5f, 0x6c, 0x53, 0x4e, 0x18, 0x02, 0x20, 0x01, 0x28,
@@ -2248,7 +2256,10 @@ var file_route_proto_rawDesc = []byte{
 	0x0a, 0x72, 0x65, 0x73, 0x74, 0x61, 0x72, 0x74, 0x4c, 0x53, 0x4e, 0x12, 0x16, 0x0a, 0x06, 0x61,
 	0x63, 0x74, 0x69, 0x76, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x61, 0x63, 0x74,
 	0x69, 0x76, 0x65, 0x12, 0x1a, 0x0a, 0x09, 0x6c, 0x61, 0x67, 0x5f, 0x69, 0x6e, 0x5f, 0x6d, 0x62,
-	0x18, 0x05, 0x20, 0x01, 0x28, 0x02, 0x52, 0x07, 0x6c, 0x61, 0x67, 0x49, 0x6e, 0x4d, 0x62, 0x22,
+	0x18, 0x05, 0x20, 0x01, 0x28, 0x02, 0x52, 0x07, 0x6c, 0x61, 0x67, 0x49, 0x6e, 0x4d, 0x62, 0x12,
+	0x2e, 0x0a, 0x13, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x72, 0x6d, 0x65, 0x64, 0x5f, 0x66, 0x6c, 0x75,
+	0x73, 0x68, 0x5f, 0x6c, 0x53, 0x4e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x63, 0x6f,
+	0x6e, 0x66, 0x69, 0x72, 0x6d, 0x65, 0x64, 0x46, 0x6c, 0x75, 0x73, 0x68, 0x4c, 0x53, 0x4e, 0x22,
 	0xb6, 0x01, 0x0a, 0x08, 0x53, 0x74, 0x61, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x10, 0x0a, 0x03,
 	0x70, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x1d,
 	0x0a, 0x0a, 0x77, 0x61, 0x69, 0x74, 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01,
diff --git a/nexus/catalog/migrations/V12__slot_size.sql b/nexus/catalog/migrations/V12__slot_size.sql
new file mode 100644
index 0000000000..426bdb8bc2
--- /dev/null
+++ b/nexus/catalog/migrations/V12__slot_size.sql
@@ -0,0 +1,12 @@
+CREATE TABLE IF NOT EXISTS peerdb_stats.peer_slot_size (
+    id SERIAL PRIMARY KEY,
+    slot_name TEXT NOT NULL,
+    peer_name TEXT NOT NULL,
+    redo_lsn TEXT,
+    restart_lsn TEXT,
+    confirmed_flush_lsn TEXT,
+    slot_size BIGINT,
+    updated_at TIMESTAMP NOT NULL DEFAULT NOW()
+);
+
+CREATE INDEX index_slot_name ON peerdb_stats.peer_slot_size (slot_name);
diff --git a/nexus/pt/src/peerdb_route.rs b/nexus/pt/src/peerdb_route.rs
index f162fad0ef..688e80a09b 100644
--- a/nexus/pt/src/peerdb_route.rs
+++ b/nexus/pt/src/peerdb_route.rs
@@ -194,6 +194,8 @@ pub struct SlotInfo {
     pub active: bool,
     #[prost(float, tag="5")]
     pub lag_in_mb: f32,
+    #[prost(string, tag="6")]
+    pub confirmed_flush_l_sn: ::prost::alloc::string::String,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/nexus/pt/src/peerdb_route.serde.rs b/nexus/pt/src/peerdb_route.serde.rs
index 87b6bd8557..b6fdb0a436 100644
--- a/nexus/pt/src/peerdb_route.serde.rs
+++ b/nexus/pt/src/peerdb_route.serde.rs
@@ -3210,6 +3210,9 @@ impl serde::Serialize for SlotInfo {
         if self.lag_in_mb != 0. {
             len += 1;
         }
+        if !self.confirmed_flush_l_sn.is_empty() {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("peerdb_route.SlotInfo", len)?;
         if !self.slot_name.is_empty() {
             struct_ser.serialize_field("slotName", &self.slot_name)?;
@@ -3226,6 +3229,9 @@ impl serde::Serialize for SlotInfo {
         if self.lag_in_mb != 0. {
             struct_ser.serialize_field("lagInMb", &self.lag_in_mb)?;
         }
+        if !self.confirmed_flush_l_sn.is_empty() {
+            struct_ser.serialize_field("confirmedFlushLSN", &self.confirmed_flush_l_sn)?;
+        }
         struct_ser.end()
     }
 }
@@ -3245,6 +3251,8 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
             "active",
             "lag_in_mb",
             "lagInMb",
+            "confirmed_flush_lSN",
+            "confirmedFlushLSN",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -3254,6 +3262,7 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
             RestartLSn,
             Active,
             LagInMb,
+            ConfirmedFlushLSn,
             __SkipField__,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -3281,6 +3290,7 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
                             "restartLSN" | "restart_lSN" => Ok(GeneratedField::RestartLSn),
                             "active" => Ok(GeneratedField::Active),
                             "lagInMb" | "lag_in_mb" => Ok(GeneratedField::LagInMb),
+                            "confirmedFlushLSN" | "confirmed_flush_lSN" => Ok(GeneratedField::ConfirmedFlushLSn),
                             _ => Ok(GeneratedField::__SkipField__),
                         }
                     }
@@ -3305,6 +3315,7 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
                 let mut restart_l_sn__ = None;
                 let mut active__ = None;
                 let mut lag_in_mb__ = None;
+                let mut confirmed_flush_l_sn__ = None;
                 while let Some(k) = map.next_key()? {
                     match k {
                         GeneratedField::SlotName => {
@@ -3339,6 +3350,12 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
                                 Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
                             ;
                         }
+                        GeneratedField::ConfirmedFlushLSn => {
+                            if confirmed_flush_l_sn__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("confirmedFlushLSN"));
+                            }
+                            confirmed_flush_l_sn__ = Some(map.next_value()?);
+                        }
                         GeneratedField::__SkipField__ => {
                             let _ = map.next_value::<serde::de::IgnoredAny>()?;
                         }
@@ -3350,6 +3367,7 @@ impl<'de> serde::Deserialize<'de> for SlotInfo {
                     restart_l_sn: restart_l_sn__.unwrap_or_default(),
                     active: active__.unwrap_or_default(),
                     lag_in_mb: lag_in_mb__.unwrap_or_default(),
+                    confirmed_flush_l_sn: confirmed_flush_l_sn__.unwrap_or_default(),
                 })
             }
         }
diff --git a/protos/route.proto b/protos/route.proto
index db96121ca4..fad2a34391 100644
--- a/protos/route.proto
+++ b/protos/route.proto
@@ -141,6 +141,7 @@ message SlotInfo {
   string restart_lSN = 3;
   bool active = 4;
   float lag_in_mb = 5;
+  string confirmed_flush_lSN = 6;
 }
 
 message StatInfo {
diff --git a/ui/grpc_generated/route.ts b/ui/grpc_generated/route.ts
index 36b0e8a2d0..c9269c127c 100644
--- a/ui/grpc_generated/route.ts
+++ b/ui/grpc_generated/route.ts
@@ -262,6 +262,7 @@ export interface SlotInfo {
   restartLSN: string;
   active: boolean;
   lagInMb: number;
+  confirmedFlushLSN: string;
 }
 
 export interface StatInfo {
@@ -1983,7 +1984,7 @@ export const PostgresPeerActivityInfoRequest = {
 };
 
 function createBaseSlotInfo(): SlotInfo {
-  return { slotName: "", redoLSN: "", restartLSN: "", active: false, lagInMb: 0 };
+  return { slotName: "", redoLSN: "", restartLSN: "", active: false, lagInMb: 0, confirmedFlushLSN: "" };
 }
 
 export const SlotInfo = {
@@ -2003,6 +2004,9 @@ export const SlotInfo = {
     if (message.lagInMb !== 0) {
       writer.uint32(45).float(message.lagInMb);
     }
+    if (message.confirmedFlushLSN !== "") {
+      writer.uint32(50).string(message.confirmedFlushLSN);
+    }
     return writer;
   },
 
@@ -2048,6 +2052,13 @@ export const SlotInfo = {
           message.lagInMb = reader.float();
           continue;
+        case 6:
+          if (tag !== 50) {
+            break;
+          }
+
+          message.confirmedFlushLSN = reader.string();
+          continue;
       }
       if ((tag & 7) === 4 || tag === 0) {
         break;
       }
@@ -2064,6 +2075,7 @@ export const SlotInfo = {
       restartLSN: isSet(object.restartLSN) ? String(object.restartLSN) : "",
       active: isSet(object.active) ? Boolean(object.active) : false,
      lagInMb: isSet(object.lagInMb) ? Number(object.lagInMb) : 0,
+      confirmedFlushLSN: isSet(object.confirmedFlushLSN) ? String(object.confirmedFlushLSN) : "",
     };
   },
 
@@ -2084,6 +2096,9 @@ export const SlotInfo = {
     if (message.lagInMb !== 0) {
      obj.lagInMb = message.lagInMb;
     }
+    if (message.confirmedFlushLSN !== "") {
+      obj.confirmedFlushLSN = message.confirmedFlushLSN;
+    }
     return obj;
   },
 
@@ -2097,6 +2112,7 @@ export const SlotInfo = {
     message.restartLSN = object.restartLSN ?? "";
     message.active = object.active ?? false;
     message.lagInMb = object.lagInMb ?? 0;
+    message.confirmedFlushLSN = object.confirmedFlushLSN ?? "";
     return message;
   },
 };