From da9f95c942ed51b0b9874fd21d111ef39c3ff202 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Tue, 4 Feb 2025 15:59:36 +0100 Subject: [PATCH] fix(core): Handle cancellation of waiting executions correctly --- .../src/__tests__/active-executions.test.ts | 221 +++++++++--------- packages/cli/src/active-executions.ts | 20 +- packages/cli/src/webhooks/webhook-helpers.ts | 1 + 3 files changed, 128 insertions(+), 114 deletions(-) diff --git a/packages/cli/src/__tests__/active-executions.test.ts b/packages/cli/src/__tests__/active-executions.test.ts index c1a63cd0f3724..c7869a315ce14 100644 --- a/packages/cli/src/__tests__/active-executions.test.ts +++ b/packages/cli/src/__tests__/active-executions.test.ts @@ -1,10 +1,11 @@ import { mock } from 'jest-mock-extended'; import type { + IDeferredPromise, IExecuteResponsePromiseData, IRun, IWorkflowExecutionDataProcess, } from 'n8n-workflow'; -import { createDeferredPromise } from 'n8n-workflow'; +import { ExecutionCancelledError } from 'n8n-workflow'; import PCancelable from 'p-cancelable'; import { v4 as uuid } from 'uuid'; @@ -31,9 +32,41 @@ const concurrencyControl = mockInstance(ConcurrencyControlService, { describe('ActiveExecutions', () => { let activeExecutions: ActiveExecutions; + let responsePromise: IDeferredPromise; + let workflowExecution: PCancelable; + let postExecutePromise: Promise; + + const fullRunData: IRun = { + data: { + resultData: { + runData: {}, + }, + }, + mode: 'manual', + startedAt: new Date(), + status: 'new', + }; + + const executionData: IWorkflowExecutionDataProcess = { + executionMode: 'manual', + workflowData: { + id: '123', + name: 'Test workflow 1', + active: false, + createdAt: new Date(), + updatedAt: new Date(), + nodes: [], + connections: {}, + }, + userId: uuid(), + }; beforeEach(() => { activeExecutions = new ActiveExecutions(mock(), executionRepository, concurrencyControl); + + workflowExecution = new PCancelable((resolve) => resolve()); + workflowExecution.cancel = jest.fn(); + responsePromise = mock>(); }); afterEach(() => { @@ -45,8 +78,7 @@ describe('ActiveExecutions', () => { }); test('Should add execution to active execution list', async () => { - const newExecution = mockExecutionData(); - const executionId = await activeExecutions.add(newExecution); + const executionId = await activeExecutions.add(executionData); expect(executionId).toBe(FAKE_EXECUTION_ID); expect(activeExecutions.getActiveExecutions()).toHaveLength(1); @@ -55,8 +87,7 @@ describe('ActiveExecutions', () => { }); test('Should update execution if add is called with execution ID', async () => { - const newExecution = mockExecutionData(); - const executionId = await activeExecutions.add(newExecution, FAKE_SECOND_EXECUTION_ID); + const executionId = await activeExecutions.add(executionData, FAKE_SECOND_EXECUTION_ID); expect(executionId).toBe(FAKE_SECOND_EXECUTION_ID); expect(activeExecutions.getActiveExecutions()).toHaveLength(1); @@ -64,150 +95,120 @@ describe('ActiveExecutions', () => { expect(updateExistingExecution).toHaveBeenCalledTimes(1); }); - test('Should fail attaching execution to invalid executionId', async () => { - const deferredPromise = mockCancelablePromise(); - - expect(() => { - activeExecutions.attachWorkflowExecution(FAKE_EXECUTION_ID, deferredPromise); - }).toThrow(); - }); + describe('attachWorkflowExecution', () => { + test('Should fail attaching execution to invalid executionId', async () => { + expect(() => { + activeExecutions.attachWorkflowExecution(FAKE_EXECUTION_ID, workflowExecution); + }).toThrow(); + }); - test('Should successfully attach execution to valid executionId', async () => { - const newExecution = mockExecutionData(); - await activeExecutions.add(newExecution, FAKE_EXECUTION_ID); - const deferredPromise = mockCancelablePromise(); + test('Should successfully attach execution to valid executionId', async () => { + await activeExecutions.add(executionData, FAKE_EXECUTION_ID); - expect(() => - activeExecutions.attachWorkflowExecution(FAKE_EXECUTION_ID, deferredPromise), - ).not.toThrow(); + expect(() => + activeExecutions.attachWorkflowExecution(FAKE_EXECUTION_ID, workflowExecution), + ).not.toThrow(); + }); }); test('Should attach and resolve response promise to existing execution', async () => { - const newExecution = mockExecutionData(); - await activeExecutions.add(newExecution, FAKE_EXECUTION_ID); - const deferredPromise = mockDeferredPromise(); - activeExecutions.attachResponsePromise(FAKE_EXECUTION_ID, deferredPromise); + await activeExecutions.add(executionData, FAKE_EXECUTION_ID); + activeExecutions.attachResponsePromise(FAKE_EXECUTION_ID, responsePromise); const fakeResponse = { data: { resultData: { runData: {} } } }; activeExecutions.resolveResponsePromise(FAKE_EXECUTION_ID, fakeResponse); - await expect(deferredPromise.promise).resolves.toEqual(fakeResponse); + expect(responsePromise.resolve).toHaveBeenCalledWith(fakeResponse); }); test('Should copy over startedAt and responsePromise when resuming a waiting execution', async () => { - const newExecution = mockExecutionData(); - const executionId = await activeExecutions.add(newExecution); + const executionId = await activeExecutions.add(executionData); activeExecutions.setStatus(executionId, 'waiting'); - activeExecutions.attachResponsePromise(executionId, mockDeferredPromise()); + activeExecutions.attachResponsePromise(executionId, responsePromise); const waitingExecution = activeExecutions.getExecutionOrFail(executionId); expect(waitingExecution.responsePromise).toBeDefined(); // Resume the execution - await activeExecutions.add(newExecution, executionId); + await activeExecutions.add(executionData, executionId); const resumedExecution = activeExecutions.getExecutionOrFail(executionId); expect(resumedExecution.startedAt).toBe(waitingExecution.startedAt); - expect(resumedExecution.responsePromise).toBe(waitingExecution.responsePromise); + expect(resumedExecution.responsePromise).toBe(responsePromise); }); - test('Should not remove a waiting execution', async () => { - const newExecution = mockExecutionData(); - const executionId = await activeExecutions.add(newExecution); - activeExecutions.setStatus(executionId, 'waiting'); - activeExecutions.finalizeExecution(executionId); + describe('finalizeExecution', () => { + test('Should not remove a waiting execution', async () => { + const executionId = await activeExecutions.add(executionData); + activeExecutions.setStatus(executionId, 'waiting'); + activeExecutions.finalizeExecution(executionId); - // Wait until the next tick to ensure that the post-execution promise has settled - await new Promise(setImmediate); + // Wait until the next tick to ensure that the post-execution promise has settled + await new Promise(setImmediate); - // Execution should still be in activeExecutions - expect(activeExecutions.getActiveExecutions()).toHaveLength(1); - expect(activeExecutions.getStatus(executionId)).toBe('waiting'); - }); + // Execution should still be in activeExecutions + expect(activeExecutions.getActiveExecutions()).toHaveLength(1); + expect(activeExecutions.getStatus(executionId)).toBe('waiting'); + }); - test('Should remove an existing execution', async () => { - // ARRANGE - const newExecution = mockExecutionData(); - const executionId = await activeExecutions.add(newExecution); + test('Should remove an existing execution', async () => { + const executionId = await activeExecutions.add(executionData); - // ACT - activeExecutions.finalizeExecution(executionId); + activeExecutions.finalizeExecution(executionId); - // Wait until the next tick to ensure that the post-execution promise has settled - await new Promise(setImmediate); + await new Promise(setImmediate); + expect(activeExecutions.getActiveExecutions()).toHaveLength(0); + }); - // ASSERT - expect(activeExecutions.getActiveExecutions()).toHaveLength(0); - }); + test('Should not try to resolve a post-execute promise for an inactive execution', async () => { + const getExecutionSpy = jest.spyOn(activeExecutions, 'getExecutionOrFail'); - test('Should not try to resolve a post-execute promise for an inactive execution', async () => { - const getExecutionSpy = jest.spyOn(activeExecutions, 'getExecutionOrFail'); + activeExecutions.finalizeExecution('inactive-execution-id', fullRunData); - activeExecutions.finalizeExecution('inactive-execution-id', mockFullRunData()); + expect(getExecutionSpy).not.toHaveBeenCalled(); + }); - expect(getExecutionSpy).not.toHaveBeenCalled(); - }); + test('Should resolve post execute promise on removal', async () => { + const executionId = await activeExecutions.add(executionData); + const postExecutePromise = activeExecutions.getPostExecutePromise(executionId); - test('Should resolve post execute promise on removal', async () => { - const newExecution = mockExecutionData(); - const executionId = await activeExecutions.add(newExecution); - const postExecutePromise = activeExecutions.getPostExecutePromise(executionId); - // Force the above to be executed since we cannot await it - await new Promise((res) => { - setTimeout(res, 100); - }); - const fakeOutput = mockFullRunData(); - activeExecutions.finalizeExecution(executionId, fakeOutput); + await new Promise(setImmediate); + activeExecutions.finalizeExecution(executionId, fullRunData); - await expect(postExecutePromise).resolves.toEqual(fakeOutput); + await expect(postExecutePromise).resolves.toEqual(fullRunData); + }); }); - test('Should throw error when trying to create a promise with invalid execution', async () => { - await expect(activeExecutions.getPostExecutePromise(FAKE_EXECUTION_ID)).rejects.toThrow(); + describe('getPostExecutePromise', () => { + test('Should throw error when trying to create a promise with invalid execution', async () => { + await expect(activeExecutions.getPostExecutePromise(FAKE_EXECUTION_ID)).rejects.toThrow(); + }); }); - test('Should call function to cancel execution when asked to stop', async () => { - const newExecution = mockExecutionData(); - const executionId = await activeExecutions.add(newExecution); - const cancelExecution = jest.fn(); - const cancellablePromise = mockCancelablePromise(); - cancellablePromise.cancel = cancelExecution; - activeExecutions.attachWorkflowExecution(executionId, cancellablePromise); - activeExecutions.stopExecution(executionId); + describe('stopExecution', () => { + let executionId: string; - expect(cancelExecution).toHaveBeenCalledTimes(1); - }); -}); + beforeEach(async () => { + executionId = await activeExecutions.add(executionData); + postExecutePromise = activeExecutions.getPostExecutePromise(executionId); -function mockExecutionData(): IWorkflowExecutionDataProcess { - return { - executionMode: 'manual', - workflowData: { - id: '123', - name: 'Test workflow 1', - active: false, - createdAt: new Date(), - updatedAt: new Date(), - nodes: [], - connections: {}, - }, - userId: uuid(), - }; -} + activeExecutions.attachWorkflowExecution(executionId, workflowExecution); + activeExecutions.attachResponsePromise(executionId, responsePromise); + }); -function mockFullRunData(): IRun { - return { - data: { - resultData: { - runData: {}, - }, - }, - mode: 'manual', - startedAt: new Date(), - status: 'new', - }; -} + test('Should cancel ongoing executions', async () => { + activeExecutions.stopExecution(executionId); + + expect(responsePromise.reject).toHaveBeenCalledWith(expect.any(ExecutionCancelledError)); + expect(workflowExecution.cancel).toHaveBeenCalledTimes(1); + await expect(postExecutePromise).rejects.toThrow(ExecutionCancelledError); + }); -// eslint-disable-next-line @typescript-eslint/promise-function-async -const mockCancelablePromise = () => new PCancelable((resolve) => resolve()); + test('Should cancel waiting executions', async () => { + activeExecutions.setStatus(executionId, 'waiting'); + activeExecutions.stopExecution(executionId); -const mockDeferredPromise = () => createDeferredPromise(); + expect(responsePromise.reject).toHaveBeenCalledWith(expect.any(ExecutionCancelledError)); + expect(workflowExecution.cancel).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/cli/src/active-executions.ts b/packages/cli/src/active-executions.ts index f9ac41e18bc9c..fa38fe0dc9979 100644 --- a/packages/cli/src/active-executions.ts +++ b/packages/cli/src/active-executions.ts @@ -95,13 +95,14 @@ export class ActiveExecutions { const resumingExecution = this.activeExecutions[executionId]; const postExecutePromise = createDeferredPromise(); - this.activeExecutions[executionId] = { + const execution: IExecutingWorkflowData = { executionData, startedAt: resumingExecution?.startedAt ?? new Date(), postExecutePromise, status: executionStatus, responsePromise: resumingExecution?.responsePromise, }; + this.activeExecutions[executionId] = execution; // Automatically remove execution once the postExecutePromise settles void postExecutePromise.promise @@ -111,7 +112,10 @@ export class ActiveExecutions { }) .finally(() => { this.concurrencyControl.release({ mode: executionData.executionMode }); - if (this.activeExecutions[executionId]?.status !== 'waiting') { + if (execution.status === 'waiting') { + // Do not hold on a reference to the previous WorkflowExecute instance, since a resuming execution will use a new instance + delete execution.workflowExecution; + } else { delete this.activeExecutions[executionId]; this.logger.debug('Execution removed', { executionId }); } @@ -149,8 +153,16 @@ export class ActiveExecutions { // There is no execution running with that id return; } - execution.workflowExecution?.cancel(); - execution.postExecutePromise.reject(new ExecutionCancelledError(executionId)); + const error = new ExecutionCancelledError(executionId); + execution.responsePromise?.reject(error); + if (execution.status === 'waiting') { + // A waiting execution will not have a valid workflowExecution or postExecutePromise + // So we can't rely on the `.finally` on the postExecutePromise for the execution removal + delete this.activeExecutions[executionId]; + } else { + execution.workflowExecution?.cancel(); + execution.postExecutePromise.reject(error); + } this.logger.debug('Execution cancelled', { executionId }); } diff --git a/packages/cli/src/webhooks/webhook-helpers.ts b/packages/cli/src/webhooks/webhook-helpers.ts index dd6a77560686e..ca54a14b52c18 100644 --- a/packages/cli/src/webhooks/webhook-helpers.ts +++ b/packages/cli/src/webhooks/webhook-helpers.ts @@ -530,6 +530,7 @@ export async function executeWebhook( `Error with Webhook-Response for execution "${executionId}": "${error.message}"`, { executionId, workflowId: workflow.id }, ); + responseCallback(error, {}); }); }