Skip to content
This repository has been archived by the owner on May 30, 2023. It is now read-only.

Add database saving #11

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 248 additions & 58 deletions Cargo.lock

Large diffs are not rendered by default.

24 changes: 11 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,21 @@ resolver = "2"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0.62"
borsh = { version = "0.10.3", features = ["bytes"] }
tokio = { version = "1", features = ["full"] }
jsonrpsee = { version = "0.16.2", features = ["http-client", "server"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = "0.10.6"
jupiter = { git = "https://github.com/Sovereign-Labs/Jupiter.git", rev = "0609c8ab69bffa1bbb1acc90dcad2f541d58a5e9"}
sov-app-template = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "5e43c3ee9b5785abdca33b21c86fd38dbd9285e0" }
accounts = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "5e43c3ee9b5785abdca33b21c86fd38dbd9285e0" }
bank = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "5e43c3ee9b5785abdca33b21c86fd38dbd9285e0" }
election = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "5e43c3ee9b5785abdca33b21c86fd38dbd9285e0" }
sovereign-sdk = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "5e43c3ee9b5785abdca33b21c86fd38dbd9285e0" }
sov-state = { git = "https://github.com/Sovereign-Labs/sovereign.git", features = ["temp"], rev = "5e43c3ee9b5785abdca33b21c86fd38dbd9285e0" }
sov-modules-api = { git = "https://github.com/Sovereign-Labs/sovereign.git", features = ["mocks"], rev = "5e43c3ee9b5785abdca33b21c86fd38dbd9285e0" }
sov-modules-macros = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "5e43c3ee9b5785abdca33b21c86fd38dbd9285e0" }
sovereign-db = { git = "https://github.com/Sovereign-Labs/sovereign.git", features = ["temp"], rev = "5e43c3ee9b5785abdca33b21c86fd38dbd9285e0" }
tokio = { version = "1", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
hex = "0.4.3"
tendermint = "0.32"

anyhow = "1.0.62"
jsonrpsee = { version = "0.16.2", features = ["http-client", "server"] }
jupiter = { git = "https://github.com/Sovereign-Labs/Jupiter.git", rev = "bbd0a9f67dece567e14f5184e303145dd3c379ad" }
demo-app = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "c90484449c0d6497fa5d845e38023845393411d" }
sov-modules-api = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "c90484449c0d6497fa5d845e38023845393411d" }
sov-app-template = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "c90484449c0d6497fa5d845e38023845393411d" }
sovereign-sdk = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "c90484449c0d6497fa5d845e38023845393411d" }
sovereign-db = { git = "https://github.com/Sovereign-Labs/sovereign.git", features = ["temp"], rev = "c90484449c0d6497fa5d845e38023845393411d" }
2 changes: 2 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[toolchain]
channel = "nightly-2023-04-17"
11 changes: 0 additions & 11 deletions src/data_generation.rs

This file was deleted.

14 changes: 0 additions & 14 deletions src/helpers.rs

This file was deleted.

224 changes: 102 additions & 122 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,185 +1,165 @@
use crate::runtime::GenesisConfig;
use bank::TokenConfig;
use jsonrpsee::http_client::HeaderMap;
use jupiter::da_service::{CelestiaService, FilteredCelestiaBlock};
use sha2::{Digest, Sha256};
use sov_app_template::{AppTemplate, Batch};
use sov_modules_api::{mocks::MockContext, Address};
use sov_state::ProverStorage;
use demo_app::app::create_demo_config;
use demo_app::app::NativeAppRunner;
use jupiter::da_service::{CelestiaService, DaServiceConfig};
use jupiter::types::NamespaceId;
use jupiter::verifier::CelestiaVerifier;
use jupiter::verifier::RollupParams;
use sov_modules_api::default_signature::private_key::DefaultPrivateKey;
use sovereign_sdk::da::DaVerifier;
use sovereign_sdk::services::da::{DaService, SlotData};
use sovereign_sdk::stf::{StateTransitionFunction, StateTransitionRunner};
use sov_app_template::SequencerOutcome;
use sovereign_db::{
ledger_db::{LedgerDB, SlotCommitBuilder},
schema::types::{
BatchNumber, DbBytes, EventNumber, Status, StoredBatch, StoredSlot, StoredTransaction,
TxNumber,
},
};
use sovereign_sdk::serial::Decode;
use sovereign_sdk::services::da::SlotData;
use sovereign_sdk::{da::BlobTransactionTrait, stf::StateTransitionFunction};
use sovereign_sdk::{serial::Encode, services::da::DaService};

use tracing::Level;
use tx_verifier_impl::DemoAppTxVerifier;

