Skip to content

Commit

Permalink
Interim progress.
Browse files Browse the repository at this point in the history
  • Loading branch information
maldrake authored and wolf4ood committed Feb 27, 2021
1 parent 0e50fb4 commit a923624
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 8 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
Cargo.lock
target
/.directory
.vscode/
42 changes: 38 additions & 4 deletions gremlin-client/src/aio/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,16 @@ use mobc::{Connection, Pool};
use serde::Serialize;
use std::collections::{HashMap, VecDeque};

#[derive(Clone)]
pub struct Session {
pool: Pool<GremlinConnectionManager>,
name: String,
}

#[derive(Clone)]
pub struct GremlinClient {
pool: Pool<GremlinConnectionManager>,
session: Option<Session>,
alias: Option<String>,
pub(crate) options: ConnectionOptions,
}
Expand All @@ -35,11 +42,23 @@ impl GremlinClient {

Ok(GremlinClient {
pool,
session: None,
alias: None,
options: opts,
})
}

pub async fn create_session(&mut self, name: String) -> GremlinResult<()> {
let manager = GremlinConnectionManager::new(self.options.clone());
let pool = Pool::builder().max_open(1).build(manager);
self.session = Some(Session { pool, name });
Ok(())
}

pub fn close_session(&mut self) {
self.session = None
}

/// Return a cloned client with the provided alias
pub fn alias<T>(&self, alias: T) -> GremlinClient
where
Expand Down Expand Up @@ -85,15 +104,30 @@ impl GremlinClient {

args.insert(String::from("bindings"), GValue::from(bindings));

if let Some(session) = &self.session {
args.insert(String::from("session"), GValue::from(session.name.clone()));
}

let args = self.options.serializer.write(&GValue::from(args))?;

let processor = if self.session.is_some() {
"session".to_string()
} else {
String::default()
};

let message = match self.options.serializer {
GraphSON::V1 => message_with_args_v1(String::from("eval"), String::default(), args),
GraphSON::V2 => message_with_args_v2(String::from("eval"), String::default(), args),
GraphSON::V3 => message_with_args(String::from("eval"), String::default(), args),
GraphSON::V1 => message_with_args_v1(String::from("eval"), processor, args),
GraphSON::V2 => message_with_args_v2(String::from("eval"), processor, args),
GraphSON::V3 => message_with_args(String::from("eval"), processor, args),
};

let conn = if let Some(session) = &self.session {
session.pool.get().await?
} else {
self.pool.get().await?
};

let conn = self.pool.get().await?;
self.send_message_new(conn, message).await
}

Expand Down
41 changes: 37 additions & 4 deletions gremlin-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,16 @@ use r2d2::Pool;
use serde::Serialize;
use std::collections::{HashMap, VecDeque};

#[derive(Clone, Debug)]
pub struct Session {
pool: Pool<GremlinConnectionManager>,
name: String,
}

#[derive(Clone, Debug)]
pub struct GremlinClient {
pool: Pool<GremlinConnectionManager>,
session: Option<Session>,
alias: Option<String>,
options: ConnectionOptions,
}
Expand All @@ -33,11 +40,23 @@ impl GremlinClient {

Ok(GremlinClient {
pool,
session: None,
alias: None,
options: opts,
})
}

pub fn create_session(&mut self, name: String) -> GremlinResult<()> {
let manager = GremlinConnectionManager::new(self.options.clone());
let pool = Pool::builder().max_size(1).build(manager)?;
self.session = Some(Session { pool, name });
Ok(())
}

pub fn close_session(&mut self) {
self.session = None
}

/// Return a cloned client with the provided alias
pub fn alias<T>(&self, alias: T) -> GremlinClient
where
Expand Down Expand Up @@ -83,15 +102,29 @@ impl GremlinClient {

args.insert(String::from("bindings"), GValue::from(bindings));

if let Some(session) = &self.session {
args.insert(String::from("session"), GValue::from(session.name.clone()));
}

let args = self.options.serializer.write(&GValue::from(args))?;

let processor = if self.session.is_some() {
"session".to_string()
} else {
String::default()
};

let message = match self.options.serializer {
GraphSON::V1 => message_with_args_v1(String::from("eval"), String::default(), args),
GraphSON::V2 => message_with_args_v2(String::from("eval"), String::default(), args),
GraphSON::V3 => message_with_args(String::from("eval"), String::default(), args),
GraphSON::V1 => message_with_args_v1(String::from("eval"), processor, args),
GraphSON::V2 => message_with_args_v2(String::from("eval"), processor, args),
GraphSON::V3 => message_with_args(String::from("eval"), processor, args),
};

let conn = self.pool.get()?;
let conn = if let Some(session) = &self.session {
session.pool.get()?
} else {
self.pool.get()?
};

self.send_message(conn, message)
}
Expand Down
16 changes: 16 additions & 0 deletions gremlin-client/tests/integration_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,22 @@ fn test_empty_query() {
)
}

#[test]
fn test_session_empty_query() {
let mut graph = graph();
graph
.create_session("test-session".to_string())
.expect("It should create a session.");
assert_eq!(
0,
graph
.execute("g.V().hasLabel('Not Found')", &[])
.expect("It should execute a traversal")
.count()
);
graph.close_session();
}

#[test]
fn test_ok_credentials() {
let client = GremlinClient::connect(
Expand Down
20 changes: 20 additions & 0 deletions gremlin-client/tests/integration_client_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,26 @@ mod aio {
)
}

#[cfg(feature = "async-std-runtime")]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
async fn test_session_empty_query() {
let mut graph = connect().await;
graph
.create_session("test-session".to_string())
.expect("It should create a session");

assert_eq!(
0,
graph
.execute("g.V().hasLabel('NotFound')", &[])
.await
.expect("It should execute a traversal")
.count()
.await
);

graph.close_session();
}
#[cfg(feature = "async-std-runtime")]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
async fn test_keep_alive_query() {
Expand Down

0 comments on commit a923624

Please sign in to comment.