wip #3101
Draft
wants to merge 1 commit into master

88 changes: 84 additions & 4 deletions w3sper.js/src/network/syncer/account.js
@@ -155,8 +155,8 @@ export class AccountSyncer extends EventTarget {
async balances(profiles) {
const balances = await accountsIntoRaw(profiles).then((rawUsers) =>
rawUsers.map((user) =>
-        this.#network.contracts.transferContract.call.account(user),
-      ),
+        this.#network.contracts.transferContract.call.account(user)
+      )
);

return Promise.all(balances)
@@ -165,6 +165,86 @@ export class AccountSyncer extends EventTarget {
.then((buffers) => buffers.map(parseBalance));
}

/**
   * Fetches the moonlight transaction history for
   * the given profiles.
   *
   * @param {Array<Object>} profiles - Profiles whose account history is fetched.
   * @param {Object} [options={}]
   * @param {bigint} [options.from] - Block height to scan back from; defaults to the current block height.
   * @param {AbortSignal} [options.signal] - Signal used to abort the stream.
   * @returns {ReadableStream} A stream of `{ blockHeight, id }` entries, newest first.
*/
history(profiles, options = {}) {
const max = (a, b) => (a > b ? a : b);
const HISTORY_CHUNK_SIZE = 100n;
const { signal } = options;

let data = [];
let idx = 0;

return new ReadableStream({
cancel(reason) {
console.log("Stream canceled:", reason);
},

async pull(controller) {
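        // Emit one buffered history entry per pull; close the stream once the
        // buffer is exhausted or the consumer's abort signal fires.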
if (signal?.aborted) {
this.cancel(signal.reason ?? "Abort signal received");
controller.close();
return;
}

if (idx < data.length) {
controller.enqueue(data[idx++]);
} else {
controller.close();
}
},

start: async (controller) => {
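        // Scan a single window of at most HISTORY_CHUNK_SIZE blocks, ending at
        // options.from or, when absent, the current block height.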
const to = options.from ?? (await this.#network.blockHeight);
const from = max(0n, to - HISTORY_CHUNK_SIZE);

data = await Promise.all(
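          // Query the moonlight history of each profile twice: once with its
          // account as the sender and once as the receiver.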
profiles.map((profile) => {
const key = profile.account.toString();

return Promise.all(
["sender", "receiver"].map((type) =>
this.#network
.query(
`moonlightHistory(
${type}: "${key}",
fromBlock: ${from},
toBlock: ${to}
) { json }`,
{ signal }
)
.then((result) => result.moonlightHistory?.json ?? [])
)
);
})
)
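          // Flatten the per-profile, per-direction results, keep each entry's
          // block height and origin hash, and sort newest first.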
.then((results) =>
results
.flat(2)
.map((data) => {
return {
blockHeight: BigInt(data.block_height),
id: data.origin,
};
})
.sort((a, b) => Number(b.blockHeight - a.blockHeight))
)
.catch((error) => {
console.error("Error fetching data", error);
controller.error(error);
});
},
});
}

/**
* Fetches the stakes for the given profiles.
*
@@ -174,8 +254,8 @@ export class AccountSyncer extends EventTarget {
async stakes(profiles) {
const stakes = await accountsIntoRaw(profiles).then((rawUsers) =>
rawUsers.map((user) =>
-        this.#network.contracts.stakeContract.call.get_stake(user),
-      ),
+        this.#network.contracts.stakeContract.call.get_stake(user)
+      )
);

return Promise.all(stakes)
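
The history() method added above returns a ReadableStream that is filled once in start() and drained one entry per pull(). A minimal consumption sketch follows, assuming a locally running node at http://localhost:8080/ and the seeder exported by the test harness used below; the entry shape { blockHeight, id } follows the implementation above.

// Consumption sketch for AccountSyncer#history (assumptions: local node,
// seeder from the test harness; adjust to your setup).
import { AccountSyncer, Network, ProfileGenerator } from "@dusk/w3sper";
import { seeder } from "./harness.js";

const network = await Network.connect("http://localhost:8080/");
const profiles = [await new ProfileGenerator(seeder).default];

const syncer = new AccountSyncer(network);
const controller = new AbortController();

// The stream is async-iterable (see getTxsFromStream in the test below);
// calling controller.abort() would end the iteration early.
for await (const tx of syncer.history(profiles, { signal: controller.signal })) {
  console.log(tx.blockHeight, tx.id); // { blockHeight: bigint, id: string }
}

await network.disconnect();
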
152 changes: 152 additions & 0 deletions w3sper.js/tests/history_test.js
@@ -0,0 +1,152 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Copyright (c) DUSK NETWORK. All rights reserved.

import {
AccountSyncer,
Bookkeeper,
Network,
ProfileGenerator,
Transfer,
useAsProtocolDriver,
} from "@dusk/w3sper";

import {
assert,
getLocalWasmBuffer,
seeder,
test,
Treasury,
} from "./harness.js";

const resolveAfter = (delay, value) =>
new Promise((resolve) => {
setTimeout(() => resolve(value), delay);
});

const skip = () => {}; // no-op, keeps the "debug" test below disabled

// Sorts transactions by descending block height and returns their hashes (ids).
const getSortedHashes = (txs) =>
txs
.toSorted((a, b) => Number(b.blockHeight - a.blockHeight))
.map((tx) => tx.id);

async function getTxsFromStream(txsStream) {
const result = [];

for await (const tx of txsStream) {
result.push(tx);
}

return result;
}

// Returns, for a given profile index, the indexes of the raw transfers that
// involve that profile as sender or receiver.
const profileIndexer = (rawTransfers) => (profileIdx) =>
rawTransfers.reduce((result, current, idx) => {
if (current.from === profileIdx || current.to === profileIdx) {
result.push(idx);
}

return result;
}, []);

skip("debug", async () => {
const network = await Network.connect("http://localhost:8080/");

const profileGenerator = new ProfileGenerator(seeder);
const profiles = [
await profileGenerator.default,
await profileGenerator.next(),
await profileGenerator.next(),
];
const syncer = new AccountSyncer(network);

const txs = await getTxsFromStream(await syncer.history(profiles));

console.log(txs);

await network.disconnect();
});

test("accounts history", async () => {
const rawTransfers = [
{ amount: 10n, from: 0, to: 1 },
{ amount: 30n, from: 0, to: 1 },
{ amount: 20n, from: 1, to: 0 },
{ amount: 12n, from: 2, to: 1 },
];
const getIndexesForProfile = profileIndexer(rawTransfers);
const { profiles, transfers } = await useAsProtocolDriver(
await getLocalWasmBuffer()
).then(async () => {
const profileGenerator = new ProfileGenerator(seeder);
const profiles = await Promise.all([
profileGenerator.default,
profileGenerator.next(),
profileGenerator.next(),
]);
const nonces = Array(profiles.length).fill(0n);
const transfers = await Promise.all(
rawTransfers.map((raw) =>
new Transfer(profiles[raw.from])
.amount(raw.amount)
.to(profiles[raw.to].account)
.nonce(nonces[raw.from]++)
.chain(Network.LOCALNET)
.gas({ limit: 500_000_000n })
.build()
)
);

return { profiles, transfers };
});

const network = await Network.connect("http://localhost:8080/");

for (const transfer of transfers) {
await network.execute(transfer);
await network.transactions.withId(transfer.hash).once.executed();
}

console.log("start waiting", new Date());

  // Give the node time to expose the executed transactions through the
  // moonlight history endpoint before querying it.
  await resolveAfter(15000);

console.log("end waiting", new Date());

const syncer = new AccountSyncer(network);

let txs;

// All transactions
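  // Every transfer here involves two of the synced profiles, so each hash is
  // expected to appear twice: once for the sender, once for the receiver.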
const allHashes = transfers
.flatMap((transfer) => [transfer.hash, transfer.hash])
.toReversed();

txs = await getTxsFromStream(syncer.history(profiles));

assert.equal(
getSortedHashes(txs).join(","),
allHashes.join(","),
"All hashes"
);

// Per profile transactions
for (let i = 0; i < profiles.length; i++) {
const profileHashes = getIndexesForProfile(i)
.map((idx) => transfers[idx].hash)
.toReversed();

txs = await getTxsFromStream(syncer.history([profiles[i]]));

assert.equal(
getSortedHashes(txs).join(","),
profileHashes.join(","),
`hashes for profile ${i}`
);
}

await network.disconnect();
});