use crate::{runtime::Runtime, tx_hooks_impl::DemoAppTxHooks};

mod data_generation;
mod helpers;
mod rpc;
mod runtime;
mod tx_hooks_impl;
mod tx_verifier_impl;

type C = MockContext;
type DemoApp =
AppTemplate<C, DemoAppTxVerifier<C>, Runtime<C>, DemoAppTxHooks<C>, GenesisConfig<C>>;
const CELESTIA_NODE_AUTH_TOKEN: &'static str = "";

const START_HEIGHT: u64 = HEIGHT_OF_FIRST_TXS - 5;
// I sent 8 demo election transactions at height 293686, generated using the demo app data generator
const HEIGHT_OF_FIRST_TXS: u64 = 293686;
pub fn default_celestia_service() -> CelestiaService {
let mut headers = HeaderMap::new();
headers.insert(
"Authorization",
format!("Bearer {}", CELESTIA_NODE_AUTH_TOKEN)
.parse()
.unwrap(),
);
let client = jsonrpsee::http_client::HttpClientBuilder::default()
.set_headers(headers)
.max_request_body_size(1024 * 1024 * 100) // 100 MB
.build("http://localhost:11111/")
.unwrap();
CelestiaService::with_client(client)
}

fn sha2(data: impl AsRef<[u8]>) -> [u8; 32] {
let mut hasher = Sha256::new();
hasher.update(data.as_ref());
hasher.finalize().into()
}

const START_HEIGHT: u64 = HEIGHT_OF_FIRST_TXS - 1;
const DATA_DIR_LOCATION: &'static str = "demo_data";
const ROLLUP_NAMESPACE: NamespaceId = NamespaceId([115, 111, 118, 45, 116, 101, 115, 116]);

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let sequencer_address: Address = [1u8; 32].into();

// Initializing logging
let subscriber = tracing_subscriber::fmt()
.with_max_level(Level::WARN)
.finish();
tracing::subscriber::set_global_default(subscriber)
.map_err(|_err| eprintln!("Unable to set global default subscriber"))
.expect("Cannot fail to set subscriber");
let cel_service = default_celestia_service();

