From b9534402a421bef6906931efcef72586f6fef286 Mon Sep 17 00:00:00 2001 From: Hans Christian Wilhelm Date: Fri, 21 Feb 2025 14:19:37 +0100 Subject: [PATCH] Support new options in xAdd (#1039) --- .../src/test/scala/zio/redis/KeysSpec.scala | 2 +- .../test/scala/zio/redis/StreamsSpec.scala | 462 +++++++++++++----- .../src/main/scala/zio/redis/Input.scala | 89 +++- .../src/main/scala/zio/redis/Output.scala | 65 ++- .../main/scala/zio/redis/api/Streams.scala | 287 +++++++++-- .../scala/zio/redis/options/Streams.scala | 86 +++- .../src/test/scala/zio/redis/InputSpec.scala | 53 +- .../src/test/scala/zio/redis/OutputSpec.scala | 95 +++- 8 files changed, 885 insertions(+), 254 deletions(-) diff --git a/modules/redis-it/src/test/scala/zio/redis/KeysSpec.scala b/modules/redis-it/src/test/scala/zio/redis/KeysSpec.scala index 1518696c0..85147ae61 100644 --- a/modules/redis-it/src/test/scala/zio/redis/KeysSpec.scala +++ b/modules/redis-it/src/test/scala/zio/redis/KeysSpec.scala @@ -418,7 +418,7 @@ trait KeysSpec extends IntegrationSpec { key <- uuid field <- uuid value <- uuid - _ <- redis.xAdd(key, "*", (field, value)).returning[String] + _ <- redis.xAdd(key, "*")((field, value)).returning[String] stream <- redis.typeOf(key) } yield assert(stream)(equalTo(RedisType.Stream)) } diff --git a/modules/redis-it/src/test/scala/zio/redis/StreamsSpec.scala b/modules/redis-it/src/test/scala/zio/redis/StreamsSpec.scala index 01a57e1b9..c104408f4 100644 --- a/modules/redis-it/src/test/scala/zio/redis/StreamsSpec.scala +++ b/modules/redis-it/src/test/scala/zio/redis/StreamsSpec.scala @@ -7,7 +7,7 @@ import zio.test.TestAspect.{flaky, ignore} import zio.test._ trait StreamsSpec extends IntegrationSpec { - def streamsSuite: Spec[Redis, RedisError] = + def streamsSuite: Spec[Redis, Any] = suite("streams")( suite("xAck")( test("one message") { @@ -17,7 +17,7 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, consumer)(stream -> ">").returning[String, String] result <- redis.xAck(stream, group, id) } yield assert(result)(equalTo(1L)) @@ -29,8 +29,8 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - first <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - second <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + first <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + second <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, consumer)(stream -> ">").returning[String, String] result <- redis.xAck(stream, group, first, second) } yield assert(result)(equalTo(2L)) @@ -49,7 +49,7 @@ trait StreamsSpec extends IntegrationSpec { redis <- ZIO.service[Redis] stream <- uuid group <- uuid - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String] result <- redis.xAck(stream, group, id) } yield assert(result)(equalTo(0L)) }, @@ -60,7 +60,7 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] _ <- redis.xReadGroup(group, consumer)(stream -> ">").returning[String, String] result <- redis.xAck(stream, group, "0-0") } yield assert(result)(equalTo(0L)) @@ -92,23 +92,23 @@ trait StreamsSpec extends IntegrationSpec { redis <- ZIO.service[Redis] stream <- uuid id = "1-0" - result <- redis.xAdd(stream, id, "a" -> "b").returning[String] - } yield assert(result)(equalTo(id)) + result <- redis.xAdd(stream, id)("a" -> "b").returning[String] + } yield assert(result)(equalTo(Some(id))) }, test("object with multiple fields") { for { redis <- ZIO.service[Redis] stream <- uuid id = "1-0" - result <- redis.xAdd(stream, id, "a" -> "b", "c" -> "d").returning[String] - } yield assert(result)(equalTo(id)) + result <- redis.xAdd(stream, id)("a" -> "b", "c" -> "d").returning[String] + } yield assert(result)(equalTo(Some(id))) }, test("error when ID should be greater") { for { redis <- ZIO.service[Redis] stream <- uuid id = "0-0" - result <- redis.xAdd(stream, id, "a" -> "b").returning[String].either + result <- redis.xAdd(stream, id)("a" -> "b").returning[String].either } yield assert(result)(isLeft(isSubtype[ProtocolError](anything))) }, test("error when invalid ID format") { @@ -116,7 +116,7 @@ trait StreamsSpec extends IntegrationSpec { redis <- ZIO.service[Redis] stream <- uuid id <- uuid - result <- redis.xAdd(stream, id, "a" -> "b").returning[String].either + result <- redis.xAdd(stream, id)("a" -> "b").returning[String].either } yield assert(result)(isLeft(isSubtype[ProtocolError](anything))) }, test("error when not stream") { @@ -125,8 +125,15 @@ trait StreamsSpec extends IntegrationSpec { nonStream <- uuid id = "1-0" _ <- redis.set(nonStream, "value") - result <- redis.xAdd(nonStream, id, "a" -> "b").returning[String].either + result <- redis.xAdd(nonStream, id)("a" -> "b").returning[String].either } yield assert(result)(isLeft(isSubtype[WrongType](anything))) + }, + test("Null reply with NOMKSTREAM option") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + result <- redis.xAdd(stream, "*", noMakeStream = true)("a" -> "b").returning[String] + } yield assert(result)(isNone) } ), suite("xAddWithMaxLen")( @@ -135,7 +142,15 @@ trait StreamsSpec extends IntegrationSpec { redis <- ZIO.service[Redis] stream <- uuid id = "1-0" - result <- redis.xAddWithMaxLen(stream, id, 10)("a" -> "b").returning[String] + result <- redis.xAddWithMaxLen(stream, id, 10)("a" -> "b").returning[String].some + } yield assert(result)(equalTo(id)) + }, + test("with positive count and without approximate but with limit") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + id = "1-0" + result <- redis.xAddWithMaxLen(stream, id, 10, limit = Some(100L))("a" -> "b").returning[String].some } yield assert(result)(equalTo(id)) }, test("with positive count and with approximate") { @@ -143,7 +158,18 @@ trait StreamsSpec extends IntegrationSpec { redis <- ZIO.service[Redis] stream <- uuid id = "1-0" - result <- redis.xAddWithMaxLen(stream, id, 10, approximate = true)("a" -> "b").returning[String] + result <- redis.xAddWithMaxLen(stream, id, 10, approximate = true)("a" -> "b").returning[String].some + } yield assert(result)(equalTo(id)) + }, + test("with positive count and with approximate and limit") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + id = "1-0" + result <- redis + .xAddWithMaxLen(stream, id, 10, approximate = true, limit = Some(100L))("a" -> "b") + .returning[String] + .some } yield assert(result)(equalTo(id)) }, test("error with negative count and without approximate") { @@ -161,6 +187,61 @@ trait StreamsSpec extends IntegrationSpec { id = "1-0" result <- redis.xAddWithMaxLen(stream, id, -10, approximate = true)("a" -> "b").returning[String].either } yield assert(result)(isLeft(isSubtype[ProtocolError](anything))) + }, + test("Null reply with NOMKSTREAM option") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + result <- redis + .xAddWithMaxLen(stream, "*", 10, approximate = true, noMakeStream = true)("a" -> "b") + .returning[String] + } yield assert(result)(isNone) + } + ), + suite("xAddWithMinId")( + test("with positive minId and without approximate") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + id = "1-0" + result <- redis.xAddWithMinId(stream, id, "0-0")("a" -> "b").returning[String].some + } yield assert(result)(equalTo(id)) + }, + test("with positive minId and without approximate but with limit") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + id = "1-0" + result <- redis.xAddWithMinId(stream, id, "0-0", limit = Some(100L))("a" -> "b").returning[String].some + } yield assert(result)(equalTo(id)) + }, + test("with positive minId and with approximate") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + id = "1-0" + result <- redis.xAddWithMinId(stream, id, "0-0", approximate = true)("a" -> "b").returning[String].some + } yield assert(result)(equalTo(id)) + }, + test("with positive minId and with approximate and limit") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + id = "1-0" + result <- redis + .xAddWithMinId(stream, id, "0-0", approximate = true, limit = Some(100L))("a" -> "b") + .returning[String] + .some + } yield assert(result)(equalTo(id)) + }, + test("Null reply with NOMKSTREAM option") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + result <- redis + .xAddWithMinId(stream, "*", "0-0", approximate = true, noMakeStream = true)("a" -> "b") + .returning[String] + } yield assert(result)(isNone) } ), suite("xAutoClaim")( @@ -172,7 +253,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xAutoClaim(stream, group, second, 0.millis)(id).returning[String, String, String] } yield assertTrue(result.streamId == "0-0", result.entries == Chunk(StreamEntry(id, Map("a" -> "b")))) @@ -185,8 +266,8 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - id1 <- redis.xAdd(stream, "*", "c" -> "d", "e" -> "f").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + id1 <- redis.xAdd(stream, "*")("c" -> "d", "e" -> "f").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xAutoClaim(stream, group, second, 0.millis)(id).returning[String, String, String] } yield assert(result.entries)( @@ -210,7 +291,7 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xAutoClaim(stream, group, second, 0.millis)(id).returning[String, String, String] } yield assertTrue(result.streamId == "0-0", result.entries.isEmpty, result.deletedIds.isEmpty) }, @@ -232,7 +313,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xAutoClaim(stream, group, second, 360000.millis)(id).returning[String, String, String] } yield assertTrue(result.streamId == "0-0", result.entries.isEmpty, result.deletedIds.isEmpty) @@ -245,7 +326,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xAutoClaim(stream, group, second, (-360000).millis)(id).returning[String, String, String] } yield assertTrue(result.streamId == "0-0", result.deletedIds.isEmpty) && assert(result.entries)( @@ -262,7 +343,7 @@ trait StreamsSpec extends IntegrationSpec { randomChars <- ZIO.loop(0)(_ < 100, _ + 1)(_ => zio.Random.nextPrintableChar) entries = randomChars.map(_.toString).map(char => char -> char) _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "a", entries: _*).returning[String] + id <- redis.xAdd(stream, "*")("a" -> "a", entries: _*).returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis .xAutoClaim(stream, group, second, 0.millis, count = Some(Count(1L)))(id) @@ -277,7 +358,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "a").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "a").returning[String] _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis .xAutoClaim(stream, group, second, 0.millis, count = Some(Count(-1L)))(id) @@ -306,7 +387,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xAutoClaimWithJustId(stream, group, second, 0.millis)(id).returning[String] } yield assertTrue(result.streamId == "0-0") && assert(result.claimedIds)(hasSameElements(List(id))) @@ -319,8 +400,8 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - id1 <- redis.xAdd(stream, "*", "c" -> "d", "e" -> "f").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + id1 <- redis.xAdd(stream, "*")("c" -> "d", "e" -> "f").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xAutoClaimWithJustId(stream, group, second, 0.millis)(id).returning[String] } yield assertTrue(result.streamId == "0-0") && assert(result.claimedIds)(hasSameElements(List(id, id1))) @@ -342,7 +423,7 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xAutoClaimWithJustId(stream, group, second, 0.millis)(id).returning[String] } yield assertTrue(result.streamId == "0-0", result.claimedIds.isEmpty, result.deletedIds.isEmpty) }, @@ -363,7 +444,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xAutoClaimWithJustId(stream, group, second, 360000.millis)(id).returning[String] } yield assertTrue(result.streamId == "0-0", result.claimedIds.isEmpty, result.deletedIds.isEmpty) @@ -376,7 +457,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xAutoClaimWithJustId(stream, group, second, (-360000).millis)(id).returning[String] } yield assert(result.claimedIds)(hasSameElements(Chunk.single(id))) @@ -391,7 +472,7 @@ trait StreamsSpec extends IntegrationSpec { randomChars <- ZIO.loop(0)(_ < 100, _ + 1)(_ => zio.Random.nextPrintableChar) entries = randomChars.map(_.toString).map(char => char -> char) _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "a", entries: _*).returning[String] + id <- redis.xAdd(stream, "*")("a" -> "a", entries: _*).returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xAutoClaimWithJustId(stream, group, second, 0.millis, count = Some(Count(1L)))(id).returning[String] @@ -405,7 +486,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "a").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "a").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis .xAutoClaimWithJustId(stream, group, second, 0.millis, count = Some(Count(-1L)))(id) @@ -436,7 +517,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaim(stream, group, second, 0.millis)(id).returning[String, String] } yield assert(result)(equalTo(Chunk(StreamEntry(id, Map("a" -> "b"))))) @@ -449,8 +530,8 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - id1 <- redis.xAdd(stream, "*", "c" -> "d", "e" -> "f").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + id1 <- redis.xAdd(stream, "*")("c" -> "d", "e" -> "f").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaim(stream, group, second, 0.millis)(id, id1).returning[String, String] } yield assert(result)( @@ -474,7 +555,7 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xClaim(stream, group, second, 0.millis)(id).returning[String, String] } yield assert(result)(isEmpty) }, @@ -495,7 +576,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaim(stream, group, second, 360000.millis)(id).returning[String, String] } yield assert(result)(isEmpty) @@ -508,7 +589,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaim(stream, group, second, (-360000).millis)(id).returning[String, String] } yield assert(result)(equalTo(Chunk(StreamEntry(id, Map("a" -> "b"))))) @@ -521,7 +602,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaim(stream, group, second, 0.millis, Some(360000.millis))(id).returning[String, String] } yield assert(result)(equalTo(Chunk(StreamEntry(id, Map("a" -> "b"))))) @@ -534,7 +615,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaim(stream, group, second, 0.millis, Some((-360000).millis))(id).returning[String, String] @@ -548,12 +629,28 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaim(stream, group, second, 0.millis, time = Some(360000.millis))(id).returning[String, String] } yield assert(result)(equalTo(Chunk(StreamEntry(id, Map("a" -> "b"))))) }, + test("with positive time and lastId") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + group <- uuid + first <- uuid + second <- uuid + _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] + result <- + redis + .xClaim(stream, group, second, 0.millis, time = Some(360000.millis), lastId = Some("0-1"))(id) + .returning[String, String] + } yield assert(result)(equalTo(Chunk(StreamEntry(id, Map("a" -> "b"))))) + }, test("with negative time") { for { redis <- ZIO.service[Redis] @@ -562,7 +659,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaim(stream, group, second, 0.millis, time = Some((-360000).millis))(id).returning[String, String] @@ -576,7 +673,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaim(stream, group, second, 0.millis, retryCount = Some(3))(id).returning[String, String] } yield assert(result)(equalTo(Chunk(StreamEntry(id, Map("a" -> "b"))))) @@ -589,7 +686,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaim(stream, group, second, 0.millis, retryCount = Some(-3))(id).returning[String, String] } yield assert(result)(equalTo(Chunk(StreamEntry(id, Map("a" -> "b"))))) @@ -601,7 +698,7 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xClaim(stream, group, consumer, 0.millis, force = true)(id).returning[String, String] } yield assert(result)(equalTo(Chunk(StreamEntry(id, Map("a" -> "b"))))) }, @@ -626,7 +723,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaimWithJustId(stream, group, second, 0.millis)(id).returning[String] } yield assert(result)(hasSameElements(Chunk.single(id))) @@ -639,12 +736,27 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - id1 <- redis.xAdd(stream, "*", "c" -> "d", "e" -> "f").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + id1 <- redis.xAdd(stream, "*")("c" -> "d", "e" -> "f").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaimWithJustId(stream, group, second, 0.millis)(id, id1).returning[String] } yield assert(result)(hasSameElements(Chunk(id, id1))) }, + test("multiple pending messages with lastId") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + group <- uuid + first <- uuid + second <- uuid + _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + id1 <- redis.xAdd(stream, "*")("c" -> "d", "e" -> "f").returning[String].some + _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] + result <- + redis.xClaimWithJustId(stream, group, second, 0.millis, lastId = Some(id))(id, id1).returning[String] + } yield assert(result)(hasSameElements(Chunk(id, id1))) + }, test("non-existent message") { for { redis <- ZIO.service[Redis] @@ -662,7 +774,7 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xClaimWithJustId(stream, group, second, 0.millis)(id).returning[String] } yield assert(result)(isEmpty) }, @@ -683,7 +795,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaimWithJustId(stream, group, second, 360000.millis)(id).returning[String] } yield assert(result)(isEmpty) @@ -696,7 +808,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaimWithJustId(stream, group, second, (-360000).millis)(id).returning[String] } yield assert(result)(hasSameElements(Chunk.single(id))) @@ -709,7 +821,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaimWithJustId(stream, group, second, 0.millis, Some(360000.millis))(id).returning[String] } yield assert(result)(hasSameElements(Chunk.single(id))) @@ -722,7 +834,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaimWithJustId(stream, group, second, 0.millis, Some((-360000).millis))(id).returning[String] @@ -736,7 +848,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis .xClaimWithJustId(stream, group, second, 0.millis, time = Some(360000.millis))(id) @@ -751,7 +863,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis .xClaimWithJustId(stream, group, second, 0.millis, time = Some((-360000).millis))(id) @@ -766,7 +878,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaimWithJustId(stream, group, second, 0.millis, retryCount = Some(3))(id).returning[String] @@ -780,7 +892,7 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] result <- redis.xClaimWithJustId(stream, group, second, 0.millis, retryCount = Some(-3))(id).returning[String] @@ -793,7 +905,7 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xClaimWithJustId(stream, group, consumer, 0.millis, force = true)(id).returning[String] } yield assert(result)(hasSameElements(Chunk.single(id))) }, @@ -816,7 +928,7 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xDel(stream, id) } yield assert(result)(equalTo(1L)) }, @@ -844,6 +956,16 @@ trait StreamsSpec extends IntegrationSpec { } yield assert(result)(isLeft(isSubtype[WrongType](anything))) } ), + suite("xGroupCreate")( + test("new group") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + group <- uuid + result <- redis.xGroupCreateLastEntry(stream, group, mkStream = true).either + } yield assert(result)(isRight) + } + ), suite("xGroupCreate")( test("new group") { for { @@ -895,17 +1017,39 @@ trait StreamsSpec extends IntegrationSpec { redis <- ZIO.service[Redis] stream <- uuid group <- uuid - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xGroupCreate(stream, group, "$") result <- redis.xGroupSetId(stream, group, id).either } yield assert(result)(isRight) }, + test("for an existing group and message with entries read option") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + group <- uuid + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] + _ <- redis.xGroupCreate(stream, group, "$") + result <- redis.xGroupSetId(stream, group, id, entriesRead = Some(2L)).either + } yield assert(result)(isRight) + }, + test("for an existing group and message last entry") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + group <- uuid + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + _ <- redis.xGroupCreate(stream, group, "$") + result <- redis.xGroupSetIdLastEntry(stream, group).either + } yield assert(result)(isRight) + }, test("error when non-existent group and an existing message") { for { redis <- ZIO.service[Redis] stream <- uuid group <- uuid - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xGroupSetId(stream, group, id).either } yield assert(result)(isLeft(isSubtype[NoGroup](anything))) }, @@ -942,7 +1086,7 @@ trait StreamsSpec extends IntegrationSpec { redis <- ZIO.service[Redis] stream <- uuid group <- uuid - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] result <- redis.xGroupDestroy(stream, group) } yield assert(result)(isFalse) }, @@ -1013,7 +1157,7 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, consumer)(stream -> ">").returning[String, String] result <- redis.xGroupDelConsumer(stream, group, consumer) } yield assert(result)(equalTo(1L)) @@ -1025,8 +1169,8 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, consumer)(stream -> ">").returning[String, String] result <- redis.xGroupDelConsumer(stream, group, consumer) } yield assert(result)(equalTo(2L)) @@ -1046,7 +1190,7 @@ trait StreamsSpec extends IntegrationSpec { stream <- uuid group <- uuid consumer <- uuid - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xGroupDelConsumer(stream, group, consumer).either } yield assert(result)(isLeft(isSubtype[NoGroup](anything))) }, @@ -1075,7 +1219,7 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] result <- redis.xLen(stream) } yield assert(result)(equalTo(1L)) }, @@ -1105,10 +1249,10 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String] _ <- redis.xReadGroup(group, consumer)(stream -> ">").returning[String, String] result <- redis.xPending(stream, group) - } yield assert(result)(equalTo(PendingInfo(1L, Some(id), Some(id), Map(consumer -> 1L)))) + } yield assert(result)(equalTo(PendingInfo(1L, id, id, Map(consumer -> 1L)))) }, test("with multiple consumers and multiple messages") { for { @@ -1118,13 +1262,13 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - firstMsg <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + firstMsg <- redis.xAdd(stream, "*")("a" -> "b").returning[String] _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] - lastMsg <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + lastMsg <- redis.xAdd(stream, "*")("a" -> "b").returning[String] _ <- redis.xReadGroup(group, second)(stream -> ">").returning[String, String] result <- redis.xPending(stream, group) } yield assert(result)( - equalTo(PendingInfo(2L, Some(firstMsg), Some(lastMsg), Map(first -> 1L, second -> 1L))) + equalTo(PendingInfo(2L, firstMsg, lastMsg, Map(first -> 1L, second -> 1L))) ) }, test("with 0ms idle time") { @@ -1134,7 +1278,7 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, consumer)(stream -> ">").returning[String, String] cons: Option[String] = None result <- redis.xPending(stream, group, "-", "+", 10L, cons, Some(0.millis)) @@ -1150,7 +1294,7 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, consumer)(stream -> ">").returning[String, String] cons: Option[String] = None result <- redis.xPending(stream, group, "-", "+", 10L, cons, Some(1.minute)) @@ -1161,7 +1305,7 @@ trait StreamsSpec extends IntegrationSpec { redis <- ZIO.service[Redis] stream <- uuid group <- uuid - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] result <- redis.xPending(stream, group).either } yield assert(result)(isLeft(isSubtype[NoGroup](anything))) }, @@ -1181,7 +1325,7 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, consumer)(stream -> ">").returning[String, String] messages <- redis.xPending[String, String, String, String](stream, group, "-", "+", 10L) result = messages.head @@ -1199,9 +1343,9 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - firstMsg <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + firstMsg <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] - secondMsg <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + secondMsg <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, second)(stream -> ">").returning[String, String] messages <- redis.xPending[String, String, String, String](stream, group, "-", "+", 10L) (firstResult, secondResult) = (messages(0), messages(1)) @@ -1223,9 +1367,9 @@ trait StreamsSpec extends IntegrationSpec { first <- uuid second <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, first)(stream -> ">").returning[String, String] - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, second)(stream -> ">").returning[String, String] messages <- redis.xPending[String, String, String, String](stream, group, "-", "+", 10L, Some(first)) result = messages.head @@ -1249,7 +1393,7 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xRange(stream, "-", "+").returning[String, String] } yield assert(result)(equalTo(Chunk(StreamEntry(id, Map("a" -> "b"))))) }, @@ -1257,8 +1401,8 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - first <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + first <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] result <- redis.xRange(stream, "-", "+", 1L).returning[String, String] } yield assert(result)(equalTo(Chunk(StreamEntry(first, Map("a" -> "b"))))) }, @@ -1266,8 +1410,8 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] result <- redis.xRange(stream, "-", "+", -1L).returning[String, String] } yield assert(result)(isEmpty) }, @@ -1275,8 +1419,8 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] result <- redis.xRange(stream, "-", "+", 0L).returning[String, String] } yield assert(result)(isEmpty) }, @@ -1315,7 +1459,7 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xRead()(stream -> "0-0").returning[String, String] } yield assert(result)(equalTo(Chunk(StreamChunk(stream, Chunk(StreamEntry(id, Map("a" -> "b"))))))) }, @@ -1331,8 +1475,8 @@ trait StreamsSpec extends IntegrationSpec { redis <- ZIO.service[Redis] first <- uuid second <- uuid - firstMsg <- redis.xAdd(first, "*", "a" -> "b").returning[String] - secondMsg <- redis.xAdd(second, "*", "a" -> "b").returning[String] + firstMsg <- redis.xAdd(first, "*")("a" -> "b").returning[String].some + secondMsg <- redis.xAdd(second, "*")("a" -> "b").returning[String].some result <- redis.xRead()(first -> "0-0", second -> "0-0").returning[String, String] } yield assert(result)( equalTo( @@ -1347,8 +1491,8 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] result <- redis.xRead(Some(1L))(stream -> "0-0").returning[String, String] } yield assert(result)(equalTo(Chunk(StreamChunk(stream, Chunk(StreamEntry(id, Map("a" -> "b"))))))) }, @@ -1356,8 +1500,8 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - firstMsg <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - secondMsg <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + firstMsg <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + secondMsg <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xRead(Some(0L))(stream -> "0-0").returning[String, String] } yield assert(result)( equalTo( @@ -1374,8 +1518,8 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - firstMsg <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - secondMsg <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + firstMsg <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + secondMsg <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xRead(Some(-1L))(stream -> "0-0").returning[String, String] } yield assert(result)( equalTo( @@ -1393,7 +1537,7 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] result <- redis.xRead(block = Some(1.second))(stream -> "$").returning[String, String] } yield assert(result)(isEmpty) } @@ ignore, @@ -1401,7 +1545,7 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] result <- redis.xRead(block = Some(0.second))(stream -> "$").returning[String, String] } yield assert(result)(isEmpty) } @@ ignore, @@ -1409,7 +1553,7 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] result <- redis.xRead(block = Some((-1).second))(stream -> "$").returning[String, String] } yield assert(result)(isEmpty) } @@ ignore, @@ -1417,7 +1561,7 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] result <- redis.xRead()(stream -> "invalid").returning[String, String].either } yield assert(result)(isLeft(isSubtype[ProtocolError](anything))) }, @@ -1438,7 +1582,7 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xReadGroup(group, consumer)(stream -> ">").returning[String, String] } yield assert(result)(equalTo(Chunk(StreamChunk(stream, Chunk(StreamEntry(id, Map("a" -> "b"))))))) }, @@ -1449,8 +1593,8 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - first <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - second <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + first <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + second <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xReadGroup(group, consumer)(stream -> ">").returning[String, String] } yield assert(result)( equalTo( @@ -1479,8 +1623,8 @@ trait StreamsSpec extends IntegrationSpec { consumer <- uuid _ <- redis.xGroupCreate(first, group, "$", mkStream = true) _ <- redis.xGroupCreate(second, group, "$", mkStream = true) - firstMsg <- redis.xAdd(first, "*", "a" -> "b").returning[String] - secondMsg <- redis.xAdd(second, "*", "a" -> "b").returning[String] + firstMsg <- redis.xAdd(first, "*")("a" -> "b").returning[String].some + secondMsg <- redis.xAdd(second, "*")("a" -> "b").returning[String].some result <- redis.xReadGroup(group, consumer)(first -> ">", second -> ">").returning[String, String] } yield assert(result)( equalTo( @@ -1498,8 +1642,8 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - first <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + first <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] result <- redis.xReadGroup(group, consumer, Some(1L))(stream -> ">").returning[String, String] } yield assert(result)(equalTo(Chunk(StreamChunk(stream, Chunk(StreamEntry(first, Map("a" -> "b"))))))) }, @@ -1510,8 +1654,8 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - first <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - second <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + first <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + second <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xReadGroup(group, consumer, Some(0L))(stream -> ">").returning[String, String] } yield assert(result)( equalTo( @@ -1528,8 +1672,8 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - first <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - second <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + first <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some + second <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xReadGroup(group, consumer, Some(-1L))(stream -> ">").returning[String, String] } yield assert(result)( equalTo( @@ -1585,7 +1729,7 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xRevRange(stream, "+", "-").returning[String, String] } yield assert(result)(equalTo(Chunk(StreamEntry(id, Map("a" -> "b"))))) }, @@ -1593,8 +1737,8 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - second <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] + second <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xRevRange(stream, "+", "-", 1L).returning[String, String] } yield assert(result)(equalTo(Chunk(StreamEntry(second, Map("a" -> "b"))))) }, @@ -1602,8 +1746,8 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] result <- redis.xRevRange(stream, "+", "-", -1L).returning[String, String] } yield assert(result)(isEmpty) }, @@ -1611,8 +1755,8 @@ trait StreamsSpec extends IntegrationSpec { for { redis <- ZIO.service[Redis] stream <- uuid - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] result <- redis.xRevRange(stream, "+", "-", 0L).returning[String, String] } yield assert(result)(isEmpty) }, @@ -1646,36 +1790,94 @@ trait StreamsSpec extends IntegrationSpec { } yield assert(result)(isLeft(isSubtype[WrongType](anything))) } ), - suite("xTrim")( + suite("xTrimWithMaxLen")( test("an empty stream") { for { redis <- ZIO.service[Redis] stream <- uuid - result <- redis.xTrim(stream, 1000L) + result <- redis.xTrimWithMaxLen(stream, 1000L) } yield assert(result)(equalTo(0L)) }, test("a non-empty stream") { for { redis <- ZIO.service[Redis] stream <- uuid - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String].repeatN(3) - result <- redis.xTrim(stream, 2L) + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String].repeatN(3) + result <- redis.xTrimWithMaxLen(stream, 2L) } yield assert(result)(equalTo(2L)) }, test("a non-empty stream with an approximate") { for { redis <- ZIO.service[Redis] stream <- uuid - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - result <- redis.xTrim(stream, 1000L, approximate = true) + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] + result <- redis.xTrimWithMaxLen(stream, 1000L, approximate = true) + } yield assert(result)(equalTo(0L)) + }, + test("a non-empty stream with an approximate and limit") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] + result <- redis.xTrimWithMaxLen(stream, 1000L, approximate = true, limit = Some(100)) } yield assert(result)(equalTo(0L)) }, test("error when negative count") { for { redis <- ZIO.service[Redis] stream <- uuid - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] - result <- redis.xTrim(stream, -1000L).either + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] + result <- redis.xTrimWithMaxLen(stream, -1000L).either + } yield assert(result)(isLeft(isSubtype[ProtocolError](anything))) + }, + test("error when not stream") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + _ <- redis.set(stream, "value") + result <- redis.xTrimWithMaxLen(stream, 1000L).either + } yield assert(result)(isLeft(isSubtype[WrongType](anything))) + } + ), + suite("xTrimWithMinId")( + test("an empty stream") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + result <- redis.xTrimWithMinId(stream, "0-0") + } yield assert(result)(equalTo(0L)) + }, + test("a non-empty stream") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].repeatN(2).some + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] + result <- redis.xTrimWithMinId(stream, id) + } yield assert(result)(equalTo(2L)) + }, + test("a non-empty stream with an approximate") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] + result <- redis.xTrimWithMinId(stream, "0-0", approximate = true) + } yield assert(result)(equalTo(0L)) + }, + test("a non-empty stream with an approximate and limit") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] + result <- redis.xTrimWithMinId(stream, "0-0", approximate = true, limit = Some(100)) + } yield assert(result)(equalTo(0L)) + }, + test("error when invalid id") { + for { + redis <- ZIO.service[Redis] + stream <- uuid + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] + result <- redis.xTrimWithMinId(stream, "-100").either } yield assert(result)(isLeft(isSubtype[ProtocolError](anything))) }, test("error when not stream") { @@ -1683,7 +1885,7 @@ trait StreamsSpec extends IntegrationSpec { redis <- ZIO.service[Redis] stream <- uuid _ <- redis.set(stream, "value") - result <- redis.xTrim(stream, 1000L).either + result <- redis.xTrimWithMinId(stream, "0-0").either } yield assert(result)(isLeft(isSubtype[WrongType](anything))) } ), @@ -1694,7 +1896,7 @@ trait StreamsSpec extends IntegrationSpec { stream <- uuid group <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some result <- redis.xInfoStream(stream).returning[String, String, String] } yield assert(result.lastEntry.map(_.id))(isSome(equalTo(id))) }, @@ -1721,7 +1923,7 @@ trait StreamsSpec extends IntegrationSpec { stream <- uuid group <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] result <- redis.xInfoGroups[String](stream) } yield assert(result.toList.head.name)(equalTo(group)) }, @@ -1749,7 +1951,7 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - _ <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + _ <- redis.xAdd(stream, "*")("a" -> "b").returning[String] _ <- redis.xReadGroup(group, consumer)(stream -> ">").returning[String, String] result <- redis.xInfoConsumers(stream, group) } yield assert(result.toList.head.name)(equalTo(consumer)) @@ -1780,7 +1982,7 @@ trait StreamsSpec extends IntegrationSpec { group <- uuid consumer <- uuid _ <- redis.xGroupCreate(stream, group, "$", mkStream = true) - id <- redis.xAdd(stream, "*", "a" -> "b").returning[String] + id <- redis.xAdd(stream, "*")("a" -> "b").returning[String].some _ <- redis.xReadGroup(group, consumer)(stream -> ">").returning[String, String] result <- redis.xInfoStreamFull(stream).returning[String, String, String] } yield assert { diff --git a/modules/redis/src/main/scala/zio/redis/Input.scala b/modules/redis/src/main/scala/zio/redis/Input.scala index 5cde6684b..d3f85991f 100644 --- a/modules/redis/src/main/scala/zio/redis/Input.scala +++ b/modules/redis/src/main/scala/zio/redis/Input.scala @@ -126,7 +126,7 @@ object Input { case object DbInput extends Input[Long] { def encode(db: Long): RespCommand = - RespCommand(RespCommandArgument.Literal("DB"), RespCommandArgument.Value(db.toString())) + RespCommand(RespCommandArgument.Literal("DB"), RespCommandArgument.Value(db.toString)) } case object BoolInput extends Input[Boolean] { @@ -438,16 +438,71 @@ object Input { RespCommand(RespCommandArgument.Literal("STORE"), RespCommandArgument.Value(data.key)) } - case object StreamMaxLenInput extends Input[StreamMaxLen] { - def encode(data: StreamMaxLen): RespCommand = { - val chunk = - if (data.approximate) Chunk(RespCommandArgument.Literal("MAXLEN"), RespCommandArgument.Literal("~")) - else Chunk.single(RespCommandArgument.Literal("MAXLEN")) + case object MaxLenApproxInput extends Input[CappedStreamType.MaxLenApprox] { + def encode(data: CappedStreamType.MaxLenApprox): RespCommand = { + val maxLenChunk: Chunk[RespCommandArgument] = Chunk( + RespCommandArgument.Literal("MAXLEN"), + RespCommandArgument.Literal("~"), + RespCommandArgument.Value(data.count.toString) + ) + + val limitChunk = data.limit.fold(Chunk.empty[RespCommandArgument])(limit => + Chunk(RespCommandArgument.Literal("LIMIT"), RespCommandArgument.Value(limit.toString)) + ) + + RespCommand(maxLenChunk ++ limitChunk) + } + } + + case object MaxLenExactInput extends Input[CappedStreamType.MaxLenExact] { + def encode(data: CappedStreamType.MaxLenExact): RespCommand = + RespCommand( + Chunk( + RespCommandArgument.Literal("MAXLEN"), + RespCommandArgument.Literal("="), + RespCommandArgument.Value(data.count.toString) + ) + ) + } + + final case class MinIdApproxInput[I: BinaryCodec]() extends Input[CappedStreamType.MinIdApprox[I]] { + def encode(data: CappedStreamType.MinIdApprox[I]): RespCommand = { + val minIdChunk = + Chunk( + RespCommandArgument.Literal("MINID"), + RespCommandArgument.Literal("~"), + RespCommandArgument.Value(data.id) + ) - RespCommand(chunk :+ RespCommandArgument.Value(data.count.toString)) + val limitChunk = data.limit.fold(Chunk.empty[RespCommandArgument])(limit => + Chunk(RespCommandArgument.Literal("LIMIT"), RespCommandArgument.Value(limit.toString)) + ) + + RespCommand(minIdChunk ++ limitChunk) } } + final case class MinIdExactInput[I: BinaryCodec]() extends Input[CappedStreamType.MinIdExact[I]] { + def encode(data: _root_.zio.redis.CappedStreamType.MinIdExact[I]): RespCommand = + RespCommand( + Chunk( + RespCommandArgument.Literal("MINID"), + RespCommandArgument.Literal("="), + RespCommandArgument.Value(data.id) + ) + ) + } + + case object MkStreamInput extends Input[MkStream] { + def encode(data: MkStream): RespCommand = + RespCommand(RespCommandArgument.Value(data.asString)) + } + + case object NoMkStreamInput extends Input[NoMkStream] { + def encode(data: NoMkStream): RespCommand = + RespCommand(RespCommandArgument.Value(data.asString)) + } + final case class StreamsInput[K: BinaryCodec, V: BinaryCodec]() extends Input[((K, V), Chunk[(K, V)])] { def encode(data: ((K, V), Chunk[(K, V)])): RespCommand = { val (keys, ids) = (data._1 +: data._2).map { case (key, value) => @@ -619,6 +674,13 @@ object Input { RespCommand(RespCommandArgument.Literal(data.asString)) } + final case class LastIdInput[I: BinaryCodec]() extends Input[LastId[I]] { + def encode(data: LastId[I]): RespCommand = { + val chunk = Chunk(RespCommandArgument.Literal("LASTID"), RespCommandArgument.Value(data.lastId)) + RespCommand(chunk) + } + } + case object WithHashInput extends Input[WithHash] { def encode(data: WithHash): RespCommand = RespCommand(RespCommandArgument.Literal(data.asString)) @@ -639,6 +701,11 @@ object Input { RespCommand(RespCommandArgument.Literal(data.asString)) } + case object WithEntriesReadInput extends Input[WithEntriesRead] { + def encode(data: WithEntriesRead): RespCommand = + RespCommand(RespCommandArgument.Literal("ENTRIESREAD"), RespCommandArgument.Value(data.entries.toString)) + } + final case class XGroupCreateConsumerInput[K: BinaryCodec, G: BinaryCodec, C: BinaryCodec]() extends Input[XGroupCommand.CreateConsumer[K, G, C]] { def encode(data: XGroupCommand.CreateConsumer[K, G, C]): RespCommand = @@ -652,16 +719,13 @@ object Input { final case class XGroupCreateInput[K: BinaryCodec, G: BinaryCodec, I: BinaryCodec]() extends Input[XGroupCommand.Create[K, G, I]] { - def encode(data: XGroupCommand.Create[K, G, I]): RespCommand = { - val chunk = Chunk( + def encode(data: XGroupCommand.Create[K, G, I]): RespCommand = + RespCommand( RespCommandArgument.Literal("CREATE"), RespCommandArgument.Key(data.key), RespCommandArgument.Value(data.group), RespCommandArgument.Value(data.id) ) - - RespCommand(if (data.mkStream) chunk :+ RespCommandArgument.Literal(MkStream.asString) else chunk) - } } final case class XGroupDelConsumerInput[K: BinaryCodec, G: BinaryCodec, C: BinaryCodec]() @@ -693,5 +757,6 @@ object Input { RespCommandArgument.Value(data.group), RespCommandArgument.Value(data.id) ) + } } diff --git a/modules/redis/src/main/scala/zio/redis/Output.scala b/modules/redis/src/main/scala/zio/redis/Output.scala index e72f9811b..01c60d818 100644 --- a/modules/redis/src/main/scala/zio/redis/Output.scala +++ b/modules/redis/src/main/scala/zio/redis/Output.scala @@ -370,6 +370,8 @@ object Output { streamConsumersInfo = streamConsumersInfo.copy(pending = value.value) else if (key.asString == XInfoFields.Idle) streamConsumersInfo = streamConsumersInfo.copy(idle = value.value.millis) + else if (key.asString == XInfoFields.Inactive) + streamConsumersInfo = streamConsumersInfo.copy(inactive = value.value.millis) case _ => } pos += 2 @@ -476,6 +478,10 @@ object Output { streamGroupsInfo = streamGroupsInfo.copy(pending = value.value) else if (key.asString == XInfoFields.Consumers) streamGroupsInfo = streamGroupsInfo.copy(consumers = value.value) + else if (key.asString == XInfoFields.EntriesRead) + streamGroupsInfo = streamGroupsInfo.copy(entriesRead = value.value) + else if (key.asString == XInfoFields.Lag) + streamGroupsInfo = streamGroupsInfo.copy(lag = value.value) case _ => } pos += 2 @@ -507,14 +513,26 @@ object Output { // Get the basic information of the outermost stream. case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) => key.asString match { - case XInfoFields.Length => streamInfoFull = streamInfoFull.copy(length = value.value) - case XInfoFields.RadixTreeNodes => streamInfoFull = streamInfoFull.copy(radixTreeNodes = value.value) - case XInfoFields.RadixTreeKeys => streamInfoFull = streamInfoFull.copy(radixTreeKeys = value.value) + case XInfoFields.Length => + streamInfoFull = streamInfoFull.copy(length = value.value) + case XInfoFields.RadixTreeNodes => + streamInfoFull = streamInfoFull.copy(radixTreeNodes = value.value) + case XInfoFields.RadixTreeKeys => + streamInfoFull = streamInfoFull.copy(radixTreeKeys = value.value) + case XInfoFields.EntriesAdded => + streamInfoFull = streamInfoFull.copy(entriesAdded = value.value) case _ => } - case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_)) - if key.asString == XInfoFields.LastGeneratedId => - streamInfoFull = streamInfoFull.copy(lastGeneratedId = value.asString) + case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_)) => + key.asString match { + case XInfoFields.LastGeneratedId => + streamInfoFull = streamInfoFull.copy(lastGeneratedId = value.asString) + case XInfoFields.MaxDeletedEntryId => + streamInfoFull = streamInfoFull.copy(maxDeletedEntryId = value.asString) + case XInfoFields.RecordedFirstEntryId => + streamInfoFull = streamInfoFull.copy(recordedFirstEntryId = value.asString) + case _ => + } case (key @ RespValue.BulkString(_), value) if key.asString == XInfoFields.Entries => streamInfoFull = streamInfoFull.copy(entries = StreamEntriesOutput[I, K, V]().unsafeDecode(value)) case (key @ RespValue.BulkString(_), RespValue.Array(values)) if key.asString == XInfoFields.Groups => @@ -544,7 +562,7 @@ object Output { var pos = 0 while (pos < len) { (elements(pos), elements(pos + 1)) match { - case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) => + case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) => if (key.asString == XInfoFields.Length) streamInfo = streamInfo.copy(length = value.value) else if (key.asString == XInfoFields.RadixTreeNodes) @@ -553,15 +571,21 @@ object Output { streamInfo = streamInfo.copy(radixTreeKeys = value.value) else if (key.asString == XInfoFields.Groups) streamInfo = streamInfo.copy(groups = value.value) - case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_)) - if key.asString == XInfoFields.LastGeneratedId => - streamInfo = streamInfo.copy(lastGeneratedId = value.asString) - case (key @ RespValue.BulkString(_), value @ RespValue.Array(_)) => + else if (key.asString == XInfoFields.EntriesAdded) + streamInfo = streamInfo.copy(entriesAdded = value.value) + case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_)) => + if (key.asString == XInfoFields.LastGeneratedId) + streamInfo = streamInfo.copy(lastGeneratedId = value.asString) + else if (key.asString == XInfoFields.MaxDeletedEntryId) + streamInfo = streamInfo.copy(maxDeletedEntryId = value.asString) + else if (key.asString == XInfoFields.RecordedFirstEntryId) + streamInfo = streamInfo.copy(recordedFirstEntryId = value.asString) + case (key @ RespValue.BulkString(_), value @ RespValue.Array(_)) => if (key.asString == XInfoFields.FirstEntry) streamInfo = streamInfo.copy(firstEntry = Some(StreamEntryOutput[I, K, V]().unsafeDecode(value))) else if (key.asString == XInfoFields.LastEntry) streamInfo = streamInfo.copy(lastEntry = Some(StreamEntryOutput[I, K, V]().unsafeDecode(value))) - case _ => + case _ => } pos += 2 } @@ -767,9 +791,13 @@ object Output { case XInfoFields.Name => readyGroup = readyGroup.copy(name = value.asString) case _ => } - case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) - if XInfoFields.PelCount == key.asString => - readyGroup = readyGroup.copy(pelCount = value.value) + case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) => + key.asString match { + case XInfoFields.PelCount => readyGroup = readyGroup.copy(pelCount = value.value) + case XInfoFields.Lag => readyGroup = readyGroup.copy(lag = value.value) + case XInfoFields.EntriesRead => readyGroup = readyGroup.copy(entriesRead = value.value) + case _ => + } case (key @ RespValue.BulkString(_), RespValue.Array(values)) => // Get the consumer list of the current group. key.asString match { @@ -821,9 +849,10 @@ object Output { readyConsumer = readyConsumer.copy(name = value.asString) case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) => key.asString match { - case XInfoFields.PelCount => readyConsumer = readyConsumer.copy(pelCount = value.value) - case XInfoFields.SeenTime => readyConsumer = readyConsumer.copy(seenTime = value.value.millis) - case _ => + case XInfoFields.PelCount => readyConsumer = readyConsumer.copy(pelCount = value.value) + case XInfoFields.SeenTime => readyConsumer = readyConsumer.copy(seenTime = value.value.millis) + case XInfoFields.ActiveTime => readyConsumer = readyConsumer.copy(activeTime = value.value.millis) + case _ => } // Get the pel list of the current consumer. case (key @ RespValue.BulkString(_), RespValue.Array(values)) if key.asString == XInfoFields.Pending => diff --git a/modules/redis/src/main/scala/zio/redis/api/Streams.scala b/modules/redis/src/main/scala/zio/redis/api/Streams.scala index b6a8cc4de..e364728da 100644 --- a/modules/redis/src/main/scala/zio/redis/api/Streams.scala +++ b/modules/redis/src/main/scala/zio/redis/api/Streams.scala @@ -64,6 +64,8 @@ trait Streams[G[+_]] extends RedisEnvironment[G] { * ID of the stream * @param id * ID of the message + * @param noMakeStream + * The creation of stream's key can be disabled with the noMakeStream option * @param pair * field and value pair * @param pairs @@ -71,25 +73,25 @@ trait Streams[G[+_]] extends RedisEnvironment[G] { * @return * ID of the added entry. */ - final def xAdd[SK: Schema, I: Schema, K: Schema, V: Schema]( - key: SK, - id: I, + final def xAdd[SK: Schema, I: Schema, K: Schema, V: Schema](key: SK, id: I, noMakeStream: Boolean = false)( pair: (K, V), pairs: (K, V)* - ): ResultBuilder1[Id, G] = - new ResultBuilder1[Id, G] { - def returning[R: Schema]: G[Id[R]] = { + ): ResultBuilder1[Option, G] = + new ResultBuilder1[Option, G] { + def returning[R: Schema]: G[Option[Id[R]]] = { val command = RedisCommand( XAdd, Tuple4( ArbitraryKeyInput[SK](), - OptionalInput(StreamMaxLenInput), + OptionalInput(NoMkStreamInput), ArbitraryValueInput[I](), NonEmptyList(Tuple2(ArbitraryKeyInput[K](), ArbitraryValueInput[V]())) ), - ArbitraryOutput[R]() + OptionalOutput(ArbitraryOutput[R]()) ) - command.run((key, None, id, (pair, pairs.toList))) + + val noMkStreamOpt = if (noMakeStream) Some(NoMkStream) else None + command.run((key, noMkStreamOpt, id, (pair, pairs.toList))) } } @@ -104,6 +106,10 @@ trait Streams[G[+_]] extends RedisEnvironment[G] { * maximum number of elements in a stream * @param approximate * flag that indicates if a stream should be limited to the exact number of elements + * @param limit + * limit parameter + * @param noMakeStream + * the creation of stream's key can be disabled with the NOMKSTREAM option. * @param pair * field and value pair * @param pairs @@ -115,24 +121,115 @@ trait Streams[G[+_]] extends RedisEnvironment[G] { key: SK, id: I, count: Long, - approximate: Boolean = false + approximate: Boolean = false, + limit: Option[Long] = None, + noMakeStream: Boolean = false )( pair: (K, V), pairs: (K, V)* - ): ResultBuilder1[Id, G] = - new ResultBuilder1[Id, G] { - def returning[R: Schema]: G[Id[R]] = { - val command = RedisCommand( - XAdd, - Tuple4( - ArbitraryKeyInput[SK](), - OptionalInput(StreamMaxLenInput), - ArbitraryValueInput[I](), - NonEmptyList(Tuple2(ArbitraryKeyInput[K](), ArbitraryValueInput[V]())) - ), - ArbitraryOutput[R]() - ) - command.run((key, Some(StreamMaxLen(approximate, count)), id, (pair, pairs.toList))) + ): ResultBuilder1[Option, G] = + new ResultBuilder1[Option, G] { + def returning[R: Schema]: G[Option[Id[R]]] = { + val noMkStreamOpt = if (noMakeStream) Some(NoMkStream) else None + + if (approximate) { + val command = RedisCommand( + XAdd, + Tuple5( + ArbitraryKeyInput[SK](), + OptionalInput(NoMkStreamInput), + OptionalInput(MaxLenApproxInput), + ArbitraryValueInput[I](), + NonEmptyList(Tuple2(ArbitraryKeyInput[K](), ArbitraryValueInput[V]())) + ), + OptionalOutput(ArbitraryOutput[R]()) + ) + + command.run((key, noMkStreamOpt, Some(CappedStreamType.MaxLenApprox(count, limit)), id, (pair, pairs.toList))) + } else { + val command = RedisCommand( + XAdd, + Tuple5( + ArbitraryKeyInput[SK](), + OptionalInput(NoMkStreamInput), + OptionalInput(MaxLenExactInput), + ArbitraryValueInput[I](), + NonEmptyList(Tuple2(ArbitraryKeyInput[K](), ArbitraryValueInput[V]())) + ), + OptionalOutput(ArbitraryOutput[R]()) + ) + + command.run((key, noMkStreamOpt, Some(CappedStreamType.MaxLenExact(count)), id, (pair, pairs.toList))) + } + } + } + + /** + * Appends the specified stream entry to the stream at the specified key while limiting the size of the stream. + * + * @param key + * ID of the stream + * @param id + * ID of the message + * @param minId + * Evicts entries with IDs lower than minId, where minId is a stream ID. + * @param approximate + * flag that indicates if a stream should be limited to the exact number of elements + * @param limit + * limit parameter + * @param noMakeStream + * the creation of stream's key can be disabled with the NOMKSTREAM option. + * @param pair + * field and value pair + * @param pairs + * rest of the field and value pairs + * @return + * ID of the added entry. + */ + final def xAddWithMinId[SK: Schema, I: Schema, K: Schema, V: Schema]( + key: SK, + id: I, + minId: I, + approximate: Boolean = false, + limit: Option[Long] = None, + noMakeStream: Boolean = false + )( + pair: (K, V), + pairs: (K, V)* + ): ResultBuilder1[Option, G] = + new ResultBuilder1[Option, G] { + def returning[R: Schema]: G[Option[Id[R]]] = { + val noMkStreamOpt = if (noMakeStream) Some(NoMkStream) else None + + if (approximate) { + val command = RedisCommand( + XAdd, + Tuple5( + ArbitraryKeyInput[SK](), + OptionalInput(NoMkStreamInput), + OptionalInput(MinIdApproxInput[I]()), + ArbitraryValueInput[I](), + NonEmptyList(Tuple2(ArbitraryKeyInput[K](), ArbitraryValueInput[V]())) + ), + OptionalOutput(ArbitraryOutput[R]()) + ) + + command.run((key, noMkStreamOpt, Some(CappedStreamType.MinIdApprox(minId, limit)), id, (pair, pairs.toList))) + } else { + val command = RedisCommand( + XAdd, + Tuple5( + ArbitraryKeyInput[SK](), + OptionalInput(NoMkStreamInput), + OptionalInput(MinIdExactInput[I]()), + ArbitraryValueInput[I](), + NonEmptyList(Tuple2(ArbitraryKeyInput[K](), ArbitraryValueInput[V]())) + ), + OptionalOutput(ArbitraryOutput[R]()) + ) + + command.run((key, noMkStreamOpt, Some(CappedStreamType.MinIdExact(minId)), id, (pair, pairs.toList))) + } } } @@ -258,14 +355,15 @@ trait Streams[G[+_]] extends RedisEnvironment[G] { idle: Option[Duration] = None, time: Option[Duration] = None, retryCount: Option[Long] = None, - force: Boolean = false + force: Boolean = false, + lastId: Option[I] = None )(id: I, ids: I*): ResultBuilder2[({ type lambda[x, y] = StreamEntries[I, x, y] })#lambda, G] = new ResultBuilder2[({ type lambda[x, y] = StreamEntries[I, x, y] })#lambda, G] { def returning[RK: Schema, RV: Schema]: G[StreamEntries[I, RK, RV]] = { val command = RedisCommand( XClaim, - Tuple9( + Tuple10( ArbitraryKeyInput[SK](), ArbitraryValueInput[SG](), ArbitraryValueInput[SC](), @@ -274,13 +372,16 @@ trait Streams[G[+_]] extends RedisEnvironment[G] { OptionalInput(IdleInput), OptionalInput(TimeInput), OptionalInput(RetryCountInput), - OptionalInput(WithForceInput) + OptionalInput(WithForceInput), + Input.OptionalInput(LastIdInput[I]()) ), StreamEntriesOutput[I, RK, RV]() ) val forceOpt = if (force) Some(WithForce) else None - command.run((key, group, consumer, minIdleTime, (id, ids.toList), idle, time, retryCount, forceOpt)) + command.run( + (key, group, consumer, minIdleTime, (id, ids.toList), idle, time, retryCount, forceOpt, lastId.map(LastId(_))) + ) } } @@ -319,13 +420,14 @@ trait Streams[G[+_]] extends RedisEnvironment[G] { idle: Option[Duration] = None, time: Option[Duration] = None, retryCount: Option[Long] = None, - force: Boolean = false + force: Boolean = false, + lastId: Option[I] = None )(id: I, ids: I*): ResultBuilder1[Chunk, G] = new ResultBuilder1[Chunk, G] { def returning[R: Schema]: G[Chunk[R]] = { val command = RedisCommand( XClaim, - Tuple10( + Tuple11( ArbitraryKeyInput[SK](), ArbitraryValueInput[SG](), ArbitraryValueInput[SC](), @@ -335,12 +437,27 @@ trait Streams[G[+_]] extends RedisEnvironment[G] { OptionalInput(TimeInput), OptionalInput(RetryCountInput), OptionalInput(WithForceInput), - WithJustIdInput + WithJustIdInput, + OptionalInput(LastIdInput[I]()) ), ChunkOutput(ArbitraryOutput[R]()) ) val forceOpt = if (force) Some(WithForce) else None - command.run((key, group, consumer, minIdleTime, (id, ids.toList), idle, time, retryCount, forceOpt, WithJustId)) + command.run( + ( + key, + group, + consumer, + minIdleTime, + (id, ids.toList), + idle, + time, + retryCount, + forceOpt, + WithJustId, + lastId.map(LastId(_)) + ) + ) } } @@ -373,17 +490,46 @@ trait Streams[G[+_]] extends RedisEnvironment[G] { * ID of the last item in the stream to consider already delivered * @param mkStream * ID of the last item in the stream to consider already delivered + * @param entriesRead + * Enable consumer group lag tracking */ final def xGroupCreate[SK: Schema, SG: Schema, I: Schema]( key: SK, group: SG, id: I, - mkStream: Boolean = false + mkStream: Boolean = false, + entriesRead: Option[Long] = None ): G[Unit] = { - val command = RedisCommand(XGroup, XGroupCreateInput[SK, SG, I](), UnitOutput) - command.run(Create(key, group, id, mkStream)) + val command = + RedisCommand( + XGroup, + Tuple3(XGroupCreateInput[SK, SG, I](), OptionalInput(MkStreamInput), OptionalInput(WithEntriesReadInput)), + UnitOutput + ) + command.run( + (Create(key, group, id), if (mkStream) Some(MkStream) else None, entriesRead.map(WithEntriesRead(_))) + ) } + /** + * Create a new consumer group associated with a stream. + * + * @param key + * ID of the stream + * @param group + * ID of the consumer group + * @param mkStream + * ID of the last item in the stream to consider already delivered + * @param entriesRead + * Enable consumer group lag tracking + */ + final def xGroupCreateLastEntry[SK: Schema, SG: Schema]( + key: SK, + group: SG, + mkStream: Boolean = false, + entriesRead: Option[Long] = None + ): G[Unit] = xGroupCreate(key, group, "$", mkStream, entriesRead) + /** * Create a new consumer associated with a consumer group. * @@ -448,16 +594,38 @@ trait Streams[G[+_]] extends RedisEnvironment[G] { * ID of the consumer group * @param id * last delivered ID to set + * @param entriesRead + * Enable consumer group lag tracking */ final def xGroupSetId[SK: Schema, SG: Schema, I: Schema]( key: SK, group: SG, - id: I + id: I, + entriesRead: Option[Long] = None ): G[Unit] = { - val command = RedisCommand(XGroup, XGroupSetIdInput[SK, SG, I](), UnitOutput) - command.run(SetId(key, group, id)) + val command = + RedisCommand(XGroup, Tuple2(XGroupSetIdInput[SK, SG, I](), OptionalInput(WithEntriesReadInput)), UnitOutput) + command.run((SetId(key, group, id), entriesRead.map(WithEntriesRead(_)))) } + /** + * Set the consumer group last delivered ID to something else. + * + * @param key + * ID of the stream + * @param group + * ID of the consumer group + * @param id + * last delivered ID to set + * @param entriesRead + * Enable consumer group lag tracking + */ + final def xGroupSetIdLastEntry[SK: Schema, SG: Schema]( + key: SK, + group: SG, + entriesRead: Option[Long] = None + ): G[Unit] = xGroupSetId(key, group, "$", entriesRead) + /** * An introspection command used in order to retrieve different information about the consumers. * @@ -866,14 +1034,45 @@ trait Streams[G[+_]] extends RedisEnvironment[G] { * @return * the number of entries deleted from the stream. */ - final def xTrim[SK: Schema]( + final def xTrimWithMaxLen[SK: Schema]( key: SK, count: Long, - approximate: Boolean = false - ): G[Long] = { - val command = RedisCommand(XTrim, Tuple2(ArbitraryKeyInput[SK](), StreamMaxLenInput), LongOutput) - command.run((key, StreamMaxLen(approximate, count))) - } + approximate: Boolean = false, + limit: Option[Long] = None + ): G[Long] = + if (approximate) { + val command = RedisCommand(XTrim, Tuple2(ArbitraryKeyInput[SK](), MaxLenApproxInput), LongOutput) + command.run((key, CappedStreamType.MaxLenApprox(count, limit))) + } else { + val command = RedisCommand(XTrim, Tuple2(ArbitraryKeyInput[SK](), MaxLenExactInput), LongOutput) + command.run((key, CappedStreamType.MaxLenExact(count))) + } + + /** + * Trims the stream to a given number of items, evicting older items (items with lower IDs) if needed. + * + * @param key + * ID of the stream + * @param minId + * minimal stream id + * @param approximate + * flag that indicates if the stream length should be exactly count or few tens of entries more + * @return + * the number of entries deleted from the stream. + */ + final def xTrimWithMinId[SK: Schema, I: Schema]( + key: SK, + minId: I, + approximate: Boolean = false, + limit: Option[Long] = None + ): G[Long] = + if (approximate) { + val command = RedisCommand(XTrim, Tuple2(ArbitraryKeyInput[SK](), MinIdApproxInput[I]()), LongOutput) + command.run((key, CappedStreamType.MinIdApprox(minId, limit))) + } else { + val command = RedisCommand(XTrim, Tuple2(ArbitraryKeyInput[SK](), MinIdExactInput[I]()), LongOutput) + command.run((key, CappedStreamType.MinIdExact(minId))) + } } private object Streams { diff --git a/modules/redis/src/main/scala/zio/redis/options/Streams.scala b/modules/redis/src/main/scala/zio/redis/options/Streams.scala index 7037f10df..4c01c3fd0 100644 --- a/modules/redis/src/main/scala/zio/redis/options/Streams.scala +++ b/modules/redis/src/main/scala/zio/redis/options/Streams.scala @@ -31,14 +31,18 @@ trait Streams { type WithJustId = WithJustId.type + sealed case class LastId[I](lastId: I) + + sealed case class WithEntriesRead(entries: Long) + sealed trait XGroupCommand object XGroupCommand { - sealed case class Create[SK, SG, I](key: SK, group: SG, id: I, mkStream: Boolean) extends XGroupCommand - sealed case class SetId[SK, SG, I](key: SK, group: SG, id: I) extends XGroupCommand - sealed case class Destroy[SK, SG](key: SK, group: SG) extends XGroupCommand - sealed case class CreateConsumer[SK, SG, SC](key: SK, group: SG, consumer: SC) extends XGroupCommand - sealed case class DelConsumer[SK, SG, SC](key: SK, group: SG, consumer: SC) extends XGroupCommand + sealed case class Create[SK, SG, I](key: SK, group: SG, id: I) extends XGroupCommand + sealed case class SetId[SK, SG, I](key: SK, group: SG, id: I) extends XGroupCommand + sealed case class Destroy[SK, SG](key: SK, group: SG) extends XGroupCommand + sealed case class CreateConsumer[SK, SG, SC](key: SK, group: SG, consumer: SC) extends XGroupCommand + sealed case class DelConsumer[SK, SG, SC](key: SK, group: SG, consumer: SC) extends XGroupCommand } case object MkStream { @@ -47,6 +51,12 @@ trait Streams { type MkStream = MkStream.type + case object NoMkStream { + private[redis] def asString = "NOMKSTREAM" + } + + type NoMkStream = NoMkStream.type + sealed case class PendingInfo( total: Long, first: Option[String], @@ -71,7 +81,14 @@ trait Streams { type StreamChunks[N, I, K, V] = Chunk[StreamChunk[N, I, K, V]] - sealed case class StreamMaxLen(approximate: Boolean, count: Long) + sealed trait CappedStreamType + + object CappedStreamType { + sealed case class MaxLenApprox(count: Long, limit: Option[Long]) extends CappedStreamType + sealed case class MaxLenExact(count: Long) extends CappedStreamType + sealed case class MinIdApprox[I](id: I, limit: Option[Long]) extends CappedStreamType + sealed case class MinIdExact[I](id: I) extends CappedStreamType + } sealed case class StreamEntry[I, K, V](id: I, fields: Map[K, V]) @@ -89,35 +106,41 @@ trait Streams { length: Long, radixTreeKeys: Long, radixTreeNodes: Long, - groups: Long, lastGeneratedId: String, + maxDeletedEntryId: String, + entriesAdded: Long, + recordedFirstEntryId: String, + groups: Long, firstEntry: Option[StreamEntry[I, K, V]], lastEntry: Option[StreamEntry[I, K, V]] ) object StreamInfo { - def empty[I, K, V]: StreamInfo[I, K, V] = StreamInfo(0, 0, 0, 0, "", None, None) + def empty[I, K, V]: StreamInfo[I, K, V] = StreamInfo(0, 0, 0, "", "", 0, "", 0, None, None) } sealed case class StreamGroupsInfo( name: String, consumers: Long, pending: Long, - lastDeliveredId: String + lastDeliveredId: String, + entriesRead: Long, + lag: Long ) object StreamGroupsInfo { - def empty: StreamGroupsInfo = StreamGroupsInfo("", 0, 0, "") + def empty: StreamGroupsInfo = StreamGroupsInfo("", 0, 0, "", 0, 0) } sealed case class StreamConsumersInfo( name: String, pending: Long, - idle: Duration + idle: Duration, + inactive: Duration ) object StreamConsumersInfo { - def empty: StreamConsumersInfo = StreamConsumersInfo("", 0, 0.millis) + def empty: StreamConsumersInfo = StreamConsumersInfo("", 0, 0.millis, 0.millis) } object StreamInfoWithFull { @@ -127,44 +150,58 @@ trait Streams { radixTreeKeys: Long, radixTreeNodes: Long, lastGeneratedId: String, + maxDeletedEntryId: String, + entriesAdded: Long, + recordedFirstEntryId: String, entries: Chunk[StreamEntry[I, K, V]], groups: Chunk[ConsumerGroups] ) object FullStreamInfo { - def empty[I, K, V]: FullStreamInfo[I, K, V] = FullStreamInfo(0, 0, 0, "", Chunk.empty, Chunk.empty) + def empty[I, K, V]: FullStreamInfo[I, K, V] = FullStreamInfo(0, 0, 0, "", "", 0, "", Chunk.empty, Chunk.empty) } sealed case class ConsumerGroups( name: String, lastDeliveredId: String, + entriesRead: Long, + lag: Long, pelCount: Long, pending: Chunk[GroupPel], consumers: Chunk[Consumers] ) object ConsumerGroups { - def empty: ConsumerGroups = ConsumerGroups("", "", 0, Chunk.empty, Chunk.empty) + def empty: ConsumerGroups = ConsumerGroups("", "", 0, 0, 0, Chunk.empty, Chunk.empty) } sealed case class GroupPel(entryId: String, consumerName: String, deliveryTime: Duration, deliveryCount: Long) - sealed case class Consumers(name: String, seenTime: Duration, pelCount: Long, pending: Chunk[ConsumerPel]) + sealed case class Consumers( + name: String, + seenTime: Duration, + activeTime: Duration, + pelCount: Long, + pending: Chunk[ConsumerPel] + ) object Consumers { - def empty: Consumers = Consumers("", 0.millis, 0, Chunk.empty) + def empty: Consumers = Consumers("", 0.millis, 0.millis, 0, Chunk.empty) } sealed case class ConsumerPel(entryId: String, deliveryTime: Duration, deliveryCount: Long) } private[redis] object XInfoFields { - val Name: String = "name" - val Idle: String = "idle" - val Pending: String = "pending" + val Name: String = "name" + val Idle: String = "idle" + val Pending: String = "pending" + val Inactive: String = "inactive" val Consumers: String = "consumers" val LastDeliveredId: String = "last-delivered-id" + val EntriesRead: String = "entries-read" + val Lag: String = "lag" val Length: String = "length" val RadixTreeKeys: String = "radix-tree-keys" @@ -174,8 +211,13 @@ trait Streams { val FirstEntry: String = "first-entry" val LastEntry: String = "last-entry" - val Entries: String = "entries" - val PelCount: String = "pel-count" - val SeenTime: String = "seen-time" + val MaxDeletedEntryId: String = "max-deleted-entry-id" + val EntriesAdded: String = "entries-added" + val RecordedFirstEntryId: String = "recorded-first-entry-id" + + val Entries: String = "entries" + val PelCount: String = "pel-count" + val SeenTime: String = "seen-time" + val ActiveTime: String = "active-time" } } diff --git a/modules/redis/src/test/scala/zio/redis/InputSpec.scala b/modules/redis/src/test/scala/zio/redis/InputSpec.scala index d753f895f..b96f0310c 100644 --- a/modules/redis/src/test/scala/zio/redis/InputSpec.scala +++ b/modules/redis/src/test/scala/zio/redis/InputSpec.scala @@ -14,9 +14,9 @@ object InputSpec extends BaseSpec { import BitFieldCommand._ import BitFieldType._ import BitOperation._ + import LcsQueryType._ import Order._ import RadiusUnit._ - import LcsQueryType._ def spec: Spec[Any, Throwable] = suite("Input encoders")( @@ -1148,7 +1148,7 @@ object InputSpec extends BaseSpec { ZIO .attempt( XGroupCreateInput[String, String, String]().encode( - XGroupCommand.Create("key", "group", "id", mkStream = false) + XGroupCommand.Create("key", "group", "id") ) ) .map(assert(_)(equalTo(RespCommand(Literal("CREATE"), Key("key"), Value("group"), Value("id"))))) @@ -1157,13 +1157,13 @@ object InputSpec extends BaseSpec { ZIO .attempt( XGroupCreateInput[String, String, String]().encode( - XGroupCommand.Create("key", "group", "id", mkStream = true) + XGroupCommand.Create("key", "group", "id") ) ) .map( assert(_)( equalTo( - RespCommand(Literal("CREATE"), Key("key"), Value("group"), Value("id"), Literal("MKSTREAM")) + RespCommand(Literal("CREATE"), Key("key"), Value("group"), Value("id")) ) ) ) @@ -1174,6 +1174,11 @@ object InputSpec extends BaseSpec { ZIO .attempt(XGroupSetIdInput[String, String, String]().encode(XGroupCommand.SetId("key", "group", "id"))) .map(assert(_)(equalTo(RespCommand(Literal("SETID"), Key("key"), Value("group"), Value("id"))))) + }, + test("valid value with entries read") { + ZIO + .attempt(XGroupSetIdInput[String, String, String]().encode(XGroupCommand.SetId("key", "group", "id"))) + .map(assert(_)(equalTo(RespCommand(Literal("SETID"), Key("key"), Value("group"), Value("id"))))) } ), suite("XGroupDestroy")( @@ -1245,16 +1250,44 @@ object InputSpec extends BaseSpec { ZIO.attempt(NoAckInput.encode(NoAck)).map(assert(_)(equalTo(RespCommand(Value("NOACK"))))) } ), - suite("MaxLen")( - test("with approximate") { + suite("CappedStreamOption")( + test("MaxLen with approx") { ZIO - .attempt(StreamMaxLenInput.encode(StreamMaxLen(approximate = true, 10))) + .attempt(MaxLenApproxInput.encode(CappedStreamType.MaxLenApprox(10, None))) .map(assert(_)(equalTo(RespCommand(Literal("MAXLEN"), Literal("~"), Value("10"))))) }, - test("without approximate") { + test("MaxLen with approx and Limit") { ZIO - .attempt(StreamMaxLenInput.encode(StreamMaxLen(approximate = false, 10))) - .map(assert(_)(equalTo(RespCommand(Literal("MAXLEN"), Value("10"))))) + .attempt(MaxLenApproxInput.encode(CappedStreamType.MaxLenApprox(10, Some(100)))) + .map( + assert(_)( + equalTo(RespCommand(Literal("MAXLEN"), Literal("~"), Value("10"), Literal("LIMIT"), Value("100"))) + ) + ) + }, + test("MaxLen with exact") { + ZIO + .attempt(MaxLenExactInput.encode(CappedStreamType.MaxLenExact(10))) + .map(assert(_)(equalTo(RespCommand(Literal("MAXLEN"), Literal("="), Value("10"))))) + }, + test("MinId with approx") { + ZIO + .attempt(MinIdApproxInput[String]().encode(CappedStreamType.MinIdApprox("10", None))) + .map(assert(_)(equalTo(RespCommand(Literal("MINID"), Literal("~"), Value("10"))))) + }, + test("MinId with approx and Limit") { + ZIO + .attempt(MinIdApproxInput[String]().encode(CappedStreamType.MinIdApprox("10", Some(100)))) + .map( + assert(_)( + equalTo(RespCommand(Literal("MINID"), Literal("~"), Value("10"), Literal("LIMIT"), Value("100"))) + ) + ) + }, + test("MinId with exact") { + ZIO + .attempt(MinIdExactInput[String]().encode(CappedStreamType.MinIdExact("10"))) + .map(assert(_)(equalTo(RespCommand(Literal("MINID"), Literal("="), Value("10"))))) } ), suite("WithForce")( diff --git a/modules/redis/src/test/scala/zio/redis/OutputSpec.scala b/modules/redis/src/test/scala/zio/redis/OutputSpec.scala index 81a0588a3..a35f41bf0 100644 --- a/modules/redis/src/test/scala/zio/redis/OutputSpec.scala +++ b/modules/redis/src/test/scala/zio/redis/OutputSpec.scala @@ -632,10 +632,16 @@ object OutputSpec extends BaseSpec { RespValue.Integer(2), RespValue.bulkString("radix-tree-nodes"), RespValue.Integer(3), - RespValue.bulkString("groups"), - RespValue.Integer(2), RespValue.bulkString("last-generated-id"), RespValue.bulkString("2-0"), + RespValue.bulkString("max-deleted-entry-id"), + RespValue.bulkString("3-0"), + RespValue.bulkString("entries-added"), + RespValue.Integer(5), + RespValue.bulkString("recorded-first-entry-id"), + RespValue.bulkString("1-0"), + RespValue.bulkString("groups"), + RespValue.Integer(2), RespValue.bulkString("first-entry"), RespValue.array( RespValue.bulkString("1-0"), @@ -660,8 +666,11 @@ object OutputSpec extends BaseSpec { 1, 2, 3, - 2, "2-0", + "3-0", + 5, + "1-0", + 2, Some(StreamEntry("1-0", Map("key1" -> "value1"))), Some(StreamEntry("2-0", Map("key2" -> "value2"))) ) @@ -676,14 +685,20 @@ object OutputSpec extends BaseSpec { RespValue.Integer(2), RespValue.bulkString("radix-tree-nodes"), RespValue.Integer(3), - RespValue.bulkString("groups"), - RespValue.Integer(1), RespValue.bulkString("last-generated-id"), - RespValue.bulkString("0-0") + RespValue.bulkString("2-0"), + RespValue.bulkString("max-deleted-entry-id"), + RespValue.bulkString("3-0"), + RespValue.bulkString("entries-added"), + RespValue.Integer(5), + RespValue.bulkString("recorded-first-entry-id"), + RespValue.bulkString("1-0"), + RespValue.bulkString("groups"), + RespValue.Integer(2) ) assertZIO(ZIO.attempt(StreamInfoOutput[String, String, String]().unsafeDecode(resp)))( - equalTo(StreamInfo[String, String, String](1, 2, 3, 1, "0-0", None, None)) + equalTo(StreamInfo[String, String, String](1, 2, 3, "2-0", "3-0", 5, "1-0", 2, None, None)) ) } ), @@ -698,7 +713,11 @@ object OutputSpec extends BaseSpec { RespValue.bulkString("pending"), RespValue.Integer(1), RespValue.bulkString("last-delivered-id"), - RespValue.bulkString("1-0") + RespValue.bulkString("1-0"), + RespValue.bulkString("entries-read"), + RespValue.Integer(1), + RespValue.bulkString("lag"), + RespValue.Integer(1) ), RespValue.array( RespValue.bulkString("name"), @@ -708,15 +727,19 @@ object OutputSpec extends BaseSpec { RespValue.bulkString("pending"), RespValue.Integer(2), RespValue.bulkString("last-delivered-id"), - RespValue.bulkString("2-0") + RespValue.bulkString("2-0"), + RespValue.bulkString("entries-read"), + RespValue.Integer(2), + RespValue.bulkString("lag"), + RespValue.Integer(2) ) ) assertZIO(ZIO.attempt(StreamGroupsInfoOutput.unsafeDecode(resp)))( equalTo( Chunk( - StreamGroupsInfo("group1", 1, 1, "1-0"), - StreamGroupsInfo("group2", 2, 2, "2-0") + StreamGroupsInfo("group1", 1, 1, "1-0", 1, 1), + StreamGroupsInfo("group2", 2, 2, "2-0", 2, 2) ) ) ) @@ -736,6 +759,8 @@ object OutputSpec extends BaseSpec { RespValue.bulkString("pending"), RespValue.Integer(1), RespValue.bulkString("idle"), + RespValue.Integer(100), + RespValue.bulkString("inactive"), RespValue.Integer(100) ), RespValue.array( @@ -744,6 +769,8 @@ object OutputSpec extends BaseSpec { RespValue.bulkString("pending"), RespValue.Integer(2), RespValue.bulkString("idle"), + RespValue.Integer(200), + RespValue.bulkString("inactive"), RespValue.Integer(200) ) ) @@ -751,8 +778,8 @@ object OutputSpec extends BaseSpec { assertZIO(ZIO.attempt(StreamConsumersInfoOutput.unsafeDecode(resp)))( equalTo( Chunk( - StreamConsumersInfo("consumer1", 1, 100.millis), - StreamConsumersInfo("consumer2", 2, 200.millis) + StreamConsumersInfo("consumer1", 1, 100.millis, 100.millis), + StreamConsumersInfo("consumer2", 2, 200.millis, 200.millis) ) ) ) @@ -774,6 +801,12 @@ object OutputSpec extends BaseSpec { RespValue.Integer(3), RespValue.bulkString("last-generated-id"), RespValue.bulkString("0-0"), + RespValue.bulkString("max-deleted-entry-id"), + RespValue.bulkString("1-0"), + RespValue.bulkString("entries-added"), + RespValue.Integer(4), + RespValue.bulkString("recorded-first-entry-id"), + RespValue.bulkString("2-0"), RespValue.bulkString("entries"), RespValue.array( RespValue.array( @@ -800,6 +833,9 @@ object OutputSpec extends BaseSpec { 2, 3, "0-0", + "1-0", + 4, + "2-0", Chunk(StreamEntry("3-0", Map("key1" -> "value1")), StreamEntry("4-0", Map("key1" -> "value1"))), Chunk.empty ) @@ -815,11 +851,18 @@ object OutputSpec extends BaseSpec { RespValue.bulkString("radix-tree-nodes"), RespValue.Integer(3), RespValue.bulkString("last-generated-id"), - RespValue.bulkString("0-0") + RespValue.bulkString("0-0"), + RespValue.bulkString("max-deleted-entry-id"), + RespValue.bulkString("1-0"), + RespValue.bulkString("entries-added"), + RespValue.Integer(4), + RespValue.bulkString("recorded-first-entry-id"), + RespValue.bulkString("2-0") ) assertZIO(ZIO.attempt(StreamInfoFullOutput[String, String, String]().unsafeDecode(resp)))( equalTo( - StreamInfoWithFull.FullStreamInfo[String, String, String](1, 2, 3, "0-0", Chunk.empty, Chunk.empty) + StreamInfoWithFull + .FullStreamInfo[String, String, String](1, 2, 3, "0-0", "1-0", 4, "2-0", Chunk.empty, Chunk.empty) ) ) }, @@ -827,7 +870,7 @@ object OutputSpec extends BaseSpec { val resp = RespValue.array() assertZIO(ZIO.attempt(StreamConsumersInfoOutput.unsafeDecode(resp)))(isEmpty) }, - test("extract a valid value with groups") { + test("extract a valid value with groups and entries") { val resp = RespValue.array( RespValue.bulkString("length"), RespValue.Integer(1), @@ -837,6 +880,12 @@ object OutputSpec extends BaseSpec { RespValue.Integer(3), RespValue.bulkString("last-generated-id"), RespValue.bulkString("0-0"), + RespValue.bulkString("max-deleted-entry-id"), + RespValue.bulkString("1-0"), + RespValue.bulkString("entries-added"), + RespValue.Integer(4), + RespValue.bulkString("recorded-first-entry-id"), + RespValue.bulkString("2-0"), RespValue.bulkString("entries"), RespValue.array( RespValue.array( @@ -861,8 +910,12 @@ object OutputSpec extends BaseSpec { RespValue.bulkString("name1"), RespValue.bulkString("last-delivered-id"), RespValue.bulkString("lastDeliveredId"), - RespValue.bulkString("pel-count"), + RespValue.bulkString("entries-read"), RespValue.Integer(1), + RespValue.bulkString("lag"), + RespValue.Integer(2), + RespValue.bulkString("pel-count"), + RespValue.Integer(3), RespValue.bulkString("pending"), RespValue.array( RespValue.array( @@ -885,6 +938,8 @@ object OutputSpec extends BaseSpec { RespValue.bulkString("Alice"), RespValue.bulkString("seen-time"), RespValue.Integer(1588152520299L), + RespValue.bulkString("active-time"), + RespValue.Integer(1588152520299L), RespValue.bulkString("pel-count"), RespValue.Integer(1), RespValue.bulkString("pending"), @@ -908,12 +963,17 @@ object OutputSpec extends BaseSpec { 2, 3, "0-0", + "1-0", + 4, + "2-0", Chunk(StreamEntry("3-0", Map("key1" -> "value1")), StreamEntry("4-0", Map("key1" -> "value1"))), Chunk( StreamInfoWithFull.ConsumerGroups( "name1", "lastDeliveredId", 1, + 2, + 3, Chunk( StreamInfoWithFull.GroupPel("entryId1", "consumerName1", 1588152520299L.millis, 1L), StreamInfoWithFull.GroupPel("entryId2", "consumerName2", 1588152520299L.millis, 1L) @@ -922,6 +982,7 @@ object OutputSpec extends BaseSpec { StreamInfoWithFull.Consumers( "Alice", 1588152520299L.millis, + 1588152520299L.millis, 1, Chunk( StreamInfoWithFull.ConsumerPel("entryId3", 1588152520299L.millis, 1)