diff --git a/packages/editor-ui/src/Interface.ts b/packages/editor-ui/src/Interface.ts
index af71d5349cabf..fc4ebbce15e80 100644
--- a/packages/editor-ui/src/Interface.ts
+++ b/packages/editor-ui/src/Interface.ts
@@ -889,7 +889,6 @@ export interface RootState {
endpointWebhook: string;
endpointWebhookTest: string;
endpointWebhookWaiting: string;
- pushConnectionActive: boolean;
timezone: string;
executionTimeout: number;
maxExecutionTimeout: number;
diff --git a/packages/editor-ui/src/components/CanvasChat/CanvasChat.test.ts b/packages/editor-ui/src/components/CanvasChat/CanvasChat.test.ts
index 41f27c39c06ba..153b7eabb9b22 100644
--- a/packages/editor-ui/src/components/CanvasChat/CanvasChat.test.ts
+++ b/packages/editor-ui/src/components/CanvasChat/CanvasChat.test.ts
@@ -38,6 +38,13 @@ vi.mock('@/composables/useToast', () => {
},
};
});
+
+vi.mock('@/stores/pushConnection.store', () => ({
+ usePushConnectionStore: vi.fn().mockReturnValue({
+ isConnected: true,
+ }),
+}));
+
// Test data
const mockNodes: INodeUi[] = [
{
diff --git a/packages/editor-ui/src/components/MainHeader/WorkflowDetails.test.ts b/packages/editor-ui/src/components/MainHeader/WorkflowDetails.test.ts
index d7392a149fa66..06d5555286c87 100644
--- a/packages/editor-ui/src/components/MainHeader/WorkflowDetails.test.ts
+++ b/packages/editor-ui/src/components/MainHeader/WorkflowDetails.test.ts
@@ -11,6 +11,12 @@ vi.mock('vue-router', () => ({
RouterLink: vi.fn(),
}));
+vi.mock('@/stores/pushConnection.store', () => ({
+ usePushConnectionStore: vi.fn().mockReturnValue({
+ isConnected: true,
+ }),
+}));
+
const initialState = {
[STORES.SETTINGS]: {
settings: {
diff --git a/packages/editor-ui/src/components/PushConnectionTracker.test.ts b/packages/editor-ui/src/components/PushConnectionTracker.test.ts
new file mode 100644
index 0000000000000..5e6c37157c848
--- /dev/null
+++ b/packages/editor-ui/src/components/PushConnectionTracker.test.ts
@@ -0,0 +1,58 @@
+import { createComponentRenderer } from '@/__tests__/render';
+import PushConnectionTracker from '@/components/PushConnectionTracker.vue';
+import { STORES } from '@/constants';
+import { createTestingPinia } from '@pinia/testing';
+import { setActivePinia } from 'pinia';
+
+let isConnected = true;
+let isConnectionRequested = true;
+
+vi.mock('@/stores/pushConnection.store', () => {
+ return {
+ usePushConnectionStore: vi.fn(() => ({
+ isConnected,
+ isConnectionRequested,
+ })),
+ };
+});
+
+describe('PushConnectionTracker', () => {
+ const render = () => {
+ const pinia = createTestingPinia({
+ stubActions: false,
+ initialState: {
+ [STORES.PUSH]: {
+ isConnected,
+ isConnectionRequested,
+ },
+ },
+ });
+ setActivePinia(pinia);
+
+ return createComponentRenderer(PushConnectionTracker)();
+ };
+
+ it('should not render error when connected and connection requested', () => {
+ isConnected = true;
+ isConnectionRequested = true;
+ const { container } = render();
+
+ expect(container).toMatchSnapshot();
+ });
+
+ it('should render error when disconnected and connection requested', () => {
+ isConnected = false;
+ isConnectionRequested = true;
+ const { container } = render();
+
+ expect(container).toMatchSnapshot();
+ });
+
+ it('should not render error when connected and connection not requested', () => {
+ isConnected = true;
+ isConnectionRequested = false;
+ const { container } = render();
+
+ expect(container).toMatchSnapshot();
+ });
+});
diff --git a/packages/editor-ui/src/components/PushConnectionTracker.vue b/packages/editor-ui/src/components/PushConnectionTracker.vue
index 66175c2af400e..0ce23634b0ac5 100644
--- a/packages/editor-ui/src/components/PushConnectionTracker.vue
+++ b/packages/editor-ui/src/components/PushConnectionTracker.vue
@@ -1,16 +1,23 @@
-
+
diff --git a/packages/editor-ui/src/components/__snapshots__/PushConnectionTracker.test.ts.snap b/packages/editor-ui/src/components/__snapshots__/PushConnectionTracker.test.ts.snap
new file mode 100644
index 0000000000000..5e72b2eb94652
--- /dev/null
+++ b/packages/editor-ui/src/components/__snapshots__/PushConnectionTracker.test.ts.snap
@@ -0,0 +1,55 @@
+// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html
+
+exports[`PushConnectionTracker > should not render error when connected and connection not requested 1`] = `
+
+
+
+
+
+
+`;
+
+exports[`PushConnectionTracker > should not render error when connected and connection requested 1`] = `
+
+
+
+
+
+
+`;
+
+exports[`PushConnectionTracker > should render error when disconnected and connection requested 1`] = `
+
+
+
+
+
+
+ Connection lost
+
+
+
+
+
+
+
+`;
diff --git a/packages/editor-ui/src/composables/useRunWorkflow.test.ts b/packages/editor-ui/src/composables/useRunWorkflow.test.ts
index f6a38d471ebb2..6b8505c5fe284 100644
--- a/packages/editor-ui/src/composables/useRunWorkflow.test.ts
+++ b/packages/editor-ui/src/composables/useRunWorkflow.test.ts
@@ -11,7 +11,6 @@ import {
type ITaskData,
} from 'n8n-workflow';
-import { useRootStore } from '@/stores/root.store';
import { useRunWorkflow } from '@/composables/useRunWorkflow';
import type { IStartRunData, IWorkflowData } from '@/Interface';
import { useWorkflowsStore } from '@/stores/workflows.store';
@@ -21,6 +20,7 @@ import { useToast } from './useToast';
import { useI18n } from '@/composables/useI18n';
import { captor, mock } from 'vitest-mock-extended';
import { useSettingsStore } from '@/stores/settings.store';
+import { usePushConnectionStore } from '@/stores/pushConnection.store';
vi.mock('@/stores/workflows.store', () => ({
useWorkflowsStore: vi.fn().mockReturnValue({
@@ -40,6 +40,12 @@ vi.mock('@/stores/workflows.store', () => ({
}),
}));
+vi.mock('@/stores/pushConnection.store', () => ({
+ usePushConnectionStore: vi.fn().mockReturnValue({
+ isConnected: true,
+ }),
+}));
+
vi.mock('@/composables/useTelemetry', () => ({
useTelemetry: vi.fn().mockReturnValue({ track: vi.fn() }),
}));
@@ -90,7 +96,7 @@ vi.mock('vue-router', async (importOriginal) => {
});
describe('useRunWorkflow({ router })', () => {
- let rootStore: ReturnType;
+ let pushConnectionStore: ReturnType;
let uiStore: ReturnType;
let workflowsStore: ReturnType;
let router: ReturnType;
@@ -102,7 +108,7 @@ describe('useRunWorkflow({ router })', () => {
setActivePinia(pinia);
- rootStore = useRootStore();
+ pushConnectionStore = usePushConnectionStore();
uiStore = useUIStore();
workflowsStore = useWorkflowsStore();
settingsStore = useSettingsStore();
@@ -120,7 +126,7 @@ describe('useRunWorkflow({ router })', () => {
it('should throw an error if push connection is not active', async () => {
const { runWorkflowApi } = useRunWorkflow({ router });
- rootStore.setPushConnectionInactive();
+ vi.mocked(pushConnectionStore).isConnected = false;
await expect(runWorkflowApi({} as IStartRunData)).rejects.toThrow(
'workflowRun.noActiveConnectionToTheServer',
@@ -130,7 +136,7 @@ describe('useRunWorkflow({ router })', () => {
it('should successfully run a workflow', async () => {
const { runWorkflowApi } = useRunWorkflow({ router });
- rootStore.setPushConnectionActive();
+ vi.mocked(pushConnectionStore).isConnected = true;
const mockResponse = { executionId: '123', waitingForWebhook: false };
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockResponse);
@@ -161,7 +167,7 @@ describe('useRunWorkflow({ router })', () => {
it('should handle workflow run failure', async () => {
const { runWorkflowApi } = useRunWorkflow({ router });
- rootStore.setPushConnectionActive();
+ vi.mocked(pushConnectionStore).isConnected = true;
vi.mocked(workflowsStore).runWorkflow.mockRejectedValue(new Error('Failed to run workflow'));
await expect(runWorkflowApi({} as IStartRunData)).rejects.toThrow('Failed to run workflow');
@@ -171,7 +177,7 @@ describe('useRunWorkflow({ router })', () => {
it('should set waitingForWebhook if response indicates waiting', async () => {
const { runWorkflowApi } = useRunWorkflow({ router });
- rootStore.setPushConnectionActive();
+ vi.mocked(pushConnectionStore).isConnected = true;
const mockResponse = { executionId: '123', waitingForWebhook: true };
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockResponse);
@@ -210,6 +216,7 @@ describe('useRunWorkflow({ router })', () => {
type: 'error',
});
});
+
it('should execute workflow has pin data and is active with single webhook trigger', async () => {
const pinia = createTestingPinia({ stubActions: false });
setActivePinia(pinia);
@@ -295,7 +302,7 @@ describe('useRunWorkflow({ router })', () => {
const mockExecutionResponse = { executionId: '123' };
const { runWorkflow } = useRunWorkflow({ router });
- vi.mocked(rootStore).pushConnectionActive = true;
+ vi.mocked(pushConnectionStore).isConnected = true;
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).nodesIssuesExist = false;
vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue({
@@ -405,7 +412,7 @@ describe('useRunWorkflow({ router })', () => {
workflow.getParentNodes.mockReturnValue([]);
vi.mocked(settingsStore).partialExecutionVersion = 1;
- vi.mocked(rootStore).pushConnectionActive = true;
+ vi.mocked(pushConnectionStore).isConnected = true;
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).nodesIssuesExist = false;
vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue(workflow);
@@ -436,7 +443,7 @@ describe('useRunWorkflow({ router })', () => {
workflow.getParentNodes.mockReturnValue([]);
vi.mocked(settingsStore).partialExecutionVersion = 2;
- vi.mocked(rootStore).pushConnectionActive = true;
+ vi.mocked(pushConnectionStore).isConnected = true;
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).nodesIssuesExist = false;
vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue(workflow);
@@ -465,7 +472,7 @@ describe('useRunWorkflow({ router })', () => {
workflow.getParentNodes.mockReturnValue([]);
vi.mocked(settingsStore).partialExecutionVersion = 2;
- vi.mocked(rootStore).pushConnectionActive = true;
+ vi.mocked(pushConnectionStore).isConnected = true;
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).nodesIssuesExist = false;
vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue(workflow);
diff --git a/packages/editor-ui/src/composables/useRunWorkflow.ts b/packages/editor-ui/src/composables/useRunWorkflow.ts
index ca84f21ecf9b4..cdcec5f5a432d 100644
--- a/packages/editor-ui/src/composables/useRunWorkflow.ts
+++ b/packages/editor-ui/src/composables/useRunWorkflow.ts
@@ -36,6 +36,7 @@ import { useI18n } from '@/composables/useI18n';
import { get } from 'lodash-es';
import { useExecutionsStore } from '@/stores/executions.store';
import { useSettingsStore } from '@/stores/settings.store';
+import { usePushConnectionStore } from '@/stores/pushConnection.store';
const getDirtyNodeNames = (
runData: IRunData,
@@ -63,12 +64,13 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType {
- if (!rootStore.pushConnectionActive) {
+ if (!pushConnectionStore.isConnected) {
// Do not start if the connection to server is not active
// because then it can not receive the data as it executes.
throw new Error(i18n.baseText('workflowRun.noActiveConnectionToTheServer'));
diff --git a/packages/editor-ui/src/push-connection/__tests__/mockEventSource.ts b/packages/editor-ui/src/push-connection/__tests__/mockEventSource.ts
new file mode 100644
index 0000000000000..e7b7afbd7518d
--- /dev/null
+++ b/packages/editor-ui/src/push-connection/__tests__/mockEventSource.ts
@@ -0,0 +1,20 @@
+/** Mocked EventSource class to help testing */
+export class MockEventSource extends EventTarget {
+ constructor(public url: string) {
+ super();
+ }
+
+ simulateConnectionOpen() {
+ this.dispatchEvent(new Event('open'));
+ }
+
+ simulateConnectionClose() {
+ this.dispatchEvent(new Event('close'));
+ }
+
+ simulateMessageEvent(data: string) {
+ this.dispatchEvent(new MessageEvent('message', { data }));
+ }
+
+ close = vi.fn();
+}
diff --git a/packages/editor-ui/src/push-connection/__tests__/mockWebSocketClient.ts b/packages/editor-ui/src/push-connection/__tests__/mockWebSocketClient.ts
new file mode 100644
index 0000000000000..cbeebe0161431
--- /dev/null
+++ b/packages/editor-ui/src/push-connection/__tests__/mockWebSocketClient.ts
@@ -0,0 +1,32 @@
+import { WebSocketState } from '@/push-connection/useWebSocketClient';
+
+/** Mocked WebSocket class to help testing */
+export class MockWebSocket extends EventTarget {
+ readyState: number = WebSocketState.CONNECTING;
+
+ constructor(public url: string) {
+ super();
+ }
+
+ simulateConnectionOpen() {
+ this.dispatchEvent(new Event('open'));
+ this.readyState = WebSocketState.OPEN;
+ }
+
+ simulateConnectionClose(code: number) {
+ this.dispatchEvent(new CloseEvent('close', { code }));
+ this.readyState = WebSocketState.CLOSED;
+ }
+
+ simulateMessageEvent(data: string) {
+ this.dispatchEvent(new MessageEvent('message', { data }));
+ }
+
+ dispatchErrorEvent() {
+ this.dispatchEvent(new Event('error'));
+ }
+
+ send = vi.fn();
+
+ close = vi.fn();
+}
diff --git a/packages/editor-ui/src/push-connection/__tests__/useEventSourceClient.test.ts b/packages/editor-ui/src/push-connection/__tests__/useEventSourceClient.test.ts
new file mode 100644
index 0000000000000..706f5c576680b
--- /dev/null
+++ b/packages/editor-ui/src/push-connection/__tests__/useEventSourceClient.test.ts
@@ -0,0 +1,153 @@
+import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest';
+import { useEventSourceClient } from '../useEventSourceClient';
+import { MockEventSource } from './mockEventSource';
+
+describe('useEventSourceClient', () => {
+ let mockEventSource: MockEventSource;
+
+ beforeEach(() => {
+ mockEventSource = new MockEventSource('http://test.com');
+
+ // @ts-expect-error - mock EventSource
+ global.EventSource = vi.fn(() => mockEventSource);
+
+ vi.useFakeTimers();
+ });
+
+ afterEach(() => {
+ vi.clearAllTimers();
+ vi.clearAllMocks();
+ });
+
+ test('should create EventSource connection with provided URL', () => {
+ const url = 'http://test.com';
+ const onMessage = vi.fn();
+
+ const { connect } = useEventSourceClient({ url, onMessage });
+ connect();
+
+ expect(EventSource).toHaveBeenCalledWith(url, { withCredentials: true });
+ });
+
+ test('should update connection status on successful connection', () => {
+ const { connect, isConnected } = useEventSourceClient({
+ url: 'http://test.com',
+ onMessage: vi.fn(),
+ });
+ connect();
+
+ mockEventSource.simulateConnectionOpen();
+
+ expect(isConnected.value).toBe(true);
+ });
+
+ test('should handle incoming messages', () => {
+ const onMessage = vi.fn();
+ const { connect } = useEventSourceClient({ url: 'http://test.com', onMessage });
+ connect();
+
+ mockEventSource.simulateMessageEvent('test data');
+
+ expect(onMessage).toHaveBeenCalledWith('test data');
+ });
+
+ test('should handle disconnection', () => {
+ const { connect, disconnect, isConnected } = useEventSourceClient({
+ url: 'http://test.com',
+ onMessage: vi.fn(),
+ });
+ connect();
+
+ // Simulate successful connection
+ mockEventSource.simulateConnectionOpen();
+ expect(isConnected.value).toBe(true);
+
+ disconnect();
+
+ expect(isConnected.value).toBe(false);
+ expect(mockEventSource.close).toHaveBeenCalled();
+ });
+
+ test('should handle connection loss', () => {
+ const { connect, isConnected } = useEventSourceClient({
+ url: 'http://test.com',
+ onMessage: vi.fn(),
+ });
+ connect();
+ expect(EventSource).toHaveBeenCalledTimes(1);
+
+ // Simulate successful connection
+ mockEventSource.simulateConnectionOpen();
+ expect(isConnected.value).toBe(true);
+
+ // Simulate connection loss
+ mockEventSource.simulateConnectionClose();
+ expect(isConnected.value).toBe(false);
+
+ // Advance timer to trigger reconnect
+ vi.advanceTimersByTime(1_000);
+ expect(EventSource).toHaveBeenCalledTimes(2);
+ });
+
+ test('sendMessage should be a noop function', () => {
+ const { connect, sendMessage } = useEventSourceClient({
+ url: 'http://test.com',
+ onMessage: vi.fn(),
+ });
+ connect();
+
+ // Simulate successful connection
+ mockEventSource.simulateConnectionOpen();
+
+ const message = 'test message';
+ // Should not throw error and should do nothing
+ expect(() => sendMessage(message)).not.toThrow();
+ });
+
+ test('should attempt reconnection with increasing delays', () => {
+ const { connect } = useEventSourceClient({
+ url: 'http://test.com',
+ onMessage: vi.fn(),
+ });
+ connect();
+
+ mockEventSource.simulateConnectionOpen();
+ mockEventSource.simulateConnectionClose();
+
+ // First reconnection attempt after 1 second
+ vi.advanceTimersByTime(1_000);
+ expect(EventSource).toHaveBeenCalledTimes(2);
+
+ mockEventSource.simulateConnectionClose();
+
+ // Second reconnection attempt after 2 seconds
+ vi.advanceTimersByTime(2_000);
+ expect(EventSource).toHaveBeenCalledTimes(3);
+ });
+
+ test('should reset connection attempts on successful connection', () => {
+ const { connect } = useEventSourceClient({
+ url: 'http://test.com',
+ onMessage: vi.fn(),
+ });
+ connect();
+
+ // First connection attempt
+ mockEventSource.simulateConnectionOpen();
+ mockEventSource.simulateConnectionClose();
+
+ // First reconnection attempt
+ vi.advanceTimersByTime(1_000);
+ expect(EventSource).toHaveBeenCalledTimes(2);
+
+ // Successful connection
+ mockEventSource.simulateConnectionOpen();
+
+ // Connection lost again
+ mockEventSource.simulateConnectionClose();
+
+ // Should start with initial delay again
+ vi.advanceTimersByTime(1_000);
+ expect(EventSource).toHaveBeenCalledTimes(3);
+ });
+});
diff --git a/packages/editor-ui/src/push-connection/__tests__/useHeartbeat.test.ts b/packages/editor-ui/src/push-connection/__tests__/useHeartbeat.test.ts
new file mode 100644
index 0000000000000..7640e1fe170c4
--- /dev/null
+++ b/packages/editor-ui/src/push-connection/__tests__/useHeartbeat.test.ts
@@ -0,0 +1,88 @@
+import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
+import { useHeartbeat } from '../useHeartbeat';
+
+describe('useHeartbeat', () => {
+ beforeEach(() => {
+ vi.useFakeTimers();
+ });
+
+ afterEach(() => {
+ vi.clearAllTimers();
+ });
+
+ it('should start heartbeat and call onHeartbeat at specified intervals', () => {
+ const onHeartbeat = vi.fn();
+ const interval = 1000;
+
+ const heartbeat = useHeartbeat({ interval, onHeartbeat });
+ heartbeat.startHeartbeat();
+
+ // Initially, the callback should not be called
+ expect(onHeartbeat).not.toHaveBeenCalled();
+
+ // Advance timer by interval
+ vi.advanceTimersByTime(interval);
+ expect(onHeartbeat).toHaveBeenCalledTimes(1);
+
+ // Advance timer by another interval
+ vi.advanceTimersByTime(interval);
+ expect(onHeartbeat).toHaveBeenCalledTimes(2);
+ });
+
+ it('should stop heartbeat when stopHeartbeat is called', () => {
+ const onHeartbeat = vi.fn();
+ const interval = 1000;
+
+ const heartbeat = useHeartbeat({ interval, onHeartbeat });
+ heartbeat.startHeartbeat();
+
+ // Advance timer by interval
+ vi.advanceTimersByTime(interval);
+ expect(onHeartbeat).toHaveBeenCalledTimes(1);
+
+ // Stop the heartbeat
+ heartbeat.stopHeartbeat();
+
+ // Advance timer by multiple intervals
+ vi.advanceTimersByTime(interval * 3);
+ expect(onHeartbeat).toHaveBeenCalledTimes(1); // Should still be 1
+ });
+
+ it('should be safe to call stopHeartbeat multiple times', () => {
+ const onHeartbeat = vi.fn();
+ const interval = 1000;
+
+ const heartbeat = useHeartbeat({ interval, onHeartbeat });
+ heartbeat.startHeartbeat();
+
+ // Stop multiple times
+ heartbeat.stopHeartbeat();
+ heartbeat.stopHeartbeat();
+ heartbeat.stopHeartbeat();
+
+ vi.advanceTimersByTime(interval * 2);
+ expect(onHeartbeat).not.toHaveBeenCalled();
+ });
+
+ it('should restart heartbeat after stopping', () => {
+ const onHeartbeat = vi.fn();
+ const interval = 1000;
+
+ const heartbeat = useHeartbeat({ interval, onHeartbeat });
+
+ // First start
+ heartbeat.startHeartbeat();
+ vi.advanceTimersByTime(interval);
+ expect(onHeartbeat).toHaveBeenCalledTimes(1);
+
+ // Stop
+ heartbeat.stopHeartbeat();
+ vi.advanceTimersByTime(interval);
+ expect(onHeartbeat).toHaveBeenCalledTimes(1);
+
+ // Restart
+ heartbeat.startHeartbeat();
+ vi.advanceTimersByTime(interval);
+ expect(onHeartbeat).toHaveBeenCalledTimes(2);
+ });
+});
diff --git a/packages/editor-ui/src/push-connection/__tests__/useWebSocketClient.test.ts b/packages/editor-ui/src/push-connection/__tests__/useWebSocketClient.test.ts
new file mode 100644
index 0000000000000..a7e12f0542433
--- /dev/null
+++ b/packages/editor-ui/src/push-connection/__tests__/useWebSocketClient.test.ts
@@ -0,0 +1,140 @@
+import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest';
+import { useWebSocketClient } from '../useWebSocketClient';
+import { MockWebSocket } from './mockWebSocketClient';
+
+describe('useWebSocketClient', () => {
+ let mockWebSocket: MockWebSocket;
+
+ beforeEach(() => {
+ mockWebSocket = new MockWebSocket('ws://test.com');
+
+ // @ts-expect-error - mock WebSocket
+ global.WebSocket = vi.fn(() => mockWebSocket);
+
+ vi.useFakeTimers();
+ });
+
+ afterEach(() => {
+ vi.clearAllTimers();
+ vi.clearAllMocks();
+ });
+
+ test('should create WebSocket connection with provided URL', () => {
+ const url = 'ws://test.com';
+ const onMessage = vi.fn();
+
+ const { connect } = useWebSocketClient({ url, onMessage });
+ connect();
+
+ expect(WebSocket).toHaveBeenCalledWith(url);
+ });
+
+ test('should update connection status and start heartbeat on successful connection', () => {
+ const { connect, isConnected } = useWebSocketClient({
+ url: 'ws://test.com',
+ onMessage: vi.fn(),
+ });
+ connect();
+
+ mockWebSocket.simulateConnectionOpen();
+
+ expect(isConnected.value).toBe(true);
+
+ // Advance timer to trigger heartbeat
+ vi.advanceTimersByTime(30_000);
+ expect(mockWebSocket.send).toHaveBeenCalledWith(JSON.stringify({ type: 'heartbeat' }));
+ });
+
+ test('should handle incoming messages', () => {
+ const onMessage = vi.fn();
+ const { connect } = useWebSocketClient({ url: 'ws://test.com', onMessage });
+ connect();
+
+ mockWebSocket.simulateMessageEvent('test data');
+
+ expect(onMessage).toHaveBeenCalledWith('test data');
+ });
+
+ test('should handle disconnection', () => {
+ const { connect, disconnect, isConnected } = useWebSocketClient({
+ url: 'ws://test.com',
+ onMessage: vi.fn(),
+ });
+ connect();
+
+ // Simulate successful connection
+ mockWebSocket.simulateConnectionOpen();
+
+ expect(isConnected.value).toBe(true);
+
+ disconnect();
+
+ expect(isConnected.value).toBe(false);
+ expect(mockWebSocket.close).toHaveBeenCalledWith(1000);
+ });
+
+ test('should handle connection loss', () => {
+ const { connect, isConnected } = useWebSocketClient({
+ url: 'ws://test.com',
+ onMessage: vi.fn(),
+ });
+ connect();
+ expect(WebSocket).toHaveBeenCalledTimes(1);
+
+ // Simulate successful connection
+ mockWebSocket.simulateConnectionOpen();
+
+ expect(isConnected.value).toBe(true);
+
+ // Simulate connection loss
+ mockWebSocket.simulateConnectionClose(1006);
+
+ expect(isConnected.value).toBe(false);
+ // Advance timer to reconnect
+ vi.advanceTimersByTime(1_000);
+ expect(WebSocket).toHaveBeenCalledTimes(2);
+ });
+
+ test('should throw error when trying to send message while disconnected', () => {
+ const { sendMessage } = useWebSocketClient({ url: 'ws://test.com', onMessage: vi.fn() });
+
+ expect(() => sendMessage('test')).toThrow('Not connected to the server');
+ });
+
+ test('should attempt reconnection with increasing delays', () => {
+ const { connect } = useWebSocketClient({
+ url: 'http://test.com',
+ onMessage: vi.fn(),
+ });
+ connect();
+
+ mockWebSocket.simulateConnectionOpen();
+ mockWebSocket.simulateConnectionClose(1006);
+
+ // First reconnection attempt after 1 second
+ vi.advanceTimersByTime(1_000);
+ expect(WebSocket).toHaveBeenCalledTimes(2);
+
+ mockWebSocket.simulateConnectionClose(1006);
+
+ // Second reconnection attempt after 2 seconds
+ vi.advanceTimersByTime(2_000);
+ expect(WebSocket).toHaveBeenCalledTimes(3);
+ });
+
+ test('should send message when connected', () => {
+ const { connect, sendMessage } = useWebSocketClient({
+ url: 'ws://test.com',
+ onMessage: vi.fn(),
+ });
+ connect();
+
+ // Simulate successful connection
+ mockWebSocket.simulateConnectionOpen();
+
+ const message = 'test message';
+ sendMessage(message);
+
+ expect(mockWebSocket.send).toHaveBeenCalledWith(message);
+ });
+});
diff --git a/packages/editor-ui/src/push-connection/useEventSourceClient.ts b/packages/editor-ui/src/push-connection/useEventSourceClient.ts
new file mode 100644
index 0000000000000..575df67b9a6aa
--- /dev/null
+++ b/packages/editor-ui/src/push-connection/useEventSourceClient.ts
@@ -0,0 +1,69 @@
+import { useReconnectTimer } from '@/push-connection/useReconnectTimer';
+import { ref } from 'vue';
+
+export type UseEventSourceClientOptions = {
+ url: string;
+ onMessage: (data: string) => void;
+};
+
+/**
+ * Creates an EventSource connection to the server. Uses reconnection logic
+ * to reconnect if the connection is lost.
+ */
+export const useEventSourceClient = (options: UseEventSourceClientOptions) => {
+ const isConnected = ref(false);
+ const eventSource = ref(null);
+
+ const onConnected = () => {
+ isConnected.value = true;
+ reconnectTimer.resetConnectionAttempts();
+ };
+
+ const onConnectionLost = () => {
+ console.warn('[EventSourceClient] Connection lost');
+ isConnected.value = false;
+ reconnectTimer.scheduleReconnect();
+ };
+
+ const onMessage = (event: MessageEvent) => {
+ options.onMessage(event.data);
+ };
+
+ const disconnect = () => {
+ if (eventSource.value) {
+ reconnectTimer.stopReconnectTimer();
+ eventSource.value.close();
+ eventSource.value = null;
+ }
+
+ isConnected.value = false;
+ };
+
+ const connect = () => {
+ // Ensure we disconnect any existing connection
+ disconnect();
+
+ eventSource.value = new EventSource(options.url, { withCredentials: true });
+ eventSource.value.addEventListener('open', onConnected);
+ eventSource.value.addEventListener('message', onMessage);
+ eventSource.value.addEventListener('close', onConnectionLost);
+ };
+
+ const reconnectTimer = useReconnectTimer({
+ onAttempt: connect,
+ onAttemptScheduled: (delay) => {
+ console.log(`[EventSourceClient] Attempting to reconnect in ${delay}ms`);
+ },
+ });
+
+ const sendMessage = (_: string) => {
+ // Noop, EventSource does not support sending messages
+ };
+
+ return {
+ isConnected,
+ connect,
+ disconnect,
+ sendMessage,
+ };
+};
diff --git a/packages/editor-ui/src/push-connection/useHeartbeat.ts b/packages/editor-ui/src/push-connection/useHeartbeat.ts
new file mode 100644
index 0000000000000..22fb8a7b54c4c
--- /dev/null
+++ b/packages/editor-ui/src/push-connection/useHeartbeat.ts
@@ -0,0 +1,32 @@
+import { ref } from 'vue';
+
+export type UseHeartbeatOptions = {
+ interval: number;
+ onHeartbeat: () => void;
+};
+
+/**
+ * Creates a heartbeat timer using the given interval. The timer needs
+ * to be started and stopped manually.
+ */
+export const useHeartbeat = (options: UseHeartbeatOptions) => {
+ const { interval, onHeartbeat } = options;
+
+ const heartbeatTimer = ref | null>(null);
+
+ const startHeartbeat = () => {
+ heartbeatTimer.value = setInterval(onHeartbeat, interval);
+ };
+
+ const stopHeartbeat = () => {
+ if (heartbeatTimer.value) {
+ clearInterval(heartbeatTimer.value);
+ heartbeatTimer.value = null;
+ }
+ };
+
+ return {
+ startHeartbeat,
+ stopHeartbeat,
+ };
+};
diff --git a/packages/editor-ui/src/push-connection/useReconnectTimer.ts b/packages/editor-ui/src/push-connection/useReconnectTimer.ts
new file mode 100644
index 0000000000000..695b4ab611bf1
--- /dev/null
+++ b/packages/editor-ui/src/push-connection/useReconnectTimer.ts
@@ -0,0 +1,48 @@
+import { ref } from 'vue';
+
+type UseReconnectTimerOptions = {
+ /** Callback that an attempt should be made */
+ onAttempt: () => void;
+
+ /** Callback that a future attempt was scheduled */
+ onAttemptScheduled: (delay: number) => void;
+};
+
+/**
+ * A timer for exponential backoff reconnect attempts.
+ */
+export const useReconnectTimer = ({ onAttempt, onAttemptScheduled }: UseReconnectTimerOptions) => {
+ const initialReconnectDelay = 1000;
+ const maxReconnectDelay = 15_000;
+
+ const reconnectTimer = ref | null>(null);
+ const reconnectAttempts = ref(0);
+
+ const scheduleReconnect = () => {
+ const delay = Math.min(initialReconnectDelay * 2 ** reconnectAttempts.value, maxReconnectDelay);
+
+ reconnectAttempts.value++;
+
+ onAttemptScheduled(delay);
+ reconnectTimer.value = setTimeout(() => {
+ onAttempt();
+ }, delay);
+ };
+
+ const stopReconnectTimer = () => {
+ if (reconnectTimer.value) {
+ clearTimeout(reconnectTimer.value);
+ reconnectTimer.value = null;
+ }
+ };
+
+ const resetConnectionAttempts = () => {
+ reconnectAttempts.value = 0;
+ };
+
+ return {
+ scheduleReconnect,
+ stopReconnectTimer,
+ resetConnectionAttempts,
+ };
+};
diff --git a/packages/editor-ui/src/push-connection/useWebSocketClient.ts b/packages/editor-ui/src/push-connection/useWebSocketClient.ts
new file mode 100644
index 0000000000000..235e25be540e7
--- /dev/null
+++ b/packages/editor-ui/src/push-connection/useWebSocketClient.ts
@@ -0,0 +1,106 @@
+import { useHeartbeat } from '@/push-connection/useHeartbeat';
+import { useReconnectTimer } from '@/push-connection/useReconnectTimer';
+import { ref } from 'vue';
+
+export type UseWebSocketClientOptions = {
+ url: string;
+ onMessage: (data: T) => void;
+};
+
+/** Defined here as not available in tests */
+export const WebSocketState = {
+ CONNECTING: 0,
+ OPEN: 1,
+ CLOSING: 2,
+ CLOSED: 3,
+};
+
+/**
+ * Creates a WebSocket connection to the server. Uses reconnection logic
+ * to reconnect if the connection is lost.
+ */
+export const useWebSocketClient = (options: UseWebSocketClientOptions) => {
+ const isConnected = ref(false);
+ const socket = ref(null);
+
+ /**
+ * Heartbeat timer to keep the connection alive. This is an additional
+ * mechanism to the protocol level ping/pong mechanism the server sends.
+ * This is used the ensure the client notices connection issues.
+ */
+ const { startHeartbeat, stopHeartbeat } = useHeartbeat({
+ interval: 30_000,
+ onHeartbeat: () => {
+ socket.value?.send(JSON.stringify({ type: 'heartbeat' }));
+ },
+ });
+
+ const onConnected = () => {
+ socket.value?.removeEventListener('open', onConnected);
+ isConnected.value = true;
+ startHeartbeat();
+ reconnectTimer.resetConnectionAttempts();
+ };
+
+ const onConnectionLost = (event: CloseEvent) => {
+ console.warn(`[WebSocketClient] Connection lost, code=${event.code ?? 'unknown'}`);
+ isConnected.value = false;
+ stopHeartbeat();
+ reconnectTimer.scheduleReconnect();
+ };
+
+ const onMessage = (event: MessageEvent) => {
+ options.onMessage(event.data);
+ };
+
+ const onError = (error: unknown) => {
+ console.warn('[WebSocketClient] Connection error:', error);
+ };
+
+ const disconnect = () => {
+ if (socket.value) {
+ stopHeartbeat();
+ reconnectTimer.stopReconnectTimer();
+ socket.value.removeEventListener('message', onMessage);
+ socket.value.removeEventListener('error', onError);
+ socket.value.removeEventListener('close', onConnectionLost);
+ socket.value.close(1000);
+ socket.value = null;
+ }
+
+ isConnected.value = false;
+ };
+
+ const connect = () => {
+ // Ensure we disconnect any existing connection
+ disconnect();
+
+ socket.value = new WebSocket(options.url);
+ socket.value.addEventListener('open', onConnected);
+ socket.value.addEventListener('message', onMessage);
+ socket.value.addEventListener('error', onError);
+ socket.value.addEventListener('close', onConnectionLost);
+ };
+
+ const reconnectTimer = useReconnectTimer({
+ onAttempt: connect,
+ onAttemptScheduled: (delay) => {
+ console.log(`[WebSocketClient] Attempting to reconnect in ${delay}ms`);
+ },
+ });
+
+ const sendMessage = (serializedMessage: string) => {
+ if (!isConnected.value || !socket.value) {
+ throw new Error('Not connected to the server');
+ }
+
+ socket.value.send(serializedMessage);
+ };
+
+ return {
+ isConnected,
+ connect,
+ disconnect,
+ sendMessage,
+ };
+};
diff --git a/packages/editor-ui/src/stores/pushConnection.store.ts b/packages/editor-ui/src/stores/pushConnection.store.ts
index 26e72f2f46384..33a0ede331d09 100644
--- a/packages/editor-ui/src/stores/pushConnection.store.ts
+++ b/packages/editor-ui/src/stores/pushConnection.store.ts
@@ -1,22 +1,12 @@
import { defineStore } from 'pinia';
-import { ref, computed } from 'vue';
+import { computed, ref, watch } from 'vue';
import type { PushMessage } from '@n8n/api-types';
-import { STORES, TIME } from '@/constants';
+import { STORES } from '@/constants';
import { useSettingsStore } from './settings.store';
import { useRootStore } from './root.store';
-
-export interface PushState {
- pushRef: string;
- pushSource: WebSocket | EventSource | null;
- reconnectTimeout: NodeJS.Timeout | null;
- retryTimeout: NodeJS.Timeout | null;
- pushMessageQueue: Array<{ event: Event; retriesLeft: number }>;
- connectRetries: number;
- lostConnection: boolean;
- outgoingQueue: unknown[];
- isConnectionOpen: boolean;
-}
+import { useWebSocketClient } from '@/push-connection/useWebSocketClient';
+import { useEventSourceClient } from '@/push-connection/useEventSourceClient';
export type OnPushMessageHandler = (event: PushMessage) => void;
@@ -27,18 +17,20 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => {
const rootStore = useRootStore();
const settingsStore = useSettingsStore();
- const pushRef = computed(() => rootStore.pushRef);
- const pushSource = ref(null);
- const reconnectTimeout = ref(null);
- const connectRetries = ref(0);
- const lostConnection = ref(false);
+ /**
+ * Queue of messages to be sent to the server. Messages are queued if
+ * the connection is down.
+ */
const outgoingQueue = ref([]);
- const isConnectionOpen = ref(false);
+
+ /** Whether the connection has been requested */
+ const isConnectionRequested = ref(false);
const onMessageReceivedHandlers = ref([]);
const addEventListener = (handler: OnPushMessageHandler) => {
onMessageReceivedHandlers.value.push(handler);
+
return () => {
const index = onMessageReceivedHandlers.value.indexOf(handler);
if (index !== -1) {
@@ -47,123 +39,86 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => {
};
};
- function onConnectionError() {
- pushDisconnect();
- connectRetries.value++;
- reconnectTimeout.value = setTimeout(
- attemptReconnect,
- Math.min(connectRetries.value * 2000, 8 * TIME.SECOND), // maximum 8 seconds backoff
- );
- }
-
- /**
- * Close connection to server
- */
- function pushDisconnect() {
- if (pushSource.value !== null) {
- pushSource.value.removeEventListener('error', onConnectionError);
- pushSource.value.removeEventListener('close', onConnectionError);
- pushSource.value.removeEventListener('message', pushMessageReceived);
- if (pushSource.value.readyState < 2) pushSource.value.close();
- pushSource.value = null;
- }
-
- isConnectionOpen.value = false;
- }
-
- /**
- * Connect to server to receive data via a WebSocket or EventSource
- */
- function pushConnect() {
- // always close the previous connection so that we do not end up with multiple connections
- pushDisconnect();
-
- if (reconnectTimeout.value) {
- clearTimeout(reconnectTimeout.value);
- reconnectTimeout.value = null;
- }
-
- const useWebSockets = settingsStore.pushBackend === 'websocket';
+ const useWebSockets = settingsStore.pushBackend === 'websocket';
+ const getConnectionUrl = () => {
const restUrl = rootStore.restUrl;
- const url = `/push?pushRef=${pushRef.value}`;
+ const url = `/push?pushRef=${rootStore.pushRef}`;
if (useWebSockets) {
const { protocol, host } = window.location;
const baseUrl = restUrl.startsWith('http')
? restUrl.replace(/^http/, 'ws')
: `${protocol === 'https:' ? 'wss' : 'ws'}://${host + restUrl}`;
- pushSource.value = new WebSocket(`${baseUrl}${url}`);
+ return `${baseUrl}${url}`;
} else {
- pushSource.value = new EventSource(`${restUrl}${url}`, { withCredentials: true });
+ return `${restUrl}${url}`;
}
+ };
- pushSource.value.addEventListener('open', onConnectionSuccess, false);
- pushSource.value.addEventListener('message', pushMessageReceived, false);
- pushSource.value.addEventListener(useWebSockets ? 'close' : 'error', onConnectionError, false);
- }
+ /**
+ * Process a newly received message
+ */
+ async function onMessage(data: unknown) {
+ let receivedData: PushMessage;
+ try {
+ receivedData = JSON.parse(data as string);
+ } catch (error) {
+ return;
+ }
- function attemptReconnect() {
- pushConnect();
+ onMessageReceivedHandlers.value.forEach((handler) => handler(receivedData));
}
+ const url = getConnectionUrl();
+
+ const client = useWebSockets
+ ? useWebSocketClient({ url, onMessage })
+ : useEventSourceClient({ url, onMessage });
+
function serializeAndSend(message: unknown) {
- if (pushSource.value && 'send' in pushSource.value) {
- pushSource.value.send(JSON.stringify(message));
- }
+ client.sendMessage(JSON.stringify(message));
}
- function onConnectionSuccess() {
- isConnectionOpen.value = true;
- connectRetries.value = 0;
- lostConnection.value = false;
- rootStore.setPushConnectionActive();
- pushSource.value?.removeEventListener('open', onConnectionSuccess);
+ const pushConnect = () => {
+ isConnectionRequested.value = true;
+ client.connect();
+ };
+
+ const pushDisconnect = () => {
+ isConnectionRequested.value = false;
+ client.disconnect();
+ };
+
+ watch(client.isConnected, (didConnect) => {
+ if (!didConnect) {
+ return;
+ }
+ // Send any buffered messages
if (outgoingQueue.value.length) {
for (const message of outgoingQueue.value) {
serializeAndSend(message);
}
outgoingQueue.value = [];
}
- }
-
- function send(message: unknown) {
- if (!isConnectionOpen.value) {
- outgoingQueue.value.push(message);
- return;
- }
- serializeAndSend(message);
- }
-
- /**
- * Process a newly received message
- */
- async function pushMessageReceived(event: Event) {
- let receivedData: PushMessage;
- try {
- // @ts-ignore
- receivedData = JSON.parse(event.data);
- } catch (error) {
- return;
- }
-
- onMessageReceivedHandlers.value.forEach((handler) => handler(receivedData));
- }
+ });
+ /** Removes all buffered messages from the sent queue */
const clearQueue = () => {
outgoingQueue.value = [];
};
+ const isConnected = computed(() => client.isConnected.value);
+
return {
- pushRef,
- pushSource,
- isConnectionOpen,
+ isConnected,
+ isConnectionRequested,
onMessageReceivedHandlers,
addEventListener,
pushConnect,
pushDisconnect,
- send,
+ send: serializeAndSend,
clearQueue,
};
});
diff --git a/packages/editor-ui/src/stores/root.store.ts b/packages/editor-ui/src/stores/root.store.ts
index 6e9bee10dacf5..7736f2076b047 100644
--- a/packages/editor-ui/src/stores/root.store.ts
+++ b/packages/editor-ui/src/stores/root.store.ts
@@ -20,7 +20,6 @@ export const useRootStore = defineStore(STORES.ROOT, () => {
endpointWebhook: 'webhook',
endpointWebhookTest: 'webhook-test',
endpointWebhookWaiting: 'webhook-waiting',
- pushConnectionActive: true,
timezone: 'America/New_York',
executionTimeout: -1,
maxExecutionTimeout: Number.MAX_SAFE_INTEGER,
@@ -66,8 +65,6 @@ export const useRootStore = defineStore(STORES.ROOT, () => {
const versionCli = computed(() => state.value.versionCli);
- const pushConnectionActive = computed(() => state.value.pushConnectionActive);
-
const OAuthCallbackUrls = computed(() => state.value.oauthCallbackUrls);
const webhookTestUrl = computed(
@@ -105,14 +102,6 @@ export const useRootStore = defineStore(STORES.ROOT, () => {
state.value.urlBaseWebhook = url;
};
- const setPushConnectionActive = () => {
- state.value.pushConnectionActive = true;
- };
-
- const setPushConnectionInactive = () => {
- state.value.pushConnectionActive = false;
- };
-
const setUrlBaseEditor = (urlBaseEditor: string) => {
const url = urlBaseEditor.endsWith('/') ? urlBaseEditor : `${urlBaseEditor}/`;
state.value.urlBaseEditor = url;
@@ -198,13 +187,10 @@ export const useRootStore = defineStore(STORES.ROOT, () => {
pushRef,
defaultLocale,
binaryDataMode,
- pushConnectionActive,
OAuthCallbackUrls,
executionTimeout,
maxExecutionTimeout,
timezone,
- setPushConnectionInactive,
- setPushConnectionActive,
setUrlBaseWebhook,
setUrlBaseEditor,
setEndpointForm,