Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pagination queries for balances endpoint #2490

Open
wants to merge 31 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
d4a6596
Unlock pagination for `balances()` query
rafal-ch Oct 24, 2024
4e0450b
Merge remote-tracking branch 'upstream/master' into rafal/2032_pagina…
rafal-ch Dec 10, 2024
aa7ee75
Fix typos
rafal-ch Dec 10, 2024
449775d
Remove `TODO` since indexer is no more
rafal-ch Dec 10, 2024
b2fe429
Support pagination queries for `balances` endpoint
rafal-ch Dec 10, 2024
4131855
Update PR number in the changelog
rafal-ch Dec 10, 2024
2dd9aef
Fix formatting
rafal-ch Dec 10, 2024
05f6851
Add test for balances pagination
rafal-ch Dec 10, 2024
5b010e4
Clean up pagination test
rafal-ch Dec 10, 2024
17a7ef4
Add `balances_reversed_pagination_without_base_asset()` test
rafal-ch Dec 10, 2024
73f50c0
Include message coins in balance pagination tests
rafal-ch Dec 10, 2024
c172d1b
Balances pagination supports base asset id
rafal-ch Dec 10, 2024
fdd2c11
Full coverage of balances pagination in integration tests
rafal-ch Dec 10, 2024
6dbc7a6
Simplify balance pagination tests
rafal-ch Dec 10, 2024
32d680f
Dodge a double-test glitch in `test_case`
rafal-ch Dec 11, 2024
088c50b
Make balances test more clear
rafal-ch Dec 11, 2024
695f99a
Test paginated balances using various chunk sizes
rafal-ch Dec 11, 2024
f109d1b
Simplify the implementation of balances query
rafal-ch Dec 11, 2024
27970eb
Update changelog
rafal-ch Dec 11, 2024
dbc32f3
Add assert for the returned chunk size
rafal-ch Dec 11, 2024
f866c54
Merge remote-tracking branch 'upstream/master' into rafal/2032_pagina…
rafal-ch Dec 13, 2024
b1bfb20
Fix the query for the base asset id
rafal-ch Dec 13, 2024
99e7b65
Fix formatting
rafal-ch Dec 13, 2024
d7c3b02
Merge remote-tracking branch 'upstream/master' into rafal/2032_pagina…
rafal-ch Dec 16, 2024
037351d
Remove not needed batch of tests
rafal-ch Dec 16, 2024
3ae5222
Ensure no balances are returned after the last page
rafal-ch Dec 16, 2024
af64206
Rename `non_base_asset_balance()` to `non_base_asset_balances()`
rafal-ch Dec 18, 2024
fd2b019
Update complexity for `balances` query
rafal-ch Dec 20, 2024
9c8ce1e
Merge remote-tracking branch 'upstream/master' into rafal/2032_pagina…
rafal-ch Dec 30, 2024
0c92b6a
Merge remote-tracking branch 'upstream/master' into rafal/2032_pagina…
rafal-ch Jan 7, 2025
a943e2b
Fix the pagination start point
rafal-ch Jan 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2327](https://github.com/FuelLabs/fuel-core/pull/2327): Add more services tests and more checks of the pool. Also add an high level documentation for users of the pool and contributors.
- [2416](https://github.com/FuelLabs/fuel-core/issues/2416): Define the `GasPriceServiceV1` task.
- [2033](https://github.com/FuelLabs/fuel-core/pull/2033): Remove `Option<BlockHeight>` in favor of `BlockHeightQuery` where applicable.
- [2490](https://github.com/FuelLabs/fuel-core/pull/2490): Added pagination support for the `balances` GraphQL query, available only when 'balances indexation' is enabled.
- [2472](https://github.com/FuelLabs/fuel-core/pull/2472): Added the `amountU128` field to the `Balance` GraphQL schema, providing the total balance as a `U128`. The existing `amount` field clamps any balance exceeding `U64` to `u64::MAX`.

### Fixed
Expand Down
9 changes: 5 additions & 4 deletions crates/fuel-core/src/graphql_api/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ pub trait OffChainDatabase: Send + Sync {
base_asset_id: &AssetId,
) -> StorageResult<TotalBalanceAmount>;

fn balances(
&self,
fn balances<'a>(
&'a self,
owner: &Address,
base_asset_id: &AssetId,
start: Option<AssetId>,
base_asset_id: &'a AssetId,
direction: IterDirection,
) -> BoxedIter<'_, StorageResult<(AssetId, TotalBalanceAmount)>>;
) -> BoxedIter<'a, StorageResult<(AssetId, TotalBalanceAmount)>>;

fn owned_coins_ids(
&self,
Expand Down
24 changes: 15 additions & 9 deletions crates/fuel-core/src/query/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,14 @@ impl ReadView {
pub fn balances<'a>(
&'a self,
owner: &'a Address,
start: Option<AssetId>,
direction: IterDirection,
base_asset_id: &'a AssetId,
) -> impl Stream<Item = StorageResult<AddressBalance>> + 'a {
if self.balances_enabled {
futures::future::Either::Left(self.balances_with_cache(
owner,
start,
base_asset_id,
direction,
))
Expand Down Expand Up @@ -140,17 +142,21 @@ impl ReadView {
fn balances_with_cache<'a>(
&'a self,
owner: &'a Address,
base_asset_id: &AssetId,
start: Option<AssetId>,
base_asset_id: &'a AssetId,
direction: IterDirection,
) -> impl Stream<Item = StorageResult<AddressBalance>> + 'a {
stream::iter(self.off_chain.balances(owner, base_asset_id, direction))
.map(move |result| {
result.map(|(asset_id, amount)| AddressBalance {
owner: *owner,
asset_id,
amount,
})
stream::iter(
self.off_chain
.balances(owner, start, base_asset_id, direction),
)
.map(move |result| {
result.map(|(asset_id, amount)| AddressBalance {
owner: *owner,
asset_id,
amount,
})
.yield_each(self.batch_size)
})
.yield_each(self.batch_size)
}
}
15 changes: 8 additions & 7 deletions crates/fuel-core/src/schema/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ impl BalanceQuery {
Ok(balance)
}

// TODO: This API should be migrated to the indexer for better support and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not something we want to do in the future anymore ? In my mind it's still something we want to do in the future to decouple both use cases of the node.

// discontinued within fuel-core.
#[graphql(complexity = "query_costs().balance_query")]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#[graphql(complexity = "query_costs().balance_query")]
#[graphql(complexity = "query_costs().balance_query + query_costs().storage_iterator \
+ (query_costs().storage_read + first.unwrap_or_default() as usize) * child_complexity \
+ (query_costs().storage_read + last.unwrap_or_default() as usize) * child_complexity\")]

We need to limit the number of requested balances=)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also makes this change breaking

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in fd2b019

async fn balances(
&self,
Expand All @@ -92,18 +90,21 @@ impl BalanceQuery {
before: Option<String>,
) -> async_graphql::Result<Connection<AssetId, Balance, EmptyFields, EmptyFields>>
{
if before.is_some() || after.is_some() {
return Err(anyhow!("pagination is not yet supported").into())
}
let query = ctx.read_view()?;
if !query.balances_enabled && (before.is_some() || after.is_some()) {
return Err(anyhow!(
"Can not use pagination when balances indexation is not available"
)
.into())
}
let base_asset_id = *ctx
.data_unchecked::<ConsensusProvider>()
.latest_consensus_params()
.base_asset_id();
let owner = filter.owner.into();
crate::schema::query_pagination(after, before, first, last, |_, direction| {
crate::schema::query_pagination(after, before, first, last, |start, direction| {
Ok(query
.balances(&owner, direction, &base_asset_id)
.balances(&owner, (*start).map(Into::into), direction, &base_asset_id)
.map(|result| {
result.map(|balance| (balance.asset_id.into(), balance.into()))
}))
Expand Down
102 changes: 68 additions & 34 deletions crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,15 +233,48 @@ impl OffChainDatabase for OffChainIterableKeyValueView {
}
}

fn balances(
&self,
fn balances<'a>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it is a tricky function because of how we handle balances.

Hmm, do you remember why we decided to skip balances during iteration and add them at the begin?

I remember: if you don't have coins but have messages, they will not appear during the iteration.

The problem with the current solution is when start == base_asset_id. The current code will return an incorrect iterator.

Either we need to handle it differently, or come up with another solution. It would be nice if we could insert base asset balance into the mid of the iterator. Maybe we could change the indexation and make it always add empy indexation for the base asset(like entry in the balances table with 0 amount just for balances)?

Then during iteration we call always fetch its value by using self.base_asset_balance?

Copy link
Contributor Author

@rafal-ch rafal-ch Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two commits should solve the problem

&'a self,
owner: &Address,
base_asset_id: &AssetId,
start: Option<AssetId>,
base_asset_id: &'a AssetId,
direction: IterDirection,
) -> BoxedIter<'_, StorageResult<(AssetId, TotalBalanceAmount)>> {
) -> BoxedIter<'a, StorageResult<(AssetId, TotalBalanceAmount)>> {
match (direction, start) {
(IterDirection::Forward, None) => {
let base_asset_balance = self.base_asset_balance(base_asset_id, owner);
let non_base_asset_balance =
self.non_base_asset_balance(owner, None, direction, base_asset_id);
base_asset_balance
.chain(non_base_asset_balance)
.into_boxed()
}
(IterDirection::Forward, Some(asset_id)) => {
let start = CoinBalancesKey::new(owner, &asset_id);
self.non_base_asset_balance(owner, Some(start), direction, base_asset_id)
}
(IterDirection::Reverse, _) => {
let start = start.map(|asset_id| CoinBalancesKey::new(owner, &asset_id));
let base_asset_balance = self.base_asset_balance(base_asset_id, owner);
let non_base_asset_balance =
self.non_base_asset_balance(owner, start, direction, base_asset_id);
non_base_asset_balance
.chain(base_asset_balance)
.into_boxed()
}
}
}
}

impl OffChainIterableKeyValueView {
fn base_asset_balance(
&self,
base_asset_id: &AssetId,
owner: &Address,
) -> BoxedIter<'_, Result<(AssetId, u128), StorageError>> {
let base_asset_id = *base_asset_id;
let base_balance = self.balance(owner, &base_asset_id, &base_asset_id);
let base_asset_balance = match base_balance {
match base_balance {
Ok(base_asset_balance) => {
if base_asset_balance != 0 {
iter::once(Ok((base_asset_id, base_asset_balance))).into_boxed()
Expand All @@ -250,37 +283,38 @@ impl OffChainDatabase for OffChainIterableKeyValueView {
}
}
Err(err) => iter::once(Err(err)).into_boxed(),
};
}
}

let non_base_asset_balance = self
.iter_all_filtered_keys::<CoinBalances, _>(Some(owner), None, Some(direction))
.filter_map(move |result| match result {
Ok(key) if *key.asset_id() != base_asset_id => Some(Ok(key)),
Ok(_) => None,
Err(err) => Some(Err(err)),
})
.map(move |result| {
result.and_then(|key| {
let asset_id = key.asset_id();
let coin_balance =
self.storage_as_ref::<CoinBalances>()
.get(&key)?
.unwrap_or_default()
.into_owned() as TotalBalanceAmount;
Ok((*asset_id, coin_balance))
})
fn non_base_asset_balance<'a>(
&'a self,
owner: &Address,
start: Option<CoinBalancesKey>,
direction: IterDirection,
base_asset_id: &'a AssetId,
) -> BoxedIter<'_, Result<(AssetId, u128), StorageError>> {
self.iter_all_filtered_keys::<CoinBalances, _>(
Some(owner),
start.as_ref(),
Some(direction),
)
.filter_map(move |result| match result {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be valuable in the future to have a comment about this behavior because reading the comments on the review helped me but it's not really future proof and without it I wouldn't understand why None on base asset etc..

Ok(key) if *key.asset_id() != *base_asset_id => Some(Ok(key)),
Ok(_) => None,
Err(err) => Some(Err(err)),
})
.map(move |result| {
result.and_then(|key| {
let asset_id = key.asset_id();
let coin_balance =
self.storage_as_ref::<CoinBalances>()
.get(&key)?
.unwrap_or_default()
.into_owned() as TotalBalanceAmount;
Ok((*asset_id, coin_balance))
})
.into_boxed();

if direction == IterDirection::Forward {
base_asset_balance
.chain(non_base_asset_balance)
.into_boxed()
} else {
non_base_asset_balance
.chain(base_asset_balance)
.into_boxed()
}
})
.into_boxed()
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/services/gas_price_service/src/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub enum Error {
},
#[error("Failed to initialize updater: {0:?}")]
CouldNotInitUpdater(anyhow::Error),
#[error("Failed to convert metadata to concrete type. THere is no migration path for this metadata version")]
#[error("Failed to convert metadata to concrete type. There is no migration path for this metadata version")]
CouldNotConvertMetadata, // todo(https://github.com/FuelLabs/fuel-core/issues/2286)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/services/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ where
match shutdown.catch_unwind().await {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
tracing::error!("Go an error during shutdown of the task: {e}");
tracing::error!("Got an error during shutdown of the task: {e}");
}
Err(e) => {
if got_panic.is_some() {
Expand Down
Loading
Loading