Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go: Add command XClaim #2920

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2969,3 +2969,79 @@ func (client *baseClient) XAck(key string, group string, ids []string) (int64, e
}
return handleIntResponse(result)
}

func (client *baseClient) XClaim(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
) (map[Result[string]][][]Result[string], error) {
result, err := client.executeCommand(
C.XClaim,
append([]string{key, group, consumer, utils.IntToString(minIdleTime)}, ids...),
)
if err != nil {
return nil, err
}
return handleStringToArrayOfStringArrayMapResponse(result)
}

func (client *baseClient) XClaimWithOptions(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
opts *options.StreamClaimOptions,
) (map[Result[string]][][]Result[string], error) {
args := append([]string{key, group, consumer, utils.IntToString(minIdleTime)}, ids...)
optionArgs, err := opts.ToArgs()
if err != nil {
return nil, err
}
args = append(args, optionArgs...)
result, err := client.executeCommand(C.XClaim, args)
if err != nil {
return nil, err
}
return handleStringToArrayOfStringArrayMapResponse(result)
}

func (client *baseClient) XClaimJustId(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
) ([]Result[string], error) {
args := append([]string{key, group, consumer, utils.IntToString(minIdleTime)}, ids...)
args = append(args, options.JUST_ID_VALKEY_API)
result, err := client.executeCommand(C.XClaim, args)
if err != nil {
return nil, err
}
return handleStringArrayResponse(result)
}

func (client *baseClient) XClaimJustIdWithOptions(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
opts *options.StreamClaimOptions,
) ([]Result[string], error) {
args := append([]string{key, group, consumer, utils.IntToString(minIdleTime)}, ids...)
optionArgs, err := opts.ToArgs()
if err != nil {
return nil, err
}
args = append(args, optionArgs...)
args = append(args, options.JUST_ID_VALKEY_API)
result, err := client.executeCommand(C.XClaim, args)
if err != nil {
return nil, err
}
return handleStringArrayResponse(result)
}
66 changes: 66 additions & 0 deletions go/api/options/stream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,69 @@ func (xgco *XGroupCreateOptions) ToArgs() ([]string, error) {

return args, nil
}

// Optional arguments for `XClaim` in [StreamCommands]
type StreamClaimOptions struct {
idleTime int64
idleUnixTime int64
retryCount int64
isForce bool
}

func NewStreamClaimOptions() *StreamClaimOptions {
return &StreamClaimOptions{}
}

// Set the idle time in milliseconds.
func (sco *StreamClaimOptions) SetIdleTime(idleTime int64) *StreamClaimOptions {
sco.idleTime = idleTime
return sco
}

// Set the idle time in unix-milliseconds.
func (sco *StreamClaimOptions) SetIdleUnixTime(idleUnixTime int64) *StreamClaimOptions {
sco.idleUnixTime = idleUnixTime
return sco
}

// Set the retry count.
func (sco *StreamClaimOptions) SetRetryCount(retryCount int64) *StreamClaimOptions {
sco.retryCount = retryCount
return sco
}

// Valkey API keywords for stream claim options
const (
// ValKey API string to designate IDLE time in milliseconds
IDLE_VALKEY_API string = "IDLE"
// ValKey API string to designate TIME time in unix-milliseconds
TIME_VALKEY_API string = "TIME"
// ValKey API string to designate RETRYCOUNT
RETRY_COUNT_VALKEY_API string = "RETRYCOUNT"
// ValKey API string to designate FORCE
FORCE_VALKEY_API string = "FORCE"
// ValKey API string to designate JUSTID
JUST_ID_VALKEY_API string = "JUSTID"
)

