From 4559dfa79bd40ae10f3f715b27311dbfc1c22ea8 Mon Sep 17 00:00:00 2001 From: giangndm <45644921+giangndm@users.noreply.github.com> Date: Tue, 26 Mar 2024 20:52:58 +0700 Subject: [PATCH] BREAKING CHANGE: Update newest atm0s-sdn with sans-io runtime (#14) * BREAKING CHANGE: Update newest atm0s-sdn with sans-io runtime * chore: fixing github actions release --- .github/workflows/release.yml | 4 +- Cargo.lock | 687 ++++++------------ Dockerfile | 4 +- crates/agent/node_local.sh | 4 +- crates/relayer/Cargo.toml | 5 +- crates/relayer/run_local_node1.sh | 5 +- crates/relayer/run_local_node2.sh | 5 +- crates/relayer/src/lib.rs | 16 +- crates/relayer/src/main.rs | 24 +- crates/relayer/src/proxy_listener/cluster.rs | 233 +++--- .../src/proxy_listener/cluster/alias_async.rs | 93 +++ .../src/proxy_listener/cluster/quinn_utils.rs | 31 + .../src/proxy_listener/cluster/service.rs | 75 ++ .../src/proxy_listener/cluster/vnet.rs | 106 +++ .../src/proxy_listener/cluster/vsocket.rs | 137 ++++ crates/relayer/src/tunnel.rs | 65 +- crates/relayer/src/utils.rs | 4 +- 17 files changed, 898 insertions(+), 600 deletions(-) create mode 100644 crates/relayer/src/proxy_listener/cluster/alias_async.rs create mode 100644 crates/relayer/src/proxy_listener/cluster/quinn_utils.rs create mode 100644 crates/relayer/src/proxy_listener/cluster/service.rs create mode 100644 crates/relayer/src/proxy_listener/cluster/vnet.rs create mode 100644 crates/relayer/src/proxy_listener/cluster/vsocket.rs diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index fd22c94..ed03a35 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -14,8 +14,8 @@ concurrency: cancel-in-progress: true env: - AGENT_NAME: agent - RELAYER_NAME: relayer + AGENT_NAME: atm0s-reverse-proxy-agent + RELAYER_NAME: atm0s-reverse-proxy-relayer ARTIFACT_DIR: release-builds REGISTRY: ghcr.io IMAGE_NAME: ${{ github.repository }} diff --git a/Cargo.lock b/Cargo.lock index 93e32b4..304cba9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,41 +17,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" -[[package]] -name = "aead" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0" -dependencies = [ - "crypto-common", - "generic-array", -] - -[[package]] -name = "aes" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac1f845298e95f983ff1944b728ae08b8cebab80d684f0a832ed0fc74dfa27e2" -dependencies = [ - "cfg-if", - "cipher", - "cpufeatures", -] - -[[package]] -name = "aes-gcm" -version = "0.10.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "831010a0f742e1209b3bcea8fab6a8e149051ba6099432c8cb2cc117dec3ead1" -dependencies = [ - "aead", - "aes", - "cipher", - "ctr", - "ghash", - "subtle", -] - [[package]] name = "ahash" version = "0.8.7" @@ -73,12 +38,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "allocator-api2" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" - [[package]] name = "anstream" version = "0.6.5" @@ -149,21 +108,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "async-bincode" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21849a990d47109757e820904d7c0b569a8013f6595bf14d911884634d58795f" -dependencies = [ - "bincode", - "byteorder", - "bytes", - "futures-core", - "futures-io", - "futures-sink", - "serde", -] - [[package]] name = "async-channel" version = "1.9.0" @@ -250,8 +194,8 @@ dependencies = [ "futures-io", "futures-lite 2.2.0", "parking", - "polling 3.3.1", - "rustix 0.38.28", + "polling 3.6.0", + "rustix 0.38.32", "slab", "tracing", "windows-sys 0.52.0", @@ -277,16 +221,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "async-notify" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8356f653934a654817bceada97a857ef8d68ab8992753d23ed8e8ccd5fc8fa31" -dependencies = [ - "futures-channel", - "futures-util", -] - [[package]] name = "async-process" version = "1.8.1" @@ -300,7 +234,7 @@ dependencies = [ "cfg-if", "event-listener 3.1.0", "futures-lite 1.13.0", - "rustix 0.38.28", + "rustix 0.38.32", "windows-sys 0.48.0", ] @@ -316,7 +250,7 @@ dependencies = [ "cfg-if", "futures-core", "futures-io", - "rustix 0.38.28", + "rustix 0.38.32", "signal-hook-registry", "slab", "windows-sys 0.48.0", @@ -422,6 +356,7 @@ dependencies = [ "metrics-dashboard", "poem", "quinn", + "quinn-plaintext", "rcgen", "rustls", "serde", @@ -433,276 +368,70 @@ dependencies = [ [[package]] name = "atm0s-sdn" version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dd1e950ae0c699218f2058790fd727a557caf8e89dcd16275733d40290c0c35" +source = "git+https://github.com/8xFF/atm0s-sdn.git?rev=7ee6dd7e50ae82f2c4e0d580aa6e6108b875f297#7ee6dd7e50ae82f2c4e0d580aa6e6108b875f297" dependencies = [ - "async-std", - "async-trait", "atm0s-sdn-identity", - "atm0s-sdn-key-value", - "atm0s-sdn-layers-spread-router", - "atm0s-sdn-layers-spread-router-sync", - "atm0s-sdn-manual-discovery", "atm0s-sdn-network", - "atm0s-sdn-node-alias", - "atm0s-sdn-pub-sub", "atm0s-sdn-router", - "atm0s-sdn-rpc", - "atm0s-sdn-transport-compose", - "atm0s-sdn-transport-tcp", - "atm0s-sdn-transport-udp", - "atm0s-sdn-utils", - "atm0s-sdn-virtual-socket", - "futures-util", - "paste", + "bincode", + "local-ip-address", + "log", + "parking_lot", + "rand", + "sans-io-runtime", + "serde", "thiserror", ] [[package]] name = "atm0s-sdn-identity" version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63e991909d52b8ee19ad314e6b57042970b9c5d2147ea493a60b23ae20368cda" +source = "git+https://github.com/8xFF/atm0s-sdn.git?rev=7ee6dd7e50ae82f2c4e0d580aa6e6108b875f297#7ee6dd7e50ae82f2c4e0d580aa6e6108b875f297" dependencies = [ "multiaddr", "rand", "serde", ] -[[package]] -name = "atm0s-sdn-key-value" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66b548a2b4a3671fab2ffef46b43027755bc787a684f4cd9bb606614834396b8" -dependencies = [ - "async-std", - "atm0s-sdn-identity", - "atm0s-sdn-network", - "atm0s-sdn-router", - "atm0s-sdn-utils", - "log", - "mockall", - "parking_lot", - "serde", - "small-map", - "thiserror", -] - -[[package]] -name = "atm0s-sdn-layers-spread-router" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecc477caf727b547d06189d4184b2db1027324df99cb71b37aba304ba234b9ef" -dependencies = [ - "atm0s-sdn-identity", - "atm0s-sdn-router", - "atm0s-sdn-utils", - "log", - "parking_lot", - "serde", -] - -[[package]] -name = "atm0s-sdn-layers-spread-router-sync" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c787bb2f971cd2e319a79cb6140a4b47f2ac2f46d8df0a1f0dd2fcecd44fc1" -dependencies = [ - "async-trait", - "atm0s-sdn-identity", - "atm0s-sdn-layers-spread-router", - "atm0s-sdn-network", - "atm0s-sdn-router", - "atm0s-sdn-utils", - "bincode", - "log", - "serde", - "thiserror", -] - -[[package]] -name = "atm0s-sdn-manual-discovery" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75ebc3f2c34104af60417870d396f8cad2db9cbad14b5653e6e8d6366e61fc32" -dependencies = [ - "async-trait", - "atm0s-sdn-identity", - "atm0s-sdn-key-value", - "atm0s-sdn-network", - "atm0s-sdn-router", - "atm0s-sdn-utils", - "log", - "parking_lot", - "serde", - "thiserror", -] - [[package]] name = "atm0s-sdn-network" version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26dcf34f17a804c7899dc4dcade5473030bc4260509736a129e93c8ea76c858c" +source = "git+https://github.com/8xFF/atm0s-sdn.git?rev=7ee6dd7e50ae82f2c4e0d580aa6e6108b875f297#7ee6dd7e50ae82f2c4e0d580aa6e6108b875f297" dependencies = [ - "async-std", - "async-trait", "atm0s-sdn-identity", "atm0s-sdn-router", "atm0s-sdn-utils", "bincode", "bytes", "convert-enum", - "futures", "log", + "num", "parking_lot", + "rand", "serde", "sha1", "thiserror", ] -[[package]] -name = "atm0s-sdn-node-alias" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f027dc972ebc22d32dcc8bf636bb0bbce9e8e5fdcca4ed74876e1e3d55b91a6" -dependencies = [ - "async-std", - "atm0s-sdn-identity", - "atm0s-sdn-network", - "atm0s-sdn-pub-sub", - "atm0s-sdn-router", - "atm0s-sdn-utils", - "bincode", - "bytes", - "log", - "parking_lot", - "serde", -] - -[[package]] -name = "atm0s-sdn-pub-sub" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f4545ef4995e98598c91a6f9158e253f6a2f8d4b57de19e415f7ff435040077" -dependencies = [ - "async-std", - "async-trait", - "atm0s-sdn-identity", - "atm0s-sdn-key-value", - "atm0s-sdn-network", - "atm0s-sdn-router", - "atm0s-sdn-utils", - "bytes", - "log", - "parking_lot", - "serde", -] - [[package]] name = "atm0s-sdn-router" version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d580980e58dad9642155f3f7004174b606d6b3552d18a08ed7ad27abf6e9dbff" +source = "git+https://github.com/8xFF/atm0s-sdn.git?rev=7ee6dd7e50ae82f2c4e0d580aa6e6108b875f297#7ee6dd7e50ae82f2c4e0d580aa6e6108b875f297" dependencies = [ "atm0s-sdn-identity", -] - -[[package]] -name = "atm0s-sdn-rpc" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f149a5868974c3c76572c484bf6a0b03a7dfe53dd78d6e2e00e0dcc75a52e783" -dependencies = [ - "async-std", - "atm0s-sdn-identity", - "atm0s-sdn-network", - "atm0s-sdn-router", "atm0s-sdn-utils", - "bincode", "log", - "parking_lot", - "serde", -] - -[[package]] -name = "atm0s-sdn-transport-compose" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2ad123e94796f04b6ecec127057cafc867dc108717c05decf8e556859779999" - -[[package]] -name = "atm0s-sdn-transport-tcp" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "204ba84e2ae66553e7916b2646f3325328ee23f682684c74c8240155d81b778a" -dependencies = [ - "async-bincode", - "async-std", - "async-trait", - "atm0s-sdn-identity", - "atm0s-sdn-network", - "atm0s-sdn-utils", - "bincode", - "futures-util", - "local-ip-address", - "log", - "parking_lot", - "serde", - "snow", -] - -[[package]] -name = "atm0s-sdn-transport-udp" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fa7f214a719465e9263265ea48db426127a7da396e6fb84f503c67e21358207" -dependencies = [ - "async-notify", - "async-std", - "async-trait", - "atm0s-sdn-identity", - "atm0s-sdn-network", - "atm0s-sdn-utils", - "bincode", - "futures-util", - "local-ip-address", - "log", - "parking_lot", + "mockall", "serde", - "snow", - "socket2 0.5.5", ] [[package]] name = "atm0s-sdn-utils" version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e648fecc372185b13c6da558d7675e90d6326229ed5af53ac4d4ed65685b5b0" -dependencies = [ - "async-notify", - "async-std", - "async-trait", - "log", - "rand", -] - -[[package]] -name = "atm0s-sdn-virtual-socket" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26d7a0b16f7a9618da0fc2b4b889c9aad065ce43fef82bc747b741dc0a55041d" +source = "git+https://github.com/8xFF/atm0s-sdn.git?rev=7ee6dd7e50ae82f2c4e0d580aa6e6108b875f297#7ee6dd7e50ae82f2c4e0d580aa6e6108b875f297" dependencies = [ - "async-std", - "async-trait", - "atm0s-sdn-identity", - "atm0s-sdn-network", - "atm0s-sdn-router", - "atm0s-sdn-utils", - "futures", "log", - "parking_lot", - "quinn", - "quinn-plaintext", + "serde", ] [[package]] @@ -771,15 +500,6 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" -[[package]] -name = "blake2" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" -dependencies = [ - "digest", -] - [[package]] name = "block-buffer" version = "0.10.4" @@ -833,54 +553,39 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" [[package]] -name = "cc" -version = "1.0.83" +name = "c2rust-bitfields" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +checksum = "b43c3f07ab0ef604fa6f595aa46ec2f8a22172c975e186f6f5bf9829a3b72c41" dependencies = [ - "libc", + "c2rust-bitfields-derive", ] [[package]] -name = "cfg-if" -version = "1.0.0" +name = "c2rust-bitfields-derive" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" - -[[package]] -name = "chacha20" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3613f74bd2eac03dad61bd53dbe620703d4371614fe0bc3b9f04dd36fe4e818" +checksum = "d3cbc102e2597c9744c8bd8c15915d554300601c91a079430d309816b0912545" dependencies = [ - "cfg-if", - "cipher", - "cpufeatures", + "proc-macro2", + "quote", + "syn 1.0.109", ] [[package]] -name = "chacha20poly1305" -version = "0.10.1" +name = "cc" +version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10cd79432192d1c0f4e1a0fef9527696cc039165d729fb41b3f4f4f354c2dc35" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" dependencies = [ - "aead", - "chacha20", - "cipher", - "poly1305", - "zeroize", + "libc", ] [[package]] -name = "cipher" -version = "0.4.4" +name = "cfg-if" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" -dependencies = [ - "crypto-common", - "inout", - "zeroize", -] +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clap" @@ -953,6 +658,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + [[package]] name = "core-foundation" version = "0.9.4" @@ -1019,19 +730,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" dependencies = [ "generic-array", - "rand_core", "typenum", ] -[[package]] -name = "ctr" -version = "0.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" -dependencies = [ - "cipher", -] - [[package]] name = "curve25519-dalek" version = "4.1.1" @@ -1106,6 +807,19 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "derive_more" +version = "0.99.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version", + "syn 1.0.109", +] + [[package]] name = "digest" version = "0.10.7" @@ -1404,16 +1118,6 @@ dependencies = [ "wasi", ] -[[package]] -name = "ghash" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d930750de5717d2dd0b8c0d42c076c0e884c81a73e6cab859bbd2339c71e3e40" -dependencies = [ - "opaque-debug", - "polyval", -] - [[package]] name = "gimli" version = "0.28.1" @@ -1451,6 +1155,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hash32" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606" +dependencies = [ + "byteorder", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1471,9 +1184,6 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" -dependencies = [ - "allocator-api2", -] [[package]] name = "headers" @@ -1499,6 +1209,16 @@ dependencies = [ "http", ] +[[package]] +name = "heapless" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad" +dependencies = [ + "hash32", + "stable_deref_trait", +] + [[package]] name = "heck" version = "0.4.1" @@ -1507,9 +1227,9 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "hermit-abi" -version = "0.3.3" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "hex" @@ -1586,7 +1306,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.5", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -1623,15 +1343,6 @@ dependencies = [ "hashbrown 0.14.3", ] -[[package]] -name = "inout" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" -dependencies = [ - "generic-array", -] - [[package]] name = "instant" version = "0.1.12" @@ -1652,6 +1363,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "ioctl-sys" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bd11f3a29434026f5ff98c730b668ba74b1033637b8817940b54d040696133c" + [[package]] name = "itertools" version = "0.11.0" @@ -1693,9 +1410,19 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.152" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" + +[[package]] +name = "libloading" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" +dependencies = [ + "cfg-if", + "windows-targets 0.52.0", +] [[package]] name = "libp2p-identity" @@ -1726,9 +1453,9 @@ checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" [[package]] name = "local-ip-address" -version = "0.5.7" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "612ed4ea9ce5acfb5d26339302528a5e1e59dfed95e9e11af3c083236ff1d15d" +checksum = "136ef34e18462b17bf39a7826f8f3bbc223341f8e83822beb8b77db9a3d49696" dependencies = [ "libc", "neli", @@ -1891,6 +1618,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.48.0", ] @@ -2064,6 +1792,72 @@ dependencies = [ "winapi", ] +[[package]] +name = "num" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05180d69e3da0e530ba2a1dae5110317e49e3b7f3d41be227dc5f92e49ee7af" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits 0.2.17", +] + +[[package]] +name = "num-bigint" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0" +dependencies = [ + "autocfg", + "num-integer", + "num-traits 0.2.17", +] + +[[package]] +name = "num-complex" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23c6602fda94a57c990fe0df199a035d83576b496aa29f4e634a8ac6004e68a6" +dependencies = [ + "num-traits 0.2.17", +] + +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits 0.2.17", +] + +[[package]] +name = "num-iter" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d869c01cc0c455284163fd0092f1f93835385ccab5a98a0dcc497b2f8bf055a9" +dependencies = [ + "autocfg", + "num-integer", + "num-traits 0.2.17", +] + +[[package]] +name = "num-rational" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" +dependencies = [ + "autocfg", + "num-bigint", + "num-integer", + "num-traits 0.2.17", +] + [[package]] name = "num-traits" version = "0.1.43" @@ -2107,12 +1901,6 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" -[[package]] -name = "opaque-debug" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" - [[package]] name = "openssl-probe" version = "0.1.5" @@ -2163,12 +1951,6 @@ dependencies = [ "windows-targets 0.48.5", ] -[[package]] -name = "paste" -version = "1.0.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" - [[package]] name = "pem" version = "3.0.3" @@ -2356,41 +2138,19 @@ dependencies = [ [[package]] name = "polling" -version = "3.3.1" +version = "3.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf63fa624ab313c11656b4cda960bfc46c410187ad493c41f6ba2d8c1e991c9e" +checksum = "e0c976a60b2d7e99d6f229e414670a9b85d13ac305cc6d1e9c134de58c5aaaf6" dependencies = [ "cfg-if", "concurrent-queue", + "hermit-abi", "pin-project-lite", - "rustix 0.38.28", + "rustix 0.38.32", "tracing", "windows-sys 0.52.0", ] -[[package]] -name = "poly1305" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8159bd90725d2df49889a078b54f4f79e87f1f8a8444194cdca81d38f5393abf" -dependencies = [ - "cpufeatures", - "opaque-debug", - "universal-hash", -] - -[[package]] -name = "polyval" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d52cff9d1d4dee5fe6d03729099f4a310a41179e0a10dbf542039873f2e826fb" -dependencies = [ - "cfg-if", - "cpufeatures", - "opaque-debug", - "universal-hash", -] - [[package]] name = "portable-atomic" version = "1.6.0" @@ -2825,9 +2585,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.28" +version = "0.38.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316" +checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89" dependencies = [ "bitflags 2.4.1", "errno", @@ -2899,6 +2659,21 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "sans-io-runtime" +version = "0.1.0" +source = "git+https://github.com/giangndm/sans-io-runtime.git?rev=29dd7661cfb10544743f795b7ce55be3886e7ca1#29dd7661cfb10544743f795b7ce55be3886e7ca1" +dependencies = [ + "derive_more", + "heapless", + "log", + "mio", + "parking_lot", + "polling 3.6.0", + "socket2 0.5.5", + "tun", +] + [[package]] name = "schannel" version = "0.1.23" @@ -3084,39 +2859,12 @@ dependencies = [ "autocfg", ] -[[package]] -name = "small-map" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f6bc8704b966e50b4fc057788d91533d3296594d4f68701da95f2c2289c808e" -dependencies = [ - "ahash", - "hashbrown 0.14.3", - "rustc-hash", -] - [[package]] name = "smallvec" version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" -[[package]] -name = "snow" -version = "0.9.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "850948bee068e713b8ab860fe1adc4d109676ab4c3b621fd8147f06b261f2f85" -dependencies = [ - "aes-gcm", - "blake2", - "chacha20poly1305", - "curve25519-dalek", - "rand_core", - "rustc_version", - "sha2", - "subtle", -] - [[package]] name = "socket2" version = "0.4.10" @@ -3159,6 +2907,12 @@ dependencies = [ "der", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "static_assertions" version = "1.1.0" @@ -3430,6 +3184,19 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tun" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0adb9992bbd5ca76f3847ed579ad4ee8defb2ec2eea918cceef17ccc66fa4fd4" +dependencies = [ + "ioctl-sys", + "libc", + "log", + "thiserror", + "wintun", +] + [[package]] name = "typenum" version = "1.17.0" @@ -3475,16 +3242,6 @@ dependencies = [ "tinyvec", ] -[[package]] -name = "universal-hash" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea" -dependencies = [ - "crypto-common", - "subtle", -] - [[package]] name = "unsigned-varint" version = "0.7.2" @@ -3682,6 +3439,25 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.51.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca229916c5ee38c2f2bc1e9d8f04df975b4bd93f9955dc69fabb5d91270045c9" +dependencies = [ + "windows-core", + "windows-targets 0.48.5", +] + +[[package]] +name = "windows-core" +version = "0.51.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -3823,6 +3599,19 @@ dependencies = [ "memchr", ] +[[package]] +name = "wintun" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29b83b0eca06dd125dbcd48a45327c708a6da8aada3d95a3f06db0ce4b17e0d4" +dependencies = [ + "c2rust-bitfields", + "libloading", + "log", + "thiserror", + "windows", +] + [[package]] name = "yamux" version = "0.13.1" diff --git a/Dockerfile b/Dockerfile index ab1b8fa..8124619 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,8 +11,8 @@ RUN case $TARGETPLATFORM in \ "linux/arm64") BUILD=aarch64-unknown-linux-gnu ;; \ *) exit 1 ;; \ esac; \ - mv /tmp/$BUILD/agent-$BUILD /agent; \ - mv /tmp/$BUILD/relayer-$BUILD /relayer; \ + mv /tmp/$BUILD/atm0s-reverse-proxy-agent-$BUILD /agent; \ + mv /tmp/$BUILD/atm0s-reverse-proxy-relayer-$BUILD /relayer; \ chmod +x /agent; \ chmod +x /relayer; diff --git a/crates/agent/node_local.sh b/crates/agent/node_local.sh index 3133b67..3845c0d 100644 --- a/crates/agent/node_local.sh +++ b/crates/agent/node_local.sh @@ -1,5 +1,5 @@ -cargo run --release -- \ +cargo run -- \ --connector-protocol tcp \ --connector-addr 127.0.0.1:13001 \ --http-dest 127.0.0.1:8080 \ - --https-dest 127.0.0.1:8443 \ No newline at end of file + --https-dest 127.0.0.1:8443 diff --git a/crates/relayer/Cargo.toml b/crates/relayer/Cargo.toml index d41b0f0..f1146ed 100644 --- a/crates/relayer/Cargo.toml +++ b/crates/relayer/Cargo.toml @@ -22,9 +22,10 @@ metrics = { version = "0.21.1" } quinn = { version = "0.10.2", default-features = false, features = ["native-certs", "tls-rustls", "log", "runtime-async-std", "futures-io", "ring"] } rustls = { version = "0.21.0", default-features = false, features = ["quic", "dangerous_configuration"] } rcgen = "0.12.0" -atm0s-sdn = { version = "0.1.10", features = ["all"]} +atm0s-sdn = { git = "https://github.com/8xFF/atm0s-sdn.git", rev = "7ee6dd7e50ae82f2c4e0d580aa6e6108b875f297" } +quinn-plaintext = "0.2.0" [features] default = ["binary"] expose-metrics = ["metrics-dashboard", "poem"] -binary = ["protocol-ed25519", "clap"] \ No newline at end of file +binary = ["protocol-ed25519", "clap"] diff --git a/crates/relayer/run_local_node1.sh b/crates/relayer/run_local_node1.sh index db46b0b..bf97106 100644 --- a/crates/relayer/run_local_node1.sh +++ b/crates/relayer/run_local_node1.sh @@ -1,7 +1,8 @@ -cargo run --release -- \ +cargo run -- \ --api-port 10001 \ --http-port 11001 \ --https-port 12001 \ --connector-port 0.0.0.0:13001 \ --root-domain local.ha.8xff.io \ - --sdn-node-id 1 \ No newline at end of file + --sdn-node-id 1 \ + --sdn-port 50001 \ No newline at end of file diff --git a/crates/relayer/run_local_node2.sh b/crates/relayer/run_local_node2.sh index a874f31..8c14058 100644 --- a/crates/relayer/run_local_node2.sh +++ b/crates/relayer/run_local_node2.sh @@ -1,8 +1,9 @@ -cargo run --release -- \ +cargo run -- \ --api-port 10002 \ --http-port 11002 \ --https-port 12002 \ --connector-port 0.0.0.0:13002 \ --root-domain local.ha.8xff.io \ --sdn-node-id 2 \ - --sdn-seeds '1@/ip4/192.168.1.39/udp/50001' \ No newline at end of file + --sdn-port 50002 \ + --sdn-seeds '1@/ip4/127.0.0.1/udp/50001' \ No newline at end of file diff --git a/crates/relayer/src/lib.rs b/crates/relayer/src/lib.rs index d778d8c..3bf9186 100644 --- a/crates/relayer/src/lib.rs +++ b/crates/relayer/src/lib.rs @@ -2,6 +2,7 @@ use metrics_dashboard::build_dashboard_route; #[cfg(feature = "expose-metrics")] use poem::{listener::TcpListener, middleware::Tracing, EndpointExt as _, Route, Server}; +use proxy_listener::cluster::AliasSdk; use std::{collections::HashMap, sync::Arc}; use async_std::sync::RwLock; @@ -34,17 +35,12 @@ pub use proxy_listener::cluster::{run_sdn, ProxyClusterListener, ProxyClusterTun pub use proxy_listener::http::{ProxyHttpListener, ProxyHttpTunnel}; pub use proxy_listener::{ProxyListener, ProxyTunnel}; -pub use atm0s_sdn::{ - virtual_socket::*, NodeAddr, NodeAliasError, NodeAliasId, NodeAliasResult, NodeAliasSdk, - NodeId, RouteRule, RpcBox, RpcEmitter, RpcError, RpcRequest, -}; - pub use tunnel::{tunnel_task, TunnelContext}; pub async fn run_agent_connection( agent_connection: AG, - agents: Arc>>>>, - node_alias_sdk: NodeAliasSdk, + agents: Arc>>>>, + node_alias_sdk: AliasSdk, agent_rpc_handler: Arc>, ) where AG: AgentConnection + 'static, @@ -62,7 +58,7 @@ pub async fn run_agent_connection( .write() .await .insert(home_id.clone(), proxy_tunnel_tx); - node_alias_sdk.register(home_id.clone()); + node_alias_sdk.register_alias(home_id.clone()).await; let agents = agents.clone(); async_std::task::spawn(async move { increment_gauge!(METRICS_AGENT_LIVE, 1.0); @@ -77,7 +73,9 @@ pub async fn run_agent_connection( } } agents.write().await.remove(&home_id); - node_alias_sdk.unregister(home_id_from_domain(&domain)); + node_alias_sdk + .unregister_alias(home_id_from_domain(&domain)) + .await; log::info!("agent_worker exit for domain: {}", domain); decrement_gauge!(METRICS_AGENT_LIVE, 1.0); }); diff --git a/crates/relayer/src/main.rs b/crates/relayer/src/main.rs index b70559a..b79a02a 100644 --- a/crates/relayer/src/main.rs +++ b/crates/relayer/src/main.rs @@ -55,6 +55,10 @@ struct Args { /// atm0s-sdn seed address #[arg(env, long)] sdn_seeds: Vec, + + /// atm0s-sdn workers + #[arg(env, long, default_value_t = 1)] + sdn_workers: usize, } #[async_std::main] @@ -100,11 +104,12 @@ async fn main() { .await; }); - let (mut cluster_endpoint, node_alias_sdk, virtual_net) = run_sdn( + let (mut cluster_endpoint, alias_sdk, mut virtual_net) = run_sdn( args.sdn_node_id, args.sdn_port, args.sdn_secret_key, args.sdn_seeds, + args.sdn_workers, ) .await; @@ -115,7 +120,7 @@ async fn main() { select! { e = quic_agent_listener.recv().fuse() => match e { Ok(agent_connection) => { - run_agent_connection(agent_connection, agents.clone(), node_alias_sdk.clone(), agent_rpc_handler_quic.clone()).await; + run_agent_connection(agent_connection, agents.clone(), alias_sdk.clone(), agent_rpc_handler_quic.clone()).await; } Err(e) => { log::error!("agent_listener error {}", e); @@ -124,7 +129,7 @@ async fn main() { }, e = tcp_agent_listener.recv().fuse() => match e { Ok(agent_connection) => { - run_agent_connection(agent_connection, agents.clone(), node_alias_sdk.clone(), agent_rpc_handler_tcp.clone()).await; + run_agent_connection(agent_connection, agents.clone(), alias_sdk.clone(), agent_rpc_handler_tcp.clone()).await; } Err(e) => { log::error!("agent_listener error {}", e); @@ -133,7 +138,8 @@ async fn main() { }, e = proxy_http_listener.recv().fuse() => match e { Some(proxy_tunnel) => { - async_std::task::spawn(tunnel_task(proxy_tunnel, agents.clone(), TunnelContext::Local(node_alias_sdk.clone(), virtual_net.clone()))); + let socket = virtual_net.udp_socket(0).await; + async_std::task::spawn(tunnel_task(proxy_tunnel, agents.clone(), TunnelContext::Local(alias_sdk.clone(), socket))); } None => { log::error!("proxy_http_listener.recv()"); @@ -142,7 +148,8 @@ async fn main() { }, e = proxy_tls_listener.recv().fuse() => match e { Some(proxy_tunnel) => { - async_std::task::spawn(tunnel_task(proxy_tunnel, agents.clone(), TunnelContext::Local(node_alias_sdk.clone(), virtual_net.clone()))); + let socket = virtual_net.udp_socket(0).await; + async_std::task::spawn(tunnel_task(proxy_tunnel, agents.clone(), TunnelContext::Local(alias_sdk.clone(), socket))); } None => { log::error!("proxy_http_listener.recv()"); @@ -157,6 +164,13 @@ async fn main() { log::error!("cluster_endpoint.accept()"); exit(3); } + }, + e = virtual_net.recv().fuse() => match e { + Some(()) => {} + None => { + log::error!("virtual_net.recv()"); + exit(4); + } } } } diff --git a/crates/relayer/src/proxy_listener/cluster.rs b/crates/relayer/src/proxy_listener/cluster.rs index ef1f1de..32a4d7b 100644 --- a/crates/relayer/src/proxy_listener/cluster.rs +++ b/crates/relayer/src/proxy_listener/cluster.rs @@ -1,105 +1,163 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; -use atm0s_sdn::virtual_socket::{create_vnet, make_insecure_quinn_server, VirtualNet}; use atm0s_sdn::{ - convert_enum, KeyValueBehavior, KeyValueBehaviorEvent, KeyValueHandlerEvent, KeyValueSdk, - KeyValueSdkEvent, LayersSpreadRouterSyncBehavior, LayersSpreadRouterSyncBehaviorEvent, - LayersSpreadRouterSyncHandlerEvent, ManualBehavior, ManualBehaviorConf, ManualBehaviorEvent, - ManualHandlerEvent, NetworkPlane, NetworkPlaneConfig, NodeAddrBuilder, NodeAliasBehavior, - NodeAliasSdk, PubsubServiceBehaviour, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent, - SharedRouter, SystemTimer, UdpTransport, + builder::SdnBuilder, + features::{ + alias::{self, FoundLocation}, + socket, FeaturesControl, FeaturesEvent, + }, + sans_io_runtime::{backend::PollingBackend, Owner}, + services::visualization, + tasks::{SdnExtIn, SdnExtOut}, + NodeAddr, NodeId, ServiceBroadcastLevel, }; -use atm0s_sdn::{NodeAddr, NodeId}; use futures::{AsyncRead, AsyncWrite}; use protocol::cluster::{ClusterTunnelRequest, ClusterTunnelResponse}; use quinn::{Connecting, Endpoint}; +use self::{ + alias_async::AliasAsyncEvent, + quinn_utils::make_insecure_quinn_server, + vnet::{NetworkPkt, OutEvent}, +}; + use super::{ProxyListener, ProxyTunnel}; -#[derive(convert_enum::From, convert_enum::TryInto)] -enum NodeBehaviorEvent { - Manual(ManualBehaviorEvent), - LayersSpreadRouterSync(LayersSpreadRouterSyncBehaviorEvent), - KeyValue(KeyValueBehaviorEvent), - Pubsub(PubsubServiceBehaviourEvent), -} +mod alias_async; +mod quinn_utils; +mod service; +mod vnet; +mod vsocket; -#[derive(convert_enum::From, convert_enum::TryInto)] -enum NodeHandleEvent { - Manual(ManualHandlerEvent), - LayersSpreadRouterSync(LayersSpreadRouterSyncHandlerEvent), - KeyValue(KeyValueHandlerEvent), - Pubsub(PubsubServiceHandlerEvent), -} +pub use alias_async::AliasSdk; +pub use quinn_utils::make_insecure_quinn_client; +pub use vnet::VirtualNetwork; +pub use vsocket::VirtualUdpSocket; -#[derive(convert_enum::From, convert_enum::TryInto)] -enum NodeSdkEvent { - KeyValue(KeyValueSdkEvent), -} +type SC = visualization::Control; +type SE = visualization::Event; +type TC = (); +type TW = (); pub async fn run_sdn( node_id: NodeId, sdn_port: u16, secret_key: String, seeds: Vec, -) -> (ProxyClusterListener, NodeAliasSdk, VirtualNet) { - let secure = Arc::new(atm0s_sdn::StaticKeySecure::new(&secret_key)); - let mut node_addr_builder = NodeAddrBuilder::new(node_id); - let udp_socket = UdpTransport::prepare(sdn_port, &mut node_addr_builder).await; - let transport = UdpTransport::new(node_addr_builder.addr(), udp_socket, secure.clone()); - - let node_addr = node_addr_builder.addr(); - log::info!("atm0s-sdn listen on addr {}", node_addr); - - let timer = Arc::new(SystemTimer()); - let router = SharedRouter::new(node_id); - - let manual = ManualBehavior::new(ManualBehaviorConf { - node_id, - node_addr, - seeds, - local_tags: vec!["relayer".to_string()], - connect_tags: vec!["relayer".to_string()], - }); + workers: usize, +) -> (ProxyClusterListener, AliasSdk, VirtualNetwork) { + let (mut vnet, tx, rx) = vnet::VirtualNetwork::new(node_id); + let (mut alias_async, alias_sdk) = alias_async::AliasAsync::new(); - let spreads_layer_router = LayersSpreadRouterSyncBehavior::new(router.clone()); - let router = Arc::new(router); - - let kv_sdk = KeyValueSdk::new(); - let kv_behaviour = KeyValueBehavior::new(node_id, 1000, Some(Box::new(kv_sdk))); - let (pubsub_behavior, pubsub_sdk) = PubsubServiceBehaviour::new(node_id, timer.clone()); - let (node_alias_behavior, node_alias_sdk) = NodeAliasBehavior::new(node_id, pubsub_sdk); - let (virtual_socket, virtual_socket_sdk) = create_vnet(node_id, router.clone()); - - let plan_cfg = NetworkPlaneConfig { - router, - node_id, - tick_ms: 1000, - behaviors: vec![ - Box::new(manual), - Box::new(spreads_layer_router), - Box::new(kv_behaviour), - Box::new(virtual_socket), - Box::new(pubsub_behavior), - Box::new(node_alias_behavior), - ], - transport: Box::new(transport), - timer, - }; - - let mut plane = NetworkPlane::::new(plan_cfg); + let server_socket = vnet.udp_socket(443).await; + + let mut builder = SdnBuilder::::new(node_id, sdn_port, vec![]); + + builder.set_manual_discovery(vec!["tunnel".to_string()], vec!["tunnel".to_string()]); + builder.set_visualization_collector(false); + builder.add_service(Arc::new(service::RelayServiceBuilder::default())); + + for seed in seeds { + builder.add_seed(seed); + } async_std::task::spawn(async move { - plane.started(); - while let Ok(_) = plane.recv().await {} - plane.stopped(); + let mut controller = builder.build::>(workers); + while controller.process().is_some() { + while let Ok(c) = rx.try_recv() { + // log::info!("Command: {:?}", c); + match c { + OutEvent::Bind(port) => { + log::info!("Bind port: {}", port); + controller.send_to( + Owner::worker(0), + SdnExtIn::FeaturesControl(FeaturesControl::Socket( + socket::Control::Bind(port), + )), + ); + } + OutEvent::Pkt(pkt) => { + let send = socket::Control::SendTo( + pkt.local_port, + pkt.remote, + pkt.remote_port, + pkt.data, + pkt.meta, + ); + controller.send_to( + Owner::worker(0), + SdnExtIn::FeaturesControl(FeaturesControl::Socket(send)), + ); + } + OutEvent::Unbind(port) => { + log::info!("Unbind port: {}", port); + controller.send_to( + Owner::worker(0), + SdnExtIn::FeaturesControl(FeaturesControl::Socket( + socket::Control::Unbind(port), + )), + ); + } + } + } + while let Some(event) = alias_async.pop_request() { + let control = match event { + AliasAsyncEvent::Query(alias) => alias::Control::Query { + alias, + service: service::SERVICE_ID, + level: ServiceBroadcastLevel::Global, + }, + AliasAsyncEvent::Register(alias) => alias::Control::Register { + alias, + service: service::SERVICE_ID, + level: ServiceBroadcastLevel::Global, + }, + AliasAsyncEvent::Unregister(alias) => alias::Control::Unregister { alias }, + }; + controller.send_to( + Owner::worker(0), + SdnExtIn::FeaturesControl(FeaturesControl::Alias(control)), + ); + } + while let Some(event) = controller.pop_event() { + // log::info!("Event: {:?}", event); + match event { + SdnExtOut::FeaturesEvent(FeaturesEvent::Socket(socket::Event::RecvFrom( + local_port, + remote, + remote_port, + data, + meta, + ))) => { + if let Err(e) = tx.try_send(NetworkPkt { + local_port, + remote, + remote_port, + data, + meta, + }) { + log::error!("Failed to send to tx: {:?}", e); + } + } + SdnExtOut::FeaturesEvent(FeaturesEvent::Alias(alias::Event::QueryResult( + alias, + res, + ))) => { + let res = res.map(|a| match a { + FoundLocation::Local => node_id, + FoundLocation::RemoteHint(node) => node, + FoundLocation::RemoteScan(node) => node, + }); + alias_async.push_response(alias, res); + } + _ => {} + } + } + async_std::task::sleep(Duration::from_millis(1)).await; + } }); - ( - ProxyClusterListener::new(&virtual_socket_sdk), - node_alias_sdk, - virtual_socket_sdk, - ) + (ProxyClusterListener::new(server_socket), alias_sdk, vnet) } pub struct ProxyClusterListener { @@ -107,10 +165,8 @@ pub struct ProxyClusterListener { } impl ProxyClusterListener { - pub fn new(sdk: &VirtualNet) -> Self { - let server = - make_insecure_quinn_server(sdk.create_udp_socket(80, 1000).expect("")).expect(""); - + pub fn new(socket: VirtualUdpSocket) -> Self { + let server = make_insecure_quinn_server(socket).expect(""); Self { server } } } @@ -118,9 +174,11 @@ impl ProxyClusterListener { #[async_trait::async_trait] impl ProxyListener for ProxyClusterListener { async fn recv(&mut self) -> Option> { + let connecting = self.server.accept().await?; + log::info!("incoming connection from {}", connecting.remote_address()); Some(Box::new(ProxyClusterTunnel { domain: "".to_string(), - connecting: Some(self.server.accept().await?), + connecting: Some(connecting), streams: None, })) } @@ -138,8 +196,11 @@ pub struct ProxyClusterTunnel { #[async_trait::async_trait] impl ProxyTunnel for ProxyClusterTunnel { async fn wait(&mut self) -> Option<()> { - let connection = self.connecting.take()?.await.ok()?; + let connecting = self.connecting.take()?; + let connection = connecting.await.ok()?; + log::info!("incoming connection from: {}", connection.remote_address()); let (mut send, mut recv) = connection.accept_bi().await.ok()?; + log::info!("accepted bi stream from: {}", connection.remote_address()); let mut req_buf = [0; 1500]; let req_size = recv.read(&mut req_buf).await.ok()??; let req = ClusterTunnelRequest::try_from(&req_buf[..req_size]).ok()?; diff --git a/crates/relayer/src/proxy_listener/cluster/alias_async.rs b/crates/relayer/src/proxy_listener/cluster/alias_async.rs new file mode 100644 index 0000000..f6294fe --- /dev/null +++ b/crates/relayer/src/proxy_listener/cluster/alias_async.rs @@ -0,0 +1,93 @@ +use std::collections::HashMap; + +use async_std::channel::{Receiver, Sender}; +use atm0s_sdn::NodeId; + +enum AliasSdkEvent { + Register(u64), + Unregister(u64), + Query(u64, Sender>), +} + +#[derive(Clone)] +pub struct AliasSdk { + requester: Sender, +} + +impl AliasSdk { + pub async fn find_alias(&self, alias: u64) -> Option { + log::info!("Querying alias: {}", alias); + let (tx, rx) = async_std::channel::bounded(1); + self.requester + .send(AliasSdkEvent::Query(alias, tx)) + .await + .ok()?; + rx.recv().await.ok().flatten() + } + + pub async fn register_alias(&self, alias: u64) { + if let Err(e) = self.requester.send(AliasSdkEvent::Register(alias)).await { + log::error!("Failed to register alias: {:?}", e); + } + } + + pub async fn unregister_alias(&self, alias: u64) { + if let Err(e) = self.requester.send(AliasSdkEvent::Unregister(alias)).await { + log::error!("Failed to unregister alias: {:?}", e); + } + } +} + +pub enum AliasAsyncEvent { + Register(u64), + Unregister(u64), + Query(u64), +} + +pub struct AliasAsync { + rx: Receiver, + waits: HashMap>>>, +} + +impl AliasAsync { + pub fn new() -> (Self, AliasSdk) { + let (tx, rx) = async_std::channel::bounded(100); + ( + Self { + rx, + waits: HashMap::new(), + }, + AliasSdk { requester: tx }, + ) + } + + pub fn push_response(&mut self, alias: u64, node_id: Option) { + if let Some(tx) = self.waits.remove(&alias) { + log::info!("Find alias {} response: {:?}", alias, node_id); + for tx in tx { + tx.try_send(node_id).ok(); + } + } + } + + pub fn pop_request(&mut self) -> Option { + loop { + match self.rx.try_recv().ok()? { + AliasSdkEvent::Query(alias, tx) => { + if let Some(req) = self.waits.get_mut(&alias) { + req.push(tx); + } else { + self.waits.insert(alias, vec![tx]); + return Some(AliasAsyncEvent::Query(alias)); + } + } + AliasSdkEvent::Register(alias) => { + return Some(AliasAsyncEvent::Register(alias)); + } + AliasSdkEvent::Unregister(alias) => { + return Some(AliasAsyncEvent::Unregister(alias)); + } + } + } + } +} diff --git a/crates/relayer/src/proxy_listener/cluster/quinn_utils.rs b/crates/relayer/src/proxy_listener/cluster/quinn_utils.rs new file mode 100644 index 0000000..38db4a5 --- /dev/null +++ b/crates/relayer/src/proxy_listener/cluster/quinn_utils.rs @@ -0,0 +1,31 @@ +use std::sync::Arc; + +use quinn::{AsyncStdRuntime, Endpoint, EndpointConfig}; + +use super::vsocket::VirtualUdpSocket; + +pub fn make_insecure_quinn_server(socket: VirtualUdpSocket) -> Result { + let runtime = Arc::new(AsyncStdRuntime); + let mut config = EndpointConfig::default(); + config + .max_udp_payload_size(1500) + .expect("Should config quinn server max_size to 1500"); + Endpoint::new_with_abstract_socket( + config, + Some(quinn_plaintext::server_config()), + socket, + runtime, + ) +} + +pub fn make_insecure_quinn_client(socket: VirtualUdpSocket) -> Result { + let runtime = Arc::new(AsyncStdRuntime); + let mut config = EndpointConfig::default(); + //Note that client mtu size shoud be smaller than server's + config + .max_udp_payload_size(1400) + .expect("Should config quinn client max_size to 1400"); + let mut endpoint = Endpoint::new_with_abstract_socket(config, None, socket, runtime)?; + endpoint.set_default_client_config(quinn_plaintext::client_config()); + Ok(endpoint) +} diff --git a/crates/relayer/src/proxy_listener/cluster/service.rs b/crates/relayer/src/proxy_listener/cluster/service.rs new file mode 100644 index 0000000..1ce73cd --- /dev/null +++ b/crates/relayer/src/proxy_listener/cluster/service.rs @@ -0,0 +1,75 @@ +use atm0s_sdn::{ + base::{ + Service, ServiceBuilder, ServiceCtx, ServiceInput, ServiceOutput, ServiceSharedInput, + ServiceWorker, + }, + features::{FeaturesControl, FeaturesEvent}, + services::visualization, +}; + +pub const SERVICE_ID: u8 = 100; +pub const SERVICE_NAME: &str = "relay"; + +type SC = visualization::Control; +type SE = visualization::Event; +type TC = (); +type TW = (); + +struct RelayService; + +impl Service for RelayService { + fn service_id(&self) -> u8 { + SERVICE_ID + } + + fn service_name(&self) -> &str { + SERVICE_NAME + } + + fn on_input( + &mut self, + _ctx: &ServiceCtx, + _now: u64, + _input: ServiceInput, + ) { + } + + fn on_shared_input<'a>(&mut self, _ctx: &ServiceCtx, _now: u64, _input: ServiceSharedInput) {} + + fn pop_output(&mut self, _ctx: &ServiceCtx) -> Option> { + None + } +} + +struct RelayServiceWorker; + +impl ServiceWorker for RelayServiceWorker { + fn service_id(&self) -> u8 { + SERVICE_ID + } + + fn service_name(&self) -> &str { + SERVICE_NAME + } +} + +#[derive(Default)] +pub struct RelayServiceBuilder; + +impl ServiceBuilder for RelayServiceBuilder { + fn service_id(&self) -> u8 { + SERVICE_ID + } + + fn service_name(&self) -> &str { + SERVICE_NAME + } + + fn create(&self) -> Box> { + Box::new(RelayService) + } + + fn create_worker(&self) -> Box> { + Box::new(RelayServiceWorker) + } +} diff --git a/crates/relayer/src/proxy_listener/cluster/vnet.rs b/crates/relayer/src/proxy_listener/cluster/vnet.rs new file mode 100644 index 0000000..a89f33a --- /dev/null +++ b/crates/relayer/src/proxy_listener/cluster/vnet.rs @@ -0,0 +1,106 @@ +use std::collections::{HashMap, VecDeque}; + +use async_std::channel::{Receiver, Sender}; +use atm0s_sdn::NodeId; +use futures::{select, FutureExt}; + +use super::vsocket::VirtualUdpSocket; + +#[derive(Debug)] +pub enum OutEvent { + Bind(u16), + Pkt(NetworkPkt), + Unbind(u16), +} + +#[derive(Debug)] +pub struct NetworkPkt { + pub local_port: u16, + pub remote: NodeId, + pub remote_port: u16, + pub data: Vec, + pub meta: u8, +} + +pub struct VirtualNetwork { + node_id: NodeId, + in_rx: Receiver, + out_tx: Sender, + close_socket_tx: Sender, + close_socket_rx: Receiver, + sockets: HashMap>, + ports: VecDeque, +} + +impl VirtualNetwork { + pub fn new(node_id: NodeId) -> (Self, Sender, Receiver) { + let (in_tx, in_rx) = async_std::channel::bounded(1000); + let (out_tx, out_rx) = async_std::channel::bounded(1000); + let (close_socket_tx, close_socket_rx) = async_std::channel::unbounded(); + + ( + Self { + node_id, + in_rx, + out_tx, + close_socket_rx, + close_socket_tx, + sockets: HashMap::new(), + ports: (0..60000).collect(), + }, + in_tx, + out_rx, + ) + } + + pub async fn udp_socket(&mut self, port: u16) -> VirtualUdpSocket { + //remove port from ports + let port = if port > 0 { + let index = self + .ports + .iter() + .position(|&x| x == port) + .expect("Should have port"); + self.ports.swap_remove_back(index); + port + } else { + self.ports.pop_front().expect("Should have port") + }; + self.out_tx + .send(OutEvent::Bind(port)) + .await + .expect("Should send bind"); + let (tx, rx) = async_std::channel::bounded(1000); + self.sockets.insert(port, tx); + VirtualUdpSocket::new( + self.node_id, + port, + self.out_tx.clone(), + rx, + self.close_socket_tx.clone(), + ) + } + + pub async fn recv(&mut self) -> Option<()> { + select! { + port = self.close_socket_rx.recv().fuse() => { + let port = port.expect("Should have port"); + self.ports.push_back(port); + self.out_tx.send(OutEvent::Unbind(port)).await.expect("Should send unbind"); + Some(()) + } + pkt = self.in_rx.recv().fuse() => { + let pkt = pkt.ok()?; + let src = pkt.local_port; + if let Some(socket_tx) = self.sockets.get(&src) { + if let Err(e) = socket_tx.try_send(pkt) { + log::error!("Send to socket {} error {:?}", src, e); + } + } else { + log::warn!("No socket for port {}", src); + } + Some(()) + } + } + } +} diff --git a/crates/relayer/src/proxy_listener/cluster/vsocket.rs b/crates/relayer/src/proxy_listener/cluster/vsocket.rs new file mode 100644 index 0000000..0ae164f --- /dev/null +++ b/crates/relayer/src/proxy_listener/cluster/vsocket.rs @@ -0,0 +1,137 @@ +use std::{ + fmt::Debug, + io::IoSliceMut, + net::{SocketAddr, SocketAddrV4}, + ops::DerefMut, + sync::Mutex, + task::{Context, Poll}, +}; + +use async_std::channel::{Receiver, Sender}; +use futures::StreamExt; +use quinn::{ + udp::{EcnCodepoint, RecvMeta, Transmit, UdpState}, + AsyncUdpSocket, +}; + +use super::vnet::{NetworkPkt, OutEvent}; + +pub struct VirtualUdpSocket { + port: u16, + addr: SocketAddr, + rx: Mutex>, + tx: Sender, + close_socket_tx: Sender, +} + +impl VirtualUdpSocket { + pub fn new( + node_id: u32, + port: u16, + tx: Sender, + rx: Receiver, + close_socket_tx: Sender, + ) -> Self { + Self { + port, + addr: SocketAddr::V4(SocketAddrV4::new(node_id.into(), port)), + rx: Mutex::new(rx), + tx, + close_socket_tx, + } + } +} + +impl Debug for VirtualUdpSocket { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("VirtualUdpSocket").finish() + } +} + +impl AsyncUdpSocket for VirtualUdpSocket { + fn poll_send( + &self, + _state: &UdpState, + _cx: &mut Context, + transmits: &[Transmit], + ) -> Poll> { + let mut sent = 0; + for transmit in transmits { + match transmit.destination { + SocketAddr::V4(addr) => { + let pkt = NetworkPkt { + local_port: self.port, + remote: u32::from_be_bytes(addr.ip().octets()), + remote_port: addr.port(), + data: transmit.contents.to_vec(), + meta: transmit.ecn.map(|x| x as u8).unwrap_or(0), + }; + log::debug!("{} sending {} bytes to {}", self.addr, pkt.data.len(), addr); + if self.tx.try_send(OutEvent::Pkt(pkt)).is_ok() { + sent += 1; + } + } + _ => { + sent += 1; + } + } + } + std::task::Poll::Ready(Ok(sent)) + } + + fn poll_recv( + &self, + cx: &mut Context, + bufs: &mut [IoSliceMut<'_>], + meta: &mut [RecvMeta], + ) -> Poll> { + let mut rx = self.rx.lock().expect("Should lock mutex"); + match rx.poll_next_unpin(cx) { + std::task::Poll::Pending => std::task::Poll::Pending, + std::task::Poll::Ready(Some(pkt)) => { + let len = pkt.data.len(); + if len <= bufs[0].len() { + let addr = + SocketAddr::V4(SocketAddrV4::new(pkt.remote.into(), pkt.remote_port)); + log::debug!("{} received {} bytes from {}", self.addr, len, addr); + bufs[0].deref_mut()[0..len].copy_from_slice(&pkt.data); + meta[0] = quinn::udp::RecvMeta { + addr, + len, + stride: len, + ecn: if pkt.meta == 0 { + None + } else { + EcnCodepoint::from_bits(pkt.meta) + }, + dst_ip: None, + }; + std::task::Poll::Ready(Ok(1)) + } else { + log::warn!( + "Buffer too small for packet {} vs {}, dropping", + len, + bufs[0].len() + ); + std::task::Poll::Pending + } + } + std::task::Poll::Ready(None) => std::task::Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::ConnectionAborted, + "Socket closed", + ))), + } + } + + fn local_addr(&self) -> std::io::Result { + Ok(self.addr) + } +} + +impl Drop for VirtualUdpSocket { + fn drop(&mut self) { + if let Err(e) = self.close_socket_tx.try_send(self.port) { + log::error!("Failed to send close socket: {:?}", e); + } + } +} diff --git a/crates/relayer/src/tunnel.rs b/crates/relayer/src/tunnel.rs index 7c67313..e32805d 100644 --- a/crates/relayer/src/tunnel.rs +++ b/crates/relayer/src/tunnel.rs @@ -1,26 +1,32 @@ -use std::{collections::HashMap, error::Error, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + error::Error, + net::{SocketAddr, SocketAddrV4}, + sync::Arc, + time::Duration, +}; use crate::{ - proxy_listener::ProxyTunnel, utils::home_id_from_domain, METRICS_CLUSTER_COUNT, - METRICS_CLUSTER_LIVE, METRICS_PROXY_COUNT, + proxy_listener::{ + cluster::{make_insecure_quinn_client, AliasSdk, VirtualUdpSocket}, + ProxyTunnel, + }, + utils::home_id_from_domain, + METRICS_CLUSTER_COUNT, METRICS_CLUSTER_LIVE, METRICS_PROXY_COUNT, }; use async_std::{prelude::FutureExt, sync::RwLock}; -use atm0s_sdn::{ - virtual_socket::{make_insecure_quinn_client, vnet_addr, VirtualNet}, - NodeAliasId, NodeAliasResult, NodeAliasSdk, -}; use futures::{select, FutureExt as _}; use metrics::{decrement_gauge, increment_counter, increment_gauge}; use protocol::cluster::{ClusterTunnelRequest, ClusterTunnelResponse}; pub enum TunnelContext { Cluster, - Local(NodeAliasSdk, VirtualNet), + Local(AliasSdk, VirtualUdpSocket), } pub async fn tunnel_task( mut proxy_tunnel: Box, - agents: Arc>>>>, + agents: Arc>>>>, context: TunnelContext, ) { match proxy_tunnel.wait().timeout(Duration::from_secs(5)).await { @@ -51,42 +57,29 @@ pub async fn tunnel_task( async fn tunnel_over_cluster( domain: String, mut proxy_tunnel: Box, - node_alias_sdk: NodeAliasSdk, - virtual_net: VirtualNet, + node_alias_sdk: AliasSdk, + socket: VirtualUdpSocket, ) -> Result<(), Box> { log::warn!( "agent not found for domain: {} in local => finding in cluster", domain ); let node_alias_id = home_id_from_domain(&domain); - let (tx, rx) = async_std::channel::bounded(1); - node_alias_sdk.find_alias( - node_alias_id, - Box::new(move |res| { - tx.try_send(res).expect("Should send result"); - }), - ); - - let dest = match rx.recv().await? { - Ok(NodeAliasResult::FromLocal) => { - log::warn!( - "something wrong, found alias {} in local but mapper not found", - domain - ); - return Err("INTERNAL_ERROR".into()); - } - Ok(NodeAliasResult::FromHint(dest)) => dest, - Ok(NodeAliasResult::FromScan(dest)) => dest, - Err(_e) => return Err("NODE_ALIAS_NOT_FOUND".into()), - }; - - let socket = virtual_net - .create_udp_socket(0, 20) - .map_err(|e| format!("{:?}", e))?; + let dest = node_alias_sdk + .find_alias(node_alias_id) + .await + .ok_or("NODE_ALIAS_NOT_FOUND".to_string())?; + log::info!("found agent for domain: {domain} in node {dest}"); let client = make_insecure_quinn_client(socket)?; - let connecting = client.connect(vnet_addr(dest, 80), "localhost")?; + log::info!("connecting to agent for domain: {domain} in node {dest}"); + let connecting = client.connect( + SocketAddr::V4(SocketAddrV4::new(dest.into(), 443)), + "localhost", + )?; let connection = connecting.await?; + log::info!("connected to agent for domain: {domain} in node {dest}"); let (mut send, mut recv) = connection.open_bi().await?; + log::info!("opened bi stream to agent for domain: {domain} in node {dest}"); let req_buf: Vec = (&ClusterTunnelRequest { domain: domain.clone(), diff --git a/crates/relayer/src/utils.rs b/crates/relayer/src/utils.rs index 7a8432b..9578996 100644 --- a/crates/relayer/src/utils.rs +++ b/crates/relayer/src/utils.rs @@ -3,10 +3,8 @@ use std::{ hash::{Hash, Hasher}, }; -use atm0s_sdn::NodeAliasId; - /// get home id from domain by get first subdomain -pub fn home_id_from_domain(domain: &str) -> NodeAliasId { +pub fn home_id_from_domain(domain: &str) -> u64 { let mut parts = domain.split('.'); let mut hasher = DefaultHasher::default(); parts.next().unwrap_or(domain).hash(&mut hasher);