Skip to content

Commit

Permalink
Merge branch 'feat/openmedia-hotstandby-heartbeat-logic'
Browse files Browse the repository at this point in the history
  • Loading branch information
nytamin committed Dec 5, 2024
2 parents fe4e42d + 562d74b commit 340c078
Show file tree
Hide file tree
Showing 7 changed files with 362 additions and 2 deletions.
19 changes: 18 additions & 1 deletion packages/connector/src/MosConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export class MosConnection extends EventEmitter<MosConnectionEvents> implements
this._debug,
this.mosTypes.strict
)
let secondary = null
let secondary: NCSServerConnection | null = null
this._ncsConnections[connectionOptions.primary.host] = primary

primary.on('rawMessage', (type: string, message: string) => {
Expand Down Expand Up @@ -174,6 +174,23 @@ export class MosConnection extends EventEmitter<MosConnectionEvents> implements
false
)
}
// Handle that .openMediaHotStandby should not check for heartbeats on
// the secondary connection when the primary is connected
// And disable heartbeats on the primary when the primary is disconnected
if (connectionOptions.secondary?.openMediaHotStandby) {
// Initially disable heartbeats on secondary since primary should be attempted first
secondary.disableHeartbeats()

primary.on('connectionChanged', () => {
if (primary.connected) {
secondary?.disableHeartbeats()
primary.enableHeartbeats()
} else {
secondary?.enableHeartbeats()
primary.disableHeartbeats()
}
})
}
}

return this._registerMosDevice(
Expand Down
6 changes: 6 additions & 0 deletions packages/connector/src/__mocks__/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export class SocketMock extends EventEmitter implements Socket {

private _responses: Array<ReplyTypes> = []
private _autoReplyToHeartBeat = true
public mockConnectCount = 0

constructor() {
super()
Expand Down Expand Up @@ -102,6 +103,7 @@ export class SocketMock extends EventEmitter implements Socket {
}
// @ts-expect-error mock
connect(port: number, host: string): this {
this.mockConnectCount++
this.connectedPort = port
this.connectedHost = host

Expand Down Expand Up @@ -197,6 +199,10 @@ export class SocketMock extends EventEmitter implements Socket {

this.emit('connect')
}
mockEmitClose(): void {
this.emit('close')
}

mockSentMessage0(data: unknown, encoding: string): void {
if (this._autoReplyToHeartBeat) {
const str: string = typeof data === 'string' ? data : this.decode(data as any)
Expand Down
163 changes: 162 additions & 1 deletion packages/connector/src/__tests__/MosConnection.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
/* eslint-disable @typescript-eslint/unbound-method */
import { clearMocks, decode, delay, encode, getMessageId, getXMLReply, initMosConnection, setupMocks } from './lib'
import {
clearMocks,
decode,
delay,
encode,
getConnectionsFromDevice,
getMessageId,
getXMLReply,
initMosConnection,
setupMocks,
} from './lib'
import { SocketMock } from '../__mocks__/socket'
import { ServerMock } from '../__mocks__/server'
import { xmlData, xmlApiData } from '../__mocks__/testData'
Expand Down Expand Up @@ -532,6 +542,157 @@ describe('MosDevice: General', () => {
expect(onError).toHaveBeenCalledTimes(0)
expect(onWarning).toHaveBeenCalledTimes(0)

await mos.dispose()
})
test('Hot standby', async () => {
const mos = new MosConnection({
mosID: 'jestMOS',
acceptsConnections: true,
profiles: {
'0': true,
'1': true,
},
})
const onError = jest.fn((e) => console.log(e))
const onWarning = jest.fn((e) => console.log(e))
mos.on('error', onError)
mos.on('warning', onWarning)

expect(mos.acceptsConnections).toBe(true)
await initMosConnection(mos)
expect(mos.isListening).toBe(true)

const mosDevice = await mos.connect({
primary: {
id: 'primary',
host: '192.168.0.1',
timeout: 200,
},
secondary: {
id: 'secondary',
host: '192.168.0.2',
timeout: 200,
openMediaHotStandby: true,
},
})

expect(mosDevice).toBeTruthy()
expect(mosDevice.idPrimary).toEqual('jestMOS_primary')

const connections = getConnectionsFromDevice(mosDevice)
expect(connections.primary).toBeTruthy()
expect(connections.secondary).toBeTruthy()
connections.primary?.setAutoReconnectInterval(300)
connections.secondary?.setAutoReconnectInterval(300)

const onConnectionChange = jest.fn()
mosDevice.onConnectionChange((connectionStatus: IMOSConnectionStatus) => {
onConnectionChange(connectionStatus)
})

expect(SocketMock.instances).toHaveLength(7)
expect(SocketMock.instances[1].connectedHost).toEqual('192.168.0.1')
expect(SocketMock.instances[1].connectedPort).toEqual(10540)
expect(SocketMock.instances[2].connectedHost).toEqual('192.168.0.1')
expect(SocketMock.instances[2].connectedPort).toEqual(10541)
expect(SocketMock.instances[3].connectedHost).toEqual('192.168.0.1')
expect(SocketMock.instances[3].connectedPort).toEqual(10542)

// TODO: Perhaps the hot-standby should not be connected at all at this point?
expect(SocketMock.instances[4].connectedHost).toEqual('192.168.0.2')
expect(SocketMock.instances[4].connectedPort).toEqual(10540)
expect(SocketMock.instances[5].connectedHost).toEqual('192.168.0.2')
expect(SocketMock.instances[5].connectedPort).toEqual(10541)
expect(SocketMock.instances[6].connectedHost).toEqual('192.168.0.2')
expect(SocketMock.instances[6].connectedPort).toEqual(10542)

// Simulate primary connected:
for (const i of SocketMock.instances) {
if (i.connectedHost === '192.168.0.1') i.mockEmitConnected()
}
// Wait for the primary to be initially connected:
await waitFor(() => mosDevice.getConnectionStatus().PrimaryConnected, 1000)

// Check that the connection status is as we expect:
expect(mosDevice.getConnectionStatus()).toMatchObject({
PrimaryConnected: true,
PrimaryStatus: 'Primary: Connected',
SecondaryConnected: false, // This is expected behaviour from a hot standby - we leave it up to the library consumer to decide if this is bad or not
SecondaryStatus: 'Secondary: No heartbeats on port query',
})
expect(onConnectionChange).toHaveBeenCalled()
expect(onConnectionChange).toHaveBeenLastCalledWith({
PrimaryConnected: true,
PrimaryStatus: 'Primary: Connected',
SecondaryConnected: false, // This is expected from a hot standby
SecondaryStatus: 'Secondary: No heartbeats on port query',
})
onConnectionChange.mockClear()

// Simulate primary disconnect, secondary hot standby takes over:
for (const i of SocketMock.instances) {
i.mockConnectCount = 0
if (i.connectedHost === '192.168.0.1') i.mockEmitClose()
if (i.connectedHost === '192.168.0.2') i.mockEmitConnected()
}

// Wait for the secondary to be connected:
await waitFor(() => mosDevice.getConnectionStatus().SecondaryConnected, 1000)

// Check that the connection status is as we expect:
expect(mosDevice.getConnectionStatus()).toMatchObject({
PrimaryConnected: false,
PrimaryStatus: expect.stringContaining('Primary'),
SecondaryConnected: true,
SecondaryStatus: 'Secondary: Connected',
})
expect(onConnectionChange).toHaveBeenCalled()
expect(onConnectionChange).toHaveBeenLastCalledWith({
PrimaryConnected: false,
PrimaryStatus: expect.stringContaining('Primary'),
SecondaryConnected: true,
SecondaryStatus: 'Secondary: Connected',
})
onConnectionChange.mockClear()

// Simulate that the primary comes back online:
for (const i of SocketMock.instances) {
if (i.connectedHost === '192.168.0.1') {
expect(i.mockConnectCount).toBeGreaterThanOrEqual(1) // should have tried to reconnect
i.mockEmitConnected()
}

if (i.connectedHost === '192.168.0.2') i.mockEmitClose()
}

// Wait for the primary to be connected:
await waitFor(() => mosDevice.getConnectionStatus().PrimaryConnected, 1000)

// Check that the connection status is as we expect:
expect(mosDevice.getConnectionStatus()).toMatchObject({
PrimaryConnected: true,
PrimaryStatus: 'Primary: Connected',
SecondaryConnected: false, // This is expected from a hot standby
SecondaryStatus: 'Secondary: No heartbeats on port query',
})
expect(onConnectionChange).toHaveBeenCalled()
expect(onConnectionChange).toHaveBeenLastCalledWith({
PrimaryConnected: true,
PrimaryStatus: 'Primary: Connected',
SecondaryConnected: false, // This is expected from a hot standby
SecondaryStatus: 'Secondary: No heartbeats on port query',
})

await mos.dispose()
})
})
async function waitFor(fcn: () => boolean, timeout: number): Promise<void> {
const startTime = Date.now()

while (Date.now() - startTime < timeout) {
await delay(10)

if (fcn()) return
}
throw new Error('Timeout in waitFor')
}
122 changes: 122 additions & 0 deletions packages/connector/src/__tests__/OpenMediaHotStandby.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { MosConnection } from "../MosConnection";
import { getMosConnection, setupMocks } from "./lib";
import { NCSServerConnection } from "../connection/NCSServerConnection";

describe('Hot Standby Feature', () => {
let mosConnection: MosConnection;
let primary: NCSServerConnection | null;
let secondary: NCSServerConnection | null;

beforeAll(() => {
setupMocks();
});

beforeEach(async () => {
mosConnection = await getMosConnection({
'0': true,
'1': true,
}, false);

const device = await mosConnection.connect({
primary: {
id: 'primary',
host: '127.0.0.1',
},
secondary: {
id: 'secondary',
host: '127.0.0.2',
openMediaHotStandby: true
}
});

// Wait for connections to be established
await new Promise(resolve => setTimeout(resolve, 100));

primary = device['_primaryConnection'];
secondary = device['_secondaryConnection'];
});

test('should disable secondary heartbeats when primary is connected', async () => {
expect(primary).toBeTruthy();
expect(secondary).toBeTruthy();

if (primary && secondary) {
expect(primary.isHearbeatEnabled()).toBe(true);
expect(secondary.isHearbeatEnabled()).toBe(false);
}
});

test('should enable secondary heartbeats when primary disconnects', async () => {
expect(primary).toBeTruthy();
expect(secondary).toBeTruthy();

if (primary && secondary) {
// Simulate primary disconnect
await primary.dispose();

// Wait for primary to disconnect
await new Promise(resolve => setTimeout(resolve, 100));

// Verify heartbeat states switched correctly
expect(secondary.isHearbeatEnabled()).toBe(true);
expect(primary.isHearbeatEnabled()).toBe(false);
}
});

test('should disable primary heartbeasts when secondary is connected and primary is disconnected', async () => {
expect(primary).toBeTruthy();
expect(secondary).toBeTruthy();

if (primary && secondary) {
// Simulate primary disconnect
await primary.dispose();

// Wait for primary to disconnect
await new Promise(resolve => setTimeout(resolve, 100));

// Wait for secondary to connect
await new Promise(resolve => setTimeout(resolve, 100));

// Verify heartbeat states switched correctly
expect(secondary.isHearbeatEnabled()).toBe(true);
expect(primary.isHearbeatEnabled()).toBe(false);
}
})

test('should handle rapid primary connection changes', async () => {
expect(primary).toBeTruthy();
expect(secondary).toBeTruthy();

if (primary && secondary) {
const connectionStates: boolean[] = [];

// Rapidly toggle primary connection
for (let i = 0; i < 5; i++) {
await primary.dispose();
await new Promise(resolve => setTimeout(resolve, 50));
primary.connect();
await new Promise(resolve => setTimeout(resolve, 50));

connectionStates.push(
secondary.connected,
primary.connected
);
}

// Verify states remained consistent
connectionStates.forEach((state, i) => {
if (i % 2 === 0) {
expect(state).toBe(false); // Secondary should be disabled
} else {
expect(state).toBe(true); // Primary should be enabled
}
});
}
});

afterEach(async () => {
if (mosConnection) {
await mosConnection.dispose();
}
});
});
16 changes: 16 additions & 0 deletions packages/connector/src/__tests__/lib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { Socket, Server } from 'net'
import { xml2js } from 'xml-js'

import * as iconv from 'iconv-lite'
import { NCSServerConnection } from '../connection/NCSServerConnection'
iconv.encodingExists('utf16-be')

// breaks net.Server, disabled for now
Expand Down Expand Up @@ -284,3 +285,18 @@ function fixSnapshotInner(data: any): [boolean, any] {
}
return [changed, data]
}

export function getConnectionsFromDevice(device: MosDevice): {
primary: NCSServerConnection | null
secondary: NCSServerConnection | null
current: NCSServerConnection | null
} {
return {
// @ts-expect-error private property
primary: device._primaryConnection,
// @ts-expect-error private property
secondary: device._secondaryConnection,
// @ts-expect-error private property
current: device._currentConnection,
}
}
Loading

0 comments on commit 340c078

Please sign in to comment.