diff --git a/cli/internal/cloudcmd/validators.go b/cli/internal/cloudcmd/validators.go index 343170ff14..a5989af457 100644 --- a/cli/internal/cloudcmd/validators.go +++ b/cli/internal/cloudcmd/validators.go @@ -23,7 +23,7 @@ import ( // NewValidator creates a new Validator. func NewValidator(cmd *cobra.Command, config config.AttestationCfg, log debugLog) (atls.Validator, error) { - return choose.Validator(config, warnLogger{cmd: cmd, log: log}) + return choose.Validator(config, WarnLogger{Cmd: cmd, Log: log}) } // UpdateInitMeasurements sets the owner and cluster measurement values. @@ -102,21 +102,21 @@ func decodeMeasurement(encoded string) ([]byte, error) { return decoded, nil } -// warnLogger implements logging of warnings for validators. -type warnLogger struct { - cmd *cobra.Command - log debugLog +// WarnLogger implements logging of warnings for validators. +type WarnLogger struct { + Cmd *cobra.Command + Log debugLog } // Infof messages are reduced to debug messages, since we don't want // the extra info when using the CLI without setting the debug flag. -func (wl warnLogger) Infof(fmtStr string, args ...any) { - wl.log.Debugf(fmtStr, args...) +func (wl WarnLogger) Infof(fmtStr string, args ...any) { + wl.Log.Debugf(fmtStr, args...) } // Warnf prints a formatted warning from the validator. -func (wl warnLogger) Warnf(fmtStr string, args ...any) { - wl.cmd.PrintErrf("Warning: %s\n", fmt.Sprintf(fmtStr, args...)) +func (wl WarnLogger) Warnf(fmtStr string, args ...any) { + wl.Cmd.PrintErrf("Warning: %s\n", fmt.Sprintf(fmtStr, args...)) } type debugLog interface { diff --git a/cli/internal/cmd/BUILD.bazel b/cli/internal/cmd/BUILD.bazel index e605f26a0f..7b0cf0fb48 100644 --- a/cli/internal/cmd/BUILD.bazel +++ b/cli/internal/cmd/BUILD.bazel @@ -60,6 +60,7 @@ go_library( "//internal/api/fetcher", "//internal/api/versionsapi", "//internal/atls", + "//internal/attestation/choose", "//internal/attestation/measurements", "//internal/attestation/snp", "//internal/attestation/variant", @@ -76,7 +77,6 @@ go_library( "//internal/featureset", "//internal/file", "//internal/grpc/dialer", - "//internal/grpc/grpclog", "//internal/grpc/retry", "//internal/helm", "//internal/kms/uri", @@ -163,6 +163,7 @@ go_test( "//internal/cloud/gcpshared", "//internal/config", "//internal/constants", + "//internal/constellation", "//internal/crypto", "//internal/crypto/testvector", "//internal/file", diff --git a/cli/internal/cmd/apply.go b/cli/internal/cmd/apply.go index 916d26c8fb..3c63898dbd 100644 --- a/cli/internal/cmd/apply.go +++ b/cli/internal/cmd/apply.go @@ -19,6 +19,7 @@ import ( "strings" "time" + "github.com/edgelesssys/constellation/v2/bootstrapper/initproto" "github.com/edgelesssys/constellation/v2/cli/internal/cloudcmd" "github.com/edgelesssys/constellation/v2/internal/api/attestationconfigapi" "github.com/edgelesssys/constellation/v2/internal/atls" @@ -31,6 +32,7 @@ import ( "github.com/edgelesssys/constellation/v2/internal/file" "github.com/edgelesssys/constellation/v2/internal/grpc/dialer" "github.com/edgelesssys/constellation/v2/internal/helm" + "github.com/edgelesssys/constellation/v2/internal/kms/uri" "github.com/edgelesssys/constellation/v2/internal/kubecmd" "github.com/edgelesssys/constellation/v2/internal/state" "github.com/edgelesssys/constellation/v2/internal/versions" @@ -252,12 +254,13 @@ func runApply(cmd *cobra.Command, _ []string) error { ) } - applier := constellation.NewApplier(log) + applier := constellation.NewApplier(log, spinner, newDialer) apply := &applyCmd{ fileHandler: fileHandler, flags: flags, log: log, + wLog: &cloudcmd.WarnLogger{Cmd: cmd, Log: log}, spinner: spinner, merger: &kubeconfigMerger{log: log}, newHelmClient: newHelmClient, @@ -279,6 +282,7 @@ type applyCmd struct { flags applyFlags log debugLog + wLog warnLog spinner spinnerInterf merger configMerger @@ -293,6 +297,23 @@ type applyCmd struct { type applier interface { CheckLicense(ctx context.Context, csp cloudprovider.Provider, licenseID string) (int, error) + GenerateMasterSecret() (uri.MasterSecret, error) + GenerateMeasurementSalt() ([]byte, error) + Init( + ctx context.Context, + validator atls.Validator, + state *state.State, + clusterLogWriter io.Writer, + payload constellation.InitPayload, + ) ( + *initproto.InitSuccessResponse, + error, + ) +} + +type warnLog interface { + Warnf(format string, args ...any) + Infof(format string, args ...any) } /* diff --git a/cli/internal/cmd/apply_test.go b/cli/internal/cmd/apply_test.go index 05269245d3..831c513a18 100644 --- a/cli/internal/cmd/apply_test.go +++ b/cli/internal/cmd/apply_test.go @@ -11,15 +11,19 @@ import ( "context" "errors" "fmt" + "io" "path/filepath" "strings" "testing" "time" + "github.com/edgelesssys/constellation/v2/bootstrapper/initproto" + "github.com/edgelesssys/constellation/v2/internal/atls" "github.com/edgelesssys/constellation/v2/internal/cloud/cloudprovider" "github.com/edgelesssys/constellation/v2/internal/cloud/gcpshared" "github.com/edgelesssys/constellation/v2/internal/config" "github.com/edgelesssys/constellation/v2/internal/constants" + "github.com/edgelesssys/constellation/v2/internal/constellation" "github.com/edgelesssys/constellation/v2/internal/file" "github.com/edgelesssys/constellation/v2/internal/helm" "github.com/edgelesssys/constellation/v2/internal/kms/uri" @@ -488,8 +492,25 @@ func newPhases(phases ...skipPhase) skipPhases { return skipPhases } -type stubConstellApplier struct{} +type stubConstellApplier struct { + checkLicenseErr error + generateMasterSecretErr error + generateMeasurementSaltErr error + initErr error +} func (s *stubConstellApplier) CheckLicense(context.Context, cloudprovider.Provider, string) (int, error) { - return 0, nil + return 0, s.checkLicenseErr +} + +func (s *stubConstellApplier) GenerateMasterSecret() (uri.MasterSecret, error) { + return uri.MasterSecret{}, s.generateMasterSecretErr +} + +func (s *stubConstellApplier) GenerateMeasurementSalt() ([]byte, error) { + return nil, s.generateMeasurementSaltErr +} + +func (s *stubConstellApplier) Init(context.Context, atls.Validator, *state.State, io.Writer, constellation.InitPayload) (*initproto.InitSuccessResponse, error) { + return nil, s.initErr } diff --git a/cli/internal/cmd/applyinit.go b/cli/internal/cmd/applyinit.go index 236117eaa0..88479da0e3 100644 --- a/cli/internal/cmd/applyinit.go +++ b/cli/internal/cmd/applyinit.go @@ -8,7 +8,6 @@ package cmd import ( "bytes" - "context" "encoding/hex" "errors" "fmt" @@ -16,22 +15,16 @@ import ( "net" "net/url" "path/filepath" - "strconv" "text/tabwriter" - "time" "github.com/edgelesssys/constellation/v2/bootstrapper/initproto" - "github.com/edgelesssys/constellation/v2/cli/internal/cloudcmd" + "github.com/edgelesssys/constellation/v2/internal/attestation/choose" "github.com/edgelesssys/constellation/v2/internal/config" "github.com/edgelesssys/constellation/v2/internal/constants" - "github.com/edgelesssys/constellation/v2/internal/crypto" + "github.com/edgelesssys/constellation/v2/internal/constellation" "github.com/edgelesssys/constellation/v2/internal/file" - grpcRetry "github.com/edgelesssys/constellation/v2/internal/grpc/retry" "github.com/edgelesssys/constellation/v2/internal/kms/uri" - "github.com/edgelesssys/constellation/v2/internal/retry" "github.com/edgelesssys/constellation/v2/internal/state" - "github.com/edgelesssys/constellation/v2/internal/versions" - "github.com/spf13/afero" "github.com/spf13/cobra" "k8s.io/client-go/tools/clientcmd" ) @@ -41,51 +34,45 @@ import ( // On success, it writes the Kubernetes admin config file to disk. // Therefore it is skipped if the Kubernetes admin config file already exists. func (a *applyCmd) runInit(cmd *cobra.Command, conf *config.Config, stateFile *state.State) (*bytes.Buffer, error) { - a.log.Debugf("Running init RPC") a.log.Debugf("Creating aTLS Validator for %s", conf.GetAttestationConfig().GetVariant()) - validator, err := cloudcmd.NewValidator(cmd, conf.GetAttestationConfig(), a.log) + validator, err := choose.Validator(conf.GetAttestationConfig(), a.wLog) if err != nil { - return nil, fmt.Errorf("creating new validator: %w", err) + return nil, fmt.Errorf("creating validator: %w", err) } - a.log.Debugf("Generating master secret") + a.log.Debugf("Running init RPC") masterSecret, err := a.generateAndPersistMasterSecret(cmd.OutOrStdout()) if err != nil { return nil, fmt.Errorf("generating master secret: %w", err) } - a.log.Debugf("Generated master secret key and salt values") - a.log.Debugf("Generating measurement salt") - measurementSalt, err := crypto.GenerateRandomBytes(crypto.RNGLengthDefault) + measurementSalt, err := a.applier.GenerateMeasurementSalt() if err != nil { return nil, fmt.Errorf("generating measurement salt: %w", err) } - a.spinner.Start("Connecting ", false) - req := &initproto.InitRequest{ - KmsUri: masterSecret.EncodeToURI(), - StorageUri: uri.NoStoreURI, - MeasurementSalt: measurementSalt, - KubernetesVersion: versions.VersionConfigs[conf.KubernetesVersion].ClusterVersion, - KubernetesComponents: versions.VersionConfigs[conf.KubernetesVersion].KubernetesComponents.ToInitProto(), - ConformanceMode: a.flags.conformance, - InitSecret: stateFile.Infrastructure.InitSecret, - ClusterName: stateFile.Infrastructure.Name, - ApiserverCertSans: stateFile.Infrastructure.APIServerCertSANs, - ServiceCidr: conf.ServiceCIDR, - } - a.log.Debugf("Sending initialization request") - resp, err := a.initCall(cmd.Context(), a.newDialer(validator), stateFile.Infrastructure.ClusterEndpoint, req) - a.spinner.Stop() - a.log.Debugf("Initialization request finished") - + clusterLogs := &bytes.Buffer{} + resp, err := a.applier.Init( + cmd.Context(), validator, stateFile, clusterLogs, + constellation.InitPayload{ + MasterSecret: masterSecret, + MeasurementSalt: measurementSalt, + K8sVersion: conf.KubernetesVersion, + ConformanceMode: a.flags.conformance, + ServiceCIDR: conf.ServiceCIDR, + }) + if len(clusterLogs.Bytes()) > 0 { + if err := a.fileHandler.Write(constants.ErrorLog, clusterLogs.Bytes(), file.OptAppend); err != nil { + return nil, fmt.Errorf("writing bootstrapper logs: %w", err) + } + } if err != nil { - var nonRetriable *nonRetriableError + var nonRetriable *constellation.NonRetriableInitError if errors.As(err, &nonRetriable) { cmd.PrintErrln("Cluster initialization failed. This error is not recoverable.") cmd.PrintErrln("Terminate your cluster and try again.") - if nonRetriable.logCollectionErr != nil { - cmd.PrintErrf("Failed to collect logs from bootstrapper: %s\n", nonRetriable.logCollectionErr) + if nonRetriable.LogCollectionErr != nil { + cmd.PrintErrf("Failed to collect logs from bootstrapper: %s\n", nonRetriable.LogCollectionErr) } else { cmd.PrintErrf("Fetched bootstrapper logs are stored in %q\n", a.flags.pathPrefixer.PrefixPrintablePath(constants.ErrorLog)) } @@ -103,49 +90,14 @@ func (a *applyCmd) runInit(cmd *cobra.Command, conf *config.Config, stateFile *s return bufferedOutput, nil } -// initCall performs the gRPC call to the bootstrapper to initialize the cluster. -func (a *applyCmd) initCall(ctx context.Context, dialer grpcDialer, ip string, req *initproto.InitRequest) (*initproto.InitSuccessResponse, error) { - doer := &initDoer{ - dialer: dialer, - endpoint: net.JoinHostPort(ip, strconv.Itoa(constants.BootstrapperPort)), - req: req, - log: a.log, - spinner: a.spinner, - fh: file.NewHandler(afero.NewOsFs()), - } - - // Create a wrapper function that allows logging any returned error from the retrier before checking if it's the expected retriable one. - serviceIsUnavailable := func(err error) bool { - isServiceUnavailable := grpcRetry.ServiceIsUnavailable(err) - a.log.Debugf("Encountered error (retriable: %t): %s", isServiceUnavailable, err) - return isServiceUnavailable - } - - a.log.Debugf("Making initialization call, doer is %+v", doer) - retrier := retry.NewIntervalRetrier(doer, 30*time.Second, serviceIsUnavailable) - if err := retrier.Do(ctx); err != nil { - return nil, err - } - return doer.resp, nil -} - // generateAndPersistMasterSecret generates a 32 byte master secret and saves it to disk. func (a *applyCmd) generateAndPersistMasterSecret(outWriter io.Writer) (uri.MasterSecret, error) { - // No file given, generate a new secret, and save it to disk - key, err := crypto.GenerateRandomBytes(crypto.MasterSecretLengthDefault) - if err != nil { - return uri.MasterSecret{}, err - } - salt, err := crypto.GenerateRandomBytes(crypto.RNGLengthDefault) + secret, err := a.applier.GenerateMasterSecret() if err != nil { - return uri.MasterSecret{}, err - } - secret := uri.MasterSecret{ - Key: key, - Salt: salt, + return uri.MasterSecret{}, fmt.Errorf("generating master secret: %w", err) } if err := a.fileHandler.WriteJSON(constants.MasterSecretFilename, secret, file.OptNone); err != nil { - return uri.MasterSecret{}, err + return uri.MasterSecret{}, fmt.Errorf("writing master secret: %w", err) } fmt.Fprintf(outWriter, "Your Constellation master secret was successfully written to %q\n", a.flags.pathPrefixer.PrefixPrintablePath(constants.MasterSecretFilename)) return secret, nil diff --git a/cli/internal/cmd/init.go b/cli/internal/cmd/init.go index 17b22d2fd7..8bffeb521b 100644 --- a/cli/internal/cmd/init.go +++ b/cli/internal/cmd/init.go @@ -8,11 +8,9 @@ package cmd import ( "context" - "errors" "fmt" "io" "os" - "sync" "time" "github.com/spf13/cobra" @@ -22,13 +20,10 @@ import ( clientcodec "k8s.io/client-go/tools/clientcmd/api/latest" "sigs.k8s.io/yaml" - "github.com/edgelesssys/constellation/v2/bootstrapper/initproto" "github.com/edgelesssys/constellation/v2/internal/attestation/variant" "github.com/edgelesssys/constellation/v2/internal/cloud/cloudprovider" "github.com/edgelesssys/constellation/v2/internal/config" - "github.com/edgelesssys/constellation/v2/internal/constants" "github.com/edgelesssys/constellation/v2/internal/file" - "github.com/edgelesssys/constellation/v2/internal/grpc/grpclog" "github.com/edgelesssys/constellation/v2/internal/helm" "github.com/edgelesssys/constellation/v2/internal/kms/uri" "github.com/edgelesssys/constellation/v2/internal/semver" @@ -61,142 +56,6 @@ func NewInitCmd() *cobra.Command { return cmd } -type initDoer struct { - dialer grpcDialer - endpoint string - req *initproto.InitRequest - resp *initproto.InitSuccessResponse - log debugLog - spinner spinnerInterf - connectedOnce bool - fh file.Handler -} - -func (d *initDoer) Do(ctx context.Context) error { - // connectedOnce is set in handleGRPCStateChanges when a connection was established in one retry attempt. - // This should cancel any other retry attempts when the connection is lost since the bootstrapper likely won't accept any new attempts anymore. - if d.connectedOnce { - return &nonRetriableError{ - logCollectionErr: errors.New("init already connected to the remote server in a previous attempt - resumption is not supported"), - err: errors.New("init already connected to the remote server in a previous attempt - resumption is not supported"), - } - } - - conn, err := d.dialer.Dial(ctx, d.endpoint) - if err != nil { - d.log.Debugf("Dialing init server failed: %s. Retrying...", err) - return fmt.Errorf("dialing init server: %w", err) - } - defer conn.Close() - - var wg sync.WaitGroup - defer wg.Wait() - - grpcStateLogCtx, grpcStateLogCancel := context.WithCancel(ctx) - defer grpcStateLogCancel() - d.handleGRPCStateChanges(grpcStateLogCtx, &wg, conn) - - protoClient := initproto.NewAPIClient(conn) - d.log.Debugf("Created protoClient") - resp, err := protoClient.Init(ctx, d.req) - if err != nil { - return &nonRetriableError{ - logCollectionErr: errors.New("rpc failed before first response was received - no logs available"), - err: fmt.Errorf("init call: %w", err), - } - } - - res, err := resp.Recv() // get first response, either success or failure - if err != nil { - if e := d.getLogs(resp); e != nil { - d.log.Debugf("Failed to collect logs: %s", e) - return &nonRetriableError{ - logCollectionErr: e, - err: err, - } - } - return &nonRetriableError{err: err} - } - - switch res.Kind.(type) { - case *initproto.InitResponse_InitFailure: - if e := d.getLogs(resp); e != nil { - d.log.Debugf("Failed to get logs from cluster: %s", e) - return &nonRetriableError{ - logCollectionErr: e, - err: errors.New(res.GetInitFailure().GetError()), - } - } - return &nonRetriableError{err: errors.New(res.GetInitFailure().GetError())} - case *initproto.InitResponse_InitSuccess: - d.resp = res.GetInitSuccess() - case nil: - d.log.Debugf("Cluster returned nil response type") - err = errors.New("empty response from cluster") - if e := d.getLogs(resp); e != nil { - d.log.Debugf("Failed to collect logs: %s", e) - return &nonRetriableError{ - logCollectionErr: e, - err: err, - } - } - return &nonRetriableError{err: err} - default: - d.log.Debugf("Cluster returned unknown response type") - err = errors.New("unknown response from cluster") - if e := d.getLogs(resp); e != nil { - d.log.Debugf("Failed to collect logs: %s", e) - return &nonRetriableError{ - logCollectionErr: e, - err: err, - } - } - return &nonRetriableError{err: err} - } - - return nil -} - -func (d *initDoer) getLogs(resp initproto.API_InitClient) error { - d.log.Debugf("Attempting to collect cluster logs") - for { - res, err := resp.Recv() - if err == io.EOF { - break - } - if err != nil { - return err - } - - switch res.Kind.(type) { - case *initproto.InitResponse_InitFailure: - return errors.New("trying to collect logs: received init failure response, expected log response") - case *initproto.InitResponse_InitSuccess: - return errors.New("trying to collect logs: received init success response, expected log response") - case nil: - return errors.New("trying to collect logs: received nil response, expected log response") - } - - log := res.GetLog().GetLog() - if log == nil { - return errors.New("received empty logs") - } - - if err := d.fh.Write(constants.ErrorLog, log, file.OptAppend); err != nil { - return err - } - } - return nil -} - -func (d *initDoer) handleGRPCStateChanges(ctx context.Context, wg *sync.WaitGroup, conn *grpc.ClientConn) { - grpclog.LogStateChangesUntilReady(ctx, conn, d.log, wg, func() { - d.connectedOnce = true - d.spinner.Stop() - d.spinner.Start("Initializing cluster ", false) - }) -} - func writeRow(wr io.Writer, col1 string, col2 string) { fmt.Fprint(wr, col1, "\t", col2, "\n") } @@ -257,22 +116,6 @@ func (c *kubeconfigMerger) kubeconfigEnvVar() string { type grpcDialer interface { Dial(ctx context.Context, target string) (*grpc.ClientConn, error) } - -type nonRetriableError struct { - logCollectionErr error - err error -} - -// Error returns the error message. -func (e *nonRetriableError) Error() string { - return e.err.Error() -} - -// Unwrap returns the wrapped error. -func (e *nonRetriableError) Unwrap() error { - return e.err -} - type helmApplier interface { PrepareApply( csp cloudprovider.Provider, attestationVariant variant.Variant, k8sVersion versions.ValidK8sVersion, microserviceVersion semver.Semver, stateFile *state.State, diff --git a/cli/internal/cmd/init_test.go b/cli/internal/cmd/init_test.go index ab14669884..d1f1050275 100644 --- a/cli/internal/cmd/init_test.go +++ b/cli/internal/cmd/init_test.go @@ -29,6 +29,7 @@ import ( "github.com/edgelesssys/constellation/v2/internal/cloud/gcpshared" "github.com/edgelesssys/constellation/v2/internal/config" "github.com/edgelesssys/constellation/v2/internal/constants" + "github.com/edgelesssys/constellation/v2/internal/constellation" "github.com/edgelesssys/constellation/v2/internal/file" "github.com/edgelesssys/constellation/v2/internal/grpc/atlscredentials" "github.com/edgelesssys/constellation/v2/internal/grpc/dialer" @@ -126,7 +127,7 @@ func TestInitialize(t *testing.T) { "non retriable error": { provider: cloudprovider.QEMU, stateFile: preInitStateFile(cloudprovider.QEMU), - initServerAPI: &stubInitServer{initErr: &nonRetriableError{err: assert.AnError}}, + initServerAPI: &stubInitServer{initErr: &constellation.NonRetriableInitError{Err: assert.AnError}}, retriable: false, masterSecretShouldExist: true, wantErr: true, @@ -279,7 +280,11 @@ func TestInitialize(t *testing.T) { getClusterAttestationConfigErr: k8serrors.NewNotFound(schema.GroupResource{}, ""), }, nil }, - applier: &stubConstellApplier{}, + applier: constellation.NewApplier( + logger.NewTest(t), + &nopSpinner{}, + newDialer, + ), } err := i.apply(cmd, stubAttestationFetcher{}, "test") @@ -330,63 +335,6 @@ func (s stubRunner) SaveCharts(_ string, _ file.Handler) error { return s.saveChartsErr } -func TestGetLogs(t *testing.T) { - someErr := errors.New("failed") - - testCases := map[string]struct { - resp initproto.API_InitClient - fh file.Handler - wantedOutput []byte - wantErr bool - }{ - "success": { - resp: stubInitClient{res: bytes.NewReader([]byte("asdf"))}, - fh: file.NewHandler(afero.NewMemMapFs()), - wantedOutput: []byte("asdf"), - }, - "receive error": { - resp: stubInitClient{err: someErr}, - fh: file.NewHandler(afero.NewMemMapFs()), - wantErr: true, - }, - "nil log": { - resp: stubInitClient{res: bytes.NewReader([]byte{1}), setResNil: true}, - fh: file.NewHandler(afero.NewMemMapFs()), - wantErr: true, - }, - "failed write": { - resp: stubInitClient{res: bytes.NewReader([]byte("asdf"))}, - fh: file.NewHandler(afero.NewReadOnlyFs(afero.NewMemMapFs())), - wantErr: true, - }, - } - - for name, tc := range testCases { - t.Run(name, func(t *testing.T) { - assert := assert.New(t) - - doer := initDoer{ - fh: tc.fh, - log: logger.NewTest(t), - } - - err := doer.getLogs(tc.resp) - - if tc.wantErr { - assert.Error(err) - } - - text, err := tc.fh.Read(constants.ErrorLog) - - if tc.wantedOutput == nil { - assert.Error(err) - } - - assert.Equal(tc.wantedOutput, text) - }) - } -} - func TestWriteOutput(t *testing.T) { assert := assert.New(t) require := require.New(t) @@ -545,6 +493,7 @@ func TestGenerateMasterSecret(t *testing.T) { i := &applyCmd{ fileHandler: fileHandler, log: logger.NewTest(t), + applier: constellation.NewApplier(logger.NewTest(t), &nopSpinner{}, nil), } secret, err := i.generateAndPersistMasterSecret(&out) @@ -647,6 +596,7 @@ func TestAttestation(t *testing.T) { return &stubKubernetesUpgrader{}, nil }, newDialer: newDialer, + applier: constellation.NewApplier(logger.NewTest(t), &nopSpinner{}, newDialer), } _, err := i.runInit(cmd, cfg, existingStateFile) assert.Error(err) @@ -781,39 +731,3 @@ func defaultConfigWithExpectedMeasurements(t *testing.T, conf *config.Config, cs return conf } - -type stubInitClient struct { - res io.Reader - err error - setResNil bool - grpc.ClientStream -} - -func (c stubInitClient) Recv() (*initproto.InitResponse, error) { - if c.err != nil { - return &initproto.InitResponse{}, c.err - } - - text := make([]byte, 1024) - n, err := c.res.Read(text) - text = text[:n] - - res := &initproto.InitResponse{ - Kind: &initproto.InitResponse_Log{ - Log: &initproto.LogResponseType{ - Log: text, - }, - }, - } - if c.setResNil { - res = &initproto.InitResponse{ - Kind: &initproto.InitResponse_Log{ - Log: &initproto.LogResponseType{ - Log: nil, - }, - }, - } - } - - return res, err -} diff --git a/internal/constellation/BUILD.bazel b/internal/constellation/BUILD.bazel index 53dacaf043..8e98c09ff2 100644 --- a/internal/constellation/BUILD.bazel +++ b/internal/constellation/BUILD.bazel @@ -1,15 +1,55 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("//bazel/go:go_test.bzl", "go_test") go_library( name = "constellation", srcs = [ "apply.go", + "applyinit.go", "constellation.go", ], importpath = "github.com/edgelesssys/constellation/v2/internal/constellation", visibility = ["//:__subpackages__"], deps = [ + "//bootstrapper/initproto", + "//internal/atls", "//internal/cloud/cloudprovider", + "//internal/constants", + "//internal/crypto", + "//internal/grpc/dialer", + "//internal/grpc/grpclog", + "//internal/grpc/retry", + "//internal/kms/uri", "//internal/license", + "//internal/retry", + "//internal/state", + "//internal/versions", + "@org_golang_google_grpc//:go_default_library", + ], +) + +go_test( + name = "constellation_test", + srcs = [ + "apply_test.go", + "applyinit_test.go", + ], + embed = [":constellation"], + deps = [ + "//bootstrapper/initproto", + "//internal/atls", + "//internal/cloud/cloudprovider", + "//internal/constants", + "//internal/crypto", + "//internal/grpc/atlscredentials", + "//internal/grpc/dialer", + "//internal/grpc/testdialer", + "//internal/kms/uri", + "//internal/license", + "//internal/logger", + "//internal/state", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@org_golang_google_grpc//:go_default_library", ], ) diff --git a/internal/constellation/apply.go b/internal/constellation/apply.go index 91e576e480..286a9b22b6 100644 --- a/internal/constellation/apply.go +++ b/internal/constellation/apply.go @@ -10,15 +10,28 @@ import ( "context" "fmt" + "github.com/edgelesssys/constellation/v2/internal/atls" "github.com/edgelesssys/constellation/v2/internal/cloud/cloudprovider" + "github.com/edgelesssys/constellation/v2/internal/crypto" + "github.com/edgelesssys/constellation/v2/internal/grpc/dialer" + "github.com/edgelesssys/constellation/v2/internal/kms/uri" "github.com/edgelesssys/constellation/v2/internal/license" ) -// An Applier handles applying a specific configuration to a Constellation cluster. +// An Applier handles applying a specific configuration to a Constellation cluster +// with existing Infrastructure. // In Particular, this involves Initialization and Upgrading of the cluster. type Applier struct { log debugLog - licenseChecker *license.Checker + licenseChecker licenseChecker + spinner spinnerInterf + + // newDialer creates a new aTLS gRPC dialer. + newDialer func(validator atls.Validator) *dialer.Dialer +} + +type licenseChecker interface { + CheckLicense(context.Context, cloudprovider.Provider, string) (license.QuotaCheckResponse, error) } type debugLog interface { @@ -26,10 +39,16 @@ type debugLog interface { } // NewApplier creates a new Applier. -func NewApplier(log debugLog) *Applier { +func NewApplier( + log debugLog, + spinner spinnerInterf, + newDialer func(validator atls.Validator) *dialer.Dialer, +) *Applier { return &Applier{ log: log, + spinner: spinner, licenseChecker: license.NewChecker(license.NewClient()), + newDialer: newDialer, } } @@ -45,3 +64,33 @@ func (a *Applier) CheckLicense(ctx context.Context, csp cloudprovider.Provider, return quotaResp.Quota, nil } + +// GenerateMasterSecret generates a new master secret. +func (a *Applier) GenerateMasterSecret() (uri.MasterSecret, error) { + a.log.Debugf("Generating master secret") + key, err := crypto.GenerateRandomBytes(crypto.MasterSecretLengthDefault) + if err != nil { + return uri.MasterSecret{}, err + } + salt, err := crypto.GenerateRandomBytes(crypto.RNGLengthDefault) + if err != nil { + return uri.MasterSecret{}, err + } + secret := uri.MasterSecret{ + Key: key, + Salt: salt, + } + a.log.Debugf("Generated master secret key and salt values") + return secret, nil +} + +// GenerateMeasurementSalt generates a new measurement salt. +func (a *Applier) GenerateMeasurementSalt() ([]byte, error) { + a.log.Debugf("Generating measurement salt") + measurementSalt, err := crypto.GenerateRandomBytes(crypto.RNGLengthDefault) + if err != nil { + return nil, fmt.Errorf("generating measurement salt: %w", err) + } + a.log.Debugf("Generated measurement salt") + return measurementSalt, nil +} diff --git a/internal/constellation/apply_test.go b/internal/constellation/apply_test.go new file mode 100644 index 0000000000..ce2466fdfd --- /dev/null +++ b/internal/constellation/apply_test.go @@ -0,0 +1,74 @@ +/* +Copyright (c) Edgeless Systems GmbH + +SPDX-License-Identifier: AGPL-3.0-only +*/ + +package constellation + +import ( + "context" + "testing" + + "github.com/edgelesssys/constellation/v2/internal/cloud/cloudprovider" + "github.com/edgelesssys/constellation/v2/internal/crypto" + "github.com/edgelesssys/constellation/v2/internal/license" + "github.com/edgelesssys/constellation/v2/internal/logger" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCheckLicense(t *testing.T) { + testCases := map[string]struct { + licenseChecker *stubLicenseChecker + wantErr bool + }{ + "success": { + licenseChecker: &stubLicenseChecker{}, + wantErr: false, + }, + "check license error": { + licenseChecker: &stubLicenseChecker{checkLicenseErr: assert.AnError}, + wantErr: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + require := require.New(t) + + a := &Applier{licenseChecker: tc.licenseChecker, log: logger.NewTest(t)} + _, err := a.CheckLicense(context.Background(), cloudprovider.Unknown, license.CommunityLicense) + if tc.wantErr { + require.Error(err) + } else { + require.NoError(err) + } + }) + } +} + +type stubLicenseChecker struct { + checkLicenseErr error +} + +func (c *stubLicenseChecker) CheckLicense(context.Context, cloudprovider.Provider, string) (license.QuotaCheckResponse, error) { + return license.QuotaCheckResponse{}, c.checkLicenseErr +} + +func TestGenerateMasterSecret(t *testing.T) { + assert := assert.New(t) + a := &Applier{log: logger.NewTest(t)} + sec, err := a.GenerateMasterSecret() + assert.NoError(err) + assert.Len(sec.Key, crypto.MasterSecretLengthDefault) + assert.Len(sec.Key, crypto.RNGLengthDefault) +} + +func TestGenerateMeasurementSalt(t *testing.T) { + assert := assert.New(t) + a := &Applier{log: logger.NewTest(t)} + salt, err := a.GenerateMeasurementSalt() + assert.NoError(err) + assert.Len(salt, crypto.RNGLengthDefault) +} diff --git a/internal/constellation/applyinit.go b/internal/constellation/applyinit.go new file mode 100644 index 0000000000..ab231f75fc --- /dev/null +++ b/internal/constellation/applyinit.go @@ -0,0 +1,267 @@ +/* +Copyright (c) Edgeless Systems GmbH + +SPDX-License-Identifier: AGPL-3.0-only +*/ + +package constellation + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "strconv" + "sync" + "time" + + "github.com/edgelesssys/constellation/v2/bootstrapper/initproto" + "github.com/edgelesssys/constellation/v2/internal/atls" + "github.com/edgelesssys/constellation/v2/internal/constants" + "github.com/edgelesssys/constellation/v2/internal/grpc/grpclog" + grpcRetry "github.com/edgelesssys/constellation/v2/internal/grpc/retry" + "github.com/edgelesssys/constellation/v2/internal/kms/uri" + "github.com/edgelesssys/constellation/v2/internal/retry" + "github.com/edgelesssys/constellation/v2/internal/state" + "github.com/edgelesssys/constellation/v2/internal/versions" + "google.golang.org/grpc" +) + +// InitPayload contains the configurable data for the init RPC. +type InitPayload struct { + MasterSecret uri.MasterSecret + MeasurementSalt []byte + K8sVersion versions.ValidK8sVersion + ConformanceMode bool + ServiceCIDR string +} + +// GrpcDialer dials a gRPC server. +type GrpcDialer interface { + Dial(ctx context.Context, target string) (*grpc.ClientConn, error) +} + +// Init performs the init RPC. +func (a *Applier) Init( + ctx context.Context, + validator atls.Validator, + state *state.State, + clusterLogWriter io.Writer, + payload InitPayload, +) ( + *initproto.InitSuccessResponse, + error, +) { + // Prepare the Request + req := &initproto.InitRequest{ + KmsUri: payload.MasterSecret.EncodeToURI(), + StorageUri: uri.NoStoreURI, + MeasurementSalt: payload.MeasurementSalt, + KubernetesVersion: versions.VersionConfigs[payload.K8sVersion].ClusterVersion, + KubernetesComponents: versions.VersionConfigs[payload.K8sVersion].KubernetesComponents.ToInitProto(), + ConformanceMode: payload.ConformanceMode, + InitSecret: state.Infrastructure.InitSecret, + ClusterName: state.Infrastructure.Name, + ApiserverCertSans: state.Infrastructure.APIServerCertSANs, + ServiceCidr: payload.ServiceCIDR, + } + + doer := &initDoer{ + dialer: a.newDialer(validator), + endpoint: net.JoinHostPort( + state.Infrastructure.ClusterEndpoint, + strconv.Itoa(constants.BootstrapperPort), + ), + req: req, + log: a.log, + clusterLogWriter: clusterLogWriter, + spinner: a.spinner, + } + + // Create a wrapper function that allows logging any returned error from the retrier before checking if it's the expected retriable one. + serviceIsUnavailable := func(err error) bool { + isServiceUnavailable := grpcRetry.ServiceIsUnavailable(err) + a.log.Debugf("Encountered error (retriable: %t): %s", isServiceUnavailable, err) + return isServiceUnavailable + } + + // Perform the RPC + a.log.Debugf("Making initialization call, doer is %+v", doer) + a.spinner.Start("Connecting ", false) + retrier := retry.NewIntervalRetrier(doer, 30*time.Second, serviceIsUnavailable) + if err := retrier.Do(ctx); err != nil { + return nil, fmt.Errorf("doing init call: %w", err) + } + a.spinner.Stop() + a.log.Debugf("Initialization request finished") + + return doer.resp, nil +} + +// the initDoer performs the actual init RPC with retry logic. +type initDoer struct { + dialer GrpcDialer + endpoint string + req *initproto.InitRequest + log debugLog + connectedOnce bool + spinner spinnerInterf + + // clusterLogWriter is the writer to which the cluster logs are written. + clusterLogWriter io.Writer + + // Read-Only-fields: + + // resp is the response returned upon successful initialization. + resp *initproto.InitSuccessResponse +} + +type spinnerInterf interface { + Start(text string, showDots bool) + Stop() + io.Writer +} + +// Do performs the init gRPC call. +func (d *initDoer) Do(ctx context.Context) error { + // connectedOnce is set in handleGRPCStateChanges when a connection was established in one retry attempt. + // This should cancel any other retry attempts when the connection is lost since the bootstrapper likely won't accept any new attempts anymore. + if d.connectedOnce { + return &NonRetriableInitError{ + LogCollectionErr: errors.New("init already connected to the remote server in a previous attempt - resumption is not supported"), + Err: errors.New("init already connected to the remote server in a previous attempt - resumption is not supported"), + } + } + + conn, err := d.dialer.Dial(ctx, d.endpoint) + if err != nil { + d.log.Debugf("Dialing init server failed: %s. Retrying...", err) + return fmt.Errorf("dialing init server: %w", err) + } + defer conn.Close() + + var wg sync.WaitGroup + defer wg.Wait() + + grpcStateLogCtx, grpcStateLogCancel := context.WithCancel(ctx) + defer grpcStateLogCancel() + d.handleGRPCStateChanges(grpcStateLogCtx, &wg, conn) + + protoClient := initproto.NewAPIClient(conn) + d.log.Debugf("Created protoClient") + resp, err := protoClient.Init(ctx, d.req) + if err != nil { + return &NonRetriableInitError{ + LogCollectionErr: errors.New("rpc failed before first response was received - no logs available"), + Err: fmt.Errorf("init call: %w", err), + } + } + + res, err := resp.Recv() // get first response, either success or failure + if err != nil { + if e := d.getLogs(resp); e != nil { + d.log.Debugf("Failed to collect logs: %s", e) + return &NonRetriableInitError{ + LogCollectionErr: e, + Err: err, + } + } + return &NonRetriableInitError{Err: err} + } + + switch res.Kind.(type) { + case *initproto.InitResponse_InitSuccess: + d.resp = res.GetInitSuccess() + case *initproto.InitResponse_InitFailure: + if e := d.getLogs(resp); e != nil { + d.log.Debugf("Failed to get logs from cluster: %s", e) + return &NonRetriableInitError{ + LogCollectionErr: e, + Err: errors.New(res.GetInitFailure().GetError()), + } + } + return &NonRetriableInitError{Err: errors.New(res.GetInitFailure().GetError())} + case nil: + d.log.Debugf("Cluster returned nil response type") + err = errors.New("empty response from cluster") + if e := d.getLogs(resp); e != nil { + d.log.Debugf("Failed to collect logs: %s", e) + return &NonRetriableInitError{ + LogCollectionErr: e, + Err: err, + } + } + return &NonRetriableInitError{Err: err} + default: + d.log.Debugf("Cluster returned unknown response type") + err = errors.New("unknown response from cluster") + if e := d.getLogs(resp); e != nil { + d.log.Debugf("Failed to collect logs: %s", e) + return &NonRetriableInitError{ + LogCollectionErr: e, + Err: err, + } + } + return &NonRetriableInitError{Err: err} + } + return nil +} + +// getLogs retrieves the cluster logs from the bootstrapper and saves them in the initDoer. +func (d *initDoer) getLogs(resp initproto.API_InitClient) error { + d.log.Debugf("Attempting to collect cluster logs") + for { + res, err := resp.Recv() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("receiving logs: %w", err) + } + + switch res.Kind.(type) { + case *initproto.InitResponse_InitFailure: + return errors.New("trying to collect logs: received init failure response, expected log response") + case *initproto.InitResponse_InitSuccess: + return errors.New("trying to collect logs: received init success response, expected log response") + case nil: + return errors.New("trying to collect logs: received nil response, expected log response") + } + + log := res.GetLog().GetLog() + if log == nil { + return errors.New("received empty logs") + } + if _, err := d.clusterLogWriter.Write(log); err != nil { + return fmt.Errorf("writing logs: %w", err) + } + } + + d.log.Debugf("Received cluster logs") + return nil +} + +func (d *initDoer) handleGRPCStateChanges(ctx context.Context, wg *sync.WaitGroup, conn *grpc.ClientConn) { + grpclog.LogStateChangesUntilReady(ctx, conn, d.log, wg, func() { + d.connectedOnce = true + d.spinner.Stop() + d.spinner.Start("Initializing cluster ", false) + }) +} + +// NonRetriableInitError is returned when the init RPC fails and the error is not retriable. +type NonRetriableInitError struct { + LogCollectionErr error + Err error +} + +// Error returns the error message. +func (e *NonRetriableInitError) Error() string { + return e.Err.Error() +} + +// Unwrap returns the wrapped error. +func (e *NonRetriableInitError) Unwrap() error { + return e.Err +} diff --git a/internal/constellation/applyinit_test.go b/internal/constellation/applyinit_test.go new file mode 100644 index 0000000000..52b14f620e --- /dev/null +++ b/internal/constellation/applyinit_test.go @@ -0,0 +1,234 @@ +/* +Copyright (c) Edgeless Systems GmbH + +SPDX-License-Identifier: AGPL-3.0-only +*/ + +package constellation + +import ( + "bytes" + "context" + "io" + "net" + "strconv" + "testing" + "time" + + "github.com/edgelesssys/constellation/v2/bootstrapper/initproto" + "github.com/edgelesssys/constellation/v2/internal/atls" + "github.com/edgelesssys/constellation/v2/internal/constants" + "github.com/edgelesssys/constellation/v2/internal/grpc/atlscredentials" + "github.com/edgelesssys/constellation/v2/internal/grpc/dialer" + "github.com/edgelesssys/constellation/v2/internal/grpc/testdialer" + "github.com/edgelesssys/constellation/v2/internal/kms/uri" + "github.com/edgelesssys/constellation/v2/internal/logger" + "github.com/edgelesssys/constellation/v2/internal/state" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +func TestInit(t *testing.T) { + clusterEndpoint := "192.0.2.1" + newState := func(endpoint string) *state.State { + return &state.State{ + Infrastructure: state.Infrastructure{ + ClusterEndpoint: endpoint, + }, + } + } + newInitServer := func(initErr error, responses ...*initproto.InitResponse) *stubInitServer { + return &stubInitServer{ + res: responses, + initErr: initErr, + } + } + + testCases := map[string]struct { + server initproto.APIServer + state *state.State + initServerEndpoint string + wantClusterLogs []byte + wantErr bool + }{ + "success": { + server: newInitServer(nil, + &initproto.InitResponse{ + Kind: &initproto.InitResponse_InitSuccess{ + InitSuccess: &initproto.InitSuccessResponse{ + Kubeconfig: []byte{}, + OwnerId: []byte{}, + ClusterId: []byte{}, + }, + }, + }), + state: newState(clusterEndpoint), + initServerEndpoint: clusterEndpoint, + }, + "no response": { + server: newInitServer(nil), + state: newState(clusterEndpoint), + initServerEndpoint: clusterEndpoint, + wantErr: true, + }, + "nil response": { + server: newInitServer(nil, &initproto.InitResponse{Kind: nil}), + state: newState(clusterEndpoint), + initServerEndpoint: clusterEndpoint, + wantErr: true, + }, + "failure response": { + server: newInitServer(nil, + &initproto.InitResponse{ + Kind: &initproto.InitResponse_InitFailure{ + InitFailure: &initproto.InitFailureResponse{ + Error: assert.AnError.Error(), + }, + }, + }), + state: newState(clusterEndpoint), + initServerEndpoint: clusterEndpoint, + wantErr: true, + }, + "setup server error": { + server: newInitServer(assert.AnError), + state: newState(clusterEndpoint), + initServerEndpoint: clusterEndpoint, + wantErr: true, + }, + "expected log response, got failure": { + server: newInitServer(nil, + &initproto.InitResponse{ + Kind: &initproto.InitResponse_InitFailure{ + InitFailure: &initproto.InitFailureResponse{ + Error: assert.AnError.Error(), + }, + }, + }, + &initproto.InitResponse{ + Kind: &initproto.InitResponse_InitFailure{ + InitFailure: &initproto.InitFailureResponse{ + Error: assert.AnError.Error(), + }, + }, + }, + ), + state: newState(clusterEndpoint), + initServerEndpoint: clusterEndpoint, + wantErr: true, + }, + "expected log response, got success": { + server: newInitServer(nil, + &initproto.InitResponse{ + Kind: &initproto.InitResponse_InitFailure{ + InitFailure: &initproto.InitFailureResponse{ + Error: assert.AnError.Error(), + }, + }, + }, + &initproto.InitResponse{ + Kind: &initproto.InitResponse_InitSuccess{ + InitSuccess: &initproto.InitSuccessResponse{ + Kubeconfig: []byte{}, + OwnerId: []byte{}, + ClusterId: []byte{}, + }, + }, + }, + ), + state: newState(clusterEndpoint), + initServerEndpoint: clusterEndpoint, + wantErr: true, + }, + "collect logs": { + server: newInitServer(nil, + &initproto.InitResponse{ + Kind: &initproto.InitResponse_InitFailure{ + InitFailure: &initproto.InitFailureResponse{ + Error: assert.AnError.Error(), + }, + }, + }, + &initproto.InitResponse{ + Kind: &initproto.InitResponse_Log{ + Log: &initproto.LogResponseType{ + Log: []byte("some log"), + }, + }, + }, + ), + wantClusterLogs: []byte("some log"), + state: newState(clusterEndpoint), + initServerEndpoint: clusterEndpoint, + wantErr: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + assert := require.New(t) + + netDialer := testdialer.NewBufconnDialer() + stop := setupTestInitServer(netDialer, tc.server, tc.initServerEndpoint) + defer stop() + + a := &Applier{ + log: logger.NewTest(t), + spinner: &nopSpinner{}, + newDialer: func(atls.Validator) *dialer.Dialer { + return dialer.New(nil, nil, netDialer) + }, + } + + clusterLogs := &bytes.Buffer{} + ctx, cancel := context.WithTimeout(context.Background(), time.Second*4) + defer cancel() + _, err := a.Init(ctx, nil, tc.state, clusterLogs, InitPayload{ + MasterSecret: uri.MasterSecret{}, + MeasurementSalt: []byte{}, + K8sVersion: "v1.26.5", + ConformanceMode: false, + }) + if tc.wantErr { + assert.Error(err) + assert.Equal(tc.wantClusterLogs, clusterLogs.Bytes()) + } else { + assert.NoError(err) + } + }) + } +} + +type nopSpinner struct { + io.Writer +} + +func (s *nopSpinner) Start(string, bool) {} +func (s *nopSpinner) Stop() {} +func (s *nopSpinner) Write(p []byte) (n int, err error) { + return s.Writer.Write(p) +} + +func setupTestInitServer(dialer *testdialer.BufconnDialer, server initproto.APIServer, host string) func() { + serverCreds := atlscredentials.New(nil, nil) + initServer := grpc.NewServer(grpc.Creds(serverCreds)) + initproto.RegisterAPIServer(initServer, server) + listener := dialer.GetListener(net.JoinHostPort(host, strconv.Itoa(constants.BootstrapperPort))) + go initServer.Serve(listener) + return initServer.GracefulStop +} + +type stubInitServer struct { + res []*initproto.InitResponse + initErr error + + initproto.UnimplementedAPIServer +} + +func (s *stubInitServer) Init(_ *initproto.InitRequest, stream initproto.API_InitServer) error { + for _, r := range s.res { + _ = stream.Send(r) + } + return s.initErr +}