Skip to content

Commit

Permalink
Support blob to queue
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Feb 5, 2025
1 parent fad00b5 commit ef7a794
Show file tree
Hide file tree
Showing 10 changed files with 387 additions and 24 deletions.
72 changes: 72 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/daocloud/crproxy/cache"
"github.com/daocloud/crproxy/internal/queue"
"github.com/daocloud/crproxy/internal/utils"
"github.com/daocloud/crproxy/queue/client"
"github.com/daocloud/crproxy/queue/model"
"github.com/daocloud/crproxy/token"
"github.com/docker/distribution/registry/api/errcode"
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
Expand Down Expand Up @@ -53,6 +55,8 @@ type Agent struct {
authenticator *token.Authenticator

blobsLENoAgent int

queueClient *client.MessageClient
}

type Option func(c *Agent) error
Expand Down Expand Up @@ -112,6 +116,13 @@ func WithConcurrency(concurrency int) Option {
}
}

func WithQueueClient(queueClient *client.MessageClient) Option {
return func(c *Agent) error {
c.queueClient = queueClient
return nil
}
}

func NewAgent(opts ...Option) (*Agent, error) {
c := &Agent{
logger: slog.Default(),
Expand Down Expand Up @@ -182,6 +193,15 @@ func (c *Agent) worker(ctx context.Context) {
if !ok {
return
}

if c.queueClient != nil {
_, err := c.waitingQueue(ctx, info.Blobs, weight, &info)
if err == nil {
finish()
continue
}
c.logger.Warn("waitingQueue error", "info", info, "error", err)
}
size, continueFunc, sc, err := c.cacheBlob(&info)
if err != nil {
c.logger.Warn("failed download file request", "info", info, "error", err)
Expand Down Expand Up @@ -489,3 +509,55 @@ func (c *Agent) serveCachedBlob(rw http.ResponseWriter, r *http.Request, blob st
c.logger.Info("Cache hit", "digest", blob, "url", u)
http.Redirect(rw, r, u, http.StatusTemporaryRedirect)
}

func (c *Agent) waitingQueue(ctx context.Context, msg string, weight int, info *BlobInfo) (client.MessageResponse, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

mr, err := c.queueClient.Create(ctx, msg, weight+1, model.MessageAttr{
Host: info.Host,
Image: info.Image,
})
if err != nil {
return client.MessageResponse{}, fmt.Errorf("failed to create queue: %w", err)
}

if mr.Status == model.StatusPending {
c.logger.Info("watching message from queue", "msg", msg)

chMr, err := c.queueClient.Watch(ctx, mr.MessageID)
if err != nil {
return client.MessageResponse{}, fmt.Errorf("failed to watch message: %w", err)
}
watiQueue:
for {
select {
case <-ctx.Done():
return client.MessageResponse{}, ctx.Err()
case m, ok := <-chMr:
if !ok {
if mr.Status != model.StatusPending && mr.Status != model.StatusProcessing {
break watiQueue
}

time.Sleep(1 * time.Second)
chMr, err = c.queueClient.Watch(ctx, mr.MessageID)
if err != nil {
return client.MessageResponse{}, fmt.Errorf("failed to re-watch message: %w", err)
}
} else {
mr = m
}
}
}
}

switch mr.Status {
case model.StatusCompleted:
return mr, nil
case model.StatusFailed:
return client.MessageResponse{}, fmt.Errorf("%q Queue Error: %s", msg, mr.Data.Error)
default:
return client.MessageResponse{}, fmt.Errorf("unexpected status %q for message %q", mr.Status, msg)
}
}
9 changes: 9 additions & 0 deletions cmd/crproxy/cluster/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/daocloud/crproxy/cache"
"github.com/daocloud/crproxy/internal/pki"
"github.com/daocloud/crproxy/internal/server"
"github.com/daocloud/crproxy/queue/client"
"github.com/daocloud/crproxy/signing"
"github.com/daocloud/crproxy/storage"
"github.com/daocloud/crproxy/token"
Expand Down Expand Up @@ -47,6 +48,9 @@ type flagpole struct {
BlobCacheDuration time.Duration

Concurrency int

QueueURL string
QueueToken string
}

func NewCommand() *cobra.Command {
Expand Down Expand Up @@ -132,6 +136,11 @@ func runE(ctx context.Context, flags *flagpole) error {
agent.WithConcurrency(flags.Concurrency),
)

if flags.QueueURL != "" {
queueClient := client.NewMessageClient(http.DefaultClient, flags.QueueURL, flags.QueueToken)
opts = append(opts, agent.WithQueueClient(queueClient))
}

if flags.TokenPublicKeyFile != "" {
publicKeyData, err := os.ReadFile(flags.TokenPublicKeyFile)
if err != nil {
Expand Down
54 changes: 47 additions & 7 deletions cmd/crproxy/cluster/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@ import (
"github.com/daocloud/crproxy/transport"
"github.com/docker/distribution/manifest/manifestlist"
"github.com/spf13/cobra"
"github.com/wzshiming/httpseek"
)

type flagpole struct {
QueueURL string

AdminToken string

StorageURL []string
Deep bool
Quick bool
Platform []string
Userpass []string
StorageURL []string
Deep bool
Quick bool
Platform []string
Userpass []string
Retry int
RetryInterval time.Duration

Lease string

Expand Down Expand Up @@ -61,7 +64,8 @@ func NewCommand() *cobra.Command {
cmd.Flags().BoolVar(&flags.Quick, "quick", flags.Quick, "Quick sync with tags")
cmd.Flags().StringSliceVar(&flags.Platform, "platform", flags.Platform, "Platform")
cmd.Flags().StringArrayVarP(&flags.Userpass, "user", "u", flags.Userpass, "host and username and password -u user:pwd@host")

cmd.Flags().IntVar(&flags.Retry, "retry", flags.Retry, "Retry")
cmd.Flags().DurationVar(&flags.RetryInterval, "retry-interval", flags.RetryInterval, "Retry interval")
cmd.Flags().DurationVar(&flags.Duration, "duration", flags.Duration, "Duration of the runner")
cmd.Flags().StringVar(&flags.Lease, "lease", flags.Lease, "Lease of the runner")

Expand Down Expand Up @@ -125,7 +129,43 @@ func runE(ctx context.Context, flags *flagpole) error {
lease = fmt.Sprintf("%s-%d", flags.Lease, time.Now().Unix())
}

runner, err := runner.NewRunner(http.DefaultClient, lease, flags.QueueURL, flags.AdminToken, sm)
if flags.RetryInterval > 0 {
tp = httpseek.NewMustReaderTransport(tp, func(request *http.Request, retry int, err error) error {
if errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) {
return err
}
if flags.Retry > 0 && retry >= flags.Retry {
return err
}
if logger != nil {
logger.Warn("Retry", "url", request.URL, "retry", retry, "error", err)
}
time.Sleep(flags.RetryInterval)
return nil
})
}

client := &http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
if len(via) > 10 {
return http.ErrUseLastResponse
}
s := make([]string, 0, len(via)+1)
for _, v := range via {
s = append(s, v.URL.String())
}

lastRedirect := req.URL.String()
s = append(s, lastRedirect)
logger.Info("redirect", "redirects", s)

return nil
},
Transport: tp,
}

runner, err := runner.NewRunner(client, caches, lease, flags.QueueURL, flags.AdminToken, sm)
if err != nil {
return err
}
Expand Down
9 changes: 8 additions & 1 deletion gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,20 @@ func NewGateway(opts ...Option) (*Gateway, error) {
}

if c.cache != nil {
a, err := agent.NewAgent(
opts := []agent.Option{
agent.WithClient(c.httpClient),
agent.WithAuthenticator(c.authenticator),
agent.WithLogger(c.logger),
agent.WithCache(c.cache),
agent.WithBlobsLENoAgent(c.blobsLENoAgent),
agent.WithConcurrency(c.concurrency),
}
if c.queueClient != nil {
opts = append(opts, agent.WithQueueClient(c.queueClient))
}

a, err := agent.NewAgent(
opts...,
)
if err != nil {
return nil, fmt.Errorf("failed to create agent: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions gateway/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (c *Gateway) waitingQueue(ctx context.Context, msg string, weight int) erro
ctx, cancel := context.WithCancel(ctx)
defer cancel()

mr, err := c.queueClient.Create(ctx, msg, weight+1)
mr, err := c.queueClient.Create(ctx, msg, weight+1, model.MessageAttr{})
if err != nil {
return fmt.Errorf("failed to create queue: %w", err)
}
Expand Down Expand Up @@ -163,7 +163,7 @@ func (c *Gateway) cacheManifest(info *PathInfo, weight int) (int, error) {
if err == nil {
if cachedDigest != digest {
msg := fmt.Sprintf("%s/%s:%s", info.Host, info.Image, info.Manifests)
_, err := c.queueClient.Create(context.Background(), msg, 0)
_, err := c.queueClient.Create(context.Background(), msg, 0, model.MessageAttr{})
if err != nil {
c.logger.Warn("failed add message to queue", "msg", msg, "digest", digest, "error", err)
} else {
Expand Down
6 changes: 4 additions & 2 deletions queue/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
type MessageRequest struct {
Content string `json:"content"`
Priority int `json:"priority"`

Data model.MessageAttr `json:"data,omitempty"`
}

type MessageResponse struct {
Expand Down Expand Up @@ -63,8 +65,8 @@ func NewMessageClient(httpClient *http.Client, baseURL string, adminToken string
}
}

func (c *MessageClient) Create(ctx context.Context, content string, priority int) (MessageResponse, error) {
messageRequest := MessageRequest{Content: content, Priority: priority}
func (c *MessageClient) Create(ctx context.Context, content string, priority int, data model.MessageAttr) (MessageResponse, error) {
messageRequest := MessageRequest{Content: content, Priority: priority, Data: data}
body, err := json.Marshal(messageRequest)
if err != nil {
return MessageResponse{}, err
Expand Down
30 changes: 23 additions & 7 deletions queue/controller/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
type MessageRequest struct {
Content string `json:"content"`
Priority int `json:"priority"`

Data model.MessageAttr `json:"data,omitempty"`
}

type MessageResponse struct {
Expand Down Expand Up @@ -304,6 +306,7 @@ func (mc *MessageController) Create(req *restful.Request, resp *restful.Response
newMessage := model.Message{
Content: messageRequest.Content,
Priority: messageRequest.Priority,
Data: messageRequest.Data,
}
messageID, err := mc.messageService.Create(req.Request.Context(), newMessage)
if err != nil {
Expand All @@ -315,6 +318,7 @@ func (mc *MessageController) Create(req *restful.Request, resp *restful.Response
MessageID: messageID,
Content: messageRequest.Content,
Priority: messageRequest.Priority,
Data: messageRequest.Data,
}

mc.updateWatchListChannels(data)
Expand Down Expand Up @@ -498,14 +502,18 @@ func (mc *MessageController) Heartbeat(req *restful.Request, resp *restful.Respo
return
}

if err := mc.messageService.Heartbeat(req.Request.Context(), messageID, heartbeatRequest.Data, heartbeatRequest.Lease); err != nil {
resp.WriteHeaderAndEntity(http.StatusNotAcceptable, Error{Code: "MessageNotAcceptableError", Message: "Message not found: " + err.Error()})
curr, err := mc.messageService.GetByID(req.Request.Context(), messageID)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusNotFound, Error{Code: "MessageNotFoundError", Message: "Message not found after heartbeat: " + err.Error()})
return
}

curr, err := mc.messageService.GetByID(context.Background(), messageID)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusNotFound, Error{Code: "MessageNotFoundError", Message: "Message not found after heartbeat: " + err.Error()})
curr.Data.Blobs = heartbeatRequest.Data.Blobs
curr.Data.Progress = heartbeatRequest.Data.Progress
curr.Data.Size = heartbeatRequest.Data.Size

if err := mc.messageService.Heartbeat(req.Request.Context(), messageID, curr.Data, heartbeatRequest.Lease); err != nil {
resp.WriteHeaderAndEntity(http.StatusNotAcceptable, Error{Code: "MessageNotAcceptableError", Message: "Message not found: " + err.Error()})
return
}

Expand Down Expand Up @@ -574,12 +582,20 @@ func (mc *MessageController) Failed(req *restful.Request, resp *restful.Response
return
}

if err := mc.messageService.Failed(req.Request.Context(), messageID, failedRequest.Lease, failedRequest.Data); err != nil {
curr, err := mc.messageService.GetByID(context.Background(), messageID)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusNotFound, Error{Code: "MessageNotFoundError", Message: "Message not found after heartbeat: " + err.Error()})
return
}

curr.Data.Error = failedRequest.Data.Error

if err := mc.messageService.Failed(req.Request.Context(), messageID, failedRequest.Lease, curr.Data); err != nil {
resp.WriteHeaderAndEntity(http.StatusNotAcceptable, Error{Code: "MessageNotAcceptableError", Message: "Message not found: " + err.Error()})
return
}

curr, err := mc.messageService.GetByID(context.Background(), messageID)
curr, err = mc.messageService.GetByID(context.Background(), messageID)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusNotFound, Error{Code: "MessageNotFoundError", Message: "Message not found after failure: " + err.Error()})
return
Expand Down
6 changes: 3 additions & 3 deletions queue/dao/messgae.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ INSERT INTO messages (content, lease, priority, status, data) VALUES (?, ?, ?, ?

func (m *Message) Create(ctx context.Context, message model.Message) (int64, error) {
db := GetDB(ctx)
result, err := db.ExecContext(ctx, createMessageSQL, message.Content, message.Lease, message.Priority, model.StatusPending, "{}")
result, err := db.ExecContext(ctx, createMessageSQL, message.Content, message.Lease, message.Priority, model.StatusPending, message.Data)
if err != nil {
return 0, fmt.Errorf("failed to create message: %w", err)
}
Expand Down Expand Up @@ -112,12 +112,12 @@ func (m *Message) UpdatePriorityByID(ctx context.Context, id int64, priority int
}

const deleteMessageByIDSQL = `
UPDATE messages SET delete_at = NOW(), data = ? WHERE id = ? AND delete_at IS NULL
UPDATE messages SET delete_at = NOW() WHERE id = ? AND delete_at IS NULL
`

func (m *Message) DeleteByID(ctx context.Context, id int64) error {
db := GetDB(ctx)
_, err := db.ExecContext(ctx, deleteMessageByIDSQL, model.MessageAttr{}, id)
_, err := db.ExecContext(ctx, deleteMessageByIDSQL, id)
if err != nil {
return fmt.Errorf("failed to delete message: %w", err)
}
Expand Down
7 changes: 7 additions & 0 deletions queue/model/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ type Message struct {

type MessageAttr struct {
Error string `json:"error,omitempty"`

Host string `json:"host,omitempty"`
Image string `json:"image,omitempty"`
Progress int64 `json:"progress,omitempty"`
Size int64 `json:"size,omitempty"`

// Deprecate
Blobs []Blob `json:"blobs,omitempty"`
}

Expand Down
Loading

0 comments on commit ef7a794

Please sign in to comment.