Skip to content

Commit

Permalink
fixed: build warnings and clippy warnings (#328)
Browse files Browse the repository at this point in the history
* fix build warnings

* refactor: fix clippy warnings

* build: Set default Rust version and add code formatting tools

- Added `rust-toolchain.toml` file with `channel` set to `"1.79.0"`
- Added `rustfmt` and `clippy` components

* chore: add a TODO comment.

* chore: removed a comment that was confirmed.

* refactor: #[allow(clippy::large_enum_variant)]
  • Loading branch information
dhilipsiva authored Jul 10, 2024
1 parent c459b13 commit a6e1d51
Show file tree
Hide file tree
Showing 27 changed files with 240 additions and 187 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[workspace]
resolver = "2"
members = [
"bin",
"packages/protocol",
Expand All @@ -12,7 +13,7 @@ members = [
]

[workspace.dependencies]
sans-io-runtime = { git = "https://github.com/giangndm/sans-io-runtime.git", rev = "c781cef12b2a435b5e31a6ede69d301a23719452" }
sans-io-runtime = { git = "https://github.com/giangndm/sans-io-runtime.git", rev = "c781cef12b2a435b5e31a6ede69d301a23719452" , default-features = false}
atm0s-sdn = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", rev = "2ad5d3a092b63f871a90a9600d2fcc8cb3027a24" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] }
convert-enum = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion bin/src/http/api_console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ struct ConsoleAuthorization(());

async fn api_checker(req: &Request, api_key: ApiKey) -> Option<()> {
let data = req.data::<ConsoleApisCtx>()?;
data.secure.validate_token(&api_key.key).then(|| ())
data.secure.validate_token(&api_key.key).then_some(())
}
1 change: 1 addition & 0 deletions bin/src/http/api_console/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ impl Apis {
}

/// get events
#[allow(clippy::too_many_arguments)]
#[oai(path = "/:node/log/events", method = "get")]
async fn events(
&self,
Expand Down
4 changes: 2 additions & 2 deletions bin/src/quinn/vnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::vsocket::VirtualUdpSocket;

#[derive(Debug)]
pub struct NetworkPkt {
pub local_port: u16,
pub _local_port: u16,
pub remote: NodeId,
pub remote_port: u16,
pub data: Buffer,
Expand Down Expand Up @@ -77,7 +77,7 @@ impl VirtualNetwork {
let event = event?;
match event {
socket::Event::RecvFrom(local_port, remote, remote_port, data, meta) => {
let pkt = NetworkPkt { data, local_port, remote, remote_port, meta };
let pkt = NetworkPkt { data, _local_port: local_port, remote, remote_port, meta };
if let Some(socket_tx) = self.sockets.get(&local_port) {
if let Err(e) = socket_tx.try_send(pkt) {
log::error!("Send to socket {} error {:?}", local_port, e);
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct Args {
pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) {
rustls::crypto::ring::default_provider().install_default().expect("should install ring as default");

let mut connector_storage = Arc::new(ConnectorStorage::new(&args.db_uri).await);
let connector_storage = Arc::new(ConnectorStorage::new(&args.db_uri).await);

let default_cluster_cert_buf = include_bytes!("../../certs/cluster.cert");
let default_cluster_key_buf = include_bytes!("../../certs/cluster.key");
Expand Down
8 changes: 4 additions & 4 deletions bin/src/server/console/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl Storage {
ClusterNodeInfo::Console(generic) => {
let zone_id = node & 0xFF_FF_FF_00;
log::info!("Zone {zone_id} on console ping, zones {}", self.zones.len());
let zone = self.zones.entry(zone_id).or_insert_with(Default::default);
let zone = self.zones.entry(zone_id).or_default();
zone.consoles.insert(
node,
ConsoleContainer {
Expand All @@ -165,7 +165,7 @@ impl Storage {
ClusterNodeInfo::Gateway(generic, info) => {
let zone_id = node & 0xFF_FF_FF_00;
log::info!("Zone {zone_id} on gateway ping");
let zone = self.zones.entry(zone_id).or_insert_with(Default::default);
let zone = self.zones.entry(zone_id).or_default();
zone.lat = info.lat;
zone.lon = info.lon;
zone.gateways.insert(
Expand All @@ -181,7 +181,7 @@ impl Storage {
ClusterNodeInfo::Media(generic, info) => {
let zone_id = node & 0xFF_FF_FF_00;
log::info!("Zone {zone_id} on media ping");
let zone = self.zones.entry(zone_id).or_insert_with(Default::default);
let zone = self.zones.entry(zone_id).or_default();
zone.medias.insert(
node,
MediaContainer {
Expand All @@ -195,7 +195,7 @@ impl Storage {
ClusterNodeInfo::Connector(generic) => {
let zone_id = node & 0xFF_FF_FF_00;
log::info!("Zone {zone_id} on connector ping, zones {}", self.zones.len());
let zone = self.zones.entry(zone_id).or_insert_with(Default::default);
let zone = self.zones.entry(zone_id).or_default();
zone.connectors.insert(
node,
ConnectorContainer {
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod
media_server_gateway::store_service::Event::FindNodeRes(req_id, res) => requester.on_find_node_res(req_id, res),
},
SdnExtOut::ServicesEvent(_, _, SE::Connector(event)) => match event {
media_server_connector::agent_service::Event::Stats { queue, inflight, acked } => {}
media_server_connector::agent_service::Event::Stats { queue: _, inflight: _, acked: _ } => {}
},
SdnExtOut::FeaturesEvent(_, FeaturesEvent::Socket(event)) => {
if let Err(e) = vnet_tx.try_send(event) {
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
if let Some(metrics) = node_metrics_collector.pop_measure() {
controller.send_to(
0, //because sdn controller allway is run inside worker 0
ExtIn::NodeStats(metrics).into(),
ExtIn::NodeStats(metrics),
);
}
while let Ok(control) = vnet_rx.try_recv() {
Expand Down
3 changes: 1 addition & 2 deletions bin/src/server/media/runtime_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use media_server_gateway::NodeMetrics;
use media_server_protocol::transport::{RpcReq, RpcRes};
use media_server_runner::{Input as WorkerInput, MediaConfig, MediaServerWorker, Output as WorkerOutput, Owner, UserData, SC, SE, TC, TW};
use media_server_secure::MediaEdgeSecure;
use rand::random;
use sans_io_runtime::{BusChannelControl, BusControl, BusEvent, WorkerInner, WorkerInnerInput, WorkerInnerOutput};

use crate::NodeConfig;
Expand Down Expand Up @@ -58,7 +57,7 @@ impl<ES: 'static + MediaEdgeSecure> WorkerInner<Owner, ExtIn, ExtOut, Channel, E
let worker = MediaServerWorker::new(
index,
cfg.node.node_id,
random(),
cfg.session,
&cfg.node.secret,
cfg.controller,
cfg.node.udp_port,
Expand Down
4 changes: 3 additions & 1 deletion packages/audio_mixer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ impl<Src: Debug + Clone + Eq + Hash> AudioMixer<Src> {
for (i, slot) in self.outputs.iter().enumerate() {
if let Some(OutputSlotState { audio_level, source }) = slot {
if let Some((_, _, lowest_slot_audio_level)) = &mut lowest {
if *audio_level < *lowest_slot_audio_level || (*audio_level == *lowest_slot_audio_level) {
// TODO: We need to process some case we have same audio_level. Just check with smaller only:
// https://github.com/8xFF/atm0s-media-server/pull/328#discussion_r1667336073
if *audio_level <= *lowest_slot_audio_level {
lowest = Some((i, source.clone(), *audio_level));
}
} else {
Expand Down
12 changes: 12 additions & 0 deletions packages/media_connector/src/agent_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ pub struct ConnectorAgentService<UserData, SC, SE, TC, TW> {
_tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
}

impl<UserData, SC, SE, TC, TW> Default for ConnectorAgentService<UserData, SC, SE, TC, TW> {
fn default() -> Self {
Self::new()
}
}

impl<UserData, SC, SE, TC, TW> ConnectorAgentService<UserData, SC, SE, TC, TW> {
pub fn new() -> Self {
Self {
Expand Down Expand Up @@ -159,6 +165,12 @@ pub struct ConnectorAgentServiceBuilder<UserData, SC, SE, TC, TW> {
_tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
}

impl<UserData, SC, SE, TC, TW> Default for ConnectorAgentServiceBuilder<UserData, SC, SE, TC, TW> {
fn default() -> Self {
Self::new()
}
}

impl<UserData, SC, SE, TC, TW> ConnectorAgentServiceBuilder<UserData, SC, SE, TC, TW> {
pub fn new() -> Self {
Self { _tmp: std::marker::PhantomData }
Expand Down
12 changes: 12 additions & 0 deletions packages/media_connector/src/handler_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ pub struct ConnectorHandlerService<UserData, SC, SE, TC, TW> {
_tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
}

impl<UserData, SC, SE, TC, TW> Default for ConnectorHandlerService<UserData, SC, SE, TC, TW> {
fn default() -> Self {
Self::new()
}
}

impl<UserData, SC, SE, TC, TW> ConnectorHandlerService<UserData, SC, SE, TC, TW> {
pub fn new() -> Self {
Self {
Expand Down Expand Up @@ -152,6 +158,12 @@ pub struct ConnectorHandlerServiceBuilder<UserData, SC, SE, TC, TW> {
_tmp: std::marker::PhantomData<(UserData, SC, SE, TC, TW)>,
}

impl<UserData, SC, SE, TC, TW> Default for ConnectorHandlerServiceBuilder<UserData, SC, SE, TC, TW> {
fn default() -> Self {
Self::new()
}
}

impl<UserData, SC, SE, TC, TW> ConnectorHandlerServiceBuilder<UserData, SC, SE, TC, TW> {
pub fn new() -> Self {
Self { _tmp: std::marker::PhantomData }
Expand Down
4 changes: 2 additions & 2 deletions packages/media_connector/src/msg_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ impl<M: Message, const MAX_INFLIGHT: usize> MessageQueue<M, MAX_INFLIGHT> {

pub fn pop(&mut self, now_ms: u64) -> Option<&M> {
if let Some(msg_id) = self.pop_retry_msg_id(now_ms) {
let entry = self.inflight_ts.entry(now_ms).or_insert_with(Default::default);
let entry = self.inflight_ts.entry(now_ms).or_default();
entry.push(msg_id);
return Some(self.inflight.get(&msg_id).expect("should exist retry_msg_id"));
}

if self.inflight.len() < MAX_INFLIGHT {
let msg = self.queue.pop_front()?;
let msg_id = msg.msg_id();
let entry = self.inflight_ts.entry(now_ms).or_insert_with(Default::default);
let entry = self.inflight_ts.entry(now_ms).or_default();
entry.push(msg_id);
self.inflight.insert(msg_id, msg);
self.inflight.get(&msg_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl MigrationTrait for Migration {
enum Room {
Table,
Id,
#[allow(clippy::enum_variant_names)]
Room,
CreatedAt,
}
Expand All @@ -130,6 +131,7 @@ enum Peer {
Table,
Id,
Room,
#[allow(clippy::enum_variant_names)]
Peer,
CreatedAt,
}
Expand Down Expand Up @@ -165,6 +167,7 @@ enum Event {
NodeTs,
Session,
CreatedAt,
#[allow(clippy::enum_variant_names)]
Event,
Meta,
}
4 changes: 3 additions & 1 deletion packages/media_core/src/cluster/room/audio_mixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub enum Output<Endpoint> {
OnResourceEmpty,
}

type AudioMixerManuals<T> = TaskSwitcherBranch<TaskGroup<manual::Input, Output<T>, ManualMixer<T>, 4>, (usize, Output<T>)>;

pub struct AudioMixer<Endpoint: Clone> {
room: ClusterRoomHash,
mix_channel_id: ChannelId,
Expand All @@ -64,7 +66,7 @@ pub struct AudioMixer<Endpoint: Clone> {
subscriber1: TaskSwitcherBranch<AudioMixerSubscriber<Endpoint, 1>, Output<Endpoint>>,
subscriber2: TaskSwitcherBranch<AudioMixerSubscriber<Endpoint, 2>, Output<Endpoint>>,
subscriber3: TaskSwitcherBranch<AudioMixerSubscriber<Endpoint, 3>, Output<Endpoint>>,
manuals: TaskSwitcherBranch<TaskGroup<manual::Input, Output<Endpoint>, ManualMixer<Endpoint>, 4>, (usize, Output<Endpoint>)>,
manuals: AudioMixerManuals<Endpoint>,
switcher: TaskSwitcher,
last_tick: Instant,
}
Expand Down
8 changes: 4 additions & 4 deletions packages/media_core/src/cluster/room/audio_mixer/manual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ impl<Endpoint: Clone> ManualMixer<Endpoint> {

fn attach(&mut self, _now: Instant, source: TrackSource) {
let channel_id = id_generator::gen_channel_id(self.room, &source.peer, &source.track);
if !self.sources.contains_key(&channel_id) {
if let std::collections::hash_map::Entry::Vacant(e) = self.sources.entry(channel_id) {
log::info!("[ClusterManualMixer] add source {:?} => sub {channel_id}", source);
self.sources.insert(channel_id, source);
e.insert(source);
self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, pubsub::ChannelControl::SubAuto)));
}
}
Expand Down Expand Up @@ -85,7 +85,7 @@ impl<Endpoint: Clone> ManualMixer<Endpoint> {

fn detach(&mut self, _now: Instant, source: TrackSource) {
let channel_id = id_generator::gen_channel_id(self.room, &source.peer, &source.track);
if let Some(_) = self.sources.remove(&channel_id) {
if self.sources.remove(&channel_id).is_some() {
log::info!("[ClusterManualMixer] remove source {:?} => unsub {channel_id}", source);
self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, pubsub::ChannelControl::UnsubAuto)));
}
Expand Down Expand Up @@ -124,7 +124,7 @@ impl<Endpoint: Clone> Task<Input, Output<Endpoint>> for ManualMixer<Endpoint> {
Input::LeaveRoom => {
// We need manual release sources because it is from client request,
// we cannot ensure client will release it before it disconnect.
let sources = std::mem::replace(&mut self.sources, Default::default());
let sources = std::mem::take(&mut self.sources);
for (channel_id, source) in sources {
log::info!("[ClusterManualMixer] remove source {:?} on queue => unsub {channel_id}", source);
self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, pubsub::ChannelControl::UnsubAuto)));
Expand Down
4 changes: 3 additions & 1 deletion packages/media_core/src/endpoint/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ pub enum InternalOutput {
Destroy,
}

type EndpointInternalWaitJoin = Option<(EndpointReqId, RoomId, PeerId, PeerMeta, RoomInfoPublish, RoomInfoSubscribe, Option<AudioMixerConfig>)>;

pub struct EndpointInternal {
cfg: EndpointCfg,
state: Option<(Instant, TransportState)>,
wait_join: Option<(EndpointReqId, RoomId, PeerId, PeerMeta, RoomInfoPublish, RoomInfoSubscribe, Option<AudioMixerConfig>)>,
wait_join: EndpointInternalWaitJoin,
joined: Option<(ClusterRoomHash, RoomId, PeerId, Option<AudioMixerMode>)>,
local_tracks_id: Small2dMap<LocalTrackId, usize>,
remote_tracks_id: Small2dMap<RemoteTrackId, usize>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::VecDeque;
use std::{cmp::Ordering, collections::VecDeque};

use media_server_protocol::media::{MediaLayersBitrate, MediaMeta, MediaPacket};

Expand Down Expand Up @@ -53,36 +53,42 @@ impl Selector {
if let MediaMeta::H264 { key, profile: _, sim: Some(_sim) } = &mut pkt.meta {
match (self.current, self.target) {
(Some(current), Some(target)) => {
if target < current {
//down spatial => need wait key-frame
if *key {
log::info!("[H264SimSelector] down {} => {} with key", current, target);
ctx.seq_rewrite.reinit();
ctx.ts_rewrite.reinit();
self.current = self.target;
} else {
self.queue.push_back(Action::RequestKeyFrame);
match target.cmp(&current) {
Ordering::Less => {
// down spatial => need wait key-frame
if *key {
log::info!("[H264SimSelector] down {} => {} with key", current, target);
ctx.seq_rewrite.reinit();
ctx.ts_rewrite.reinit();
self.current = self.target;
} else {
self.queue.push_back(Action::RequestKeyFrame);
}
}
} else if target > current {
//up spatial => need wait key-frame
if *key {
log::info!("[H264SimSelector] up {} => {} with key", current, target);
ctx.seq_rewrite.reinit();
ctx.ts_rewrite.reinit();
self.current = Some(target);
} else if !*key {
self.queue.push_back(Action::RequestKeyFrame);
Ordering::Greater => {
// up spatial => need wait key-frame
if *key {
log::info!("[H264SimSelector] up {} => {} with key", current, target);
ctx.seq_rewrite.reinit();
ctx.ts_rewrite.reinit();
self.current = Some(target);
} else {
self.queue.push_back(Action::RequestKeyFrame);
}
}
Ordering::Equal => {
// target is equal to current, handle if needed
}
}
}
(Some(current), None) => {
//need pause
//TODO wait current frame finished for avoiding interrupt client
// need pause
// TODO: wait current frame finished for avoiding interrupt client
log::info!("[H264SimSelector] pause from {}", current);
self.current = None;
}
(None, Some(target)) => {
//need resume or start => need wait key_frame
// need resume or start => need wait key-frame
if *key {
log::info!("[H264SimSelector] resume to {} with key", target);
// with other spatial we have difference tl0xidx and pic_id offset
Expand All @@ -91,7 +97,7 @@ impl Selector {
}
}
(None, None) => {
//reject
// reject
}
}
}
Expand Down
Loading

0 comments on commit a6e1d51

Please sign in to comment.