Skip to content

Commit

Permalink
PMM-12896 Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
artemgavrilov committed Mar 15, 2024
1 parent e003f5c commit be6e0bf
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 58 deletions.
4 changes: 2 additions & 2 deletions agent/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestClient(t *testing.T) {
s.On("AgentsList").Return([]*agentlocalpb.AgentInfo{})
s.On("ClearChangesChannel").Return()

r := runner.New(cfgStorage.Get().RunnerCapacity, cfgStorage.Get().RunnerPerInstanceCapacity)
r := runner.New(cfgStorage.Get().RunnerCapacity, cfgStorage.Get().RunnerMaxConnectionsPerService)
client := New(cfgStorage, &s, r, nil, nil, nil, connectionuptime.NewService(time.Hour), nil)
err := client.Run(context.Background())
assert.NoError(t, err)
Expand Down Expand Up @@ -281,7 +281,7 @@ func TestUnexpectedActionType(t *testing.T) {
s.On("AgentsList").Return([]*agentlocalpb.AgentInfo{})
s.On("ClearChangesChannel").Return()

r := runner.New(cfgStorage.Get().RunnerCapacity, cfgStorage.Get().RunnerPerInstanceCapacity)
r := runner.New(cfgStorage.Get().RunnerCapacity, cfgStorage.Get().RunnerMaxConnectionsPerService)
client := New(cfgStorage, s, r, nil, nil, nil, connectionuptime.NewService(time.Hour), nil)
err := client.Run(context.Background())
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion agent/commands/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func Run() {
supervisor := supervisor.NewSupervisor(ctx, v, configStorage)
connectionChecker := connectionchecker.New(configStorage)
serviceInfoBroker := serviceinfobroker.New(configStorage)
r := runner.New(cfg.RunnerCapacity, cfg.RunnerPerInstanceCapacity)
r := runner.New(cfg.RunnerCapacity, cfg.RunnerMaxConnectionsPerService)
client := client.New(configStorage, supervisor, r, connectionChecker, v, serviceInfoBroker, prepareConnectionService(ctx, cfg), logStore)
localServer := agentlocal.NewServer(configStorage, supervisor, client, configFilepath, logStore)

Expand Down
14 changes: 7 additions & 7 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,11 @@ type Setup struct {
type Config struct { //nolint:musttag
// no config file there

ID string `yaml:"id"`
ListenAddress string `yaml:"listen-address"`
ListenPort uint16 `yaml:"listen-port"`
RunnerCapacity uint16 `yaml:"runner-capacity,omitempty"`
RunnerPerInstanceCapacity uint16 `yaml:"runner-per-instance-capacity,omitempty"`
ID string `yaml:"id"`
ListenAddress string `yaml:"listen-address"`
ListenPort uint16 `yaml:"listen-port"`
RunnerCapacity uint16 `yaml:"runner-capacity,omitempty"`
RunnerMaxConnectionsPerService uint16 `yaml:"runner-max-connections-per-service,omitempty"`

Server Server `yaml:"server"`
Paths Paths `yaml:"paths"`
Expand Down Expand Up @@ -354,8 +354,8 @@ func Application(cfg *Config) (*kingpin.Application, *string) {
Envar("PMM_AGENT_LISTEN_PORT").Uint16Var(&cfg.ListenPort)
app.Flag("runner-capacity", "Agent internal actions/jobs runner capacity [PMM_AGENT_RUNNER_CAPACITY]").
Envar("PMM_AGENT_RUNNER_CAPACITY").Uint16Var(&cfg.RunnerCapacity)
app.Flag("runner-per-instance-capacity", "Agent internal actions/jobs runner per DB execution limit [PMM_AGENT_RUNNER_CAPACITY]").
Envar("PMM_AGENT_RUNNER_PER_INSTANCE_CAPACITY").Uint16Var(&cfg.RunnerPerInstanceCapacity)
app.Flag("runner-max-connections-per-service", "Agent internal actions/jobs runner connections limit per DB instance").
Envar("PMM_AGENT_RUNNER_MAX_CONNECTIONS_PER_SERVICE").Uint16Var(&cfg.RunnerMaxConnectionsPerService)

app.Flag("server-address", "PMM Server address [PMM_AGENT_SERVER_ADDRESS]").
Envar("PMM_AGENT_SERVER_ADDRESS").PlaceHolder("<host:port>").StringVar(&cfg.Server.Address)
Expand Down
2 changes: 0 additions & 2 deletions agent/runner/actions/mongodb_explain_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type mongodbExplainAction struct {
id string
timeout time.Duration
params *agentpb.StartActionRequest_MongoDBExplainParams
tempDir string
dsn string
}

Expand All @@ -54,7 +53,6 @@ func NewMongoDBExplainAction(id string, timeout time.Duration, params *agentpb.S
id: id,
timeout: timeout,
params: params,
tempDir: tempDir,
dsn: dsn,
}, nil
}
Expand Down
4 changes: 0 additions & 4 deletions agent/runner/actions/mongodb_query_admincommand_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@ type mongodbQueryAdmincommandAction struct {
id string
timeout time.Duration
dsn string
files *agentpb.TextFiles
command string
arg interface{}
tempDir string
}

// NewMongoDBQueryAdmincommandAction creates a MongoDB adminCommand query action.
Expand All @@ -60,10 +58,8 @@ func NewMongoDBQueryAdmincommandAction(
id: id,
timeout: timeout,
dsn: dsn,
files: files,
command: command,
arg: arg,
tempDir: tempDir,
}, nil
}

Expand Down
2 changes: 0 additions & 2 deletions agent/runner/actions/postgresql_query_select_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type postgresqlQuerySelectAction struct {
id string
timeout time.Duration
params *agentpb.StartActionRequest_PostgreSQLQuerySelectParams
tempDir string
dsn string
}

Expand All @@ -60,7 +59,6 @@ func NewPostgreSQLQuerySelectAction(id string, timeout time.Duration, params *ag
id: id,
timeout: timeout,
params: params,
tempDir: tempDir,
dsn: dsn,
}, nil
}
Expand Down
2 changes: 0 additions & 2 deletions agent/runner/actions/postgresql_query_show_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ const postgreSQLQueryShowActionType = "postgresql-query-show"
type postgresqlQueryShowAction struct {
id string
timeout time.Duration
tempDir string
dsn string
}

Expand All @@ -48,7 +47,6 @@ func NewPostgreSQLQueryShowAction(id string, timeout time.Duration, params *agen
return &postgresqlQueryShowAction{
id: id,
timeout: timeout,
tempDir: tempDir,
dsn: dsn,
}, nil
}
Expand Down
2 changes: 0 additions & 2 deletions agent/runner/actions/postgresql_show_create_table_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ type postgresqlShowCreateTableAction struct {
id string
timeout time.Duration
params *agentpb.StartActionRequest_PostgreSQLShowCreateTableParams
tempDir string
dsn string
}

Expand All @@ -89,7 +88,6 @@ func NewPostgreSQLShowCreateTableAction(
id: id,
timeout: timeout,
params: params,
tempDir: tempDir,
dsn: dsn,
}, nil
}
Expand Down
2 changes: 0 additions & 2 deletions agent/runner/actions/postgresql_show_index_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type postgresqlShowIndexAction struct {
id string
timeout time.Duration
params *agentpb.StartActionRequest_PostgreSQLShowIndexParams
tempDir string
dsn string
}

Expand All @@ -52,7 +51,6 @@ func NewPostgreSQLShowIndexAction(id string, timeout time.Duration, params *agen
id: id,
timeout: timeout,
params: params,
tempDir: tempDir,
dsn: dsn,
}, nil
}
Expand Down
68 changes: 34 additions & 34 deletions agent/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
bufferSize = 256
defaultActionTimeout = 10 * time.Second // default timeout for compatibility with an older server
defaultTotalCapacity = 32 // how many concurrent operations are allowed in total
defaultDBCapacity = 2 // how many concurrent operations on a single database instance are allowed
defaultTokenCapacity = 2 // how many concurrent operations on a single resource (usually DB instance) are allowed
)

