Skip to content

Commit

Permalink
refactor(liveman): proxy plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
a-wing committed Oct 13, 2024
1 parent 2b943b7 commit edc7a71
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 43 deletions.
6 changes: 3 additions & 3 deletions libs/net4mqtt/src/kxdns.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
const SPLIT: char = '.';

pub struct Kxdns {
domain: &'static str,
domain: String,
}

impl Kxdns {
pub fn new(domain: &'static str) -> Self {
pub fn new(domain: String) -> Self {
Self { domain }
}

Expand All @@ -21,7 +21,7 @@ impl Kxdns {

#[test]
fn test_kxdns() {
let kxdns = Kxdns::new("kxdns.com");
let kxdns = Kxdns::new("kxdns.com".to_string());
let key = "test";
let domain = kxdns.registry(key);
assert_eq!(domain, "test.kxdns.com");
Expand Down
10 changes: 5 additions & 5 deletions libs/net4mqtt/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ async fn test_socks_simple() {
let res = client
.get(format!(
"http://{}/",
kxdns::Kxdns::new(DOMAIN_SUFFIX).registry("0")
kxdns::Kxdns::new(DOMAIN_SUFFIX.to_string()).registry("0")
))
.send()
.await
Expand All @@ -660,7 +660,7 @@ async fn test_socks_simple() {
let res = client
.get(format!(
"http://{}/",
kxdns::Kxdns::new(DOMAIN_SUFFIX).registry("0")
kxdns::Kxdns::new(DOMAIN_SUFFIX.to_string()).registry("0")
))
.send()
.await
Expand Down Expand Up @@ -706,7 +706,7 @@ async fn test_socks_restart() {
let res = client
.get(format!(
"http://{}/",
kxdns::Kxdns::new(DOMAIN_SUFFIX).registry("0")
kxdns::Kxdns::new(DOMAIN_SUFFIX.to_string()).registry("0")
))
.send()
.await
Expand All @@ -726,7 +726,7 @@ async fn test_socks_restart() {
let res = client
.get(format!(
"http://{}/",
kxdns::Kxdns::new(DOMAIN_SUFFIX).registry("0")
kxdns::Kxdns::new(DOMAIN_SUFFIX.to_string()).registry("0")
))
.send()
.await
Expand Down Expand Up @@ -795,7 +795,7 @@ async fn test_socks_multiple_server() {
let res = client
.get(format!(
"http://{}/",
kxdns::Kxdns::new(DOMAIN_SUFFIX).registry(&id.to_string())
kxdns::Kxdns::new(DOMAIN_SUFFIX.to_string()).registry(&id.to_string())
))
.send()
.await
Expand Down
14 changes: 11 additions & 3 deletions liveman/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ pub struct Net4mqtt {
pub mqtt_url: String,
#[serde(default)]
pub alias: String,
#[serde(default = "default_socks_listen")]
#[serde(default = "default_net4mqtt_listen")]
pub listen: SocketAddr,
#[serde(default = "default_net4mqtt_domain")]
pub domain: String,
}

#[cfg(feature = "net4mqtt")]
Expand All @@ -49,16 +51,22 @@ impl Default for Net4mqtt {
Self {
mqtt_url: String::new(),
alias: String::new(),
listen: default_socks_listen(),
listen: default_net4mqtt_listen(),
domain: default_net4mqtt_domain(),
}
}
}

#[cfg(feature = "net4mqtt")]
fn default_socks_listen() -> SocketAddr {
fn default_net4mqtt_listen() -> SocketAddr {
SocketAddr::from_str("0.0.0.0:1077").expect("invalid listen socks address")
}

#[cfg(feature = "net4mqtt")]
fn default_net4mqtt_domain() -> String {
String::from("net4mqtt.local")
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Http {
#[serde(default = "default_http_listen")]
Expand Down
63 changes: 31 additions & 32 deletions liveman/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,40 +44,37 @@ where
F: Future<Output = ()> + Send + 'static,
{
#[cfg(feature = "net4mqtt")]
let net4mqtt_domain = "net4mqtt.local";

#[cfg(feature = "net4mqtt")]
let proxy_addr = match cfg.net4mqtt.clone() {
Some(c) => Some((c.listen, net4mqtt_domain.to_string())),
let proxy_plugin = match cfg.net4mqtt.clone() {
Some(c) => {
// References: https://github.com/seanmonstar/reqwest/issues/899
let target = reqwest::Url::parse(&format!("socks5h://{}", c.listen)).unwrap();
Some(reqwest::Proxy::custom(move |url| match url.host_str() {
Some(host) => {
if host.ends_with(c.domain.as_str()) {
Some(target.clone())
} else {
None
}
}
None => None,
}))
}
None => None,
};
#[cfg(not(feature = "net4mqtt"))]
let proxy_addr = None;

let client_builder = reqwest::Client::builder()
.connect_timeout(Duration::from_millis(1000))
.timeout(Duration::from_millis(10000));

let client = if let Some((addr, domain)) = proxy_addr {
// References: https://github.com/seanmonstar/reqwest/issues/899
let target = reqwest::Url::parse(format!("socks5h://{}", addr).as_str()).unwrap();
client_builder.proxy(reqwest::Proxy::custom(move |url| match url.host_str() {
Some(host) => {
if host.ends_with(domain.as_str()) {
Some(target.clone())
} else {
None
}
}
None => None,
}))
let proxy_plugin = None;

let client_req = reqwest::Client::builder();
let client_mem = reqwest::Client::builder()
.connect_timeout(Duration::from_millis(500))
.timeout(Duration::from_millis(1000));
let (client_req, client_mem) = if let Some(proxy) = proxy_plugin {
(client_req.proxy(proxy.clone()), client_mem.proxy(proxy))
} else {
client_builder
}
.build()
.unwrap();
(client_req, client_mem)
};

let store = MemStorage::new(client.clone());
let store = MemStorage::new(client_mem.build().unwrap());
let nodes = store.get_map_nodes_mut();
for v in cfg.nodes.clone() {
nodes
Expand All @@ -104,6 +101,8 @@ where
let (sender, mut receiver) =
tokio::sync::mpsc::channel::<(String, String, Vec<u8>)>(10);

let domain = c.domain.clone();

std::thread::spawn(move || {
tokio::runtime::Runtime::new()
.unwrap()
Expand All @@ -113,7 +112,7 @@ where
&c.mqtt_url,
listener,
("-", &c.alias.clone()),
Some(net4mqtt_domain.to_string()),
Some(c.domain),
Some(net4mqtt::proxy::VDataConfig {
receiver: Some(sender),
..Default::default()
Expand All @@ -126,7 +125,7 @@ where
});

std::thread::spawn(move || {
let dns = net4mqtt::kxdns::Kxdns::new(net4mqtt_domain);
let dns = net4mqtt::kxdns::Kxdns::new(domain);
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
Expand Down Expand Up @@ -159,7 +158,7 @@ where

let app_state = AppState {
config: cfg.clone(),
client,
client: client_req.build().unwrap(),
storage: store,
};

Expand Down

0 comments on commit edc7a71

Please sign in to comment.