Skip to content

Commit a1cdfab

Browse files
author
snorochevskiy
committed
Resolving conflicts
1 parent 395c02e commit a1cdfab

File tree

5 files changed

+262
-1
lines changed

5 files changed

+262
-1
lines changed

nft_ingester/cleaners/fork_cleaner.rs

+255
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
use crate::consistency_calculator::NftChangesTracker;
2+
use entities::models::ForkedItem;
3+
use interface::fork_cleaner::{CompressedTreeChangesManager, ForkChecker};
4+
use metrics_utils::ForkCleanerMetricsConfig;
5+
use rocks_db::storage_consistency::BubblegumChangeKey;
6+
use rocks_db::storage_consistency::DataConsistencyStorage;
7+
use rocks_db::Storage;
8+
use solana_sdk::pubkey::Pubkey;
9+
use solana_sdk::signature::Signature;
10+
use std::sync::atomic::AtomicU64;
11+
use std::sync::Arc;
12+
use std::time::Duration;
13+
use tokio::sync::broadcast::Receiver;
14+
use tokio::task::JoinError;
15+
use tokio::time::sleep as tokio_sleep;
16+
use tokio::time::Instant;
17+
use tracing::info;
18+
19+
const CI_ITEMS_DELETE_BATCH_SIZE: usize = 100;
20+
const SLOT_CHECK_OFFSET: u64 = 1500;
21+
22+
pub async fn run_fork_cleaner(
23+
fork_cleaner: ForkCleaner<Storage, Storage>,
24+
metrics: Arc<ForkCleanerMetricsConfig>,
25+
mut rx: Receiver<()>,
26+
sequence_consistent_checker_wait_period_sec: u64,
27+
) -> Result<(), JoinError> {
28+
info!("Start cleaning forks...");
29+
loop {
30+
let start = Instant::now();
31+
fork_cleaner.clean_forks(rx.resubscribe()).await;
32+
metrics.set_scans_latency(start.elapsed().as_secs_f64());
33+
metrics.inc_total_scans();
34+
tokio::select! {
35+
_ = tokio_sleep(Duration::from_secs(sequence_consistent_checker_wait_period_sec)) => {},
36+
_ = rx.recv() => {
37+
info!("Received stop signal, stopping cleaning forks!");
38+
break;
39+
}
40+
}
41+
}
42+
43+
Ok(())
44+
}
45+
46+
static FORK_CLEANER_LAST_CHECKED_SLOT: AtomicU64 = AtomicU64::new(0);
47+
48+
pub fn last_fork_cleaned_slot() -> u64 {
49+
FORK_CLEANER_LAST_CHECKED_SLOT.load(std::sync::atomic::Ordering::Relaxed)
50+
}
51+
52+
pub struct ForkCleaner<CM, FC>
53+
where
54+
CM: CompressedTreeChangesManager,
55+
FC: ForkChecker,
56+
{
57+
cl_items_manager: Arc<CM>,
58+
fork_checker: Arc<FC>,
59+
data_consistency_storage: Arc<dyn DataConsistencyStorage + Send + Sync>,
60+
nft_changes_tracker: Option<Arc<NftChangesTracker>>,
61+
metrics: Arc<ForkCleanerMetricsConfig>,
62+
}
63+
64+
impl<CM, FC> ForkCleaner<CM, FC>
65+
where
66+
CM: CompressedTreeChangesManager,
67+
FC: ForkChecker,
68+
{
69+
pub fn new(
70+
cl_items_manager: Arc<CM>,
71+
fork_checker: Arc<FC>,
72+
data_consistency_storage: Arc<dyn DataConsistencyStorage + Send + Sync>,
73+
nft_changes_tracker: Option<Arc<NftChangesTracker>>,
74+
metrics: Arc<ForkCleanerMetricsConfig>,
75+
) -> Self {
76+
Self {
77+
cl_items_manager,
78+
fork_checker,
79+
data_consistency_storage,
80+
nft_changes_tracker,
81+
metrics,
82+
}
83+
}
84+
85+
pub async fn clean_forks(&self, rx: Receiver<()>) {
86+
let last_slot_for_check = self
87+
.fork_checker
88+
.last_slot_for_check()
89+
.saturating_sub(SLOT_CHECK_OFFSET);
90+
let all_non_forked_slots = self.fork_checker.get_all_non_forked_slots(rx.resubscribe());
91+
92+
let mut forked_slots = 0;
93+
let mut delete_items = Vec::new();
94+
let mut changes_to_delete = Vec::new();
95+
96+
// from this column data will be dropped by slot
97+
// if we have any update from forked slot we have to delete it
98+
for cl_item in self.cl_items_manager.cl_items_iter() {
99+
if !rx.is_empty() {
100+
info!("Stop iteration over cl items iterator...");
101+
return;
102+
}
103+
104+
if cl_item.slot_updated == 0 || cl_item.slot_updated > last_slot_for_check {
105+
continue;
106+
}
107+
108+
if !all_non_forked_slots.contains(&cl_item.slot_updated) {
109+
delete_items.push(ForkedItem {
110+
tree: cl_item.cli_tree_key,
111+
seq: cl_item.cli_seq,
112+
node_idx: cl_item.cli_node_idx,
113+
});
114+
}
115+
116+
if delete_items.len() >= CI_ITEMS_DELETE_BATCH_SIZE {
117+
self.delete_cl_items(&mut delete_items).await;
118+
}
119+
}
120+
121+
if !delete_items.is_empty() {
122+
self.delete_cl_items(&mut delete_items).await;
123+
}
124+
125+
let mut signatures_to_drop = Vec::new();
126+
127+
// fork cleaner iterate over signatures which are saved for each parsed transaction
128+
// so even if transaction was in fork this column family has it
129+
for signature in self.cl_items_manager.tree_seq_idx_iter() {
130+
if let Some(max_slot) = signature.slot_sequences.keys().max() {
131+
// if max slot for selected transaction(tx) is greater then last_slot_for_check
132+
// it means that tx is fresh and we should not check it such as there is high possibility
133+
// that it's updates will be overwritten
134+
if max_slot > &last_slot_for_check {
135+
continue;
136+
}
137+
138+
// here we have a vector because forked transaction can appear in different slots with same sequence
139+
// in such case we have to check if one of those blocks is in fork
140+
let mut slots_with_highest_sequence = vec![];
141+
// looking for a block with highest sequence because CLItems merge function checks that value
142+
// meaning CLItems will contain updates from the transaction with highest sequence, even if it has the lowest slot number
143+
let mut highest_sequence = 0;
144+
145+
for (slot, sequences) in &signature.slot_sequences {
146+
for seq in sequences {
147+
match seq.cmp(&highest_sequence) {
148+
std::cmp::Ordering::Greater => {
149+
highest_sequence = *seq;
150+
slots_with_highest_sequence.clear(); // Clear previous slots since a new highest sequence is found
151+
slots_with_highest_sequence.push(*slot);
152+
}
153+
std::cmp::Ordering::Equal => {
154+
slots_with_highest_sequence.push(*slot);
155+
}
156+
std::cmp::Ordering::Less => {
157+
// Do nothing
158+
}
159+
}
160+
}
161+
}
162+
163+
let mut clean_up = false;
164+
// check if either of slots appeared in fork
165+
for slot in slots_with_highest_sequence {
166+
if !all_non_forked_slots.contains(&slot) {
167+
clean_up = true;
168+
169+
forked_slots += 1;
170+
}
171+
}
172+
173+
if clean_up {
174+
// if at least one of the blocks appeared in a fork we need to drop all the tree sequences which are related to transaction
175+
// which fork cleaner is processing at the moment.
176+
//
177+
// since we may have saved sequence 5 (which is forked) to CLItems,
178+
// but the valid sequence for this transaction on the main branch is actually 4,
179+
// dropping only sequence 5 would result in an incorrect update during backfill.
180+
// therefore, we need to drop sequence 4 as well. Sequence 5 must be dropped because
181+
// it contains a different tree update in the main branch
182+
for (slot, sequences) in signature.slot_sequences.iter() {
183+
for seq in sequences {
184+
delete_items.push(ForkedItem {
185+
tree: signature.tree,
186+
seq: *seq,
187+
// in this context it doesn't matter what value we put in here
188+
// because deletion will happen by tree and seq values
189+
node_idx: 0,
190+
});
191+
changes_to_delete.push(BubblegumChangeKey::new(
192+
signature.tree,
193+
*slot,
194+
*seq,
195+
));
196+
}
197+
}
198+
}
199+
200+
signatures_to_drop.push((signature.signature, signature.tree, signature.leaf_idx));
201+
}
202+
203+
if delete_items.len() >= CI_ITEMS_DELETE_BATCH_SIZE {
204+
self.data_consistency_storage
205+
.drop_forked_bubblegum_changes(&changes_to_delete)
206+
.await;
207+
if let Some(changes_tracker) = self.nft_changes_tracker.as_ref() {
208+
changes_tracker
209+
.watch_remove_forked_bubblegum_changes(&changes_to_delete)
210+
.await;
211+
}
212+
self.delete_tree_seq_idx(&mut delete_items).await;
213+
}
214+
}
215+
216+
if !delete_items.is_empty() {
217+
self.data_consistency_storage
218+
.drop_forked_bubblegum_changes(&changes_to_delete)
219+
.await;
220+
if let Some(changes_tracker) = self.nft_changes_tracker.as_ref() {
221+
changes_tracker
222+
.watch_remove_forked_bubblegum_changes(&changes_to_delete)
223+
.await;
224+
}
225+
self.delete_tree_seq_idx(&mut delete_items).await;
226+
}
227+
228+
if !signatures_to_drop.is_empty() {
229+
self.delete_leaf_signatures(signatures_to_drop).await;
230+
}
231+
232+
FORK_CLEANER_LAST_CHECKED_SLOT
233+
.store(last_slot_for_check, std::sync::atomic::Ordering::Relaxed);
234+
self.metrics.set_forks_detected(forked_slots as i64);
235+
}
236+
237+
async fn delete_tree_seq_idx(&self, delete_items: &mut Vec<ForkedItem>) {
238+
self.metrics.inc_by_deleted_items(delete_items.len() as u64);
239+
self.cl_items_manager
240+
.delete_tree_seq_idx(std::mem::take(delete_items))
241+
.await;
242+
}
243+
244+
async fn delete_cl_items(&self, delete_items: &mut Vec<ForkedItem>) {
245+
self.metrics.inc_by_deleted_items(delete_items.len() as u64);
246+
self.cl_items_manager
247+
.delete_cl_items(std::mem::take(delete_items))
248+
.await;
249+
}
250+
251+
async fn delete_leaf_signatures(&self, keys: Vec<(Signature, Pubkey, u64)>) {
252+
self.metrics.inc_by_deleted_items(keys.len() as u64);
253+
self.cl_items_manager.delete_signatures(keys).await;
254+
}
255+
}

