Skip to content

Commit

Permalink
add better error logging
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Dec 10, 2023
1 parent 77cd516 commit 753abc6
Show file tree
Hide file tree
Showing 7 changed files with 333 additions and 211 deletions.
47 changes: 31 additions & 16 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,36 +145,36 @@ func (h *FlowRequestHandler) GetColumns(

defer peerPool.Close()
rows, err := peerPool.Query(ctx, `
SELECT
SELECT
cols.column_name,
cols.data_type,
CASE
CASE
WHEN constraint_type = 'PRIMARY KEY' THEN true
ELSE false
END AS is_primary_key
FROM
FROM
information_schema.columns cols
LEFT JOIN
LEFT JOIN
(
SELECT
SELECT
kcu.column_name,
tc.constraint_type
FROM
FROM
information_schema.key_column_usage kcu
JOIN
JOIN
information_schema.table_constraints tc
ON
ON
kcu.constraint_name = tc.constraint_name
AND kcu.constraint_schema = tc.constraint_schema
AND kcu.constraint_name = tc.constraint_name
WHERE
WHERE
tc.constraint_type = 'PRIMARY KEY'
AND kcu.table_schema = $1
AND kcu.table_name = $2
) AS pk
ON
ON
cols.column_name = pk.column_name
WHERE
WHERE
cols.table_schema = $3
AND cols.table_name = $4;
`, req.SchemaName, req.TableName, req.SchemaName, req.TableName)
Expand Down Expand Up @@ -209,11 +209,17 @@ func (h *FlowRequestHandler) GetSlotInfo(

pgConnector, err := connpostgres.NewPostgresConnector(ctx, pgConfig)
if err != nil {
return &protos.PeerSlotResponse{SlotData: nil}, err
return &protos.PeerSlotResponse{SlotData: nil,
ErrorMessage: err.Error(),
}, err
}

slotInfo, err := pgConnector.GetSlotInfo("")
if err != nil {
return &protos.PeerSlotResponse{SlotData: nil}, err
return &protos.PeerSlotResponse{
SlotData: nil,
ErrorMessage: err.Error(),
}, err
}
return &protos.PeerSlotResponse{
SlotData: slotInfo,
Expand All @@ -226,15 +232,21 @@ func (h *FlowRequestHandler) GetStatInfo(
) (*protos.PeerStatResponse, error) {
peerPool, peerUser, err := h.getPoolForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerStatResponse{StatData: nil}, err
return &protos.PeerStatResponse{
StatData: nil,
ErrorMessage: err.Error(),
}, err
}
defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT pid, wait_event, wait_event_type, query_start::text, query,"+
"EXTRACT(epoch FROM(now()-query_start)) AS dur"+
" FROM pg_stat_activity WHERE "+
"usename=$1 AND state != 'idle';", peerUser)
if err != nil {
return &protos.PeerStatResponse{StatData: nil}, err
return &protos.PeerStatResponse{
StatData: nil,
ErrorMessage: err.Error(),
}, err
}
defer rows.Close()
var statInfoRows []*protos.StatInfo
Expand All @@ -248,7 +260,10 @@ func (h *FlowRequestHandler) GetStatInfo(

err := rows.Scan(&pid, &waitEvent, &waitEventType, &queryStart, &query, &duration)
if err != nil {
return &protos.PeerStatResponse{StatData: nil}, err
return &protos.PeerStatResponse{
StatData: nil,
ErrorMessage: err.Error(),
}, err
}

we := waitEvent.String
Expand Down
403 changes: 212 additions & 191 deletions flow/generated/protos/route.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions nexus/pt/src/peerdb_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,16 @@ pub struct StatInfo {
pub struct PeerSlotResponse {
#[prost(message, repeated, tag="1")]
pub slot_data: ::prost::alloc::vec::Vec<SlotInfo>,
#[prost(string, tag="2")]
pub error_message: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PeerStatResponse {
#[prost(message, repeated, tag="1")]
pub stat_data: ::prost::alloc::vec::Vec<StatInfo>,
#[prost(string, tag="2")]
pub error_message: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
36 changes: 36 additions & 0 deletions nexus/pt/src/peerdb_route.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2308,10 +2308,16 @@ impl serde::Serialize for PeerSlotResponse {
if !self.slot_data.is_empty() {
len += 1;
}
if !self.error_message.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_route.PeerSlotResponse", len)?;
if !self.slot_data.is_empty() {
struct_ser.serialize_field("slotData", &self.slot_data)?;
}
if !self.error_message.is_empty() {
struct_ser.serialize_field("errorMessage", &self.error_message)?;
}
struct_ser.end()
}
}
Expand All @@ -2324,11 +2330,14 @@ impl<'de> serde::Deserialize<'de> for PeerSlotResponse {
const FIELDS: &[&str] = &[
"slot_data",
"slotData",
"error_message",
"errorMessage",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
SlotData,
ErrorMessage,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand All @@ -2352,6 +2361,7 @@ impl<'de> serde::Deserialize<'de> for PeerSlotResponse {
{
match value {
"slotData" | "slot_data" => Ok(GeneratedField::SlotData),
"errorMessage" | "error_message" => Ok(GeneratedField::ErrorMessage),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -2372,6 +2382,7 @@ impl<'de> serde::Deserialize<'de> for PeerSlotResponse {
V: serde::de::MapAccess<'de>,
{
let mut slot_data__ = None;
let mut error_message__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::SlotData => {
Expand All @@ -2380,13 +2391,20 @@ impl<'de> serde::Deserialize<'de> for PeerSlotResponse {
}
slot_data__ = Some(map.next_value()?);
}
GeneratedField::ErrorMessage => {
if error_message__.is_some() {
return Err(serde::de::Error::duplicate_field("errorMessage"));
}
error_message__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
}
}
Ok(PeerSlotResponse {
slot_data: slot_data__.unwrap_or_default(),
error_message: error_message__.unwrap_or_default(),
})
}
}
Expand All @@ -2404,10 +2422,16 @@ impl serde::Serialize for PeerStatResponse {
if !self.stat_data.is_empty() {
len += 1;
}
if !self.error_message.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_route.PeerStatResponse", len)?;
if !self.stat_data.is_empty() {
struct_ser.serialize_field("statData", &self.stat_data)?;
}
if !self.error_message.is_empty() {
struct_ser.serialize_field("errorMessage", &self.error_message)?;
}
struct_ser.end()
}
}
Expand All @@ -2420,11 +2444,14 @@ impl<'de> serde::Deserialize<'de> for PeerStatResponse {
const FIELDS: &[&str] = &[
"stat_data",
"statData",
"error_message",
"errorMessage",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
StatData,
ErrorMessage,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand All @@ -2448,6 +2475,7 @@ impl<'de> serde::Deserialize<'de> for PeerStatResponse {
{
match value {
"statData" | "stat_data" => Ok(GeneratedField::StatData),
"errorMessage" | "error_message" => Ok(GeneratedField::ErrorMessage),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -2468,6 +2496,7 @@ impl<'de> serde::Deserialize<'de> for PeerStatResponse {
V: serde::de::MapAccess<'de>,
{
let mut stat_data__ = None;
let mut error_message__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::StatData => {
Expand All @@ -2476,13 +2505,20 @@ impl<'de> serde::Deserialize<'de> for PeerStatResponse {
}
stat_data__ = Some(map.next_value()?);
}
GeneratedField::ErrorMessage => {
if error_message__.is_some() {
return Err(serde::de::Error::duplicate_field("errorMessage"));
}
error_message__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
}
}
Ok(PeerStatResponse {
stat_data: stat_data__.unwrap_or_default(),
error_message: error_message__.unwrap_or_default(),
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,12 @@ message StatInfo {

message PeerSlotResponse {
repeated SlotInfo slot_data = 1;
string error_message = 2;
}

message PeerStatResponse {
repeated StatInfo stat_data = 1;
string error_message = 2;
}

message SnapshotStatus {
Expand Down
8 changes: 8 additions & 0 deletions ui/app/peers/[peerName]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ const PeerData = async ({ params: { peerName } }: DataConfigProps) => {
`${flowServiceAddr}/v1/peers/slots/${peerName}`
).then((res) => res.json());

if (peerSlots.errorMessage) {
console.log(`Error fetching slots for peer ${peerName}: ${peerSlots.errorMessage}`);
}

const slotArray = peerSlots.slotData;
// slots with 'peerflow_slot' should come first
slotArray?.sort((slotA, slotB) => {
Expand Down Expand Up @@ -46,6 +50,10 @@ const PeerData = async ({ params: { peerName } }: DataConfigProps) => {
`${flowServiceAddr}/v1/peers/stats/${peerName}`
).then((res) => res.json());

if (peerStats.errorMessage) {
console.log(`Error fetching stats for peer ${peerName}: ${peerStats.errorMessage}`);
}

return peerStats.statData;
};

Expand Down
Loading

0 comments on commit 753abc6

Please sign in to comment.