diff --git a/package-lock.json b/package-lock.json index 28760ca985..48f4a732c9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17614,9 +17614,9 @@ } }, "node_modules/@types/yargs": { - "version": "17.0.29", - "resolved": "https://registry.npmjs.org/@types/yargs/-/yargs-17.0.29.tgz", - "integrity": "sha512-nacjqA3ee9zRF/++a3FUY1suHTFKZeHba2n8WeDw9cCVdmzmHpIxyzOJBcpHvvEmS8E9KqWlSnWHUkOrkhWcvA==", + "version": "17.0.32", + "resolved": "https://registry.npmjs.org/@types/yargs/-/yargs-17.0.32.tgz", + "integrity": "sha512-xQ67Yc/laOG5uMfX/093MRlGGCIBzZMarVa+gfNKJxWAIgykYpVGkBdbqEzGDDfCrVUj6Hiff4mTZ5BA6TmAog==", "dependencies": { "@types/yargs-parser": "*" } @@ -55385,6 +55385,7 @@ "rxjs": "^7.2.0", "typeorm": "^0.3.15", "winston": "^3.8.2", + "yargs": "^17.7.2", "zksync-web3": "0.15.4" }, "devDependencies": { @@ -55394,6 +55395,7 @@ "@types/express": "^4.17.13", "@types/jest": "28.1.8", "@types/supertest": "^2.0.11", + "@types/yargs": "^17.0.32", "@typescript-eslint/eslint-plugin": "^5.0.0", "@typescript-eslint/parser": "^5.0.0", "eslint-config-prettier": "^8.3.0", diff --git a/packages/data-fetcher/.eslintrc.js b/packages/data-fetcher/.eslintrc.js index 151d5b63d3..8f5aedb718 100644 --- a/packages/data-fetcher/.eslintrc.js +++ b/packages/data-fetcher/.eslintrc.js @@ -15,7 +15,7 @@ module.exports = { node: true, jest: true, }, - ignorePatterns: ['.eslintrc.js', 'migrationScripts'], + ignorePatterns: ['.eslintrc.js'], rules: { '@typescript-eslint/interface-name-prefix': 'off', '@typescript-eslint/explicit-function-return-type': 'off', diff --git a/packages/worker/.eslintrc.js b/packages/worker/.eslintrc.js index 151d5b63d3..8f5aedb718 100644 --- a/packages/worker/.eslintrc.js +++ b/packages/worker/.eslintrc.js @@ -15,7 +15,7 @@ module.exports = { node: true, jest: true, }, - ignorePatterns: ['.eslintrc.js', 'migrationScripts'], + ignorePatterns: ['.eslintrc.js'], rules: { '@typescript-eslint/interface-name-prefix': 'off', '@typescript-eslint/explicit-function-return-type': 'off', diff --git a/packages/worker/migrationScripts/checkAddressBalances.js b/packages/worker/migrationScripts/checkAddressBalances.js deleted file mode 100644 index 5c03aa1786..0000000000 --- a/packages/worker/migrationScripts/checkAddressBalances.js +++ /dev/null @@ -1,78 +0,0 @@ -const { Client } = require("pg"); -const { Contract, BigNumber } = require("ethers"); -const zksync = require("zksync-web3"); - -const connectionString = process.env.DATABASE_URL; -const provider = new zksync.Provider(process.env.BLOCKCHAIN_RPC_URL); -const batchSize = parseInt(process.env.BATCH_SIZE, 10) || 1000; - -const fromHex = (buffer) => { - return `0x${buffer.toString("hex")}`; -} - -const toDbHexStr = (value) => { - return value.startsWith("0x") ? value.substring(2) : value; -} - -const getBalance = async (address, tokenAddress = zksync.utils.ETH_ADDRESS) => { - if (zksync.utils.isETH(tokenAddress)) { - return await provider.getBalance(address, "latest"); - } - - const erc20Contract = new Contract(tokenAddress, zksync.utils.IERC20, provider); - return await erc20Contract.balanceOf(address, { blockTag: "latest" }); -}; - -const getUpdateBalanceScript = async (balanceRecord) => { - const address = fromHex(balanceRecord.address); - const tokenAddress = fromHex(balanceRecord.tokenAddress); - let balance = null; - try { - balance = await getBalance(address, tokenAddress); - } catch (e) { - if (!(e.code === 'CALL_EXCEPTION' && e.method === 'balanceOf(address)' && !!e.transaction && - (e.message && e.message.startsWith("call revert exception")))) { - return ""; - } - } - if (balance && balance.eq(BigNumber.from(0))) { - return `UPDATE "addressBalances" SET "toDelete" = TRUE, checked = TRUE, "latestBalance" = '${balance.toString()}' WHERE address = decode('${toDbHexStr(address)}', 'hex') AND "tokenAddress" = decode('${toDbHexStr(tokenAddress)}', 'hex');`; - } else { - return `UPDATE "addressBalances" SET checked = TRUE, "latestBalance" = '${balance ? balance.toString() : null}' WHERE address = decode('${toDbHexStr(address)}', 'hex') AND "tokenAddress" = decode('${toDbHexStr(tokenAddress)}', 'hex');`; - } -}; - -const getNextRecordsBatch = async (pgClient) => { - let balances = await pgClient.query(`SELECT * FROM "addressBalances" WHERE checked = FALSE LIMIT ${batchSize};`); - return balances; -}; - -const main = async () => { - const client = new Client(connectionString); - await client.connect() - let batchNum = 0; - let balances = await getNextRecordsBatch(client); - while (balances && balances.rows.length) { - console.log(`Processing items ${batchNum * batchSize} - ${(batchNum + 1) * batchSize - 1}`); - batchNum += 1; - console.log('Getting balances:') - console.log(new Date()); - const updateScripts = await Promise.all(balances.rows.map(balanceRecord => getUpdateBalanceScript(balanceRecord))); - console.log(new Date()); - console.log('Updating DB:') - console.log(new Date()); - await client.query(updateScripts.join("")); - console.log(new Date()); - balances = await getNextRecordsBatch(client); - } -}; - -main() - .then(() => { - console.log("Done"); - process.exit(0); - }) - .catch((e) => { - console.error(e); - process.exit(0); - }); \ No newline at end of file diff --git a/packages/worker/migrationScripts/updateTransactions.js b/packages/worker/migrationScripts/updateTransactions.js deleted file mode 100644 index c5c1df43a6..0000000000 --- a/packages/worker/migrationScripts/updateTransactions.js +++ /dev/null @@ -1,58 +0,0 @@ -const { Client } = require("pg"); -const zksync = require("zksync-web3"); - -const connectionString = process.env.DATABASE_URL; -const provider = new zksync.Provider(process.env.BLOCKCHAIN_RPC_URL); -const batchSize = parseInt(process.env.BATCH_SIZE, 10) || 100; - -const fromHex = (buffer) => { - return `0x${buffer.toString("hex")}`; -} - -const toDbHexStr = (value) => { - return value.startsWith("0x") ? value.substring(2) : value; -} - -const getUpdateTransactionScript = async (transactionRecord) => { - const txHash = fromHex(transactionRecord.hash); - try { - const txDetails = await provider.getTransactionDetails(txHash); - return `UPDATE transactions SET fee = '${txDetails.fee}', "gasPerPubdata" = '${txDetails.gasPerPubdata}', "updatedAt" = CURRENT_TIMESTAMP WHERE hash = decode('${toDbHexStr(txHash)}', 'hex');`; - } catch (e) { - return ""; - } -}; - -const getNextRecordsBatch = async (pgClient) => { - return await pgClient.query(`SELECT hash FROM transactions WHERE "gasPerPubdata" IS NULL LIMIT $1;`, [batchSize]); -}; - -const main = async () => { - const client = new Client(connectionString); - await client.connect() - let batchNum = 0; - let transactions = await getNextRecordsBatch(client); - while (transactions && transactions.rows.length) { - console.log(`Processing items ${batchNum * batchSize} - ${(batchNum + 1) * batchSize - 1}`); - batchNum += 1; - console.log('Getting transactions:') - console.log(new Date()); - const updateScripts = await Promise.all(transactions.rows.map((transaction) => getUpdateTransactionScript(transaction))); - console.log(new Date()); - console.log('Updating DB:') - console.log(new Date()); - await client.query(updateScripts.join("")); - console.log(new Date()); - transactions = await getNextRecordsBatch(client); - } -}; - -main() - .then(() => { - console.log("Done"); - process.exit(0); - }) - .catch((e) => { - console.error(e); - process.exit(0); - }); \ No newline at end of file diff --git a/packages/worker/package.json b/packages/worker/package.json index f56ada9107..1b64487f5f 100644 --- a/packages/worker/package.json +++ b/packages/worker/package.json @@ -28,7 +28,8 @@ "migration:generate": "npm run typeorm migration:generate ./src/migrations/$npm_config_name -- -d ./src/typeorm.config.ts", "migration:create": "npm run typeorm migration:create ./src/migrations/$npm_config_name", "migration:run": "npm run typeorm migration:run -- -d ./src/typeorm.config.ts", - "migration:revert": "npm run typeorm migration:revert -- -d ./src/typeorm.config.ts" + "migration:revert": "npm run typeorm migration:revert -- -d ./src/typeorm.config.ts", + "migration-script:run": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register ./src/migrationScripts/{migrationFileName}.ts --runInBand" }, "dependencies": { "@nestjs/axios": "^3.0.0", @@ -50,6 +51,7 @@ "rxjs": "^7.2.0", "typeorm": "^0.3.15", "winston": "^3.8.2", + "yargs": "^17.7.2", "zksync-web3": "0.15.4" }, "devDependencies": { @@ -59,6 +61,7 @@ "@types/express": "^4.17.13", "@types/jest": "28.1.8", "@types/supertest": "^2.0.11", + "@types/yargs": "^17.0.32", "@typescript-eslint/eslint-plugin": "^5.0.0", "@typescript-eslint/parser": "^5.0.0", "eslint-config-prettier": "^8.3.0", @@ -105,7 +108,8 @@ ".module.ts", "src/logger.ts", "src/typeorm.config.ts", - "src/migrations" + "src/migrations", + "src/migrationScripts" ], "reporters": [ "default", diff --git a/packages/worker/src/migrationScripts/1709722093204-AddAddressTransferType.ts b/packages/worker/src/migrationScripts/1709722093204-AddAddressTransferType.ts new file mode 100644 index 0000000000..9b1997be90 --- /dev/null +++ b/packages/worker/src/migrationScripts/1709722093204-AddAddressTransferType.ts @@ -0,0 +1,106 @@ +import { config } from "dotenv"; +import { DataSource } from "typeorm"; +import yargs from "yargs"; +import { hideBin } from "yargs/helpers"; +import { setTimeout } from "timers/promises"; +import { typeOrmModuleOptions } from "../typeorm.config"; +import logger from "../logger"; + +config(); + +const QUERY_MAX_RETRIES = 5; +const QUERY_RETRY_MIN_INTERVAL_MS = 1000; + +// eslint-disable-next-line prefer-const +let { fromTransferNumber, toTransferNumber, updateBatchSize, parallelWorkers } = yargs(hideBin(process.argv)) + .options({ + fromTransferNumber: { + default: 0, + type: "number", + }, + toTransferNumber: { + default: 0, + type: "number", + }, + updateBatchSize: { + default: 4000, + type: "number", + }, + parallelWorkers: { + default: 50, + type: "number", + }, + }) + .parseSync(); + +const updateAddressTransfers = async (dataSource: DataSource, from: number, to: number, attempt = 0): Promise => { + try { + await dataSource.query( + `Update "addressTransfers" + Set "type" = "transfers".type::VARCHAR::"addressTransfers_type_enum" + From "transfers" + WHERE "transfers"."number" = "addressTransfers"."transferNumber" + AND "transfers"."number" >= ${from} + AND "transfers"."number" < ${to} + AND "transfers"."type" != 'transfer'` + ); + } catch (error) { + if (attempt >= QUERY_MAX_RETRIES) { + logger.error(`Failed to update AddressTransfers from ${from} to ${to} after ${QUERY_MAX_RETRIES} retries.`); + throw error; + } + await setTimeout(QUERY_RETRY_MIN_INTERVAL_MS * Math.pow(2, attempt)); + logger.error(`Failed to update AddressTransfers from ${from} to ${to}, retrying...`); + return updateAddressTransfers(dataSource, from, to, attempt + 1); + } +}; + +const main = async () => { + const typeOrmCliDataSource = new DataSource(typeOrmModuleOptions); + await typeOrmCliDataSource.initialize(); + + if (!toTransferNumber) { + const lastTransferNumber = await typeOrmCliDataSource.query( + `Select "number" from "transfers" order by "number" DESC limit 1;` + ); + toTransferNumber = parseInt(lastTransferNumber[0].number, 10); + } + logger.log( + `Starting migration with params: { fromTransferNumber: ${fromTransferNumber}, toTransferNumber: ${toTransferNumber}, updateBatchSize: ${updateBatchSize}, parallelWorkers: ${parallelWorkers} }` + ); + + let cursor = fromTransferNumber; + while (cursor <= toTransferNumber) { + const tasks = []; + for (let workerIndex = 0; workerIndex < parallelWorkers; workerIndex++) { + const batchStartNumber = cursor + workerIndex * updateBatchSize; + if (batchStartNumber > toTransferNumber) { + break; + } + let batchEndNumber = batchStartNumber + updateBatchSize; + if (batchEndNumber > toTransferNumber) { + batchEndNumber = toTransferNumber + 1; + } + tasks.push(updateAddressTransfers(typeOrmCliDataSource, batchStartNumber, batchEndNumber)); + } + await Promise.all(tasks); + + logger.log( + `Updated address transfers from ${cursor} to ${ + cursor + parallelWorkers * updateBatchSize + }. Time: ${new Date().toJSON()}.` + ); + cursor = cursor + parallelWorkers * updateBatchSize; + } +}; + +main() + .then(() => { + logger.log(`Migration script 1709722093204-AddAddressTransferType executed successfully.`); + process.exit(0); + }) + .catch((error) => { + logger.error(`Migration script 1709722093204-AddAddressTransferType failed.`); + logger.error(error); + process.exit(0); + });