Skip to content

Commit

Permalink
PMM-12375 create serviceInfoBroker component
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Tymchuk committed Sep 18, 2023
1 parent 7d74fbe commit 3a86951
Show file tree
Hide file tree
Showing 55 changed files with 3,330 additions and 1,581 deletions.
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ linters:
- nlreturn # too annoying
- nosnakecase # deprecated
- scopelint # too many false positives
- structcheck # replaced by unused
- varcheck # replaced by unused
- varnamelen # useless
- wrapcheck # we do not use wrapping everywhere
- wsl # too annoying
Expand Down
2 changes: 1 addition & 1 deletion admin/commands/management/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

// AddCommand is used by Kong for CLI flags and commands.
type AddCommand struct {
External AddExternalCommand `cmd:"" help:"Add External source of data (like a custom exporter running on a port) to the monitoring"`
External AddExternalCommand `cmd:"" help:"Add External source of data (like a custom exporter running on a port) to monitoring"`
ExternalServerless AddExternalServerlessCommand `cmd:"" help:"Add External Service on Remote node to monitoring"`
HAProxy AddHAProxyCommand `cmd:"" name:"haproxy" help:"Add HAProxy to monitoring"`
MongoDB AddMongoDBCommand `cmd:"" name:"mongodb" help:"Add MongoDB to monitoring"`
Expand Down
7 changes: 6 additions & 1 deletion agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type Client struct {
supervisor supervisor
connectionChecker connectionChecker
softwareVersioner softwareVersioner
serviceInfoBroker serviceInfoBroker

l *logrus.Entry
backoff *backoff.Backoff
Expand All @@ -89,12 +90,13 @@ type Client struct {
// New creates new client.
//
// Caller should call Run.
func New(cfg configGetter, supervisor supervisor, r *runner.Runner, connectionChecker connectionChecker, sv softwareVersioner, cus *connectionuptime.Service, logStore *tailog.Store) *Client { //nolint:lll
func New(cfg configGetter, supervisor supervisor, r *runner.Runner, connectionChecker connectionChecker, sv softwareVersioner, sib serviceInfoBroker, cus *connectionuptime.Service, logStore *tailog.Store) *Client { //nolint:lll
return &Client{
cfg: cfg,
supervisor: supervisor,
connectionChecker: connectionChecker,
softwareVersioner: sv,
serviceInfoBroker: sib,
l: logrus.WithField("component", "client"),
backoff: backoff.New(backoffMinDelay, backoffMaxDelay),
dialTimeout: dialTimeout,
Expand Down Expand Up @@ -388,6 +390,9 @@ loop:
case *agentpb.CheckConnectionRequest:
responsePayload = c.connectionChecker.Check(ctx, p, req.ID)

case *agentpb.ServiceInfoRequest:
responsePayload = c.serviceInfoBroker.GetInfoFromService(ctx, p, req.ID)

case *agentpb.StartJobRequest:
var resp agentpb.StartJobResponse
if err := c.handleStartJobRequest(p); err != nil {
Expand Down
12 changes: 6 additions & 6 deletions agent/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestClient(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

cfgStorage := config.NewStorage(&config.Config{})
client := New(cfgStorage, nil, nil, nil, nil, nil, nil)
client := New(cfgStorage, nil, nil, nil, nil, nil, nil, nil)
cancel()
err := client.Run(ctx)
assert.EqualError(t, err, "missing PMM Server address: context canceled")
Expand All @@ -98,7 +98,7 @@ func TestClient(t *testing.T) {
Address: "127.0.0.1:1",
},
})
client := New(cfgStorage, nil, nil, nil, nil, nil, nil)
client := New(cfgStorage, nil, nil, nil, nil, nil, nil, nil)
cancel()
err := client.Run(ctx)
assert.EqualError(t, err, "missing Agent ID: context canceled")
Expand All @@ -115,7 +115,7 @@ func TestClient(t *testing.T) {
Address: "127.0.0.1:1",
},
})
client := New(cfgStorage, nil, nil, nil, nil, connectionuptime.NewService(time.Hour), nil)
client := New(cfgStorage, nil, nil, nil, nil, nil, connectionuptime.NewService(time.Hour), nil)
err := client.Run(ctx)
assert.EqualError(t, err, "failed to dial: context deadline exceeded")
})
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestClient(t *testing.T) {
s.On("ClearChangesChannel").Return()

r := runner.New(cfgStorage.Get().RunnerCapacity)
client := New(cfgStorage, &s, r, nil, nil, connectionuptime.NewService(time.Hour), nil)
client := New(cfgStorage, &s, r, nil, nil, nil, connectionuptime.NewService(time.Hour), nil)
err := client.Run(context.Background())
assert.NoError(t, err)
assert.Equal(t, serverMD, client.GetServerConnectMetadata())
Expand Down Expand Up @@ -192,7 +192,7 @@ func TestClient(t *testing.T) {
},
})

client := New(cfgStorage, nil, nil, nil, nil, connectionuptime.NewService(time.Hour), nil)
client := New(cfgStorage, nil, nil, nil, nil, nil, connectionuptime.NewService(time.Hour), nil)
client.dialTimeout = 100 * time.Millisecond
err := client.Run(ctx)
assert.EqualError(t, err, "failed to get server metadata: rpc error: code = Canceled desc = context canceled", "%+v", err)
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestUnexpectedActionType(t *testing.T) {
s.On("ClearChangesChannel").Return()

r := runner.New(cfgStorage.Get().RunnerCapacity)
client := New(cfgStorage, s, r, nil, nil, connectionuptime.NewService(time.Hour), nil)
client := New(cfgStorage, s, r, nil, nil, nil, connectionuptime.NewService(time.Hour), nil)
err := client.Run(context.Background())
assert.NoError(t, err)
assert.Equal(t, serverMD, client.GetServerConnectMetadata())
Expand Down
6 changes: 6 additions & 0 deletions agent/client/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

//go:generate ../../bin/mockery --name=connectionChecker --case=snake --inpackage --testonly
//go:generate ../../bin/mockery --name=serviceInfoBroker --case=snake --inpackage --testonly
//go:generate ../../bin/mockery --name=supervisor --case=snake --inpackage --testonly

// connectionChecker is a subset of methods of connectionchecker.ConnectionChecker used by this package.
Expand All @@ -32,6 +33,11 @@ type connectionChecker interface {
Check(ctx context.Context, req *agentpb.CheckConnectionRequest, id uint32) *agentpb.CheckConnectionResponse
}

// serviceInfoBroker is a subset of methods of serviceinfobroker.ServiceInfoBroker used by this package.
type serviceInfoBroker interface {
GetInfoFromService(ctx context.Context, req *agentpb.ServiceInfoRequest, id uint32) *agentpb.ServiceInfoResponse
}

// softwareVersioner is a subset of methods of version.Versioner used by this package.
type softwareVersioner interface {
MySQLdVersion() (string, error)
Expand Down
47 changes: 47 additions & 0 deletions agent/client/mock_service_info_broker_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion agent/commands/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/percona/pmm/agent/connectionchecker"
"github.com/percona/pmm/agent/connectionuptime"
"github.com/percona/pmm/agent/runner"
"github.com/percona/pmm/agent/serviceinfobroker"
"github.com/percona/pmm/agent/tailog"
"github.com/percona/pmm/agent/versioner"
"github.com/percona/pmm/api/inventorypb"
Expand Down Expand Up @@ -69,8 +70,9 @@ func Run() {

supervisor := supervisor.NewSupervisor(ctx, v, configStorage)
connectionChecker := connectionchecker.New(configStorage)
serviceInfoBroker := serviceinfobroker.New(configStorage)
r := runner.New(cfg.RunnerCapacity)
client := client.New(configStorage, supervisor, r, connectionChecker, v, prepareConnectionService(ctx, cfg), logStore)
client := client.New(configStorage, supervisor, r, connectionChecker, v, serviceInfoBroker, prepareConnectionService(ctx, cfg), logStore)
localServer := agentlocal.NewServer(configStorage, supervisor, client, configFilepath, logStore)

logrus.Infof("Window check connection time is %.2f hour(s)", cfg.WindowConnectedTime.Hours())
Expand Down
17 changes: 0 additions & 17 deletions agent/connectionchecker/connection_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"database/sql"
"fmt"
"io"
"math"
"net/http"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -140,22 +139,6 @@ func (cc *ConnectionChecker) checkMySQLConnection(ctx context.Context, dsn strin
} else {
res.Error = err.Error()
}
return &res
}

var count uint64
if err = db.QueryRowContext(ctx, "SELECT /* agent='connectionchecker' */ COUNT(*) FROM information_schema.tables").Scan(&count); err != nil {
res.Error = err.Error()
return &res
}

tableCount := int32(count)
if count > math.MaxInt32 {
tableCount = math.MaxInt32
}

res.Stats = &agentpb.CheckConnectionResponse_Stats{
TableCount: tableCount,
}

return &res
Expand Down
Loading

0 comments on commit 3a86951

Please sign in to comment.