Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fetch events by src addresses #44

Merged
merged 4 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion packages/data-flow/src/eventsFetcher.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { IIndexerClient } from "@grants-stack-indexer/indexer-client";
import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";
import { Address, AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";

import { IEventsFetcher } from "./interfaces/index.js";

Expand All @@ -19,4 +19,21 @@ export class EventsFetcher implements IEventsFetcher {
limit,
);
}

/** @inheritdoc */
async fetchEventsBySrcAddress(params: {
chainId: ChainId;
srcAddresses: Address[];
from?: {
blockNumber?: number;
logIndex?: number;
};
to: {
blockNumber: number;
logIndex: number;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it be like this ?

Suggested change
from?: {
blockNumber?: number;
logIndex?: number;
};
to: {
blockNumber: number;
logIndex: number;
from: {
blockNumber: number;
logIndex: number;
};
to?: {
blockNumber: number;
logIndex: number;

};
limit?: number;
}): Promise<AnyIndexerFetchedEvent[]> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a sidenote, you got the fetchEventsByBlockNumberAndLogIndex method that receives multiple parameters and you got this method that receives a params object.

Is there any particular reason for that?

And another question 👉 have you ever considered leveraging some kind of query builder pattern for the event fetcher? You could end up with a cool class:

const query = EventsFetcher.buildQuery()
  .withChainId(chainId)
  .withSrcAddressess(addresses)
  .withBlockNumber(blockNumber)
  ...

If you envision having more fetchEventsBy... methods, I'd consider this hehe

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

regarding first, now that you mention that you're right, originally i wrote the method only with to param but when adding the from actually both methods can be combined into one

regarding second, i like query builders 😄 but i don't think we will benefit too much for now

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i keep the old method until i integrate it with the rest of the code cuz if not i'll break everything

return this.indexerClient.getEventsBySrcAddress(params);
}
}
24 changes: 23 additions & 1 deletion packages/data-flow/src/interfaces/eventsFetcher.interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";
import { Address, AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";

/**
* Interface for the events fetcher
Expand All @@ -17,4 +17,26 @@ export interface IEventsFetcher {
logIndex: number,
limit?: number,
): Promise<AnyIndexerFetchedEvent[]>;

/**
* Fetch the events by src address, block number and log index for a chain
* @param chainId id of the chain
* @param srcAddresses src addresses to fetch events from
* @param toBlock block number to fetch events from
* @param logIndex log index in the block to fetch events from
* @param limit limit of events to fetch
*/
fetchEventsBySrcAddress(params: {
Comment on lines +22 to +29
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this ok ?

chainId: ChainId;
srcAddresses: Address[];
from?: {
blockNumber?: number;
logIndex?: number;
};
to: {
blockNumber: number;
logIndex: number;
};
limit?: number;
}): Promise<AnyIndexerFetchedEvent[]>;
}
1 change: 1 addition & 0 deletions packages/data-flow/test/unit/eventsFetcher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ describe("EventsFetcher", () => {
beforeEach(() => {
indexerClientMock = {
getEventsAfterBlockNumberAndLogIndex: vi.fn(),
getEventsBySrcAddress: vi.fn(),
};

eventsFetcher = new EventsFetcher(indexerClientMock);
Expand Down
24 changes: 23 additions & 1 deletion packages/indexer-client/src/interfaces/indexerClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";
import { Address, AnyIndexerFetchedEvent, ChainId } from "@grants-stack-indexer/shared";

/**
* Interface for the indexer client
Expand All @@ -17,4 +17,26 @@ export interface IIndexerClient {
logIndex: number,
limit?: number,
): Promise<AnyIndexerFetchedEvent[]>;

/**
* Get the events by src address from the indexer service
* @param chainId Id of the chain
* @param srcAddresses Src addresses to fetch events from
* @param from Block number to start fetching events from
* @param logIndex Log index in the block
* @param limit Limit of events to fetch
*/
getEventsBySrcAddress(params: {
chainId: ChainId;
srcAddresses: Address[];
from?: {
blockNumber?: number;
logIndex?: number;
};
to: {
blockNumber: number;
logIndex: number;
};
limit?: number;
}): Promise<AnyIndexerFetchedEvent[]>;
}
93 changes: 92 additions & 1 deletion packages/indexer-client/src/providers/envioIndexerClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { gql, GraphQLClient } from "graphql-request";

import { AnyIndexerFetchedEvent, ChainId, stringify } from "@grants-stack-indexer/shared";
import { Address, AnyIndexerFetchedEvent, ChainId, stringify } from "@grants-stack-indexer/shared";

import { IndexerClientError, InvalidIndexerResponse } from "../exceptions/index.js";
import { IIndexerClient } from "../internal.js";
Expand Down Expand Up @@ -73,4 +73,95 @@ export class EnvioIndexerClient implements IIndexerClient {
throw new IndexerClientError(stringify(error, Object.getOwnPropertyNames(error)));
}
}

/** @inheritdoc */
async getEventsBySrcAddress(params: {
chainId: ChainId;
srcAddresses: Address[];
from?: {
blockNumber?: number;
logIndex?: number;
};
to: {
blockNumber: number;
logIndex: number;
};
limit?: number;
}): Promise<AnyIndexerFetchedEvent[]> {
try {
const { chainId, srcAddresses, from, to, limit = 100 } = params;
const { blockNumber: toBlock, logIndex: toLogIndex } = to;
const { blockNumber: fromBlock, logIndex: fromLogIndex } = from ?? {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if from is {}?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

blockNumber: 0,
logIndex: 0,
};
const response = (await this.client.request(
gql`
query getEventsBySrcAddress(
$chainId: Int!
$srcAddresses: [String!]!
$fromBlock: Int!
$fromLogIndex: Int!
$toBlock: Int!
$toLogIndex: Int!
$limit: Int!
) {
raw_events(
order_by: [{ block_number: asc }, { log_index: asc }]
where: {
chain_id: { _eq: $chainId }
src_address: { _in: $srcAddresses }
_and: [
{
_or: [
{ block_number: { _gt: $fromBlock } }
{
_and: [
{ block_number: { _eq: $fromBlock } }
{ log_index: { _gt: $fromLogIndex } }
]
}
]
}
{
_or: [
{ block_number: { _lt: $toBlock } }
{
_and: [
{ block_number: { _eq: $toBlock } }
{ log_index: { _lte: $toLogIndex } }
]
}
]
}
]
}
limit: $limit
) {
blockNumber: block_number
blockTimestamp: block_timestamp
chainId: chain_id
contractName: contract_name
eventName: event_name
logIndex: log_index
params
srcAddress: src_address
transactionFields: transaction_fields
}
}
`,
{ chainId, srcAddresses, fromBlock, fromLogIndex, toBlock, toLogIndex, limit },
)) as { raw_events: AnyIndexerFetchedEvent[] };
if (response?.raw_events) {
return response.raw_events;
} else {
throw new InvalidIndexerResponse(stringify(response));
}
} catch (error) {
if (error instanceof InvalidIndexerResponse) {
throw error;
}
throw new IndexerClientError(stringify(error, Object.getOwnPropertyNames(error)));
}
}
}
163 changes: 163 additions & 0 deletions packages/indexer-client/test/unit/envioIndexerClient.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,167 @@ describe("EnvioIndexerClient", () => {
expect(result).toEqual([]);
});
});

