Skip to content

Commit

Permalink
Fix relay and update to new contract (#21)
Browse files Browse the repository at this point in the history
* clean fixed gas limit

* fix relayer relay

* helpful log

* fix index 100 records limit

* fix syntax

* fix ci

* fix index query

* fix relay
  • Loading branch information
fewensa authored Nov 2, 2023
1 parent 945fb4b commit 3dc3969
Show file tree
Hide file tree
Showing 12 changed files with 303 additions and 536 deletions.
16 changes: 14 additions & 2 deletions packages/indexer/src/thegraph/oracle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ export class ThegraphIndexerOracle extends GraphCommon {

public async allAssignedList(): Promise<OrmpOracleAssigned[]> {
const query = `
query QueryNextOracleAssignedList {
query QueryNextOracleAssignedList($skip: Int!) {
ormpOracleAssigneds(
skip: $skip
orderBy: seq
orderDirection: asc
) {
Expand All @@ -21,7 +22,18 @@ export class ThegraphIndexerOracle extends GraphCommon {
}
}
`;
return await super.list({query, schema: 'ormpOracleAssigneds'});
const assignedList: OrmpOracleAssigned[] = [];
let skip = 0;
while (true) {
const variables = {skip};
const parts: OrmpOracleAssigned[] = await super.list({query, variables, schema: 'ormpOracleAssigneds'});
const length = parts.length;
if (length == 0) {
return assignedList;
}
assignedList.push(...parts);
skip += length;
}
}

public async inspectAssigned(variables: QueryNextOracleAssigned): Promise<OrmpOracleAssigned | undefined> {
Expand Down
98 changes: 75 additions & 23 deletions packages/indexer/src/thegraph/ormp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ import {
OrmpMessageAccepted,
OrmpMessageDispatched,
QueryChannelMessageAccepted,
QueryInspectMessageDispatched, QueryMessageAcceptedListByHashes,
QueryInspectMessageDispatched,
QueryMessageAcceptedListByHashes,
QueryMessageHashes,
QueryNextMessageAccepted,
QueryPreparedMessages
} from "../types/graph";
import {GraphCommon} from "./_common";
import {CollectionKit} from "../toolkit/collection";

export class ThegraphIndexOrmp extends GraphCommon {

Expand Down Expand Up @@ -41,6 +43,7 @@ export class ThegraphIndexOrmp extends GraphCommon {
message_from
message_toChainId
message_to
message_gasLimit
message_encoded
}
}
Expand All @@ -49,6 +52,7 @@ export class ThegraphIndexOrmp extends GraphCommon {
}

public async queryMessageAcceptedListByHashes(variables: QueryMessageAcceptedListByHashes): Promise<OrmpMessageAccepted[]> {
const msgHashesParts: string[][] = CollectionKit.split(variables.msgHashes, 100);
const query = `
query QueryMessageAcceptedList($msgHashes: [String!]!) {
ormpProtocolMessageAccepteds(
Expand All @@ -71,17 +75,29 @@ export class ThegraphIndexOrmp extends GraphCommon {
message_from
message_toChainId
message_to
message_gasLimit
message_encoded
}
}
`;
return await super.list({query, variables, schema: 'ormpProtocolMessageAccepteds'});
const rets: OrmpMessageAccepted[] = [];
for (const parts of msgHashesParts) {
const _variables = {msgHashes: parts};
const pickedAssignedMessages: OrmpMessageAccepted[] = await super.list({
query,
variables: _variables,
schema: 'ormpProtocolMessageAccepteds',
});
rets.push(...pickedAssignedMessages);
}
return rets;
}

public async messageHashes(variables: QueryMessageHashes): Promise<string[]> {
const query = `
query QueryMessageAcceptedHashes($messageIndex: BigInt!) {
query QueryMessageAcceptedHashes($skip: Int!, $messageIndex: BigInt!) {
ormpProtocolMessageAccepteds(
skip: $skip
orderBy: message_index
orderDirection: asc
where: {
Expand All @@ -93,12 +109,26 @@ export class ThegraphIndexOrmp extends GraphCommon {
}
}
`;
const resp: OrmpMessageAccepted[] = await super.list({
query,
variables,
schema: 'ormpProtocolMessageAccepteds',
});
return resp.map(item => item.msgHash)
let skip = 0;
const rets: string[] = [];
while (true) {
const _variables = {
...variables,
skip,
};
const parts: OrmpMessageAccepted[] = await super.list({
query,
variables: _variables,
schema: 'ormpProtocolMessageAccepteds',
});
const length = parts.length;
if (length == 0) {
return rets;
}
const hashes = parts.map(item => item.msgHash);
rets.push(...hashes);
skip += length;
}
}

public async nextMessageAccepted(variables: QueryNextMessageAccepted): Promise<OrmpMessageAccepted | undefined> {
Expand All @@ -125,6 +155,7 @@ export class ThegraphIndexOrmp extends GraphCommon {
message_from
message_toChainId
message_to
message_gasLimit
message_encoded
}
}
Expand Down Expand Up @@ -158,8 +189,9 @@ export class ThegraphIndexOrmp extends GraphCommon {

public async queryPreparedMessageAcceptedHashes(variables: QueryPreparedMessages): Promise<string[]> {
const query = `
query QueryNextMessageAccepted($messageIndex: BigInt!) {
query QueryNextMessageAccepted($skip: Int!, $messageIndex: BigInt!) {
ormpProtocolMessageAccepteds(
skip: $skip
orderBy: message_index
orderDirection: asc
where: {
Expand All @@ -170,18 +202,34 @@ export class ThegraphIndexOrmp extends GraphCommon {
}
}
`;
const preparedMessageAcceptedHashes: OrmpMessageAccepted[] = await super.list({
query,
variables,
schema: 'ormpProtocolMessageAccepteds',
});
return preparedMessageAcceptedHashes.map(item => item.msgHash);

const rets: string[] = [];
let skip = 0;
while (true) {
const _variable = {
...variables,
skip,
};
const parts: OrmpMessageAccepted[] = await super.list({
query,
variables: _variable,
schema: 'ormpProtocolMessageAccepteds',
});
const length = parts.length;
if (length == 0) {
return rets;
}
const hashes = parts.map(item => item.msgHash);
rets.push(...hashes);
skip += length;
}
}

public async pickUnRelayedMessageHashes(msgHashes: string[]): Promise<string[]> {
if (!msgHashes.length) {
return [];
}
const msgHashesParts: string[][] = CollectionKit.split(msgHashes, 100);
const query = `
query QueryLastMessageDispatched($msgHashes: [String!]!) {
ormpProtocolMessageDispatcheds(
Expand All @@ -195,13 +243,17 @@ export class ThegraphIndexOrmp extends GraphCommon {
}
}
`;
const variables = {msgHashes};
const unRelayMessages: OrmpMessageDispatched[] = await super.list({
query,
variables,
schema: 'ormpProtocolMessageDispatcheds',
});
const unRelayMessageHashes = unRelayMessages.map(item => item.msgHash);
const unRelayMessageHashes: string[] = [];
for (const parts of msgHashesParts) {
const variables = {msgHashes: parts};
const unRelayMessages: OrmpMessageDispatched[] = await super.list({
query,
variables,
schema: 'ormpProtocolMessageDispatcheds',
});
const hashes = unRelayMessages.map(item => item.msgHash);
unRelayMessageHashes.push(...hashes);
}
return msgHashes.filter(item => unRelayMessageHashes.indexOf(item) == -1);
}

Expand Down
38 changes: 29 additions & 9 deletions packages/indexer/src/thegraph/relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ import {
QueryNextRelayerAssigned,
} from "../types/graph";
import {GraphCommon} from "./_common";
import {CollectionKit} from "../toolkit/collection";

export class ThegraphIndexerRelayer extends GraphCommon {

public async allAssignedList(): Promise<OrmpRelayerAssigned[]> {
const query = `
query QueryRelayerAssignedList {
query QueryRelayerAssignedList($skip: Int!) {
ormpRelayerAssigneds(
skip: $skip
orderBy: seq
orderDirection: asc
) {
Expand All @@ -24,7 +26,18 @@ export class ThegraphIndexerRelayer extends GraphCommon {
}
}
`;
return await super.list({query, schema: 'ormpRelayerAssigneds'});
const assignedList: OrmpRelayerAssigned[] = [];
let skip = 0;
while (true) {
const variables = {skip};
const parts: OrmpRelayerAssigned[] = await super.list({query, variables, schema: 'ormpRelayerAssigneds'});
const length = parts.length;
if (length == 0) {
return assignedList;
}
assignedList.push(...parts);
skip += length;
}
}

public async lastAssignedMessage(): Promise<OrmpRelayerAssigned | undefined> {
Expand Down Expand Up @@ -78,6 +91,8 @@ export class ThegraphIndexerRelayer extends GraphCommon {
if (!msgHashes.length) {
return [];
}

const msgHashesParts: string[][] = CollectionKit.split(msgHashes, 100);
const query = `
query QueryRelayerAssigned($msgHashes: [String!]!) {
ormpRelayerAssigneds(
Expand All @@ -91,13 +106,18 @@ export class ThegraphIndexerRelayer extends GraphCommon {
}
}
`;
const variables = {msgHashes};
const pickedAssignedMessages: OrmpRelayerAssigned[] = await super.list({
query,
variables,
schema: 'ormpRelayerAssigneds',
});
return pickedAssignedMessages.map(item => item.msgHash);
const rets: string[] = [];
for (const parts of msgHashesParts) {
const variables = {msgHashes: parts};
const pickedAssignedMessages: OrmpRelayerAssigned[] = await super.list({
query,
variables,
schema: 'ormpRelayerAssigneds',
});
const pamhashes: string[] = pickedAssignedMessages.map(item => item.msgHash);
rets.push(...pamhashes);
}
return rets;
}

}
10 changes: 5 additions & 5 deletions packages/indexer/src/thegraph/subapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ export class ThegraphIndexerSubapi extends GraphCommon {
const completeds = [] as AirnodeBeaconCompletedDistruibution[];
for (const beaconId of beacons) {
const c = await this.lastAirnodeCompleted({beaconId});
// logger.debug(
// 'queried completed events %s by %s',
// JSON.stringify(c),
// beaconId,
// );
logger.debug(
'queried completed events %s by %s',
JSON.stringify(c),
beaconId,
);
if (!c) continue;
completeds.push(c);
}
Expand Down
35 changes: 35 additions & 0 deletions packages/indexer/src/toolkit/collection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
export class CollectionKit {
public static parts<T>(array: T[], part = 1): T[][] {
const rets = [];
const length = array.length;
const size = Math.floor(length / part);
let start = 0, end = size;
let times = 0;
for (; start < length;) {
times += 1;
const items = array.slice(start, end);
if (times === part && start < length) {
items.push(...array.slice(end, length));
rets.push(items);
break;
}
rets.push(items);
start = end;
end += size;
}
return rets;
}

public static split<T>(array: T[], size = 10): T[][] {
const rets = [];
const length = array.length;
let start = 0, end = size;
for (; start < length;) {
rets.push(array.slice(start, end));
start = end;
end += size;
}
return rets;
}

}
1 change: 1 addition & 0 deletions packages/indexer/src/types/graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export interface OrmpMessageAccepted extends BaseGraphEntity {
message_toChainId: string
message_to: string
message_encoded: string
message_gasLimit: string
}

export interface OrmpMessageDispatched extends BaseGraphEntity {
Expand Down
Loading

0 comments on commit 3dc3969

Please sign in to comment.