Skip to content

Commit

Permalink
fix: make update drop-able
Browse files Browse the repository at this point in the history
  • Loading branch information
rustaceanrob committed Jan 20, 2025
1 parent 8f550e4 commit d4e1476
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub use kyoto::{

use kyoto::Receiver;
use kyoto::UnboundedReceiver;
use kyoto::{Event, IndexedBlock};
use kyoto::{BlockHash, Event, IndexedBlock};

pub mod builder;

Expand Down Expand Up @@ -97,6 +97,8 @@ pub struct UpdateSubscriber {
chain: local_chain::LocalChain,
// receive graph
graph: IndexedTxGraph<ConfirmationBlockTime, KeychainTxOutIndex<KeychainKind>>,
// staged changes for the chain
chain_changeset: BTreeMap<u32, Option<BlockHash>>,
}

impl UpdateSubscriber {
Expand All @@ -110,6 +112,7 @@ impl UpdateSubscriber {
receiver,
chain: LocalChain::from_tip(cp)?,
graph: IndexedTxGraph::new(index.clone()),
chain_changeset: BTreeMap::new(),
})
}

Expand All @@ -121,47 +124,48 @@ impl UpdateSubscriber {
/// running node. Production applications should define how the application handles
/// these events and displays them to end users.
pub async fn update(&mut self) -> Option<Update> {
let mut chain_changeset = BTreeMap::new();
while let Some(message) = self.receiver.recv().await {
match message {
Event::Block(IndexedBlock { height, block }) => {
let hash = block.header.block_hash();
chain_changeset.insert(height, Some(hash));
self.chain_changeset.insert(height, Some(hash));
let _ = self.graph.apply_block_relevant(&block, height);
}
Event::BlocksDisconnected(headers) => {
for header in headers {
let height = header.height;
chain_changeset.insert(height, None);
self.chain_changeset.insert(height, None);
}
}
Event::Synced(SyncUpdate {
tip,
recent_history,
}) => {
if chain_changeset.is_empty()
if self.chain_changeset.is_empty()
&& self.chain.tip().height() == tip.height
&& self.chain.tip().hash() == tip.hash
{
// return early if we're already synced
return None;
}
recent_history.into_iter().for_each(|(height, header)| {
chain_changeset.insert(height, Some(header.block_hash()));
self.chain_changeset
.insert(height, Some(header.block_hash()));
});
break;
}
}
}
self.chain
.apply_changeset(&local_chain::ChangeSet::from(chain_changeset))
.expect("chain was initialized with genesis");
Some(self.get_scan_response())
}

// When the client is believed to have synced to the chain tip of most work,
// we can return a wallet update.
fn get_scan_response(&mut self) -> Update {
let chain_changeset = core::mem::take(&mut self.chain_changeset);
self.chain
.apply_changeset(&local_chain::ChangeSet::from(chain_changeset))
.expect("chain was initialized with genesis");
let tx_update = TxUpdate::from(self.graph.graph().clone());
let graph = core::mem::take(&mut self.graph);
let last_active_indices = graph.index.last_used_indices();
Expand Down

0 comments on commit d4e1476

Please sign in to comment.