Skip to content

Commit

Permalink
fix: batch writes to json file, to avoid timeouts when scheduling man…
Browse files Browse the repository at this point in the history
…y writes at the same time.
  • Loading branch information
nytamin committed Feb 13, 2024
1 parent 87b90d7 commit d0d6b60
Show file tree
Hide file tree
Showing 3 changed files with 437 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import path from 'path'
import { promisify } from 'util'
import fs from 'fs'
import * as LockFile from 'proper-lockfile'
import _ from 'underscore'
import {
ExpectedPackage,
StatusCode,
Expand All @@ -23,6 +21,7 @@ import { GenericAccessorHandle } from '../genericHandle'
import { MonitorInProgress } from '../../lib/monitorInProgress'
import { removeBasePath } from './pathJoin'
import { FileEvent, FileWatcher, IFileWatcher } from './FileWatcher'
import { updateJSONFileBatch } from './json-write-file'

export const LocalFolderAccessorHandleType = 'localFolder'
export const FileShareAccessorHandleType = 'fileShare'
Expand All @@ -32,7 +31,6 @@ const fsReadFile = promisify(fs.readFile)
const fsReaddir = promisify(fs.readdir)
const fsRmDir = promisify(fs.rmdir)
const fsStat = promisify(fs.stat)
const fsWriteFile = promisify(fs.writeFile)
const fsUnlink = promisify(fs.unlink)
const fsLstat = promisify(fs.lstat)

Expand Down Expand Up @@ -423,73 +421,23 @@ export abstract class GenericFileAccessorHandle<Metadata> extends GenericAccesso
const LOCK_ATTEMPTS_COUNT = 10
const RETRY_TIMEOUT = 100 // ms

let lockCompromisedError: Error | null = null

// Retry up to 10 times at locking and writing the file:
for (let i = 0; i < LOCK_ATTEMPTS_COUNT; i++) {
lockCompromisedError = null

// Get file lock
let releaseLock: (() => Promise<void>) | undefined = undefined
try {
releaseLock = await LockFile.lock(this.deferRemovePackagesPath, {
onCompromised: (err) => {
// This is called if the lock somehow gets compromised
this.worker.logger.warn(`updatePackagesToRemove: Lock compromised: ${err}`)
lockCompromisedError = err
},
})
} catch (e) {
if (e instanceof Error && (e as any).code === 'ENOENT') {
// The file does not exist. Create an empty file and try again:
await fsWriteFile(this.deferRemovePackagesPath, '')
continue
} else if (e instanceof Error && (e as any).code === 'ELOCKED') {
// Already locked, try again later:
await sleep(RETRY_TIMEOUT)
continue
} else {
// Unknown error. Log and exit:
this.worker.logger.error(e)
return
}
}
// At this point, we have acquired the lock.
try {
// Read and write to the file:
const oldList = await this.getPackagesToRemove()
const newList = cbManipulateList(clone(oldList))
if (!_.isEqual(oldList, newList)) {
if (lockCompromisedError) {
// The lock was compromised. Try again:
continue
}
await fsWriteFile(this.deferRemovePackagesPath, JSON.stringify(newList))
}

// Release the lock:
if (!lockCompromisedError) await releaseLock()
// Done, exit the function:
return
} catch (e) {
if (e instanceof Error && (e as any).code === 'ERELEASED') {
// Lock was already released. Something must have gone wrong (eg. someone deleted a folder),
// Log and try again:
this.worker.logger.warn(`updatePackagesToRemove: Lock was already released`)
continue
} else {
// Release the lock:
if (!lockCompromisedError) await releaseLock()
throw e
try {
await updateJSONFileBatch<DelayPackageRemovalEntry[]>(
this.deferRemovePackagesPath,
(list) => {
return cbManipulateList(list ?? [])
},
{
retryCount: LOCK_ATTEMPTS_COUNT,
retryTimeout: RETRY_TIMEOUT,
logError: (error) => this.worker.logger.error(error),
logWarning: (message) => this.worker.logger.warn(message),
}
}
}
// At this point, the lock failed
this.worker.logger.error(
`updatePackagesToRemove: Failed to lock file "${this.deferRemovePackagesPath}" after ${LOCK_ATTEMPTS_COUNT} attempts`
)
if (lockCompromisedError) {
this.worker.logger.error(`updatePackagesToRemove: lockCompromisedError: ${lockCompromisedError}`)
)
} catch (e) {
// Not much we can do about it..
// Log and continue:
this.worker.logger.error(e)
}
}
}
Expand All @@ -507,9 +455,3 @@ enum StatusCategory {
WATCHER = 'watcher',
FILE = 'file_',
}
function clone<T>(o: T): T {
return JSON.parse(JSON.stringify(o))
}
async function sleep(duration: number): Promise<void> {
return new Promise((r) => setTimeout(r, duration))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import { getTmpPath, updateJSONFile, updateJSONFileBatch } from '../json-write-file'
import { promises as fs } from 'fs'

const FILE_A = 'file_a.json'
async function cleanup() {
await Promise.all([unlinkIfExists(FILE_A), unlinkIfExists(getLockPath(FILE_A)), unlinkIfExists(getTmpPath(FILE_A))])
}

beforeEach(cleanup)
afterEach(cleanup)

test('updateJSONFile: single write', async () => {
const cbManipulate = jest.fn(() => {
return {
a: 1,
}
})
await updateJSONFile(FILE_A, cbManipulate)

expect(cbManipulate).toBeCalledTimes(1)
expect(await readIfExists(FILE_A)).toBe(
JSON.stringify({
a: 1,
})
)
})

test('updateJSONFile: 2 writes', async () => {
const cbManipulate = jest.fn((o) => {
o = o || []
o.push('a')
return o
})

const p0 = updateJSONFile(FILE_A, cbManipulate)
await sleep(5)

const p1 = updateJSONFile(FILE_A, cbManipulate)

await Promise.all([p0, p1])

expect(cbManipulate).toBeCalledTimes(2)
expect(await readIfExists(FILE_A)).toBe(JSON.stringify(['a', 'a']))
})
test('updateJSONFile: 10 writes', async () => {
const cbManipulate = jest.fn((o) => {
o = o || []
o.push('b')
return o
})

const config = {
retryTimeout: 30,
retryCount: 3,
}

// This should be an impossible tasks, because there will be too many locks, and not enough time to resolve them:

let error: any
try {
await Promise.all([
updateJSONFile(FILE_A, cbManipulate, config),
updateJSONFile(FILE_A, cbManipulate, config),
updateJSONFile(FILE_A, cbManipulate, config),
updateJSONFile(FILE_A, cbManipulate, config),
updateJSONFile(FILE_A, cbManipulate, config),
updateJSONFile(FILE_A, cbManipulate, config),
updateJSONFile(FILE_A, cbManipulate, config),
updateJSONFile(FILE_A, cbManipulate, config),
updateJSONFile(FILE_A, cbManipulate, config),
updateJSONFile(FILE_A, cbManipulate, config),
])
} catch (e) {
error = e
}
expect(error + '').toMatch(/Failed to lock file/)

// Wait for the lock functions to finish retrying:
await sleep(config.retryTimeout * config.retryCount)
})

test('updateJSONFileBatch: single write', async () => {
const cbManipulate = jest.fn(() => {
return {
b: 1,
}
})
await updateJSONFileBatch(FILE_A, cbManipulate)

expect(cbManipulate).toBeCalledTimes(1)
expect(await readIfExists(FILE_A)).toBe(
JSON.stringify({
b: 1,
})
)
})

test('updateJSONFileBatch: 3 writes', async () => {
const v = await readIfExists(FILE_A)
expect(v).toBe(undefined)

const cbManipulate = jest.fn((o) => {
o = o || []
o.push('a')
return o
})

const p0 = updateJSONFileBatch(FILE_A, cbManipulate)
await sleep(5)

const p1 = updateJSONFileBatch(FILE_A, cbManipulate)
const p2 = updateJSONFileBatch(FILE_A, cbManipulate)

await Promise.all([p0, p1, p2])

expect(cbManipulate).toBeCalledTimes(3)
expect(await readIfExists(FILE_A)).toBe(JSON.stringify(['a', 'a', 'a']))
})
test('updateJSONFileBatch: 20 writes', async () => {
const cbManipulate = jest.fn((o) => {
o = o || []
o.push('a')
return o
})

const config = {
retryTimeout: 30,
retryCount: 3,
}

const ps: Promise<void>[] = []
let expectResult: string[] = []
for (let i = 0; i < 20; i++) {
ps.push(updateJSONFileBatch(FILE_A, cbManipulate, config))
expectResult.push('a')
}

await Promise.all(ps)

expect(cbManipulate).toBeCalledTimes(20)
expect(await readIfExists(FILE_A)).toBe(JSON.stringify(expectResult))
})

async function readIfExists(filePath: string): Promise<string | undefined> {
try {
return await fs.readFile(filePath, 'utf-8')
} catch (e) {
if ((e as any)?.code === 'ENOENT') {
// not found
return undefined
} else throw e
}
}
async function unlinkIfExists(filePath: string): Promise<void> {
try {
await fs.unlink(filePath)
} catch (e) {
if ((e as any)?.code === 'ENOENT') {
// not found, that's okay
} else throw e
}
}
function getLockPath(filePath: string): string {
return filePath + '.lock'
}
function sleep(duration: number): Promise<void> {
return new Promise((r) => setTimeout(r, duration))
}
Loading

0 comments on commit d0d6b60

Please sign in to comment.