Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into PMM-12078-HA-Phase-3-…
Browse files Browse the repository at this point in the history
…Active-Active-PoC

# Conflicts:
#	docker-compose.yml
#	managed/cmd/pmm-managed/main.go
#	managed/services/agents/deps.go
#	managed/services/agents/registry.go
#	managed/services/supervisord/devcontainer_test.go
#	managed/services/supervisord/logs.go
#	managed/services/supervisord/supervisord.go
#	managed/services/supervisord/supervisord_test.go
  • Loading branch information
BupycHuk committed Sep 18, 2023
2 parents c6ce8e3 + ffc66f0 commit 7fd5123
Show file tree
Hide file tree
Showing 62 changed files with 5,942 additions and 761 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/devcontainer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,24 @@ jobs:
ref: ${{ github.event.inputs.branch }}

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
uses: docker/setup-buildx-action@v3

- name: Login to ghcr.io registry
uses: docker/login-action@v2
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: Login to docker.io registry
uses: docker/login-action@v2
uses: docker/login-action@v3
with:
registry: docker.io
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_PASSWORD }}

- name: Build and push to registries
uses: docker/build-push-action@v4
uses: docker/build-push-action@v5
with:
file: ./devcontainer.Dockerfile
push: true
Expand Down
7 changes: 2 additions & 5 deletions .github/workflows/managed.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,6 @@ jobs:
- name: Run debug commands on failure
if: ${{ failure() }}
run: |
env
go version
go env
pwd
env | sort
go env | sort
git status
kubectl version --short --output json
6 changes: 4 additions & 2 deletions agent/agentlocal/agent_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewServer(cfg configGetReloader, supervisor supervisor, client client, conf
// Run runs gRPC and JSON servers with API and debug endpoints until ctx is canceled.
//
// Run exits when ctx is canceled, or when a request to reload configuration is received;
// in the latter case it sends true on reloadCh before shutting the servers down.
func (s *Server) Run(ctx context.Context) {
func (s *Server) Run(ctx context.Context, reloadCh chan bool) {
defer s.l.Info("Done.")

serverCtx, serverCancel := context.WithCancel(ctx)
Expand All @@ -125,8 +125,10 @@ func (s *Server) Run(ctx context.Context) {
}()

select {
case <-ctx.Done():
case <-s.reload:
s.l.Debug("Agent reload triggered")
reloadCh <- true
case <-ctx.Done():
}

serverCancel()
Expand Down
12 changes: 12 additions & 0 deletions agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ func (c *Client) processActionResults(ctx context.Context) {
for {
select {
case result := <-c.runner.ActionsResults():
if result == nil {
continue
}
resp, err := c.channel.SendAndWaitResponse(result)
if err != nil {
c.l.Error(err)
Expand All @@ -281,6 +284,9 @@ func (c *Client) processJobsResults(ctx context.Context) {
for {
select {
case message := <-c.runner.JobsMessages():
if message == nil {
continue
}
c.channel.Send(&channel.AgentResponse{
ID: 0, // Jobs send messages that don't require any responses, so we can leave message ID blank.
Payload: message,
Expand All @@ -302,6 +308,9 @@ func (c *Client) processSupervisorRequests(ctx context.Context) {
for {
select {
case state := <-c.supervisor.Changes():
if state == nil {
continue
}
resp, err := c.channel.SendAndWaitResponse(state)
if err != nil {
c.l.Error(err)
Expand All @@ -324,6 +333,9 @@ func (c *Client) processSupervisorRequests(ctx context.Context) {
for {
select {
case collect := <-c.supervisor.QANRequests():
if collect == nil {
continue
}
resp, err := c.channel.SendAndWaitResponse(collect)
if err != nil {
c.l.Error(err)
Expand Down
134 changes: 77 additions & 57 deletions agent/commands/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,13 @@ import (

// Run implements `pmm-agent run` default command.
func Run() {
var cfg *config.Config
l := logrus.WithField("component", "main")
ctx, cancel := context.WithCancel(context.Background())
defer l.Info("Done.")

const initServerLogsMaxLength = 32 // store logs before load configuration
logStore := tailog.NewStore(initServerLogsMaxLength)
logrus.SetOutput(io.MultiWriter(os.Stderr, logStore))
l := logrus.WithField("component", "main")
rootCtx, rootCancel := context.WithCancel(context.Background())

defer l.Info("Done.")

// handle termination signals
signals := make(chan os.Signal, 1)
Expand All @@ -56,75 +55,89 @@ func Run() {
s := <-signals
signal.Stop(signals)
l.Warnf("Got %s, shutting down...", unix.SignalName(s.(unix.Signal))) //nolint:forcetypeassert
if cfg != nil {
cleanupTmp(cfg.Paths.TempDir, l)
}
cancel()
rootCancel()
}()

configStorage := config.NewStorage(nil)
configFilepath, err := configStorage.Reload(l)
if err != nil {
l.Fatalf("Failed to load configuration: %s.", err)
}

cfg = configStorage.Get()

cleanupTmp(cfg.Paths.TempDir, l)
connectionUptimeService := connectionuptime.NewService(cfg.WindowConnectedTime)
connectionUptimeService.RunCleanupGoroutine(ctx)
v := versioner.New(&versioner.RealExecFunctions{})
supervisor := supervisor.NewSupervisor(ctx, v, configStorage)
connectionChecker := connectionchecker.New(configStorage)
r := runner.New(cfg.RunnerCapacity)
client := client.New(configStorage, supervisor, r, connectionChecker, v, connectionUptimeService, logStore)
localServer := agentlocal.NewServer(configStorage, supervisor, client, configFilepath, logStore)

var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
supervisor.Run(ctx)
cancel()
}()
go func() {
defer wg.Done()
r.Run(ctx)
cancel()
}()
go func() {
defer wg.Done()
localServer.Run(ctx)
cancel()
}()
configStorage, configFilepath := prepareConfig(l)

for {
_, err = configStorage.Reload(l)
if err != nil {
l.Fatalf("Failed to load configuration: %s.", err)
}

ctx, cancel := context.WithCancel(rootCtx)
cfg := configStorage.Get()

config.ConfigureLogger(cfg)
logStore.Resize(cfg.LogLinesCount)
l.Debugf("Loaded configuration: %+v", cfg)
prepareLogger(cfg, logStore, l)

supervisor := supervisor.NewSupervisor(ctx, v, configStorage)
connectionChecker := connectionchecker.New(configStorage)
r := runner.New(cfg.RunnerCapacity)
client := client.New(configStorage, supervisor, r, connectionChecker, v, prepareConnectionService(ctx, cfg), logStore)
localServer := agentlocal.NewServer(configStorage, supervisor, client, configFilepath, logStore)

logrus.Infof("Window check connection time is %.2f hour(s)", cfg.WindowConnectedTime.Hours())
connectionUptimeService.SetWindowPeriod(cfg.WindowConnectedTime)

var wg sync.WaitGroup
wg.Add(3)
reloadCh := make(chan bool, 1)
go func() {
defer wg.Done()
supervisor.Run(ctx)
cancel()
}()
go func() {
defer wg.Done()
r.Run(ctx)
cancel()
}()
go func() {
defer wg.Done()
localServer.Run(ctx, reloadCh)
cancel()
}()

processClientUntilCancel(ctx, client, reloadCh)

cleanupTmp(cfg.Paths.TempDir, l)
wg.Wait()
select {
case <-rootCtx.Done():
return
default:
}
}
}

func processClientUntilCancel(ctx context.Context, client *client.Client, reloadCh chan bool) {
for {
clientCtx, cancelClientCtx := context.WithCancel(ctx)
client.Run(clientCtx)

_ = client.Run(clientCtx)
cancelClientCtx()

<-client.Done()

if ctx.Err() != nil {
break
select {
case <-reloadCh:
return
case <-ctx.Done():
return
default:
}
}
wg.Wait()
}

// prepareConfig creates a fresh configuration storage and performs the initial
// load. It returns the storage together with the file path reported by Reload.
// A load error is reported through l.Fatalf.
func prepareConfig(l *logrus.Entry) (*config.Storage, string) {
	storage := config.NewStorage(nil)

	path, err := storage.Reload(l)
	if err != nil {
		l.Fatalf("Failed to load configuration: %s.", err)
	}

	return storage, path
}

// prepareLogger applies cfg to the logger, resizes logStore to
// cfg.LogLinesCount, and then logs the loaded configuration at debug level.
func prepareLogger(cfg *config.Config, logStore *tailog.Store, l *logrus.Entry) {
	// Configure the logger first so the debug line below is emitted with the new settings.
	config.ConfigureLogger(cfg)

	lines := cfg.LogLinesCount
	logStore.Resize(lines)

	l.Debugf("Loaded configuration: %+v", cfg)
}

func cleanupTmp(tmpRoot string, log *logrus.Entry) {
Expand All @@ -141,3 +154,10 @@ func cleanupTmp(tmpRoot string, log *logrus.Entry) {
}
}
}

// prepareConnectionService constructs a connection uptime service using
// cfg.WindowConnectedTime, starts its cleanup goroutine with ctx, and returns
// the service.
func prepareConnectionService(ctx context.Context, cfg *config.Config) *connectionuptime.Service {
	svc := connectionuptime.NewService(cfg.WindowConnectedTime)
	svc.RunCleanupGoroutine(ctx)

	return svc
}
2 changes: 0 additions & 2 deletions agent/config/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ func ConfigureLogger(cfg *Config) {
logrus.SetLevel(level)

if level == logrus.TraceLevel {
// grpclog.SetLoggerV2 is not thread-safe

// logrus.SetReportCaller thread-safe: https://github.com/sirupsen/logrus/issues/954
logrus.SetReportCaller(true)
}
Expand Down
15 changes: 15 additions & 0 deletions api-tests/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,21 @@ func AddRemoteNode(t TestingT, nodeName string) *nodes.AddRemoteNodeOKBody {
return res.Payload
}

// AddNode calls the Nodes AddNode endpoint with nodeBody as the request body
// and returns the response payload.
//
// A nil nodeBody or a nil response stops the test immediately via require;
// an error from the call is recorded via assert.
func AddNode(t TestingT, nodeBody *nodes.AddNodeBody) *nodes.AddNodeOKBody {
	t.Helper()

	// Fail with a readable message instead of panicking on the dereference below.
	require.NotNil(t, nodeBody, "nodeBody must not be nil")

	params := &nodes.AddNodeParams{
		Body:    *nodeBody,
		Context: Context,
	}

	res, err := client.Default.Nodes.AddNode(params)
	assert.NoError(t, err)
	require.NotNil(t, res)

	return res.Payload
}

func AddPMMAgent(t TestingT, nodeID string) *agents.AddPMMAgentOKBody {
t.Helper()

Expand Down
Loading

0 comments on commit 7fd5123

Please sign in to comment.