From d33a91487be70e1807611da0ca54d1e7fc564a73 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Sun, 30 Jun 2024 20:58:08 -0700 Subject: [PATCH] Java: Add XCLAIM command (#1735) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Java: Add XCLAIM command (#392) * Python: add XPENDING command (#1704) * Python: add XPENDING command * PR suggestions * PR suggestions * Java: Add Command GeoSearch & GeoSearchStore * Java: Add Command GeoSearch & GeoSearchStore --------- * trigger build * Python: add RANDOMKEY command (#1701) * Python: add RANDOMKEY command * Enable randomkey() test for that redis-rs is fixed Signed-off-by: Andrew Carbonetto * NOP push Signed-off-by: Andrew Carbonetto --------- Signed-off-by: Andrew Carbonetto Co-authored-by: Andrew Carbonetto * Python: add FUNCTION FLUSH command (#1700) * Python: Added FUNCTION LOAD command * Python: adds FUNCTION FLUSH command * Updated CHANGELOG.md * Resolved merge issues related to FlushMode * Minor adjustments on command documentation * Revert one minor change in example. --------- Co-authored-by: Shoham Elias Co-authored-by: Andrew Carbonetto * removing redis references * Java: Handle panics and errors in the Java FFI layer (#1601) * Restructure Java FFI layer to handle errors properly * Fix failing tests * Address clippy lints * Add tests for error and panic handling * Add missing errors module * Fix clippy lint * Fix FFI tests * Apply Spotless * Fix some minor issue I forgot about * Add some comments * Apply Spotless * Make handle_panics return Option instead * Java: Add SSCAN and ZSCAN commands (#1705) * Java: Add `SSCAN` command (#394) * Add ScanOptions base class for scan-family options. * Expose the cursor as a String to support unsigned 64-bit cursor values. Co-authored-by: James Duong * Java: Add `ZSCAN` command (#397) --------- Co-authored-by: James Duong * WIP TODO: support transactions, docs, and more IT * Added more tests * Added tests and javadocs * Improved examples and tests * Correct use of SScanOptions instead of ScanOptions for SScan * Remove plumbing for SCAN command * Sleep after sadd() calls before sscan() calls Due to eventual consistency * Change sscan cursor to be a String Also fix bug in SharedCommandTests * WIP with todos * Add ZScan to TransactionTestUtilities * Spotless cleanup * Test fixes * Cleanup test code * Apply IntelliJ suggestions * Use String.valueOf() instead of concatenating empty string * Added better error info for set comparison failures * More logging for test failures * Add sleeps after zadd() calls To help make sure data is consistent without WAIT * Longer sleeps * Reduce wait time * Experiment with unsigned 64-bit cursors * Fix rebase error * WIP TODO: support transactions, docs, and more IT * Added more tests * Added tests and javadocs * Improved examples and tests * Apply PR comments * Fix method ordering in BaseTransaction * Fix broken line breaks within code tags in ScanOptions * More thoroughly test results in SharedCommandTests * Add better logging for set comparisons * Spotless * Sleep after sadd() calls before sscan() calls Due to eventual consistency * Change sscan cursor to be a String Also fix bug in SharedCommandTests * Update java/integTest/src/test/java/glide/SharedCommandTests.java Co-authored-by: Guian Gumpac * Update java/integTest/src/test/java/glide/SharedCommandTests.java Co-authored-by: Guian Gumpac * Fix rebase conflicts * Fix another rebase conflict * Spotless * Update java/client/src/main/java/glide/api/models/BaseTransaction.java Co-authored-by: Andrew Carbonetto * Update java/client/src/main/java/glide/api/models/BaseTransaction.java Co-authored-by: Andrew Carbonetto * Update java/client/src/main/java/glide/api/models/BaseTransaction.java Co-authored-by: Andrew Carbonetto * Update java/client/src/main/java/glide/api/models/BaseTransaction.java Co-authored-by: Andrew Carbonetto * Correctly use constants in TransactionTests * Rename ScanOptions to BaseScanOptions * Doc PR fixes * Treat end of cursor as failure * Spotless * Fixes * Update java/client/src/main/java/glide/api/commands/SortedSetBaseCommands.java Co-authored-by: Andrew Carbonetto * Update java/client/src/main/java/glide/api/commands/SortedSetBaseCommands.java Co-authored-by: Andrew Carbonetto * Update java/client/src/main/java/glide/api/commands/SortedSetBaseCommands.java Co-authored-by: Andrew Carbonetto * Update java/client/src/main/java/glide/api/commands/SortedSetBaseCommands.java Co-authored-by: Andrew Carbonetto * Minor doc changes --------- Co-authored-by: Guian Gumpac Co-authored-by: Andrew Carbonetto * CI: Add Support for Valkey 6.2, 7.0 and 7.2 (#1711) - Transitioned the engine building process to be sourced from the Valkey repository. - Introduced compatibility with the following engine versions: Valkey and Redis 6.2 Valkey and Redis 7.0 Valkey and Redis 7.2 (first stable release of Valkey 7.2.5.) - Engine Installation Checks: Added check that the engine is installed with the requested version. - Moved the engine version matrix to a JSON file for better management and readability. - Fixed Object Encoding tests to expect quicklist on versions < 7.2 instead of 7.0 - Fixed C# and Java version parsing from redis-server -v output to support also Valkey's output - Updated the README file with the supported versions & engine typ * Python: add FUNCTION DELETE command (#1714) * Python: adds FUNCTION DELETE command Co-authored-by: Shoham Elias * Python: add `SSCAN` command (#1709) * Added sscan command to python * Fixed formatting * Fixed CI failures * Lint * Improved example and test * Changes based on sscan java PR * Added to changelog * Addressed PR comments * Added string casting * Java: Add HSCAN command (#1706) * Java: Add `SSCAN` command (#394) * Add ScanOptions base class for scan-family options. * Expose the cursor as a String to support unsigned 64-bit cursor values. Co-authored-by: James Duong * Java: Add `ZSCAN` command (#397) --------- Co-authored-by: James Duong * WIP TODO: support transactions, docs, and more IT * Added more tests * Added tests and javadocs * Improved examples and tests * Correct use of SScanOptions instead of ScanOptions for SScan * Remove plumbing for SCAN command * Sleep after sadd() calls before sscan() calls Due to eventual consistency * Change sscan cursor to be a String Also fix bug in SharedCommandTests * WIP with todos * Add ZScan to TransactionTestUtilities * Spotless cleanup * Test fixes * Cleanup test code * Apply IntelliJ suggestions * Use String.valueOf() instead of concatenating empty string * Added better error info for set comparison failures * More logging for test failures * Add sleeps after zadd() calls To help make sure data is consistent without WAIT * Longer sleeps * Reduce wait time * Experiment with unsigned 64-bit cursors * Fix rebase error * WIP TODO: support transactions, docs, and more IT * Added more tests * Added tests and javadocs * Improved examples and tests * Sleep after sadd() calls before sscan() calls Due to eventual consistency * Change sscan cursor to be a String Also fix bug in SharedCommandTests * Fix rebase conflicts * Fix another rebase conflict * Spotless * HScan * Flakey test * Add HScan transaction unit test * Rename ScanOptions to BaseScanOptions * Fix merge issues * Fix module-info ordering * Tidy up docs * PR comments Fix up merge duplication and use HScanOptions constants. --------- Co-authored-by: Guian Gumpac * Python: add LCS command (#1716) * python: add LCS command (#406) * python: add LCS command * update CHANGELOG * add more comment explaning the functionality of the command * address comments on the docs * Java: Changed handling of large requests to transfer them as leaked pointers (#1708) * Restructure Java FFI layer to handle errors properly * Address clippy lints * Add tests for error and panic handling * Fix FFI tests * Apply Spotless * Fix some minor issue I forgot about * Add API to create the leaked bytes vec * Bridge the MAX_REQUEST_ARGS_LENGTH constant from Rust to Java * Fix warnings in Rust * Update Java client to utilize the pointer with large argument sizes * Update createLeakedBytesVec to handle panics * spotless * Add docs and run Rust linters * Add large value tests * Fix transactions and add transaction tests * dummy commit for CI * Revert "dummy commit for CI" This reverts commit 3ed193770fe21407e2d12e0d3341f27be1d455e1. * Fix JDK11 build issue Due to using a JDK17 function * Fix another JDK11 issue * Fix merge issues. * Remove unneccesary mut prefix * Clarify the MAX_REQUEST_ARGS_LENGTH_IN_BYTES constant * Fix merge issue --------- Co-authored-by: Jonathan Louie * Create initial workflow for publishing to Maven Central (#1600) * WIP Create initial workflow for publishing to Maven Central (#1594) * WIP Create initial workflow for publishing to Maven Central * Add classifier to workflow * Remove condition to allow all jobs to run * Try to fix Gradle workflow errors * Re-enable aws related options * Add missing property * Revert "Add missing property" This reverts commit 6cc5fba787385063bb6faac0885abeb78a7a49b6. * Add AWS_ACTIONS option * Sign JAR file * Fix signing issue * Try to fix issue with generating secring.gpg file * Fix path to secring.gpg * Try to fix secring.gpg retrieval issue * Remove base64 decode * Try to fix multi-line issue with GPG key secret * Go back to echo approach * Decode base64 properly this time * Use GPG_KEY_ID * Surround password in quotes * Publish JAR to local Maven and upload * Update examples build.gradle * Sign publishToMavenLocal build * Update version of Java JAR * Properly fetch src_folder variable contents * Reorganize JAR contents * Update path of uploaded JAR * Update artifact ID * Add missing comma * Replace placeholders in build.gradle * Update examples build.gradle * Remove test runs from java.yml workflow * Add debugging info to workflow * Adjust debug info * Readd placeholder text in build.gradle * Add more debug info * Change how the JAR is copied * Add configurations for ARM linux and x86 macos * Prevent output artifacts from being swallowed * Update build matrix to use proper RUNNERs * Try to use self-hosted runner for ARM Linux builds * Delete gradle-cd workflow * Add id-token permissions * Add step to setup self-hosted runner access * Add CONTAINER property to java.yml workflow * Remove install Redis step from java.yml workflow * Remove test-benchmark step from java.yml workflow * Fix issue with Java classifier * Update java.yml to use classifier * Bump version and add archiveClassifier * Change groupId to valkey-client * Update example and base archive name * Update workflow * Rename to glide-for-redis * Extracting Java Deployment to a different workflow Workflow will only trigger when a tag is pushed to the repo Version is extracted from the tag and replaced in the build.grade files reverted changes of java.yml file * trying to make the workflow to build * testing action to prepare build * forcing new action to trigger * Revert "forcing new action to trigger" This reverts commit d097a1f50608d0e053d847135436325256a3bdfb. * Revert "testing action to prepare build" This reverts commit 8864434d2bbecc8b6e8eb9ab800b90ed1cf22462. * Revert "trying to make the workflow to build" This reverts commit 143818abb87269bde21d1372205d61d2e9fae173. * Revert "Extracting Java Deployment to a different workflow" This reverts commit faff84694a309b0ccf80fa40d58d9c5f9d56bdb2. * Revert "Revert "Extracting Java Deployment to a different workflow"" This reverts commit 11f8470ff1ce859ab85b6858522dc9fef8b928cb. * fixing workflow * fixed path for the local maven * removing bundle from the tests fix to the JAVA CI not finding tests dependencies * fix java workflow * removing classifier from the pom * fixing concurrency * Remove publishToMavenLocal line in examples build.gradle * fix examples * cleaning up java.yml * testing removing test dependency * adding skip signing * Revert "adding skip signing" This reverts commit e4487882d523ac33828125767afda54472515c14. * Revert "testing removing test dependency" This reverts commit d0e06b70119b0b11d8c8683743dac64c8dd2eef2. * Revert "cleaning up java.yml" This reverts commit e7394d77220f4c6bff4f46fd32bc161ac4b78a0a. * removing dependency of singing in the local build * java.yml clean up * removing steps from java.yml * added comments * removed step on sed the examples and removed if always from the upload artifacts --------- Co-authored-by: affonsov Co-authored-by: affonsov * #1715: fix flakey test in xpending (#1717) Signed-off-by: Andrew Carbonetto * Java: Adding command WAIT (#1707) * Java: Adding command WAIT Java: Adding command WAIT * addressing comments * fixing timeout_idx in get_timeout_from_cmd_args call * update timeout check * fixing rust test * adding special case for WAIT * rust linter * remove special case in get_timeout_from_cmd_args * adding description for timeout 0 * rust linter * updating timeout test * changing transaction documentation --------- Co-authored-by: TJ Zhang * support smismember with GlideString (#1694) * Support GlideString for sdiff commands (#1722) Co-authored-by: Yulazari * Updated attribution files * support hset, hget, lindex, linsert, blmove, incr, hlen and lmove wit… (#1667) * Python: move the commands return value to bytes (#1617) * In the case of Simple String, Bulk String, or Verbatim String commands, Bytes will be returned instead of strings. --------- Co-authored-by: GilboaAWS Co-authored-by: Ubuntu Co-authored-by: Adar Ovadia Co-authored-by: Shoham Elias Co-authored-by: Shoham Elias <116083498+shohamazon@users.noreply.github.com> * Java: Add XGROUP SETID command (#1720) * Initial implementation of XGroupSetId * Unit tests * Add integration tests * PR feedback * Address PR comments doc updates * Add 7.0.0 transaction integration test * Java: update README directory to include Java's README.md (#1734) add java part to readme directory * Java: Add XCLAIM command Signed-off-by: Andrew Carbonetto * Add unit tests Signed-off-by: Andrew Carbonetto * Add transaction tests Signed-off-by: Andrew Carbonetto * SPOTLESS Signed-off-by: Andrew Carbonetto * Update IT tests Signed-off-by: Andrew Carbonetto * Update IT tests Signed-off-by: Andrew Carbonetto * Add UT tests Signed-off-by: Andrew Carbonetto * Add transaction tests Signed-off-by: Andrew Carbonetto * Fix transaction tests Signed-off-by: Andrew Carbonetto * Update XCLAIM with options; remove LASTID Signed-off-by: Andrew Carbonetto * Add a couple more test cases Signed-off-by: Andrew Carbonetto * clean up Signed-off-by: Andrew Carbonetto * Clean rust Signed-off-by: Andrew Carbonetto * Add examples Signed-off-by: Andrew Carbonetto * Move to 2D string array in response Signed-off-by: Andrew Carbonetto * Fix Transaction tests; update examples Signed-off-by: Andrew Carbonetto * SPOTLESS Signed-off-by: Andrew Carbonetto --------- Signed-off-by: Andrew Carbonetto Co-authored-by: Aaron <69273634+aaron-congo@users.noreply.github.com> Co-authored-by: tjzhang-BQ <111323543+tjzhang-BQ@users.noreply.github.com> Co-authored-by: TJ Zhang Co-authored-by: Yi-Pin Chen Co-authored-by: Shoham Elias Co-authored-by: jonathanl-bq <72158117+jonathanl-bq@users.noreply.github.com> Co-authored-by: James Duong Co-authored-by: Guian Gumpac Co-authored-by: Bar Shaul <88437685+barshaul@users.noreply.github.com> Co-authored-by: James Duong Co-authored-by: James Xin <126831592+jamesx-improving@users.noreply.github.com> Co-authored-by: Jonathan Louie Co-authored-by: affonsov Co-authored-by: affonsov Co-authored-by: Alon Arenberg <93711356+alon-arenberg@users.noreply.github.com> Co-authored-by: yulazariy Co-authored-by: Yulazari Co-authored-by: ort-bot Co-authored-by: adarovadya Co-authored-by: GilboaAWS Co-authored-by: Ubuntu Co-authored-by: Adar Ovadia Co-authored-by: Shoham Elias <116083498+shohamazon@users.noreply.github.com> Co-authored-by: Chloe Yip <168601573+cyip10@users.noreply.github.com> * SPOTLESS Signed-off-by: Andrew Carbonetto * Fix merge conflicts Signed-off-by: Andrew Carbonetto * Review comments Signed-off-by: Andrew Carbonetto * Update docs for review comments Signed-off-by: Andrew Carbonetto * small doc fix Signed-off-by: Andrew Carbonetto --------- Signed-off-by: Andrew Carbonetto Co-authored-by: Aaron <69273634+aaron-congo@users.noreply.github.com> Co-authored-by: tjzhang-BQ <111323543+tjzhang-BQ@users.noreply.github.com> Co-authored-by: TJ Zhang Co-authored-by: Yi-Pin Chen Co-authored-by: Shoham Elias Co-authored-by: jonathanl-bq <72158117+jonathanl-bq@users.noreply.github.com> Co-authored-by: James Duong Co-authored-by: Guian Gumpac Co-authored-by: Bar Shaul <88437685+barshaul@users.noreply.github.com> Co-authored-by: James Duong Co-authored-by: James Xin <126831592+jamesx-improving@users.noreply.github.com> Co-authored-by: Jonathan Louie Co-authored-by: affonsov Co-authored-by: affonsov Co-authored-by: Alon Arenberg <93711356+alon-arenberg@users.noreply.github.com> Co-authored-by: yulazariy Co-authored-by: Yulazari Co-authored-by: ort-bot Co-authored-by: adarovadya Co-authored-by: GilboaAWS Co-authored-by: Ubuntu Co-authored-by: Adar Ovadia Co-authored-by: Shoham Elias <116083498+shohamazon@users.noreply.github.com> Co-authored-by: Chloe Yip <168601573+cyip10@users.noreply.github.com> --- glide-core/src/client/value_conversion.rs | 40 ++++ glide-core/src/protobuf/redis_request.proto | 1 + glide-core/src/request_type.rs | 3 + .../src/main/java/glide/api/BaseClient.java | 63 ++++++ .../api/commands/StreamBaseCommands.java | 116 ++++++++++ .../glide/api/models/BaseTransaction.java | 111 ++++++++++ .../commands/stream/StreamClaimOptions.java | 104 +++++++++ .../test/java/glide/api/RedisClientTest.java | 156 ++++++++++++++ .../glide/api/models/TransactionTests.java | 59 +++++ .../test/java/glide/SharedCommandTests.java | 202 +++++++++++++++++- .../java/glide/TransactionTestUtilities.java | 23 ++ 11 files changed, 870 insertions(+), 8 deletions(-) create mode 100644 java/client/src/main/java/glide/api/models/commands/stream/StreamClaimOptions.java diff --git a/glide-core/src/client/value_conversion.rs b/glide-core/src/client/value_conversion.rs index b2ad3f1560..e49eedc74f 100644 --- a/glide-core/src/client/value_conversion.rs +++ b/glide-core/src/client/value_conversion.rs @@ -925,6 +925,16 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option { key_type: &None, value_type: &None, }), + b"XCLAIM" => { + if cmd.position(b"JUSTID").is_some() { + Some(ExpectedReturnType::ArrayOfStrings) + } else { + Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::SimpleString), + value_type: &Some(ExpectedReturnType::ArrayOfPairs), + }) + } + } b"XAUTOCLAIM" => { if cmd.position(b"JUSTID").is_some() { // Value conversion is not needed if the JUSTID arg was passed. @@ -1262,6 +1272,36 @@ mod tests { assert!(converted_4.is_err()); } + #[test] + fn convert_xclaim() { + assert!(matches!( + expected_type_for_cmd( + redis::cmd("XCLAIM") + .arg("key") + .arg("grou") + .arg("consumer") + .arg("0") + .arg("id") + ), + Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::SimpleString), + value_type: &Some(ExpectedReturnType::ArrayOfPairs), + }) + )); + assert!(matches!( + expected_type_for_cmd( + redis::cmd("XCLAIM") + .arg("key") + .arg("grou") + .arg("consumer") + .arg("0") + .arg("id") + .arg("JUSTID") + ), + Some(ExpectedReturnType::ArrayOfStrings) + )); + } + #[test] fn convert_xrange_xrevrange() { assert!(matches!( diff --git a/glide-core/src/protobuf/redis_request.proto b/glide-core/src/protobuf/redis_request.proto index 63199878d9..d93d6c7c35 100644 --- a/glide-core/src/protobuf/redis_request.proto +++ b/glide-core/src/protobuf/redis_request.proto @@ -243,6 +243,7 @@ enum RequestType { HScan = 202; XAutoClaim = 203; Wait = 208; + XClaim = 209; } message Command { diff --git a/glide-core/src/request_type.rs b/glide-core/src/request_type.rs index 81e9332ea1..7064e9a0ce 100644 --- a/glide-core/src/request_type.rs +++ b/glide-core/src/request_type.rs @@ -213,6 +213,7 @@ pub enum RequestType { HScan = 202, XAutoClaim = 203, Wait = 208, + XClaim = 209, } fn get_two_word_command(first: &str, second: &str) -> Cmd { @@ -429,6 +430,7 @@ impl From<::protobuf::EnumOrUnknown> for RequestType { ProtobufRequestType::HScan => RequestType::HScan, ProtobufRequestType::XAutoClaim => RequestType::XAutoClaim, ProtobufRequestType::Wait => RequestType::Wait, + ProtobufRequestType::XClaim => RequestType::XClaim, } } } @@ -643,6 +645,7 @@ impl RequestType { RequestType::HScan => Some(cmd("HSCAN")), RequestType::XAutoClaim => Some(cmd("XAUTOCLAIM")), RequestType::Wait => Some(cmd("WAIT")), + RequestType::XClaim => Some(cmd("XCLAIM")), } } } diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index 2d567b0b75..fcc8cc253b 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -6,6 +6,7 @@ import static glide.api.models.commands.bitmap.BitFieldOptions.BitFieldReadOnlySubCommands; import static glide.api.models.commands.bitmap.BitFieldOptions.BitFieldSubCommands; import static glide.api.models.commands.bitmap.BitFieldOptions.createBitFieldArgs; +import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API; import static glide.ffi.resolvers.SocketListenerResolver.getSocket; import static glide.utils.ArrayTransformUtils.cast3DArray; import static glide.utils.ArrayTransformUtils.castArray; @@ -137,6 +138,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Watch; import static redis_request.RedisRequestOuterClass.RequestType.XAck; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; +import static redis_request.RedisRequestOuterClass.RequestType.XClaim; import static redis_request.RedisRequestOuterClass.RequestType.XDel; import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate; import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer; @@ -226,6 +228,7 @@ import glide.api.models.commands.scan.SScanOptions; import glide.api.models.commands.scan.ZScanOptions; import glide.api.models.commands.stream.StreamAddOptions; +import glide.api.models.commands.stream.StreamClaimOptions; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamPendingOptions; import glide.api.models.commands.stream.StreamRange; @@ -2178,6 +2181,66 @@ public CompletableFuture xpending( XPending, args, response -> castArray(handleArrayResponse(response), Object[].class)); } + @Override + public CompletableFuture> xclaim( + @NonNull String key, + @NonNull String group, + @NonNull String consumer, + long minIdleTime, + @NonNull String[] ids) { + String[] args = + concatenateArrays(new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids); + return commandManager.submitNewCommand(XClaim, args, this::handleMapResponse); + } + + @Override + public CompletableFuture> xclaim( + @NonNull String key, + @NonNull String group, + @NonNull String consumer, + long minIdleTime, + @NonNull String[] ids, + @NonNull StreamClaimOptions options) { + String[] args = + concatenateArrays( + new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids, options.toArgs()); + return commandManager.submitNewCommand(XClaim, args, this::handleMapResponse); + } + + @Override + public CompletableFuture xclaimJustId( + @NonNull String key, + @NonNull String group, + @NonNull String consumer, + long minIdleTime, + @NonNull String[] ids) { + String[] args = + concatenateArrays( + new String[] {key, group, consumer, Long.toString(minIdleTime)}, + ids, + new String[] {JUST_ID_REDIS_API}); + return commandManager.submitNewCommand( + XClaim, args, response -> castArray(handleArrayResponse(response), String.class)); + } + + @Override + public CompletableFuture xclaimJustId( + @NonNull String key, + @NonNull String group, + @NonNull String consumer, + long minIdleTime, + @NonNull String[] ids, + @NonNull StreamClaimOptions options) { + String[] args = + concatenateArrays( + new String[] {key, group, consumer, Long.toString(minIdleTime)}, + ids, + options.toArgs(), + new String[] {JUST_ID_REDIS_API}); + return commandManager.submitNewCommand( + XClaim, args, response -> castArray(handleArrayResponse(response), String.class)); + } + @Override public CompletableFuture pttl(@NonNull String key) { return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse); diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java index 9644d11f8c..e92c723cc1 100644 --- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -4,6 +4,7 @@ import glide.api.models.GlideString; import glide.api.models.commands.stream.StreamAddOptions; import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder; +import glide.api.models.commands.stream.StreamClaimOptions; import glide.api.models.commands.stream.StreamGroupOptions; import glide.api.models.commands.stream.StreamPendingOptions; import glide.api.models.commands.stream.StreamRange; @@ -711,4 +712,119 @@ CompletableFuture xpending( StreamRange end, long count, StreamPendingOptions options); + + /** + * Changes the ownership of a pending message. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param group The consumer group name. + * @param consumer The group consumer. + * @param minIdleTime The minimum idle time for the message to be claimed. + * @param ids A array of entry ids. + * @return A Map of message entries with the format + * {"entryId": [["entry", "data"], ...], ...} that are claimed by the consumer. + * @example + *
+     * // read messages from streamId for consumer1
+     * var readResult = client.xreadgroup(Map.of("mystream", ">"), "mygroup", "consumer1").get();
+     * // "entryId" is now read, and we can assign the pending messages to consumer2
+     * Map results = client.xclaim("mystream", "mygroup", "consumer2", 0L, new String[] {entryId}).get();
+     * for (String key: results.keySet()) {
+     *     System.out.println(key);
+     *     for (String[] entry: results.get(key)) {
+     *         System.out.printf("{%s=%s}%n", entry[0], entry[1]);
+     *     }
+     * }
+     * 
+ */ + CompletableFuture> xclaim( + String key, String group, String consumer, long minIdleTime, String[] ids); + + /** + * Changes the ownership of a pending message. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param group The consumer group name. + * @param consumer The group consumer. + * @param minIdleTime The minimum idle time for the message to be claimed. + * @param ids An array of entry ids. + * @param options Stream claim options {@link StreamClaimOptions}. + * @return A Map of message entries with the format + * {"entryId": [["entry", "data"], ...], ...} that are claimed by the consumer. + * @example + *
+     * // assign (force) unread and unclaimed messages to consumer2
+     * StreamClaimOptions options = StreamClaimOptions.builder().force().build();
+     * Map results = client.xclaim("mystream", "mygroup", "consumer2", 0L, new String[] {entryId}, options).get();
+     * for (String key: results.keySet()) {
+     *     System.out.println(key);
+     *     for (String[] entry: results.get(key)) {
+     *         System.out.printf("{%s=%s}%n", entry[0], entry[1]);
+     *     }
+     * }
+     * 
+ */ + CompletableFuture> xclaim( + String key, + String group, + String consumer, + long minIdleTime, + String[] ids, + StreamClaimOptions options); + + /** + * Changes the ownership of a pending message. This function returns an array with + * only the message/entry IDs, and is equivalent to using JUSTID in the Redis API. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param group The consumer group name. + * @param consumer The group consumer. + * @param minIdleTime The minimum idle time for the message to be claimed. + * @param ids An array of entry ids. + * @return An array of message ids claimed by the consumer. + * @example + *
+     * // read messages from streamId for consumer1
+     * var readResult = client.xreadgroup(Map.of("mystream", ">"), "mygroup", "consumer1").get();
+     * // "entryId" is now read, and we can assign the pending messages to consumer2
+     * String[] results = client.xclaimJustId("mystream", "mygroup", "consumer2", 0L, new String[] {entryId}).get();
+     * for (String id: results) {
+     *     System.out.printf("consumer2 claimed stream entry ID: %s %n", id);
+     * }
+     * 
+ */ + CompletableFuture xclaimJustId( + String key, String group, String consumer, long minIdleTime, String[] ids); + + /** + * Changes the ownership of a pending message. This function returns an array with + * only the message/entry IDs, and is equivalent to using JUSTID in the Redis API. + * + * @see valkey.io for details. + * @param key The key of the stream. + * @param group The consumer group name. + * @param consumer The group consumer. + * @param minIdleTime The minimum idle time for the message to be claimed. + * @param ids An array of entry ids. + * @param options Stream claim options {@link StreamClaimOptions}. + * @return An array of message ids claimed by the consumer. + * @example + *
+     * // assign (force) unread and unclaimed messages to consumer2
+     * StreamClaimOptions options = StreamClaimOptions.builder().force().build();
+     * String[] results = client.xclaimJustId("mystream", "mygroup", "consumer2", 0L, new String[] {entryId}, options).get();
+     * for (String id: results) {
+     *     System.out.printf("consumer2 claimed stream entry ID: %s %n", id);
+     * }
+     */
+    CompletableFuture xclaimJustId(
+            String key,
+            String group,
+            String consumer,
+            long minIdleTime,
+            String[] ids,
+            StreamClaimOptions options);
 }
diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java
index 593cda2bba..13659f4b4c 100644
--- a/java/client/src/main/java/glide/api/models/BaseTransaction.java
+++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java
@@ -20,6 +20,7 @@
 import static glide.api.models.commands.function.FunctionListOptions.LIBRARY_NAME_REDIS_API;
 import static glide.api.models.commands.function.FunctionListOptions.WITH_CODE_REDIS_API;
 import static glide.api.models.commands.function.FunctionLoadOptions.REPLACE;
+import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API;
 import static glide.utils.ArrayTransformUtils.concatenateArrays;
 import static glide.utils.ArrayTransformUtils.convertMapToKeyValueStringArray;
 import static glide.utils.ArrayTransformUtils.convertMapToValueKeyStringArray;
@@ -163,6 +164,7 @@
 import static redis_request.RedisRequestOuterClass.RequestType.Wait;
 import static redis_request.RedisRequestOuterClass.RequestType.XAck;
 import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
+import static redis_request.RedisRequestOuterClass.RequestType.XClaim;
 import static redis_request.RedisRequestOuterClass.RequestType.XDel;
 import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
 import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
@@ -257,6 +259,7 @@
 import glide.api.models.commands.scan.ZScanOptions;
 import glide.api.models.commands.stream.StreamAddOptions;
 import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder;
