Skip to content

Commit

Permalink
add functions support (#850)
Browse files Browse the repository at this point in the history
* add functions support

* add functions support. fix build for 2.12

---------

Co-authored-by: Roman Makurin <[email protected]>
  • Loading branch information
Ravenow and Roman Makurin authored Mar 6, 2024
1 parent 7cb3c76 commit f3c5248
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package dev.profunktor.redis4cats.algebra

import dev.profunktor.redis4cats.effects.ScriptOutputType
import dev.profunktor.redis4cats.effects.{ FlushMode, FunctionRestoreMode, ScriptOutputType }

trait ScriptCommands[F[_], K, V] extends Scripting[F, K, V]
trait ScriptCommands[F[_], K, V] extends Scripting[F, K, V] with Functions[F, K, V]

trait Scripting[F[_], K, V] {
// these methods don't use varargs as they cause problems with type inference, see:
Expand All @@ -35,3 +35,19 @@ trait Scripting[F[_], K, V] {
def scriptFlush: F[Unit]
def digest(script: String): F[String]
}

trait Functions[F[_], K, V] {
def fcall(function: String, output: ScriptOutputType[V], keys: List[K]): F[output.R]
def fcall(function: String, output: ScriptOutputType[V], keys: List[K], values: List[V]): F[output.R]
def fcallReadOnly(function: String, output: ScriptOutputType[V], keys: List[K]): F[output.R]
def fcallReadOnly(function: String, output: ScriptOutputType[V], keys: List[K], values: List[V]): F[output.R]
def functionLoad(functionCode: String): F[String]
def functionLoad(functionCode: String, replace: Boolean): F[String]
def functionDump(): F[Array[Byte]]
def functionRestore(dump: Array[Byte]): F[String]
def functionRestore(dump: Array[Byte], mode: FunctionRestoreMode): F[String]
def functionFlush(flushMode: FlushMode): F[String]
def functionKill(): F[String]
def functionList(): F[List[Map[String, Any]]]
def functionList(libraryName: String): F[List[Map[String, Any]]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ object effects {
override private[redis4cats] def convert(in: java.util.List[V]): List[V] = in.asScala.toList
}

def Status[V]: ScriptOutputType.Aux[V, Unit] = new ScriptOutputType[V] {
type R = Unit
def Status[V]: ScriptOutputType.Aux[V, String] = new ScriptOutputType[V] {
type R = String
private[redis4cats] type Underlying = String
override private[redis4cats] val outputType = JScriptOutputType.STATUS
override private[redis4cats] def convert(in: String): Unit = ()
override private[redis4cats] val outputType = JScriptOutputType.STATUS
override private[redis4cats] def convert(in: String): String = in
}
}

Expand All @@ -104,6 +104,19 @@ object effects {
def apply(`match`: String, count: Long): ScanArgs = ScanArgs(Some(`match`), Some(count))
}

sealed trait FlushMode
object FlushMode {
case object Sync extends FlushMode
case object Async extends FlushMode
}

sealed trait FunctionRestoreMode
object FunctionRestoreMode {
case object Append extends FunctionRestoreMode
case object Flush extends FunctionRestoreMode
case object Replace extends FunctionRestoreMode
}

sealed trait GetExArg
object GetExArg {

Expand Down
107 changes: 99 additions & 8 deletions modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package dev.profunktor.redis4cats

import java.time.Instant
import java.util.concurrent.TimeUnit
import cats._
import cats.data.NonEmptyList
import cats.effect.kernel._
Expand All @@ -27,10 +25,13 @@ import dev.profunktor.redis4cats.algebra.BitCommandOperation.Overflows
import dev.profunktor.redis4cats.config.Redis4CatsConfig
import dev.profunktor.redis4cats.connection._
import dev.profunktor.redis4cats.data._
import dev.profunktor.redis4cats.effect._
import dev.profunktor.redis4cats.effect.FutureLift._
import dev.profunktor.redis4cats.effect._
import dev.profunktor.redis4cats.effects._
import dev.profunktor.redis4cats.tx.{ TransactionDiscarded, TxRunner, TxStore }
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands
import io.lettuce.core.cluster.api.sync.{ RedisClusterCommands => RedisClusterSyncCommands }
import io.lettuce.core.{
BitFieldArgs,
ClientOptions,
Expand All @@ -42,18 +43,19 @@ import io.lettuce.core.{
ZAddArgs,
ZAggregateArgs,
ZStoreArgs,
ExpireArgs => JExpireArgs,
FlushMode => JFlushMode,
FunctionRestoreMode => JFunctionRestoreMode,
GetExArgs => JGetExArgs,
Limit => JLimit,
Range => JRange,
ReadFrom => JReadFrom,
ScanCursor => JScanCursor,
SetArgs => JSetArgs,
ExpireArgs => JExpireArgs
SetArgs => JSetArgs
}
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands
import io.lettuce.core.cluster.api.sync.{ RedisClusterCommands => RedisClusterSyncCommands }

import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._

object Redis {
Expand Down Expand Up @@ -1289,6 +1291,95 @@ private[redis4cats] class BaseRedis[F[_]: FutureLift: MonadThrow: Log, K, V](
override def digest(script: String): F[String] =
async.map(_.digest(script))

override def fcall(function: String, output: ScriptOutputType[V], keys: List[K]): F[output.R] =
async.flatMap(
_.fcall[output.Underlying](
function,
output.outputType,
keys: _*
).futureLift.map(output.convert(_))
)

override def fcall(function: String, output: ScriptOutputType[V], keys: List[K], values: List[V]): F[output.R] =
async.flatMap(
_.fcall[output.Underlying](
function,
output.outputType,
// The Object requirement comes from the limitations of Java Generics. It is safe to assume K <: Object as
// the underlying JRedisCodec would also only support K <: Object.
keys.toArray[Any].asInstanceOf[Array[K with Object]],
values: _*
).futureLift.map(output.convert(_))
)

override def fcallReadOnly(function: String, output: ScriptOutputType[V], keys: List[K]): F[output.R] =
async.flatMap(
_.fcallReadOnly[output.Underlying](
function,
output.outputType,
keys: _*
).futureLift.map(output.convert(_))
)

override def fcallReadOnly(
function: String,
output: ScriptOutputType[V],
keys: List[K],
values: List[V]
): F[output.R] =
async.flatMap(
_.fcallReadOnly[output.Underlying](
function,
output.outputType,
// The Object requirement comes from the limitations of Java Generics. It is safe to assume K <: Object as
// the underlying JRedisCodec would also only support K <: Object.
keys.toArray[Any].asInstanceOf[Array[K with Object]],
values: _*
).futureLift.map(output.convert(_))
)

override def functionLoad(functionCode: String): F[String] =
async.flatMap(_.functionLoad(functionCode).futureLift)

override def functionLoad(functionCode: String, replace: Boolean): F[String] =
async.flatMap(_.functionLoad(functionCode, replace).futureLift)

override def functionDump(): F[Array[Byte]] =
async.flatMap(_.functionDump().futureLift)

override def functionRestore(dump: Array[Byte]): F[String] =
async.flatMap(_.functionRestore(dump).futureLift)

override def functionRestore(dump: Array[Byte], mode: FunctionRestoreMode): F[String] = {
val jMode = mode match {
case FunctionRestoreMode.Flush => JFunctionRestoreMode.FLUSH
case FunctionRestoreMode.Append => JFunctionRestoreMode.APPEND
case FunctionRestoreMode.Replace => JFunctionRestoreMode.REPLACE
}
async.flatMap(_.functionRestore(dump, jMode).futureLift)
}

override def functionFlush(flushMode: FlushMode): F[String] = {
val jFlushMode = flushMode match {
case FlushMode.Sync => JFlushMode.SYNC
case FlushMode.Async => JFlushMode.ASYNC
}
async.flatMap(_.functionFlush(jFlushMode).futureLift)
}

override def functionKill(): F[String] =
async.flatMap(_.functionKill().futureLift)

override def functionList(): F[List[Map[String, Any]]] =
async
.flatMap(_.functionList().futureLift)
.map(_.asScala.map(_.asScala.toMap).toList)

override def functionList(libraryName: String): F[List[Map[String, Any]]] =
async
.flatMap(_.functionList(libraryName).futureLift)
.map(_.asScala.map(_.asScala.toMap).toList)

/** ***************************** HyperLoglog API **********************************/
override def pfAdd(key: K, values: V*): F[Long] =
async.flatMap(_.pfadd(key, values: _*).futureLift.map(Long.box(_)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class RedisClusterSpec extends Redis4CatsFunSuite(true) with TestScenarios {

test("cluster: scripts")(withRedis(scriptsScenario))

test("cluster: functions")(withRedis(functionsScenario))

test("cluster: hyperloglog api")(withRedis(hyperloglogScenario))

// FIXME: The Cluster impl cannot connect to a single node just yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class RedisSpec extends Redis4CatsFunSuite(false) with TestScenarios {

test("scripts")(withRedis(scriptsScenario))

test("functions")(withRedis(functionsScenario))

test("hyperloglog api")(withRedis(hyperloglogScenario))

test("pattern key sub")(withRedisClient(keyPatternSubScenario))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,23 @@

package dev.profunktor.redis4cats

import java.time.Instant

import scala.concurrent.duration._

import cats.data.NonEmptyList
import cats.effect._
import cats.implicits._
import fs2.Stream

import dev.profunktor.redis4cats.algebra.BitCommandOperation.{ IncrUnsignedBy, SetUnsigned }
import dev.profunktor.redis4cats.algebra.BitCommands
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data._
import dev.profunktor.redis4cats.effects._
import dev.profunktor.redis4cats.pubsub.PubSub
import dev.profunktor.redis4cats.tx._
import io.lettuce.core.{ GeoArgs, RedisException, ZAggregateArgs }
import fs2.Stream
import io.lettuce.core.{ GeoArgs, RedisCommandExecutionException, RedisException, ZAggregateArgs }
import munit.FunSuite

import java.time.Instant
import scala.concurrent.duration._

trait TestScenarios { self: FunSuite =>

def locationScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = {
Expand Down Expand Up @@ -610,6 +608,48 @@ trait TestScenarios { self: FunSuite =>
} yield ()
}

def functionsScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = {
val myFunc =
"""#!lua name=mylib
| redis.register_function('myfunc', function(keys, args) return args[1] end)
| """.stripMargin

val myEcho =
"""#!lua name=mylib_2
| local function my_echo(keys, args)
| return args[1]
| end
| redis.register_function{function_name='my_echo',callback=my_echo, flags={ 'no-writes' }}
| """.stripMargin

for {
_ <- redis.functionFlush(FlushMode.Sync)
_ <- redis.functionLoad(myFunc)
_ <- redis.functionLoad(myFunc).recover({ case _: RedisCommandExecutionException => "" })
_ <- redis.functionLoad(myFunc, replace = true)
fcallResult <- redis.fcall("myfunc", ScriptOutputType.Status, List("key"), List("Hello"))
_ <- IO(assertEquals(fcallResult, "Hello"))
_ <- redis.functionFlush(FlushMode.Sync)
_ <- redis.functionLoad(myEcho)
fcallReadOnlyResult <- redis.fcallReadOnly("my_echo", ScriptOutputType.Status, List("key"), List("Hello"))
_ <- IO(assertEquals(fcallReadOnlyResult, "Hello"))
_ <- redis.functionFlush(FlushMode.Sync)
_ <- redis.functionLoad(myFunc)
dump <- redis.functionDump()
_ <- redis.functionFlush(FlushMode.Sync)
_ <- redis.functionRestore(dump)
fcallRestoreResult <- redis.fcall("myfunc", ScriptOutputType.Status, List("key"), List("Hello"))
_ <- IO(assertEquals(fcallRestoreResult, "Hello"))
_ <- redis.functionFlush(FlushMode.Sync)
listResult <- redis.functionList()
_ = assertEquals(listResult.size, 0)
_ <- redis.functionLoad(myFunc)
_ <- redis.functionLoad(myEcho)
listResult <- redis.functionList()
_ = assertEquals(listResult.size, 2)
} yield ()
}

def hyperloglogScenario(redis: RedisCommands[IO, String, String]): IO[Unit] = {
val key = "hll"
val key2 = "hll2"
Expand Down

0 comments on commit f3c5248

Please sign in to comment.