Skip to content

Commit

Permalink
fix(net4mqtt): network timeout reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
a-wing committed Nov 26, 2024
1 parent 101b63b commit 07e3f88
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 51 deletions.
20 changes: 4 additions & 16 deletions libs/net4mqtt/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,7 @@ pub async fn agent(
}
}
},
Err(e) => {
error!("agent mqtt error: {:?}", e);
time::sleep(time::Duration::from_secs(1)).await;
}
Err(e) => return Err(anyhow!("agent mqtt error: {:?}", e))
}
}
else => { error!("vagent proxy error"); }
Expand Down Expand Up @@ -508,10 +505,7 @@ pub async fn local_ports_tcp(
}
}
},
Err(e) => {
error!("local mqtt error: {:?}", e);
time::sleep(time::Duration::from_secs(1)).await;
}
Err(e) => return Err(anyhow!("local mqtt error: {:?}", e))
}

}
Expand Down Expand Up @@ -580,10 +574,7 @@ pub async fn local_ports_udp(
}
}
},
Err(e) => {
error!("local mqtt error: {:?}", e);
time::sleep(time::Duration::from_secs(1)).await;
}
Err(e) => return Err(anyhow!("local mqtt error: {:?}", e))
}

}
Expand Down Expand Up @@ -716,10 +707,7 @@ pub async fn local_socks(
}
}
},
Err(e) => {
error!("local mqtt error: {:?}", e);
time::sleep(time::Duration::from_secs(1)).await;
}
Err(e) => return Err(anyhow!("local mqtt error: {:?}", e))
}

}
Expand Down
46 changes: 26 additions & 20 deletions liveion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::net::TcpListener;
use tower_http::{
cors::CorsLayer, trace::TraceLayer, validate_request::ValidateRequestHeaderLayer,
};
use tracing::{error, info_span, Level};
use tracing::{error, info_span, warn, Level};

use auth::{access::access_middleware, ManyValidate};
use error::AppError;
Expand Down Expand Up @@ -89,25 +89,31 @@ where
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
net4mqtt::proxy::agent(
&c.mqtt_url,
&cfg.http.listen.to_string(),
&c.alias.clone(),
Some(net4mqtt::proxy::VDataConfig {
online: Some(
serde_json::json!({
"alias": c.alias,
})
.to_string()
.bytes()
.collect(),
),
offline: Some("{}".bytes().collect()),
..Default::default()
}),
)
.await
.unwrap()
loop {
match net4mqtt::proxy::agent(
&c.mqtt_url,
&cfg.http.listen.to_string(),
&c.alias.clone(),
Some(net4mqtt::proxy::VDataConfig {
online: Some(
serde_json::json!({
"alias": c.alias,
})
.to_string()
.bytes()
.collect(),
),
offline: Some("{}".bytes().collect()),
..Default::default()
}),
)
.await
{
Ok(_) => warn!("net4mqtt service is end, restart net4mqtt service"),
Err(e) => error!("mqtt4mqtt error: {:?}", e),
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
});
});
}
Expand Down
36 changes: 21 additions & 15 deletions liveman/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::net::TcpListener;
use tower_http::{
cors::CorsLayer, trace::TraceLayer, validate_request::ValidateRequestHeaderLayer,
};
use tracing::{error, info, info_span};
use tracing::{error, info, info_span, warn};

use crate::admin::{authorize, token};
use crate::config::Config;
Expand Down Expand Up @@ -81,20 +81,26 @@ where
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
let listener = TcpListener::bind(c.listen).await.unwrap();
net4mqtt::proxy::local_socks(
&c.mqtt_url,
listener,
("-", &c.alias.clone()),
Some(c.domain),
Some(net4mqtt::proxy::VDataConfig {
receiver: Some(sender),
..Default::default()
}),
false,
)
.await
.unwrap()
loop {
let listener = TcpListener::bind(c.listen).await.unwrap();
match net4mqtt::proxy::local_socks(
&c.mqtt_url,
listener,
("-", &c.alias.clone()),
Some(c.domain.clone()),
Some(net4mqtt::proxy::VDataConfig {
receiver: Some(sender.clone()),
..Default::default()
}),
false,
)
.await
{
Ok(_) => warn!("net4mqtt service is end, restart net4mqtt service"),
Err(e) => error!("mqtt4mqtt error: {:?}", e),
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
});
});

Expand Down

0 comments on commit 07e3f88

Please sign in to comment.