Skip to content

Commit

Permalink
feat: first implementation of server scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
tglman committed Oct 23, 2024
1 parent e442429 commit c29a573
Show file tree
Hide file tree
Showing 15 changed files with 510 additions and 7 deletions.
34 changes: 34 additions & 0 deletions orientdb-client/src/asynchronous/client.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use super::network::cluster::AsyncConnection;
use super::network::cluster::{Cluster, Server};
use super::session::{OSession, SessionPool, SessionPoolManager};
use crate::asynchronous::server_statement::ServerStatement;
use crate::asynchronous::types::resultset::ServerResultSet;
use crate::common::protocol::messages::request::{
Close, Connect, CreateDB, DropDB, ExistDB, MsgHeader, Open,
};
use crate::common::protocol::messages::response;
use crate::common::types::result::OResult;
use crate::common::ConnectionOptions;
use crate::{DatabaseType, OrientResult};
use futures::Stream;
use std::future::Future;
use std::net::SocketAddr;
use std::net::ToSocketAddrs;
Expand Down Expand Up @@ -205,4 +209,34 @@ impl OrientDBClientInternal {
})
.await
}

pub async fn execute(
&self,
user: &str,
password: &str,
query: &str,
) -> OrientResult<ServerStatement> {
Ok(ServerStatement::new(
self,
user.to_string(),
password.to_string(),
query.to_string(),
))
}

pub(crate) async fn run(
&self,
stmt: ServerStatement<'_>,
) -> OrientResult<impl Stream<Item = OrientResult<OResult>>> {
let user = stmt.user.clone();
let pwd = stmt.password.clone();
self.run_as_admin(&user, &pwd, move |session, mut conn| async move {
let response: response::ServerQuery = conn
.send(stmt.into_query(session.session_id, session.token).into())
.await?
.payload();
Ok((conn, ServerResultSet::new(response)))
})
.await
}
}
1 change: 1 addition & 0 deletions orientdb-client/src/asynchronous/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod client;
pub mod live;
pub mod live_statement;
pub mod network;
pub mod server_statement;
pub mod session;
pub mod statement;
pub mod types;
Expand Down
129 changes: 129 additions & 0 deletions orientdb-client/src/asynchronous/server_statement.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use super::client::OrientDBClientInternal;
use crate::common::protocol::messages::request::ServerQuery;
use crate::common::types::value::{IntoOValue, OValue};
use crate::common::types::OResult;
#[cfg(feature = "sugar")]
use crate::types::result::FromResult;
use crate::OrientResult;
use futures::Stream;
use std::collections::HashMap;

#[cfg(feature = "sugar")]
use futures::StreamExt;

pub struct ServerStatement<'a> {
client: &'a OrientDBClientInternal,
pub(crate) user: String,
pub(crate) password: String,
stm: String,
params: HashMap<String, OValue>,
language: String,
page_size: i32,
mode: i8,
named: bool,
}

impl<'a> ServerStatement<'a> {
pub(crate) fn new(
client: &'a OrientDBClientInternal,
user: String,
password: String,
stm: String,
) -> ServerStatement<'a> {
ServerStatement {
client,
user,
password,
stm,
params: HashMap::new(),
named: true,
mode: 1,
language: String::from("sql"),
page_size: 150,
}
}

pub fn positional(mut self, params: &[&dyn IntoOValue]) -> Self {
let mut p = HashMap::new();
for (i, elem) in params.iter().enumerate() {
p.insert(i.to_string(), elem.into_ovalue());
}
self.params = p;
self.named = false;
self
}
pub fn named(mut self, params: &[(&str, &dyn IntoOValue)]) -> Self {
self.params = params
.iter()
.map(|&(k, ref v)| (String::from(k), v.into_ovalue()))
.collect();

self.named = true;
self
}

pub async fn run(self) -> OrientResult<impl Stream<Item = OrientResult<OResult>>> {
self.client.run(self.into()).await
}

#[cfg(feature = "sugar")]
pub async fn fetch_one<T>(self) -> OrientResult<Option<T>>
where
T: FromResult,
{
let mut stream = self
.client
.run(self.into())
.await?
.map(|r| r.and_then(T::from_result));

match stream.next().await {
Some(r) => Ok(Some(r?)),
None => Ok(None),
}
}

#[cfg(feature = "sugar")]
pub async fn fetch<T>(self) -> OrientResult<Vec<T>>
where
T: FromResult,
{
let mut stream = self
.client
.run(self.into())
.await?
.map(|r| r.and_then(T::from_result));

let mut results = Vec::new();

while let Some(r) = stream.next().await {
results.push(r?);
}
Ok(results)
}

#[cfg(feature = "sugar")]
pub async fn stream<T>(self) -> OrientResult<impl Stream<Item = OrientResult<T>>>
where
T: FromResult,
{
Ok(self
.client
.run(self.into())
.await?
.map(|r| r.and_then(T::from_result)))
}

pub(crate) fn into_query(self, session_id: i32, token: Option<Vec<u8>>) -> ServerQuery {
ServerQuery {
session_id,
token,
query: self.stm,
language: self.language,
named: self.named,
parameters: self.params,
page_size: self.page_size,
mode: self.mode,
}
}
}
20 changes: 19 additions & 1 deletion orientdb-client/src/asynchronous/types/resultset.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::asynchronous::network::cluster::Server;
use crate::common::protocol::messages::request::{QueryClose, QueryNext};
use crate::common::protocol::messages::response::Query;
use crate::common::protocol::messages::response::{Query, ServerQuery};
use crate::common::types::result::OResult;
use crate::OrientResult;
#[cfg(feature = "async-std-runtime")]
Expand Down Expand Up @@ -123,3 +123,21 @@ async fn close_result(
conn.send(msg.into()).await?;
Ok(())
}

