Skip to content

Commit

Permalink
operation main polishment
Browse files Browse the repository at this point in the history
  • Loading branch information
maciejka committed Jan 9, 2025
1 parent 14f2d04 commit 8068f1b
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 27 deletions.
11 changes: 9 additions & 2 deletions operator/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions operator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
},
"dependencies": {
"fs": "^0.0.1-security",
"lodash": "^4.17.21",
"rxjs": "^7.8.1",
"starknet": "^6.11.0"
},
"devDependencies": {
"@types/lodash": "^4.17.14",
"@types/node": "^22.7.5",
"gts": "^6.0.2",
"prettier": "^3.4.2",
Expand Down
6 changes: 5 additions & 1 deletion operator/src/l2/state.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { ReceiptTx, TransactionStatus } from 'starknet';
import {
ReceiptTx,
TransactionStatus,
GetTransactionReceiptResponse,
} from 'starknet';

type L1Address = `0x${string}`; // or bigint?
type L2Address = `0x${string}`; // or bigint?
Expand Down
11 changes: 5 additions & 6 deletions operator/src/l2/transactions.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Provider, RpcProvider } from 'starknet';
import { Provider, ReceiptTx, RpcProvider } from 'starknet';
import { L2TxHashAndStatus, OperatorState } from './state';
import {
filter,
Expand All @@ -25,7 +25,6 @@ export function getAllL2Txs(state: OperatorState): Set<L2TxHashAndStatus> {
results.add(depositBatch.l2Tx);
break;
default:
// Exhaustive check — if you add a new `status` later, TypeScript will error
const _exhaustive: never = depositBatch;
return _exhaustive;
}
Expand All @@ -51,13 +50,13 @@ export function getAllL2Txs(state: OperatorState): Set<L2TxHashAndStatus> {
return results;
}

export function l2TransactionStatus(
export function l2TransactionStatus<T extends L2TxHashAndStatus>(
provider: RpcProvider,
tx: L2TxHashAndStatus
): Observable<L2TxHashAndStatus> {
tx: T
): Observable<T> {
return interval(5000).pipe(
switchMap(() => from(provider.waitForTransaction(tx.hash))),
map((status) => ({ ...tx, status: status })),
map((status) => ({ ...tx, status })),
filter((recentTx) => recentTx != tx)
// finish when tx is accepted
// takeWhile(tx) => tx.status.statusReceipt
Expand Down
63 changes: 45 additions & 18 deletions operator/src/operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
of,
scan,
Subject,
switchScan,
tap,
} from 'rxjs';
import {
Expand All @@ -21,6 +20,7 @@ import {
import { RpcProvider, TransactionStatus } from 'starknet';
import { L2Event, l2Events } from './l2/events';
import { getAllL2Txs, l2TransactionStatus } from './l2/transactions';
import * as _ from 'lodash';

function diff<T>(a: Set<T>, b: Set<T>): Set<T> {
const result = new Set<T>();
Expand All @@ -34,25 +34,31 @@ function diff<T>(a: Set<T>, b: Set<T>): Set<T> {
return result;
}

function mapSet<T, U>(input: Set<T>, fn: (item: T) => U): Set<U> {
return new Set(Array.from(input, fn));
}

function operatorMain<E, T, S>(
events: Observable<E>,
transactions: (state: S) => Set<T>,
transactionStatus: (tx: T) => Observable<T>,
process: (state: S, input: E | T) => Observable<S>,
applyChange: (state: S, change: E | T) => Observable<S>,
initialState: S
) {
): Observable<S> {
const s = new Subject<S>();
return merge(
events,
s
.pipe(
map(transactions),
scan(diff, new Set()),
mergeMap(from),
mergeMap(transactionStatus)
)
.pipe(mergeScan(process, initialState, 1), tap(s))
);
s.pipe(
map(transactions),
scan(
([allTxs, _], currentTxs) => [currentTxs, diff(currentTxs, allTxs)],
[new Set(), new Set()]
),
map(([_, newTxs]) => newTxs),
mergeMap(from),
mergeMap(transactionStatus)
)
).pipe(mergeScan(applyChange, initialState, 1), tap(s));
}

type Event =
Expand All @@ -71,27 +77,48 @@ function applyChange(
return of();
}

function operator(provider: RpcProvider) {
function l2TxHashAndStatusToTransaction(tx: L2TxHashAndStatus): Transaction {
return { type: 'l2tx', ...tx };
}

function l2EventToEvent(e: L2Event): Event {
return { type: 'l2event', ...e };
}

function operator(provider: RpcProvider): Observable<OperatorState> {
const initialState = new OperatorState(); // load from storage

const l2BridgeContractAddress = '';

provider.waitForTransaction;

return operatorMain(
// events
merge(
l2Events(provider, initialState.l2BlockNumber, [l2BridgeContractAddress])
l2Events(provider, initialState.l2BlockNumber, [
l2BridgeContractAddress,
]).pipe(map(l2EventToEvent))
// add l1Events
),
(state: OperatorState) => {
return getAllL2Txs(state);
// add l2 transactions
return mapSet(getAllL2Txs(state), l2TxHashAndStatusToTransaction);
// add l1 transactions
},
(tx: Transaction) => {
switch (tx.type) {
case 'l2tx':
return l2TransactionStatus(provider, tx);
case 'l1tx':
throw new Error('not implemented'); // add l1 transactions
default:
const _exhaustive: never = tx;
return _exhaustive;
}
},
(tx) => l2TransactionStatus(provider, tx), // add l1 transactions
applyChange,
initialState
).pipe(
distinctUntilChanged()
distinctUntilChanged(_.isEqual)
//save state to storage
);
}

0 comments on commit 8068f1b

Please sign in to comment.