diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 89b25004b7..b5cf8e8973 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -180,12 +180,17 @@ func (a *FlowableActivity) handleSlotInfo( return err } + deploymentUIDPrefix := "" + if peerdbenv.GetPeerDBDeploymentUID() != "" { + deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.GetPeerDBDeploymentUID()) + } + slotLagInMBThreshold := peerdbenv.GetPeerDBSlotLagMBAlertThreshold() if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) { a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName), - fmt.Sprintf(`Slot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB! + fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB! cc: `, - slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb)) + deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb)) } // Also handles alerts for PeerDB user connections exceeding a given limit here @@ -195,12 +200,12 @@ cc: `, slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err)) return err } - if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= uint64(maxOpenConnectionsThreshold)) { + if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) { a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName), - fmt.Sprintf(`Open connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+ + fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+ ` has exceeded threshold size of %d connections, currently at %d connections! cc: `, - res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections)) + deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections)) } if len(slotInfo) != 0 { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index f92db963da..a46005fe01 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -910,6 +910,6 @@ func (c *PostgresConnector) GetOpenConnectionsForUser() (*protos.GetOpenConnecti return &protos.GetOpenConnectionsForUserResult{ UserName: c.config.User, - CurrentOpenConnections: uint64(result.Int64), + CurrentOpenConnections: result.Int64, }, nil } diff --git a/flow/generated/protos/flow.pb.go b/flow/generated/protos/flow.pb.go index 7fb79043ee..27d2bb5c04 100644 --- a/flow/generated/protos/flow.pb.go +++ b/flow/generated/protos/flow.pb.go @@ -3371,7 +3371,7 @@ type GetOpenConnectionsForUserResult struct { unknownFields protoimpl.UnknownFields UserName string `protobuf:"bytes,1,opt,name=user_name,json=userName,proto3" json:"user_name,omitempty"` - CurrentOpenConnections uint64 `protobuf:"varint,2,opt,name=current_open_connections,json=currentOpenConnections,proto3" json:"current_open_connections,omitempty"` + CurrentOpenConnections int64 `protobuf:"varint,2,opt,name=current_open_connections,json=currentOpenConnections,proto3" json:"current_open_connections,omitempty"` } func (x *GetOpenConnectionsForUserResult) Reset() { @@ -3413,7 +3413,7 @@ func (x *GetOpenConnectionsForUserResult) GetUserName() string { return "" } -func (x *GetOpenConnectionsForUserResult) GetCurrentOpenConnections() uint64 { +func (x *GetOpenConnectionsForUserResult) GetCurrentOpenConnections() int64 { if x != nil { return x.CurrentOpenConnections } @@ -4117,7 +4117,7 @@ var file_flow_proto_rawDesc = []byte{ 0x75, 0x73, 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x75, 0x73, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x38, 0x0a, 0x18, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x6f, 0x70, 0x65, 0x6e, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x16, 0x63, 0x75, 0x72, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x16, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x4f, 0x70, 0x65, 0x6e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2a, 0x50, 0x0a, 0x0c, 0x51, 0x52, 0x65, 0x70, 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, diff --git a/nexus/pt/src/peerdb_flow.rs b/nexus/pt/src/peerdb_flow.rs index 9f860b0e54..a58fcbb95d 100644 --- a/nexus/pt/src/peerdb_flow.rs +++ b/nexus/pt/src/peerdb_flow.rs @@ -560,8 +560,8 @@ pub struct PeerDbColumns { pub struct GetOpenConnectionsForUserResult { #[prost(string, tag="1")] pub user_name: ::prost::alloc::string::String, - #[prost(uint64, tag="2")] - pub current_open_connections: u64, + #[prost(int64, tag="2")] + pub current_open_connections: i64, } /// protos for qrep #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] diff --git a/protos/flow.proto b/protos/flow.proto index 45a43f4d30..430968bcba 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -376,5 +376,5 @@ message PeerDBColumns { message GetOpenConnectionsForUserResult { string user_name = 1; - uint64 current_open_connections = 2; + int64 current_open_connections = 2; } \ No newline at end of file diff --git a/ui/grpc_generated/flow.ts b/ui/grpc_generated/flow.ts index 2b659d7c03..ba881795ed 100644 --- a/ui/grpc_generated/flow.ts +++ b/ui/grpc_generated/flow.ts @@ -6401,7 +6401,7 @@ export const GetOpenConnectionsForUserResult = { writer.uint32(10).string(message.userName); } if (message.currentOpenConnections !== 0) { - writer.uint32(16).uint64(message.currentOpenConnections); + writer.uint32(16).int64(message.currentOpenConnections); } return writer; }, @@ -6425,7 +6425,7 @@ export const GetOpenConnectionsForUserResult = { break; } - message.currentOpenConnections = longToNumber(reader.uint64() as Long); + message.currentOpenConnections = longToNumber(reader.int64() as Long); continue; } if ((tag & 7) === 4 || tag === 0) {