Skip to content

Commit

Permalink
fix(core): Handle cancellation of waiting executions correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy committed Feb 4, 2025
1 parent b5905c6 commit da9f95c
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 114 deletions.
221 changes: 111 additions & 110 deletions packages/cli/src/__tests__/active-executions.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { mock } from 'jest-mock-extended';
import type {
IDeferredPromise,
IExecuteResponsePromiseData,
IRun,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import { createDeferredPromise } from 'n8n-workflow';
import { ExecutionCancelledError } from 'n8n-workflow';
import PCancelable from 'p-cancelable';
import { v4 as uuid } from 'uuid';

Expand All @@ -31,9 +32,41 @@ const concurrencyControl = mockInstance(ConcurrencyControlService, {

describe('ActiveExecutions', () => {
let activeExecutions: ActiveExecutions;
let responsePromise: IDeferredPromise<IExecuteResponsePromiseData>;
let workflowExecution: PCancelable<IRun>;
let postExecutePromise: Promise<IRun | undefined>;

const fullRunData: IRun = {
data: {
resultData: {
runData: {},
},
},
mode: 'manual',
startedAt: new Date(),
status: 'new',
};

const executionData: IWorkflowExecutionDataProcess = {
executionMode: 'manual',
workflowData: {
id: '123',
name: 'Test workflow 1',
active: false,
createdAt: new Date(),
updatedAt: new Date(),
nodes: [],
connections: {},
},
userId: uuid(),
};

beforeEach(() => {
activeExecutions = new ActiveExecutions(mock(), executionRepository, concurrencyControl);

workflowExecution = new PCancelable<IRun>((resolve) => resolve());
workflowExecution.cancel = jest.fn();
responsePromise = mock<IDeferredPromise<IExecuteResponsePromiseData>>();
});

afterEach(() => {
Expand All @@ -45,8 +78,7 @@ describe('ActiveExecutions', () => {
});

test('Should add execution to active execution list', async () => {
const newExecution = mockExecutionData();
const executionId = await activeExecutions.add(newExecution);
const executionId = await activeExecutions.add(executionData);

expect(executionId).toBe(FAKE_EXECUTION_ID);
expect(activeExecutions.getActiveExecutions()).toHaveLength(1);
Expand All @@ -55,159 +87,128 @@ describe('ActiveExecutions', () => {
});

test('Should update execution if add is called with execution ID', async () => {
const newExecution = mockExecutionData();
const executionId = await activeExecutions.add(newExecution, FAKE_SECOND_EXECUTION_ID);
const executionId = await activeExecutions.add(executionData, FAKE_SECOND_EXECUTION_ID);

expect(executionId).toBe(FAKE_SECOND_EXECUTION_ID);
expect(activeExecutions.getActiveExecutions()).toHaveLength(1);
expect(createNewExecution).toHaveBeenCalledTimes(0);
expect(updateExistingExecution).toHaveBeenCalledTimes(1);
});

test('Should fail attaching execution to invalid executionId', async () => {
const deferredPromise = mockCancelablePromise();

expect(() => {
activeExecutions.attachWorkflowExecution(FAKE_EXECUTION_ID, deferredPromise);
}).toThrow();
});
describe('attachWorkflowExecution', () => {
test('Should fail attaching execution to invalid executionId', async () => {
expect(() => {
activeExecutions.attachWorkflowExecution(FAKE_EXECUTION_ID, workflowExecution);
}).toThrow();
});

test('Should successfully attach execution to valid executionId', async () => {
const newExecution = mockExecutionData();
await activeExecutions.add(newExecution, FAKE_EXECUTION_ID);
const deferredPromise = mockCancelablePromise();
test('Should successfully attach execution to valid executionId', async () => {
await activeExecutions.add(executionData, FAKE_EXECUTION_ID);

expect(() =>
activeExecutions.attachWorkflowExecution(FAKE_EXECUTION_ID, deferredPromise),
).not.toThrow();
expect(() =>
activeExecutions.attachWorkflowExecution(FAKE_EXECUTION_ID, workflowExecution),
).not.toThrow();
});
});

test('Should attach and resolve response promise to existing execution', async () => {
const newExecution = mockExecutionData();
await activeExecutions.add(newExecution, FAKE_EXECUTION_ID);
const deferredPromise = mockDeferredPromise();
activeExecutions.attachResponsePromise(FAKE_EXECUTION_ID, deferredPromise);
await activeExecutions.add(executionData, FAKE_EXECUTION_ID);
activeExecutions.attachResponsePromise(FAKE_EXECUTION_ID, responsePromise);
const fakeResponse = { data: { resultData: { runData: {} } } };
activeExecutions.resolveResponsePromise(FAKE_EXECUTION_ID, fakeResponse);

await expect(deferredPromise.promise).resolves.toEqual(fakeResponse);
expect(responsePromise.resolve).toHaveBeenCalledWith(fakeResponse);
});

test('Should copy over startedAt and responsePromise when resuming a waiting execution', async () => {
const newExecution = mockExecutionData();
const executionId = await activeExecutions.add(newExecution);
const executionId = await activeExecutions.add(executionData);
activeExecutions.setStatus(executionId, 'waiting');
activeExecutions.attachResponsePromise(executionId, mockDeferredPromise());
activeExecutions.attachResponsePromise(executionId, responsePromise);

const waitingExecution = activeExecutions.getExecutionOrFail(executionId);
expect(waitingExecution.responsePromise).toBeDefined();

// Resume the execution
await activeExecutions.add(newExecution, executionId);
await activeExecutions.add(executionData, executionId);

const resumedExecution = activeExecutions.getExecutionOrFail(executionId);
expect(resumedExecution.startedAt).toBe(waitingExecution.startedAt);
expect(resumedExecution.responsePromise).toBe(waitingExecution.responsePromise);
expect(resumedExecution.responsePromise).toBe(responsePromise);
});

test('Should not remove a waiting execution', async () => {
const newExecution = mockExecutionData();
const executionId = await activeExecutions.add(newExecution);
activeExecutions.setStatus(executionId, 'waiting');
activeExecutions.finalizeExecution(executionId);
describe('finalizeExecution', () => {
test('Should not remove a waiting execution', async () => {
const executionId = await activeExecutions.add(executionData);
activeExecutions.setStatus(executionId, 'waiting');
activeExecutions.finalizeExecution(executionId);

// Wait until the next tick to ensure that the post-execution promise has settled
await new Promise(setImmediate);
// Wait until the next tick to ensure that the post-execution promise has settled
await new Promise(setImmediate);

// Execution should still be in activeExecutions
expect(activeExecutions.getActiveExecutions()).toHaveLength(1);
expect(activeExecutions.getStatus(executionId)).toBe('waiting');
});
// Execution should still be in activeExecutions
expect(activeExecutions.getActiveExecutions()).toHaveLength(1);
expect(activeExecutions.getStatus(executionId)).toBe('waiting');
});

test('Should remove an existing execution', async () => {
// ARRANGE
const newExecution = mockExecutionData();
const executionId = await activeExecutions.add(newExecution);
test('Should remove an existing execution', async () => {
const executionId = await activeExecutions.add(executionData);

// ACT
activeExecutions.finalizeExecution(executionId);
activeExecutions.finalizeExecution(executionId);

// Wait until the next tick to ensure that the post-execution promise has settled
await new Promise(setImmediate);
await new Promise(setImmediate);
expect(activeExecutions.getActiveExecutions()).toHaveLength(0);
});

// ASSERT
expect(activeExecutions.getActiveExecutions()).toHaveLength(0);
});
test('Should not try to resolve a post-execute promise for an inactive execution', async () => {
const getExecutionSpy = jest.spyOn(activeExecutions, 'getExecutionOrFail');

test('Should not try to resolve a post-execute promise for an inactive execution', async () => {
const getExecutionSpy = jest.spyOn(activeExecutions, 'getExecutionOrFail');
activeExecutions.finalizeExecution('inactive-execution-id', fullRunData);

activeExecutions.finalizeExecution('inactive-execution-id', mockFullRunData());
expect(getExecutionSpy).not.toHaveBeenCalled();
});

expect(getExecutionSpy).not.toHaveBeenCalled();
});
test('Should resolve post execute promise on removal', async () => {
const executionId = await activeExecutions.add(executionData);
const postExecutePromise = activeExecutions.getPostExecutePromise(executionId);

test('Should resolve post execute promise on removal', async () => {
const newExecution = mockExecutionData();
const executionId = await activeExecutions.add(newExecution);
const postExecutePromise = activeExecutions.getPostExecutePromise(executionId);
// Force the above to be executed since we cannot await it
await new Promise((res) => {
setTimeout(res, 100);
});
const fakeOutput = mockFullRunData();
activeExecutions.finalizeExecution(executionId, fakeOutput);
await new Promise(setImmediate);
activeExecutions.finalizeExecution(executionId, fullRunData);

await expect(postExecutePromise).resolves.toEqual(fakeOutput);
await expect(postExecutePromise).resolves.toEqual(fullRunData);
});
});

test('Should throw error when trying to create a promise with invalid execution', async () => {
await expect(activeExecutions.getPostExecutePromise(FAKE_EXECUTION_ID)).rejects.toThrow();
describe('getPostExecutePromise', () => {
test('Should throw error when trying to create a promise with invalid execution', async () => {
await expect(activeExecutions.getPostExecutePromise(FAKE_EXECUTION_ID)).rejects.toThrow();
});
});

test('Should call function to cancel execution when asked to stop', async () => {
const newExecution = mockExecutionData();
const executionId = await activeExecutions.add(newExecution);
const cancelExecution = jest.fn();
const cancellablePromise = mockCancelablePromise();
cancellablePromise.cancel = cancelExecution;
activeExecutions.attachWorkflowExecution(executionId, cancellablePromise);
activeExecutions.stopExecution(executionId);
describe('stopExecution', () => {
let executionId: string;

expect(cancelExecution).toHaveBeenCalledTimes(1);
});
});
beforeEach(async () => {
executionId = await activeExecutions.add(executionData);
postExecutePromise = activeExecutions.getPostExecutePromise(executionId);

function mockExecutionData(): IWorkflowExecutionDataProcess {
return {
executionMode: 'manual',
workflowData: {
id: '123',
name: 'Test workflow 1',
active: false,
createdAt: new Date(),
updatedAt: new Date(),
nodes: [],
connections: {},
},
userId: uuid(),
};
}
activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
activeExecutions.attachResponsePromise(executionId, responsePromise);
});

function mockFullRunData(): IRun {
return {
data: {
resultData: {
runData: {},
},
},
mode: 'manual',
startedAt: new Date(),
status: 'new',
};
}
test('Should cancel ongoing executions', async () => {
activeExecutions.stopExecution(executionId);

expect(responsePromise.reject).toHaveBeenCalledWith(expect.any(ExecutionCancelledError));
expect(workflowExecution.cancel).toHaveBeenCalledTimes(1);
await expect(postExecutePromise).rejects.toThrow(ExecutionCancelledError);
});

// eslint-disable-next-line @typescript-eslint/promise-function-async
const mockCancelablePromise = () => new PCancelable<IRun>((resolve) => resolve());
test('Should cancel waiting executions', async () => {
activeExecutions.setStatus(executionId, 'waiting');
activeExecutions.stopExecution(executionId);

const mockDeferredPromise = () => createDeferredPromise<IExecuteResponsePromiseData>();
expect(responsePromise.reject).toHaveBeenCalledWith(expect.any(ExecutionCancelledError));
expect(workflowExecution.cancel).not.toHaveBeenCalled();
});
});
});
20 changes: 16 additions & 4 deletions packages/cli/src/active-executions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,14 @@ export class ActiveExecutions {
const resumingExecution = this.activeExecutions[executionId];
const postExecutePromise = createDeferredPromise<IRun | undefined>();

this.activeExecutions[executionId] = {
const execution: IExecutingWorkflowData = {
executionData,
startedAt: resumingExecution?.startedAt ?? new Date(),
postExecutePromise,
status: executionStatus,
responsePromise: resumingExecution?.responsePromise,
};
this.activeExecutions[executionId] = execution;

// Automatically remove execution once the postExecutePromise settles
void postExecutePromise.promise
Expand All @@ -111,7 +112,10 @@ export class ActiveExecutions {
})
.finally(() => {
this.concurrencyControl.release({ mode: executionData.executionMode });
if (this.activeExecutions[executionId]?.status !== 'waiting') {
if (execution.status === 'waiting') {
// Do not hold on a reference to the previous WorkflowExecute instance, since a resuming execution will use a new instance
delete execution.workflowExecution;
} else {
delete this.activeExecutions[executionId];
this.logger.debug('Execution removed', { executionId });
}
Expand Down Expand Up @@ -149,8 +153,16 @@ export class ActiveExecutions {
// There is no execution running with that id
return;
}
execution.workflowExecution?.cancel();
execution.postExecutePromise.reject(new ExecutionCancelledError(executionId));
const error = new ExecutionCancelledError(executionId);
execution.responsePromise?.reject(error);
if (execution.status === 'waiting') {
// A waiting execution will not have a valid workflowExecution or postExecutePromise
// So we can't rely on the `.finally` on the postExecutePromise for the execution removal
delete this.activeExecutions[executionId];
} else {
execution.workflowExecution?.cancel();
execution.postExecutePromise.reject(error);
}
this.logger.debug('Execution cancelled', { executionId });
}

Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/webhooks/webhook-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ export async function executeWebhook(
`Error with Webhook-Response for execution "${executionId}": "${error.message}"`,
{ executionId, workflowId: workflow.id },
);
responseCallback(error, {});
});
}

Expand Down

0 comments on commit da9f95c

Please sign in to comment.