pub struct ServerResultSet {
response: ServerQuery,
}

impl ServerResultSet {
pub(crate) fn new(response: ServerQuery) -> ServerResultSet {
ServerResultSet { response }
}
}

impl futures::Stream for ServerResultSet {
type Item = OrientResult<OResult>;

fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.response.records.pop_front().map(|x| Ok(x)))
}
}
48 changes: 48 additions & 0 deletions orientdb-client/src/common/protocol/messages/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,13 +347,61 @@ impl From<DropDB> for Request {
}
}

// Server Query Message
#[derive(Debug)]
pub struct ServerQuery {
pub session_id: i32,
pub token: Option<Vec<u8>>,
pub query: String,
pub parameters: HashMap<String, OValue>,
pub named: bool,
pub language: String,
pub mode: i8,
pub page_size: i32,
}

impl ServerQuery {
#[allow(clippy::too_many_arguments)]
pub fn new<T>(
session_id: i32,
token: Option<Vec<u8>>,
query: T,
parameters: HashMap<String, OValue>,
named: bool,
language: T,
mode: i8,
page_size: i32,
) -> ServerQuery
where
T: Into<String>,
{
ServerQuery {
session_id,
token,
query: query.into(),
parameters,
named,
language: language.into(),
mode,
page_size,
}
}
}

impl From<ServerQuery> for Request {
fn from(input: ServerQuery) -> Request {
Request::ServerQuery(input)
}
}

#[derive(Debug)]
pub enum Request {
HandShake(HandShake),
Connect(Connect),
CreateDB(CreateDB),
ExistDB(ExistDB),
DropDB(DropDB),
ServerQuery(ServerQuery),
Open(Open),
Query(Query),
LiveQuery(LiveQuery),
Expand Down
41 changes: 41 additions & 0 deletions orientdb-client/src/common/protocol/messages/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,45 @@ impl From<QueryClose> for ResponseType {
}
}

#[derive(Debug)]
pub struct ServerQuery {
pub query_id: String,
pub tx_changes: bool,
pub execution_plan: Option<OResult>,
pub records: VecDeque<OResult>,
pub has_next: bool,
pub stats: HashMap<String, i64>,
}

impl ServerQuery {
pub fn new<T>(
query_id: T,
tx_changes: bool,
execution_plan: Option<OResult>,
records: VecDeque<OResult>,
has_next: bool,
stats: HashMap<String, i64>,
) -> ServerQuery
where
T: Into<String>,
{
ServerQuery {
query_id: query_id.into(),
tx_changes,
execution_plan,
records,
has_next,
stats,
}
}
}

impl From<ServerQuery> for ResponseType {
fn from(input: ServerQuery) -> ResponseType {
ResponseType::ServerQuery(Some(input))
}
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum ResponseType {
Expand All @@ -200,6 +239,7 @@ pub enum ResponseType {
CreateDB(Option<CreateDB>),
ExistDB(Option<ExistDB>),
DropDB(Option<DropDB>),
ServerQuery(Option<ServerQuery>),
LiveQuery(Option<LiveQuery>),
LiveQueryResult(Option<LiveQueryResult>),
QueryClose(Option<QueryClose>),
Expand Down Expand Up @@ -259,3 +299,4 @@ impl_payload!(ExistDB);
impl_payload!(Connect);
impl_payload!(LiveQuery);
impl_payload!(LiveQueryResult);
impl_payload!(ServerQuery);
29 changes: 28 additions & 1 deletion orientdb-client/src/sync/client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use super::network::cluster::SyncConnection;
use super::network::cluster::{Cluster, Server};
use crate::common::protocol::messages::request::{
Close, Connect, CreateDB, DropDB, ExistDB, MsgHeader, Open,
Close, Connect, CreateDB, DropDB, ExistDB, MsgHeader, Open, ServerQuery,
};
use crate::common::protocol::messages::response;
use crate::common::ConnectionOptions;
use crate::sync::server_statement::ServerStatement;
use crate::sync::session::{OSession, SessionPool, SessionPoolManager};
use crate::sync::types::resultset::{ResultSet, ServerResultSet};
use crate::{DatabaseType, OrientResult};
use std::net::SocketAddr;
use std::net::ToSocketAddrs;
Expand Down Expand Up @@ -187,4 +189,29 @@ impl OrientDBClientInternal {
Ok(())
})
}

pub fn execute(
&self,
user: &str,
password: &str,
query: &str,
) -> OrientResult<ServerStatement> {
Ok(ServerStatement::new(
self,
user.to_string(),
password.to_string(),
query.to_string(),
))
}

pub(crate) fn run(&self, stmt: ServerStatement) -> OrientResult<impl ResultSet> {
let user = stmt.user.clone();
let pwd = stmt.password.clone();
self.run_as_admin(&user, &pwd, move |session, conn| {
let response: response::ServerQuery = conn
.send(stmt.into_query(session.session_id, session.token).into())?
.payload();
Ok(ServerResultSet::new(response))
})
}
}
1 change: 1 addition & 0 deletions orientdb-client/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod client;
pub mod network;
pub(crate) mod protocol;
pub mod server_statement;
pub mod session;
pub mod statement;
pub mod types;
Loading

0 comments on commit c29a573

Please sign in to comment.