func (sco *StreamClaimOptions) ToArgs() ([]string, error) {
optionArgs := []string{}

if sco.idleTime > 0 {
optionArgs = append(optionArgs, IDLE_VALKEY_API, utils.IntToString(sco.idleTime))
}

if sco.idleUnixTime > 0 {
optionArgs = append(optionArgs, TIME_VALKEY_API, utils.IntToString(sco.idleUnixTime))
}

if sco.retryCount > 0 {
optionArgs = append(optionArgs, RETRY_COUNT_VALKEY_API, utils.IntToString(sco.retryCount))
}

if sco.isForce {
optionArgs = append(optionArgs, FORCE_VALKEY_API)
}

return optionArgs, nil
}
57 changes: 57 additions & 0 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,33 @@ func convertStringOrNilArray(response *C.struct_CommandResponse) ([]Result[strin
// array could be nillable, but strings - aren't
func convertStringArray(response *C.struct_CommandResponse, isNilable bool) ([]string, error) {
typeErr := checkResponseType(response, C.Array, isNilable)
func convertArrayOfArrayOfString(response *C.struct_CommandResponse) ([][]Result[string], error) {
typeErr := checkResponseType(response, C.Array, false)
if typeErr != nil {
return nil, typeErr
}

slice := make([][]Result[string], 0, response.array_value_len)
for _, v := range unsafe.Slice(response.array_value, response.array_value_len) {
res, err := convertStringArray(&v)
if err != nil {
return nil, err
}
slice = append(slice, res)
}
return slice, nil
}

func handleStringArrayResponse(response *C.struct_CommandResponse) ([]Result[string], error) {
defer C.free_command_response(response)

return convertStringArray(response)
}

func handleStringArrayOrNullResponse(response *C.struct_CommandResponse) ([]Result[string], error) {
defer C.free_command_response(response)

typeErr := checkResponseType(response, C.Array, true)
if typeErr != nil {
return nil, typeErr
}
Expand Down Expand Up @@ -437,6 +464,36 @@ func handleStringToStringArrayMapOrNilResponse(
return result, nil
}

return m, nil
}

func handleStringToArrayOfStringArrayMapResponse(
response *C.struct_CommandResponse,
) (map[Result[string]][][]Result[string], error) {
defer C.free_command_response(response)

typeErr := checkResponseType(response, C.Map, true)
if typeErr != nil {
return nil, typeErr
}

if response.response_type == C.Null {
return nil, nil
}

m := make(map[Result[string]][][]Result[string], response.array_value_len)
for _, v := range unsafe.Slice(response.array_value, response.array_value_len) {
key, err := convertCharArrayToString(v.map_key, true)
if err != nil {
return nil, err
}
value, err := convertArrayOfArrayOfString(v.map_value)
if err != nil {
return nil, err
}
m[key] = value
}

return nil, &RequestError{fmt.Sprintf("unexpected type received: %T", res)}
}

Expand Down
105 changes: 105 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,109 @@ type StreamCommands interface {
XGroupDelConsumer(key string, group string, consumer string) (int64, error)

XAck(key string, group string, ids []string) (int64, error)

// Changes the ownership of a pending message.
//
// See [valkey.io] for details.
//
// Parameters:
// key - The key of the stream.
// group - The name of the consumer group.
// consumer - The name of the consumer.
// minIdleTime - The minimum idle time in milliseconds.
// ids - The ids of the entries to claim.
//
// Return value:
// A `map of message entries with the format `{"entryId": [["entry", "data"], ...], ...}` that were claimed by
// the consumer.
//
// Example:
//
// [valkey.io]: https://valkey.io/commands/xclaim/
XClaim(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
) (map[Result[string]][][]Result[string], error)

// Changes the ownership of a pending message.
//
// See [valkey.io] for details.
//
// Parameters:
// key - The key of the stream.
// group - The name of the consumer group.
// consumer - The name of the consumer.
// minIdleTime - The minimum idle time in milliseconds.
// ids - The ids of the entries to claim.
// options - Stream claim options.
//
// Return value:
// A `map` of message entries with the format `{"entryId": [["entry", "data"], ...], ...}` that were claimed by
// the consumer.
//
// Example:
// result, err := ...
//
// [valkey.io]: https://valkey.io/commands/xclaim/
XClaimWithOptions(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
options *options.StreamClaimOptions,
) (map[Result[string]][][]Result[string], error)

// Changes the ownership of a pending message. This function returns an `array` with
// only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API.
//
// See [valkey.io] for details.
//
// Parameters:
// key - The key of the stream.
// group - The name of the consumer group.
// consumer - The name of the consumer.
// minIdleTime - The minimum idle time in milliseconds.
// ids - The ids of the entries to claim.
// options - Stream claim options.
//
// Return value:
// An array of the ids of the entries that were claimed by the consumer.
//
// Example:
// result, err := ...
// [valkey.io]: https://valkey.io/commands/xclaim/
XClaimJustId(key string, group string, consumer string, minIdleTime int64, ids []string) ([]Result[string], error)

// Changes the ownership of a pending message. This function returns an `array` with
// only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API.
//
// See [valkey.io] for details.
//
// Parameters:
// key - The key of the stream.
// group - The name of the consumer group.
// consumer - The name of the consumer.
// minIdleTime - The minimum idle time in milliseconds.
// ids - The ids of the entries to claim.
// options - Stream claim options.
//
// Return value:
// An array of the ids of the entries that were claimed by the consumer.
//
// Example:
// result, err := ...
//
// [valkey.io]: https://valkey.io/commands/xclaim/
XClaimJustIdWithOptions(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
options *options.StreamClaimOptions,
) ([]Result[string], error)
}
9 changes: 9 additions & 0 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6539,3 +6539,12 @@ func (suite *GlideTestSuite) TestXGroupStreamCommands() {
assert.IsType(suite.T(), &api.RequestError{}, err)
})
}

func (suite *GlideTestSuite) TestXClaim() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.New().String()
client.XAdd(key, [][]string{{"field1", "value1"}})

// TODO: finish test case using custom commands
})
}
Loading