Skip to content

Commit

Permalink
Merge branch 'develop' into tooling-dashboard/hook-fiat-price
Browse files Browse the repository at this point in the history
  • Loading branch information
VmMad committed Jan 15, 2025
2 parents 7cdcf89 + 1b9f001 commit 5dad3ee
Show file tree
Hide file tree
Showing 218 changed files with 4,950 additions and 3,187 deletions.
240 changes: 122 additions & 118 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ members = [

[workspace.package]
# This version string will be inherited by iota-core, iota-faucet, iota-node, iota-tools, iota-sdk, iota-move-build, and iota crates.
version = "0.8.0-alpha"
version = "0.9.0-alpha"

[profile.release]
# debug = 1 means line charts only, which is minimum needed for good stack traces
Expand Down
8 changes: 4 additions & 4 deletions consensus/config/src/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ impl Committee {
}
}

/// -----------------------------------------------------------------------
/// Accessors to Committee fields.
// -----------------------------------------------------------------------
// Accessors to Committee fields.

pub fn epoch(&self) -> Epoch {
self.epoch
Expand Down Expand Up @@ -92,8 +92,8 @@ impl Committee {
.map(|(i, a)| (AuthorityIndex(i as u32), a))
}

/// -----------------------------------------------------------------------
/// Helpers for Committee properties.
// -----------------------------------------------------------------------
// Helpers for Committee properties.

/// Returns true if the provided stake has reached quorum (2f+1).
pub fn reached_quorum(&self, stake: Stake) -> bool {
Expand Down
6 changes: 3 additions & 3 deletions consensus/core/src/network/metrics_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

//! Tower layer adapters that allow specifying callbacks for request and
//! response handling can be implemented for different networking stacks.
use std::sync::Arc;

use prometheus::HistogramTimer;

use super::metrics::NetworkRouteMetrics;

/// Tower layer adapters that allow specifying callbacks for request and
/// response handling can be implemented for different networking stacks.
pub(crate) trait SizedRequest {
fn size(&self) -> usize;
fn route(&self) -> String;
Expand Down
1 change: 0 additions & 1 deletion consensus/core/src/test_dag_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ use crate::{
/// dag_builder.print(); // print the parsed DAG
/// dag_builder.persist_all_blocks(dag_state.clone()); // persist all blocks to DagState
/// ```
pub(crate) fn parse_dag(dag_string: &str) -> IResult<&str, DagBuilder> {
let (input, _) = tuple((tag("DAG"), multispace0, char('{')))(dag_string)?;

Expand Down
8 changes: 3 additions & 5 deletions crates/iota-analytics-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async fn main() -> Result<()> {
batch_size: 10,
..Default::default()
};
let (executor, exit_sender) = setup_single_workflow(
let (executor, token) = setup_single_workflow(
processor,
remote_store_url,
watermark,
Expand All @@ -51,13 +51,11 @@ async fn main() -> Result<()> {
)
.await?;

tokio::spawn(async {
tokio::spawn(async move {
signal::ctrl_c()
.await
.expect("Failed to install Ctrl+C handler");
exit_sender
.send(())
.expect("Failed to gracefully process shutdown");
token.cancel();
});
executor.await?;
Ok(())
Expand Down
7 changes: 4 additions & 3 deletions crates/iota-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,9 @@ pub struct AuthorityEpochTables {
/// Transactions that are being deferred until some future time
deferred_transactions: DBMap<DeferralKey, Vec<VerifiedSequencedConsensusTransaction>>,

/// Tables for recording state for RandomnessManager.
// Tables for recording state for RandomnessManager.

//
/// Records messages processed from other nodes. Updated when receiving a
/// new dkg::Message via consensus.
pub(crate) dkg_processed_messages: DBMap<PartyId, VersionedProcessedMessage>,
Expand Down Expand Up @@ -1358,8 +1359,8 @@ impl AuthorityPerEpochStore {
Ok(result)
}

/// `pending_certificates` table related methods. Should only be used from
/// TransactionManager.
// `pending_certificates` table related methods.
// Should only be used from TransactionManager.

/// Gets all pending certificates. Used during recovery.
pub fn all_pending_execution(&self) -> IotaResult<Vec<VerifiedExecutableTransaction>> {
Expand Down
1 change: 1 addition & 0 deletions crates/iota-data-ingestion-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ serde_json.workspace = true
tap.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-util.workspace = true
tracing.workspace = true
url.workspace = true

Expand Down
379 changes: 379 additions & 0 deletions crates/iota-data-ingestion-core/README.md

Large diffs are not rendered by default.

15 changes: 8 additions & 7 deletions crates/iota-data-ingestion-core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use iota_types::{
full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
};
use prometheus::Registry;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::{
DataIngestionMetrics, ReaderOptions, Worker,
Expand Down Expand Up @@ -65,7 +66,7 @@ impl<P: ProgressStore> IndexerExecutor<P> {
remote_store_url: Option<String>,
remote_store_options: Vec<(String, String)>,
reader_options: ReaderOptions,
mut exit_receiver: oneshot::Receiver<()>,
token: CancellationToken,
) -> Result<ExecutorProgress> {
let mut reader_checkpoint_number = self.progress_store.min_watermark()?;
let (checkpoint_reader, mut checkpoint_recv, gc_sender, _exit_sender) =
Expand All @@ -83,7 +84,7 @@ impl<P: ProgressStore> IndexerExecutor<P> {
}
loop {
tokio::select! {
_ = &mut exit_receiver => break,
_ = token.cancelled() => break,
Some((task_name, sequence_number)) = self.pool_progress_receiver.recv() => {
self.progress_store.save(task_name.clone(), sequence_number).await?;
let seq_number = self.progress_store.min_watermark()?;
Expand Down Expand Up @@ -112,9 +113,9 @@ pub async fn setup_single_workflow<W: Worker + 'static>(
reader_options: Option<ReaderOptions>,
) -> Result<(
impl Future<Output = Result<ExecutorProgress>>,
oneshot::Sender<()>,
CancellationToken,
)> {
let (exit_sender, exit_receiver) = oneshot::channel();
let token = CancellationToken::new();
let metrics = DataIngestionMetrics::new(&Registry::new());
let progress_store = ShimProgressStore(initial_checkpoint_number);
let mut executor = IndexerExecutor::new(progress_store, 1, metrics);
Expand All @@ -126,8 +127,8 @@ pub async fn setup_single_workflow<W: Worker + 'static>(
Some(remote_store_url),
vec![],
reader_options.unwrap_or_default(),
exit_receiver,
token.clone(),
),
exit_sender,
token,
))
}
28 changes: 19 additions & 9 deletions crates/iota-data-ingestion-core/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use iota_types::{
use prometheus::Registry;
use rand::{SeedableRng, prelude::StdRng};
use tempfile::NamedTempFile;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;

use crate::{
DataIngestionMetrics, FileProgressStore, IndexerExecutor, ReaderOptions, Worker, WorkerPool,
Expand All @@ -48,21 +48,31 @@ async fn run(
batch_size: 1,
..Default::default()
};
let (sender, recv) = oneshot::channel();

match duration {
None => {
indexer
.run(path.unwrap_or_else(temp_dir), None, vec![], options, recv)
.run(
path.unwrap_or_else(temp_dir),
None,
vec![],
options,
CancellationToken::new(),
)
.await
}
Some(duration) => {
let handle = tokio::task::spawn(async move {
indexer
.run(path.unwrap_or_else(temp_dir), None, vec![], options, recv)
.await
});
let token = CancellationToken::new();
let token_child = token.child_token();
let handle = tokio::task::spawn(indexer.run(
path.unwrap_or_else(temp_dir),
None,
vec![],
options,
token_child,
));
tokio::time::sleep(duration).await;
drop(sender);
token.cancel();
handle.await?
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/iota-data-ingestion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ serde.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-util.workspace = true
tracing.workspace = true
url.workspace = true

Expand Down
18 changes: 9 additions & 9 deletions crates/iota-data-ingestion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use iota_data_ingestion::{
use iota_data_ingestion_core::{DataIngestionMetrics, IndexerExecutor, ReaderOptions, WorkerPool};
use prometheus::Registry;
use serde::{Deserialize, Serialize};
use tokio::{signal, sync::oneshot};
use tokio::signal;
use tokio_util::sync::CancellationToken;

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "lowercase")]
Expand Down Expand Up @@ -68,28 +69,27 @@ fn default_remote_read_batch_size() -> usize {
100
}

fn setup_env(exit_sender: oneshot::Sender<()>) {
fn setup_env(token: CancellationToken) {
let default_hook = std::panic::take_hook();

std::panic::set_hook(Box::new(move |panic| {
default_hook(panic);
std::process::exit(12);
}));

tokio::spawn(async {
tokio::spawn(async move {
signal::ctrl_c()
.await
.expect("Failed to install Ctrl+C handler");
exit_sender
.send(())
.expect("Failed to gracefully process shutdown");
token.cancel();
});
}

#[tokio::main]
async fn main() -> Result<()> {
let (exit_sender, exit_receiver) = oneshot::channel();
setup_env(exit_sender);
let token = CancellationToken::new();
let token_child = token.child_token();
setup_env(token);

let args: Vec<String> = env::args().collect();
assert_eq!(args.len(), 2, "configuration yaml file is required");
Expand Down Expand Up @@ -152,7 +152,7 @@ async fn main() -> Result<()> {
config.remote_store_url,
config.remote_store_options,
reader_options,
exit_receiver,
token_child,
)
.await?;
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions crates/iota-e2e-tests/tests/snapshot_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ async fn basic_read_cmd_snapshot_tests() -> Result<(), anyhow::Error> {
"iota client objects 0x0000000000000000000000000000000000000000000000000000000000000000", /* empty addr */
"iota client object 0x5", // valid object
"iota client object 0x5 --bcs", // valid object BCS
"iota client object 0x9135cb3b5aca99a1555b742bd11ddc45fba33343be182bdc161be69da2c41be1", /* valid object */
"iota client object 0x9135cb3b5aca99a1555b742bd11ddc45fba33343be182bdc161be69da2c41be1 --bcs", /* valid object BCS */
"iota client object 0x4d03f39deb5e27a76a568adb591da553688e6df6fb053bc9ac069f2bd9495ae3", /* valid object */
"iota client object 0x4d03f39deb5e27a76a568adb591da553688e6df6fb053bc9ac069f2bd9495ae3 --bcs", /* valid object BCS */
"iota client object 0x0000000000000000000000000000000000000000000000000000000000000000", /* non-existent object */
"iota client tx-block E5Zp4QQ84PQEceSw4JRi4VTScSAQweKSgdwp9XH4aVPd", // valid tx digest
"iota client tx-block 88FqW2hyUgShTyLcGzbh6scZB45XZYmXpXSxydkBVTPu", // valid tx digest
"iota client tx-block 11111111111111111111111111111111", /* non-existent tx
* digest */
];
Expand Down
Loading

0 comments on commit 5dad3ee

Please sign in to comment.