Skip to content

Commit

Permalink
Go: Add command XClaim
Browse files Browse the repository at this point in the history
Signed-off-by: TJ Zhang <[email protected]>
  • Loading branch information
TJ Zhang committed Jan 21, 2025
1 parent 4050baf commit aac3cde
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 0 deletions.
76 changes: 76 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2969,3 +2969,79 @@ func (client *baseClient) XAck(key string, group string, ids []string) (int64, e
}
return handleIntResponse(result)
}

func (client *baseClient) XClaim(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
) (map[Result[string]][][]Result[string], error) {
result, err := client.executeCommand(
C.XClaim,
append([]string{key, group, consumer, utils.IntToString(minIdleTime)}, ids...),
)
if err != nil {
return nil, err
}
return handleStringToArrayOfStringArrayMapResponse(result)
}

func (client *baseClient) XClaimWithOptions(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
opts *options.StreamClaimOptions,
) (map[Result[string]][][]Result[string], error) {
args := append([]string{key, group, consumer, utils.IntToString(minIdleTime)}, ids...)
optionArgs, err := opts.ToArgs()
if err != nil {
return nil, err
}
args = append(args, optionArgs...)
result, err := client.executeCommand(C.XClaim, args)
if err != nil {
return nil, err
}
return handleStringToArrayOfStringArrayMapResponse(result)
}

func (client *baseClient) XClaimJustId(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
) ([]Result[string], error) {
args := append([]string{key, group, consumer, utils.IntToString(minIdleTime)}, ids...)
args = append(args, options.JUST_ID_VALKEY_API)
result, err := client.executeCommand(C.XClaim, args)
if err != nil {
return nil, err
}
return handleStringArrayResponse(result)
}

func (client *baseClient) XClaimJustIdWithOptions(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
opts *options.StreamClaimOptions,
) ([]Result[string], error) {
args := append([]string{key, group, consumer, utils.IntToString(minIdleTime)}, ids...)
optionArgs, err := opts.ToArgs()
if err != nil {
return nil, err
}
args = append(args, optionArgs...)
args = append(args, options.JUST_ID_VALKEY_API)
result, err := client.executeCommand(C.XClaim, args)
if err != nil {
return nil, err
}
return handleStringArrayResponse(result)
}
66 changes: 66 additions & 0 deletions go/api/options/stream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,69 @@ func (xgco *XGroupCreateOptions) ToArgs() ([]string, error) {

return args, nil
}

// Optional arguments for `XClaim` in [StreamCommands]
type StreamClaimOptions struct {
idleTime int64
idleUnixTime int64
retryCount int64
isForce bool
}

func NewStreamClaimOptions() *StreamClaimOptions {
return &StreamClaimOptions{}
}

// Set the idle time in milliseconds.
func (sco *StreamClaimOptions) SetIdleTime(idleTime int64) *StreamClaimOptions {
sco.idleTime = idleTime
return sco
}

// Set the idle time in unix-milliseconds.
func (sco *StreamClaimOptions) SetIdleUnixTime(idleUnixTime int64) *StreamClaimOptions {
sco.idleUnixTime = idleUnixTime
return sco
}

// Set the retry count.
func (sco *StreamClaimOptions) SetRetryCount(retryCount int64) *StreamClaimOptions {
sco.retryCount = retryCount
return sco
}

// Valkey API keywords for stream claim options
const (
// ValKey API string to designate IDLE time in milliseconds
IDLE_VALKEY_API string = "IDLE"
// ValKey API string to designate TIME time in unix-milliseconds
TIME_VALKEY_API string = "TIME"
// ValKey API string to designate RETRYCOUNT
RETRY_COUNT_VALKEY_API string = "RETRYCOUNT"
// ValKey API string to designate FORCE
FORCE_VALKEY_API string = "FORCE"
// ValKey API string to designate JUSTID
JUST_ID_VALKEY_API string = "JUSTID"
)

