diff --git a/.github/workflows/proxy-docker-dev.yml b/.github/workflows/proxy-docker-dev.yml index cf9fd6eec..fbb5fbd2b 100644 --- a/.github/workflows/proxy-docker-dev.yml +++ b/.github/workflows/proxy-docker-dev.yml @@ -31,7 +31,7 @@ jobs: sh .github/workflows/scripts/proxyVersion.sh - name: Build and push - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v6 with: push: true platforms: amd64 @@ -39,7 +39,9 @@ jobs: cache-to: type=gha,mode=max file: ./apps/indexer-proxy/proxy/Dockerfile tags: subquerynetwork/indexer-proxy-dev:v${{ steps.fetch-version.outputs.VERSION }},subquerynetwork/indexer-proxy-dev:latest - build-args: RELEASE_VERSION=${{ steps.fetch-version.outputs.VERSION }} + build-args: | + RELEASE_VERSION=${{ steps.fetch-version.outputs.VERSION }} + SECRETS_SENTRY_DSN=${{ secrets.SENTRY_DSN }} - name: Image digest run: echo ${{ steps.docker_build.outputs.digest }} diff --git a/.github/workflows/proxy-docker-prod.yml b/.github/workflows/proxy-docker-prod.yml index e7bf7a008..5eb925b37 100644 --- a/.github/workflows/proxy-docker-prod.yml +++ b/.github/workflows/proxy-docker-prod.yml @@ -31,7 +31,7 @@ jobs: sh .github/workflows/scripts/proxyVersion.sh - name: Build and push - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v6 with: push: true platforms: amd64,arm64 @@ -40,6 +40,8 @@ jobs: file: ./apps/indexer-proxy/proxy/Dockerfile tags: subquerynetwork/indexer-proxy:v${{ steps.fetch-version.outputs.VERSION }} build-args: RELEASE_VERSION=${{ steps.fetch-version.outputs.VERSION }} + secrets: | + SECRETS_SENTRY_DSN=${{ secrets.SENTRY_DSN }} - name: Image digest run: echo ${{ steps.docker_build.outputs.digest }} diff --git a/apps/indexer-admin/src/pages/project-details/components/rpcSetting.tsx b/apps/indexer-admin/src/pages/project-details/components/rpcSetting.tsx index cc7766a60..d331c43d4 100644 --- a/apps/indexer-admin/src/pages/project-details/components/rpcSetting.tsx +++ b/apps/indexer-admin/src/pages/project-details/components/rpcSetting.tsx @@ -6,6 +6,7 @@ import { useParams } from 'react-router'; import { useLazyQuery, useMutation, useQuery } from '@apollo/client'; import { Spinner, Steps, Typography } from '@subql/components'; import { cidToBytes32 } from '@subql/network-clients'; +import { useUpdate } from 'ahooks'; import { Button, Form, Input, InputNumber } from 'antd'; import { Rule } from 'antd/es/form'; import debounce from 'debounce-promise'; @@ -23,6 +24,20 @@ interface IProps { id?: string; } +const getRuleField = (key: string) => { + const lowerCaseKey = key.toLowerCase(); + + if (lowerCaseKey.includes('metrics')) { + return 'metrics'; + } + + if (lowerCaseKey.includes('http')) { + return 'http'; + } + + return 'ws'; +}; + const RpcSetting: FC = (props) => { const { onCancel, onSubmit, id: propsId } = props; @@ -30,6 +45,11 @@ const RpcSetting: FC = (props) => { const mineId = useMemo(() => propsId || id, [propsId, id]); const projectQuery = useProjectDetails(mineId); const [form] = Form.useForm(); + const update = useUpdate(); + const [loadingUpdate, setLoadingUpdate] = React.useState(false); + const inputFieldFeedback = React.useRef<{ + [key in string]: 'warn' | 'error' | 'success' | 'validating' | ''; + }>({}); const keys = useQuery<{ getRpcEndpointKeys: string[] }>(GET_RPC_ENDPOINT_KEYS, { variables: { @@ -39,15 +59,30 @@ const RpcSetting: FC = (props) => { const rules = useMemo(() => { const checkIfWsAndHttpSame = () => { - const allValues = keys.data?.getRpcEndpointKeys.map((key) => { - try { - return new URL(form.getFieldValue(key)).hostname; - } catch { - return form.getFieldValue(key); - } - }); + const allValues = keys.data?.getRpcEndpointKeys + .map((key) => { + try { + return { + key, + val: new URL(form.getFieldValue(key)).hostname, + }; + } catch { + return { + key, + val: form.getFieldValue(key), + }; + } + }) + .filter((i) => i.val); - const ifSame = new Set(allValues).size === 1; + if ((allValues?.length || 0) < 2) { + return { + result: false, + message: 'Please input endpoint', + }; + } + + const ifSame = new Set(allValues?.map((i) => i.val)).size === 1; if (ifSame) return { @@ -56,15 +91,25 @@ const RpcSetting: FC = (props) => { return { result: false, - message: 'The origin of Ws and Http endpoint should be the same.', + message: 'The origin of all endpoint should be the same.', }; }; const polkadotAndSubstrateRule = ( - endpointType: 'http' | 'ws', + endpointType: 'http' | 'ws' | 'metrics', value: string, ruleField: string ) => { + if (endpointType === 'metrics') { + if (value) { + return checkIfWsAndHttpSame(); + } + + return { + result: true, + }; + } + if (endpointType === 'ws') { if (!value) return { @@ -112,7 +157,19 @@ const RpcSetting: FC = (props) => { }; return { - evm: (endpointType: 'http' | 'ws', value: string, ruleField: string) => { + evm: (endpointType: 'http' | 'ws' | 'metrics', value: string, ruleField: string) => { + if (endpointType === 'metrics') { + const metricsVal = form.getFieldValue(ruleField); + + if (metricsVal) { + return checkIfWsAndHttpSame(); + } + + return { + result: true, + }; + } + if (endpointType === 'ws') { const wsVal = form.getFieldValue(ruleField.replace('Http', 'Ws')); if (wsVal && wsVal?.startsWith('ws')) { @@ -146,7 +203,7 @@ const RpcSetting: FC = (props) => { }, [form, keys.data?.getRpcEndpointKeys]); const [validate] = useLazyQuery< - { validateRpcEndpoint: { valid: boolean; reason?: string } }, + { validateRpcEndpoint: { valid: boolean; reason?: string; level: 'warn' | 'error' | '' } }, { projectId: string; endpointKey: string; endpoint: string } >(VALID_RPC_ENDPOINT); @@ -160,6 +217,13 @@ const RpcSetting: FC = (props) => { // @ts-ignore const ruleField = rule.field as string; const ruleKeys = Object.keys(rules); + + inputFieldFeedback.current = { + ...inputFieldFeedback.current, + [ruleField]: 'validating', + }; + update(); + const whichRule = rules[ ruleKeys.find((key) => ruleField.includes(key as string)) as @@ -167,18 +231,27 @@ const RpcSetting: FC = (props) => { | 'polkadot' | 'substrate' ]; - const { result, message } = whichRule( - ruleField.toLocaleLowerCase().includes('http') ? 'http' : 'ws', - value, - ruleField - ); + + const { result, message } = whichRule(getRuleField(ruleField), value, ruleField); if (!result) { + inputFieldFeedback.current = { + ...inputFieldFeedback.current, + [ruleField]: 'error', + }; + update(); return Promise.reject(new Error(message)); } // whichRule should validate if the field can be empty, if empty, just true - if (!value) return Promise.resolve(); + if (!value) { + inputFieldFeedback.current = { + ...inputFieldFeedback.current, + [ruleField]: 'success', + }; + update(); + return Promise.resolve(); + } const res = await validate({ variables: { @@ -190,10 +263,20 @@ const RpcSetting: FC = (props) => { fetchPolicy: 'network-only', }, }); - + inputFieldFeedback.current = { + ...inputFieldFeedback.current, + [ruleField]: res?.data?.validateRpcEndpoint.level || '', + }; + update(); if (!res?.data?.validateRpcEndpoint.valid) { return Promise.reject(new Error(res?.data?.validateRpcEndpoint.reason)); } + + inputFieldFeedback.current = { + ...inputFieldFeedback.current, + [ruleField]: 'success', + }; + update(); // verification RPC endpoint return Promise.resolve(); }; @@ -204,6 +287,7 @@ const RpcSetting: FC = (props) => { [key]: debounce(validateFunc, 1000), }; }, {}) as Record Promise | void>; + // eslint-disable-next-line react-hooks/exhaustive-deps }, [validate, mineId, rules, keys.data?.getRpcEndpointKeys]); if (!projectQuery.data) return ; @@ -266,21 +350,35 @@ const RpcSetting: FC = (props) => { > {keys.data?.getRpcEndpointKeys.map((key) => { return ( - { - return { - validator: debouncedValidator[key], - }; - }, - ]} - > - - + + { + return { + validator: debouncedValidator[key], + }; + }, + ]} + > + + + ); })} @@ -310,43 +408,62 @@ const RpcSetting: FC = (props) => { shape="round" type="primary" style={{ borderColor: 'var(--sq-blue600)', background: 'var(--sq-blue600)' }} + loading={loadingUpdate} onClick={async () => { - await form.validateFields(); - const serviceEndpoints = keys.data?.getRpcEndpointKeys - .map((key) => { - return { - key, - value: form.getFieldValue(`${key}`)?.trim(), - }; - }) - .filter((i) => i.value); - try { - await startProjectRequest({ - variables: { - rateLimit: form.getFieldValue('rateLimit'), - poiEnabled: false, - queryVersion: '', - nodeVersion: '', - networkDictionary: '', - networkEndpoints: '', - batchSize: 1, - workers: 1, - timeout: 1, - cache: 1, - cpu: 1, - memory: 1, - id: mineId, - projectType: projectQuery.data?.project.projectType, - serviceEndpoints, - }, - }); - onSubmit?.(); - } catch (e) { - parseError(e, { - alert: true, - rawMsg: true, - }); + setLoadingUpdate(true); + try { + await form.validateFields(); + } catch (e: any) { + // ValidateErrorEntity + if (e?.errorFields && Array.isArray(e.errorFields)) { + // eslint-disable-next-line no-restricted-syntax + for (const err of e.errorFields) { + const name = err?.name?.[0] as string; + if (inputFieldFeedback.current[name] !== 'warn') { + return; + } + } + } + } + const serviceEndpoints = keys.data?.getRpcEndpointKeys + .map((key) => { + return { + key, + value: form.getFieldValue(`${key}`)?.trim(), + }; + }) + .filter((i) => i.value); + + try { + await startProjectRequest({ + variables: { + rateLimit: form.getFieldValue('rateLimit'), + poiEnabled: false, + queryVersion: '', + nodeVersion: '', + networkDictionary: '', + networkEndpoints: '', + batchSize: 1, + workers: 1, + timeout: 1, + cache: 1, + cpu: 1, + memory: 1, + id: mineId, + projectType: projectQuery.data?.project.projectType, + serviceEndpoints, + }, + }); + onSubmit?.(); + } catch (e) { + parseError(e, { + alert: true, + rawMsg: true, + }); + } + } finally { + setLoadingUpdate(false); } }} > @@ -374,4 +491,14 @@ export const HorizeFormItem = styled.div` } `; +const WithWarning = styled.div` + .ant-form-item-has-warning { + .ant-row { + .ant-form-item-explain-error { + color: var(--sq-warning); + } + } + } +`; + export default RpcSetting; diff --git a/apps/indexer-admin/src/pages/project-details/projectDetailsPage.tsx b/apps/indexer-admin/src/pages/project-details/projectDetailsPage.tsx index 9ea267a1b..baa8c8a56 100644 --- a/apps/indexer-admin/src/pages/project-details/projectDetailsPage.tsx +++ b/apps/indexer-admin/src/pages/project-details/projectDetailsPage.tsx @@ -177,9 +177,11 @@ const ProjectDetailsPage = () => { const projectStatus = useMemo(() => { if (!metadata) return ProjectStatus.Unknown; - - if (projectQuery.data?.project.projectConfig?.serviceEndpoints?.length) { - if (projectQuery.data?.project.projectConfig.serviceEndpoints.every((i) => i.valid)) { + const endpoints = projectQuery.data?.project.projectConfig?.serviceEndpoints.filter( + (i) => !['evmMetricsHttp', 'polkadotMetricsHttp'].includes(i.key) + ); + if (endpoints?.length) { + if (endpoints.every((i) => i.valid)) { return ProjectStatus.Ready; } diff --git a/apps/indexer-admin/src/utils/queries.ts b/apps/indexer-admin/src/utils/queries.ts index 3642e06e0..b8fd6d094 100644 --- a/apps/indexer-admin/src/utils/queries.ts +++ b/apps/indexer-admin/src/utils/queries.ts @@ -389,6 +389,7 @@ export const VALID_RPC_ENDPOINT = gql` validateRpcEndpoint(projectId: $projectId, endpointKey: $endpointKey, endpoint: $endpoint) { valid reason + level } } `; diff --git a/apps/indexer-coordinator/src/payg/payg.service.ts b/apps/indexer-coordinator/src/payg/payg.service.ts index 7d4605097..9ddc112ba 100644 --- a/apps/indexer-coordinator/src/payg/payg.service.ts +++ b/apps/indexer-coordinator/src/payg/payg.service.ts @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 import { Injectable, OnModuleInit } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; import { InjectRepository } from '@nestjs/typeorm'; import { StateChannel as StateChannelOnChain } from '@subql/contract-sdk'; import { bytes32ToCid } from '@subql/network-clients'; @@ -529,6 +530,9 @@ export class PaygService implements OnModuleInit { if (channel.onchain === channel.remote) { return channel; } + if (!channel.lastIndexerSign || !channel.lastConsumerSign) { + return channel; + } // terminate await this.contract.sendTransaction({ @@ -623,4 +627,22 @@ export class PaygService implements OnModuleInit { async getOpenChannels() { return await this.channelRepo.find({ where: { status: ChannelStatus.OPEN } }); } + + @Cron(CronExpression.EVERY_10_MINUTES) + async closeOutdatedAndNotExtended() { + const channels = await this.getOpenChannels(); + for (const c of channels) { + try { + const now = Math.floor(Date.now() / 1000); + if (c.expiredAt > now) { + continue; + } + await this.terminate(c.id); + } catch (e) { + logger.error( + `closeOutdatedAndNotExtended state channel(${c.id} ${c.deploymentId}) error: ${e.stack}` + ); + } + } + } } diff --git a/apps/indexer-coordinator/src/project/project.model.ts b/apps/indexer-coordinator/src/project/project.model.ts index 9ee446da6..871d56fde 100644 --- a/apps/indexer-coordinator/src/project/project.model.ts +++ b/apps/indexer-coordinator/src/project/project.model.ts @@ -48,6 +48,8 @@ export class ValidationResponse { valid: boolean; @Field() reason: string; + @Field({ nullable: true }) + level?: string; } @ObjectType('Metadata') diff --git a/apps/indexer-coordinator/src/project/project.rpc.service.ts b/apps/indexer-coordinator/src/project/project.rpc.service.ts index 929b19219..1eb56463a 100644 --- a/apps/indexer-coordinator/src/project/project.rpc.service.ts +++ b/apps/indexer-coordinator/src/project/project.rpc.service.ts @@ -1,12 +1,12 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: Apache-2.0 -import { Injectable } from '@nestjs/common'; +import { Injectable, OnModuleInit } from '@nestjs/common'; import { Cron } from '@nestjs/schedule'; import { InjectRepository } from '@nestjs/typeorm'; import { DesiredStatus } from 'src/core/types'; import { getLogger } from 'src/utils/logger'; -import { getDomain, getIpAddress, isIp, isPrivateIp } from 'src/utils/network'; +import { getDomain, getIpAddress, isIp, isPrivateIp, safeGetDomain } from 'src/utils/network'; import { Repository } from 'typeorm'; import { RpcManifest } from './project.manifest'; import { @@ -19,17 +19,51 @@ import { } from './project.model'; import { ProjectService } from './project.service'; import { RequiredRpcType, getRpcFamilyObject } from './rpc.factory'; -import { AccessType, ProjectType } from './types'; +import { + AccessType, + RpcEndpointType, + ErrorLevel, + ProjectType, + RpcEndpointAccessType, +} from './types'; const logger = getLogger('project.rpc.service'); @Injectable() -export class ProjectRpcService { +export class ProjectRpcService implements OnModuleInit { constructor( @InjectRepository(ProjectEntity) private projectRepo: Repository, private projectService: ProjectService ) {} + async onModuleInit() { + // await this.fillMetricsEndpoint(); + } + + async fillMetricsEndpoint() { + const projects = await this.projectRepo.find({ where: { projectType: ProjectType.RPC } }); + for (const p of projects) { + const manifest = p.manifest as RpcManifest; + const [filled, exists] = this.fillRpcEndpoints( + p.projectConfig.serviceEndpoints, + manifest.rpcFamily + ); + const target = exists || filled; + + if (target) { + let flag = false; + const found = p.serviceEndpoints.find((s) => s.key === target.key); + if (!found) { + flag = true; + p.serviceEndpoints.push(target); + } + if (flag || filled) { + await this.projectRepo.save(p); + } + } + } + } + @Cron('0 */8 * * * *') async autoValidateRpcEndpoints() { const projects = (await this.projectService.getAliveProjects()).filter( @@ -69,11 +103,13 @@ export class ProjectRpcService { const validateUrlResult = this.validateRpcEndpointsUrl(serviceEndpoints); serviceEndpoints = serviceEndpoints.filter((endpoint) => endpoint.value); let reason = ''; + let errorLevel = ErrorLevel.none; for (const endpoint of serviceEndpoints) { if (!validateUrlResult.valid) { endpoint.valid = false; endpoint.reason = validateUrlResult.reason; reason = reason || validateUrlResult.reason; + errorLevel = errorLevel || (validateUrlResult.level as ErrorLevel); continue; } const response = await this.validateRpcEndpoint(project.id, endpoint.key, endpoint.value); @@ -82,45 +118,63 @@ export class ProjectRpcService { `Project ${project.id} endpoint ${endpoint.key} is invalid: ${response.reason}` ); } + endpoint.valid = response.valid; endpoint.reason = response.reason; - reason = reason || response.reason; + + if (response.level !== ErrorLevel.none) { + if (errorLevel === ErrorLevel.error) { + continue; + } + errorLevel = response.level as ErrorLevel; + reason = response.reason; + } } - return this.formatResponse(!reason, reason); + return this.formatResponse(!reason, reason, errorLevel); } private validateRpcEndpointsUrl(serviceEndpoints: SeviceEndpoint[]): ValidationResponse { if (!serviceEndpoints || serviceEndpoints.length === 0) { - return this.formatResponse(false, 'No endpoints'); + return this.formatResponse(false, 'No endpoints', ErrorLevel.error); } - const rpcFamily = serviceEndpoints[0].key.replace(/(Http|Ws)$/, ''); - if (serviceEndpoints.length > 1) { - const rpcFamily2 = serviceEndpoints[1].key.replace(/(Http|Ws)$/, ''); + const rpcFamily = serviceEndpoints[0].key.replace(/(Http|Ws|MetricsHttp)$/, ''); + + for (let i = 1; i < serviceEndpoints.length; i++) { + const rpcFamily2 = serviceEndpoints[i].key.replace(/(Http|Ws|MetricsHttp)$/, ''); if (rpcFamily !== rpcFamily2) { - return this.formatResponse(false, 'Endpoints are not from the same rpc family'); + return this.formatResponse( + false, + 'Endpoints are not from the same rpc family', + ErrorLevel.error + ); } try { const host1 = new URL(serviceEndpoints[0].value).hostname; - const host2 = new URL(serviceEndpoints[1].value).hostname; + const host2 = new URL(serviceEndpoints[i].value).hostname; if (host1 !== host2) { - return this.formatResponse(false, 'Endpoints are not from the same host'); + return this.formatResponse( + false, + 'Endpoints are not from the same host', + ErrorLevel.error + ); } } catch (e) { - return this.formatResponse(false, 'Invalid url'); + return this.formatResponse(false, 'Invalid url', ErrorLevel.error); } } + for (const endpoint of serviceEndpoints) { if ( endpoint.key.endsWith('Http') && !(endpoint.value.startsWith('http://') || endpoint.value.startsWith('https://')) ) { - return this.formatResponse(false, 'Invalid http endpoint'); + return this.formatResponse(false, 'Invalid http endpoint', ErrorLevel.error); } if ( endpoint.key.endsWith('Ws') && !(endpoint.value.startsWith('ws://') || endpoint.value.startsWith('wss://')) ) { - return this.formatResponse(false, 'Invalid ws endpoint'); + return this.formatResponse(false, 'Invalid ws endpoint', ErrorLevel.error); } } return this.validateRequiredRpcType(rpcFamily, serviceEndpoints); @@ -134,12 +188,12 @@ export class ProjectRpcService { switch (rpcType) { case RequiredRpcType.http: if (!serviceEndpoints.find((endpoint) => endpoint.key.endsWith('Http'))) { - return this.formatResponse(false, 'Missing http endpoint'); + return this.formatResponse(false, 'Missing http endpoint', ErrorLevel.error); } break; case RequiredRpcType.ws: if (!serviceEndpoints.find((endpoint) => endpoint.key.endsWith('Ws'))) { - return this.formatResponse(false, 'Missing ws endpoint'); + return this.formatResponse(false, 'Missing ws endpoint', ErrorLevel.error); } break; case RequiredRpcType.any: @@ -148,7 +202,7 @@ export class ProjectRpcService { (endpoint) => endpoint.key.endsWith('Http') || endpoint.key.endsWith('Ws') ) ) { - return this.formatResponse(false, 'Missing http or ws endpoint'); + return this.formatResponse(false, 'Missing http or ws endpoint', ErrorLevel.error); } break; case RequiredRpcType.both: @@ -156,11 +210,11 @@ export class ProjectRpcService { !serviceEndpoints.find((endpoint) => endpoint.key.endsWith('Http')) || !serviceEndpoints.find((endpoint) => endpoint.key.endsWith('Ws')) ) { - return this.formatResponse(false, 'Missing http and ws endpoint'); + return this.formatResponse(false, 'Missing http and ws endpoint', ErrorLevel.error); } break; default: - return this.formatResponse(false, 'Unknown rpc type'); + return this.formatResponse(false, 'Unknown rpc type', ErrorLevel.error); } return this.formatResponse(true); } @@ -174,7 +228,7 @@ export class ProjectRpcService { try { const domain = getDomain(endpoint); if (!domain) { - return this.formatResponse(false, 'Invalid domain'); + return this.formatResponse(false, 'Invalid domain', ErrorLevel.error); } let ip: string; if (isIp(domain)) { @@ -183,14 +237,14 @@ export class ProjectRpcService { ip = await getIpAddress(domain); } if (!ip) { - return this.formatResponse(false, 'Invalid ip address'); + return this.formatResponse(false, 'Invalid ip address', ErrorLevel.error); } if (!isPrivateIp(ip)) { - return this.formatResponse(false, 'Endpoint is not private ip'); + return this.formatResponse(false, 'Endpoint is not private ip', ErrorLevel.error); } } catch (e) { logger.error(e); - return this.formatResponse(false, e.message); + return this.formatResponse(false, e.message, ErrorLevel.error); } // compare chain id, genesis hash, rpc family, client name and version, node type @@ -206,19 +260,21 @@ export class ProjectRpcService { .withChainId(projectManifest.chain?.chainId) .withGenesisHash(projectManifest.chain?.genesisHash) .withNodeType(projectManifest.nodeType) + .withHeight() .withClientNameAndVersion(projectManifest.client?.name, projectManifest.client?.version) - .validate(endpoint); + .validate(endpoint, endpointKey as RpcEndpointType); return this.formatResponse(true); } catch (e) { logger.debug(e); - return this.formatResponse(false, e.message); + return this.formatResponse(false, e.message, e.level || ErrorLevel.error); } } - private formatResponse(valid = false, reason = ''): ValidationResponse { + private formatResponse(valid = false, reason = '', level = ErrorLevel.none): ValidationResponse { return { valid, reason, + level, }; } @@ -236,20 +292,21 @@ export class ProjectRpcService { project.rateLimit = rateLimit; const manifest = project.manifest as RpcManifest; + // this.fillRpcEndpoints(projectConfig.serviceEndpoints, manifest.rpcFamily); + const endpointKeys = this.getAllEndpointKeys(manifest.rpcFamily || []); project.serviceEndpoints = projectConfig.serviceEndpoints.filter((endpoint) => { return endpointKeys.includes(endpoint.key); }); projectConfig.serviceEndpoints = project.serviceEndpoints; - const validateResult = await this.validateProjectEndpoints(project, project.serviceEndpoints); - if (!validateResult.valid) { + if (!validateResult.valid && validateResult.level === ErrorLevel.error) { throw new Error(`Invalid endpoints: ${validateResult.reason}`); } for (const endpoint of project.serviceEndpoints) { - endpoint.access = AccessType.DEFAULT; + endpoint.access = RpcEndpointAccessType[endpoint.key] || AccessType.DEFAULT; endpoint.isWebsocket = endpoint.key.endsWith('Ws'); endpoint.rpcFamily = manifest.rpcFamily || []; } @@ -257,6 +314,52 @@ export class ProjectRpcService { return this.projectRepo.save(project); } + fillRpcEndpoints( + serviceEndpoints: SeviceEndpoint[], + rpcFamilyList: string[] + ): [filled?: SeviceEndpoint, exists?: SeviceEndpoint] { + if (!serviceEndpoints.length) return []; + let targetKey = ''; + let defaultSuffix = ''; + if (rpcFamilyList.includes('evm')) { + targetKey = RpcEndpointType.evmMetricsHttp; + defaultSuffix = ':6060/debug/metrics'; + } else if (rpcFamilyList.includes('polkadot')) { + targetKey = RpcEndpointType.polkadotMetricsHttp; + defaultSuffix = ':9615/metrics'; + } + if (!targetKey) return []; + + let exists; + let value = ''; + for (const e of serviceEndpoints) { + if (e.key === targetKey) { + exists = e; + } + if (e.value) { + value = e.value; + } + } + if (exists) { + return [undefined, exists]; + } + + let res: SeviceEndpoint | undefined; + if (value) { + const domain = safeGetDomain(value); + if (!domain) return []; + res = new SeviceEndpoint( + targetKey, + `http://${domain}${defaultSuffix}`, + RpcEndpointAccessType[targetKey] || AccessType.DEFAULT + ); + res.isWebsocket = res.key.endsWith('Ws'); + res.rpcFamily = rpcFamilyList; + serviceEndpoints.push(res); + } + return [res, undefined]; + } + async stopRpcProject(id: string): Promise { const project = await this.projectService.getProject(id); if (!project) { diff --git a/apps/indexer-coordinator/src/project/rpc.factory.ts b/apps/indexer-coordinator/src/project/rpc.factory.ts index b18e6e5bb..a4163f2af 100644 --- a/apps/indexer-coordinator/src/project/rpc.factory.ts +++ b/apps/indexer-coordinator/src/project/rpc.factory.ts @@ -5,8 +5,10 @@ import axios from 'axios'; import { BigNumber } from 'ethers'; import _ from 'lodash'; import * as semver from 'semver'; +import { safeJSONParse } from 'src/utils/json'; import { getLogger } from 'src/utils/logger'; import { WebSocket } from 'ws'; +import { RpcEndpointType, ErrorLevel, ValidateRpcEndpointError } from './types'; const logger = getLogger('rpc.factory'); @@ -98,6 +100,39 @@ async function jsonWsRpcRequest(endpoint: string, method: string, params: any[]) } } +async function jsonMetricsHttpRpcRequest(endpoint: string): Promise<{ error: string; data: any }> { + if (!endpoint) { + return { + error: 'Endpoint is empty', + data: null, + }; + } + try { + const res = await axios.request({ + url: endpoint, + method: 'get', + timeout: 1000 * 10, + }); + return { + error: '', + data: res.data, + }; + } catch (err) { + return { + error: err.message, + data: null, + }; + } + // if (!endpoint) { + // throw new Error('Endpoint is empty'); + // } + // return axios.request({ + // url: endpoint, + // method: 'get', + // timeout: 1000 * 10, + // }); +} + function getRpcRequestFunction(endpoint: string) { if (!endpoint) { throw new Error('Endpoint is empty'); @@ -119,7 +154,8 @@ export interface IRpcFamily { withNodeType(nodeType: string): IRpcFamily; withClientNameAndVersion(clientName: string, clientVersion: string): IRpcFamily; withClientVersion(clientVersion: string): IRpcFamily; - validate(endpoint: string): Promise; + withHeight(height?: number): IRpcFamily; + validate(endpoint: string, endpointKey: string): Promise; getStartHeight(endpoint: string): Promise; getTargetHeight(endpoint: string): Promise; getLastHeight(endpoint: string): Promise; @@ -130,9 +166,12 @@ abstract class RpcFamily implements IRpcFamily { protected actions: (() => Promise)[] = []; protected endpoint: string; protected requiredRpcType: RequiredRpcType = RequiredRpcType.http; + protected targetEndpointKey: RpcEndpointType; - async validate(endpoint: string) { + async validate(endpoint: string, endpointKey: RpcEndpointType) { this.endpoint = endpoint; + this.targetEndpointKey = endpointKey; + await Promise.all(this.actions.map((action) => action())); } getEndpointKeys(): string[] { @@ -156,6 +195,9 @@ abstract class RpcFamily implements IRpcFamily { withClientVersion(clientVersion: string): IRpcFamily { throw new Error('Method not implemented.'); } + withHeight(height?: number): IRpcFamily { + throw new Error('Method not implemented.'); + } getStartHeight(endpoint: string): Promise { throw new Error('Method not implemented.'); } @@ -172,21 +214,55 @@ abstract class RpcFamily implements IRpcFamily { export class RpcFamilyEvm extends RpcFamily { getEndpointKeys(): string[] { - return ['evmWs', 'evmHttp']; + return [RpcEndpointType.evmWs, RpcEndpointType.evmHttp, RpcEndpointType.evmMetricsHttp]; } withChainId(chainId: string): IRpcFamily { this.actions.push(async () => { - const result = await getRpcRequestFunction(this.endpoint)(this.endpoint, 'eth_chainId', []); - if (result.data.error) { - throw new Error(`Request eth_chainId failed: ${result.data.error.message}`); + let p = null; + let errorLevel = ErrorLevel.error; + switch (this.targetEndpointKey) { + case RpcEndpointType.evmHttp: + p = jsonRpcRequest(this.endpoint, 'eth_chainId', []); + break; + case RpcEndpointType.evmWs: + p = jsonWsRpcRequest(this.endpoint, 'eth_chainId', []); + break; + case RpcEndpointType.evmMetricsHttp: + p = jsonMetricsHttpRpcRequest(this.endpoint); + errorLevel = ErrorLevel.warn; + break; + default: + throw new ValidateRpcEndpointError('Invalid endpointKey', errorLevel); } - const chainIdFromRpc = result.data.result; + const result = await p; + let chainIdFromRpc = null; + + if (this.targetEndpointKey === RpcEndpointType.evmMetricsHttp) { + if (result.error) { + throw new ValidateRpcEndpointError( + `Request eth_chainId failed: ${result.error}`, + errorLevel + ); + } + const info = safeJSONParse(result.data['chain/info']); + chainIdFromRpc = info?.chain_id; + } else { + if (result.data.error) { + throw new ValidateRpcEndpointError( + `Request eth_chainId failed: ${result.data.error.message}`, + errorLevel + ); + } + chainIdFromRpc = result.data.result; + } + if (!BigNumber.from(chainIdFromRpc).eq(BigNumber.from(chainId || 0))) { - throw new Error( + throw new ValidateRpcEndpointError( `ChainId mismatch: ${BigNumber.from(chainIdFromRpc).toString()} != ${BigNumber.from( chainId - ).toString()}` + ).toString()}`, + errorLevel ); } }); @@ -195,6 +271,10 @@ export class RpcFamilyEvm extends RpcFamily { withGenesisHash(genesisHash: string): IRpcFamily { this.actions.push(async () => { + if (this.targetEndpointKey === RpcEndpointType.evmMetricsHttp) { + return; + } + const result = await getRpcRequestFunction(this.endpoint)( this.endpoint, 'eth_getBlockByNumber', @@ -216,6 +296,9 @@ export class RpcFamilyEvm extends RpcFamily { withNodeType(nodeType: string): IRpcFamily { this.actions.push(async () => { + if (this.targetEndpointKey === RpcEndpointType.evmMetricsHttp) { + return; + } const result = await getRpcRequestFunction(this.endpoint)(this.endpoint, 'eth_getBalance', [ '0x0000000000000000000000000000000000000000', _.toLower(nodeType) === 'archive' ? '0x1' : 'latest', @@ -232,6 +315,9 @@ export class RpcFamilyEvm extends RpcFamily { if (!clientName && !clientVersion) { return; } + if (this.targetEndpointKey === RpcEndpointType.evmMetricsHttp) { + return; + } const result = await getRpcRequestFunction(this.endpoint)( this.endpoint, 'web3_clientVersion', @@ -256,6 +342,10 @@ export class RpcFamilyEvm extends RpcFamily { return this; } + withHeight(height?: number): IRpcFamily { + return this; + } + async getStartHeight(endpoint: string): Promise { const result = await getRpcRequestFunction(endpoint)(endpoint, 'eth_syncing', []); if (result.data.error) { @@ -318,7 +408,7 @@ export class RpcFamilySubstrate extends RpcFamily { } getEndpointKeys(): string[] { - return ['substrateWs', 'substrateHttp']; + return [RpcEndpointType.substrateWs, RpcEndpointType.substrateHttp]; } withChainId(chainId: string): IRpcFamily { @@ -327,6 +417,9 @@ export class RpcFamilySubstrate extends RpcFamily { withGenesisHash(genesisHash: string): IRpcFamily { this.actions.push(async () => { + if (this.targetEndpointKey === RpcEndpointType.polkadotMetricsHttp) { + return this; + } const result = await getRpcRequestFunction(this.endpoint)( this.endpoint, 'chain_getBlockHash', @@ -348,6 +441,9 @@ export class RpcFamilySubstrate extends RpcFamily { withNodeType(nodeType: string): IRpcFamily { this.actions.push(async () => { + if (this.targetEndpointKey === RpcEndpointType.polkadotMetricsHttp) { + return this; + } if (_.toLower(nodeType) !== 'archive') { return; } @@ -381,6 +477,10 @@ export class RpcFamilySubstrate extends RpcFamily { return this; } + withHeight(height?: number): IRpcFamily { + return this; + } + async getStartHeight(endpoint: string): Promise { if (this.startHeight) { return Promise.resolve(this.startHeight); @@ -429,6 +529,51 @@ export class RpcFamilySubstrate extends RpcFamily { export class RpcFamilyPolkadot extends RpcFamilySubstrate { getEndpointKeys(): string[] { - return ['polkadotWs', 'polkadotHttp']; + return [ + RpcEndpointType.polkadotWs, + RpcEndpointType.polkadotHttp, + RpcEndpointType.polkadotMetricsHttp, + ]; + } + + withHeight(height?: number): IRpcFamily { + this.actions.push(async () => { + if (this.targetEndpointKey === RpcEndpointType.polkadotMetricsHttp) { + const result = await jsonMetricsHttpRpcRequest(this.endpoint); + if (result.error) { + throw new ValidateRpcEndpointError( + `Request metrics failed: ${result.error}`, + ErrorLevel.warn + ); + } + const height = this.parseBestBlockHeight(result.data); + if (!Number(height)) { + throw new ValidateRpcEndpointError( + `parse metrics height fail. current: ${height}`, + ErrorLevel.warn + ); + } + } + }); + return this; + } + + parseBestBlockHeight(metrics: string) { + for (const line of metrics.split('\n')) { + if (line.startsWith('substrate_block_height')) { + const match = line.slice(22).match(/\{(.*)\}/); + if (match) { + const jsonObject: { [key: string]: string } = {}; + for (const pair of match[1].split(',')) { + const [key, value] = pair.split('='); + jsonObject[key.trim()] = value.replace(/"/g, '').trim(); + } + if (jsonObject.status === 'best') { + return line.split(/\s+/).pop(); + } + } + } + } + return ''; } } diff --git a/apps/indexer-coordinator/src/project/types.ts b/apps/indexer-coordinator/src/project/types.ts index 47143b5cc..bae19b827 100644 --- a/apps/indexer-coordinator/src/project/types.ts +++ b/apps/indexer-coordinator/src/project/types.ts @@ -68,6 +68,24 @@ export const SubgraphEndpointAccessType = { [SubgraphEndpointType.MetricsEndpoint]: AccessType.INTERNAL, }; +export enum RpcEndpointType { + evmHttp = 'evmHttp', + evmWs = 'evmWs', + evmMetricsHttp = 'evmMetricsHttp', + + polkadotWs = 'polkadotWs', + polkadotHttp = 'polkadotHttp', + polkadotMetricsHttp = 'polkadotMetricsHttp', + + substrateWs = 'substrateWs', + substrateHttp = 'substrateHttp', +} + +export const RpcEndpointAccessType = { + [RpcEndpointType.evmMetricsHttp]: AccessType.INTERNAL, + [RpcEndpointType.polkadotMetricsHttp]: AccessType.INTERNAL, +}; + @InputType('SubgraphPort') @ObjectType('SubgraphPort') export class SubgraphPort { @@ -126,3 +144,18 @@ export type TemplateType = { pgKey?: string; pgCert?: string; }; + +export enum ErrorLevel { + none = '', + warn = 'warn', + error = 'error', +} + +export class ValidateRpcEndpointError extends Error { + level: string; + constructor(message: string, level: string = ErrorLevel.none) { + super(message); + this.name = 'ValidateRpcEndpointError'; + this.level = level; + } +} diff --git a/apps/indexer-coordinator/src/project/validator/common.validator.ts b/apps/indexer-coordinator/src/project/validator/common.validator.ts index 48ef9ff46..b4c2e809d 100644 --- a/apps/indexer-coordinator/src/project/validator/common.validator.ts +++ b/apps/indexer-coordinator/src/project/validator/common.validator.ts @@ -4,6 +4,7 @@ import { getDomain, getIpAddress, isIp, isPrivateIp } from 'src/utils/network'; import { getLogger } from '../../utils/logger'; import { ValidationResponse } from '../project.model'; +import { ErrorLevel } from '../types'; const logger = getLogger('common.validator'); @@ -11,7 +12,7 @@ export async function validatePrivateEndpoint(endpoint: string): Promise = option_env!("SECRETS_SENTRY_DSN"); - let log_filter = if debug { Level::DEBUG } else { Level::WARN }; - tracing_subscriber::fmt().with_max_level(log_filter).init(); +fn main() { + if let Some(sentry_dsn) = GITHUB_SENTRY_DSN { + let sentry_option = sentry::ClientOptions { + before_send: Some(Arc::new(Box::new(before_send))), + release: sentry::release_name!(), + debug: false, + auto_session_tracking: true, + attach_stacktrace: true, + ..Default::default() + }; - cli::init_redis().await; + let _sentry = sentry::init((sentry_dsn, sentry_option)); - subscriber::subscribe(); - monitor::listen(); - p2p::listen(); - metrics::listen(); - whitelist::listen(); + start_tokio_main(); + } else { + start_tokio_main(); + } +} + +fn start_tokio_main() { + let body = async { + let port = COMMAND.port(); + let debug = COMMAND.debug(); + + let log_filter = if debug { Level::DEBUG } else { Level::WARN }; + tracing_subscriber::fmt().with_max_level(log_filter).init(); + + cli::init_redis().await; + + subscriber::subscribe(); + monitor::listen(); + p2p::listen(); + metrics::listen(); + whitelist::listen(); + + tokio::spawn(check_sentry_status()); + + server::start_server(port).await; + }; + + #[allow(clippy::expect_used, clippy::diverging_sub_expression)] + { + return tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("Failed building the Runtime") + .block_on(body); + } +} + +async fn check_sentry_status() { + if let Some(sentry_dsn) = GITHUB_SENTRY_DSN { + if sentry_dsn.len() > 20 { + info!( + "sentry is enabled, sentry_dsn top 20 characters is {}", + &sentry_dsn[0..20] + ); + } else { + info!("sentry is enabled, sentry_dsn is {}", sentry_dsn); + } + // loop { + // tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + // let sentry_msg = format!( + // "ep_query_handler, not inline or wrapped, ep_name: {} ||| sabc", + // "test_end_point" + // ); + // sentry::capture_message(&sentry_msg, sentry::Level::Error); - server::start_server(port).await; + // tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + // let maybe_number: Result = Err("This will crash"); + // let _number = maybe_number.unwrap(); // This will panic + // } + } else { + info!("sentry is disabled"); + } } diff --git a/apps/indexer-proxy/proxy/src/payg.rs b/apps/indexer-proxy/proxy/src/payg.rs index c3682b4d2..e185e1de4 100644 --- a/apps/indexer-proxy/proxy/src/payg.rs +++ b/apps/indexer-proxy/proxy/src/payg.rs @@ -52,9 +52,10 @@ use crate::contracts::{ use crate::metrics::{MetricsNetwork, MetricsQuery}; use crate::p2p::report_conflict; use crate::project::{get_project, list_projects, Project}; - +use crate::sentry_log::make_sentry_message; const CURRENT_VERSION: u8 = 3; +#[derive(Debug)] pub struct StateCache { pub expiration: i64, pub agent: Address, @@ -142,6 +143,7 @@ impl StateCache { } /// Supported consumer type. +#[derive(Debug)] pub enum ConsumerType { /// real account Account(Vec
), @@ -584,19 +586,44 @@ pub fn check_multiple_state_balance( let remote_prev = state_cache.remote; let used_amount = price * unit_times; - let local_next = state_cache.spent + used_amount; + let (local_next, flag) = state_cache.spent.overflowing_add(used_amount); + if flag { + make_sentry_message("overflowing_add used_amount overflow", + &format!("state_cache is {:#?}, used_amount is {:#?}, state_cache: {:#?}, unit_times: {}, start: {:#?}, end: {:#?}", state_cache, used_amount, state_cache, + unit_times, + start, + end)); + return Err(Error::Overflow(1058)); + } if local_next > total { // overflow the total return Err(Error::Overflow(1056)); } - let range = end - start; + let (range, flag) = end.overflowing_sub(start); + if flag { + make_sentry_message("overflowing_sub start overflow", + &format!("start is {:#?}, end is {:#?}, state_cache: {:#?}, unit_times: {}, start: {:#?}, end: {:#?}", start, end, state_cache, + unit_times, + start, + end)); + return Err(Error::Overflow(1058)); + } + if range > MULTIPLE_RANGE_MAX { return Err(Error::Overflow(1059)); } - let middle = start + range / 2; + let (middle, flag) = start.overflowing_add(range / 2); + if flag { + make_sentry_message("overflowing_add range overflow", + &format!("start is {:#?}, range is {:#?}, state_cache: {:#?}, unit_times: {}, start: {:#?}, end: {:#?}", start, range, state_cache, + unit_times, + start, + end)); + return Err(Error::Overflow(1058)); + } let mut mpqsa = if local_next < middle { MultipleQueryStateActive::Active } else if local_next > end { @@ -741,16 +768,31 @@ pub async fn extend_channel( // send to coordinator let expired_at = expired + expiration as i64; + let account = ACCOUNT.read().await; + let indexer_sign = extend_sign2( + channel_id, + indexer, + state_cache.agent, + new_price, + U256::from(expired), + U256::from(expiration), + &account.controller, + ) + .await?; + drop(account); + let indexer_sign = convert_sign_to_string(&indexer_sign); let mdata = format!( r#"mutation {{ channelExtend( id:"{:#X}", expiration:{}, price:"{}", - ) + indexerSign:"0x{}", + consumerSign:"0x{}" + ) {{ id, expiredAt }} }}"#, - channel_id, expired_at, new_price, + channel_id, expired_at, new_price, indexer_sign, signature ); let url = COMMAND.graphql_url(); let query = GraphQLQuery::query(&mdata); @@ -763,20 +805,7 @@ pub async fn extend_channel( return Err(Error::ServiceException(1202)); } - let account = ACCOUNT.read().await; - let indexer_sign = extend_sign2( - channel_id, - indexer, - state_cache.agent, - new_price, - U256::from(expired), - U256::from(expiration), - &account.controller, - ) - .await?; - drop(account); - - Ok(convert_sign_to_string(&indexer_sign)) + Ok(indexer_sign) } pub async fn pay_channel(mut state: QueryState) -> Result { diff --git a/apps/indexer-proxy/proxy/src/project.rs b/apps/indexer-proxy/proxy/src/project.rs index 8681af8a0..da68ec850 100644 --- a/apps/indexer-proxy/proxy/src/project.rs +++ b/apps/indexer-proxy/proxy/src/project.rs @@ -280,7 +280,7 @@ impl Project { } } - pub fn endpoint<'a>(&'a self, ep_name: &str, no_internal: bool) -> Result<&Endpoint> { + pub fn endpoint<'a>(&'a self, ep_name: &str, no_internal: bool) -> Result<&'a Endpoint> { if let Some(end) = self.endpoints.get(ep_name) { if no_internal && end.is_internal { Err(Error::InvalidServiceEndpoint(1037)) @@ -295,7 +295,7 @@ impl Project { pub fn is_rpc_project(&self) -> bool { matches!( self.ptype, - ProjectType::RpcEvm(_) | ProjectType::RpcSubstrate(_) | ProjectType::Subgraph + ProjectType::RpcEvm(_) | ProjectType::RpcSubstrate(_) ) } diff --git a/apps/indexer-proxy/proxy/src/sentry_log.rs b/apps/indexer-proxy/proxy/src/sentry_log.rs new file mode 100644 index 000000000..fbcc0cc51 --- /dev/null +++ b/apps/indexer-proxy/proxy/src/sentry_log.rs @@ -0,0 +1,72 @@ +// This file is part of SubQuery. + +// Copyright (C) 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::COMMAND; +use crate::GITHUB_SENTRY_DSN; +use cached::{stores::TimedSizedCache, Cached}; +use once_cell::sync::Lazy; +use sentry::{protocol::Event, types::protocol::v7::Exception}; +use std::sync::Mutex; + +static GLOBAL_MSG_SET: Lazy>> = + Lazy::new(|| Mutex::new(TimedSizedCache::with_size_and_lifespan(1000, 600))); +static GLOBAL_HASH_SET: Lazy>> = + Lazy::new(|| Mutex::new(TimedSizedCache::with_size_and_lifespan(1000, 600))); + +pub fn make_sentry_message(unique_title: &str, msg: &str) { + if GITHUB_SENTRY_DSN.is_some() { + let sentry_msg = format!("{} ||| {}", unique_title, msg); + sentry::capture_message(&sentry_msg, sentry::Level::Error); + } +} + +pub fn before_send(mut event: Event<'static>) -> Option> { + if let Some(ref message) = event.message { + let mut msg_set = GLOBAL_MSG_SET.lock().unwrap(); + let first_part = message.split("|||").next().unwrap_or(message).trim(); + if !msg_set.cache_get(first_part).is_some() { + msg_set.cache_set(first_part.to_string(), ()); + drop(msg_set); + add_event_extra_info(&mut event); + return Some(event); + } + } + if !event.exception.values.is_empty() { + if let Some(exception_str) = get_first_value_of_exception(&event.exception.values) { + let mut hash_set = GLOBAL_HASH_SET.lock().unwrap(); + if !hash_set.cache_get(exception_str).is_some() { + hash_set.cache_set(exception_str.clone(), ()); + drop(hash_set); + add_event_extra_info(&mut event); + return Some(event); + } + } + } + None +} + +fn add_event_extra_info(event: &mut Event<'static>) { + let network_name = format!("{:#?}", COMMAND.network()); + event + .extra + .insert("network".to_string(), network_name.into()); +} + +fn get_first_value_of_exception(values: &[Exception]) -> &Option { + &values[0].value +} diff --git a/apps/indexer-proxy/proxy/src/server.rs b/apps/indexer-proxy/proxy/src/server.rs index 67b8ebf46..d47fdaf06 100644 --- a/apps/indexer-proxy/proxy/src/server.rs +++ b/apps/indexer-proxy/proxy/src/server.rs @@ -53,6 +53,7 @@ use crate::payg::{ query_multiple_state, query_single_state, AuthPayg, }; use crate::project::get_project; +use crate::sentry_log::make_sentry_message; use crate::websocket::{connect_to_project_ws, handle_websocket, validate_project, QueryType}; use crate::{ account::{get_indexer, indexer_healthy}, @@ -318,7 +319,7 @@ async fn ep_query_handler( } let (data, signature, limit) = project .check_query( - body, + body.clone(), endpoint.endpoint.clone(), MetricsQuery::CloseAgreement, MetricsNetwork::HTTP, @@ -329,13 +330,29 @@ async fn ep_query_handler( .await?; let (body, mut headers) = match res_fmt.to_str() { - Ok("inline") => ( - String::from_utf8(data).unwrap_or("".to_owned()), - vec![ - ("X-Indexer-Sig", signature.as_str()), - ("X-Indexer-Response-Format", "inline"), - ], - ), + Ok("inline") => { + let return_body = if let Ok(return_data) = String::from_utf8(data.clone()) { + return_data + } else { + let unique_title = format!( + "ep_query_handler, inline returns empty, deployment_id: {}, ep_name: {}", + deployment_id, ep_name + ); + let msg = format!( + "res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:?}", + res_fmt, headers, body, data + ); + make_sentry_message(&unique_title, &msg); + "".to_owned() + }; + ( + return_body, + vec![ + ("X-Indexer-Sig", signature.as_str()), + ("X-Indexer-Response-Format", "inline"), + ], + ) + } Ok("wrapped") => ( serde_json::to_string(&json!({ "result": general_purpose::STANDARD.encode(&data), @@ -344,7 +361,18 @@ async fn ep_query_handler( .unwrap_or("".to_owned()), vec![("X-Indexer-Response-Format", "wrapped")], ), - _ => ("".to_owned(), vec![]), + _ => { + let unique_title = format!( + "ep_query_handler, not inline or wrapped, deployment_id: {}, ep_name: {}", + deployment_id, ep_name + ); + let msg = format!( + "res_fmt: {:#?}, headers: {:#?}, body: {}", + res_fmt, headers, body + ); + make_sentry_message(&unique_title, &msg); + ("".to_owned(), vec![]) + } }; headers.push(("Content-Type", "application/json")); headers.push(("Access-Control-Max-Age", "600")); @@ -464,7 +492,7 @@ async fn ep_payg_handler( }; match query_multiple_state( &deployment, - body, + body.clone(), endpoint.endpoint.clone(), state, MetricsNetwork::HTTP, @@ -483,7 +511,7 @@ async fn ep_payg_handler( }; match query_single_state( &deployment, - body, + body.clone(), endpoint.endpoint.clone(), state, MetricsNetwork::HTTP, @@ -498,14 +526,41 @@ async fn ep_payg_handler( }; let (body, mut headers) = match res_fmt.to_str() { - Ok("inline") => ( - String::from_utf8(data).unwrap_or("".to_owned()), - vec![ - ("X-Indexer-Sig", signature.as_str()), - ("X-Channel-State", state_data.as_str()), - ("X-Indexer-Response-Format", "inline"), - ], - ), + Ok("inline") => { + let return_body = if let Ok(return_data) = String::from_utf8(data.clone()) { + if return_data.is_empty() { + let unique_title = format!( + "payg ep_query_handler, inline returns empty, because endpoint returns empty, deployment_id: {}, ep_name: {}", + deployment, ep_name + ); + let msg = format!( + "res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:?}", + res_fmt, headers, body, data + ); + make_sentry_message(&unique_title, &msg); + } + return_data + } else { + let unique_title = format!( + "payg ep_query_handler, inline returns empty, deployment_id: {}, ep_name: {}", + deployment, ep_name + ); + let msg = format!( + "res_fmt: {:#?}, headers: {:#?}, body: {}, data: {:?}", + res_fmt, headers, body, data + ); + make_sentry_message(&unique_title, &msg); + "".to_owned() + }; + ( + return_body, + vec![ + ("X-Indexer-Sig", signature.as_str()), + ("X-Channel-State", state_data.as_str()), + ("X-Indexer-Response-Format", "inline"), + ], + ) + } // `wrapped` or other res format _ => ( serde_json::to_string(&json!({ diff --git a/apps/indexer-proxy/utils/Cargo.toml b/apps/indexer-proxy/utils/Cargo.toml index 3a0df0c91..94adfd5c5 100644 --- a/apps/indexer-proxy/utils/Cargo.toml +++ b/apps/indexer-proxy/utils/Cargo.toml @@ -1,24 +1,24 @@ [package] name = "subql-indexer-utils" -version = "2.1.0" +version = "2.2.0" edition = "2021" [dependencies] -subql-contracts = { git = "https://github.com/subquery/network-contracts", tag = "v1.3.0" } axum = "0.7" +base64 = "0.22" bincode = "1.3" -blake3="1.3" +blake3 = "1.3" bs58 = "0.5" -base64 = "0.22" +ethereum-types = "0.15" +ethers = { git = "https://github.com/gakonst/ethers-rs.git", tag = "ethers-v2.0.7" } hex = "0.4" +http = "1.1.0" +once_cell = "1.12" rand_chacha = "0.3" reqwest = { version = "0.12", features = ["json"] } rustc-hex = "2.1" -once_cell = "1.12" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -serde_with={ version = "3.0", features = ["json"] } +serde_with ={ version = "3.0", features = ["json"] } +subql-contracts = { git = "https://github.com/subquery/network-contracts", tag = "v1.4.0" } uint = "0.10" -ethers = { git = "https://github.com/gakonst/ethers-rs.git", tag = "ethers-v2.0.7" } -ethereum-types = "0.15" -http = "1.1.0"