Skip to content

Commit

Permalink
cirrus worker run: ability to override RPC endpoint (#173)
Browse files Browse the repository at this point in the history
* cirrus worker run: ability to override RPC endpoint

* Make RPC endpoint flag similar to agent's -api-endpoint
  • Loading branch information
edigaryev authored Nov 21, 2020
1 parent 34c699f commit b1e0d82
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 23 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/avast/retry-go v3.0.0+incompatible
github.com/bmatcuk/doublestar v1.3.2
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054
github.com/cirruslabs/cirrus-ci-agent v1.20.0
github.com/cirruslabs/cirrus-ci-agent v1.21.0
github.com/cirruslabs/echelon v1.4.0
github.com/cirruslabs/podmanapi v0.1.0
github.com/containerd/containerd v1.4.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/cirruslabs/cirrus-ci-agent v1.20.0 h1:b38J1H1MDCxZIg32cY9+9VFLvUyx+tY0hkXfmUAfNs8=
github.com/cirruslabs/cirrus-ci-agent v1.20.0/go.mod h1:ga2zCGBfC+/+tnlHWa5cHwEuBPnhFuaf1PgTpWPGJWo=
github.com/cirruslabs/cirrus-ci-agent v1.21.0 h1:cbNgskPq+frZFwEg5eD71txM/fYBJmQSaGXCGRJy8Ss=
github.com/cirruslabs/cirrus-ci-agent v1.21.0/go.mod h1:ga2zCGBfC+/+tnlHWa5cHwEuBPnhFuaf1PgTpWPGJWo=
github.com/cirruslabs/cirrus-ci-annotations v0.0.0-20200908203753-b813f63941d7/go.mod h1:98qD7HLlBx5aNqWiCH80OTTqTTsbXT69wxnlnrnoL0E=
github.com/cirruslabs/echelon v1.4.0 h1:xubCf8BLFEBl1kamBZ1zjBrcw5p4z4anvJUBeR3E5YY=
github.com/cirruslabs/echelon v1.4.0/go.mod h1:1jFBACMy3tzodXyTtNNLN9bw6UUU7Xpq9tYMRydehtY=
Expand Down
18 changes: 16 additions & 2 deletions internal/commands/worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ var (
name string
token string
labels map[string]string

// RPC-related variables.
rpcEndpointAddress string
)

func run(cmd *cobra.Command, args []string) error {
Expand All @@ -26,11 +29,17 @@ func run(cmd *cobra.Command, args []string) error {
}
}

worker, err := worker.New(
opts := []worker.Option{
worker.WithName(viper.GetString("name")),
worker.WithRegistrationToken(viper.GetString("token")),
worker.WithLabels(viper.GetStringMapString("labels")),
)
}

if rpcEndpointAddress != "" {
opts = append(opts, worker.WithRPCEndpoint(rpcEndpointAddress))
}

worker, err := worker.New(opts...)
if err != nil {
return err
}
Expand Down Expand Up @@ -61,5 +70,10 @@ func NewRunCmd() *cobra.Command {
"additional labels to use (e.g. --labels distro=debian)")
_ = viper.BindPFlag("labels", cmd.PersistentFlags().Lookup("labels"))

// RPC-related variables
cmd.PersistentFlags().StringVar(&rpcEndpointAddress, "rpc-endpoint", worker.DefaultRPCEndpoint, "RPC endpoint address")
_ = viper.BindPFlag("rpc.endpoint", cmd.PersistentFlags().Lookup("rpc-endpoint"))
_ = cmd.PersistentFlags().MarkHidden("rpc-endpoint")

return cmd
}
6 changes: 0 additions & 6 deletions internal/worker/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,3 @@ func WithRPCEndpoint(rpcEndpoint string) Option {
e.rpcEndpoint = rpcEndpoint
}
}

func WithRPCInsecure() Option {
return func(e *Worker) {
e.rpcInsecure = true
}
}
9 changes: 2 additions & 7 deletions internal/worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,10 @@ func (worker *Worker) runTask(ctx context.Context, agentAwareTask *api.PollRespo
return
}

rpcPrefix := "https://"
if worker.rpcInsecure {
rpcPrefix = "http://"
}

if err := inst.Run(taskCtx, &instance.RunConfig{
ProjectDir: "",
ContainerEndpoint: rpcPrefix + worker.rpcEndpoint,
DirectEndpoint: rpcPrefix + worker.rpcEndpoint,
ContainerEndpoint: worker.rpcEndpoint,
DirectEndpoint: worker.rpcEndpoint,
ServerSecret: agentAwareTask.ServerSecret,
ClientSecret: agentAwareTask.ClientSecret,
TaskID: agentAwareTask.TaskId,
Expand Down
11 changes: 8 additions & 3 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"github.com/certifi/gocertifi"
"github.com/cirruslabs/cirrus-ci-agent/api"
"github.com/cirruslabs/cirrus-ci-agent/pkg/grpchelper"
"github.com/cirruslabs/cirrus-cli/internal/version"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
Expand All @@ -17,7 +18,7 @@ import (
)

const (
defaultRPCEndpoint = "grpc.cirrus-ci.com:443"
DefaultRPCEndpoint = "https://grpc.cirrus-ci.com:443"
defaultPollIntervalSeconds = 10
)

Expand All @@ -28,6 +29,7 @@ var (

type Worker struct {
rpcEndpoint string
rpcTarget string
rpcInsecure bool
rpcClient api.CirrusWorkersServiceClient

Expand All @@ -46,7 +48,7 @@ type Worker struct {

func New(opts ...Option) (*Worker, error) {
worker := &Worker{
rpcEndpoint: defaultRPCEndpoint,
rpcEndpoint: DefaultRPCEndpoint,

userSpecifiedLabels: make(map[string]string),
pollIntervalSeconds: defaultPollIntervalSeconds,
Expand All @@ -62,6 +64,9 @@ func New(opts ...Option) (*Worker, error) {
opt(worker)
}

// Parse endpoint
worker.rpcTarget, worker.rpcInsecure = grpchelper.TransportSettings(worker.rpcEndpoint)

if worker.registrationToken == "" {
return nil, fmt.Errorf("%w: must provide a registration token", ErrWorker)
}
Expand Down Expand Up @@ -123,7 +128,7 @@ func (worker *Worker) Run(ctx context.Context) error {
}

// https://github.com/grpc/grpc-go/blob/master/Documentation/concurrency.md
conn, err := grpc.DialContext(subCtx, worker.rpcEndpoint, rpcSecurity)
conn, err := grpc.DialContext(subCtx, worker.rpcTarget, rpcSecurity)
if err != nil {
worker.logger.Errorf("failed to dial %s: %v", worker.rpcEndpoint, err)
}
Expand Down
3 changes: 1 addition & 2 deletions internal/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ func TestWorker(t *testing.T) {
// Start the worker
worker, err := worker.New(
worker.WithRegistrationToken(registrationToken),
worker.WithRPCEndpoint(lis.Addr().String()),
worker.WithRPCInsecure(),
worker.WithRPCEndpoint("http://"+lis.Addr().String()),
)
if err != nil {
t.Fatal(err)
Expand Down

0 comments on commit b1e0d82

Please sign in to comment.