Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement graph binary protocol #217

Open
wants to merge 56 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 52 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
153b6cd
Generalized protocol enum to graphson alternatives
criminosis Oct 7, 2024
076949f
Removed redundant serializer content type mapping
criminosis Oct 7, 2024
cbaa2bf
Missed import in generalization
criminosis Oct 8, 2024
946d132
Renamed Protocol to IoProtcol following Tinkerpop naming
criminosis Oct 8, 2024
dce5400
Added rstest
criminosis Oct 18, 2024
e426d0d
Serialized message binary parity to java driver for test traversal
criminosis Oct 18, 2024
6cfc168
Centralized on new serializer based message building & reading
criminosis Oct 19, 2024
97a3de6
Formatting
criminosis Oct 19, 2024
0a39163
Removed defunct methods and restored binary deserialization functions
criminosis Oct 19, 2024
36a9989
Successful graph binary traversal
criminosis Oct 19, 2024
c810c07
Handle mapping JSON Value::Null to GValue::Null
criminosis Oct 20, 2024
6de4156
Fix aio compiling errors
criminosis Oct 20, 2024
5330d8f
Set IT client v2 test's deserializer
criminosis Oct 20, 2024
ee6086f
Handle null in GraphSONV2 g_serializer_2
criminosis Oct 20, 2024
f273b14
Formatting
criminosis Oct 20, 2024
5b5108c
Fix graph binary demo IT
criminosis Oct 20, 2024
678792f
Formatting
criminosis Oct 20, 2024
4ca0eb5
Additional value binary serdes
criminosis Oct 22, 2024
455e329
Created graph binary rw tests
criminosis Oct 23, 2024
4a42319
Adding omni test class
criminosis Oct 24, 2024
32b47ac
Handling more test required serialization types
criminosis Oct 24, 2024
815767a
Formatting
criminosis Oct 25, 2024
753dbf7
Made test_unwrap_map pass
criminosis Oct 31, 2024
0368454
Made test_value_map pass
criminosis Nov 1, 2024
54b2457
Made test_element_map pass
criminosis Nov 1, 2024
35d22ed
test_anonymous_traversal_properties_drop
criminosis Nov 1, 2024
490e925
Saving vertex serde progress
criminosis Nov 5, 2024
69da91e
Express possibly null response request id & made vertex serde tests pass
criminosis Nov 5, 2024
6db1750
Fix passing test_has_with_p_steps
criminosis Nov 7, 2024
e570908
Made test_add_v_with_properties pass
criminosis Nov 8, 2024
f2880a0
Made test_by_columns pass
criminosis Nov 8, 2024
2492d09
Made test_select_pop pass
criminosis Nov 8, 2024
e9a0c6d
Implemented string literal GraphBinaryV1Ser
criminosis Nov 8, 2024
baccdd5
All omni tests passing except merge and test_has_with_text_p_step
criminosis Nov 8, 2024
6626be3
test_has_with_text_p_step passing
criminosis Nov 8, 2024
d07679e
Modifed test to reflect has clause not needing predicate for equality…
criminosis Nov 8, 2024
fe2c0de
Fixed merge tests
criminosis Nov 9, 2024
2083461
Supplanted integration_traversal tests into single parameterized test
criminosis Nov 9, 2024
e2cbd2c
Cleaned up loose ends
criminosis Nov 9, 2024
fb3bf91
Parameterized integration_traversal_async
criminosis Nov 12, 2024
1bd393d
Moved merge_capable_serializers to merge_test mod block so its under …
criminosis Nov 12, 2024
0bd9f9a
Consolidated integration_client tests
criminosis Nov 13, 2024
ae34c00
Centralized integration_traversal_async
criminosis Nov 13, 2024
aacd284
Consolidated to serializers templated in common test module
criminosis Nov 13, 2024
f54ea77
Formatting
criminosis Nov 13, 2024
85a58f1
Use centralized serializer template
criminosis Nov 13, 2024
a1bbe79
Created parameterized integration_client_async
criminosis Nov 13, 2024
b4d9015
Formatting
criminosis Nov 13, 2024
5a4931b
Import cleanup
criminosis Nov 13, 2024
97ca6c1
Added serial annotation for test_partial_content to prevent test stat…
criminosis Nov 13, 2024
2d2b13a
Added derive feature flag to GH Action test
criminosis Nov 13, 2024
081ad4e
Implemented map get & try_get backwards compatability logic to not br…
criminosis Nov 13, 2024
5fb1adb
Parameterized custom vertex id tests, added support for JanusGraph's …
criminosis Nov 14, 2024
03cb90a
Populate edge properties in GraphSONV2/V3 & GraphBinary. Also impleme…
criminosis Nov 14, 2024
49799e2
Removed opaque assert_eq losing Err message
criminosis Nov 14, 2024
766f045
Use universal JG test serial annotation to prevent flakey test failure
criminosis Nov 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/test.yml
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added derive flag to tests, notice the derive tests weren't being ran

Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,29 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path gremlin-client/Cargo.toml --features=tokio-runtime
args: --manifest-path gremlin-client/Cargo.toml --features=tokio-runtime,derive
- name: Run cargo test with async-std
if: matrix.gremlin-server == '3.5.7'
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path gremlin-client/Cargo.toml --features=async-std-runtime
args: --manifest-path gremlin-client/Cargo.toml --features=async-std-runtime,derive
# MergeV as a step doesn't exist in 3.5.x, so selectively run those tests
- name: Run cargo test with blocking client
if: matrix.gremlin-server != '3.5.7'
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path gremlin-client/Cargo.toml --features=merge_tests
args: --manifest-path gremlin-client/Cargo.toml --features=merge_tests,derive
- name: Run cargo test with tokio
if: matrix.gremlin-server != '3.5.7'
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path gremlin-client/Cargo.toml --features=tokio-runtime,merge_tests
args: --manifest-path gremlin-client/Cargo.toml --features=tokio-runtime,merge_tests,derive
- name: Run cargo test with async-std
if: matrix.gremlin-server != '3.5.7'
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path gremlin-client/Cargo.toml --features=async-std-runtime,merge_tests
args: --manifest-path gremlin-client/Cargo.toml --features=async-std-runtime,merge_tests,derive
8 changes: 4 additions & 4 deletions gremlin-cli/src/actions/connect.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{actions::Action, command::Command, context::GremlinContext};
use futures::FutureExt;
use gremlin_client::{aio::GremlinClient, ConnectionOptions, GraphSON, TlsOptions};
use gremlin_client::{aio::GremlinClient, ConnectionOptions, IoProtocol, TlsOptions};
use std::str::FromStr;
use structopt::StructOpt;

