Skip to content

Commit

Permalink
Support new options in xAdd (#1039)
Browse files Browse the repository at this point in the history
  • Loading branch information
hcwilhelm authored Feb 21, 2025
1 parent 63ddb52 commit b953440
Show file tree
Hide file tree
Showing 8 changed files with 885 additions and 254 deletions.
2 changes: 1 addition & 1 deletion modules/redis-it/src/test/scala/zio/redis/KeysSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ trait KeysSpec extends IntegrationSpec {
key <- uuid
field <- uuid
value <- uuid
_ <- redis.xAdd(key, "*", (field, value)).returning[String]
_ <- redis.xAdd(key, "*")((field, value)).returning[String]
stream <- redis.typeOf(key)
} yield assert(stream)(equalTo(RedisType.Stream))
}
Expand Down
462 changes: 332 additions & 130 deletions modules/redis-it/src/test/scala/zio/redis/StreamsSpec.scala

Large diffs are not rendered by default.

89 changes: 77 additions & 12 deletions modules/redis/src/main/scala/zio/redis/Input.scala
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ object Input {

case object DbInput extends Input[Long] {
  /** Encodes a `DB <index>` argument pair (e.g. for COPY/MOVE-style commands). */
  def encode(db: Long): RespCommand =
    // Fixed diff residue: the old `db.toString()` line and the new `db.toString` line
    // were both present; only the parameterless-accessor form is kept.
    RespCommand(RespCommandArgument.Literal("DB"), RespCommandArgument.Value(db.toString))
}

case object BoolInput extends Input[Boolean] {
Expand Down Expand Up @@ -438,16 +438,71 @@ object Input {
RespCommand(RespCommandArgument.Literal("STORE"), RespCommandArgument.Value(data.key))
}

case object StreamMaxLenInput extends Input[StreamMaxLen] {
def encode(data: StreamMaxLen): RespCommand = {
val chunk =
if (data.approximate) Chunk(RespCommandArgument.Literal("MAXLEN"), RespCommandArgument.Literal("~"))
else Chunk.single(RespCommandArgument.Literal("MAXLEN"))
case object MaxLenApproxInput extends Input[CappedStreamType.MaxLenApprox] {
  /**
   * Encodes an approximate MAXLEN cap for a stream command:
   * `MAXLEN ~ <count> [LIMIT <n>]`.
   */
  def encode(data: CappedStreamType.MaxLenApprox): RespCommand = {
    val base: Chunk[RespCommandArgument] =
      Chunk(
        RespCommandArgument.Literal("MAXLEN"),
        RespCommandArgument.Literal("~"),
        RespCommandArgument.Value(data.count.toString)
      )

    // LIMIT is appended only when the caller supplied one.
    val args = data.limit match {
      case Some(limit) =>
        base ++ Chunk(RespCommandArgument.Literal("LIMIT"), RespCommandArgument.Value(limit.toString))
      case None =>
        base
    }

    RespCommand(args)
  }
}

case object MaxLenExactInput extends Input[CappedStreamType.MaxLenExact] {
  /** Encodes an exact MAXLEN cap for a stream command: `MAXLEN = <count>`. */
  def encode(data: CappedStreamType.MaxLenExact): RespCommand = {
    val args =
      Chunk(
        RespCommandArgument.Literal("MAXLEN"),
        RespCommandArgument.Literal("="),
        RespCommandArgument.Value(data.count.toString)
      )
    RespCommand(args)
  }
}

final case class MinIdApproxInput[I: BinaryCodec]() extends Input[CappedStreamType.MinIdApprox[I]] {
  /**
   * Encodes an approximate MINID cap for a stream command:
   * `MINID ~ <id> [LIMIT <n>]`.
   */
  def encode(data: CappedStreamType.MinIdApprox[I]): RespCommand = {
    // Fixed diff residue: a stray leftover line `RespCommand(chunk :+ …)` referencing
    // an undefined `chunk` was embedded in this body and has been removed.
    val minIdChunk =
      Chunk(
        RespCommandArgument.Literal("MINID"),
        RespCommandArgument.Literal("~"),
        RespCommandArgument.Value(data.id)
      )

    // LIMIT is only emitted when the caller supplied one.
    val limitChunk = data.limit.fold(Chunk.empty[RespCommandArgument])(limit =>
      Chunk(RespCommandArgument.Literal("LIMIT"), RespCommandArgument.Value(limit.toString))
    )

    RespCommand(minIdChunk ++ limitChunk)
  }
}

final case class MinIdExactInput[I: BinaryCodec]() extends Input[CappedStreamType.MinIdExact[I]] {
  /** Encodes an exact MINID cap for a stream command: `MINID = <id>`. */
  // Consistency fix: dropped the `_root_.zio.redis.` prefix on the parameter type —
  // every sibling input in this file refers to CappedStreamType unqualified.
  def encode(data: CappedStreamType.MinIdExact[I]): RespCommand =
    RespCommand(
      Chunk(
        RespCommandArgument.Literal("MINID"),
        RespCommandArgument.Literal("="),
        RespCommandArgument.Value(data.id)
      )
    )
}

case object MkStreamInput extends Input[MkStream] {
  /** Emits the MKSTREAM keyword (create the stream if it does not exist). */
  // NOTE(review): sibling keyword inputs encode via RespCommandArgument.Literal;
  // this one uses Value — confirm that is intentional.
  def encode(data: MkStream): RespCommand = {
    val keyword = RespCommandArgument.Value(data.asString)
    RespCommand(keyword)
  }
}

case object NoMkStreamInput extends Input[NoMkStream] {
  /** Emits the NOMKSTREAM keyword (do not auto-create the stream). */
  // NOTE(review): sibling keyword inputs encode via RespCommandArgument.Literal;
  // this one uses Value — confirm that is intentional.
  def encode(data: NoMkStream): RespCommand = {
    val keyword = RespCommandArgument.Value(data.asString)
    RespCommand(keyword)
  }
}

final case class StreamsInput[K: BinaryCodec, V: BinaryCodec]() extends Input[((K, V), Chunk[(K, V)])] {
def encode(data: ((K, V), Chunk[(K, V)])): RespCommand = {
val (keys, ids) = (data._1 +: data._2).map { case (key, value) =>
Expand Down Expand Up @@ -619,6 +674,13 @@ object Input {
RespCommand(RespCommandArgument.Literal(data.asString))
}

final case class LastIdInput[I: BinaryCodec]() extends Input[LastId[I]] {
  /** Encodes a `LASTID <id>` argument pair. */
  def encode(data: LastId[I]): RespCommand =
    RespCommand(
      Chunk(
        RespCommandArgument.Literal("LASTID"),
        RespCommandArgument.Value(data.lastId)
      )
    )
}

case object WithHashInput extends Input[WithHash] {
def encode(data: WithHash): RespCommand =
RespCommand(RespCommandArgument.Literal(data.asString))
Expand All @@ -639,6 +701,11 @@ object Input {
RespCommand(RespCommandArgument.Literal(data.asString))
}

case object WithEntriesReadInput extends Input[WithEntriesRead] {
  /** Encodes an `ENTRIESREAD <entries>` argument pair. */
  def encode(data: WithEntriesRead): RespCommand = {
    val keyword = RespCommandArgument.Literal("ENTRIESREAD")
    val entries = RespCommandArgument.Value(data.entries.toString)
    RespCommand(keyword, entries)
  }
}

final case class XGroupCreateConsumerInput[K: BinaryCodec, G: BinaryCodec, C: BinaryCodec]()
extends Input[XGroupCommand.CreateConsumer[K, G, C]] {
def encode(data: XGroupCommand.CreateConsumer[K, G, C]): RespCommand =
Expand All @@ -652,16 +719,13 @@ object Input {

final case class XGroupCreateInput[K: BinaryCodec, G: BinaryCodec, I: BinaryCodec]()
    extends Input[XGroupCommand.Create[K, G, I]] {
  /** Encodes `CREATE <key> <group> <id>` for XGROUP. */
  // Fixed diff residue: both the removed chunk-building lines (including the old
  // mkStream tail) and the added direct RespCommand construction were present;
  // only the new, added implementation is kept.
  def encode(data: XGroupCommand.Create[K, G, I]): RespCommand =
    RespCommand(
      RespCommandArgument.Literal("CREATE"),
      RespCommandArgument.Key(data.key),
      RespCommandArgument.Value(data.group),
      RespCommandArgument.Value(data.id)
    )
}

final case class XGroupDelConsumerInput[K: BinaryCodec, G: BinaryCodec, C: BinaryCodec]()
Expand Down Expand Up @@ -693,5 +757,6 @@ object Input {
RespCommandArgument.Value(data.group),
RespCommandArgument.Value(data.id)
)

}
}
65 changes: 47 additions & 18 deletions modules/redis/src/main/scala/zio/redis/Output.scala
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ object Output {
streamConsumersInfo = streamConsumersInfo.copy(pending = value.value)
else if (key.asString == XInfoFields.Idle)
streamConsumersInfo = streamConsumersInfo.copy(idle = value.value.millis)
else if (key.asString == XInfoFields.Inactive)
streamConsumersInfo = streamConsumersInfo.copy(inactive = value.value.millis)
case _ =>
}
pos += 2
Expand Down Expand Up @@ -476,6 +478,10 @@ object Output {
streamGroupsInfo = streamGroupsInfo.copy(pending = value.value)
else if (key.asString == XInfoFields.Consumers)
streamGroupsInfo = streamGroupsInfo.copy(consumers = value.value)
else if (key.asString == XInfoFields.EntriesRead)
streamGroupsInfo = streamGroupsInfo.copy(entriesRead = value.value)
else if (key.asString == XInfoFields.Lag)
streamGroupsInfo = streamGroupsInfo.copy(lag = value.value)
case _ =>
}
pos += 2
Expand Down Expand Up @@ -507,14 +513,26 @@ object Output {
// Get the basic information of the outermost stream.
case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) =>
key.asString match {
case XInfoFields.Length => streamInfoFull = streamInfoFull.copy(length = value.value)
case XInfoFields.RadixTreeNodes => streamInfoFull = streamInfoFull.copy(radixTreeNodes = value.value)
case XInfoFields.RadixTreeKeys => streamInfoFull = streamInfoFull.copy(radixTreeKeys = value.value)
case XInfoFields.Length =>
streamInfoFull = streamInfoFull.copy(length = value.value)
case XInfoFields.RadixTreeNodes =>
streamInfoFull = streamInfoFull.copy(radixTreeNodes = value.value)
case XInfoFields.RadixTreeKeys =>
streamInfoFull = streamInfoFull.copy(radixTreeKeys = value.value)
case XInfoFields.EntriesAdded =>
streamInfoFull = streamInfoFull.copy(entriesAdded = value.value)
case _ =>
}
case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_))
if key.asString == XInfoFields.LastGeneratedId =>
streamInfoFull = streamInfoFull.copy(lastGeneratedId = value.asString)
case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_)) =>
key.asString match {
case XInfoFields.LastGeneratedId =>
streamInfoFull = streamInfoFull.copy(lastGeneratedId = value.asString)
case XInfoFields.MaxDeletedEntryId =>
streamInfoFull = streamInfoFull.copy(maxDeletedEntryId = value.asString)
case XInfoFields.RecordedFirstEntryId =>
streamInfoFull = streamInfoFull.copy(recordedFirstEntryId = value.asString)
case _ =>
}
case (key @ RespValue.BulkString(_), value) if key.asString == XInfoFields.Entries =>
streamInfoFull = streamInfoFull.copy(entries = StreamEntriesOutput[I, K, V]().unsafeDecode(value))
case (key @ RespValue.BulkString(_), RespValue.Array(values)) if key.asString == XInfoFields.Groups =>
Expand Down Expand Up @@ -544,7 +562,7 @@ object Output {
var pos = 0
while (pos < len) {
(elements(pos), elements(pos + 1)) match {
case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) =>
case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) =>
if (key.asString == XInfoFields.Length)
streamInfo = streamInfo.copy(length = value.value)
else if (key.asString == XInfoFields.RadixTreeNodes)
Expand All @@ -553,15 +571,21 @@ object Output {
streamInfo = streamInfo.copy(radixTreeKeys = value.value)
else if (key.asString == XInfoFields.Groups)
streamInfo = streamInfo.copy(groups = value.value)
case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_))
if key.asString == XInfoFields.LastGeneratedId =>
streamInfo = streamInfo.copy(lastGeneratedId = value.asString)
case (key @ RespValue.BulkString(_), value @ RespValue.Array(_)) =>
else if (key.asString == XInfoFields.EntriesAdded)
streamInfo = streamInfo.copy(entriesAdded = value.value)
case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_)) =>
if (key.asString == XInfoFields.LastGeneratedId)
streamInfo = streamInfo.copy(lastGeneratedId = value.asString)
else if (key.asString == XInfoFields.MaxDeletedEntryId)
streamInfo = streamInfo.copy(maxDeletedEntryId = value.asString)
else if (key.asString == XInfoFields.RecordedFirstEntryId)
streamInfo = streamInfo.copy(recordedFirstEntryId = value.asString)
case (key @ RespValue.BulkString(_), value @ RespValue.Array(_)) =>
if (key.asString == XInfoFields.FirstEntry)
streamInfo = streamInfo.copy(firstEntry = Some(StreamEntryOutput[I, K, V]().unsafeDecode(value)))
else if (key.asString == XInfoFields.LastEntry)
streamInfo = streamInfo.copy(lastEntry = Some(StreamEntryOutput[I, K, V]().unsafeDecode(value)))
case _ =>
case _ =>
}
pos += 2
}
Expand Down Expand Up @@ -767,9 +791,13 @@ object Output {
case XInfoFields.Name => readyGroup = readyGroup.copy(name = value.asString)
case _ =>
}
case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_))
if XInfoFields.PelCount == key.asString =>
readyGroup = readyGroup.copy(pelCount = value.value)
case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) =>
key.asString match {
case XInfoFields.PelCount => readyGroup = readyGroup.copy(pelCount = value.value)
case XInfoFields.Lag => readyGroup = readyGroup.copy(lag = value.value)
case XInfoFields.EntriesRead => readyGroup = readyGroup.copy(entriesRead = value.value)
case _ =>
}
case (key @ RespValue.BulkString(_), RespValue.Array(values)) =>
// Get the consumer list of the current group.
key.asString match {
Expand Down Expand Up @@ -821,9 +849,10 @@ object Output {
readyConsumer = readyConsumer.copy(name = value.asString)
case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) =>
key.asString match {
case XInfoFields.PelCount => readyConsumer = readyConsumer.copy(pelCount = value.value)
case XInfoFields.SeenTime => readyConsumer = readyConsumer.copy(seenTime = value.value.millis)
case _ =>
case XInfoFields.PelCount => readyConsumer = readyConsumer.copy(pelCount = value.value)
case XInfoFields.SeenTime => readyConsumer = readyConsumer.copy(seenTime = value.value.millis)
case XInfoFields.ActiveTime => readyConsumer = readyConsumer.copy(activeTime = value.value.millis)
case _ =>
}
// Get the pel list of the current consumer.
case (key @ RespValue.BulkString(_), RespValue.Array(values)) if key.asString == XInfoFields.Pending =>
Expand Down
Loading

0 comments on commit b953440

Please sign in to comment.