diff --git a/cache/cache_manifest.go b/cache/cache_manifest.go index 56f7551..a12db97 100644 --- a/cache/cache_manifest.go +++ b/cache/cache_manifest.go @@ -124,6 +124,16 @@ func (c *Cache) GetManifestContent(ctx context.Context, host, image, tagOrBlob s return content, digest, mediaType, nil } +func (c *Cache) DigestManifest(ctx context.Context, host, image, tag string) (string, error) { + manifestLinkPath := manifestTagCachePath(host, image, tag) + + digestContent, err := c.GetContent(ctx, manifestLinkPath) + if err != nil { + return "", fmt.Errorf("get manifest path %s error: %w", manifestLinkPath, err) + } + return string(digestContent), nil +} + func (c *Cache) StatManifest(ctx context.Context, host, image, tagOrBlob string) (bool, error) { var manifestLinkPath string isHash := strings.HasPrefix(tagOrBlob, "sha256:") diff --git a/cmd/crproxy/cluster/gateway/gateway.go b/cmd/crproxy/cluster/gateway/gateway.go index 7088da1..b5b75ab 100644 --- a/cmd/crproxy/cluster/gateway/gateway.go +++ b/cmd/crproxy/cluster/gateway/gateway.go @@ -15,6 +15,7 @@ import ( "github.com/daocloud/crproxy/internal/pki" "github.com/daocloud/crproxy/internal/server" "github.com/daocloud/crproxy/internal/utils" + "github.com/daocloud/crproxy/queue/client" "github.com/daocloud/crproxy/signing" "github.com/daocloud/crproxy/storage" "github.com/daocloud/crproxy/token" @@ -57,6 +58,9 @@ type flagpole struct { RegistryAlias map[string]string Concurrency int + + QueueURL string + QueueToken string } func NewCommand() *cobra.Command { @@ -105,6 +109,10 @@ func NewCommand() *cobra.Command { cmd.Flags().StringToStringVar(&flags.RegistryAlias, "registry-alias", flags.RegistryAlias, "registry alias") cmd.Flags().IntVar(&flags.Concurrency, "concurrency", flags.Concurrency, "Concurrency to source") + + cmd.Flags().StringVar(&flags.QueueToken, "queue-token", flags.QueueToken, "Queue token") + cmd.Flags().StringVar(&flags.QueueURL, "queue-url", flags.QueueURL, "Queue URL") + return cmd } @@ -207,6 +215,11 @@ func runE(ctx context.Context, flags *flagpole) error { }) } + if flags.QueueURL != "" { + queueClient := client.NewMessageClient(http.DefaultClient, flags.QueueURL, flags.QueueToken) + opts = append(opts, gateway.WithQueueClient(queueClient)) + } + tp = transport.NewLogTransport(tp, logger, time.Second) client := &http.Client{ diff --git a/gateway/gateway.go b/gateway/gateway.go index 77cdaa5..aacd6f8 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -15,6 +15,7 @@ import ( "github.com/daocloud/crproxy/cache" "github.com/daocloud/crproxy/internal/queue" "github.com/daocloud/crproxy/internal/utils" + "github.com/daocloud/crproxy/queue/client" "github.com/daocloud/crproxy/token" "github.com/docker/distribution/registry/api/errcode" "github.com/wzshiming/geario" @@ -55,6 +56,8 @@ type Gateway struct { blobsLENoAgent int agent *agent.Agent + + queueClient *client.MessageClient } type Option func(c *Gateway) @@ -137,6 +140,12 @@ func WithConcurrency(concurrency int) Option { } } +func WithQueueClient(queueClient *client.MessageClient) Option { + return func(c *Gateway) { + c.queueClient = queueClient + } +} + func NewGateway(opts ...Option) (*Gateway, error) { c := &Gateway{ logger: slog.Default(), diff --git a/gateway/manifest.go b/gateway/manifest.go index 88107ea..ea2239b 100644 --- a/gateway/manifest.go +++ b/gateway/manifest.go @@ -105,6 +105,27 @@ func (c *Gateway) cacheManifest(info *PathInfo) (int, error) { return 0, errcode.ErrorCodeDenied } + if c.queueClient != nil { + cachedDigest, err := c.cache.DigestManifest(ctx, info.Host, info.Image, info.Manifests) + if err == nil { + if cachedDigest != digest { + msg := fmt.Sprintf("%s/%s:%s", info.Host, info.Image, digest) + _, err := c.queueClient.Create(context.Background(), msg, 0) + if err != nil { + c.logger.Warn("failed add message to queue", "msg", msg, "error", err) + } else { + c.logger.Info("Add message to queue", "msg", msg, "digest", digest) + } + digest = cachedDigest + } + + c.manifestCache.Put(info, cacheValue{ + Digest: digest, + }) + return 0, nil + } + } + err = c.cache.RelinkManifest(ctx, info.Host, info.Image, info.Manifests, digest) if err != nil { c.logger.Warn("failed relink manifest", "url", u.String(), "error", err)