+import glide.api.models.commands.stream.StreamClaimOptions;
 import glide.api.models.commands.stream.StreamGroupOptions;
 import glide.api.models.commands.stream.StreamPendingOptions;
 import glide.api.models.commands.stream.StreamRange;
@@ -3330,6 +3333,114 @@ public T xpending(
         return getThis();
     }
 
+    /**
+     * Changes the ownership of a pending message.
+     *
+     * @see valkey.io for details.
+     * @param key The key of the stream.
+     * @param group The consumer group name.
+     * @param consumer The group consumer.
+     * @param minIdleTime The minimum idle time for the message to be claimed.
+     * @param ids An array of entry ids.
+     * @return Command Response - A Map of message entries with the format 
+     *      {"entryId": [["entry", "data"], ...], ...} that are claimed by the consumer.
+     */
+    public T xclaim(
+            @NonNull String key,
+            @NonNull String group,
+            @NonNull String consumer,
+            long minIdleTime,
+            @NonNull String[] ids) {
+        String[] args =
+                concatenateArrays(new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids);
+        protobufTransaction.addCommands(buildCommand(XClaim, buildArgs(args)));
+        return getThis();
+    }
+
+    /**
+     * Changes the ownership of a pending message.
+     *
+     * @see valkey.io for details.
+     * @param key The key of the stream.
+     * @param group The consumer group name.
+     * @param consumer The group consumer.
+     * @param minIdleTime The minimum idle time for the message to be claimed.
+     * @param ids An array of entry ids.
+     * @param options Stream claim options {@link StreamClaimOptions}.
+     * @return Command Response - A Map of message entries with the format 
+     *      {"entryId": [["entry", "data"], ...], ...} that are claimed by the consumer.
+     */
+    public T xclaim(
+            @NonNull String key,
+            @NonNull String group,
+            @NonNull String consumer,
+            long minIdleTime,
+            @NonNull String[] ids,
+            @NonNull StreamClaimOptions options) {
+        String[] args =
+                concatenateArrays(
+                        new String[] {key, group, consumer, Long.toString(minIdleTime)}, ids, options.toArgs());
+        protobufTransaction.addCommands(buildCommand(XClaim, buildArgs(args)));
+        return getThis();
+    }
+
+    /**
+     * Changes the ownership of a pending message. This function returns an array with
+     * only the message/entry IDs, and is equivalent to using JUSTID in the Redis API.
+     *
+     * @see valkey.io for details.
+     * @param key The key of the stream.
+     * @param group The consumer group name.
+     * @param consumer The group consumer.
+     * @param minIdleTime The minimum idle time for the message to be claimed.
+     * @param ids An array of entry ids.
+     * @return Command Response - An array of message ids claimed by the consumer.
+     */
+    public T xclaimJustId(
+            @NonNull String key,
+            @NonNull String group,
+            @NonNull String consumer,
+            long minIdleTime,
+            @NonNull String[] ids) {
+        String[] args =
+                concatenateArrays(
+                        new String[] {key, group, consumer, Long.toString(minIdleTime)},
+                        ids,
+                        new String[] {JUST_ID_REDIS_API});
+        protobufTransaction.addCommands(buildCommand(XClaim, buildArgs(args)));
+        return getThis();
+    }
+
+    /**
+     * Changes the ownership of a pending message. This function returns an array with
+     * only the message/entry IDs, and is equivalent to using JUSTID in the Redis API.
+     *
+     * @see valkey.io for details.
+     * @param key The key of the stream.
+     * @param group The consumer group name.
+     * @param consumer The group consumer.
+     * @param minIdleTime The minimum idle time for the message to be claimed.
+     * @param ids An array of entry ids.
+     * @param options Stream claim options {@link StreamClaimOptions}.
+     * @return Command Response - An array of message ids claimed by the consumer.
+     */
+    public T xclaimJustId(
+            @NonNull String key,
+            @NonNull String group,
+            @NonNull String consumer,
+            long minIdleTime,
+            @NonNull String[] ids,
+            @NonNull StreamClaimOptions options) {
+        String[] args =
+                concatenateArrays(
+                        new String[] {key, group, consumer, Long.toString(minIdleTime)},
+                        ids,
+                        options.toArgs(),
+                        new String[] {JUST_ID_REDIS_API});
+        protobufTransaction.addCommands(buildCommand(XClaim, buildArgs(args)));
+        return getThis();
+    }
+
     /**
      * Returns the remaining time to live of key that has a timeout, in milliseconds.
      *
diff --git a/java/client/src/main/java/glide/api/models/commands/stream/StreamClaimOptions.java b/java/client/src/main/java/glide/api/models/commands/stream/StreamClaimOptions.java
new file mode 100644
index 0000000000..9122096582
--- /dev/null
+++ b/java/client/src/main/java/glide/api/models/commands/stream/StreamClaimOptions.java
@@ -0,0 +1,104 @@
+/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
+package glide.api.models.commands.stream;
+
+import glide.api.commands.StreamBaseCommands;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Builder;
+
+/**
+ * Optional arguments to {@link StreamBaseCommands#xclaim(String, String, String, long, String[],
+ * StreamClaimOptions)}
+ *
+ * @see valkey.io
+ */
+@Builder
+public class StreamClaimOptions {
+
+    /** ValKey API string to designate IDLE time in milliseconds */
+    public static final String IDLE_REDIS_API = "IDLE";
+
+    /** ValKey API string to designate TIME time in unix-milliseconds */
+    public static final String TIME_REDIS_API = "TIME";
+
+    /** ValKey API string to designate RETRYCOUNT */
+    public static final String RETRY_COUNT_REDIS_API = "RETRYCOUNT";
+
+    /** ValKey API string to designate FORCE */
+    public static final String FORCE_REDIS_API = "FORCE";
+
+    /** ValKey API string to designate JUSTID */
+    public static final String JUST_ID_REDIS_API = "JUSTID";
+
+    /**
+     * Set the idle time (last time it was delivered) of the message in milliseconds. If idle
+     *  is not specified, an idle of 0 is assumed, that is, the time
+     * count is reset because the message now has a new owner trying to process it.
+     */
+    private final Long idle; // in milliseconds
+
+    /**
+     * This is the same as {@link #idle} but instead of a relative amount of milliseconds, it sets the
+     * idle time to a specific Unix time (in milliseconds). This is useful in order to rewrite the AOF
+     * file generating XCLAIM commands.
+     */
+    private final Long idleUnixTime; // in unix-time milliseconds
+
+    /**
+     * Set the retry counter to the specified value. This counter is incremented every time a message
+     * is delivered again. Normally {@link StreamBaseCommands#xclaim} does not alter this counter,
+     * which is just served to clients when the {@link StreamBaseCommands#xpending} command is called:
+     * this way clients can detect anomalies, like messages that are never processed for some reason
+     * after a big number of delivery attempts.
+     */
+    private final Long retryCount;
+
+    /**
+     * Creates the pending message entry in the PEL even if certain specified IDs are not already in
+     * the PEL assigned to a different client. However, the message must exist in the stream,
+     * otherwise the IDs of non-existing messages are ignored.
+     */
+    private final boolean isForce;
+
+    public static class StreamClaimOptionsBuilder {
+
+        /**
+         * Creates the pending message entry in the PEL even if certain specified IDs are not already in
+         * the PEL assigned to a different client. However, the message must exist in the stream,
+         * otherwise the IDs of non-existing messages are ignored.
+         */
+        public StreamClaimOptionsBuilder force() {
+            return isForce(true);
+        }
+    }
+
+    /**
+     * Converts options for Xclaim into a String[].
+     *
+     * @return String[]
+     */
+    public String[] toArgs() {
+        List optionArgs = new ArrayList<>();
+
+        if (idle != null) {
+            optionArgs.add(IDLE_REDIS_API);
+            optionArgs.add(Long.toString(idle));
+        }
+
+        if (idleUnixTime != null) {
+            optionArgs.add(TIME_REDIS_API);
+            optionArgs.add(Long.toString(idleUnixTime));
+        }
+
+        if (retryCount != null) {
+            optionArgs.add(RETRY_COUNT_REDIS_API);
+            optionArgs.add(Long.toString(retryCount));
+        }
+
+        if (isForce) {
+            optionArgs.add(FORCE_REDIS_API);
+        }
+
+        return optionArgs.toArray(new String[0]);
+    }
+}
diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java
index 4a0b8144cd..51025218d9 100644
--- a/java/client/src/test/java/glide/api/RedisClientTest.java
+++ b/java/client/src/test/java/glide/api/RedisClientTest.java
@@ -43,6 +43,11 @@
 import static glide.api.models.commands.scan.BaseScanOptions.COUNT_OPTION_STRING;
 import static glide.api.models.commands.scan.BaseScanOptions.MATCH_OPTION_STRING;
 import static glide.api.models.commands.stream.StreamAddOptions.NO_MAKE_STREAM_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.FORCE_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.IDLE_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.RETRY_COUNT_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.TIME_REDIS_API;
 import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_VALKEY_API;
 import static glide.api.models.commands.stream.StreamGroupOptions.MAKE_STREAM_VALKEY_API;
 import static glide.api.models.commands.stream.StreamPendingOptions.IDLE_TIME_REDIS_API;
