Skip to content

Commit

Permalink
feat: add txn classes
Browse files Browse the repository at this point in the history
  • Loading branch information
iGxnon committed Mar 4, 2024
1 parent 79ea74b commit c7bd13d
Show file tree
Hide file tree
Showing 7 changed files with 489 additions and 33 deletions.
4 changes: 2 additions & 2 deletions jxline-core/src/main/java/cloud/xline/jxline/Txn.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package cloud.xline.jxline;

import cloud.xline.jxline.kv.TxnResponse;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.Op;
import cloud.xline.jxline.op.Cmp;
import cloud.xline.jxline.op.Op;

import java.util.concurrent.CompletableFuture;

Expand Down
6 changes: 3 additions & 3 deletions jxline-core/src/main/java/cloud/xline/jxline/impl/KVImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public CompletableFuture<PutResponse> put(
requireNonNull(value, "value should not be null");
requireNonNull(option, "option should not be null");
Command cmd =
Requests.mapPutRequest(key, value, option, this.connectionManager().getNamespace());
Requests.mapPutCommand(key, value, option, this.connectionManager().getNamespace());
return protocolClient.propose(
cmd,
true,
Expand All @@ -55,7 +55,7 @@ public CompletableFuture<GetResponse> get(ByteSequence key, GetOption option) {
requireNonNull(key, "key should not be null");
requireNonNull(option, "option should not be null");
Command cmd =
Requests.mapRangeRequest(key, option, this.connectionManager().getNamespace());
Requests.mapRangeCommand(key, option, this.connectionManager().getNamespace());
return protocolClient.propose(
cmd,
true,
Expand All @@ -73,7 +73,7 @@ public CompletableFuture<DeleteResponse> delete(ByteSequence key, DeleteOption o
requireNonNull(key, "key should not be null");
requireNonNull(option, "option should not be null");
Command cmd =
Requests.mapDeleteRequest(key, option, this.connectionManager().getNamespace());
Requests.mapDeleteCommand(key, option, this.connectionManager().getNamespace());
return protocolClient.propose(
cmd,
true,
Expand Down
73 changes: 73 additions & 0 deletions jxline-core/src/main/java/cloud/xline/jxline/op/Cmp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package cloud.xline.jxline.op;

import cloud.xline.jxline.Txn;
import com.google.protobuf.ByteString;
import com.xline.protobuf.Compare;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.support.Util;


/** The compare predicate in {@link Txn}. */
public class Cmp {

public enum Op {
EQUAL,
GREATER,
LESS,
NOT_EQUAL
}

private final ByteString key;
private final Op op;
private final CmpTarget<?> target;

public Cmp(ByteSequence key, Op compareOp, CmpTarget<?> target) {
this.key = ByteString.copyFrom(key.getBytes());
this.op = compareOp;
this.target = target;
}

Compare toCompare(ByteSequence namespace) {
Compare.Builder compareBuilder =
Compare.newBuilder().setKey(Util.prefixNamespace(this.key, namespace));
switch (this.op) {
case EQUAL:
compareBuilder.setResult(Compare.CompareResult.EQUAL);
break;
case GREATER:
compareBuilder.setResult(Compare.CompareResult.GREATER);
break;
case LESS:
compareBuilder.setResult(Compare.CompareResult.LESS);
break;
case NOT_EQUAL:
compareBuilder.setResult(Compare.CompareResult.NOT_EQUAL);
break;
default:
throw new IllegalArgumentException("Unexpected compare type (" + this.op + ")");
}

Compare.CompareTarget target = this.target.getTarget();
Object value = this.target.getTargetValue();

compareBuilder.setTarget(target);
switch (target) {
case VERSION:
compareBuilder.setVersion((Long) value);
break;
case VALUE:
compareBuilder.setValue((ByteString) value);
break;
case MOD:
compareBuilder.setModRevision((Long) value);
break;
case CREATE:
compareBuilder.setCreateRevision((Long) value);
break;
default:
throw new IllegalArgumentException("Unexpected target type (" + target + ")");
}

return compareBuilder.build();
}
}
108 changes: 108 additions & 0 deletions jxline-core/src/main/java/cloud/xline/jxline/op/CmpTarget.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package cloud.xline.jxline.op;


import cloud.xline.jxline.Txn;
import com.google.protobuf.ByteString;
import com.xline.protobuf.Compare;
import io.etcd.jetcd.ByteSequence;

/**
* Cmp target used in {@link Txn}.
*/
public abstract class CmpTarget<T> {

/**
* Cmp on a given <i>version</i>.
*
* @param version version to compare
* @return the version compare target
*/
public static VersionCmpTarget version(long version) {
return new VersionCmpTarget(version);
}

/**
* Cmp on the create <i>revision</i>.
*
* @param revision the create revision
* @return the create revision compare target
*/
public static CreateRevisionCmpTarget createRevision(long revision) {
return new CreateRevisionCmpTarget(revision);
}

/**
* Cmp on the modification <i>revision</i>.
*
* @param revision the modification revision
* @return the modification revision compare target
*/
public static ModRevisionCmpTarget modRevision(long revision) {
return new ModRevisionCmpTarget(revision);
}

/**
* Cmp on the <i>value</i>.
*
* @param value the value to compare
* @return the value compare target
*/
public static ValueCmpTarget value(ByteSequence value) {
return new ValueCmpTarget(ByteString.copyFrom(value.getBytes()));
}

private final Compare.CompareTarget target;
private final T targetValue;

protected CmpTarget(Compare.CompareTarget target, T targetValue) {
this.target = target;
this.targetValue = targetValue;
}

/**
* Get the compare target used for this compare.
*
* @return the compare target used for this compare
*/
public Compare.CompareTarget getTarget() {
return target;
}

/**
* Get the compare target value of this compare.
*
* @return the compare target value of this compare.
*/
public T getTargetValue() {
return targetValue;
}

public static final class VersionCmpTarget extends CmpTarget<Long> {

VersionCmpTarget(Long targetValue) {
super(Compare.CompareTarget.VERSION, targetValue);
}
}

public static final class CreateRevisionCmpTarget extends CmpTarget<Long> {

CreateRevisionCmpTarget(Long targetValue) {
super(Compare.CompareTarget.CREATE, targetValue);
}
}

public static final class ModRevisionCmpTarget extends CmpTarget<Long> {

ModRevisionCmpTarget(Long targetValue) {
super(Compare.CompareTarget.MOD, targetValue);
}
}

public static final class ValueCmpTarget extends CmpTarget<ByteString> {

ValueCmpTarget(ByteString targetValue) {
super(Compare.CompareTarget.VALUE, targetValue);
}
}

}
151 changes: 151 additions & 0 deletions jxline-core/src/main/java/cloud/xline/jxline/op/Op.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package cloud.xline.jxline.op;

import cloud.xline.jxline.support.Requests;
import com.google.protobuf.ByteString;
import com.xline.protobuf.DeleteRangeRequest;
import com.xline.protobuf.RequestOp;
import com.xline.protobuf.TxnRequest;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.support.Util;

/** Copied From Etcd Operation. */
public abstract class Op {

/** Operation type. */
public enum Type {
PUT,
RANGE,
DELETE_RANGE,
TXN
}

protected final Type type;
protected final ByteString key;

protected Op(Type type, ByteString key) {
this.type = type;
this.key = key;
}

abstract RequestOp toRequestOp(ByteSequence namespace);

public static PutOp put(ByteSequence key, ByteSequence value, PutOption option) {
return new PutOp(
ByteString.copyFrom(key.getBytes()), ByteString.copyFrom(value.getBytes()), option);
}

public static GetOp get(ByteSequence key, GetOption option) {
return new GetOp(ByteString.copyFrom(key.getBytes()), option);
}

public static DeleteOp delete(ByteSequence key, DeleteOption option) {
return new DeleteOp(ByteString.copyFrom(key.getBytes()), option);
}

public static TxnOp txn(Cmp[] cmps, Op[] thenOps, Op[] elseOps) {
return new TxnOp(cmps, thenOps, elseOps);
}

public static final class PutOp extends Op {

private final ByteString value;
private final PutOption option;

private PutOp(ByteString key, ByteString value, PutOption option) {
super(Type.PUT, key);
this.value = value;
this.option = option;
}

@Override
RequestOp toRequestOp(ByteSequence namespace) {
return RequestOp.newBuilder()
.setRequestPut(
Requests.mapPutRequest(
ByteSequence.from(key),
ByteSequence.from(value),
option,
namespace))
.build();
}
}

public static final class GetOp extends Op {

private final GetOption option;

private GetOp(ByteString key, GetOption option) {
super(Type.RANGE, key);
this.option = option;
}

@Override
RequestOp toRequestOp(ByteSequence namespace) {
return RequestOp.newBuilder()
.setRequestRange(
Requests.mapRangeRequest(ByteSequence.from(key), option, namespace))
.build();
}
}

public static final class DeleteOp extends Op {

private final DeleteOption option;

DeleteOp(ByteString key, DeleteOption option) {
super(Type.DELETE_RANGE, key);
this.option = option;
}

@Override
RequestOp toRequestOp(ByteSequence namespace) {
return RequestOp.newBuilder()
.setRequestDeleteRange(
DeleteRangeRequest.newBuilder()
.setKey(Util.prefixNamespace(key, namespace))
.setPrevKv(option.isPrevKV()))
.build();
}
}

public static final class TxnOp extends Op {
private final Cmp[] cmps;
private final Op[] thenOps;
private final Op[] elseOps;

private TxnOp(Cmp[] cmps, Op[] thenOps, Op[] elseOps) {
super(Type.TXN, null);
this.cmps = cmps;
this.thenOps = thenOps;
this.elseOps = elseOps;
}

@Override
RequestOp toRequestOp(ByteSequence namespace) {
TxnRequest.Builder txn = TxnRequest.newBuilder();

if (cmps != null) {
for (Cmp cmp : cmps) {
txn.addCompare(cmp.toCompare(namespace));
}
}

if (thenOps != null) {
for (Op thenOp : thenOps) {
txn.addSuccess(thenOp.toRequestOp(namespace));
}
}

if (elseOps != null) {
for (Op elseOp : elseOps) {
txn.addFailure(elseOp.toRequestOp(namespace));
}
}

return RequestOp.newBuilder().setRequestTxn(txn).build();
}
}
}
Loading

0 comments on commit c7bd13d

Please sign in to comment.