Skip to content

Commit

Permalink
feat: fetch events by src addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
0xnigir1 committed Dec 17, 2024
1 parent eb7e1c3 commit aa49ef6
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 9 deletions.
19 changes: 18 additions & 1 deletion packages/data-flow/src/eventsFetcher.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { IIndexerClient } from "@grants-stack-indexer/indexer-client";
import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";
import { Address, AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";

import { IEventsFetcher } from "./interfaces/index.js";

Expand All @@ -19,4 +19,21 @@ export class EventsFetcher implements IEventsFetcher {
limit,
);
}

/** @inheritdoc */
async fetchEventsBySrcAddress(params: {
chainId: ChainId;
srcAddresses: Address[];
from?: {
blockNumber?: number;
logIndex?: number;
};
to: {
blockNumber: number;
logIndex: number;
};
limit?: number;
}): Promise<AnyIndexerFetchedEvent[]> {
return this.indexerClient.getEventsBySrcAddress(params);
}
}
24 changes: 23 additions & 1 deletion packages/data-flow/src/interfaces/eventsFetcher.interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";
import { Address, AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";

/**
* Interface for the events fetcher
Expand All @@ -17,4 +17,26 @@ export interface IEventsFetcher {
logIndex: number,
limit?: number,
): Promise<AnyIndexerFetchedEvent[]>;

/**
* Fetch the events by src address, block number and log index for a chain
* @param chainId id of the chain
* @param srcAddresses src addresses to fetch events from
* @param toBlock block number to fetch events from
* @param logIndex log index in the block to fetch events from
* @param limit limit of events to fetch
*/
fetchEventsBySrcAddress(params: {
chainId: ChainId;
srcAddresses: Address[];
from?: {
blockNumber?: number;
logIndex?: number;
};
to: {
blockNumber: number;
logIndex: number;
};
limit?: number;
}): Promise<AnyIndexerFetchedEvent[]>;
}
1 change: 1 addition & 0 deletions packages/data-flow/test/unit/eventsFetcher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ describe("EventsFetcher", () => {
beforeEach(() => {
indexerClientMock = {
getEventsAfterBlockNumberAndLogIndex: vi.fn(),
getEventsBySrcAddress: vi.fn(),
};

eventsFetcher = new EventsFetcher(indexerClientMock);
Expand Down
24 changes: 23 additions & 1 deletion packages/indexer-client/src/interfaces/indexerClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";
import { Address, AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";

/**
* Interface for the indexer client
Expand All @@ -17,4 +17,26 @@ export interface IIndexerClient {
logIndex: number,
limit?: number,
): Promise<AnyIndexerFetchedEvent[]>;

/**
* Get the events by src address from the indexer service
* @param chainId Id of the chain
* @param srcAddresses Src addresses to fetch events from
* @param from Block number to start fetching events from
* @param logIndex Log index in the block
* @param limit Limit of events to fetch
*/
getEventsBySrcAddress(params: {
chainId: ChainId;
srcAddresses: Address[];
from?: {
blockNumber?: number;
logIndex?: number;
};
to: {
blockNumber: number;
logIndex: number;
};
limit?: number;
}): Promise<AnyIndexerFetchedEvent[]>;
}
93 changes: 92 additions & 1 deletion packages/indexer-client/src/providers/envioIndexerClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { gql, GraphQLClient } from "graphql-request";

import { AnyIndexerFetchedEvent, ChainId, stringify } from "@grants-stack-indexer/shared";
import { Address, AnyIndexerFetchedEvent, ChainId, stringify } from "@grants-stack-indexer/shared";

import { IndexerClientError, InvalidIndexerResponse } from "../exceptions/index.js";
import { IIndexerClient } from "../internal.js";
Expand Down Expand Up @@ -73,4 +73,95 @@ export class EnvioIndexerClient implements IIndexerClient {
throw new IndexerClientError(stringify(error, Object.getOwnPropertyNames(error)));
}
}

/** @inheritdoc */
async getEventsBySrcAddress(params: {
chainId: ChainId;
srcAddresses: Address[];
from?: {
blockNumber?: number;
logIndex?: number;
};
to: {
blockNumber: number;
logIndex: number;
};
limit?: number;
}): Promise<AnyIndexerFetchedEvent[]> {
try {
const { chainId, srcAddresses, from, to, limit = 100 } = params;
const { blockNumber: toBlock, logIndex: toLogIndex } = to;
const { blockNumber: fromBlock, logIndex: fromLogIndex } = from ?? {
blockNumber: 0,
logIndex: 0,
};
const response = (await this.client.request(
gql`
query getEventsBySrcAddress(
$chainId: Int!
$srcAddresses: [String!]!
$fromBlock: Int!
$fromLogIndex: Int!
$toBlock: Int!
$toLogIndex: Int!
$limit: Int!
) {
raw_events(
order_by: [{ block_number: asc }, { log_index: asc }]
where: {
chain_id: { _eq: $chainId }
src_address: { _in: $srcAddresses }
_and: [
{
_or: [
{ block_number: { _gt: $fromBlock } }
{
_and: [
{ block_number: { _eq: $fromBlock } }
{ log_index: { _gt: $fromLogIndex } }
]
}
]
}
{
_or: [
{ block_number: { _lt: $toBlock } }
{
_and: [
{ block_number: { _eq: $toBlock } }
{ log_index: { _lte: $toLogIndex } }
]
}
]
}
]
}
limit: $limit
) {
blockNumber: block_number
blockTimestamp: block_timestamp
chainId: chain_id
contractName: contract_name
eventName: event_name
logIndex: log_index
params
srcAddress: src_address
transactionFields: transaction_fields
}
}
`,
{ chainId, srcAddresses, fromBlock, fromLogIndex, toBlock, toLogIndex, limit },
)) as { raw_events: AnyIndexerFetchedEvent[] };
if (response?.raw_events) {
return response.raw_events;
} else {
throw new InvalidIndexerResponse(stringify(response));
}
} catch (error) {
if (error instanceof InvalidIndexerResponse) {
throw error;
}
throw new IndexerClientError(stringify(error, Object.getOwnPropertyNames(error)));
}
}
}
163 changes: 163 additions & 0 deletions packages/indexer-client/test/unit/envioIndexerClient.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,167 @@ describe("EnvioIndexerClient", () => {
expect(result).toEqual([]);
});
});

