Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'SHOW backend_connections GROUP BY hostname' command to spqr-console #772

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 77 additions & 17 deletions pkg/clientinteractor/interactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,12 @@ const TEXTOID = 25
// DOUBLEOID https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.dat#L223
const DOUBLEOID = 701

// INTOID https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.dat#L55
const INTOID = 20

// TODO : unit tests

// TextOidFD generates a pgproto3.FieldDescription object with the provided statement text.
// TextOidFD generates a pgproto3.FieldDescription object of TEXT type with the provided statement text.
//
// Parameters:
// - stmt (string): The statement text to use in the FieldDescription.
Expand All @@ -106,6 +109,13 @@ func TextOidFD(stmt string) pgproto3.FieldDescription {
}
}

// FloatOidFD generates a pgproto3.FieldDescription object of FLOAT8 type with the provided statement text.
//
// Parameters:
// - stmt (string): The statement text to use in the FieldDescription.
//
// Returns:
// - A pgproto3.FieldDescription object initialized with the provided statement text and default values.
func FloatOidFD(stmt string) pgproto3.FieldDescription {
return pgproto3.FieldDescription{
Name: []byte(stmt),
Expand All @@ -118,6 +128,25 @@ func FloatOidFD(stmt string) pgproto3.FieldDescription {
}
}

// IntOidFD generates a pgproto3.FieldDescription object of INT type with the provided statement text.
//
// Parameters:
// - stmt (string): The statement text to use in the FieldDescription.
//
// Returns:
// - A pgproto3.FieldDescription object initialized with the provided statement text and default values.
func IntOidFD(stmt string) pgproto3.FieldDescription {
return pgproto3.FieldDescription{
Name: []byte(stmt),
TableOID: 0,
TableAttributeNumber: 0,
DataTypeOID: INTOID,
DataTypeSize: 8,
TypeModifier: -1,
Format: 0,
}
}

// TODO : unit tests

// WriteHeader sends the row description message with the specified field descriptions.
Expand Down Expand Up @@ -1295,35 +1324,66 @@ func (pi *PSQLInteractor) KillClient(clientID uint) error {

// TODO : unit tests

// BackendConnections writes backend connection information to the PSQL client.
// BackendConnections writes backend connection information to the PSQL client. If 'GROUP BY hostname' is specified in
// the query, returned to client is connections count per backend host.
//
// Parameters:
// - ctx (context.Context): The context for the operation.
// - _ (context.Context): The context for the operation.
// - shs ([]shard.Shardinfo): The list of shard information.
// - stmt (*spqrparser.Show): The 'SHOW backend_connections' statement itself.
//
// Returns:
// - error: An error if any occurred during the operation.
func (pi *PSQLInteractor) BackendConnections(ctx context.Context, shs []shard.Shardinfo) error {
if err := pi.WriteHeader("backend connection id", "router", "shard key name", "hostname", "pid", "user", "dbname", "sync", "tx_served", "tx status"); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}
func (pi *PSQLInteractor) BackendConnections(_ context.Context, shs []shard.Shardinfo, stmt *spqrparser.Show) error {
switch t := stmt.GroupBy.(type) {
case spqrparser.GroupByEmpty:
if err := pi.WriteHeader("backend connection id", "router", "shard key name", "hostname", "pid", "user", "dbname", "sync", "tx_served", "tx status"); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}

for _, sh := range shs {
router := "no data"
s, ok := sh.(shard.CoordShardinfo)
if ok {
router = s.Router()
}

if err := pi.WriteDataRow(fmt.Sprintf("%d", sh.ID()), router, sh.ShardKeyName(), sh.InstanceHostname(), fmt.Sprintf("%d", sh.Pid()), sh.Usr(), sh.DB(), strconv.FormatInt(sh.Sync(), 10), strconv.FormatInt(sh.TxServed(), 10), sh.TxStatus().String()); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}

for _, sh := range shs {
router := "no data"
s, ok := sh.(shard.CoordShardinfo)
if ok {
router = s.Router()
}

if err := pi.WriteDataRow(fmt.Sprintf("%d", sh.ID()), router, sh.ShardKeyName(), sh.InstanceHostname(), fmt.Sprintf("%d", sh.Pid()), sh.Usr(), sh.DB(), strconv.FormatInt(sh.Sync(), 10), strconv.FormatInt(sh.TxServed(), 10), sh.TxStatus().String()); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return pi.CompleteMsg(len(shs))
case spqrparser.GroupBy:
if t.ColRef.ColName != "hostname" {
return pi.ReportError(spqrerror.NewByCode(spqrerror.SPQR_INVALID_REQUEST))
}
if err := pi.cl.Send(&pgproto3.RowDescription{
Fields: []pgproto3.FieldDescription{TextOidFD("hostname"), IntOidFD("connections count")},
}); err != nil {
spqrlog.Zero.Error().Err(err).Msg("Could not write header for backend connections")
return err
}

}
res := make(map[string]int)
for _, sh := range shs {
res[sh.InstanceHostname()]++
}

return pi.CompleteMsg(len(shs))
for hostname, count := range res {
if err := pi.WriteDataRow(hostname, fmt.Sprintf("%d", count)); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}
}

return pi.CompleteMsg(len(res))
default:
return pi.ReportError(spqrerror.NewByCode(spqrerror.SPQR_INVALID_REQUEST))
}
}

// TODO unit tests
Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func ProcessKill(ctx context.Context, stmt *spqrparser.Kill, mngr EntityMgr, poo
// - ctx (context.Context): The context for the operation.
// - stmt (*spqrparser.Show): The SHOW statement to process.
// - mngr (EntityMgr): The entity manager for managing entities.
// - ci (connectiterator.ConnectIterator): The connect iterator for connection interactions.
// - ci (connectiterator.ConnectIterator): The connects iterator for connection interactions.
// - cli (*clientinteractor.PSQLInteractor): The PSQL interactor for client interactions.
//
// Returns:
Expand All @@ -455,7 +455,7 @@ func ProcessShow(ctx context.Context, stmt *spqrparser.Show, mngr EntityMgr, ci
return err
}

return cli.BackendConnections(ctx, resp)
return cli.BackendConnections(ctx, resp, stmt)
case spqrparser.ShardsStr:
shards, err := mngr.ListShards(ctx)
if err != nil {
Expand Down
25 changes: 25 additions & 0 deletions test/feature/features/coordinator_show.feature
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,31 @@ Feature: Coordinator show clients, pools and backend_connections
]
"""

Scenario: 'show backend_connections group by hostname' works
When I run SQL on host "coordinator"
"""
SHOW backend_connections GROUP BY hostname
"""
Then command return code should be "0"
And SQL result should match json
"""
[
{
"hostname":"spqr_shard_2:6432",
"connections count": 2
}
]
"""
And SQL result should match json
"""
[
{
"hostname":"spqr_shard_1:6432",
"connections count": 2
}
]
"""

Scenario: show backend_connections collects data from 2 routers
When I run SQL on host "coordinator"
"""
Expand Down
19 changes: 16 additions & 3 deletions yacc/console/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,23 @@ type WhereClauseOp struct {
Right WhereClauseNode
}

type GroupByClause interface{}

type GroupByEmpty struct {
GroupByClause
}

type GroupBy struct {
GroupByClause

ColRef ColumnRef
}

type Show struct {
Cmd string
Where WhereClauseNode
Order OrderClause
Cmd string
Where WhereClauseNode
Order OrderClause
GroupBy GroupByClause
}

type Set struct {
Expand Down
Loading
Loading