diff --git a/tap-agent/src/agent.rs b/tap-agent/src/agent.rs index 5030b648..0562c809 100644 --- a/tap-agent/src/agent.rs +++ b/tap-agent/src/agent.rs @@ -8,7 +8,10 @@ use indexer_common::prelude::{ escrow_accounts, indexer_allocations, DeploymentDetails, SubgraphClient, }; -use crate::{aggregator_endpoints, config, database, tap::accounts_manager}; +use crate::{ + aggregator_endpoints, config, database, + tap::sender_allocation_relationships_manager::SenderAllocationRelationshipsManager, +}; pub async fn start_agent(config: &'static config::Cli) { let pgpool = database::connect(&config.postgres).await; @@ -78,7 +81,7 @@ pub async fn start_agent(config: &'static config::Cli) { verifying_contract: config.receipts.receipts_verifier_address, }; - let _accounts_manager = accounts_manager::AccountsManager::new( + let _sender_allocation_relationships_manager = SenderAllocationRelationshipsManager::new( config, pgpool, indexer_allocations, diff --git a/tap-agent/src/tap/mod.rs b/tap-agent/src/tap/mod.rs index d648d3da..07e01cb1 100644 --- a/tap-agent/src/tap/mod.rs +++ b/tap-agent/src/tap/mod.rs @@ -1,12 +1,12 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -mod account; -pub mod accounts_manager; mod escrow_adapter; mod rav_storage_adapter; mod receipt_checks_adapter; mod receipt_storage_adapter; +mod sender_allocation_relationship; +pub mod sender_allocation_relationships_manager; #[cfg(test)] pub mod test_utils; diff --git a/tap-agent/src/tap/account.rs b/tap-agent/src/tap/sender_allocation_relationship.rs similarity index 82% rename from tap-agent/src/tap/account.rs rename to tap-agent/src/tap/sender_allocation_relationship.rs index aa5a81be..326ce896 100644 --- a/tap-agent/src/tap/account.rs +++ b/tap-agent/src/tap/sender_allocation_relationship.rs @@ -23,7 +23,7 @@ use tokio::{ }; use tracing::{error, warn}; -use super::accounts_manager::NewReceiptNotification; +use super::sender_allocation_relationships_manager::NewReceiptNotification; use crate::{ config::{self}, tap::{ @@ -64,20 +64,20 @@ struct Inner { config: &'static config::Cli, } -/// An Account is the relationship between the allocation and the sender in the context of a single -/// allocation. +/// A SenderAllocationRelationship is the relationship between the indexer and the sender in the +/// context of a single allocation. /// -/// Manages the lifecycle of Scalar TAP for the Account, including: +/// Manages the lifecycle of Scalar TAP for the SenderAllocationRelationship, including: /// - Monitoring new receipts and keeping track of the unaggregated fees. /// - Requesting RAVs from the sender's TAP aggregator once the unaggregated fees reach a certain /// threshold. -/// - Requesting the last RAV from the sender's TAP aggregator (on Account EOL) -pub struct Account { +/// - Requesting the last RAV from the sender's TAP aggregator (on SenderAllocationRelationship EOL) +pub struct SenderAllocationRelationship { inner: Arc, rav_requester_task: Arc>>>, } -impl Account { +impl SenderAllocationRelationship { #[allow(clippy::too_many_arguments)] pub fn new( config: &'static config::Cli, @@ -400,8 +400,9 @@ impl Account { } } -impl Drop for Account { - /// Trying to make sure the RAV requester task is dropped when the account is dropped. +impl Drop for SenderAllocationRelationship { + /// Trying to make sure the RAV requester task is dropped when the SenderAllocationRelationship + /// is dropped. fn drop(&mut self) { let rav_requester_task = self.rav_requester_task.clone(); @@ -434,11 +435,11 @@ mod tests { const DUMMY_URL: &str = "http://localhost:1234"; - async fn create_account( + async fn create_sender_allocation_relationship( pgpool: PgPool, sender_aggregator_endpoint: String, escrow_subgraph_endpoint: &str, - ) -> Account { + ) -> SenderAllocationRelationship { let config = Box::leak(Box::new(config::Cli { config: None, ethereum: config::Ethereum { @@ -467,7 +468,7 @@ mod tests { let escrow_adapter = EscrowAdapter::new(escrow_accounts_eventual.clone()); - Account::new( + SenderAllocationRelationship::new( config, pgpool.clone(), *ALLOCATION_ID, @@ -480,13 +481,16 @@ mod tests { ) } - /// Test that the account correctly updates the unaggregated fees from the database when there - /// is no RAV in the database. + /// Test that the sender_allocation_relatioship correctly updates the unaggregated fees from the + /// database when there is no RAV in the database. /// - /// The account should consider all receipts found for the allocation and sender. + /// The sender_allocation_relatioship should consider all receipts found for the allocation and + /// sender. #[sqlx::test(migrations = "../migrations")] async fn test_update_unaggregated_fees_no_rav(pgpool: PgPool) { - let account = create_account(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await; + let sender_allocation_relatioship = + create_sender_allocation_relationship(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL) + .await; // Add receipts to the database. for i in 1..10 { @@ -497,25 +501,39 @@ mod tests { .unwrap(); } - // Let the account update the unaggregated fees from the database. - account.update_unaggregated_fees().await.unwrap(); + // Let the sender_allocation_relatioship update the unaggregated fees from the database. + sender_allocation_relatioship + .update_unaggregated_fees() + .await + .unwrap(); // Check that the unaggregated fees are correct. - assert_eq!(account.inner.unaggregated_fees.lock().await.value, 45u128); + assert_eq!( + sender_allocation_relatioship + .inner + .unaggregated_fees + .lock() + .await + .value, + 45u128 + ); } - /// Test that the account correctly updates the unaggregated fees from the database when there - /// is a RAV in the database as well as receipts which timestamp are lesser and greater than - /// the RAV's timestamp. + /// Test that the sender_allocation_relatioship correctly updates the unaggregated fees from the + /// database when there is a RAV in the database as well as receipts which timestamp are lesser + /// and greater than the RAV's timestamp. /// - /// The account should only consider receipts with a timestamp greater than the RAV's timestamp. + /// The sender_allocation_relatioship should only consider receipts with a timestamp greater + /// than the RAV's timestamp. #[sqlx::test(migrations = "../migrations")] async fn test_update_unaggregated_fees_with_rav(pgpool: PgPool) { - let account = create_account(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await; + let sender_allocation_relatioship = + create_sender_allocation_relationship(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL) + .await; // Add the RAV to the database. - // This RAV has timestamp 4. The Account should only consider receipts with a timestamp - // greater than 4. + // This RAV has timestamp 4. The sender_allocation_relatioship should only consider receipts + // with a timestamp greater than 4. let signed_rav = create_rav(*ALLOCATION_ID, SENDER.0.clone(), 4, 10).await; store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap(); @@ -528,18 +546,32 @@ mod tests { .unwrap(); } - // Let the account update the unaggregated fees from the database. - account.update_unaggregated_fees().await.unwrap(); + // Let the sender_allocation_relatioship update the unaggregated fees from the database. + sender_allocation_relatioship + .update_unaggregated_fees() + .await + .unwrap(); // Check that the unaggregated fees are correct. - assert_eq!(account.inner.unaggregated_fees.lock().await.value, 35u128); + assert_eq!( + sender_allocation_relatioship + .inner + .unaggregated_fees + .lock() + .await + .value, + 35u128 + ); } - /// Test that the account correctly ignores new receipt notifications with an ID lower than - /// the last receipt ID processed (be it from the DB or from a prior receipt notification). + /// Test that the sender_allocation_relatioship correctly ignores new receipt notifications with + /// an ID lower than the last receipt ID processed (be it from the DB or from a prior receipt + /// notification). #[sqlx::test(migrations = "../migrations")] async fn test_handle_new_receipt_notification(pgpool: PgPool) { - let account = create_account(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await; + let sender_allocation_relatioship = + create_sender_allocation_relationship(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL) + .await; // Add receipts to the database. let mut expected_unaggregated_fees = 0u128; @@ -552,11 +584,19 @@ mod tests { expected_unaggregated_fees += u128::from(i); } - account.update_unaggregated_fees().await.unwrap(); + sender_allocation_relatioship + .update_unaggregated_fees() + .await + .unwrap(); // Check that the unaggregated fees are correct. assert_eq!( - account.inner.unaggregated_fees.lock().await.value, + sender_allocation_relatioship + .inner + .unaggregated_fees + .lock() + .await + .value, expected_unaggregated_fees ); @@ -570,13 +610,18 @@ mod tests { timestamp_ns: 19, value: 19, }; - account + sender_allocation_relatioship .handle_new_receipt_notification(new_receipt_notification) .await; // Check that the unaggregated fees have *not* increased. assert_eq!( - account.inner.unaggregated_fees.lock().await.value, + sender_allocation_relatioship + .inner + .unaggregated_fees + .lock() + .await + .value, expected_unaggregated_fees ); @@ -588,14 +633,19 @@ mod tests { timestamp_ns: 20, value: 20, }; - account + sender_allocation_relatioship .handle_new_receipt_notification(new_receipt_notification) .await; expected_unaggregated_fees += 20; // Check that the unaggregated fees are correct. assert_eq!( - account.inner.unaggregated_fees.lock().await.value, + sender_allocation_relatioship + .inner + .unaggregated_fees + .lock() + .await + .value, expected_unaggregated_fees ); @@ -607,13 +657,18 @@ mod tests { timestamp_ns: 19, value: 19, }; - account + sender_allocation_relatioship .handle_new_receipt_notification(new_receipt_notification) .await; // Check that the unaggregated fees have *not* increased. assert_eq!( - account.inner.unaggregated_fees.lock().await.value, + sender_allocation_relatioship + .inner + .unaggregated_fees + .lock() + .await + .value, expected_unaggregated_fees ); } @@ -647,8 +702,8 @@ mod tests { ) .await; - // Create an account. - let account = create_account( + // Create a sender_allocation_relatioship. + let sender_allocation_relatioship = create_sender_allocation_relationship( pgpool.clone(), "http://".to_owned() + &aggregator_endpoint.to_string(), &mock_server.uri(), @@ -664,11 +719,16 @@ mod tests { .unwrap(); } - // Let the account update the unaggregated fees from the database. - account.update_unaggregated_fees().await.unwrap(); + // Let the sender_allocation_relatioship update the unaggregated fees from the database. + sender_allocation_relatioship + .update_unaggregated_fees() + .await + .unwrap(); // Trigger a RAV request manually. - Account::rav_requester_try(&account.inner).await.unwrap(); + SenderAllocationRelationship::rav_requester_try(&sender_allocation_relatioship.inner) + .await + .unwrap(); // Stop the TAP aggregator server. handle.stop().unwrap(); @@ -704,8 +764,8 @@ mod tests { ) .await; - // Create an account. - let account = create_account( + // Create a sender_allocation_relatioship. + let sender_allocation_relatioship = create_sender_allocation_relationship( pgpool.clone(), "http://".to_owned() + &aggregator_endpoint.to_string(), &mock_server.uri(), @@ -726,7 +786,7 @@ mod tests { store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); - account + sender_allocation_relatioship .handle_new_receipt_notification(NewReceiptNotification { allocation_id: *ALLOCATION_ID, sender_address: SENDER.1, @@ -743,7 +803,12 @@ mod tests { } // Wait for the RAV requester to finish. - while Account::rav_requester_task_is_running(&account.rav_requester_task.lock().await) { + while SenderAllocationRelationship::rav_requester_task_is_running( + &sender_allocation_relatioship + .rav_requester_task + .lock() + .await, + ) { tokio::time::sleep(std::time::Duration::from_millis(100)).await; } @@ -773,10 +838,23 @@ mod tests { assert!(latest_rav.message.value_aggregate >= trigger_value); // Check that the unaggregated fees value is reduced. - assert!(account.inner.unaggregated_fees.lock().await.value <= trigger_value); + assert!( + sender_allocation_relatioship + .inner + .unaggregated_fees + .lock() + .await + .value + <= trigger_value + ); // Reset the total value and trigger value. - total_value = account.inner.unaggregated_fees.lock().await.value; + total_value = sender_allocation_relatioship + .inner + .unaggregated_fees + .lock() + .await + .value; trigger_value = 0; // Add more receipts @@ -789,7 +867,7 @@ mod tests { .await .unwrap(); - account + sender_allocation_relatioship .handle_new_receipt_notification(NewReceiptNotification { allocation_id: *ALLOCATION_ID, sender_address: SENDER.1, @@ -806,7 +884,12 @@ mod tests { } // Wait for the RAV requester to finish. - while Account::rav_requester_task_is_running(&account.rav_requester_task.lock().await) { + while SenderAllocationRelationship::rav_requester_task_is_running( + &sender_allocation_relatioship + .rav_requester_task + .lock() + .await, + ) { tokio::time::sleep(std::time::Duration::from_millis(100)).await; } @@ -837,7 +920,15 @@ mod tests { assert!(latest_rav.message.value_aggregate >= trigger_value); // Check that the unaggregated fees value is reduced. - assert!(account.inner.unaggregated_fees.lock().await.value <= trigger_value); + assert!( + sender_allocation_relatioship + .inner + .unaggregated_fees + .lock() + .await + .value + <= trigger_value + ); // Stop the TAP aggregator server. handle.stop().unwrap(); diff --git a/tap-agent/src/tap/accounts_manager.rs b/tap-agent/src/tap/sender_allocation_relationships_manager.rs similarity index 72% rename from tap-agent/src/tap/accounts_manager.rs rename to tap-agent/src/tap/sender_allocation_relationships_manager.rs index 7676a501..9365666d 100644 --- a/tap-agent/src/tap/accounts_manager.rs +++ b/tap-agent/src/tap/sender_allocation_relationships_manager.rs @@ -15,8 +15,8 @@ use sqlx::{postgres::PgListener, PgPool}; use tokio::sync::RwLock; use tracing::{error, warn}; -use super::account::Account; use super::escrow_adapter::EscrowAdapter; +use super::sender_allocation_relationship::SenderAllocationRelationship; use crate::config; #[derive(Deserialize, Debug)] @@ -28,7 +28,7 @@ pub struct NewReceiptNotification { pub value: u128, } -pub struct AccountsManager { +pub struct SenderAllocationRelationshipsManager { _inner: Arc, new_receipts_watcher_handle: tokio::task::JoinHandle<()>, _eligible_allocations_senders_pipe: PipeHandle, @@ -38,8 +38,9 @@ pub struct AccountsManager { struct Inner { config: &'static config::Cli, pgpool: PgPool, - /// Map of (allocation_id, sender_address) to Account. - accounts: Arc>>, + /// Map of (allocation_id, sender_address) to SenderAllocationRelationship. + sender_allocation_relationships: + Arc>>, indexer_allocations: Eventual>, escrow_accounts: Eventual>, escrow_subgraph: &'static SubgraphClient, @@ -48,7 +49,7 @@ struct Inner { sender_aggregator_endpoints: HashMap, } -impl AccountsManager { +impl SenderAllocationRelationshipsManager { pub async fn new( config: &'static config::Cli, pgpool: PgPool, @@ -63,7 +64,7 @@ impl AccountsManager { let inner = Arc::new(Inner { config, pgpool, - accounts: Arc::new(RwLock::new(HashMap::new())), + sender_allocation_relationships: Arc::new(RwLock::new(HashMap::new())), indexer_allocations, escrow_accounts, escrow_subgraph, @@ -72,7 +73,7 @@ impl AccountsManager { sender_aggregator_endpoints, }); - Self::update_accounts( + Self::update_sender_allocation_relationships( &inner, inner .indexer_allocations @@ -86,11 +87,11 @@ impl AccountsManager { .expect("Should get escrow accounts from Eventual"), ) .await - .expect("Should be able to update accounts"); + .expect("Should be able to update sender_allocation_relationships"); - // Listen to pg_notify events. We start it before updating the unaggregated_fees for - // all accounts, so that we don't miss any receipts. PG will buffer the notifications - // until we start consuming them with `new_receipts_watcher`. + // Listen to pg_notify events. We start it before updating the unaggregated_fees for all + // SenderAllocationRelationship instances, so that we don't miss any receipts. PG will + // buffer the notifications until we start consuming them with `new_receipts_watcher`. let mut pglistener = PgListener::connect_with(&inner.pgpool.clone()) .await .unwrap(); @@ -102,11 +103,12 @@ impl AccountsManager { 'scalar_tap_receipt_notification'", ); - let mut accounts_write_lock = inner.accounts.write().await; + let mut sender_allocation_relationships_write_lock = + inner.sender_allocation_relationships.write().await; - // Create accounts for all outstanding receipts in the database, because they may - // be linked to allocations that are not eligible anymore, but still need to get - // aggregated. + // Create SenderAllocationRelationship instances for all outstanding receipts in the + // database, because they may be linked to allocations that are not eligible anymore, but + // still need to get aggregated. sqlx::query!( r#" SELECT DISTINCT allocation_id, sender_address @@ -123,11 +125,11 @@ impl AccountsManager { let sender = Address::from_str(&row.sender_address) .expect("sender_address should be a valid address"); - // Only create a account if it doesn't exist yet. + // Only create a SenderAllocationRelationship if it doesn't exist yet. if let std::collections::hash_map::Entry::Vacant(e) = - accounts_write_lock.entry((allocation_id, sender)) + sender_allocation_relationships_write_lock.entry((allocation_id, sender)) { - e.insert(Account::new( + e.insert(SenderAllocationRelationship::new( config, inner.pgpool.clone(), allocation_id, @@ -145,25 +147,25 @@ impl AccountsManager { } }); - // Update the unaggregated_fees for all accounts by pulling the receipts from the - // database. - for account in accounts_write_lock.values() { - account + // Update the unaggregated_fees for all SenderAllocationRelationship instances by pulling + // the receipts from the database. + for sender_allocation_relationship in sender_allocation_relationships_write_lock.values() { + sender_allocation_relationship .update_unaggregated_fees() .await .expect("should be able to update unaggregated_fees"); } - drop(accounts_write_lock); + drop(sender_allocation_relationships_write_lock); // Start the new_receipts_watcher task that will consume from the `pglistener` let new_receipts_watcher_handle = tokio::spawn(Self::new_receipts_watcher( pglistener, - inner.accounts.clone(), + inner.sender_allocation_relationships.clone(), )); // Start the eligible_allocations_senders_pipe that watches for changes in eligible senders - // and allocations and updates the accounts accordingly. + // and allocations and updates the SenderAllocationRelationship instances accordingly. let inner_clone = inner.clone(); let eligible_allocations_senders_pipe = eventuals::join(( inner.indexer_allocations.clone(), @@ -172,11 +174,18 @@ impl AccountsManager { .pipe_async(move |(indexer_allocations, escrow_accounts)| { let inner = inner_clone.clone(); async move { - Self::update_accounts(&inner, indexer_allocations, escrow_accounts) - .await - .unwrap_or_else(|e| { - error!("Error while updating accounts: {:?}", e); - }); + Self::update_sender_allocation_relationships( + &inner, + indexer_allocations, + escrow_accounts, + ) + .await + .unwrap_or_else(|e| { + error!( + "Error while updating sender_allocation_relationships: {:?}", + e + ); + }); } }); @@ -188,10 +197,12 @@ impl AccountsManager { } /// Continuously listens for new receipt notifications from Postgres and forwards them to the - /// corresponding account. + /// corresponding SenderAllocationRelationship. async fn new_receipts_watcher( mut pglistener: PgListener, - accounts: Arc>>, + sender_allocation_relationships: Arc< + RwLock>, + >, ) { loop { // TODO: recover from errors or shutdown the whole program? @@ -205,40 +216,45 @@ impl AccountsManager { NewReceiptNotification", ); - if let Some(account) = accounts.read().await.get(&( - new_receipt_notification.allocation_id, - new_receipt_notification.sender_address, - )) { - account + if let Some(sender_allocation_relationship) = + sender_allocation_relationships.read().await.get(&( + new_receipt_notification.allocation_id, + new_receipt_notification.sender_address, + )) + { + sender_allocation_relationship .handle_new_receipt_notification(new_receipt_notification) .await; } else { warn!( - "No account found for allocation_id {} and sender_address {} to process \ - new receipt notification. This should not happen.", + "No sender_allocation_relationship found for allocation_id {} and \ + sender_address {} to process new receipt notification. This should not \ + happen.", new_receipt_notification.allocation_id, new_receipt_notification.sender_address ); } } } - async fn update_accounts( + async fn update_sender_allocation_relationships( inner: &Inner, indexer_allocations: HashMap, escrow_accounts: HashMap, ) -> Result<()> { let eligible_allocations: Vec
= indexer_allocations.keys().copied().collect(); let senders: Vec
= escrow_accounts.keys().copied().collect(); - let mut accounts_write = inner.accounts.write().await; + let mut sender_allocation_relationships_write = + inner.sender_allocation_relationships.write().await; - // Create accounts for all currently eligible (allocation, sender) + // Create SenderAllocationRelationship instances for all currently eligible + // (allocation, sender) for allocation_id in &eligible_allocations { for sender in &senders { - // Only create an account if it doesn't exist yet. + // Only create a SenderAllocationRelationship if it doesn't exist yet. if let std::collections::hash_map::Entry::Vacant(e) = - accounts_write.entry((*allocation_id, *sender)) + sender_allocation_relationships_write.entry((*allocation_id, *sender)) { - e.insert(Account::new( + e.insert(SenderAllocationRelationship::new( inner.config, inner.pgpool.clone(), *allocation_id, @@ -259,21 +275,24 @@ impl AccountsManager { } } - // Trigger a last rav request for all accounts that correspond to ineligible - // (allocations, sender). - for ((allocation_id, sender), account) in accounts_write.iter() { + // Trigger a last rav request for all SenderAllocationRelationship instances that correspond + // to ineligible (allocations, sender). + for ((allocation_id, sender), sender_allocation_relatioship) in + sender_allocation_relationships_write.iter() + { if !eligible_allocations.contains(allocation_id) || !senders.contains(sender) { - account.start_last_rav_request().await + sender_allocation_relatioship.start_last_rav_request().await } } - // TODO: remove accounts that are finished. Ideally done in another async task? + // TODO: remove SenderAllocationRelationship instances that are finished. Ideally done in + // another async task? Ok(()) } } -impl Drop for AccountsManager { +impl Drop for SenderAllocationRelationshipsManager { fn drop(&mut self) { // Abort the notification watcher on drop. Otherwise it may panic because the PgPool could // get dropped before. (Observed in tests) @@ -295,14 +314,14 @@ mod tests { }; use crate::tap::{ - account::State, + sender_allocation_relationship::State, test_utils::{INDEXER, SENDER, TAP_EIP712_DOMAIN_SEPARATOR}, }; use super::*; #[sqlx::test(migrations = "../migrations")] - async fn test_account_creation_and_eol(pgpool: PgPool) { + async fn test_sender_allocation_relatioship_creation_and_eol(pgpool: PgPool) { let config = Box::leak(Box::new(config::Cli { config: None, ethereum: config::Ethereum { @@ -345,7 +364,7 @@ mod tests { .unwrap(), )); - let accounts = AccountsManager::new( + let sender_allocation_relatioships = SenderAllocationRelationshipsManager::new( config, pgpool.clone(), indexer_allocations_eventual, @@ -390,13 +409,13 @@ mod tests { // Add an escrow account to the escrow_accounts Eventual. escrow_accounts_writer.write(HashMap::from([(SENDER.1, U256::from_str("1000").unwrap())])); - // Wait for the account to be created. + // Wait for the SenderAllocationRelationship to be created. tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - // Check that the account was created. - assert!(accounts + // Check that the SenderAllocationRelationship was created. + assert!(sender_allocation_relatioships ._inner - .accounts + .sender_allocation_relationships .write() .await .contains_key(&(allocation_id, SENDER.1))); @@ -407,11 +426,11 @@ mod tests { // Wait a bit tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - // Check that the account state is last_rav_pending + // Check that the SenderAllocationRelationship state is last_rav_pending assert_eq!( - accounts + sender_allocation_relatioships ._inner - .accounts + .sender_allocation_relationships .read() .await .get(&(allocation_id, SENDER.1))