// Runner executes jobs and actions.
Expand All @@ -66,11 +66,11 @@ type Runner struct {
// gSem is a global semaphore to limit total number of concurrent operations performed by the runner.
gSem *semaphore.Weighted

// dbInstanceCapacity is a limit of concurrent operations on a single database instance.
dbInstanceCapacity uint16
// tokenCapacity is a limit of concurrent operations on a single resource, usually database instance.
tokenCapacity uint16

// lSems is a map of local semaphores to limit number of concurrent operations on a single database instance.
// Key is a hash of DSN(only host:port pair), value is a semaphore.
// Key is a token which is typically is a hash of DSN(only host:port pair), value is a semaphore.
lSemsM sync.Mutex
lSems map[string]*entry
}
Expand All @@ -82,41 +82,41 @@ type entry struct {
}

// New creates new runner. If capacity is 0 then default value is used.
func New(totalCapacity, dbInstanceCapacity uint16) *Runner {
func New(totalCapacity, tokenCapacity uint16) *Runner {
l := logrus.WithField("component", "runner")
if totalCapacity == 0 {
totalCapacity = defaultTotalCapacity
}

if dbInstanceCapacity == 0 {
dbInstanceCapacity = defaultDBCapacity
if tokenCapacity == 0 {
tokenCapacity = defaultTokenCapacity
}

l.Infof("Runner capacity set to %d, db instance capacity set to %d", totalCapacity, dbInstanceCapacity)
l.Infof("Runner capacity set to %d, token capacity set to %d", totalCapacity, tokenCapacity)

return &Runner{
l: l,
actions: make(chan actions.Action, bufferSize),
jobs: make(chan jobs.Job, bufferSize),
cancels: make(map[string]context.CancelFunc),
running: make(map[string]struct{}),
jobsMessages: make(chan agentpb.AgentResponsePayload),
actionsMessages: make(chan agentpb.AgentRequestPayload),
dbInstanceCapacity: dbInstanceCapacity,
gSem: semaphore.NewWeighted(int64(totalCapacity)),
lSems: make(map[string]*entry),
l: l,
actions: make(chan actions.Action, bufferSize),
jobs: make(chan jobs.Job, bufferSize),
cancels: make(map[string]context.CancelFunc),
running: make(map[string]struct{}),
jobsMessages: make(chan agentpb.AgentResponsePayload),
actionsMessages: make(chan agentpb.AgentRequestPayload),
tokenCapacity: tokenCapacity,
gSem: semaphore.NewWeighted(int64(totalCapacity)),
lSems: make(map[string]*entry),
}
}

// acquire acquires global and local semaphores.
func (r *Runner) acquire(ctx context.Context, id string) error {
if id != "" {
func (r *Runner) acquire(ctx context.Context, token string) error {
if token != "" {
r.lSemsM.Lock()

e, ok := r.lSems[id]
e, ok := r.lSems[token]
if !ok {
e = &entry{sem: semaphore.NewWeighted(int64(r.dbInstanceCapacity))}
r.lSems[id] = e
e = &entry{sem: semaphore.NewWeighted(int64(r.tokenCapacity))}
r.lSems[token] = e
}
r.lSemsM.Unlock()

Expand All @@ -134,16 +134,16 @@ func (r *Runner) acquire(ctx context.Context, id string) error {
}

// release releases global and local semaphores.
func (r *Runner) release(dsn string) {
func (r *Runner) release(token string) {
r.gSem.Release(1)

if dsn != "" {
if token != "" {
r.lSemsM.Lock()

if e, ok := r.lSems[dsn]; ok {
if e, ok := r.lSems[token]; ok {
e.sem.Release(1)
if v := e.count.Add(-1); v == 0 {
delete(r.lSems, dsn)
delete(r.lSems, token)
}
}
r.lSemsM.Unlock()
Expand Down Expand Up @@ -217,9 +217,9 @@ func (r *Runner) IsRunning(id string) bool {
return ok
}

// instanceID returns unique instance ID calculated as a hash from host:port part of the DSN. instanceID allows to
// createTokenFromDSN returns unique instance ID calculated as a hash from host:port part of the DSN. createTokenFromDSN allows to
// identify target database instance and limit number of concurrent operations on it.
func instanceID(dsn string) (string, error) {
func createTokenFromDSN(dsn string) (string, error) {
if dsn == "" {
return "", nil
}
Expand All @@ -244,9 +244,9 @@ func (r *Runner) handleJob(ctx context.Context, job jobs.Job) {
jobID, jobType := job.ID(), job.Type()
l := r.l.WithFields(logrus.Fields{"id": jobID, "type": jobType})

instanceID, err := instanceID(job.DSN())
token, err := createTokenFromDSN(job.DSN())
if err != nil {
r.l.Warnf("Failed to get instance ID for job: %v", err)
r.l.Warnf("Failed to get token for job: %v", err)
}

var nCtx context.Context
Expand All @@ -269,7 +269,7 @@ func (r *Runner) handleJob(ctx context.Context, job jobs.Job) {
defer r.removeCancel(jobID)

l.Debug("Acquiring tokens for a job.")
if err := r.acquire(ctx, instanceID); err != nil {
if err := r.acquire(ctx, token); err != nil {
l.Errorf("Failed to acquire token for a job: %v", err)
r.sendJobsMessage(&agentpb.JobResult{
JobId: job.ID(),
Expand All @@ -282,7 +282,7 @@ func (r *Runner) handleJob(ctx context.Context, job jobs.Job) {
})
return
}
defer r.release(instanceID)
defer r.release(token)

// Mark job as running.
r.addStarted(jobID)
Expand Down Expand Up @@ -311,7 +311,7 @@ func (r *Runner) handleAction(ctx context.Context, action actions.Action) {
actionID, actionType := action.ID(), action.Type()
l := r.l.WithFields(logrus.Fields{"id": actionID, "type": actionType})

instanceID, err := instanceID(action.DSN())
instanceID, err := createTokenFromDSN(action.DSN())
if err != nil {
r.l.Warnf("Failed to get instance ID for action: %v", err)
}
Expand Down

0 comments on commit be6e0bf

Please sign in to comment.