@@ -221,6 +226,7 @@
 import static redis_request.RedisRequestOuterClass.RequestType.Watch;
 import static redis_request.RedisRequestOuterClass.RequestType.XAck;
 import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
+import static redis_request.RedisRequestOuterClass.RequestType.XClaim;
 import static redis_request.RedisRequestOuterClass.RequestType.XDel;
 import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
 import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
@@ -317,6 +323,7 @@
 import glide.api.models.commands.scan.SScanOptions;
 import glide.api.models.commands.scan.ZScanOptions;
 import glide.api.models.commands.stream.StreamAddOptions;
+import glide.api.models.commands.stream.StreamClaimOptions;
 import glide.api.models.commands.stream.StreamGroupOptions;
 import glide.api.models.commands.stream.StreamPendingOptions;
 import glide.api.models.commands.stream.StreamRange;
@@ -6053,6 +6060,155 @@ public void xack_returns_success() {
         assertEquals(mockResult, payload);
     }
 
+    @SneakyThrows
+    @Test
+    public void xclaim_returns_success() {
+        // setup
+        String key = "testKey";
+        String groupName = "testGroupName";
+        String consumer = "testConsumer";
+        Long minIdleTime = 18L;
+        String[] ids = new String[] {"testId"};
+        String[] arguments = concatenateArrays(new String[] {key, groupName, consumer, "18"}, ids);
+        Map mockResult = Map.of("1234-0", new String[][] {{"message", "log"}});
+
+        CompletableFuture> testResponse = new CompletableFuture<>();
+        testResponse.complete(mockResult);
+
+        // match on protobuf request
+        when(commandManager.>submitNewCommand(eq(XClaim), eq(arguments), any()))
+                .thenReturn(testResponse);
+
+        // exercise
+        CompletableFuture> response =
+                service.xclaim(key, groupName, consumer, minIdleTime, ids);
+        Map payload = response.get();
+
+        // verify
+        assertEquals(testResponse, response);
+        assertEquals(mockResult, payload);
+    }
+
+    @SneakyThrows
+    @Test
+    public void xclaim_with_options_returns_success() {
+        // setup
+        String key = "testKey";
+        String groupName = "testGroupName";
+        String consumer = "testConsumer";
+        Long minIdleTime = 18L;
+        String[] ids = new String[] {"testId"};
+        StreamClaimOptions options =
+                StreamClaimOptions.builder().force().idle(11L).idleUnixTime(12L).retryCount(5L).build();
+        String[] arguments =
+                new String[] {
+                    key,
+                    groupName,
+                    consumer,
+                    "18",
+                    "testId",
+                    IDLE_REDIS_API,
+                    "11",
+                    TIME_REDIS_API,
+                    "12",
+                    RETRY_COUNT_REDIS_API,
+                    "5",
+                    FORCE_REDIS_API
+                };
+        Map mockResult = Map.of("1234-0", new String[][] {{"message", "log"}});
+
+        CompletableFuture> testResponse = new CompletableFuture<>();
+        testResponse.complete(mockResult);
+
+        // match on protobuf request
+        when(commandManager.>submitNewCommand(eq(XClaim), eq(arguments), any()))
+                .thenReturn(testResponse);
+
+        // exercise
+        CompletableFuture> response =
+                service.xclaim(key, groupName, consumer, minIdleTime, ids, options);
+        Map payload = response.get();
+
+        // verify
+        assertEquals(testResponse, response);
+        assertEquals(mockResult, payload);
+    }
+
+    @SneakyThrows
+    @Test
+    public void xclaimJustId_returns_success() {
+        // setup
+        String key = "testKey";
+        String groupName = "testGroupName";
+        String consumer = "testConsumer";
+        Long minIdleTime = 18L;
+        String[] ids = new String[] {"testId"};
+        String[] arguments = new String[] {key, groupName, consumer, "18", "testId", JUST_ID_REDIS_API};
+        String[] mockResult = {"message", "log"};
+
+        CompletableFuture testResponse = new CompletableFuture<>();
+        testResponse.complete(mockResult);
+
+        // match on protobuf request
+        when(commandManager.submitNewCommand(eq(XClaim), eq(arguments), any()))
+                .thenReturn(testResponse);
+
+        // exercise
+        CompletableFuture response =
+                service.xclaimJustId(key, groupName, consumer, minIdleTime, ids);
+        String[] payload = response.get();
+
+        // verify
+        assertEquals(testResponse, response);
+        assertEquals(mockResult, payload);
+    }
+
+    @SneakyThrows
+    @Test
+    public void xclaimJustId_with_options_returns_success() {
+        // setup
+        String key = "testKey";
+        String groupName = "testGroupName";
+        String consumer = "testConsumer";
+        Long minIdleTime = 18L;
+        String[] ids = new String[] {"testId"};
+        StreamClaimOptions options =
+                StreamClaimOptions.builder().force().idle(11L).idleUnixTime(12L).retryCount(5L).build();
+        String[] arguments =
+                new String[] {
+                    key,
+                    groupName,
+                    consumer,
+                    "18",
+                    "testId",
+                    IDLE_REDIS_API,
+                    "11",
+                    TIME_REDIS_API,
+                    "12",
+                    RETRY_COUNT_REDIS_API,
+                    "5",
+                    FORCE_REDIS_API,
+                    JUST_ID_REDIS_API
+                };
+        String[] mockResult = {"message", "log"};
+
+        CompletableFuture testResponse = new CompletableFuture<>();
+        testResponse.complete(mockResult);
+
+        // match on protobuf request
+        when(commandManager.submitNewCommand(eq(XClaim), eq(arguments), any()))
+                .thenReturn(testResponse);
+
+        // exercise
+        CompletableFuture response =
+                service.xclaimJustId(key, groupName, consumer, minIdleTime, ids, options);
+        String[] payload = response.get();
+
+        // verify
+        assertEquals(testResponse, response);
+        assertEquals(mockResult, payload);
+    }
+
     @SneakyThrows
     @Test
     public void xack_binary_returns_success() {
diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java
index 9c70ed1886..ad9e56ac0d 100644
--- a/java/client/src/test/java/glide/api/models/TransactionTests.java
+++ b/java/client/src/test/java/glide/api/models/TransactionTests.java
@@ -31,6 +31,11 @@
 import static glide.api.models.commands.geospatial.GeoAddOptions.CHANGED_REDIS_API;
 import static glide.api.models.commands.geospatial.GeoSearchOrigin.FROMLONLAT_VALKEY_API;
 import static glide.api.models.commands.geospatial.GeoSearchOrigin.FROMMEMBER_VALKEY_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.FORCE_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.IDLE_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.JUST_ID_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.RETRY_COUNT_REDIS_API;
+import static glide.api.models.commands.stream.StreamClaimOptions.TIME_REDIS_API;
 import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_VALKEY_API;
 import static glide.api.models.commands.stream.StreamGroupOptions.MAKE_STREAM_VALKEY_API;
 import static glide.api.models.commands.stream.StreamPendingOptions.IDLE_TIME_REDIS_API;
@@ -184,6 +189,7 @@
 import static redis_request.RedisRequestOuterClass.RequestType.Wait;
 import static redis_request.RedisRequestOuterClass.RequestType.XAck;
 import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
+import static redis_request.RedisRequestOuterClass.RequestType.XClaim;
 import static redis_request.RedisRequestOuterClass.RequestType.XDel;
 import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
 import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
@@ -268,6 +274,7 @@
 import glide.api.models.commands.scan.SScanOptions;
 import glide.api.models.commands.scan.ZScanOptions;
 import glide.api.models.commands.stream.StreamAddOptions;
+import glide.api.models.commands.stream.StreamClaimOptions;
 import glide.api.models.commands.stream.StreamGroupOptions;
 import glide.api.models.commands.stream.StreamPendingOptions;
 import glide.api.models.commands.stream.StreamRange;
@@ -896,6 +903,58 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
                                 "99",
                                 "consumer")));
 
