Skip to content

Commit

Permalink
Revert "Add item ID to the workqueue instead"
Browse files Browse the repository at this point in the history
This reverts commit 1e482f2.
  • Loading branch information
pingsutw committed Oct 2, 2023
1 parent 1e482f2 commit 1dfd610
Showing 1 changed file with 6 additions and 19 deletions.
25 changes: 6 additions & 19 deletions flytestdlib/cache/auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,8 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {
}

for _, batch := range batches {
for _, b := range batch {
logger.Debugf(ctx, "Enqueuing batch with id: %v", b.GetID())
w.workqueue.Add(b.GetID())
}
b := batch
w.workqueue.Add(&b)
}

return nil
Expand Down Expand Up @@ -275,29 +273,18 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {
case <-ctx.Done():
return nil
default:
itemID, shutdown := w.workqueue.Get()
item, shutdown := w.workqueue.Get()
if shutdown {
logger.Debugf(ctx, "Shutting down worker")
return nil
}

t := w.metrics.SyncLatency.Start()
logger.Debugf(ctx, "Syncing item with id [%v]", itemID)
item, ok := w.lruMap.Get(itemID)
if !ok {
logger.Debugf(ctx, "item with id [%v] not found in cache", itemID)
t.Stop()
continue
}
updatedBatch, err := w.syncCb(ctx, Batch{itemWrapper{
id: itemID.(ItemID),
item: item.(Item),
}})
updatedBatch, err := w.syncCb(ctx, *item.(*Batch))

// Since we create batches every time we sync, we will just remove the item from the queue here
// regardless of whether it succeeded the sync or not.
w.workqueue.Forget(itemID)
w.workqueue.Done(itemID)
w.workqueue.Forget(item)
w.workqueue.Done(item)

if err != nil {
w.metrics.SyncErrors.Inc()
Expand Down

0 comments on commit 1dfd610

Please sign in to comment.