Skip to content

Commit

Permalink
RESP3 versions of ZRANGE and HGETALL (microsoft#503)
Browse files Browse the repository at this point in the history
* RESP3 versions of ZRANGE and HGETALL
  • Loading branch information
badrishc authored Jun 29, 2024
1 parent 96a7c75 commit 9fcc1b6
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 55 deletions.
60 changes: 60 additions & 0 deletions libs/common/RespWriteUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,37 @@ public static bool TryWriteDoubleBulkString(double value, ref byte* curr, byte*
return true;
}

/// <summary>
/// Try to write a double-precision floating-point <paramref name="value"/> as bulk string.
/// </summary>
/// <returns><see langword="true"/> if the <paramref name="value"/> could be written to <paramref name="curr"/>; <see langword="false"/> otherwise.</returns>
[SkipLocalsInit]
public static bool TryWriteDoubleNumeric(double value, ref byte* curr, byte* end)
{
if (double.IsNaN(value))
{
return TryWriteNaN_Numeric(value, ref curr, end);
}
else if (double.IsInfinity(value))
{
return TryWriteInfinity_Numeric(value, ref curr, end);
}

Span<byte> buffer = stackalloc byte[32];
if (!Utf8Formatter.TryFormat(value, buffer, out var bytesWritten, format: default))
return false;

var itemDigits = NumUtils.NumDigits(bytesWritten);
int totalLen = 1 + bytesWritten + 2;
if (totalLen > (int)(end - curr))
return false;

*curr++ = (byte)',';
buffer.Slice(0, bytesWritten).CopyTo(new Span<byte>(curr, bytesWritten));
curr += bytesWritten;
WriteNewline(ref curr);
return true;
}

[MethodImpl(MethodImplOptions.NoInlining)]
private static bool TryWriteInfinity(double value, ref byte* curr, byte* end)
Expand All @@ -508,6 +539,25 @@ private static bool TryWriteInfinity(double value, ref byte* curr, byte* end)
return true;
}

[MethodImpl(MethodImplOptions.NoInlining)]
private static bool TryWriteInfinity_Numeric(double value, ref byte* curr, byte* end)
{
var buffer = new Span<byte>(curr, (int)(end - curr));
if (double.IsPositiveInfinity(value))
{
if (!",+inf\r\n"u8.TryCopyTo(buffer))
return false;
}
else
{
if (!",-inf\r\n"u8.TryCopyTo(buffer))
return false;
}

curr += 7;
return true;
}

[MethodImpl(MethodImplOptions.NoInlining)]
private static bool TryWriteNaN(double value, ref byte* curr, byte* end)
{
Expand All @@ -518,6 +568,16 @@ private static bool TryWriteNaN(double value, ref byte* curr, byte* end)
return true;
}

[MethodImpl(MethodImplOptions.NoInlining)]
private static bool TryWriteNaN_Numeric(double value, ref byte* curr, byte* end)
{
var buffer = new Span<byte>(curr, (int)(end - curr));
if (!",nan\r\n"u8.TryCopyTo(buffer))
return false;
curr += 6;
return true;
}

