From d8dcf61dd6621fc6a4fd16697126ab5f439accc3 Mon Sep 17 00:00:00 2001 From: Anik Bhattacharjee Date: Thu, 16 Jan 2025 15:37:41 -0500 Subject: [PATCH] Tests --- catalogd/internal/serverutil/serverutil.go | 27 +- .../internal/serverutil/serverutil_test.go | 254 +++++ catalogd/internal/storage/localdir.go | 15 +- catalogd/internal/storage/localdir_test.go | 871 ++++++++++-------- go.mod | 2 +- 5 files changed, 773 insertions(+), 396 deletions(-) create mode 100644 catalogd/internal/serverutil/serverutil_test.go diff --git a/catalogd/internal/serverutil/serverutil.go b/catalogd/internal/serverutil/serverutil.go index daec33057..b87bd1c0a 100644 --- a/catalogd/internal/serverutil/serverutil.go +++ b/catalogd/internal/serverutil/serverutil.go @@ -1,7 +1,6 @@ package serverutil import ( - "context" "crypto/tls" "fmt" "io" @@ -11,9 +10,9 @@ import ( "github.com/go-logr/logr" "github.com/gorilla/handlers" + "github.com/klauspost/compress/gzhttp" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/certwatcher" - "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" catalogdmetrics "github.com/operator-framework/operator-controller/catalogd/internal/metrics" @@ -44,20 +43,12 @@ func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFil } shutdownTimeout := 30 * time.Second - - l := mgr.GetLogger().WithName("catalogd-http-server") - handler := catalogdmetrics.AddMetricsToHandler(cfg.LocalStorage.StorageServerHandler()) - handler = logrLoggingHandler(l, handler) - catalogServer := manager.Server{ Name: "catalogs", OnlyServeWhenLeader: true, Server: &http.Server{ - Addr: cfg.CatalogAddr, - Handler: handler, - BaseContext: func(_ net.Listener) context.Context { - return log.IntoContext(context.Background(), mgr.GetLogger().WithName("http.catalogs")) - }, + Addr: cfg.CatalogAddr, + Handler: storageServerHandlerWrapped(mgr, cfg), ReadTimeout: 5 * time.Second, // TODO: Revert this to 10 seconds if/when the API // evolves to have significantly smaller responses @@ -102,3 +93,15 @@ func logrLoggingHandler(l logr.Logger, handler http.Handler) http.Handler { l.Info("handled request", "host", host, "username", username, "method", params.Request.Method, "uri", uri, "protocol", params.Request.Proto, "status", params.StatusCode, "size", params.Size) }) } + +func storageServerHandlerWrapped(mgr ctrl.Manager, cfg CatalogServerConfig) http.Handler { + + handler := cfg.LocalStorage.StorageServerHandler() + handler = gzhttp.GzipHandler(handler) + handler = catalogdmetrics.AddMetricsToHandler(handler) + + l := mgr.GetLogger().WithName("catalogd-http-server") + handler = logrLoggingHandler(l, handler) + return handler + +} diff --git a/catalogd/internal/serverutil/serverutil_test.go b/catalogd/internal/serverutil/serverutil_test.go new file mode 100644 index 000000000..89938d82a --- /dev/null +++ b/catalogd/internal/serverutil/serverutil_test.go @@ -0,0 +1,254 @@ +package serverutil + +import ( + "compress/gzip" + "context" + "io" + "io/fs" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + ctrl "sigs.k8s.io/controller-runtime" +) + +func TestStorageServerHandlerWrapped_Gzip(t *testing.T) { + tests := []struct { + name string + acceptEncoding string + responseContent string + expectCompressed bool + expectedStatus int + }{ + { + name: "compresses large response when client accepts gzip", + acceptEncoding: "gzip", + responseContent: testCompressableJSON, + expectCompressed: true, + expectedStatus: http.StatusOK, + }, + { + name: "does not compress small response even when client accepts gzip", + acceptEncoding: "gzip", + responseContent: `{"foo":"bar"}`, + expectCompressed: false, + expectedStatus: http.StatusOK, + }, + { + name: "does not compress when client doesn't accept gzip", + acceptEncoding: "", + responseContent: testCompressableJSON, + expectCompressed: false, + expectedStatus: http.StatusOK, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a mock storage instance that returns our test content + mockStorage := &mockStorageInstance{ + content: tt.responseContent, + } + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{}) + require.NoError(t, err) + + cfg := CatalogServerConfig{ + LocalStorage: mockStorage, + } + handler := storageServerHandlerWrapped(mgr, cfg) + + // Create test request + req := httptest.NewRequest("GET", "/test", nil) + if tt.acceptEncoding != "" { + req.Header.Set("Accept-Encoding", tt.acceptEncoding) + } + + // Create response recorder + rec := httptest.NewRecorder() + + // Handle the request + handler.ServeHTTP(rec, req) + + // Check status code + require.Equal(t, tt.expectedStatus, rec.Code) + + // Check if response was compressed + wasCompressed := rec.Header().Get("Content-Encoding") == "gzip" + require.Equal(t, tt.expectCompressed, wasCompressed) + + // Get the response body + var responseBody []byte + if wasCompressed { + // Decompress the response + gzipReader, err := gzip.NewReader(rec.Body) + require.NoError(t, err) + responseBody, err = io.ReadAll(gzipReader) + require.NoError(t, err) + require.NoError(t, gzipReader.Close()) + } else { + responseBody = rec.Body.Bytes() + } + + // Verify the response content + require.Equal(t, tt.responseContent, string(responseBody)) + }) + } +} + +const testCompressableJSON = `{ + "defaultChannel": "stable-v6.x", + "name": "cockroachdb", + "schema": "olm.package" + } + { + "entries": [ + { + "name": "cockroachdb.v5.0.3" + }, + { + "name": "cockroachdb.v5.0.4", + "replaces": "cockroachdb.v5.0.3" + } + ], + "name": "stable-5.x", + "package": "cockroachdb", + "schema": "olm.channel" + } + { + "entries": [ + { + "name": "cockroachdb.v6.0.0", + "skipRange": "<6.0.0" + } + ], + "name": "stable-v6.x", + "package": "cockroachdb", + "schema": "olm.channel" + } + { + "image": "quay.io/openshift-community-operators/cockroachdb@sha256:a5d4f4467250074216eb1ba1c36e06a3ab797d81c431427fc2aca97ecaf4e9d8", + "name": "cockroachdb.v5.0.3", + "package": "cockroachdb", + "properties": [ + { + "type": "olm.gvk", + "value": { + "group": "charts.operatorhub.io", + "kind": "Cockroachdb", + "version": "v1alpha1" + } + }, + { + "type": "olm.package", + "value": { + "packageName": "cockroachdb", + "version": "5.0.3" + } + } + ], + "relatedImages": [ + { + "name": "", + "image": "quay.io/helmoperators/cockroachdb:v5.0.3" + }, + { + "name": "", + "image": "quay.io/openshift-community-operators/cockroachdb@sha256:a5d4f4467250074216eb1ba1c36e06a3ab797d81c431427fc2aca97ecaf4e9d8" + } + ], + "schema": "olm.bundle" + } + { + "image": "quay.io/openshift-community-operators/cockroachdb@sha256:f42337e7b85a46d83c94694638e2312e10ca16a03542399a65ba783c94a32b63", + "name": "cockroachdb.v5.0.4", + "package": "cockroachdb", + "properties": [ + { + "type": "olm.gvk", + "value": { + "group": "charts.operatorhub.io", + "kind": "Cockroachdb", + "version": "v1alpha1" + } + }, + { + "type": "olm.package", + "value": { + "packageName": "cockroachdb", + "version": "5.0.4" + } + } + ], + "relatedImages": [ + { + "name": "", + "image": "quay.io/helmoperators/cockroachdb:v5.0.4" + }, + { + "name": "", + "image": "quay.io/openshift-community-operators/cockroachdb@sha256:f42337e7b85a46d83c94694638e2312e10ca16a03542399a65ba783c94a32b63" + } + ], + "schema": "olm.bundle" + } + { + "image": "quay.io/openshift-community-operators/cockroachdb@sha256:d3016b1507515fc7712f9c47fd9082baf9ccb070aaab58ed0ef6e5abdedde8ba", + "name": "cockroachdb.v6.0.0", + "package": "cockroachdb", + "properties": [ + { + "type": "olm.gvk", + "value": { + "group": "charts.operatorhub.io", + "kind": "Cockroachdb", + "version": "v1alpha1" + } + }, + { + "type": "olm.package", + "value": { + "packageName": "cockroachdb", + "version": "6.0.0" + } + } + ], + "relatedImages": [ + { + "name": "", + "image": "quay.io/cockroachdb/cockroach-helm-operator:6.0.0" + }, + { + "name": "", + "image": "quay.io/openshift-community-operators/cockroachdb@sha256:d3016b1507515fc7712f9c47fd9082baf9ccb070aaab58ed0ef6e5abdedde8ba" + } + ], + "schema": "olm.bundle" + } + ` + +// mockStorageInstance implements storage.Instance interface for testing +type mockStorageInstance struct { + content string +} + +func (m *mockStorageInstance) StorageServerHandler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(m.content)) + }) +} + +func (m *mockStorageInstance) Store(ctx context.Context, catalogName string, fs fs.FS) error { + return nil +} + +func (m *mockStorageInstance) Delete(catalogName string) error { + return nil +} + +func (m *mockStorageInstance) ContentExists(catalog string) bool { + return true +} +func (m *mockStorageInstance) BaseURL(catalog string) string { + return "" +} diff --git a/catalogd/internal/storage/localdir.go b/catalogd/internal/storage/localdir.go index db642e8f5..07bc13faf 100644 --- a/catalogd/internal/storage/localdir.go +++ b/catalogd/internal/storage/localdir.go @@ -34,6 +34,11 @@ type LocalDirV1 struct { sf singleflight.Group } +var ( + _ Instance = &LocalDirV1{} + ErrInvalidParams = errors.New("invalid parameters") +) + func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) error { s.m.Lock() defer s.m.Unlock() @@ -56,9 +61,13 @@ func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) erro eg, egCtx = errgroup.WithContext(ctx) metaChans []chan *declcfg.Meta ) - for i, f := range storeMetaFuncs { + for range storeMetaFuncs { metaChans = append(metaChans, make(chan *declcfg.Meta, 1)) - eg.Go(func() error { return f(tmpCatalogDir, metaChans[i]) }) + } + for i, f := range storeMetaFuncs { + eg.Go(func() error { + return f(tmpCatalogDir, metaChans[i]) + }) } err = declcfg.WalkMetasFS(egCtx, fsys, func(path string, meta *declcfg.Meta, err error) error { if err != nil { @@ -249,6 +258,8 @@ func httpError(w http.ResponseWriter, err error) { code = http.StatusNotFound case errors.Is(err, fs.ErrPermission): code = http.StatusForbidden + case errors.Is(err, ErrInvalidParams): + code = http.StatusBadRequest default: code = http.StatusInternalServerError } diff --git a/catalogd/internal/storage/localdir_test.go b/catalogd/internal/storage/localdir_test.go index 0e84d3adc..663207490 100644 --- a/catalogd/internal/storage/localdir_test.go +++ b/catalogd/internal/storage/localdir_test.go @@ -1,10 +1,7 @@ package storage import ( - "bytes" - "compress/gzip" "context" - "encoding/json" "errors" "fmt" "io" @@ -15,219 +12,494 @@ import ( "os" "path/filepath" "strings" + "sync" + "testing" "testing/fstest" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - "github.com/google/go-cmp/cmp" - "sigs.k8s.io/yaml" - - "github.com/operator-framework/operator-registry/alpha/declcfg" + "github.com/stretchr/testify/require" ) const urlPrefix = "/catalogs/" -var ctx = context.Background() - -var _ = Describe("LocalDir Storage Test", func() { - var ( - catalog = "test-catalog" - store Instance - rootDir string - baseURL *url.URL - testBundleName = "bundle.v0.0.1" - testBundleImage = "quaydock.io/namespace/bundle:0.0.3" - testBundleRelatedImageName = "test" - testBundleRelatedImageImage = "testimage:latest" - testBundleObjectData = "dW5pbXBvcnRhbnQK" - testPackageDefaultChannel = "preview_test" - testPackageName = "webhook_operator_test" - testChannelName = "preview_test" - testPackage = fmt.Sprintf(testPackageTemplate, testPackageDefaultChannel, testPackageName) - testBundle = fmt.Sprintf(testBundleTemplate, testBundleImage, testBundleName, testPackageName, testBundleRelatedImageName, testBundleRelatedImageImage, testBundleObjectData) - testChannel = fmt.Sprintf(testChannelTemplate, testPackageName, testChannelName, testBundleName) - - unpackResultFS fs.FS - ) - BeforeEach(func() { - d, err := os.MkdirTemp(GinkgoT().TempDir(), "cache") - Expect(err).ToNot(HaveOccurred()) - rootDir = d - - baseURL = &url.URL{Scheme: "http", Host: "test-addr", Path: urlPrefix} - store = &LocalDirV1{RootDir: rootDir, RootURL: baseURL} - unpackResultFS = &fstest.MapFS{ - "bundle.yaml": &fstest.MapFile{Data: []byte(testBundle), Mode: os.ModePerm}, - "package.yaml": &fstest.MapFile{Data: []byte(testPackage), Mode: os.ModePerm}, - "channel.yaml": &fstest.MapFile{Data: []byte(testChannel), Mode: os.ModePerm}, - } - }) - When("An unpacked FBC is stored using LocalDir", func() { - BeforeEach(func() { - err := store.Store(context.Background(), catalog, unpackResultFS) - Expect(err).To(Not(HaveOccurred())) - }) - It("should store the content in the RootDir correctly", func() { - fbcFile := filepath.Join(rootDir, fmt.Sprintf("%s.jsonl", catalog)) - _, err := os.Stat(fbcFile) - Expect(err).To(Not(HaveOccurred())) - - gotConfig, err := declcfg.LoadFS(ctx, unpackResultFS) - Expect(err).To(Not(HaveOccurred())) - storedConfig, err := declcfg.LoadFile(os.DirFS(filepath.Dir(fbcFile)), filepath.Base(fbcFile)) - Expect(err).To(Not(HaveOccurred())) - diff := cmp.Diff(gotConfig, storedConfig) - Expect(diff).To(Equal("")) - }) - It("should form the content URL correctly", func() { - Expect(store.BaseURL(catalog)).To(Equal(baseURL.JoinPath(catalog).String())) +func TestLocalDirStoraget(t *testing.T) { + tests := []struct { + name string + setup func(*testing.T) (*LocalDirV1, fs.FS) + test func(*testing.T, *LocalDirV1, fs.FS) + cleanup func(*testing.T, *LocalDirV1) + }{ + { + name: "store and retrieve catalog content", + setup: func(t *testing.T) (*LocalDirV1, fs.FS) { + s := &LocalDirV1{ + RootDir: t.TempDir(), + RootURL: &url.URL{Scheme: "http", Host: "test-addr", Path: urlPrefix}, + } + return s, createTestFS(t) + }, + test: func(t *testing.T, s *LocalDirV1, fsys fs.FS) { + const catalog = "test-catalog" + + // Initially content should not exist + if s.ContentExists(catalog) { + t.Fatal("content should not exist before store") + } + + // Store the content + if err := s.Store(context.Background(), catalog, fsys); err != nil { + t.Fatal(err) + } + + // Verify content exists after store + if !s.ContentExists(catalog) { + t.Fatal("content should exist after store") + } + + // Delete the content + if err := s.Delete(catalog); err != nil { + t.Fatal(err) + } + + // Verify content no longer exists + if s.ContentExists(catalog) { + t.Fatal("content should not exist after delete") + } + }, + }, + { + name: "storing with query handler enabled should create indexes", + setup: func(t *testing.T) (*LocalDirV1, fs.FS) { + s := &LocalDirV1{ + RootDir: t.TempDir(), + EnableQueryHandler: true, + } + return s, createTestFS(t) + }, + test: func(t *testing.T, s *LocalDirV1, fsys fs.FS) { + err := s.Store(context.Background(), "test-catalog", fsys) + if err != nil { + t.Fatal(err) + } + + if !s.ContentExists("test-catalog") { + t.Error("content should exist after store") + } + + // Verify index file was created + indexPath := filepath.Join(s.RootDir, "test-catalog", "index.json") + if _, err := os.Stat(indexPath); err != nil { + t.Errorf("index file should exist: %v", err) + } + }, + }, + { + name: "concurrent reads during write should not cause data race", + setup: func(t *testing.T) (*LocalDirV1, fs.FS) { + dir := t.TempDir() + s := &LocalDirV1{RootDir: dir} + return s, createTestFS(t) + }, + test: func(t *testing.T, s *LocalDirV1, fsys fs.FS) { + const catalog = "test-catalog" + var wg sync.WaitGroup + + // Start multiple concurrent readers + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Add(-1) + for j := 0; j < 100; j++ { + s.ContentExists(catalog) + } + }() + } + + // Write while readers are active + err := s.Store(context.Background(), catalog, fsys) + if err != nil { + t.Fatal(err) + } + + wg.Wait() + }, + }, + { + name: "delete nonexistent catalog", + setup: func(t *testing.T) (*LocalDirV1, fs.FS) { + return &LocalDirV1{RootDir: t.TempDir()}, nil + }, + test: func(t *testing.T, s *LocalDirV1, _ fs.FS) { + err := s.Delete("nonexistent") + if err != nil { + t.Errorf("expected no error deleting nonexistent catalog, got: %v", err) + } + }, + }, + { + name: "store with invalid permissions", + setup: func(t *testing.T) (*LocalDirV1, fs.FS) { + dir := t.TempDir() + // Set directory permissions to deny access + if err := os.Chmod(dir, 0000); err != nil { + t.Fatal(err) + } + return &LocalDirV1{RootDir: dir}, createTestFS(t) + }, + test: func(t *testing.T, s *LocalDirV1, fsys fs.FS) { + err := s.Store(context.Background(), "test-catalog", fsys) + if !errors.Is(err, fs.ErrPermission) { + t.Errorf("expected permission error, got: %v", err) + } + }, + cleanup: func(t *testing.T, s *LocalDirV1) { + // Restore permissions so cleanup can succeed + require.NoError(t, os.Chmod(s.RootDir, 0700)) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, fsys := tt.setup(t) + tt.test(t, s, fsys) + if tt.cleanup != nil { + tt.cleanup(t, s) + } }) - It("should report content exists", func() { - Expect(store.ContentExists(catalog)).To(BeTrue()) + } +} + +func TestLocalDirServerHandler(t *testing.T) { + store := &LocalDirV1{RootDir: t.TempDir(), RootURL: &url.URL{Path: urlPrefix}} + testFS := fstest.MapFS{ + "meta.json": &fstest.MapFile{ + Data: []byte(`{"foo":"bar"}`), + }, + } + if store.Store(context.Background(), "test-catalog", testFS) != nil { + t.Fatal("failed to store test catalog and start server") + } + testServer := httptest.NewServer(store.StorageServerHandler()) + defer testServer.Close() + + for _, tc := range []struct { + name string + URLPath string + expectedStatusCode int + expectedContent string + }{ + { + name: "Server returns 404 when root URL is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "", + }, + { + name: "Server returns 404 when path '/' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/", + }, + { + name: "Server returns 404 when path '/catalogs/' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/", + }, + { + name: "Server returns 404 when path '/catalogs//' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/test-catalog/", + }, + { + name: "Server returns 404 when path '/catalogs//api/' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/test-catalog/api/", + }, + { + name: "Serer return 404 when path '/catalogs//api/v1' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/test-catalog/api/v1c", + }, + { + name: "Server return 404 when path '/catalogs//non-existent.txt' is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/test-catalog/non-existent.txt", + }, + { + name: "Server returns 404 when path '/catalogs/.jsonl' is queried even if the file exists, since we don't serve the filesystem, and serve an API instead", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 page not found", + URLPath: "/catalogs/test-catalog.jsonl", + }, + { + name: "Server returns 200 when path '/catalogs//api/v1/all' is queried, when catalog exists", + expectedStatusCode: http.StatusOK, + expectedContent: `{"foo":"bar"}`, + URLPath: "/catalogs/test-catalog/api/v1/all", + }, + { + name: "Server returns 404 when non-existent catalog is queried", + expectedStatusCode: http.StatusNotFound, + expectedContent: "404 Not Found", + URLPath: "/catalogs/non-existent-catalog/api/v1/all", + }, + } { + t.Run(tc.name, func(t *testing.T) { + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", testServer.URL, tc.URLPath), nil) + require.NoError(t, err) + req.Header.Set("Accept-Encoding", "gzip") + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + + require.Equal(t, tc.expectedStatusCode, resp.StatusCode) + + var actualContent []byte + actualContent, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, tc.expectedContent, strings.TrimSpace(string(actualContent))) + require.NoError(t, resp.Body.Close()) }) - When("The stored content is deleted", func() { - BeforeEach(func() { - err := store.Delete(catalog) - Expect(err).To(Not(HaveOccurred())) - }) - It("should delete the FBC from the cache directory", func() { - fbcFile := filepath.Join(rootDir, fmt.Sprintf("%s.jsonl", catalog)) - _, err := os.Stat(fbcFile) - Expect(err).To(HaveOccurred()) - Expect(os.IsNotExist(err)).To(BeTrue()) - - indexFile := filepath.Join(rootDir, fmt.Sprintf("%s.index.json", catalog)) - _, err = os.Stat(indexFile) - Expect(err).To(HaveOccurred()) - Expect(os.IsNotExist(err)).To(BeTrue()) - }) - It("should report content does not exist", func() { - Expect(store.ContentExists(catalog)).To(BeFalse()) - }) + } +} + +// Tests to verify the behavior of the query endpoint, as described in +// https://docs.google.com/document/d/1s6_9IFEKGQLNh3ueH7SF4Yrx4PW9NSiNFqFIJx0pU-8/edit?usp=sharing +func TestQueryEndpoint(t *testing.T) { + store := &LocalDirV1{ + RootDir: t.TempDir(), + RootURL: &url.URL{Path: urlPrefix}, + EnableQueryHandler: true, + } + if store.Store(context.Background(), "test-catalog", createTestFS(t)) != nil { + t.Fatal("failed to store test catalog") + } + testServer := httptest.NewServer(store.StorageServerHandler()) + + testCases := []struct { + name string + setupStore func() (*httptest.Server, error) + queryParams string + expectedStatusCode int + expectedContent string + }{ + { + name: "valid query with package schema", + queryParams: "?schema=olm.package", + expectedStatusCode: http.StatusOK, + expectedContent: `{"defaultChannel":"preview_test","name":"webhook_operator_test","schema":"olm.package"}`, + }, + { + name: "valid query with schema and name combination", + queryParams: "?schema=olm.package&name=webhook_operator_test", + expectedStatusCode: http.StatusOK, + expectedContent: `{"defaultChannel":"preview_test","name":"webhook_operator_test","schema":"olm.package"}`, + }, + { + name: "valid query with channel schema and package name combination", + queryParams: "?schema=olm.channel&package=webhook_operator_test", + expectedStatusCode: http.StatusOK, + expectedContent: `{"entries":[{"name":"bundle.v0.0.1"}],"name":"preview_test","package":"webhook_operator_test","schema":"olm.channel"}`, + }, + { + name: "query with all meta fields", + queryParams: "?schema=olm.bundle&package=webhook_operator_test&name=bundle.v0.0.1", + expectedStatusCode: http.StatusOK, + expectedContent: `{"image":"quaydock.io/namespace/bundle:0.0.3","name":"bundle.v0.0.1","package":"webhook_operator_test","properties":[{"type":"olm.bundle.object","value":{"data":"dW5pbXBvcnRhbnQK"}},{"type":"some.other","value":{"data":"arbitrary-info"}}],"relatedImages":[{"image":"testimage:latest","name":"test"}],"schema":"olm.bundle"}`, + }, + // { + // name: "valid query for package schema for a package that does not exist", + // queryParams: "?schema=olm.package&name=not-present", + // expectedStatusCode: http.StatusOK, + // expectedContent: "", + // }, + { + name: "valid query with package and name", + queryParams: "?package=webhook_operator_test&name=bundle.v0.0.1", + expectedStatusCode: http.StatusOK, + expectedContent: `{"image":"quaydock.io/namespace/bundle:0.0.3","name":"bundle.v0.0.1","package":"webhook_operator_test","properties":[{"type":"olm.bundle.object","value":{"data":"dW5pbXBvcnRhbnQK"}},{"type":"some.other","value":{"data":"arbitrary-info"}}],"relatedImages":[{"image":"testimage:latest","name":"test"}],"schema":"olm.bundle"}`, + }, + // { + // name: "invalid query with non-existent schema", + // queryParams: "?schema=non_existent_schema", + // expectedStatusCode: http.StatusNotFound, + // expectedContent: "400 Bad Request", + // }, + { + name: "cached response with If-Modified-Since", + queryParams: "?schema=olm.package", + expectedStatusCode: http.StatusNotModified, + expectedContent: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/catalogs/test-catalog/api/v1/query%s", testServer.URL, tc.queryParams), nil) + require.NoError(t, err) + + if strings.Contains(tc.name, "If-Modified-Since") { + // Do an initial request to get a Last-Modified timestamp + // for the actual request + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + req.Header.Set("If-Modified-Since", resp.Header.Get("Last-Modified")) + } + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + require.Equal(t, tc.expectedStatusCode, resp.StatusCode) + + actualContent, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, tc.expectedContent, strings.TrimSpace(string(actualContent))) }) - }) -}) - -var _ = Describe("LocalDir Server Handler tests", func() { - var ( - testServer *httptest.Server - store LocalDirV1 - ) - BeforeEach(func() { - d := GinkgoT().TempDir() - store = LocalDirV1{RootDir: d, RootURL: &url.URL{Path: urlPrefix}} - testServer = httptest.NewServer(store.StorageServerHandler()) - - }) - It("gets 404 for the path /", func() { - expectNotFound(testServer.URL) - }) - It("gets 404 for the path /catalogs/", func() { - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/")) - }) - It("gets 404 for the path /catalogs/test-catalog/", func() { - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/")) - }) - It("gets 404 for the path /catalogs/test-catalog/api", func() { - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api")) - }) - It("gets 404 for the path /catalogs/test-catalog/api/v1", func() { - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1")) - }) - It("gets 404 for the path /catalogs/test-catalog.jsonl", func() { - // This is actually how the file is stored, but we don't serve - // the filesystem, we serve an API. Hence, expect 404 not found - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog.jsonl"), []byte("foobar"), 0600)).To(Succeed()) - expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog.jsonl")) - }) - It("gets 200 for the path /catalogs/test-catalog/api/v1/all", func() { - expectedContent := []byte(`{"foo":"bar"}`) - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog.jsonl"), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent, false) - }) - It("ignores accept-encoding for the path /catalogs/test-catalog/api/v1/all with size < 1400 bytes", func() { - expectedContent := []byte(`{"foo":"bar"}`) - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog.jsonl"), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent, false) - }) - It("provides gzipped content for the path /catalogs/test-catalog/api/v1/all with size > 1400 bytes", func() { - expectedContent := []byte(testCompressableJSON) - Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog.jsonl"), expectedContent, 0600)).To(Succeed()) - expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/api/v1/all"), expectedContent, true) - }) - It("provides json-lines format for the served JSON catalog", func() { - catalog := "test-catalog" - unpackResultFS := &fstest.MapFS{ - "catalog.json": &fstest.MapFile{Data: []byte(testCompressableJSON), Mode: os.ModePerm}, - } - err := store.Store(context.Background(), catalog, unpackResultFS) - Expect(err).To(Not(HaveOccurred())) - - expectedContent, err := generateJSONLines([]byte(testCompressableJSON)) - Expect(err).To(Not(HaveOccurred())) - path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, "api", "v1", "all") - Expect(err).To(Not(HaveOccurred())) - expectFound(path, []byte(expectedContent), true) - }) - It("provides json-lines format for the served YAML catalog", func() { - catalog := "test-catalog" - yamlData, err := makeYAMLFromConcatenatedJSON([]byte(testCompressableJSON)) - Expect(err).To(Not(HaveOccurred())) - unpackResultFS := &fstest.MapFS{ - "catalog.yaml": &fstest.MapFile{Data: yamlData, Mode: os.ModePerm}, - } - err = store.Store(context.Background(), catalog, unpackResultFS) - Expect(err).To(Not(HaveOccurred())) - - expectedContent, err := generateJSONLines(yamlData) - Expect(err).To(Not(HaveOccurred())) - path, err := url.JoinPath(testServer.URL, urlPrefix, catalog, "api", "v1", "all") - Expect(err).To(Not(HaveOccurred())) - expectFound(path, []byte(expectedContent), true) - }) - AfterEach(func() { - testServer.Close() - }) -}) - -func expectNotFound(url string) { - resp, err := http.Get(url) //nolint:gosec - Expect(err).To(Not(HaveOccurred())) - Expect(resp.StatusCode).To(Equal(http.StatusNotFound)) - Expect(resp.Body.Close()).To(Succeed()) + } } -func expectFound(url string, expectedContent []byte, expectCompression bool) { - req, err := http.NewRequest(http.MethodGet, url, nil) - Expect(err).To(Not(HaveOccurred())) - req.Header.Set("Accept-Encoding", "gzip") - resp, err := http.DefaultClient.Do(req) - Expect(err).To(Not(HaveOccurred())) - Expect(resp.StatusCode).To(Equal(http.StatusOK)) - - var actualContent []byte - if expectCompression { - Expect(resp.Header.Get("Content-Encoding")).To(Equal("gzip")) - Expect(len(expectedContent)).To(BeNumerically(">", 1400), - fmt.Sprintf("gzipped content should only be provided for content larger than 1400 bytes, but our expected content is only %d bytes", len(expectedContent))) - gz, err := gzip.NewReader(resp.Body) - Expect(err).To(Not(HaveOccurred())) - actualContent, err = io.ReadAll(gz) - Expect(err).To(Not(HaveOccurred())) - } else { - Expect(resp.Header.Get("Content-Encoding")).To(BeEmpty()) - actualContent, err = io.ReadAll(resp.Body) - Expect(len(expectedContent)).To(BeNumerically("<", 1400), - fmt.Sprintf("plaintext content should only be provided for content smaller than 1400 bytes, but we received plaintext for %d bytes\n expectedContent:\n%s\n", len(expectedContent), expectedContent)) - Expect(err).To(Not(HaveOccurred())) +func TestServerLoadHandling(t *testing.T) { + store := &LocalDirV1{ + RootDir: t.TempDir(), + RootURL: &url.URL{Path: urlPrefix}, + EnableQueryHandler: true, + } + + // Create large test data + largeFS := fstest.MapFS{} + for i := 0; i < 1000; i++ { + largeFS[fmt.Sprintf("meta_%d.json", i)] = &fstest.MapFile{ + Data: []byte(fmt.Sprintf(`{"schema":"olm.bundle","package":"test-op-%d","name":"test-op.v%d.0"}`, i, i)), + } + } + + if err := store.Store(context.Background(), "test-catalog", largeFS); err != nil { + t.Fatal("failed to store test catalog") } - Expect(actualContent).To(Equal(expectedContent)) - Expect(resp.Body.Close()).To(Succeed()) + testServer := httptest.NewServer(store.StorageServerHandler()) + defer testServer.Close() + + tests := []struct { + name string + concurrent int + requests func(baseURL string) []*http.Request + validateFunc func(t *testing.T, responses []*http.Response, errs []error) + }{ + { + name: "concurrent identical queries", + concurrent: 100, + requests: func(baseURL string) []*http.Request { + var reqs []*http.Request + for i := 0; i < 100; i++ { + req, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("%s/catalogs/test-catalog/api/v1/query?schema=olm.bundle", baseURL), + nil) + req.Header.Set("Accept", "application/jsonl") + reqs = append(reqs, req) + } + return reqs + }, + validateFunc: func(t *testing.T, responses []*http.Response, errs []error) { + for _, err := range errs { + require.NoError(t, err) + } + for _, resp := range responses { + require.Equal(t, http.StatusOK, resp.StatusCode) + require.Equal(t, resp.Header.Get("Content-Type"), "application/jsonl") + resp.Body.Close() + } + }, + }, + { + name: "concurrent different queries", + concurrent: 50, + requests: func(baseURL string) []*http.Request { + var reqs []*http.Request + for i := 0; i < 50; i++ { + req, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("%s/catalogs/test-catalog/api/v1/query?package=test-op-%d", baseURL, i), + nil) + req.Header.Set("Accept", "application/jsonl") + reqs = append(reqs, req) + } + return reqs + }, + validateFunc: func(t *testing.T, responses []*http.Response, errs []error) { + for _, err := range errs { + require.NoError(t, err) + } + for _, resp := range responses { + require.Equal(t, http.StatusOK, resp.StatusCode) + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Contains(t, string(body), "test-op-") + resp.Body.Close() + } + }, + }, + { + name: "mixed all and query endpoints", + concurrent: 40, + requests: func(baseURL string) []*http.Request { + var reqs []*http.Request + for i := 0; i < 20; i++ { + allReq, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("%s/catalogs/test-catalog/api/v1/all", baseURL), + nil) + queryReq, _ := http.NewRequest(http.MethodGet, + fmt.Sprintf("%s/catalogs/test-catalog/api/v1/query?schema=olm.bundle", baseURL), + nil) + allReq.Header.Set("Accept", "application/jsonl") + queryReq.Header.Set("Accept", "application/jsonl") + reqs = append(reqs, allReq, queryReq) + } + return reqs + }, + validateFunc: func(t *testing.T, responses []*http.Response, errs []error) { + for _, err := range errs { + require.NoError(t, err) + } + for _, resp := range responses { + require.Equal(t, http.StatusOK, resp.StatusCode) + resp.Body.Close() + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var ( + wg sync.WaitGroup + responses = make([]*http.Response, tt.concurrent) + errs = make([]error, tt.concurrent) + ) + + requests := tt.requests(testServer.URL) + for i := 0; i < tt.concurrent; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + resp, err := http.DefaultClient.Do(requests[idx]) + responses[idx] = resp + errs[idx] = err + }(i) + } + + wg.Wait() + tt.validateFunc(t, responses, errs) + }) + } } -const testBundleTemplate = `--- +func createTestFS(t *testing.T) fs.FS { + t.Helper() + testBundleTemplate := `--- image: %s name: %s schema: olm.bundle @@ -244,197 +516,34 @@ properties: data: arbitrary-info ` -const testPackageTemplate = `--- + testPackageTemplate := `--- defaultChannel: %s name: %s schema: olm.package ` -const testChannelTemplate = `--- + testChannelTemplate := `--- schema: olm.channel package: %s name: %s entries: - name: %s ` + testBundleName := "bundle.v0.0.1" + testBundleImage := "quaydock.io/namespace/bundle:0.0.3" + testBundleRelatedImageName := "test" + testBundleRelatedImageImage := "testimage:latest" + testBundleObjectData := "dW5pbXBvcnRhbnQK" + testPackageDefaultChannel := "preview_test" + testPackageName := "webhook_operator_test" + testChannelName := "preview_test" -// by default the compressor will only trigger for files larger than 1400 bytes -const testCompressableJSON = `{ - "defaultChannel": "stable-v6.x", - "name": "cockroachdb", - "schema": "olm.package" -} -{ - "entries": [ - { - "name": "cockroachdb.v5.0.3" - }, - { - "name": "cockroachdb.v5.0.4", - "replaces": "cockroachdb.v5.0.3" - } - ], - "name": "stable-5.x", - "package": "cockroachdb", - "schema": "olm.channel" -} -{ - "entries": [ - { - "name": "cockroachdb.v6.0.0", - "skipRange": "<6.0.0" - } - ], - "name": "stable-v6.x", - "package": "cockroachdb", - "schema": "olm.channel" -} -{ - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:a5d4f4467250074216eb1ba1c36e06a3ab797d81c431427fc2aca97ecaf4e9d8", - "name": "cockroachdb.v5.0.3", - "package": "cockroachdb", - "properties": [ - { - "type": "olm.gvk", - "value": { - "group": "charts.operatorhub.io", - "kind": "Cockroachdb", - "version": "v1alpha1" - } - }, - { - "type": "olm.package", - "value": { - "packageName": "cockroachdb", - "version": "5.0.3" - } - } - ], - "relatedImages": [ - { - "name": "", - "image": "quay.io/helmoperators/cockroachdb:v5.0.3" - }, - { - "name": "", - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:a5d4f4467250074216eb1ba1c36e06a3ab797d81c431427fc2aca97ecaf4e9d8" - } - ], - "schema": "olm.bundle" -} -{ - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:f42337e7b85a46d83c94694638e2312e10ca16a03542399a65ba783c94a32b63", - "name": "cockroachdb.v5.0.4", - "package": "cockroachdb", - "properties": [ - { - "type": "olm.gvk", - "value": { - "group": "charts.operatorhub.io", - "kind": "Cockroachdb", - "version": "v1alpha1" - } - }, - { - "type": "olm.package", - "value": { - "packageName": "cockroachdb", - "version": "5.0.4" - } - } - ], - "relatedImages": [ - { - "name": "", - "image": "quay.io/helmoperators/cockroachdb:v5.0.4" - }, - { - "name": "", - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:f42337e7b85a46d83c94694638e2312e10ca16a03542399a65ba783c94a32b63" - } - ], - "schema": "olm.bundle" -} -{ - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:d3016b1507515fc7712f9c47fd9082baf9ccb070aaab58ed0ef6e5abdedde8ba", - "name": "cockroachdb.v6.0.0", - "package": "cockroachdb", - "properties": [ - { - "type": "olm.gvk", - "value": { - "group": "charts.operatorhub.io", - "kind": "Cockroachdb", - "version": "v1alpha1" - } - }, - { - "type": "olm.package", - "value": { - "packageName": "cockroachdb", - "version": "6.0.0" - } - } - ], - "relatedImages": [ - { - "name": "", - "image": "quay.io/cockroachdb/cockroach-helm-operator:6.0.0" - }, - { - "name": "", - "image": "quay.io/openshift-community-operators/cockroachdb@sha256:d3016b1507515fc7712f9c47fd9082baf9ccb070aaab58ed0ef6e5abdedde8ba" - } - ], - "schema": "olm.bundle" -} -` - -// makeYAMLFromConcatenatedJSON takes a byte slice of concatenated JSON objects and returns a byte slice of concatenated YAML objects. -func makeYAMLFromConcatenatedJSON(data []byte) ([]byte, error) { - var msg json.RawMessage - var delimiter = []byte("---\n") - var yamlData []byte - - yamlData = append(yamlData, delimiter...) - - dec := json.NewDecoder(bytes.NewReader(data)) - for { - err := dec.Decode(&msg) - if errors.Is(err, io.EOF) { - break - } - y, err := yaml.JSONToYAML(msg) - if err != nil { - return []byte{}, err - } - yamlData = append(yamlData, delimiter...) - yamlData = append(yamlData, y...) + testPackage := fmt.Sprintf(testPackageTemplate, testPackageDefaultChannel, testPackageName) + testBundle := fmt.Sprintf(testBundleTemplate, testBundleImage, testBundleName, testPackageName, testBundleRelatedImageName, testBundleRelatedImageImage, testBundleObjectData) + testChannel := fmt.Sprintf(testChannelTemplate, testPackageName, testChannelName, testBundleName) + return &fstest.MapFS{ + "bundle.yaml": {Data: []byte(testBundle), Mode: os.ModePerm}, + "package.yaml": {Data: []byte(testPackage), Mode: os.ModePerm}, + "channel.yaml": {Data: []byte(testChannel), Mode: os.ModePerm}, } - return yamlData, nil -} - -// generateJSONLines takes a byte slice of concatenated JSON objects and returns a JSONlines-formatted string. -func generateJSONLines(in []byte) (string, error) { - var out strings.Builder - reader := bytes.NewReader(in) - - err := declcfg.WalkMetasReader(reader, func(meta *declcfg.Meta, err error) error { - if err != nil { - return err - } - - if meta != nil && meta.Blob != nil { - if meta.Blob[len(meta.Blob)-1] != '\n' { - return fmt.Errorf("blob does not end with newline") - } - } - - _, err = out.Write(meta.Blob) - if err != nil { - return err - } - return nil - }) - return out.String(), err } diff --git a/go.mod b/go.mod index 0fab3290f..8d2f731a0 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.10.0 golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c + golang.org/x/sync v0.10.0 gopkg.in/yaml.v2 v2.4.0 helm.sh/helm/v3 v3.17.0 k8s.io/api v0.32.0 @@ -227,7 +228,6 @@ require ( golang.org/x/crypto v0.32.0 // indirect golang.org/x/net v0.34.0 // indirect golang.org/x/oauth2 v0.25.0 // indirect - golang.org/x/sync v0.10.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/term v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect