Skip to content

Commit

Permalink
fix: race condition around heartbeating timeout after re-election
Browse files Browse the repository at this point in the history
  • Loading branch information
ilikejames committed Feb 24, 2023
1 parent 721a63d commit 998bbc1
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {
race,
shareReplay,
switchMap,
tap,
timer,
withLatestFrom,
} from 'rxjs'
Expand All @@ -31,7 +30,7 @@ import {
WhoIsLeaderResponse,
} from './messageTypes'

type LeadershipOptions = Omit<ElectionOptions, '___delaySelfVoteForTesting'> & {
export type LeadershipOptions = Omit<ElectionOptions, '___delaySelfVoteForTesting'> & {
startupTimeout: number
channelName: string
logger?: LoggerOptions
Expand Down Expand Up @@ -63,6 +62,13 @@ export class LeadershipSvc {
this.leader$ = this.leader.pipe(shareReplay(1))
this.subscription = this.leader$.subscribe()

// remove undefined values
options &&
Object.keys(options).forEach(k => {
const key = k as keyof LeadershipOptions
if (options[key] === undefined) delete options[key]
})

this.options = { ...defaultSettings, ...options }
this.channel = new ChannelNetwork<LeadershipTopics>(this.options.channelName, this.iam, this.options.logger)

Expand Down Expand Up @@ -171,6 +177,8 @@ export class LeadershipSvc {
leaderId: results.winner!,
})

this.heartbeat()

this.options.logger?.info(loggerName, `I am ${results.type === ElectionResultTypes.Won ? 'the leader' : 'a follower'}`)
}

Expand All @@ -181,7 +189,7 @@ export class LeadershipSvc {
// TODO: can we check for something present e.g. local storage item.
// no item... go straight to election
this.options.logger?.info(loggerName, 'Starting...')
const timeoutMs = 10 + Math.random() * this.options.startupTimeout
const timeoutMs = this.options.startupTimeout

this.options.logger?.debug(loggerName, `Waiting for ${LeadershipTopicTypes.WhoIsLeaderResponse} within ${timeoutMs}ms`)

Expand All @@ -197,8 +205,6 @@ export class LeadershipSvc {
status: LeadershipStatus.ELECTING,
})
this.election.start()
} finally {
this.heartbeat()
}
}

Expand All @@ -219,18 +225,20 @@ export class LeadershipSvc {
const timeout$ = timer(this.options.heartbeatTimeout).pipe(map(() => 'timeout'))
return combineLatest([of(time), race(request$, timeout$)])
}),
tap(([sent, received]) => {
map(([sent, received]) => {
if (received === 'timeout') {
// new election
this.options.logger?.info(loggerName, 'leader is dead.')
this.election.start()
return
return 'timeout'
}
const totalTime = Date.now() - sent
this.options.logger?.info(loggerName, 'heartbeat', 'total =', totalTime)
}),
filter(r => r === 'timeout'),
)
.subscribe(),
.subscribe(() => {
// new election
this.options.logger?.info(loggerName, 'leader is dead.')
this.election.start()
}),
)
}

Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion packages/leader/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export * from './types'
export * from './LeadshipSvc/LeadershipSvc'
export * from './LeadershipSvc'
16 changes: 16 additions & 0 deletions packages/test-app/src/api/testValues.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/**
 * Helpers for reading test overrides out of `sessionStorage`.
 *
 * Every getter falls back to `defaultValue` (which may be `undefined`) when
 * the key is absent or, for integers, when the stored text is not a
 * well-formed base-10 integer.
 */
export const testValues = {
    /**
     * Reads `key` from `sessionStorage` and parses it as a base-10 integer.
     *
     * @param key - the storage key to read
     * @param defaultValue - returned when the key is missing or its value is not an integer
     * @returns the parsed integer, otherwise `defaultValue`
     */
    getInteger: (key: string, defaultValue?: number) => {
        const raw = sessionStorage.getItem(key)
        if (raw === null) return defaultValue

        // Validate the whole string up front: `parseInt` alone would accept
        // partial input ("12abc" -> 12, "1.5" -> 1) and, without a radix,
        // treat "0x10" as hexadecimal.
        const text = raw.trim()
        if (!/^[+-]?\d+$/.test(text)) return defaultValue

        const parsed = Number.parseInt(text, 10)
        // Reject values too large to be represented exactly as a number.
        return Number.isSafeInteger(parsed) ? parsed : defaultValue
    },
    /**
     * Reads `key` from `sessionStorage` as a string.
     *
     * @param key - the storage key to read
     * @param defaultValue - returned only when the key is missing; an empty stored string is returned as-is
     * @returns the stored string, otherwise `defaultValue`
     */
    getString: (key: string, defaultValue?: string) => {
        const stored = sessionStorage.getItem(key)
        return stored ?? defaultValue
    },
}
15 changes: 12 additions & 3 deletions packages/test-app/src/api/useLeader.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
import { bind } from '@react-rxjs/core'
import { LeadershipSvc } from '@tabrx/leader'
import { LeadershipOptions, LeadershipSvc } from '@tabrx/leader'
import { testValues } from './testValues'

export const leadershipSvc = new LeadershipSvc({
const options: Partial<LeadershipOptions> = {
channelName: testValues.getString('test-channelName', 'tabrx-leader'),
electionChannelName: testValues.getString('test-channelName', 'tabrx-leader'),
logger: {
debug: console.debug,
info: console.info,
},
heartbeatInterval: 1000,
heartbeatTimeout: 900,
})
startupTimeout: testValues.getInteger('test-startupTimeout'),
electionTimeoutMin: testValues.getInteger('test-electionTimeoutMin'),
}

console.log('options.a', JSON.stringify(options))

export const leadershipSvc = new LeadershipSvc(options)
leadershipSvc.start()

export const [useLeader, leader$] = bind(leadershipSvc.leader$, undefined)
2 changes: 1 addition & 1 deletion packages/test-e2e/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ export const config: PlaywrightTestConfig = {
projects: [
{
name: 'chromium',

use: {
...devices['Desktop Chrome'],
launchOptions: {
Expand All @@ -59,6 +58,7 @@ export const config: PlaywrightTestConfig = {
{
name: 'webkit',
use: {
headless: true,
...devices['Desktop Safari'],
},
},
Expand Down
67 changes: 60 additions & 7 deletions packages/test-e2e/src/leadership.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,41 @@ test.describe.parallel('leadership', () => {
return env.dispose()
})

// Verifies the observable status sequence of freshly opened tabs:
// INITIALIZING (no leader) -> ELECTING (no leader) -> a settled state where
// every tab reports one of the opened tabs as leader.
test('instances go through the startup process', async ({ context }) => {
    const tabNames = ['a', 'b', 'c']
    log.when('Opening tabs', tabNames)
    // Both timeouts are set to 2s so each intermediate status lasts long
    // enough to be caught by the 500ms polling below.
    const instances = await createMultipleInstances(env.url, context, tabNames, {
        electionTimeoutMin: 2_000,
        startupTimeout: 2_000,
    })

    const waitOptions: WaitOptions = { interval: 500, timeout: 5_000 }

    log.then('Starts "INITIALIZING"')
    await waitUntil(async () => {
        const status = await getAllLeadershipStatus(instances, { silent: true })
        return status.every(v => v.status === 'INITIALIZING')
    }, waitOptions)
    const initial = await getAllLeadershipStatus(instances)
    initial.forEach(({ leader }) => expect(leader).toBe('-'))

    log.then('Then "ELECTING"')
    await waitUntil(async () => {
        const status = await getAllLeadershipStatus(instances, { silent: true })
        return status.every(v => v.status === 'ELECTING')
    }, waitOptions)
    const electing = await getAllLeadershipStatus(instances)
    electing.forEach(({ leader }) => expect(leader).toBe('-'))

    log.then('A leader is elected')
    await waitUntil(async () => {
        const status = await getAllLeadershipStatus(instances, { silent: true })
        return status.every(v => v.status !== 'ELECTING' && v.status !== 'INITIALIZING')
    }, waitOptions)
    const consensus = await getAllLeadershipStatus(instances)
    // The leader must be exactly one of the opened tab names. Inside a
    // character class `|` is a literal, so the class lists only the names.
    consensus.forEach(({ leader }) => expect(leader).toMatch(/^[abc]$/))
})

test('with multiple opening an election is held to find the leader', async ({ context }) => {
const tabNames = ['a', 'b', 'c']
log.when('Opening tabs', tabNames)
Expand All @@ -38,7 +73,7 @@ test.describe.parallel('leadership', () => {
const originalStatus = await getAllLeadershipStatus(instances)
const originalLeader = originalStatus.find(r => r.status === 'LEADER')!

const newTabs = ['d', 'e']
const newTabs = ['new-d', 'new- e']
log.when('Opening more tabs', newTabs)
const newInstances = await createMultipleInstances(env.url, context, newTabs)
await waitForElectionResults(newInstances)
Expand Down Expand Up @@ -119,18 +154,36 @@ test.describe.parallel('leadership', () => {
})
})

const createInstance = async (url: string, name: string, context: BrowserContext) => {
const createInstance = async (url: string, context: BrowserContext, options: InstanceOptions & { name: string }) => {
const page = await context.newPage()
await page.addInitScript(([name]) => sessionStorage.setItem('tabId', name), [name])

const testOptions: (keyof InstanceOptions)[] = ['channelName', 'electionTimeoutMin', 'startupTimeout']
await page.addInitScript(([name]) => sessionStorage.setItem('tabId', name), [options.name])
await Promise.all(
testOptions.map(key => {
if (options[key]) {
return page.addInitScript(([key, value]) => sessionStorage.setItem(`test-${key}`, `${value}`), [key, options[key]])
}
}),
)
await page.goto(url)
return page
}

const createMultipleInstances = async (url: string, context: BrowserContext, names: string[]) => {
// Why? Running all and resolving in Promise.all causes some to fail to be created (in firefox)
/**
 * Optional per-tab overrides for the e2e helpers. Each field that is set is
 * written to the page's `sessionStorage` under a `test-` prefixed key before
 * the page loads.
 */
type InstanceOptions = Partial<{
    /** Stored as `test-channelName`. */
    channelName: string
    /** Stored as `test-electionTimeoutMin` (presumably milliseconds — confirm against the leader package). */
    electionTimeoutMin: number
    /** Stored as `test-startupTimeout`. */
    startupTimeout: number
}>

const createMultipleInstances = async (url: string, context: BrowserContext, names: string[], options: InstanceOptions = {}) => {
// Why create individually?
// Running all and resolving in Promise.all causes some
// to fail to be created (in firefox)
const instances = new Array<Page>()
for (const name of names) {
instances.push(await createInstance(url, name, context))
const instance = await createInstance(url, context, { ...options, name })
instances.push(instance)
}
return instances
}
Expand Down Expand Up @@ -170,7 +223,7 @@ const waitForLeaderConsensus = async (instances: Page[], options?: WaitOptions)
const leader = status.find(s => s.status && ['LEADER'].includes(s.status))
return Boolean(leader && status.every(s => s.leader === leader.iam))
},
{ ...options, interval: 500 },
{ interval: 500, ...options },
)
}

Expand Down

0 comments on commit 998bbc1

Please sign in to comment.