Skip to content

Commit

Permalink
Updated getPastEvents and waitForEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
30mb1 committed Apr 20, 2022
1 parent 586c666 commit f8a743f
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 49 deletions.
128 changes: 102 additions & 26 deletions src/contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
parseTokensObject,
serializeTransaction,
MergeOutputObjectsArray,
TransactionId,
} from './models';
import { ProviderRpcClient } from './index';

Expand Down Expand Up @@ -241,25 +242,25 @@ export class Contract<Abi> {
return this._abi;
}

public async waitForFirstEvent(args: WaitForFirstEventParams<Abi>): Promise<DecodedEvent<Abi, AbiEventName<Abi>>> {
public async waitForEvent(args: WaitForEventParams<Abi>): Promise<DecodedEvent<Abi, AbiEventName<Abi>>> {
const subscriber = new this._provider.Subscriber();

const oldEventsStream = subscriber.oldTransactions(this._address, args.options);
const oldEventsStream = subscriber.oldTransactions(this._address, args?.options);
const eventsStream = oldEventsStream.merge(subscriber.transactions(
this._address
)).flatMap(
item => item.transactions
).flatMap(async tx => {
return await this.decodeTransactionEvents({transaction: tx});
}).filterMap(async event => {
if (args.filter) {
if (args?.filter) {
if (args.filter.name !== event.event) {
return undefined;
}
if (args.filter.params) {
if (args.filter?.params) {
for (const [key , value] of Object.entries(args.filter.params)) {
const event_param_name = key as keyof MergeOutputObjectsArray<any>;
if (event.data[event_param_name] !== value) {
if (event.data[event_param_name] != value) {
return undefined;
}
}
Expand All @@ -271,30 +272,89 @@ export class Contract<Abi> {
return await eventsStream.first();
}

public async getPastEvents(args: GetPastEventParams<Abi>): Promise<DecodedEvent<Abi, AbiEventName<Abi>>[]> {
const subscriber = new this._provider.Subscriber();
public async getPastEvents(args: GetPastEventParams<Abi>): Promise<EventsPaginatedResponse<Abi>> {
let res_events: DecodedEvent<Abi, AbiEventName<Abi>>[] = [];
let new_offset: TransactionId | undefined;
let cur_offset = args?.offset;
let continue_iteration = true;
while (continue_iteration) {
const { transactions, continuation } = await this._provider.getTransactions({
address: this._address,
continuation: cur_offset
});

const tx_list = await subscriber.oldTransactionsList(this._address, args.options);
const events_list = await Promise.all(tx_list.map(async tx => {
return await this.decodeTransactionEvents({transaction: tx});
}));
if (transactions.length === null) {
break;
}

return events_list.flat().filter(event => {
if (args.filter) {
if (args.filter.name !== event.event) {
return false;
}
if (args.filter.params) {
for (const [key , value] of Object.entries(args.filter.params)) {
const event_param_name = key as keyof MergeOutputObjectsArray<any>;
if (event.data[event_param_name] !== value) {
return false;
const fromFilteredTransactions = transactions.filter((item) => (
(args?.options?.fromLt == null || item.id.lt > args?.options?.fromLt) &&
(args?.options?.fromUtime == null || item.createdAt > args?.options?.fromUtime)
));

if (fromFilteredTransactions.length == 0) {
break;
}

const toFilteredTransactions = fromFilteredTransactions.filter((item) => (
(args?.options?.toLt == null || item.id.lt < args?.options?.toLt) &&
(args?.options?.toUtime == null || item.createdAt < args?.options?.toUtime)
));

if (toFilteredTransactions.length > 0) {
const events_tx_list = await Promise.all(toFilteredTransactions.map(async tx => {
const _events = await this.decodeTransactionEvents({transaction: tx});
return { tx: tx, events: _events };
}));

for (const {tx, events} of events_tx_list) {
const filtered_events = events.filter(event => {
if (args?.filters) {
let matched = false;
for (const filter of args.filters) {
let filter_match = true;
if (filter.name !== event.event) {
continue;
}
if (filter?.params) {
for (const [key , value] of Object.entries(filter.params)) {
const event_param_name = key as keyof MergeOutputObjectsArray<any>;
if (event.data[event_param_name] != value) {
filter_match = false;
break;
}
}
}
matched = matched || filter_match;
}
return matched;
}
return true;
});

for (const event of filtered_events) {
if (args?.limit && res_events.length === args.limit) {
continue_iteration = false;
break;
}
res_events.push(event);
}

if (!continue_iteration) {
new_offset = tx.id;
break;
}
}
}
return true;
});

if (continuation != null) {
cur_offset = continuation;
} else {
break;
}
}

return {events: res_events, offset: new_offset};
}

public async decodeTransaction(args: DecodeTransactionParams<Abi>): Promise<DecodedTransaction<Abi, AbiFunctionName<Abi>> | undefined> {
Expand Down Expand Up @@ -530,11 +590,27 @@ export type EventFilterOptions = {
* @category Contract
*/
export type GetPastEventParams<Abi> = {
filter: EventsFilter<Abi> | undefined,
options: EventFilterOptions | undefined
filters: EventsFilter<Abi>[] | undefined,
options: EventFilterOptions | undefined,
limit: number | undefined,
offset: TransactionId | undefined
}

/**
* @category Contract
*/
export type EventsPaginatedResponse<Abi> = {
events: DecodedEvent<Abi, AbiEventName<Abi>>[],
offset: TransactionId | undefined
}

export type WaitForFirstEventParams<Abi> = GetPastEventParams<Abi>;
/**
* @category Contract
*/
export type WaitForEventParams<Abi> = {
filter: EventsFilter<Abi> | undefined,
options: EventFilterOptions | undefined,
};

/**
* @category Contract
Expand Down
23 changes: 0 additions & 23 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,6 @@ export class Subscriber {
return this._addSubscription('transactionsFound', address);
}

public async oldTransactionsList(
address: Address,
filter?: { fromLt?: string, fromUtime?: number, toLt?: string, toUtime?: number }
): Promise<Transaction[]> {

let transactions_list: Transaction[] = [];
const onData = async(data: ProviderEventData<'transactionsFound'>) => {
transactions_list.push(...data.transactions);
}
const onEnd = () => {};

const scanner = new UnorderedTransactionsScanner(this.provider, {
address,
onData,
onEnd,
...filter,
});
await scanner.start();
await scanner.stop();

return transactions_list;
}

/**
* Returns stream of old transactions
*/
Expand Down

0 comments on commit f8a743f

Please sign in to comment.