+        transaction.xclaim("key", "group", "consumer", 99L, new String[] {"12345-1", "98765-4"});
+        results.add(Pair.of(XClaim, buildArgs("key", "group", "consumer", "99", "12345-1", "98765-4")));
+
+        StreamClaimOptions claimOptions =
+                StreamClaimOptions.builder().force().idle(11L).idleUnixTime(12L).retryCount(5L).build();
+        transaction.xclaim(
+                "key", "group", "consumer", 99L, new String[] {"12345-1", "98765-4"}, claimOptions);
+        results.add(
+                Pair.of(
+                        XClaim,
+                        buildArgs(
+                                "key",
+                                "group",
+                                "consumer",
+                                "99",
+                                "12345-1",
+                                "98765-4",
+                                IDLE_REDIS_API,
+                                "11",
+                                TIME_REDIS_API,
+                                "12",
+                                RETRY_COUNT_REDIS_API,
+                                "5",
+                                FORCE_REDIS_API)));
+
+        transaction.xclaimJustId("key", "group", "consumer", 99L, new String[] {"12345-1", "98765-4"});
+        results.add(
+                Pair.of(
+                        XClaim,
+                        buildArgs("key", "group", "consumer", "99", "12345-1", "98765-4", JUST_ID_REDIS_API)));
+
+        transaction.xclaimJustId(
+                "key", "group", "consumer", 99L, new String[] {"12345-1", "98765-4"}, claimOptions);
+        results.add(
+                Pair.of(
+                        XClaim,
+                        buildArgs(
+                                "key",
+                                "group",
+                                "consumer",
+                                "99",
+                                "12345-1",
+                                "98765-4",
+                                IDLE_REDIS_API,
+                                "11",
+                                TIME_REDIS_API,
+                                "12",
+                                RETRY_COUNT_REDIS_API,
+                                "5",
+                                FORCE_REDIS_API,
+                                JUST_ID_REDIS_API)));
+
         transaction.time();
         results.add(Pair.of(Time, buildArgs()));
 
diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java
index 83f737be7f..bd2e237a17 100644
--- a/java/integTest/src/test/java/glide/SharedCommandTests.java
+++ b/java/integTest/src/test/java/glide/SharedCommandTests.java
@@ -82,6 +82,7 @@
 import glide.api.models.commands.scan.SScanOptions;
 import glide.api.models.commands.scan.ZScanOptions;
 import glide.api.models.commands.stream.StreamAddOptions;
+import glide.api.models.commands.stream.StreamClaimOptions;
 import glide.api.models.commands.stream.StreamGroupOptions;
 import glide.api.models.commands.stream.StreamPendingOptions;
 import glide.api.models.commands.stream.StreamRange.IdBound;
@@ -4618,7 +4619,7 @@ public void xack_return_failures(BaseClient client) {
     @SneakyThrows
     @ParameterizedTest(autoCloseArguments = false)
     @MethodSource("getClients")
-    public void xpending(BaseClient client) {
+    public void xpending_xclaim(BaseClient client) {
 
         String key = UUID.randomUUID().toString();
         String groupName = "group" + UUID.randomUUID();
@@ -4709,9 +4710,52 @@ public void xpending(BaseClient client) {
                 ArrayUtils.remove(pending_results_extended[4], 2));
         assertTrue((Long) pending_results_extended[4][2] >= 0L);
 
-        // acknowledge streams 2-4 and remove them from the xpending results
+        // use claim to claim stream 3 and 5 for consumer 1
+        var claimResults =
+                client.xclaim(key, groupName, consumer1, 0L, new String[] {streamid_3, streamid_5}).get();
+        assertDeepEquals(
+                Map.of(
+                        streamid_3,
+                        new String[][] {{"field3", "value3"}},
+                        streamid_5,
+                        new String[][] {{"field5", "value5"}}),
+                claimResults);
+
+        var claimResultsJustId =
+                client
+                        .xclaimJustId(key, groupName, consumer1, 0L, new String[] {streamid_3, streamid_5})
+                        .get();
+        assertArrayEquals(new String[] {streamid_3, streamid_5}, claimResultsJustId);
+
+        // add one more stream
+        String streamid_6 = client.xadd(key, Map.of("field6", "value6")).get();
+        assertNotNull(streamid_6);
+
+        // using force, we can xclaim the message without reading it
+        var claimForceResults =
+                client
+                        .xclaim(
+                                key,
+                                groupName,
+                                consumer2,
+                                0L,
+                                new String[] {streamid_6},
+                                StreamClaimOptions.builder().force().retryCount(99L).build())
+                        .get();
+        assertDeepEquals(Map.of(streamid_6, new String[][] {{"field6", "value6"}}), claimForceResults);
+
+        Object[][] forcePendingResults =
+                client.xpending(key, groupName, IdBound.of(streamid_6), IdBound.of(streamid_6), 1L).get();
+        assertEquals(streamid_6, forcePendingResults[0][0]);
+        assertEquals(consumer2, forcePendingResults[0][1]);
+        assertEquals(99L, forcePendingResults[0][3]);
+
+        // acknowledge streams 2, 3, 4, and 6 and remove them from the xpending results
         assertEquals(
-                3L, client.xack(key, groupName, new String[] {streamid_2, streamid_3, streamid_4}).get());
+                4L,
+                client
+                        .xack(key, groupName, new String[] {streamid_2, streamid_3, streamid_4, streamid_6})
+                        .get());
 
         pending_results_extended =
                 client
@@ -4719,7 +4763,7 @@ public void xpending(BaseClient client) {
                         .get();
         assertEquals(1, pending_results_extended.length);
         assertEquals(streamid_5, pending_results_extended[0][0]);
-        assertEquals(consumer2, pending_results_extended[0][1]);
+        assertEquals(consumer1, pending_results_extended[0][1]);
 
         pending_results_extended =
                 client
@@ -4737,11 +4781,10 @@ public void xpending(BaseClient client) {
                                 InfRangeBound.MIN,
                                 InfRangeBound.MAX,
                                 10L,
-                                StreamPendingOptions.builder().minIdleTime(1L).consumer(consumer2).build())
+                                StreamPendingOptions.builder().minIdleTime(1L).consumer(consumer1).build())
                         .get();
-        assertEquals(1, pending_results_extended.length);
-        assertEquals(streamid_5, pending_results_extended[0][0]);
-        assertEquals(consumer2, pending_results_extended[0][1]);
+        // note: streams ID 1 and 5 are still pending, all others were acknowledged
+        assertEquals(2, pending_results_extended.length);
     }
 
     @SneakyThrows
@@ -4903,6 +4946,149 @@ public void xpending_return_failures(BaseClient client) {
         assertInstanceOf(RequestException.class, executionException.getCause());
     }
 
+    @SneakyThrows
+    @ParameterizedTest(autoCloseArguments = false)
+    @MethodSource("getClients")
+    public void xclaim_return_failures(BaseClient client) {
+
+        String key = UUID.randomUUID().toString();
+        String stringkey = UUID.randomUUID().toString();
+        String groupName = "group" + UUID.randomUUID();
+        String zeroStreamId = "0";
+        String consumer1 = "consumer-1-" + UUID.randomUUID();
+        String consumer2 = "consumer-2-" + UUID.randomUUID();
+
+        // create group and consumer for the group
+        assertEquals(
+                OK,
+                client
+                        .xgroupCreate(
+                                key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build())
+                        .get());
+        assertTrue(client.xgroupCreateConsumer(key, groupName, consumer1).get());
+
+        // Add stream entry and mark as pending:
+        String streamid_1 = client.xadd(key, Map.of("field1", "value1")).get();
+        assertNotNull(streamid_1);
+        assertNotNull(client.xreadgroup(Map.of(key, ">"), groupName, consumer1).get());
+
+        // claim with invalid stream entry IDs
+        ExecutionException executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client.xclaimJustId(key, groupName, consumer1, 1L, new String[] {"invalid"}).get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+
+        // claim with empty stream entry IDs returns no results
+        var emptyClaim = client.xclaimJustId(key, groupName, consumer1, 1L, new String[0]).get();
+        assertEquals(0L, emptyClaim.length);
+
+        // non-existent key throws a RequestError (NOGROUP)
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xclaim(stringkey, groupName, consumer1, 1L, new String[] {streamid_1})
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+        assertTrue(executionException.getMessage().contains("NOGROUP"));
+
+        final var claimOptions = StreamClaimOptions.builder().idle(1L).build();
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xclaim(
+                                                stringkey,
+                                                groupName,
+                                                consumer1,
+                                                1L,
+                                                new String[] {streamid_1},
+                                                claimOptions)
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+        assertTrue(executionException.getMessage().contains("NOGROUP"));
+
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xclaimJustId(stringkey, groupName, consumer1, 1L, new String[] {streamid_1})
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+        assertTrue(executionException.getMessage().contains("NOGROUP"));
+
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xclaimJustId(
+                                                stringkey,
+                                                groupName,
+                                                consumer1,
+                                                1L,
+                                                new String[] {streamid_1},
+                                                claimOptions)
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+        assertTrue(executionException.getMessage().contains("NOGROUP"));
+
+        // Key exists, but it is not a stream
+        assertEquals(OK, client.set(stringkey, "bar").get());
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xclaim(stringkey, groupName, consumer1, 1L, new String[] {streamid_1})
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xclaim(
+                                                stringkey,
+                                                groupName,
+                                                consumer1,
+                                                1L,
+                                                new String[] {streamid_1},
+                                                claimOptions)
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xclaimJustId(stringkey, groupName, consumer1, 1L, new String[] {streamid_1})
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+
+        executionException =
+                assertThrows(
+                        ExecutionException.class,
+                        () ->
+                                client
+                                        .xclaimJustId(
+                                                stringkey,
+                                                groupName,
+                                                consumer1,
+                                                1L,
+                                                new String[] {streamid_1},
+                                                claimOptions)
+                                        .get());
+        assertInstanceOf(RequestException.class, executionException.getCause());
+    }
+
     @SneakyThrows
     @ParameterizedTest(autoCloseArguments = false)
     @MethodSource("getClients")
diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java
index b55c127247..8c45614e08 100644
--- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java
+++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java
@@ -46,6 +46,7 @@
 import glide.api.models.commands.scan.SScanOptions;
 import glide.api.models.commands.scan.ZScanOptions;
 import glide.api.models.commands.stream.StreamAddOptions;
+import glide.api.models.commands.stream.StreamClaimOptions;
 import glide.api.models.commands.stream.StreamGroupOptions;
 import glide.api.models.commands.stream.StreamRange;
 import glide.api.models.commands.stream.StreamRange.IdBound;
@@ -841,6 +842,22 @@ private static Object[] streamCommands(BaseTransaction transaction) {
                         groupName1,
                         consumer1,
                         StreamReadGroupOptions.builder().count(2L).build())
+                .xclaim(streamKey1, groupName1, consumer1, 0L, new String[] {"0-1"})
+                .xclaim(
+                        streamKey1,
+                        groupName1,
+                        consumer1,
+                        0L,
+                        new String[] {"0-3"},
+                        StreamClaimOptions.builder().force().build())
+                .xclaimJustId(streamKey1, groupName1, consumer1, 0L, new String[] {"0-3"})
+                .xclaimJustId(
+                        streamKey1,
+                        groupName1,
+                        consumer1,
+                        0L,
+                        new String[] {"0-4"},
+                        StreamClaimOptions.builder().force().build())
                 .xpending(streamKey1, groupName1)
                 .xack(streamKey1, groupName1, new String[] {"0-3"})
                 .xpending(
@@ -894,6 +911,12 @@ private static Object[] streamCommands(BaseTransaction transaction) {
             Map.of(
                     streamKey1,
                     Map.of()), // xreadgroup(Map.of(streamKey1, ">"), groupName1, consumer1, options);
+            Map.of(), // xclaim(streamKey1, groupName1, consumer1, 0L, new String[] {"0-1"})
+            Map.of(
+                    "0-3",
+                    new String[][] {{"field3", "value3"}}), // xclaim(streamKey1, ..., {"0-3"}, options)
+            new String[] {"0-3"}, // xclaimJustId(streamKey1, ..., new String[] {"0-3"})
+            new String[0], // xclaimJustId(streamKey1, ..., new String[] {"0-4"}, options)
             new Object[] {
                 1L, "0-3", "0-3", new Object[][] {{consumer1, "1"}}
             }, // xpending(streamKey1, groupName1)