diff --git a/packages/connector/src/MosConnection.ts b/packages/connector/src/MosConnection.ts index 0c4219fc..3db18111 100644 --- a/packages/connector/src/MosConnection.ts +++ b/packages/connector/src/MosConnection.ts @@ -92,7 +92,7 @@ export class MosConnection extends EventEmitter implements this._debug, this.mosTypes.strict ) - let secondary = null + let secondary: NCSServerConnection | null = null this._ncsConnections[connectionOptions.primary.host] = primary primary.on('rawMessage', (type: string, message: string) => { @@ -174,6 +174,23 @@ export class MosConnection extends EventEmitter implements false ) } + // Handle that .openMediaHotStandby should not check for heartbeats on + // the secondary connection when the primary is connected + // And disable heartbeats on the primary when the primary is disconnected + if (connectionOptions.secondary?.openMediaHotStandby) { + // Initially disable heartbeats on secondary since primary should be attempted first + secondary.disableHeartbeats() + + primary.on('connectionChanged', () => { + if (primary.connected) { + secondary?.disableHeartbeats() + primary.enableHeartbeats() + } else { + secondary?.enableHeartbeats() + primary.disableHeartbeats() + } + }) + } } return this._registerMosDevice( diff --git a/packages/connector/src/__mocks__/socket.ts b/packages/connector/src/__mocks__/socket.ts index da55a5ea..d6afcd93 100644 --- a/packages/connector/src/__mocks__/socket.ts +++ b/packages/connector/src/__mocks__/socket.ts @@ -51,6 +51,7 @@ export class SocketMock extends EventEmitter implements Socket { private _responses: Array = [] private _autoReplyToHeartBeat = true + public mockConnectCount = 0 constructor() { super() @@ -102,6 +103,7 @@ export class SocketMock extends EventEmitter implements Socket { } // @ts-expect-error mock connect(port: number, host: string): this { + this.mockConnectCount++ this.connectedPort = port this.connectedHost = host @@ -197,6 +199,10 @@ export class SocketMock extends EventEmitter implements Socket { this.emit('connect') } + mockEmitClose(): void { + this.emit('close') + } + mockSentMessage0(data: unknown, encoding: string): void { if (this._autoReplyToHeartBeat) { const str: string = typeof data === 'string' ? data : this.decode(data as any) diff --git a/packages/connector/src/__tests__/MosConnection.spec.ts b/packages/connector/src/__tests__/MosConnection.spec.ts index 8e110027..832e6445 100644 --- a/packages/connector/src/__tests__/MosConnection.spec.ts +++ b/packages/connector/src/__tests__/MosConnection.spec.ts @@ -1,5 +1,15 @@ /* eslint-disable @typescript-eslint/unbound-method */ -import { clearMocks, decode, delay, encode, getMessageId, getXMLReply, initMosConnection, setupMocks } from './lib' +import { + clearMocks, + decode, + delay, + encode, + getConnectionsFromDevice, + getMessageId, + getXMLReply, + initMosConnection, + setupMocks, +} from './lib' import { SocketMock } from '../__mocks__/socket' import { ServerMock } from '../__mocks__/server' import { xmlData, xmlApiData } from '../__mocks__/testData' @@ -532,6 +542,157 @@ describe('MosDevice: General', () => { expect(onError).toHaveBeenCalledTimes(0) expect(onWarning).toHaveBeenCalledTimes(0) + await mos.dispose() + }) + test('Hot standby', async () => { + const mos = new MosConnection({ + mosID: 'jestMOS', + acceptsConnections: true, + profiles: { + '0': true, + '1': true, + }, + }) + const onError = jest.fn((e) => console.log(e)) + const onWarning = jest.fn((e) => console.log(e)) + mos.on('error', onError) + mos.on('warning', onWarning) + + expect(mos.acceptsConnections).toBe(true) + await initMosConnection(mos) + expect(mos.isListening).toBe(true) + + const mosDevice = await mos.connect({ + primary: { + id: 'primary', + host: '192.168.0.1', + timeout: 200, + }, + secondary: { + id: 'secondary', + host: '192.168.0.2', + timeout: 200, + openMediaHotStandby: true, + }, + }) + + expect(mosDevice).toBeTruthy() + expect(mosDevice.idPrimary).toEqual('jestMOS_primary') + + const connections = getConnectionsFromDevice(mosDevice) + expect(connections.primary).toBeTruthy() + expect(connections.secondary).toBeTruthy() + connections.primary?.setAutoReconnectInterval(300) + connections.secondary?.setAutoReconnectInterval(300) + + const onConnectionChange = jest.fn() + mosDevice.onConnectionChange((connectionStatus: IMOSConnectionStatus) => { + onConnectionChange(connectionStatus) + }) + + expect(SocketMock.instances).toHaveLength(7) + expect(SocketMock.instances[1].connectedHost).toEqual('192.168.0.1') + expect(SocketMock.instances[1].connectedPort).toEqual(10540) + expect(SocketMock.instances[2].connectedHost).toEqual('192.168.0.1') + expect(SocketMock.instances[2].connectedPort).toEqual(10541) + expect(SocketMock.instances[3].connectedHost).toEqual('192.168.0.1') + expect(SocketMock.instances[3].connectedPort).toEqual(10542) + + // TODO: Perhaps the hot-standby should not be connected at all at this point? + expect(SocketMock.instances[4].connectedHost).toEqual('192.168.0.2') + expect(SocketMock.instances[4].connectedPort).toEqual(10540) + expect(SocketMock.instances[5].connectedHost).toEqual('192.168.0.2') + expect(SocketMock.instances[5].connectedPort).toEqual(10541) + expect(SocketMock.instances[6].connectedHost).toEqual('192.168.0.2') + expect(SocketMock.instances[6].connectedPort).toEqual(10542) + + // Simulate primary connected: + for (const i of SocketMock.instances) { + if (i.connectedHost === '192.168.0.1') i.mockEmitConnected() + } + // Wait for the primary to be initially connected: + await waitFor(() => mosDevice.getConnectionStatus().PrimaryConnected, 1000) + + // Check that the connection status is as we expect: + expect(mosDevice.getConnectionStatus()).toMatchObject({ + PrimaryConnected: true, + PrimaryStatus: 'Primary: Connected', + SecondaryConnected: false, // This is expected behaviour from a hot standby - we leave it up to the library consumer to decide if this is bad or not + SecondaryStatus: 'Secondary: No heartbeats on port query', + }) + expect(onConnectionChange).toHaveBeenCalled() + expect(onConnectionChange).toHaveBeenLastCalledWith({ + PrimaryConnected: true, + PrimaryStatus: 'Primary: Connected', + SecondaryConnected: false, // This is expected from a hot standby + SecondaryStatus: 'Secondary: No heartbeats on port query', + }) + onConnectionChange.mockClear() + + // Simulate primary disconnect, secondary hot standby takes over: + for (const i of SocketMock.instances) { + i.mockConnectCount = 0 + if (i.connectedHost === '192.168.0.1') i.mockEmitClose() + if (i.connectedHost === '192.168.0.2') i.mockEmitConnected() + } + + // Wait for the secondary to be connected: + await waitFor(() => mosDevice.getConnectionStatus().SecondaryConnected, 1000) + + // Check that the connection status is as we expect: + expect(mosDevice.getConnectionStatus()).toMatchObject({ + PrimaryConnected: false, + PrimaryStatus: expect.stringContaining('Primary'), + SecondaryConnected: true, + SecondaryStatus: 'Secondary: Connected', + }) + expect(onConnectionChange).toHaveBeenCalled() + expect(onConnectionChange).toHaveBeenLastCalledWith({ + PrimaryConnected: false, + PrimaryStatus: expect.stringContaining('Primary'), + SecondaryConnected: true, + SecondaryStatus: 'Secondary: Connected', + }) + onConnectionChange.mockClear() + + // Simulate that the primary comes back online: + for (const i of SocketMock.instances) { + if (i.connectedHost === '192.168.0.1') { + expect(i.mockConnectCount).toBeGreaterThanOrEqual(1) // should have tried to reconnect + i.mockEmitConnected() + } + + if (i.connectedHost === '192.168.0.2') i.mockEmitClose() + } + + // Wait for the primary to be connected: + await waitFor(() => mosDevice.getConnectionStatus().PrimaryConnected, 1000) + + // Check that the connection status is as we expect: + expect(mosDevice.getConnectionStatus()).toMatchObject({ + PrimaryConnected: true, + PrimaryStatus: 'Primary: Connected', + SecondaryConnected: false, // This is expected from a hot standby + SecondaryStatus: 'Secondary: No heartbeats on port query', + }) + expect(onConnectionChange).toHaveBeenCalled() + expect(onConnectionChange).toHaveBeenLastCalledWith({ + PrimaryConnected: true, + PrimaryStatus: 'Primary: Connected', + SecondaryConnected: false, // This is expected from a hot standby + SecondaryStatus: 'Secondary: No heartbeats on port query', + }) + await mos.dispose() }) }) +async function waitFor(fcn: () => boolean, timeout: number): Promise { + const startTime = Date.now() + + while (Date.now() - startTime < timeout) { + await delay(10) + + if (fcn()) return + } + throw new Error('Timeout in waitFor') +} diff --git a/packages/connector/src/__tests__/OpenMediaHotStandby.spec.ts b/packages/connector/src/__tests__/OpenMediaHotStandby.spec.ts new file mode 100644 index 00000000..79c15ab8 --- /dev/null +++ b/packages/connector/src/__tests__/OpenMediaHotStandby.spec.ts @@ -0,0 +1,122 @@ +import { MosConnection } from "../MosConnection"; +import { getMosConnection, setupMocks } from "./lib"; +import { NCSServerConnection } from "../connection/NCSServerConnection"; + +describe('Hot Standby Feature', () => { + let mosConnection: MosConnection; + let primary: NCSServerConnection | null; + let secondary: NCSServerConnection | null; + + beforeAll(() => { + setupMocks(); + }); + + beforeEach(async () => { + mosConnection = await getMosConnection({ + '0': true, + '1': true, + }, false); + + const device = await mosConnection.connect({ + primary: { + id: 'primary', + host: '127.0.0.1', + }, + secondary: { + id: 'secondary', + host: '127.0.0.2', + openMediaHotStandby: true + } + }); + + // Wait for connections to be established + await new Promise(resolve => setTimeout(resolve, 100)); + + primary = device['_primaryConnection']; + secondary = device['_secondaryConnection']; + }); + + test('should disable secondary heartbeats when primary is connected', async () => { + expect(primary).toBeTruthy(); + expect(secondary).toBeTruthy(); + + if (primary && secondary) { + expect(primary.isHearbeatEnabled()).toBe(true); + expect(secondary.isHearbeatEnabled()).toBe(false); + } + }); + + test('should enable secondary heartbeats when primary disconnects', async () => { + expect(primary).toBeTruthy(); + expect(secondary).toBeTruthy(); + + if (primary && secondary) { + // Simulate primary disconnect + await primary.dispose(); + + // Wait for primary to disconnect + await new Promise(resolve => setTimeout(resolve, 100)); + + // Verify heartbeat states switched correctly + expect(secondary.isHearbeatEnabled()).toBe(true); + expect(primary.isHearbeatEnabled()).toBe(false); + } + }); + + test('should disable primary heartbeasts when secondary is connected and primary is disconnected', async () => { + expect(primary).toBeTruthy(); + expect(secondary).toBeTruthy(); + + if (primary && secondary) { + // Simulate primary disconnect + await primary.dispose(); + + // Wait for primary to disconnect + await new Promise(resolve => setTimeout(resolve, 100)); + + // Wait for secondary to connect + await new Promise(resolve => setTimeout(resolve, 100)); + + // Verify heartbeat states switched correctly + expect(secondary.isHearbeatEnabled()).toBe(true); + expect(primary.isHearbeatEnabled()).toBe(false); + } + }) + + test('should handle rapid primary connection changes', async () => { + expect(primary).toBeTruthy(); + expect(secondary).toBeTruthy(); + + if (primary && secondary) { + const connectionStates: boolean[] = []; + + // Rapidly toggle primary connection + for (let i = 0; i < 5; i++) { + await primary.dispose(); + await new Promise(resolve => setTimeout(resolve, 50)); + primary.connect(); + await new Promise(resolve => setTimeout(resolve, 50)); + + connectionStates.push( + secondary.connected, + primary.connected + ); + } + + // Verify states remained consistent + connectionStates.forEach((state, i) => { + if (i % 2 === 0) { + expect(state).toBe(false); // Secondary should be disabled + } else { + expect(state).toBe(true); // Primary should be enabled + } + }); + } + }); + + afterEach(async () => { + if (mosConnection) { + await mosConnection.dispose(); + } + }); +}); \ No newline at end of file diff --git a/packages/connector/src/__tests__/lib.ts b/packages/connector/src/__tests__/lib.ts index a48ad19d..c7808c57 100644 --- a/packages/connector/src/__tests__/lib.ts +++ b/packages/connector/src/__tests__/lib.ts @@ -9,6 +9,7 @@ import { Socket, Server } from 'net' import { xml2js } from 'xml-js' import * as iconv from 'iconv-lite' +import { NCSServerConnection } from '../connection/NCSServerConnection' iconv.encodingExists('utf16-be') // breaks net.Server, disabled for now @@ -284,3 +285,18 @@ function fixSnapshotInner(data: any): [boolean, any] { } return [changed, data] } + +export function getConnectionsFromDevice(device: MosDevice): { + primary: NCSServerConnection | null + secondary: NCSServerConnection | null + current: NCSServerConnection | null +} { + return { + // @ts-expect-error private property + primary: device._primaryConnection, + // @ts-expect-error private property + secondary: device._secondaryConnection, + // @ts-expect-error private property + current: device._currentConnection, + } +} diff --git a/packages/connector/src/api.ts b/packages/connector/src/api.ts index 6e6f3158..10df731a 100644 --- a/packages/connector/src/api.ts +++ b/packages/connector/src/api.ts @@ -663,5 +663,15 @@ export interface IMOSDeviceConnectionOptions { * Set this to true to not use that port (will cause some methods to stop working) */ dontUseQueryPort?: boolean + + /** (Optional) Treat the secondary server as a "hot standby". + * A "hot standby" is a server that is powered down / in standby while the primary server is up. + * When a server is a hot standby it is expected to be disconnected and hence we will not send + * heartbeat messages. The connection status will still be reported as disconnected however as we + * do not pretend to be connected to something that is powered down. + * + * (This was added to support the hot standby feature of OpenMedia NRCS.) + */ + openMediaHotStandby?: boolean } } diff --git a/packages/connector/src/connection/NCSServerConnection.ts b/packages/connector/src/connection/NCSServerConnection.ts index 859d9f6d..94c0ac09 100644 --- a/packages/connector/src/connection/NCSServerConnection.ts +++ b/packages/connector/src/connection/NCSServerConnection.ts @@ -106,6 +106,34 @@ export class NCSServerConnection extends EventEmitter delete this._clients[clientID] } + /** */ + disableHeartbeats(): void { + for (const i in this._clients) { + this._clients[i].useHeartbeats = false + } + } + + /** */ + enableHeartbeats(): void { + for (const i in this._clients) { + this._clients[i].useHeartbeats = true + } + } + + /** */ + isHearbeatEnabled(): boolean { + for (const i in this._clients) { + if (this._clients[i].useHeartbeats) return true + } + return false + } + + setAutoReconnectInterval(interval: number): void { + for (const i in this._clients) { + this._clients[i].client.autoReconnectInterval = interval + } + } + connect(): void { for (const i in this._clients) { // Connect client