Skip to content

Commit

Permalink
fix(core): Reduce risk of race condition during workflow activation l…
Browse files Browse the repository at this point in the history
…oop (#13186)

Co-authored-by: Tomi Turtiainen <[email protected]>
  • Loading branch information
ivov and tomi authored Feb 13, 2025
1 parent 47c5688 commit 64c5b6e
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 26 deletions.
29 changes: 27 additions & 2 deletions packages/cli/src/__tests__/active-workflow-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ import type {
import { Workflow } from 'n8n-workflow';

import { ActiveWorkflowManager } from '@/active-workflow-manager';
import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
import type { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import type { NodeTypes } from '@/node-types';

describe('ActiveWorkflowManager', () => {
let activeWorkflowManager: ActiveWorkflowManager;
const instanceSettings = mock<InstanceSettings>();
const instanceSettings = mock<InstanceSettings>({ isMultiMain: false });
const nodeTypes = mock<NodeTypes>();
const workflowRepository = mock<WorkflowRepository>();

beforeEach(() => {
jest.clearAllMocks();
Expand All @@ -27,7 +30,7 @@ describe('ActiveWorkflowManager', () => {
mock(),
nodeTypes,
mock(),
mock(),
workflowRepository,
mock(),
mock(),
mock(),
Expand Down Expand Up @@ -122,5 +125,27 @@ describe('ActiveWorkflowManager', () => {
}
});
});

describe('add', () => {
test.each<[WorkflowActivateMode]>([['init'], ['leadershipChange']])(
'should skip inactive workflow in `%s` activation mode',
async (mode) => {
const checkSpy = jest.spyOn(activeWorkflowManager, 'checkIfWorkflowCanBeActivated');
const addWebhooksSpy = jest.spyOn(activeWorkflowManager, 'addWebhooks');
const addTriggersAndPollersSpy = jest.spyOn(
activeWorkflowManager,
'addTriggersAndPollers',
);
workflowRepository.findById.mockResolvedValue(mock<WorkflowEntity>({ active: false }));

const result = await activeWorkflowManager.add('some-id', mode);

expect(checkSpy).not.toHaveBeenCalled();
expect(addWebhooksSpy).not.toHaveBeenCalled();
expect(addTriggersAndPollersSpy).not.toHaveBeenCalled();
expect(result).toBe(false);
},
);
});
});
});
59 changes: 37 additions & 22 deletions packages/cli/src/active-workflow-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import type {
WorkflowActivateMode,
WorkflowExecuteMode,
INodeType,
WorkflowId,
} from 'n8n-workflow';
import {
Workflow,
Expand All @@ -39,11 +40,13 @@ import {
WORKFLOW_REACTIVATE_INITIAL_TIMEOUT,
WORKFLOW_REACTIVATE_MAX_TIMEOUT,
} from '@/constants';
import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { OnShutdown } from '@/decorators/on-shutdown';
import { executeErrorWorkflow } from '@/execution-lifecycle/execute-error-workflow';
import { ExecutionService } from '@/executions/execution.service';
import { ExternalHooks } from '@/external-hooks';
import type { IWorkflowDb } from '@/interfaces';
import { NodeTypes } from '@/node-types';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { ActiveWorkflowsService } from '@/services/active-workflows.service';
Expand All @@ -59,12 +62,12 @@ interface QueuedActivation {
activationMode: WorkflowActivateMode;
lastTimeout: number;
timeout: NodeJS.Timeout;
workflowData: IWorkflowBase;
workflowData: IWorkflowDb;
}

@Service()
export class ActiveWorkflowManager {
private queuedActivations: { [workflowId: string]: QueuedActivation } = {};
private queuedActivations: Record<WorkflowId, QueuedActivation> = {};

constructor(
private readonly logger: Logger,
Expand Down Expand Up @@ -92,7 +95,6 @@ export class ActiveWorkflowManager {
await this.addActiveWorkflows('init');

await this.externalHooks.run('activeWorkflows.initialized');
await this.webhookService.populateCache();
}

async getAllWorkflowActivationErrors() {
Expand Down Expand Up @@ -134,7 +136,7 @@ export class ActiveWorkflowManager {
* @important Do not confuse with `ActiveWorkflows.isActive()`,
* which checks if the workflow is active in memory.
*/
async isActive(workflowId: string) {
async isActive(workflowId: WorkflowId) {
const workflow = await this.workflowRepository.findOne({
select: ['active'],
where: { id: workflowId },
Expand Down Expand Up @@ -230,7 +232,7 @@ export class ActiveWorkflowManager {
* Remove all webhooks of a workflow from the database, and
* deregister those webhooks from external services.
*/
async clearWebhooks(workflowId: string) {
async clearWebhooks(workflowId: WorkflowId) {
const workflowData = await this.workflowRepository.findOne({
where: { id: workflowId },
});
Expand Down Expand Up @@ -270,7 +272,7 @@ export class ActiveWorkflowManager {
* and overwrites the emit to be able to start it in subprocess
*/
getExecutePollFunctions(
workflowData: IWorkflowBase,
workflowData: IWorkflowDb,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
activation: WorkflowActivateMode,
Expand Down Expand Up @@ -321,7 +323,7 @@ export class ActiveWorkflowManager {
* and overwrites the emit to be able to start it in subprocess
*/
getExecuteTriggerFunctions(
workflowData: IWorkflowBase,
workflowData: IWorkflowDb,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
activation: WorkflowActivateMode,
Expand Down Expand Up @@ -378,7 +380,7 @@ export class ActiveWorkflowManager {
);
this.executeErrorWorkflow(activationError, workflowData, mode);

this.addQueuedWorkflowActivation(activation, workflowData);
this.addQueuedWorkflowActivation(activation, workflowData as WorkflowEntity);
};
return new TriggerContext(workflow, node, additionalData, mode, activation, emit, emitError);
};
Expand Down Expand Up @@ -411,21 +413,21 @@ export class ActiveWorkflowManager {
* only on instance init or (in multi-main setup) on leadership change.
*/
async addActiveWorkflows(activationMode: 'init' | 'leadershipChange') {
const dbWorkflows = await this.workflowRepository.getAllActive();
const dbWorkflowIds = await this.workflowRepository.getAllActiveIds();

if (dbWorkflows.length === 0) return;
if (dbWorkflowIds.length === 0) return;

if (this.instanceSettings.isLeader) {
this.logger.info(' ================================');
this.logger.info(' Start Active Workflows:');
this.logger.info(' ================================');
}

const batches = chunk(dbWorkflows, this.workflowsConfig.activationBatchSize);
const batches = chunk(dbWorkflowIds, this.workflowsConfig.activationBatchSize);

for (const batch of batches) {
const activationPromises = batch.map(async (dbWorkflow) => {
await this.activateWorkflow(dbWorkflow, activationMode);
const activationPromises = batch.map(async (dbWorkflowId) => {
await this.activateWorkflow(dbWorkflowId, activationMode);
});

await Promise.all(activationPromises);
Expand All @@ -435,9 +437,12 @@ export class ActiveWorkflowManager {
}

private async activateWorkflow(
dbWorkflow: IWorkflowBase,
workflowId: WorkflowId,
activationMode: 'init' | 'leadershipChange',
) {
const dbWorkflow = await this.workflowRepository.findById(workflowId);
if (!dbWorkflow) return;

try {
const wasActivated = await this.add(dbWorkflow.id, activationMode, dbWorkflow, {
shouldPublish: false,
Expand Down Expand Up @@ -515,9 +520,9 @@ export class ActiveWorkflowManager {
* since webhooks do not require continuous execution.
*/
async add(
workflowId: string,
workflowId: WorkflowId,
activationMode: WorkflowActivateMode,
existingWorkflow?: IWorkflowBase,
existingWorkflow?: WorkflowEntity,
{ shouldPublish } = { shouldPublish: true },
) {
if (this.instanceSettings.isMultiMain && shouldPublish) {
Expand Down Expand Up @@ -547,6 +552,16 @@ export class ActiveWorkflowManager {
});
}

if (['init', 'leadershipChange'].includes(activationMode) && !dbWorkflow.active) {
this.logger.debug(
`Skipping workflow ${formatWorkflow(dbWorkflow)} as it is no longer active`,
{
workflowId: dbWorkflow.id,
},
);
return false;
}

if (shouldDisplayActivationMessage) {
this.logger.debug(`Initializing active workflow ${formatWorkflow(dbWorkflow)} (startup)`, {
workflowName: dbWorkflow.name,
Expand Down Expand Up @@ -672,7 +687,7 @@ export class ActiveWorkflowManager {
*/
private addQueuedWorkflowActivation(
activationMode: WorkflowActivateMode,
workflowData: IWorkflowBase,
workflowData: WorkflowEntity,
) {
const workflowId = workflowData.id;
const workflowName = workflowData.name;
Expand Down Expand Up @@ -729,7 +744,7 @@ export class ActiveWorkflowManager {
/**
* Remove a workflow from the activation queue
*/
private removeQueuedWorkflowActivation(workflowId: string) {
private removeQueuedWorkflowActivation(workflowId: WorkflowId) {
if (this.queuedActivations[workflowId]) {
clearTimeout(this.queuedActivations[workflowId].timeout);
delete this.queuedActivations[workflowId];
Expand All @@ -752,7 +767,7 @@ export class ActiveWorkflowManager {
*/
// TODO: this should happen in a transaction
// maybe, see: https://github.com/n8n-io/n8n/pull/8904#discussion_r1530150510
async remove(workflowId: string) {
async remove(workflowId: WorkflowId) {
if (this.instanceSettings.isMultiMain) {
try {
await this.clearWebhooks(workflowId);
Expand Down Expand Up @@ -794,7 +809,7 @@ export class ActiveWorkflowManager {
/**
* Stop running active triggers and pollers for a workflow.
*/
async removeWorkflowTriggersAndPollers(workflowId: string) {
async removeWorkflowTriggersAndPollers(workflowId: WorkflowId) {
if (!this.activeWorkflows.isActive(workflowId)) return;

const wasRemoved = await this.activeWorkflows.remove(workflowId);
Expand All @@ -810,7 +825,7 @@ export class ActiveWorkflowManager {
* Register as active in memory a trigger- or poller-based workflow.
*/
async addTriggersAndPollers(
dbWorkflow: IWorkflowBase,
dbWorkflow: WorkflowEntity,
workflow: Workflow,
{
activationMode,
Expand Down Expand Up @@ -856,7 +871,7 @@ export class ActiveWorkflowManager {
}
}

async removeActivationError(workflowId: string) {
async removeActivationError(workflowId: WorkflowId) {
await this.activationErrorsService.deregister(workflowId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ export class WorkflowRepository extends Repository<WorkflowEntity> {
});
}

async getAllActive() {
return await this.find({
async getAllActiveIds() {
const result = await this.find({
select: { id: true },
where: { active: true },
relations: { shared: { project: { projectRelations: true } } },
});

return result.map(({ id }) => id);
}

async getActiveIds({ maxResults }: { maxResults?: number } = {}) {
Expand Down

0 comments on commit 64c5b6e

Please sign in to comment.