func (sco *StreamClaimOptions) ToArgs() ([]string, error) {
optionArgs := []string{}

if sco.idleTime > 0 {
optionArgs = append(optionArgs, IDLE_VALKEY_API, utils.IntToString(sco.idleTime))
}

if sco.idleUnixTime > 0 {
optionArgs = append(optionArgs, TIME_VALKEY_API, utils.IntToString(sco.idleUnixTime))
}

if sco.retryCount > 0 {
optionArgs = append(optionArgs, RETRY_COUNT_VALKEY_API, utils.IntToString(sco.retryCount))
}

if sco.isForce {
optionArgs = append(optionArgs, FORCE_VALKEY_API)
}

return optionArgs, nil
}
57 changes: 57 additions & 0 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,33 @@ func convertStringOrNilArray(response *C.struct_CommandResponse) ([]Result[strin
// array could be nillable, but strings - aren't
func convertStringArray(response *C.struct_CommandResponse, isNilable bool) ([]string, error) {
typeErr := checkResponseType(response, C.Array, isNilable)
func convertArrayOfArrayOfString(response *C.struct_CommandResponse) ([][]Result[string], error) {
typeErr := checkResponseType(response, C.Array, false)
if typeErr != nil {
return nil, typeErr
}

slice := make([][]Result[string], 0, response.array_value_len)
for _, v := range unsafe.Slice(response.array_value, response.array_value_len) {
res, err := convertStringArray(&v)
if err != nil {
return nil, err
}
slice = append(slice, res)
}
return slice, nil
}

func handleStringArrayResponse(response *C.struct_CommandResponse) ([]Result[string], error) {
defer C.free_command_response(response)

return convertStringArray(response)
}

func handleStringArrayOrNullResponse(response *C.struct_CommandResponse) ([]Result[string], error) {
defer C.free_command_response(response)

typeErr := checkResponseType(response, C.Array, true)
if typeErr != nil {
return nil, typeErr
}
Expand Down Expand Up @@ -437,6 +464,36 @@ func handleStringToStringArrayMapOrNilResponse(
return result, nil
}

return m, nil
}

func handleStringToArrayOfStringArrayMapResponse(
response *C.struct_CommandResponse,
) (map[Result[string]][][]Result[string], error) {
defer C.free_command_response(response)

typeErr := checkResponseType(response, C.Map, true)
if typeErr != nil {
return nil, typeErr
}

if response.response_type == C.Null {
return nil, nil
}

m := make(map[Result[string]][][]Result[string], response.array_value_len)
for _, v := range unsafe.Slice(response.array_value, response.array_value_len) {
key, err := convertCharArrayToString(v.map_key, true)
if err != nil {
return nil, err
}
value, err := convertArrayOfArrayOfString(v.map_value)
if err != nil {
return nil, err
}
m[key] = value
}

return nil, &RequestError{fmt.Sprintf("unexpected type received: %T", res)}
}

Expand Down
105 changes: 105 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,109 @@ type StreamCommands interface {
XGroupDelConsumer(key string, group string, consumer string) (int64, error)

XAck(key string, group string, ids []string) (int64, error)

// Changes the ownership of a pending message.
//
// See [valkey.io] for details.
//
// Parameters:
// key - The key of the stream.
// group - The name of the consumer group.
// consumer - The name of the consumer.
// minIdleTime - The minimum idle time in milliseconds.
// ids - The ids of the entries to claim.
//
// Return value:
// A `map of message entries with the format `{"entryId": [["entry", "data"], ...], ...}` that were claimed by
// the consumer.
//
// Example:
//
// [valkey.io]: https://valkey.io/commands/xclaim/
XClaim(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
) (map[Result[string]][][]Result[string], error)

// Changes the ownership of a pending message.
//
// See [valkey.io] for details.
//
// Parameters:
// key - The key of the stream.
// group - The name of the consumer group.
// consumer - The name of the consumer.
// minIdleTime - The minimum idle time in milliseconds.
// ids - The ids of the entries to claim.
// options - Stream claim options.
//
// Return value:
// A `map` of message entries with the format `{"entryId": [["entry", "data"], ...], ...}` that were claimed by
// the consumer.
//
// Example:
// result, err := ...
//
// [valkey.io]: https://valkey.io/commands/xclaim/
XClaimWithOptions(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
options *options.StreamClaimOptions,
) (map[Result[string]][][]Result[string], error)

// Changes the ownership of a pending message. This function returns an `array` with
// only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API.
//
// See [valkey.io] for details.
//
// Parameters:
// key - The key of the stream.
// group - The name of the consumer group.
// consumer - The name of the consumer.
// minIdleTime - The minimum idle time in milliseconds.
// ids - The ids of the entries to claim.
// options - Stream claim options.
//
// Return value:
// An array of the ids of the entries that were claimed by the consumer.
//
// Example:
// result, err := ...
// [valkey.io]: https://valkey.io/commands/xclaim/
XClaimJustId(key string, group string, consumer string, minIdleTime int64, ids []string) ([]Result[string], error)

// Changes the ownership of a pending message. This function returns an `array` with
// only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API.
//
// See [valkey.io] for details.
//
// Parameters:
// key - The key of the stream.
// group - The name of the consumer group.
// consumer - The name of the consumer.
// minIdleTime - The minimum idle time in milliseconds.
// ids - The ids of the entries to claim.
// options - Stream claim options.
//
// Return value:
// An array of the ids of the entries that were claimed by the consumer.
//
// Example:
// result, err := ...
//
// [valkey.io]: https://valkey.io/commands/xclaim/
XClaimJustIdWithOptions(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
options *options.StreamClaimOptions,
) ([]Result[string], error)
}
9 changes: 9 additions & 0 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6539,3 +6539,12 @@ func (suite *GlideTestSuite) TestXGroupStreamCommands() {
assert.IsType(suite.T(), &api.RequestError{}, err)
})
}

func (suite *GlideTestSuite) TestXClaim() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.New().String()
client.XAdd(key, [][]string{{"field1", "value1"}})

// TODO: finish test case using custom commands
})
}

0 comments on commit aac3cde

Please sign in to comment.