Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into node/integ_aarzola_…
Browse files Browse the repository at this point in the history
…add_blpop

Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Apr 10, 2024
2 parents 508e604 + 78cfa33 commit 21ab3f0
Show file tree
Hide file tree
Showing 61 changed files with 4,988 additions and 2,627 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/ort.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ jobs:
with:
submodules: "true"
ref: ${{ env.BASE_BRANCH }}
# This is a temporary fix, till ORT will fix thire issue with newer v of Cargo - https://github.com/oss-review-toolkit/ort/issues/8480
- name: Install Rust toolchain
uses: dtolnay/[email protected]
with:
targets: ${{ inputs.target }}

- name: Set up JDK 11 for the ORT package
uses: actions/setup-java@v4
Expand Down Expand Up @@ -204,4 +209,4 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
EVENT_NAME: ${{ github.event_name }}
INPUT_VERSION: ${{ github.event.inputs.version }}


4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#### Changes
* Python: Added JSON.DEL JSON.FORGET commands ([#1146](https://github.com/aws/glide-for-redis/pull/1146))
* Python: Added STRLEN command ([#1230](https://github.com/aws/glide-for-redis/pull/1230))
* Python: Added HKEYS command ([#1228](https://github.com/aws/glide-for-redis/pull/1228))
* Python: Added ZREMRANGEBYSCORE command ([#1151](https://github.com/aws/glide-for-redis/pull/1151))
* Node: Added SPOP, SPOPCOUNT commands. ([#1117](https://github.com/aws/glide-for-redis/pull/1117))
* Node: Added `BLPOP` command ([#1223](https://github.com/aws/glide-for-redis/pull/1223))

#### Fixes
Expand Down
150 changes: 138 additions & 12 deletions csharp/lib/AsyncClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0
*/

using System.Buffers;
using System.Runtime.InteropServices;

namespace Glide;
Expand All @@ -22,18 +23,35 @@ public AsyncClient(string host, UInt32 port, bool useTLS)
}
}

public async Task SetAsync(string key, string value)
private async Task<string?> command(IntPtr[] args, int argsCount, RequestType requestType)
{
var message = messageContainer.GetMessageForCall(key, value);
SetFfi(clientPointer, (ulong)message.Index, message.KeyPtr, message.ValuePtr);
await message;
// We need to pin the array in place, in order to ensure that the GC doesn't move it while the operation is running.
GCHandle pinnedArray = GCHandle.Alloc(args, GCHandleType.Pinned);
IntPtr pointer = pinnedArray.AddrOfPinnedObject();
var message = messageContainer.GetMessageForCall(args, argsCount);
CommandFfi(clientPointer, (ulong)message.Index, (int)requestType, pointer, (uint)argsCount);
var result = await message;
pinnedArray.Free();
return result;
}

public async Task<string?> SetAsync(string key, string value)
{
var args = this.arrayPool.Rent(2);
args[0] = Marshal.StringToHGlobalAnsi(key);
args[1] = Marshal.StringToHGlobalAnsi(value);
var result = await command(args, 2, RequestType.SetString);
this.arrayPool.Return(args);
return result;
}

public async Task<string?> GetAsync(string key)
{
var message = messageContainer.GetMessageForCall(key, null);
GetFfi(clientPointer, (ulong)message.Index, message.KeyPtr);
return await message;
var args = this.arrayPool.Rent(1);
args[0] = Marshal.StringToHGlobalAnsi(key);
var result = await command(args, 1, RequestType.GetString);
this.arrayPool.Return(args);
return result;
}

public void Dispose()
Expand Down Expand Up @@ -89,18 +107,16 @@ private void FailureCallback(ulong index)
private IntPtr clientPointer;

private readonly MessageContainer<string> messageContainer = new();
private readonly ArrayPool<IntPtr> arrayPool = ArrayPool<IntPtr>.Shared;

#endregion private fields

#region FFI function declarations

private delegate void StringAction(ulong index, IntPtr str);
private delegate void FailureAction(ulong index);
[DllImport("libglide_rs", CallingConvention = CallingConvention.Cdecl, EntryPoint = "get")]
private static extern void GetFfi(IntPtr client, ulong index, IntPtr key);

[DllImport("libglide_rs", CallingConvention = CallingConvention.Cdecl, EntryPoint = "set")]
private static extern void SetFfi(IntPtr client, ulong index, IntPtr key, IntPtr value);
[DllImport("libglide_rs", CallingConvention = CallingConvention.Cdecl, EntryPoint = "command")]
private static extern void CommandFfi(IntPtr client, ulong index, Int32 requestType, IntPtr args, UInt32 argCount);

private delegate void IntAction(IntPtr arg);
[DllImport("libglide_rs", CallingConvention = CallingConvention.Cdecl, EntryPoint = "create_client")]
Expand All @@ -110,4 +126,114 @@ private void FailureCallback(ulong index)
private static extern void CloseClientFfi(IntPtr client);

#endregion

#region RequestType

// TODO: generate this with a bindings generator
private enum RequestType
{
InvalidRequest = 0,
CustomCommand = 1,
GetString = 2,
SetString = 3,
Ping = 4,
Info = 5,
Del = 6,
Select = 7,
ConfigGet = 8,
ConfigSet = 9,
ConfigResetStat = 10,
ConfigRewrite = 11,
ClientGetName = 12,
ClientGetRedir = 13,
ClientId = 14,
ClientInfo = 15,
ClientKill = 16,
ClientList = 17,
ClientNoEvict = 18,
ClientNoTouch = 19,
ClientPause = 20,
ClientReply = 21,
ClientSetInfo = 22,
ClientSetName = 23,
ClientUnblock = 24,
ClientUnpause = 25,
Expire = 26,
HashSet = 27,
HashGet = 28,
HashDel = 29,
HashExists = 30,
MGet = 31,
MSet = 32,
Incr = 33,
IncrBy = 34,
Decr = 35,
IncrByFloat = 36,
DecrBy = 37,
HashGetAll = 38,
HashMSet = 39,
HashMGet = 40,
HashIncrBy = 41,
HashIncrByFloat = 42,
LPush = 43,
LPop = 44,
RPush = 45,
RPop = 46,
LLen = 47,
LRem = 48,
LRange = 49,
LTrim = 50,
SAdd = 51,
SRem = 52,
SMembers = 53,
SCard = 54,
PExpireAt = 55,
PExpire = 56,
ExpireAt = 57,
Exists = 58,
Unlink = 59,
TTL = 60,
Zadd = 61,
Zrem = 62,
Zrange = 63,
Zcard = 64,
Zcount = 65,
ZIncrBy = 66,
ZScore = 67,
Type = 68,
HLen = 69,
Echo = 70,
ZPopMin = 71,
Strlen = 72,
Lindex = 73,
ZPopMax = 74,
XRead = 75,
XAdd = 76,
XReadGroup = 77,
XAck = 78,
XTrim = 79,
XGroupCreate = 80,
XGroupDestroy = 81,
HSetNX = 82,
SIsMember = 83,
Hvals = 84,
PTTL = 85,
ZRemRangeByRank = 86,
Persist = 87,
ZRemRangeByScore = 88,
Time = 89,
Zrank = 90,
Rename = 91,
DBSize = 92,
Brpop = 93,
Hkeys = 94,
PfAdd = 96,
PfCount = 97,
PfMerge = 98,
Blpop = 100,
RPushX = 102,
LPushX = 103,
}

#endregion
}
30 changes: 14 additions & 16 deletions csharp/lib/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ internal class Message<T> : INotifyCompletion
/// know how to find the message and set its result.
public int Index { get; }

/// The pointer to the unmanaged memory that contains the operation's key.
public IntPtr KeyPtr { get; private set; }

/// The pointer to the unmanaged memory that contains the operation's key.
public IntPtr ValuePtr { get; private set; }
/// The array holding the pointers to the unmanaged memory that contains the operation's arguments.
public IntPtr[]? args { get; private set; }
// We need to save the args count, because sometimes we get arrays that are larger than they need to be. We can't rely on `this.args.Length`, due to it coming from an array pool.
private int argsCount;
private readonly MessageContainer<T> container;

public Message(int index, MessageContainer<T> container)
Expand Down Expand Up @@ -84,30 +83,29 @@ private void CheckRaceAndCallContinuation()
/// This returns a task that will complete once SetException / SetResult are called,
/// and ensures that the internal state of the message is set-up before the task is created,
/// and cleaned once it is complete.
public void StartTask(string? key, string? value, object client)
public void SetupTask(IntPtr[] args, int argsCount, object client)
{
continuation = null;
this.completionState = COMPLETION_STAGE_STARTED;
this.result = default(T);
this.exception = null;
this.client = client;
this.KeyPtr = key is null ? IntPtr.Zero : Marshal.StringToHGlobalAnsi(key);
this.ValuePtr = value is null ? IntPtr.Zero : Marshal.StringToHGlobalAnsi(value);
this.args = args;
this.argsCount = argsCount;
}

// This function isn't thread-safe. Access to it should be from a single thread, and only once per operation.
// For the sake of performance, this responsibility is on the caller, and the function doesn't contain any safety measures.
private void FreePointers()
{
if (KeyPtr != IntPtr.Zero)
{
Marshal.FreeHGlobal(KeyPtr);
KeyPtr = IntPtr.Zero;
}
if (ValuePtr != IntPtr.Zero)
if (this.args is not null)
{
Marshal.FreeHGlobal(ValuePtr);
ValuePtr = IntPtr.Zero;
for (var i = 0; i < this.argsCount; i++)
{
Marshal.FreeHGlobal(this.args[i]);
}
this.args = null;
this.argsCount = 0;
}
client = null;
}
Expand Down
4 changes: 2 additions & 2 deletions csharp/lib/MessageContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ internal class MessageContainer<T>
{
internal Message<T> GetMessage(int index) => messages[index];

internal Message<T> GetMessageForCall(string? key, string? value)
internal Message<T> GetMessageForCall(IntPtr[] args, int argsCount)
{
var message = GetFreeMessage();
message.StartTask(key, value, this);
message.SetupTask(args, argsCount, this);
return message;
}

Expand Down
67 changes: 25 additions & 42 deletions csharp/lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
*/
use glide_core::client;
use glide_core::client::Client as GlideClient;
use redis::{Cmd, FromRedisValue, RedisResult};
use glide_core::request_type::RequestType;
use redis::{FromRedisValue, RedisResult};
use std::{
ffi::{c_void, CStr, CString},
os::raw::c_char,
Expand Down Expand Up @@ -91,61 +92,43 @@ pub extern "C" fn close_client(client_ptr: *const c_void) {

/// Expects that key and value will be kept valid until the callback is called.
#[no_mangle]
pub extern "C" fn set(
pub extern "C" fn command(
client_ptr: *const c_void,
callback_index: usize,
key: *const c_char,
value: *const c_char,
request_type: RequestType,
args: *const *mut c_char,
arg_count: u32,
) {
let client = unsafe { Box::leak(Box::from_raw(client_ptr as *mut Client)) };
// The safety of this needs to be ensured by the calling code. Cannot dispose of the pointer before all operations have completed.
let ptr_address = client_ptr as usize;

let key_cstring = unsafe { CStr::from_ptr(key as *mut c_char) };
let value_cstring = unsafe { CStr::from_ptr(value as *mut c_char) };
let mut client_clone = client.client.clone();
client.runtime.spawn(async move {
let key_bytes = key_cstring.to_bytes();
let value_bytes = value_cstring.to_bytes();
let mut cmd = Cmd::new();
cmd.arg("SET").arg(key_bytes).arg(value_bytes);
let result = client_clone.send_command(&cmd, None).await;
unsafe {
let client = Box::leak(Box::from_raw(ptr_address as *mut Client));
match result {
Ok(_) => (client.success_callback)(callback_index, std::ptr::null()), // TODO - should return "OK" string.
Err(_) => (client.failure_callback)(callback_index), // TODO - report errors
};
}
});
}

/// Expects that key will be kept valid until the callback is called. If the callback is called with a string pointer, the pointer must
/// be used synchronously, because the string will be dropped after the callback.
#[no_mangle]
pub extern "C" fn get(client_ptr: *const c_void, callback_index: usize, key: *const c_char) {
let client = unsafe { Box::leak(Box::from_raw(client_ptr as *mut Client)) };
// The safety of this needs to be ensured by the calling code. Cannot dispose of the pointer before all operations have completed.
// The safety of these needs to be ensured by the calling code. Cannot dispose of the pointer before all operations have completed.
let ptr_address = client_ptr as usize;
let args_address = args as usize;

let key_cstring = unsafe { CStr::from_ptr(key as *mut c_char) };
let mut client_clone = client.client.clone();
client.runtime.spawn(async move {
let key_bytes = key_cstring.to_bytes();
let mut cmd = Cmd::new();
cmd.arg("GET").arg(key_bytes);
let result = client_clone.send_command(&cmd, None).await;
let client = unsafe { Box::leak(Box::from_raw(ptr_address as *mut Client)) };
let value = match result {
Ok(value) => value,
Err(_) => {
unsafe { (client.failure_callback)(callback_index) }; // TODO - report errors,
let Some(mut cmd) = request_type.get_command() else {
unsafe {
let client = Box::leak(Box::from_raw(ptr_address as *mut Client));
(client.failure_callback)(callback_index); // TODO - report errors
return;
}
};
let result = Option::<CString>::from_owned_redis_value(value);

let args_slice = unsafe {
std::slice::from_raw_parts(args_address as *const *mut c_char, arg_count as usize)
};
for arg in args_slice {
let c_str = unsafe { CStr::from_ptr(*arg as *mut c_char) };
cmd.arg(c_str.to_bytes());
}

let result = client_clone
.send_command(&cmd, None)
.await
.and_then(Option::<CString>::from_owned_redis_value);
unsafe {
let client = Box::leak(Box::from_raw(ptr_address as *mut Client));
match result {
Ok(None) => (client.success_callback)(callback_index, std::ptr::null()),
Ok(Some(c_str)) => (client.success_callback)(callback_index, c_str.as_ptr()),
Expand Down
Loading

0 comments on commit 21ab3f0

Please sign in to comment.