From 3f6966e6659e8d3498589355eebad0f1d879c909 Mon Sep 17 00:00:00 2001 From: Simon Liang Date: Tue, 20 Oct 2020 18:16:36 +0800 Subject: [PATCH 1/5] txnlog feature basically ready --- Cargo.lock | 316 +++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + Dockerfile | 1 + docker-compose.yml | 66 ++++++++++ src/channels.rs | 10 +- src/connections.rs | 72 ++++++++++- src/main.rs | 14 +- src/server.rs | 8 +- 8 files changed, 477 insertions(+), 11 deletions(-) create mode 100644 docker-compose.yml diff --git a/Cargo.lock b/Cargo.lock index 76b3948..54beb2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -21,6 +21,100 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034" +[[package]] +name = "async-channel" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59740d83946db6a5af71ae25ddf9562c2b176b2ca42cf99a455f09f4a220d6b9" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + +[[package]] +name = "async-executor" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d373d78ded7d0b3fa8039375718cde0aace493f2e34fb60f51cbf567562ca801" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "once_cell", + "vec-arena", +] + +[[package]] +name = "async-global-executor" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "124ac8c265e407641c3362b8f4d39cdb4e243885b71eef087be27199790f5a3a" +dependencies = [ + "async-executor", + "async-io", + "futures-lite", + "num_cpus", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d54bc4c1c7292475efb2253227dbcfad8fe1ca4c02bc62c510cc2f3da5c4704e" +dependencies = [ + "concurrent-queue", + "fastrand", + "futures-lite", + "libc", + "log", + "nb-connect", + "once_cell", + "parking", + "polling", + "vec-arena", + "waker-fn", + "winapi 0.3.9", +] + +[[package]] +name = "async-mutex" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-std" +version = "1.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9fa76751505e8df1c7a77762f60486f60c71bbd9b8557f4da6ad47d083732ed" +dependencies = [ + "async-global-executor", + "async-io", + "async-mutex", + "blocking", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "num_cpus", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + [[package]] name = "async-stream" version = "0.2.1" @@ -42,6 +136,12 @@ dependencies = [ "syn", ] +[[package]] +name = "async-task" +version = "4.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" + [[package]] name = "async-trait" version = "0.1.41" @@ -53,6 +153,12 @@ dependencies = [ "syn", ] +[[package]] +name = "atomic-waker" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" + [[package]] name = "atty" version = "0.2.14" @@ -91,6 +197,20 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5e170dbede1f740736619b776d7251cb1b9095c435c34d8ca9f57fcd2f335e9" +dependencies = [ + "async-channel", + "async-task", + "atomic-waker", + "fastrand", + "futures-lite", + "once_cell", +] + [[package]] name = "bumpalo" version = "3.4.0" @@ -109,6 +229,12 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" +[[package]] +name = "cache-padded" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" + [[package]] name = "cc" version = "1.0.61" @@ -121,6 +247,28 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +[[package]] +name = "combine" +version = "4.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2809f67365382d65fd2b6d9c22577231b954ed27400efeafbe687bda75abcc0b" +dependencies = [ + "bytes", + "futures-util", + "memchr", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "concurrent-queue" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" +dependencies = [ + "cache-padded", +] + [[package]] name = "core-foundation" version = "0.7.0" @@ -143,6 +291,17 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634" +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if", + "lazy_static", +] + [[package]] name = "dgraph-tonic" version = "0.8.1" @@ -181,6 +340,7 @@ dependencies = [ "hyper", "log", "querystring", + "redis", "ring", "serde", "serde_derive", @@ -206,6 +366,12 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" +[[package]] +name = "dtoa" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "134951f4028bdadb9b84baf4232681efbf277da25144b9b0ad65df75946c422b" + [[package]] name = "either" version = "1.6.1" @@ -225,6 +391,21 @@ dependencies = [ "termcolor", ] +[[package]] +name = "event-listener" +version = "2.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" + +[[package]] +name = "fastrand" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca5faf057445ce5c9d4329e382b2ce7ca38550ef3b73a5348362d5f24e0c7fe3" +dependencies = [ + "instant", +] + [[package]] name = "fixedbitset" version = "0.2.0" @@ -316,6 +497,21 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fc94b64bb39543b4e432f1790b6bf18e3ee3b74653c5449f63310e9a74b123c" +[[package]] +name = "futures-lite" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "381a7ad57b1bad34693f63f6f377e1abded7a9c85c9d3eb6771e11c60aaadab9" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.6" @@ -384,6 +580,19 @@ dependencies = [ "wasi", ] +[[package]] +name = "gloo-timers" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47204a46aaff920a1ea58b11d03dec6f704287d27561724a4631e450654a891f" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "h2" version = "0.2.6" @@ -523,6 +732,15 @@ dependencies = [ "bytes", ] +[[package]] +name = "instant" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63312a18f7ea8760cdd0a7c5aac1a619752a246b833545e3e36d1f81f7cd9e66" +dependencies = [ + "cfg-if", +] + [[package]] name = "iovec" version = "0.1.4" @@ -566,6 +784,15 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -687,6 +914,16 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nb-connect" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8123a81538e457d44b933a02faf885d3fe8408806b23fa700e8f01c6c3a98998" +dependencies = [ + "libc", + "winapi 0.3.9", +] + [[package]] name = "net2" version = "0.2.35" @@ -753,6 +990,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + [[package]] name = "percent-encoding" version = "2.1.0" @@ -807,6 +1050,19 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" +[[package]] +name = "polling" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2a7bc6b2a29e632e45451c941832803a18cce6781db04de8a04696cdca8bde4" +dependencies = [ + "cfg-if", + "libc", + "log", + "wepoll-sys", + "winapi 0.3.9", +] + [[package]] name = "ppv-lite86" version = "0.2.9" @@ -957,6 +1213,27 @@ dependencies = [ "rand_core", ] +[[package]] +name = "redis" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95357caf2640abc54651b93c98a8df4fe1ccbf44b8e601ccdf43d5c1451f29ac" +dependencies = [ + "async-std", + "async-trait", + "bytes", + "combine", + "dtoa", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "sha1", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.1.57" @@ -1111,6 +1388,12 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "sha1" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" + [[package]] name = "signal-hook-registry" version = "1.2.1" @@ -1643,12 +1926,24 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c" +[[package]] +name = "vec-arena" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eafc1b9b2dfc6f5529177b62cf806484db55b32dc7c9658a118e11bbeb33061d" + [[package]] name = "version_check" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "want" version = "0.3.0" @@ -1690,6 +1985,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7866cab0aa01de1edf8b5d7936938a7e397ee50ce24119aef3e1eaa3b6171da" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.68" @@ -1739,6 +2046,15 @@ dependencies = [ "untrusted", ] +[[package]] +name = "wepoll-sys" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "142bc2cba3fe88be1a8fcb55c727fa4cd5b0cf2d7438722792e22f26f04bc1e0" +dependencies = [ + "cc", +] + [[package]] name = "which" version = "3.1.1" diff --git a/Cargo.toml b/Cargo.toml index ebe04f3..f6475cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,3 +25,4 @@ serde_derive = "1.0" serde_json = "1.0" base64 = "0.12" querystring = "1.1" +redis = { version = "0.17", features = ["tokio-rt-core", "streams"] } diff --git a/Dockerfile b/Dockerfile index 7c7a05a..d07b7fe 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,6 +22,7 @@ FROM debian:buster-slim ENV RUST_LOG=info ENV DGRAPH_ALPHAS=http://localhost:9080 +ENV REDIS_URL=redis://localhost:6379/0 ENV LISTEN_ADDRESS=0.0.0.0:9000 ENV CONNECTION_CHECK_INTERVAL=5000 ENV CONNECTION_CHECK_RETRY=3 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..2996225 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,66 @@ +version: '3.2' + +services: + dgraph-zero: + image: dgraph/dgraph:master + volumes: + - type: volume + source: dgraph + target: /dgraph + volume: + nocopy: true + ports: + - 5080:5080 + - 6080:6080 + restart: always + command: dgraph zero --my=dgraph-zero:5080 + dgraph-alpha: + image: dgraph/dgraph:master + volumes: + - type: volume + source: dgraph + target: /dgraph + volume: + nocopy: true + ports: + - 8080:8080 + - 9080:9080 + restart: always + environment: + - DGRAPH_ALPHA_WHITELIST=0.0.0.0/0 + - DGRAPH_ALPHA_LRU_MB=2048 + command: dgraph alpha --my=dgraph-alpha:7080 --zero=dgraph-zero:5080 + dgraph-ratel: + image: dgraph/dgraph:master + volumes: + - type: volume + source: dgraph + target: /dgraph + volume: + nocopy: true + ports: + - 8000:8000 + command: dgraph-ratel + restart: always + redis: + image: redis:6 + command: redis-server --appendonly yes + restart: always + ports: + - 6379:6379 + volumes: + - redis:/data + redis-commander: + container_name: redis-commander + hostname: redis-commander + image: rediscommander/redis-commander:latest + restart: always + environment: + - REDIS_HOSTS=local:redis:6379 + ports: + - "8081:8081" +volumes: + dgraph: + driver: local + redis: + driver: local diff --git a/src/channels.rs b/src/channels.rs index 4cee3cb..846eb6e 100644 --- a/src/channels.rs +++ b/src/channels.rs @@ -6,6 +6,7 @@ use futures::future::{select, FutureExt, TryFutureExt}; use futures::StreamExt; use hyper::upgrade::Upgraded; use log::debug; +use redis::{AsyncCommands, RedisError, aio::MultiplexedConnection}; use tokio::sync::{oneshot, Mutex}; use crate::connections::{ @@ -74,7 +75,7 @@ pub async fn create_best_effort_txn_channel(upgraded: Upgraded, client: Arc, txn_id: String) { +pub async fn create_mutated_txn_channel(upgraded: Upgraded, client: Arc, txn_id: String, mut redis_connection: MultiplexedConnection) { let stream = tokio_tungstenite::WebSocketStream::from_raw_socket( upgraded, tungstenite::protocol::Role::Server, @@ -94,6 +95,12 @@ pub async fn create_mutated_txn_channel(upgraded: Upgraded, client: Arc, shutdown.map_err(drop), )); + let _result: Result, RedisError> = redis_connection.xadd( + format!("txn:{:}", txn_id.clone()), + "*", + &[("event", "txn_started")], + ).await; + debug!("creating new mutated txn"); accept_mutate_txn_connection( sender_arc_mutex, @@ -102,6 +109,7 @@ pub async fn create_mutated_txn_channel(upgraded: Upgraded, client: Arc, shutdown_hook_arc_mutex.clone(), query_count.clone(), txn_id, + redis_connection.clone(), ) .await } diff --git a/src/connections.rs b/src/connections.rs index 5c48a60..68e6380 100644 --- a/src/connections.rs +++ b/src/connections.rs @@ -8,6 +8,7 @@ use futures::stream::{SplitSink, SplitStream}; use futures::{SinkExt, StreamExt}; use hyper::upgrade::Upgraded; use log::{debug, error}; +use redis::{AsyncCommands, RedisError, aio::MultiplexedConnection}; use tokio::sync::{oneshot, Mutex}; use tokio_tungstenite::WebSocketStream; use tungstenite::{Error, Message}; @@ -42,11 +43,13 @@ pub async fn accept_mutate_txn_connection( shutdown_hook_arc_mutex: Arc>>>, query_count: Arc, txn_id: String, + redis_connection: MultiplexedConnection, ) where M: Mutate + TxnContextExport + 'static, { while let Some(message) = receiver.next().await { let id = txn_id.clone(); + let conn = redis_connection.clone(); let sam = sender_arc_mutex.clone(); let tam = txn_arc_mutex.clone(); let sham = shutdown_hook_arc_mutex.clone(); @@ -60,6 +63,7 @@ pub async fn accept_mutate_txn_connection( qc, message, id.clone(), + conn, ).await }); } @@ -155,7 +159,7 @@ async fn process_query_message( | ClientError::CannotMutate(status) | ClientError::CannotQuery(status) | ClientError::CannotRefreshLogin(status) => { - error_msg.replace(format!("{{\"status\": {:}, \"message\": \"{:}\"}}", status.code(), status.message())); + error_msg.replace(format!("{{\"status\": \"{:}\", \"code\": {:}, \"message\": \"{:}\"}}", status.code(), status.code() as i32, status.message())); }, _ => {}, }; @@ -199,6 +203,7 @@ async fn process_mutate_message( query_count: Arc, message: Result, txn_id: String, + mut redis_connection: MultiplexedConnection, ) where M: Mutate + TxnContextExport, { @@ -210,6 +215,15 @@ async fn process_mutate_message( let response = discard_txn(None, txn_arc_mutex.clone()).await; match response { Ok(payload) => { + let _result: Result, RedisError> = redis_connection.xadd( + format!("txn:{:}", txn_id.clone()), + "*", + &[ + ("event", "txn_discarded"), + ("reason", &format!("{:?}", receive_error)), + ], + ).await; + send_message( sender_arc_mutex.clone(), Message::Text(serde_json::to_string(&payload).unwrap_or_default()), @@ -217,6 +231,15 @@ async fn process_mutate_message( .await } Err(err) => { + let _result: Result, RedisError> = redis_connection.xadd( + format!("txn:{:}", txn_id.clone()), + "*", + &[ + ("event", "txn_discard_failed"), + ("message", &format!("{:?}", err)), + ], + ).await; + let payload = ResponsePayload { id: None, error: Some(format!("Txn Error: {:?}", err)), @@ -247,6 +270,15 @@ async fn process_mutate_message( let response = discard_txn(None, txn_arc_mutex.clone()).await; let _ = match response { Ok(payload) => { + let _result: Result, RedisError> = redis_connection.xadd( + format!("txn:{:}", txn_id.clone()), + "*", + &[ + ("event", "txn_discarded"), + ("reason", "ws channel closed"), + ], + ).await; + send_message( sender_arc_mutex.clone(), Message::Text(serde_json::to_string(&payload).unwrap_or_default()), @@ -254,6 +286,15 @@ async fn process_mutate_message( .await } Err(err) => { + let _result: Result, RedisError> = redis_connection.xadd( + format!("txn:{:}", txn_id.clone()), + "*", + &[ + ("event", "txn_discard_failed"), + ("message", &format!("{:?}", err)), + ], + ).await; + let payload = ResponsePayload { id: None, error: Some(format!("Txn Error: {:?}", err)), @@ -299,15 +340,32 @@ async fn process_mutate_message( decrement_counter(query_count.clone()); match response { Ok(payload) => { + let stringified = serde_json::to_string(&payload).unwrap_or_default(); + + let _result: Result, RedisError> = redis_connection.xadd( + format!("txn:{:}", txn_id.clone()), + "*", + &[ + ("event", "txn_request_processed"), + ("request", &t.as_str()), + ("response", &stringified), + ], + ).await; + send_message( sender_arc_mutex.clone(), - Message::Text( - serde_json::to_string(&payload).unwrap_or_default(), - ), - ) - .await + Message::Text(stringified), + ).await } Err(err) => { + let _result: Result, RedisError> = redis_connection.xadd( + format!("txn:{:}", txn_id.clone()), + "*", + &[ + ("event", "txn_request_errored"), + ("message", &format!("{:?}", err)), + ], + ).await; let mut error_msg = Some(format!("{{\"message\": \"Txn Error: {:?}\"}}", &err)); if err.is::() { let dgraph_err: DgraphError = err.downcast::().unwrap(); @@ -324,7 +382,7 @@ async fn process_mutate_message( | ClientError::CannotMutate(status) | ClientError::CannotQuery(status) | ClientError::CannotRefreshLogin(status) => { - error_msg.replace(format!("{{\"status\": {:}, \"message\": \"{:}\"}}", status.code(), status.message())); + error_msg.replace(format!("{{\"status\": \"{:}\", \"code\": {:}, \"message\": \"{:}\"}}", status.code(), status.code() as i32, status.message())); }, _ => {}, }; diff --git a/src/main.rs b/src/main.rs index 41054a0..a2bbfee 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,5 @@ #[macro_use] extern crate anyhow; - extern crate serde_json; mod channels; @@ -15,6 +14,7 @@ use std::net::SocketAddr; use std::sync::Arc; use dgraph_tonic::Client; +use redis::Client as RedisClient; use log::info; #[tokio::main] @@ -32,6 +32,16 @@ async fn main() { let client_arc = Arc::new(Client::new(address_vec).expect("dgraph client")); + let redis_url = match env::var("REDIS_URL") { + Ok(val) => val.clone(), + Err(_) => "redis://localhost:6379/0".to_string(), + }; + + info!("creating redis connection against {:?}", redis_url); + + let redis_client = RedisClient::open(redis_url).unwrap(); + let redis_connection = redis_client.get_multiplexed_tokio_connection().await.unwrap(); + let addr_str = match env::var("LISTEN_ADDRESS") { Ok(val) => val.clone(), Err(_) => "0.0.0.0:9000".to_string(), @@ -42,5 +52,5 @@ async fn main() { info!("server listening at {:}", addr); - server::build(addr, client_arc.clone()).await + server::build(addr, client_arc.clone(), redis_connection.clone()).await } diff --git a/src/server.rs b/src/server.rs index d72fd2c..257cfda 100644 --- a/src/server.rs +++ b/src/server.rs @@ -8,6 +8,7 @@ use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use log::{debug, error, info}; +use redis::aio::MultiplexedConnection; use ring::digest::{digest, SHA1_FOR_LEGACY_USE_ONLY}; use crate::channels::{ @@ -25,17 +26,21 @@ async fn shutdown_signal() { .expect("failed to install CTRL+C signal handler"); } -pub async fn build(addr: SocketAddr, client_arc: Arc) { +pub async fn build(addr: SocketAddr, client_arc: Arc, redis_connection: MultiplexedConnection) { + let redis_conn1 = redis_connection.clone(); let client_one = client_arc.clone(); Arc::downgrade(&client_one); let make_svc = make_service_fn(|_socket: &AddrStream| { + let redis_conn2 = redis_conn1.clone(); let client_two = client_one.clone(); Arc::downgrade(&client_two); async move { let client_three = client_two.clone(); + let redis_conn3 = redis_conn2.clone(); Arc::downgrade(&client_three); Ok::<_, hyper::Error>(service_fn(move |req: Request| { let client_four = client_three.clone(); + let redis_conn4 = redis_conn3.clone(); async move { let mut res = Response::new(Body::empty()); @@ -108,6 +113,7 @@ pub async fn build(addr: SocketAddr, client_arc: Arc) { upgraded, client_four.clone(), sec_websocket_accept.clone(), + redis_conn4.clone(), ) .await; } From 2f718595902d80fbd8b085819c53717f615aceed Mon Sep 17 00:00:00 2001 From: Simon Liang Date: Tue, 20 Oct 2020 18:29:40 +0800 Subject: [PATCH 2/5] added incoming_txns channel --- src/channels.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/channels.rs b/src/channels.rs index 846eb6e..6e71411 100644 --- a/src/channels.rs +++ b/src/channels.rs @@ -95,6 +95,14 @@ pub async fn create_mutated_txn_channel(upgraded: Upgraded, client: Arc, shutdown.map_err(drop), )); + let _inc_txn_result: Result, RedisError> = redis_connection.xadd( + "incoming_txns", + "*", + &[ + ("txnId", &txn_id.clone()), + ], + ).await; + let _result: Result, RedisError> = redis_connection.xadd( format!("txn:{:}", txn_id.clone()), "*", From 480c2a76c5d743167823efeb2c2692ad50633b5a Mon Sep 17 00:00:00 2001 From: Simon Liang Date: Wed, 21 Oct 2020 09:43:15 +0800 Subject: [PATCH 3/5] fixed message serialization by relying on serde_json --- src/connections.rs | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/src/connections.rs b/src/connections.rs index 68e6380..54cde3e 100644 --- a/src/connections.rs +++ b/src/connections.rs @@ -9,6 +9,7 @@ use futures::{SinkExt, StreamExt}; use hyper::upgrade::Upgraded; use log::{debug, error}; use redis::{AsyncCommands, RedisError, aio::MultiplexedConnection}; +use serde_json::json; use tokio::sync::{oneshot, Mutex}; use tokio_tungstenite::WebSocketStream; use tungstenite::{Error, Message}; @@ -143,7 +144,9 @@ async fn process_query_message( .await } Err(err) => { - let mut error_msg = Some(format!("{{\"message\": \"Txn Error: {:?}\"}}", &err)); + let mut error_msg = Some(json!({ + "message": format!("Txn Error: {:?}", &err), + }).to_string()); if err.is::() { let dgraph_err: DgraphError = err.downcast::().unwrap(); match dgraph_err { @@ -159,7 +162,11 @@ async fn process_query_message( | ClientError::CannotMutate(status) | ClientError::CannotQuery(status) | ClientError::CannotRefreshLogin(status) => { - error_msg.replace(format!("{{\"status\": \"{:}\", \"code\": {:}, \"message\": \"{:}\"}}", status.code(), status.code() as i32, status.message())); + error_msg.replace(json!({ + "status": status.code().description(), + "code": status.code() as i32, + "message": format!("{:}", status.message()), + }).to_string()); }, _ => {}, }; @@ -366,7 +373,9 @@ async fn process_mutate_message( ("message", &format!("{:?}", err)), ], ).await; - let mut error_msg = Some(format!("{{\"message\": \"Txn Error: {:?}\"}}", &err)); + let mut error_msg = Some(json!({ + "message": format!("Txn Error: {:?}", &err), + }).to_string()); if err.is::() { let dgraph_err: DgraphError = err.downcast::().unwrap(); match dgraph_err { @@ -382,7 +391,11 @@ async fn process_mutate_message( | ClientError::CannotMutate(status) | ClientError::CannotQuery(status) | ClientError::CannotRefreshLogin(status) => { - error_msg.replace(format!("{{\"status\": \"{:}\", \"code\": {:}, \"message\": \"{:}\"}}", status.code(), status.code() as i32, status.message())); + error_msg.replace(json!({ + "status": status.code().description(), + "code": status.code() as i32, + "message": format!("{:}", status.message()), + }).to_string()); }, _ => {}, }; From f581eb7901dff04b84f9f327164c9c28c8c1298d Mon Sep 17 00:00:00 2001 From: Simon Liang Date: Fri, 23 Oct 2020 15:03:38 +0800 Subject: [PATCH 4/5] added websocket config to increase the send/receive limits --- src/channels.rs | 15 ++++++++++++--- src/connections.rs | 2 +- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/channels.rs b/src/channels.rs index 6e71411..335535f 100644 --- a/src/channels.rs +++ b/src/channels.rs @@ -1,3 +1,4 @@ +use tungstenite::protocol::WebSocketConfig; use std::sync::atomic::AtomicU32; use std::sync::Arc; @@ -13,11 +14,19 @@ use crate::connections::{ accept_mutate_txn_connection, accept_query_txn_connection, auto_close_connection, }; +fn get_websocket_config() -> WebSocketConfig { + WebSocketConfig { + max_send_queue: None, + max_message_size: Some(2048 << 20), // 2GB + max_frame_size: Some(2048 << 20), // 2GB + } +} + pub async fn create_read_only_txn_channel(upgraded: Upgraded, client: Arc) { let stream = tokio_tungstenite::WebSocketStream::from_raw_socket( upgraded, tungstenite::protocol::Role::Server, - None, + Some(get_websocket_config()), ) .await; let txn_arc_mutex = Arc::new(Mutex::new(Some(client.new_read_only_txn()))); @@ -48,7 +57,7 @@ pub async fn create_best_effort_txn_channel(upgraded: Upgraded, client: Arc, let stream = tokio_tungstenite::WebSocketStream::from_raw_socket( upgraded, tungstenite::protocol::Role::Server, - None, + Some(get_websocket_config()), ) .await; let txn_arc_mutex = Arc::new(Mutex::new(Some(client.new_mutated_txn()))); diff --git a/src/connections.rs b/src/connections.rs index 54cde3e..93a7429 100644 --- a/src/connections.rs +++ b/src/connections.rs @@ -227,7 +227,7 @@ async fn process_mutate_message( "*", &[ ("event", "txn_discarded"), - ("reason", &format!("{:?}", receive_error)), + ("reason", &format!("receive error: {:?}", receive_error)), ], ).await; From ce9e4194ca2e238837e1a4bd867776d7cd58b116 Mon Sep 17 00:00:00 2001 From: Simon Liang Date: Thu, 12 Nov 2020 18:20:27 +0800 Subject: [PATCH 5/5] added a way to not auto close connection --- docker-compose.yml | 2 +- src/channels.rs | 39 ++++++++++++++++++++++++--------------- src/connections.rs | 2 ++ src/server.rs | 4 ++++ 4 files changed, 31 insertions(+), 16 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 2996225..f0c22ca 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,4 @@ -version: '3.2' +version: '3.7' services: dgraph-zero: diff --git a/src/channels.rs b/src/channels.rs index 335535f..cc59f82 100644 --- a/src/channels.rs +++ b/src/channels.rs @@ -22,7 +22,7 @@ fn get_websocket_config() -> WebSocketConfig { } } -pub async fn create_read_only_txn_channel(upgraded: Upgraded, client: Arc) { +pub async fn create_read_only_txn_channel(upgraded: Upgraded, client: Arc, disable_auto_close: bool) { let stream = tokio_tungstenite::WebSocketStream::from_raw_socket( upgraded, tungstenite::protocol::Role::Server, @@ -37,10 +37,13 @@ pub async fn create_read_only_txn_channel(upgraded: Upgraded, client: Arc(); let shutdown_hook_arc_mutex = Arc::new(Mutex::new(Some(shutdown_hook))); - tokio::spawn(select( - auto_close_connection(sender_arc_mutex.clone(), query_count.clone()).boxed(), - shutdown.map_err(drop), - )); + + if !disable_auto_close { + tokio::spawn(select( + auto_close_connection(sender_arc_mutex.clone(), query_count.clone()).boxed(), + shutdown.map_err(drop), + )); + } debug!("creating new read only txn"); accept_query_txn_connection( @@ -53,7 +56,7 @@ pub async fn create_read_only_txn_channel(upgraded: Upgraded, client: Arc) { +pub async fn create_best_effort_txn_channel(upgraded: Upgraded, client: Arc, disable_auto_close: bool) { let stream = tokio_tungstenite::WebSocketStream::from_raw_socket( upgraded, tungstenite::protocol::Role::Server, @@ -68,10 +71,13 @@ pub async fn create_best_effort_txn_channel(upgraded: Upgraded, client: Arc(); let shutdown_hook_arc_mutex = Arc::new(Mutex::new(Some(shutdown_hook))); - tokio::spawn(select( - auto_close_connection(sender_arc_mutex.clone(), query_count.clone()).boxed(), - shutdown.map_err(drop), - )); + + if !disable_auto_close { + tokio::spawn(select( + auto_close_connection(sender_arc_mutex.clone(), query_count.clone()).boxed(), + shutdown.map_err(drop), + )); + } debug!("creating new best effort txn"); accept_query_txn_connection( @@ -84,7 +90,7 @@ pub async fn create_best_effort_txn_channel(upgraded: Upgraded, client: Arc, txn_id: String, mut redis_connection: MultiplexedConnection) { +pub async fn create_mutated_txn_channel(upgraded: Upgraded, client: Arc, txn_id: String, mut redis_connection: MultiplexedConnection, disable_auto_close: bool) { let stream = tokio_tungstenite::WebSocketStream::from_raw_socket( upgraded, tungstenite::protocol::Role::Server, @@ -99,10 +105,13 @@ pub async fn create_mutated_txn_channel(upgraded: Upgraded, client: Arc, let (shutdown_hook, shutdown) = oneshot::channel::<()>(); let shutdown_hook_arc_mutex = Arc::new(Mutex::new(Some(shutdown_hook))); - tokio::spawn(select( - auto_close_connection(sender_arc_mutex.clone(), query_count.clone()).boxed(), - shutdown.map_err(drop), - )); + + if !disable_auto_close { + tokio::spawn(select( + auto_close_connection(sender_arc_mutex.clone(), query_count.clone()).boxed(), + shutdown.map_err(drop), + )); + } let _inc_txn_result: Result, RedisError> = redis_connection.xadd( "incoming_txns", diff --git a/src/connections.rs b/src/connections.rs index 93a7429..413b6f8 100644 --- a/src/connections.rs +++ b/src/connections.rs @@ -475,6 +475,8 @@ pub async fn auto_close_connection( Ok(_) => (), Err(e) => error!("Error sending close message {:?}", e), }; + query_count.store(0, Ordering::Relaxed); + retry.store(0, Ordering::Relaxed); break; } } diff --git a/src/server.rs b/src/server.rs index 257cfda..bed11e0 100644 --- a/src/server.rs +++ b/src/server.rs @@ -94,17 +94,20 @@ pub async fn build(addr: SocketAddr, client_arc: Arc, redis_connection: let read_only = query_map.get("read_only").unwrap_or(&""); let best_effort = query_map.get("best_effort").unwrap_or(&""); + let disable_auto_close = query_map.get("disable_auto_close").unwrap_or(&""); if *read_only == "true" { if *best_effort == "true" { create_best_effort_txn_channel( upgraded, client_four.clone(), + *disable_auto_close == "true", ) .await; } else { create_read_only_txn_channel( upgraded, client_four.clone(), + *disable_auto_close == "true", ) .await; } @@ -114,6 +117,7 @@ pub async fn build(addr: SocketAddr, client_arc: Arc, redis_connection: client_four.clone(), sec_websocket_accept.clone(), redis_conn4.clone(), + *disable_auto_close == "true", ) .await; }