From aac3cde4caa3ee2299e3c653047192fd32a41865 Mon Sep 17 00:00:00 2001 From: TJ Zhang Date: Mon, 6 Jan 2025 22:22:34 -0800 Subject: [PATCH] Go: Add command XClaim Signed-off-by: TJ Zhang --- go/api/base_client.go | 76 +++++++++++++++++++ go/api/options/stream_options.go | 66 +++++++++++++++++ go/api/response_handlers.go | 57 +++++++++++++++ go/api/stream_commands.go | 105 +++++++++++++++++++++++++++ go/integTest/shared_commands_test.go | 9 +++ 5 files changed, 313 insertions(+) diff --git a/go/api/base_client.go b/go/api/base_client.go index b05528fbcb..e2e6d0f83a 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -2969,3 +2969,79 @@ func (client *baseClient) XAck(key string, group string, ids []string) (int64, e } return handleIntResponse(result) } + +func (client *baseClient) XClaim( + key string, + group string, + consumer string, + minIdleTime int64, + ids []string, +) (map[Result[string]][][]Result[string], error) { + result, err := client.executeCommand( + C.XClaim, + append([]string{key, group, consumer, utils.IntToString(minIdleTime)}, ids...), + ) + if err != nil { + return nil, err + } + return handleStringToArrayOfStringArrayMapResponse(result) +} + +func (client *baseClient) XClaimWithOptions( + key string, + group string, + consumer string, + minIdleTime int64, + ids []string, + opts *options.StreamClaimOptions, +) (map[Result[string]][][]Result[string], error) { + args := append([]string{key, group, consumer, utils.IntToString(minIdleTime)}, ids...) + optionArgs, err := opts.ToArgs() + if err != nil { + return nil, err + } + args = append(args, optionArgs...) + result, err := client.executeCommand(C.XClaim, args) + if err != nil { + return nil, err + } + return handleStringToArrayOfStringArrayMapResponse(result) +} + +func (client *baseClient) XClaimJustId( + key string, + group string, + consumer string, + minIdleTime int64, + ids []string, +) ([]Result[string], error) { + args := append([]string{key, group, consumer, utils.IntToString(minIdleTime)}, ids...) + args = append(args, options.JUST_ID_VALKEY_API) + result, err := client.executeCommand(C.XClaim, args) + if err != nil { + return nil, err + } + return handleStringArrayResponse(result) +} + +func (client *baseClient) XClaimJustIdWithOptions( + key string, + group string, + consumer string, + minIdleTime int64, + ids []string, + opts *options.StreamClaimOptions, +) ([]Result[string], error) { + args := append([]string{key, group, consumer, utils.IntToString(minIdleTime)}, ids...) + optionArgs, err := opts.ToArgs() + if err != nil { + return nil, err + } + args = append(args, optionArgs...) + args = append(args, options.JUST_ID_VALKEY_API) + result, err := client.executeCommand(C.XClaim, args) + if err != nil { + return nil, err + } + return handleStringArrayResponse(result) +} diff --git a/go/api/options/stream_options.go b/go/api/options/stream_options.go index cb27269f3b..8d776e3a87 100644 --- a/go/api/options/stream_options.go +++ b/go/api/options/stream_options.go @@ -302,3 +302,69 @@ func (xgco *XGroupCreateOptions) ToArgs() ([]string, error) { return args, nil } + +// Optional arguments for `XClaim` in [StreamCommands] +type StreamClaimOptions struct { + idleTime int64 + idleUnixTime int64 + retryCount int64 + isForce bool +} + +func NewStreamClaimOptions() *StreamClaimOptions { + return &StreamClaimOptions{} +} + +// Set the idle time in milliseconds. +func (sco *StreamClaimOptions) SetIdleTime(idleTime int64) *StreamClaimOptions { + sco.idleTime = idleTime + return sco +} + +// Set the idle time in unix-milliseconds. +func (sco *StreamClaimOptions) SetIdleUnixTime(idleUnixTime int64) *StreamClaimOptions { + sco.idleUnixTime = idleUnixTime + return sco +} + +// Set the retry count. +func (sco *StreamClaimOptions) SetRetryCount(retryCount int64) *StreamClaimOptions { + sco.retryCount = retryCount + return sco +} + +// Valkey API keywords for stream claim options +const ( + // ValKey API string to designate IDLE time in milliseconds + IDLE_VALKEY_API string = "IDLE" + // ValKey API string to designate TIME time in unix-milliseconds + TIME_VALKEY_API string = "TIME" + // ValKey API string to designate RETRYCOUNT + RETRY_COUNT_VALKEY_API string = "RETRYCOUNT" + // ValKey API string to designate FORCE + FORCE_VALKEY_API string = "FORCE" + // ValKey API string to designate JUSTID + JUST_ID_VALKEY_API string = "JUSTID" +) + +func (sco *StreamClaimOptions) ToArgs() ([]string, error) { + optionArgs := []string{} + + if sco.idleTime > 0 { + optionArgs = append(optionArgs, IDLE_VALKEY_API, utils.IntToString(sco.idleTime)) + } + + if sco.idleUnixTime > 0 { + optionArgs = append(optionArgs, TIME_VALKEY_API, utils.IntToString(sco.idleUnixTime)) + } + + if sco.retryCount > 0 { + optionArgs = append(optionArgs, RETRY_COUNT_VALKEY_API, utils.IntToString(sco.retryCount)) + } + + if sco.isForce { + optionArgs = append(optionArgs, FORCE_VALKEY_API) + } + + return optionArgs, nil +} diff --git a/go/api/response_handlers.go b/go/api/response_handlers.go index dd7fbe2712..eb54ff294a 100644 --- a/go/api/response_handlers.go +++ b/go/api/response_handlers.go @@ -189,6 +189,33 @@ func convertStringOrNilArray(response *C.struct_CommandResponse) ([]Result[strin // array could be nillable, but strings - aren't func convertStringArray(response *C.struct_CommandResponse, isNilable bool) ([]string, error) { typeErr := checkResponseType(response, C.Array, isNilable) +func convertArrayOfArrayOfString(response *C.struct_CommandResponse) ([][]Result[string], error) { + typeErr := checkResponseType(response, C.Array, false) + if typeErr != nil { + return nil, typeErr + } + + slice := make([][]Result[string], 0, response.array_value_len) + for _, v := range unsafe.Slice(response.array_value, response.array_value_len) { + res, err := convertStringArray(&v) + if err != nil { + return nil, err + } + slice = append(slice, res) + } + return slice, nil +} + +func handleStringArrayResponse(response *C.struct_CommandResponse) ([]Result[string], error) { + defer C.free_command_response(response) + + return convertStringArray(response) +} + +func handleStringArrayOrNullResponse(response *C.struct_CommandResponse) ([]Result[string], error) { + defer C.free_command_response(response) + + typeErr := checkResponseType(response, C.Array, true) if typeErr != nil { return nil, typeErr } @@ -437,6 +464,36 @@ func handleStringToStringArrayMapOrNilResponse( return result, nil } + return m, nil +} + +func handleStringToArrayOfStringArrayMapResponse( + response *C.struct_CommandResponse, +) (map[Result[string]][][]Result[string], error) { + defer C.free_command_response(response) + + typeErr := checkResponseType(response, C.Map, true) + if typeErr != nil { + return nil, typeErr + } + + if response.response_type == C.Null { + return nil, nil + } + + m := make(map[Result[string]][][]Result[string], response.array_value_len) + for _, v := range unsafe.Slice(response.array_value, response.array_value_len) { + key, err := convertCharArrayToString(v.map_key, true) + if err != nil { + return nil, err + } + value, err := convertArrayOfArrayOfString(v.map_value) + if err != nil { + return nil, err + } + m[key] = value + } + return nil, &RequestError{fmt.Sprintf("unexpected type received: %T", res)} } diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index c5c62e8a13..58a434f2ad 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -158,4 +158,109 @@ type StreamCommands interface { XGroupDelConsumer(key string, group string, consumer string) (int64, error) XAck(key string, group string, ids []string) (int64, error) + + // Changes the ownership of a pending message. + // + // See [valkey.io] for details. + // + // Parameters: + // key - The key of the stream. + // group - The name of the consumer group. + // consumer - The name of the consumer. + // minIdleTime - The minimum idle time in milliseconds. + // ids - The ids of the entries to claim. + // + // Return value: + // A `map of message entries with the format `{"entryId": [["entry", "data"], ...], ...}` that were claimed by + // the consumer. + // + // Example: + // + // [valkey.io]: https://valkey.io/commands/xclaim/ + XClaim( + key string, + group string, + consumer string, + minIdleTime int64, + ids []string, + ) (map[Result[string]][][]Result[string], error) + + // Changes the ownership of a pending message. + // + // See [valkey.io] for details. + // + // Parameters: + // key - The key of the stream. + // group - The name of the consumer group. + // consumer - The name of the consumer. + // minIdleTime - The minimum idle time in milliseconds. + // ids - The ids of the entries to claim. + // options - Stream claim options. + // + // Return value: + // A `map` of message entries with the format `{"entryId": [["entry", "data"], ...], ...}` that were claimed by + // the consumer. + // + // Example: + // result, err := ... + // + // [valkey.io]: https://valkey.io/commands/xclaim/ + XClaimWithOptions( + key string, + group string, + consumer string, + minIdleTime int64, + ids []string, + options *options.StreamClaimOptions, + ) (map[Result[string]][][]Result[string], error) + + // Changes the ownership of a pending message. This function returns an `array` with + // only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API. + // + // See [valkey.io] for details. + // + // Parameters: + // key - The key of the stream. + // group - The name of the consumer group. + // consumer - The name of the consumer. + // minIdleTime - The minimum idle time in milliseconds. + // ids - The ids of the entries to claim. + // options - Stream claim options. + // + // Return value: + // An array of the ids of the entries that were claimed by the consumer. + // + // Example: + // result, err := ... + // [valkey.io]: https://valkey.io/commands/xclaim/ + XClaimJustId(key string, group string, consumer string, minIdleTime int64, ids []string) ([]Result[string], error) + + // Changes the ownership of a pending message. This function returns an `array` with + // only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API. + // + // See [valkey.io] for details. + // + // Parameters: + // key - The key of the stream. + // group - The name of the consumer group. + // consumer - The name of the consumer. + // minIdleTime - The minimum idle time in milliseconds. + // ids - The ids of the entries to claim. + // options - Stream claim options. + // + // Return value: + // An array of the ids of the entries that were claimed by the consumer. + // + // Example: + // result, err := ... + // + // [valkey.io]: https://valkey.io/commands/xclaim/ + XClaimJustIdWithOptions( + key string, + group string, + consumer string, + minIdleTime int64, + ids []string, + options *options.StreamClaimOptions, + ) ([]Result[string], error) } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index adc95558a0..b16033b868 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -6539,3 +6539,12 @@ func (suite *GlideTestSuite) TestXGroupStreamCommands() { assert.IsType(suite.T(), &api.RequestError{}, err) }) } + +func (suite *GlideTestSuite) TestXClaim() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key := uuid.New().String() + client.XAdd(key, [][]string{{"field1", "value1"}}) + + // TODO: finish test case using custom commands + }) +}