chore: fix warns #16

Closed · wants to merge 2 commits
2 changes: 1 addition & 1 deletion CHANGELOG.md

@@ -16,7 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - added config seeds for avoiding manual process with seeds ([#6](https://github.com/8xFF/atm0s-small-p2p/pull/6))
 - switch to use peer_id for send and recv ([#5](https://github.com/8xFF/atm0s-small-p2p/pull/5))
 - visualization service ([#3](https://github.com/8xFF/atm0s-small-p2p/pull/3))
-- mininum working version with service pkt and stream
+- minimum working version with service pkt and stream

 ### Fixed

3 changes: 2 additions & 1 deletion src/discovery.rs

@@ -58,6 +58,7 @@ impl PeerDiscovery {
     pub fn apply_sync(&mut self, now_ms: u64, sync: PeerDiscoverySync) {
         log::debug!("[PeerDiscovery] apply sync with addrs: {:?}", sync.0);
         for (peer, last_updated, address) in sync.0.into_iter() {
+            #[allow(clippy::collapsible_if)]
             if last_updated + TIMEOUT_AFTER > now_ms {
                 if self.remotes.insert(peer, (last_updated, address)).is_none() {
                     log::info!("[PeerDiscovery] added new peer {peer}");
@@ -66,7 +67,7 @@ impl PeerDiscovery {
         }
     }

     pub fn remotes(&self) -> impl Iterator<Item = PeerAddress> + '_ {
-        self.remotes.iter().map(|(p, (_, a))| PeerAddress(*p, a.clone())).chain(self.seeds.iter().map(|s| s.clone()))
+        self.remotes.iter().map(|(p, (_, a))| PeerAddress(*p, a.clone())).chain(self.seeds.iter().cloned())
     }
 }
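
Two lints are in play here: `clippy::collapsible_if` is allowed rather than fixed (presumably because the insert's side effect reads more clearly as its own nested `if`), and `clippy::map_clone` is fixed by replacing `.map(|s| s.clone())` with `.cloned()`. A minimal sketch of the latter; the `Seed` type is a hypothetical stand-in for the crate's seed-address type:

```rust
// clippy::map_clone: a closure that only clones can be replaced by `.cloned()`.
#[derive(Clone, Debug, PartialEq)]
struct Seed(String);

fn main() {
    let seeds = vec![Seed("a".into()), Seed("b".into())];

    // Before: clippy flags the explicit clone closure.
    let before: Vec<Seed> = seeds.iter().map(|s| s.clone()).collect();

    // After: `.cloned()` states the intent directly.
    let after: Vec<Seed> = seeds.iter().cloned().collect();

    assert_eq!(before, after);
}
```
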
3 changes: 2 additions & 1 deletion src/lib.rs

@@ -108,6 +108,7 @@ enum PeerMainData {
     Sync { route: RouterTableSync, advertise: PeerDiscoverySync },
 }

+#[allow(clippy::enum_variant_names)]
 enum InternalEvent {
     PeerConnected(ConnectionId, PeerId, u16),
     PeerConnectError(ConnectionId, Option<PeerId>, anyhow::Error),
@@ -216,7 +217,7 @@ impl<SECURE: HandshakeProtocol> P2pNetwork<SECURE> {
     fn process_tick(&mut self, now_ms: u64) -> anyhow::Result<P2pNetworkEvent> {
         self.discovery.clear_timeout(now_ms);
         for conn in self.neighbours.connected_conns() {
-            let peer_id = conn.peer_id().expect("conected neighbours should have peer_id");
+            let peer_id = conn.peer_id().expect("accepted neighbours should have peer_id");
             let conn_id = conn.conn_id();
             let route: router::RouterTableSync = self.router.create_sync(&peer_id);
             let advertise = self.discovery.create_sync_for(now_ms, &peer_id);
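
`clippy::enum_variant_names` fires when three or more variants share a common prefix or suffix, as the `Peer...` variants do here; since the names are descriptive, the PR allows the lint instead of renaming. A toy reproduction with simplified payload types (not the crate's real ones):

```rust
// All variants share the `Peer` prefix, which is what
// clippy::enum_variant_names warns about once there are 3+ variants.
#[allow(clippy::enum_variant_names)]
enum InternalEvent {
    PeerConnected(u64),
    PeerConnectError(String),
    PeerDisconnected(u64),
}

fn main() {
    match InternalEvent::PeerConnected(1) {
        InternalEvent::PeerConnected(id) => println!("peer {id} connected"),
        InternalEvent::PeerConnectError(err) => println!("connect error: {err}"),
        InternalEvent::PeerDisconnected(id) => println!("peer {id} left"),
    }
}
```
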
6 changes: 3 additions & 3 deletions src/neighbours.rs

@@ -13,7 +13,7 @@ impl NetworkNeighbours {
     }

     pub fn has_peer(&self, peer: &PeerId) -> bool {
-        self.conns.values().find(|c| c.peer_id().eq(&Some(*peer))).is_some()
+        self.conns.values().any(|c| c.peer_id().eq(&Some(*peer)))
     }

     pub fn mark_connected(&mut self, conn_id: &ConnectionId, peer: PeerId) -> Option<()> {
@@ -22,11 +22,11 @@
     }

     pub fn remove(&mut self, conn_id: &ConnectionId) -> Option<()> {
-        self.conns.remove(&conn_id)?;
+        self.conns.remove(conn_id)?;
         Some(())
     }

     pub fn connected_conns(&self) -> impl Iterator<Item = &PeerConnection> {
-        self.conns.values().into_iter().filter(|c| c.is_connected())
+        self.conns.values().filter(|c| c.is_connected())
     }
 }
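
Two mechanical fixes: `find(...).is_some()` becomes `any(...)` (clippy's `search_is_some`), and the redundant `.into_iter()` on `values()`, which already yields an iterator, is dropped (clippy's `useless_conversion`). A standalone sketch of the `any` rewrite over a hypothetical connection table:

```rust
use std::collections::HashMap;

fn main() {
    // Hypothetical connection table: connection id -> is_connected flag.
    let conns: HashMap<u64, bool> = HashMap::from([(1, true), (2, false)]);

    // Before: clippy::search_is_some — builds an Option only to test it.
    let connected_before = conns.values().find(|c| **c).is_some();

    // After: `any` answers the yes/no question directly.
    let connected_after = conns.values().any(|c| *c);

    assert_eq!(connected_before, connected_after);
}
```
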
1 change: 1 addition & 0 deletions src/peer.rs

@@ -124,6 +124,7 @@ struct ConnectRes {
     result: Result<Vec<u8>, String>,
 }

+#[allow(clippy::too_many_arguments)]
 async fn run_connection<SECURE: HandshakeProtocol>(
     secure: Arc<SECURE>,
     ctx: SharedCtx,
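
`clippy::too_many_arguments` fires once a function takes more than seven parameters (the default threshold). `run_connection` here, and `PeerConnectionInternal::new` below, both cross that line, and the PR opts for an `allow` rather than bundling the parameters into a struct. A toy trigger:

```rust
// Eight parameters trips clippy::too_many_arguments (default threshold: 7);
// the usual alternative is a parameter struct or a builder.
#[allow(clippy::too_many_arguments)]
fn configure(a: u8, b: u8, c: u8, d: u8, e: u8, f: u8, g: u8, h: u8) -> u32 {
    [a, b, c, d, e, f, g, h].iter().map(|v| *v as u32).sum()
}

fn main() {
    println!("sum = {}", configure(1, 2, 3, 4, 5, 6, 7, 8));
}
```
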
4 changes: 2 additions & 2 deletions src/peer/peer_alias.rs

@@ -1,5 +1,5 @@
 //! PeerAlias allow control a peer-connection from othert task
-//! This is done by using control_tx to send control to running task over chanel
+//! This is done by using control_tx to send control to running task over channel

 use tokio::sync::{mpsc::Sender, oneshot};

@@ -49,6 +49,6 @@ impl PeerConnectionAlias {
     pub(crate) async fn open_stream(&self, service: P2pServiceId, source: PeerId, dest: PeerId, meta: Vec<u8>) -> anyhow::Result<P2pQuicStream> {
         let (tx, rx) = oneshot::channel();
         self.control_tx.send(PeerConnectionControl::OpenStream(service, source, dest, meta, tx)).await?;
-        Ok(rx.await??)
+        rx.await?
     }
 }
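
The `Ok(rx.await??)` to `rx.await?` change is clippy's `needless_question_mark`: `rx.await` yields a nested `Result`, so one `?` strips the channel-receive error and the inner `Result` is already the function's return type; unwrapping that too, only to re-wrap it in `Ok`, is a no-op. A minimal sketch with `String` errors standing in for the real oneshot/anyhow types:

```rust
// Stand-in for the oneshot receive: a Result nested inside a Result.
fn recv() -> Result<Result<u32, String>, String> {
    Ok(Ok(42))
}

// Before: unwrap both layers, then re-wrap (clippy::needless_question_mark).
fn open_stream_before() -> Result<u32, String> {
    Ok(recv()??)
}

// After: one `?` removes the outer layer; the inner Result is returned as-is.
fn open_stream_after() -> Result<u32, String> {
    recv()?
}

fn main() {
    assert_eq!(open_stream_before(), open_stream_after());
}
```
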
5 changes: 3 additions & 2 deletions src/peer/peer_internal.rs

@@ -3,7 +3,7 @@
 //! We have some strict rules
 //!
 //! - Only use async with current connection stream
-//! - For other communication shoud use try_send for avoding blocking
+//! - For other communication should use try_send for avoiding blocking

 use std::{net::SocketAddr, time::Duration};

@@ -42,6 +42,7 @@ pub struct PeerConnectionInternal {
 }

 impl PeerConnectionInternal {
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         ctx: SharedCtx,
         conn_id: ConnectionId,
@@ -186,7 +187,7 @@ async fn accept_bi(to_peer: PeerId, mut stream: P2pQuicStream, ctx: SharedCtx) -
         service_tx
             .send(P2pServiceEvent::Stream(source, meta, stream))
             .await
-            .print_on_err("[PeerConnectionInternal] send accpeted stream to service");
+            .print_on_err("[PeerConnectionInternal] send accepted stream to service");
         Ok(())
     } else {
         log::warn!("[PeerConnectionInternal {to_peer}] stream service {service} source {source} to dest {dest} => service not found");
6 changes: 3 additions & 3 deletions src/router.rs

@@ -79,7 +79,7 @@ impl RouterTable {
             self.peers.entry(*peer).or_default();
         }

-        let mut new_paths = BTreeMap::<PeerId, PathMetric>::from_iter(sync.0.into_iter());
+        let mut new_paths = BTreeMap::<PeerId, PathMetric>::from_iter(sync.0);
         // only loop over peer which don't equal source, because it is direct connection
         for (peer, memory) in self.peers.iter_mut().filter(|(p, _)| !from_peer.eq(p)) {
             let previous = memory.paths.contains_key(&conn);
@@ -121,7 +121,7 @@
     fn del_direct(&mut self, conn: &ConnectionId) {
         if let Some((to, _)) = self.directs.remove(conn) {
             if let Some(memory) = self.peers.get_mut(&to) {
-                memory.paths.remove(&conn);
+                memory.paths.remove(conn);
                 Self::select_best_for(&to, memory);
                 if memory.best().is_none() {
                     self.peers.remove(&to);
@@ -225,7 +225,7 @@ impl SharedRouterTable {
     }

     pub fn create_sync(&self, dest: &PeerId) -> RouterTableSync {
-        self.table.read().create_sync(&dest)
+        self.table.read().create_sync(dest)
     }

     pub fn apply_sync(&self, conn: ConnectionId, sync: RouterTableSync) {
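
Both router fixes are borrow/conversion lints: `from_iter` accepts any `IntoIterator`, so the explicit `.into_iter()` is a `useless_conversion`, and `conn`/`dest` are already references, so re-borrowing them (`&conn`) is a `needless_borrow` that only compiles via deref coercion. A sketch with plain integer keys in place of `PeerId`/`ConnectionId`:

```rust
use std::collections::BTreeMap;

// `from_iter` takes IntoIterator, so the collection can be passed directly;
// the extra `.into_iter()` is clippy::useless_conversion.
fn rebuild(pairs: Vec<(u64, u32)>) -> BTreeMap<u64, u32> {
    BTreeMap::from_iter(pairs) // not: BTreeMap::from_iter(pairs.into_iter())
}

// `key` is already a reference; `map.remove(&key)` would pass a `&&u64` that
// deref-coerces back down, which clippy flags as needless_borrow.
fn drop_entry(map: &mut BTreeMap<u64, u32>, key: &u64) {
    map.remove(key);
}

fn main() {
    let mut table = rebuild(vec![(1, 10), (2, 20)]);
    drop_entry(&mut table, &1);
    assert_eq!(table.len(), 1);
}
```
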
4 changes: 2 additions & 2 deletions src/service/alias_service.rs

@@ -253,7 +253,7 @@ impl AliasServiceInternal {
     fn on_msg(&mut self, now: u64, from: PeerId, msg: AliasMessage) {
         match msg {
             AliasMessage::NotifySet(alias_id) => {
-                let slot = self.cache.get_or_insert_mut(alias_id, || HashSet::new());
+                let slot = self.cache.get_or_insert_mut(alias_id, HashSet::new);
                 slot.insert(from);
             }
             AliasMessage::NotifyDel(alias_id) => {
@@ -277,7 +277,7 @@
                 }
             }
             AliasMessage::Found(alias_id) => {
-                let slot = self.cache.get_or_insert_mut(alias_id, || HashSet::new());
+                let slot = self.cache.get_or_insert_mut(alias_id, HashSet::new);
                 slot.insert(from);

                 if let Some(req) = self.find_reqs.remove(&alias_id) {
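
`|| HashSet::new()` is a closure whose only job is to call `HashSet::new`, so clippy's `redundant_closure` suggests passing the constructor itself. The sketch below uses `HashMap::entry` as a stand-in for the cache's `get_or_insert_mut`:

```rust
use std::collections::{HashMap, HashSet};

fn main() {
    let mut cache: HashMap<u64, HashSet<&str>> = HashMap::new();

    // Before: clippy::redundant_closure — the closure merely forwards to
    // HashSet::new.
    cache.entry(1).or_insert_with(|| HashSet::new()).insert("peer-a");

    // After: pass the function item directly.
    cache.entry(2).or_insert_with(HashSet::new).insert("peer-b");

    assert_eq!(cache.len(), 2);
}
```
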
12 changes: 7 additions & 5 deletions src/service/pubsub_service.rs

@@ -228,7 +228,7 @@ impl PubsubService {
                 for (_, sub_tx) in state.local_subscribers.iter() {
                     let _ = sub_tx.send(SubscriberEvent::PeerJoined(PeerSrc::Remote(from_peer)));
                 }
-                // we also send subscribe state it remote, as publisher it only care about whereever this node is a subscriber
+                // we also send subscribe state it remote, as publisher it only care about wherever this node is a subscriber
                 if !state.local_subscribers.is_empty() {
                     self.send_to(from_peer, &PubsubMessage::SubscriberJoined(channel)).await;
                 }
@@ -254,9 +254,9 @@
                 for (_, pub_tx) in state.local_publishers.iter() {
                     let _ = pub_tx.send(PublisherEvent::PeerJoined(PeerSrc::Remote(from_peer)));
                 }
-                // we also send publisher state it remote, as subscriber it only care about whereever this node is a publisher
+                // we also send publisher state it remote, as subscriber it only care about wherever this node is a publisher
                 if !state.local_publishers.is_empty() {
-                    self.send_to(from_peer, &&PubsubMessage::PublisherJoined(channel)).await;
+                    self.send_to(from_peer, &PubsubMessage::PublisherJoined(channel)).await;
                 }
             }
         }
@@ -395,7 +395,7 @@ impl PubsubService {
         let state = self.channels.entry(channel).or_default();
         state.local_publishers.remove(&local_id);
         if state.local_publishers.is_empty() {
-            // if this is last local_publisher => notify all subscibers
+            // if this is last local_publisher => notify all subscribers
             for (_, sub_tx) in state.local_subscribers.iter() {
                 let _ = sub_tx.send(SubscriberEvent::PeerLeaved(PeerSrc::Local));
             }
@@ -410,7 +410,7 @@
             let _ = tx.send(SubscriberEvent::PeerJoined(PeerSrc::Local));
         }
         if state.local_subscribers.is_empty() {
-            // if this is first local_subsciber => notify to all local_publishers
+            // if this is first local_subscriber => notify to all local_publishers
             for (_, pub_tx) in state.local_publishers.iter() {
                 let _ = pub_tx.send(PublisherEvent::PeerJoined(PeerSrc::Local));
             }
@@ -574,6 +574,7 @@ impl PubsubService {
             if let PeerSrc::Remote(peer) = peer_src {
                 let _ = self.send_to(peer, &PubsubMessage::PublishRpcAnswer(data, rpc_id)).await;
             } else {
+                #[allow(clippy::collapsible_if)]
                 if let Some(mut req) = self.publish_rpc_reqs.remove(&rpc_id) {
                     let _ = req.tx.take().expect("should have req_tx").send(Ok(data));
                 } else {
@@ -585,6 +586,7 @@
             if let PeerSrc::Remote(peer) = peer_src {
                 let _ = self.send_to(peer, &PubsubMessage::FeedbackRpcAnswer(data, rpc_id)).await;
             } else {
+                #[allow(clippy::collapsible_if)]
                 if let Some(mut req) = self.feedback_rpc_reqs.remove(&rpc_id) {
                     let _ = req.tx.take().expect("should have req_tx").send(Ok(data));
                 } else {
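
Besides the comment typo fixes and two more `collapsible_if` allows (the nested `if let` blocks inside the `else` branches stay as they are), the notable fix is `&&PubsubMessage::...` to `&PubsubMessage::...`: the doubled `&` created a reference to a reference that only type-checked thanks to deref coercion. A stand-in sketch; this `send_to` signature is hypothetical, not the service's real one:

```rust
// Hypothetical stand-in for the service's send_to: it takes one reference.
fn send_to(peer: u64, msg: &str) {
    println!("-> peer {peer}: {msg}");
}

fn main() {
    let msg = String::from("PublisherJoined");

    // Before: `&&msg` is a reference to a reference; the compiler
    // deref-coerces it back down, and clippy flags the extra borrow.
    send_to(1, &&msg);

    // After: a single borrow says what is meant.
    send_to(1, &msg);
}
```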