From c7bd13d8921d2a203afdf865e0df681a3d9e589f Mon Sep 17 00:00:00 2001 From: iGxnon Date: Mon, 4 Mar 2024 10:32:36 +0800 Subject: [PATCH] feat: add txn classes --- .../src/main/java/cloud/xline/jxline/Txn.java | 4 +- .../java/cloud/xline/jxline/impl/KVImpl.java | 6 +- .../main/java/cloud/xline/jxline/op/Cmp.java | 73 +++++++++ .../java/cloud/xline/jxline/op/CmpTarget.java | 108 +++++++++++++ .../main/java/cloud/xline/jxline/op/Op.java | 151 ++++++++++++++++++ .../java/cloud/xline/jxline/op/TxnImpl.java | 112 +++++++++++++ .../cloud/xline/jxline/support/Requests.java | 68 ++++---- 7 files changed, 489 insertions(+), 33 deletions(-) create mode 100644 jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java create mode 100644 jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java create mode 100644 jxline-core/src/main/java/cloud/xline/jxline/op/Op.java create mode 100644 jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java diff --git a/jxline-core/src/main/java/cloud/xline/jxline/Txn.java b/jxline-core/src/main/java/cloud/xline/jxline/Txn.java index 1f631d0..635fb5e 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/Txn.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/Txn.java @@ -17,8 +17,8 @@ package cloud.xline.jxline; import cloud.xline.jxline.kv.TxnResponse; -import io.etcd.jetcd.op.Cmp; -import io.etcd.jetcd.op.Op; +import cloud.xline.jxline.op.Cmp; +import cloud.xline.jxline.op.Op; import java.util.concurrent.CompletableFuture; diff --git a/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java index da2379d..233da92 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java @@ -37,7 +37,7 @@ public CompletableFuture put( requireNonNull(value, "value should not be null"); requireNonNull(option, "option should not be null"); Command cmd = - Requests.mapPutRequest(key, value, option, this.connectionManager().getNamespace()); + Requests.mapPutCommand(key, value, option, this.connectionManager().getNamespace()); return protocolClient.propose( cmd, true, @@ -55,7 +55,7 @@ public CompletableFuture get(ByteSequence key, GetOption option) { requireNonNull(key, "key should not be null"); requireNonNull(option, "option should not be null"); Command cmd = - Requests.mapRangeRequest(key, option, this.connectionManager().getNamespace()); + Requests.mapRangeCommand(key, option, this.connectionManager().getNamespace()); return protocolClient.propose( cmd, true, @@ -73,7 +73,7 @@ public CompletableFuture delete(ByteSequence key, DeleteOption o requireNonNull(key, "key should not be null"); requireNonNull(option, "option should not be null"); Command cmd = - Requests.mapDeleteRequest(key, option, this.connectionManager().getNamespace()); + Requests.mapDeleteCommand(key, option, this.connectionManager().getNamespace()); return protocolClient.propose( cmd, true, diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java b/jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java new file mode 100644 index 0000000..5f92dd6 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java @@ -0,0 +1,73 @@ +package cloud.xline.jxline.op; + +import cloud.xline.jxline.Txn; +import com.google.protobuf.ByteString; +import com.xline.protobuf.Compare; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.support.Util; + + +/** The compare predicate in {@link Txn}. */ +public class Cmp { + + public enum Op { + EQUAL, + GREATER, + LESS, + NOT_EQUAL + } + + private final ByteString key; + private final Op op; + private final CmpTarget target; + + public Cmp(ByteSequence key, Op compareOp, CmpTarget target) { + this.key = ByteString.copyFrom(key.getBytes()); + this.op = compareOp; + this.target = target; + } + + Compare toCompare(ByteSequence namespace) { + Compare.Builder compareBuilder = + Compare.newBuilder().setKey(Util.prefixNamespace(this.key, namespace)); + switch (this.op) { + case EQUAL: + compareBuilder.setResult(Compare.CompareResult.EQUAL); + break; + case GREATER: + compareBuilder.setResult(Compare.CompareResult.GREATER); + break; + case LESS: + compareBuilder.setResult(Compare.CompareResult.LESS); + break; + case NOT_EQUAL: + compareBuilder.setResult(Compare.CompareResult.NOT_EQUAL); + break; + default: + throw new IllegalArgumentException("Unexpected compare type (" + this.op + ")"); + } + + Compare.CompareTarget target = this.target.getTarget(); + Object value = this.target.getTargetValue(); + + compareBuilder.setTarget(target); + switch (target) { + case VERSION: + compareBuilder.setVersion((Long) value); + break; + case VALUE: + compareBuilder.setValue((ByteString) value); + break; + case MOD: + compareBuilder.setModRevision((Long) value); + break; + case CREATE: + compareBuilder.setCreateRevision((Long) value); + break; + default: + throw new IllegalArgumentException("Unexpected target type (" + target + ")"); + } + + return compareBuilder.build(); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java b/jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java new file mode 100644 index 0000000..96ec1fb --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java @@ -0,0 +1,108 @@ +package cloud.xline.jxline.op; + + +import cloud.xline.jxline.Txn; +import com.google.protobuf.ByteString; +import com.xline.protobuf.Compare; +import io.etcd.jetcd.ByteSequence; + +/** + * Cmp target used in {@link Txn}. + */ +public abstract class CmpTarget { + + /** + * Cmp on a given version. + * + * @param version version to compare + * @return the version compare target + */ + public static VersionCmpTarget version(long version) { + return new VersionCmpTarget(version); + } + + /** + * Cmp on the create revision. + * + * @param revision the create revision + * @return the create revision compare target + */ + public static CreateRevisionCmpTarget createRevision(long revision) { + return new CreateRevisionCmpTarget(revision); + } + + /** + * Cmp on the modification revision. + * + * @param revision the modification revision + * @return the modification revision compare target + */ + public static ModRevisionCmpTarget modRevision(long revision) { + return new ModRevisionCmpTarget(revision); + } + + /** + * Cmp on the value. + * + * @param value the value to compare + * @return the value compare target + */ + public static ValueCmpTarget value(ByteSequence value) { + return new ValueCmpTarget(ByteString.copyFrom(value.getBytes())); + } + + private final Compare.CompareTarget target; + private final T targetValue; + + protected CmpTarget(Compare.CompareTarget target, T targetValue) { + this.target = target; + this.targetValue = targetValue; + } + + /** + * Get the compare target used for this compare. + * + * @return the compare target used for this compare + */ + public Compare.CompareTarget getTarget() { + return target; + } + + /** + * Get the compare target value of this compare. + * + * @return the compare target value of this compare. + */ + public T getTargetValue() { + return targetValue; + } + + public static final class VersionCmpTarget extends CmpTarget { + + VersionCmpTarget(Long targetValue) { + super(Compare.CompareTarget.VERSION, targetValue); + } + } + + public static final class CreateRevisionCmpTarget extends CmpTarget { + + CreateRevisionCmpTarget(Long targetValue) { + super(Compare.CompareTarget.CREATE, targetValue); + } + } + + public static final class ModRevisionCmpTarget extends CmpTarget { + + ModRevisionCmpTarget(Long targetValue) { + super(Compare.CompareTarget.MOD, targetValue); + } + } + + public static final class ValueCmpTarget extends CmpTarget { + + ValueCmpTarget(ByteString targetValue) { + super(Compare.CompareTarget.VALUE, targetValue); + } + } + +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/Op.java b/jxline-core/src/main/java/cloud/xline/jxline/op/Op.java new file mode 100644 index 0000000..e6c65e2 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/Op.java @@ -0,0 +1,151 @@ +package cloud.xline.jxline.op; + +import cloud.xline.jxline.support.Requests; +import com.google.protobuf.ByteString; +import com.xline.protobuf.DeleteRangeRequest; +import com.xline.protobuf.RequestOp; +import com.xline.protobuf.TxnRequest; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.options.DeleteOption; +import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.PutOption; +import io.etcd.jetcd.support.Util; + +/** Copied From Etcd Operation. */ +public abstract class Op { + + /** Operation type. */ + public enum Type { + PUT, + RANGE, + DELETE_RANGE, + TXN + } + + protected final Type type; + protected final ByteString key; + + protected Op(Type type, ByteString key) { + this.type = type; + this.key = key; + } + + abstract RequestOp toRequestOp(ByteSequence namespace); + + public static PutOp put(ByteSequence key, ByteSequence value, PutOption option) { + return new PutOp( + ByteString.copyFrom(key.getBytes()), ByteString.copyFrom(value.getBytes()), option); + } + + public static GetOp get(ByteSequence key, GetOption option) { + return new GetOp(ByteString.copyFrom(key.getBytes()), option); + } + + public static DeleteOp delete(ByteSequence key, DeleteOption option) { + return new DeleteOp(ByteString.copyFrom(key.getBytes()), option); + } + + public static TxnOp txn(Cmp[] cmps, Op[] thenOps, Op[] elseOps) { + return new TxnOp(cmps, thenOps, elseOps); + } + + public static final class PutOp extends Op { + + private final ByteString value; + private final PutOption option; + + private PutOp(ByteString key, ByteString value, PutOption option) { + super(Type.PUT, key); + this.value = value; + this.option = option; + } + + @Override + RequestOp toRequestOp(ByteSequence namespace) { + return RequestOp.newBuilder() + .setRequestPut( + Requests.mapPutRequest( + ByteSequence.from(key), + ByteSequence.from(value), + option, + namespace)) + .build(); + } + } + + public static final class GetOp extends Op { + + private final GetOption option; + + private GetOp(ByteString key, GetOption option) { + super(Type.RANGE, key); + this.option = option; + } + + @Override + RequestOp toRequestOp(ByteSequence namespace) { + return RequestOp.newBuilder() + .setRequestRange( + Requests.mapRangeRequest(ByteSequence.from(key), option, namespace)) + .build(); + } + } + + public static final class DeleteOp extends Op { + + private final DeleteOption option; + + DeleteOp(ByteString key, DeleteOption option) { + super(Type.DELETE_RANGE, key); + this.option = option; + } + + @Override + RequestOp toRequestOp(ByteSequence namespace) { + return RequestOp.newBuilder() + .setRequestDeleteRange( + DeleteRangeRequest.newBuilder() + .setKey(Util.prefixNamespace(key, namespace)) + .setPrevKv(option.isPrevKV())) + .build(); + } + } + + public static final class TxnOp extends Op { + private final Cmp[] cmps; + private final Op[] thenOps; + private final Op[] elseOps; + + private TxnOp(Cmp[] cmps, Op[] thenOps, Op[] elseOps) { + super(Type.TXN, null); + this.cmps = cmps; + this.thenOps = thenOps; + this.elseOps = elseOps; + } + + @Override + RequestOp toRequestOp(ByteSequence namespace) { + TxnRequest.Builder txn = TxnRequest.newBuilder(); + + if (cmps != null) { + for (Cmp cmp : cmps) { + txn.addCompare(cmp.toCompare(namespace)); + } + } + + if (thenOps != null) { + for (Op thenOp : thenOps) { + txn.addSuccess(thenOp.toRequestOp(namespace)); + } + } + + if (elseOps != null) { + for (Op elseOp : elseOps) { + txn.addFailure(elseOp.toRequestOp(namespace)); + } + } + + return RequestOp.newBuilder().setRequestTxn(txn).build(); + } + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java b/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java new file mode 100644 index 0000000..4cee5c0 --- /dev/null +++ b/jxline-core/src/main/java/cloud/xline/jxline/op/TxnImpl.java @@ -0,0 +1,112 @@ +package cloud.xline.jxline.op; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import cloud.xline.jxline.Txn; +import cloud.xline.jxline.kv.TxnResponse; +import com.xline.protobuf.TxnRequest; +import io.etcd.jetcd.ByteSequence; + +import com.google.common.annotations.VisibleForTesting; + +/** Build a transaction. */ +public class TxnImpl implements Txn { + + public static TxnImpl newTxn( + Function> f, ByteSequence namespace) { + return new TxnImpl(f, namespace); + } + + @VisibleForTesting + static TxnImpl newTxn(Function> f) { + return newTxn(f, ByteSequence.EMPTY); + } + + private final ByteSequence namespace; + + private final List cmpList = new ArrayList<>(); + private final List successOpList = new ArrayList<>(); + private final List failureOpList = new ArrayList<>(); + private final Function> requestF; + + private boolean seenThen = false; + private boolean seenElse = false; + + private TxnImpl( + Function> f, ByteSequence namespace) { + this.requestF = f; + this.namespace = namespace; + } + + @Override + public TxnImpl If(Cmp... cmps) { + return If(Arrays.asList(cmps)); + } + + TxnImpl If(List cmps) { + if (this.seenThen) { + throw new IllegalArgumentException("cannot call If after Then!"); + } + if (this.seenElse) { + throw new IllegalArgumentException("cannot call If after Else!"); + } + + cmpList.addAll(cmps); + return this; + } + + @Override + public TxnImpl Then(Op... ops) { + return Then(Arrays.asList(ops)); + } + + TxnImpl Then(List ops) { + if (this.seenElse) { + throw new IllegalArgumentException("cannot call Then after Else!"); + } + + this.seenThen = true; + + successOpList.addAll(ops); + return this; + } + + @Override + public TxnImpl Else(Op... ops) { + return Else(Arrays.asList(ops)); + } + + TxnImpl Else(List ops) { + this.seenElse = true; + + failureOpList.addAll(ops); + return this; + } + + @Override + public CompletableFuture commit() { + return this.requestF.apply(this.toTxnRequest()); + } + + private TxnRequest toTxnRequest() { + TxnRequest.Builder requestBuilder = TxnRequest.newBuilder(); + + for (Cmp c : this.cmpList) { + requestBuilder.addCompare(c.toCompare(namespace)); + } + + for (Op o : this.successOpList) { + requestBuilder.addSuccess(o.toRequestOp(namespace)); + } + + for (Op o : this.failureOpList) { + requestBuilder.addFailure(o.toRequestOp(namespace)); + } + + return requestBuilder.build(); + } +} diff --git a/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java b/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java index 9ed28fc..9f9bb77 100644 --- a/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java +++ b/jxline-core/src/main/java/cloud/xline/jxline/support/Requests.java @@ -10,15 +10,20 @@ import java.util.function.Consumer; public final class Requests { - public static Command mapPutRequest( + + public static PutRequest mapPutRequest( ByteSequence key, ByteSequence value, PutOption option, ByteSequence namespace) { - PutRequest req = - PutRequest.newBuilder() - .setKey(Util.prefixNamespace(key, namespace)) - .setValue(ByteString.copyFrom(value.getBytes())) - .setLease(option.getLeaseId()) - .setPrevKv(option.getPrevKV()) - .build(); + return PutRequest.newBuilder() + .setKey(Util.prefixNamespace(key, namespace)) + .setValue(ByteString.copyFrom(value.getBytes())) + .setLease(option.getLeaseId()) + .setPrevKv(option.getPrevKV()) + .build(); + } + + public static Command mapPutCommand( + ByteSequence key, ByteSequence value, PutOption option, ByteSequence namespace) { + PutRequest req = mapPutRequest(key, value, option, namespace); return Command.newBuilder() .addKeys(KeyRange.newBuilder().setKey(ByteString.copyFrom(key.getBytes())).build()) .setRequest( @@ -26,22 +31,26 @@ public static Command mapPutRequest( .build(); } - public static Command mapRangeRequest( + public static RangeRequest.Builder mapRangeRequest( ByteSequence key, GetOption option, ByteSequence namespace) { - RangeRequest.Builder builder = - RangeRequest.newBuilder() - .setKey(Util.prefixNamespace(key, namespace)) - .setCountOnly(option.isCountOnly()) - .setLimit(option.getLimit()) - .setRevision(option.getRevision()) - .setKeysOnly(option.isKeysOnly()) - .setSerializable(option.isSerializable()) - .setSortOrder(toRangeRequestSortOrder(option.getSortOrder())) - .setSortTarget(toRangeRequestSortTarget(option.getSortField())) - .setMinCreateRevision(option.getMinCreateRevision()) - .setMaxCreateRevision(option.getMaxCreateRevision()) - .setMinModRevision(option.getMinModRevision()) - .setMaxModRevision(option.getMaxModRevision()); + return RangeRequest.newBuilder() + .setKey(Util.prefixNamespace(key, namespace)) + .setCountOnly(option.isCountOnly()) + .setLimit(option.getLimit()) + .setRevision(option.getRevision()) + .setKeysOnly(option.isKeysOnly()) + .setSerializable(option.isSerializable()) + .setSortOrder(toRangeRequestSortOrder(option.getSortOrder())) + .setSortTarget(toRangeRequestSortTarget(option.getSortField())) + .setMinCreateRevision(option.getMinCreateRevision()) + .setMaxCreateRevision(option.getMaxCreateRevision()) + .setMinModRevision(option.getMinModRevision()) + .setMaxModRevision(option.getMaxModRevision()); + } + + public static Command mapRangeCommand( + ByteSequence key, GetOption option, ByteSequence namespace) { + RangeRequest.Builder builder = mapRangeRequest(key, option, namespace); defineRangeRequestEnd( key, option.getEndKey(), option.isPrefix(), namespace, builder::setRangeEnd); @@ -54,13 +63,16 @@ public static Command mapRangeRequest( .build(); } - public static Command mapDeleteRequest( + public static DeleteRangeRequest.Builder mapDeleteRequest( ByteSequence key, DeleteOption option, ByteSequence namespace) { - DeleteRangeRequest.Builder builder = - DeleteRangeRequest.newBuilder() - .setKey(Util.prefixNamespace(key, namespace)) - .setPrevKv(option.isPrevKV()); + return DeleteRangeRequest.newBuilder() + .setKey(Util.prefixNamespace(key, namespace)) + .setPrevKv(option.isPrevKV()); + } + public static Command mapDeleteCommand( + ByteSequence key, DeleteOption option, ByteSequence namespace) { + DeleteRangeRequest.Builder builder = mapDeleteRequest(key, option, namespace); defineRangeRequestEnd( key, option.getEndKey(), option.isPrefix(), namespace, builder::setRangeEnd);