From 7d1dfc8b7563eec7a059b29d45306d9aa279c28e Mon Sep 17 00:00:00 2001 From: Shachar Langbeheim Date: Wed, 15 Nov 2023 09:47:58 +0000 Subject: [PATCH] temp --- babushka-core/build.rs | 2 - .../flatbuffers-schema/connection_request.fbs | 14 ++-- .../src/flatbuffers-schema/redis_request.fbs | 10 +-- .../src/flatbuffers-schema/response.fbs | 4 +- babushka-core/src/socket_listener.rs | 75 +++++++------------ 5 files changed, 42 insertions(+), 63 deletions(-) diff --git a/babushka-core/build.rs b/babushka-core/build.rs index f6d3594..ec6bc94 100644 --- a/babushka-core/build.rs +++ b/babushka-core/build.rs @@ -1,7 +1,5 @@ use std::path::Path; -use flatc_rust; - fn main() { flatc_rust::run(flatc_rust::Args { inputs: &[ diff --git a/babushka-core/src/flatbuffers-schema/connection_request.fbs b/babushka-core/src/flatbuffers-schema/connection_request.fbs index aa06fce..7a79a79 100644 --- a/babushka-core/src/flatbuffers-schema/connection_request.fbs +++ b/babushka-core/src/flatbuffers-schema/connection_request.fbs @@ -14,24 +14,24 @@ enum TlsMode : int { } table AddressInfo { - host:string; + host:string (required); port:uint; } table AuthenticationInfo { - password:string; + password:string (required); username:string; } table ConnectionRequest { - addresses:[connection_request.AddressInfo]; - tls_mode:connection_request.TlsMode; + addresses:[AddressInfo] (required); + tls_mode:TlsMode; cluster_mode_enabled:bool; request_timeout:uint; client_creation_timeout:uint; - read_from:connection_request.ReadFrom; - connection_retry_strategy:connection_request.ConnectionRetryStrategy; - authentication_info:connection_request.AuthenticationInfo; + read_from:ReadFrom; + connection_retry_strategy:ConnectionRetryStrategy; + authentication_info:AuthenticationInfo; database_id:uint; } diff --git a/babushka-core/src/flatbuffers-schema/redis_request.fbs b/babushka-core/src/flatbuffers-schema/redis_request.fbs index 47a44cc..04c00b3 100644 --- a/babushka-core/src/flatbuffers-schema/redis_request.fbs +++ b/babushka-core/src/flatbuffers-schema/redis_request.fbs @@ -90,13 +90,13 @@ table SlotIdRoute { table SlotKeyRoute { slot_type:SlotTypes; - slot_key:string; + slot_key:string (required); } union Routes {SimpleRoutesTable, SlotKeyRoute, SlotIdRoute} table ArgsArray { - args:[string]; + args:[string] (required); } table ArgsVecPointer { @@ -109,11 +109,11 @@ union ArgsOptions { table Command { request_type:RequestType; - args:ArgsOptions; + args:ArgsOptions (required); } table Transaction { - commands:[Command]; + commands:[Command] (required); } union CommandOptions { @@ -123,6 +123,6 @@ union CommandOptions { table RedisRequest { callback_idx:uint; - command:CommandOptions; + command:CommandOptions (required); route:Routes; } diff --git a/babushka-core/src/flatbuffers-schema/response.fbs b/babushka-core/src/flatbuffers-schema/response.fbs index e674e74..0e45e14 100644 --- a/babushka-core/src/flatbuffers-schema/response.fbs +++ b/babushka-core/src/flatbuffers-schema/response.fbs @@ -13,12 +13,12 @@ enum ConstantResponse : int { table RequestError { type:RequestErrorType; - message:string; + message:string (required); } table Response { callback_idx:uint; - value:Value; + value:Value (required); } table Value { diff --git a/babushka-core/src/socket_listener.rs b/babushka-core/src/socket_listener.rs index 8c70f7f..25f63d9 100644 --- a/babushka-core/src/socket_listener.rs +++ b/babushka-core/src/socket_listener.rs @@ -2,8 +2,8 @@ use super::rotating_buffer::RotatingBuffer; use crate::client::Client; use crate::connection_request::connection_request::ConnectionRequest; use crate::redis_request::redis_request::{ - ArgsArray, ArgsVecPointer, Command, RedisRequest, RequestType, Routes, SimpleRoutes, - SlotTypes, Transaction, CommandOptions, + ArgsArray, ArgsOptions, ArgsVecPointer, Command, CommandOptions, RedisRequest, RequestType, + Routes, SimpleRoutes, SlotTypes, Transaction, }; use crate::response::response; use crate::response::response::Response; @@ -344,19 +344,21 @@ fn get_redis_command(command: &Command) -> Result { ))); }; - match &command.args_type() { - Some(ArgsArray(args_vec)) => { - for arg in args_vec.args.iter() { + match command.args_type() { + ArgsOptions::ArgsArray => { + let args_array = command.args_as_args_array().unwrap().args(); + for arg in args_array.iter() { cmd.arg(arg.as_bytes()); } } - Some(ArgsVecPointer { pointer }) => { - let res = *unsafe { Box::from_raw(*pointer as *mut Vec) }; + ArgsOptions::ArgsVecPointer => { + let pointer = command.args_as_args_vec_pointer().unwrap().pointer(); + let res = *unsafe { Box::from_raw(pointer as *mut Vec) }; for arg in res { cmd.arg(arg.as_bytes()); } } - None => { + ArgsOptions::NONE => { return Err(ClienUsageError::InternalError( "Failed to get request arguemnts, no arguments are set".to_string(), )); @@ -383,7 +385,7 @@ async fn send_transaction( routing: Option, ) -> ClientUsageResult { let commands = request.commands(); - let mut pipeline = redis::Pipeline::with_capacity(commands.capacity()); + let mut pipeline = redis::Pipeline::with_capacity(commands.len()); let offset = commands.len() + 1; pipeline.atomic(); for command in commands { @@ -429,20 +431,24 @@ fn get_route( Ok(Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))) } } - }, + } Routes::SlotIdRoute => { let slot_id_route = request.route_as_slot_id_route().unwrap(); - Ok(Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route::new( - slot_id_route.slot_id() as u16, - get_slot_addr(&slot_id_route.slot_type())?, - ))))) - }, + Ok(Some(RoutingInfo::SingleNode( + SingleNodeRoutingInfo::SpecificNode(Route::new( + slot_id_route.slot_id() as u16, + get_slot_addr(&slot_id_route.slot_type())?, + )), + ))) + } Routes::SlotKeyRoute => { -let slot_key_route = request.route_as_slot_key_route().unwrap(); -Ok(Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route::new( - redis::cluster_topology::get_slot(slot_key_route.slot_key().unwrap().as_bytes()), - get_slot_addr(&slot_key_route.slot_type())?, - ))))) + let slot_key_route = request.route_as_slot_key_route().unwrap(); + Ok(Some(RoutingInfo::SingleNode( + SingleNodeRoutingInfo::SpecificNode(Route::new( + redis::cluster_topology::get_slot(slot_key_route.slot_key().as_bytes()), + get_slot_addr(&slot_key_route.slot_type())?, + )), + ))) } } } @@ -465,40 +471,15 @@ fn handle_request(request: RedisRequest, client: Client, writer: Rc) { } Err(e) => Err(e), } - }, + } CommandOptions::Transaction => { let transaction = request.command_as_transaction().unwrap(); match get_route(&request, None) { Ok(routes) => send_transaction(transaction, client, routes).await, Err(e) => Err(e), - }, - + } } }; - // let result = match request.command() { - // Some(action) => match action. { - // SingleCommand(command) => match get_redis_command(&command) { - // Ok(cmd) => { - // let response_policy = cmd - // .command() - // .map(|cmd| ResponsePolicy::for_command(&cmd)) - // .unwrap_or(None); - // match get_route(request.route.0, response_policy) { - // Ok(routes) => send_command(cmd, client, routes).await, - // Err(e) => Err(e), - // } - // } - // Err(e) => Err(e), - // }, - // Transaction(transaction) => match get_route(request.route.0, None) { - // Ok(routes) => send_transaction(transaction, client, routes).await, - // Err(e) => Err(e), - // }, - // }, - // None => Err(ClienUsageError::InternalError( - // "Received empty request".to_string(), - // )), - // }; let _res = write_result(result, request.callback_idx(), &writer).await; });