Skip to content

Commit

Permalink
added deployment UID prefix and review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Dec 21, 2023
1 parent 214fab9 commit f7cd2b5
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 14 deletions.
15 changes: 10 additions & 5 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,17 @@ func (a *FlowableActivity) handleSlotInfo(
return err
}

deploymentUIDPrefix := ""
if peerdbenv.GetPeerDBDeploymentUID() != "" {
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.GetPeerDBDeploymentUID())
}

slotLagInMBThreshold := peerdbenv.GetPeerDBSlotLagMBAlertThreshold()
if (slotLagInMBThreshold > 0) && (slotInfo[0].LagInMb >= float32(slotLagInMBThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-slot-lag-threshold-exceeded", peerName),
fmt.Sprintf(`Slot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
fmt.Sprintf(`%sSlot `+"`%s`"+` on peer `+"`%s`"+` has exceeded threshold size of %dMB, currently at %.2fMB!
cc: <!channel>`,
slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
deploymentUIDPrefix, slotName, peerName, slotLagInMBThreshold, slotInfo[0].LagInMb))
}

// Also handles alerts for PeerDB user connections exceeding a given limit here
Expand All @@ -195,12 +200,12 @@ cc: <!channel>`,
slog.WarnContext(ctx, "warning: failed to get current open connections", slog.Any("error", err))
return err
}
if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= uint64(maxOpenConnectionsThreshold)) {
if (maxOpenConnectionsThreshold > 0) && (res.CurrentOpenConnections >= int64(maxOpenConnectionsThreshold)) {
a.Alerter.AlertIf(ctx, fmt.Sprintf("%s-max-open-connections-threshold-exceeded", peerName),
fmt.Sprintf(`Open connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
fmt.Sprintf(`%sOpen connections from PeerDB user `+"`%s`"+` on peer `+"`%s`"+
` has exceeded threshold size of %d connections, currently at %d connections!
cc: <!channel>`,
res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections))
deploymentUIDPrefix, res.UserName, peerName, maxOpenConnectionsThreshold, res.CurrentOpenConnections))
}

if len(slotInfo) != 0 {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,6 @@ func (c *PostgresConnector) GetOpenConnectionsForUser() (*protos.GetOpenConnecti

return &protos.GetOpenConnectionsForUserResult{
UserName: c.config.User,
CurrentOpenConnections: uint64(result.Int64),
CurrentOpenConnections: result.Int64,
}, nil
}
6 changes: 3 additions & 3 deletions flow/generated/protos/flow.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,8 @@ pub struct PeerDbColumns {
pub struct GetOpenConnectionsForUserResult {
#[prost(string, tag="1")]
pub user_name: ::prost::alloc::string::String,
#[prost(uint64, tag="2")]
pub current_open_connections: u64,
#[prost(int64, tag="2")]
pub current_open_connections: i64,
}
/// protos for qrep
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
Expand Down
2 changes: 1 addition & 1 deletion protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -376,5 +376,5 @@ message PeerDBColumns {

message GetOpenConnectionsForUserResult {
string user_name = 1;
uint64 current_open_connections = 2;
int64 current_open_connections = 2;
}
4 changes: 2 additions & 2 deletions ui/grpc_generated/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6401,7 +6401,7 @@ export const GetOpenConnectionsForUserResult = {
writer.uint32(10).string(message.userName);
}
if (message.currentOpenConnections !== 0) {
writer.uint32(16).uint64(message.currentOpenConnections);
writer.uint32(16).int64(message.currentOpenConnections);
}
return writer;
},
Expand All @@ -6425,7 +6425,7 @@ export const GetOpenConnectionsForUserResult = {
break;
}

message.currentOpenConnections = longToNumber(reader.uint64() as Long);
message.currentOpenConnections = longToNumber(reader.int64() as Long);
continue;
}
if ((tag & 7) === 4 || tag === 0) {
Expand Down

0 comments on commit f7cd2b5

Please sign in to comment.