Skip to content

Commit 9570fec

Browse files
committed
Further improvement of the codebase by moving the persistence logic to the persistence layer
1 parent 46307a5 commit 9570fec

File tree

3 files changed

+142
-143
lines changed

3 files changed

+142
-143
lines changed

nft_ingester/src/gapfiller.rs

+2-134
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
use crate::error::IngesterError;
22
use entities::models::CompleteAssetDetails;
3-
use entities::models::Updated;
43
use futures::StreamExt;
54
use interface::asset_streaming_and_discovery::AssetDetailsStream;
65
use log::error;
7-
use rocks_db::asset::{AssetCollection, AssetLeaf};
8-
use rocks_db::cl_items::{ClItem, ClLeaf};
9-
use rocks_db::{AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, Storage};
10-
use serde_json::json;
11-
use std::collections::HashMap;
6+
use rocks_db::Storage;
127
use std::sync::Arc;
138

149
pub async fn process_asset_details_stream(storage: Arc<Storage>, mut stream: AssetDetailsStream) {
@@ -30,133 +25,6 @@ pub async fn insert_gaped_data(
3025
rocks_storage: Arc<Storage>,
3126
data: CompleteAssetDetails,
3227
) -> Result<(), IngesterError> {
33-
rocks_storage
34-
.asset_static_data
35-
.merge(
36-
data.pubkey,
37-
AssetStaticDetails {
38-
pubkey: data.pubkey,
39-
specification_asset_class: data.specification_asset_class,
40-
royalty_target_type: data.royalty_target_type,
41-
created_at: data.slot_created as i64,
42-
},
43-
)
44-
.await?;
45-
46-
rocks_storage
47-
.asset_dynamic_data
48-
.merge(
49-
data.pubkey,
50-
AssetDynamicDetails {
51-
pubkey: data.pubkey,
52-
is_compressible: data.is_compressible,
53-
is_compressed: data.is_compressed,
54-
is_frozen: data.is_frozen,
55-
supply: data.supply,
56-
seq: data.seq,
57-
is_burnt: data.is_burnt,
58-
was_decompressed: data.was_decompressed,
59-
onchain_data: data.onchain_data.map(|chain_data| {
60-
Updated::new(
61-
chain_data.slot_updated,
62-
chain_data.seq,
63-
json!(chain_data.value).to_string(),
64-
)
65-
}),
66-
creators: data.creators,
67-
royalty_amount: data.royalty_amount,
68-
url: data.url,
69-
},
70-
)
71-
.await?;
72-
73-
rocks_storage
74-
.asset_authority_data
75-
.merge(
76-
data.pubkey,
77-
AssetAuthority {
78-
pubkey: data.pubkey,
79-
authority: data.authority.value,
80-
slot_updated: data.authority.slot_updated,
81-
},
82-
)
83-
.await?;
84-
85-
if let Some(collection) = data.collection {
86-
rocks_storage
87-
.asset_collection_data
88-
.merge(
89-
data.pubkey,
90-
AssetCollection {
91-
pubkey: data.pubkey,
92-
collection: collection.value.collection,
93-
is_collection_verified: collection.value.is_collection_verified,
94-
collection_seq: collection.value.collection_seq,
95-
slot_updated: collection.slot_updated,
96-
},
97-
)
98-
.await?;
99-
}
100-
101-
if let Some(leaf) = data.asset_leaf {
102-
rocks_storage
103-
.asset_leaf_data
104-
.merge(
105-
data.pubkey,
106-
AssetLeaf {
107-
pubkey: data.pubkey,
108-
tree_id: leaf.value.tree_id,
109-
leaf: leaf.value.leaf.clone(),
110-
nonce: leaf.value.nonce,
111-
data_hash: leaf.value.data_hash,
112-
creator_hash: leaf.value.creator_hash,
113-
leaf_seq: leaf.value.leaf_seq,
114-
slot_updated: leaf.slot_updated,
115-
},
116-
)
117-
.await?
118-
}
119-
120-
rocks_storage
121-
.asset_owner_data
122-
.merge(
123-
data.pubkey,
124-
AssetOwner {
125-
pubkey: data.pubkey,
126-
owner: data.owner,
127-
delegate: data.delegate,
128-
owner_type: data.owner_type,
129-
owner_delegate_seq: data.owner_delegate_seq,
130-
},
131-
)
132-
.await?;
133-
134-
if let Some(leaf) = data.cl_leaf {
135-
rocks_storage.cl_leafs.put(
136-
(leaf.cli_leaf_idx, leaf.cli_tree_key),
137-
ClLeaf {
138-
cli_leaf_idx: leaf.cli_leaf_idx,
139-
cli_tree_key: leaf.cli_tree_key,
140-
cli_node_idx: leaf.cli_node_idx,
141-
},
142-
)?
143-
}
144-
let mut map = HashMap::new();
145-
data.cl_items.iter().for_each(|item| {
146-
map.insert(
147-
(item.cli_node_idx, item.cli_tree_key),
148-
ClItem {
149-
cli_node_idx: item.cli_node_idx,
150-
cli_tree_key: item.cli_tree_key,
151-
cli_leaf_idx: item.cli_leaf_idx,
152-
cli_seq: item.cli_seq,
153-
cli_level: item.cli_level,
154-
cli_hash: item.cli_hash.clone(),
155-
slot_updated: item.slot_updated,
156-
},
157-
);
158-
});
159-
rocks_storage.cl_items.merge_batch(map).await?;
160-
28+
rocks_storage.insert_gaped_data(data).await?;
16129
Ok(())
16230
}

rocks-db/src/batch_client.rs

+138-3
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@ use std::collections::{HashMap, HashSet};
22

33
use async_trait::async_trait;
44
use entities::enums::SpecificationVersions;
5+
use serde_json::json;
56
use solana_sdk::pubkey::Pubkey;
67

7-
use crate::asset::{AssetsUpdateIdx, SlotAssetIdx};
8+
use crate::asset::{AssetCollection, AssetLeaf, AssetsUpdateIdx, SlotAssetIdx};
9+
use crate::cl_items::{ClItem, ClLeaf};
810
use crate::column::TypedColumn;
11+
use crate::errors::StorageError;
912
use crate::key_encoders::{decode_u64x2_pubkey, encode_u64x2_pubkey};
1013
use crate::storage_traits::{AssetIndexReader, AssetSlotStorage, AssetUpdateIndexStorage};
11-
use crate::{Result, Storage};
12-
use entities::models::{AssetIndex, UrlWithStatus};
14+
use crate::{AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, Result, Storage};
15+
use entities::models::{AssetIndex, CompleteAssetDetails, Updated, UrlWithStatus};
1316

1417
impl AssetUpdateIndexStorage for Storage {
1518
fn last_known_asset_updated_key(&self) -> Result<Option<(u64, u64, Pubkey)>> {
@@ -245,3 +248,135 @@ impl AssetSlotStorage for Storage {
245248
Ok(None)
246249
}
247250
}
251+
252+
impl Storage {
253+
pub async fn insert_gaped_data(&self, data: CompleteAssetDetails) -> Result<()> {
254+
let mut batch = rocksdb::WriteBatch::default();
255+
self.asset_static_data.merge_with_batch(
256+
&mut batch,
257+
data.pubkey,
258+
&AssetStaticDetails {
259+
pubkey: data.pubkey,
260+
specification_asset_class: data.specification_asset_class,
261+
royalty_target_type: data.royalty_target_type,
262+
created_at: data.slot_created as i64,
263+
},
264+
)?;
265+
266+
self.asset_dynamic_data.merge_with_batch(
267+
&mut batch,
268+
data.pubkey,
269+
&AssetDynamicDetails {
270+
pubkey: data.pubkey,
271+
is_compressible: data.is_compressible,
272+
is_compressed: data.is_compressed,
273+
is_frozen: data.is_frozen,
274+
supply: data.supply,
275+
seq: data.seq,
276+
is_burnt: data.is_burnt,
277+
was_decompressed: data.was_decompressed,
278+
onchain_data: data.onchain_data.map(|chain_data| {
279+
Updated::new(
280+
chain_data.slot_updated,
281+
chain_data.seq,
282+
json!(chain_data.value).to_string(),
283+
)
284+
}),
285+
creators: data.creators,
286+
royalty_amount: data.royalty_amount,
287+
url: data.url,
288+
},
289+
)?;
290+
291+
self.asset_authority_data.merge_with_batch(
292+
&mut batch,
293+
data.pubkey,
294+
&AssetAuthority {
295+
pubkey: data.pubkey,
296+
authority: data.authority.value,
297+
slot_updated: data.authority.slot_updated,
298+
},
299+
)?;
300+
301+
if let Some(collection) = data.collection {
302+
self.asset_collection_data.merge_with_batch(
303+
&mut batch,
304+
data.pubkey,
305+
&AssetCollection {
306+
pubkey: data.pubkey,
307+
collection: collection.value.collection,
308+
is_collection_verified: collection.value.is_collection_verified,
309+
collection_seq: collection.value.collection_seq,
310+
slot_updated: collection.slot_updated,
311+
},
312+
)?;
313+
}
314+
315+
if let Some(leaf) = data.asset_leaf {
316+
self.asset_leaf_data.merge_with_batch(
317+
&mut batch,
318+
data.pubkey,
319+
&AssetLeaf {
320+
pubkey: data.pubkey,
321+
tree_id: leaf.value.tree_id,
322+
leaf: leaf.value.leaf.clone(),
323+
nonce: leaf.value.nonce,
324+
data_hash: leaf.value.data_hash,
325+
creator_hash: leaf.value.creator_hash,
326+
leaf_seq: leaf.value.leaf_seq,
327+
slot_updated: leaf.slot_updated,
328+
},
329+
)?
330+
}
331+
332+
self.asset_owner_data.merge_with_batch(
333+
&mut batch,
334+
data.pubkey,
335+
&AssetOwner {
336+
pubkey: data.pubkey,
337+
owner: data.owner,
338+
delegate: data.delegate,
339+
owner_type: data.owner_type,
340+
owner_delegate_seq: data.owner_delegate_seq,
341+
},
342+
)?;
343+
344+
if let Some(leaf) = data.cl_leaf {
345+
self.cl_leafs.put_with_batch(
346+
&mut batch,
347+
(leaf.cli_leaf_idx, leaf.cli_tree_key),
348+
&ClLeaf {
349+
cli_leaf_idx: leaf.cli_leaf_idx,
350+
cli_tree_key: leaf.cli_tree_key,
351+
cli_node_idx: leaf.cli_node_idx,
352+
},
353+
)?
354+
}
355+
for item in data.cl_items {
356+
self.cl_items.merge_with_batch(
357+
&mut batch,
358+
(item.cli_node_idx, item.cli_tree_key),
359+
&ClItem {
360+
cli_node_idx: item.cli_node_idx,
361+
cli_tree_key: item.cli_tree_key,
362+
cli_leaf_idx: item.cli_leaf_idx,
363+
cli_seq: item.cli_seq,
364+
cli_level: item.cli_level,
365+
cli_hash: item.cli_hash.clone(),
366+
slot_updated: item.slot_updated,
367+
},
368+
)?;
369+
}
370+
self.write_batch(batch).await?;
371+
Ok(())
372+
}
373+
374+
pub(crate) async fn write_batch(&self, batch: rocksdb::WriteBatch) -> Result<()> {
375+
let backend = self.db.clone();
376+
tokio::task::spawn_blocking(move || backend.write(batch))
377+
.await
378+
.map_err(|e| StorageError::Common(e.to_string()))?
379+
.map_err(|e| StorageError::Common(e.to_string()))?;
380+
Ok(())
381+
}
382+
}

rocks-db/src/transaction_client.rs

+2-6
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@ impl TransactionResultPersister for Storage {
1616
for tx in txs {
1717
self.store_transaction_result_with_batch(&mut batch, tx, false)?;
1818
}
19-
let backend = self.db.clone();
20-
tokio::task::spawn_blocking(move || backend.write(batch))
19+
self.write_batch(batch)
2120
.await
22-
.map_err(|e| StorageError::Common(e.to_string()))?
2321
.map_err(|e| StorageError::Common(e.to_string()))?;
2422
Ok(())
2523
}
@@ -33,10 +31,8 @@ impl Storage {
3331
) -> Result<(), StorageError> {
3432
let mut batch = rocksdb::WriteBatch::default();
3533
self.store_transaction_result_with_batch(&mut batch, tx, with_signatures)?;
36-
let backend = self.db.clone();
37-
tokio::task::spawn_blocking(move || backend.write(batch))
34+
self.write_batch(batch)
3835
.await
39-
.map_err(|e| StorageError::Common(e.to_string()))?
4036
.map_err(|e| StorageError::Common(e.to_string()))?;
4137
Ok(())
4238
}

0 commit comments

Comments
 (0)