diff --git a/packages/core/src/discovery.js b/packages/core/src/discovery.js index a3a4fb6cf..863fc340d 100644 --- a/packages/core/src/discovery.js +++ b/packages/core/src/discovery.js @@ -248,7 +248,7 @@ export const RESOURCE_CACHE_KEY = Symbol('resource-cache'); // snapshot which is used to intercept and capture snapshot resource requests. export function createDiscoveryQueue(percy) { let { concurrency } = percy.config.discovery; - let queue = new Queue(); + let queue = new Queue('discovery'); let cache; return queue diff --git a/packages/core/src/queue.js b/packages/core/src/queue.js index a7089e0b6..d44894501 100644 --- a/packages/core/src/queue.js +++ b/packages/core/src/queue.js @@ -3,6 +3,7 @@ import { generatePromise, AbortController } from './utils.js'; +import logger from '@percy/logger'; // Assigns a deffered promise and resolve & reject functions to an object function deferred(obj) { @@ -31,6 +32,11 @@ class QueueClosedError extends Error { export class Queue { // item concurrency concurrency = 10; + log = logger('core:queue'); + + constructor(name) { + this.name = name; + } // Configure queue properties set({ concurrency }) { @@ -181,6 +187,7 @@ export class Queue { // clear and abort any queued tasks clear() { let tasks = [...this.#queued]; + this.log.debug(`Clearing ${this.name} queue, queued state: ${this.#queued.size}, pending state: ${this.#pending.size}`); this.#queued.clear(); for (let task of tasks) { @@ -245,6 +252,7 @@ export class Queue { // process items up to the latest queued item, starting the queue if necessary; // returns a generator that yields until the flushed item has finished processing flush(callback) { + this.log.debug(`Flushing ${this.name} queue, queued state: ${this.#queued.size}, pending state: ${this.#pending.size}`); let interrupt = ( // check for existing interrupts [...this.#pending].find(t => t.stop) ?? @@ -274,10 +282,8 @@ export class Queue { let queued, pending = this.#pending.size; // calculate the position within queued when not pending if (task && task.pending == null) queued = positionOf(this.#queued, task); - // calculate the position within pending when not stopping - if (!task?.stop && task?.pending != null) pending = positionOf(this.#pending, task); // call the callback and return true when not queued or pending - let position = (queued ?? 0) + (pending ?? 0); + let position = (queued ?? 0) + pending; callback?.(position); return !position; }, { idle: 10 }); diff --git a/packages/core/src/snapshot.js b/packages/core/src/snapshot.js index 5884a00ac..282b8612c 100644 --- a/packages/core/src/snapshot.js +++ b/packages/core/src/snapshot.js @@ -285,7 +285,7 @@ function mergeSnapshotOptions(prev = {}, next) { // Creates a snapshots queue that manages a Percy build and uploads snapshots. export function createSnapshotsQueue(percy) { let { concurrency } = percy.config.discovery; - let queue = new Queue(); + let queue = new Queue('snapshot'); let build; return queue diff --git a/packages/core/test/unit/queue.test.js b/packages/core/test/unit/queue.test.js index 815d2006b..30bb7c29f 100644 --- a/packages/core/test/unit/queue.test.js +++ b/packages/core/test/unit/queue.test.js @@ -1,11 +1,13 @@ import { AbortController, generatePromise, waitForTimeout } from '../../src/utils.js'; import Queue from '../../src/queue.js'; +import { logger, setupTest } from '../helpers/index.js'; describe('Unit / Tasks Queue', () => { let q; - beforeEach(() => { - q = new Queue(); + beforeEach(async () => { + q = new Queue('test'); + await setupTest(); }); it('has a customizable concurrency', () => { @@ -238,10 +240,10 @@ describe('Unit / Tasks Queue', () => { let p1 = q.push('item #1'); let p2 = q.push('item #2'); expect(q.size).toBe(2); - + q.log.loglevel('debug'); q.close(true); + expect(logger.stderr).toEqual(jasmine.arrayContaining(['[percy:core:queue] Clearing test queue, queued state: 2, pending state: 0'])); expect(q.size).toBe(0); - await expectAsync(p1) .toBeRejectedWithError('This operation was aborted'); await expectAsync(p2) @@ -401,24 +403,36 @@ describe('Unit / Tasks Queue', () => { expect(q.size).toBe(2); }); - it('can flush twice without interruption', async () => { - let resolve1, deferred1 = new Promise(r => (resolve1 = r)); - let resolve2, deferred2 = new Promise(r => (resolve2 = r)); - - q.push(deferred1); - let p1 = generatePromise(q.flush()); + it('flushing queued task', async () => { + q.set({ concurrency: 1 }); + q.log.loglevel('debug'); + await q.start(); + let resolve, deferred = new Promise(r => (resolve = r)); + q.readyState = 2; + q.push('item_1'); + let p1 = q.push(deferred); + expect(q.size).toBe(2); + let promise = generatePromise(q.flush()); + expect(logger.stderr).toEqual(jasmine.arrayContaining(['[percy:core:queue] Flushing test queue, queued state: 1, pending state: 1'])); + await expectAsync(promise).toBePending(); await expectAsync(p1).toBePending(); - - q.push(deferred2); - let p2 = generatePromise(q.flush()); - await expectAsync(p2).toBePending(); - - resolve1(); + q.process(deferred); + resolve(); + await expectAsync(promise).toBeResolved(); await expectAsync(p1).toBeResolved(); - await expectAsync(p2).toBePending(); + expect(q.size).toBe(0); + }); - resolve2(); - await expectAsync(p2).toBeResolved(); + it('empty flush', async () => { + q.set({ concurrency: 1 }); + q.log.loglevel('debug'); + await q.start(); + q.readyState = 2; + expect(q.size).toBe(0); + let promise = generatePromise(q.flush()); + expect(logger.stderr).toEqual(jasmine.arrayContaining(['[percy:core:queue] Flushing test queue, queued state: 0, pending state: 0'])); + await expectAsync(promise).toBePending(); + await expectAsync(promise).toBeResolved(); }); it('cancels the flush when aborted', async () => {