nft_ingester/src/bin/ingester/main.rs

+3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ use arweave_rs::consts::ARWEAVE_BASE_URL;
22
use arweave_rs::Arweave;
33
use entities::enums::{AssetType, ASSET_TYPES};
44
use nft_ingester::batch_mint::batch_mint_persister::{BatchMintDownloaderForPersister, BatchMintPersister};
5+
use nft_ingester::cleaners::indexer_cleaner::clean_syncronized_idxs;
6+
use nft_ingester::consistency_bg_job::FileSrcAuraPeersProvides;
7+
use nft_ingester::consistency_calculator::{self, NftChangesTracker, NTF_CHANGES_NOTIFICATION_QUEUE_SIZE};
58
use nft_ingester::scheduler::Scheduler;
69
use postgre_client::PG_MIGRATIONS_PATH;
710
use std::panic;

nft_ingester/src/consistency_calculator.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use tokio::sync::{
3232
};
3333
use tokio::time::Instant;
3434

35-
use crate::fork_cleaner::last_fork_cleaned_slot;
35+
use crate::cleaners::fork_cleaner::last_fork_cleaned_slot;
3636

3737
/// This flag is set to true before bubblegum epoch calculation is started,
3838
/// and set to false after the calculation is finished.

nft_ingester/src/fork_cleaner.rs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+

nft_ingester/tests/api_tests.rs

+2
Original file line numberDiff line numberDiff line change
@@ -3772,6 +3772,7 @@ mod tests {
37723772
slot_updated: 10,
37733773
write_version: 10,
37743774
extensions: None,
3775+
data_hash: 0,
37753776
};
37763777

37773778
let owner: Pubkey = generated_assets.owners[1].owner.value.unwrap();
@@ -3787,6 +3788,7 @@ mod tests {
37873788
slot_updated: 10,
37883789
amount: 100,
37893790
write_version: 10,
3791+
data_hash: 0,
37903792
};
37913793
let mut batch_storage = BatchSaveStorage::new(
37923794
env.rocks_env.storage.clone(),

0 commit comments

Comments
 (0)