From 72f7c2fe15684acf462c1c18f85e36d0d5f8b1be Mon Sep 17 00:00:00 2001 From: Shiming Zhang Date: Tue, 21 Jan 2025 11:24:01 +0800 Subject: [PATCH] Fix --- gateway/manifest.go | 47 ++++++++++++++++++++----------------- queue/client/client.go | 26 ++++++++++++++++++-- queue/controller/message.go | 15 ++++++------ 3 files changed, 58 insertions(+), 30 deletions(-) diff --git a/gateway/manifest.go b/gateway/manifest.go index d7220d7..e65e377 100644 --- a/gateway/manifest.go +++ b/gateway/manifest.go @@ -43,7 +43,7 @@ func (c *Gateway) cacheManifestResponse(rw http.ResponseWriter, r *http.Request, if done { return } - if fallback { + if fallback && c.recacheMaxWait > 0 { ctx, cancel = context.WithTimeout(ctx, c.recacheMaxWait) defer cancel() } @@ -66,23 +66,29 @@ func (c *Gateway) cacheManifestResponse(rw http.ResponseWriter, r *http.Request, } func (c *Gateway) waitingQueue(ctx context.Context, msg string, weight int) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + mr, err := c.queueClient.Create(ctx, msg, weight+1) if err != nil { return fmt.Errorf("failed to create queue: %w", err) } if mr.Status == model.StatusPending { + c.logger.Info("watching message from queue", "msg", msg) + chMr, err := c.queueClient.Watch(ctx, mr.MessageID) if err != nil { return fmt.Errorf("failed to watch message: %w", err) } - c.logger.Info("Watching message in queue", "msg", msg) watiQueue: for { select { + case <-ctx.Done(): + return ctx.Err() case m, ok := <-chMr: if !ok { - if m.Status != model.StatusPending { + if mr.Status != model.StatusPending && mr.Status != model.StatusProcessing { break watiQueue } @@ -94,9 +100,6 @@ func (c *Gateway) waitingQueue(ctx context.Context, msg string, weight int) erro } else { mr = m } - - case <-ctx.Done(): - return ctx.Err() } } } @@ -156,21 +159,23 @@ func (c *Gateway) cacheManifest(info *PathInfo, weight int) (int, error) { if c.queueClient != nil { cachedDigest, err := c.cache.DigestManifest(ctx, info.Host, info.Image, info.Manifests) if err == nil { - if cachedDigest != digest { - msg := fmt.Sprintf("%s/%s:%s", info.Host, info.Image, info.Manifests) - _, err := c.queueClient.Create(context.Background(), msg, 0) - if err != nil { - c.logger.Warn("failed add message to queue", "msg", msg, "digest", digest, "error", err) - } else { - c.logger.Info("Add message to queue", "msg", msg, "digest", digest) + _, err := c.cache.StatBlob(ctx, cachedDigest) + if err == nil { + if cachedDigest != digest { + msg := fmt.Sprintf("%s/%s:%s", info.Host, info.Image, info.Manifests) + _, err := c.queueClient.Create(context.Background(), msg, 0) + if err != nil { + c.logger.Warn("failed add message to queue", "msg", msg, "digest", digest, "error", err) + } else { + c.logger.Info("Add message to queue", "msg", msg, "digest", digest) + } + digest = cachedDigest } - digest = cachedDigest + c.manifestCache.Put(info, cacheValue{ + Digest: digest, + }) + return 0, nil } - - c.manifestCache.Put(info, cacheValue{ - Digest: digest, - }) - return 0, nil } } @@ -285,7 +290,7 @@ func (c *Gateway) tryFirstServeCachedManifest(rw http.ResponseWriter, r *http.Re } if val.MediaType == "" || val.Length == "" { - return c.serveCachedManifest(rw, r, info, true, "hit"), false + return c.serveCachedManifest(rw, r, info, true, "hit and mark"), false } if r.Method == http.MethodHead { @@ -322,7 +327,7 @@ func (c *Gateway) missServeCachedManifest(rw http.ResponseWriter, r *http.Reques } if val.MediaType == "" || val.Length == "" { - return c.serveCachedManifest(rw, r, info, true, "miss") + return c.serveCachedManifest(rw, r, info, true, "miss and mark") } if r.Method == http.MethodHead { diff --git a/queue/client/client.go b/queue/client/client.go index 6470bc4..f3d1732 100644 --- a/queue/client/client.go +++ b/queue/client/client.go @@ -146,7 +146,18 @@ func (c *MessageClient) WatchList(ctx context.Context) (chan MessageResponse, er if resp.Header.Get("Content-Type") != "text/event-stream" { defer resp.Body.Close() - return nil, handleErrorResponse(resp) + + decoder := json.NewDecoder(resp.Body) + var message MessageResponse + err := decoder.Decode(&message) + if err != nil { + return nil, err + } + + messageChannel := make(chan MessageResponse, 1) + messageChannel <- message + close(messageChannel) + return messageChannel, nil } messageChannel := make(chan MessageResponse) @@ -220,7 +231,18 @@ func (c *MessageClient) Watch(ctx context.Context, messageID int64) (chan Messag if resp.Header.Get("Content-Type") != "text/event-stream" { defer resp.Body.Close() - return nil, handleErrorResponse(resp) + + decoder := json.NewDecoder(resp.Body) + var message MessageResponse + err := decoder.Decode(&message) + if err != nil { + return nil, err + } + + messageChannel := make(chan MessageResponse, 1) + messageChannel <- message + close(messageChannel) + return messageChannel, nil } messageChannel := make(chan MessageResponse) diff --git a/queue/controller/message.go b/queue/controller/message.go index 5984c16..ae178da 100644 --- a/queue/controller/message.go +++ b/queue/controller/message.go @@ -388,20 +388,21 @@ func (mc *MessageController) Get(req *restful.Request, resp *restful.Response) { return } - message, err := mc.messageService.GetByID(req.Request.Context(), messageID) + curr, err := mc.messageService.GetByID(req.Request.Context(), messageID) if err != nil { resp.WriteHeaderAndEntity(http.StatusNotFound, Error{Code: "MessageNotFoundError", Message: "Message not found: " + err.Error()}) return } - if message.Status != model.StatusProcessing && message.Status != model.StatusPending { - resp.WriteHeaderAndEntity(http.StatusOK, MessageResponse{MessageID: message.MessageID, Content: message.Content, Priority: message.Priority, Status: message.Status, Data: message.Data, LastHeartbeat: message.LastHeartbeat}) + data := MessageResponse{MessageID: curr.MessageID, Content: curr.Content, Priority: curr.Priority, Status: curr.Status, Data: curr.Data, LastHeartbeat: curr.LastHeartbeat} + watch, _ := strconv.ParseBool(req.QueryParameter("watch")) + if !watch { + resp.WriteHeaderAndEntity(http.StatusOK, data) return } - watch, _ := strconv.ParseBool(req.QueryParameter("watch")) - if !watch { - resp.WriteHeaderAndEntity(http.StatusOK, MessageResponse{MessageID: message.MessageID, Content: message.Content, Priority: message.Priority, Status: message.Status, Data: message.Data, LastHeartbeat: message.LastHeartbeat}) + if data.Status != model.StatusProcessing && data.Status != model.StatusPending { + resp.WriteHeaderAndEntity(http.StatusOK, data) return } @@ -415,7 +416,7 @@ func (mc *MessageController) Get(req *restful.Request, resp *restful.Response) { watchCh := mc.newWatchChannel(messageID) defer mc.cancelWatchChannel(messageID, watchCh) - mc.updateWatchChannel(messageID, MessageResponse{MessageID: message.MessageID, Content: message.Content, Priority: message.Priority, Status: message.Status, Data: message.Data, LastHeartbeat: message.LastHeartbeat}) + mc.updateWatchChannel(messageID, data) encoder := json.NewEncoder(resp.ResponseWriter)