Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
shachlanAmazon committed Nov 15, 2023
1 parent fbf7398 commit 7d1dfc8
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 63 deletions.
2 changes: 0 additions & 2 deletions babushka-core/build.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::path::Path;

use flatc_rust;

fn main() {
flatc_rust::run(flatc_rust::Args {
inputs: &[
Expand Down
14 changes: 7 additions & 7 deletions babushka-core/src/flatbuffers-schema/connection_request.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@ enum TlsMode : int {
}

table AddressInfo {
host:string;
host:string (required);
port:uint;
}

table AuthenticationInfo {
password:string;
password:string (required);
username:string;
}

table ConnectionRequest {
addresses:[connection_request.AddressInfo];
tls_mode:connection_request.TlsMode;
addresses:[AddressInfo] (required);
tls_mode:TlsMode;
cluster_mode_enabled:bool;
request_timeout:uint;
client_creation_timeout:uint;
read_from:connection_request.ReadFrom;
connection_retry_strategy:connection_request.ConnectionRetryStrategy;
authentication_info:connection_request.AuthenticationInfo;
read_from:ReadFrom;
connection_retry_strategy:ConnectionRetryStrategy;
authentication_info:AuthenticationInfo;
database_id:uint;
}

Expand Down
10 changes: 5 additions & 5 deletions babushka-core/src/flatbuffers-schema/redis_request.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ table SlotIdRoute {

table SlotKeyRoute {
slot_type:SlotTypes;
slot_key:string;
slot_key:string (required);
}

union Routes {SimpleRoutesTable, SlotKeyRoute, SlotIdRoute}

table ArgsArray {
args:[string];
args:[string] (required);
}

table ArgsVecPointer {
Expand All @@ -109,11 +109,11 @@ union ArgsOptions {

table Command {
request_type:RequestType;
args:ArgsOptions;
args:ArgsOptions (required);
}

table Transaction {
commands:[Command];
commands:[Command] (required);
}

union CommandOptions {
Expand All @@ -123,6 +123,6 @@ union CommandOptions {

table RedisRequest {
callback_idx:uint;
command:CommandOptions;
command:CommandOptions (required);
route:Routes;
}
4 changes: 2 additions & 2 deletions babushka-core/src/flatbuffers-schema/response.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ enum ConstantResponse : int {

table RequestError {
type:RequestErrorType;
message:string;
message:string (required);
}

table Response {
callback_idx:uint;
value:Value;
value:Value (required);
}

table Value {
Expand Down
75 changes: 28 additions & 47 deletions babushka-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use super::rotating_buffer::RotatingBuffer;
use crate::client::Client;
use crate::connection_request::connection_request::ConnectionRequest;
use crate::redis_request::redis_request::{
ArgsArray, ArgsVecPointer, Command, RedisRequest, RequestType, Routes, SimpleRoutes,
SlotTypes, Transaction, CommandOptions,
ArgsArray, ArgsOptions, ArgsVecPointer, Command, CommandOptions, RedisRequest, RequestType,
Routes, SimpleRoutes, SlotTypes, Transaction,
};
use crate::response::response;
use crate::response::response::Response;
Expand Down Expand Up @@ -344,19 +344,21 @@ fn get_redis_command(command: &Command) -> Result<Cmd, ClienUsageError> {
)));
};

match &command.args_type() {
Some(ArgsArray(args_vec)) => {
for arg in args_vec.args.iter() {
match command.args_type() {
ArgsOptions::ArgsArray => {
let args_array = command.args_as_args_array().unwrap().args();
for arg in args_array.iter() {
cmd.arg(arg.as_bytes());
}
}
Some(ArgsVecPointer { pointer }) => {
let res = *unsafe { Box::from_raw(*pointer as *mut Vec<String>) };
ArgsOptions::ArgsVecPointer => {
let pointer = command.args_as_args_vec_pointer().unwrap().pointer();
let res = *unsafe { Box::from_raw(pointer as *mut Vec<String>) };
for arg in res {
cmd.arg(arg.as_bytes());
}
}
None => {
ArgsOptions::NONE => {
return Err(ClienUsageError::InternalError(
"Failed to get request arguemnts, no arguments are set".to_string(),
));
Expand All @@ -383,7 +385,7 @@ async fn send_transaction(
routing: Option<RoutingInfo>,
) -> ClientUsageResult<Value> {
let commands = request.commands();
let mut pipeline = redis::Pipeline::with_capacity(commands.capacity());
let mut pipeline = redis::Pipeline::with_capacity(commands.len());
let offset = commands.len() + 1;
pipeline.atomic();
for command in commands {
Expand Down Expand Up @@ -429,20 +431,24 @@ fn get_route(
Ok(Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)))
}
}
},
}
Routes::SlotIdRoute => {
let slot_id_route = request.route_as_slot_id_route().unwrap();
Ok(Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route::new(
slot_id_route.slot_id() as u16,
get_slot_addr(&slot_id_route.slot_type())?,
)))))
},
Ok(Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::SpecificNode(Route::new(
slot_id_route.slot_id() as u16,
get_slot_addr(&slot_id_route.slot_type())?,
)),
)))
}
Routes::SlotKeyRoute => {
let slot_key_route = request.route_as_slot_key_route().unwrap();
Ok(Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route::new(
redis::cluster_topology::get_slot(slot_key_route.slot_key().unwrap().as_bytes()),
get_slot_addr(&slot_key_route.slot_type())?,
)))))
let slot_key_route = request.route_as_slot_key_route().unwrap();
Ok(Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::SpecificNode(Route::new(
redis::cluster_topology::get_slot(slot_key_route.slot_key().as_bytes()),
get_slot_addr(&slot_key_route.slot_type())?,
)),
)))
}
}
}
Expand All @@ -465,40 +471,15 @@ fn handle_request(request: RedisRequest, client: Client, writer: Rc<Writer>) {
}
Err(e) => Err(e),
}
},
}
CommandOptions::Transaction => {
let transaction = request.command_as_transaction().unwrap();
match get_route(&request, None) {
Ok(routes) => send_transaction(transaction, client, routes).await,
Err(e) => Err(e),
},

}
}
};
// let result = match request.command() {
// Some(action) => match action. {
// SingleCommand(command) => match get_redis_command(&command) {
// Ok(cmd) => {
// let response_policy = cmd
// .command()
// .map(|cmd| ResponsePolicy::for_command(&cmd))
// .unwrap_or(None);
// match get_route(request.route.0, response_policy) {
// Ok(routes) => send_command(cmd, client, routes).await,
// Err(e) => Err(e),
// }
// }
// Err(e) => Err(e),
// },
// Transaction(transaction) => match get_route(request.route.0, None) {
// Ok(routes) => send_transaction(transaction, client, routes).await,
// Err(e) => Err(e),
// },
// },
// None => Err(ClienUsageError::InternalError(
// "Received empty request".to_string(),
// )),
// };

let _res = write_result(result, request.callback_idx(), &writer).await;
});
Expand Down

0 comments on commit 7d1dfc8

Please sign in to comment.