diff --git a/package.json b/package.json index 8a8f9fc..e8c3441 100644 --- a/package.json +++ b/package.json @@ -20,6 +20,7 @@ "express": "^4.18.2", "firebase-admin": "^10.0.2", "ioredis": "^5.3.2", + "lodash": "^4.17.21", "memoizee": "^0.4.15", "pino": "^6.11.2", "pino-http": "^5.5.0", diff --git a/src/index.ts b/src/index.ts index c3c5826..e21e9e8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -31,7 +31,7 @@ import logger from './utils/logger'; import { getSignedMoonPayUrl } from './utils/moonpay/get-signed-moonpay-url'; import SingleQueryDataProvider from './utils/SingleQueryDataProvider'; import { tezExchangeRateProvider } from './utils/tezos'; -import { tokensExchangeRatesProvider } from './utils/tokens'; +import { getExchangeRatesFromDB } from './utils/tokens'; const PINO_LOGGER = { logger: logger.child({ name: 'web' }), @@ -166,28 +166,18 @@ app.get('/api/abtest', (_, res) => { app.get('/api/exchange-rates/tez', makeProviderDataRequestHandler(tezExchangeRateProvider)); app.get('/api/exchange-rates', async (_req, res) => { - const { data: tokensExchangeRates, error: tokensExchangeRatesError } = await getProviderStateWithTimeout( - tokensExchangeRatesProvider - ); + const tokensExchangeRates = await getExchangeRatesFromDB(); const { data: tezExchangeRate, error: tezExchangeRateError } = await getProviderStateWithTimeout( tezExchangeRateProvider ); - if (tokensExchangeRatesError !== undefined) { - return res.status(500).send({ - error: tokensExchangeRatesError.message - }); - } else if (tezExchangeRateError !== undefined) { + + if (tezExchangeRateError !== undefined) { return res.status(500).send({ error: tezExchangeRateError.message }); - } else { - if (tokensExchangeRates !== undefined && tezExchangeRate !== undefined) { - return res.json([ - ...tokensExchangeRates.map(({ ...restProps }) => restProps), - { exchangeRate: tezExchangeRate.toString() } - ]); - } } + + res.json([...tokensExchangeRates, { exchangeRate: tezExchangeRate.toString() }]); }); app.get('/api/moonpay-sign', async (_req, res) => { diff --git a/src/utils/three-route.ts b/src/utils/three-route.ts index 4782feb..1a89c89 100644 --- a/src/utils/three-route.ts +++ b/src/utils/three-route.ts @@ -25,7 +25,7 @@ export interface ThreeRouteChain { } // TODO: add axios adapter and change type if precision greater than of standard js number type is necessary -export interface ThreeRouteSwapResponse { +export interface ThreeRouteClassicSwapResponse { input: number; output: number; chains: ThreeRouteChain[]; @@ -34,8 +34,8 @@ export interface ThreeRouteSwapResponse { export interface ThreeRouteSirsSwapResponse { input: number; output: number; - tzbtcChain: ThreeRouteSwapResponse; - xtzChain: ThreeRouteSwapResponse; + tzbtcChain: ThreeRouteClassicSwapResponse; + xtzChain: ThreeRouteClassicSwapResponse; } interface ThreeRouteTokenCommon { @@ -98,11 +98,13 @@ export interface ThreeRouteDex { type ThreeRouteQueryParams = object | SwapQueryParams; type ThreeRouteQueryResponse = - | ThreeRouteSwapResponse + | ThreeRouteClassicSwapResponse | ThreeRouteSirsSwapResponse | ThreeRouteDex[] | ThreeRouteToken[]; +export type ThreeRouteSwapResponse = ThreeRouteClassicSwapResponse | ThreeRouteSirsSwapResponse; + export const THREE_ROUTE_SIRS_SYMBOL = 'SIRS'; const threeRouteBuildQueryFn = makeBuildQueryFn( @@ -111,18 +113,17 @@ const threeRouteBuildQueryFn = makeBuildQueryFn(({ inputTokenSymbol, outputTokenSymbol, realAmount }) => { - const isSirsSwap = inputTokenSymbol === THREE_ROUTE_SIRS_SYMBOL || outputTokenSymbol === THREE_ROUTE_SIRS_SYMBOL; +export const getThreeRouteSwap = threeRouteBuildQueryFn( + ({ inputTokenSymbol, outputTokenSymbol, realAmount }) => { + const isSirsSwap = inputTokenSymbol === THREE_ROUTE_SIRS_SYMBOL || outputTokenSymbol === THREE_ROUTE_SIRS_SYMBOL; - return `/${isSirsSwap ? 'swap-sirs' : 'swap'}/${inputTokenSymbol}/${outputTokenSymbol}/${realAmount}`; -}); + return `/${isSirsSwap ? 'swap-sirs' : 'swap'}/${inputTokenSymbol}/${outputTokenSymbol}/${realAmount}`; + } +); export const getThreeRouteDexes = threeRouteBuildQueryFn('/dexes', []); export const getThreeRouteTokens = threeRouteBuildQueryFn('/tokens', []); -export const getChains = (response: ThreeRouteSwapResponse | ThreeRouteSirsSwapResponse) => +export const getChains = (response: ThreeRouteSwapResponse) => 'chains' in response ? response.chains : [...response.xtzChain.chains, ...response.tzbtcChain.chains]; diff --git a/src/utils/tokens.ts b/src/utils/tokens.ts index 5bcf340..2212f3d 100644 --- a/src/utils/tokens.ts +++ b/src/utils/tokens.ts @@ -1,6 +1,8 @@ import { BigNumber } from 'bignumber.js'; +import { differenceBy, isEqual } from 'lodash'; import { IPriceHistory } from '../interfaces/price-history.interfaces'; +import { redisClient } from '../redis'; import { blockFinder, EMPTY_BLOCK } from './block-finder'; import DataProvider from './DataProvider'; import fetch from './fetch'; @@ -18,31 +20,74 @@ import { ThreeRouteFa12Token, ThreeRouteFa2Token, getChains, - THREE_ROUTE_SIRS_SYMBOL + THREE_ROUTE_SIRS_SYMBOL, + ThreeRouteSwapResponse, + ThreeRouteDex } from './three-route'; import { BcdTokenData, mapTzktTokenDataToBcdTokenData, tokensMetadataProvider } from './tzkt'; +interface SwapsResponse { + directSwap: ThreeRouteSwapResponse; + invertedSwap: ThreeRouteSwapResponse; + updatedAt: string; +} + +interface TokenExchangeRateEntry { + tokenAddress: string; + tokenId?: number; + exchangeRate: BigNumber; + metadata?: BcdTokenData; + swapsUpdatedAt?: string; +} + +class TimeoutError extends Error {} + const tokensListProvider = new SingleQueryDataProvider(30000, () => getThreeRouteTokens({})); const dexesListProvider = new SingleQueryDataProvider(30000, () => getThreeRouteDexes({})); const THREE_ROUTE_TEZ_SYMBOL = 'XTZ'; -const PROBE_TEZ_AMOUNT = 10; -const probeSwapsProvider = new DataProvider(Infinity, async (outputTokenSymbol: string) => { +const EMPTY_SWAP = { + input: 0, + output: 0, + chains: [] +}; + +const ASPENCOIN_ADDRESS = 'KT1S5iPRQ612wcNm6mXDqDhTNegGFcvTV7vM'; + +const EXCHANGE_RATES_STORAGE_KEY = 'exchange_rates'; +const PREV_DEXES_LIST_STORAGE_KEY = 'prev_dexes_list'; + +const getToTezExchangeRatesVersions = ({ directSwap, invertedSwap }: SwapsResponse) => { + const toTezExchangeRatesVersions: BigNumber[] = []; + if (directSwap.output !== 0) { + toTezExchangeRatesVersions.push(new BigNumber(directSwap.input).div(directSwap.output)); + } + if (invertedSwap.output !== 0) { + toTezExchangeRatesVersions.push(new BigNumber(invertedSwap.output).div(invertedSwap.input)); + } + + return toTezExchangeRatesVersions; +}; + +const assertSmallRatesDifference = (swapResponse: SwapsResponse) => { + const toTezExchangeRatesVersions = getToTezExchangeRatesVersions(swapResponse); + const minRate = BigNumber.min(...toTezExchangeRatesVersions); + const maxRate = BigNumber.max(...toTezExchangeRatesVersions); + if (minRate.div(maxRate).lt(0.9)) { + throw new Error('Prices difference is too big'); + } +}; + +const getSwapsByTezAmount = async (outputTokenSymbol: string, tezAmount: number) => { + const updatedAt = new Date().toISOString(); const directSwap = await getThreeRouteSwap({ inputTokenSymbol: THREE_ROUTE_TEZ_SYMBOL, outputTokenSymbol, - realAmount: PROBE_TEZ_AMOUNT + realAmount: tezAmount }); if (directSwap.output === 0) { - return { - directSwap, - invertedSwap: { - input: 0, - output: 0, - chains: [] - } - }; + throw new Error('Failed to get direct swap'); } const invertedSwap = await getThreeRouteSwap({ @@ -51,19 +96,54 @@ const probeSwapsProvider = new DataProvider(Infinity, async (outputTokenSymbol: realAmount: directSwap.output }); - return { directSwap, invertedSwap }; -}); + const response = { directSwap, invertedSwap, updatedAt }; + assertSmallRatesDifference(response); -export class TimeoutError extends Error {} + return response; +}; -export type TokenExchangeRateEntry = { - tokenAddress: string; - tokenId?: number; - exchangeRate: BigNumber; - metadata?: BcdTokenData; +const getSwapsByOneToken = async (outputTokenSymbol: string) => { + const updatedAt = new Date().toISOString(); + const invertedSwap = await getThreeRouteSwap({ + inputTokenSymbol: outputTokenSymbol, + outputTokenSymbol: THREE_ROUTE_TEZ_SYMBOL, + realAmount: 1 + }); + + if (invertedSwap.output === 0) { + throw new Error(`Failed to get swaps for 1 ${outputTokenSymbol}`); + } + + const directSwap = await getThreeRouteSwap({ + inputTokenSymbol: THREE_ROUTE_TEZ_SYMBOL, + outputTokenSymbol, + realAmount: invertedSwap.output + }); + + return { directSwap, invertedSwap, updatedAt }; }; -export const ASPENCOIN_ADDRESS = 'KT1S5iPRQ612wcNm6mXDqDhTNegGFcvTV7vM'; +const getSwaps = async (outputTokenSymbol: string) => { + try { + return await getSwapsByTezAmount(outputTokenSymbol, 10); + } catch {} + + try { + return await getSwapsByTezAmount(outputTokenSymbol, 1); + } catch {} + + try { + return await getSwapsByOneToken(outputTokenSymbol); + } catch { + return { directSwap: EMPTY_SWAP, invertedSwap: EMPTY_SWAP, updatedAt: new Date().toISOString() }; + } +}; + +const probeSwapsProvider = new DataProvider(Infinity, getSwaps); + +const rejectOnTimeout = (timeoutMs: number) => + new Promise((_, rej) => setTimeout(() => rej(new TimeoutError()), timeoutMs)); + tokensMetadataProvider.subscribe(ASPENCOIN_ADDRESS); const getTokensExchangeRates = async (): Promise => { @@ -82,14 +162,13 @@ const getTokensExchangeRates = async (): Promise => { (token): token is ThreeRouteFa12Token | ThreeRouteFa2Token => token.standard !== ThreeRouteStandardEnum.xtz ) .map(async (token): Promise => { - logger.info(`Getting exchange rate for ${token.symbol}`); const { contract, tokenId: rawTokenId } = token; const tokenId = isDefined(rawTokenId) ? Number(rawTokenId) : undefined; await probeSwapsProvider.subscribe(token.symbol); try { const { data: probeSwaps, error: swapError } = await Promise.race([ probeSwapsProvider.get(token.symbol), - new Promise((_, rej) => setTimeout(() => rej(new TimeoutError()), 10000)) + rejectOnTimeout(10000) ]); await tokensMetadataProvider.subscribe(contract, tokenId); const { data: metadata } = await tokensMetadataProvider.get(contract, tokenId); @@ -99,14 +178,7 @@ const getTokensExchangeRates = async (): Promise => { throw swapError; } - const { directSwap, invertedSwap } = probeSwaps; - const toTezExchangeRatesVersions: BigNumber[] = []; - if (directSwap.output !== 0) { - toTezExchangeRatesVersions.push(new BigNumber(PROBE_TEZ_AMOUNT).div(directSwap.output)); - } - if (invertedSwap.output !== 0) { - toTezExchangeRatesVersions.push(new BigNumber(invertedSwap.output).div(invertedSwap.input)); - } + const toTezExchangeRatesVersions = getToTezExchangeRatesVersions(probeSwaps); const exchangeRate = toTezExchangeRatesVersions.length === 0 ? new BigNumber(0) @@ -118,7 +190,8 @@ const getTokensExchangeRates = async (): Promise => { tokenAddress: contract, tokenId, exchangeRate, - metadata: mapTzktTokenDataToBcdTokenData(metadata?.[0]) + metadata: mapTzktTokenDataToBcdTokenData(metadata?.[0]), + swapsUpdatedAt: probeSwaps.updatedAt }; } catch (e) { if (e instanceof TimeoutError) { @@ -166,7 +239,46 @@ const getTokensExchangeRates = async (): Promise => { return [...exchangeRates].filter(({ exchangeRate }) => !exchangeRate.eq(0)); }; -export const tokensExchangeRatesProvider = new SingleQueryDataProvider(60000, getTokensExchangeRates); +const tokensExchangeRatesProvider = new SingleQueryDataProvider(60000, getTokensExchangeRates); + +export const getExchangeRatesFromDB = async (): Promise => { + const rawValue = await redisClient.get(EXCHANGE_RATES_STORAGE_KEY); + + return JSON.parse(rawValue ?? '[]'); +}; + +const updateExchangeRatesInDB = async () => { + const prevExchangeRates = await getExchangeRatesFromDB(); + const prevIndexedExchangeRates = Object.fromEntries( + prevExchangeRates.map(exchangeRate => [`${exchangeRate.tokenAddress}_${exchangeRate.tokenId}`, exchangeRate]) + ); + const { data: exchangeRatesUpdates, error: exchangeRatesError } = await tokensExchangeRatesProvider.getState(); + + if (exchangeRatesError) { + return; + } + + const indexedExchangeRatesUpdates = Object.fromEntries( + exchangeRatesUpdates.map(exchangeRate => [`${exchangeRate.tokenAddress}_${exchangeRate.tokenId}`, exchangeRate]) + ); + + const newExchangeRates = Object.values({ + ...prevIndexedExchangeRates, + ...indexedExchangeRatesUpdates + }); + + await redisClient.set(EXCHANGE_RATES_STORAGE_KEY, JSON.stringify(newExchangeRates)); +}; +updateExchangeRatesInDB().catch(logger.error); + +const getPrevDexesList = async (): Promise => { + const rawValue = await redisClient.get(PREV_DEXES_LIST_STORAGE_KEY); + + return isDefined(rawValue) ? JSON.parse(rawValue) : null; +}; + +const setPrevDexesList = async (dexesList: ThreeRouteDex[]) => + redisClient.set(PREV_DEXES_LIST_STORAGE_KEY, JSON.stringify(dexesList)); const swapsUpdateSemaphore = new PromisifiedSemaphore(); blockFinder(EMPTY_BLOCK, async block => @@ -180,14 +292,29 @@ blockFinder(EMPTY_BLOCK, async block => throw tokensError ?? dexesError; } + let dexesListChanged = false; + const prevDexesList = await getPrevDexesList(); + if (prevDexesList) { + const createdOrUpdatedDexesList = differenceBy(dexes, prevDexesList, isEqual); + const deletedOrUpdatedDexesList = differenceBy(prevDexesList, dexes, isEqual); + dexesListChanged = createdOrUpdatedDexesList.length > 0 || deletedOrUpdatedDexesList.length > 0; + } + await setPrevDexesList(dexes); + + if (dexesListChanged) { + logger.info('dexes list changed, refreshing all tokens exchange rates'); + } + const outputsUpdatesFlags = await Promise.all( tokens.map(async token => { if (token.symbol === THREE_ROUTE_TEZ_SYMBOL) { return false; } - if (token.symbol === THREE_ROUTE_SIRS_SYMBOL) { + if (dexesListChanged || token.symbol === THREE_ROUTE_SIRS_SYMBOL) { // Swap output for SIRS should be updated each block because of baking subsidy + await probeSwapsProvider.refetchInSubscription(token.symbol); + return true; } @@ -195,7 +322,7 @@ blockFinder(EMPTY_BLOCK, async block => await probeSwapsProvider.subscribe(token.symbol); const { data: probeSwaps, error: swapError } = await Promise.race([ probeSwapsProvider.get(token.symbol), - new Promise((_, rej) => setTimeout(() => rej(new TimeoutError()), 10000)) + rejectOnTimeout(10000) ]); if (swapError) { @@ -205,6 +332,14 @@ blockFinder(EMPTY_BLOCK, async block => const { directSwap, invertedSwap } = probeSwaps; const directSwapChains = getChains(directSwap); const invertedSwapChains = getChains(invertedSwap); + + if (directSwapChains.length === 0) { + logger.info(`updating swap output for token ${token.symbol} because of direct swap chains absence`); + await probeSwapsProvider.refetchInSubscription(token.symbol); + + return true; + } + const dexesAddresses = directSwapChains .concat(invertedSwapChains) .map(chain => chain.hops.map(hop => dexes.find(dex => dex.id === hop.dex)?.contract).filter(isDefined)) @@ -229,6 +364,7 @@ blockFinder(EMPTY_BLOCK, async block => if (outputsUpdatesFlags.some(flag => flag)) { logger.info('refreshing tokens exchange rates because of swaps outputs updates'); await tokensExchangeRatesProvider.refetch(); + await updateExchangeRatesInDB(); } logger.info(`stats updated for level ${block.header.level}`); })