diff --git a/storage-node/CHANGELOG.md b/storage-node/CHANGELOG.md index da23df9c92..193e9b6a88 100644 --- a/storage-node/CHANGELOG.md +++ b/storage-node/CHANGELOG.md @@ -1,3 +1,13 @@ +### 4.4.0 + +- **Optimizations:** The way data objects / data object ids are queried and processed during sync and cleanup has been optimized: + - Sync and cleanup services now process tasks in batches of configurable size (`--syncBatchSize`, `--cleanupBatchSize`) to avoid memory exhaustion. + - Synchronous operations like `sort` or `filter` on large arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`). + - Enforced a limit of max. `10,000` results per single GraphQL query and max. `1,000` input arguments per query. + - Added the `--cleanupWorkersNumber` flag to limit the number of concurrent async requests during cleanup. +- Added a safety mechanism to avoid removing "deleted" objects for which a related `DataObjectDeleted` event cannot be found in the storage squid. +- Improved logging during sync and cleanup. + ### 4.3.0 - Adds `archive` mode / command, which allows downloading, compressing and uploading all data objects to an external S3 bucket that can be used as a backup. diff --git a/storage-node/README.md b/storage-node/README.md index 52fa902d43..e7228f6c62 100644 --- a/storage-node/README.md +++ b/storage-node/README.md @@ -170,6 +170,7 @@ There is also an option to run Colossus as [Docker container](../colossus.Docker * [`storage-node util:cleanup`](#storage-node-utilcleanup) * [`storage-node util:fetch-bucket`](#storage-node-utilfetch-bucket) * [`storage-node util:multihash`](#storage-node-utilmultihash) +* [`storage-node util:search-archives`](#storage-node-utilsearch-archives) * [`storage-node util:verify-bag-id`](#storage-node-utilverify-bag-id) ## `storage-node archive` @@ -181,11 +182,6 @@ USAGE $ storage-node archive OPTIONS - -b, --buckets=buckets - [default: 1] Comma separated list of bucket IDs to sync. Buckets that are not assigned to - worker are ignored. - If not specified all buckets will be synced. - -e, --elasticSearchEndpoint=elasticSearchEndpoint Elasticsearch endpoint (e.g.: http://some.com:8081). Log level could be set using the ELASTIC_LOG_LEVEL environment variable. Supported values: warn, error, debug, info. Default:debug -h, --help show CLI help -k, --keyFile=keyFile Path to key file to add to the keyring. -l, --logFilePath=logFilePath Absolute path to the rolling log files. -m, --dev Use development mode -n, --logMaxFileNumber=logMaxFileNumber [default: 7] Maximum rolling log files number. -p, --password=password - Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. If - not specified a single password can be set in ACCOUNT_PWD environment variable. + Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. If not specified a single + password can be set in ACCOUNT_PWD environment variable. -q, --storageSquidEndpoint=storageSquidEndpoint - (required) [default: http://localhost:4352/graphql] Storage Squid graphql server endpoint - (e.g.: http://some.com:4352/graphql) + (required) [default: http://localhost:4352/graphql] Storage Squid graphql server endpoint (e.g.: + http://some.com:4352/graphql) -r, --syncWorkersNumber=syncWorkersNumber [default: 8] Sync workers number (max async operations in progress). -t, --syncWorkersTimeout=syncWorkersTimeout [default: 30] Asset downloading timeout for the syncronization (in minutes). -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. -w, --worker=worker (required) Storage provider worker ID -x, --logMaxFileSize=logMaxFileSize [default: 50000000] Maximum rolling log files size in bytes. -y, --accountUri=accountUri - Account URI (optional). If not specified a single key can be set in ACCOUNT_URI environment - variable. + Account URI (optional). If not specified a single key can be set in ACCOUNT_URI environment variable.
-z, --logFileChangeFrequency=(yearly|monthly|daily|hourly|none) [default: daily] Log files update frequency. --archiveFileSizeLimitMB=archiveFileSizeLimitMB - [default: 1000] Try to avoid creating archive files larger than this size limit (in MB) - unless necessary + [default: 1000] Try to avoid creating archive files larger than this size limit (in MB) unless unavoidable. --archiveTrackfileBackupFreqMinutes=archiveTrackfileBackupFreqMinutes - [default: 60] Determines how frequently the archive tracking file (containing information - about .7z files content) should be uploaded to S3 in case a change is detected. + [default: 60] Specifies how frequently the archive tracking file (containing information about archive files + content) should be uploaded to S3 (in case it's changed). --awsS3BucketName=awsS3BucketName (required) Name of the AWS S3 bucket where the files will be stored. @@ -253,52 +247,65 @@ OPTIONS --awsS3BucketRegion=awsS3BucketRegion (required) AWS region of the AWS S3 bucket where the files will be stored. + --awsStorageClass=(DEEP_ARCHIVE|EXPRESS_ONEZONE|GLACIER|GLACIER_IR|INTELLIGENT_TIERING|ONEZONE_IA|OUTPOSTS|REDUCED_RED + UNDANCY|SNOW|STANDARD|STANDARD_IA) + (required) [default: DEEP_ARCHIVE] AWS S3 storage class to use when uploading the archives to S3. + + --compressionAlgorithm=(7zip|zstd|none) + (required) [default: zstd] Compression algorithm to use for archive files + + --compressionLevel=(low|medium|high) + (required) [default: medium] Compression level to use for archive files (lower is faster, but provides lower storage + savings) + + --compressionThreads=compressionThreads + (required) [default: 1] Number of threads to use for compression. Note that {uploadWorkersNumber} upload tasks may + be running at once and each of them can spawn a separate compression task which uses {compressionThreads} threads! + --elasticSearchIndexPrefix=elasticSearchIndexPrefix - [default: logs-colossus] Elasticsearch index prefix. Node ID will be appended to the prefix. - Default: logs-colossus. Can be passed through ELASTIC_INDEX_PREFIX environment variable. + [default: logs-colossus] Elasticsearch index prefix. Node ID will be appended to the prefix. Default: logs-colossus. + Can be passed through ELASTIC_INDEX_PREFIX environment variable. --elasticSearchPassword=elasticSearchPassword - Elasticsearch password for basic authentication. Can be passed through ELASTIC_PASSWORD - environment variable. + Elasticsearch password for basic authentication. Can be passed through ELASTIC_PASSWORD environment variable. --elasticSearchUser=elasticSearchUser - Elasticsearch user for basic authentication. Can be passed through ELASTIC_USER environment - variable. + Elasticsearch user for basic authentication. Can be passed through ELASTIC_USER environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. --localAgeTriggerThresholdMinutes=localAgeTriggerThresholdMinutes - [default: 1440] Compress and upload local data objects to S3 if the oldest of them was - downloaded more than X minutes ago + [default: 1440] Compress and upload all local data objects to S3 if the oldest of them was downloaded more than X + minutes ago --localCountTriggerThreshold=localCountTriggerThreshold - Compress and upload local data objects to S3 if the number of them reaches this threshold. + Compress and upload all local data objects to S3 if the number of them reaches this threshold.
--localSizeTriggerThresholdMB=localSizeTriggerThresholdMB - [default: 10000] Compress and upload local data objects to S3 if the combined size of them - reaches this threshold (in MB) + [default: 10000] Compress and upload all local data objects to S3 if the combined size of them reaches this + threshold (in MB) + + --statsLoggingInterval=statsLoggingInterval + (required) [default: 60] How often the upload/download/compression statistics summary will be logged (in minutes). --tmpDownloadDir=tmpDownloadDir - (required) Directory to store tempory files during sync (absolute path). + (required) Directory to store temporary data (downloads in progress) during sync (absolute path). --uploadQueueDir=uploadQueueDir - (required) Directory to store fully downloaded data objects before compressing them and - uploading to S3 (absolute path). + (required) Directory to store fully downloaded data objects before compressing them and uploading to S3 (absolute + path). --uploadQueueDirSizeLimitMB=uploadQueueDirSizeLimitMB - (required) [default: 20000] Limits the total size of files stored in upload queue directory - (in MB). Download of the new objects will be limitted in order to prevent exceeding this - limit. To leave a safe margin of error, it should be set to ~50% of available disk space. - - --uploadRetryInterval=uploadRetryInterval - [default: 3] Interval before retrying failed upload (in minutes) + (required) [default: 20000] Limits the total size of files stored in upload queue directory (in MB). Download of the + new objects may be slowed down in order to try to prevent exceeding this limit. WARNING: To leave a safe margin of + error (for compression etc.), it should be set to ~50% of available disk space. --uploadWorkersNumber=uploadWorkersNumber [default: 4] Upload workers number (max async operations in progress). ``` -_See code: [src/commands/archive.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/archive.ts)_ +_See code: [src/commands/archive.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/archive.ts)_ ## `storage-node help [COMMAND]` @@ -331,20 +338,18 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. 
``` -_See code: [src/commands/leader/cancel-invite.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/cancel-invite.ts)_ +_See code: [src/commands/leader/cancel-invite.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/cancel-invite.ts)_ ## `storage-node leader:create-bucket` @@ -362,22 +367,20 @@ OPTIONS -m, --dev Use development mode -n, --number=number Storage bucket max total objects number - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. -s, --size=size Storage bucket max total objects size - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/create-bucket.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/create-bucket.ts)_ +_See code: [src/commands/leader/create-bucket.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/create-bucket.ts)_ ## `storage-node leader:delete-bucket` @@ -393,20 +396,18 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/delete-bucket.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/delete-bucket.ts)_ +_See code: [src/commands/leader/delete-bucket.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/delete-bucket.ts)_ ## `storage-node leader:invite-operator` @@ -422,22 +423,20 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. 
+ -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. -w, --operatorId=operatorId (required) Storage bucket operator ID (storage group worker ID) - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/invite-operator.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/invite-operator.ts)_ +_See code: [src/commands/leader/invite-operator.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/invite-operator.ts)_ ## `storage-node leader:remove-operator` @@ -453,20 +452,18 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/remove-operator.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/remove-operator.ts)_ +_See code: [src/commands/leader/remove-operator.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/remove-operator.ts)_ ## `storage-node leader:set-bucket-limits` @@ -483,22 +480,20 @@ OPTIONS -m, --dev Use development mode -o, --objects=objects (required) New 'voucher object number limit' value - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. -s, --size=size (required) New 'voucher object size limit' value - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). 
If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/set-bucket-limits.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/set-bucket-limits.ts)_ +_See code: [src/commands/leader/set-bucket-limits.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/set-bucket-limits.ts)_ ## `storage-node leader:set-global-uploading-status` @@ -513,22 +508,20 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. -s, --set=(on|off) (required) Sets global uploading block (on/off). - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/set-global-uploading-status.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/set-global-uploading-status.ts)_ +_See code: [src/commands/leader/set-global-uploading-status.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/set-global-uploading-status.ts)_ ## `storage-node leader:update-bag-limit` @@ -544,20 +537,18 @@ OPTIONS -l, --limit=limit (required) New StorageBucketsPerBagLimit value -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. 
``` -_See code: [src/commands/leader/update-bag-limit.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/update-bag-limit.ts)_ +_See code: [src/commands/leader/update-bag-limit.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-bag-limit.ts)_ ## `storage-node leader:update-bags` @@ -594,8 +585,8 @@ OPTIONS Use development mode -p, --password=password - Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. If - not specified a single password can be set in ACCOUNT_PWD environment variable. + Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. If not specified a single + password can be set in ACCOUNT_PWD environment variable. -r, --remove=remove [default: ] Comma separated list of bucket IDs to remove from all bag/s @@ -607,14 +598,13 @@ OPTIONS [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. -y, --accountUri=accountUri - Account URI (optional). If not specified a single key can be set in ACCOUNT_URI environment - variable. + Account URI (optional). If not specified a single key can be set in ACCOUNT_URI environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-bags.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/update-bags.ts)_ +_See code: [src/commands/leader/update-bags.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-bags.ts)_ ## `storage-node leader:update-blacklist` @@ -630,22 +620,20 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. -r, --remove=remove [default: ] Content ID to remove - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-blacklist.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/update-blacklist.ts)_ +_See code: [src/commands/leader/update-blacklist.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-blacklist.ts)_ ## `storage-node leader:update-bucket-status` @@ -661,23 +649,20 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. 
+ If not specified a single password can be set in ACCOUNT_PWD environment variable. - -s, --set=(on|off) (required) Sets 'accepting new bags' parameter for the bucket - (on/off). + -s, --set=(on|off) (required) Sets 'accepting new bags' parameter for the bucket (on/off). - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-bucket-status.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/update-bucket-status.ts)_ +_See code: [src/commands/leader/update-bucket-status.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-bucket-status.ts)_ ## `storage-node leader:update-data-fee` @@ -693,20 +678,18 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-data-fee.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/update-data-fee.ts)_ +_See code: [src/commands/leader/update-data-fee.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-data-fee.ts)_ ## `storage-node leader:update-data-object-bloat-bond` @@ -721,22 +704,20 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. -v, --value=value (required) New data object bloat bond value - -y, --accountUri=accountUri Account URI (optional). 
If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-data-object-bloat-bond.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/update-data-object-bloat-bond.ts)_ +_See code: [src/commands/leader/update-data-object-bloat-bond.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-data-object-bloat-bond.ts)_ ## `storage-node leader:update-dynamic-bag-policy` @@ -752,23 +733,21 @@ OPTIONS -m, --dev Use development mode -n, --number=number (required) New storage buckets number - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, - to try against all files. If not specified a single password - can be set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all + files. If not specified a single password can be set in ACCOUNT_PWD environment + variable. -t, --bagType=(Channel|Member) (required) Dynamic bag type (Channel, Member). - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be - set in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. - --keyStore=keyStore Path to a folder with multiple key files to load into - keystore. + --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-dynamic-bag-policy.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/update-dynamic-bag-policy.ts)_ +_See code: [src/commands/leader/update-dynamic-bag-policy.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-dynamic-bag-policy.ts)_ ## `storage-node leader:update-voucher-limits` @@ -784,22 +763,20 @@ OPTIONS -m, --dev Use development mode -o, --objects=objects (required) New 'max voucher object number limit' value - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. -s, --size=size (required) New 'max voucher object size limit' value - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. 
``` -_See code: [src/commands/leader/update-voucher-limits.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/update-voucher-limits.ts)_ +_See code: [src/commands/leader/update-voucher-limits.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-voucher-limits.ts)_ ## `storage-node operator:accept-invitation` @@ -815,27 +792,24 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords - can be passed, to try against all files. If not - specified a single password can be set in + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try + against all files. If not specified a single password can be set in ACCOUNT_PWD environment variable. -t, --transactorAccountId=transactorAccountId (required) Transactor account ID (public key) - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. - Mandatory in non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev + environment. -w, --workerId=workerId (required) Storage operator worker ID - -y, --accountUri=accountUri Account URI (optional). If not specified a - single key can be set in ACCOUNT_URI - environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in + ACCOUNT_URI environment variable. - --keyStore=keyStore Path to a folder with multiple key files to - load into keystore. + --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/operator/accept-invitation.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/operator/accept-invitation.ts)_ +_See code: [src/commands/operator/accept-invitation.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/operator/accept-invitation.ts)_ ## `storage-node operator:set-metadata` @@ -853,22 +827,20 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. -w, --workerId=workerId (required) Storage operator worker ID - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. 
``` -_See code: [src/commands/operator/set-metadata.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/operator/set-metadata.ts)_ +_See code: [src/commands/operator/set-metadata.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/operator/set-metadata.ts)_ ## `storage-node server` @@ -879,109 +851,102 @@ USAGE $ storage-node server OPTIONS - -b, --buckets=buckets - [default: ] Comma separated list of bucket IDs to service. Buckets that are not assigned to - worker are ignored. If not specified all buckets will be serviced. + -b, --buckets=buckets [default: ] Comma separated list of bucket IDs to + service. Buckets that are not assigned to worker are + ignored. If not specified all buckets will be + serviced. - -c, --cleanup - Enable cleanup/pruning of no-longer assigned assets. + -c, --cleanup Enable cleanup/pruning of no-longer assigned assets. - -d, --uploads=uploads - (required) Data uploading directory (absolute path). + -d, --uploads=uploads (required) Data uploading directory (absolute path). - -e, --elasticSearchEndpoint=elasticSearchEndpoint - Elasticsearch endpoint (e.g.: http://some.com:8081). - Log level could be set using the ELASTIC_LOG_LEVEL environment variable. - Supported values: warn, error, debug, info. Default:debug + -e, --elasticSearchEndpoint=elasticSearchEndpoint Elasticsearch endpoint (e.g.: http://some.com:8081). + Log level could be set using the ELASTIC_LOG_LEVEL + environment variable. + Supported values: warn, error, debug, info. + Default:debug - -h, --help - show CLI help + -h, --help show CLI help - -i, --cleanupInterval=cleanupInterval - [default: 360] Interval between periodic cleanup actions (in minutes) + -i, --cleanupInterval=cleanupInterval [default: 360] Interval between periodic cleanup + actions (in minutes) - -i, --syncInterval=syncInterval - [default: 20] Interval between synchronizations (in minutes) + -i, --syncInterval=syncInterval [default: 20] Interval between synchronizations (in + minutes) - -k, --keyFile=keyFile - Path to key file to add to the keyring. + -k, --keyFile=keyFile Path to key file to add to the keyring. - -l, --logFilePath=logFilePath - Absolute path to the rolling log files. + -l, --logFilePath=logFilePath Absolute path to the rolling log files. - -m, --dev - Use development mode + -m, --dev Use development mode - -n, --logMaxFileNumber=logMaxFileNumber - [default: 7] Maximum rolling log files number. + -n, --logMaxFileNumber=logMaxFileNumber [default: 7] Maximum rolling log files number. - -o, --port=port - (required) Server port. + -o, --port=port (required) Server port. - -p, --password=password - Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. If - not specified a single password can be set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can + be passed, to try against all files. If not specified + a single password can be set in ACCOUNT_PWD + environment variable. - -q, --storageSquidEndpoint=storageSquidEndpoint - (required) [default: http://localhost:4352/graphql] Storage Squid graphql server endpoint - (e.g.: http://some.com:4352/graphql) + -q, --storageSquidEndpoint=storageSquidEndpoint (required) [default: http://localhost:4352/graphql] + Storage Squid graphql server endpoint (e.g.: + http://some.com:4352/graphql) - -r, --syncWorkersNumber=syncWorkersNumber - [default: 20] Sync workers number (max async operations in progress). 
+ -r, --syncWorkersNumber=syncWorkersNumber [default: 20] Sync workers number (max async + operations in progress). - -s, --sync - Enable data synchronization. + -s, --sync Enable data synchronization. - -t, --syncWorkersTimeout=syncWorkersTimeout - [default: 30] Asset downloading timeout for the syncronization (in minutes). + -t, --syncWorkersTimeout=syncWorkersTimeout [default: 30] Asset downloading timeout for the + syncronization (in minutes). - -u, --apiUrl=apiUrl - [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. + Mandatory in non-dev environment. - -w, --worker=worker - (required) Storage provider worker ID + -w, --worker=worker (required) Storage provider worker ID - -x, --logMaxFileSize=logMaxFileSize - [default: 50000000] Maximum rolling log files size in bytes. + -x, --logMaxFileSize=logMaxFileSize [default: 50000000] Maximum rolling log files size in + bytes. - -y, --accountUri=accountUri - Account URI (optional). If not specified a single key can be set in ACCOUNT_URI environment - variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key + can be set in ACCOUNT_URI environment variable. - -z, --logFileChangeFrequency=(yearly|monthly|daily|hourly|none) - [default: daily] Log files update frequency. + -z, --logFileChangeFrequency=(yearly|monthly|daily|hourly|none) [default: daily] Log files update frequency. - --elasticSearchIndexPrefix=elasticSearchIndexPrefix - Elasticsearch index prefix. Node ID will be appended to the prefix. Default: logs-colossus. - Can be passed through ELASTIC_INDEX_PREFIX environment variable. + --elasticSearchIndexPrefix=elasticSearchIndexPrefix Elasticsearch index prefix. Node ID will be appended + to the prefix. Default: logs-colossus. Can be passed + through ELASTIC_INDEX_PREFIX environment variable. - --elasticSearchPassword=elasticSearchPassword - Elasticsearch password for basic authentication. Can be passed through ELASTIC_PASSWORD - environment variable. + --elasticSearchPassword=elasticSearchPassword Elasticsearch password for basic authentication. Can + be passed through ELASTIC_PASSWORD environment + variable. - --elasticSearchUser=elasticSearchUser - Elasticsearch user for basic authentication. Can be passed through ELASTIC_USER environment - variable. + --elasticSearchUser=elasticSearchUser Elasticsearch user for basic authentication. Can be + passed through ELASTIC_USER environment variable. - --keyStore=keyStore - Path to a folder with multiple key files to load into keystore. + --keyStore=keyStore Path to a folder with multiple key files to load into + keystore. - --maxBatchTxSize=maxBatchTxSize - [default: 20] Maximum number of `accept_pending_data_objects` in a batch transactions. + --maxBatchTxSize=maxBatchTxSize [default: 20] Maximum number of + `accept_pending_data_objects` in a batch + transactions. - --pendingFolder=pendingFolder - Directory to store pending files which are being uploaded (absolute path). - If not specified a subfolder under the uploads directory will be used. + --pendingFolder=pendingFolder Directory to store pending files which are being + uploaded (absolute path). + If not specified a subfolder under the uploads + directory will be used. 
- --syncRetryInterval=syncRetryInterval - [default: 3] Interval before retrying failed synchronization run (in minutes) + --syncRetryInterval=syncRetryInterval [default: 3] Interval before retrying failed + synchronization run (in minutes) - --tempFolder=tempFolder - Directory to store tempory files during sync (absolute path). - If not specified a subfolder under the uploads directory will be used. + --tempFolder=tempFolder Directory to store tempory files during sync + (absolute path). + If not specified a subfolder under the uploads + directory will be used. ``` -_See code: [src/commands/server.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/server.ts)_ +_See code: [src/commands/server.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/server.ts)_ ## `storage-node util:cleanup` @@ -993,42 +958,33 @@ USAGE OPTIONS -b, --bucketId=bucketId (required) The buckerId to sync prune/cleanup - - -d, --uploads=uploads (required) Data uploading directory (absolute - path). - + -d, --uploads=uploads (required) Data uploading directory (absolute path). -h, --help show CLI help - -k, --keyFile=keyFile Path to key file to add to the keyring. - -m, --dev Use development mode - -p, --cleanupWorkersNumber=cleanupWorkersNumber [default: 20] Cleanup/Pruning workers number - (max async operations in progress). + -p, --cleanupWorkersNumber=cleanupWorkersNumber [default: 20] Cleanup/Pruning workers number (max async operations in + progress). - -p, --password=password Password to unlock keyfiles. Multiple - passwords can be passed, to try against all - files. If not specified a single password can - be set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try + against all files. If not specified a single password can be set in + ACCOUNT_PWD environment variable. - -q, --queryNodeEndpoint=queryNodeEndpoint [default: http://localhost:4352/graphql] - Storage Squid graphql server endpoint (e.g.: - http://some.com:4352/graphql) + -q, --queryNodeEndpoint=queryNodeEndpoint [default: http://localhost:4352/graphql] Storage Squid graphql server + endpoint (e.g.: http://some.com:4352/graphql) - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API - URL. Mandatory in non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev + environment. -w, --workerId=workerId (required) Storage node operator worker ID. - -y, --accountUri=accountUri Account URI (optional). If not specified a - single key can be set in ACCOUNT_URI - environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in + ACCOUNT_URI environment variable. - --keyStore=keyStore Path to a folder with multiple key files to - load into keystore. + --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/util/cleanup.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/util/cleanup.ts)_ +_See code: [src/commands/util/cleanup.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/util/cleanup.ts)_ ## `storage-node util:fetch-bucket` @@ -1040,33 +996,28 @@ USAGE OPTIONS -b, --bucketId=bucketId (required) The buckerId to fetch - - -d, --uploads=uploads (required) Data uploading directory - (absolute path). - + -d, --uploads=uploads (required) Data uploading directory (absolute path). 
-h, --help show CLI help - -n, --syncWorkersNumber=syncWorkersNumber [default: 20] Sync workers number (max - async operations in progress). + -n, --syncWorkersNumber=syncWorkersNumber [default: 20] Sync workers number (max async operations in + progress). - -o, --dataSourceOperatorUrl=dataSourceOperatorUrl Storage node url base (e.g.: - http://some.com:3333) to get data from. + -o, --dataSourceOperatorUrl=dataSourceOperatorUrl Storage node url base (e.g.: http://some.com:3333) to get data + from. - -q, --queryNodeEndpoint=queryNodeEndpoint [default: http://localhost:4352/graphql] - Storage Squid graphql server endpoint - (e.g.: http://some.com:4352/graphql) + -q, --queryNodeEndpoint=queryNodeEndpoint [default: http://localhost:4352/graphql] Storage Squid graphql + server endpoint (e.g.: http://some.com:4352/graphql) - -t, --syncWorkersTimeout=syncWorkersTimeout [default: 30] Asset downloading timeout for - the syncronization (in minutes). + -t, --syncWorkersTimeout=syncWorkersTimeout [default: 30] Asset downloading timeout for the syncronization (in + minutes). - --tempFolder=tempFolder Directory to store tempory files during - sync and upload (absolute path). - ,Temporary directory (absolute path). If - not specified a subfolder under the uploads - directory will be used. + --tempFolder=tempFolder Directory to store tempory files during sync and upload (absolute + path). + ,Temporary directory (absolute path). If not specified a subfolder + under the uploads directory will be used. ``` -_See code: [src/commands/util/fetch-bucket.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/util/fetch-bucket.ts)_ +_See code: [src/commands/util/fetch-bucket.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/util/fetch-bucket.ts)_ ## `storage-node util:multihash` @@ -1081,7 +1032,24 @@ OPTIONS -h, --help show CLI help ``` -_See code: [src/commands/util/multihash.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/util/multihash.ts)_ +_See code: [src/commands/util/multihash.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/util/multihash.ts)_ + +## `storage-node util:search-archives` + +Searches for the archive file names given an archive trackfile and a list of data objects of interest. 
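The trackfile is a JSONL file, so the lookup can stream it line by line and match each entry against a `Set` of the requested ids. The sketch below only illustrates the idea behind this command; the `TrackfileEntry` shape (the `name` and `dataObjectIds` fields) and the `searchArchives` helper are assumptions for illustration, not the actual trackfile schema or implementation:

```ts
import * as fs from 'fs'
import * as readline from 'readline'

// Hypothetical shape of a single trackfile entry (one JSON object per line)
type TrackfileEntry = {
  name: string // archive file name
  dataObjectIds: string[] // ids of the data objects packed into this archive
}

// Returns names of archives containing at least one of the given data objects
async function searchArchives(trackfilePath: string, dataObjectIds: string[]): Promise<string[]> {
  const wanted = new Set(dataObjectIds) // Set.has() instead of Array.includes() for fast lookups
  const matches = new Set<string>()
  const lines = readline.createInterface({ input: fs.createReadStream(trackfilePath) })
  for await (const line of lines) {
    if (!line.trim()) continue
    const entry: TrackfileEntry = JSON.parse(line)
    if (entry.dataObjectIds.some((id) => wanted.has(id))) {
      matches.add(entry.name)
    }
  }
  return [...matches]
}
```

A typical invocation of the actual command would look something like `storage-node util:search-archives -f ./archives_trackfile.jsonl -o 123,456 --nameOnly` (the trackfile path and ids are placeholder values).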
+ +``` +USAGE + $ storage-node util:search-archives + +OPTIONS + -f, --archiveTrackfile=archiveTrackfile (required) Path to the archive trackfile (jsonl) + -j, --json Output as JSON + -n, --nameOnly Output only the archive names + -o, --dataObjects=dataObjects (required) List of the data object ids to look for (comma-separated) +``` + +_See code: [src/commands/util/search-archives.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/util/search-archives.ts)_ ## `storage-node util:verify-bag-id` @@ -1109,5 +1077,5 @@ OPTIONS - dynamic:member:4 ``` -_See code: [src/commands/util/verify-bag-id.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/util/verify-bag-id.ts)_ +_See code: [src/commands/util/verify-bag-id.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/util/verify-bag-id.ts)_ diff --git a/storage-node/package.json b/storage-node/package.json index a7e782ad41..fb5127f42d 100644 --- a/storage-node/package.json +++ b/storage-node/package.json @@ -1,7 +1,7 @@ { "name": "storage-node", "description": "Joystream storage subsystem.", - "version": "4.3.0", + "version": "4.4.0", "author": "Joystream contributors", "bin": { "storage-node": "./bin/run" @@ -54,6 +54,7 @@ "multihashes": "^4.0.2", "node-cache": "^5.1.2", "openapi-editor": "^0.3.0", + "p-limit": "^3", "promise-timeout": "^1.3.0", "proper-lockfile": "^4.1.2", "react": "^18.2.0", diff --git a/storage-node/src/commands/server.ts b/storage-node/src/commands/server.ts index 6f61b38fc9..62be3c75cc 100644 --- a/storage-node/src/commands/server.ts +++ b/storage-node/src/commands/server.ts @@ -78,16 +78,29 @@ export default class Server extends ApiCommandBase { description: 'Interval before retrying failed synchronization run (in minutes)', default: 3, }), + syncBatchSize: flags.integer({ + description: 'Maximum number of objects to process in a single batch during synchronization.', + default: 10_000, + }), cleanup: flags.boolean({ char: 'c', description: 'Enable cleanup/pruning of no-longer assigned assets.', default: false, }), + cleanupBatchSize: flags.integer({ + description: 'Maximum number of objects to process in a single batch during cleanup.', + default: 10_000, + }), cleanupInterval: flags.integer({ char: 'i', description: 'Interval between periodic cleanup actions (in minutes)', default: 360, }), + cleanupWorkersNumber: flags.integer({ + required: false, + description: 'Cleanup workers number (max async operations in progress).', + default: 100, + }), storageSquidEndpoint: flags.string({ char: 'q', required: true, @@ -299,6 +312,7 @@ Supported values: warn, error, debug, info. Default:debug`, flags.syncWorkersTimeout, flags.syncInterval, flags.syncRetryInterval, + flags.syncBatchSize, X_HOST_ID ), 0 @@ -319,8 +333,9 @@ Supported values: warn, error, debug, info. 
Default:debug`, api, qnApi, flags.uploads, - flags.syncWorkersNumber, + flags.cleanupWorkersNumber, flags.cleanupInterval, + flags.cleanupBatchSize, X_HOST_ID ), 0 @@ -397,6 +412,7 @@ async function runSyncWithInterval( syncWorkersTimeout: number, syncIntervalMinutes: number, syncRetryIntervalMinutes: number, + syncBatchSize: number, hostId: string ) { const sleepInterval = syncIntervalMinutes * 60 * 1000 @@ -404,7 +420,16 @@ async function runSyncWithInterval( while (true) { try { logger.info(`Resume syncing....`) - await performSync(buckets, syncWorkersNumber, syncWorkersTimeout, qnApi, uploadsDirectory, tempDirectory, hostId) + await performSync( + buckets, + syncWorkersNumber, + syncWorkersTimeout, + qnApi, + uploadsDirectory, + tempDirectory, + syncBatchSize, + hostId + ) logger.info(`Sync run complete. Next run in ${syncIntervalMinutes} minute(s).`) await sleep(sleepInterval) } catch (err) { @@ -434,6 +459,7 @@ async function runCleanupWithInterval( uploadsDirectory: string, syncWorkersNumber: number, cleanupIntervalMinutes: number, + cleanupBatchSize: number, hostId: string ) { const sleepInterval = cleanupIntervalMinutes * 60 * 1000 @@ -442,7 +468,7 @@ async function runCleanupWithInterval( await sleep(sleepInterval) try { logger.info(`Resume cleanup....`) - await performCleanup(buckets, syncWorkersNumber, api, qnApi, uploadsDirectory, hostId) + await performCleanup(buckets, syncWorkersNumber, api, qnApi, uploadsDirectory, cleanupBatchSize, hostId) } catch (err) { logger.error(`Critical cleanup error: ${err}`) } diff --git a/storage-node/src/commands/util/cleanup.ts b/storage-node/src/commands/util/cleanup.ts index 6ebde5d051..d1b5cffba7 100644 --- a/storage-node/src/commands/util/cleanup.ts +++ b/storage-node/src/commands/util/cleanup.ts @@ -27,6 +27,10 @@ export default class Cleanup extends ApiCommandBase { required: true, description: 'The buckerId to sync prune/cleanup', }), + cleanupBatchSize: flags.integer({ + description: 'Maximum number of objects to process in a single batch during cleanup.', + default: 10_000, + }), cleanupWorkersNumber: flags.integer({ char: 'p', required: false, @@ -57,7 +61,15 @@ export default class Cleanup extends ApiCommandBase { logger.info('Cleanup...') try { - await performCleanup([bucketId], flags.cleanupWorkersNumber, api, qnApi, flags.uploads, '') + await performCleanup( + [bucketId], + flags.cleanupWorkersNumber, + api, + qnApi, + flags.uploads, + flags.cleanupBatchSize, + '' + ) } catch (err) { logger.error(err) logger.error(stringify(err)) diff --git a/storage-node/src/commands/util/fetch-bucket.ts b/storage-node/src/commands/util/fetch-bucket.ts index 8be873cd7e..1bfc097bb4 100644 --- a/storage-node/src/commands/util/fetch-bucket.ts +++ b/storage-node/src/commands/util/fetch-bucket.ts @@ -36,6 +36,10 @@ export default class FetchBucket extends Command { description: 'Asset downloading timeout for the syncronization (in minutes).', default: 30, }), + syncBatchSize: flags.integer({ + description: 'Maximum number of objects to process in a single batch.', + default: 10_000, + }), queryNodeEndpoint: flags.string({ char: 'q', required: false, @@ -74,6 +78,7 @@ export default class FetchBucket extends Command { qnApi, flags.uploads, flags.tempFolder ? 
flags.tempFolder : path.join(flags.uploads, 'temp'), + flags.syncBatchSize, '', flags.dataSourceOperatorUrl ) diff --git a/storage-node/src/services/archive/ArchiveService.ts b/storage-node/src/services/archive/ArchiveService.ts index 9ed7dd1bd6..0d1cdedced 100644 --- a/storage-node/src/services/archive/ArchiveService.ts +++ b/storage-node/src/services/archive/ArchiveService.ts @@ -13,7 +13,7 @@ import { OBJECTS_TRACKING_FILENAME, } from './tracking' import { QueryNodeApi } from '../queryNode/api' -import { getStorageObligationsFromRuntime } from '../sync/storageObligations' +import { getDataObjectsByIDs, getStorageObligationsFromRuntime } from '../sync/storageObligations' import { getDownloadTasks } from '../sync/synchronizer' import sleep from 'sleep-promise' import { Logger } from 'winston' @@ -369,40 +369,49 @@ export class ArchiveService { public async performSync(): Promise { const model = await getStorageObligationsFromRuntime(this.queryNodeApi) - const assignedObjects = model.dataObjects - const added = assignedObjects.filter((obj) => !this.objectTrackingService.isTracked(obj.id)) - added.sort((a, b) => parseInt(b.id) - parseInt(a.id)) + const unsyncedIds = (await model.getAssignedDataObjectIds(true)) + .filter((id) => !this.objectTrackingService.isTracked(id)) + .map((id) => parseInt(id)) + // Sort unsynced ids in ASCENDING order (oldest first) + .sort((a, b) => a - b) - this.logger.info(`Sync - new objects: ${added.length}`) + this.logger.info(`Sync - new objects: ${unsyncedIds.length}`) - // Add new download tasks while the upload dir size limit allows - while (added.length) { - const uploadDirectorySize = await this.getUploadDirSize() - while (true) { - const object = added.pop() - if (!object) { - break - } - if (object.size + uploadDirectorySize + this.syncQueueObjectsSize > this.uploadDirSizeLimit) { - this.logger.debug( - `Waiting for some disk space to free ` + - `(upload_dir: ${uploadDirectorySize} / ${this.uploadDirSizeLimit}, ` + - `sync_q=${this.syncQueueObjectsSize}, obj_size=${object.size})... ` + // Sync objects in batches of 10_000 + for (const unsyncedIdsBatch of _.chunk(unsyncedIds, 10_000)) { + const objectIdsBatch = unsyncedIdsBatch.map((id) => id.toString()) + // Sort objectsBatch by ids in DESCENDING order (because we're using .pop() to get the next object) + const objectsBatch = (await getDataObjectsByIDs(this.queryNodeApi, objectIdsBatch)).sort( + (a, b) => parseInt(b.id) - parseInt(a.id) + ) + // Add new download tasks while the upload dir size limit allows + while (objectsBatch.length) { + const uploadDirectorySize = await this.getUploadDirSize() + while (true) { + const object = objectsBatch.pop() + if (!object) { + break + } + if (object.size + uploadDirectorySize + this.syncQueueObjectsSize > this.uploadDirSizeLimit) { + this.logger.debug( + `Waiting for some disk space to free ` + + `(upload_dir: ${uploadDirectorySize} / ${this.uploadDirSizeLimit}, ` + + `sync_q=${this.syncQueueObjectsSize}, obj_size=${object.size})... 
` + ) + objectsBatch.push(object) + await sleep(60_000) + break + } + const [downloadTask] = await getDownloadTasks( + model, + [object], + this.uploadQueueDir, + this.tmpDownloadDir, + this.syncWorkersTimeout, + this.hostId ) - added.push(object) - await sleep(60_000) - break + await this.addDownloadTask(downloadTask, object.size) } - const [downloadTask] = await getDownloadTasks( - model, - [], - [object], - this.uploadQueueDir, - this.tmpDownloadDir, - this.syncWorkersTimeout, - this.hostId - ) - await this.addDownloadTask(downloadTask, object.size) } } } diff --git a/storage-node/src/services/queryNode/api.ts b/storage-node/src/services/queryNode/api.ts index 8fb896dafa..0ab027b609 100644 --- a/storage-node/src/services/queryNode/api.ts +++ b/storage-node/src/services/queryNode/api.ts @@ -4,20 +4,29 @@ import fetch from 'cross-fetch' import stringify from 'fast-safe-stringify' import logger from '../logger' import { - DataObjectByBagIdsDetailsFragment, DataObjectDetailsFragment, + DataObjectIdsByBagId, + DataObjectIdsByBagIdQuery, + DataObjectIdsByBagIdQueryVariables, + DataObjectIdsByBagIdsConnection, + DataObjectIdsByBagIdsConnectionQuery, + DataObjectIdsByBagIdsConnectionQueryVariables, + DataObjectDetailsByIds, + DataObjectDetailsByIdsQuery, + DataObjectIdsByIdsQueryVariables, + DataObjectIdsByIds, + DataObjectIdsByIdsQuery, + DataObjectDetailsByIdsQueryVariables, + DataObjectsWithBagDetailsByIds, + DataObjectsWithBagDetailsByIdsQuery, + DataObjectsWithBagDetailsByIdsQueryVariables, + DataObjectWithBagDetailsFragment, GetAllStorageBagDetails, GetAllStorageBagDetailsQuery, GetAllStorageBagDetailsQueryVariables, - GetDataObjects, - GetDataObjectsByBagIds, - GetDataObjectsByBagIdsQuery, - GetDataObjectsByBagIdsQueryVariables, GetDataObjectsDeletedEvents, GetDataObjectsDeletedEventsQuery, GetDataObjectsDeletedEventsQueryVariables, - GetDataObjectsQuery, - GetDataObjectsQueryVariables, GetSquidVersion, GetSquidVersionQuery, GetSquidVersionQueryVariables, @@ -41,16 +50,18 @@ import { StorageBucketDetailsFragment, StorageBucketIdsFragment, } from './generated/queries' -import { Maybe, StorageBagWhereInput } from './generated/schema' +import { Maybe } from './generated/schema' +import _ from 'lodash' /** * Defines query paging limits. 
*/ -export const MAX_RESULTS_PER_QUERY = 1000 +export const MAX_INPUT_ARGS_SIZE = 1_000 +export const MAX_RESULTS_PER_QUERY = 10_000 type PaginationQueryVariables = { - limit: number - lastCursor?: Maybe + limit?: Maybe + after?: Maybe } type PaginationQueryResult = { @@ -128,10 +139,10 @@ export class QueryNodeApi { protected async multipleEntitiesWithPagination< NodeT, QueryT extends { [k: string]: PaginationQueryResult }, - CustomVariablesT extends Record + VariablesT extends PaginationQueryVariables >( query: DocumentNode, - variables: CustomVariablesT, + variables: VariablesT, resultKey: keyof QueryT, itemsPerPage = MAX_RESULTS_PER_QUERY ): Promise { @@ -139,10 +150,9 @@ export class QueryNodeApi { let results: NodeT[] = [] let lastCursor: string | undefined while (hasNextPage) { - const paginationVariables = { limit: itemsPerPage, cursor: lastCursor } + const paginationVariables: PaginationQueryVariables = { limit: itemsPerPage, after: lastCursor } const queryVariables = { ...variables, ...paginationVariables } - logger.debug(`Query - ${String(resultKey)}`) - const result = await this.apolloClient.query({ + const result = await this.apolloClient.query({ query, variables: queryVariables, }) @@ -249,50 +259,108 @@ export class QueryNodeApi { } /** - * Returns data objects info by pages for the given bags. + * Gets a list of all data object ids belonging to provided bags. * * @param bagIds - query filter: bag IDs + * @param isAccepted - query filter: value of isAccepted field (any if not specified) + * @param bagIdsBatchSize - max. size of a single batch of bagIds to query */ - public async getDataObjectsByBagIds(bagIds: string[]): Promise> { - const allBagIds = [...bagIds] // Copy to avoid modifying the original array - let fullResult: DataObjectByBagIdsDetailsFragment[] = [] - while (allBagIds.length) { - const bagIdsBatch = allBagIds.splice(0, 1000) - const input: StorageBagWhereInput = { id_in: bagIdsBatch } - fullResult = [ - ...fullResult, - ...(await this.multipleEntitiesQuery( - GetDataObjectsByBagIds, - { bagIds: input }, - 'storageDataObjects' - )), - ] + public async getDataObjectIdsByBagIds( + bagIds: string[], + isAccepted?: boolean, + bagIdsBatchSize = MAX_INPUT_ARGS_SIZE + ): Promise { + let dataObjectIds: string[] = [] + for (const bagIdsBatch of _.chunk(bagIds, bagIdsBatchSize)) { + const dataObjectIdsBatch = await this.multipleEntitiesWithPagination< + { id: string }, + DataObjectIdsByBagIdsConnectionQuery, + DataObjectIdsByBagIdsConnectionQueryVariables + >( + DataObjectIdsByBagIdsConnection, + { + bagIds: bagIdsBatch, + isAccepted, + }, + 'storageDataObjectsConnection' + ) + dataObjectIds = dataObjectIds.concat(dataObjectIdsBatch.map(({ id }) => id)) } + return dataObjectIds + } - return fullResult + /** + * Gets a list of existing data object ids by the given list of data object ids. + * + * @param ids - query filter: data object ids + * @param batchSize - max. size of a single batch of ids to query + */ + public async getExistingDataObjectsIdsByIds(ids: string[], batchSize = MAX_INPUT_ARGS_SIZE): Promise { + let existingIds: string[] = [] + for (const idsBatch of _.chunk(ids, batchSize)) { + const existingIdsBatch = await this.multipleEntitiesQuery< + DataObjectIdsByIdsQuery, + DataObjectIdsByIdsQueryVariables + >(DataObjectIdsByIds, { ids: idsBatch }, 'storageDataObjects') + existingIds = existingIds.concat(existingIdsBatch.map(({ id }) => id)) + } + return existingIds } /** - * Returns data objects info by pages for the given dataObject IDs. 
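The reworked `multipleEntitiesWithPagination` above relies on Relay-style connections: each page is requested with `limit`/`after`, and the loop continues while `pageInfo.hasNextPage` holds, feeding `endCursor` back in as the next `after`. A self-contained sketch of that loop, where `fetchPage` is a placeholder for the Apollo query call used by the real helper:

```ts
type PageInfo = { endCursor?: string | null; hasNextPage: boolean }
type Connection<T> = { edges: { node: T }[]; pageInfo: PageInfo }
// `fetchPage` is a placeholder for the Apollo client query used by multipleEntitiesWithPagination
type PageFetcher<T> = (vars: { limit: number; after?: string }) => Promise<Connection<T>>

const MAX_RESULTS_PER_QUERY = 10_000

// Keeps requesting pages until pageInfo.hasNextPage is false,
// passing the previous endCursor as `after` on each iteration
async function fetchAllPages<T>(fetchPage: PageFetcher<T>): Promise<T[]> {
  const results: T[] = []
  let after: string | undefined
  let hasNextPage = true
  while (hasNextPage) {
    const page = await fetchPage({ limit: MAX_RESULTS_PER_QUERY, after })
    results.push(...page.edges.map((edge) => edge.node))
    hasNextPage = page.pageInfo.hasNextPage
    after = page.pageInfo.endCursor ?? undefined
  }
  return results
}
```

The `orderBy: id_ASC` clause in the connection query above keeps page boundaries deterministic, which is what makes cursor paging safe for large result sets.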
+ * Gets a list of data object details by the given list of dataObject IDs. * - * @param dataObjectIds - query filter: dataObject IDs + * @param ids - query filter: data object ids + * @param batchSize - max. size of a single batch of ids to query */ - public async getDataObjectDetails(dataObjectIds: string[]): Promise> { - const allDataObjectIds = [...dataObjectIds] // Copy to avoid modifying the original array - let fullResult: DataObjectDetailsFragment[] = [] - while (allDataObjectIds.length) { - const dataObjectIdsBatch = allDataObjectIds.splice(0, 1000) - fullResult = [ - ...fullResult, - ...(await this.multipleEntitiesQuery( - GetDataObjects, - { dataObjectIds: dataObjectIdsBatch }, - 'storageDataObjects' - )), - ] + public async getDataObjectsDetailsByIds( + ids: string[], + batchSize = MAX_INPUT_ARGS_SIZE + ): Promise { + let dataObjects: DataObjectDetailsFragment[] = [] + for (const idsBatch of _.chunk(ids, batchSize)) { + const dataObjectsBatch = await this.multipleEntitiesQuery< + DataObjectDetailsByIdsQuery, + DataObjectDetailsByIdsQueryVariables + >(DataObjectDetailsByIds, { ids: idsBatch }, 'storageDataObjects') + dataObjects = dataObjects.concat(dataObjectsBatch) } + return dataObjects + } - return fullResult + /** + * Returns a list of data objects by ids, with their corresponding bag details + * + * @param ids - query filter: data object ids + * @param batchSize - max. size of a single batch of ids to query + */ + public async getDataObjectsWithBagDetails( + ids: string[], + batchSize = MAX_INPUT_ARGS_SIZE + ): Promise { + let dataObjects: DataObjectWithBagDetailsFragment[] = [] + for (const idsBatch of _.chunk(ids, batchSize)) { + const dataObjectsBatch = await this.multipleEntitiesQuery< + DataObjectsWithBagDetailsByIdsQuery, + DataObjectsWithBagDetailsByIdsQueryVariables + >(DataObjectsWithBagDetailsByIds, { ids: idsBatch }, 'storageDataObjects') + dataObjects = dataObjects.concat(dataObjectsBatch) + } + return dataObjects + } + + /** + * Returns a list of data object ids that belong to a given bag. 
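The new by-id getters (`getExistingDataObjectsIdsByIds`, `getDataObjectsDetailsByIds`, `getDataObjectsWithBagDetails`) all follow the same shape: chunk the input ids so a single `id_in` filter never receives more than `MAX_INPUT_ARGS_SIZE` arguments, then concatenate the partial results. A generic sketch of that helper, with `queryByIds` standing in for the concrete `multipleEntitiesQuery` call:

```ts
import _ from 'lodash'

const MAX_INPUT_ARGS_SIZE = 1_000

// Splits `ids` into chunks so a single GraphQL `id_in` filter never receives
// more than MAX_INPUT_ARGS_SIZE arguments; `queryByIds` is a placeholder for
// the concrete multipleEntitiesQuery call.
async function queryInBatches<T>(
  ids: string[],
  queryByIds: (idsBatch: string[]) => Promise<T[]>,
  batchSize = MAX_INPUT_ARGS_SIZE
): Promise<T[]> {
  let results: T[] = []
  for (const idsBatch of _.chunk(ids, batchSize)) {
    results = results.concat(await queryByIds(idsBatch))
  }
  return results
}
```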
+ * + * @param bagId - query filter: bag ID + */ + public async getDataObjectIdsByBagId(bagId: string): Promise { + const result = await this.multipleEntitiesQuery( + DataObjectIdsByBagId, + { bagId }, + 'storageDataObjects' + ) + return result.map((o) => o.id) } /** diff --git a/storage-node/src/services/queryNode/queries/queries.graphql b/storage-node/src/services/queryNode/queries/queries.graphql index b29c1c51df..43cd37d62a 100644 --- a/storage-node/src/services/queryNode/queries/queries.graphql +++ b/storage-node/src/services/queryNode/queries/queries.graphql @@ -59,7 +59,13 @@ query getAllStorageBagDetails { } } -fragment DataObjectByBagIdsDetails on StorageDataObject { +query dataObjectIdsByBagId($bagId: String) { + storageDataObjects(where: { storageBag: { id_eq: $bagId } }) { + id + } +} + +fragment DataObjectDetails on StorageDataObject { id size ipfsHash @@ -68,13 +74,7 @@ fragment DataObjectByBagIdsDetails on StorageDataObject { } } -query getDataObjectsByBagIds($bagIds: StorageBagWhereInput) { - storageDataObjects(where: { storageBag: $bagIds, isAccepted_eq: true }) { - ...DataObjectByBagIdsDetails - } -} - -fragment DataObjectDetails on StorageDataObject { +fragment DataObjectWithBagDetails on StorageDataObject { id isAccepted ipfsHash @@ -83,12 +83,45 @@ fragment DataObjectDetails on StorageDataObject { } } -query getDataObjects($dataObjectIds: [String!]) { - storageDataObjects(where: { id_in: $dataObjectIds }) { +query dataObjectIdsByBagIdsConnection($bagIds: [String!], $limit: Int, $after: String, $isAccepted: Boolean) { + storageDataObjectsConnection( + where: { storageBag: { id_in: $bagIds }, isAccepted_eq: $isAccepted } + first: $limit + after: $after + orderBy: id_ASC + ) { + edges { + node { + id + } + } + pageInfo { + startCursor + endCursor + hasNextPage + } + } +} + +# For verifying if data objects still exist +query dataObjectIdsByIds($ids: [String!]) { + storageDataObjects(where: { id_in: $ids }) { + id + } +} + +query dataObjectDetailsByIds($ids: [String!]) { + storageDataObjects(where: { id_in: $ids }) { ...DataObjectDetails } } +query dataObjectsWithBagDetailsByIds($ids: [String!]) { + storageDataObjects(where: { id_in: $ids }) { + ...DataObjectWithBagDetails + } +} + query getDataObjectsDeletedEvents($dataObjectIds: [String!]) { events(where: { data: { isTypeOf_eq: "DataObjectDeletedEventData", dataObjectId_in: $dataObjectIds } }) { data { diff --git a/storage-node/src/services/sync/acceptPendingObjects.ts b/storage-node/src/services/sync/acceptPendingObjects.ts index 170498688d..7b8f1db4b8 100644 --- a/storage-node/src/services/sync/acceptPendingObjects.ts +++ b/storage-node/src/services/sync/acceptPendingObjects.ts @@ -91,7 +91,7 @@ export class AcceptPendingObjectsService { } private async processPendingObjects(pendingIds: string[]): Promise { - const pendingDataObjects = await this.qnApi.getDataObjectDetails(pendingIds) + const pendingDataObjects = await this.qnApi.getDataObjectsWithBagDetails(pendingIds) // objects not found in the query node const maybeDeletedObjectIds = pendingIds.filter( diff --git a/storage-node/src/services/sync/cleanupService.ts b/storage-node/src/services/sync/cleanupService.ts index bfc99e54e6..683f350881 100644 --- a/storage-node/src/services/sync/cleanupService.ts +++ b/storage-node/src/services/sync/cleanupService.ts @@ -3,12 +3,14 @@ import _ from 'lodash' import superagent from 'superagent' import urljoin from 'url-join' import { getDataObjectIDs } from '../../services/caching/localDataObjects' -import logger from 
'../../services/logger' +import rootLogger from '../../services/logger' import { QueryNodeApi } from '../queryNode/api' -import { DataObjectDetailsFragment } from '../queryNode/generated/queries' -import { DataObligations, getDataObjectsByIDs, getStorageObligationsFromRuntime } from './storageObligations' +import { DataObligations, getStorageObligationsFromRuntime } from './storageObligations' import { DeleteLocalFileTask } from './tasks' import { TaskProcessorSpawner, WorkingStack } from '../processing/workingProcess' +import { DataObjectWithBagDetailsFragment } from '../queryNode/generated/queries' +import { Logger } from 'winston' +import pLimit from 'p-limit' /** * The maximum allowed threshold by which the QN processor can lag behind @@ -40,14 +42,13 @@ export const MINIMUM_REPLICATION_THRESHOLD = parseInt(process.env.CLEANUP_MIN_RE * - If the asset being pruned from this storage-node is currently being downloaded * by some external actors, then the cleanup action for this asset would be postponed * - * @param api - (optional) runtime API promise - * @param workerId - current storage provider ID - * @param buckets - Selected storage buckets - * @param asyncWorkersNumber - maximum parallel downloads number - * @param asyncWorkersTimeout - downloading asset timeout + * @param buckets - selected storage buckets + * @param asyncWorkersNumber - maximum parallel cleanups number + * @param api - runtime API promise * @param qnApi - Query Node API * @param uploadDirectory - local directory to get file names from - * @param tempDirectory - local directory for temporary data uploading + * @param batchSize - max. number of data objects to process in a single batch + * @param hostId */ export async function performCleanup( buckets: string[], @@ -55,8 +56,10 @@ export async function performCleanup( api: ApiPromise, qnApi: QueryNodeApi, uploadDirectory: string, + batchSize: number, hostId: string ): Promise { + const logger = rootLogger.child({ label: 'Cleanup' }) logger.info('Started cleanup service...') const squidStatus = await qnApi.getState() if (!squidStatus || !squidStatus.height) { @@ -77,95 +80,164 @@ export async function performCleanup( const model = await getStorageObligationsFromRuntime(qnApi, buckets) const storedObjectsIds = getDataObjectIDs() - const assignedObjectsIds = model.dataObjects.map((obj) => obj.id) - const removedIds = _.difference(storedObjectsIds, assignedObjectsIds) - const removedObjects = await getDataObjectsByIDs(qnApi, removedIds) - - logger.debug(`Cleanup - stored objects: ${storedObjectsIds.length}, assigned objects: ${assignedObjectsIds.length}`) - logger.debug(`Cleanup - pruning ${removedIds.length} obsolete objects`) - - // Data objects permanently deleted from the runtime - const deletedDataObjects = removedIds.filter( - (removedId) => !removedObjects.some((removedObject) => removedObject.id === removedId) - ) - - // Data objects no-longer assigned to current storage-node - // operated buckets, and have been moved to other buckets - const movedDataObjects = removedObjects - - const workingStack = new WorkingStack() - const processSpawner = new TaskProcessorSpawner(workingStack, asyncWorkersNumber) - - const deletionTasksOfDeletedDataObjects = await Promise.all( - deletedDataObjects.map((dataObject) => new DeleteLocalFileTask(uploadDirectory, dataObject)) - ) - const deletionTasksOfMovedDataObjects = await getDeletionTasksFromMovedDataObjects( - buckets, - uploadDirectory, - model, - movedDataObjects, - hostId - ) - - await 
workingStack.add(deletionTasksOfDeletedDataObjects) - await workingStack.add(deletionTasksOfMovedDataObjects) - await processSpawner.process() + const assignedObjectIds = new Set(await model.getAssignedDataObjectIds()) + const obsoleteObjectIds = new Set(storedObjectsIds.filter((id) => !assignedObjectIds.has(id))) + + // If objects are obsolete but still exist: They are "moved" objects + const movedObjectIds = new Set(await qnApi.getExistingDataObjectsIdsByIds([...obsoleteObjectIds])) + + // If objects are obsolete and don't exist: They are "deleted objects" + const deletedDataObjectIds = new Set([...obsoleteObjectIds].filter((id) => !movedObjectIds.has(id))) + + logger.info(`stored objects: ${storedObjectsIds.length}, assigned objects: ${assignedObjectIds.size}`) + if (obsoleteObjectIds.size) { + logger.info( + `pruning ${obsoleteObjectIds.size} obsolete objects ` + + `(${movedObjectIds.size} moved, ${deletedDataObjectIds.size} deleted)` + ) + + const workingStack = new WorkingStack() + const processSpawner = new TaskProcessorSpawner(workingStack, asyncWorkersNumber) + + // Execute deleted objects removal tasks in batches + if (deletedDataObjectIds.size) { + let deletedProcessed = 0 + logger.info(`removing ${deletedDataObjectIds.size} deleted objects...`) + for (let deletedObjectsIdsBatch of _.chunk([...deletedDataObjectIds], batchSize)) { + // Confirm whether the objects were actually deleted by fetching the related deletion events + const dataObjectDeletedEvents = await qnApi.getDataObjectDeletedEvents(deletedObjectsIdsBatch) + const confirmedIds = new Set(dataObjectDeletedEvents.map((e) => e.data.dataObjectId)) + deletedObjectsIdsBatch = deletedObjectsIdsBatch.filter((id) => { + if (confirmedIds.has(id)) { + return true + } else { + logger.warn(`Could not find DataObjectDeleted event for object ${id}, skipping from cleanup...`) + return false + } + }) + const deletionTasks = deletedObjectsIdsBatch.map((id) => new DeleteLocalFileTask(uploadDirectory, id)) + await workingStack.add(deletionTasks) + await processSpawner.process() + deletedProcessed += deletedObjectsIdsBatch.length + logger.debug(`${deletedProcessed} / ${deletedDataObjectIds.size} deleted objects processed...`) + } + logger.info(`${deletedProcessed}/${deletedDataObjectIds.size} deleted data objects successfully cleared.`) + } + + // Execute moved objects removal tasks in batches + if (movedObjectIds.size) { + let movedProcessed = 0 + logger.info(`removing ${movedObjectIds.size} moved objects...`) + for (const movedObjectsIdsBatch of _.chunk([...movedObjectIds], batchSize)) { + const movedDataObjectsBatch = await qnApi.getDataObjectsWithBagDetails(movedObjectsIdsBatch) + const deletionTasksOfMovedDataObjects = await getDeletionTasksFromMovedDataObjects( + logger, + uploadDirectory, + model, + movedDataObjectsBatch, + asyncWorkersNumber, + hostId + ) + const numberOfTasks = deletionTasksOfMovedDataObjects.length + if (numberOfTasks !== movedObjectsIdsBatch.length) { + logger.warn( + `Only ${numberOfTasks} / ${movedObjectsIdsBatch.length} moved objects will be removed in this batch...` + ) + } + await workingStack.add(deletionTasksOfMovedDataObjects) + await processSpawner.process() + movedProcessed += numberOfTasks + logger.debug(`${movedProcessed} / ${movedObjectIds.size} moved objects processed...`) + } + logger.info(`${movedProcessed}/${movedObjectIds.size} moved data objects successfully cleared.`) + } + } else { + logger.info('No objects to prune, skipping...') + } + logger.info('Cleanup ended.') } /** * Creates the 
local file deletion tasks.
 *
- * @param ownBuckets - list of bucket ids operated by this node
+ * @param logger - cleanup service logger
 * @param uploadDirectory - local directory for data uploading
 * @param dataObligations - defines the current data obligations for the node
 * @param movedDataObjects- obsolete (no longer assigned) data objects that has been moved to other buckets
+ * @param asyncWorkersNumber - number of async workers assigned for cleanup tasks
+ * @param hostId - host id of the current node
 */
 async function getDeletionTasksFromMovedDataObjects(
-  ownBuckets: string[],
+  logger: Logger,
   uploadDirectory: string,
   dataObligations: DataObligations,
-  movedDataObjects: DataObjectDetailsFragment[],
+  movedDataObjects: DataObjectWithBagDetailsFragment[],
+  asyncWorkersNumber: number,
   hostId: string
 ): Promise<DeleteLocalFileTask[]> {
-  const ownOperatorUrls: string[] = []
-  for (const entry of dataObligations.storageBuckets) {
-    if (ownBuckets.includes(entry.id)) {
-      ownOperatorUrls.push(entry.operatorUrl)
+  const timeoutMs = 60 * 1000 // 1 minute since it's only a HEAD request
+  const deletionTasks: DeleteLocalFileTask[] = []
+
+  const { bucketOperatorUrlById } = dataObligations
+  const limit = pLimit(asyncWorkersNumber)
+  let checkedObjects = 0
+  const checkReplicationThreshold = async (movedDataObject: DataObjectWithBagDetailsFragment) => {
+    ++checkedObjects
+    if (checkedObjects % asyncWorkersNumber === 0) {
+      logger.debug(
+        `Checking replication: ${checkedObjects}/${movedDataObjects.length} (active: ${limit.activeCount}, pending: ${limit.pendingCount})`
+      )
     }
-  }
-  const bucketOperatorUrlById = new Map<string, string>()
-  for (const entry of dataObligations.storageBuckets) {
-    if (!ownBuckets.includes(entry.id)) {
-      if (ownOperatorUrls.includes(entry.operatorUrl)) {
-        logger.warn(`(cleanup) Skipping remote bucket ${entry.id} - ${entry.operatorUrl}`)
-      } else {
-        bucketOperatorUrlById.set(entry.id, entry.operatorUrl)
+    const externalBucketEndpoints = movedDataObject.storageBag.storageBuckets
+      .map(({ storageBucket: { id } }) => {
+        return bucketOperatorUrlById.get(id)
+      })
+      .filter((url): url is string => !!url)
+    let lastErr = ''
+    let successes = 0
+    let failures = 0
+
+    if (externalBucketEndpoints.length >= MINIMUM_REPLICATION_THRESHOLD) {
+      for (const nodeUrl of externalBucketEndpoints) {
+        const fileUrl = urljoin(nodeUrl, 'api/v1/files', movedDataObject.id)
+        try {
+          await superagent.head(fileUrl).timeout(timeoutMs).set('X-COLOSSUS-HOST-ID', hostId)
+          ++successes
+        } catch (e) {
+          ++failures
+          lastErr = e instanceof Error ? e.message : e.toString()
+        }
+        if (successes >= MINIMUM_REPLICATION_THRESHOLD) {
+          break
+        }
       }
     }
+
+    if (successes < MINIMUM_REPLICATION_THRESHOLD) {
+      logger.debug(
+        `Replication threshold unmet for object ${movedDataObject.id} ` +
+          `(buckets: ${externalBucketEndpoints.length}, successes: ${successes}, failures: ${failures}). ` +
+          (lastErr ? `Last error: ${lastErr}. 
` : '') + + `File deletion canceled...` + ) + return + } + + deletionTasks.push(new DeleteLocalFileTask(uploadDirectory, movedDataObject.id)) } - const timeoutMs = 60 * 1000 // 1 minute since it's only a HEAD request - const deletionTasks: DeleteLocalFileTask[] = [] - await Promise.allSettled( - movedDataObjects.map(async (movedDataObject) => { - let dataObjectReplicationCount = 0 - - for (const { storageBucket } of movedDataObject.storageBag.storageBuckets) { - const url = urljoin(bucketOperatorUrlById.get(storageBucket.id), 'api/v1/files', movedDataObject.id) - await superagent.head(url).timeout(timeoutMs).set('X-COLOSSUS-HOST-ID', hostId) - dataObjectReplicationCount++ - } + await Promise.all(movedDataObjects.map((movedDataObject) => limit(() => checkReplicationThreshold(movedDataObject)))) - if (dataObjectReplicationCount < MINIMUM_REPLICATION_THRESHOLD) { - logger.warn(`Cleanup - data object replication threshold unmet - file deletion canceled: ${movedDataObject.id}`) - return - } + const failedCount = movedDataObjects.length - deletionTasks.length + if (failedCount > 0) { + logger.warn( + `Replication threshold was unmet or couldn't be verified for ${failedCount} / ${movedDataObjects.length} objects in the current batch.` + ) + } - deletionTasks.push(new DeleteLocalFileTask(uploadDirectory, movedDataObject.id)) - }) - ) + logger.debug('Checking replication: Done') return deletionTasks } diff --git a/storage-node/src/services/sync/storageObligations.ts b/storage-node/src/services/sync/storageObligations.ts index 58f6b75ac6..165f8a30fe 100644 --- a/storage-node/src/services/sync/storageObligations.ts +++ b/storage-node/src/services/sync/storageObligations.ts @@ -1,12 +1,7 @@ import _ from 'lodash' import logger from '../logger' import { MAX_RESULTS_PER_QUERY, QueryNodeApi } from '../queryNode/api' -import { - DataObjectByBagIdsDetailsFragment, - DataObjectDetailsFragment, - StorageBagDetailsFragment, - StorageBucketDetailsFragment, -} from '../queryNode/generated/queries' +import { StorageBucketDetailsFragment } from '../queryNode/generated/queries' import { ApiPromise } from '@polkadot/api' import { PalletStorageStorageBucketRecord } from '@polkadot/types/lookup' @@ -25,9 +20,19 @@ export type DataObligations = { bags: Bag[] /** - * Assigned data objects for the storage provider. + * Map from bucket id to storage node url, without own buckets. */ - dataObjects: DataObject[] + bucketOperatorUrlById: Map + + /** + * Map from assigned bag ids to storage node urls. + */ + bagOperatorsUrlsById: Map + + /** + * A function that returns assigned data object ids + */ + getAssignedDataObjectIds(isAccepted?: boolean): Promise } /** @@ -102,30 +107,61 @@ export async function getStorageObligationsFromRuntime( qnApi: QueryNodeApi, bucketIds?: string[] ): Promise { - const allBuckets = await getAllBuckets(qnApi) + const storageBuckets = (await getAllBuckets(qnApi)).map((bucket) => ({ + id: bucket.id, + operatorUrl: bucket.operatorMetadata?.nodeEndpoint ?? '', + workerId: bucket.operatorStatus?.workerId, + })) + + const bags = ( + bucketIds === undefined ? 
await qnApi.getAllStorageBagsDetails() : await qnApi.getStorageBagsDetails(bucketIds) + ).map((bag) => ({ + id: bag.id, + buckets: bag.storageBuckets.map((bucketInBag) => bucketInBag.storageBucket.id), + })) + + const ownBuckets = new Set(bucketIds || []) + const ownOperatorUrls = new Set() + for (const bucket of storageBuckets) { + if (ownBuckets.has(bucket.id)) { + ownOperatorUrls.add(bucket.operatorUrl) + } + } - const assignedBags = - bucketIds === undefined ? await qnApi.getAllStorageBagsDetails() : await getAllAssignedBags(qnApi, bucketIds) + const bucketOperatorUrlById = new Map() + for (const bucket of storageBuckets) { + if (!ownBuckets.has(bucket.id)) { + if (ownOperatorUrls.has(bucket.operatorUrl)) { + logger.warn(`Skipping remote bucket ${bucket.id} - ${bucket.operatorUrl}`) + } else { + bucketOperatorUrlById.set(bucket.id, bucket.operatorUrl) + } + } + } + + const bagOperatorsUrlsById = new Map() + for (const bag of bags) { + const operatorUrls = [] + for (const bucketId of bag.buckets) { + const operatorUrl = bucketOperatorUrlById.get(bucketId) + if (operatorUrl) { + operatorUrls.push(operatorUrl) + } + } - const bagIds = assignedBags.map((bag) => bag.id) - const assignedDataObjects = await getAllAssignedDataObjects(qnApi, bagIds) + bagOperatorsUrlsById.set(bag.id, operatorUrls) + } const model: DataObligations = { - storageBuckets: allBuckets.map((bucket) => ({ - id: bucket.id, - operatorUrl: bucket.operatorMetadata?.nodeEndpoint ?? '', - workerId: bucket.operatorStatus?.workerId, - })), - bags: assignedBags.map((bag) => ({ - id: bag.id, - buckets: bag.storageBuckets.map((bucketInBag) => bucketInBag.storageBucket.id), - })), - dataObjects: assignedDataObjects.map((dataObject) => ({ - id: dataObject.id, - size: parseInt(dataObject.size), - bagId: dataObject.storageBag.id, - ipfsHash: dataObject.ipfsHash, - })), + storageBuckets, + bags, + bagOperatorsUrlsById, + bucketOperatorUrlById, + getAssignedDataObjectIds: (isAccepted?: boolean) => + qnApi.getDataObjectIdsByBagIds( + bags.map((b) => b.id), + isAccepted + ), } return model @@ -145,19 +181,6 @@ export async function getStorageBucketIdsByWorkerId(qnApi: QueryNodeApi, workerI return ids } -/** - * Get IDs of the data objects assigned to the bag ID. - * - * @param qnApi - initialized QueryNodeApi instance - * @param bagId - bag ID - * @returns data object IDs - */ -export async function getDataObjectIDsByBagId(qnApi: QueryNodeApi, bagId: string): Promise { - const dataObjects = await getAllAssignedDataObjects(qnApi, [bagId]) - - return dataObjects.map((obj) => obj.id) -} - /** * Get all storage buckets registered in the runtime (Query Node). * @@ -171,7 +194,7 @@ async function getAllBuckets(api: QueryNodeApi): Promise { const idsPart = ids.slice(offset, offset + limit) if (!_.isEmpty(idsPart)) { - logger.debug(`Sync - getting all storage buckets: offset = ${offset}, limit = ${limit}`) + logger.debug(`Getting all storage buckets: offset = ${offset}, limit = ${limit}`) return await api.getStorageBucketDetails(idsPart) } else { return false @@ -179,20 +202,6 @@ async function getAllBuckets(api: QueryNodeApi): Promise { - return await api.getDataObjectsByBagIds(bagIds) -} - /** * Get details of storage data objects by IDs. 
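The `DataObligations` model above now precomputes two lookup tables instead of materializing a flat data-object list: bucket id to operator URL (own buckets excluded) and bag id to candidate operator URLs. The second table is effectively a join of the first with each bag's bucket list; a simplified sketch with types reduced to the fields used here:

```ts
type Bag = { id: string; buckets: string[] }

// Sketch of how bagOperatorsUrlsById is derived from bucketOperatorUrlById
// (types reduced to the fields used by getStorageObligationsFromRuntime)
function buildBagOperatorUrls(bags: Bag[], bucketOperatorUrlById: Map<string, string>): Map<string, string[]> {
  const bagOperatorsUrlsById = new Map<string, string[]>()
  for (const bag of bags) {
    const operatorUrls = bag.buckets
      .map((bucketId) => bucketOperatorUrlById.get(bucketId))
      .filter((url): url is string => !!url)
    bagOperatorsUrlsById.set(bag.id, operatorUrls)
  }
  return bagOperatorsUrlsById
}
```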
* @@ -200,22 +209,13 @@ async function getAllAssignedDataObjects( * @param dataObjectIds - data objects' IDs * @returns storage data objects */ -export async function getDataObjectsByIDs( - api: QueryNodeApi, - dataObjectIds: string[] -): Promise { - return await api.getDataObjectDetails(dataObjectIds) -} - -/** - * Get all bags assigned to storage provider. - * - * @param api - initialiazed QueryNodeApi instance - * @param bucketIds - assigned storage provider buckets' IDs - * @returns storage bag data - */ -async function getAllAssignedBags(api: QueryNodeApi, bucketIds: string[]): Promise { - return await api.getStorageBagsDetails(bucketIds) +export async function getDataObjectsByIDs(api: QueryNodeApi, dataObjectIds: string[]): Promise { + return (await api.getDataObjectsDetailsByIds(dataObjectIds)).map((o) => ({ + id: o.id, + size: parseInt(o.size), + bagId: o.storageBag.id, + ipfsHash: o.ipfsHash, + })) } /** diff --git a/storage-node/src/services/sync/synchronizer.ts b/storage-node/src/services/sync/synchronizer.ts index a4a7f0a409..2637d7acc7 100644 --- a/storage-node/src/services/sync/synchronizer.ts +++ b/storage-node/src/services/sync/synchronizer.ts @@ -1,7 +1,12 @@ import { getDataObjectIDs, isDataObjectIdInCache } from '../../services/caching/localDataObjects' import logger from '../../services/logger' import { QueryNodeApi } from '../queryNode/api' -import { DataObligations, getStorageObligationsFromRuntime } from './storageObligations' +import { + DataObject, + DataObligations, + getDataObjectsByIDs, + getStorageObligationsFromRuntime, +} from './storageObligations' import { DownloadFileTask } from './tasks' import { TaskProcessorSpawner, WorkingStack } from '../processing/workingProcess' import _ from 'lodash' @@ -30,6 +35,7 @@ export const PendingDirName = 'pending' * @param qnApi - Query Node API * @param uploadDirectory - local directory to get file names from * @param tempDirectory - local directory for temporary data uploading + * @param batchSize - maximum number of data objects to process in a single batch * @param selectedOperatorUrl - (optional) defines the data source URL. If not set * the source URL is resolved for each data object separately using the Query * Node information about the storage providers. @@ -41,6 +47,7 @@ export async function performSync( qnApi: QueryNodeApi, uploadDirectory: string, tempDirectory: string, + batchSize: number, hostId: string, selectedOperatorUrl?: string ): Promise { @@ -48,35 +55,37 @@ export async function performSync( const model = await getStorageObligationsFromRuntime(qnApi, buckets) const storedObjectIds = getDataObjectIDs() - const assignedObjects = model.dataObjects - const assignedObjectIds = assignedObjects.map((obj) => obj.id) + const assignedObjectIds = new Set(await model.getAssignedDataObjectIds(true)) - const added = assignedObjects.filter((obj) => !isDataObjectIdInCache(obj.id)) - const removed = _.difference(storedObjectIds, assignedObjectIds) + const unsyncedObjectIds = [...assignedObjectIds].filter((id) => !isDataObjectIdInCache(id)) + const obsoleteObjectsNum = storedObjectIds.reduce((count, id) => (assignedObjectIds.has(id) ? 
count : count + 1), 0) - logger.debug(`Sync - new objects: ${added.length}`) - logger.debug(`Sync - obsolete objects: ${removed.length}`) + logger.debug(`Sync - new objects: ${unsyncedObjectIds.length}`) + logger.debug(`Sync - obsolete objects: ${obsoleteObjectsNum}`) const workingStack = new WorkingStack() - - const addedTasks = await getDownloadTasks( - model, - buckets, - added, - uploadDirectory, - tempDirectory, - asyncWorkersTimeout, - hostId, - selectedOperatorUrl - ) - - logger.debug(`Sync - started processing...`) - const processSpawner = new TaskProcessorSpawner(workingStack, asyncWorkersNumber) - await workingStack.add(addedTasks) + // Process unsynced objects in batches + logger.debug(`Sync - started processing...`) + let processed = 0 + for (const unsyncedIdsBatch of _.chunk(unsyncedObjectIds, batchSize)) { + const objectsBatch = await getDataObjectsByIDs(qnApi, unsyncedIdsBatch) + const syncTasks = await getDownloadTasks( + model, + objectsBatch, + uploadDirectory, + tempDirectory, + asyncWorkersTimeout, + hostId, + selectedOperatorUrl + ) + await workingStack.add(syncTasks) + await processSpawner.process() + processed += objectsBatch.length + logger.debug(`Sync - processed ${processed} / ${unsyncedObjectIds.length} objects...`) + } - await processSpawner.process() logger.info('Sync ended.') } @@ -84,8 +93,7 @@ export async function performSync( * Creates the download tasks. * * @param dataObligations - defines the current data obligations for the node - * @param ownBuckets - list of bucket ids operated this node - * @param addedIds - data object IDs to download + * @param dataObjects - list of data objects to download * @param uploadDirectory - local directory for data uploading * @param tempDirectory - local directory for temporary data uploading * @param taskSink - a destination for the newly created tasks @@ -95,65 +103,18 @@ export async function performSync( */ export async function getDownloadTasks( dataObligations: DataObligations, - ownBuckets: string[], - added: DataObligations['dataObjects'], + dataObjects: DataObject[], uploadDirectory: string, tempDirectory: string, asyncWorkersTimeout: number, hostId: string, selectedOperatorUrl?: string ): Promise { - const bagIdByDataObjectId = new Map() - for (const entry of dataObligations.dataObjects) { - bagIdByDataObjectId.set(entry.id, entry.bagId) - } - - const ownOperatorUrls: string[] = [] - for (const entry of dataObligations.storageBuckets) { - if (ownBuckets.includes(entry.id)) { - ownOperatorUrls.push(entry.operatorUrl) - } - } - - const bucketOperatorUrlById = new Map() - for (const entry of dataObligations.storageBuckets) { - if (!ownBuckets.includes(entry.id)) { - if (ownOperatorUrls.includes(entry.operatorUrl)) { - logger.warn(`(sync) Skipping remote bucket ${entry.id} - ${entry.operatorUrl}`) - } else { - bucketOperatorUrlById.set(entry.id, entry.operatorUrl) - } - } - } - - const bagOperatorsUrlsById = new Map() - for (const entry of dataObligations.bags) { - const operatorUrls = [] - - for (const bucket of entry.buckets) { - if (bucketOperatorUrlById.has(bucket)) { - const operatorUrl = bucketOperatorUrlById.get(bucket) - if (operatorUrl) { - operatorUrls.push(operatorUrl) - } - } - } - - bagOperatorsUrlsById.set(entry.id, operatorUrls) - } - - const tasks = added.map((dataObject) => { - let operatorUrls: string[] = [] // can be empty after look up - let bagId = null - if (bagIdByDataObjectId.has(dataObject.id)) { - bagId = bagIdByDataObjectId.get(dataObject.id) - if (bagOperatorsUrlsById.has(bagId)) { - 
operatorUrls = bagOperatorsUrlsById.get(bagId) - } - } + const { bagOperatorsUrlsById } = dataObligations + const tasks = dataObjects.map((dataObject) => { return new DownloadFileTask( - selectedOperatorUrl ? [selectedOperatorUrl] : operatorUrls, + selectedOperatorUrl ? [selectedOperatorUrl] : bagOperatorsUrlsById.get(dataObject.bagId) || [], dataObject.id, dataObject.ipfsHash, dataObject.size, diff --git a/storage-node/src/services/webApi/controllers/stateApi.ts b/storage-node/src/services/webApi/controllers/stateApi.ts index 10273719f4..ea49a6d58c 100644 --- a/storage-node/src/services/webApi/controllers/stateApi.ts +++ b/storage-node/src/services/webApi/controllers/stateApi.ts @@ -8,7 +8,6 @@ import { promisify } from 'util' import { getDataObjectIDs } from '../../../services/caching/localDataObjects' import logger from '../../logger' import { QueryNodeApi } from '../../queryNode/api' -import { getDataObjectIDsByBagId } from '../../sync/storageObligations' import { DataObjectResponse, DataStatsResponse, @@ -168,7 +167,7 @@ async function getCachedDataObjectsObligations(qnApi: QueryNodeApi, bagId: strin const entryName = `data_object_obligations_${bagId}` if (!dataCache.has(entryName)) { - const data = await getDataObjectIDsByBagId(qnApi, bagId) + const data = await qnApi.getDataObjectIdsByBagId(bagId) dataCache.set(entryName, data) } diff --git a/yarn.lock b/yarn.lock index a7d5d565ac..6e9cdd6401 100644 --- a/yarn.lock +++ b/yarn.lock @@ -18274,7 +18274,7 @@ p-is-promise@^2.0.0: resolved "https://registry.npmjs.org/p-is-promise/-/p-is-promise-2.1.0.tgz" integrity sha512-Y3W0wlRPK8ZMRbNq97l4M5otioeA5lm1z7bkNkxCka8HSPjR0xRWmpCmc9utiaLP9Jb1eD8BgeIxTW4AIF45Pg== -p-limit@3.1.0, p-limit@^3.0.2: +p-limit@3.1.0, p-limit@^3, p-limit@^3.0.2: version "3.1.0" resolved "https://registry.npmjs.org/p-limit/-/p-limit-3.1.0.tgz" integrity sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ==
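Finally, the `p-limit` entry added to the lockfile above backs the new per-object concurrency cap used during cleanup: replication checks for moved objects run as tasks wrapped in `pLimit`, so at most `--cleanupWorkersNumber` HEAD-request chains are in flight at once. A simplified sketch of the idea, not the exact `cleanupService.ts` implementation (which also logs progress and filters per batch); `REPLICATION_THRESHOLD` is a stand-in for `MINIMUM_REPLICATION_THRESHOLD`:

```ts
import pLimit from 'p-limit'
import superagent from 'superagent'
import urljoin from 'url-join'

type MovedObject = { id: string; operatorUrls: string[] }

const REPLICATION_THRESHOLD = 2 // stand-in for MINIMUM_REPLICATION_THRESHOLD

// True if at least REPLICATION_THRESHOLD operators respond to a HEAD request for the object
async function isSufficientlyReplicated(obj: MovedObject, hostId: string): Promise<boolean> {
  let successes = 0
  for (const nodeUrl of obj.operatorUrls) {
    try {
      await superagent
        .head(urljoin(nodeUrl, 'api/v1/files', obj.id))
        .timeout(60_000)
        .set('X-COLOSSUS-HOST-ID', hostId)
      if (++successes >= REPLICATION_THRESHOLD) {
        return true
      }
    } catch {
      // operator unreachable, try the next one
    }
  }
  return false
}

// At most `cleanupWorkersNumber` objects are checked concurrently
async function filterSafeToDelete(
  movedObjects: MovedObject[],
  cleanupWorkersNumber: number,
  hostId: string
): Promise<MovedObject[]> {
  const limit = pLimit(cleanupWorkersNumber)
  const verdicts = await Promise.all(movedObjects.map((obj) => limit(() => isSufficientlyReplicated(obj, hostId))))
  return movedObjects.filter((_obj, i) => verdicts[i])
}
```

Compared to the previous `Promise.allSettled` over the full list, the `pLimit` wrapper keeps the number of simultaneous outgoing requests bounded regardless of batch size.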