Skip to content

Commit

Permalink
Merge pull request #5033 from mnaamani/colossus-3.10.2
Browse files Browse the repository at this point in the history
Colossus 3.10.2
  • Loading branch information
mnaamani authored Jan 11, 2024
2 parents 7701e08 + 6a04fd0 commit 62ea182
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 21 deletions.
5 changes: 5 additions & 0 deletions storage-node/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
### 3.10.2
- Fix processing large arrays causing high cpu during sync and cleanup runs [#5033](https://github.com/Joystream/joystream/pull/5033)

- Fix task runner to avoid ending prematurely on individual task failure [#5033](https://github.com/Joystream/joystream/pull/5033)

### 3.10.1

- Bug fix: call stack size exceeded error - [#5021](https://github.com/Joystream/joystream/pull/5021)
Expand Down
2 changes: 1 addition & 1 deletion storage-node/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "storage-node",
"description": "Joystream storage subsystem.",
"version": "3.10.1",
"version": "3.10.2",
"author": "Joystream contributors",
"bin": {
"storage-node": "./bin/run"
Expand Down
11 changes: 4 additions & 7 deletions storage-node/src/commands/util/fetch-bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import { QueryNodeApi } from '../..//services/queryNode/api'
import logger from '../../services/logger'
import stringify from 'fast-safe-stringify'
import path from 'path'

import { loadDataObjectIdCache } from '../../services/caching/localDataObjects'
/**
* CLI command:
* Fetch all data objects from a bucket into local store.
* Fetch data objects assigned to assigned bucket from remote node(s) into local store.
*
* @remarks
* Should not be executed while server is running.
Expand All @@ -18,11 +18,6 @@ export default class FetchBucket extends Command {

static flags = {
help: flags.help({ char: 'h' }),
workerId: flags.integer({
char: 'w',
required: true,
description: 'Storage node operator worker ID.',
}),
bucketId: flags.integer({
char: 'b',
required: true,
Expand Down Expand Up @@ -66,6 +61,8 @@ export default class FetchBucket extends Command {
const { flags } = this.parse(FetchBucket)
const bucketId = flags.bucketId.toString()
const qnApi = new QueryNodeApi(flags.queryNodeEndpoint)
await loadDataObjectIdCache(flags.uploads)

logger.info('Fetching bucket...')

try {
Expand Down
9 changes: 9 additions & 0 deletions storage-node/src/services/caching/localDataObjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ export function getDataObjectIdFromCache(
}
}

/**
* Checks if data object is present.
* @param dataObjectId
* @returns boolean
*/
export function isDataObjectIdInCache(dataObjectId: string): boolean {
return idCache.has(dataObjectId)
}

/**
* Returns file names from the local directory, ignoring subfolders.
*
Expand Down
9 changes: 4 additions & 5 deletions storage-node/src/services/sync/cleanupService.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import _ from 'lodash'
import superagent from 'superagent'
import urljoin from 'url-join'
import { getDataObjectIDs } from '../../services/caching/localDataObjects'
Expand All @@ -8,6 +7,7 @@ import { DataObjectDetailsFragment } from '../queryNode/generated/queries'
import { DataObligations, getDataObjectsByIDs, getStorageObligationsFromRuntime } from './storageObligations'
import { DeleteLocalFileTask } from './tasks'
import { TaskProcessorSpawner, WorkingStack } from './workingProcess'
import _ from 'lodash'

/**
* The maximum allowed threshold by which the QN processor can lag behind
Expand Down Expand Up @@ -69,10 +69,9 @@ export async function performCleanup(
)
}

const [model, storedObjectsIds] = await Promise.all([
getStorageObligationsFromRuntime(qnApi, buckets),
getDataObjectIDs(),
])
const model = await getStorageObligationsFromRuntime(qnApi, buckets)
const storedObjectsIds = getDataObjectIDs()

const assignedObjectsIds = model.dataObjects.map((obj) => obj.id)
const removedIds = _.difference(storedObjectsIds, assignedObjectsIds)
const removedObjects = await getDataObjectsByIDs(qnApi, removedIds)
Expand Down
14 changes: 8 additions & 6 deletions storage-node/src/services/sync/synchronizer.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import _ from 'lodash'
import { getDataObjectIDs } from '../../services/caching/localDataObjects'
import { getDataObjectIDs, isDataObjectIdInCache } from '../../services/caching/localDataObjects'
import logger from '../../services/logger'
import { QueryNodeApi } from '../queryNode/api'
import { DataObligations, getStorageObligationsFromRuntime } from './storageObligations'
import { DownloadFileTask } from './tasks'
import { TaskProcessorSpawner, WorkingStack } from './workingProcess'
import _ from 'lodash'

/**
* Temporary directory name for data uploading.
Expand Down Expand Up @@ -45,12 +45,14 @@ export async function performSync(
selectedOperatorUrl?: string
): Promise<void> {
logger.info('Started syncing...')
const [model, files] = await Promise.all([getStorageObligationsFromRuntime(qnApi, buckets), getDataObjectIDs()])
const model = await getStorageObligationsFromRuntime(qnApi, buckets)
const storedObjectIds = getDataObjectIDs()

const required = model.dataObjects
const assignedObjects = model.dataObjects
const assignedObjectIds = assignedObjects.map((obj) => obj.id)

const added = _.differenceWith(required, files, (required, file) => required.id === file)
const removed = _.differenceWith(files, required, (file, required) => file === required.id)
const added = assignedObjects.filter((obj) => !isDataObjectIdInCache(obj.id))
const removed = _.difference(storedObjectIds, assignedObjectIds)

logger.debug(`Sync - new objects: ${added.length}`)
logger.debug(`Sync - obsolete objects: ${removed.length}`)
Expand Down
9 changes: 7 additions & 2 deletions storage-node/src/services/sync/workingProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ export class TaskProcessor {

if (task !== null) {
logger.debug(task.description())
await task.execute()
try {
await task.execute()
} catch (err) {
// Catch the task failure to avoid the current process worker failing
logger.warn(`task failed: ${err.message}`)
}
} else {
if (this.exitOnCompletion) {
return
Expand Down Expand Up @@ -126,6 +131,6 @@ export class TaskProcessorSpawner {
processes.push(processor.process())
}

await Promise.all(processes)
await Promise.allSettled(processes)
}
}

0 comments on commit 62ea182

Please sign in to comment.