diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 3523489a7f..886b8c9baf 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -216,12 +216,12 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er defer rows.Close() var slotInfoRows []*protos.SlotInfo for rows.Next() { - var redoLSN string - var slotName string - var restartLSN string - var confirmedFlushLSN string - var active bool - var lagInMB float32 + var redoLSN pgtype.Text + var slotName pgtype.Text + var restartLSN pgtype.Text + var confirmedFlushLSN pgtype.Text + var active pgtype.Bool + var lagInMB pgtype.Float4 var walStatus pgtype.Text err := rows.Scan(&slotName, &redoLSN, &restartLSN, &walStatus, &confirmedFlushLSN, &active, &lagInMB) if err != nil { @@ -229,13 +229,13 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er } slotInfoRows = append(slotInfoRows, &protos.SlotInfo{ - RedoLSN: redoLSN, - RestartLSN: restartLSN, + RedoLSN: redoLSN.String, + RestartLSN: restartLSN.String, WalStatus: walStatus.String, - ConfirmedFlushLSN: confirmedFlushLSN, - SlotName: slotName, - Active: active, - LagInMb: lagInMB, + ConfirmedFlushLSN: confirmedFlushLSN.String, + SlotName: slotName.String, + Active: active.Bool, + LagInMb: lagInMB.Float32, }) } return slotInfoRows, nil