Skip to content

Commit

Permalink
Eval refactoring for multithreading (DiceDB#634)
Browse files Browse the repository at this point in the history
* support for eval refactoring for multithreading

* fixed errors

* review comments

* review comments

* eval changes

* fixed multithreading error issue

* linter fixes

* Update eval_test.go

* fixed unit tests

* fix unit tests

* review comments

* more refactoring

* added shard numbers to num core
  • Loading branch information
AshwinKul28 authored Sep 21, 2024
1 parent a3b1cb7 commit 7eae156
Show file tree
Hide file tree
Showing 15 changed files with 932 additions and 473 deletions.
81 changes: 52 additions & 29 deletions internal/eval/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@ type DiceCmdMeta struct {
Eval func([]string, *dstore.Store) []byte
Arity int // number of arguments, it is possible to use -N to say >= N
KeySpecs

// IsMigrated indicates whether a command has been migrated to a new evaluation
// mechanism. If true, the command uses the newer evaluation logic represented by
// the NewEval function. This allows backward compatibility for commands that have
// not yet been migrated, ensuring they continue to use the older Eval function.
// As part of the transition process, commands can be flagged with IsMigrated to
// signal that they are using the updated execution path.
IsMigrated bool

// NewEval is the newer evaluation function for commands. It follows an updated
// execution model that returns an EvalResponse struct, offering more structured
// and detailed results, including metadata such as errors and additional info,
// instead of just raw bytes. Commands that have been migrated to this new model
// will utilize this function for evaluation, allowing for better handling of
// complex command execution scenarios and improved response consistency.
NewEval func([]string, *dstore.Store) EvalResponse
}

type KeySpecs struct {
Expand All @@ -19,24 +35,20 @@ type KeySpecs struct {
var (
DiceCmds = map[string]DiceCmdMeta{}

pingCmdMeta = DiceCmdMeta{
Name: "PING",
Info: `PING returns with an encoded "PONG" If any message is added with the ping command,the message will be returned.`,
Eval: evalPING,
Arity: -1,
}
echoCmdMeta = DiceCmdMeta{
Name: "ECHO",
Info: `ECHO returns the string given as argument.`,
Eval: evalECHO,
Arity: 1,
Name: "ECHO",
Info: `ECHO returns the string given as argument.`,
Eval: evalECHO,
Arity: 1,
}
authCmdMeta = DiceCmdMeta{
Name: "AUTH",
Info: `AUTH returns with an encoded "OK" if the user is authenticated.
If the user is not authenticated, it returns with an encoded error message`,
Eval: nil,

pingCmdMeta = DiceCmdMeta{
Name: "PING",
Info: `PING returns with an encoded "PONG" If any message is added with the ping command,the message will be returned.`,
Arity: -1,
IsMigrated: true,
}

setCmdMeta = DiceCmdMeta{
Name: "SET",
Info: `SET puts a new <key, value> pair in db as in the args
Expand All @@ -47,19 +59,36 @@ var (
Returns encoded error response if expiry tme value in not integer
Returns encoded OK RESP once new entry is added
If the key already exists then the value will be overwritten and expiry will be discarded`,
Eval: evalSET,
Arity: -3,
KeySpecs: KeySpecs{BeginIndex: 1},
Arity: -3,
KeySpecs: KeySpecs{BeginIndex: 1},
IsMigrated: true,
NewEval: evalSET,
}
getCmdMeta = DiceCmdMeta{
Name: "GET",
Info: `GET returns the value for the queried key in args
The key should be the only param in args
The RESP value of the key is encoded and then returned
GET returns RespNIL if key is expired or it does not exist`,
Eval: evalGET,
Arity: 2,
KeySpecs: KeySpecs{BeginIndex: 1},
Arity: 2,
KeySpecs: KeySpecs{BeginIndex: 1},
IsMigrated: true,
NewEval: evalGET,
}

getSetCmdMeta = DiceCmdMeta{
Name: "GETSET",
Info: `GETSET returns the previous string value of a key after setting it to a new value.`,
Arity: 2,
IsMigrated: true,
NewEval: evalGETSET,
}

authCmdMeta = DiceCmdMeta{
Name: "AUTH",
Info: `AUTH returns with an encoded "OK" if the user is authenticated.
If the user is not authenticated, it returns with an encoded error message`,
Eval: nil,
}
getDelCmdMeta = DiceCmdMeta{
Name: "GETDEL",
Expand Down Expand Up @@ -626,12 +655,6 @@ var (
Eval: evalDBSIZE,
Arity: 1,
}
getSetCmdMeta = DiceCmdMeta{
Name: "GETSET",
Info: `GETSET returns the previous string value of a key after setting it to a new value.`,
Eval: evalGETSET,
Arity: 2,
}
flushdbCmdMeta = DiceCmdMeta{
Name: "FLUSHDB",
Info: `FLUSHDB deletes all the keys of the currently selected DB`,
Expand Down Expand Up @@ -876,15 +899,15 @@ func init() {
}

// Function to convert DiceCmdMeta to []interface{}
func convertCmdMetaToSlice(cmdMeta DiceCmdMeta) []interface{} {
func convertCmdMetaToSlice(cmdMeta *DiceCmdMeta) []interface{} {
return []interface{}{cmdMeta.Name, cmdMeta.Arity, cmdMeta.KeySpecs.BeginIndex, cmdMeta.KeySpecs.LastKey, cmdMeta.KeySpecs.Step}
}

// Function to convert map[string]DiceCmdMeta{} to []interface{}
func convertDiceCmdsMapToSlice() []interface{} {
var result []interface{}
for _, cmdMeta := range DiceCmds {
result = append(result, convertCmdMetaToSlice(cmdMeta))
result = append(result, convertCmdMetaToSlice(&cmdMeta))
}
return result
}
222 changes: 5 additions & 217 deletions internal/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ var (
diceCommandsCount int
)

type EvalResponse struct {
Result interface{} // Result of the Store operation, for now the type is set to []byte, but this can change in the future.
Error error
}
type jsonOperation string

const (
Expand All @@ -60,26 +64,7 @@ func init() {
serverID = fmt.Sprintf("%s:%d", config.DiceConfig.Server.Addr, config.DiceConfig.Server.Port)
}

// evalPING returns with an encoded "PONG"
// If any message is added with the ping command,
// the message will be returned.
func evalPING(args []string, store *dstore.Store) []byte {
var b []byte

if len(args) >= 2 {
return diceerrors.NewErrArity("PING")
}

if len(args) == 0 {
b = clientio.Encode("PONG", true)
} else {
b = clientio.Encode(args[0], false)
}

return b
}

// evalECHO returns the argument passed by the user
// evalECHO returns the argument passed by the user
func evalECHO(args []string, store *dstore.Store) []byte {
if len(args) != 1 {
return diceerrors.NewErrArity("ECHO")
Expand Down Expand Up @@ -114,127 +99,6 @@ func EvalAUTH(args []string, c *comm.Client) []byte {
return clientio.RespOK
}

// evalSET puts a new <key, value> pair in db as in the args
// args must contain key and value.
// args can also contain multiple options -
//
// EX or ex which will set the expiry time(in secs) for the key
// PX or px which will set the expiry time(in milliseconds) for the key
// EXAT or exat which will set the specified Unix time at which the key will expire, in seconds (a positive integer).
// PXAT or PX which will the specified Unix time at which the key will expire, in milliseconds (a positive integer).
// XX orr xx which will only set the key if it already exists.
//
// Returns encoded error response if at least a <key, value> pair is not part of args
// Returns encoded error response if expiry time value in not integer
// Returns encoded error response if both PX and EX flags are present
// Returns encoded OK RESP once new entry is added
// If the key already exists then the value will be overwritten and expiry will be discarded
func evalSET(args []string, store *dstore.Store) []byte {
if len(args) <= 1 {
return diceerrors.NewErrArity("SET")
}

var key, value string
var exDurationMs int64 = -1
var state exDurationState = Uninitialized
var keepttl bool = false

key, value = args[0], args[1]
oType, oEnc := deduceTypeEncoding(value)

for i := 2; i < len(args); i++ {
arg := strings.ToUpper(args[i])
switch arg {
case Ex, Px:
if state != Uninitialized {
return diceerrors.NewErrWithMessage(diceerrors.SyntaxErr)
}
i++
if i == len(args) {
return diceerrors.NewErrWithMessage(diceerrors.SyntaxErr)
}

exDuration, err := strconv.ParseInt(args[i], 10, 64)
if err != nil {
return diceerrors.NewErrWithMessage(diceerrors.IntOrOutOfRangeErr)
}

if exDuration <= 0 || exDuration >= maxExDuration {
return diceerrors.NewErrExpireTime("SET")
}

// converting seconds to milliseconds
if arg == Ex {
exDuration *= 1000
}
exDurationMs = exDuration
state = Initialized

case Pxat, Exat:
if state != Uninitialized {
return diceerrors.NewErrWithMessage(diceerrors.SyntaxErr)
}
i++
if i == len(args) {
return diceerrors.NewErrWithMessage(diceerrors.SyntaxErr)
}
exDuration, err := strconv.ParseInt(args[i], 10, 64)
if err != nil {
return diceerrors.NewErrWithMessage(diceerrors.IntOrOutOfRangeErr)
}

if exDuration < 0 {
return diceerrors.NewErrExpireTime("SET")
}

if arg == Exat {
exDuration *= 1000
}
exDurationMs = exDuration - utils.GetCurrentTime().UnixMilli()
// If the expiry time is in the past, set exDurationMs to 0
// This will be used to signal immediate expiration
if exDurationMs < 0 {
exDurationMs = 0
}
state = Initialized

case XX:
// Get the key from the hash table
obj := store.Get(key)

// if key does not exist, return RESP encoded nil
if obj == nil {
return clientio.RespNIL
}
case NX:
obj := store.Get(key)
if obj != nil {
return clientio.RespNIL
}
case KEEPTTL, Keepttl:
keepttl = true
default:
return diceerrors.NewErrWithMessage(diceerrors.SyntaxErr)
}
}

// Cast the value properly based on the encoding type
var storedValue interface{}
switch oEnc {
case object.ObjEncodingInt:
storedValue, _ = strconv.ParseInt(value, 10, 64)
case object.ObjEncodingEmbStr, object.ObjEncodingRaw:
storedValue = value
default:
return clientio.Encode(fmt.Errorf("ERR unsupported encoding: %d", oEnc), false)
}

// putting the k and value in a Hash Table
store.Put(key, store.NewObj(storedValue, exDurationMs, oType, oEnc), dstore.WithKeepTTL(keepttl))

return clientio.RespOK
}

// evalMSET puts multiple <key, value> pairs in db as in the args
// MSET is atomic, so all given keys are set at once.
// args must contain key and value pairs.
Expand Down Expand Up @@ -270,52 +134,6 @@ func evalMSET(args []string, store *dstore.Store) []byte {
return clientio.RespOK
}

// evalGET returns the value for the queried key in args
// The key should be the only param in args
// The RESP value of the key is encoded and then returned
// evalGET returns response.RespNIL if key is expired or it does not exist
func evalGET(args []string, store *dstore.Store) []byte {
if len(args) != 1 {
return diceerrors.NewErrArity("GET")
}

key := args[0]

obj := store.Get(key)

// if key does not exist, return RESP encoded nil
if obj == nil {
return clientio.RespNIL
}

// Decode and return the value based on its encoding
switch _, oEnc := object.ExtractTypeEncoding(obj); oEnc {
case object.ObjEncodingInt:
// Value is stored as an int64, so use type assertion
if val, ok := obj.Value.(int64); ok {
return clientio.Encode(val, false)
}
return diceerrors.NewErrWithFormattedMessage("expected int64 but got another type: %s", obj.Value)

case object.ObjEncodingEmbStr, object.ObjEncodingRaw:
// Value is stored as a string, use type assertion
if val, ok := obj.Value.(string); ok {
return clientio.Encode(val, false)
}
return diceerrors.NewErrWithMessage("expected string but got another type")

case object.ObjEncodingByteArray:
// Value is stored as a bytearray, use type assertion
if val, ok := obj.Value.(*ByteArray); ok {
return clientio.Encode(string(val.data), false)
}
return diceerrors.NewErrWithMessage(diceerrors.WrongTypeErr)

default:
return diceerrors.NewErrWithMessage(diceerrors.WrongTypeErr)
}
}

// evalDBSIZE returns the number of keys in the database.
func evalDBSIZE(args []string, store *dstore.Store) []byte {
if len(args) > 0 {
Expand Down Expand Up @@ -3050,36 +2868,6 @@ func evalLLEN(args []string, store *dstore.Store) []byte {
return clientio.Encode(deq.Length, false)
}

// GETSET atomically sets key to value and returns the old value stored at key.
// Returns an error when key exists but does not hold a string value.
// Any previous time to live associated with the key is
// discarded on successful SET operation.
//
// Returns:
// Bulk string reply: the old value stored at the key.
// Nil reply: if the key does not exist.
func evalGETSET(args []string, store *dstore.Store) []byte {
if len(args) != 2 {
return diceerrors.NewErrArity("GETSET")
}

key, value := args[0], args[1]
getResp := evalGET([]string{key}, store)
// Check if it's an error resp from GET
if strings.HasPrefix(string(getResp), "-") {
return getResp
}

// Previous TTL needs to be reset
setResp := evalSET([]string{key, value}, store)
// Check if it's an error resp from SET
if strings.HasPrefix(string(setResp), "-") {
return setResp
}

return getResp
}

func evalFLUSHDB(args []string, store *dstore.Store) []byte {
slog.Info("FLUSHDB called", slog.Any("args", args))
if len(args) > 1 {
Expand Down
Loading

0 comments on commit 7eae156

Please sign in to comment.