/// <summary>
/// Create header for *Scan output
/// *scan commands have an array of two elements
Expand Down
8 changes: 4 additions & 4 deletions libs/server/Objects/Hash/HashObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou
fixed (byte* _input = input.AsSpan())
fixed (byte* _output = output.SpanByte.AsSpan())
{
var header = (RespInputHeader*)_input;
if (header->type != GarnetObjectType.Hash)
var header = (ObjectInputHeader*)_input;
if (header->header.type != GarnetObjectType.Hash)
{
//Indicates when there is an incorrect type
output.Length = 0;
Expand All @@ -125,7 +125,7 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou
}

var previousSize = this.Size;
switch (header->HashOp)
switch (header->header.HashOp)
{
case HashOperation.HSET:
HashSet(_input, input.Length, _output);
Expand All @@ -140,7 +140,7 @@ public override unsafe bool Operate(ref SpanByte input, ref SpanByteAndMemory ou
HashMultipleGet(_input, input.Length, ref output);
break;
case HashOperation.HGETALL:
HashGetAll(ref output);
HashGetAll(respProtocolVersion: header->arg1, ref output);
break;
case HashOperation.HDEL:
HashDelete(_input, input.Length, _output);
Expand Down
15 changes: 11 additions & 4 deletions libs/server/Objects/Hash/HashObjectImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using Garnet.common;
using Tsavorite.core;
Expand Down Expand Up @@ -123,7 +122,7 @@ private void HashMultipleGet(byte* input, int length, ref SpanByteAndMemory outp
}
}

private void HashGetAll(ref SpanByteAndMemory output)
private void HashGetAll(int respProtocolVersion, ref SpanByteAndMemory output)
{
var isMemory = false;
MemoryHandle ptrHandle = default;
Expand All @@ -135,8 +134,16 @@ private void HashGetAll(ref SpanByteAndMemory output)
ObjectOutputHeader _output = default;
try
{
while (!RespWriteUtils.WriteArrayLength(hash.Count * 2, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
if (respProtocolVersion < 3)
{
while (!RespWriteUtils.WriteArrayLength(hash.Count * 2, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}
else
{
while (!RespWriteUtils.WriteMapLength(hash.Count, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}

foreach (var item in hash)
{
Expand Down
86 changes: 42 additions & 44 deletions libs/server/Objects/SortedSet/SortedSetObjectImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using Garnet.common;
using Tsavorite.core;
Expand Down Expand Up @@ -323,6 +322,7 @@ private void SortedSetRange(byte* input, int length, ref SpanByteAndMemory outpu
//ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
var _input = (ObjectInputHeader*)input;
int count = _input->arg1;
int respProtocolVersion = _input->arg2;

byte* input_startptr = input + sizeof(ObjectInputHeader);
byte* input_currptr = input_startptr;
Expand Down Expand Up @@ -410,23 +410,9 @@ private void SortedSetRange(byte* input, int length, ref SpanByteAndMemory outpu

if (options.ByScore)
{

var scoredElements = GetElementsInRangeByScore(minValue, maxValue, minExclusive, maxExclusive, options.WithScores, options.Reverse, options.ValidLimit, false, options.Limit);

// write the size of the array reply
while (!RespWriteUtils.WriteArrayLength(options.WithScores ? scoredElements.Count * 2 : scoredElements.Count, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);

foreach (var (score, element) in scoredElements)
{
while (!RespWriteUtils.WriteBulkString(element, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
if (options.WithScores)
{
while (!RespWriteUtils.TryWriteDoubleBulkString(score, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}
}
WriteSortedSetResult(options.WithScores, scoredElements.Count, respProtocolVersion, scoredElements, ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
countDone = _input->arg1;
}
else
Expand Down Expand Up @@ -469,20 +455,7 @@ private void SortedSetRange(byte* input, int length, ref SpanByteAndMemory outpu
var iterator = options.Reverse ? sortedSet.Reverse() : sortedSet;
iterator = iterator.Skip(minIndex).Take(n);

// write the size of the array reply
while (!RespWriteUtils.WriteArrayLength(options.WithScores ? n * 2 : n, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);

foreach (var (score, element) in iterator)
{
while (!RespWriteUtils.WriteBulkString(element, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
if (options.WithScores)
{
while (!RespWriteUtils.TryWriteDoubleBulkString(score, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}
}
WriteSortedSetResult(options.WithScores, n, respProtocolVersion, iterator, ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
countDone = _input->arg1;
}
}
Expand All @@ -502,20 +475,7 @@ private void SortedSetRange(byte* input, int length, ref SpanByteAndMemory outpu
}
else
{
//write the size of the array reply
while (!RespWriteUtils.WriteArrayLength(options.WithScores ? elementsInLex.Count * 2 : elementsInLex.Count, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);

foreach (var (score, element) in elementsInLex)
{
while (!RespWriteUtils.WriteBulkString(element, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
if (options.WithScores)
{
while (!RespWriteUtils.TryWriteDoubleBulkString(score, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}
}
WriteSortedSetResult(options.WithScores, elementsInLex.Count, respProtocolVersion, elementsInLex, ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
countDone = _input->arg1;
}
}
Expand All @@ -531,6 +491,44 @@ private void SortedSetRange(byte* input, int length, ref SpanByteAndMemory outpu
}
}

void WriteSortedSetResult(bool withScores, int count, int respProtocolVersion, IEnumerable<(double, byte[])> iterator, ref SpanByteAndMemory output, ref bool isMemory, ref byte* ptr, ref MemoryHandle ptrHandle, ref byte* curr, ref byte* end)
{
if (withScores && respProtocolVersion >= 3)
{
// write the size of the array reply
while (!RespWriteUtils.WriteArrayLength(count, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);

foreach (var (score, element) in iterator)
{
while (!RespWriteUtils.WriteArrayLength(2, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);

while (!RespWriteUtils.WriteBulkString(element, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
while (!RespWriteUtils.TryWriteDoubleNumeric(score, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}
}
else
{
// write the size of the array reply
while (!RespWriteUtils.WriteArrayLength(withScores ? count * 2 : count, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);

foreach (var (score, element) in iterator)
{
while (!RespWriteUtils.WriteBulkString(element, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
if (withScores)
{
while (!RespWriteUtils.TryWriteDoubleBulkString(score, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}
}
}
}

private void SortedSetRangeByScore(byte* input, int length, ref SpanByteAndMemory output)
{
SortedSetRange(input, length, ref output);
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/Objects/HashCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ private bool HashGetAll<TGarnetApi>(RespCommand command, int count, byte* ptr, r
inputPtr->header.type = GarnetObjectType.Hash;
inputPtr->header.flags = 0;
inputPtr->header.HashOp = HashOperation.HGETALL;
inputPtr->arg1 = respProtocolVersion;

// Prepare GarnetObjectStore output
var outputFooter = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(dcurr, (int)(dend - dcurr)) };
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/Objects/SortedSetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ private unsafe bool SortedSetRange<TGarnetApi>(RespCommand command, int count, b
inputPtr->header.flags = 0;
inputPtr->header.SortedSetOp = op;
inputPtr->arg1 = count - 1;
inputPtr->arg2 = respProtocolVersion;

var outputFooter = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(dcurr, (int)(dend - dcurr)) };

Expand Down
15 changes: 13 additions & 2 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,25 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived)
// Send message and dispose the network sender to end the session
if (dcurr > networkSender.GetResponseObjectHead())
Send(networkSender.GetResponseObjectHead());
networkSender.Dispose();

// The session is no longer usable, dispose it
networkSender.DisposeNetworkSender(true);
}
catch (GarnetException ex)
{
sessionMetrics?.incr_total_number_resp_server_session_exceptions(1);
logger?.Log(ex.LogLevel, ex, "ProcessMessages threw a GarnetException:");

// Forward Garnet error as RESP error
while (!RespWriteUtils.WriteError($"ERR Garnet Exception: {ex.Message}", ref dcurr, dend))
SendAndReset();

// Send message and dispose the network sender to end the session
if (dcurr > networkSender.GetResponseObjectHead())
Send(networkSender.GetResponseObjectHead());

// The session is no longer usable, dispose it
networkSender.Dispose();
networkSender.DisposeNetworkSender(true);
}
catch (Exception ex)
{
Expand Down
18 changes: 17 additions & 1 deletion libs/server/Storage/Session/ObjectStore/Common.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ sealed partial class StorageSession : IDisposable
unsafe GarnetStatus RMWObjectStoreOperation<TObjectContext>(byte[] key, ArgSlice input, out ObjectOutputHeader output, ref TObjectContext objectStoreContext)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions>
{
if (objectStoreContext.Session is null)
StorageSession.ThrowObjectStoreUninitializedException();

var _input = input.SpanByte;

output = new();
Expand Down Expand Up @@ -48,6 +51,9 @@ unsafe GarnetStatus RMWObjectStoreOperation<TObjectContext>(byte[] key, ArgSlice
GarnetStatus RMWObjectStoreOperationWithOutput<TObjectContext>(byte[] key, ArgSlice input, ref TObjectContext objectStoreContext, ref GarnetObjectStoreOutput outputFooter)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions>
{
if (objectStoreContext.Session is null)
StorageSession.ThrowObjectStoreUninitializedException();

var _input = input.SpanByte;

// Perform RMW on object store
Expand Down Expand Up @@ -75,6 +81,9 @@ GarnetStatus RMWObjectStoreOperationWithOutput<TObjectContext>(byte[] key, ArgSl
GarnetStatus ReadObjectStoreOperationWithOutput<TObjectContext>(byte[] key, ArgSlice input, ref TObjectContext objectStoreContext, ref GarnetObjectStoreOutput outputFooter)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions>
{
if (objectStoreContext.Session is null)
StorageSession.ThrowObjectStoreUninitializedException();

var _input = input.SpanByte;

// Perform read on object store
Expand Down Expand Up @@ -221,8 +230,11 @@ unsafe ArgSlice ProcessRespSingleTokenOutput(GarnetObjectStoreOutput outputFoote
/// <param name="objectStoreContext"></param>
/// <returns></returns>
unsafe GarnetStatus ReadObjectStoreOperation<TObjectContext>(byte[] key, ArgSlice input, out ObjectOutputHeader output, ref TObjectContext objectStoreContext)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions>
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions>
{
if (objectStoreContext.Session is null)
StorageSession.ThrowObjectStoreUninitializedException();

var _input = input.SpanByte;

output = new();
Expand Down Expand Up @@ -256,6 +268,10 @@ public GarnetStatus ObjectScan<TObjectContext>(byte[] key, ArgSlice input, ref G
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, SpanByte, GarnetObjectStoreOutput, long, ObjectStoreFunctions>
=> ReadObjectStoreOperationWithOutput(key, input, ref objectStoreContext, ref outputFooter);

[MethodImpl(MethodImplOptions.NoInlining)]
static void ThrowObjectStoreUninitializedException()
=> throw new GarnetException("Object store is disabled");

#endregion
}
}

0 comments on commit 9fcc1b6

Please sign in to comment.