Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Jan 21, 2025
1 parent 7293c7e commit 72f7c2f
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 30 deletions.
47 changes: 26 additions & 21 deletions gateway/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (c *Gateway) cacheManifestResponse(rw http.ResponseWriter, r *http.Request,
if done {
return
}
if fallback {
if fallback && c.recacheMaxWait > 0 {
ctx, cancel = context.WithTimeout(ctx, c.recacheMaxWait)
defer cancel()
}
Expand All @@ -66,23 +66,29 @@ func (c *Gateway) cacheManifestResponse(rw http.ResponseWriter, r *http.Request,
}

func (c *Gateway) waitingQueue(ctx context.Context, msg string, weight int) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

mr, err := c.queueClient.Create(ctx, msg, weight+1)
if err != nil {
return fmt.Errorf("failed to create queue: %w", err)
}

if mr.Status == model.StatusPending {
c.logger.Info("watching message from queue", "msg", msg)

chMr, err := c.queueClient.Watch(ctx, mr.MessageID)
if err != nil {
return fmt.Errorf("failed to watch message: %w", err)
}
c.logger.Info("Watching message in queue", "msg", msg)
watiQueue:
for {
select {
case <-ctx.Done():
return ctx.Err()
case m, ok := <-chMr:
if !ok {
if m.Status != model.StatusPending {
if mr.Status != model.StatusPending && mr.Status != model.StatusProcessing {
break watiQueue
}

Expand All @@ -94,9 +100,6 @@ func (c *Gateway) waitingQueue(ctx context.Context, msg string, weight int) erro
} else {
mr = m
}

case <-ctx.Done():
return ctx.Err()
}
}
}
Expand Down Expand Up @@ -156,21 +159,23 @@ func (c *Gateway) cacheManifest(info *PathInfo, weight int) (int, error) {
if c.queueClient != nil {
cachedDigest, err := c.cache.DigestManifest(ctx, info.Host, info.Image, info.Manifests)
if err == nil {
if cachedDigest != digest {
msg := fmt.Sprintf("%s/%s:%s", info.Host, info.Image, info.Manifests)
_, err := c.queueClient.Create(context.Background(), msg, 0)
if err != nil {
c.logger.Warn("failed add message to queue", "msg", msg, "digest", digest, "error", err)
} else {
c.logger.Info("Add message to queue", "msg", msg, "digest", digest)
_, err := c.cache.StatBlob(ctx, cachedDigest)
if err == nil {
if cachedDigest != digest {
msg := fmt.Sprintf("%s/%s:%s", info.Host, info.Image, info.Manifests)
_, err := c.queueClient.Create(context.Background(), msg, 0)
if err != nil {
c.logger.Warn("failed add message to queue", "msg", msg, "digest", digest, "error", err)
} else {
c.logger.Info("Add message to queue", "msg", msg, "digest", digest)
}
digest = cachedDigest
}
digest = cachedDigest
c.manifestCache.Put(info, cacheValue{
Digest: digest,
})
return 0, nil
}

c.manifestCache.Put(info, cacheValue{
Digest: digest,
})
return 0, nil
}
}

Expand Down Expand Up @@ -285,7 +290,7 @@ func (c *Gateway) tryFirstServeCachedManifest(rw http.ResponseWriter, r *http.Re
}

if val.MediaType == "" || val.Length == "" {
return c.serveCachedManifest(rw, r, info, true, "hit"), false
return c.serveCachedManifest(rw, r, info, true, "hit and mark"), false
}

if r.Method == http.MethodHead {
Expand Down Expand Up @@ -322,7 +327,7 @@ func (c *Gateway) missServeCachedManifest(rw http.ResponseWriter, r *http.Reques
}

if val.MediaType == "" || val.Length == "" {
return c.serveCachedManifest(rw, r, info, true, "miss")
return c.serveCachedManifest(rw, r, info, true, "miss and mark")
}

if r.Method == http.MethodHead {
Expand Down
26 changes: 24 additions & 2 deletions queue/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,18 @@ func (c *MessageClient) WatchList(ctx context.Context) (chan MessageResponse, er

if resp.Header.Get("Content-Type") != "text/event-stream" {
defer resp.Body.Close()
return nil, handleErrorResponse(resp)

decoder := json.NewDecoder(resp.Body)
var message MessageResponse
err := decoder.Decode(&message)
if err != nil {
return nil, err
}

messageChannel := make(chan MessageResponse, 1)
messageChannel <- message
close(messageChannel)
return messageChannel, nil
}

messageChannel := make(chan MessageResponse)
Expand Down Expand Up @@ -220,7 +231,18 @@ func (c *MessageClient) Watch(ctx context.Context, messageID int64) (chan Messag

if resp.Header.Get("Content-Type") != "text/event-stream" {
defer resp.Body.Close()
return nil, handleErrorResponse(resp)

decoder := json.NewDecoder(resp.Body)
var message MessageResponse
err := decoder.Decode(&message)
if err != nil {
return nil, err
}

messageChannel := make(chan MessageResponse, 1)
messageChannel <- message
close(messageChannel)
return messageChannel, nil
}

messageChannel := make(chan MessageResponse)
Expand Down
15 changes: 8 additions & 7 deletions queue/controller/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,20 +388,21 @@ func (mc *MessageController) Get(req *restful.Request, resp *restful.Response) {
return
}

message, err := mc.messageService.GetByID(req.Request.Context(), messageID)
curr, err := mc.messageService.GetByID(req.Request.Context(), messageID)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusNotFound, Error{Code: "MessageNotFoundError", Message: "Message not found: " + err.Error()})
return
}

if message.Status != model.StatusProcessing && message.Status != model.StatusPending {
resp.WriteHeaderAndEntity(http.StatusOK, MessageResponse{MessageID: message.MessageID, Content: message.Content, Priority: message.Priority, Status: message.Status, Data: message.Data, LastHeartbeat: message.LastHeartbeat})
data := MessageResponse{MessageID: curr.MessageID, Content: curr.Content, Priority: curr.Priority, Status: curr.Status, Data: curr.Data, LastHeartbeat: curr.LastHeartbeat}
watch, _ := strconv.ParseBool(req.QueryParameter("watch"))
if !watch {
resp.WriteHeaderAndEntity(http.StatusOK, data)
return
}

watch, _ := strconv.ParseBool(req.QueryParameter("watch"))
if !watch {
resp.WriteHeaderAndEntity(http.StatusOK, MessageResponse{MessageID: message.MessageID, Content: message.Content, Priority: message.Priority, Status: message.Status, Data: message.Data, LastHeartbeat: message.LastHeartbeat})
if data.Status != model.StatusProcessing && data.Status != model.StatusPending {
resp.WriteHeaderAndEntity(http.StatusOK, data)
return
}

Expand All @@ -415,7 +416,7 @@ func (mc *MessageController) Get(req *restful.Request, resp *restful.Response) {
watchCh := mc.newWatchChannel(messageID)
defer mc.cancelWatchChannel(messageID, watchCh)

mc.updateWatchChannel(messageID, MessageResponse{MessageID: message.MessageID, Content: message.Content, Priority: message.Priority, Status: message.Status, Data: message.Data, LastHeartbeat: message.LastHeartbeat})
mc.updateWatchChannel(messageID, data)

encoder := json.NewEncoder(resp.ResponseWriter)

Expand Down

0 comments on commit 72f7c2f

Please sign in to comment.