describe("getEventsBySrcAddress", () => {
beforeEach(() => {
// Update the mock implementation for getEventsBySrcAddress queries
graphqlClient.request.mockImplementation(
async (
_document: RequestDocument | RequestOptions<object, object>,
...args: object[]
) => {
const variables = args[0] as {
chainId: ChainId;
srcAddresses: string[];
fromBlock: number;
fromLogIndex: number;
toBlock: number;
toLogIndex: number;
limit: number;
};
const {
chainId,
srcAddresses,
fromBlock,
fromLogIndex,
toBlock,
toLogIndex,
limit,
} = variables;

const filteredEvents = testEvents
.filter((event) => {
// Match chainId and srcAddress
if (event.chainId !== chainId) return false;
if (!srcAddresses.includes(event.srcAddress)) return false;

// Check if event is after fromBlock/fromLogIndex
const isAfterFrom =
event.blockNumber > fromBlock ||
(event.blockNumber === fromBlock && event.logIndex > fromLogIndex);

// Check if event is before or at toBlock/toLogIndex
const isBeforeTo =
event.blockNumber < toBlock ||
(event.blockNumber === toBlock && event.logIndex <= toLogIndex);

return isAfterFrom && isBeforeTo;
})
.slice(0, limit);

return { raw_events: filteredEvents };
},
);
});

it("returns events within the specified block range and matching srcAddresses", async () => {
const result = await envioIndexerClient.getEventsBySrcAddress({
chainId: 1 as ChainId,
srcAddresses: ["0x1234"],
from: { blockNumber: 100, logIndex: 0 },
to: { blockNumber: 101, logIndex: 2 },
});

expect(result).toHaveLength(3);
expect(result).toEqual(
expect.arrayContaining([
expect.objectContaining({ blockNumber: 100, logIndex: 1 }),
expect.objectContaining({ blockNumber: 100, logIndex: 3 }),
expect.objectContaining({ blockNumber: 101, logIndex: 1 }),
]),
);
});

it("uses default from values when not provided", async () => {
const result = await envioIndexerClient.getEventsBySrcAddress({
chainId: 1 as ChainId,
srcAddresses: ["0x1234"],
to: { blockNumber: 101, logIndex: 2 },
});

expect(result).toHaveLength(3);
// Should include all events up to block 101, logIndex 2
expect(result[0]?.blockNumber).toBe(100);
expect(result[2]?.blockNumber).toBe(101);
});

it("respects the limit parameter", async () => {
const result = await envioIndexerClient.getEventsBySrcAddress({
chainId: 1 as ChainId,
srcAddresses: ["0x1234"],
to: { blockNumber: 101, logIndex: 2 },
limit: 2,
});

expect(result).toHaveLength(2);
});

it("uses default limit when not provided", async () => {
await envioIndexerClient.getEventsBySrcAddress({
chainId: 1 as ChainId,
srcAddresses: ["0x1234"],
to: { blockNumber: 101, logIndex: 2 },
});

expect(graphqlClient.request).toHaveBeenCalledWith(
expect.any(String),
expect.objectContaining({ limit: 100 }),
);
});

it("returns empty array when no events match srcAddresses", async () => {
const result = await envioIndexerClient.getEventsBySrcAddress({
chainId: 1 as ChainId,
srcAddresses: ["0x9999"],
to: { blockNumber: 101, logIndex: 2 },
});

expect(result).toHaveLength(0);
});

it("throws InvalidIndexerResponse when response structure is incorrect", async () => {
graphqlClient.request.mockResolvedValue({ status: 200, headers: {}, data: {} });

await expect(
envioIndexerClient.getEventsBySrcAddress({
chainId: 1 as ChainId,
srcAddresses: ["0x1234"],
to: { blockNumber: 101, logIndex: 2 },
}),
).rejects.toThrow(InvalidIndexerResponse);
});

it("throws IndexerClientError when GraphQL request fails", async () => {
graphqlClient.request.mockRejectedValue(new Error("GraphQL request failed"));

await expect(
envioIndexerClient.getEventsBySrcAddress({
chainId: 1 as ChainId,
srcAddresses: ["0x1234"],
to: { blockNumber: 101, logIndex: 2 },
}),
).rejects.toThrow(IndexerClientError);
});

it("filters events by multiple srcAddresses", async () => {
// Add a test event with a different srcAddress
const extraTestEvent = {
...testEvents[0],
srcAddress: "0x5678",
blockNumber: 100,
logIndex: 2,
} as AnyIndexerFetchedEvent;
testEvents.push(extraTestEvent);

const result = await envioIndexerClient.getEventsBySrcAddress({
chainId: 1 as ChainId,
srcAddresses: ["0x1234", "0x5678"],
from: { blockNumber: 100, logIndex: 0 },
to: { blockNumber: 101, logIndex: 2 },
});

expect(result).toContainEqual(expect.objectContaining({ srcAddress: "0x5678" }));
expect(result).toContainEqual(expect.objectContaining({ srcAddress: "0x1234" }));
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,16 @@ export class KyselyStrategyRegistryRepository implements IStrategyRegistryReposi

/** @inheritdoc */
async getStrategies(filters?: { handled?: boolean; chainId?: ChainId }): Promise<Strategy[]> {
const query = this.db.withSchema(this.schemaName).selectFrom("strategies");
let query = this.db.withSchema(this.schemaName).selectFrom("strategies").selectAll();

if (filters?.chainId) {
query.where("chainId", "=", filters.chainId);
query = query.where("chainId", "=", filters.chainId);
}

if (filters?.handled) {
query.where("handled", "=", filters.handled);
if (filters?.handled !== undefined && filters?.handled !== null) {
query = query.where("handled", "=", filters.handled);
}

return query.selectAll().execute();
return query.execute();
}
}
Loading