Skip to content

Commit

Permalink
Adjust grpc client.
Browse files Browse the repository at this point in the history
  • Loading branch information
blakerouse committed Oct 18, 2023
1 parent abde993 commit 9acd53a
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 34 deletions.
13 changes: 9 additions & 4 deletions internal/pkg/agent/application/upgrade/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/hashicorp/go-multierror"
"google.golang.org/grpc"

"github.com/elastic/elastic-agent/pkg/control/v2/client"
"github.com/elastic/elastic-agent/pkg/core/logger"
Expand Down Expand Up @@ -128,10 +129,14 @@ LOOP:
return
case <-connectTimer.C:
ch.log.Info("Trying to connect to agent")
err := ch.agentClient.Connect(ctx)
// block on connection, don't retry connection, and fail on temp dial errors
// always a local connection it should connect quickly so the timeout is only 1 second
connectCtx, connectCancel := context.WithTimeout(ctx, 1*time.Second)
err := ch.agentClient.Connect(connectCtx, grpc.WithBlock(), grpc.WithDisableRetry(), grpc.FailOnNonTempDialError(true))
connectCancel()
if err != nil {
ch.connectCounter++
ch.log.Error("Failed connecting to running daemon: %s", err)
ch.log.Error("Failed connecting to running daemon: ", err)
if ch.checkFailures() {
return
}
Expand All @@ -142,7 +147,7 @@ LOOP:
if err != nil {
// considered a connect error
ch.agentClient.Disconnect()
ch.log.Error("Failed to start state watch: %s", err)
ch.log.Error("Failed to start state watch: ", err)
ch.connectCounter++
if ch.checkFailures() {
return
Expand Down Expand Up @@ -170,7 +175,7 @@ LOOP:
if err != nil {
// agent has crashed or exited
ch.agentClient.Disconnect()
ch.log.Error("Failed reading next state: %s", err)
ch.log.Error("Lost connection: failed reading next state: ", err)
ch.lostCounter++
if ch.checkFailures() {
return
Expand Down
14 changes: 8 additions & 6 deletions pkg/control/v2/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import (
"sync"
"time"

"github.com/elastic/elastic-agent/pkg/control"
"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
"google.golang.org/grpc"

"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/pkg/control"
"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
)

// UnitType is the type of the unit
Expand Down Expand Up @@ -156,10 +157,11 @@ type DiagnosticComponentResult struct {
}

// Client communicates to Elastic Agent through the control protocol.
// go:generate mockery --name Client
//
//go:generate mockery --name Client
type Client interface {
// Connect connects to the running Elastic Agent.
Connect(ctx context.Context) error
Connect(ctx context.Context, opts ...grpc.DialOption) error
// Disconnect disconnects from the running Elastic Agent.
Disconnect()
// Version returns the current version of the running agent.
Expand Down Expand Up @@ -231,9 +233,9 @@ func New(opts ...Option) Client {
}

// Connect connects to the running Elastic Agent.
func (c *client) Connect(ctx context.Context) error {
func (c *client) Connect(ctx context.Context, opts ...grpc.DialOption) error {
c.ctx, c.cancel = context.WithCancel(ctx)
conn, err := dialContext(ctx, c.address, c.maxMsgSize)
conn, err := dialContext(ctx, c.address, c.maxMsgSize, opts...)
if err != nil {
return err
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/control/v2/client/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ import (
"google.golang.org/grpc/credentials/insecure"
)

func dialContext(ctx context.Context, address string, maxMsgSize int) (*grpc.ClientConn, error) {
return grpc.DialContext(
ctx,
address,
func dialContext(ctx context.Context, address string, maxMsgSize int, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts = append(opts,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(dialer),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
)
return grpc.DialContext(ctx, address, opts...)
}

func dialer(ctx context.Context, addr string) (net.Conn, error) {
Expand Down
7 changes: 3 additions & 4 deletions pkg/control/v2/client/dial_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ import (
"github.com/elastic/elastic-agent-libs/api/npipe"
)

func dialContext(ctx context.Context, address string, maxMsgSize int) (*grpc.ClientConn, error) {
return grpc.DialContext(
ctx,
address,
func dialContext(ctx context.Context, address string, maxMsgSize int, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts = append(opts,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(dialer),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
)
return grpc.DialContext(ctx, address, opts...)
}

func dialer(ctx context.Context, addr string) (net.Conn, error) {
Expand Down
48 changes: 32 additions & 16 deletions pkg/control/v2/client/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 9acd53a

Please sign in to comment.