Skip to content

Commit

Permalink
fix:lock the remove-file-packages, to avoid creating corrupt files du…
Browse files Browse the repository at this point in the history
…e to multiple processes writing to it
  • Loading branch information
nytamin committed Feb 9, 2024
1 parent 145349b commit 592de67
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 51 deletions.
2 changes: 2 additions & 0 deletions shared/packages/worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"devDependencies": {
"@types/deep-diff": "^1.0.0",
"@types/node-fetch": "^2.5.8",
"@types/proper-lockfile": "^4.1.4",
"@types/tmp": "~0.2.2"
},
"dependencies": {
Expand All @@ -26,6 +27,7 @@
"form-data": "^4.0.0",
"mkdirp": "^2.1.3",
"node-fetch": "^2.6.1",
"proper-lockfile": "^4.1.2",
"tmp": "~0.2.1",
"tv-automation-quantel-gateway-client": "3.1.7",
"windows-network-drive": "^4.0.1",
Expand Down
167 changes: 116 additions & 51 deletions shared/packages/worker/src/worker/accessorHandlers/lib/FileHandler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import path from 'path'
import { promisify } from 'util'
import fs from 'fs'
import * as LockFile from 'proper-lockfile'
import _ from 'underscore'
import {
ExpectedPackage,
StatusCode,
Expand Down Expand Up @@ -53,48 +55,36 @@ export abstract class GenericFileAccessorHandle<Metadata> extends GenericAccesso

/** Schedule the package for later removal */
async delayPackageRemoval(filePath: string, ttl: number): Promise<void> {
const packagesToRemove = await this.getPackagesToRemove()

// Search for a pre-existing entry:
let found = false
for (const entry of packagesToRemove) {
if (entry.filePath === filePath) {
// extend the TTL if it was found:
entry.removeTime = Date.now() + ttl

found = true
break
await this.updatePackagesToRemove((packagesToRemove) => {
// Search for a pre-existing entry:
let alreadyExists = false
for (const entry of packagesToRemove) {
if (entry.filePath === filePath) {
// extend the TTL if it was found:
entry.removeTime = Date.now() + ttl

alreadyExists = true
break
}
}
}
if (!found) {
packagesToRemove.push({
filePath: filePath,
removeTime: Date.now() + ttl,
})
}

await this.storePackagesToRemove(packagesToRemove)
if (!alreadyExists) {
packagesToRemove.push({
filePath: filePath,
removeTime: Date.now() + ttl,
})
}
return packagesToRemove
})
}
/** Clear a scheduled later removal of a package */
async clearPackageRemoval(filePath: string): Promise<void> {
const packagesToRemove = await this.getPackagesToRemove()

let found = false
for (let i = 0; i < packagesToRemove.length; i++) {
const entry = packagesToRemove[i]
if (entry.filePath === filePath) {
packagesToRemove.splice(i, 1)
found = true
break
}
}
if (found) {
await this.storePackagesToRemove(packagesToRemove)
}
await this.updatePackagesToRemove((packagesToRemove) => {
return packagesToRemove.filter((entry) => entry.filePath !== filePath)
})
}
/** Remove any packages that are due for removal */
async removeDuePackages(): Promise<Reason | null> {
let packagesToRemove = await this.getPackagesToRemove()
const packagesToRemove = await this.getPackagesToRemove()

const removedFilePaths: string[] = []
for (const entry of packagesToRemove) {
Expand All @@ -112,21 +102,14 @@ export abstract class GenericFileAccessorHandle<Metadata> extends GenericAccesso
}
}

// Fetch again, to decrease the risk of race-conditions:
packagesToRemove = await this.getPackagesToRemove()
let changed = false
// Remove paths from array:
for (let i = 0; i < packagesToRemove.length; i++) {
const entry = packagesToRemove[i]
if (removedFilePaths.includes(entry.filePath)) {
packagesToRemove.splice(i, 1)
changed = true
break
}
}
if (changed) {
await this.storePackagesToRemove(packagesToRemove)
if (removedFilePaths.length > 0) {
// Update the list of packages to remove:
await this.updatePackagesToRemove((packagesToRemove) => {
// Remove the entries of the files we removed:
return packagesToRemove.filter((entry) => !removedFilePaths.includes(entry.filePath))
})
}

return null
}
/** Unlink (remove) a file, if it exists. Returns true if it did exist */
Expand Down Expand Up @@ -430,8 +413,84 @@ export abstract class GenericFileAccessorHandle<Metadata> extends GenericAccesso
}
return packagesToRemove
}
private async storePackagesToRemove(packagesToRemove: DelayPackageRemovalEntry[]): Promise<void> {
await fsWriteFile(this.deferRemovePackagesPath, JSON.stringify(packagesToRemove))
/** Update the deferred-remove-packages list */
private async updatePackagesToRemove(
cbManipulateList: (list: DelayPackageRemovalEntry[]) => DelayPackageRemovalEntry[]
): Promise<void> {
// Note: It is high likelihood that several processes will try to write to this file at the same time
// Therefore, we need to lock the file while writing to it.

const LOCK_ATTEMPTS_COUNT = 10
const RETRY_TIMEOUT = 100 // ms

let lockCompromisedError: Error | null = null

// Retry up to 10 times at locking and writing the file:
for (let i = 0; i < LOCK_ATTEMPTS_COUNT; i++) {
lockCompromisedError = null

// Get file lock
let releaseLock: (() => Promise<void>) | undefined = undefined
try {
releaseLock = await LockFile.lock(this.deferRemovePackagesPath, {
onCompromised: (err) => {
// This is called if the lock somehow gets compromised
this.worker.logger.warn(`updatePackagesToRemove: Lock compromised: ${err}`)
lockCompromisedError = err
},
})
} catch (e) {
if (e instanceof Error && (e as any).code === 'ENOENT') {
// The file does not exist. Create an empty file and try again:
await fsWriteFile(this.deferRemovePackagesPath, '')
continue
} else if (e instanceof Error && (e as any).code === 'ELOCKED') {
// Already locked, try again later:
await sleep(RETRY_TIMEOUT)
continue
} else {
// Unknown error. Log and exit:
this.worker.logger.error(e)
return
}
}
// At this point, we have acquired the lock.
try {
// Read and write to the file:
const oldList = await this.getPackagesToRemove()
const newList = cbManipulateList(clone(oldList))
if (!_.isEqual(oldList, newList)) {
if (lockCompromisedError) {
// The lock was compromised. Try again:
continue
}
await fsWriteFile(this.deferRemovePackagesPath, JSON.stringify(newList))
}

// Release the lock:
if (!lockCompromisedError) await releaseLock()
// Done, exit the function:
return
} catch (e) {
if (e instanceof Error && (e as any).code === 'ERELEASED') {
// Lock was already released. Something must have gone wrong (eg. someone deleted a folder),
// Log and try again:
this.worker.logger.warn(`updatePackagesToRemove: Lock was already released`)
continue
} else {
// Release the lock:
if (!lockCompromisedError) await releaseLock()
throw e
}
}
}
// At this point, the lock failed
this.worker.logger.error(
`updatePackagesToRemove: Failed to lock file "${this.deferRemovePackagesPath}" after ${LOCK_ATTEMPTS_COUNT} attempts`
)
if (lockCompromisedError) {
this.worker.logger.error(`updatePackagesToRemove: lockCompromisedError: ${lockCompromisedError}`)
}
}
}

Expand All @@ -448,3 +507,9 @@ enum StatusCategory {
WATCHER = 'watcher',
FILE = 'file_',
}
function clone<T>(o: T): T {
return JSON.parse(JSON.stringify(o))
}
async function sleep(duration: number): Promise<void> {
return new Promise((r) => setTimeout(r, duration))
}
21 changes: 21 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,13 @@
resolved "https://registry.yarnpkg.com/@types/parse-json/-/parse-json-4.0.0.tgz#2f8bb441434d163b35fb8ffdccd7138927ffb8c0"
integrity sha512-//oorEZjL6sbPcKUaCdIGlIUeH26mgzimjBB77G6XRgnDl/L5wOnpyBGRe/Mmf5CVW3PwEBE1NjiMZ/ssFh4wA==

"@types/proper-lockfile@^4.1.4":
version "4.1.4"
resolved "https://registry.yarnpkg.com/@types/proper-lockfile/-/proper-lockfile-4.1.4.tgz#cd9fab92bdb04730c1ada542c356f03620f84008"
integrity sha512-uo2ABllncSqg9F1D4nugVl9v93RmjxF6LJzQLMLDdPaXCUIDPeOJ21Gbqi43xNKzBi/WQ0Q0dICqufzQbMjipQ==
dependencies:
"@types/retry" "*"

"@types/qs@*":
version "6.9.7"
resolved "https://registry.yarnpkg.com/@types/qs/-/qs-6.9.7.tgz#63bb7d067db107cc1e457c303bc25d511febf6cb"
Expand All @@ -1756,6 +1763,11 @@
dependencies:
"@types/node" "*"

"@types/retry@*":
version "0.12.5"
resolved "https://registry.yarnpkg.com/@types/retry/-/retry-0.12.5.tgz#f090ff4bd8d2e5b940ff270ab39fd5ca1834a07e"
integrity sha512-3xSjTp3v03X/lSQLkczaN9UIEwJMoMCA1+Nb5HfbJEQWogdeQIyVtTvxPXDQjZ5zws8rFQfVfRdz03ARihPJgw==

"@types/semver@^7.3.12":
version "7.3.13"
resolved "https://registry.yarnpkg.com/@types/semver/-/semver-7.3.13.tgz#da4bfd73f49bd541d28920ab0e2bf0ee80f71c91"
Expand Down Expand Up @@ -8286,6 +8298,15 @@ promzard@^0.3.0:
dependencies:
read "1"

proper-lockfile@^4.1.2:
version "4.1.2"
resolved "https://registry.yarnpkg.com/proper-lockfile/-/proper-lockfile-4.1.2.tgz#c8b9de2af6b2f1601067f98e01ac66baa223141f"
integrity sha512-TjNPblN4BwAWMXU8s9AEz4JmQxnD1NNL7bNOY/AKUzyamc379FWASUhc/K1pL2noVb+XmZKLL68cjzLsiOAMaA==
dependencies:
graceful-fs "^4.2.4"
retry "^0.12.0"
signal-exit "^3.0.2"

proto-list@~1.2.1:
version "1.2.4"
resolved "https://registry.yarnpkg.com/proto-list/-/proto-list-1.2.4.tgz#212d5bfe1318306a420f6402b8e26ff39647a849"
Expand Down

0 comments on commit 592de67

Please sign in to comment.