Skip to content

Commit

Permalink
Merge branch 'master' of github.com:percy/cli into refactor-network-js
Browse files Browse the repository at this point in the history
  • Loading branch information
nilshah98 committed Nov 29, 2023
2 parents 3582916 + 6287cfc commit e617894
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 24 deletions.
2 changes: 1 addition & 1 deletion packages/core/src/discovery.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ export const RESOURCE_CACHE_KEY = Symbol('resource-cache');
// snapshot which is used to intercept and capture snapshot resource requests.
export function createDiscoveryQueue(percy) {
let { concurrency } = percy.config.discovery;
let queue = new Queue();
let queue = new Queue('discovery');
let cache;

return queue
Expand Down
12 changes: 9 additions & 3 deletions packages/core/src/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
generatePromise,
AbortController
} from './utils.js';
import logger from '@percy/logger';

// Assigns a deffered promise and resolve & reject functions to an object
function deferred(obj) {
Expand Down Expand Up @@ -31,6 +32,11 @@ class QueueClosedError extends Error {
export class Queue {
// item concurrency
concurrency = 10;
log = logger('core:queue');

constructor(name) {
this.name = name;
}

// Configure queue properties
set({ concurrency }) {
Expand Down Expand Up @@ -181,6 +187,7 @@ export class Queue {
// clear and abort any queued tasks
clear() {
let tasks = [...this.#queued];
this.log.debug(`Clearing ${this.name} queue, queued state: ${this.#queued.size}, pending state: ${this.#pending.size}`);
this.#queued.clear();

for (let task of tasks) {
Expand Down Expand Up @@ -245,6 +252,7 @@ export class Queue {
// process items up to the latest queued item, starting the queue if necessary;
// returns a generator that yields until the flushed item has finished processing
flush(callback) {
this.log.debug(`Flushing ${this.name} queue, queued state: ${this.#queued.size}, pending state: ${this.#pending.size}`);
let interrupt = (
// check for existing interrupts
[...this.#pending].find(t => t.stop) ??
Expand Down Expand Up @@ -274,10 +282,8 @@ export class Queue {
let queued, pending = this.#pending.size;
// calculate the position within queued when not pending
if (task && task.pending == null) queued = positionOf(this.#queued, task);
// calculate the position within pending when not stopping
if (!task?.stop && task?.pending != null) pending = positionOf(this.#pending, task);
// call the callback and return true when not queued or pending
let position = (queued ?? 0) + (pending ?? 0);
let position = (queued ?? 0) + pending;
callback?.(position);
return !position;
}, { idle: 10 });
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/snapshot.js
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ function mergeSnapshotOptions(prev = {}, next) {
// Creates a snapshots queue that manages a Percy build and uploads snapshots.
export function createSnapshotsQueue(percy) {
let { concurrency } = percy.config.discovery;
let queue = new Queue();
let queue = new Queue('snapshot');
let build;

return queue
Expand Down
52 changes: 33 additions & 19 deletions packages/core/test/unit/queue.test.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { AbortController, generatePromise, waitForTimeout } from '../../src/utils.js';
import Queue from '../../src/queue.js';
import { logger, setupTest } from '../helpers/index.js';

describe('Unit / Tasks Queue', () => {
let q;

beforeEach(() => {
q = new Queue();
beforeEach(async () => {
q = new Queue('test');
await setupTest();
});

it('has a customizable concurrency', () => {
Expand Down Expand Up @@ -238,10 +240,10 @@ describe('Unit / Tasks Queue', () => {
let p1 = q.push('item #1');
let p2 = q.push('item #2');
expect(q.size).toBe(2);

q.log.loglevel('debug');
q.close(true);
expect(logger.stderr).toEqual(jasmine.arrayContaining(['[percy:core:queue] Clearing test queue, queued state: 2, pending state: 0']));
expect(q.size).toBe(0);

await expectAsync(p1)
.toBeRejectedWithError('This operation was aborted');
await expectAsync(p2)
Expand Down Expand Up @@ -401,24 +403,36 @@ describe('Unit / Tasks Queue', () => {
expect(q.size).toBe(2);
});

it('can flush twice without interruption', async () => {
let resolve1, deferred1 = new Promise(r => (resolve1 = r));
let resolve2, deferred2 = new Promise(r => (resolve2 = r));

q.push(deferred1);
let p1 = generatePromise(q.flush());
it('flushing queued task', async () => {
q.set({ concurrency: 1 });
q.log.loglevel('debug');
await q.start();
let resolve, deferred = new Promise(r => (resolve = r));
q.readyState = 2;
q.push('item_1');
let p1 = q.push(deferred);
expect(q.size).toBe(2);
let promise = generatePromise(q.flush());
expect(logger.stderr).toEqual(jasmine.arrayContaining(['[percy:core:queue] Flushing test queue, queued state: 1, pending state: 1']));
await expectAsync(promise).toBePending();
await expectAsync(p1).toBePending();

q.push(deferred2);
let p2 = generatePromise(q.flush());
await expectAsync(p2).toBePending();

resolve1();
q.process(deferred);
resolve();
await expectAsync(promise).toBeResolved();
await expectAsync(p1).toBeResolved();
await expectAsync(p2).toBePending();
expect(q.size).toBe(0);
});

resolve2();
await expectAsync(p2).toBeResolved();
it('empty flush', async () => {
q.set({ concurrency: 1 });
q.log.loglevel('debug');
await q.start();
q.readyState = 2;
expect(q.size).toBe(0);
let promise = generatePromise(q.flush());
expect(logger.stderr).toEqual(jasmine.arrayContaining(['[percy:core:queue] Flushing test queue, queued state: 0, pending state: 0']));
await expectAsync(promise).toBePending();
await expectAsync(promise).toBeResolved();
});

it('cancels the flush when aborted', async () => {
Expand Down

0 comments on commit e617894

Please sign in to comment.