// Initialize the Celestia service
let ledger_db = LedgerDB::with_path(DATA_DIR_LOCATION).unwrap();
let storage = ProverStorage::with_path(DATA_DIR_LOCATION).unwrap();
let mut demo = DemoApp::new(
storage.clone(),
Runtime::new(),
DemoAppTxVerifier::new(),
DemoAppTxHooks::new(),
GenesisConfig::new(
(),
bank::BankConfig {
tokens: vec![TokenConfig {
token_name: "sovereign".to_string(),
address_and_balances: vec![(sequencer_address, 1000)],
}],
},
accounts::AccountConfig { pub_keys: vec![] },
),
let mut demo_runner = NativeAppRunner::new(DATA_DIR_LOCATION);
let celestia_rpc_auth_token = if !CELESTIA_NODE_AUTH_TOKEN.is_empty() {
CELESTIA_NODE_AUTH_TOKEN.to_string()
} else {
std::env::var("CELESTIA_NODE_AUTH_TOKEN").expect("please set CELESTIA_NODE_AUTH_TOKEN environment variable")
};
let da_service = CelestiaService::new(
DaServiceConfig {
celestia_rpc_auth_token,
celestia_rpc_address: "http://localhost:11111/".into(),
max_celestia_response_body_size: 1024 * 1024 * 100,
},
RollupParams {
namespace: ROLLUP_NAMESPACE,
},
);
let da_verifier = CelestiaVerifier::new(RollupParams {
namespace: ROLLUP_NAMESPACE,
});

// let rpc_ledger = ledger_db.clone();
// let _rpc_handle = rpc::RpcProvider::start(rpc_ledger, rpc_storage).await?;

// Initialize the demo app
let demo = demo_runner.inner_mut();
let genesis_config = create_demo_config(
100000000,
&DefaultPrivateKey::generate(),
&DefaultPrivateKey::generate(),
);

let rpc_ledger = ledger_db.clone();
let rpc_storage = storage.clone();

let _rpc_handle = rpc::RpcProvider::start(rpc_ledger, rpc_storage).await?;

let mut item_numbers = ledger_db.get_next_items_numbers();
if item_numbers.slot_number == 1 {
if item_numbers.slot_number > 1 {
print!("No history detected. Initializing chain...");
demo.init_chain(());
demo.init_chain(genesis_config);
println!("Done.");
} else {
println!("Chain is already initialized. Skipping initialization.");
}
let last_slot_processed_before_shutdown = item_numbers.slot_number - 1;

println!("Beginning sync from da slot {}...", START_HEIGHT);
for i in 0.. {
let height = START_HEIGHT + i;
if last_slot_processed_before_shutdown > i {
continue;
}
demo.begin_slot(Default::default());
let (prev_state_root, _, _) = demo.end_slot();
let mut prev_state_root = prev_state_root.0;

let last_slot_processed_before_shutdown = item_numbers.slot_number - 1;

let filtered_block: FilteredCelestiaBlock = cel_service.get_finalized_at(height).await?;
let start_height = std::cmp::max(START_HEIGHT,last_slot_processed_before_shutdown);
// Request data from the DA layer and apply it to the demo app
for height in start_height.. {
println!(
"Requesting data for height {} and prev_state_root 0x{}",
height,
hex::encode(&prev_state_root)
);
let filtered_block = da_service.get_finalized_at(height).await?;
let slot_hash = filtered_block.hash();
let header = filtered_block.header.clone();
let (blob_txs, inclusion_proof, completeness_proof) =
da_service.extract_relevant_txs_with_proof(filtered_block);
assert!(da_verifier
.verify_relevant_tx_list(&header, &blob_txs, inclusion_proof, completeness_proof)
.is_ok());
println!("Received {} blobs", blob_txs.len());


let mut data_to_persist = SlotCommitBuilder::default();
let batches = cel_service.extract_relevant_txs(filtered_block);

demo.begin_slot();
let num_batches = batches.len();
for raw_batch in batches {
let mut data = raw_batch.data();
let batch = match Batch::decode(&mut data) {
Ok(batch) => batch,
Err(e) => {
println!(" Error decoding batch: {}. Skipping.", e);
continue;
}
};
let batch_hash = sha2(batch.encode_to_vec());
demo.begin_slot(Default::default());

let num_batches = blob_txs.len();
for blob in blob_txs {
let sender = blob.sender.as_ref().to_vec();

let batch_receipt = demo.apply_blob(blob, None);

let batch_hash = batch_receipt.batch_hash;
let tx_start = item_numbers.tx_number;
let num_txs = batch.txs.len();
let num_txs = batch_receipt.tx_receipts.len();

let mut batch_to_store = StoredBatch {
sender: raw_batch.sender.as_ref().to_vec(),
sender: sender.clone(),
hash: batch_hash,
extra_data: DbBytes::new(raw_batch.sender.as_ref().to_vec()),
extra_data: DbBytes::new(sender),
txs: TxNumber(tx_start)..TxNumber(tx_start + num_txs as u64),
status: Status::Skipped,
};
item_numbers.tx_number += num_txs as u64;
print!(" Applying batch of {} transactions...", num_txs);

match demo.apply_batch(batch.clone(), raw_batch.sender().as_ref(), None) {
Ok(events) => {
println!(" Done!");
match batch_receipt.inner {
SequencerOutcome::Rewarded => {
batch_to_store.status = Status::Applied;
data_to_persist.txs.extend(batch.txs.into_iter().map(|tx| {
data_to_persist.batches.push(batch_to_store);
data_to_persist.txs.extend(batch_receipt.tx_receipts.into_iter().map(|tx_receipt| {
let start_event_number = item_numbers.event_number;
let end_event_number = start_event_number + events.len() as u64;
let end_event_number = start_event_number + tx_receipt.events.len() as u64;
item_numbers.event_number = end_event_number;

StoredTransaction {
hash: sha2(&tx.data[..]),
hash: tx_receipt.tx_hash,
events: EventNumber(start_event_number)..EventNumber(end_event_number),
data: DbBytes::new(tx.data),
data: DbBytes::new(tx_receipt.body_to_save.unwrap_or_default()),
// TODO: Should be removed (Nikolai add issue)
status: Status::Applied,
}
}));
data_to_persist.batches.push(batch_to_store);

data_to_persist.events.extend(events.into_iter());
}
Err(e) => {
println!(
" Uh-oh! Failed to apply batch. Applying consensus set update {:?}",
e
);
}
}
}
SequencerOutcome::Slashed(_) | SequencerOutcome::Ignored =>
{
println!("Oops, Failed to apply batch")
}
};


demo.end_slot();
item_numbers.tx_number += num_txs as u64;
}
let (next_state_root, _witness, _) = demo.end_slot();
prev_state_root = next_state_root.0;
data_to_persist.slot_data = Some(StoredSlot {
hash: slot_hash,
extra_data: DbBytes::new(vec![]),
batches: BatchNumber(item_numbers.batch_number)
..BatchNumber(item_numbers.batch_number + num_batches as u64),
});
item_numbers.batch_number += num_batches as u64;
ledger_db.commit_slot(data_to_persist.finalize()?)?;
}

Ok(())
Expand Down
Loading