Skip to content

Commit

Permalink
test: add unit tests and natspec
Browse files Browse the repository at this point in the history
  • Loading branch information
0xnigir1 committed Oct 31, 2024
1 parent efa1446 commit 39c885a
Show file tree
Hide file tree
Showing 15 changed files with 1,104 additions and 17 deletions.
10 changes: 8 additions & 2 deletions packages/data-flow/src/eventsRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,22 @@ import type { AnyEvent, ContractName, ProcessorEvent } from "@grants-stack-index
import type { IEventsRegistry } from "./internal.js";

/**
* Class to store the last processed event
* Class to store the last processed event in memory
*/
//TODO: Implement storage version to persist the last processed event. we need to store it by chainId
export class InMemoryEventsRegistry implements IEventsRegistry {
//TODO: Implement storage to persist the last processed event. we need to store it by chainId
private lastProcessedEvent: ProcessorEvent<ContractName, AnyEvent> | undefined;

/**
* @inheritdoc
*/
async getLastProcessedEvent(): Promise<ProcessorEvent<ContractName, AnyEvent> | undefined> {
return this.lastProcessedEvent;
}

/**
* @inheritdoc
*/
async saveLastProcessedEvent(event: ProcessorEvent<ContractName, AnyEvent>): Promise<void> {
this.lastProcessedEvent = event;
}
Expand Down
6 changes: 4 additions & 2 deletions packages/data-flow/src/exceptions/invalidEvent.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { AnyEvent, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared";
import { AnyEvent, ContractName, ProcessorEvent, stringify } from "@grants-stack-indexer/shared";

export class InvalidEvent extends Error {
constructor(event: ProcessorEvent<ContractName, AnyEvent>) {
super(`Event couldn't be processed: ${event}`);
super(`Event couldn't be processed: ${stringify(event)}`);

this.name = "InvalidEvent";
}
}
12 changes: 12 additions & 0 deletions packages/data-flow/src/interfaces/eventsRegistry.interface.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
import { AnyEvent, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared";

/**
* The events registry saves as a checkpoint to the last processed event by the system.
* This is used to resume the indexing from the last processed event in case of an error or temporary interruption.
*/
export interface IEventsRegistry {
/**
* Get the last processed event by the system
* @returns The last processed event or undefined if no event has been processed yet.
*/
getLastProcessedEvent(): Promise<ProcessorEvent<ContractName, AnyEvent> | undefined>;
/**
* Save the last processed event by the system
* @param event - The event to save.
*/
saveLastProcessedEvent(event: ProcessorEvent<ContractName, AnyEvent>): Promise<void>;
}
16 changes: 16 additions & 0 deletions packages/data-flow/src/interfaces/strategyRegistry.interface.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
import { Address, Hex } from "viem";

/**
* The strategy registry saves the mapping between the strategy address and the strategy id. Serves as a Cache
* to avoid having to read from the chain to get the strategy id every time.
*/
//TODO: implement a mechanism to record Strategy that we still don't have a corresponding handler
// we need to store and mark that this strategy is not handled yet, so when it's supported we can process all of the pending events for it
export interface IStrategyRegistry {
/**
* Get the strategy id by the strategy address
* @param strategyAddress - The strategy address
* @returns The strategy id or undefined if the strategy address is not registered
*/
getStrategyId(strategyAddress: Address): Promise<Hex | undefined>;
/**
* Save the strategy id by the strategy address
* @param strategyAddress - The strategy address
* @param strategyId - The strategy id
*/
saveStrategyId(strategyAddress: Address, strategyId: Hex): Promise<void>;
}
93 changes: 83 additions & 10 deletions packages/data-flow/src/orchestrator.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
// class should contain the logic to orchestrate the data flow Events Fetcher -> Events Processor -> Data Loader

import { IIndexerClient } from "@grants-stack-indexer/indexer-client";
import { UnsupportedStrategy } from "@grants-stack-indexer/processors/dist/src/internal.js";
import {
existsHandler,
UnsupportedEventException,
UnsupportedStrategy,
} from "@grants-stack-indexer/processors";
import {
Address,
AnyEvent,
Expand All @@ -23,6 +27,33 @@ import { IEventsFetcher } from "./interfaces/index.js";
import { IStrategyRegistry } from "./interfaces/strategyRegistry.interface.js";
import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from "./internal.js";

/**
* The Orchestrator is the central coordinator of the data flow system, managing the interaction between
* three main components:
*
* 1. Events Fetcher: Retrieves blockchain events from the indexer service
* 2. Events Processor: Processes and transforms raw events into domain events
* 3. Data Loader: Persists the processed events into the database
*
* The Orchestrator implements a continuous processing loop that:
*
* 1. Fetches batches of events from the indexer and stores them in an internal queue
* 2. Processes each event from the queue:
* - For strategy events and PoolCreated from Allo contract, enhances them with strategyId
* - Forwards the event to the Events Processor which is in charge of delagating the processing of the event to the correct handler
* - Discards events for unsupported strategies or events
* 3. Loads the processed events into the database via the Data Loader
*
* The Orchestrator provides fault tolerance and performance optimization through:
* - Configurable batch sizes for event fetching
* - Delayed processing to prevent overwhelming the system
* - Error handling and logging for various failure scenarios
* - Registry tracking of supported/unsupported strategies and events
*
* TODO: Implement a circuit breaker to gracefully stop the orchestrator
* TODO: Enhance the error handling/retries, logging and observability
* TODO: Handle unhandled strategies appropriately
*/
export class Orchestrator {
private readonly eventsQueue: IQueue<ProcessorEvent<ContractName, AnyEvent>>;
private readonly eventsFetcher: IEventsFetcher;
Expand Down Expand Up @@ -55,6 +86,7 @@ export class Orchestrator {
}

async run(): Promise<void> {
//TODO: implement a circuit breaker to gracefully stop the orchestrator
while (true) {
let event: ProcessorEvent<ContractName, AnyEvent> | undefined;
try {
Expand All @@ -68,8 +100,19 @@ export class Orchestrator {
}

event = await this.enhanceStrategyId(event);
const changesets = await this.eventsProcessor.processEvent(event);
if (event.contractName === "Strategy" && "strategyId" in event) {
if (!existsHandler(event.strategyId)) {
//TODO: save to registry as unsupported strategy, so when the strategy is handled it will be backwards compatible and process all of the events
console.log(
`No handler found for strategyId: ${event.strategyId}. Event: ${stringify(
event,
)}`,
);
continue;
}
}

const changesets = await this.eventsProcessor.processEvent(event);
const executionResult = await this.dataLoader.applyChanges(changesets);

if (executionResult.numFailed > 0) {
Expand All @@ -79,20 +122,29 @@ export class Orchestrator {
event,
)}`,
);
} else {
await this.eventsRegistry.saveLastProcessedEvent(event);
}

await this.eventsRegistry.saveLastProcessedEvent(event);
} catch (error: unknown) {
// TODO: improve error handling and notify
if (error instanceof UnsupportedStrategy || error instanceof InvalidEvent) {
console.error(`${error.name}: ${error.message}. Event: ${stringify(event)}`);
// TODO: improve error handling, retries and notify
if (
error instanceof UnsupportedStrategy ||
error instanceof InvalidEvent ||
error instanceof UnsupportedEventException
) {
console.error(
`Current event cannot be handled. ${error.name}: ${error.message}. Event: ${stringify(event)}`,
);
} else {
console.error(`Error processing event: ${stringify(event)}`, error);
}
}
}
}

/**
* Fill the events queue with the events from the events fetcher
*/
private async fillQueue(): Promise<void> {
const lastProcessedEvent = await this.eventsRegistry.getLastProcessedEvent();
const blockNumber = lastProcessedEvent?.blockNumber ?? 0;
Expand All @@ -108,9 +160,15 @@ export class Orchestrator {
this.eventsQueue.push(...events);
}

// if poolCreated event, get strategyId and save in the map
// if strategy event populate event with strategyId if exists in the map
// get strategyId and populate event with it
/**
* Enhance the event with the strategy id when required
* @param event - The event
* @returns The event with the strategy id or the same event if strategyId is not required
*
* StrategyId is required for the following events:
* - PoolCreated from Allo contract
* - Any event from Strategy contract or its implementations
*/
private async enhanceStrategyId(
event: ProcessorEvent<ContractName, AnyEvent>,
): Promise<ProcessorEvent<ContractName, AnyEvent>> {
Expand All @@ -125,6 +183,11 @@ export class Orchestrator {
return event;
}

/**
* Get the strategy address from the event
* @param event - The event
* @returns The strategy address
*/
private getStrategyAddress(
event: ProcessorEvent<"Allo", "PoolCreated"> | ProcessorEvent<"Strategy", StrategyEvent>,
): Address {
Expand All @@ -133,6 +196,11 @@ export class Orchestrator {
: event.srcAddress;
}

/**
* Get the strategy id from the strategy registry or fetch it from the chain
* @param strategyAddress - The strategy address
* @returns The strategy id
*/
private async getOrFetchStrategyId(strategyAddress: Address): Promise<Hex> {
const existingId = await this.strategyRegistry.getStrategyId(strategyAddress);
if (existingId) {
Expand All @@ -150,6 +218,11 @@ export class Orchestrator {
return strategyId;
}

/**
* Check if the event requires a strategy id
* @param event - The event
* @returns True if the event requires a strategy id, false otherwise
*/
private requiresStrategyId(
event: ProcessorEvent<ContractName, AnyEvent>,
): event is ProcessorEvent<"Allo", "PoolCreated"> | ProcessorEvent<"Strategy", StrategyEvent> {
Expand Down
6 changes: 4 additions & 2 deletions packages/data-flow/src/strategyRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ import type { Address, Hex } from "viem";
import type { IStrategyRegistry } from "./internal.js";

/**
* Class to store strategy ids
* Class to store strategy ids in memory
*/
//TODO: Implement storage to persist strategies. since we're using address, do we need ChainId?
export class InMemoryStrategyRegistry implements IStrategyRegistry {
//TODO: Implement storage to persist strategies. since we're using address, do we need ChainId?
private strategiesMap: Map<Address, Hex> = new Map();

/** @inheritdoc */
async getStrategyId(strategyAddress: Address): Promise<Hex | undefined> {
return this.strategiesMap.get(strategyAddress);
}

/** @inheritdoc */
async saveStrategyId(strategyAddress: Address, strategyId: Hex): Promise<void> {
this.strategiesMap.set(strategyAddress, strategyId);
}
Expand Down
3 changes: 3 additions & 0 deletions packages/data-flow/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import {
IRoundRepository,
} from "@grants-stack-indexer/repository";

/**
* The result of the execution of the changesets.
*/
export type ExecutionResult = {
changesets: Changeset["type"][];
numExecuted: number;
Expand Down
16 changes: 16 additions & 0 deletions packages/data-flow/src/utils/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,22 @@ export interface IQueue<T> {
get length(): number;
isEmpty(): boolean;
}
/**
* A circular buffer-based queue implementation that provides efficient O(1) operations
* with automatic resizing capabilities.
*
* Key benefits:
* - Constant O(1) time complexity for push/pop/peek operations
* - Memory efficient circular buffer that reuses space
* - Automatic buffer resizing to handle growth and shrinkage
* - Amortized O(1) push operations even when resizing is needed
* - Memory optimization by shrinking when queue becomes very empty
* - Initial capacity can be tuned based on expected usage
*
* The circular buffer approach avoids the need to shift elements, making it more
* efficient than array-based queues for high-throughput scenarios. The automatic
* resizing ensures memory usage adapts to actual needs while maintaining performance.
*/

export class Queue<T> implements IQueue<T> {
private buffer: (T | undefined)[];
Expand Down
Loading

0 comments on commit 39c885a

Please sign in to comment.