describe("getEventsBySrcAddress", () => {
beforeEach(() => {
// Update the mock implementation for getEventsBySrcAddress queries
graphqlClient.request.mockImplementation(
async (
_document: RequestDocument | RequestOptions<object, object>,
...args: object[]
) => {
const variables = args[0] as {
chainId: ChainId;
srcAddresses: string[];
fromBlock: number;
fromLogIndex: number;
toBlock: number;
toLogIndex: number;
limit: number;
};
const {
chainId,
srcAddresses,
fromBlock,
fromLogIndex,
toBlock,
toLogIndex,
limit,
} = variables;

const filteredEvents = testEvents
.filter((event) => {
// Match chainId and srcAddress
if (event.chainId !== chainId) return false;
if (!srcAddresses.includes(event.srcAddress)) return false;

// Check if event is after fromBlock/fromLogIndex
const isAfterFrom =
event.blockNumber > fromBlock ||
(event.blockNumber === fromBlock && event.logIndex > fromLogIndex);

// Check if event is before or at toBlock/toLogIndex
const isBeforeTo =
event.blockNumber < toBlock ||
(event.blockNumber === toBlock && event.logIndex <= toLogIndex);

return isAfterFrom && isBeforeTo;
})
.slice(0, limit);

return { raw_events: filteredEvents };
},
);
});

it("returns events within the specified block range and matching srcAddresses", async () => {
const result = await envioIndexerClient.getEventsBySrcAddress({
chainId: 1 as ChainId,
srcAddresses: ["0x1234"],
from: { blockNumber: 100, logIndex: 0 },
to: { blockNumber: 101, logIndex: 2 },
});

expect(result).toHaveLength(3);
expect(result).toEqual(
expect.arrayContaining([
expect.objectContaining({ blockNumber: 100, logIndex: 1 }),
expect.objectContaining({ blockNumber: 100, logIndex: 3 }),
expect.objectContaining({ blockNumber: 101, logIndex: 1 }),
]),
);
});

it("uses default from values when not provided", async () => {
const result = await envioIndexerClient.getEventsBySrcAddress({
chainId: 1 as ChainId,
srcAddresses: ["0x1234"],
to: { blockNumber: 101, logIndex: 2 },
});

expect(result).toHaveLength(3);
// Should include all events up to block 101, logIndex 2
expect(result[0]?.blockNumber).toBe(100);
expect(result[2]?.blockNumber).toBe(101);
});

it("respects the limit parameter", async () => {
const result = await envioIndexerClient.getEventsBySrcAddress({
chainId: 1 as ChainId,
srcAddresses: ["0x1234"],
to: { blockNumber: 101, logIndex: 2 },
limit: 2,
});

expect(result).toHaveLength(2);
});

it("uses default limit when not provided", async () => {
await envioIndexerClient.getEventsBySrcAddress({
chainId: 1 as ChainId,
srcAddresses: ["0x1234"],
to: { blockNumber: 101, logIndex: 2 },
});

expect(graphqlClient.request).toHaveBeenCalledWith(
expect.any(String),
expect.objectContaining({ limit: 100 }),
);
});

it("returns empty array when no events match srcAddresses", async () => {
const result = await envioIndexerClient.getEventsBySrcAddress({
chainId: 1 as ChainId,
srcAddresses: ["0x9999"],
to: { blockNumber: 101, logIndex: 2 },
});

expect(result).toHaveLength(0);
});

it("throws InvalidIndexerResponse when response structure is incorrect", async () => {
graphqlClient.request.mockResolvedValue({ status: 200, headers: {}, data: {} });

await expect(
envioIndexerClient.getEventsBySrcAddress({
chainId: 1 as ChainId,
srcAddresses: ["0x1234"],
to: { blockNumber: 101, logIndex: 2 },
}),
).rejects.toThrow(InvalidIndexerResponse);
});

it("throws IndexerClientError when GraphQL request fails", async () => {
graphqlClient.request.mockRejectedValue(new Error("GraphQL request failed"));

await expect(
envioIndexerClient.getEventsBySrcAddress({
chainId: 1 as ChainId,
srcAddresses: ["0x1234"],
to: { blockNumber: 101, logIndex: 2 },
}),
).rejects.toThrow(IndexerClientError);
});

it("filters events by multiple srcAddresses", async () => {
// Add a test event with a different srcAddress
const extraTestEvent = {
...testEvents[0],
srcAddress: "0x5678",
blockNumber: 100,
logIndex: 2,
} as AnyIndexerFetchedEvent;
testEvents.push(extraTestEvent);

const result = await envioIndexerClient.getEventsBySrcAddress({
chainId: 1 as ChainId,
srcAddresses: ["0x1234", "0x5678"],
from: { blockNumber: 100, logIndex: 0 },
to: { blockNumber: 101, logIndex: 2 },
});

expect(result).toContainEqual(expect.objectContaining({ srcAddress: "0x5678" }));
expect(result).toContainEqual(expect.objectContaining({ srcAddress: "0x1234" }));
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,16 @@ export class KyselyStrategyRegistryRepository implements IStrategyRegistryReposi

/** @inheritdoc */
async getStrategies(filters?: { handled?: boolean; chainId?: ChainId }): Promise<Strategy[]> {
const query = this.db.withSchema(this.schemaName).selectFrom("strategies");
let query = this.db.withSchema(this.schemaName).selectFrom("strategies").selectAll();

if (filters?.chainId) {
query.where("chainId", "=", filters.chainId);
query = query.where("chainId", "=", filters.chainId);
}

if (filters?.handled) {
query.where("handled", "=", filters.handled);
if (filters?.handled !== undefined && filters?.handled !== null) {
query = query.where("handled", "=", filters.handled);
}

return query.selectAll().execute();
return query.execute();
}
}

0 comments on commit aa49ef6

Please sign in to comment.