Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Colossus: Rework sync and cleanup #5194

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions storage-node/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
### 4.4.0

- **Optimizations:** The way data objects / data object ids are queried and processed during sync and cleanup has been optimized:
- `DataObjectDetailsLoader` and `DataObjectIdsLoader` were implemented. They allow loading data objects / data object ids in batches using a connection query and avoid fetching redundant data from the GraphQL server.
- Sync and cleanup services now process tasks in batches of `10_000` to avoid overflowing the memory.
- Synchronous operations like `sort` or `filter` on larger arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`).
- A safety mechanism was added to avoid removing "deleted" objects for which a related `DataObjectDeleted` event cannot be found in the storage squid.
- Improved logging during cleanup.

### 4.3.0

- Adds `archive` mode / command, which allows downloading, compressing and uploading all data objects to an external S3 bucket that can be used as a backup.
Expand Down
552 changes: 260 additions & 292 deletions storage-node/README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion storage-node/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "storage-node",
"description": "Joystream storage subsystem.",
"version": "4.3.0",
"version": "4.4.0",
"author": "Joystream contributors",
"bin": {
"storage-node": "./bin/run"
Expand Down
71 changes: 40 additions & 31 deletions storage-node/src/services/archive/ArchiveService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
OBJECTS_TRACKING_FILENAME,
} from './tracking'
import { QueryNodeApi } from '../queryNode/api'
import { getStorageObligationsFromRuntime } from '../sync/storageObligations'
import { DataObjectDetailsLoader, getStorageObligationsFromRuntime } from '../sync/storageObligations'
import { getDownloadTasks } from '../sync/synchronizer'
import sleep from 'sleep-promise'
import { Logger } from 'winston'
Expand Down Expand Up @@ -369,40 +369,49 @@ export class ArchiveService {
public async performSync(): Promise<void> {
const model = await getStorageObligationsFromRuntime(this.queryNodeApi)

const assignedObjects = model.dataObjects
const added = assignedObjects.filter((obj) => !this.objectTrackingService.isTracked(obj.id))
added.sort((a, b) => parseInt(b.id) - parseInt(a.id))
const assignedObjectsIds = await model.createAssignedObjectsIdsLoader(true).getAll()
const unsyncedIds = assignedObjectsIds
.filter((id) => !this.objectTrackingService.isTracked(id))
.map((id) => parseInt(id))
.sort((a, b) => a - b)

this.logger.info(`Sync - new objects: ${added.length}`)
this.logger.info(`Sync - new objects: ${unsyncedIds.length}`)

// Add new download tasks while the upload dir size limit allows
while (added.length) {
const uploadDirectorySize = await this.getUploadDirSize()
while (true) {
const object = added.pop()
if (!object) {
break
}
if (object.size + uploadDirectorySize + this.syncQueueObjectsSize > this.uploadDirSizeLimit) {
this.logger.debug(
`Waiting for some disk space to free ` +
`(upload_dir: ${uploadDirectorySize} / ${this.uploadDirSizeLimit}, ` +
`sync_q=${this.syncQueueObjectsSize}, obj_size=${object.size})... `
// Sync objects in batches of 10_000
for (const unsyncedIdsBatch of _.chunk(unsyncedIds, 10_000)) {
const objectsBatchLoader = new DataObjectDetailsLoader(this.queryNodeApi, {
by: 'ids',
ids: unsyncedIdsBatch.map((id) => id.toString()),
})
const objectsBatch = await objectsBatchLoader.getAll()
// Add new download tasks while the upload dir size limit allows
while (objectsBatch.length) {
const uploadDirectorySize = await this.getUploadDirSize()
while (true) {
const object = objectsBatch.pop()
if (!object) {
break
}
if (object.size + uploadDirectorySize + this.syncQueueObjectsSize > this.uploadDirSizeLimit) {
this.logger.debug(
`Waiting for some disk space to free ` +
`(upload_dir: ${uploadDirectorySize} / ${this.uploadDirSizeLimit}, ` +
`sync_q=${this.syncQueueObjectsSize}, obj_size=${object.size})... `
)
objectsBatch.push(object)
await sleep(60_000)
break
}
const [downloadTask] = await getDownloadTasks(
model,
[object],
this.uploadQueueDir,
this.tmpDownloadDir,
this.syncWorkersTimeout,
this.hostId
)
added.push(object)
await sleep(60_000)
break
await this.addDownloadTask(downloadTask, object.size)
}
const [downloadTask] = await getDownloadTasks(
model,
[],
[object],
this.uploadQueueDir,
this.tmpDownloadDir,
this.syncWorkersTimeout,
this.hostId
)
await this.addDownloadTask(downloadTask, object.size)
}
}
}
Expand Down
129 changes: 86 additions & 43 deletions storage-node/src/services/queryNode/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@ import fetch from 'cross-fetch'
import stringify from 'fast-safe-stringify'
import logger from '../logger'
import {
DataObjectByBagIdsDetailsFragment,
DataObjectDetailsFragment,
DataObjectIdsByBagId,
DataObjectIdsByBagIdQuery,
DataObjectIdsByBagIdQueryVariables,
DataObjectsByBagsConnection,
DataObjectsByBagsConnectionQuery,
DataObjectsByBagsConnectionQueryVariables,
DataObjectsByIdsConnection,
DataObjectsByIdsConnectionQuery,
DataObjectsByIdsConnectionQueryVariables,
DataObjectsWithBagDetailsByIds,
DataObjectsWithBagDetailsByIdsQuery,
DataObjectsWithBagDetailsByIdsQueryVariables,
DataObjectWithBagDetailsFragment,
GetAllStorageBagDetails,
GetAllStorageBagDetailsQuery,
GetAllStorageBagDetailsQueryVariables,
GetDataObjects,
GetDataObjectsByBagIds,
GetDataObjectsByBagIdsQuery,
GetDataObjectsByBagIdsQueryVariables,
GetDataObjectsDeletedEvents,
GetDataObjectsDeletedEventsQuery,
GetDataObjectsDeletedEventsQueryVariables,
GetDataObjectsQuery,
GetDataObjectsQueryVariables,
GetSquidVersion,
GetSquidVersionQuery,
GetSquidVersionQueryVariables,
Expand All @@ -41,7 +47,7 @@ import {
StorageBucketDetailsFragment,
StorageBucketIdsFragment,
} from './generated/queries'
import { Maybe, StorageBagWhereInput } from './generated/schema'
import { Maybe } from './generated/schema'

/**
* Defines query paging limits.
Expand All @@ -53,7 +59,7 @@ type PaginationQueryVariables = {
lastCursor?: Maybe<string>
}

type PaginationQueryResult<T = unknown> = {
export type PaginationQueryResult<T = unknown> = {
edges: { node: T }[]
pageInfo: {
hasNextPage: boolean
Expand Down Expand Up @@ -249,50 +255,87 @@ export class QueryNodeApi {
}

/**
* Returns data objects info by pages for the given bags.
* Gets a page of data objects belonging to specified bags.
*
* @param bagIds - query filter: bag IDs
*/
public async getDataObjectsByBagIds(bagIds: string[]): Promise<Array<DataObjectByBagIdsDetailsFragment>> {
const allBagIds = [...bagIds] // Copy to avoid modifying the original array
let fullResult: DataObjectByBagIdsDetailsFragment[] = []
while (allBagIds.length) {
const bagIdsBatch = allBagIds.splice(0, 1000)
const input: StorageBagWhereInput = { id_in: bagIdsBatch }
fullResult = [
...fullResult,
...(await this.multipleEntitiesQuery<GetDataObjectsByBagIdsQuery, GetDataObjectsByBagIdsQueryVariables>(
GetDataObjectsByBagIds,
{ bagIds: input },
'storageDataObjects'
)),
]
}
public async getDataObjectsByBagsPage<IncludeDetails extends boolean>(
bagIds: string[],
limit: number,
after: string | undefined,
includeDetails: IncludeDetails,
isAccepted?: boolean
): Promise<
IncludeDetails extends true
? PaginationQueryResult<DataObjectDetailsFragment> | null
: PaginationQueryResult<{ id: string }> | null
> {
return this.uniqueEntityQuery<DataObjectsByBagsConnectionQuery, DataObjectsByBagsConnectionQueryVariables>(
DataObjectsByBagsConnection,
{
bagIds: [...bagIds],
isAccepted,
limit,
after,
includeDetails: includeDetails,
},
'storageDataObjectsConnection'
)
}

return fullResult
/**
* Gets a page of data objects by the given list of dataObject IDs.
*
* @param ids - query filter: data object ids
*/
public async getDataObjectsByIdsPage<IncludeDetails extends boolean>(
ids: string[],
limit: number,
after: string | undefined,
includeDetails: IncludeDetails,
isAccepted?: boolean
): Promise<
IncludeDetails extends true
? PaginationQueryResult<DataObjectDetailsFragment> | null
: PaginationQueryResult<{ id: string }> | null
> {
return this.uniqueEntityQuery<DataObjectsByIdsConnectionQuery, DataObjectsByIdsConnectionQueryVariables>(
DataObjectsByIdsConnection,
{
ids: [...ids],
isAccepted,
limit,
after,
includeDetails: includeDetails,
},
'storageDataObjectsConnection'
)
}

/**
* Returns data objects info by pages for the given dataObject IDs.
* Returns a list of data objects by ids, with their corresponding bag details
*
* @param dataObjectIds - query filter: dataObject IDs
* @param ids - query filter: data object ids
*/
public async getDataObjectDetails(dataObjectIds: string[]): Promise<Array<DataObjectDetailsFragment>> {
const allDataObjectIds = [...dataObjectIds] // Copy to avoid modifying the original array
let fullResult: DataObjectDetailsFragment[] = []
while (allDataObjectIds.length) {
const dataObjectIdsBatch = allDataObjectIds.splice(0, 1000)
fullResult = [
...fullResult,
...(await this.multipleEntitiesQuery<GetDataObjectsQuery, GetDataObjectsQueryVariables>(
GetDataObjects,
{ dataObjectIds: dataObjectIdsBatch },
'storageDataObjects'
)),
]
}
public async getDataObjectsWithBagDetails(ids: string[]): Promise<DataObjectWithBagDetailsFragment[]> {
return this.multipleEntitiesQuery<
DataObjectsWithBagDetailsByIdsQuery,
DataObjectsWithBagDetailsByIdsQueryVariables
>(DataObjectsWithBagDetailsByIds, { ids: [...ids] }, 'storageDataObjects')
}

return fullResult
/**
* Returns a list of data object ids that belong to a given bag.
*
* @param bagId - query filter: bag ID
*/
public async getDataObjectIdsByBagId(bagId: string): Promise<string[]> {
const result = await this.multipleEntitiesQuery<DataObjectIdsByBagIdQuery, DataObjectIdsByBagIdQueryVariables>(
DataObjectIdsByBagId,
{ bagId },
'storageDataObjects'
)
return result.map((o) => o.id)
}

/**
Expand Down
76 changes: 65 additions & 11 deletions storage-node/src/services/queryNode/queries/queries.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ query getAllStorageBagDetails {
}
}

fragment DataObjectByBagIdsDetails on StorageDataObject {
query dataObjectIdsByBagId($bagId: String) {
storageDataObjects(where: { storageBag: { id_eq: $bagId } }) {
id
}
}

fragment DataObjectDetails on StorageDataObject {
id
size
ipfsHash
Expand All @@ -68,13 +74,7 @@ fragment DataObjectByBagIdsDetails on StorageDataObject {
}
}

query getDataObjectsByBagIds($bagIds: StorageBagWhereInput) {
storageDataObjects(where: { storageBag: $bagIds, isAccepted_eq: true }) {
...DataObjectByBagIdsDetails
}
}

fragment DataObjectDetails on StorageDataObject {
fragment DataObjectWithBagDetails on StorageDataObject {
id
isAccepted
ipfsHash
Expand All @@ -83,9 +83,63 @@ fragment DataObjectDetails on StorageDataObject {
}
}

query getDataObjects($dataObjectIds: [String!]) {
storageDataObjects(where: { id_in: $dataObjectIds }) {
...DataObjectDetails
query dataObjectsByBagsConnection(
$bagIds: [String!]
$limit: Int
$after: String
$includeDetails: Boolean!
$isAccepted: Boolean
) {
storageDataObjectsConnection(
where: { storageBag: { id_in: $bagIds }, isAccepted_eq: $isAccepted }
first: $limit
after: $after
orderBy: id_ASC
) {
edges {
node {
id
...DataObjectDetails @include(if: $includeDetails)
}
}
pageInfo {
startCursor
endCursor
hasNextPage
}
}
}

query dataObjectsByIdsConnection(
$ids: [String!]
$limit: Int
$after: String
$includeDetails: Boolean!
$isAccepted: Boolean
) {
storageDataObjectsConnection(
where: { id_in: $ids, isAccepted_eq: $isAccepted }
first: $limit
after: $after
orderBy: id_ASC
) {
edges {
node {
id
...DataObjectDetails @include(if: $includeDetails)
}
}
pageInfo {
startCursor
endCursor
hasNextPage
}
}
}

query dataObjectsWithBagDetailsByIds($ids: [String!]) {
storageDataObjects(where: { id_in: $ids }) {
...DataObjectWithBagDetails
}
}

Expand Down
2 changes: 1 addition & 1 deletion storage-node/src/services/sync/acceptPendingObjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export class AcceptPendingObjectsService {
}

private async processPendingObjects(pendingIds: string[]): Promise<PendingObjectDetails> {
const pendingDataObjects = await this.qnApi.getDataObjectDetails(pendingIds)
const pendingDataObjects = await this.qnApi.getDataObjectsWithBagDetails(pendingIds)

// objects not found in the query node
const maybeDeletedObjectIds = pendingIds.filter(
Expand Down
Loading
Loading