Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis TxnLog #6

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 316 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ serde_derive = "1.0"
serde_json = "1.0"
base64 = "0.12"
querystring = "1.1"
redis = { version = "0.17", features = ["tokio-rt-core", "streams"] }
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ FROM debian:buster-slim

ENV RUST_LOG=info
ENV DGRAPH_ALPHAS=http://localhost:9080
ENV REDIS_URL=redis://localhost:6379/0
ENV LISTEN_ADDRESS=0.0.0.0:9000
ENV CONNECTION_CHECK_INTERVAL=5000
ENV CONNECTION_CHECK_RETRY=3
Expand Down
66 changes: 66 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
version: '3.7'

services:
dgraph-zero:
image: dgraph/dgraph:master
volumes:
- type: volume
source: dgraph
target: /dgraph
volume:
nocopy: true
ports:
- 5080:5080
- 6080:6080
restart: always
command: dgraph zero --my=dgraph-zero:5080
dgraph-alpha:
image: dgraph/dgraph:master
volumes:
- type: volume
source: dgraph
target: /dgraph
volume:
nocopy: true
ports:
- 8080:8080
- 9080:9080
restart: always
environment:
- DGRAPH_ALPHA_WHITELIST=0.0.0.0/0
- DGRAPH_ALPHA_LRU_MB=2048
command: dgraph alpha --my=dgraph-alpha:7080 --zero=dgraph-zero:5080
dgraph-ratel:
image: dgraph/dgraph:master
volumes:
- type: volume
source: dgraph
target: /dgraph
volume:
nocopy: true
ports:
- 8000:8000
command: dgraph-ratel
restart: always
redis:
image: redis:6
command: redis-server --appendonly yes
restart: always
ports:
- 6379:6379
volumes:
- redis:/data
redis-commander:
container_name: redis-commander
hostname: redis-commander
image: rediscommander/redis-commander:latest
restart: always
environment:
- REDIS_HOSTS=local:redis:6379
ports:
- "8081:8081"
volumes:
dgraph:
driver: local
redis:
driver: local
70 changes: 52 additions & 18 deletions src/channels.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use tungstenite::protocol::WebSocketConfig;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;

Expand All @@ -6,17 +7,26 @@ use futures::future::{select, FutureExt, TryFutureExt};
use futures::StreamExt;
use hyper::upgrade::Upgraded;
use log::debug;
use redis::{AsyncCommands, RedisError, aio::MultiplexedConnection};
use tokio::sync::{oneshot, Mutex};

use crate::connections::{
accept_mutate_txn_connection, accept_query_txn_connection, auto_close_connection,
};

pub async fn create_read_only_txn_channel(upgraded: Upgraded, client: Arc<Client>) {
fn get_websocket_config() -> WebSocketConfig {
WebSocketConfig {
max_send_queue: None,
max_message_size: Some(2048 << 20), // 2GB
max_frame_size: Some(2048 << 20), // 2GB
}
}

pub async fn create_read_only_txn_channel(upgraded: Upgraded, client: Arc<Client>, disable_auto_close: bool) {
let stream = tokio_tungstenite::WebSocketStream::from_raw_socket(
upgraded,
tungstenite::protocol::Role::Server,
None,
Some(get_websocket_config()),
)
.await;
let txn_arc_mutex = Arc::new(Mutex::new(Some(client.new_read_only_txn())));
Expand All @@ -27,10 +37,13 @@ pub async fn create_read_only_txn_channel(upgraded: Upgraded, client: Arc<Client

let (shutdown_hook, shutdown) = oneshot::channel::<()>();
let shutdown_hook_arc_mutex = Arc::new(Mutex::new(Some(shutdown_hook)));
tokio::spawn(select(
auto_close_connection(sender_arc_mutex.clone(), query_count.clone()).boxed(),
shutdown.map_err(drop),
));

if !disable_auto_close {
tokio::spawn(select(
auto_close_connection(sender_arc_mutex.clone(), query_count.clone()).boxed(),
shutdown.map_err(drop),
));
}

debug!("creating new read only txn");
accept_query_txn_connection(
Expand All @@ -43,11 +56,11 @@ pub async fn create_read_only_txn_channel(upgraded: Upgraded, client: Arc<Client
.await
}

pub async fn create_best_effort_txn_channel(upgraded: Upgraded, client: Arc<Client>) {
pub async fn create_best_effort_txn_channel(upgraded: Upgraded, client: Arc<Client>, disable_auto_close: bool) {
let stream = tokio_tungstenite::WebSocketStream::from_raw_socket(
upgraded,
tungstenite::protocol::Role::Server,
None,
Some(get_websocket_config()),
)
.await;
let txn_arc_mutex = Arc::new(Mutex::new(Some(client.new_best_effort_txn())));
Expand All @@ -58,10 +71,13 @@ pub async fn create_best_effort_txn_channel(upgraded: Upgraded, client: Arc<Clie

let (shutdown_hook, shutdown) = oneshot::channel::<()>();
let shutdown_hook_arc_mutex = Arc::new(Mutex::new(Some(shutdown_hook)));
tokio::spawn(select(
auto_close_connection(sender_arc_mutex.clone(), query_count.clone()).boxed(),
shutdown.map_err(drop),
));

if !disable_auto_close {
tokio::spawn(select(
auto_close_connection(sender_arc_mutex.clone(), query_count.clone()).boxed(),
shutdown.map_err(drop),
));
}

debug!("creating new best effort txn");
accept_query_txn_connection(
Expand All @@ -74,11 +90,11 @@ pub async fn create_best_effort_txn_channel(upgraded: Upgraded, client: Arc<Clie
.await
}

pub async fn create_mutated_txn_channel(upgraded: Upgraded, client: Arc<Client>, txn_id: String) {
pub async fn create_mutated_txn_channel(upgraded: Upgraded, client: Arc<Client>, txn_id: String, mut redis_connection: MultiplexedConnection, disable_auto_close: bool) {
let stream = tokio_tungstenite::WebSocketStream::from_raw_socket(
upgraded,
tungstenite::protocol::Role::Server,
None,
Some(get_websocket_config()),
)
.await;
let txn_arc_mutex = Arc::new(Mutex::new(Some(client.new_mutated_txn())));
Expand All @@ -89,10 +105,27 @@ pub async fn create_mutated_txn_channel(upgraded: Upgraded, client: Arc<Client>,

let (shutdown_hook, shutdown) = oneshot::channel::<()>();
let shutdown_hook_arc_mutex = Arc::new(Mutex::new(Some(shutdown_hook)));
tokio::spawn(select(
auto_close_connection(sender_arc_mutex.clone(), query_count.clone()).boxed(),
shutdown.map_err(drop),
));

if !disable_auto_close {
tokio::spawn(select(
auto_close_connection(sender_arc_mutex.clone(), query_count.clone()).boxed(),
shutdown.map_err(drop),
));
}

let _inc_txn_result: Result<Vec<u8>, RedisError> = redis_connection.xadd(
"incoming_txns",
"*",
&[
("txnId", &txn_id.clone()),
],
).await;

let _result: Result<Vec<u8>, RedisError> = redis_connection.xadd(
format!("txn:{:}", txn_id.clone()),
"*",
&[("event", "txn_started")],
).await;

debug!("creating new mutated txn");
accept_mutate_txn_connection(
Expand All @@ -102,6 +135,7 @@ pub async fn create_mutated_txn_channel(upgraded: Upgraded, client: Arc<Client>,
shutdown_hook_arc_mutex.clone(),
query_count.clone(),
txn_id,
redis_connection.clone(),
)
.await
}
Loading