Expand Down Expand Up @@ -30,11 +30,11 @@ impl FromStr for Serializer {
}
}

impl From<Serializer> for GraphSON {
impl From<Serializer> for IoProtocol {
fn from(serializer: Serializer) -> Self {
match serializer {
Serializer::GraphSONV2 => GraphSON::V2,
Serializer::GraphSONV3 => GraphSON::V3,
Serializer::GraphSONV2 => IoProtocol::GraphSONV2,
Serializer::GraphSONV3 => IoProtocol::GraphSONV3,
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions gremlin-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ tokio = { version = "1", optional=true, features = ["full"] }
features = ["serde", "v4"]
version = "1.1.2"



[dev-dependencies]
rstest = "0.23.0"
rstest_reuse = "0.7.0"
serial_test = "3.1.1"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each test generally used labels to prevent tests from stepping on each other. However the parameterized serializers will run concurrently. So I used serial_test on tests that parameters that operate on the same test don't step on each other and cause flakey failures.


[[example]]
name = "traversal_async"
Expand Down
91 changes: 37 additions & 54 deletions gremlin-client/src/aio/client.rs
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general all the changes here and in the blocking client are to encapsulate the differences between the serializers so the binary protocol can be easily called from the same code path as the graphson protocols

Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
use crate::aio::pool::GremlinConnectionManager;
use crate::aio::GResultSet;
use crate::io::GraphSON;
use crate::message::{
message_with_args, message_with_args_and_uuid, message_with_args_v2, Message,
};
use crate::process::traversal::Bytecode;
use crate::GValue;
use crate::ToGValue;
use crate::{ConnectionOptions, GremlinError, GremlinResult};
use base64::encode;

Check warning on line 7 in gremlin-client/src/aio/client.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

use of deprecated function `base64::encode`: Use Engine::encode

Check warning on line 7 in gremlin-client/src/aio/client.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

use of deprecated function `base64::encode`: Use Engine::encode

Check warning on line 7 in gremlin-client/src/aio/client.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

use of deprecated function `base64::encode`: Use Engine::encode

Check warning on line 7 in gremlin-client/src/aio/client.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

use of deprecated function `base64::encode`: Use Engine::encode
use futures::future::{BoxFuture, FutureExt};
use mobc::{Connection, Pool};
use serde::Serialize;
use std::collections::{HashMap, VecDeque};
use uuid::Uuid;

pub type SessionedClient = GremlinClient;

Expand All @@ -21,18 +17,14 @@
if let Some(session_name) = self.session.take() {
let mut args = HashMap::new();
args.insert(String::from("session"), GValue::from(session_name.clone()));
let args = self.options.serializer.write(&GValue::from(args))?;

let processor = "session".to_string();

let message = match self.options.serializer {
GraphSON::V2 => message_with_args_v2(String::from("close"), processor, args),
GraphSON::V3 => message_with_args(String::from("close"), processor, args),
};
let (id, message) = self
.options
.serializer
.build_message("close", "session", args, None)?;

let conn = self.pool.get().await?;

self.send_message_new(conn, message).await
self.send_message_new(conn, id, message).await
} else {
Err(GremlinError::Generic("No session to close".to_string()))
}
Expand Down Expand Up @@ -131,47 +123,38 @@
args.insert(String::from("session"), GValue::from(session_name.clone()));
}

let args = self.options.serializer.write(&GValue::from(args))?;

let processor = if self.session.is_some() {
"session".to_string()
"session"
} else {
String::default()
""
};

let message = match self.options.serializer {
GraphSON::V2 => message_with_args_v2(String::from("eval"), processor, args),
GraphSON::V3 => message_with_args(String::from("eval"), processor, args),
};
let (id, message) = self
.options
.serializer
.build_message("eval", processor, args, None)?;

let conn = self.pool.get().await?;

self.send_message_new(conn, message).await
self.send_message_new(conn, id, message).await
}

pub(crate) fn send_message_new<'a, T: Serialize>(
pub(crate) fn send_message_new<'a>(
&'a self,
mut conn: Connection<GremlinConnectionManager>,
msg: Message<T>,
id: Uuid,
binary: Vec<u8>,
) -> BoxFuture<'a, GremlinResult<GResultSet>> {
let id = msg.id().clone();
let message = self.build_message(msg).unwrap();

async move {
let content_type = self.options.serializer.content_type();
let payload = String::from("") + content_type + &message;
let mut binary = payload.into_bytes();
binary.insert(0, content_type.len() as u8);

let (response, receiver) = conn.send(id, binary).await?;

let (response, results) = match response.status.code {
200 | 206 => {
let results: VecDeque<GValue> = self
.options
.deserializer
.read(&response.result.data)?
.map(|v| v.into())
let results: VecDeque<GValue> = response
.result
.data
.clone()
.map(Into::into)
.unwrap_or_else(VecDeque::new);
Ok((response, results))
}
Expand All @@ -182,18 +165,21 @@

args.insert(
String::from("sasl"),
GValue::String(encode(&format!("\0{}\0{}", c.username, c.password))),

Check warning on line 168 in gremlin-client/src/aio/client.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

use of deprecated function `base64::encode`: Use Engine::encode

Check warning on line 168 in gremlin-client/src/aio/client.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

use of deprecated function `base64::encode`: Use Engine::encode

Check warning on line 168 in gremlin-client/src/aio/client.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

use of deprecated function `base64::encode`: Use Engine::encode

Check warning on line 168 in gremlin-client/src/aio/client.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

use of deprecated function `base64::encode`: Use Engine::encode
);

let args = self.options.serializer.write(&GValue::from(args))?;
let message = message_with_args_and_uuid(
String::from("authentication"),
String::from("traversal"),
response.request_id,
let (id, message) = self.options.serializer.build_message(
"authentication",
"traversal",
args,
);

return self.send_message_new(conn, message).await;
Some(
response
.request_id
.expect("Auth challenge requires response id"),
),
)?;

return self.send_message_new(conn, id, message).await;
}
None => Err(GremlinError::Request((
response.status.code,
Expand Down Expand Up @@ -229,16 +215,13 @@

args.insert(String::from("aliases"), GValue::from(aliases));

let args = self.options.serializer.write(&GValue::from(args))?;

let message = message_with_args(String::from("bytecode"), String::from("traversal"), args);
let (id, message) =
self.options
.serializer
.build_message("bytecode", "traversal", args, None)?;

let conn = self.pool.get().await?;

self.send_message_new(conn, message).await
}

fn build_message<T: Serialize>(&self, msg: Message<T>) -> GremlinResult<String> {
serde_json::to_string(&msg).map_err(GremlinError::from)
self.send_message_new(conn, id, message).await
}
}
25 changes: 20 additions & 5 deletions gremlin-client/src/aio/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{GremlinError, GremlinResult, WebSocketOptions};
use crate::{GremlinError, GremlinResult, IoProtocol};

use crate::connection::ConnectionOptions;

Expand Down Expand Up @@ -90,7 +90,7 @@
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::TLSError> {

Check warning on line 93 in gremlin-client/src/aio/connection.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

use of deprecated type alias `rustls::TLSError`: Use Error

Check warning on line 93 in gremlin-client/src/aio/connection.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

use of deprecated type alias `rustls::TLSError`: Use Error
Ok(rustls::client::ServerCertVerified::assertion())
}
}
Expand Down Expand Up @@ -165,7 +165,7 @@

sender_loop(sink, requests.clone(), receiver);

receiver_loop(stream, requests.clone(), sender.clone());
receiver_loop(stream, requests.clone(), sender.clone(), opts.deserializer);

Ok(Conn {
sender,
Expand Down Expand Up @@ -266,6 +266,7 @@
mut stream: SplitStream<WSStream>,
requests: Arc<Mutex<HashMap<Uuid, Sender<GremlinResult<Response>>>>>,
mut sender: Sender<Cmd>,
deserializer: IoProtocol,
) {
task::spawn(async move {
loop {
Expand All @@ -283,10 +284,24 @@
}
Some(Ok(item)) => match item {
Message::Binary(data) => {
let response: Response = serde_json::from_slice(&data).unwrap();
let response = deserializer
.read_response(data)
.expect("Unable to parse message");
let mut guard = requests.lock().await;

//GraphBinary permits a null response request id, so in lieu of a request id assume
//a single entry in the requests to be the one we should respond to given connection
//multiplexing isn't currently implemented
Comment on lines +292 to +294
Copy link
Contributor Author

@criminosis criminosis Nov 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least on the GraphBinary side its documented that request Id can come back null. This seemed like the best means to try to mitigate that when it happens. Since connection multiplexing isn't in play, it seems like this should be an okay tradeoff.

The alternative would be removing the whole requests session ledger since there isn't multiplexing anyways.

let request_id = response.request_id.unwrap_or_else(|| {
if guard.len() == 1 {
guard.keys().next().expect("Should have had only 1 key").clone()
} else {
panic!("Request response without request id was received, but there isn't only 1 request currently submitted");
}
});

if response.status.code != 206 {
let item = guard.remove(&response.request_id);
let item = guard.remove(&request_id);
drop(guard);
if let Some(mut s) = item {
match s.send(Ok(response)).await {
Expand All @@ -295,7 +310,7 @@
};
}
} else {
let item = guard.get_mut(&response.request_id);
let item = guard.get_mut(&request_id);
if let Some(s) = item {
match s.send(Ok(response)).await {
Ok(_r) => {}
Expand Down
51 changes: 16 additions & 35 deletions gremlin-client/src/aio/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
use crate::aio::connection::Conn;
use crate::connection::ConnectionOptions;
use crate::error::GremlinError;
use crate::message::{message_with_args, message_with_args_and_uuid, message_with_args_v2};
use crate::{GValue, GraphSON};
use crate::GValue;
use async_trait::async_trait;
use base64::encode;

Check warning on line 8 in gremlin-client/src/aio/pool.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

use of deprecated function `base64::encode`: Use Engine::encode

Check warning on line 8 in gremlin-client/src/aio/pool.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

use of deprecated function `base64::encode`: Use Engine::encode
use std::collections::HashMap;

#[derive(Debug)]
Expand Down Expand Up @@ -40,23 +39,13 @@
String::from("language"),
GValue::String(String::from("gremlin-groovy")),
);
let args = self.options.serializer.write(&GValue::from(args))?;

let message = match self.options.serializer {
GraphSON::V2 => message_with_args_v2(String::from("eval"), String::default(), args),
GraphSON::V3 => message_with_args(String::from("eval"), String::default(), args),
};
let (id, message) = self
.options
.serializer
.build_message("eval", "", args, None)?;

let id = message.id().clone();
let msg = serde_json::to_string(&message).map_err(GremlinError::from)?;

let content_type = self.options.serializer.content_type();

let payload = String::from("") + content_type + &msg;
let mut binary = payload.into_bytes();
binary.insert(0, content_type.len() as u8);

let (response, _receiver) = conn.send(id, binary).await?;
let (response, _receiver) = conn.send(id, message).await?;

match response.status.code {
200 | 206 => Ok(conn),
Expand All @@ -70,25 +59,17 @@
GValue::String(encode(&format!("\0{}\0{}", c.username, c.password))),
);

let args = self.options.serializer.write(&GValue::from(args))?;
let message = message_with_args_and_uuid(
String::from("authentication"),
String::from("traversal"),
response.request_id,
let (id, message) = self.options.serializer.build_message(
"authentication",
"traversal",
args,
);

let id = message.id().clone();
let msg = serde_json::to_string(&message).map_err(GremlinError::from)?;

let content_type = self.options.serializer.content_type();
let payload = String::from("") + content_type + &msg;

let mut binary = payload.into_bytes();
binary.insert(0, content_type.len() as u8);

let (response, _receiver) = conn.send(id, binary).await?;

Some(
response
.request_id
.expect("Auth challenge requires response id"),
),
)?;
let (response, _receiver) = conn.send(id, message).await?;
match response.status.code {
200 | 206 => Ok(conn),
204 => Ok(conn),
Expand Down
12 changes: 5 additions & 7 deletions gremlin-client/src/aio/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,12 @@ impl Stream for GResultSet {
if this.response.status.code == 206 {
match futures::ready!(this.receiver.as_mut().poll_next(cx)) {
Some(Ok(response)) => {
let results: VecDeque<GValue> = this
.client
.options
.serializer
.read(&response.result.data)?
.map(|v| v.into())
let results: VecDeque<GValue> = response
.result
.data
.clone()
.map(Into::into)
.unwrap_or_else(VecDeque::new);

*this.results = results;
*this.response = response;
}
Expand Down
Loading
Loading