From c353b2a3011f0b3799d20912f74092d5ef69d52e Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Wed, 5 Feb 2025 16:35:40 +0200 Subject: [PATCH 01/11] fix(editor): Fix push connection reconnection --- .../src/push-connection/AbstractPushClient.ts | 87 +++++++ .../src/push-connection/EventSourceClient.ts | 43 ++++ .../src/push-connection/WebSocketClient.ts | 97 ++++++++ .../__tests__/AbstractPushClient.test.ts | 165 ++++++++++++++ .../__tests__/EventSourceClient.test.ts | 107 +++++++++ .../__tests__/WebSocketClient.test.ts | 213 ++++++++++++++++++ .../src/stores/pushConnection.store.ts | 107 ++++----- 7 files changed, 753 insertions(+), 66 deletions(-) create mode 100644 packages/editor-ui/src/push-connection/AbstractPushClient.ts create mode 100644 packages/editor-ui/src/push-connection/EventSourceClient.ts create mode 100644 packages/editor-ui/src/push-connection/WebSocketClient.ts create mode 100644 packages/editor-ui/src/push-connection/__tests__/AbstractPushClient.test.ts create mode 100644 packages/editor-ui/src/push-connection/__tests__/EventSourceClient.test.ts create mode 100644 packages/editor-ui/src/push-connection/__tests__/WebSocketClient.test.ts diff --git a/packages/editor-ui/src/push-connection/AbstractPushClient.ts b/packages/editor-ui/src/push-connection/AbstractPushClient.ts new file mode 100644 index 0000000000000..ee617662eb924 --- /dev/null +++ b/packages/editor-ui/src/push-connection/AbstractPushClient.ts @@ -0,0 +1,87 @@ +export type PushClientCallbacks = { + onMessage: (data: unknown) => void; + onConnect?: () => void; + onDisconnect?: () => void; +}; + +export type PushClientOptions = { + url: string; + callbacks: PushClientCallbacks; +}; + +/** + * Abstract base class for push connection clients (WebSocket, EventSource) + * Handles common functionality like connection management and reconnection logic + */ +export abstract class AbstractPushClient { + protected readonly url: string; + + private readonly callbacks: PushClientCallbacks; + + protected reconnectAttempts = 0; + + /** Initial delay between reconnection attempts */ + protected readonly initialReconnectDelay = 1000; + + /** Maximum delay between reconnection attempts */ + protected readonly maxReconnectDelay = 15_000; + + protected reconnectTimer: ReturnType | null = null; + + constructor(options: PushClientOptions) { + this.url = options.url; + this.callbacks = options.callbacks; + } + + /** Connect to the push server */ + abstract connect(): void; + + abstract sendMessage(serializedMessage: string): void; + + /** Close the push connection */ + disconnect() { + this.stopReconnectTimer(); + } + + /** Handle the connection being established */ + protected handleConnectEvent() { + this.reconnectAttempts = 0; // Reset attempts on successful connection + this.callbacks.onConnect?.(); + } + + /** Handle the connection being lost */ + protected handleDisconnectEvent(code?: number) { + console.warn(`[PushConnection] Connection lost, code=${code ?? 'unknown'}`); + this.callbacks.onDisconnect?.(); + this.scheduleReconnect(); + } + + /** Handle an error in the connection */ + protected handleErrorEvent(error: unknown) { + console.error('[PushConnection] Connection error:', error); + } + + /** Handle a message being received */ + protected handleMessageEvent(event: MessageEvent) { + this.callbacks.onMessage(event.data); + } + + private scheduleReconnect() { + const delay = Math.min( + this.initialReconnectDelay * 2 ** this.reconnectAttempts, + this.maxReconnectDelay, + ); + + console.info(`[PushConnection] Reconnecting in ${delay / 1000} seconds...`); + this.reconnectAttempts++; + + this.reconnectTimer = setTimeout(() => this.connect(), delay); + } + + private stopReconnectTimer() { + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + } +} diff --git a/packages/editor-ui/src/push-connection/EventSourceClient.ts b/packages/editor-ui/src/push-connection/EventSourceClient.ts new file mode 100644 index 0000000000000..f3bfb570dfdeb --- /dev/null +++ b/packages/editor-ui/src/push-connection/EventSourceClient.ts @@ -0,0 +1,43 @@ +import { AbstractPushClient, type PushClientOptions } from './AbstractPushClient'; + +export type EventSourceClientOptions = PushClientOptions; + +export class EventSourceClient extends AbstractPushClient { + private eventSource: EventSource | null = null; + + connect() { + this.eventSource = new EventSource(this.url, { withCredentials: true }); + + this.eventSource.addEventListener('open', this.onConnectionOpen); + this.eventSource.addEventListener('message', this.handleMessageEvent); + this.eventSource.addEventListener('error', this.onConnectionError); + } + + disconnect() { + super.disconnect(); + + this.eventSource?.close(); + this.eventSource = null; + } + + sendMessage() { + // noop, not supported in EventSource + } + + //#region Event handlers + + protected onConnectionOpen = () => { + super.handleConnectEvent(); + }; + + protected handleMessageEvent = (event: MessageEvent) => { + super.handleMessageEvent(event); + }; + + protected onConnectionError = () => { + // EventSource triggers an "error" event when the connection fails to open + super.handleDisconnectEvent(); + }; + + //#endregion Event handlers +} diff --git a/packages/editor-ui/src/push-connection/WebSocketClient.ts b/packages/editor-ui/src/push-connection/WebSocketClient.ts new file mode 100644 index 0000000000000..b28467f6052b8 --- /dev/null +++ b/packages/editor-ui/src/push-connection/WebSocketClient.ts @@ -0,0 +1,97 @@ +import { AbstractPushClient } from './AbstractPushClient'; + +const WebSocketState = { + CONNECTING: 0, + OPEN: 1, + CLOSING: 2, + CLOSED: 3, +}; + +/** + * A WebSocket implementation that automatically reconnects when the connection is lost. + * It also sends a heartbeat to the server in attempt to keep the connection alive. + * This heartbeat is sent in addition to the protocol level ping/pong mechanism the + * server sends. + */ +export class WebSocketClient extends AbstractPushClient { + protected socket: WebSocket | null = null; + + /** Interval between heartbeats */ + protected readonly heartbeatInterval = 30_000; + + protected heartbeatTimer: ReturnType | null = null; + + connect() { + this.socket = new WebSocket(this.url); + + this.socket.addEventListener('open', this.onConnectionOpen); + this.socket.addEventListener('message', this.handleMessageEvent); + this.socket.addEventListener('error', this.onConnectionError); + this.socket.addEventListener('close', this.onConnectionClose); + } + + sendMessage(serializedMessage: string) { + if (this.socket?.readyState === WebSocketState.OPEN) { + this.socket.send(serializedMessage); + } + } + + disconnect() { + super.disconnect(); + + this.stopHeartbeat(); + this.removeHandlers(); + this.socket?.close(1000, 'Client closed connection'); + this.socket = null; + } + + //#region Event handlers + + protected onConnectionOpen = () => { + super.handleConnectEvent(); + this.startHeartbeat(); + }; + + protected onConnectionClose = (event: CloseEvent) => { + super.handleDisconnectEvent(event.code); + this.stopHeartbeat(); + }; + + protected handleMessageEvent = (event: MessageEvent) => { + super.handleMessageEvent(event); + }; + + protected onConnectionError = (error: unknown) => { + super.handleErrorEvent(error); + }; + + //#endregion Event handlers + + //#region Heartbeat + + private startHeartbeat() { + if (!this.socket) return; + + this.heartbeatTimer = setInterval(() => { + if (this.socket?.readyState === WebSocketState.OPEN) { + this.socket.send(JSON.stringify({ type: 'heartbeat' })); + } + }, this.heartbeatInterval); + } + + private stopHeartbeat() { + if (this.heartbeatTimer) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = null; + } + } + + //#endregion Heartbeat + + private removeHandlers() { + this.socket?.removeEventListener('open', this.onConnectionOpen); + this.socket?.removeEventListener('message', this.handleMessageEvent); + this.socket?.removeEventListener('error', this.onConnectionError); + this.socket?.removeEventListener('close', this.onConnectionClose); + } +} diff --git a/packages/editor-ui/src/push-connection/__tests__/AbstractPushClient.test.ts b/packages/editor-ui/src/push-connection/__tests__/AbstractPushClient.test.ts new file mode 100644 index 0000000000000..a8207399a8fd0 --- /dev/null +++ b/packages/editor-ui/src/push-connection/__tests__/AbstractPushClient.test.ts @@ -0,0 +1,165 @@ +import { describe, test, vi, beforeEach, expect } from 'vitest'; +import { AbstractPushClient } from '@/push-connection/AbstractPushClient'; +import type { PushClientCallbacks, PushClientOptions } from '@/push-connection/AbstractPushClient'; + +export class TestPushClient extends AbstractPushClient { + connect(): void {} + + sendMessage(): void {} + + // Helper methods to expose protected methods for testing + testHandleConnectEvent() { + this.handleConnectEvent(); + } + + testHandleDisconnectEvent(code?: number) { + this.handleDisconnectEvent(code); + } + + testHandleErrorEvent(error: unknown) { + this.handleErrorEvent(error); + } + + testHandleMessageEvent(event: MessageEvent) { + this.handleMessageEvent(event); + } + + // Expose protected properties for testing + getReconnectAttempts() { + return this.reconnectAttempts; + } + + setReconnectAttempts(value: number) { + this.reconnectAttempts = value; + } + + getReconnectTimer() { + return this.reconnectTimer; + } + + getUrl() { + return this.url; + } +} + +describe('AbstractPushClient', () => { + let client: TestPushClient; + let callbacks: PushClientCallbacks; + let options: PushClientOptions; + + beforeEach(() => { + vi.useFakeTimers(); + callbacks = { + onMessage: vi.fn(), + onConnect: vi.fn(), + onDisconnect: vi.fn(), + }; + + options = { + url: 'http://localhost:5678', + callbacks, + }; + + client = new TestPushClient(options); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + describe('initialization', () => { + test('should initialize with correct options', () => { + expect(client.getUrl()).toBe(options.url); + expect(client.getReconnectAttempts()).toBe(0); + expect(client.getReconnectTimer()).toBeNull(); + }); + }); + + describe('handleConnectEvent', () => { + test('should reset reconnect attempts and call onConnect callback', () => { + client.setReconnectAttempts(3); + client.testHandleConnectEvent(); + + expect(client.getReconnectAttempts()).toBe(0); + expect(callbacks.onConnect).toHaveBeenCalled(); + }); + }); + + describe('handleDisconnectEvent', () => { + test('should call onDisconnect callback and schedule reconnect', () => { + const consoleSpy = vi.spyOn(console, 'warn'); + client.testHandleDisconnectEvent(1006); + + expect(callbacks.onDisconnect).toHaveBeenCalled(); + expect(consoleSpy).toHaveBeenCalledWith('[PushConnection] Connection lost, code=1006'); + expect(client.getReconnectTimer()).not.toBeNull(); + }); + + test('should use exponential backoff for reconnect delay', async () => { + const connectSpy = vi.spyOn(client, 'connect').mockImplementation(() => { + client.testHandleDisconnectEvent(); + }); + + client.testHandleDisconnectEvent(); + expect(client.getReconnectAttempts()).toBe(1); + + // First attempt: 1000ms + vi.advanceTimersByTime(1000); + expect(connectSpy).toHaveBeenCalledTimes(1); + expect(client.getReconnectAttempts()).toBe(2); + + // Second attempt: 2000ms + vi.advanceTimersByTime(2000); + expect(connectSpy).toHaveBeenCalledTimes(2); + expect(client.getReconnectAttempts()).toBe(3); + + // Third attempt: 4000ms + vi.advanceTimersByTime(4000); + expect(connectSpy).toHaveBeenCalledTimes(3); + expect(client.getReconnectAttempts()).toBe(4); + }); + + test('should not exceed maximum reconnect delay', () => { + // Set reconnect attempts high enough to exceed max delay + client.setReconnectAttempts(10); + const consoleSpy = vi.spyOn(console, 'info'); + + client.testHandleDisconnectEvent(); + + // Max delay should be 15000ms + expect(consoleSpy).toHaveBeenCalledWith('[PushConnection] Reconnecting in 15 seconds...'); + }); + }); + + describe('handleErrorEvent', () => { + test('should log error to console', () => { + const consoleSpy = vi.spyOn(console, 'error'); + const error = new Error('Test error'); + + client.testHandleErrorEvent(error); + + expect(consoleSpy).toHaveBeenCalledWith('[PushConnection] Connection error:', error); + }); + }); + + describe('handleMessageEvent', () => { + test('should call onMessage callback with event data', () => { + const testData = { foo: 'bar' }; + const messageEvent = new MessageEvent('message', { data: testData }); + + client.testHandleMessageEvent(messageEvent); + + expect(callbacks.onMessage).toHaveBeenCalledWith(testData); + }); + }); + + describe('disconnect', () => { + test('should clear reconnect timer', () => { + client.testHandleDisconnectEvent(); // This will set up a reconnect timer + expect(client.getReconnectTimer()).not.toBeNull(); + + client.disconnect(); + expect(client.getReconnectTimer()).toBeNull(); + }); + }); +}); diff --git a/packages/editor-ui/src/push-connection/__tests__/EventSourceClient.test.ts b/packages/editor-ui/src/push-connection/__tests__/EventSourceClient.test.ts new file mode 100644 index 0000000000000..790dacadd76ef --- /dev/null +++ b/packages/editor-ui/src/push-connection/__tests__/EventSourceClient.test.ts @@ -0,0 +1,107 @@ +import { describe, test, vi, beforeEach, expect } from 'vitest'; +import { EventSourceClient } from '../EventSourceClient'; +import type { PushClientCallbacks, PushClientOptions } from '../AbstractPushClient'; + +/** Last created MockEventSource instance */ +let lastEventSourceInstance: MockEventSource; + +/** Mocked EventSource class to help testing */ +class MockEventSource extends EventTarget { + constructor( + public url: string, + public opts: EventSourceInit, + ) { + super(); + + // eslint-disable-next-line @typescript-eslint/no-this-alias + lastEventSourceInstance = this; + } + + dispatchOpenEvent() { + this.dispatchEvent(new Event('open')); + } + + dispatchMessageEvent(data: string) { + this.dispatchEvent(new MessageEvent('message', { data })); + } + + dispatchErrorEvent() { + this.dispatchEvent(new Event('error')); + } + + close = vi.fn(); +} + +describe('EventSourceClient', () => { + let client: EventSourceClient; + let callbacks: PushClientCallbacks; + let options: PushClientOptions; + + beforeEach(() => { + // Mock global EventSource constructor + global.EventSource = MockEventSource as unknown as typeof EventSource; + + callbacks = { + onMessage: vi.fn(), + onConnect: vi.fn(), + onDisconnect: vi.fn(), + }; + + options = { + url: 'http://localhost:5678/events', + callbacks, + }; + + client = new EventSourceClient(options); + }); + + test('should initialize with correct options', () => { + expect(client).toBeInstanceOf(EventSourceClient); + }); + + describe('connect', () => { + test('should create EventSource with correct URL and options', () => { + client.connect(); + + expect(lastEventSourceInstance.url).toBe('http://localhost:5678/events'); + expect(lastEventSourceInstance.opts).toEqual({ withCredentials: true }); + }); + + test('should trigger onConnect event when connection is opened', () => { + client.connect(); + + lastEventSourceInstance.dispatchOpenEvent(); + + expect(callbacks.onConnect).toHaveBeenCalled(); + }); + }); + + describe('onMessage', () => { + test('should trigger onMessage event when a message is received', () => { + client.connect(); + + lastEventSourceInstance.dispatchMessageEvent('test'); + + expect(callbacks.onMessage).toHaveBeenCalledWith('test'); + }); + }); + + describe('disconnect', () => { + test('should close EventSource connection', () => { + client.connect(); + client.disconnect(); + + expect(lastEventSourceInstance.close).toHaveBeenCalled(); + // @ts-expect-error - reconnectTimer is protected + expect(client.reconnectTimer).toBeNull(); + }); + }); + + describe('sendMessage', () => { + test('should be a noop function', () => { + expect(() => { + client.sendMessage(); + }).not.toThrow(); + }); + }); +}); diff --git a/packages/editor-ui/src/push-connection/__tests__/WebSocketClient.test.ts b/packages/editor-ui/src/push-connection/__tests__/WebSocketClient.test.ts new file mode 100644 index 0000000000000..2fb17d8ca17bf --- /dev/null +++ b/packages/editor-ui/src/push-connection/__tests__/WebSocketClient.test.ts @@ -0,0 +1,213 @@ +import type { PushClientCallbacks, PushClientOptions } from '@/push-connection/AbstractPushClient'; +import { WebSocketClient } from '@/push-connection/WebSocketClient'; + +const WebSocketState = { + CONNECTING: 0, + OPEN: 1, + CLOSING: 2, + CLOSED: 3, +}; + +/** Mocked WebSocket class to help testing */ +class MockWebSocket extends EventTarget { + /** Last created MockWebSocket instance */ + static instance: MockWebSocket | null = null; + + static getInstance() { + if (!MockWebSocket.instance) { + throw new Error('No instance was created'); + } + + return MockWebSocket.instance; + } + + static reset() { + MockWebSocket.instance = null; + } + + readyState: number = WebSocketState.CONNECTING; + + constructor(public url: string) { + super(); + + MockWebSocket.instance = this; + } + + openConnection() { + this.dispatchEvent(new Event('open')); + this.readyState = WebSocketState.OPEN; + } + + closeConnection(code: number) { + this.dispatchEvent(new CloseEvent('close', { code })); + this.readyState = WebSocketState.CLOSED; + } + + dispatchMessageEvent(data: string) { + this.dispatchEvent(new MessageEvent('message', { data })); + } + + dispatchErrorEvent() { + this.dispatchEvent(new Event('error')); + } + + send = vi.fn(); + + close = vi.fn(); +} + +/** Test class that extends WebSocketClient to expose protected methods */ +class TestWebSocketClient extends WebSocketClient { + getHeartbeatTimer() { + return this.heartbeatTimer; + } + + getReconnectTimer() { + return this.reconnectTimer; + } + + getSocket() { + return this.socket; + } + + getSocketOrFail() { + if (!this.socket) { + throw new Error('Socket is not initialized'); + } + + return this.socket; + } +} + +describe('WebSocketClient', () => { + let client: TestWebSocketClient; + let callbacks: PushClientCallbacks; + let options: PushClientOptions; + + beforeEach(() => { + vi.useFakeTimers(); + + global.WebSocket = MockWebSocket as unknown as typeof WebSocket; + + callbacks = { + onMessage: vi.fn(), + onConnect: vi.fn(), + onDisconnect: vi.fn(), + }; + + options = { + url: 'ws://test.com', + callbacks, + }; + + client = new TestWebSocketClient(options); + }); + + afterEach(() => { + vi.clearAllMocks(); + vi.clearAllTimers(); + vi.useRealTimers(); + MockWebSocket.reset(); + }); + + describe('connect', () => { + it('should establish websocket connection', () => { + client.connect(); + expect(MockWebSocket.getInstance().url).toBe('ws://test.com'); + + MockWebSocket.getInstance().openConnection(); + + expect(callbacks.onConnect).toHaveBeenCalled(); + }); + }); + + describe('sendMessage', () => { + it('should send message when socket is open', () => { + client.connect(); + MockWebSocket.getInstance().openConnection(); + + client.sendMessage('test message'); + + expect(client.getSocketOrFail().send).toHaveBeenCalledWith('test message'); + }); + + test.each([[WebSocketState.CONNECTING], [WebSocketState.CLOSING], [WebSocketState.CLOSED]])( + 'should not send message when socket is %s', + (state) => { + client.connect(); + MockWebSocket.getInstance().readyState = state; + + client.sendMessage('test message'); + + expect(MockWebSocket.getInstance().send).not.toHaveBeenCalled(); + }, + ); + }); + + describe('heartbeat', () => { + it('should start sending heartbeats after connection opens', () => { + client.connect(); + MockWebSocket.getInstance().openConnection(); + + expect(client.getHeartbeatTimer()).not.toBeNull(); + + vi.advanceTimersByTime(30_000); + + expect(MockWebSocket.getInstance().send).toHaveBeenCalledWith( + JSON.stringify({ type: 'heartbeat' }), + ); + }); + + it('should stop heartbeat when connection closes', () => { + client.connect(); + MockWebSocket.getInstance().openConnection(); + + vi.advanceTimersByTime(30_000); + expect(MockWebSocket.getInstance().send).toHaveBeenCalledTimes(1); + + MockWebSocket.getInstance().closeConnection(1000); + vi.advanceTimersByTime(30_000); + + // The reconnect timer creates a new WebSocket instance which has calls set to 0 + expect(MockWebSocket.getInstance().send).toHaveBeenCalledTimes(0); + }); + }); + + describe('disconnect', () => { + it('should clean up connection and stop heartbeat', () => { + client.connect(); + MockWebSocket.getInstance().openConnection(); + + client.disconnect(); + + expect(MockWebSocket.getInstance().close).toHaveBeenCalledWith( + 1000, + 'Client closed connection', + ); + expect(client.getHeartbeatTimer()).toBeNull(); + expect(client.getReconnectTimer()).toBeNull(); + }); + }); + + describe('event handling', () => { + it('should handle incoming messages', () => { + client.connect(); + MockWebSocket.getInstance().openConnection(); + MockWebSocket.getInstance().dispatchMessageEvent('test data'); + + expect(callbacks.onMessage).toHaveBeenCalledWith('test data'); + }); + + it('should handle connection close and reconnect', () => { + const connectSpy = vi.spyOn(client, 'connect'); + client.connect(); + MockWebSocket.getInstance().closeConnection(1000); + + expect(callbacks.onDisconnect).toHaveBeenCalled(); + expect(client.getReconnectTimer()).not.toBeNull(); + + vi.advanceTimersByTime(1000); + expect(connectSpy).toHaveBeenCalledTimes(2); + }); + }); +}); diff --git a/packages/editor-ui/src/stores/pushConnection.store.ts b/packages/editor-ui/src/stores/pushConnection.store.ts index 26e72f2f46384..9cbe12f069a89 100644 --- a/packages/editor-ui/src/stores/pushConnection.store.ts +++ b/packages/editor-ui/src/stores/pushConnection.store.ts @@ -2,21 +2,12 @@ import { defineStore } from 'pinia'; import { ref, computed } from 'vue'; import type { PushMessage } from '@n8n/api-types'; -import { STORES, TIME } from '@/constants'; +import { STORES } from '@/constants'; import { useSettingsStore } from './settings.store'; import { useRootStore } from './root.store'; - -export interface PushState { - pushRef: string; - pushSource: WebSocket | EventSource | null; - reconnectTimeout: NodeJS.Timeout | null; - retryTimeout: NodeJS.Timeout | null; - pushMessageQueue: Array<{ event: Event; retriesLeft: number }>; - connectRetries: number; - lostConnection: boolean; - outgoingQueue: unknown[]; - isConnectionOpen: boolean; -} +import { WebSocketClient } from '@/push-connection/WebSocketClient'; +import { EventSourceClient } from '@/push-connection/EventSourceClient'; +import type { PushClientOptions } from '@/push-connection/AbstractPushClient'; export type OnPushMessageHandler = (event: PushMessage) => void; @@ -28,10 +19,7 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => { const settingsStore = useSettingsStore(); const pushRef = computed(() => rootStore.pushRef); - const pushSource = ref(null); - const reconnectTimeout = ref(null); - const connectRetries = ref(0); - const lostConnection = ref(false); + const pushConnection = ref(null); const outgoingQueue = ref([]); const isConnectionOpen = ref(false); @@ -39,6 +27,7 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => { const addEventListener = (handler: OnPushMessageHandler) => { onMessageReceivedHandlers.value.push(handler); + return () => { const index = onMessageReceivedHandlers.value.indexOf(handler); if (index !== -1) { @@ -47,44 +36,16 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => { }; }; - function onConnectionError() { - pushDisconnect(); - connectRetries.value++; - reconnectTimeout.value = setTimeout( - attemptReconnect, - Math.min(connectRetries.value * 2000, 8 * TIME.SECOND), // maximum 8 seconds backoff - ); - } - /** * Close connection to server */ function pushDisconnect() { - if (pushSource.value !== null) { - pushSource.value.removeEventListener('error', onConnectionError); - pushSource.value.removeEventListener('close', onConnectionError); - pushSource.value.removeEventListener('message', pushMessageReceived); - if (pushSource.value.readyState < 2) pushSource.value.close(); - pushSource.value = null; - } + pushConnection.value?.disconnect(); isConnectionOpen.value = false; } - /** - * Connect to server to receive data via a WebSocket or EventSource - */ - function pushConnect() { - // always close the previous connection so that we do not end up with multiple connections - pushDisconnect(); - - if (reconnectTimeout.value) { - clearTimeout(reconnectTimeout.value); - reconnectTimeout.value = null; - } - - const useWebSockets = settingsStore.pushBackend === 'websocket'; - + function getConnectionUrl(useWebSockets: boolean) { const restUrl = rootStore.restUrl; const url = `/push?pushRef=${pushRef.value}`; @@ -93,32 +54,42 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => { const baseUrl = restUrl.startsWith('http') ? restUrl.replace(/^http/, 'ws') : `${protocol === 'https:' ? 'wss' : 'ws'}://${host + restUrl}`; - pushSource.value = new WebSocket(`${baseUrl}${url}`); + return `${baseUrl}${url}`; } else { - pushSource.value = new EventSource(`${restUrl}${url}`, { withCredentials: true }); + return `${restUrl}${url}`; } - - pushSource.value.addEventListener('open', onConnectionSuccess, false); - pushSource.value.addEventListener('message', pushMessageReceived, false); - pushSource.value.addEventListener(useWebSockets ? 'close' : 'error', onConnectionError, false); } - function attemptReconnect() { - pushConnect(); + /** + * Connect to server to receive data via a WebSocket or EventSource + */ + function pushConnect() { + // always close the previous connection so that we do not end up with multiple connections + pushDisconnect(); + + const useWebSockets = settingsStore.pushBackend === 'websocket'; + const url = getConnectionUrl(useWebSockets); + + const opts: PushClientOptions = { + url, + callbacks: { + onMessage: pushMessageReceived, + onConnect, + onDisconnect, + }, + }; + + pushConnection.value = useWebSockets ? new WebSocketClient(opts) : new EventSourceClient(opts); + pushConnection.value.connect(); } function serializeAndSend(message: unknown) { - if (pushSource.value && 'send' in pushSource.value) { - pushSource.value.send(JSON.stringify(message)); - } + pushConnection.value?.sendMessage(JSON.stringify(message)); } - function onConnectionSuccess() { + function onConnect() { isConnectionOpen.value = true; - connectRetries.value = 0; - lostConnection.value = false; rootStore.setPushConnectionActive(); - pushSource.value?.removeEventListener('open', onConnectionSuccess); if (outgoingQueue.value.length) { for (const message of outgoingQueue.value) { @@ -128,6 +99,11 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => { } } + function onDisconnect() { + isConnectionOpen.value = false; + rootStore.setPushConnectionInactive(); + } + function send(message: unknown) { if (!isConnectionOpen.value) { outgoingQueue.value.push(message); @@ -139,11 +115,10 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => { /** * Process a newly received message */ - async function pushMessageReceived(event: Event) { + async function pushMessageReceived(data: unknown) { let receivedData: PushMessage; try { - // @ts-ignore - receivedData = JSON.parse(event.data); + receivedData = JSON.parse(data as string); } catch (error) { return; } @@ -157,7 +132,7 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => { return { pushRef, - pushSource, + pushSource: pushConnection, isConnectionOpen, onMessageReceivedHandlers, addEventListener, From 7d14a7fde1b1b6d6d5407c26d70e6aef8d254b79 Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Wed, 5 Feb 2025 16:44:58 +0200 Subject: [PATCH 02/11] refactor(editor): Remove push connection state from root store --- packages/editor-ui/src/Interface.ts | 1 - .../src/components/PushConnectionTracker.vue | 8 +++---- .../src/composables/useRunWorkflow.test.ts | 23 ++++++++++--------- .../src/composables/useRunWorkflow.ts | 4 +++- .../src/stores/pushConnection.store.ts | 2 -- packages/editor-ui/src/stores/root.store.ts | 14 ----------- 6 files changed, 18 insertions(+), 34 deletions(-) diff --git a/packages/editor-ui/src/Interface.ts b/packages/editor-ui/src/Interface.ts index 7753ba8e1850c..4ab63eb599a68 100644 --- a/packages/editor-ui/src/Interface.ts +++ b/packages/editor-ui/src/Interface.ts @@ -888,7 +888,6 @@ export interface RootState { endpointWebhook: string; endpointWebhookTest: string; endpointWebhookWaiting: string; - pushConnectionActive: boolean; timezone: string; executionTimeout: number; maxExecutionTimeout: number; diff --git a/packages/editor-ui/src/components/PushConnectionTracker.vue b/packages/editor-ui/src/components/PushConnectionTracker.vue index 66175c2af400e..adc5dfbece8c3 100644 --- a/packages/editor-ui/src/components/PushConnectionTracker.vue +++ b/packages/editor-ui/src/components/PushConnectionTracker.vue @@ -1,16 +1,14 @@